mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-22 05:09:05 +00:00
feat: calendar integration (#608)
* feat: calendar integration
* feat: add ICS calendar API endpoints for room configuration and sync
* feat: add Celery background tasks for ICS sync
* feat: implement Phase 2 - Multiple active meetings per room with grace period
This commit adds support for multiple concurrent meetings per room, implementing
grace period logic and improved meeting lifecycle management for calendar integration.
## Database Changes
- Remove unique constraint preventing multiple active meetings per room
- Add last_participant_left_at field to track when meeting becomes empty
- Add grace_period_minutes field (default: 15) for configurable grace period
## Meeting Controller Enhancements
- Add get_all_active_for_room() to retrieve all active meetings for a room
- Add get_active_by_calendar_event() to find meetings by calendar event ID
- Maintain backward compatibility with existing get_active() method
## New API Endpoints
- GET /rooms/{room_name}/meetings/active - List all active meetings
- POST /rooms/{room_name}/meetings/{meeting_id}/join - Join specific meeting
## Meeting Lifecycle Improvements
- 15-minute grace period after last participant leaves
- Automatic reactivation when participant rejoins during grace period
- Force close calendar meetings 30 minutes after scheduled end time
- Update process_meetings task to handle multiple active meetings
## Whereby Integration
- Clear grace period when participants join via webhook events
- Track participant count for grace period management
## Testing
- Add comprehensive tests for multiple active meetings
- Test grace period behavior and participant rejoin scenarios
- Test calendar meeting force closure logic
- All 5 new tests passing
This enables proper calendar integration with overlapping meetings while
preventing accidental meeting closures through the grace period mechanism.
* feat: implement frontend for calendar integration (Phase 3 & 4)
- Created MeetingSelection component for choosing between multiple active meetings
- Shows both active meetings and upcoming calendar events (30 min ahead)
- Displays meeting metadata with privacy controls (owner-only details)
- Supports creation of unscheduled meetings alongside calendar meetings
- Added waiting page for users joining before scheduled start time
- Shows countdown timer until meeting begins
- Auto-transitions to meeting when calendar event becomes active
- Handles early joining with proper routing
- Created collapsible info panel showing meeting details
- Displays calendar metadata (title, description, attendees)
- Shows participant count and duration
- Privacy-aware: sensitive info only visible to room owners
- Integrated ICS settings into room configuration dialog
- Test connection functionality with immediate feedback
- Manual sync trigger with detailed results
- Shows last sync time and ETag for monitoring
- Configurable sync intervals (1 min to 1 hour)
- New /room/{roomName} route for meeting selection
- Waiting room at /room/{roomName}/wait?eventId={id}
- Classic room page at /{roomName} with meeting info
- Uses sessionStorage to pass selected meeting between pages
- Added new endpoints for active/upcoming meetings
- Regenerated TypeScript client with latest OpenAPI spec
- Proper error handling and loading states
- Auto-refresh every 30 seconds for live updates
- Color-coded badges for meeting status
- Attendee status indicators (accepted/declined/tentative)
- Responsive design with Chakra UI components
- Clear visual hierarchy between active and upcoming meetings
- Smart truncation for long attendee lists
This completes the frontend implementation for calendar integration,
enabling users to seamlessly join scheduled meetings from their
calendar applications.
* WIP: Migrate calendar integration frontend to React Query
- Migrate all calendar components from useApi to React Query hooks
- Fix Chakra UI v3 compatibility issues (Card, Progress, spacing props, leftIcon)
- Update backend Meeting model to include calendar fields
- Replace imperative API calls with declarative React Query patterns
- Remove old OpenAPI generated files that conflict with new structure
* fix: alembic migrations
* feat: add calendar migration
* feat: update ics, first version working
* feat: implement tabbed interface for room edit dialog
- Add General, Calendar, and Share tabs to organize room settings
- Move ICS settings to dedicated Calendar tab
- Move Zulip configuration to Share tab
- Keep basic room settings and webhooks in General tab
- Remove redundant migration file
- Fix Chakra UI v3 compatibility issues in calendar components
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
* fix: infinite loop
* feat: improve ICS calendar sync UX and fix room URL matching
- Replace "Test Connection" button with "Force Sync" button (Edit Room only)
- Show detailed sync results: total events downloaded vs room matches
- Remove emoticons and auto-hide timeout for cleaner UX
- Fix room URL matching to use UI_BASE_URL instead of BASE_URL
- Replace FaSync icon with LuRefreshCw for consistency
- Clear sync results when dialog closes or Force Sync pressed
- Update tests to reflect UI_BASE_URL change and exact URL matching
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
* feat: reorganize room edit dialog and fix Force Sync button
- Move WebHook configuration from General to dedicated WebHook tab
- Add WebHook tab after Share tab in room edit dialog
- Fix Force Sync button not appearing by adding missing isEditing prop
- Fix indentation issues in MeetingSelection component
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
* feat: complete calendar integration with UI improvements and code cleanup
Calendar Integration Tasks:
- Update upcoming meetings window from 30 to 120 minutes
- Include currently happening events in upcoming meetings API
- Create shared time utility functions (formatDateTime, formatCountdown, formatStartedAgo)
- Improve ongoing meetings UI logic with proper time detection
- Fix backend code organization and remove excessive documentation
UI/UX Improvements:
- Restructure room page layout using MinimalHeader pattern
- Remove borders from header and footer elements
- Change button text from "Leave Meeting" to "Leave Room"
- Remove "Back to Reflector" footer for cleaner design
- Extract WaitPageClient component for better separation
Backend Changes:
- calendar_events.py: Fix import organization and extend timing window
- rooms.py: Update API default from 30 to 120 minutes
- Enhanced test coverage for ongoing meeting scenarios
Frontend Changes:
- MinimalHeader: Add onLeave prop for custom navigation
- MeetingSelection: Complete layout restructure with shared utilities
- timeUtils: New shared utility file for consistent time formatting
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
* feat: remove wait page and simplify Join button with 5-minute disable logic
- Remove entire wait page directory and associated files
- Update handleJoinUpcoming to create unscheduled meeting directly
- Simplify Join button to single state:
- Always shows "Join" text
- Blue when meeting can be joined (ongoing or within 5 minutes)
- Gray/disabled when more than 5 minutes away
- Remove confusing "Join Now", "Join Early" text variations
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
* feat: improve calendar integration and meeting UI
- Refactor ICS sync tasks to use @asynctask decorator for cleaner async handling
- Extract meeting creation logic into reusable function
- Improve meeting selection UI with distinct current/upcoming sections
- Add early join functionality for upcoming meetings within 5-minute window
- Simplify non-ICS room workflow with direct Whereby embed
- Fix import paths and component organization
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
* feat: restore original recording consent functionality
- Remove custom ConsentDialogButton and WherebyEmbed components
- Merge RoomClient logic back into main room page
- Restore original consent UI: blue button with toast modal
- Maintain calendar integration features for ICS-enabled rooms
- Add consent-handler.md documentation of original functionality
- Preserve focus management and accessibility features
* fix: redirect Join Now button to local meeting page
- Change handleJoinDirect to use onMeetingSelect instead of opening external URL
- Join Now button now navigates to /{roomName}/{meetingId} instead of whereby.com
- Maintains proper routing within the application
* feat: remove restrictive message for non-owners in private rooms
- Remove confusing message about room owner permissions
- Cleaner UI for all users regardless of ownership status
- Users will only see available meetings and join options
* feat: improve meeting selection UI for better readability
- Limit page content to max 800px width for better 4K display readability
- Remove LIVE tag badge for cleaner interface
- Remove shadow from main live meeting box
- Remove blue border and hover effects for minimal design
- Change background to neutral gray for less visual noise
* feat: add room by name endpoint for non-authenticated access
- Add GET /rooms/name/{room_name} backend endpoint
- Endpoint supports non-authenticated access for public rooms
- Returns RoomDetails with webhook fields hidden for non-owners
- Update useRoomGetByName hook to use new direct endpoint
- Remove authentication requirement from frontend hook
- Regenerate API client types
Fixes: Non-authenticated users can now access room lobbies
* feat: add friendly message when no meetings are ongoing
- Show centered message with calendar icon when no meetings are active
- Message text: 'No meetings right now' with helpful description
- Contextual text for owners/shared rooms mentioning quick meeting option
- Consistent gray styling matching the rest of the interface
- Only displays when both currentMeetings and upcomingMeetings are empty
* style: center no meetings message and remove background
- Change from Box to Flex with flex=1 for vertical centering
- Remove gray background, border radius, and padding
- Message now appears cleanly centered in available space
- Maintains horizontal and vertical centering
* feat: move Create Meeting button to header
- Remove 'Start a Quick Meeting' box from main content area
- Add showCreateButton and onCreateMeeting props to MinimalHeader
- Create Meeting button now appears in header left of Leave Room
- Only shows for room owners or shared room users
- Update no meetings message to remove reference to quick meeting below
- Cleaner, more accessible UI with actions in the header
* style: change room title and no meetings text to pure black
- Update room title in MinimalHeader from gray.700 to black
- Update 'No meetings right now' text from gray.700 to black
- Improves visual hierarchy and readability
- Consistent with other pages' styling
* style: linting
* fix: remove plan files
* fix: alembic migration with named foreign keys
* feat: add SyncStatus enum and refactor ICS sync to use rooms controller
- Add SyncStatus enum to replace string literals in ICS sync status
- Replace direct SQL queries in worker with rooms_controller.get_ics_enabled()
- Improve type safety and maintainability of ICS sync code
- Enum values: SUCCESS, UNCHANGED, ERROR, SKIPPED maintain backward compatibility
* refactor: remove unnecessary docstring from get_ics_enabled method
The function name is self-explanatory
* fix: import top level
* feat: use Literal type for ICSStatus.status field
- Changed ICSStatus.status from str to Literal['enabled', 'disabled']
- Improves type safety and API documentation
* feat: update TypeScript definitions for ICSStatus Literal type
- OpenAPI generation now properly reflects Literal['enabled', 'disabled'] type
- Improves type safety for frontend consumers of the API
- Applied automatic formatting via pre-commit hooks
* refactor: replace loguru with structlog in ics_sync service
- Replace loguru import with structlog in services/ics_sync.py
- Update logging calls to use structlog's structured format with keyword args
- Maintains consistency with other services using structlog
- Changes: logger.info(f'...') -> logger.info('...', key=value) format
* chore: remove loguru dependency and improve type annotations
- Remove loguru from dependencies in pyproject.toml (replaced with structlog)
- Update meeting controller methods to properly return Optional types
- Update dependency lock file after loguru removal
* fix: resolve pyflakes warnings in ics_sync and meetings modules
Remove unused imports and variables to clean up code quality
* Remove grace period logic and improve meeting deactivation
- Removed grace_period_minutes and last_participant_left_at fields
- Simplified deactivation logic based on actual usage patterns:
* Active sessions: Keep meeting active regardless of scheduled time
* Calendar meetings: Wait until scheduled end if unused, deactivate immediately once used and empty
* On-the-fly meetings: Deactivate immediately when empty
- Created migration to drop unused database columns
- Updated tests to remove grace period test cases
* Update test to match new deactivation logic for calendar meetings
* fix: remove unwanted file
* fix: incompleted changes from EVENT_WINDOW*
* fix: update room ICS API tests to include required webhook fields and correct URL
- Add webhook_url and webhook_secret fields to room creation tests
- Fix room URL matching in ICS sync test to use UI_BASE_URL instead of BASE_URL
- Aligns test with actual API requirements and ICS sync service implementation
* fix: add Redis distributed locking to prevent race conditions in process_meetings
- Implement per-meeting locks using Redis to prevent concurrent processing
- Add lock extension after slow API calls (Whereby) to handle long-running operations
- Use redis-py's built-in lock.extend() with replace_ttl=True for simple TTL refresh
- Track and log skipped meetings when locked by other workers
- Document SSRF analysis showing it's low-risk due to async worker isolation
This prevents multiple workers from processing the same meeting simultaneously,
which could cause state corruption or duplicate deactivations.
* refactor: rename MinimalHeader to MeetingMinimalHeader for clarity
* fix: minor code quality improvements - add emoji constants, fix type safety, cleanup comments
* fix: database migration
* self-pr review
* self-pr review
* self-pr review treeshake
* fix: local fixes
* fix: creation of meeting
* fix: meeting selection create button
* compile fix
* fix: meeting selection responsive
* fix: rework process logic for meeting
* fix: meeting useEffect frontend-only dedupe (#647)
* meeting useEffect frontend-only dedupe
* format
* also get room by name backend fix
---------
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
* invalidate meeting list on new meeting
* test fix
* room url copy button for ics
* calendar refresh quick action icon
* remove work.md
* meeting page frontend fixes
* hide number of meeting participants
* Revert "hide number of meeting participants"
This reverts commit 38906c5d1a.
* ui bits
* ui bits
* remove log
* room name typing stricten
* feat: protect atomic operation involving external service with redlock
---------
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Igor Monadical <igor@monadical.com>
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
@@ -67,7 +67,8 @@ def current_user(
|
||||
try:
|
||||
payload = jwtauth.verify_token(token)
|
||||
sub = payload["sub"]
|
||||
return UserInfo(sub=sub)
|
||||
email = payload["email"]
|
||||
return UserInfo(sub=sub, email=email)
|
||||
except JWTError as e:
|
||||
logger.error(f"JWT error: {e}")
|
||||
raise HTTPException(status_code=401, detail="Invalid authentication")
|
||||
|
||||
@@ -24,6 +24,7 @@ def get_database() -> databases.Database:
|
||||
|
||||
|
||||
# import models
|
||||
import reflector.db.calendar_events # noqa
|
||||
import reflector.db.meetings # noqa
|
||||
import reflector.db.recordings # noqa
|
||||
import reflector.db.rooms # noqa
|
||||
|
||||
182
server/reflector/db/calendar_events.py
Normal file
182
server/reflector/db/calendar_events.py
Normal file
@@ -0,0 +1,182 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
import sqlalchemy as sa
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
|
||||
from reflector.db import get_database, metadata
|
||||
from reflector.utils import generate_uuid4
|
||||
|
||||
calendar_events = sa.Table(
|
||||
"calendar_event",
|
||||
metadata,
|
||||
sa.Column("id", sa.String, primary_key=True),
|
||||
sa.Column(
|
||||
"room_id",
|
||||
sa.String,
|
||||
sa.ForeignKey("room.id", ondelete="CASCADE", name="fk_calendar_event_room_id"),
|
||||
nullable=False,
|
||||
),
|
||||
sa.Column("ics_uid", sa.Text, nullable=False),
|
||||
sa.Column("title", sa.Text),
|
||||
sa.Column("description", sa.Text),
|
||||
sa.Column("start_time", sa.DateTime(timezone=True), nullable=False),
|
||||
sa.Column("end_time", sa.DateTime(timezone=True), nullable=False),
|
||||
sa.Column("attendees", JSONB),
|
||||
sa.Column("location", sa.Text),
|
||||
sa.Column("ics_raw_data", sa.Text),
|
||||
sa.Column("last_synced", sa.DateTime(timezone=True), nullable=False),
|
||||
sa.Column("is_deleted", sa.Boolean, nullable=False, server_default=sa.false()),
|
||||
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
|
||||
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
|
||||
sa.UniqueConstraint("room_id", "ics_uid", name="uq_room_calendar_event"),
|
||||
sa.Index("idx_calendar_event_room_start", "room_id", "start_time"),
|
||||
sa.Index(
|
||||
"idx_calendar_event_deleted",
|
||||
"is_deleted",
|
||||
postgresql_where=sa.text("NOT is_deleted"),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class CalendarEvent(BaseModel):
|
||||
id: str = Field(default_factory=generate_uuid4)
|
||||
room_id: str
|
||||
ics_uid: str
|
||||
title: str | None = None
|
||||
description: str | None = None
|
||||
start_time: datetime
|
||||
end_time: datetime
|
||||
attendees: list[dict[str, Any]] | None = None
|
||||
location: str | None = None
|
||||
ics_raw_data: str | None = None
|
||||
last_synced: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
is_deleted: bool = False
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
|
||||
|
||||
class CalendarEventController:
|
||||
async def get_by_room(
|
||||
self,
|
||||
room_id: str,
|
||||
include_deleted: bool = False,
|
||||
start_after: datetime | None = None,
|
||||
end_before: datetime | None = None,
|
||||
) -> list[CalendarEvent]:
|
||||
query = calendar_events.select().where(calendar_events.c.room_id == room_id)
|
||||
|
||||
if not include_deleted:
|
||||
query = query.where(calendar_events.c.is_deleted == False)
|
||||
|
||||
if start_after:
|
||||
query = query.where(calendar_events.c.start_time >= start_after)
|
||||
|
||||
if end_before:
|
||||
query = query.where(calendar_events.c.end_time <= end_before)
|
||||
|
||||
query = query.order_by(calendar_events.c.start_time.asc())
|
||||
|
||||
results = await get_database().fetch_all(query)
|
||||
return [CalendarEvent(**result) for result in results]
|
||||
|
||||
async def get_upcoming(
|
||||
self, room_id: str, minutes_ahead: int = 120
|
||||
) -> list[CalendarEvent]:
|
||||
"""Get upcoming events for a room within the specified minutes, including currently happening events."""
|
||||
now = datetime.now(timezone.utc)
|
||||
future_time = now + timedelta(minutes=minutes_ahead)
|
||||
|
||||
query = (
|
||||
calendar_events.select()
|
||||
.where(
|
||||
sa.and_(
|
||||
calendar_events.c.room_id == room_id,
|
||||
calendar_events.c.is_deleted == False,
|
||||
calendar_events.c.start_time <= future_time,
|
||||
calendar_events.c.end_time >= now,
|
||||
)
|
||||
)
|
||||
.order_by(calendar_events.c.start_time.asc())
|
||||
)
|
||||
|
||||
results = await get_database().fetch_all(query)
|
||||
return [CalendarEvent(**result) for result in results]
|
||||
|
||||
async def get_by_ics_uid(self, room_id: str, ics_uid: str) -> CalendarEvent | None:
|
||||
query = calendar_events.select().where(
|
||||
sa.and_(
|
||||
calendar_events.c.room_id == room_id,
|
||||
calendar_events.c.ics_uid == ics_uid,
|
||||
)
|
||||
)
|
||||
result = await get_database().fetch_one(query)
|
||||
return CalendarEvent(**result) if result else None
|
||||
|
||||
async def upsert(self, event: CalendarEvent) -> CalendarEvent:
|
||||
existing = await self.get_by_ics_uid(event.room_id, event.ics_uid)
|
||||
|
||||
if existing:
|
||||
event.id = existing.id
|
||||
event.created_at = existing.created_at
|
||||
event.updated_at = datetime.now(timezone.utc)
|
||||
|
||||
query = (
|
||||
calendar_events.update()
|
||||
.where(calendar_events.c.id == existing.id)
|
||||
.values(**event.model_dump())
|
||||
)
|
||||
else:
|
||||
query = calendar_events.insert().values(**event.model_dump())
|
||||
|
||||
await get_database().execute(query)
|
||||
return event
|
||||
|
||||
async def soft_delete_missing(
|
||||
self, room_id: str, current_ics_uids: list[str]
|
||||
) -> int:
|
||||
"""Soft delete future events that are no longer in the calendar."""
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
select_query = calendar_events.select().where(
|
||||
sa.and_(
|
||||
calendar_events.c.room_id == room_id,
|
||||
calendar_events.c.start_time > now,
|
||||
calendar_events.c.is_deleted == False,
|
||||
calendar_events.c.ics_uid.notin_(current_ics_uids)
|
||||
if current_ics_uids
|
||||
else True,
|
||||
)
|
||||
)
|
||||
|
||||
to_delete = await get_database().fetch_all(select_query)
|
||||
delete_count = len(to_delete)
|
||||
|
||||
if delete_count > 0:
|
||||
update_query = (
|
||||
calendar_events.update()
|
||||
.where(
|
||||
sa.and_(
|
||||
calendar_events.c.room_id == room_id,
|
||||
calendar_events.c.start_time > now,
|
||||
calendar_events.c.is_deleted == False,
|
||||
calendar_events.c.ics_uid.notin_(current_ics_uids)
|
||||
if current_ics_uids
|
||||
else True,
|
||||
)
|
||||
)
|
||||
.values(is_deleted=True, updated_at=now)
|
||||
)
|
||||
|
||||
await get_database().execute(update_query)
|
||||
|
||||
return delete_count
|
||||
|
||||
async def delete_by_room(self, room_id: str) -> int:
|
||||
query = calendar_events.delete().where(calendar_events.c.room_id == room_id)
|
||||
result = await get_database().execute(query)
|
||||
return result.rowcount
|
||||
|
||||
|
||||
calendar_events_controller = CalendarEventController()
|
||||
@@ -1,8 +1,9 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
from typing import Any, Literal
|
||||
|
||||
import sqlalchemy as sa
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
|
||||
from reflector.db import get_database, metadata
|
||||
from reflector.db.rooms import Room
|
||||
@@ -44,13 +45,18 @@ meetings = sa.Table(
|
||||
nullable=False,
|
||||
server_default=sa.true(),
|
||||
),
|
||||
sa.Index("idx_meeting_room_id", "room_id"),
|
||||
sa.Index(
|
||||
"idx_one_active_meeting_per_room",
|
||||
"room_id",
|
||||
unique=True,
|
||||
postgresql_where=sa.text("is_active = true"),
|
||||
sa.Column(
|
||||
"calendar_event_id",
|
||||
sa.String,
|
||||
sa.ForeignKey(
|
||||
"calendar_event.id",
|
||||
ondelete="SET NULL",
|
||||
name="fk_meeting_calendar_event_id",
|
||||
),
|
||||
),
|
||||
sa.Column("calendar_metadata", JSONB),
|
||||
sa.Index("idx_meeting_room_id", "room_id"),
|
||||
sa.Index("idx_meeting_calendar_event", "calendar_event_id"),
|
||||
)
|
||||
|
||||
meeting_consent = sa.Table(
|
||||
@@ -92,6 +98,9 @@ class Meeting(BaseModel):
|
||||
"none", "prompt", "automatic", "automatic-2nd-participant"
|
||||
] = "automatic-2nd-participant"
|
||||
num_clients: int = 0
|
||||
is_active: bool = True
|
||||
calendar_event_id: str | None = None
|
||||
calendar_metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class MeetingController:
|
||||
@@ -104,6 +113,8 @@ class MeetingController:
|
||||
start_date: datetime,
|
||||
end_date: datetime,
|
||||
room: Room,
|
||||
calendar_event_id: str | None = None,
|
||||
calendar_metadata: dict[str, Any] | None = None,
|
||||
):
|
||||
meeting = Meeting(
|
||||
id=id,
|
||||
@@ -117,6 +128,8 @@ class MeetingController:
|
||||
room_mode=room.room_mode,
|
||||
recording_type=room.recording_type,
|
||||
recording_trigger=room.recording_trigger,
|
||||
calendar_event_id=calendar_event_id,
|
||||
calendar_metadata=calendar_metadata,
|
||||
)
|
||||
query = meetings.insert().values(**meeting.model_dump())
|
||||
await get_database().execute(query)
|
||||
@@ -130,7 +143,16 @@ class MeetingController:
|
||||
self,
|
||||
room_name: str,
|
||||
) -> Meeting | None:
|
||||
query = meetings.select().where(meetings.c.room_name == room_name)
|
||||
"""
|
||||
Get a meeting by room name.
|
||||
For backward compatibility, returns the most recent meeting.
|
||||
"""
|
||||
end_date = getattr(meetings.c, "end_date")
|
||||
query = (
|
||||
meetings.select()
|
||||
.where(meetings.c.room_name == room_name)
|
||||
.order_by(end_date.desc())
|
||||
)
|
||||
result = await get_database().fetch_one(query)
|
||||
if not result:
|
||||
return None
|
||||
@@ -138,6 +160,10 @@ class MeetingController:
|
||||
return Meeting(**result)
|
||||
|
||||
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
|
||||
"""
|
||||
Get latest active meeting for a room.
|
||||
For backward compatibility, returns the most recent active meeting.
|
||||
"""
|
||||
end_date = getattr(meetings.c, "end_date")
|
||||
query = (
|
||||
meetings.select()
|
||||
@@ -156,6 +182,43 @@ class MeetingController:
|
||||
|
||||
return Meeting(**result)
|
||||
|
||||
async def get_all_active_for_room(
|
||||
self, room: Room, current_time: datetime
|
||||
) -> list[Meeting]:
|
||||
end_date = getattr(meetings.c, "end_date")
|
||||
query = (
|
||||
meetings.select()
|
||||
.where(
|
||||
sa.and_(
|
||||
meetings.c.room_id == room.id,
|
||||
meetings.c.end_date > current_time,
|
||||
meetings.c.is_active,
|
||||
)
|
||||
)
|
||||
.order_by(end_date.desc())
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
return [Meeting(**result) for result in results]
|
||||
|
||||
async def get_active_by_calendar_event(
|
||||
self, room: Room, calendar_event_id: str, current_time: datetime
|
||||
) -> Meeting | None:
|
||||
"""
|
||||
Get active meeting for a specific calendar event.
|
||||
"""
|
||||
query = meetings.select().where(
|
||||
sa.and_(
|
||||
meetings.c.room_id == room.id,
|
||||
meetings.c.calendar_event_id == calendar_event_id,
|
||||
meetings.c.end_date > current_time,
|
||||
meetings.c.is_active,
|
||||
)
|
||||
)
|
||||
result = await get_database().fetch_one(query)
|
||||
if not result:
|
||||
return None
|
||||
return Meeting(**result)
|
||||
|
||||
async def get_by_id(self, meeting_id: str, **kwargs) -> Meeting | None:
|
||||
query = meetings.select().where(meetings.c.id == meeting_id)
|
||||
result = await get_database().fetch_one(query)
|
||||
@@ -163,6 +226,15 @@ class MeetingController:
|
||||
return None
|
||||
return Meeting(**result)
|
||||
|
||||
async def get_by_calendar_event(self, calendar_event_id: str) -> Meeting | None:
|
||||
query = meetings.select().where(
|
||||
meetings.c.calendar_event_id == calendar_event_id
|
||||
)
|
||||
result = await get_database().fetch_one(query)
|
||||
if not result:
|
||||
return None
|
||||
return Meeting(**result)
|
||||
|
||||
async def update_meeting(self, meeting_id: str, **kwargs):
|
||||
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
|
||||
await get_database().execute(query)
|
||||
@@ -190,7 +262,6 @@ class MeetingConsentController:
|
||||
return MeetingConsent(**result)
|
||||
|
||||
async def upsert(self, consent: MeetingConsent) -> MeetingConsent:
|
||||
"""Create new consent or update existing one for authenticated users"""
|
||||
if consent.user_id:
|
||||
# For authenticated users, check if consent already exists
|
||||
# not transactional but we're ok with that; the consents ain't deleted anyways
|
||||
|
||||
@@ -43,7 +43,15 @@ rooms = sqlalchemy.Table(
|
||||
),
|
||||
sqlalchemy.Column("webhook_url", sqlalchemy.String, nullable=True),
|
||||
sqlalchemy.Column("webhook_secret", sqlalchemy.String, nullable=True),
|
||||
sqlalchemy.Column("ics_url", sqlalchemy.Text),
|
||||
sqlalchemy.Column("ics_fetch_interval", sqlalchemy.Integer, server_default="300"),
|
||||
sqlalchemy.Column(
|
||||
"ics_enabled", sqlalchemy.Boolean, nullable=False, server_default=false()
|
||||
),
|
||||
sqlalchemy.Column("ics_last_sync", sqlalchemy.DateTime(timezone=True)),
|
||||
sqlalchemy.Column("ics_last_etag", sqlalchemy.Text),
|
||||
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
|
||||
sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"),
|
||||
)
|
||||
|
||||
|
||||
@@ -64,6 +72,11 @@ class Room(BaseModel):
|
||||
is_shared: bool = False
|
||||
webhook_url: str | None = None
|
||||
webhook_secret: str | None = None
|
||||
ics_url: str | None = None
|
||||
ics_fetch_interval: int = 300
|
||||
ics_enabled: bool = False
|
||||
ics_last_sync: datetime | None = None
|
||||
ics_last_etag: str | None = None
|
||||
|
||||
|
||||
class RoomController:
|
||||
@@ -114,6 +127,9 @@ class RoomController:
|
||||
is_shared: bool,
|
||||
webhook_url: str = "",
|
||||
webhook_secret: str = "",
|
||||
ics_url: str | None = None,
|
||||
ics_fetch_interval: int = 300,
|
||||
ics_enabled: bool = False,
|
||||
):
|
||||
"""
|
||||
Add a new room
|
||||
@@ -134,6 +150,9 @@ class RoomController:
|
||||
is_shared=is_shared,
|
||||
webhook_url=webhook_url,
|
||||
webhook_secret=webhook_secret,
|
||||
ics_url=ics_url,
|
||||
ics_fetch_interval=ics_fetch_interval,
|
||||
ics_enabled=ics_enabled,
|
||||
)
|
||||
query = rooms.insert().values(**room.model_dump())
|
||||
try:
|
||||
@@ -198,6 +217,13 @@ class RoomController:
|
||||
|
||||
return room
|
||||
|
||||
async def get_ics_enabled(self) -> list[Room]:
|
||||
query = rooms.select().where(
|
||||
rooms.c.ics_enabled == True, rooms.c.ics_url != None
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
return [Room(**result) for result in results]
|
||||
|
||||
async def remove_by_id(
|
||||
self,
|
||||
room_id: str,
|
||||
|
||||
@@ -1,10 +1,17 @@
|
||||
import asyncio
|
||||
import functools
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
import redis
|
||||
import redis.asyncio as redis_async
|
||||
import structlog
|
||||
from redis.exceptions import LockError
|
||||
|
||||
from reflector.settings import settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
redis_clients = {}
|
||||
|
||||
|
||||
@@ -21,6 +28,12 @@ def get_redis_client(db=0):
|
||||
return redis_clients[db]
|
||||
|
||||
|
||||
async def get_async_redis_client(db: int = 0):
|
||||
return await redis_async.from_url(
|
||||
f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{db}"
|
||||
)
|
||||
|
||||
|
||||
def redis_cache(prefix="cache", duration=3600, db=settings.REDIS_CACHE_DB, argidx=1):
|
||||
"""
|
||||
Cache the result of a function in Redis.
|
||||
@@ -49,3 +62,87 @@ def redis_cache(prefix="cache", duration=3600, db=settings.REDIS_CACHE_DB, argid
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class RedisAsyncLock:
|
||||
def __init__(
|
||||
self,
|
||||
key: str,
|
||||
timeout: int = 120,
|
||||
extend_interval: int = 30,
|
||||
skip_if_locked: bool = False,
|
||||
blocking: bool = True,
|
||||
blocking_timeout: Optional[float] = None,
|
||||
):
|
||||
self.key = f"async_lock:{key}"
|
||||
self.timeout = timeout
|
||||
self.extend_interval = extend_interval
|
||||
self.skip_if_locked = skip_if_locked
|
||||
self.blocking = blocking
|
||||
self.blocking_timeout = blocking_timeout
|
||||
self._lock = None
|
||||
self._redis = None
|
||||
self._extend_task = None
|
||||
self._acquired = False
|
||||
|
||||
async def _extend_lock_periodically(self):
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(self.extend_interval)
|
||||
if self._lock:
|
||||
await self._lock.extend(self.timeout, replace_ttl=True)
|
||||
logger.debug("Extended lock", key=self.key)
|
||||
except LockError:
|
||||
logger.warning("Failed to extend lock", key=self.key)
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error("Error extending lock", key=self.key, error=str(e))
|
||||
break
|
||||
|
||||
async def __aenter__(self):
|
||||
self._redis = await get_async_redis_client()
|
||||
self._lock = self._redis.lock(
|
||||
self.key,
|
||||
timeout=self.timeout,
|
||||
blocking=self.blocking,
|
||||
blocking_timeout=self.blocking_timeout,
|
||||
)
|
||||
|
||||
self._acquired = await self._lock.acquire()
|
||||
|
||||
if not self._acquired:
|
||||
if self.skip_if_locked:
|
||||
logger.warning(
|
||||
"Lock already acquired by another process, skipping", key=self.key
|
||||
)
|
||||
return self
|
||||
else:
|
||||
raise LockError(f"Failed to acquire lock: {self.key}")
|
||||
|
||||
self._extend_task = asyncio.create_task(self._extend_lock_periodically())
|
||||
logger.info("Acquired lock", key=self.key)
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
if self._extend_task:
|
||||
self._extend_task.cancel()
|
||||
try:
|
||||
await self._extend_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
if self._acquired and self._lock:
|
||||
try:
|
||||
await self._lock.release()
|
||||
logger.info("Released lock", key=self.key)
|
||||
except LockError:
|
||||
logger.debug("Lock already released or expired", key=self.key)
|
||||
|
||||
if self._redis:
|
||||
await self._redis.aclose()
|
||||
|
||||
@property
|
||||
def acquired(self) -> bool:
|
||||
return self._acquired
|
||||
|
||||
408
server/reflector/services/ics_sync.py
Normal file
408
server/reflector/services/ics_sync.py
Normal file
@@ -0,0 +1,408 @@
|
||||
"""
|
||||
ICS Calendar Synchronization Service
|
||||
|
||||
This module provides services for fetching, parsing, and synchronizing ICS (iCalendar)
|
||||
calendar feeds with room booking data in the database.
|
||||
|
||||
Key Components:
|
||||
- ICSFetchService: Handles HTTP fetching and parsing of ICS calendar data
|
||||
- ICSSyncService: Manages the synchronization process between ICS feeds and database
|
||||
|
||||
Example Usage:
|
||||
# Sync a room's calendar
|
||||
room = Room(id="room1", name="conference-room", ics_url="https://cal.example.com/room.ics")
|
||||
result = await ics_sync_service.sync_room_calendar(room)
|
||||
|
||||
# Result structure:
|
||||
{
|
||||
"status": "success", # success|unchanged|error|skipped
|
||||
"hash": "abc123...", # MD5 hash of ICS content
|
||||
"events_found": 5, # Events matching this room
|
||||
"total_events": 12, # Total events in calendar within time window
|
||||
"events_created": 2, # New events added to database
|
||||
"events_updated": 3, # Existing events modified
|
||||
"events_deleted": 1 # Events soft-deleted (no longer in calendar)
|
||||
}
|
||||
|
||||
Event Matching:
|
||||
Events are matched to rooms by checking if the room's full URL appears in the
|
||||
event's LOCATION or DESCRIPTION fields. Only events within a 25-hour window
|
||||
(1 hour ago to 24 hours from now) are processed.
|
||||
|
||||
Input: ICS calendar URL (e.g., "https://calendar.google.com/calendar/ical/...")
|
||||
Output: EventData objects with structured calendar information:
|
||||
{
|
||||
"ics_uid": "event123@google.com",
|
||||
"title": "Team Meeting",
|
||||
"description": "Weekly sync meeting",
|
||||
"location": "https://meet.company.com/conference-room",
|
||||
"start_time": datetime(2024, 1, 15, 14, 0, tzinfo=UTC),
|
||||
"end_time": datetime(2024, 1, 15, 15, 0, tzinfo=UTC),
|
||||
"attendees": [
|
||||
{"email": "user@company.com", "name": "John Doe", "role": "ORGANIZER"},
|
||||
{"email": "attendee@company.com", "name": "Jane Smith", "status": "ACCEPTED"}
|
||||
],
|
||||
"ics_raw_data": "BEGIN:VEVENT\nUID:event123@google.com\n..."
|
||||
}
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
from enum import Enum
|
||||
from typing import TypedDict
|
||||
|
||||
import httpx
|
||||
import pytz
|
||||
import structlog
|
||||
from icalendar import Calendar, Event
|
||||
|
||||
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
|
||||
from reflector.db.rooms import Room, rooms_controller
|
||||
from reflector.redis_cache import RedisAsyncLock
|
||||
from reflector.settings import settings
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
EVENT_WINDOW_DELTA_START = timedelta(hours=-1)
|
||||
EVENT_WINDOW_DELTA_END = timedelta(hours=24)
|
||||
|
||||
|
||||
class SyncStatus(str, Enum):
|
||||
SUCCESS = "success"
|
||||
UNCHANGED = "unchanged"
|
||||
ERROR = "error"
|
||||
SKIPPED = "skipped"
|
||||
|
||||
|
||||
class AttendeeData(TypedDict, total=False):
|
||||
email: str | None
|
||||
name: str | None
|
||||
status: str | None
|
||||
role: str | None
|
||||
|
||||
|
||||
class EventData(TypedDict):
|
||||
ics_uid: str
|
||||
title: str | None
|
||||
description: str | None
|
||||
location: str | None
|
||||
start_time: datetime
|
||||
end_time: datetime
|
||||
attendees: list[AttendeeData]
|
||||
ics_raw_data: str
|
||||
|
||||
|
||||
class SyncStats(TypedDict):
|
||||
events_created: int
|
||||
events_updated: int
|
||||
events_deleted: int
|
||||
|
||||
|
||||
class SyncResultBase(TypedDict):
|
||||
status: SyncStatus
|
||||
|
||||
|
||||
class SyncResult(SyncResultBase, total=False):
|
||||
hash: str | None
|
||||
events_found: int
|
||||
total_events: int
|
||||
events_created: int
|
||||
events_updated: int
|
||||
events_deleted: int
|
||||
error: str | None
|
||||
reason: str | None
|
||||
|
||||
|
||||
class ICSFetchService:
|
||||
def __init__(self):
|
||||
self.client = httpx.AsyncClient(
|
||||
timeout=30.0, headers={"User-Agent": "Reflector/1.0"}
|
||||
)
|
||||
|
||||
async def fetch_ics(self, url: str) -> str:
|
||||
response = await self.client.get(url)
|
||||
response.raise_for_status()
|
||||
|
||||
return response.text
|
||||
|
||||
def parse_ics(self, ics_content: str) -> Calendar:
|
||||
return Calendar.from_ical(ics_content)
|
||||
|
||||
def extract_room_events(
|
||||
self, calendar: Calendar, room_name: str, room_url: str
|
||||
) -> tuple[list[EventData], int]:
|
||||
events = []
|
||||
total_events = 0
|
||||
now = datetime.now(timezone.utc)
|
||||
window_start = now + EVENT_WINDOW_DELTA_START
|
||||
window_end = now + EVENT_WINDOW_DELTA_END
|
||||
|
||||
for component in calendar.walk():
|
||||
if component.name != "VEVENT":
|
||||
continue
|
||||
|
||||
status = component.get("STATUS", "").upper()
|
||||
if status == "CANCELLED":
|
||||
continue
|
||||
|
||||
# Count total non-cancelled events in the time window
|
||||
event_data = self._parse_event(component)
|
||||
if event_data and window_start <= event_data["start_time"] <= window_end:
|
||||
total_events += 1
|
||||
|
||||
# Check if event matches this room
|
||||
if self._event_matches_room(component, room_name, room_url):
|
||||
events.append(event_data)
|
||||
|
||||
return events, total_events
|
||||
|
||||
def _event_matches_room(self, event: Event, room_name: str, room_url: str) -> bool:
|
||||
location = str(event.get("LOCATION", ""))
|
||||
description = str(event.get("DESCRIPTION", ""))
|
||||
|
||||
# Only match full room URL
|
||||
# XXX leaved here as a patterns, to later be extended with tinyurl or such too
|
||||
patterns = [
|
||||
room_url,
|
||||
]
|
||||
|
||||
# Check location and description for patterns
|
||||
text_to_check = f"{location} {description}".lower()
|
||||
for pattern in patterns:
|
||||
if pattern.lower() in text_to_check:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _parse_event(self, event: Event) -> EventData | None:
|
||||
uid = str(event.get("UID", ""))
|
||||
summary = str(event.get("SUMMARY", ""))
|
||||
description = str(event.get("DESCRIPTION", ""))
|
||||
location = str(event.get("LOCATION", ""))
|
||||
dtstart = event.get("DTSTART")
|
||||
dtend = event.get("DTEND")
|
||||
|
||||
if not dtstart:
|
||||
return None
|
||||
|
||||
# Convert fields
|
||||
start_time = self._normalize_datetime(
|
||||
dtstart.dt if hasattr(dtstart, "dt") else dtstart
|
||||
)
|
||||
end_time = (
|
||||
self._normalize_datetime(dtend.dt if hasattr(dtend, "dt") else dtend)
|
||||
if dtend
|
||||
else start_time + timedelta(hours=1)
|
||||
)
|
||||
attendees = self._parse_attendees(event)
|
||||
|
||||
# Get raw event data for storage
|
||||
raw_data = event.to_ical().decode("utf-8")
|
||||
|
||||
return {
|
||||
"ics_uid": uid,
|
||||
"title": summary,
|
||||
"description": description,
|
||||
"location": location,
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"attendees": attendees,
|
||||
"ics_raw_data": raw_data,
|
||||
}
|
||||
|
||||
def _normalize_datetime(self, dt) -> datetime:
|
||||
# Ensure datetime is with timezone, if not, assume UTC
|
||||
if isinstance(dt, date) and not isinstance(dt, datetime):
|
||||
dt = datetime.combine(dt, datetime.min.time())
|
||||
dt = pytz.UTC.localize(dt)
|
||||
elif isinstance(dt, datetime):
|
||||
if dt.tzinfo is None:
|
||||
dt = pytz.UTC.localize(dt)
|
||||
else:
|
||||
dt = dt.astimezone(pytz.UTC)
|
||||
|
||||
return dt
|
||||
|
||||
def _parse_attendees(self, event: Event) -> list[AttendeeData]:
|
||||
# Extracts attendee information from both ATTENDEE and ORGANIZER properties.
|
||||
# Handles malformed comma-separated email addresses in single ATTENDEE fields
|
||||
# by splitting them into separate attendee entries. Returns a list of attendee
|
||||
# data including email, name, status, and role information.
|
||||
final_attendees = []
|
||||
|
||||
attendees = event.get("ATTENDEE", [])
|
||||
if not isinstance(attendees, list):
|
||||
attendees = [attendees]
|
||||
for att in attendees:
|
||||
email_str = str(att).replace("mailto:", "") if att else None
|
||||
|
||||
# Handle malformed comma-separated email addresses in a single ATTENDEE field
|
||||
if email_str and "," in email_str:
|
||||
# Split comma-separated emails and create separate attendee entries
|
||||
email_parts = [email.strip() for email in email_str.split(",")]
|
||||
for email in email_parts:
|
||||
if email and "@" in email:
|
||||
clean_email = email.replace("MAILTO:", "").replace(
|
||||
"mailto:", ""
|
||||
)
|
||||
att_data: AttendeeData = {
|
||||
"email": clean_email,
|
||||
"name": att.params.get("CN")
|
||||
if hasattr(att, "params") and email == email_parts[0]
|
||||
else None,
|
||||
"status": att.params.get("PARTSTAT")
|
||||
if hasattr(att, "params") and email == email_parts[0]
|
||||
else None,
|
||||
"role": att.params.get("ROLE")
|
||||
if hasattr(att, "params") and email == email_parts[0]
|
||||
else None,
|
||||
}
|
||||
final_attendees.append(att_data)
|
||||
else:
|
||||
# Normal single attendee
|
||||
att_data: AttendeeData = {
|
||||
"email": email_str,
|
||||
"name": att.params.get("CN") if hasattr(att, "params") else None,
|
||||
"status": att.params.get("PARTSTAT")
|
||||
if hasattr(att, "params")
|
||||
else None,
|
||||
"role": att.params.get("ROLE") if hasattr(att, "params") else None,
|
||||
}
|
||||
final_attendees.append(att_data)
|
||||
|
||||
# Add organizer
|
||||
organizer = event.get("ORGANIZER")
|
||||
if organizer:
|
||||
org_email = (
|
||||
str(organizer).replace("mailto:", "").replace("MAILTO:", "")
|
||||
if organizer
|
||||
else None
|
||||
)
|
||||
org_data: AttendeeData = {
|
||||
"email": org_email,
|
||||
"name": organizer.params.get("CN")
|
||||
if hasattr(organizer, "params")
|
||||
else None,
|
||||
"role": "ORGANIZER",
|
||||
}
|
||||
final_attendees.append(org_data)
|
||||
|
||||
return final_attendees
|
||||
|
||||
|
||||
class ICSSyncService:
|
||||
def __init__(self):
|
||||
self.fetch_service = ICSFetchService()
|
||||
|
||||
async def sync_room_calendar(self, room: Room) -> SyncResult:
|
||||
async with RedisAsyncLock(
|
||||
f"ics_sync_room:{room.id}", skip_if_locked=True
|
||||
) as lock:
|
||||
if not lock.acquired:
|
||||
logger.warning("ICS sync already in progress for room", room_id=room.id)
|
||||
return {
|
||||
"status": SyncStatus.SKIPPED,
|
||||
"reason": "Sync already in progress",
|
||||
}
|
||||
|
||||
return await self._sync_room_calendar(room)
|
||||
|
||||
async def _sync_room_calendar(self, room: Room) -> SyncResult:
|
||||
if not room.ics_enabled or not room.ics_url:
|
||||
return {"status": SyncStatus.SKIPPED, "reason": "ICS not configured"}
|
||||
|
||||
try:
|
||||
if not self._should_sync(room):
|
||||
return {"status": SyncStatus.SKIPPED, "reason": "Not time to sync yet"}
|
||||
|
||||
ics_content = await self.fetch_service.fetch_ics(room.ics_url)
|
||||
calendar = self.fetch_service.parse_ics(ics_content)
|
||||
|
||||
content_hash = hashlib.md5(ics_content.encode()).hexdigest()
|
||||
if room.ics_last_etag == content_hash:
|
||||
logger.info("No changes in ICS for room", room_id=room.id)
|
||||
room_url = f"{settings.UI_BASE_URL}/{room.name}"
|
||||
events, total_events = self.fetch_service.extract_room_events(
|
||||
calendar, room.name, room_url
|
||||
)
|
||||
return {
|
||||
"status": SyncStatus.UNCHANGED,
|
||||
"hash": content_hash,
|
||||
"events_found": len(events),
|
||||
"total_events": total_events,
|
||||
"events_created": 0,
|
||||
"events_updated": 0,
|
||||
"events_deleted": 0,
|
||||
}
|
||||
|
||||
# Extract matching events
|
||||
room_url = f"{settings.UI_BASE_URL}/{room.name}"
|
||||
events, total_events = self.fetch_service.extract_room_events(
|
||||
calendar, room.name, room_url
|
||||
)
|
||||
sync_result = await self._sync_events_to_database(room.id, events)
|
||||
|
||||
# Update room sync metadata
|
||||
await rooms_controller.update(
|
||||
room,
|
||||
{
|
||||
"ics_last_sync": datetime.now(timezone.utc),
|
||||
"ics_last_etag": content_hash,
|
||||
},
|
||||
mutate=False,
|
||||
)
|
||||
|
||||
return {
|
||||
"status": SyncStatus.SUCCESS,
|
||||
"hash": content_hash,
|
||||
"events_found": len(events),
|
||||
"total_events": total_events,
|
||||
**sync_result,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to sync ICS for room", room_id=room.id, error=str(e))
|
||||
return {"status": SyncStatus.ERROR, "error": str(e)}
|
||||
|
||||
def _should_sync(self, room: Room) -> bool:
|
||||
if not room.ics_last_sync:
|
||||
return True
|
||||
|
||||
time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
|
||||
return time_since_sync.total_seconds() >= room.ics_fetch_interval
|
||||
|
||||
async def _sync_events_to_database(
|
||||
self, room_id: str, events: list[EventData]
|
||||
) -> SyncStats:
|
||||
created = 0
|
||||
updated = 0
|
||||
|
||||
current_ics_uids = []
|
||||
|
||||
for event_data in events:
|
||||
calendar_event = CalendarEvent(room_id=room_id, **event_data)
|
||||
existing = await calendar_events_controller.get_by_ics_uid(
|
||||
room_id, event_data["ics_uid"]
|
||||
)
|
||||
|
||||
if existing:
|
||||
updated += 1
|
||||
else:
|
||||
created += 1
|
||||
|
||||
await calendar_events_controller.upsert(calendar_event)
|
||||
current_ics_uids.append(event_data["ics_uid"])
|
||||
|
||||
# Soft delete events that are no longer in calendar
|
||||
deleted = await calendar_events_controller.soft_delete_missing(
|
||||
room_id, current_ics_uids
|
||||
)
|
||||
|
||||
return {
|
||||
"events_created": created,
|
||||
"events_updated": updated,
|
||||
"events_deleted": deleted,
|
||||
}
|
||||
|
||||
|
||||
ics_sync_service = ICSSyncService()
|
||||
@@ -10,6 +10,7 @@ from reflector.db.meetings import (
|
||||
meeting_consent_controller,
|
||||
meetings_controller,
|
||||
)
|
||||
from reflector.db.rooms import rooms_controller
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -41,3 +42,34 @@ async def meeting_audio_consent(
|
||||
updated_consent = await meeting_consent_controller.upsert(consent)
|
||||
|
||||
return {"status": "success", "consent_id": updated_consent.id}
|
||||
|
||||
|
||||
@router.patch("/meetings/{meeting_id}/deactivate")
|
||||
async def meeting_deactivate(
|
||||
meeting_id: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user)],
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
if not user_id:
|
||||
raise HTTPException(status_code=401, detail="Authentication required")
|
||||
|
||||
meeting = await meetings_controller.get_by_id(meeting_id)
|
||||
if not meeting:
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
if not meeting.is_active:
|
||||
return {"status": "success", "meeting_id": meeting_id}
|
||||
|
||||
# Only room owner or meeting creator can deactivate
|
||||
room = await rooms_controller.get_by_id(meeting.room_id)
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
if user_id != room.user_id and user_id != meeting.user_id:
|
||||
raise HTTPException(
|
||||
status_code=403, detail="Only the room owner can deactivate meetings"
|
||||
)
|
||||
|
||||
await meetings_controller.update_meeting(meeting_id, is_active=False)
|
||||
|
||||
return {"status": "success", "meeting_id": meeting_id}
|
||||
|
||||
@@ -1,34 +1,27 @@
|
||||
import logging
|
||||
import sqlite3
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Annotated, Literal, Optional
|
||||
from enum import Enum
|
||||
from typing import Annotated, Any, Literal, Optional
|
||||
|
||||
import asyncpg.exceptions
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from fastapi_pagination import Page
|
||||
from fastapi_pagination.ext.databases import apaginate
|
||||
from pydantic import BaseModel
|
||||
from redis.exceptions import LockError
|
||||
|
||||
import reflector.auth as auth
|
||||
from reflector.db import get_database
|
||||
from reflector.db.calendar_events import calendar_events_controller
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.redis_cache import RedisAsyncLock
|
||||
from reflector.services.ics_sync import ics_sync_service
|
||||
from reflector.settings import settings
|
||||
from reflector.whereby import create_meeting, upload_logo
|
||||
from reflector.worker.webhook import test_webhook
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def parse_datetime_with_timezone(iso_string: str) -> datetime:
|
||||
"""Parse ISO datetime string and ensure timezone awareness (defaults to UTC if naive)."""
|
||||
dt = datetime.fromisoformat(iso_string)
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt
|
||||
|
||||
|
||||
class Room(BaseModel):
|
||||
id: str
|
||||
@@ -43,6 +36,11 @@ class Room(BaseModel):
|
||||
recording_type: str
|
||||
recording_trigger: str
|
||||
is_shared: bool
|
||||
ics_url: Optional[str] = None
|
||||
ics_fetch_interval: int = 300
|
||||
ics_enabled: bool = False
|
||||
ics_last_sync: Optional[datetime] = None
|
||||
ics_last_etag: Optional[str] = None
|
||||
|
||||
|
||||
class RoomDetails(Room):
|
||||
@@ -54,10 +52,22 @@ class Meeting(BaseModel):
|
||||
id: str
|
||||
room_name: str
|
||||
room_url: str
|
||||
# TODO it's not always present, | None
|
||||
host_room_url: str
|
||||
start_date: datetime
|
||||
end_date: datetime
|
||||
user_id: str | None = None
|
||||
room_id: str | None = None
|
||||
is_locked: bool = False
|
||||
room_mode: Literal["normal", "group"] = "normal"
|
||||
recording_type: Literal["none", "local", "cloud"] = "cloud"
|
||||
recording_trigger: Literal[
|
||||
"none", "prompt", "automatic", "automatic-2nd-participant"
|
||||
] = "automatic-2nd-participant"
|
||||
num_clients: int = 0
|
||||
is_active: bool = True
|
||||
calendar_event_id: str | None = None
|
||||
calendar_metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class CreateRoom(BaseModel):
|
||||
@@ -72,20 +82,30 @@ class CreateRoom(BaseModel):
|
||||
is_shared: bool
|
||||
webhook_url: str
|
||||
webhook_secret: str
|
||||
ics_url: Optional[str] = None
|
||||
ics_fetch_interval: int = 300
|
||||
ics_enabled: bool = False
|
||||
|
||||
|
||||
class UpdateRoom(BaseModel):
|
||||
name: str
|
||||
zulip_auto_post: bool
|
||||
zulip_stream: str
|
||||
zulip_topic: str
|
||||
is_locked: bool
|
||||
room_mode: str
|
||||
recording_type: str
|
||||
recording_trigger: str
|
||||
is_shared: bool
|
||||
webhook_url: str
|
||||
webhook_secret: str
|
||||
name: Optional[str] = None
|
||||
zulip_auto_post: Optional[bool] = None
|
||||
zulip_stream: Optional[str] = None
|
||||
zulip_topic: Optional[str] = None
|
||||
is_locked: Optional[bool] = None
|
||||
room_mode: Optional[str] = None
|
||||
recording_type: Optional[str] = None
|
||||
recording_trigger: Optional[str] = None
|
||||
is_shared: Optional[bool] = None
|
||||
webhook_url: Optional[str] = None
|
||||
webhook_secret: Optional[str] = None
|
||||
ics_url: Optional[str] = None
|
||||
ics_fetch_interval: Optional[int] = None
|
||||
ics_enabled: Optional[bool] = None
|
||||
|
||||
|
||||
class CreateRoomMeeting(BaseModel):
|
||||
allow_duplicated: Optional[bool] = False
|
||||
|
||||
|
||||
class DeletionStatus(BaseModel):
|
||||
@@ -100,6 +120,59 @@ class WebhookTestResult(BaseModel):
|
||||
response_preview: str | None = None
|
||||
|
||||
|
||||
class ICSStatus(BaseModel):
|
||||
status: Literal["enabled", "disabled"]
|
||||
last_sync: Optional[datetime] = None
|
||||
next_sync: Optional[datetime] = None
|
||||
last_etag: Optional[str] = None
|
||||
events_count: int = 0
|
||||
|
||||
|
||||
class SyncStatus(str, Enum):
|
||||
success = "success"
|
||||
unchanged = "unchanged"
|
||||
error = "error"
|
||||
skipped = "skipped"
|
||||
|
||||
|
||||
class ICSSyncResult(BaseModel):
|
||||
status: SyncStatus
|
||||
hash: Optional[str] = None
|
||||
events_found: int = 0
|
||||
total_events: int = 0
|
||||
events_created: int = 0
|
||||
events_updated: int = 0
|
||||
events_deleted: int = 0
|
||||
error: Optional[str] = None
|
||||
reason: Optional[str] = None
|
||||
|
||||
|
||||
class CalendarEventResponse(BaseModel):
|
||||
id: str
|
||||
room_id: str
|
||||
ics_uid: str
|
||||
title: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
start_time: datetime
|
||||
end_time: datetime
|
||||
attendees: Optional[list[dict]] = None
|
||||
location: Optional[str] = None
|
||||
last_synced: datetime
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def parse_datetime_with_timezone(iso_string: str) -> datetime:
|
||||
"""Parse ISO datetime string and ensure timezone awareness (defaults to UTC if naive)."""
|
||||
dt = datetime.fromisoformat(iso_string)
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt
|
||||
|
||||
|
||||
@router.get("/rooms", response_model=Page[RoomDetails])
|
||||
async def rooms_list(
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
@@ -129,6 +202,30 @@ async def rooms_get(
|
||||
return room
|
||||
|
||||
|
||||
@router.get("/rooms/name/{room_name}", response_model=RoomDetails)
|
||||
async def rooms_get_by_name(
|
||||
room_name: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
# Convert to RoomDetails format (add webhook fields if user is owner)
|
||||
room_dict = room.__dict__.copy()
|
||||
if user_id == room.user_id:
|
||||
# User is owner, include webhook details if available
|
||||
room_dict["webhook_url"] = getattr(room, "webhook_url", None)
|
||||
room_dict["webhook_secret"] = getattr(room, "webhook_secret", None)
|
||||
else:
|
||||
# Non-owner, hide webhook details
|
||||
room_dict["webhook_url"] = None
|
||||
room_dict["webhook_secret"] = None
|
||||
|
||||
return RoomDetails(**room_dict)
|
||||
|
||||
|
||||
@router.post("/rooms", response_model=Room)
|
||||
async def rooms_create(
|
||||
room: CreateRoom,
|
||||
@@ -149,6 +246,9 @@ async def rooms_create(
|
||||
is_shared=room.is_shared,
|
||||
webhook_url=room.webhook_url,
|
||||
webhook_secret=room.webhook_secret,
|
||||
ics_url=room.ics_url,
|
||||
ics_fetch_interval=room.ics_fetch_interval,
|
||||
ics_enabled=room.ics_enabled,
|
||||
)
|
||||
|
||||
|
||||
@@ -183,6 +283,7 @@ async def rooms_delete(
|
||||
@router.post("/rooms/{room_name}/meeting", response_model=Meeting)
|
||||
async def rooms_create_meeting(
|
||||
room_name: str,
|
||||
info: CreateRoomMeeting,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
@@ -190,50 +291,44 @@ async def rooms_create_meeting(
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
current_time = datetime.now(timezone.utc)
|
||||
meeting = await meetings_controller.get_active(room=room, current_time=current_time)
|
||||
try:
|
||||
async with RedisAsyncLock(
|
||||
f"create_meeting:{room_name}",
|
||||
timeout=30,
|
||||
extend_interval=10,
|
||||
blocking_timeout=5.0,
|
||||
) as lock:
|
||||
current_time = datetime.now(timezone.utc)
|
||||
|
||||
if meeting is None:
|
||||
end_date = current_time + timedelta(hours=8)
|
||||
meeting = None
|
||||
if not info.allow_duplicated:
|
||||
meeting = await meetings_controller.get_active(
|
||||
room=room, current_time=current_time
|
||||
)
|
||||
|
||||
whereby_meeting = await create_meeting("", end_date=end_date, room=room)
|
||||
|
||||
await upload_logo(whereby_meeting["roomName"], "./images/logo.png")
|
||||
|
||||
# Now try to save to database
|
||||
try:
|
||||
meeting = await meetings_controller.create(
|
||||
id=whereby_meeting["meetingId"],
|
||||
room_name=whereby_meeting["roomName"],
|
||||
room_url=whereby_meeting["roomUrl"],
|
||||
host_room_url=whereby_meeting["hostRoomUrl"],
|
||||
start_date=parse_datetime_with_timezone(whereby_meeting["startDate"]),
|
||||
end_date=parse_datetime_with_timezone(whereby_meeting["endDate"]),
|
||||
room=room,
|
||||
)
|
||||
except (asyncpg.exceptions.UniqueViolationError, sqlite3.IntegrityError):
|
||||
# Another request already created a meeting for this room
|
||||
# Log this race condition occurrence
|
||||
logger.warning(
|
||||
"Race condition detected for room %s and meeting %s - fetching existing meeting",
|
||||
room.name,
|
||||
whereby_meeting["meetingId"],
|
||||
)
|
||||
|
||||
# Fetch the meeting that was created by the other request
|
||||
meeting = await meetings_controller.get_active(
|
||||
room=room, current_time=current_time
|
||||
)
|
||||
if meeting is None:
|
||||
# Edge case: meeting was created but expired/deleted between checks
|
||||
logger.error(
|
||||
"Meeting disappeared after race condition for room %s",
|
||||
room.name,
|
||||
exc_info=True,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=503, detail="Unable to join meeting - please try again"
|
||||
end_date = current_time + timedelta(hours=8)
|
||||
|
||||
whereby_meeting = await create_meeting("", end_date=end_date, room=room)
|
||||
|
||||
await upload_logo(whereby_meeting["roomName"], "./images/logo.png")
|
||||
|
||||
meeting = await meetings_controller.create(
|
||||
id=whereby_meeting["meetingId"],
|
||||
room_name=whereby_meeting["roomName"],
|
||||
room_url=whereby_meeting["roomUrl"],
|
||||
host_room_url=whereby_meeting["hostRoomUrl"],
|
||||
start_date=parse_datetime_with_timezone(
|
||||
whereby_meeting["startDate"]
|
||||
),
|
||||
end_date=parse_datetime_with_timezone(whereby_meeting["endDate"]),
|
||||
room=room,
|
||||
)
|
||||
except LockError:
|
||||
logger.warning("Failed to acquire lock for room %s within timeout", room_name)
|
||||
raise HTTPException(
|
||||
status_code=503, detail="Meeting creation in progress, please try again"
|
||||
)
|
||||
|
||||
if user_id != room.user_id:
|
||||
meeting.host_room_url = ""
|
||||
@@ -260,3 +355,202 @@ async def rooms_test_webhook(
|
||||
|
||||
result = await test_webhook(room_id)
|
||||
return WebhookTestResult(**result)
|
||||
|
||||
|
||||
@router.post("/rooms/{room_name}/ics/sync", response_model=ICSSyncResult)
|
||||
async def rooms_sync_ics(
|
||||
room_name: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
if user_id != room.user_id:
|
||||
raise HTTPException(
|
||||
status_code=403, detail="Only room owner can trigger ICS sync"
|
||||
)
|
||||
|
||||
if not room.ics_enabled or not room.ics_url:
|
||||
raise HTTPException(status_code=400, detail="ICS not configured for this room")
|
||||
|
||||
result = await ics_sync_service.sync_room_calendar(room)
|
||||
|
||||
if result["status"] == "error":
|
||||
raise HTTPException(
|
||||
status_code=500, detail=result.get("error", "Unknown error")
|
||||
)
|
||||
|
||||
return ICSSyncResult(**result)
|
||||
|
||||
|
||||
@router.get("/rooms/{room_name}/ics/status", response_model=ICSStatus)
|
||||
async def rooms_ics_status(
|
||||
room_name: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
if user_id != room.user_id:
|
||||
raise HTTPException(
|
||||
status_code=403, detail="Only room owner can view ICS status"
|
||||
)
|
||||
|
||||
next_sync = None
|
||||
if room.ics_enabled and room.ics_last_sync:
|
||||
next_sync = room.ics_last_sync + timedelta(seconds=room.ics_fetch_interval)
|
||||
|
||||
events = await calendar_events_controller.get_by_room(
|
||||
room.id, include_deleted=False
|
||||
)
|
||||
|
||||
return ICSStatus(
|
||||
status="enabled" if room.ics_enabled else "disabled",
|
||||
last_sync=room.ics_last_sync,
|
||||
next_sync=next_sync,
|
||||
last_etag=room.ics_last_etag,
|
||||
events_count=len(events),
|
||||
)
|
||||
|
||||
|
||||
@router.get("/rooms/{room_name}/meetings", response_model=list[CalendarEventResponse])
|
||||
async def rooms_list_meetings(
|
||||
room_name: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
events = await calendar_events_controller.get_by_room(
|
||||
room.id, include_deleted=False
|
||||
)
|
||||
|
||||
if user_id != room.user_id:
|
||||
for event in events:
|
||||
event.description = None
|
||||
event.attendees = None
|
||||
|
||||
return events
|
||||
|
||||
|
||||
@router.get(
|
||||
"/rooms/{room_name}/meetings/upcoming", response_model=list[CalendarEventResponse]
|
||||
)
|
||||
async def rooms_list_upcoming_meetings(
|
||||
room_name: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
minutes_ahead: int = 120,
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
events = await calendar_events_controller.get_upcoming(
|
||||
room.id, minutes_ahead=minutes_ahead
|
||||
)
|
||||
|
||||
if user_id != room.user_id:
|
||||
for event in events:
|
||||
event.description = None
|
||||
event.attendees = None
|
||||
|
||||
return events
|
||||
|
||||
|
||||
@router.get("/rooms/{room_name}/meetings/active", response_model=list[Meeting])
|
||||
async def rooms_list_active_meetings(
|
||||
room_name: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
current_time = datetime.now(timezone.utc)
|
||||
meetings = await meetings_controller.get_all_active_for_room(
|
||||
room=room, current_time=current_time
|
||||
)
|
||||
|
||||
# Hide host URLs from non-owners
|
||||
if user_id != room.user_id:
|
||||
for meeting in meetings:
|
||||
meeting.host_room_url = ""
|
||||
|
||||
return meetings
|
||||
|
||||
|
||||
@router.get("/rooms/{room_name}/meetings/{meeting_id}", response_model=Meeting)
|
||||
async def rooms_get_meeting(
|
||||
room_name: str,
|
||||
meeting_id: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
):
|
||||
"""Get a single meeting by ID within a specific room."""
|
||||
user_id = user["sub"] if user else None
|
||||
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
meeting = await meetings_controller.get_by_id(meeting_id)
|
||||
if not meeting:
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
if meeting.room_id != room.id:
|
||||
raise HTTPException(
|
||||
status_code=403, detail="Meeting does not belong to this room"
|
||||
)
|
||||
|
||||
if user_id != room.user_id and not room.is_shared:
|
||||
meeting.host_room_url = ""
|
||||
|
||||
return meeting
|
||||
|
||||
|
||||
@router.post("/rooms/{room_name}/meetings/{meeting_id}/join", response_model=Meeting)
|
||||
async def rooms_join_meeting(
|
||||
room_name: str,
|
||||
meeting_id: str,
|
||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||
):
|
||||
user_id = user["sub"] if user else None
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
meeting = await meetings_controller.get_by_id(meeting_id)
|
||||
|
||||
if not meeting:
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
if meeting.room_id != room.id:
|
||||
raise HTTPException(
|
||||
status_code=403, detail="Meeting does not belong to this room"
|
||||
)
|
||||
|
||||
if not meeting.is_active:
|
||||
raise HTTPException(status_code=400, detail="Meeting is not active")
|
||||
|
||||
current_time = datetime.now(timezone.utc)
|
||||
if meeting.end_date <= current_time:
|
||||
raise HTTPException(status_code=400, detail="Meeting has ended")
|
||||
|
||||
# Hide host URL from non-owners
|
||||
if user_id != room.user_id:
|
||||
meeting.host_room_url = ""
|
||||
|
||||
return meeting
|
||||
|
||||
@@ -68,8 +68,7 @@ async def whereby_webhook(event: WherebyWebhookEvent, request: Request):
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
if event.type in ["room.client.joined", "room.client.left"]:
|
||||
await meetings_controller.update_meeting(
|
||||
meeting.id, num_clients=event.data["numClients"]
|
||||
)
|
||||
update_data = {"num_clients": event.data["numClients"]}
|
||||
await meetings_controller.update_meeting(meeting.id, **update_data)
|
||||
|
||||
return {"status": "ok"}
|
||||
|
||||
@@ -20,6 +20,7 @@ else:
|
||||
"reflector.worker.healthcheck",
|
||||
"reflector.worker.process",
|
||||
"reflector.worker.cleanup",
|
||||
"reflector.worker.ics_sync",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -37,6 +38,14 @@ else:
|
||||
"task": "reflector.worker.process.reprocess_failed_recordings",
|
||||
"schedule": crontab(hour=5, minute=0), # Midnight EST
|
||||
},
|
||||
"sync_all_ics_calendars": {
|
||||
"task": "reflector.worker.ics_sync.sync_all_ics_calendars",
|
||||
"schedule": 60.0, # Run every minute to check which rooms need sync
|
||||
},
|
||||
"create_upcoming_meetings": {
|
||||
"task": "reflector.worker.ics_sync.create_upcoming_meetings",
|
||||
"schedule": 30.0, # Run every 30 seconds to create upcoming meetings
|
||||
},
|
||||
}
|
||||
|
||||
if settings.PUBLIC_MODE:
|
||||
|
||||
175
server/reflector/worker/ics_sync.py
Normal file
175
server/reflector/worker/ics_sync.py
Normal file
@@ -0,0 +1,175 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import structlog
|
||||
from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
from reflector.asynctask import asynctask
|
||||
from reflector.db.calendar_events import calendar_events_controller
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.redis_cache import RedisAsyncLock
|
||||
from reflector.services.ics_sync import SyncStatus, ics_sync_service
|
||||
from reflector.whereby import create_meeting, upload_logo
|
||||
|
||||
logger = structlog.wrap_logger(get_task_logger(__name__))
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def sync_room_ics(room_id: str):
|
||||
try:
|
||||
room = await rooms_controller.get_by_id(room_id)
|
||||
if not room:
|
||||
logger.warning("Room not found for ICS sync", room_id=room_id)
|
||||
return
|
||||
|
||||
if not room.ics_enabled or not room.ics_url:
|
||||
logger.debug("ICS not enabled for room", room_id=room_id)
|
||||
return
|
||||
|
||||
logger.info("Starting ICS sync for room", room_id=room_id, room_name=room.name)
|
||||
result = await ics_sync_service.sync_room_calendar(room)
|
||||
|
||||
if result["status"] == SyncStatus.SUCCESS:
|
||||
logger.info(
|
||||
"ICS sync completed successfully",
|
||||
room_id=room_id,
|
||||
events_found=result.get("events_found", 0),
|
||||
events_created=result.get("events_created", 0),
|
||||
events_updated=result.get("events_updated", 0),
|
||||
events_deleted=result.get("events_deleted", 0),
|
||||
)
|
||||
elif result["status"] == SyncStatus.UNCHANGED:
|
||||
logger.debug("ICS content unchanged", room_id=room_id)
|
||||
elif result["status"] == SyncStatus.ERROR:
|
||||
logger.error("ICS sync failed", room_id=room_id, error=result.get("error"))
|
||||
else:
|
||||
logger.debug(
|
||||
"ICS sync skipped", room_id=room_id, reason=result.get("reason")
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Unexpected error during ICS sync", room_id=room_id, error=str(e))
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def sync_all_ics_calendars():
|
||||
try:
|
||||
logger.info("Starting sync for all ICS-enabled rooms")
|
||||
|
||||
ics_enabled_rooms = await rooms_controller.get_ics_enabled()
|
||||
logger.info(f"Found {len(ics_enabled_rooms)} rooms with ICS enabled")
|
||||
|
||||
for room in ics_enabled_rooms:
|
||||
if not _should_sync(room):
|
||||
logger.debug("Skipping room, not time to sync yet", room_id=room.id)
|
||||
continue
|
||||
|
||||
sync_room_ics.delay(room.id)
|
||||
|
||||
logger.info("Queued sync tasks for all eligible rooms")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in sync_all_ics_calendars", error=str(e))
|
||||
|
||||
|
||||
def _should_sync(room) -> bool:
|
||||
if not room.ics_last_sync:
|
||||
return True
|
||||
|
||||
time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
|
||||
return time_since_sync.total_seconds() >= room.ics_fetch_interval
|
||||
|
||||
|
||||
MEETING_DEFAULT_DURATION = timedelta(hours=1)
|
||||
|
||||
|
||||
async def create_upcoming_meetings_for_event(event, create_window, room_id, room):
|
||||
if event.start_time <= create_window:
|
||||
return
|
||||
existing_meeting = await meetings_controller.get_by_calendar_event(event.id)
|
||||
|
||||
if existing_meeting:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Pre-creating meeting for calendar event",
|
||||
room_id=room_id,
|
||||
event_id=event.id,
|
||||
event_title=event.title,
|
||||
)
|
||||
|
||||
try:
|
||||
end_date = event.end_time or (event.start_time + MEETING_DEFAULT_DURATION)
|
||||
|
||||
whereby_meeting = await create_meeting(
|
||||
"",
|
||||
end_date=end_date,
|
||||
room=room,
|
||||
)
|
||||
await upload_logo(whereby_meeting["roomName"], "./images/logo.png")
|
||||
|
||||
meeting = await meetings_controller.create(
|
||||
id=whereby_meeting["meetingId"],
|
||||
room_name=whereby_meeting["roomName"],
|
||||
room_url=whereby_meeting["roomUrl"],
|
||||
host_room_url=whereby_meeting["hostRoomUrl"],
|
||||
start_date=datetime.fromisoformat(whereby_meeting["startDate"]),
|
||||
end_date=datetime.fromisoformat(whereby_meeting["endDate"]),
|
||||
room=room,
|
||||
calendar_event_id=event.id,
|
||||
calendar_metadata={
|
||||
"title": event.title,
|
||||
"description": event.description,
|
||||
"attendees": event.attendees,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Meeting pre-created successfully",
|
||||
meeting_id=meeting.id,
|
||||
event_id=event.id,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to pre-create meeting",
|
||||
room_id=room_id,
|
||||
event_id=event.id,
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def create_upcoming_meetings():
|
||||
async with RedisAsyncLock("create_upcoming_meetings", skip_if_locked=True) as lock:
|
||||
if not lock.acquired:
|
||||
logger.warning(
|
||||
"Another worker is already creating upcoming meetings, skipping"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
logger.info("Starting creation of upcoming meetings")
|
||||
|
||||
ics_enabled_rooms = await rooms_controller.get_ics_enabled()
|
||||
now = datetime.now(timezone.utc)
|
||||
create_window = now - timedelta(minutes=6)
|
||||
|
||||
for room in ics_enabled_rooms:
|
||||
events = await calendar_events_controller.get_upcoming(
|
||||
room.id,
|
||||
minutes_ahead=7,
|
||||
)
|
||||
|
||||
for event in events:
|
||||
await create_upcoming_meetings_for_event(
|
||||
event, create_window, room.id, room
|
||||
)
|
||||
logger.info("Completed pre-creation check for upcoming meetings")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in create_upcoming_meetings", error=str(e))
|
||||
@@ -9,6 +9,7 @@ import structlog
|
||||
from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
from pydantic import ValidationError
|
||||
from redis.exceptions import LockError
|
||||
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.recordings import Recording, recordings_controller
|
||||
@@ -16,6 +17,7 @@ from reflector.db.rooms import rooms_controller
|
||||
from reflector.db.transcripts import SourceKind, transcripts_controller
|
||||
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||
from reflector.pipelines.main_live_pipeline import asynctask
|
||||
from reflector.redis_cache import get_redis_client
|
||||
from reflector.settings import settings
|
||||
from reflector.whereby import get_room_sessions
|
||||
|
||||
@@ -147,24 +149,94 @@ async def process_recording(bucket_name: str, object_key: str):
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def process_meetings():
|
||||
"""
|
||||
Checks which meetings are still active and deactivates those that have ended.
|
||||
|
||||
Deactivation logic:
|
||||
- Active sessions: Keep meeting active regardless of scheduled time
|
||||
- No active sessions:
|
||||
* Calendar meetings:
|
||||
- If previously used (had sessions): Deactivate immediately
|
||||
- If never used: Keep active until scheduled end time, then deactivate
|
||||
* On-the-fly meetings: Deactivate immediately (created when someone joins,
|
||||
so no sessions means everyone left)
|
||||
|
||||
Uses distributed locking to prevent race conditions when multiple workers
|
||||
process the same meeting simultaneously.
|
||||
"""
|
||||
logger.info("Processing meetings")
|
||||
meetings = await meetings_controller.get_all_active()
|
||||
current_time = datetime.now(timezone.utc)
|
||||
redis_client = get_redis_client()
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
|
||||
for meeting in meetings:
|
||||
is_active = False
|
||||
end_date = meeting.end_date
|
||||
if end_date.tzinfo is None:
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
if end_date > datetime.now(timezone.utc):
|
||||
logger_ = logger.bind(meeting_id=meeting.id, room_name=meeting.room_name)
|
||||
lock_key = f"meeting_process_lock:{meeting.id}"
|
||||
lock = redis_client.lock(lock_key, timeout=120)
|
||||
|
||||
try:
|
||||
if not lock.acquire(blocking=False):
|
||||
logger_.debug("Meeting is being processed by another worker, skipping")
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Process the meeting
|
||||
should_deactivate = False
|
||||
end_date = meeting.end_date
|
||||
if end_date.tzinfo is None:
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
|
||||
# This API call could be slow, extend lock if needed
|
||||
response = await get_room_sessions(meeting.room_name)
|
||||
|
||||
try:
|
||||
# Extend lock after slow operation to ensure we still hold it
|
||||
lock.extend(120, replace_ttl=True)
|
||||
except LockError:
|
||||
logger_.warning("Lost lock for meeting, skipping")
|
||||
continue
|
||||
|
||||
room_sessions = response.get("results", [])
|
||||
is_active = not room_sessions or any(
|
||||
has_active_sessions = room_sessions and any(
|
||||
rs["endedAt"] is None for rs in room_sessions
|
||||
)
|
||||
if not is_active:
|
||||
await meetings_controller.update_meeting(meeting.id, is_active=False)
|
||||
logger.info("Meeting %s is deactivated", meeting.id)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
|
||||
logger.info("Processed meetings")
|
||||
if has_active_sessions:
|
||||
logger_.debug("Meeting still has active sessions, keep it")
|
||||
elif has_had_sessions:
|
||||
should_deactivate = True
|
||||
logger_.info("Meeting ended - all participants left")
|
||||
elif current_time > end_date:
|
||||
should_deactivate = True
|
||||
logger_.info(
|
||||
"Meeting deactivated - scheduled time ended with no participants",
|
||||
meeting.id,
|
||||
)
|
||||
else:
|
||||
logger_.debug("Meeting not yet started, keep it")
|
||||
|
||||
if should_deactivate:
|
||||
await meetings_controller.update_meeting(meeting.id, is_active=False)
|
||||
logger_.info("Meeting is deactivated")
|
||||
|
||||
processed_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger_.error(f"Error processing meeting", exc_info=True)
|
||||
finally:
|
||||
try:
|
||||
lock.release()
|
||||
except LockError:
|
||||
pass # Lock already released or expired
|
||||
|
||||
logger.info(
|
||||
f"Processed meetings finished",
|
||||
processed_count=processed_count,
|
||||
skipped_count=skipped_count,
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
|
||||
Reference in New Issue
Block a user