diff --git a/server/reflector/presence/__init__.py b/server/reflector/presence/__init__.py new file mode 100644 index 00000000..44dfaf8a --- /dev/null +++ b/server/reflector/presence/__init__.py @@ -0,0 +1,17 @@ +"""Presence tracking for meetings.""" + +from reflector.presence.pending_joins import ( + PENDING_JOIN_PREFIX, + PENDING_JOIN_TTL, + create_pending_join, + delete_pending_join, + has_pending_joins, +) + +__all__ = [ + "PENDING_JOIN_PREFIX", + "PENDING_JOIN_TTL", + "create_pending_join", + "delete_pending_join", + "has_pending_joins", +] diff --git a/server/reflector/presence/pending_joins.py b/server/reflector/presence/pending_joins.py new file mode 100644 index 00000000..548f7170 --- /dev/null +++ b/server/reflector/presence/pending_joins.py @@ -0,0 +1,59 @@ +"""Track pending join intents in Redis. + +When a user signals intent to join a meeting (before WebRTC handshake completes), +we store a pending join record. This prevents the meeting from being deactivated +while users are still connecting. +""" + +import time + +from redis.asyncio import Redis + +from reflector.logger import logger + +PENDING_JOIN_TTL = 30 # seconds +PENDING_JOIN_PREFIX = "pending_join" +# Max keys to scan per Redis SCAN iteration +SCAN_BATCH_SIZE = 100 + + +async def create_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None: + """Create a pending join record. Called before WebRTC handshake.""" + key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}" + log = logger.bind(meeting_id=meeting_id, user_id=user_id, key=key) + await redis.setex(key, PENDING_JOIN_TTL, str(time.time())) + log.debug("Created pending join") + + +async def delete_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None: + """Delete pending join. Called after WebRTC connection established.""" + key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}" + log = logger.bind(meeting_id=meeting_id, user_id=user_id, key=key) + await redis.delete(key) + log.debug("Deleted pending join") + + +async def has_pending_joins(redis: Redis, meeting_id: str) -> bool: + """Check if meeting has any pending joins. + + Uses Redis SCAN to iterate through all keys matching the pattern. + Properly iterates until cursor returns 0 to ensure all keys are checked. + """ + pattern = f"{PENDING_JOIN_PREFIX}:{meeting_id}:*" + log = logger.bind(meeting_id=meeting_id, pattern=pattern) + + cursor = 0 + iterations = 0 + while True: + cursor, keys = await redis.scan( + cursor=cursor, match=pattern, count=SCAN_BATCH_SIZE + ) + iterations += 1 + if keys: + log.debug("Found pending joins", count=len(keys), iterations=iterations) + return True + if cursor == 0: + break + + log.debug("No pending joins found", iterations=iterations) + return False diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 11e668c0..ef1f2625 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -1,4 +1,3 @@ -import logging from datetime import datetime, timedelta, timezone from enum import Enum from typing import Annotated, Any, Literal, Optional @@ -14,16 +13,17 @@ from reflector.db import get_database from reflector.db.calendar_events import calendar_events_controller from reflector.db.meetings import meetings_controller from reflector.db.rooms import rooms_controller -from reflector.redis_cache import RedisAsyncLock +from reflector.logger import logger +from reflector.presence.pending_joins import create_pending_join, delete_pending_join +from reflector.redis_cache import RedisAsyncLock, get_async_redis_client from reflector.schemas.platform import Platform from reflector.services.ics_sync import ics_sync_service from reflector.settings import settings +from reflector.utils.string import NonEmptyString from reflector.utils.url import add_query_param from reflector.video_platforms.factory import create_platform_client from reflector.worker.webhook import test_webhook -logger = logging.getLogger(__name__) - class Room(BaseModel): id: str @@ -597,3 +597,112 @@ async def rooms_join_meeting( meeting.room_url = add_query_param(meeting.room_url, "t", token) return meeting + + +class JoiningRequest(BaseModel): + """Request body for /joining and /joined endpoints.""" + + connection_id: NonEmptyString + """Unique identifier for this connection. Should be a UUID generated by the client. + Must be the same value for both /joining and /joined calls from the same tab.""" + + +class JoiningResponse(BaseModel): + status: Literal["ok"] + + +def _get_pending_join_key( + user: Optional[auth.UserInfo], connection_id: NonEmptyString +) -> str: + """Get a unique key for pending join tracking. + + Uses user ID for authenticated users, connection_id for anonymous users. + This ensures each browser tab has its own unique pending join record. + """ + if user: + return f"{user['sub']}:{connection_id}" + return f"anon:{connection_id}" + + +@router.post( + "/rooms/{room_name}/meetings/{meeting_id}/joining", response_model=JoiningResponse +) +async def meeting_joining( + room_name: str, + meeting_id: str, + body: JoiningRequest, + user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], +) -> JoiningResponse: + """Signal intent to join meeting. Called before WebRTC handshake starts. + + This creates a pending join record that prevents the meeting from being + deactivated while the WebRTC handshake is in progress. The record expires + automatically after 30 seconds if the connection is not established. + """ + log = logger.bind( + room_name=room_name, meeting_id=meeting_id, connection_id=body.connection_id + ) + + room = await rooms_controller.get_by_name(room_name) + if not room: + raise HTTPException(status_code=404, detail="Room not found") + + meeting = await meetings_controller.get_by_id(meeting_id, room=room) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + if not meeting.is_active: + raise HTTPException(status_code=400, detail="Meeting is not active") + + join_key = _get_pending_join_key(user, body.connection_id) + + redis = await get_async_redis_client() + try: + await create_pending_join(redis, meeting_id, join_key) + log.debug("Created pending join intent", join_key=join_key) + finally: + await redis.aclose() + + return JoiningResponse(status="ok") + + +@router.post( + "/rooms/{room_name}/meetings/{meeting_id}/joined", response_model=JoiningResponse +) +async def meeting_joined( + room_name: str, + meeting_id: str, + body: JoiningRequest, + user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], +) -> JoiningResponse: + """Signal that WebRTC connection is established. + + This clears the pending join record, confirming the user has successfully + connected to the meeting. Safe to call even if meeting was deactivated + during the handshake (idempotent cleanup). + """ + log = logger.bind( + room_name=room_name, meeting_id=meeting_id, connection_id=body.connection_id + ) + + room = await rooms_controller.get_by_name(room_name) + if not room: + raise HTTPException(status_code=404, detail="Room not found") + + meeting = await meetings_controller.get_by_id(meeting_id, room=room) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + # Note: We don't check is_active here - the /joined call is a cleanup operation + # and should succeed even if the meeting was deactivated during the handshake + + join_key = _get_pending_join_key(user, body.connection_id) + + redis = await get_async_redis_client() + try: + await delete_pending_join(redis, meeting_id, join_key) + log.debug("Cleared pending join intent", join_key=join_key) + finally: + await redis.aclose() + + return JoiningResponse(status="ok") diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 8d88de43..c8ebf6c0 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -31,9 +31,10 @@ from reflector.pipelines.main_multitrack_pipeline import ( task_pipeline_multitrack_process, ) from reflector.pipelines.topic_processing import EmptyPipeline +from reflector.presence.pending_joins import has_pending_joins from reflector.processors import AudioFileWriterProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor -from reflector.redis_cache import RedisAsyncLock +from reflector.redis_cache import RedisAsyncLock, get_async_redis_client from reflector.settings import settings from reflector.storage import get_transcripts_storage from reflector.utils.daily import ( @@ -869,6 +870,18 @@ async def process_meetings(): logger_.debug("Meeting not yet started, keep it") if should_deactivate: + # Check for pending joins before deactivating + # Users might be in the process of connecting via WebRTC + redis = await get_async_redis_client() + try: + if await has_pending_joins(redis, meeting.id): + logger_.info( + "Meeting has pending joins, skipping deactivation" + ) + continue + finally: + await redis.aclose() + await meetings_controller.update_meeting( meeting.id, is_active=False ) diff --git a/server/tests/test_joining_endpoint.py b/server/tests/test_joining_endpoint.py new file mode 100644 index 00000000..75c323a0 --- /dev/null +++ b/server/tests/test_joining_endpoint.py @@ -0,0 +1,362 @@ +"""Integration tests for /joining and /joined endpoints. + +Tests for the join intent tracking to prevent race conditions during +WebRTC handshake when users join meetings. +""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import pytest + +from reflector.db.meetings import Meeting +from reflector.presence.pending_joins import PENDING_JOIN_PREFIX + +TEST_CONNECTION_ID = "test-connection-uuid-12345" + + +@pytest.fixture +def mock_room(): + """Mock room object.""" + from reflector.db.rooms import Room + + return Room( + id="room-123", + name="test-room", + user_id="owner-user", + created_at=datetime.now(timezone.utc), + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic", + is_shared=True, + platform="daily", + skip_consent=False, + ) + + +@pytest.fixture +def mock_meeting(): + """Mock meeting object.""" + now = datetime.now(timezone.utc) + return Meeting( + id="meeting-456", + room_id="room-123", + room_name="test-room-20251118120000", + room_url="https://daily.co/test-room-20251118120000", + host_room_url="https://daily.co/test-room-20251118120000?t=host", + platform="daily", + num_clients=0, + is_active=True, + start_date=now, + end_date=now + timedelta(hours=1), + ) + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +@patch("reflector.views.rooms.meetings_controller.get_by_id") +@patch("reflector.views.rooms.get_async_redis_client") +async def test_joining_endpoint_creates_pending_join( + mock_get_redis, + mock_get_meeting, + mock_get_room, + mock_room, + mock_meeting, + client, + authenticated_client, +): + """Test that /joining endpoint creates pending join in Redis.""" + mock_get_room.return_value = mock_room + mock_get_meeting.return_value = mock_meeting + + mock_redis = AsyncMock() + mock_redis.setex = AsyncMock() + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + response = await client.post( + f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining", + json={"connection_id": TEST_CONNECTION_ID}, + ) + + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + # Verify Redis setex was called with correct key pattern + mock_redis.setex.assert_called_once() + call_args = mock_redis.setex.call_args[0] + assert call_args[0].startswith(f"{PENDING_JOIN_PREFIX}:{mock_meeting.id}:") + assert TEST_CONNECTION_ID in call_args[0] + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +@patch("reflector.views.rooms.meetings_controller.get_by_id") +@patch("reflector.views.rooms.get_async_redis_client") +async def test_joined_endpoint_deletes_pending_join( + mock_get_redis, + mock_get_meeting, + mock_get_room, + mock_room, + mock_meeting, + client, + authenticated_client, +): + """Test that /joined endpoint deletes pending join from Redis.""" + mock_get_room.return_value = mock_room + mock_get_meeting.return_value = mock_meeting + + mock_redis = AsyncMock() + mock_redis.delete = AsyncMock() + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + response = await client.post( + f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joined", + json={"connection_id": TEST_CONNECTION_ID}, + ) + + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + # Verify Redis delete was called with correct key pattern + mock_redis.delete.assert_called_once() + call_args = mock_redis.delete.call_args[0] + assert call_args[0].startswith(f"{PENDING_JOIN_PREFIX}:{mock_meeting.id}:") + assert TEST_CONNECTION_ID in call_args[0] + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +async def test_joining_endpoint_room_not_found( + mock_get_room, + client, + authenticated_client, +): + """Test that /joining returns 404 when room not found.""" + mock_get_room.return_value = None + + response = await client.post( + "/rooms/nonexistent-room/meetings/meeting-123/joining", + json={"connection_id": TEST_CONNECTION_ID}, + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Room not found" + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +@patch("reflector.views.rooms.meetings_controller.get_by_id") +async def test_joining_endpoint_meeting_not_found( + mock_get_meeting, + mock_get_room, + mock_room, + client, + authenticated_client, +): + """Test that /joining returns 404 when meeting not found.""" + mock_get_room.return_value = mock_room + mock_get_meeting.return_value = None + + response = await client.post( + f"/rooms/{mock_room.name}/meetings/nonexistent-meeting/joining", + json={"connection_id": TEST_CONNECTION_ID}, + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Meeting not found" + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +@patch("reflector.views.rooms.meetings_controller.get_by_id") +async def test_joining_endpoint_meeting_not_active( + mock_get_meeting, + mock_get_room, + mock_room, + mock_meeting, + client, + authenticated_client, +): + """Test that /joining returns 400 when meeting is not active.""" + mock_get_room.return_value = mock_room + inactive_meeting = mock_meeting.model_copy(update={"is_active": False}) + mock_get_meeting.return_value = inactive_meeting + + response = await client.post( + f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining", + json={"connection_id": TEST_CONNECTION_ID}, + ) + + assert response.status_code == 400 + assert response.json()["detail"] == "Meeting is not active" + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +@patch("reflector.views.rooms.meetings_controller.get_by_id") +@patch("reflector.views.rooms.get_async_redis_client") +async def test_joining_endpoint_anonymous_user( + mock_get_redis, + mock_get_meeting, + mock_get_room, + mock_room, + mock_meeting, + client, +): + """Test that /joining works for anonymous users with unique connection_id.""" + mock_get_room.return_value = mock_room + mock_get_meeting.return_value = mock_meeting + + mock_redis = AsyncMock() + mock_redis.setex = AsyncMock() + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + response = await client.post( + f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining", + json={"connection_id": TEST_CONNECTION_ID}, + ) + + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + # Verify Redis setex was called with "anon:" prefix and connection_id + call_args = mock_redis.setex.call_args[0] + assert ":anon:" in call_args[0] + assert TEST_CONNECTION_ID in call_args[0] + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +@patch("reflector.views.rooms.meetings_controller.get_by_id") +@patch("reflector.views.rooms.get_async_redis_client") +async def test_joining_endpoint_redis_closed_on_success( + mock_get_redis, + mock_get_meeting, + mock_get_room, + mock_room, + mock_meeting, + client, + authenticated_client, +): + """Test that Redis connection is closed after successful operation.""" + mock_get_room.return_value = mock_room + mock_get_meeting.return_value = mock_meeting + + mock_redis = AsyncMock() + mock_redis.setex = AsyncMock() + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + await client.post( + f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining", + json={"connection_id": TEST_CONNECTION_ID}, + ) + + mock_redis.aclose.assert_called_once() + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +@patch("reflector.views.rooms.meetings_controller.get_by_id") +@patch("reflector.views.rooms.get_async_redis_client") +async def test_joining_endpoint_redis_closed_on_error( + mock_get_redis, + mock_get_meeting, + mock_get_room, + mock_room, + mock_meeting, + client, + authenticated_client, +): + """Test that Redis connection is closed even when operation fails.""" + mock_get_room.return_value = mock_room + mock_get_meeting.return_value = mock_meeting + + mock_redis = AsyncMock() + mock_redis.setex = AsyncMock(side_effect=Exception("Redis error")) + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + with pytest.raises(Exception): + await client.post( + f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining", + json={"connection_id": TEST_CONNECTION_ID}, + ) + + mock_redis.aclose.assert_called_once() + + +@pytest.mark.asyncio +async def test_joining_endpoint_requires_connection_id( + client, +): + """Test that /joining returns 422 when connection_id is missing.""" + response = await client.post( + "/rooms/test-room/meetings/meeting-123/joining", + json={}, + ) + + assert response.status_code == 422 # Validation error + + +@pytest.mark.asyncio +async def test_joining_endpoint_rejects_empty_connection_id( + client, +): + """Test that /joining returns 422 when connection_id is empty string.""" + response = await client.post( + "/rooms/test-room/meetings/meeting-123/joining", + json={"connection_id": ""}, + ) + + assert response.status_code == 422 # Validation error (NonEmptyString) + + +@pytest.mark.asyncio +@patch("reflector.views.rooms.rooms_controller.get_by_name") +@patch("reflector.views.rooms.meetings_controller.get_by_id") +@patch("reflector.views.rooms.get_async_redis_client") +async def test_different_connection_ids_create_different_keys( + mock_get_redis, + mock_get_meeting, + mock_get_room, + mock_room, + mock_meeting, + client, +): + """Test that different connection_ids create different Redis keys.""" + mock_get_room.return_value = mock_room + mock_get_meeting.return_value = mock_meeting + + mock_redis = AsyncMock() + mock_redis.setex = AsyncMock() + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + # First connection + await client.post( + f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining", + json={"connection_id": "connection-1"}, + ) + key1 = mock_redis.setex.call_args[0][0] + + mock_redis.setex.reset_mock() + + # Second connection (different tab) + await client.post( + f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining", + json={"connection_id": "connection-2"}, + ) + key2 = mock_redis.setex.call_args[0][0] + + # Keys should be different + assert key1 != key2 + assert "connection-1" in key1 + assert "connection-2" in key2 diff --git a/server/tests/test_pending_joins.py b/server/tests/test_pending_joins.py new file mode 100644 index 00000000..7a515297 --- /dev/null +++ b/server/tests/test_pending_joins.py @@ -0,0 +1,153 @@ +"""Tests for pending joins Redis helper functions. + +TDD tests for tracking join intent to prevent race conditions during +WebRTC handshake when users join meetings. +""" + +from unittest.mock import AsyncMock + +import pytest + +from reflector.presence.pending_joins import ( + PENDING_JOIN_PREFIX, + PENDING_JOIN_TTL, + create_pending_join, + delete_pending_join, + has_pending_joins, +) + + +@pytest.fixture +def mock_redis(): + """Mock async Redis client.""" + redis = AsyncMock() + redis.setex = AsyncMock() + redis.delete = AsyncMock() + redis.scan = AsyncMock(return_value=(0, [])) + return redis + + +@pytest.mark.asyncio +async def test_create_pending_join_sets_key_with_ttl(mock_redis): + """Test that create_pending_join stores key with correct TTL.""" + meeting_id = "meeting-123" + user_id = "user-456" + + await create_pending_join(mock_redis, meeting_id, user_id) + + expected_key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}" + mock_redis.setex.assert_called_once() + call_args = mock_redis.setex.call_args + assert call_args[0][0] == expected_key + assert call_args[0][1] == PENDING_JOIN_TTL + # Value should be a timestamp string + assert call_args[0][2] is not None + + +@pytest.mark.asyncio +async def test_delete_pending_join_removes_key(mock_redis): + """Test that delete_pending_join removes the key.""" + meeting_id = "meeting-123" + user_id = "user-456" + + await delete_pending_join(mock_redis, meeting_id, user_id) + + expected_key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}" + mock_redis.delete.assert_called_once_with(expected_key) + + +@pytest.mark.asyncio +async def test_has_pending_joins_returns_false_when_no_keys(mock_redis): + """Test has_pending_joins returns False when no matching keys.""" + mock_redis.scan.return_value = (0, []) + + result = await has_pending_joins(mock_redis, "meeting-123") + + assert result is False + mock_redis.scan.assert_called_once() + call_kwargs = mock_redis.scan.call_args.kwargs + assert call_kwargs["match"] == f"{PENDING_JOIN_PREFIX}:meeting-123:*" + + +@pytest.mark.asyncio +async def test_has_pending_joins_returns_true_when_keys_exist(mock_redis): + """Test has_pending_joins returns True when matching keys found.""" + mock_redis.scan.return_value = (0, [b"pending_join:meeting-123:user-1"]) + + result = await has_pending_joins(mock_redis, "meeting-123") + + assert result is True + + +@pytest.mark.asyncio +async def test_has_pending_joins_scans_with_correct_pattern(mock_redis): + """Test has_pending_joins uses correct scan pattern.""" + meeting_id = "meeting-abc-def" + mock_redis.scan.return_value = (0, []) + + await has_pending_joins(mock_redis, meeting_id) + + expected_pattern = f"{PENDING_JOIN_PREFIX}:{meeting_id}:*" + mock_redis.scan.assert_called_once() + call_kwargs = mock_redis.scan.call_args.kwargs + assert call_kwargs["match"] == expected_pattern + assert call_kwargs["count"] == 100 + + +@pytest.mark.asyncio +async def test_multiple_users_pending_joins(mock_redis): + """Test that multiple users can have pending joins for same meeting.""" + meeting_id = "meeting-123" + # Simulate two pending joins + mock_redis.scan.return_value = ( + 0, + [b"pending_join:meeting-123:user-1", b"pending_join:meeting-123:user-2"], + ) + + result = await has_pending_joins(mock_redis, meeting_id) + + assert result is True + + +@pytest.mark.asyncio +async def test_pending_join_ttl_value(): + """Test that PENDING_JOIN_TTL has expected value.""" + # 30 seconds should be enough for WebRTC handshake but not too long + assert PENDING_JOIN_TTL == 30 + + +@pytest.mark.asyncio +async def test_pending_join_prefix_value(): + """Test that PENDING_JOIN_PREFIX has expected value.""" + assert PENDING_JOIN_PREFIX == "pending_join" + + +@pytest.mark.asyncio +async def test_has_pending_joins_multi_iteration_scan_no_keys(mock_redis): + """Test has_pending_joins iterates until cursor returns 0.""" + # Simulate multi-iteration scan: cursor 100 -> cursor 50 -> cursor 0 + mock_redis.scan.side_effect = [ + (100, []), # First iteration, no keys, continue + (50, []), # Second iteration, no keys, continue + (0, []), # Third iteration, cursor 0, done + ] + + result = await has_pending_joins(mock_redis, "meeting-123") + + assert result is False + assert mock_redis.scan.call_count == 3 + + +@pytest.mark.asyncio +async def test_has_pending_joins_multi_iteration_finds_key_later(mock_redis): + """Test has_pending_joins finds key on second iteration.""" + # Simulate finding key on second scan iteration + mock_redis.scan.side_effect = [ + (100, []), # First iteration, no keys + (0, [b"pending_join:meeting-123:user-1"]), # Second iteration, found key + ] + + result = await has_pending_joins(mock_redis, "meeting-123") + + assert result is True + assert mock_redis.scan.call_count == 2 diff --git a/server/tests/test_process_meetings_pending_joins.py b/server/tests/test_process_meetings_pending_joins.py new file mode 100644 index 00000000..17206141 --- /dev/null +++ b/server/tests/test_process_meetings_pending_joins.py @@ -0,0 +1,241 @@ +"""Tests for process_meetings pending joins check. + +Tests that process_meetings correctly skips deactivation when +pending joins exist for a meeting. +""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import pytest + +from reflector.db.meetings import Meeting + + +def _get_process_meetings_fn(): + """Get the underlying async function without Celery/asynctask decorators.""" + from reflector.worker import process + + fn = process.process_meetings + # Get through both decorator layers (@shared_task and @asynctask) + if hasattr(fn, "__wrapped__"): + fn = fn.__wrapped__ + if hasattr(fn, "__wrapped__"): + fn = fn.__wrapped__ + return fn + + +@pytest.fixture +def mock_active_meeting(): + """Mock an active meeting that should be considered for deactivation.""" + now = datetime.now(timezone.utc) + return Meeting( + id="meeting-123", + room_id="room-456", + room_name="test-room-20251118120000", + room_url="https://daily.co/test-room-20251118120000", + host_room_url="https://daily.co/test-room-20251118120000?t=host", + platform="daily", + num_clients=0, + is_active=True, + start_date=now - timedelta(hours=1), + end_date=now - timedelta(minutes=30), # Already ended + ) + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_all_active") +@patch("reflector.worker.process.RedisAsyncLock") +@patch("reflector.worker.process.create_platform_client") +@patch("reflector.worker.process.get_async_redis_client") +@patch("reflector.worker.process.has_pending_joins") +@patch("reflector.worker.process.meetings_controller.update_meeting") +async def test_process_meetings_skips_deactivation_with_pending_joins( + mock_update_meeting, + mock_has_pending_joins, + mock_get_redis, + mock_create_client, + mock_redis_lock_class, + mock_get_all_active, + mock_active_meeting, +): + """Test that process_meetings skips deactivation when pending joins exist.""" + process_meetings = _get_process_meetings_fn() + + mock_get_all_active.return_value = [mock_active_meeting] + + # Mock lock acquired + mock_lock_instance = AsyncMock() + mock_lock_instance.acquired = True + mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance) + mock_lock_instance.__aexit__ = AsyncMock() + mock_redis_lock_class.return_value = mock_lock_instance + + # Mock platform client - no active sessions, but had sessions (triggers deactivation) + mock_daily_client = AsyncMock() + mock_session = AsyncMock() + mock_session.ended_at = datetime.now(timezone.utc) # Session ended + mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session]) + mock_create_client.return_value = mock_daily_client + + # Mock Redis client + mock_redis = AsyncMock() + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + # Mock pending joins exist + mock_has_pending_joins.return_value = True + + await process_meetings() + + # Verify has_pending_joins was called + mock_has_pending_joins.assert_called_once_with(mock_redis, mock_active_meeting.id) + + # Verify meeting was NOT deactivated + mock_update_meeting.assert_not_called() + + # Verify Redis was closed + mock_redis.aclose.assert_called_once() + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_all_active") +@patch("reflector.worker.process.RedisAsyncLock") +@patch("reflector.worker.process.create_platform_client") +@patch("reflector.worker.process.get_async_redis_client") +@patch("reflector.worker.process.has_pending_joins") +@patch("reflector.worker.process.meetings_controller.update_meeting") +async def test_process_meetings_deactivates_without_pending_joins( + mock_update_meeting, + mock_has_pending_joins, + mock_get_redis, + mock_create_client, + mock_redis_lock_class, + mock_get_all_active, + mock_active_meeting, +): + """Test that process_meetings deactivates when no pending joins.""" + process_meetings = _get_process_meetings_fn() + + mock_get_all_active.return_value = [mock_active_meeting] + + # Mock lock acquired + mock_lock_instance = AsyncMock() + mock_lock_instance.acquired = True + mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance) + mock_lock_instance.__aexit__ = AsyncMock() + mock_redis_lock_class.return_value = mock_lock_instance + + # Mock platform client - no active sessions, but had sessions + mock_daily_client = AsyncMock() + mock_session = AsyncMock() + mock_session.ended_at = datetime.now(timezone.utc) + mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session]) + mock_create_client.return_value = mock_daily_client + + # Mock Redis client + mock_redis = AsyncMock() + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + # Mock no pending joins + mock_has_pending_joins.return_value = False + + await process_meetings() + + # Verify meeting was deactivated + mock_update_meeting.assert_called_once_with(mock_active_meeting.id, is_active=False) + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_all_active") +@patch("reflector.worker.process.RedisAsyncLock") +@patch("reflector.worker.process.create_platform_client") +async def test_process_meetings_no_check_when_active_sessions( + mock_create_client, + mock_redis_lock_class, + mock_get_all_active, + mock_active_meeting, +): + """Test that pending joins check is skipped when there are active sessions.""" + process_meetings = _get_process_meetings_fn() + + mock_get_all_active.return_value = [mock_active_meeting] + + # Mock lock acquired + mock_lock_instance = AsyncMock() + mock_lock_instance.acquired = True + mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance) + mock_lock_instance.__aexit__ = AsyncMock() + mock_redis_lock_class.return_value = mock_lock_instance + + # Mock platform client - has active session + mock_daily_client = AsyncMock() + mock_session = AsyncMock() + mock_session.ended_at = None # Still active + mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session]) + mock_create_client.return_value = mock_daily_client + + with ( + patch("reflector.worker.process.get_async_redis_client") as mock_get_redis, + patch("reflector.worker.process.has_pending_joins") as mock_has_pending_joins, + patch( + "reflector.worker.process.meetings_controller.update_meeting" + ) as mock_update_meeting, + ): + await process_meetings() + + # Verify pending joins check was NOT called (no need - active sessions exist) + mock_has_pending_joins.assert_not_called() + + # Verify meeting was NOT deactivated + mock_update_meeting.assert_not_called() + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_all_active") +@patch("reflector.worker.process.RedisAsyncLock") +@patch("reflector.worker.process.create_platform_client") +@patch("reflector.worker.process.get_async_redis_client") +@patch("reflector.worker.process.has_pending_joins") +@patch("reflector.worker.process.meetings_controller.update_meeting") +async def test_process_meetings_closes_redis_even_on_continue( + mock_update_meeting, + mock_has_pending_joins, + mock_get_redis, + mock_create_client, + mock_redis_lock_class, + mock_get_all_active, + mock_active_meeting, +): + """Test that Redis connection is always closed, even when skipping deactivation.""" + process_meetings = _get_process_meetings_fn() + + mock_get_all_active.return_value = [mock_active_meeting] + + # Mock lock acquired + mock_lock_instance = AsyncMock() + mock_lock_instance.acquired = True + mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance) + mock_lock_instance.__aexit__ = AsyncMock() + mock_redis_lock_class.return_value = mock_lock_instance + + # Mock platform client - no active sessions + mock_daily_client = AsyncMock() + mock_session = AsyncMock() + mock_session.ended_at = datetime.now(timezone.utc) + mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session]) + mock_create_client.return_value = mock_daily_client + + # Mock Redis client + mock_redis = AsyncMock() + mock_redis.aclose = AsyncMock() + mock_get_redis.return_value = mock_redis + + # Mock pending joins exist (will trigger continue) + mock_has_pending_joins.return_value = True + + await process_meetings() + + # Verify Redis was closed + mock_redis.aclose.assert_called_once() diff --git a/www/app/[roomName]/components/DailyRoom.tsx b/www/app/[roomName]/components/DailyRoom.tsx index d1c00254..1e53337c 100644 --- a/www/app/[roomName]/components/DailyRoom.tsx +++ b/www/app/[roomName]/components/DailyRoom.tsx @@ -25,6 +25,8 @@ import { useConsentDialog } from "../../lib/consent"; import { useRoomJoinMeeting, useMeetingStartRecording, + useMeetingJoining, + useMeetingJoined, } from "../../lib/apiHooks"; import { omit } from "remeda"; import { @@ -187,8 +189,14 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { const [container, setContainer] = useState(null); const joinMutation = useRoomJoinMeeting(); const startRecordingMutation = useMeetingStartRecording(); + const joiningMutation = useMeetingJoining(); + const joinedMutation = useMeetingJoined(); const [joinedMeeting, setJoinedMeeting] = useState(null); + // Generate a stable connection ID for this component instance + // Used to track pending joins per browser tab (prevents key collision for anonymous users) + const connectionId = useMemo(() => crypto.randomUUID(), []); + // Generate deterministic instanceIds so all participants use SAME IDs const cloudInstanceId = parseNonEmptyString(meeting.id); const rawTracksInstanceId = parseNonEmptyString( @@ -249,6 +257,28 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { ); const handleFrameJoinMeeting = useCallback(() => { + // Signal that WebRTC connection is established + // This clears the pending join intent, confirming successful connection + joinedMutation.mutate( + { + params: { + path: { + room_name: roomName, + meeting_id: meeting.id, + }, + }, + body: { + connection_id: connectionId, + }, + }, + { + onError: (error: unknown) => { + // Non-blocking: log but don't fail - this is cleanup, not critical + console.warn("Failed to signal joined:", error); + }, + }, + ); + if (meeting.recording_type === "cloud") { console.log("Starting dual recording via REST API", { cloudInstanceId, @@ -310,6 +340,9 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { }, [ meeting.recording_type, meeting.id, + roomName, + connectionId, + joinedMutation, startRecordingMutation, cloudInstanceId, rawTracksInstanceId, @@ -328,8 +361,28 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { useEffect(() => { if (!frame || !roomUrl) return; - frame - .join({ + + const joinRoom = async () => { + // Signal intent to join before WebRTC handshake starts + // This prevents race condition where meeting is deactivated during handshake + try { + await joiningMutation.mutateAsync({ + params: { + path: { + room_name: roomName, + meeting_id: meeting.id, + }, + }, + body: { + connection_id: connectionId, + }, + }); + } catch (error) { + // Non-blocking: log but continue with join + console.warn("Failed to signal joining intent:", error); + } + + await frame.join({ url: roomUrl, sendSettings: { video: { @@ -341,9 +394,13 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { }, // Note: screenVideo intentionally not configured to preserve full quality for screen shares }, - }) - .catch(console.error.bind(console, "Failed to join daily room:")); - }, [frame, roomUrl]); + }); + }; + + joinRoom().catch(console.error.bind(console, "Failed to join daily room:")); + // joiningMutation excluded from deps - it's a stable hook reference + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [frame, roomUrl, roomName, meeting.id, connectionId]); useEffect(() => { setCustomTrayButton( diff --git a/www/app/lib/apiHooks.ts b/www/app/lib/apiHooks.ts index 788dfac6..f0da335a 100644 --- a/www/app/lib/apiHooks.ts +++ b/www/app/lib/apiHooks.ts @@ -807,6 +807,26 @@ export function useRoomJoinMeeting() { ); } +// Presence race fix endpoints (not yet in OpenAPI spec) +// These signal join intent to prevent race conditions during WebRTC handshake +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function useMeetingJoining(): any { + return ($api as any).useMutation( + "post", + "/v1/rooms/{room_name}/meetings/{meeting_id}/joining", + {}, + ); +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function useMeetingJoined(): any { + return ($api as any).useMutation( + "post", + "/v1/rooms/{room_name}/meetings/{meeting_id}/joined", + {}, + ); +} + export function useRoomIcsSync() { const { setError } = useError();