Compare commits

..

6 Commits

Author SHA1 Message Date
Igor Loskutov
ad64f43202 Merge main into feature-leave-endpoint
Resolve conflict in apiHooks.ts by keeping import for createFinalURL
and createQuerySerializer which are used by leave/join room functions.
2026-02-05 14:34:01 -05:00
Igor Loskutov
485b455c69 race condition debug wip 2026-01-30 14:38:12 -05:00
Igor Loskutov
74c9ec2ff1 race condition debug wip 2026-01-30 14:37:53 -05:00
Igor Loskutov
aac89e8d03 rejoin tags backend 2026-01-29 15:57:09 -05:00
Igor Loskutov
13088e72f8 feat: Trigger presence poll on join endpoint for Daily meetings
Also trigger poll_daily_room_presence_task when user joins meeting via
/join endpoint, not just on /leave. Webhooks can fail or not exist
(e.g., Whereby has no participant.joined webhook), so frontend-triggered
polls needed for both join and leave events.
2026-01-26 18:05:44 -05:00
Igor Loskutov
775c9b667d feat: Add meeting leave endpoint for faster presence detection (no-mistaken)
Backend:
- Add POST /rooms/{room_name}/meetings/{meeting_id}/leave endpoint
- Triggers poll_daily_room_presence_task immediately on user disconnect
- Reduces detection latency from 0-30s (periodic poll) to ~1-2s

Frontend:
- Add useRoomLeaveMeeting() mutation hook
- Add beforeunload handler in DailyRoom that calls sendBeacon()
- Guarantees API call completion even if tab closes mid-request

Context:
- Daily.co webhooks handle clean disconnects
- This endpoint handles dirty disconnects (tab close, crash, network drop)
- Redis lock prevents spam if multiple users leave simultaneously

This commit is no-mistaken and follows user requirements for readonly research
task that was later approved for implementation.
2026-01-26 17:59:33 -05:00
12 changed files with 625 additions and 1048 deletions

View File

@@ -1,17 +0,0 @@
"""Presence tracking for meetings."""
from reflector.presence.pending_joins import (
PENDING_JOIN_PREFIX,
PENDING_JOIN_TTL,
create_pending_join,
delete_pending_join,
has_pending_joins,
)
__all__ = [
"PENDING_JOIN_PREFIX",
"PENDING_JOIN_TTL",
"create_pending_join",
"delete_pending_join",
"has_pending_joins",
]

View File

@@ -1,59 +0,0 @@
"""Track pending join intents in Redis.
When a user signals intent to join a meeting (before WebRTC handshake completes),
we store a pending join record. This prevents the meeting from being deactivated
while users are still connecting.
"""
import time
from redis.asyncio import Redis
from reflector.logger import logger
PENDING_JOIN_TTL = 30 # seconds
PENDING_JOIN_PREFIX = "pending_join"
# Max keys to scan per Redis SCAN iteration
SCAN_BATCH_SIZE = 100
async def create_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None:
"""Create a pending join record. Called before WebRTC handshake."""
key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
log = logger.bind(meeting_id=meeting_id, user_id=user_id, key=key)
await redis.setex(key, PENDING_JOIN_TTL, str(time.time()))
log.debug("Created pending join")
async def delete_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None:
"""Delete pending join. Called after WebRTC connection established."""
key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
log = logger.bind(meeting_id=meeting_id, user_id=user_id, key=key)
await redis.delete(key)
log.debug("Deleted pending join")
async def has_pending_joins(redis: Redis, meeting_id: str) -> bool:
"""Check if meeting has any pending joins.
Uses Redis SCAN to iterate through all keys matching the pattern.
Properly iterates until cursor returns 0 to ensure all keys are checked.
"""
pattern = f"{PENDING_JOIN_PREFIX}:{meeting_id}:*"
log = logger.bind(meeting_id=meeting_id, pattern=pattern)
cursor = 0
iterations = 0
while True:
cursor, keys = await redis.scan(
cursor=cursor, match=pattern, count=SCAN_BATCH_SIZE
)
iterations += 1
if keys:
log.debug("Found pending joins", count=len(keys), iterations=iterations)
return True
if cursor == 0:
break
log.debug("No pending joins found", iterations=iterations)
return False

View File

@@ -129,6 +129,10 @@ class DailyClient(VideoPlatformClient):
"""Get room presence/session data for a Daily.co room."""
return await self._api_client.get_room_presence(room_name)
async def delete_room(self, room_name: str) -> None:
"""Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
return await self._api_client.delete_room(room_name)
async def get_meeting_participants(
self, meeting_id: str
) -> MeetingParticipantsResponse:

View File

@@ -1,3 +1,4 @@
import logging
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated, Any, Literal, Optional
@@ -13,17 +14,17 @@ from reflector.db import get_database
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.presence.pending_joins import create_pending_join, delete_pending_join
from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
from reflector.redis_cache import RedisAsyncLock
from reflector.schemas.platform import Platform
from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import poll_daily_room_presence_task
from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__)
class Room(BaseModel):
id: str
@@ -365,6 +366,53 @@ async def rooms_create_meeting(
return meeting
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
async def rooms_joined_meeting(
room_name: str,
meeting_id: str,
):
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.delay(meeting_id)
return {"status": "ok"}
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
async def rooms_leave_meeting(
room_name: str,
meeting_id: str,
delay_seconds: int = 2,
):
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.apply_async(
args=[meeting_id],
countdown=delay_seconds,
)
return {"status": "ok"}
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
async def rooms_test_webhook(
room_id: str,
@@ -597,112 +645,3 @@ async def rooms_join_meeting(
meeting.room_url = add_query_param(meeting.room_url, "t", token)
return meeting
class JoiningRequest(BaseModel):
"""Request body for /joining and /joined endpoints."""
connection_id: NonEmptyString
"""Unique identifier for this connection. Should be a UUID generated by the client.
Must be the same value for both /joining and /joined calls from the same tab."""
class JoiningResponse(BaseModel):
status: Literal["ok"]
def _get_pending_join_key(
user: Optional[auth.UserInfo], connection_id: NonEmptyString
) -> str:
"""Get a unique key for pending join tracking.
Uses user ID for authenticated users, connection_id for anonymous users.
This ensures each browser tab has its own unique pending join record.
"""
if user:
return f"{user['sub']}:{connection_id}"
return f"anon:{connection_id}"
@router.post(
"/rooms/{room_name}/meetings/{meeting_id}/joining", response_model=JoiningResponse
)
async def meeting_joining(
room_name: str,
meeting_id: str,
body: JoiningRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
) -> JoiningResponse:
"""Signal intent to join meeting. Called before WebRTC handshake starts.
This creates a pending join record that prevents the meeting from being
deactivated while the WebRTC handshake is in progress. The record expires
automatically after 30 seconds if the connection is not established.
"""
log = logger.bind(
room_name=room_name, meeting_id=meeting_id, connection_id=body.connection_id
)
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if not meeting.is_active:
raise HTTPException(status_code=400, detail="Meeting is not active")
join_key = _get_pending_join_key(user, body.connection_id)
redis = await get_async_redis_client()
try:
await create_pending_join(redis, meeting_id, join_key)
log.debug("Created pending join intent", join_key=join_key)
finally:
await redis.aclose()
return JoiningResponse(status="ok")
@router.post(
"/rooms/{room_name}/meetings/{meeting_id}/joined", response_model=JoiningResponse
)
async def meeting_joined(
room_name: str,
meeting_id: str,
body: JoiningRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
) -> JoiningResponse:
"""Signal that WebRTC connection is established.
This clears the pending join record, confirming the user has successfully
connected to the meeting. Safe to call even if meeting was deactivated
during the handshake (idempotent cleanup).
"""
log = logger.bind(
room_name=room_name, meeting_id=meeting_id, connection_id=body.connection_id
)
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
# Note: We don't check is_active here - the /joined call is a cleanup operation
# and should succeed even if the meeting was deactivated during the handshake
join_key = _get_pending_join_key(user, body.connection_id)
redis = await get_async_redis_client()
try:
await delete_pending_join(redis, meeting_id, join_key)
log.debug("Cleared pending join intent", join_key=join_key)
finally:
await redis.aclose()
return JoiningResponse(status="ok")

View File

@@ -31,10 +31,9 @@ from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.presence.pending_joins import has_pending_joins
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
from reflector.redis_cache import RedisAsyncLock
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.utils.daily import (
@@ -846,8 +845,40 @@ async def process_meetings():
end_date = end_date.replace(tzinfo=timezone.utc)
client = create_platform_client(meeting.platform)
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = False
has_had_sessions = False
if meeting.platform == "daily":
try:
presence = await client.get_room_presence(meeting.room_name)
has_active_sessions = presence.total_count > 0
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_had_sessions = bool(room_sessions)
logger_.info(
"Daily.co presence check",
has_active_sessions=has_active_sessions,
has_had_sessions=has_had_sessions,
presence_count=presence.total_count,
)
except Exception:
logger_.warning(
"Daily.co presence API failed, falling back to DB sessions",
exc_info=True,
)
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_active_sessions = bool(
room_sessions
and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
else:
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
@@ -870,22 +901,23 @@ async def process_meetings():
logger_.debug("Meeting not yet started, keep it")
if should_deactivate:
# Check for pending joins before deactivating
# Users might be in the process of connecting via WebRTC
redis = await get_async_redis_client()
try:
if await has_pending_joins(redis, meeting.id):
logger_.info(
"Meeting has pending joins, skipping deactivation"
)
continue
finally:
await redis.aclose()
await meetings_controller.update_meeting(
meeting.id, is_active=False
)
logger_.info("Meeting is deactivated")
logger_.info("Meeting deactivated in database")
if meeting.platform == "daily":
try:
await client.delete_room(meeting.room_name)
logger_.info(
"Daily.co room deleted", room_name=meeting.room_name
)
except Exception:
logger_.warning(
"Failed to delete Daily.co room",
room_name=meeting.room_name,
exc_info=True,
)
processed_count += 1

View File

@@ -0,0 +1,286 @@
"""Unit tests for Daily.co presence-based meeting deactivation logic.
Tests the fix for split room race condition by verifying:
1. Real-time presence checking via Daily.co API
2. Room deletion when meetings deactivate
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.dailyco_api.responses import (
RoomPresenceParticipant,
RoomPresenceResponse,
)
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.video_platforms.daily import DailyClient
@pytest.fixture
async def daily_room_and_meeting():
"""Create test room and meeting for Daily platform."""
room = await rooms_controller.add(
name="test-daily",
user_id="test-user",
platform="daily",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
current_time = datetime.now(timezone.utc)
end_time = current_time + timedelta(hours=2)
meeting = await meetings_controller.create(
id="test-meeting-id",
room_name="test-daily-20260129120000",
room_url="https://daily.co/test",
host_room_url="https://daily.co/test",
start_date=current_time,
end_date=end_time,
room=room,
)
return room, meeting
@pytest.mark.asyncio
async def test_daily_client_has_delete_room_method():
"""Verify DailyClient has delete_room method for cleanup."""
# Create a mock DailyClient
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Verify delete_room method exists
assert hasattr(client, "delete_room")
assert callable(getattr(client, "delete_room"))
@pytest.mark.asyncio
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
"""Test that get_room_presence returns real-time participant data."""
room, meeting = daily_room_and_meeting
# Mock Daily.co API response
mock_presence = RoomPresenceResponse(
total_count=2,
data=[
RoomPresenceParticipant(
room=meeting.room_name,
id="session-1",
userId="user-1",
userName="User One",
joinTime="2026-01-29T12:00:00.000Z",
duration=120,
),
RoomPresenceParticipant(
room=meeting.room_name,
id="session-2",
userId="user-2",
userName="User Two",
joinTime="2026-01-29T12:05:00.000Z",
duration=60,
),
],
)
with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock the API client method
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Call get_room_presence
result = await client.get_room_presence(meeting.room_name)
# Verify it calls Daily.co API
client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
# Verify result contains real-time data
assert result.total_count == 2
assert len(result.data) == 2
assert result.data[0].id == "session-1"
assert result.data[1].id == "session-2"
@pytest.mark.asyncio
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
"""Test that Daily.co presence API is source of truth, not stale DB sessions."""
room, meeting = daily_room_and_meeting
current_time = datetime.now(timezone.utc)
# Create stale DB session (left_at=NULL but user actually left)
session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
await daily_participant_sessions_controller.upsert_joined(
DailyParticipantSession(
id=session_id,
meeting_id=meeting.id,
room_id=room.id,
session_id="stale-daily-session",
user_name="Stale User",
user_id="stale-user",
joined_at=current_time - timedelta(minutes=5),
left_at=None, # Stale - shows active but user left
)
)
# Verify DB shows active session
db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
assert len(db_sessions) == 1
# But Daily.co API shows room is empty
mock_presence = RoomPresenceResponse(total_count=0, data=[])
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Get real-time presence
presence = await client.get_room_presence(meeting.room_name)
# Real-time API shows no participants (truth)
assert presence.total_count == 0
assert len(presence.data) == 0
# DB shows 1 participant (stale)
assert len(db_sessions) == 1
# Implementation should trust presence API, not DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_empty():
"""Test the core deactivation decision logic when presence shows room empty."""
# This tests the logic that will be in process_meetings
# Simulate: DB shows stale active session
has_active_db_sessions = True # DB is stale
# Simulate: Daily.co presence API shows room empty
presence_count = 0 # Real-time truth
# Simulate: Meeting has been used before
has_had_sessions = True
# Decision logic (what process_meetings should do):
# - If presence API available: trust it
# - If presence shows empty AND has_had_sessions: deactivate
if presence_count == 0 and has_had_sessions:
should_deactivate = True
else:
should_deactivate = False
assert should_deactivate is True # Should deactivate despite stale DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_active():
"""Test that meetings stay active when presence shows participants."""
# Simulate: DB shows no sessions (not yet updated)
has_active_db_sessions = False # DB hasn't caught up
# Simulate: Daily.co presence API shows active participant
presence_count = 1 # Real-time truth
# Decision logic: presence shows activity, keep meeting active
if presence_count > 0:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Should stay active
@pytest.mark.asyncio
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
"""Test that Daily.co room is deleted when meeting deactivates."""
room, meeting = daily_room_and_meeting
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock delete_room API call
client._api_client.delete_room = AsyncMock()
# Simulate deactivation - should delete room
await client._api_client.delete_room(meeting.room_name)
# Verify delete was called
client._api_client.delete_room.assert_called_once_with(meeting.room_name)
@pytest.mark.asyncio
async def test_delete_room_idempotent_on_404():
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
from reflector.dailyco_api.client import DailyApiClient
# Create real client to test delete_room logic
client = DailyApiClient(api_key="test-key")
# Mock the HTTP client
mock_http_client = AsyncMock()
mock_response = AsyncMock()
mock_response.status_code = 404 # Room not found
mock_http_client.delete = AsyncMock(return_value=mock_response)
# Mock _get_client to return our mock
async def mock_get_client():
return mock_http_client
client._get_client = mock_get_client
# delete_room should succeed even on 404 (idempotent)
await client.delete_room("nonexistent-room")
# Verify delete was attempted
mock_http_client.delete.assert_called_once()
@pytest.mark.asyncio
async def test_api_failure_fallback_to_db_sessions():
"""Test that system falls back to DB sessions if Daily.co API fails."""
# Simulate: Daily.co API throws exception
api_exception = Exception("API unavailable")
# Simulate: DB shows active session
has_active_db_sessions = True
# Decision logic with fallback:
try:
presence_count = None
raise api_exception # Simulating API failure
except Exception:
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
if has_active_db_sessions:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Conservative: keep active on API failure

View File

@@ -1,362 +0,0 @@
"""Integration tests for /joining and /joined endpoints.
Tests for the join intent tracking to prevent race conditions during
WebRTC handshake when users join meetings.
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db.meetings import Meeting
from reflector.presence.pending_joins import PENDING_JOIN_PREFIX
TEST_CONNECTION_ID = "test-connection-uuid-12345"
@pytest.fixture
def mock_room():
"""Mock room object."""
from reflector.db.rooms import Room
return Room(
id="room-123",
name="test-room",
user_id="owner-user",
created_at=datetime.now(timezone.utc),
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=True,
platform="daily",
skip_consent=False,
)
@pytest.fixture
def mock_meeting():
"""Mock meeting object."""
now = datetime.now(timezone.utc)
return Meeting(
id="meeting-456",
room_id="room-123",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room-20251118120000",
host_room_url="https://daily.co/test-room-20251118120000?t=host",
platform="daily",
num_clients=0,
is_active=True,
start_date=now,
end_date=now + timedelta(hours=1),
)
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_creates_pending_join(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joining endpoint creates pending join in Redis."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis setex was called with correct key pattern
mock_redis.setex.assert_called_once()
call_args = mock_redis.setex.call_args[0]
assert call_args[0].startswith(f"{PENDING_JOIN_PREFIX}:{mock_meeting.id}:")
assert TEST_CONNECTION_ID in call_args[0]
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joined_endpoint_deletes_pending_join(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joined endpoint deletes pending join from Redis."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.delete = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joined",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis delete was called with correct key pattern
mock_redis.delete.assert_called_once()
call_args = mock_redis.delete.call_args[0]
assert call_args[0].startswith(f"{PENDING_JOIN_PREFIX}:{mock_meeting.id}:")
assert TEST_CONNECTION_ID in call_args[0]
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
async def test_joining_endpoint_room_not_found(
mock_get_room,
client,
authenticated_client,
):
"""Test that /joining returns 404 when room not found."""
mock_get_room.return_value = None
response = await client.post(
"/rooms/nonexistent-room/meetings/meeting-123/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 404
assert response.json()["detail"] == "Room not found"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
async def test_joining_endpoint_meeting_not_found(
mock_get_meeting,
mock_get_room,
mock_room,
client,
authenticated_client,
):
"""Test that /joining returns 404 when meeting not found."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = None
response = await client.post(
f"/rooms/{mock_room.name}/meetings/nonexistent-meeting/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 404
assert response.json()["detail"] == "Meeting not found"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
async def test_joining_endpoint_meeting_not_active(
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joining returns 400 when meeting is not active."""
mock_get_room.return_value = mock_room
inactive_meeting = mock_meeting.model_copy(update={"is_active": False})
mock_get_meeting.return_value = inactive_meeting
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 400
assert response.json()["detail"] == "Meeting is not active"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_anonymous_user(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
):
"""Test that /joining works for anonymous users with unique connection_id."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis setex was called with "anon:" prefix and connection_id
call_args = mock_redis.setex.call_args[0]
assert ":anon:" in call_args[0]
assert TEST_CONNECTION_ID in call_args[0]
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_redis_closed_on_success(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that Redis connection is closed after successful operation."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_redis_closed_on_error(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that Redis connection is closed even when operation fails."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock(side_effect=Exception("Redis error"))
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
with pytest.raises(Exception):
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
async def test_joining_endpoint_requires_connection_id(
client,
):
"""Test that /joining returns 422 when connection_id is missing."""
response = await client.post(
"/rooms/test-room/meetings/meeting-123/joining",
json={},
)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_joining_endpoint_rejects_empty_connection_id(
client,
):
"""Test that /joining returns 422 when connection_id is empty string."""
response = await client.post(
"/rooms/test-room/meetings/meeting-123/joining",
json={"connection_id": ""},
)
assert response.status_code == 422 # Validation error (NonEmptyString)
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_different_connection_ids_create_different_keys(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
):
"""Test that different connection_ids create different Redis keys."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# First connection
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": "connection-1"},
)
key1 = mock_redis.setex.call_args[0][0]
mock_redis.setex.reset_mock()
# Second connection (different tab)
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": "connection-2"},
)
key2 = mock_redis.setex.call_args[0][0]
# Keys should be different
assert key1 != key2
assert "connection-1" in key1
assert "connection-2" in key2

View File

@@ -1,153 +0,0 @@
"""Tests for pending joins Redis helper functions.
TDD tests for tracking join intent to prevent race conditions during
WebRTC handshake when users join meetings.
"""
from unittest.mock import AsyncMock
import pytest
from reflector.presence.pending_joins import (
PENDING_JOIN_PREFIX,
PENDING_JOIN_TTL,
create_pending_join,
delete_pending_join,
has_pending_joins,
)
@pytest.fixture
def mock_redis():
"""Mock async Redis client."""
redis = AsyncMock()
redis.setex = AsyncMock()
redis.delete = AsyncMock()
redis.scan = AsyncMock(return_value=(0, []))
return redis
@pytest.mark.asyncio
async def test_create_pending_join_sets_key_with_ttl(mock_redis):
"""Test that create_pending_join stores key with correct TTL."""
meeting_id = "meeting-123"
user_id = "user-456"
await create_pending_join(mock_redis, meeting_id, user_id)
expected_key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
mock_redis.setex.assert_called_once()
call_args = mock_redis.setex.call_args
assert call_args[0][0] == expected_key
assert call_args[0][1] == PENDING_JOIN_TTL
# Value should be a timestamp string
assert call_args[0][2] is not None
@pytest.mark.asyncio
async def test_delete_pending_join_removes_key(mock_redis):
"""Test that delete_pending_join removes the key."""
meeting_id = "meeting-123"
user_id = "user-456"
await delete_pending_join(mock_redis, meeting_id, user_id)
expected_key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
mock_redis.delete.assert_called_once_with(expected_key)
@pytest.mark.asyncio
async def test_has_pending_joins_returns_false_when_no_keys(mock_redis):
"""Test has_pending_joins returns False when no matching keys."""
mock_redis.scan.return_value = (0, [])
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is False
mock_redis.scan.assert_called_once()
call_kwargs = mock_redis.scan.call_args.kwargs
assert call_kwargs["match"] == f"{PENDING_JOIN_PREFIX}:meeting-123:*"
@pytest.mark.asyncio
async def test_has_pending_joins_returns_true_when_keys_exist(mock_redis):
"""Test has_pending_joins returns True when matching keys found."""
mock_redis.scan.return_value = (0, [b"pending_join:meeting-123:user-1"])
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is True
@pytest.mark.asyncio
async def test_has_pending_joins_scans_with_correct_pattern(mock_redis):
"""Test has_pending_joins uses correct scan pattern."""
meeting_id = "meeting-abc-def"
mock_redis.scan.return_value = (0, [])
await has_pending_joins(mock_redis, meeting_id)
expected_pattern = f"{PENDING_JOIN_PREFIX}:{meeting_id}:*"
mock_redis.scan.assert_called_once()
call_kwargs = mock_redis.scan.call_args.kwargs
assert call_kwargs["match"] == expected_pattern
assert call_kwargs["count"] == 100
@pytest.mark.asyncio
async def test_multiple_users_pending_joins(mock_redis):
"""Test that multiple users can have pending joins for same meeting."""
meeting_id = "meeting-123"
# Simulate two pending joins
mock_redis.scan.return_value = (
0,
[b"pending_join:meeting-123:user-1", b"pending_join:meeting-123:user-2"],
)
result = await has_pending_joins(mock_redis, meeting_id)
assert result is True
@pytest.mark.asyncio
async def test_pending_join_ttl_value():
"""Test that PENDING_JOIN_TTL has expected value."""
# 30 seconds should be enough for WebRTC handshake but not too long
assert PENDING_JOIN_TTL == 30
@pytest.mark.asyncio
async def test_pending_join_prefix_value():
"""Test that PENDING_JOIN_PREFIX has expected value."""
assert PENDING_JOIN_PREFIX == "pending_join"
@pytest.mark.asyncio
async def test_has_pending_joins_multi_iteration_scan_no_keys(mock_redis):
"""Test has_pending_joins iterates until cursor returns 0."""
# Simulate multi-iteration scan: cursor 100 -> cursor 50 -> cursor 0
mock_redis.scan.side_effect = [
(100, []), # First iteration, no keys, continue
(50, []), # Second iteration, no keys, continue
(0, []), # Third iteration, cursor 0, done
]
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is False
assert mock_redis.scan.call_count == 3
@pytest.mark.asyncio
async def test_has_pending_joins_multi_iteration_finds_key_later(mock_redis):
"""Test has_pending_joins finds key on second iteration."""
# Simulate finding key on second scan iteration
mock_redis.scan.side_effect = [
(100, []), # First iteration, no keys
(0, [b"pending_join:meeting-123:user-1"]), # Second iteration, found key
]
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is True
assert mock_redis.scan.call_count == 2

View File

@@ -1,241 +0,0 @@
"""Tests for process_meetings pending joins check.
Tests that process_meetings correctly skips deactivation when
pending joins exist for a meeting.
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db.meetings import Meeting
def _get_process_meetings_fn():
"""Get the underlying async function without Celery/asynctask decorators."""
from reflector.worker import process
fn = process.process_meetings
# Get through both decorator layers (@shared_task and @asynctask)
if hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
if hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
return fn
@pytest.fixture
def mock_active_meeting():
"""Mock an active meeting that should be considered for deactivation."""
now = datetime.now(timezone.utc)
return Meeting(
id="meeting-123",
room_id="room-456",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room-20251118120000",
host_room_url="https://daily.co/test-room-20251118120000?t=host",
platform="daily",
num_clients=0,
is_active=True,
start_date=now - timedelta(hours=1),
end_date=now - timedelta(minutes=30), # Already ended
)
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_skips_deactivation_with_pending_joins(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that process_meetings skips deactivation when pending joins exist."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions, but had sessions (triggers deactivation)
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc) # Session ended
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock pending joins exist
mock_has_pending_joins.return_value = True
await process_meetings()
# Verify has_pending_joins was called
mock_has_pending_joins.assert_called_once_with(mock_redis, mock_active_meeting.id)
# Verify meeting was NOT deactivated
mock_update_meeting.assert_not_called()
# Verify Redis was closed
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_deactivates_without_pending_joins(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that process_meetings deactivates when no pending joins."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions, but had sessions
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc)
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock no pending joins
mock_has_pending_joins.return_value = False
await process_meetings()
# Verify meeting was deactivated
mock_update_meeting.assert_called_once_with(mock_active_meeting.id, is_active=False)
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
async def test_process_meetings_no_check_when_active_sessions(
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that pending joins check is skipped when there are active sessions."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - has active session
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = None # Still active
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
with (
patch("reflector.worker.process.get_async_redis_client") as mock_get_redis,
patch("reflector.worker.process.has_pending_joins") as mock_has_pending_joins,
patch(
"reflector.worker.process.meetings_controller.update_meeting"
) as mock_update_meeting,
):
await process_meetings()
# Verify pending joins check was NOT called (no need - active sessions exist)
mock_has_pending_joins.assert_not_called()
# Verify meeting was NOT deactivated
mock_update_meeting.assert_not_called()
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_closes_redis_even_on_continue(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that Redis connection is always closed, even when skipping deactivation."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc)
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock pending joins exist (will trigger continue)
mock_has_pending_joins.return_value = True
await process_meetings()
# Verify Redis was closed
mock_redis.aclose.assert_called_once()

View File

@@ -24,17 +24,24 @@ import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
import {
useRoomJoinMeeting,
useRoomJoinedMeeting,
useRoomLeaveMeeting,
useMeetingStartRecording,
useMeetingJoining,
useMeetingJoined,
leaveRoomPostUrl,
LeaveRoomBody,
} from "../../lib/apiHooks";
import { omit } from "remeda";
import {
assertExists,
assertExistsAndNonEmptyString,
NonEmptyString,
parseNonEmptyString,
} from "../../lib/utils";
import { assertMeetingId, DailyRecordingType } from "../../lib/types";
import {
assertMeetingId,
DailyRecordingType,
MeetingId,
} from "../../lib/types";
import { useUuidV5 } from "react-uuid-hook";
const CONSENT_BUTTON_ID = "recording-consent";
@@ -181,6 +188,58 @@ const useFrame = (
] as const;
};
const leaveDaily = () => {
const frame = DailyIframe.getCallInstance();
frame?.leave();
};
const useDirtyDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
) => {
useEffect(() => {
if (!meetingId || !roomName) return;
const handleBeforeUnload = () => {
leaveDaily();
navigator.sendBeacon(
leaveRoomPostUrl(
{
room_name: roomName,
meeting_id: meetingId,
},
{
delay_seconds: 5,
},
),
undefined satisfies LeaveRoomBody,
);
};
window.addEventListener("beforeunload", handleBeforeUnload);
return () => window.removeEventListener("beforeunload", handleBeforeUnload);
}, [meetingId, roomName]);
};
const useDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
) => {
useDirtyDisconnects(meetingId, roomName);
useEffect(() => {
return () => {
leaveDaily();
leaveMutation.mutate({
params: {
path: { meeting_id: meetingId, room_name: roomName },
query: { delay_seconds: 5 },
},
});
};
}, [meetingId, roomName]);
};
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
@@ -188,22 +247,20 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const authLastUserId = auth.lastUserId;
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
const joinedMutation = useRoomJoinedMeeting();
const leaveMutation = useRoomLeaveMeeting();
const startRecordingMutation = useMeetingStartRecording();
const joiningMutation = useMeetingJoining();
const joinedMutation = useMeetingJoined();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
// Generate a stable connection ID for this component instance
// Used to track pending joins per browser tab (prevents key collision for anonymous users)
const connectionId = useMemo(() => crypto.randomUUID(), []);
// Generate deterministic instanceIds so all participants use SAME IDs
const cloudInstanceId = parseNonEmptyString(meeting.id);
const rawTracksInstanceId = parseNonEmptyString(
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
);
const roomName = params?.roomName as string;
if (typeof params.roomName === "object")
throw new Error(`Invalid room name in params. array? ${params.roomName}`);
const roomName = assertExistsAndNonEmptyString(params.roomName);
const {
showConsentModal,
@@ -245,6 +302,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
router.push("/browse");
}, [router]);
useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);
const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => {
if (ev.button_id === CONSENT_BUTTON_ID) {
@@ -257,27 +316,14 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
);
const handleFrameJoinMeeting = useCallback(() => {
// Signal that WebRTC connection is established
// This clears the pending join intent, confirming successful connection
joinedMutation.mutate(
{
joinedMutation.mutate({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
body: {
connection_id: connectionId,
},
},
{
onError: (error: unknown) => {
// Non-blocking: log but don't fail - this is cleanup, not critical
console.warn("Failed to signal joined:", error);
},
},
);
});
if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", {
@@ -338,11 +384,10 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
}
}, [
meeting.recording_type,
meeting.id,
roomName,
connectionId,
joinedMutation,
roomName,
meeting.id,
meeting.recording_type,
startRecordingMutation,
cloudInstanceId,
rawTracksInstanceId,
@@ -361,28 +406,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
useEffect(() => {
if (!frame || !roomUrl) return;
const joinRoom = async () => {
// Signal intent to join before WebRTC handshake starts
// This prevents race condition where meeting is deactivated during handshake
try {
await joiningMutation.mutateAsync({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
body: {
connection_id: connectionId,
},
});
} catch (error) {
// Non-blocking: log but continue with join
console.warn("Failed to signal joining intent:", error);
}
await frame.join({
frame
.join({
url: roomUrl,
sendSettings: {
video: {
@@ -394,13 +419,9 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
},
// Note: screenVideo intentionally not configured to preserve full quality for screen shares
},
});
};
joinRoom().catch(console.error.bind(console, "Failed to join daily room:"));
// joiningMutation excluded from deps - it's a stable hook reference
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [frame, roomUrl, roomName, meeting.id, connectionId]);
})
.catch(console.error.bind(console, "Failed to join daily room:"));
}, [frame, roomUrl]);
useEffect(() => {
setCustomTrayButton(

View File

@@ -1,12 +1,13 @@
"use client";
import { $api } from "./apiClient";
import { $api, API_URL } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import type { components, operations } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
import { createFinalURL, createQuerySerializer } from "openapi-fetch";
/*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -807,24 +808,42 @@ export function useRoomJoinMeeting() {
);
}
// Presence race fix endpoints (not yet in OpenAPI spec)
// These signal join intent to prevent race conditions during WebRTC handshake
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function useMeetingJoining(): any {
return ($api as any).useMutation(
"post",
"/v1/rooms/{room_name}/meetings/{meeting_id}/joining",
{},
);
export const LEAVE_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;
export const leaveRoomPostUrl = (
path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
): string =>
createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path, query },
querySerializer: createQuerySerializer(),
});
export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];
export function useRoomLeaveMeeting() {
return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function useMeetingJoined(): any {
return ($api as any).useMutation(
"post",
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined",
{},
);
export const JOINED_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;
export const joinedRoomPostUrl = (
params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
): string =>
createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path: params },
querySerializer: () => "",
});
export type JoinedRoomBody =
operations["v1_rooms_joined_meeting"]["requestBody"];
export function useRoomJoinedMeeting() {
return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
}
export function useRoomIcsSync() {

View File

@@ -171,6 +171,48 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Joined Meeting
* @description Trigger presence poll (ideally when user actually joins meeting in Daily iframe)
*/
post: operations["v1_rooms_joined_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Leave Meeting
* @description Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
*
* Queues presence poll with optional delay to allow Daily.co to detect disconnect.
*/
post: operations["v1_rooms_leave_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}/webhook/test": {
parameters: {
query?: never;
@@ -2435,6 +2477,72 @@ export interface operations {
};
};
};
v1_rooms_joined_meeting: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_leave_meeting: {
parameters: {
query?: {
delay_seconds?: number;
};
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_test_webhook: {
parameters: {
query?: never;