diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 384290da..5dd933ed 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -1,4 +1,5 @@ import json +from datetime import datetime, timezone from typing import assert_never from fastapi import APIRouter, HTTPException, Request @@ -12,6 +13,9 @@ from reflector.dailyco_api import ( RecordingReadyEvent, RecordingStartedEvent, ) +from reflector.db.daily_participant_sessions import ( + daily_participant_sessions_controller, +) from reflector.db.meetings import meetings_controller from reflector.logger import logger as _logger from reflector.settings import settings @@ -141,15 +145,57 @@ async def _handle_participant_joined(event: ParticipantJoinedEvent): async def _handle_participant_left(event: ParticipantLeftEvent): - """Queue poll task for presence reconciliation.""" - await _queue_poll_for_room( - event.payload.room_name, - "participant.left", - event.payload.user_id, - event.payload.session_id, - duration=event.payload.duration, + """Close session directly on webhook and update num_clients. + + The webhook IS the authoritative signal that a participant left. + We close the session immediately rather than polling Daily.co API, + which avoids the race where the API still shows the participant. + A delayed reconciliation poll is queued as a safety net. + """ + room_name = event.payload.room_name + if not room_name: + logger.warning("participant.left: no room in payload") + return + + meeting = await meetings_controller.get_by_room_name(room_name) + if not meeting: + logger.warning("participant.left: meeting not found", room_name=room_name) + return + + log = logger.bind( + meeting_id=meeting.id, + room_name=room_name, + session_id=event.payload.session_id, + user_id=event.payload.user_id, ) + existing = await daily_participant_sessions_controller.get_open_session( + meeting.id, event.payload.session_id + ) + + if existing: + now = datetime.now(timezone.utc) + await daily_participant_sessions_controller.batch_close_sessions( + [existing.id], left_at=now + ) + active = await daily_participant_sessions_controller.get_active_by_meeting( + meeting.id + ) + await meetings_controller.update_meeting(meeting.id, num_clients=len(active)) + log.info( + "Participant left - session closed", + remaining_clients=len(active), + duration=event.payload.duration, + ) + else: + log.info( + "Participant left - no open session found, skipping direct close", + duration=event.payload.duration, + ) + + # Delayed reconciliation poll as safety net + poll_daily_room_presence_task.apply_async(args=[meeting.id], countdown=5) + async def _handle_recording_started(event: RecordingStartedEvent): room_name = event.payload.room_name diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 48aafd76..fb2cf5bd 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -1,8 +1,9 @@ +import json from datetime import datetime, timedelta, timezone from enum import Enum from typing import Annotated, Any, Literal, Optional -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Request from fastapi_pagination import Page from fastapi_pagination.ext.databases import apaginate from pydantic import BaseModel @@ -11,6 +12,10 @@ from redis.exceptions import LockError import reflector.auth as auth from reflector.db import get_database from reflector.db.calendar_events import calendar_events_controller +from reflector.db.daily_participant_sessions import ( + DailyParticipantSession, + daily_participant_sessions_controller, +) from reflector.db.meetings import meetings_controller from reflector.db.rooms import rooms_controller from reflector.logger import logger @@ -617,6 +622,12 @@ class JoinedRequest(BaseModel): connection_id: NonEmptyString """Must match the connection_id sent to /joining.""" + session_id: NonEmptyString | None = None + """Daily.co session_id for direct session creation. Optional for backward compat.""" + + user_name: str | None = None + """Display name from Daily.co participant data.""" + class JoinedResponse(BaseModel): status: Literal["ok"] @@ -716,9 +727,32 @@ async def meeting_joined( finally: await redis.aclose() - # Trigger presence poll to detect the new participant faster than periodic poll + # Create session directly when session_id provided (instant presence update) + if body.session_id and meeting.platform == "daily": + session = DailyParticipantSession( + id=f"{meeting.id}:{body.session_id}", + meeting_id=meeting.id, + room_id=room.id, + session_id=body.session_id, + user_id=user["sub"] if user else None, + user_name=body.user_name or "Anonymous", + joined_at=datetime.now(timezone.utc), + ) + await daily_participant_sessions_controller.batch_upsert_sessions([session]) + + active = await daily_participant_sessions_controller.get_active_by_meeting( + meeting.id + ) + await meetings_controller.update_meeting(meeting.id, num_clients=len(active)) + log.info( + "Session created directly", + session_id=body.session_id, + num_clients=len(active), + ) + + # Trigger presence poll as reconciliation safety net if meeting.platform == "daily": - poll_daily_room_presence_task.delay(meeting_id) + poll_daily_room_presence_task.apply_async(args=[meeting_id], countdown=3) return JoinedResponse(status="ok") @@ -733,14 +767,28 @@ class LeaveResponse(BaseModel): async def meeting_leave( room_name: str, meeting_id: str, + request: Request, user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], ) -> LeaveResponse: - """Trigger presence recheck when user leaves meeting. + """Trigger presence update when user leaves meeting. - Called on tab close/navigation via sendBeacon(). Immediately queues presence - poll to detect dirty disconnects faster than 30s periodic poll. - Daily.co webhooks handle clean disconnects, but tab close/crash need this. + When session_id is provided in the body, closes the session directly + for instant presence update. Falls back to polling when session_id + is not available (e.g., sendBeacon without frame access). + Called on tab close/navigation via sendBeacon(). """ + # Parse session_id from body (sendBeacon may send text/plain or no body) + session_id: str | None = None + try: + body_bytes = await request.body() + if body_bytes: + data = json.loads(body_bytes) + raw = data.get("session_id") + if isinstance(raw, str) and raw.strip(): + session_id = raw.strip() + except Exception: + pass + room = await rooms_controller.get_by_name(room_name) if not room: raise HTTPException(status_code=404, detail="Room not found") @@ -749,7 +797,27 @@ async def meeting_leave( if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") - if meeting.platform == "daily": - poll_daily_room_presence_task.delay(meeting_id) + # Close session directly when session_id provided + session_closed = False + if session_id and meeting.platform == "daily": + existing = await daily_participant_sessions_controller.get_open_session( + meeting.id, session_id + ) + if existing: + await daily_participant_sessions_controller.batch_close_sessions( + [existing.id], left_at=datetime.now(timezone.utc) + ) + active = await daily_participant_sessions_controller.get_active_by_meeting( + meeting.id + ) + await meetings_controller.update_meeting( + meeting.id, num_clients=len(active) + ) + session_closed = True + + # Only queue poll if we couldn't close directly — the poll runs before + # Daily.co API removes the participant, which would undo our correct count + if meeting.platform == "daily" and not session_closed: + poll_daily_room_presence_task.apply_async(args=[meeting_id], countdown=3) return LeaveResponse(status="ok") diff --git a/server/tests/test_daily_webhook_participant_left.py b/server/tests/test_daily_webhook_participant_left.py new file mode 100644 index 00000000..8958f5cc --- /dev/null +++ b/server/tests/test_daily_webhook_participant_left.py @@ -0,0 +1,213 @@ +"""Tests for direct session close on participant.left webhook. + +Verifies that _handle_participant_left: +1. Closes the session directly (authoritative signal) +2. Updates num_clients from remaining active sessions +3. Queues a delayed reconciliation poll as safety net +4. Handles missing session gracefully +""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import pytest + +from reflector.dailyco_api.webhooks import ParticipantLeftEvent, ParticipantLeftPayload +from reflector.db.daily_participant_sessions import DailyParticipantSession +from reflector.db.meetings import Meeting +from reflector.views.daily import _handle_participant_left + + +@pytest.fixture +def mock_meeting(): + return Meeting( + id="meeting-123", + room_id="room-456", + room_name="test-room-20251118120000", + room_url="https://daily.co/test-room-20251118120000", + host_room_url="https://daily.co/test-room-20251118120000?t=host-token", + platform="daily", + num_clients=2, + is_active=True, + start_date=datetime.now(timezone.utc), + end_date=datetime.now(timezone.utc), + ) + + +@pytest.fixture +def participant_left_event(): + now = datetime.now(timezone.utc) + return ParticipantLeftEvent( + version="1.0.0", + type="participant.left", + id="evt-left-abc123", + payload=ParticipantLeftPayload( + room_name="test-room-20251118120000", + session_id="session-alice", + user_id="user-alice", + user_name="Alice", + joined_at=int((now - timedelta(minutes=10)).timestamp()), + duration=600, + ), + event_ts=int(now.timestamp()), + ) + + +@pytest.fixture +def existing_session(): + now = datetime.now(timezone.utc) + return DailyParticipantSession( + id="meeting-123:session-alice", + meeting_id="meeting-123", + room_id="room-456", + session_id="session-alice", + user_id="user-alice", + user_name="Alice", + joined_at=now - timedelta(minutes=10), + left_at=None, + ) + + +@pytest.mark.asyncio +@patch("reflector.views.daily.poll_daily_room_presence_task") +@patch("reflector.views.daily.meetings_controller") +@patch("reflector.views.daily.daily_participant_sessions_controller") +async def test_closes_session_and_updates_num_clients( + mock_sessions_ctrl, + mock_meetings_ctrl, + mock_poll_task, + mock_meeting, + participant_left_event, + existing_session, +): + """Webhook directly closes session and updates num_clients from remaining active count.""" + mock_meetings_ctrl.get_by_room_name = AsyncMock(return_value=mock_meeting) + mock_sessions_ctrl.get_open_session = AsyncMock(return_value=existing_session) + mock_sessions_ctrl.batch_close_sessions = AsyncMock() + # One remaining active session after close + remaining = DailyParticipantSession( + id="meeting-123:session-bob", + meeting_id="meeting-123", + room_id="room-456", + session_id="session-bob", + user_id="user-bob", + user_name="Bob", + joined_at=datetime.now(timezone.utc), + left_at=None, + ) + mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[remaining]) + mock_meetings_ctrl.update_meeting = AsyncMock() + + await _handle_participant_left(participant_left_event) + + # Session closed + mock_sessions_ctrl.batch_close_sessions.assert_called_once() + closed_ids = mock_sessions_ctrl.batch_close_sessions.call_args.args[0] + assert closed_ids == [existing_session.id] + + # num_clients updated to remaining count + mock_meetings_ctrl.update_meeting.assert_called_once_with( + mock_meeting.id, num_clients=1 + ) + + # Delayed reconciliation poll queued + mock_poll_task.apply_async.assert_called_once() + call_kwargs = mock_poll_task.apply_async.call_args.kwargs + assert call_kwargs["countdown"] == 5 + assert call_kwargs["args"] == [mock_meeting.id] + + +@pytest.mark.asyncio +@patch("reflector.views.daily.poll_daily_room_presence_task") +@patch("reflector.views.daily.meetings_controller") +@patch("reflector.views.daily.daily_participant_sessions_controller") +async def test_handles_missing_session( + mock_sessions_ctrl, + mock_meetings_ctrl, + mock_poll_task, + mock_meeting, + participant_left_event, +): + """No crash when session not found in DB — still queues reconciliation poll.""" + mock_meetings_ctrl.get_by_room_name = AsyncMock(return_value=mock_meeting) + mock_sessions_ctrl.get_open_session = AsyncMock(return_value=None) + + await _handle_participant_left(participant_left_event) + + # No session close attempted + mock_sessions_ctrl.batch_close_sessions.assert_not_called() + # No num_clients update (no authoritative data without session) + mock_meetings_ctrl.update_meeting.assert_not_called() + # Still queues reconciliation poll + mock_poll_task.apply_async.assert_called_once() + + +@pytest.mark.asyncio +@patch("reflector.views.daily.poll_daily_room_presence_task") +@patch("reflector.views.daily.meetings_controller") +@patch("reflector.views.daily.daily_participant_sessions_controller") +async def test_updates_num_clients_to_zero_when_last_participant_leaves( + mock_sessions_ctrl, + mock_meetings_ctrl, + mock_poll_task, + mock_meeting, + participant_left_event, + existing_session, +): + """num_clients set to 0 when no active sessions remain.""" + mock_meetings_ctrl.get_by_room_name = AsyncMock(return_value=mock_meeting) + mock_sessions_ctrl.get_open_session = AsyncMock(return_value=existing_session) + mock_sessions_ctrl.batch_close_sessions = AsyncMock() + mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[]) + mock_meetings_ctrl.update_meeting = AsyncMock() + + await _handle_participant_left(participant_left_event) + + mock_meetings_ctrl.update_meeting.assert_called_once_with( + mock_meeting.id, num_clients=0 + ) + + +@pytest.mark.asyncio +@patch("reflector.views.daily.poll_daily_room_presence_task") +@patch("reflector.views.daily.meetings_controller") +async def test_no_room_name_in_event( + mock_meetings_ctrl, + mock_poll_task, +): + """No crash when room_name is missing from webhook payload.""" + event = ParticipantLeftEvent( + version="1.0.0", + type="participant.left", + id="evt-left-no-room", + payload=ParticipantLeftPayload( + room_name=None, + session_id="session-x", + user_id="user-x", + user_name="X", + joined_at=int(datetime.now(timezone.utc).timestamp()), + duration=0, + ), + event_ts=int(datetime.now(timezone.utc).timestamp()), + ) + + await _handle_participant_left(event) + + mock_meetings_ctrl.get_by_room_name.assert_not_called() + mock_poll_task.apply_async.assert_not_called() + + +@pytest.mark.asyncio +@patch("reflector.views.daily.poll_daily_room_presence_task") +@patch("reflector.views.daily.meetings_controller") +async def test_meeting_not_found( + mock_meetings_ctrl, + mock_poll_task, + participant_left_event, +): + """No crash when meeting not found for room_name.""" + mock_meetings_ctrl.get_by_room_name = AsyncMock(return_value=None) + + await _handle_participant_left(participant_left_event) + + mock_poll_task.apply_async.assert_not_called() diff --git a/server/tests/test_direct_session_management.py b/server/tests/test_direct_session_management.py new file mode 100644 index 00000000..6568c66d --- /dev/null +++ b/server/tests/test_direct_session_management.py @@ -0,0 +1,339 @@ +"""Tests for direct session management via /joined and /leave endpoints. + +Verifies that: +1. /joined with session_id creates session directly, updates num_clients +2. /joined without session_id (backward compat) still works, queues poll +3. /leave with session_id closes session, updates num_clients +4. /leave without session_id falls back to poll +5. Duplicate /joined calls are idempotent (upsert) +6. /leave for already-closed session is a no-op +""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from reflector.db.daily_participant_sessions import DailyParticipantSession +from reflector.db.meetings import Meeting +from reflector.views.rooms import ( + JoinedRequest, + meeting_joined, + meeting_leave, +) + + +@pytest.fixture +def mock_room(): + room = MagicMock() + room.id = "room-456" + room.name = "test-room" + room.platform = "daily" + return room + + +@pytest.fixture +def mock_meeting(): + return Meeting( + id="meeting-123", + room_id="room-456", + room_name="test-room-20251118120000", + room_url="https://daily.co/test-room", + host_room_url="https://daily.co/test-room?t=host", + platform="daily", + num_clients=0, + is_active=True, + start_date=datetime.now(timezone.utc), + end_date=datetime.now(timezone.utc) + timedelta(hours=8), + ) + + +@pytest.fixture +def mock_redis(): + redis = AsyncMock() + redis.aclose = AsyncMock() + return redis + + +@pytest.fixture +def mock_request_with_session_id(): + """Mock Request with session_id in JSON body.""" + request = AsyncMock() + request.body = AsyncMock(return_value=b'{"session_id": "session-abc"}') + return request + + +@pytest.fixture +def mock_request_empty_body(): + """Mock Request with empty JSON body (old frontend / no frame access).""" + request = AsyncMock() + request.body = AsyncMock(return_value=b"{}") + return request + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.poll_daily_room_presence_task") +@patch("reflector.views.rooms.delete_pending_join") +@patch("reflector.views.rooms.get_async_redis_client") +@patch("reflector.views.rooms.meetings_controller") +@patch("reflector.views.rooms.rooms_controller") +@patch("reflector.views.rooms.daily_participant_sessions_controller") +async def test_joined_with_session_id_creates_session( + mock_sessions_ctrl, + mock_rooms_ctrl, + mock_meetings_ctrl, + mock_redis_client, + mock_delete_pending, + mock_poll_task, + mock_room, + mock_meeting, + mock_redis, +): + """session_id in /joined -> create session + update num_clients.""" + mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room) + mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting) + mock_redis_client.return_value = mock_redis + mock_sessions_ctrl.batch_upsert_sessions = AsyncMock() + mock_sessions_ctrl.get_active_by_meeting = AsyncMock( + return_value=[MagicMock()] # 1 active session + ) + mock_meetings_ctrl.update_meeting = AsyncMock() + + body = JoinedRequest( + connection_id="conn-1", + session_id="session-abc", + user_name="Alice", + ) + result = await meeting_joined( + "test-room", "meeting-123", body, user={"sub": "user-1"} + ) + + assert result.status == "ok" + + # Session created via upsert + mock_sessions_ctrl.batch_upsert_sessions.assert_called_once() + sessions = mock_sessions_ctrl.batch_upsert_sessions.call_args.args[0] + assert len(sessions) == 1 + assert sessions[0].session_id == "session-abc" + assert sessions[0].meeting_id == "meeting-123" + assert sessions[0].room_id == "room-456" + assert sessions[0].user_name == "Alice" + assert sessions[0].user_id == "user-1" + assert sessions[0].id == "meeting-123:session-abc" + + # num_clients updated + mock_meetings_ctrl.update_meeting.assert_called_once_with( + "meeting-123", num_clients=1 + ) + + # Reconciliation poll still queued + mock_poll_task.apply_async.assert_called_once() + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.poll_daily_room_presence_task") +@patch("reflector.views.rooms.delete_pending_join") +@patch("reflector.views.rooms.get_async_redis_client") +@patch("reflector.views.rooms.meetings_controller") +@patch("reflector.views.rooms.rooms_controller") +@patch("reflector.views.rooms.daily_participant_sessions_controller") +async def test_joined_without_session_id_backward_compat( + mock_sessions_ctrl, + mock_rooms_ctrl, + mock_meetings_ctrl, + mock_redis_client, + mock_delete_pending, + mock_poll_task, + mock_room, + mock_meeting, + mock_redis, +): + """No session_id in /joined -> no session create, still queues poll.""" + mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room) + mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting) + mock_redis_client.return_value = mock_redis + + body = JoinedRequest(connection_id="conn-1") + result = await meeting_joined( + "test-room", "meeting-123", body, user={"sub": "user-1"} + ) + + assert result.status == "ok" + mock_sessions_ctrl.batch_upsert_sessions.assert_not_called() + mock_poll_task.apply_async.assert_called_once() + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.poll_daily_room_presence_task") +@patch("reflector.views.rooms.delete_pending_join") +@patch("reflector.views.rooms.get_async_redis_client") +@patch("reflector.views.rooms.meetings_controller") +@patch("reflector.views.rooms.rooms_controller") +@patch("reflector.views.rooms.daily_participant_sessions_controller") +async def test_joined_anonymous_user_sets_null_user_id( + mock_sessions_ctrl, + mock_rooms_ctrl, + mock_meetings_ctrl, + mock_redis_client, + mock_delete_pending, + mock_poll_task, + mock_room, + mock_meeting, + mock_redis, +): + """Anonymous user -> session.user_id is None, user_name defaults to 'Anonymous'.""" + mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room) + mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting) + mock_redis_client.return_value = mock_redis + mock_sessions_ctrl.batch_upsert_sessions = AsyncMock() + mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[MagicMock()]) + mock_meetings_ctrl.update_meeting = AsyncMock() + + body = JoinedRequest(connection_id="conn-1", session_id="session-abc") + result = await meeting_joined("test-room", "meeting-123", body, user=None) + + assert result.status == "ok" + sessions = mock_sessions_ctrl.batch_upsert_sessions.call_args.args[0] + assert sessions[0].user_id is None + assert sessions[0].user_name == "Anonymous" + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.poll_daily_room_presence_task") +@patch("reflector.views.rooms.meetings_controller") +@patch("reflector.views.rooms.rooms_controller") +@patch("reflector.views.rooms.daily_participant_sessions_controller") +async def test_leave_with_session_id_closes_session( + mock_sessions_ctrl, + mock_rooms_ctrl, + mock_meetings_ctrl, + mock_poll_task, + mock_room, + mock_meeting, + mock_request_with_session_id, +): + """session_id in /leave -> close session + update num_clients.""" + mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room) + mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting) + + existing_session = DailyParticipantSession( + id="meeting-123:session-abc", + meeting_id="meeting-123", + room_id="room-456", + session_id="session-abc", + user_id="user-1", + user_name="Alice", + joined_at=datetime.now(timezone.utc) - timedelta(minutes=5), + left_at=None, + ) + mock_sessions_ctrl.get_open_session = AsyncMock(return_value=existing_session) + mock_sessions_ctrl.batch_close_sessions = AsyncMock() + mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[]) + mock_meetings_ctrl.update_meeting = AsyncMock() + + result = await meeting_leave( + "test-room", "meeting-123", mock_request_with_session_id, user={"sub": "user-1"} + ) + + assert result.status == "ok" + + # Session closed + mock_sessions_ctrl.batch_close_sessions.assert_called_once() + closed_ids = mock_sessions_ctrl.batch_close_sessions.call_args.args[0] + assert closed_ids == ["meeting-123:session-abc"] + + # num_clients updated + mock_meetings_ctrl.update_meeting.assert_called_once_with( + "meeting-123", num_clients=0 + ) + + # No poll — direct close is authoritative, poll would race with API latency + mock_poll_task.apply_async.assert_not_called() + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.poll_daily_room_presence_task") +@patch("reflector.views.rooms.meetings_controller") +@patch("reflector.views.rooms.rooms_controller") +async def test_leave_without_session_id_falls_back_to_poll( + mock_rooms_ctrl, + mock_meetings_ctrl, + mock_poll_task, + mock_room, + mock_meeting, + mock_request_empty_body, +): + """No session_id in /leave -> just queues poll as before.""" + mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room) + mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting) + + result = await meeting_leave( + "test-room", "meeting-123", mock_request_empty_body, user=None + ) + + assert result.status == "ok" + mock_poll_task.apply_async.assert_called_once() + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.poll_daily_room_presence_task") +@patch("reflector.views.rooms.delete_pending_join") +@patch("reflector.views.rooms.get_async_redis_client") +@patch("reflector.views.rooms.meetings_controller") +@patch("reflector.views.rooms.rooms_controller") +@patch("reflector.views.rooms.daily_participant_sessions_controller") +async def test_duplicate_joined_is_idempotent( + mock_sessions_ctrl, + mock_rooms_ctrl, + mock_meetings_ctrl, + mock_redis_client, + mock_delete_pending, + mock_poll_task, + mock_room, + mock_meeting, + mock_redis, +): + """Calling /joined twice with same session_id -> upsert both times, no error.""" + mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room) + mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting) + mock_redis_client.return_value = mock_redis + mock_sessions_ctrl.batch_upsert_sessions = AsyncMock() + mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[MagicMock()]) + mock_meetings_ctrl.update_meeting = AsyncMock() + + body = JoinedRequest( + connection_id="conn-1", session_id="session-abc", user_name="Alice" + ) + await meeting_joined("test-room", "meeting-123", body, user={"sub": "user-1"}) + await meeting_joined("test-room", "meeting-123", body, user={"sub": "user-1"}) + + assert mock_sessions_ctrl.batch_upsert_sessions.call_count == 2 + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.poll_daily_room_presence_task") +@patch("reflector.views.rooms.meetings_controller") +@patch("reflector.views.rooms.rooms_controller") +@patch("reflector.views.rooms.daily_participant_sessions_controller") +async def test_leave_already_closed_session_is_noop( + mock_sessions_ctrl, + mock_rooms_ctrl, + mock_meetings_ctrl, + mock_poll_task, + mock_room, + mock_meeting, + mock_request_with_session_id, +): + """/leave for already-closed session -> no close attempted, just poll.""" + mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room) + mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting) + mock_sessions_ctrl.get_open_session = AsyncMock(return_value=None) + + result = await meeting_leave( + "test-room", "meeting-123", mock_request_with_session_id, user=None + ) + + assert result.status == "ok" + mock_sessions_ctrl.batch_close_sessions.assert_not_called() + mock_meetings_ctrl.update_meeting.assert_not_called() + mock_poll_task.apply_async.assert_called_once() diff --git a/www/app/[roomName]/components/DailyRoom.tsx b/www/app/[roomName]/components/DailyRoom.tsx index 57493624..d00340d9 100644 --- a/www/app/[roomName]/components/DailyRoom.tsx +++ b/www/app/[roomName]/components/DailyRoom.tsx @@ -91,7 +91,7 @@ const useFrame = ( cbs: { onLeftMeeting: () => void; onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void; - onJoinMeeting: () => void; + onJoinMeeting: (sessionId: string | null) => void; }, ) => { const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE); @@ -142,7 +142,8 @@ const useFrame = ( console.error("frame is null in joined-meeting callback"); return; } - cbs.onJoinMeeting(); + const local = frame.participants()?.local; + cbs.onJoinMeeting(local?.session_id ?? null); }; frame.on("joined-meeting", joinCb); return () => { @@ -193,6 +194,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { const joiningMutation = useMeetingJoining(); const joinedMutation = useMeetingJoined(); const [joinedMeeting, setJoinedMeeting] = useState(null); + const sessionIdRef = useRef(null); // Generate a stable connection ID for this component instance // Used to track pending joins per browser tab (prevents key collision for anonymous users) @@ -243,8 +245,17 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { const roomUrl = joinedMeeting?.room_url; const handleLeave = useCallback(() => { + if (meeting?.id && roomName) { + const payload = sessionIdRef.current + ? { session_id: sessionIdRef.current } + : {}; + navigator.sendBeacon( + buildMeetingLeaveUrl(roomName, meeting.id), + JSON.stringify(payload), + ); + } router.push("/browse"); - }, [router]); + }, [router, roomName, meeting?.id]); // Trigger presence recheck on dirty disconnects (tab close, navigation away) useEffect(() => { @@ -253,7 +264,10 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { const handleBeforeUnload = () => { // sendBeacon guarantees delivery even if tab closes mid-request const url = buildMeetingLeaveUrl(roomName, meeting.id); - navigator.sendBeacon(url, JSON.stringify({})); + const payload = sessionIdRef.current + ? { session_id: sessionIdRef.current } + : {}; + navigator.sendBeacon(url, JSON.stringify(payload)); }; window.addEventListener("beforeunload", handleBeforeUnload); @@ -271,97 +285,106 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { ], ); - const handleFrameJoinMeeting = useCallback(() => { - // Signal that WebRTC connection is established - // This clears the pending join intent, confirming successful connection - joinedMutation.mutate( - { - params: { - path: { - room_name: roomName, - meeting_id: meeting.id, + const handleFrameJoinMeeting = useCallback( + (sessionId: string | null) => { + sessionIdRef.current = sessionId; + + // Signal that WebRTC connection is established + // This clears the pending join intent and creates session record directly + joinedMutation.mutate( + { + params: { + path: { + room_name: roomName, + meeting_id: meeting.id, + }, + }, + body: { + connection_id: connectionId, + session_id: sessionId, }, }, - body: { - connection_id: connectionId, + { + onError: (error: unknown) => { + // Non-blocking: log but don't fail - this is cleanup, not critical + console.warn("Failed to signal joined:", error); + }, }, - }, - { - onError: (error: unknown) => { - // Non-blocking: log but don't fail - this is cleanup, not critical - console.warn("Failed to signal joined:", error); - }, - }, - ); + ); - if (meeting.recording_type === "cloud") { - console.log("Starting dual recording via REST API", { - cloudInstanceId, - rawTracksInstanceId, - }); + if (meeting.recording_type === "cloud") { + console.log("Starting dual recording via REST API", { + cloudInstanceId, + rawTracksInstanceId, + }); - // Start both cloud and raw-tracks via backend REST API (with retry on 404) - // Daily.co needs time to register call as "hosting" for REST API - const startRecordingWithRetry = ( - type: DailyRecordingType, - instanceId: NonEmptyString, - attempt: number = 1, - ) => { - setTimeout(() => { - startRecordingMutation.mutate( - { - params: { - path: { - meeting_id: meeting.id, + // Start both cloud and raw-tracks via backend REST API (with retry on 404) + // Daily.co needs time to register call as "hosting" for REST API + const startRecordingWithRetry = ( + type: DailyRecordingType, + instanceId: NonEmptyString, + attempt: number = 1, + ) => { + setTimeout(() => { + startRecordingMutation.mutate( + { + params: { + path: { + meeting_id: meeting.id, + }, + }, + body: { + type, + instanceId, }, }, - body: { - type, - instanceId, - }, - }, - { - onError: (error: any) => { - const errorText = error?.detail || error?.message || ""; - const is404NotHosting = errorText.includes( - "does not seem to be hosting a call", - ); - const isActiveStream = errorText.includes( - "has an active stream", - ); - - if (is404NotHosting && attempt < RECORDING_START_MAX_RETRIES) { - console.log( - `${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`, + { + onError: (error: any) => { + const errorText = error?.detail || error?.message || ""; + const is404NotHosting = errorText.includes( + "does not seem to be hosting a call", ); - startRecordingWithRetry(type, instanceId, attempt + 1); - } else if (isActiveStream) { - console.log( - `${type}: Recording already active (started by another participant)`, + const isActiveStream = errorText.includes( + "has an active stream", ); - } else { - console.error(`Failed to start ${type} recording:`, error); - } - }, - }, - ); - }, RECORDING_START_DELAY_MS); - }; - // Start both recordings - startRecordingWithRetry("cloud", cloudInstanceId); - startRecordingWithRetry("raw-tracks", rawTracksInstanceId); - } - }, [ - meeting.recording_type, - meeting.id, - roomName, - connectionId, - joinedMutation, - startRecordingMutation, - cloudInstanceId, - rawTracksInstanceId, - ]); + if ( + is404NotHosting && + attempt < RECORDING_START_MAX_RETRIES + ) { + console.log( + `${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`, + ); + startRecordingWithRetry(type, instanceId, attempt + 1); + } else if (isActiveStream) { + console.log( + `${type}: Recording already active (started by another participant)`, + ); + } else { + console.error(`Failed to start ${type} recording:`, error); + } + }, + }, + ); + }, RECORDING_START_DELAY_MS); + }; + + // Start both recordings + startRecordingWithRetry("cloud", cloudInstanceId); + startRecordingWithRetry("raw-tracks", rawTracksInstanceId); + } + }, + [ + meeting.recording_type, + meeting.id, + roomName, + connectionId, + joinedMutation, + startRecordingMutation, + cloudInstanceId, + rawTracksInstanceId, + ], + ); const recordingIconUrl = useMemo( () => new URL("/recording-icon.svg", window.location.origin), diff --git a/www/app/lib/apiHooks.ts b/www/app/lib/apiHooks.ts index 06c82517..dd4843c8 100644 --- a/www/app/lib/apiHooks.ts +++ b/www/app/lib/apiHooks.ts @@ -773,6 +773,7 @@ export function useRoomActiveMeetings(roomName: string | null) { }, { enabled: !!roomName, + refetchInterval: 5000, }, ); } diff --git a/www/app/reflector-api.d.ts b/www/app/reflector-api.d.ts index 58ea7eda..f0cda9a5 100644 --- a/www/app/reflector-api.d.ts +++ b/www/app/reflector-api.d.ts @@ -372,11 +372,12 @@ export interface paths { put?: never; /** * Meeting Leave - * @description Trigger presence recheck when user leaves meeting. + * @description Trigger presence update when user leaves meeting. * - * Called on tab close/navigation via sendBeacon(). Immediately queues presence - * poll to detect dirty disconnects faster than 30s periodic poll. - * Daily.co webhooks handle clean disconnects, but tab close/crash need this. + * When session_id is provided in the body, closes the session directly + * for instant presence update. Falls back to polling when session_id + * is not available (e.g., sendBeacon without frame access). + * Called on tab close/navigation via sendBeacon(). */ post: operations["v1_meeting_leave"]; delete?: never; @@ -1579,6 +1580,10 @@ export interface components { * @description A non-empty string */ connection_id: string; + /** Session Id */ + session_id?: string | null; + /** User Name */ + user_name?: string | null; }; /** JoinedResponse */ JoinedResponse: {