diff --git a/.gitleaksignore b/.gitleaksignore index 141c82d5..49aaa85e 100644 --- a/.gitleaksignore +++ b/.gitleaksignore @@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83 server/reflector/worker/process.py:generic-api-key:465 +server/tests/test_recording_request_flow.py:generic-api-key:121 diff --git a/server/migrations/versions/f5b008fa8a14_add_daily_recording_requests.py b/server/migrations/versions/f5b008fa8a14_add_daily_recording_requests.py new file mode 100644 index 00000000..b0eda18a --- /dev/null +++ b/server/migrations/versions/f5b008fa8a14_add_daily_recording_requests.py @@ -0,0 +1,67 @@ +"""add_daily_recording_requests + +Revision ID: f5b008fa8a14 +Revises: 1b1e6a6fc465 +Create Date: 2026-01-20 22:32:06.697144 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "f5b008fa8a14" +down_revision: Union[str, None] = "1b1e6a6fc465" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "daily_recording_request", + sa.Column("recording_id", sa.String(), nullable=False), + sa.Column("meeting_id", sa.String(), nullable=False), + sa.Column("instance_id", sa.String(), nullable=False), + sa.Column("type", sa.String(), nullable=False), + sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("recording_id"), + ) + op.create_index("idx_meeting_id", "daily_recording_request", ["meeting_id"]) + op.create_index("idx_instance_id", "daily_recording_request", ["instance_id"]) + + # Clean up orphaned recordings before adding FK constraint + op.execute(""" + UPDATE recording SET status = 'orphan', meeting_id = NULL + WHERE meeting_id IS NOT NULL + AND meeting_id NOT IN (SELECT id FROM meeting) + """) + + # Add FK constraint to recording table (cascade delete recordings when meeting deleted) + op.execute(""" + ALTER TABLE recording ADD CONSTRAINT fk_recording_meeting + FOREIGN KEY (meeting_id) REFERENCES meeting(id) ON DELETE CASCADE + """) + + # Add CHECK constraints to enforce orphan invariants + op.execute(""" + ALTER TABLE recording ADD CONSTRAINT chk_orphan_no_meeting + CHECK (status != 'orphan' OR meeting_id IS NULL) + """) + op.execute(""" + ALTER TABLE recording ADD CONSTRAINT chk_non_orphan_has_meeting + CHECK (status = 'orphan' OR meeting_id IS NOT NULL) + """) + + +def downgrade() -> None: + op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_orphan_no_meeting") + op.execute( + "ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_non_orphan_has_meeting" + ) + op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS fk_recording_meeting") + op.drop_index("idx_instance_id", table_name="daily_recording_request") + op.drop_index("idx_meeting_id", table_name="daily_recording_request") + op.drop_table("daily_recording_request") diff --git a/server/reflector/dailyco_api/recording_orphans.py b/server/reflector/dailyco_api/recording_orphans.py new file mode 100644 index 00000000..f4232add --- /dev/null +++ b/server/reflector/dailyco_api/recording_orphans.py @@ -0,0 +1,56 @@ +"""Utility for creating orphan recordings.""" + +import os +from datetime import datetime, timezone + +from reflector.db.recordings import Recording, 
recordings_controller +from reflector.logger import logger +from reflector.utils.string import NonEmptyString + + +async def create_and_log_orphan( + recording_id: NonEmptyString, + bucket_name: str, + room_name: str, + start_ts: int, + track_keys: list[str] | None, + source: str, +) -> bool: + """Create orphan recording and log if first occurrence. + + Args: + recording_id: Daily.co recording ID + bucket_name: S3 bucket (empty string for cloud recordings) + room_name: Daily.co room name + start_ts: Unix timestamp + track_keys: Track keys for raw-tracks, None for cloud + source: "webhook" or "polling" for logging + + Returns: + True if created (first poller), False if already exists + """ + if track_keys: + object_key = os.path.dirname(track_keys[0]) if track_keys else room_name + else: + object_key = room_name + + created = await recordings_controller.create_orphan( + Recording( + id=recording_id, + bucket_name=bucket_name, + object_key=object_key, + recorded_at=datetime.fromtimestamp(start_ts, tz=timezone.utc), + track_keys=track_keys, + meeting_id=None, + status="orphan", + ) + ) + + if created: + logger.error( + f"Orphan recording ({source})", + recording_id=recording_id, + room_name=room_name, + ) + + return created diff --git a/server/reflector/db/__init__.py b/server/reflector/db/__init__.py index deffb52a..96982bdd 100644 --- a/server/reflector/db/__init__.py +++ b/server/reflector/db/__init__.py @@ -26,6 +26,7 @@ def get_database() -> databases.Database: # import models import reflector.db.calendar_events # noqa import reflector.db.daily_participant_sessions # noqa +import reflector.db.daily_recording_requests # noqa import reflector.db.meetings # noqa import reflector.db.recordings # noqa import reflector.db.rooms # noqa diff --git a/server/reflector/db/daily_recording_requests.py b/server/reflector/db/daily_recording_requests.py new file mode 100644 index 00000000..be621fd8 --- /dev/null +++ b/server/reflector/db/daily_recording_requests.py @@ -0,0 +1,111 @@ +from datetime import datetime +from typing import Literal +from uuid import UUID + +import sqlalchemy as sa +from pydantic import BaseModel +from sqlalchemy.dialects.postgresql import insert + +from reflector.db import get_database, metadata +from reflector.utils.string import NonEmptyString + +daily_recording_requests = sa.Table( + "daily_recording_request", + metadata, + sa.Column("recording_id", sa.String, primary_key=True), + sa.Column( + "meeting_id", + sa.String, + sa.ForeignKey("meeting.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("instance_id", sa.String, nullable=False), + sa.Column("type", sa.String, nullable=False), + sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False), + sa.Index("idx_meeting_id", "meeting_id"), + sa.Index("idx_instance_id", "instance_id"), +) + + +class DailyRecordingRequest(BaseModel): + recording_id: NonEmptyString + meeting_id: NonEmptyString + instance_id: UUID + type: Literal["cloud", "raw-tracks"] + requested_at: datetime + + +class DailyRecordingRequestsController: + async def create(self, request: DailyRecordingRequest) -> None: + stmt = insert(daily_recording_requests).values( + recording_id=request.recording_id, + meeting_id=request.meeting_id, + instance_id=str(request.instance_id), + type=request.type, + requested_at=request.requested_at, + ) + stmt = stmt.on_conflict_do_nothing(index_elements=["recording_id"]) + await get_database().execute(stmt) + + async def find_by_recording_id( + self, + recording_id: NonEmptyString, + ) -> 
tuple[NonEmptyString, Literal["cloud", "raw-tracks"]] | None: + query = daily_recording_requests.select().where( + daily_recording_requests.c.recording_id == recording_id + ) + result = await get_database().fetch_one(query) + + if not result: + return None + + req = DailyRecordingRequest( + recording_id=result["recording_id"], + meeting_id=result["meeting_id"], + instance_id=UUID(result["instance_id"]), + type=result["type"], + requested_at=result["requested_at"], + ) + return (req.meeting_id, req.type) + + async def find_by_instance_id( + self, + instance_id: UUID, + ) -> list[DailyRecordingRequest]: + """Multiple recordings can have same instance_id (stop/restart).""" + query = daily_recording_requests.select().where( + daily_recording_requests.c.instance_id == str(instance_id) + ) + results = await get_database().fetch_all(query) + return [ + DailyRecordingRequest( + recording_id=r["recording_id"], + meeting_id=r["meeting_id"], + instance_id=UUID(r["instance_id"]), + type=r["type"], + requested_at=r["requested_at"], + ) + for r in results + ] + + async def get_by_meeting_id( + self, + meeting_id: NonEmptyString, + ) -> list[DailyRecordingRequest]: + query = daily_recording_requests.select().where( + daily_recording_requests.c.meeting_id == meeting_id + ) + results = await get_database().fetch_all(query) + return [ + DailyRecordingRequest( + recording_id=r["recording_id"], + meeting_id=r["meeting_id"], + instance_id=UUID(r["instance_id"]), + type=r["type"], + requested_at=r["requested_at"], + ) + for r in results + ] + + +daily_recording_requests_controller = DailyRecordingRequestsController() diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 02f407b2..59401b34 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime from typing import Any, Literal import sqlalchemy as sa @@ -183,84 +183,6 @@ class MeetingController: results = await get_database().fetch_all(query) return [Meeting(**r) for r in results] - async def get_by_room_name_and_time( - self, - room_name: NonEmptyString, - recording_start: datetime, - time_window_hours: int = 168, - ) -> Meeting | None: - """ - Get meeting by room name closest to recording timestamp. - - HACK ALERT: Daily.co doesn't return instanceId in recordings API response, - and mtgSessionId is separate from our instanceId. Time-based matching is - the least-bad workaround. - - This handles edge case of duplicate room_name values in DB (race conditions, - double-clicks, etc.) by matching based on temporal proximity. - - Algorithm: - 1. Find meetings within time_window_hours of recording_start - 2. Return meeting with start_date closest to recording_start - 3. 
If tie, return first by meeting.id (deterministic) - - Args: - room_name: Daily.co room name from recording - recording_start: Timezone-aware datetime from recording.start_ts - time_window_hours: Search window (default 168 = 1 week) - - Returns: - Meeting closest to recording timestamp, or None if no matches - - Failure modes: - - Multiple meetings in same room within ~5 minutes: picks closest - - All meetings outside time window: returns None - - Clock skew between Daily.co and DB: 1-week window tolerates this - - Why 1 week window: - - Handles webhook failures (recording discovered days later) - - Tolerates clock skew - - Rejects unrelated meetings from weeks ago - - """ - # Validate timezone-aware datetime - if recording_start.tzinfo is None: - raise ValueError( - f"recording_start must be timezone-aware, got naive datetime: {recording_start}" - ) - - window_start = recording_start - timedelta(hours=time_window_hours) - window_end = recording_start + timedelta(hours=time_window_hours) - - query = ( - meetings.select() - .where( - sa.and_( - meetings.c.room_name == room_name, - meetings.c.start_date >= window_start, - meetings.c.start_date <= window_end, - ) - ) - .order_by(meetings.c.start_date) - ) - - results = await get_database().fetch_all(query) - if not results: - return None - - candidates = [Meeting(**r) for r in results] - - # Find meeting with start_date closest to recording_start - closest = min( - candidates, - key=lambda m: ( - abs((m.start_date - recording_start).total_seconds()), - m.id, # Tie-breaker: deterministic by UUID - ), - ) - - return closest - async def get_active(self, room: Room, current_time: datetime) -> Meeting | None: """ Get latest active meeting for a room. @@ -350,44 +272,6 @@ class MeetingController: query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs) await get_database().execute(query) - async def set_cloud_recording_if_missing( - self, - meeting_id: NonEmptyString, - s3_key: NonEmptyString, - duration: int, - ) -> bool: - """ - Set cloud recording only if not already set. - - Returns True if updated, False if already set. - Prevents webhook/polling race condition via atomic WHERE clause. 
- """ - # Check current value before update to detect actual change - meeting_before = await self.get_by_id(meeting_id) - if not meeting_before: - return False - - was_null = meeting_before.daily_composed_video_s3_key is None - - query = ( - meetings.update() - .where( - sa.and_( - meetings.c.id == meeting_id, - meetings.c.daily_composed_video_s3_key.is_(None), - ) - ) - .values( - daily_composed_video_s3_key=s3_key, - daily_composed_video_duration=duration, - ) - ) - await get_database().execute(query) - - # Return True only if value was NULL before (actual update occurred) - # If was_null=False, the WHERE clause prevented the update - return was_null - async def increment_num_clients(self, meeting_id: str) -> None: """Atomically increment participant count.""" query = ( @@ -467,6 +351,27 @@ class MeetingConsentController: result = await get_database().fetch_one(query) return result is not None + async def set_cloud_recording_if_missing( + self, + meeting_id: NonEmptyString, + s3_key: NonEmptyString, + duration: int, + ) -> bool: + """Returns True if updated, False if already set.""" + query = ( + meetings.update() + .where( + meetings.c.id == meeting_id, + meetings.c.daily_composed_video_s3_key.is_(None), + ) + .values( + daily_composed_video_s3_key=s3_key, + daily_composed_video_duration=duration, + ) + ) + result = await get_database().execute(query) + return result.rowcount > 0 + meetings_controller = MeetingController() meeting_consent_controller = MeetingConsentController() diff --git a/server/reflector/db/recordings.py b/server/reflector/db/recordings.py index bf799561..4261ad6e 100644 --- a/server/reflector/db/recordings.py +++ b/server/reflector/db/recordings.py @@ -4,10 +4,10 @@ from typing import Literal import sqlalchemy as sa from pydantic import BaseModel, Field from sqlalchemy import or_ +from sqlalchemy.dialects.postgresql import insert from reflector.db import get_database, metadata from reflector.utils import generate_uuid4 -from reflector.utils.string import NonEmptyString recordings = sa.Table( "recording", @@ -31,14 +31,13 @@ recordings = sa.Table( class Recording(BaseModel): id: str = Field(default_factory=generate_uuid4) bucket_name: str - # for single-track object_key: str recorded_at: datetime - status: Literal["pending", "processing", "completed", "failed"] = "pending" + status: Literal["pending", "processing", "completed", "failed", "orphan"] = ( + "pending" + ) meeting_id: str | None = None - # for multitrack reprocessing - # track_keys can be empty list [] if recording finished but no audio was captured (silence/muted) - # None means not a multitrack recording, [] means multitrack with no tracks + # None = single-track, [] = multitrack with no audio, [keys...] = multitrack with audio track_keys: list[str] | None = None @property @@ -72,20 +71,6 @@ class RecordingController: query = recordings.delete().where(recordings.c.id == id) await get_database().execute(query) - async def set_meeting_id( - self, - recording_id: NonEmptyString, - meeting_id: NonEmptyString, - ) -> None: - """Link recording to meeting.""" - query = ( - recordings.update() - .where(recordings.c.id == recording_id) - .values(meeting_id=meeting_id) - ) - await get_database().execute(query) - - # no check for existence async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]: if not recording_ids: return [] @@ -104,9 +89,12 @@ class RecordingController: This is more efficient than fetching all recordings and filtering in Python. 
""" - from reflector.db.transcripts import ( - transcripts, # noqa: PLC0415 cyclic import - ) + # INLINE IMPORT REQUIRED: Circular dependency + # - recordings.py needs transcripts table for JOIN query + # - transcripts.py imports recordings_controller + # - db/__init__.py loads recordings before transcripts (line 31 vs 33) + # - Top-level import would fail during module initialization + from reflector.db.transcripts import transcripts query = ( recordings.select() @@ -124,5 +112,27 @@ class RecordingController: recordings_list = [Recording(**row) for row in results] return [r for r in recordings_list if r.is_multitrack] + async def try_create_with_meeting(self, recording: Recording) -> bool: + """Returns True if created, False if already exists.""" + assert recording.meeting_id is not None, "meeting_id required for non-orphan" + assert recording.status != "orphan", "use create_orphan for orphans" + + stmt = insert(recordings).values(**recording.model_dump()) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + result = await get_database().execute(stmt) + + return result.rowcount > 0 + + async def create_orphan(self, recording: Recording) -> bool: + """Returns True if created, False if already exists.""" + assert recording.status == "orphan", "status must be 'orphan'" + assert recording.meeting_id is None, "meeting_id must be NULL for orphan" + + stmt = insert(recordings).values(**recording.model_dump()) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + result = await get_database().execute(stmt) + + return result.rowcount > 0 + recordings_controller = RecordingController() diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 384290da..8d8a9ebd 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -1,4 +1,6 @@ import json +import os +from datetime import datetime, timezone from typing import assert_never from fastapi import APIRouter, HTTPException, Request @@ -12,7 +14,10 @@ from reflector.dailyco_api import ( RecordingReadyEvent, RecordingStartedEvent, ) +from reflector.dailyco_api.recording_orphans import create_and_log_orphan +from reflector.db.daily_recording_requests import daily_recording_requests_controller from reflector.db.meetings import meetings_controller +from reflector.db.recordings import Recording, recordings_controller from reflector.logger import logger as _logger from reflector.settings import settings from reflector.video_platforms.factory import create_platform_client @@ -212,10 +217,73 @@ async def _handle_recording_ready(event: RecordingReadyEvent): track_keys = [t.s3Key for t in tracks if t.type == "audio"] + # Lookup request + match = await daily_recording_requests_controller.find_by_recording_id( + recording_id + ) + + if not match: + await create_and_log_orphan( + recording_id=recording_id, + bucket_name=bucket_name, + room_name=room_name, + start_ts=event.payload.start_ts, + track_keys=track_keys, + source="webhook", + ) + return + + meeting_id, _ = match + + # Verify meeting exists + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + logger.error( + "Meeting not found (webhook)", + recording_id=recording_id, + meeting_id=meeting_id, + ) + await create_and_log_orphan( + recording_id=recording_id, + bucket_name=bucket_name, + room_name=room_name, + start_ts=event.payload.start_ts, + track_keys=track_keys, + source="webhook", + ) + return + + # Create recording atomically + created = await recordings_controller.try_create_with_meeting( + Recording( + 
id=recording_id, + bucket_name=bucket_name, + object_key=( + os.path.dirname(track_keys[0]) if track_keys else room_name + ), + recorded_at=datetime.fromtimestamp( + event.payload.start_ts, tz=timezone.utc + ), + track_keys=track_keys, + meeting_id=meeting_id, + status="pending", + ) + ) + + if not created: + # Already created (polling got it first) + logger.debug( + "Recording already exists (webhook late)", + recording_id=recording_id, + meeting_id=meeting_id, + ) + return + logger.info( - "Raw-tracks recording queuing processing", + "Raw-tracks recording queuing processing (webhook)", recording_id=recording_id, room_name=room_name, + meeting_id=meeting_id, num_tracks=len(track_keys), ) diff --git a/server/reflector/views/meetings.py b/server/reflector/views/meetings.py index 44adf500..e2f8c7fa 100644 --- a/server/reflector/views/meetings.py +++ b/server/reflector/views/meetings.py @@ -1,4 +1,5 @@ import json +import logging from datetime import datetime, timezone from typing import Annotated, Any, Optional from uuid import UUID @@ -9,16 +10,21 @@ from pydantic import BaseModel import reflector.auth as auth from reflector.dailyco_api import RecordingType from reflector.dailyco_api.client import DailyApiError +from reflector.db.daily_recording_requests import ( + DailyRecordingRequest, + daily_recording_requests_controller, +) from reflector.db.meetings import ( MeetingConsent, meeting_consent_controller, meetings_controller, ) from reflector.db.rooms import rooms_controller -from reflector.logger import logger from reflector.utils.string import NonEmptyString from reflector.video_platforms.factory import create_platform_client +logger = logging.getLogger(__name__) + router = APIRouter() @@ -102,13 +108,6 @@ async def start_recording( if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") - log = logger.bind( - meeting_id=meeting_id, - room_name=meeting.room_name, - recording_type=body.type, - instance_id=body.instanceId, - ) - try: client = create_platform_client("daily") result = await client.start_recording( @@ -117,9 +116,30 @@ async def start_recording( instance_id=body.instanceId, ) - log.info(f"Started {body.type} recording via REST API") + recording_id = result["id"] - return {"status": "ok", "result": result} + await daily_recording_requests_controller.create( + DailyRecordingRequest( + recording_id=recording_id, + meeting_id=meeting_id, + instance_id=body.instanceId, + type=body.type, + requested_at=datetime.now(timezone.utc), + ) + ) + + logger.info( + f"Started {body.type} recording via REST API", + extra={ + "meeting_id": meeting_id, + "room_name": meeting.room_name, + "recording_type": body.type, + "instance_id": body.instanceId, + "recording_id": recording_id, + }, + ) + + return {"status": "ok", "recording_id": recording_id} except DailyApiError as e: # Parse Daily.co error response to detect "has an active stream" @@ -130,22 +150,42 @@ async def start_recording( # "has an active stream" means recording already started by another participant # This is SUCCESS from business logic perspective - return 200 if "has an active stream" in error_info: - log.info( - f"{body.type} recording already active (started by another participant)" + logger.info( + f"{body.type} recording already active (started by another participant)", + extra={ + "meeting_id": meeting_id, + "room_name": meeting.room_name, + "recording_type": body.type, + "instance_id": body.instanceId, + }, ) return {"status": "already_active", "instanceId": str(body.instanceId)} except 
(json.JSONDecodeError, KeyError): pass # Fall through to error handling # All other Daily.co API errors - log.error(f"Failed to start {body.type} recording", error=str(e)) + logger.error( + f"Failed to start {body.type} recording", + extra={ + "meeting_id": meeting_id, + "recording_type": body.type, + "error": str(e), + }, + ) raise HTTPException( status_code=500, detail=f"Failed to start recording: {str(e)}" ) except Exception as e: # Non-Daily.co errors - log.error(f"Failed to start {body.type} recording", error=str(e)) + logger.error( + f"Failed to start {body.type} recording", + extra={ + "meeting_id": meeting_id, + "recording_type": body.type, + "error": str(e), + }, + ) raise HTTPException( status_code=500, detail=f"Failed to start recording: {str(e)}" ) diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 8d88de43..5ffd3195 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -1,6 +1,5 @@ import json import os -import re from datetime import datetime, timezone from typing import List, Literal from urllib.parse import unquote @@ -13,10 +12,12 @@ from celery.utils.log import get_task_logger from pydantic import ValidationError from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse +from reflector.dailyco_api.recording_orphans import create_and_log_orphan from reflector.db.daily_participant_sessions import ( DailyParticipantSession, daily_participant_sessions_controller, ) +from reflector.db.daily_recording_requests import daily_recording_requests_controller from reflector.db.meetings import meetings_controller from reflector.db.recordings import Recording, recordings_controller from reflector.db.rooms import rooms_controller @@ -230,79 +231,44 @@ async def _process_multitrack_recording_inner( recording_start_ts: int, ): """ - Process multitrack recording (first time or reprocessing). + Process multitrack recording. - For first processing (webhook/polling): - - Uses recording_start_ts for time-based meeting matching (no instanceId available) - - For reprocessing: - - Uses recording.meeting_id directly (already linked during first processing) - - recording_start_ts is ignored + Recording must already exist with meeting_id set (created by webhook/polling before queueing). 
""" - tz = timezone.utc - recorded_at = datetime.now(tz) - try: - if track_keys: - folder = os.path.basename(os.path.dirname(track_keys[0])) - ts_match = re.search(r"(\d{14})$", folder) - if ts_match: - ts = ts_match.group(1) - recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz) - except Exception as e: - logger.warning( - f"Could not parse recorded_at from keys, using now() {recorded_at}", - e, - exc_info=True, - ) - - # Check if recording already exists (reprocessing path) + # Get recording (must exist - created by webhook/polling) recording = await recordings_controller.get_by_id(recording_id) - if recording and recording.meeting_id: - # Reprocessing: recording exists with meeting already linked - meeting = await meetings_controller.get_by_id(recording.meeting_id) - if not meeting: - logger.error( - "Reprocessing: meeting not found for recording - skipping", - meeting_id=recording.meeting_id, - recording_id=recording_id, - ) - return + if not recording: + logger.error( + "Recording not found - should have been created by webhook/polling", + recording_id=recording_id, + ) + return - logger.info( - "Reprocessing: using existing recording.meeting_id", + if not recording.meeting_id: + logger.error( + "Recording has no meeting_id - orphan should not be queued", recording_id=recording_id, - meeting_id=meeting.id, - room_name=daily_room_name, ) - else: - # First processing: recording doesn't exist, need time-based matching - # (Daily.co doesn't return instanceId in API, must match by timestamp) - recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc) - meeting = await meetings_controller.get_by_room_name_and_time( - room_name=daily_room_name, - recording_start=recording_start, - time_window_hours=168, # 1 week - ) - if not meeting: - logger.error( - "Raw-tracks: no meeting found within 1-week window (time-based match) - skipping", - recording_id=recording_id, - room_name=daily_room_name, - recording_start_ts=recording_start_ts, - recording_start=recording_start.isoformat(), - ) - return # Skip processing, will retry on next poll - logger.info( - "First processing: found meeting via time-based matching", - meeting_id=meeting.id, - room_name=daily_room_name, + return + + # Get meeting + meeting = await meetings_controller.get_by_id(recording.meeting_id) + if not meeting: + logger.error( + "Meeting not found for recording", + meeting_id=recording.meeting_id, recording_id=recording_id, - time_delta_seconds=abs( - (meeting.start_date - recording_start).total_seconds() - ), ) + return + + logger.info( + "Processing multitrack recording", + recording_id=recording_id, + meeting_id=meeting.id, + room_name=daily_room_name, + ) room_name_base = extract_base_room_name(daily_room_name) @@ -310,33 +276,6 @@ async def _process_multitrack_recording_inner( if not room: raise Exception(f"Room not found: {room_name_base}") - if not recording: - # Create recording (only happens during first processing) - object_key_dir = os.path.dirname(track_keys[0]) if track_keys else "" - recording = await recordings_controller.create( - Recording( - id=recording_id, - bucket_name=bucket_name, - object_key=object_key_dir, - recorded_at=recorded_at, - meeting_id=meeting.id, - track_keys=track_keys, - ) - ) - elif not recording.meeting_id: - # Recording exists but meeting_id is null (failed first processing) - # Update with meeting from time-based matching - await recordings_controller.set_meeting_id( - recording_id=recording.id, - meeting_id=meeting.id, - ) - recording.meeting_id = 
meeting.id - logger.info( - "Updated existing recording with meeting_id", - recording_id=recording.id, - meeting_id=meeting.id, - ) - transcript = await transcripts_controller.get_by_recording_id(recording.id) if not transcript: transcript = await transcripts_controller.add( @@ -522,7 +461,7 @@ async def store_cloud_recording( Store cloud recording reference in meeting table. Common function for both webhook and polling code paths. - Uses time-based matching to handle duplicate room_name values. + Uses direct recording_id lookup via daily_recording_requests table. Args: recording_id: Daily.co recording ID @@ -535,155 +474,170 @@ async def store_cloud_recording( Returns: True if stored, False if skipped/failed """ - recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc) + # Lookup request + match = await daily_recording_requests_controller.find_by_recording_id(recording_id) - meeting = await meetings_controller.get_by_room_name_and_time( - room_name=room_name, - recording_start=recording_start, - time_window_hours=168, # 1 week - ) - - if not meeting: - logger.warning( - f"Cloud recording ({source}): no meeting found within 1-week window", + if not match: + # ORPHAN: No request found (pre-migration recording or failed request creation) + await create_and_log_orphan( recording_id=recording_id, + bucket_name="", room_name=room_name, - recording_start_ts=start_ts, - recording_start=recording_start.isoformat(), + start_ts=start_ts, + track_keys=None, + source=source, ) return False + meeting_id, _ = match + success = await meetings_controller.set_cloud_recording_if_missing( - meeting_id=meeting.id, + meeting_id=meeting_id, s3_key=s3_key, duration=duration, ) if not success: logger.debug( - f"Cloud recording ({source}): already set (race lost)", + f"Cloud recording ({source}): already set (stop/restart?)", recording_id=recording_id, room_name=room_name, - meeting_id=meeting.id, + meeting_id=meeting_id, ) return False logger.info( - f"Cloud recording stored via {source} (time-based match)", - meeting_id=meeting.id, + f"Cloud recording stored via {source}", + meeting_id=meeting_id, recording_id=recording_id, s3_key=s3_key, duration=duration, - time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()), ) return True async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]): - """ - Store cloud recordings missing from meeting table via polling. + """Process cloud recordings (database deduplication, worker-agnostic). - Uses time-based matching via store_cloud_recording(). + Cloud recordings stored in meeting.daily_composed_video_s3_key, not recording table. + Only first cloud recording per meeting is kept (existing behavior). 
""" if not cloud_recordings: return - stored_count = 0 - for recording in cloud_recordings: - # Extract S3 key from recording (cloud recordings use s3key field) - s3_key = recording.s3key or (recording.s3.key if recording.s3 else None) - if not s3_key: - logger.warning( - "Cloud recording: missing S3 key", - recording_id=recording.id, - room_name=recording.room_name, + for rec in cloud_recordings: + # Lookup request + match = await daily_recording_requests_controller.find_by_recording_id(rec.id) + + if not match: + await create_and_log_orphan( + recording_id=rec.id, + bucket_name="", + room_name=rec.room_name, + start_ts=rec.start_ts, + track_keys=None, + source="polling", ) continue - stored = await store_cloud_recording( - recording_id=recording.id, - room_name=recording.room_name, - s3_key=s3_key, - duration=recording.duration, - start_ts=recording.start_ts, - source="polling", - ) - if stored: - stored_count += 1 + meeting_id, _ = match - logger.info( - "Cloud recording polling complete", - total=len(cloud_recordings), - stored=stored_count, - ) + if not rec.s3key: + logger.error("Cloud recording missing s3_key", recording_id=rec.id) + continue + + # Store in meeting table (atomic, only if not already set) + success = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting_id, + s3_key=rec.s3key, + duration=rec.duration, + ) + + if success: + logger.info( + "Stored cloud recording", recording_id=rec.id, meeting_id=meeting_id + ) + else: + logger.warning( + "Cloud recording already exists for meeting (stop/restart?)", + recording_id=rec.id, + meeting_id=meeting_id, + ) async def _poll_raw_tracks_recordings( raw_tracks_recordings: List[FinishedRecordingResponse], - bucket_name: str, -): - """Queue raw-tracks recordings missing from DB (existing logic).""" + bucket_name: NonEmptyString, +) -> None: + """Process raw-tracks (database deduplication, worker-agnostic).""" if not raw_tracks_recordings: return - recording_ids = [rec.id for rec in raw_tracks_recordings] - existing_recordings = await recordings_controller.get_by_ids(recording_ids) - existing_ids = {rec.id for rec in existing_recordings} + for rec in raw_tracks_recordings: + # Lookup request FIRST (before any DB writes) + match = await daily_recording_requests_controller.find_by_recording_id(rec.id) - missing_recordings = [ - rec for rec in raw_tracks_recordings if rec.id not in existing_ids - ] - - if not missing_recordings: - logger.debug( - "All raw-tracks recordings already in DB", - api_count=len(raw_tracks_recordings), - existing_count=len(existing_recordings), - ) - return - - logger.info( - "Found raw-tracks recordings missing from DB", - missing_count=len(missing_recordings), - total_api_count=len(raw_tracks_recordings), - existing_count=len(existing_recordings), - ) - - for recording in missing_recordings: - if not recording.tracks: - logger.warning( - "Finished raw-tracks recording has no tracks (no audio captured)", - recording_id=recording.id, - room_name=recording.room_name, + if not match: + await create_and_log_orphan( + recording_id=rec.id, + bucket_name=bucket_name, + room_name=rec.room_name, + start_ts=rec.start_ts, + track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"], + source="polling", ) continue - track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"] + meeting_id, _ = match - if not track_keys: - logger.warning( - "No audio tracks found in raw-tracks recording", - recording_id=recording.id, - room_name=recording.room_name, - total_tracks=len(recording.tracks), 
+ # Verify meeting exists + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + logger.error( + "Meeting not found", recording_id=rec.id, meeting_id=meeting_id + ) + await create_and_log_orphan( + recording_id=rec.id, + bucket_name=bucket_name, + room_name=rec.room_name, + start_ts=rec.start_ts, + track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"], + source="polling", ) continue - logger.info( - "Queueing missing raw-tracks recording for processing", - recording_id=recording.id, - room_name=recording.room_name, - track_count=len(track_keys), + # DEDUPLICATION: Atomically create recording (single operation, no race window) + # ON CONFLICT → concurrent poller already got it, skip entire logic + track_keys = [t.s3Key for t in rec.tracks if t.type == "audio"] + + created = await recordings_controller.try_create_with_meeting( + Recording( + id=rec.id, + bucket_name=bucket_name, + object_key=os.path.dirname(track_keys[0]) if track_keys else "", + recorded_at=datetime.fromtimestamp(rec.start_ts, tz=timezone.utc), + track_keys=track_keys, + meeting_id=meeting_id, # Set at creation (constraint-safe) + status="pending", + ) ) + if not created: + # Conflict: another poller already created/queued this + # Skip all remaining logic (match already done by winner) + continue + + # Only winner reaches here - queue processing (works with Celery or Hatchet) process_multitrack_recording.delay( + recording_id=rec.id, + daily_room_name=rec.room_name, + recording_start_ts=rec.start_ts, bucket_name=bucket_name, - daily_room_name=recording.room_name, - recording_id=recording.id, track_keys=track_keys, - recording_start_ts=recording.start_ts, ) + logger.info("Queued recording", recording_id=rec.id, meeting_id=meeting_id) + async def poll_daily_room_presence(meeting_id: str) -> None: """Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed. 
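Both the webhook handler and the pollers above converge on the same deduplication primitive: a single INSERT ... ON CONFLICT DO NOTHING whose affected-row count tells the caller whether it won the race. A minimal sketch of that pattern, using a plain synchronous SQLAlchemy engine and an illustrative two-column table rather than the project's async `databases` wrapper:

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine

metadata = sa.MetaData()
recording = sa.Table(
    "recording",
    metadata,
    sa.Column("id", sa.String, primary_key=True),
    sa.Column("status", sa.String, nullable=False),
)


def try_create(engine: Engine, recording_id: str) -> bool:
    """Insert the row if it is missing; return True only for the single winner."""
    stmt = (
        insert(recording)
        .values(id=recording_id, status="pending")
        .on_conflict_do_nothing(index_elements=["id"])
    )
    with engine.begin() as conn:
        result = conn.execute(stmt)
    # rowcount is 1 when the row was inserted and 0 when ON CONFLICT skipped it,
    # so concurrent pollers racing on the same recording_id agree on one winner
    # without any SELECT-then-INSERT window.
    return result.rowcount > 0

The same shape backs try_create_with_meeting, create_orphan, and DailyRecordingRequestsController.create in this diff; only the winner goes on to queue process_multitrack_recording.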
diff --git a/server/test_daily_api_recordings.py b/server/test_daily_api_recordings.py new file mode 100644 index 00000000..1172aa4d --- /dev/null +++ b/server/test_daily_api_recordings.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +"""Test script to fetch Daily.co recordings for a specific room and show raw API response.""" + +import asyncio +import json + +from reflector.video_platforms.factory import create_platform_client + + +async def main(): + room_name = "daily-private-igor-20260110042117" + + print(f"\n=== Fetching recordings for room: {room_name} ===\n") + + async with create_platform_client("daily") as client: + recordings = await client.list_recordings(room_name=room_name) + + print(f"Found {len(recordings)} recording objects from Daily.co API\n") + + for i, rec in enumerate(recordings, 1): + print(f"--- Recording #{i} ---") + print(f"ID: {rec.id}") + print(f"Room: {rec.room_name}") + print(f"Start TS: {rec.start_ts}") + print(f"Status: {rec.status}") + print(f"Duration: {rec.duration}") + print(f"Type: {rec.type}") + print(f"Tracks count: {len(rec.tracks)}") + + if rec.tracks: + print(f"Tracks:") + for j, track in enumerate(rec.tracks, 1): + print(f" Track {j}: {track.s3Key}") + + print(f"\nRaw JSON:\n{json.dumps(rec.model_dump(), indent=2, default=str)}\n") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/server/tests/test_daily_recording_requests.py b/server/tests/test_daily_recording_requests.py new file mode 100644 index 00000000..01b6abe3 --- /dev/null +++ b/server/tests/test_daily_recording_requests.py @@ -0,0 +1,258 @@ +from datetime import datetime, timezone +from uuid import UUID + +import pytest + +from reflector.db.daily_recording_requests import ( + DailyRecordingRequest, + daily_recording_requests_controller, +) +from reflector.db.meetings import Meeting, meetings_controller +from reflector.db.recordings import Recording, recordings_controller +from reflector.db.rooms import Room, rooms_controller + + +@pytest.mark.asyncio +async def test_create_request(): + """Test creating a recording request.""" + # Create meeting first + room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user") + await rooms_controller.create(room) + + meeting = Meeting( + id="meeting-123", + room_name="test-room", + start_date=datetime.now(timezone.utc), + end_date=None, + recording_type="cloud", + ) + await meetings_controller.create(meeting) + + request = DailyRecordingRequest( + recording_id="rec-1", + meeting_id="meeting-123", + instance_id=UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"), + type="cloud", + requested_at=datetime.now(timezone.utc), + ) + + await daily_recording_requests_controller.create(request) + + result = await daily_recording_requests_controller.find_by_recording_id("rec-1") + assert result is not None + assert result[0] == "meeting-123" + assert result[1] == "cloud" + + +@pytest.mark.asyncio +async def test_multiple_recordings_same_meeting(): + """Test stop/restart creates multiple request rows.""" + # Create room and meeting + room = Room( + id="test-room-2", name="Test Room 2", slug="test-room-2", user_id="test-user" + ) + await rooms_controller.create(room) + + meeting_id = "meeting-456" + meeting = Meeting( + id=meeting_id, + room_name="test-room-2", + start_date=datetime.now(timezone.utc), + end_date=None, + recording_type="cloud", + ) + await meetings_controller.create(meeting) + + instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890") + + # First recording + await daily_recording_requests_controller.create( + 
DailyRecordingRequest( + recording_id="rec-1", + meeting_id=meeting_id, + instance_id=instance_id, + type="cloud", + requested_at=datetime.now(timezone.utc), + ) + ) + + # Stop, then restart (new recording_id, same instance_id) + await daily_recording_requests_controller.create( + DailyRecordingRequest( + recording_id="rec-2", # DIFFERENT + meeting_id=meeting_id, + instance_id=instance_id, # SAME + type="cloud", + requested_at=datetime.now(timezone.utc), + ) + ) + + # Both exist + requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id) + assert len(requests) == 2 + assert {r.recording_id for r in requests} == {"rec-1", "rec-2"} + + +@pytest.mark.asyncio +async def test_deduplication_via_database(): + """Test concurrent pollers use database for deduplication.""" + # Create room and meeting + room = Room( + id="test-room-3", name="Test Room 3", slug="test-room-3", user_id="test-user" + ) + await rooms_controller.create(room) + + meeting = Meeting( + id="meeting-789", + room_name="test-room-3", + start_date=datetime.now(timezone.utc), + end_date=None, + recording_type="raw-tracks", + ) + await meetings_controller.create(meeting) + + recording_id = "rec-123" + + # Poller 1 + created1 = await recordings_controller.try_create_with_meeting( + Recording( + id=recording_id, + bucket_name="test-bucket", + object_key="test-key", + recorded_at=datetime.now(timezone.utc), + meeting_id="meeting-789", + status="pending", + track_keys=["track1.webm", "track2.webm"], + ) + ) + assert created1 is True # First wins + + # Poller 2 (concurrent) + created2 = await recordings_controller.try_create_with_meeting( + Recording( + id=recording_id, + bucket_name="test-bucket", + object_key="test-key", + recorded_at=datetime.now(timezone.utc), + meeting_id="meeting-789", + status="pending", + track_keys=["track1.webm", "track2.webm"], + ) + ) + assert created2 is False # Conflict, skip + + +@pytest.mark.asyncio +async def test_orphan_logged_once(): + """Test orphan marked once, skipped on re-poll.""" + # First poll + created1 = await recordings_controller.create_orphan( + Recording( + id="orphan-123", + bucket_name="test-bucket", + object_key="orphan-key", + recorded_at=datetime.now(timezone.utc), + meeting_id=None, + status="orphan", + track_keys=None, + ) + ) + assert created1 is True + + # Second poll (same orphan discovered again) + created2 = await recordings_controller.create_orphan( + Recording( + id="orphan-123", + bucket_name="test-bucket", + object_key="orphan-key", + recorded_at=datetime.now(timezone.utc), + meeting_id=None, + status="orphan", + track_keys=None, + ) + ) + assert created2 is False # Already exists + + # Verify it exists + existing = await recordings_controller.get_by_id("orphan-123") + assert existing is not None + assert existing.status == "orphan" + + +@pytest.mark.asyncio +async def test_orphan_constraints(): + """Test orphan invariants are enforced.""" + # Can't create orphan with meeting_id + with pytest.raises(AssertionError, match="meeting_id must be NULL"): + await recordings_controller.create_orphan( + Recording( + id="bad-orphan-1", + bucket_name="test", + object_key="test", + recorded_at=datetime.now(timezone.utc), + meeting_id="meeting-123", # Should be None + status="orphan", + track_keys=None, + ) + ) + + # Can't create orphan with wrong status + with pytest.raises(AssertionError, match="status must be 'orphan'"): + await recordings_controller.create_orphan( + Recording( + id="bad-orphan-2", + bucket_name="test", + object_key="test", + 
recorded_at=datetime.now(timezone.utc), + meeting_id=None, + status="pending", # Should be "orphan" + track_keys=None, + ) + ) + + +@pytest.mark.asyncio +async def test_try_create_with_meeting_constraints(): + """Test try_create_with_meeting enforces constraints.""" + # Create room and meeting + room = Room( + id="test-room-4", name="Test Room 4", slug="test-room-4", user_id="test-user" + ) + await rooms_controller.create(room) + + meeting = Meeting( + id="meeting-999", + room_name="test-room-4", + start_date=datetime.now(timezone.utc), + end_date=None, + recording_type="cloud", + ) + await meetings_controller.create(meeting) + + # Can't create with orphan status + with pytest.raises(AssertionError, match="use create_orphan"): + await recordings_controller.try_create_with_meeting( + Recording( + id="bad-rec-1", + bucket_name="test", + object_key="test", + recorded_at=datetime.now(timezone.utc), + meeting_id="meeting-999", + status="orphan", # Should not be orphan + track_keys=None, + ) + ) + + # Can't create without meeting_id + with pytest.raises(AssertionError, match="meeting_id required"): + await recordings_controller.try_create_with_meeting( + Recording( + id="bad-rec-2", + bucket_name="test", + object_key="test", + recorded_at=datetime.now(timezone.utc), + meeting_id=None, # Should have meeting_id + status="pending", + track_keys=None, + ) + ) diff --git a/server/tests/test_recording_request_flow.py b/server/tests/test_recording_request_flow.py new file mode 100644 index 00000000..9ea309ac --- /dev/null +++ b/server/tests/test_recording_request_flow.py @@ -0,0 +1,300 @@ +""" +Integration tests for recording request flow. + +These tests verify the end-to-end flow of: +1. Starting a recording (creates request) +2. Webhook/polling discovering recording (matches via request) +3. 
Recording processing (uses existing meeting_id) +""" + +from datetime import datetime, timezone +from uuid import UUID, uuid4 + +import pytest + +from reflector.db.daily_recording_requests import ( + DailyRecordingRequest, + daily_recording_requests_controller, +) +from reflector.db.meetings import Meeting, meetings_controller +from reflector.db.recordings import Recording, recordings_controller +from reflector.db.rooms import Room, rooms_controller + + +@pytest.mark.asyncio +async def test_recording_request_flow_cloud(client): + """Test full cloud recording flow: start -> webhook -> match""" + # Create room and meeting + room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user") + await rooms_controller.create(room) + + meeting_id = f"meeting-{uuid4()}" + meeting = Meeting( + id=meeting_id, + room_name="test-room", + start_date=datetime.now(timezone.utc), + end_date=None, + recording_type="cloud", + ) + await meetings_controller.create(meeting) + + # Simulate recording start (what endpoint does) + recording_id = "rec-cloud-123" + instance_id = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890") + + request = DailyRecordingRequest( + recording_id=recording_id, + meeting_id=meeting_id, + instance_id=instance_id, + type="cloud", + requested_at=datetime.now(timezone.utc), + ) + await daily_recording_requests_controller.create(request) + + # Verify request exists + match = await daily_recording_requests_controller.find_by_recording_id(recording_id) + assert match is not None + assert match[0] == meeting_id + assert match[1] == "cloud" + + # Simulate webhook/polling storing cloud recording + success = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting_id, + s3_key="s3://bucket/recording.mp4", + duration=120, + ) + assert success is True + + # Verify meeting updated + updated_meeting = await meetings_controller.get_by_id(meeting_id) + assert updated_meeting.daily_composed_video_s3_key == "s3://bucket/recording.mp4" + assert updated_meeting.daily_composed_video_duration == 120 + + +@pytest.mark.asyncio +async def test_recording_request_flow_raw_tracks(client): + """Test full raw-tracks recording flow: start -> webhook/polling -> process""" + # Create room and meeting + room = Room( + id="test-room-2", + name="Test Room 2", + slug="test-room-2", + user_id="test-user", + ) + await rooms_controller.create(room) + + meeting_id = f"meeting-{uuid4()}" + meeting = Meeting( + id=meeting_id, + room_name="test-room-2", + start_date=datetime.now(timezone.utc), + end_date=None, + recording_type="raw-tracks", + ) + await meetings_controller.create(meeting) + + # Simulate recording start + recording_id = "rec-raw-456" + instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890") + + request = DailyRecordingRequest( + recording_id=recording_id, + meeting_id=meeting_id, + instance_id=instance_id, + type="raw-tracks", + requested_at=datetime.now(timezone.utc), + ) + await daily_recording_requests_controller.create(request) + + # Simulate webhook/polling discovering recording + match = await daily_recording_requests_controller.find_by_recording_id(recording_id) + assert match is not None + found_meeting_id, recording_type = match + assert found_meeting_id == meeting_id + assert recording_type == "raw-tracks" + + # Create recording (what webhook/polling does) + created = await recordings_controller.try_create_with_meeting( + Recording( + id=recording_id, + bucket_name="test-bucket", + object_key="recordings/20260120/", + recorded_at=datetime.now(timezone.utc), + 
track_keys=["track1.webm", "track2.webm"], + meeting_id=meeting_id, + status="pending", + ) + ) + assert created is True + + # Verify recording exists with meeting_id + recording = await recordings_controller.get_by_id(recording_id) + assert recording is not None + assert recording.meeting_id == meeting_id + assert recording.status == "pending" + assert len(recording.track_keys) == 2 + + +@pytest.mark.asyncio +async def test_stop_restart_creates_multiple_requests(client): + """Test stop/restart creates multiple request rows with same instance_id""" + # Create room and meeting + room = Room( + id="test-room-3", + name="Test Room 3", + slug="test-room-3", + user_id="test-user", + ) + await rooms_controller.create(room) + + meeting_id = f"meeting-{uuid4()}" + meeting = Meeting( + id=meeting_id, + room_name="test-room-3", + start_date=datetime.now(timezone.utc), + end_date=None, + recording_type="cloud", + ) + await meetings_controller.create(meeting) + + instance_id = UUID("c1d2e3f4-a5b6-7890-abcd-ef1234567890") + + # First recording + await daily_recording_requests_controller.create( + DailyRecordingRequest( + recording_id="rec-first", + meeting_id=meeting_id, + instance_id=instance_id, + type="cloud", + requested_at=datetime.now(timezone.utc), + ) + ) + + # Stop, then restart (new recording_id, same instance_id) + await daily_recording_requests_controller.create( + DailyRecordingRequest( + recording_id="rec-second", # DIFFERENT + meeting_id=meeting_id, + instance_id=instance_id, # SAME + type="cloud", + requested_at=datetime.now(timezone.utc), + ) + ) + + # Both exist + requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id) + assert len(requests) == 2 + assert {r.recording_id for r in requests} == {"rec-first", "rec-second"} + assert all(r.instance_id == instance_id for r in requests) + + +@pytest.mark.asyncio +async def test_orphan_recording_no_request(client): + """Test orphan recording (no request found)""" + # Simulate polling discovering recording with no request + recording_id = "rec-orphan" + + match = await daily_recording_requests_controller.find_by_recording_id(recording_id) + assert match is None # No request + + # Mark as orphan + created = await recordings_controller.create_orphan( + Recording( + id=recording_id, + bucket_name="test-bucket", + object_key="orphan-key", + recorded_at=datetime.now(timezone.utc), + meeting_id=None, + status="orphan", + track_keys=None, + ) + ) + assert created is True + + # Verify orphan exists + recording = await recordings_controller.get_by_id(recording_id) + assert recording is not None + assert recording.status == "orphan" + assert recording.meeting_id is None + + # Second poll - already exists + created_again = await recordings_controller.create_orphan( + Recording( + id=recording_id, + bucket_name="test-bucket", + object_key="orphan-key", + recorded_at=datetime.now(timezone.utc), + meeting_id=None, + status="orphan", + track_keys=None, + ) + ) + assert created_again is False # Already exists + + +@pytest.mark.asyncio +async def test_concurrent_polling_deduplication(client): + """Test concurrent pollers only queue once""" + # Create room and meeting + room = Room( + id="test-room-4", + name="Test Room 4", + slug="test-room-4", + user_id="test-user", + ) + await rooms_controller.create(room) + + meeting_id = f"meeting-{uuid4()}" + meeting = Meeting( + id=meeting_id, + room_name="test-room-4", + start_date=datetime.now(timezone.utc), + end_date=None, + recording_type="raw-tracks", + ) + await 
meetings_controller.create(meeting)
+
+    # Create request
+    recording_id = "rec-concurrent"
+    await daily_recording_requests_controller.create(
+        DailyRecordingRequest(
+            recording_id=recording_id,
+            meeting_id=meeting_id,
+            instance_id=UUID("d1e2f3a4-b5c6-7890-abcd-ef1234567890"),
+            type="raw-tracks",
+            requested_at=datetime.now(timezone.utc),
+        )
+    )
+
+    # Poller 1
+    created1 = await recordings_controller.try_create_with_meeting(
+        Recording(
+            id=recording_id,
+            bucket_name="test-bucket",
+            object_key="test-key",
+            recorded_at=datetime.now(timezone.utc),
+            meeting_id=meeting_id,
+            status="pending",
+            track_keys=["track1.webm"],
+        )
+    )
+    assert created1 is True  # First wins
+
+    # Poller 2 (concurrent)
+    created2 = await recordings_controller.try_create_with_meeting(
+        Recording(
+            id=recording_id,
+            bucket_name="test-bucket",
+            object_key="test-key",
+            recorded_at=datetime.now(timezone.utc),
+            meeting_id=meeting_id,
+            status="pending",
+            track_keys=["track1.webm"],
+        )
+    )
+    assert created2 is False  # Conflict, skip
+
+    # Only one recording exists
+    recording = await recordings_controller.get_by_id(recording_id)
+    assert recording is not None
+    assert recording.meeting_id == meeting_id
diff --git a/server/tests/test_time_based_meeting_matching.py b/server/tests/test_time_based_meeting_matching.py
deleted file mode 100644
index 3506c183..00000000
--- a/server/tests/test_time_based_meeting_matching.py
+++ /dev/null
@@ -1,374 +0,0 @@
-"""
-Integration tests for time-based meeting-to-recording matching.
-
-Tests the critical path for matching Daily.co recordings to meetings when
-API doesn't return instanceId.
-"""
-
-from datetime import datetime, timedelta, timezone
-
-import pytest
-
-from reflector.db.meetings import meetings_controller
-from reflector.db.rooms import rooms_controller
-
-
-@pytest.fixture
-async def test_room():
-    """Create a test room for meetings."""
-    room = await rooms_controller.add(
-        name="test-room-time",
-        user_id="test-user-id",
-        zulip_auto_post=False,
-        zulip_stream="",
-        zulip_topic="",
-        is_locked=False,
-        room_mode="normal",
-        recording_type="cloud",
-        recording_trigger="automatic",
-        is_shared=False,
-        platform="daily",
-    )
-    return room
-
-
-@pytest.fixture
-def base_time():
-    """Fixed timestamp for deterministic tests."""
-    return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
-
-
-class TestTimeBasedMatching:
-    """Test get_by_room_name_and_time() matching logic."""
-
-    async def test_exact_time_match(self, test_room, base_time):
-        """Recording timestamp exactly matches meeting start_date."""
-        meeting = await meetings_controller.create(
-            id="meeting-exact",
-            room_name="daily-test-20260114090000",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-test-20260114090000",
-            recording_start=base_time,
-            time_window_hours=168,
-        )
-
-        assert result is not None
-        assert result.id == meeting.id
-
-    async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
-        """Recording started 1 minute after meeting (participants joined late)."""
-        meeting = await meetings_controller.create(
-            id="meeting-late",
-            room_name="daily-test-20260114090100",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        recording_start = base_time + timedelta(minutes=1)
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-test-20260114090100",
-            recording_start=recording_start,
-            time_window_hours=168,
-        )
-
-        assert result is not None
-        assert result.id == meeting.id
-
-    async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
-        """
-        Two meetings with same room_name (duplicate/race condition).
-        Should pick closest by timestamp.
-        """
-        meeting1 = await meetings_controller.create(
-            id="meeting-1-first",
-            room_name="daily-duplicate-room",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        meeting2 = await meetings_controller.create(
-            id="meeting-2-second",
-            room_name="daily-duplicate-room",  # Same room_name!
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time + timedelta(seconds=0.99),  # 0.99s later
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        # Recording started 0.5s after meeting1
-        # Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
-        recording_start = base_time + timedelta(seconds=0.5)
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-duplicate-room",
-            recording_start=recording_start,
-            time_window_hours=168,
-        )
-
-        assert result is not None
-        assert result.id == meeting2.id  # meeting2 is closer (0.49s vs 0.5s)
-
-    async def test_outside_time_window_returns_none(self, test_room, base_time):
-        """Recording outside 1-week window returns None."""
-        await meetings_controller.create(
-            id="meeting-old",
-            room_name="daily-test-old",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        # Recording 8 days later (outside 7-day window)
-        recording_start = base_time + timedelta(days=8)
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-test-old",
-            recording_start=recording_start,
-            time_window_hours=168,
-        )
-
-        assert result is None
-
-    async def test_tie_breaker_deterministic(self, test_room, base_time):
-        """When time delta identical, tie-breaker by meeting.id is deterministic."""
-        meeting_z = await meetings_controller.create(
-            id="zzz-last-uuid",
-            room_name="daily-test-tie",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        meeting_a = await meetings_controller.create(
-            id="aaa-first-uuid",
-            room_name="daily-test-tie",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,  # Exact same start_date
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-test-tie",
-            recording_start=base_time,
-            time_window_hours=168,
-        )
-
-        assert result is not None
-        # Tie-breaker: lexicographically first UUID
-        assert result.id == "aaa-first-uuid"
-
-    async def test_timezone_naive_datetime_raises(self, test_room, base_time):
-        """Timezone-naive datetime raises ValueError."""
-        await meetings_controller.create(
-            id="meeting-tz",
-            room_name="daily-test-tz",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        # Naive datetime (no timezone)
-        naive_dt = datetime(2026, 1, 14, 9, 0, 0)
-
-        with pytest.raises(ValueError, match="timezone-aware"):
-            await meetings_controller.get_by_room_name_and_time(
-                room_name="daily-test-tz",
-                recording_start=naive_dt,
-                time_window_hours=168,
-            )
-
-    async def test_one_week_boundary_after_included(self, test_room, base_time):
-        """Meeting 1-week AFTER recording is included (window_end boundary)."""
-        meeting_time = base_time + timedelta(hours=168)
-
-        await meetings_controller.create(
-            id="meeting-boundary-after",
-            room_name="daily-test-boundary-after",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=meeting_time,
-            end_date=meeting_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-test-boundary-after",
-            recording_start=base_time,
-            time_window_hours=168,
-        )
-
-        assert result is not None
-        assert result.id == "meeting-boundary-after"
-
-    async def test_one_week_boundary_before_included(self, test_room, base_time):
-        """Meeting 1-week BEFORE recording is included (window_start boundary)."""
-        meeting_time = base_time - timedelta(hours=168)
-
-        await meetings_controller.create(
-            id="meeting-boundary-before",
-            room_name="daily-test-boundary-before",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=meeting_time,
-            end_date=meeting_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-test-boundary-before",
-            recording_start=base_time,
-            time_window_hours=168,
-        )
-
-        assert result is not None
-        assert result.id == "meeting-boundary-before"
-
-    async def test_recording_before_meeting_start(self, test_room, base_time):
-        """Recording started before meeting (clock skew or early join)."""
-        await meetings_controller.create(
-            id="meeting-early",
-            room_name="daily-test-early",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        recording_start = base_time - timedelta(minutes=2)
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-test-early",
-            recording_start=recording_start,
-            time_window_hours=168,
-        )
-
-        assert result is not None
-        assert result.id == "meeting-early"
-
-    async def test_mixed_inside_outside_window(self, test_room, base_time):
-        """Multiple meetings, only one inside window - returns the inside one."""
-        await meetings_controller.create(
-            id="meeting-old",
-            room_name="daily-test-mixed",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time - timedelta(days=10),
-            end_date=base_time - timedelta(days=10, hours=-1),
-            room=test_room,
-        )
-
-        await meetings_controller.create(
-            id="meeting-inside",
-            room_name="daily-test-mixed",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time - timedelta(days=2),
-            end_date=base_time - timedelta(days=2, hours=-1),
-            room=test_room,
-        )
-
-        await meetings_controller.create(
-            id="meeting-future",
-            room_name="daily-test-mixed",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time + timedelta(days=10),
-            end_date=base_time + timedelta(days=10, hours=1),
-            room=test_room,
-        )
-
-        result = await meetings_controller.get_by_room_name_and_time(
-            room_name="daily-test-mixed",
-            recording_start=base_time,
-            time_window_hours=168,
-        )
-
-        assert result is not None
-        assert result.id == "meeting-inside"
-
-
-class TestAtomicCloudRecordingUpdate:
-    """Test atomic update prevents race conditions."""
-
-    async def test_first_update_succeeds(self, test_room, base_time):
-        """First call to set_cloud_recording_if_missing succeeds."""
-        meeting = await meetings_controller.create(
-            id="meeting-atomic-1",
-            room_name="daily-test-atomic",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        success = await meetings_controller.set_cloud_recording_if_missing(
-            meeting_id=meeting.id,
-            s3_key="first-s3-key",
-            duration=100,
-        )
-
-        assert success is True
-
-        updated = await meetings_controller.get_by_id(meeting.id)
-        assert updated.daily_composed_video_s3_key == "first-s3-key"
-        assert updated.daily_composed_video_duration == 100
-
-    async def test_second_update_fails_atomically(self, test_room, base_time):
-        """Second call to update same meeting doesn't overwrite (atomic check)."""
-        meeting = await meetings_controller.create(
-            id="meeting-atomic-2",
-            room_name="daily-test-atomic2",
-            room_url="https://example.daily.co/test",
-            host_room_url="https://example.daily.co/test?t=host",
-            start_date=base_time,
-            end_date=base_time + timedelta(hours=1),
-            room=test_room,
-        )
-
-        success1 = await meetings_controller.set_cloud_recording_if_missing(
-            meeting_id=meeting.id,
-            s3_key="first-s3-key",
-            duration=100,
-        )
-
-        assert success1 is True
-
-        after_first = await meetings_controller.get_by_id(meeting.id)
-        assert after_first.daily_composed_video_s3_key == "first-s3-key"
-
-        success2 = await meetings_controller.set_cloud_recording_if_missing(
-            meeting_id=meeting.id,
-            s3_key="bucket/path/should-not-overwrite",
-            duration=200,
-        )
-
-        assert success2 is False
-
-        final = await meetings_controller.get_by_id(meeting.id)
-        assert final.daily_composed_video_s3_key == "first-s3-key"
-        assert final.daily_composed_video_duration == 100