diff --git a/server/migrations/versions/6025e9b2bef2_remove_one_active_meeting_per_room_.py b/server/migrations/versions/6025e9b2bef2_remove_one_active_meeting_per_room_.py new file mode 100644 index 00000000..9bf8af41 --- /dev/null +++ b/server/migrations/versions/6025e9b2bef2_remove_one_active_meeting_per_room_.py @@ -0,0 +1,53 @@ +"""remove_one_active_meeting_per_room_constraint + +Revision ID: 6025e9b2bef2 +Revises: 9f5c78d352d6 +Create Date: 2025-08-18 18:45:44.418392 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "6025e9b2bef2" +down_revision: Union[str, None] = "9f5c78d352d6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Remove the unique constraint that prevents multiple active meetings per room + # This is needed to support calendar integration with overlapping meetings + # Check if index exists before trying to drop it + from alembic import context + + if context.get_context().dialect.name == "postgresql": + conn = op.get_bind() + result = conn.execute( + sa.text( + "SELECT 1 FROM pg_indexes WHERE indexname = 'idx_one_active_meeting_per_room'" + ) + ) + if result.fetchone(): + op.drop_index("idx_one_active_meeting_per_room", table_name="meeting") + else: + # For SQLite, just try to drop it + try: + op.drop_index("idx_one_active_meeting_per_room", table_name="meeting") + except: + pass + + +def downgrade() -> None: + # Restore the unique constraint + op.create_index( + "idx_one_active_meeting_per_room", + "meeting", + ["room_id"], + unique=True, + postgresql_where=sa.text("is_active = true"), + sqlite_where=sa.text("is_active = 1"), + ) diff --git a/server/migrations/versions/d4a1c446458c_add_grace_period_fields_to_meeting.py b/server/migrations/versions/d4a1c446458c_add_grace_period_fields_to_meeting.py new file mode 100644 index 00000000..868e3479 --- /dev/null +++ b/server/migrations/versions/d4a1c446458c_add_grace_period_fields_to_meeting.py @@ -0,0 +1,34 @@ +"""add_grace_period_fields_to_meeting + +Revision ID: d4a1c446458c +Revises: 6025e9b2bef2 +Create Date: 2025-08-18 18:50:37.768052 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "d4a1c446458c" +down_revision: Union[str, None] = "6025e9b2bef2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Add fields to track when participants left for grace period logic + op.add_column( + "meeting", sa.Column("last_participant_left_at", sa.DateTime(timezone=True)) + ) + op.add_column( + "meeting", + sa.Column("grace_period_minutes", sa.Integer, server_default=sa.text("15")), + ) + + +def downgrade() -> None: + op.drop_column("meeting", "grace_period_minutes") + op.drop_column("meeting", "last_participant_left_at") diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index d5816c86..ed5635db 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -48,6 +48,8 @@ meetings = sa.Table( sa.ForeignKey("calendar_event.id", ondelete="SET NULL"), ), sa.Column("calendar_metadata", JSONB), + sa.Column("last_participant_left_at", sa.DateTime(timezone=True)), + sa.Column("grace_period_minutes", sa.Integer, server_default=sa.text("15")), sa.Index("idx_meeting_room_id", "room_id"), sa.Index("idx_meeting_calendar_event", "calendar_event_id"), ) @@ -90,6 +92,8 @@ class Meeting(BaseModel): is_active: bool = True calendar_event_id: str | None = None calendar_metadata: dict[str, Any] | None = None + last_participant_left_at: datetime | None = None + grace_period_minutes: int = 15 class MeetingController: @@ -153,6 +157,7 @@ class MeetingController: async def get_active(self, room: Room, current_time: datetime) -> Meeting: """ Get latest active meeting for a room. + For backward compatibility, returns the most recent active meeting. """ end_date = getattr(meetings.c, "end_date") query = ( @@ -172,6 +177,47 @@ class MeetingController: return Meeting(**result) + async def get_all_active_for_room( + self, room: Room, current_time: datetime + ) -> list[Meeting]: + """ + Get all active meetings for a room. + This supports multiple concurrent meetings per room. + """ + end_date = getattr(meetings.c, "end_date") + query = ( + meetings.select() + .where( + sa.and_( + meetings.c.room_id == room.id, + meetings.c.end_date > current_time, + meetings.c.is_active, + ) + ) + .order_by(end_date.desc()) + ) + results = await get_database().fetch_all(query) + return [Meeting(**result) for result in results] + + async def get_active_by_calendar_event( + self, room: Room, calendar_event_id: str, current_time: datetime + ) -> Meeting | None: + """ + Get active meeting for a specific calendar event. + """ + query = meetings.select().where( + sa.and_( + meetings.c.room_id == room.id, + meetings.c.calendar_event_id == calendar_event_id, + meetings.c.end_date > current_time, + meetings.c.is_active, + ) + ) + result = await get_database().fetch_one(query) + if not result: + return None + return Meeting(**result) + async def get_by_id(self, meeting_id: str, **kwargs) -> Meeting | None: """ Get a meeting by id diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 0170bd2f..5d28614b 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -375,3 +375,65 @@ async def rooms_list_upcoming_meetings( event.attendees = None return events + + +@router.get("/rooms/{room_name}/meetings/active", response_model=list[Meeting]) +async def rooms_list_active_meetings( + room_name: str, + user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], +): + """List all active meetings for a room (supports multiple active meetings)""" + user_id = user["sub"] if user else None + room = await rooms_controller.get_by_name(room_name) + + if not room: + raise HTTPException(status_code=404, detail="Room not found") + + current_time = datetime.now(timezone.utc) + meetings = await meetings_controller.get_all_active_for_room( + room=room, current_time=current_time + ) + + # Hide host URLs from non-owners + if user_id != room.user_id: + for meeting in meetings: + meeting.host_room_url = "" + + return meetings + + +@router.post("/rooms/{room_name}/meetings/{meeting_id}/join", response_model=Meeting) +async def rooms_join_meeting( + room_name: str, + meeting_id: str, + user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], +): + """Join a specific meeting by ID""" + user_id = user["sub"] if user else None + room = await rooms_controller.get_by_name(room_name) + + if not room: + raise HTTPException(status_code=404, detail="Room not found") + + meeting = await meetings_controller.get_by_id(meeting_id) + + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + if meeting.room_id != room.id: + raise HTTPException( + status_code=403, detail="Meeting does not belong to this room" + ) + + if not meeting.is_active: + raise HTTPException(status_code=400, detail="Meeting is not active") + + current_time = datetime.now(timezone.utc) + if meeting.end_date <= current_time: + raise HTTPException(status_code=400, detail="Meeting has ended") + + # Hide host URL from non-owners + if user_id != room.user_id: + meeting.host_room_url = "" + + return meeting diff --git a/server/reflector/views/whereby.py b/server/reflector/views/whereby.py index c1682621..7a6af45e 100644 --- a/server/reflector/views/whereby.py +++ b/server/reflector/views/whereby.py @@ -68,8 +68,13 @@ async def whereby_webhook(event: WherebyWebhookEvent, request: Request): raise HTTPException(status_code=404, detail="Meeting not found") if event.type in ["room.client.joined", "room.client.left"]: - await meetings_controller.update_meeting( - meeting.id, num_clients=event.data["numClients"] - ) + update_data = {"num_clients": event.data["numClients"]} + + # Clear grace period if participant joined + if event.type == "room.client.joined" and event.data["numClients"] > 0: + if meeting.last_participant_left_at: + update_data["last_participant_left_at"] = None + + await meetings_controller.update_meeting(meeting.id, **update_data) return {"status": "ok"} diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index c3704207..73b19046 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -1,6 +1,6 @@ import json import os -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from urllib.parse import unquote import av @@ -146,24 +146,76 @@ async def process_recording(bucket_name: str, object_key: str): @shared_task @asynctask async def process_meetings(): + """ + Checks which meetings are still active and deactivates those that have ended. + Supports multiple active meetings per room and grace period logic. + """ logger.info("Processing meetings") meetings = await meetings_controller.get_all_active() + current_time = datetime.now(timezone.utc) + for meeting in meetings: - is_active = False + should_deactivate = False end_date = meeting.end_date if end_date.tzinfo is None: end_date = end_date.replace(tzinfo=timezone.utc) - if end_date > datetime.now(timezone.utc): + + # Check if meeting has passed its scheduled end time + if end_date <= current_time: + # For calendar meetings, force close 30 minutes after scheduled end + if meeting.calendar_event_id: + if current_time > end_date + timedelta(minutes=30): + should_deactivate = True + logger.info( + "Meeting %s forced closed 30 min after calendar end", meeting.id + ) + else: + # Unscheduled meetings follow normal closure rules + should_deactivate = True + + # Check Whereby room sessions only if not already deactivating + if not should_deactivate and end_date > current_time: response = await get_room_sessions(meeting.room_name) room_sessions = response.get("results", []) - is_active = not room_sessions or any( + has_active_sessions = room_sessions and any( rs["endedAt"] is None for rs in room_sessions ) - if not is_active: + + if not has_active_sessions: + # No active sessions - check grace period + if meeting.num_clients == 0: + if meeting.last_participant_left_at: + # Check if grace period has expired + grace_period = timedelta(minutes=meeting.grace_period_minutes) + if ( + current_time + > meeting.last_participant_left_at + grace_period + ): + should_deactivate = True + logger.info("Meeting %s grace period expired", meeting.id) + else: + # First time all participants left, record the time + await meetings_controller.update_meeting( + meeting.id, last_participant_left_at=current_time + ) + logger.info( + "Meeting %s marked empty at %s", meeting.id, current_time + ) + else: + # Has active sessions - clear grace period if set + if meeting.last_participant_left_at: + await meetings_controller.update_meeting( + meeting.id, last_participant_left_at=None + ) + logger.info( + "Meeting %s reactivated - participant rejoined", meeting.id + ) + + if should_deactivate: await meetings_controller.update_meeting(meeting.id, is_active=False) logger.info("Meeting %s is deactivated", meeting.id) - logger.info("Processed meetings") + logger.info("Processed %d meetings", len(meetings)) @shared_task diff --git a/server/tests/test_multiple_active_meetings.py b/server/tests/test_multiple_active_meetings.py new file mode 100644 index 00000000..680790f6 --- /dev/null +++ b/server/tests/test_multiple_active_meetings.py @@ -0,0 +1,283 @@ +"""Tests for multiple active meetings per room functionality.""" + +from datetime import datetime, timedelta, timezone + +import pytest + +from reflector.db.calendar_events import CalendarEvent, calendar_events_controller +from reflector.db.meetings import meetings_controller +from reflector.db.rooms import rooms_controller + + +@pytest.mark.asyncio +async def test_multiple_active_meetings_per_room(): + """Test that multiple active meetings can exist for the same room.""" + # Create a room + room = await rooms_controller.add( + name="test-room", + user_id="test-user", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic-2nd-participant", + is_shared=False, + ) + + current_time = datetime.now(timezone.utc) + end_time = current_time + timedelta(hours=2) + + # Create first meeting + meeting1 = await meetings_controller.create( + id="meeting-1", + room_name="test-meeting-1", + room_url="https://whereby.com/test-1", + host_room_url="https://whereby.com/test-1-host", + start_date=current_time, + end_date=end_time, + user_id="test-user", + room=room, + ) + + # Create second meeting for the same room (should succeed now) + meeting2 = await meetings_controller.create( + id="meeting-2", + room_name="test-meeting-2", + room_url="https://whereby.com/test-2", + host_room_url="https://whereby.com/test-2-host", + start_date=current_time, + end_date=end_time, + user_id="test-user", + room=room, + ) + + # Both meetings should be active + active_meetings = await meetings_controller.get_all_active_for_room( + room=room, current_time=current_time + ) + + assert len(active_meetings) == 2 + assert meeting1.id in [m.id for m in active_meetings] + assert meeting2.id in [m.id for m in active_meetings] + + +@pytest.mark.asyncio +async def test_get_active_by_calendar_event(): + """Test getting active meeting by calendar event ID.""" + # Create a room + room = await rooms_controller.add( + name="test-room", + user_id="test-user", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic-2nd-participant", + is_shared=False, + ) + + # Create a calendar event + event = CalendarEvent( + room_id=room.id, + ics_uid="test-event-uid", + title="Test Meeting", + start_time=datetime.now(timezone.utc), + end_time=datetime.now(timezone.utc) + timedelta(hours=1), + ) + event = await calendar_events_controller.upsert(event) + + current_time = datetime.now(timezone.utc) + end_time = current_time + timedelta(hours=2) + + # Create meeting linked to calendar event + meeting = await meetings_controller.create( + id="meeting-cal-1", + room_name="test-meeting-cal", + room_url="https://whereby.com/test-cal", + host_room_url="https://whereby.com/test-cal-host", + start_date=current_time, + end_date=end_time, + user_id="test-user", + room=room, + calendar_event_id=event.id, + calendar_metadata={"title": event.title}, + ) + + # Should find the meeting by calendar event + found_meeting = await meetings_controller.get_active_by_calendar_event( + room=room, calendar_event_id=event.id, current_time=current_time + ) + + assert found_meeting is not None + assert found_meeting.id == meeting.id + assert found_meeting.calendar_event_id == event.id + + +@pytest.mark.asyncio +async def test_grace_period_logic(): + """Test that meetings have a grace period after last participant leaves.""" + # Create a room + room = await rooms_controller.add( + name="test-room", + user_id="test-user", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic-2nd-participant", + is_shared=False, + ) + + current_time = datetime.now(timezone.utc) + end_time = current_time + timedelta(hours=2) + + # Create meeting + meeting = await meetings_controller.create( + id="meeting-grace", + room_name="test-meeting-grace", + room_url="https://whereby.com/test-grace", + host_room_url="https://whereby.com/test-grace-host", + start_date=current_time, + end_date=end_time, + user_id="test-user", + room=room, + ) + + # Test grace period logic by simulating different states + + # Simulate first time all participants left + await meetings_controller.update_meeting( + meeting.id, num_clients=0, last_participant_left_at=current_time + ) + + # Within grace period (10 min) - should still be active + await meetings_controller.update_meeting( + meeting.id, last_participant_left_at=current_time - timedelta(minutes=10) + ) + + updated_meeting = await meetings_controller.get_by_id(meeting.id) + assert updated_meeting.is_active is True # Still active during grace period + + # Simulate grace period expired (20 min) and deactivate + await meetings_controller.update_meeting( + meeting.id, last_participant_left_at=current_time - timedelta(minutes=20) + ) + + # Manually test the grace period logic that would be in process_meetings + updated_meeting = await meetings_controller.get_by_id(meeting.id) + if updated_meeting.last_participant_left_at: + grace_period = timedelta(minutes=updated_meeting.grace_period_minutes) + if current_time > updated_meeting.last_participant_left_at + grace_period: + await meetings_controller.update_meeting(meeting.id, is_active=False) + + updated_meeting = await meetings_controller.get_by_id(meeting.id) + assert updated_meeting.is_active is False # Now deactivated + + +@pytest.mark.asyncio +async def test_calendar_meeting_force_close_after_30_min(): + """Test that calendar meetings force close 30 minutes after scheduled end.""" + # Create a room + room = await rooms_controller.add( + name="test-room", + user_id="test-user", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic-2nd-participant", + is_shared=False, + ) + + # Create a calendar event + event = CalendarEvent( + room_id=room.id, + ics_uid="test-event-force", + title="Test Meeting Force Close", + start_time=datetime.now(timezone.utc) - timedelta(hours=2), + end_time=datetime.now(timezone.utc) - timedelta(minutes=35), # Ended 35 min ago + ) + event = await calendar_events_controller.upsert(event) + + current_time = datetime.now(timezone.utc) + + # Create meeting linked to calendar event + meeting = await meetings_controller.create( + id="meeting-force", + room_name="test-meeting-force", + room_url="https://whereby.com/test-force", + host_room_url="https://whereby.com/test-force-host", + start_date=event.start_time, + end_date=event.end_time, + user_id="test-user", + room=room, + calendar_event_id=event.id, + ) + + # Test that calendar meetings force close 30 min after scheduled end + # The meeting ended 35 minutes ago, so it should be force closed + + # Manually test the force close logic that would be in process_meetings + if meeting.calendar_event_id: + if current_time > meeting.end_date + timedelta(minutes=30): + await meetings_controller.update_meeting(meeting.id, is_active=False) + + updated_meeting = await meetings_controller.get_by_id(meeting.id) + assert updated_meeting.is_active is False # Force closed after 30 min + + +@pytest.mark.asyncio +async def test_participant_rejoin_clears_grace_period(): + """Test that participant rejoining clears the grace period.""" + # Create a room + room = await rooms_controller.add( + name="test-room", + user_id="test-user", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic-2nd-participant", + is_shared=False, + ) + + current_time = datetime.now(timezone.utc) + end_time = current_time + timedelta(hours=2) + + # Create meeting with grace period already set + meeting = await meetings_controller.create( + id="meeting-rejoin", + room_name="test-meeting-rejoin", + room_url="https://whereby.com/test-rejoin", + host_room_url="https://whereby.com/test-rejoin-host", + start_date=current_time, + end_date=end_time, + user_id="test-user", + room=room, + ) + + # Set last_participant_left_at to simulate grace period + await meetings_controller.update_meeting( + meeting.id, + last_participant_left_at=current_time - timedelta(minutes=5), + num_clients=0, + ) + + # Simulate participant rejoining - clear grace period + await meetings_controller.update_meeting( + meeting.id, last_participant_left_at=None, num_clients=1 + ) + + updated_meeting = await meetings_controller.get_by_id(meeting.id) + assert updated_meeting.last_participant_left_at is None # Grace period cleared + assert updated_meeting.is_active is True # Still active