From 8e438ca285152bd48fdc42767e706fb448d3525c Mon Sep 17 00:00:00 2001
From: Igor Monadical
Date: Mon, 24 Nov 2025 22:24:03 -0500
Subject: [PATCH] feat: dailyco poll (#730)

* dailyco api module (no-mistakes)

* daily co library self-review

* uncurse

* self-review: daily resource leak, uniform types, enable_recording bomb, daily custom error, video_platforms/daily typing, daily timestamp dry

* dailyco docs parser

* phase 1-2 of daily poll

* dailyco poll (no-mistakes)

* poll docs

* fix tests

* forgotten utils file

* remove generated daily docs

* pr comments

* dailyco poll pr review and self-review

* daily recording poll api fix

* daily recording poll api fix

* review

* review

* fix tests

---------

Co-authored-by: Igor Loskutov
---
 server/docs/video-platforms/README.md           |   4 +-
 server/reflector/dailyco_api/__init__.py        |  12 +
 server/reflector/dailyco_api/client.py          |  68 ++-
 server/reflector/dailyco_api/responses.py       |  13 +-
 server/reflector/dailyco_api/webhooks.py        |  74 ++-
 .../db/daily_participant_sessions.py            |  60 +++
 server/reflector/db/meetings.py                 |   7 +-
 server/reflector/db/recordings.py               |  15 +-
 server/reflector/llm.py                         |  28 +-
 server/reflector/settings.py                    |   1 -
 server/reflector/utils/daily.py                 |  63 ++-
 server/reflector/video_platforms/base.py        |   4 -
 server/reflector/video_platforms/daily.py       |  24 +-
 server/reflector/video_platforms/whereby.py     |   3 -
 server/reflector/views/daily.py                 | 288 ++++------
 server/reflector/worker/app.py                  |   8 +
 server/reflector/worker/process.py              | 480 ++++++++++++++----
 server/tests/mocks/mock_platform.py             |   6 -
 .../tests/test_daily_room_presence_polling.py   | 466 +++++++++++++++++
 server/tests/test_poll_daily_recordings.py      | 193 +++++++
 server/tests/test_utils_daily.py                |  49 +-
 21 files changed, 1529 insertions(+), 337 deletions(-)
 create mode 100644 server/tests/test_daily_room_presence_polling.py
 create mode 100644 server/tests/test_poll_daily_recordings.py

diff --git a/server/docs/video-platforms/README.md b/server/docs/video-platforms/README.md
index 45a615c3..15734db3 100644
--- a/server/docs/video-platforms/README.md
+++ b/server/docs/video-platforms/README.md
@@ -89,7 +89,9 @@ This document explains how Reflector receives and identifies multitrack audio re
 
 ---
 
-## Daily.co (Webhook-based)
+## Daily.co
+
+**Note:** Primary discovery via polling (`poll_daily_recordings`), webhooks as backup.
 
 Daily.co uses **webhooks** to notify Reflector when recordings are ready.
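A minimal sketch of the poll-primary flow the note above describes, using names this patch introduces further down (`list_recordings`, `recordings_controller.get_by_ids`, `process_multitrack_recording`); the wiring is simplified, and `bucket_name` / `daily_client` setup is assumed:

    api_recordings = await daily_client.list_recordings()  # newest first
    existing = await recordings_controller.get_by_ids([r.id for r in api_recordings])
    known = {r.id for r in existing}
    for rec in api_recordings:
        if rec.id in known or not rec.tracks:
            continue  # already stored, or tracks not uploaded yet
        process_multitrack_recording.delay(
            bucket_name=bucket_name,
            daily_room_name=rec.room_name,
            recording_id=rec.id,
            track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
        )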
diff --git a/server/reflector/dailyco_api/__init__.py b/server/reflector/dailyco_api/__init__.py index 1a65478b..8ef95274 100644 --- a/server/reflector/dailyco_api/__init__.py +++ b/server/reflector/dailyco_api/__init__.py @@ -46,10 +46,16 @@ from .webhook_utils import ( from .webhooks import ( DailyTrack, DailyWebhookEvent, + DailyWebhookEventUnion, + ParticipantJoinedEvent, ParticipantJoinedPayload, + ParticipantLeftEvent, ParticipantLeftPayload, + RecordingErrorEvent, RecordingErrorPayload, + RecordingReadyEvent, RecordingReadyToDownloadPayload, + RecordingStartedEvent, RecordingStartedPayload, ) @@ -78,11 +84,17 @@ __all__ = [ "WebhookResponse", # Webhooks "DailyWebhookEvent", + "DailyWebhookEventUnion", "DailyTrack", + "ParticipantJoinedEvent", "ParticipantJoinedPayload", + "ParticipantLeftEvent", "ParticipantLeftPayload", + "RecordingStartedEvent", "RecordingStartedPayload", + "RecordingReadyEvent", "RecordingReadyToDownloadPayload", + "RecordingErrorEvent", "RecordingErrorPayload", # Webhook utilities "verify_webhook_signature", diff --git a/server/reflector/dailyco_api/client.py b/server/reflector/dailyco_api/client.py index 24221bb2..e28e1a72 100644 --- a/server/reflector/dailyco_api/client.py +++ b/server/reflector/dailyco_api/client.py @@ -327,18 +327,8 @@ class DailyApiClient: async def get_recording(self, recording_id: NonEmptyString) -> RecordingResponse: """ + https://docs.daily.co/reference/rest-api/recordings/get-recording-information Get recording metadata and status. - - Reference: https://docs.daily.co/reference/rest-api/recordings - - Args: - recording_id: Daily.co recording ID - - Returns: - Recording metadata including status, duration, and S3 info - - Raises: - httpx.HTTPStatusError: If API request fails """ client = await self._get_client() response = await client.get( @@ -349,6 +339,62 @@ class DailyApiClient: data = await self._handle_response(response, "get_recording") return RecordingResponse(**data) + async def list_recordings( + self, + room_name: NonEmptyString | None = None, + starting_after: str | None = None, + ending_before: str | None = None, + limit: int = 100, + ) -> list[RecordingResponse]: + """ + List recordings with optional filters. + + Reference: https://docs.daily.co/reference/rest-api/recordings + + Args: + room_name: Filter by room name + starting_after: Pagination cursor - recording ID to start after + ending_before: Pagination cursor - recording ID to end before + limit: Max results per page (default 100, max 100) + + Note: starting_after/ending_before are pagination cursors (recording IDs), + NOT time filters. API returns recordings in reverse chronological order. 
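+
+        Example (a sketch; page through newest-first results by recording ID):
+            page1 = await client.list_recordings(limit=100)
+            if len(page1) == 100:
+                page2 = await client.list_recordings(starting_after=page1[-1].id)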
+ """ + client = await self._get_client() + + params = {"limit": limit} + if room_name: + params["room_name"] = room_name + if starting_after: + params["starting_after"] = starting_after + if ending_before: + params["ending_before"] = ending_before + + response = await client.get( + f"{self.base_url}/recordings", + headers=self.headers, + params=params, + ) + + data = await self._handle_response(response, "list_recordings") + + if not isinstance(data, dict) or "data" not in data: + logger.error( + "Daily.co API returned unexpected format for list_recordings", + data_type=type(data).__name__, + data_keys=list(data.keys()) if isinstance(data, dict) else None, + data_sample=str(data)[:500], + room_name=room_name, + operation="list_recordings", + ) + raise httpx.HTTPStatusError( + message=f"Unexpected response format from list_recordings: {type(data).__name__}", + request=response.request, + response=response, + ) + + return [RecordingResponse(**r) for r in data["data"]] + # ============================================================================ # MEETING TOKENS # ============================================================================ diff --git a/server/reflector/dailyco_api/responses.py b/server/reflector/dailyco_api/responses.py index 4eb84245..3dc18815 100644 --- a/server/reflector/dailyco_api/responses.py +++ b/server/reflector/dailyco_api/responses.py @@ -6,6 +6,7 @@ from typing import Any, Dict, List, Literal from pydantic import BaseModel, Field +from reflector.dailyco_api.webhooks import DailyTrack from reflector.utils.string import NonEmptyString # not documented in daily; we fill it according to observations @@ -131,12 +132,22 @@ class RecordingResponse(BaseModel): status: RecordingStatus = Field( description="Recording status ('in-progress' or 'finished')" ) - max_participants: int = Field(description="Maximum participants during recording") + max_participants: int | None = Field( + None, description="Maximum participants during recording (may be missing)" + ) duration: int = Field(description="Recording duration in seconds") share_token: NonEmptyString | None = Field( None, description="Token for sharing recording" ) s3: RecordingS3Info | None = Field(None, description="S3 bucket information") + tracks: list[DailyTrack] = Field( + default_factory=list, + description="Track list for raw-tracks recordings (always array, never null)", + ) + # this is not a mistake but a deliberate Daily.co naming decision + mtgSessionId: NonEmptyString | None = Field( + None, description="Meeting session identifier (may be missing)" + ) class MeetingTokenResponse(BaseModel): diff --git a/server/reflector/dailyco_api/webhooks.py b/server/reflector/dailyco_api/webhooks.py index 862f4996..e0ff1f5c 100644 --- a/server/reflector/dailyco_api/webhooks.py +++ b/server/reflector/dailyco_api/webhooks.py @@ -4,7 +4,7 @@ Daily.co Webhook Event Models Reference: https://docs.daily.co/reference/rest-api/webhooks """ -from typing import Any, Dict, Literal +from typing import Annotated, Any, Dict, Literal, Union from pydantic import BaseModel, Field, field_validator @@ -197,3 +197,75 @@ class RecordingErrorPayload(BaseModel): _normalize_timestamp = field_validator("timestamp", mode="before")( normalize_timestamp_to_int ) + + +class ParticipantJoinedEvent(BaseModel): + version: NonEmptyString + type: Literal["participant.joined"] + id: NonEmptyString + payload: ParticipantJoinedPayload + event_ts: int + + _normalize_event_ts = field_validator("event_ts", mode="before")( + normalize_timestamp_to_int + ) + + 
+class ParticipantLeftEvent(BaseModel):
+    version: NonEmptyString
+    type: Literal["participant.left"]
+    id: NonEmptyString
+    payload: ParticipantLeftPayload
+    event_ts: int
+
+    _normalize_event_ts = field_validator("event_ts", mode="before")(
+        normalize_timestamp_to_int
+    )
+
+
+class RecordingStartedEvent(BaseModel):
+    version: NonEmptyString
+    type: Literal["recording.started"]
+    id: NonEmptyString
+    payload: RecordingStartedPayload
+    event_ts: int
+
+    _normalize_event_ts = field_validator("event_ts", mode="before")(
+        normalize_timestamp_to_int
+    )
+
+
+class RecordingReadyEvent(BaseModel):
+    version: NonEmptyString
+    type: Literal["recording.ready-to-download"]
+    id: NonEmptyString
+    payload: RecordingReadyToDownloadPayload
+    event_ts: int
+
+    _normalize_event_ts = field_validator("event_ts", mode="before")(
+        normalize_timestamp_to_int
+    )
+
+
+class RecordingErrorEvent(BaseModel):
+    version: NonEmptyString
+    type: Literal["recording.error"]
+    id: NonEmptyString
+    payload: RecordingErrorPayload
+    event_ts: int
+
+    _normalize_event_ts = field_validator("event_ts", mode="before")(
+        normalize_timestamp_to_int
+    )
+
+
+DailyWebhookEventUnion = Annotated[
+    Union[
+        ParticipantJoinedEvent,
+        ParticipantLeftEvent,
+        RecordingStartedEvent,
+        RecordingReadyEvent,
+        RecordingErrorEvent,
+    ],
+    Field(discriminator="type"),
+]
diff --git a/server/reflector/db/daily_participant_sessions.py b/server/reflector/db/daily_participant_sessions.py
index 5fac1912..4326b3c0 100644
--- a/server/reflector/db/daily_participant_sessions.py
+++ b/server/reflector/db/daily_participant_sessions.py
@@ -165,5 +165,65 @@ class DailyParticipantSessionController:
         results = await get_database().fetch_all(query)
         return [DailyParticipantSession(**result) for result in results]
 
+    async def get_all_sessions_for_meeting(
+        self, meeting_id: NonEmptyString
+    ) -> dict[NonEmptyString, DailyParticipantSession]:
+        query = daily_participant_sessions.select().where(
+            daily_participant_sessions.c.meeting_id == meeting_id
+        )
+        results = await get_database().fetch_all(query)
+        # TODO DailySessionId custom type
+        return {row["session_id"]: DailyParticipantSession(**row) for row in results}
+
+    async def batch_upsert_sessions(
+        self, sessions: list[DailyParticipantSession]
+    ) -> None:
+        """Upsert multiple sessions in a single query.
+
+        Uses ON CONFLICT for idempotency. Updates user_name on conflict, since
+        participants may change their display name during a meeting.
+        """
+        if not sessions:
+            return
+
+        values = [session.model_dump() for session in sessions]
+        query = insert(daily_participant_sessions).values(values)
+        query = query.on_conflict_do_update(
+            index_elements=["id"],
+            set_={
+                # Preserve existing left_at to prevent race conditions
+                "left_at": sa.func.coalesce(
+                    daily_participant_sessions.c.left_at,
+                    query.excluded.left_at,
+                ),
+                "user_name": query.excluded.user_name,
+            },
+        )
+        await get_database().execute(query)
+
+    async def batch_close_sessions(
+        self, session_ids: list[NonEmptyString], left_at: datetime
+    ) -> None:
+        """Mark multiple sessions as left in a single query.
+
+        Only updates sessions where left_at is NULL (protects already-closed sessions).
+
+        A left_at mismatch for existing sessions is ignored; it is assumed to
+        be harmless if it ever happens.
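+
+        A usage sketch (mirrors the presence reconciliation poll added later
+        in this patch; ids are composite "{meeting_id}:{session_id}" strings):
+
+            await daily_participant_sessions_controller.batch_close_sessions(
+                [f"{meeting.id}:{sid}" for sid in stale_session_ids],
+                left_at=datetime.now(timezone.utc),
+            )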
+ """ + if not session_ids: + return + + query = ( + daily_participant_sessions.update() + .where( + sa.and_( + daily_participant_sessions.c.id.in_(session_ids), + daily_participant_sessions.c.left_at.is_(None), + ) + ) + .values(left_at=left_at) + ) + await get_database().execute(query) + daily_participant_sessions_controller = DailyParticipantSessionController() diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 6912b285..9c290fa5 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -146,8 +146,11 @@ class MeetingController: await get_database().execute(query) return meeting - async def get_all_active(self) -> list[Meeting]: - query = meetings.select().where(meetings.c.is_active) + async def get_all_active(self, platform: str | None = None) -> list[Meeting]: + conditions = [meetings.c.is_active] + if platform is not None: + conditions.append(meetings.c.platform == platform) + query = meetings.select().where(sa.and_(*conditions)) results = await get_database().fetch_all(query) return [Meeting(**result) for result in results] diff --git a/server/reflector/db/recordings.py b/server/reflector/db/recordings.py index bde4afa5..c67b8413 100644 --- a/server/reflector/db/recordings.py +++ b/server/reflector/db/recordings.py @@ -44,12 +44,14 @@ class RecordingController: await get_database().execute(query) return recording - async def get_by_id(self, id: str) -> Recording: + async def get_by_id(self, id: str) -> Recording | None: query = recordings.select().where(recordings.c.id == id) result = await get_database().fetch_one(query) return Recording(**result) if result else None - async def get_by_object_key(self, bucket_name: str, object_key: str) -> Recording: + async def get_by_object_key( + self, bucket_name: str, object_key: str + ) -> Recording | None: query = recordings.select().where( recordings.c.bucket_name == bucket_name, recordings.c.object_key == object_key, @@ -61,5 +63,14 @@ class RecordingController: query = recordings.delete().where(recordings.c.id == id) await get_database().execute(query) + # no check for existence + async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]: + if not recording_ids: + return [] + + query = recordings.select().where(recordings.c.id.in_(recording_ids)) + results = await get_database().fetch_all(query) + return [Recording(**row) for row in results] + recordings_controller = RecordingController() diff --git a/server/reflector/llm.py b/server/reflector/llm.py index eed50e4a..09dab3d2 100644 --- a/server/reflector/llm.py +++ b/server/reflector/llm.py @@ -1,3 +1,4 @@ +import logging from typing import Type, TypeVar from llama_index.core import Settings @@ -5,7 +6,7 @@ from llama_index.core.output_parsers import PydanticOutputParser from llama_index.core.program import LLMTextCompletionProgram from llama_index.core.response_synthesizers import TreeSummarize from llama_index.llms.openai_like import OpenAILike -from pydantic import BaseModel +from pydantic import BaseModel, ValidationError T = TypeVar("T", bound=BaseModel) @@ -61,6 +62,8 @@ class LLM: tone_name: str | None = None, ) -> T: """Get structured output from LLM for non-function-calling models""" + logger = logging.getLogger(__name__) + summarizer = TreeSummarize(verbose=True) response = await summarizer.aget_response(prompt, texts, tone_name=tone_name) @@ -76,8 +79,25 @@ class LLM: "Please structure the above information in the following JSON format:" ) - output = await program.acall( - analysis=str(response), 
format_instructions=format_instructions
-        )
+        try:
+            output = await program.acall(
+                analysis=str(response), format_instructions=format_instructions
+            )
+        except ValidationError as e:
+            # Extract the raw JSON from the error details
+            errors = e.errors()
+            if errors and "input" in errors[0]:
+                raw_json = errors[0]["input"]
+                logger.error(
+                    f"JSON validation failed for {output_cls.__name__}. "
+                    f"Full raw JSON output:\n{raw_json}\n"
+                    f"Validation errors: {errors}"
+                )
+            else:
+                logger.error(
+                    f"JSON validation failed for {output_cls.__name__}. "
+                    f"Validation errors: {errors}"
+                )
+            raise
 
         return output
diff --git a/server/reflector/settings.py b/server/reflector/settings.py
index 0e3fb3f7..1ec46d94 100644
--- a/server/reflector/settings.py
+++ b/server/reflector/settings.py
@@ -138,7 +138,6 @@ class Settings(BaseSettings):
     DAILY_WEBHOOK_UUID: str | None = (
         None  # Webhook UUID for this environment. Not used by production code
     )
-
     # Platform Configuration
     DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
 
diff --git a/server/reflector/utils/daily.py b/server/reflector/utils/daily.py
index 1c3b367c..72242f78 100644
--- a/server/reflector/utils/daily.py
+++ b/server/reflector/utils/daily.py
@@ -1,6 +1,67 @@
+import os
+import re
+from typing import NamedTuple
+
 from reflector.utils.string import NonEmptyString
 
-DailyRoomName = str
+DailyRoomName = NonEmptyString
+
+
+class DailyRecordingFilename(NamedTuple):
+    """Parsed components from a Daily.co recording filename.
+
+    Format: {recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}
+    Example: 1763152299562-12f0b87c-97d4-4dd3-a65c-cee1f854a79c-cam-audio-1763152314582
+
+    Note: S3 object keys have no extension, but browsers add .webm when
+    downloading from the S3 UI due to MIME type headers; worth knowing if you
+    download a file manually and wonder about the missing extension.
+    """
+
+    recording_start_ts: int
+    participant_id: str
+    track_start_ts: int
+
+
+def parse_daily_recording_filename(filename: str) -> DailyRecordingFilename:
+    """Parse a Daily.co recording filename to extract timestamps and participant ID.
+
+    Args:
+        filename: Full path or basename of Daily.co recording file
+            Format: {recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}
+
+    Returns:
+        DailyRecordingFilename with parsed components
+
+    Raises:
+        ValueError: If filename doesn't match expected format
+
+    Examples:
+        >>> parse_daily_recording_filename("1763152299562-12f0b87c-97d4-4dd3-a65c-cee1f854a79c-cam-audio-1763152314582")
+        DailyRecordingFilename(recording_start_ts=1763152299562, participant_id='12f0b87c-97d4-4dd3-a65c-cee1f854a79c', track_start_ts=1763152314582)
+    """
+    base = os.path.basename(filename)
+    pattern = r"(\d{13,})-([0-9a-fA-F-]{36})-cam-audio-(\d{13,})"
+    match = re.search(pattern, base)
+
+    if not match:
+        raise ValueError(
+            f"Invalid Daily.co recording filename: {filename}. 
" + f"Expected format: {{recording_start_ts}}-{{participant_id}}-cam-audio-{{track_start_ts}}" + ) + + recording_start_ts = int(match.group(1)) + participant_id = match.group(2) + track_start_ts = int(match.group(3)) + + return DailyRecordingFilename( + recording_start_ts=recording_start_ts, + participant_id=participant_id, + track_start_ts=track_start_ts, + ) + + +def recording_lock_key(recording_id: NonEmptyString) -> NonEmptyString: + return f"recording:{recording_id}" def extract_base_room_name(daily_room_name: DailyRoomName) -> NonEmptyString: diff --git a/server/reflector/video_platforms/base.py b/server/reflector/video_platforms/base.py index 877114f7..90c4ffc2 100644 --- a/server/reflector/video_platforms/base.py +++ b/server/reflector/video_platforms/base.py @@ -30,10 +30,6 @@ class VideoPlatformClient(ABC): """Get session history for a room.""" pass - @abstractmethod - async def delete_room(self, room_name: str) -> bool: - pass - @abstractmethod async def upload_logo(self, room_name: str, logo_path: str) -> bool: pass diff --git a/server/reflector/video_platforms/daily.py b/server/reflector/video_platforms/daily.py index 7485cc95..2b4d2461 100644 --- a/server/reflector/video_platforms/daily.py +++ b/server/reflector/video_platforms/daily.py @@ -19,6 +19,7 @@ from reflector.db.rooms import Room from reflector.logger import logger from reflector.storage import get_dailyco_storage +from ..dailyco_api.responses import RecordingStatus from ..schemas.platform import Platform from ..utils.daily import DailyRoomName from ..utils.string import NonEmptyString @@ -130,10 +131,25 @@ class DailyClient(VideoPlatformClient): async def get_recording(self, recording_id: str) -> RecordingResponse: return await self._api_client.get_recording(recording_id) - async def delete_room(self, room_name: str) -> bool: - """Delete a room (idempotent - succeeds even if room doesn't exist).""" - await self._api_client.delete_room(room_name) - return True + async def list_recordings( + self, + room_name: NonEmptyString | None = None, + starting_after: str | None = None, + ending_before: str | None = None, + limit: int = 100, + ) -> list[RecordingResponse]: + return await self._api_client.list_recordings( + room_name=room_name, + starting_after=starting_after, + ending_before=ending_before, + limit=limit, + ) + + async def get_recording_status( + self, recording_id: NonEmptyString + ) -> RecordingStatus: + recording = await self.get_recording(recording_id) + return recording.status async def upload_logo(self, room_name: str, logo_path: str) -> bool: return True diff --git a/server/reflector/video_platforms/whereby.py b/server/reflector/video_platforms/whereby.py index f4775e89..9ef3128c 100644 --- a/server/reflector/video_platforms/whereby.py +++ b/server/reflector/video_platforms/whereby.py @@ -122,9 +122,6 @@ class WherebyClient(VideoPlatformClient): for s in results ] - async def delete_room(self, room_name: str) -> bool: - return True - async def upload_logo(self, room_name: str, logo_path: str) -> bool: async with httpx.AsyncClient() as client: with open(logo_path, "rb") as f: diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 733c70a3..cbdac409 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -1,24 +1,25 @@ import json -from datetime import datetime, timezone +from typing import assert_never from fastapi import APIRouter, HTTPException, Request +from pydantic import TypeAdapter from reflector.dailyco_api import ( - DailyTrack, - 
DailyWebhookEvent, - extract_room_name, - parse_recording_error, -) -from reflector.db import get_database -from reflector.db.daily_participant_sessions import ( - DailyParticipantSession, - daily_participant_sessions_controller, + DailyWebhookEventUnion, + ParticipantJoinedEvent, + ParticipantLeftEvent, + RecordingErrorEvent, + RecordingReadyEvent, + RecordingStartedEvent, ) from reflector.db.meetings import meetings_controller from reflector.logger import logger as _logger from reflector.settings import settings from reflector.video_platforms.factory import create_platform_client -from reflector.worker.process import process_multitrack_recording +from reflector.worker.process import ( + poll_daily_room_presence_task, + process_multitrack_recording, +) router = APIRouter() @@ -74,173 +75,83 @@ async def webhook(request: Request): logger.info("Received Daily webhook test event") return {"status": "ok"} + event_adapter = TypeAdapter(DailyWebhookEventUnion) try: - event = DailyWebhookEvent(**body_json) + event = event_adapter.validate_python(body_json) except Exception as e: logger.error("Failed to parse webhook event", error=str(e), body=body.decode()) raise HTTPException(status_code=422, detail="Invalid event format") - if event.type == "participant.joined": - await _handle_participant_joined(event) - elif event.type == "participant.left": - await _handle_participant_left(event) - elif event.type == "recording.started": - await _handle_recording_started(event) - elif event.type == "recording.ready-to-download": - await _handle_recording_ready(event) - elif event.type == "recording.error": - await _handle_recording_error(event) - else: - logger.warning( - "Unhandled Daily webhook event type", - event_type=event.type, - payload=event.payload, - ) + match event: + case ParticipantJoinedEvent(): + await _handle_participant_joined(event) + case ParticipantLeftEvent(): + await _handle_participant_left(event) + case RecordingStartedEvent(): + await _handle_recording_started(event) + case RecordingReadyEvent(): + await _handle_recording_ready(event) + case RecordingErrorEvent(): + await _handle_recording_error(event) + case _: + assert_never(event) return {"status": "ok"} -""" -{ - "version": "1.0.0", - "type": "participant.joined", - "id": "ptcpt-join-6497c79b-f326-4942-aef8-c36a29140ad1-1708972279961", - "payload": { - "room": "test", - "user_id": "6497c79b-f326-4942-aef8-c36a29140ad1", - "user_name": "testuser", - "session_id": "0c0d2dda-f21d-4cf9-ab56-86bf3c407ffa", - "joined_at": 1708972279.96, - "will_eject_at": 1708972299.541, - "owner": false, - "permissions": { - "hasPresence": true, - "canSend": true, - "canReceive": { "base": true }, - "canAdmin": false - } - }, - "event_ts": 1708972279.961 -} - -""" - - -async def _handle_participant_joined(event: DailyWebhookEvent): - daily_room_name = extract_room_name(event) - if not daily_room_name: - logger.warning("participant.joined: no room in payload", payload=event.payload) - return - - meeting = await meetings_controller.get_by_room_name(daily_room_name) - if not meeting: - logger.warning( - "participant.joined: meeting not found", room_name=daily_room_name - ) - return - - payload = event.payload - joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc) - session_id = f"{meeting.id}:{payload['session_id']}" - - session = DailyParticipantSession( - id=session_id, - meeting_id=meeting.id, - room_id=meeting.room_id, - session_id=payload["session_id"], - user_id=payload.get("user_id", None), - user_name=payload["user_name"], - 
joined_at=joined_at, - left_at=None, - ) - - # num_clients serves as a projection/cache of active session count for Daily.co - # Both operations must succeed or fail together to maintain consistency - async with get_database().transaction(): - await meetings_controller.increment_num_clients(meeting.id) - await daily_participant_sessions_controller.upsert_joined(session) - - logger.info( - "Participant joined", - meeting_id=meeting.id, - room_name=daily_room_name, - user_id=payload.get("user_id", None), - user_name=payload.get("user_name"), - session_id=session_id, - ) - - -""" -{ - "version": "1.0.0", - "type": "participant.left", - "id": "ptcpt-left-16168c97-f973-4eae-9642-020fe3fda5db-1708972302986", - "payload": { - "room": "test", - "user_id": "16168c97-f973-4eae-9642-020fe3fda5db", - "user_name": "bipol", - "session_id": "0c0d2dda-f21d-4cf9-ab56-86bf3c407ffa", - "joined_at": 1708972291.567, - "will_eject_at": null, - "owner": false, - "permissions": { - "hasPresence": true, - "canSend": true, - "canReceive": { "base": true }, - "canAdmin": false - }, - "duration": 11.419000148773193 - }, - "event_ts": 1708972302.986 -} -""" - - -async def _handle_participant_left(event: DailyWebhookEvent): - room_name = extract_room_name(event) +async def _queue_poll_for_room( + room_name: str | None, + event_type: str, + user_id: str | None, + session_id: str | None, + **log_kwargs, +) -> None: + """Queue poll task for room by name, handling missing room/meeting cases.""" if not room_name: - logger.warning("participant.left: no room in payload", payload=event.payload) + logger.warning(f"{event_type}: no room in payload") return meeting = await meetings_controller.get_by_room_name(room_name) if not meeting: - logger.warning("participant.left: meeting not found", room_name=room_name) + logger.warning(f"{event_type}: meeting not found", room_name=room_name) return - payload = event.payload - joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc) - left_at = datetime.fromtimestamp(event.event_ts, tz=timezone.utc) - session_id = f"{meeting.id}:{payload['session_id']}" - - session = DailyParticipantSession( - id=session_id, - meeting_id=meeting.id, - room_id=meeting.room_id, - session_id=payload["session_id"], - user_id=payload.get("user_id", None), - user_name=payload["user_name"], - joined_at=joined_at, - left_at=left_at, - ) - - # num_clients serves as a projection/cache of active session count for Daily.co - # Both operations must succeed or fail together to maintain consistency - async with get_database().transaction(): - await meetings_controller.decrement_num_clients(meeting.id) - await daily_participant_sessions_controller.upsert_left(session) + poll_daily_room_presence_task.delay(meeting.id) logger.info( - "Participant left", + f"{event_type.replace('.', ' ').title()} - poll queued", meeting_id=meeting.id, room_name=room_name, - user_id=payload.get("user_id", None), - duration=payload.get("duration"), + user_id=user_id, session_id=session_id, + **log_kwargs, ) -async def _handle_recording_started(event: DailyWebhookEvent): - room_name = extract_room_name(event) +async def _handle_participant_joined(event: ParticipantJoinedEvent): + """Queue poll task for presence reconciliation.""" + await _queue_poll_for_room( + event.payload.room_name, + "participant.joined", + event.payload.user_id, + event.payload.session_id, + user_name=event.payload.user_name, + ) + + +async def _handle_participant_left(event: ParticipantLeftEvent): + """Queue poll task for presence reconciliation.""" + 
await _queue_poll_for_room( + event.payload.room_name, + "participant.left", + event.payload.user_id, + event.payload.session_id, + duration=event.payload.duration, + ) + + +async def _handle_recording_started(event: RecordingStartedEvent): + room_name = event.payload.room_name if not room_name: logger.warning( "recording.started: no room_name in payload", payload=event.payload @@ -253,49 +164,27 @@ async def _handle_recording_started(event: DailyWebhookEvent): "Recording started", meeting_id=meeting.id, room_name=room_name, - recording_id=event.payload.get("recording_id"), + recording_id=event.payload.recording_id, platform="daily", ) else: logger.warning("recording.started: meeting not found", room_name=room_name) -async def _handle_recording_ready(event: DailyWebhookEvent): - """Handle recording ready for download event. +async def _handle_recording_ready(event: RecordingReadyEvent): + room_name = event.payload.room_name + recording_id = event.payload.recording_id + tracks = event.payload.tracks - Daily.co webhook payload for raw-tracks recordings: - { - "recording_id": "...", - "room_name": "test2-20251009192341", - "tracks": [ - {"type": "audio", "s3Key": "monadical/test2-.../uuid-cam-audio-123.webm", "size": 400000}, - {"type": "video", "s3Key": "monadical/test2-.../uuid-cam-video-456.webm", "size": 30000000} - ] - } - """ - room_name = extract_room_name(event) - recording_id = event.payload.get("recording_id") - tracks_raw = event.payload.get("tracks", []) - - if not room_name or not tracks_raw: + if not tracks: logger.warning( - "recording.ready-to-download: missing room_name or tracks", + "recording.ready-to-download: missing tracks", room_name=room_name, - has_tracks=bool(tracks_raw), + recording_id=recording_id, payload=event.payload, ) return - try: - tracks = [DailyTrack(**t) for t in tracks_raw] - except Exception as e: - logger.error( - "recording.ready-to-download: invalid tracks structure", - error=str(e), - tracks=tracks_raw, - ) - return - logger.info( "Recording ready for download", room_name=room_name, @@ -313,6 +202,12 @@ async def _handle_recording_ready(event: DailyWebhookEvent): track_keys = [t.s3Key for t in tracks if t.type == "audio"] + logger.info( + "Recording webhook queuing processing", + recording_id=recording_id, + room_name=room_name, + ) + process_multitrack_recording.delay( bucket_name=bucket_name, daily_room_name=room_name, @@ -321,17 +216,18 @@ async def _handle_recording_ready(event: DailyWebhookEvent): ) -async def _handle_recording_error(event: DailyWebhookEvent): - payload = parse_recording_error(event) +async def _handle_recording_error(event: RecordingErrorEvent): + payload = event.payload room_name = payload.room_name - if room_name: - meeting = await meetings_controller.get_by_room_name(room_name) - if meeting: - logger.error( - "Recording error", - meeting_id=meeting.id, - room_name=room_name, - error=payload.error_msg, - platform="daily", - ) + meeting = await meetings_controller.get_by_room_name(room_name) + if meeting: + logger.error( + "Recording error", + meeting_id=meeting.id, + room_name=room_name, + error=payload.error_msg, + platform="daily", + ) + else: + logger.warning("recording.error: meeting not found", room_name=room_name) diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index 3c7795a2..c0e711ae 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -38,6 +38,14 @@ else: "task": "reflector.worker.process.reprocess_failed_recordings", "schedule": crontab(hour=5, minute=0), 
# Midnight EST }, + "poll_daily_recordings": { + "task": "reflector.worker.process.poll_daily_recordings", + "schedule": 180.0, # Every 3 minutes (configurable lookback window) + }, + "trigger_daily_reconciliation": { + "task": "reflector.worker.process.trigger_daily_reconciliation", + "schedule": 30.0, # Every 30 seconds (queues poll tasks for all active meetings) + }, "sync_all_ics_calendars": { "task": "reflector.worker.ics_sync.sync_all_ics_calendars", "schedule": 60.0, # Run every minute to check which rooms need sync diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index dd9c1059..0e1b4d86 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -10,8 +10,12 @@ import structlog from celery import shared_task from celery.utils.log import get_task_logger from pydantic import ValidationError -from redis.exceptions import LockError +from reflector.dailyco_api import MeetingParticipantsResponse +from reflector.db.daily_participant_sessions import ( + DailyParticipantSession, + daily_participant_sessions_controller, +) from reflector.db.meetings import meetings_controller from reflector.db.recordings import Recording, recordings_controller from reflector.db.rooms import rooms_controller @@ -28,10 +32,15 @@ from reflector.pipelines.main_multitrack_pipeline import ( from reflector.pipelines.topic_processing import EmptyPipeline from reflector.processors import AudioFileWriterProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor -from reflector.redis_cache import get_redis_client +from reflector.redis_cache import RedisAsyncLock from reflector.settings import settings from reflector.storage import get_transcripts_storage -from reflector.utils.daily import DailyRoomName, extract_base_room_name +from reflector.utils.daily import ( + DailyRoomName, + extract_base_room_name, + parse_daily_recording_filename, + recording_lock_key, +) from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.whereby_utils import ( parse_whereby_recording_filename, @@ -178,6 +187,42 @@ async def process_multitrack_recording( logger.warning("No audio track keys provided") return + lock_key = recording_lock_key(recording_id) + async with RedisAsyncLock( + key=lock_key, + timeout=600, # 10min for processing (includes API calls, DB writes) + extend_interval=60, # Auto-extend every 60s + skip_if_locked=True, + blocking=False, + ) as lock: + if not lock.acquired: + logger.warning( + "Recording processing skipped - lock already held (duplicate task or concurrent worker)", + recording_id=recording_id, + lock_key=lock_key, + reason="duplicate_task_or_concurrent_worker", + ) + return + + logger.info( + "Recording worker acquired lock - starting processing", + recording_id=recording_id, + lock_key=lock_key, + ) + + await _process_multitrack_recording_inner( + bucket_name, daily_room_name, recording_id, track_keys + ) + + +async def _process_multitrack_recording_inner( + bucket_name: str, + daily_room_name: DailyRoomName, + recording_id: str, + track_keys: list[str], +): + """Inner function containing the actual processing logic.""" + tz = timezone.utc recorded_at = datetime.now(tz) try: @@ -225,9 +270,7 @@ async def process_multitrack_recording( track_keys=track_keys, ) ) - else: - # Recording already exists; assume metadata was set at creation time - pass + # else: Recording already exists; metadata set at creation time transcript = await 
transcripts_controller.get_by_recording_id(recording.id) if transcript: @@ -252,60 +295,70 @@ async def process_multitrack_recording( ) try: - daily_client = create_platform_client("daily") + async with create_platform_client("daily") as daily_client: + id_to_name = {} + id_to_user_id = {} - id_to_name = {} - id_to_user_id = {} - - mtg_session_id = None - try: - rec_details = await daily_client.get_recording(recording_id) - mtg_session_id = rec_details.get("mtgSessionId") - except Exception as e: - logger.warning( - "Failed to fetch Daily recording details", - error=str(e), - recording_id=recording_id, - exc_info=True, - ) - - if mtg_session_id: try: - payload = await daily_client.get_meeting_participants(mtg_session_id) - for p in payload.get("data", []): - pid = p.get("participant_id") - name = p.get("user_name") - user_id = p.get("user_id") - if pid and name: - id_to_name[pid] = name - if pid and user_id: - id_to_user_id[pid] = user_id + rec_details = await daily_client.get_recording(recording_id) + mtg_session_id = rec_details.mtgSessionId + if mtg_session_id: + try: + payload: MeetingParticipantsResponse = ( + await daily_client.get_meeting_participants(mtg_session_id) + ) + for p in payload.data: + pid = p.participant_id + assert ( + pid is not None + ), "panic! participant id cannot be None" + name = p.user_name + user_id = p.user_id + if name: + id_to_name[pid] = name + if user_id: + id_to_user_id[pid] = user_id + except Exception as e: + logger.warning( + "Failed to fetch Daily meeting participants", + error=str(e), + mtg_session_id=mtg_session_id, + exc_info=True, + ) + else: + logger.warning( + "No mtgSessionId found for recording; participant names may be generic", + recording_id=recording_id, + ) except Exception as e: logger.warning( - "Failed to fetch Daily meeting participants", + "Failed to fetch Daily recording details", error=str(e), - mtg_session_id=mtg_session_id, + recording_id=recording_id, exc_info=True, ) - else: - logger.warning( - "No mtgSessionId found for recording; participant names may be generic", - recording_id=recording_id, - ) - for idx, key in enumerate(track_keys): - base = os.path.basename(key) - m = re.search(r"\d{13,}-([0-9a-fA-F-]{36})-cam-audio-", base) - participant_id = m.group(1) if m else None + for idx, key in enumerate(track_keys): + try: + parsed = parse_daily_recording_filename(key) + participant_id = parsed.participant_id + except ValueError as e: + logger.error( + "Failed to parse Daily recording filename", + error=str(e), + key=key, + exc_info=True, + ) + continue - default_name = f"Speaker {idx}" - name = id_to_name.get(participant_id, default_name) - user_id = id_to_user_id.get(participant_id) + default_name = f"Speaker {idx}" + name = id_to_name.get(participant_id, default_name) + user_id = id_to_user_id.get(participant_id) - participant = TranscriptParticipant( - id=participant_id, speaker=idx, name=name, user_id=user_id - ) - await transcripts_controller.upsert_participant(transcript, participant) + participant = TranscriptParticipant( + id=participant_id, speaker=idx, name=name, user_id=user_id + ) + await transcripts_controller.upsert_participant(transcript, participant) except Exception as e: logger.warning("Failed to map participant names", error=str(e), exc_info=True) @@ -317,6 +370,207 @@ async def process_multitrack_recording( ) +@shared_task +@asynctask +async def poll_daily_recordings(): + """Poll Daily.co API for recordings and process missing ones. 
+
+    Fetches the latest recordings from the Daily.co API (default limit 100),
+    compares them with the DB, and queues processing for any recordings not
+    already in the DB.
+
+    For each missing recording, the audio track keys are taken from the API
+    response.
+
+    Worker-level locking provides idempotency (see process_multitrack_recording).
+    """
+    bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
+    if not bucket_name:
+        logger.debug(
+            "DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; skipping recording poll"
+        )
+        return
+
+    async with create_platform_client("daily") as daily_client:
+        # latest 100. TODO cursor-based state
+        api_recordings = await daily_client.list_recordings()
+
+    if not api_recordings:
+        logger.debug(
+            "No recordings found from Daily.co API",
+        )
+        return
+
+    recording_ids = [rec.id for rec in api_recordings]
+    existing_recordings = await recordings_controller.get_by_ids(recording_ids)
+    existing_ids = {rec.id for rec in existing_recordings}
+
+    missing_recordings = [rec for rec in api_recordings if rec.id not in existing_ids]
+
+    if not missing_recordings:
+        logger.debug(
+            "All recordings already in DB",
+            api_count=len(api_recordings),
+            existing_count=len(existing_recordings),
+        )
+        return
+
+    logger.info(
+        "Found recordings missing from DB",
+        missing_count=len(missing_recordings),
+        total_api_count=len(api_recordings),
+        existing_count=len(existing_recordings),
+    )
+
+    for recording in missing_recordings:
+        if not recording.tracks:
+            assert recording.status != "finished", (
+                f"Recording {recording.id} has status='finished' but no tracks. "
+                f"Daily.co API guarantees finished recordings have tracks available. "
+                f"room_name={recording.room_name}"
+            )
+            logger.debug(
+                "No tracks in recording yet",
+                recording_id=recording.id,
+                room_name=recording.room_name,
+                status=recording.status,
+            )
+            continue
+
+        track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
+
+        if not track_keys:
+            logger.warning(
+                "No audio tracks found in recording (only video tracks)",
+                recording_id=recording.id,
+                room_name=recording.room_name,
+                total_tracks=len(recording.tracks),
+            )
+            continue
+
+        logger.info(
+            "Queueing missing recording for processing",
+            recording_id=recording.id,
+            room_name=recording.room_name,
+            track_count=len(track_keys),
+        )
+
+        process_multitrack_recording.delay(
+            bucket_name=bucket_name,
+            daily_room_name=recording.room_name,
+            recording_id=recording.id,
+            track_keys=track_keys,
+        )
+
+
+async def poll_daily_room_presence(meeting_id: str) -> None:
+    """Poll Daily.co room presence and reconcile it with DB sessions: new
+    participants are added, departed participants are marked as closed.
+
+    Warning: the Daily API returns only the current room state, so presence
+    updates can be missed for participants who joined and left the room
+    between polls. As a result, set(presences) may not equal set(recordings)
+    even if every participant spoke. 
This is not a problem but should be noted.""" + + async with RedisAsyncLock( + key=f"meeting_presence_poll:{meeting_id}", + timeout=120, + extend_interval=30, + skip_if_locked=True, + blocking=False, + ) as lock: + if not lock.acquired: + logger.debug( + "Concurrent poll skipped (duplicate task)", meeting_id=meeting_id + ) + return + + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + logger.warning("Meeting not found", meeting_id=meeting_id) + return + + async with create_platform_client("daily") as daily_client: + try: + presence = await daily_client.get_room_presence(meeting.room_name) + except Exception as e: + logger.error( + "Daily.co API fetch failed", + meeting_id=meeting.id, + room_name=meeting.room_name, + error=str(e), + exc_info=True, + ) + return + + api_participants = {p.id: p for p in presence.data} + db_sessions = ( + await daily_participant_sessions_controller.get_all_sessions_for_meeting( + meeting.id + ) + ) + + active_session_ids = { + sid for sid, s in db_sessions.items() if s.left_at is None + } + missing_session_ids = set(api_participants.keys()) - active_session_ids + stale_session_ids = active_session_ids - set(api_participants.keys()) + + if missing_session_ids: + missing_sessions = [] + for session_id in missing_session_ids: + p = api_participants[session_id] + session = DailyParticipantSession( + id=f"{meeting.id}:{session_id}", + meeting_id=meeting.id, + room_id=meeting.room_id, + session_id=session_id, + user_id=p.userId, + user_name=p.userName, + joined_at=datetime.fromisoformat(p.joinTime), + left_at=None, + ) + missing_sessions.append(session) + + await daily_participant_sessions_controller.batch_upsert_sessions( + missing_sessions + ) + logger.info( + "Sessions added", + meeting_id=meeting.id, + count=len(missing_sessions), + ) + + if stale_session_ids: + composite_ids = [f"{meeting.id}:{sid}" for sid in stale_session_ids] + await daily_participant_sessions_controller.batch_close_sessions( + composite_ids, + left_at=datetime.now(timezone.utc), + ) + logger.info( + "Stale sessions closed", + meeting_id=meeting.id, + count=len(composite_ids), + ) + + final_active_count = len(api_participants) + if meeting.num_clients != final_active_count: + await meetings_controller.update_meeting( + meeting.id, + num_clients=final_active_count, + ) + logger.info( + "num_clients updated", + meeting_id=meeting.id, + old_value=meeting.num_clients, + new_value=final_active_count, + ) + + +@shared_task +@asynctask +async def poll_daily_room_presence_task(meeting_id: str) -> None: + """Celery task wrapper for poll_daily_room_presence. + + Queued by webhooks or reconciliation timer. + """ + await poll_daily_room_presence(meeting_id) + + @shared_task @asynctask async def process_meetings(): @@ -335,74 +589,71 @@ async def process_meetings(): Uses distributed locking to prevent race conditions when multiple workers process the same meeting simultaneously. 
""" + meetings = await meetings_controller.get_all_active() logger.info(f"Processing {len(meetings)} meetings") current_time = datetime.now(timezone.utc) - redis_client = get_redis_client() processed_count = 0 skipped_count = 0 for meeting in meetings: logger_ = logger.bind(meeting_id=meeting.id, room_name=meeting.room_name) logger_.info("Processing meeting") - lock_key = f"meeting_process_lock:{meeting.id}" - lock = redis_client.lock(lock_key, timeout=120) try: - if not lock.acquire(blocking=False): - logger_.debug("Meeting is being processed by another worker, skipping") - skipped_count += 1 - continue + async with RedisAsyncLock( + key=f"meeting_process_lock:{meeting.id}", + timeout=120, + extend_interval=30, + skip_if_locked=True, + blocking=False, + ) as lock: + if not lock.acquired: + logger_.debug( + "Meeting is being processed by another worker, skipping" + ) + skipped_count += 1 + continue - # Process the meeting - should_deactivate = False - end_date = meeting.end_date - if end_date.tzinfo is None: - end_date = end_date.replace(tzinfo=timezone.utc) + # Process the meeting + should_deactivate = False + end_date = meeting.end_date + if end_date.tzinfo is None: + end_date = end_date.replace(tzinfo=timezone.utc) - client = create_platform_client(meeting.platform) - room_sessions = await client.get_room_sessions(meeting.room_name) + client = create_platform_client(meeting.platform) + room_sessions = await client.get_room_sessions(meeting.room_name) - try: - # Extend lock after operation to ensure we still hold it - lock.extend(120, replace_ttl=True) - except LockError: - logger_.warning("Lost lock for meeting, skipping") - continue - - has_active_sessions = room_sessions and any( - s.ended_at is None for s in room_sessions - ) - has_had_sessions = bool(room_sessions) - logger_.info( - f"found {has_active_sessions} active sessions, had {has_had_sessions}" - ) - - if has_active_sessions: - logger_.debug("Meeting still has active sessions, keep it") - elif has_had_sessions: - should_deactivate = True - logger_.info("Meeting ended - all participants left") - elif current_time > end_date: - should_deactivate = True - logger_.info( - "Meeting deactivated - scheduled time ended with no participants", + has_active_sessions = room_sessions and any( + s.ended_at is None for s in room_sessions + ) + has_had_sessions = bool(room_sessions) + logger_.info( + f"found {has_active_sessions} active sessions, had {has_had_sessions}" ) - else: - logger_.debug("Meeting not yet started, keep it") - if should_deactivate: - await meetings_controller.update_meeting(meeting.id, is_active=False) - logger_.info("Meeting is deactivated") + if has_active_sessions: + logger_.debug("Meeting still has active sessions, keep it") + elif has_had_sessions: + should_deactivate = True + logger_.info("Meeting ended - all participants left") + elif current_time > end_date: + should_deactivate = True + logger_.info( + "Meeting deactivated - scheduled time ended with no participants", + ) + else: + logger_.debug("Meeting not yet started, keep it") - processed_count += 1 + if should_deactivate: + await meetings_controller.update_meeting( + meeting.id, is_active=False + ) + logger_.info("Meeting is deactivated") + + processed_count += 1 except Exception: logger_.error("Error processing meeting", exc_info=True) - finally: - try: - lock.release() - except LockError: - pass # Lock already released or expired logger.debug( "Processed meetings finished", @@ -524,3 +775,34 @@ async def reprocess_failed_recordings(): 
logger.info(f"Reprocessing complete. Requeued {reprocessed_count} recordings") return reprocessed_count + + +@shared_task +@asynctask +async def trigger_daily_reconciliation() -> None: + """Daily.co pull""" + try: + active_meetings = await meetings_controller.get_all_active(platform="daily") + queued_count = 0 + + for meeting in active_meetings: + try: + poll_daily_room_presence_task.delay(meeting.id) + queued_count += 1 + except Exception as e: + logger.error( + "Failed to queue reconciliation poll", + meeting_id=meeting.id, + error=str(e), + exc_info=True, + ) + raise + + if queued_count > 0: + logger.debug( + "Reconciliation polls queued", + count=queued_count, + ) + + except Exception as e: + logger.error("Reconciliation trigger failed", error=str(e), exc_info=True) diff --git a/server/tests/mocks/mock_platform.py b/server/tests/mocks/mock_platform.py index b4d9ae90..cb0cba5e 100644 --- a/server/tests/mocks/mock_platform.py +++ b/server/tests/mocks/mock_platform.py @@ -64,12 +64,6 @@ class MockPlatformClient(VideoPlatformClient): ) ] - async def delete_room(self, room_name: str) -> bool: - if room_name in self._rooms: - self._rooms[room_name]["is_active"] = False - return True - return False - async def upload_logo(self, room_name: str, logo_path: str) -> bool: if room_name in self._rooms: self._rooms[room_name]["logo_path"] = logo_path diff --git a/server/tests/test_daily_room_presence_polling.py b/server/tests/test_daily_room_presence_polling.py new file mode 100644 index 00000000..19398a22 --- /dev/null +++ b/server/tests/test_daily_room_presence_polling.py @@ -0,0 +1,466 @@ +"""Tests for Daily.co room presence polling functionality. + +TDD tests for Task 3.2: Room Presence Polling +- Query Daily.co API for current room participants +- Reconcile with DB sessions (add missing, close stale) +- Update meeting.num_clients if different +- Use batch operations for efficiency +""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import pytest + +from reflector.dailyco_api.responses import ( + RoomPresenceParticipant, + RoomPresenceResponse, +) +from reflector.db.daily_participant_sessions import DailyParticipantSession +from reflector.db.meetings import Meeting +from reflector.worker.process import poll_daily_room_presence + + +@pytest.fixture +def mock_meeting(): + """Mock meeting with Daily.co room.""" + return Meeting( + id="meeting-123", + room_id="room-456", + room_name="test-room-20251118120000", + room_url="https://daily.co/test-room-20251118120000", + host_room_url="https://daily.co/test-room-20251118120000?t=host-token", + platform="daily", + num_clients=2, + is_active=True, + start_date=datetime.now(timezone.utc), + end_date=datetime.now(timezone.utc), + ) + + +@pytest.fixture +def mock_api_participants(): + """Mock Daily.co API presence response.""" + now = datetime.now(timezone.utc) + return RoomPresenceResponse( + total_count=2, + data=[ + RoomPresenceParticipant( + room="test-room-20251118120000", + id="participant-1", + userName="Alice", + userId="user-alice", + joinTime=(now - timedelta(minutes=10)).isoformat(), + duration=600, + ), + RoomPresenceParticipant( + room="test-room-20251118120000", + id="participant-2", + userName="Bob", + userId="user-bob", + joinTime=(now - timedelta(minutes=5)).isoformat(), + duration=300, + ), + ], + ) + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_by_id") +@patch("reflector.worker.process.create_platform_client") +@patch( + 
"reflector.worker.process.daily_participant_sessions_controller.get_all_sessions_for_meeting" +) +@patch( + "reflector.worker.process.daily_participant_sessions_controller.batch_upsert_sessions" +) +async def test_poll_presence_adds_missing_sessions( + mock_batch_upsert, + mock_get_sessions, + mock_create_client, + mock_get_by_id, + mock_meeting, + mock_api_participants, +): + """Test that polling creates sessions for participants not in DB.""" + mock_get_by_id.return_value = mock_meeting + + mock_daily_client = AsyncMock() + mock_daily_client.get_room_presence = AsyncMock(return_value=mock_api_participants) + mock_create_client.return_value.__aenter__ = AsyncMock( + return_value=mock_daily_client + ) + mock_create_client.return_value.__aexit__ = AsyncMock() + + mock_get_sessions.return_value = {} + mock_batch_upsert.return_value = None + + await poll_daily_room_presence(mock_meeting.id) + + assert mock_batch_upsert.call_count == 1 + sessions = mock_batch_upsert.call_args.args[0] + assert len(sessions) == 2 + session_ids = {s.session_id for s in sessions} + assert session_ids == {"participant-1", "participant-2"} + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_by_id") +@patch("reflector.worker.process.create_platform_client") +@patch( + "reflector.worker.process.daily_participant_sessions_controller.get_all_sessions_for_meeting" +) +@patch( + "reflector.worker.process.daily_participant_sessions_controller.batch_upsert_sessions" +) +@patch( + "reflector.worker.process.daily_participant_sessions_controller.batch_close_sessions" +) +async def test_poll_presence_closes_stale_sessions( + mock_batch_close, + mock_batch_upsert, + mock_get_sessions, + mock_create_client, + mock_get_by_id, + mock_meeting, + mock_api_participants, +): + """Test that polling closes sessions for participants no longer in room.""" + mock_get_by_id.return_value = mock_meeting + + mock_daily_client = AsyncMock() + mock_daily_client.get_room_presence = AsyncMock(return_value=mock_api_participants) + mock_create_client.return_value.__aenter__ = AsyncMock( + return_value=mock_daily_client + ) + mock_create_client.return_value.__aexit__ = AsyncMock() + + now = datetime.now(timezone.utc) + mock_get_sessions.return_value = { + "participant-1": DailyParticipantSession( + id=f"meeting-123:participant-1", + meeting_id="meeting-123", + room_id="room-456", + session_id="participant-1", + user_id="user-alice", + user_name="Alice", + joined_at=now, + left_at=None, + ), + "participant-stale": DailyParticipantSession( + id=f"meeting-123:participant-stale", + meeting_id="meeting-123", + room_id="room-456", + session_id="participant-stale", + user_id="user-stale", + user_name="Stale User", + joined_at=now - timedelta(seconds=120), # Joined 2 minutes ago + left_at=None, + ), + } + + await poll_daily_room_presence(mock_meeting.id) + + assert mock_batch_close.call_count == 1 + composite_ids = mock_batch_close.call_args.args[0] + left_at = mock_batch_close.call_args.kwargs["left_at"] + assert len(composite_ids) == 1 + assert "meeting-123:participant-stale" in composite_ids + assert left_at is not None + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_by_id") +@patch("reflector.worker.process.create_platform_client") +@patch( + "reflector.worker.process.daily_participant_sessions_controller.get_all_sessions_for_meeting" +) +@patch( + "reflector.worker.process.daily_participant_sessions_controller.batch_upsert_sessions" +) 
+@patch("reflector.worker.process.meetings_controller.update_meeting") +async def test_poll_presence_updates_num_clients( + mock_update_meeting, + mock_batch_upsert, + mock_get_sessions, + mock_create_client, + mock_get_by_id, + mock_meeting, + mock_api_participants, +): + """Test that polling updates num_clients when different from API.""" + meeting_with_wrong_count = mock_meeting + meeting_with_wrong_count.num_clients = 5 + mock_get_by_id.return_value = meeting_with_wrong_count + + mock_daily_client = AsyncMock() + mock_daily_client.get_room_presence = AsyncMock(return_value=mock_api_participants) + mock_create_client.return_value.__aenter__ = AsyncMock( + return_value=mock_daily_client + ) + mock_create_client.return_value.__aexit__ = AsyncMock() + + mock_get_sessions.return_value = {} + mock_batch_upsert.return_value = None + + await poll_daily_room_presence(meeting_with_wrong_count.id) + + assert mock_update_meeting.call_count == 1 + assert mock_update_meeting.call_args.kwargs["num_clients"] == 2 + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_by_id") +@patch("reflector.worker.process.create_platform_client") +@patch( + "reflector.worker.process.daily_participant_sessions_controller.get_all_sessions_for_meeting" +) +async def test_poll_presence_no_changes_if_synced( + mock_get_sessions, + mock_create_client, + mock_get_by_id, + mock_meeting, + mock_api_participants, +): + """Test that polling skips updates when DB already synced with API.""" + mock_get_by_id.return_value = mock_meeting + + mock_daily_client = AsyncMock() + mock_daily_client.get_room_presence = AsyncMock(return_value=mock_api_participants) + mock_create_client.return_value.__aenter__ = AsyncMock( + return_value=mock_daily_client + ) + mock_create_client.return_value.__aexit__ = AsyncMock() + + now = datetime.now(timezone.utc) + mock_get_sessions.return_value = { + "participant-1": DailyParticipantSession( + id=f"meeting-123:participant-1", + meeting_id="meeting-123", + room_id="room-456", + session_id="participant-1", + user_id="user-alice", + user_name="Alice", + joined_at=now, + left_at=None, + ), + "participant-2": DailyParticipantSession( + id=f"meeting-123:participant-2", + meeting_id="meeting-123", + room_id="room-456", + session_id="participant-2", + user_id="user-bob", + user_name="Bob", + joined_at=now, + left_at=None, + ), + } + + await poll_daily_room_presence(mock_meeting.id) + + +@pytest.mark.asyncio +@patch("reflector.worker.process.meetings_controller.get_by_id") +@patch("reflector.worker.process.create_platform_client") +@patch( + "reflector.worker.process.daily_participant_sessions_controller.get_all_sessions_for_meeting" +) +@patch( + "reflector.worker.process.daily_participant_sessions_controller.batch_upsert_sessions" +) +@patch( + "reflector.worker.process.daily_participant_sessions_controller.batch_close_sessions" +) +async def test_poll_presence_mixed_add_and_remove( + mock_batch_close, + mock_batch_upsert, + mock_get_sessions, + mock_create_client, + mock_get_by_id, + mock_meeting, +): + """Test that polling handles simultaneous joins and leaves in single poll.""" + mock_get_by_id.return_value = mock_meeting + + now = datetime.now(timezone.utc) + + # API returns: participant-1 and participant-3 (new) + api_response = RoomPresenceResponse( + total_count=2, + data=[ + RoomPresenceParticipant( + room="test-room-20251118120000", + id="participant-1", + userName="Alice", + userId="user-alice", + joinTime=(now - timedelta(minutes=10)).isoformat(), + duration=600, + 
+@pytest.mark.asyncio
+@patch("reflector.worker.process.meetings_controller.get_by_id")
+@patch("reflector.worker.process.create_platform_client")
+@patch(
+    "reflector.worker.process.daily_participant_sessions_controller.get_all_sessions_for_meeting"
+)
+@patch(
+    "reflector.worker.process.daily_participant_sessions_controller.batch_upsert_sessions"
+)
+@patch(
+    "reflector.worker.process.daily_participant_sessions_controller.batch_close_sessions"
+)
+async def test_poll_presence_mixed_add_and_remove(
+    mock_batch_close,
+    mock_batch_upsert,
+    mock_get_sessions,
+    mock_create_client,
+    mock_get_by_id,
+    mock_meeting,
+):
+    """Test that polling handles simultaneous joins and leaves in a single poll."""
+    mock_get_by_id.return_value = mock_meeting
+
+    now = datetime.now(timezone.utc)
+
+    # API returns: participant-1 (known) and participant-3 (new)
+    api_response = RoomPresenceResponse(
+        total_count=2,
+        data=[
+            RoomPresenceParticipant(
+                room="test-room-20251118120000",
+                id="participant-1",
+                userName="Alice",
+                userId="user-alice",
+                joinTime=(now - timedelta(minutes=10)).isoformat(),
+                duration=600,
+            ),
+            RoomPresenceParticipant(
+                room="test-room-20251118120000",
+                id="participant-3",
+                userName="Charlie",
+                userId="user-charlie",
+                joinTime=now.isoformat(),
+                duration=0,
+            ),
+        ],
+    )
+
+    mock_daily_client = AsyncMock()
+    mock_daily_client.get_room_presence = AsyncMock(return_value=api_response)
+    mock_create_client.return_value.__aenter__ = AsyncMock(
+        return_value=mock_daily_client
+    )
+    mock_create_client.return_value.__aexit__ = AsyncMock()
+
+    # DB has: participant-1 (still present) and participant-2 (gone from the API)
+    mock_get_sessions.return_value = {
+        "participant-1": DailyParticipantSession(
+            id="meeting-123:participant-1",
+            meeting_id="meeting-123",
+            room_id="room-456",
+            session_id="participant-1",
+            user_id="user-alice",
+            user_name="Alice",
+            joined_at=now - timedelta(minutes=10),
+            left_at=None,
+        ),
+        "participant-2": DailyParticipantSession(
+            id="meeting-123:participant-2",
+            meeting_id="meeting-123",
+            room_id="room-456",
+            session_id="participant-2",
+            user_id="user-bob",
+            user_name="Bob",
+            joined_at=now - timedelta(minutes=5),
+            left_at=None,
+        ),
+    }
+
+    mock_batch_upsert.return_value = None
+    mock_batch_close.return_value = None
+
+    await poll_daily_room_presence(mock_meeting.id)
+
+    # Verify participant-3 was added (present in API, missing in DB)
+    assert mock_batch_upsert.call_count == 1
+    sessions_added = mock_batch_upsert.call_args.args[0]
+    assert len(sessions_added) == 1
+    assert sessions_added[0].session_id == "participant-3"
+    assert sessions_added[0].user_name == "Charlie"
+
+    # Verify participant-2 was closed (open in DB, gone from the API)
+    assert mock_batch_close.call_count == 1
+    composite_ids = mock_batch_close.call_args.args[0]
+    assert len(composite_ids) == 1
+    assert "meeting-123:participant-2" in composite_ids
+
+
+@pytest.mark.asyncio
+@patch("reflector.worker.process.meetings_controller.get_by_id")
+@patch("reflector.worker.process.create_platform_client")
+async def test_poll_presence_handles_api_error(
+    mock_create_client,
+    mock_get_by_id,
+    mock_meeting,
+):
+    """Test that polling handles Daily.co API errors gracefully."""
+    mock_get_by_id.return_value = mock_meeting
+
+    mock_daily_client = AsyncMock()
+    mock_daily_client.get_room_presence = AsyncMock(side_effect=Exception("API error"))
+    mock_create_client.return_value.__aenter__ = AsyncMock(
+        return_value=mock_daily_client
+    )
+    mock_create_client.return_value.__aexit__ = AsyncMock()
+
+    # The bare call is the assertion: the task must swallow the error, not raise.
+    await poll_daily_room_presence(mock_meeting.id)
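+
+
+# Sketch of the guard the previous test relies on (assumed shape of the task's
+# error handling; names illustrative):
+#
+#     try:
+#         presence = await daily_client.get_room_presence(room_name)
+#     except Exception:
+#         logger.warning("Daily presence poll failed", exc_info=True)
+#         return
+#
+# The test passes as long as no exception propagates out of the task.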
meeting_id="meeting-123", + room_id="room-456", + session_id="participant-1", + user_id="user-alice", + user_name="Alice", + joined_at=now + - timedelta(seconds=120), # Joined 2 minutes ago (beyond grace period) + left_at=None, + ), + } + + await poll_daily_room_presence(mock_meeting.id) + + assert mock_batch_close.call_count == 1 + composite_ids = mock_batch_close.call_args.args[0] + left_at = mock_batch_close.call_args.kwargs["left_at"] + assert len(composite_ids) == 1 + assert "meeting-123:participant-1" in composite_ids + assert left_at is not None + + +@pytest.mark.asyncio +@patch("reflector.worker.process.RedisAsyncLock") +@patch("reflector.worker.process.meetings_controller.get_by_id") +@patch("reflector.worker.process.create_platform_client") +async def test_poll_presence_skips_if_locked( + mock_create_client, + mock_get_by_id, + mock_redis_lock_class, + mock_meeting, +): + """Test that concurrent polling is prevented by Redis lock.""" + mock_get_by_id.return_value = mock_meeting + + # Mock the RedisAsyncLock to simulate lock not acquired + mock_lock_instance = AsyncMock() + mock_lock_instance.acquired = False # Lock not acquired + mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance) + mock_lock_instance.__aexit__ = AsyncMock() + + mock_redis_lock_class.return_value = mock_lock_instance + + mock_daily_client = AsyncMock() + mock_create_client.return_value.__aenter__ = AsyncMock( + return_value=mock_daily_client + ) + mock_create_client.return_value.__aexit__ = AsyncMock() + + await poll_daily_room_presence(mock_meeting.id) + + # Verify RedisAsyncLock was instantiated + assert mock_redis_lock_class.call_count == 1 + # Verify get_room_presence was NOT called (lock not acquired, so function returned early) + assert mock_daily_client.get_room_presence.call_count == 0 diff --git a/server/tests/test_poll_daily_recordings.py b/server/tests/test_poll_daily_recordings.py new file mode 100644 index 00000000..acbbfcc7 --- /dev/null +++ b/server/tests/test_poll_daily_recordings.py @@ -0,0 +1,193 @@ +"""Tests for poll_daily_recordings task.""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import pytest + +from reflector.dailyco_api.responses import RecordingResponse +from reflector.dailyco_api.webhooks import DailyTrack + + +# Import the unwrapped async function for testing +# The function is decorated with @shared_task and @asynctask, +# but we need to test the underlying async implementation +def _get_poll_daily_recordings_fn(): + """Get the underlying async function without Celery/asynctask decorators.""" + from reflector.worker import process + + # Access the actual async function before decorators + fn = process.poll_daily_recordings + # Get through both decorator layers + if hasattr(fn, "__wrapped__"): + fn = fn.__wrapped__ + if hasattr(fn, "__wrapped__"): + fn = fn.__wrapped__ + return fn + + +@pytest.fixture +def mock_recording_response(): + """Mock Daily.co API recording response with tracks.""" + now = datetime.now(timezone.utc) + return [ + RecordingResponse( + id="rec-123", + room_name="test-room-20251118120000", + start_ts=int((now - timedelta(hours=1)).timestamp()), + status="finished", + max_participants=2, + duration=3600, + share_token="share-token-123", + tracks=[ + DailyTrack(type="audio", s3Key="track1.webm", size=1024), + DailyTrack(type="audio", s3Key="track2.webm", size=2048), + ], + ), + RecordingResponse( + id="rec-456", + room_name="test-room-20251118130000", + start_ts=int((now - 
+@pytest.fixture
+def mock_recording_response():
+    """Mock Daily.co API recording response with tracks."""
+    now = datetime.now(timezone.utc)
+    return [
+        RecordingResponse(
+            id="rec-123",
+            room_name="test-room-20251118120000",
+            start_ts=int((now - timedelta(hours=1)).timestamp()),
+            status="finished",
+            max_participants=2,
+            duration=3600,
+            share_token="share-token-123",
+            tracks=[
+                DailyTrack(type="audio", s3Key="track1.webm", size=1024),
+                DailyTrack(type="audio", s3Key="track2.webm", size=2048),
+            ],
+        ),
+        RecordingResponse(
+            id="rec-456",
+            room_name="test-room-20251118130000",
+            start_ts=int((now - timedelta(hours=2)).timestamp()),
+            status="finished",
+            max_participants=3,
+            duration=7200,
+            share_token="share-token-456",
+            tracks=[
+                DailyTrack(type="audio", s3Key="track1.webm", size=1024),
+            ],
+        ),
+    ]
+
+
+@pytest.mark.asyncio
+@patch("reflector.worker.process.settings")
+@patch("reflector.worker.process.create_platform_client")
+@patch("reflector.worker.process.recordings_controller.get_by_ids")
+@patch("reflector.worker.process.process_multitrack_recording.delay")
+async def test_poll_daily_recordings_processes_missing_recordings(
+    mock_process_delay,
+    mock_get_recordings,
+    mock_create_client,
+    mock_settings,
+    mock_recording_response,
+):
+    """Test that poll_daily_recordings queues processing for recordings not in DB."""
+    mock_settings.DAILYCO_STORAGE_AWS_BUCKET_NAME = "test-bucket"
+
+    # Mock Daily.co API client
+    mock_daily_client = AsyncMock()
+    mock_daily_client.list_recordings = AsyncMock(return_value=mock_recording_response)
+    mock_create_client.return_value.__aenter__ = AsyncMock(
+        return_value=mock_daily_client
+    )
+    mock_create_client.return_value.__aexit__ = AsyncMock()
+
+    # Mock DB controller - no existing recordings
+    mock_get_recordings.return_value = []
+
+    # Execute - call the unwrapped async function
+    poll_fn = _get_poll_daily_recordings_fn()
+    await poll_fn()
+
+    # Verify Daily.co API was called without time parameters (uses default limit=100)
+    assert mock_daily_client.list_recordings.call_count == 1
+    call_kwargs = mock_daily_client.list_recordings.call_args.kwargs
+
+    # Should not have time-based parameters (uses cursor-based pagination)
+    assert "start_time" not in call_kwargs
+    assert "end_time" not in call_kwargs
+
+    # Verify processing was queued for both missing recordings
+    assert mock_process_delay.call_count == 2
+
+    # Verify the processing calls have correct parameters
+    calls = mock_process_delay.call_args_list
+    assert calls[0].kwargs["bucket_name"] == "test-bucket"
+    assert calls[0].kwargs["recording_id"] == "rec-123"
+    assert calls[0].kwargs["daily_room_name"] == "test-room-20251118120000"
+    assert calls[0].kwargs["track_keys"] == ["track1.webm", "track2.webm"]
+
+    assert calls[1].kwargs["bucket_name"] == "test-bucket"
+    assert calls[1].kwargs["recording_id"] == "rec-456"
+    assert calls[1].kwargs["daily_room_name"] == "test-room-20251118130000"
+    assert calls[1].kwargs["track_keys"] == ["track1.webm"]
+
+
+@pytest.mark.asyncio
+@patch("reflector.worker.process.settings")
+@patch("reflector.worker.process.create_platform_client")
+@patch("reflector.worker.process.recordings_controller.get_by_ids")
+@patch("reflector.worker.process.process_multitrack_recording.delay")
+async def test_poll_daily_recordings_skips_existing_recordings(
+    mock_process_delay,
+    mock_get_recordings,
+    mock_create_client,
+    mock_settings,
+    mock_recording_response,
+):
+    """Test that poll_daily_recordings skips recordings already in DB."""
+    mock_settings.DAILYCO_STORAGE_AWS_BUCKET_NAME = "test-bucket"
+
+    # Mock Daily.co API client
+    mock_daily_client = AsyncMock()
+    mock_daily_client.list_recordings = AsyncMock(return_value=mock_recording_response)
+    mock_create_client.return_value.__aenter__ = AsyncMock(
+        return_value=mock_daily_client
+    )
+    mock_create_client.return_value.__aexit__ = AsyncMock()
+
+    # Mock DB controller - all recordings already exist
+    from reflector.db.recordings import Recording
+
+    mock_get_recordings.return_value = [
+        Recording(
+            id="rec-123",
+            bucket_name="test-bucket",
+            object_key="",
+            recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-1", + ), + Recording( + id="rec-456", + bucket_name="test-bucket", + object_key="", + recorded_at=datetime.now(timezone.utc), + meeting_id="meeting-1", + ), + ] + + # Execute - call the unwrapped async function + poll_fn = _get_poll_daily_recordings_fn() + await poll_fn() + + # Verify Daily.co API was called + assert mock_daily_client.list_recordings.call_count == 1 + + # Verify NO processing was queued (all recordings already exist) + assert mock_process_delay.call_count == 0 + + +@pytest.mark.asyncio +@patch("reflector.worker.process.settings") +@patch("reflector.worker.process.create_platform_client") +async def test_poll_daily_recordings_skips_when_bucket_not_configured( + mock_create_client, + mock_settings, +): + """Test that poll_daily_recordings returns early when bucket is not configured.""" + # No bucket configured + mock_settings.DAILYCO_STORAGE_AWS_BUCKET_NAME = None + + # Mock should not be called + mock_daily_client = AsyncMock() + mock_create_client.return_value.__aenter__ = AsyncMock( + return_value=mock_daily_client + ) + mock_create_client.return_value.__aexit__ = AsyncMock() + + # Execute - call the unwrapped async function + poll_fn = _get_poll_daily_recordings_fn() + await poll_fn() + + # Verify API was never called + mock_daily_client.list_recordings.assert_not_called() diff --git a/server/tests/test_utils_daily.py b/server/tests/test_utils_daily.py index 356ffc94..0b2d3929 100644 --- a/server/tests/test_utils_daily.py +++ b/server/tests/test_utils_daily.py @@ -1,6 +1,6 @@ import pytest -from reflector.utils.daily import extract_base_room_name +from reflector.utils.daily import extract_base_room_name, parse_daily_recording_filename @pytest.mark.parametrize( @@ -15,3 +15,50 @@ from reflector.utils.daily import extract_base_room_name ) def test_extract_base_room_name(daily_room_name, expected): assert extract_base_room_name(daily_room_name) == expected + + +@pytest.mark.parametrize( + "filename,expected_recording_ts,expected_participant_id,expected_track_ts", + [ + ( + "1763152299562-12f0b87c-97d4-4dd3-a65c-cee1f854a79c-cam-audio-1763152314582", + 1763152299562, + "12f0b87c-97d4-4dd3-a65c-cee1f854a79c", + 1763152314582, + ), + ( + "1760988935484-52f7f48b-fbab-431f-9a50-87b9abfc8255-cam-audio-1760988935922", + 1760988935484, + "52f7f48b-fbab-431f-9a50-87b9abfc8255", + 1760988935922, + ), + ( + "1760988935484-a37c35e3-6f8e-4274-a482-e9d0f102a732-cam-audio-1760988943823", + 1760988935484, + "a37c35e3-6f8e-4274-a482-e9d0f102a732", + 1760988943823, + ), + ( + "path/to/1763151171834-b6719a43-4481-483a-a8fc-2ae18b69283c-cam-audio-1763151180561", + 1763151171834, + "b6719a43-4481-483a-a8fc-2ae18b69283c", + 1763151180561, + ), + ], +) +def test_parse_daily_recording_filename( + filename, expected_recording_ts, expected_participant_id, expected_track_ts +): + result = parse_daily_recording_filename(filename) + + assert result.recording_start_ts == expected_recording_ts + assert result.participant_id == expected_participant_id + assert result.track_start_ts == expected_track_ts + + +def test_parse_daily_recording_filename_invalid(): + with pytest.raises(ValueError, match="Invalid Daily.co recording filename"): + parse_daily_recording_filename("invalid-filename") + + with pytest.raises(ValueError, match="Invalid Daily.co recording filename"): + parse_daily_recording_filename("123-not-a-uuid-cam-audio-456")