diff --git a/.gitleaksignore b/.gitleaksignore index 8eb80bd5..141c82d5 100644 --- a/.gitleaksignore +++ b/.gitleaksignore @@ -3,3 +3,4 @@ docs/docs/installation/auth-setup.md:curl-auth-header:250 docs/docs/installation/daily-setup.md:curl-auth-header:277 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83 +server/reflector/worker/process.py:generic-api-key:465 diff --git a/server/docs/DAILY_REFLECTOR_DATA_MODEL.md b/server/docs/DAILY_REFLECTOR_DATA_MODEL.md new file mode 100644 index 00000000..c25a3fd6 --- /dev/null +++ b/server/docs/DAILY_REFLECTOR_DATA_MODEL.md @@ -0,0 +1,496 @@ +# Daily.co and Reflector Data Model + +This document explains the data model relationships between Daily.co's API concepts and Reflector's database schema, clarifying common sources of confusion. + +--- + +## Table of Contents + +1. [Core Entities Overview](#core-entities-overview) +2. [Daily.co vs Reflector Terminology](#dailyco-vs-reflector-terminology) +3. [Entity Relationships](#entity-relationships) +4. [Recording Multiplicity](#recording-multiplicity) +5. [Session Identifiers Explained](#session-identifiers-explained) +6. [Time-Based Matching](#time-based-matching) +7. [Multitrack Recording Details](#multitrack-recording-details) +8. [Verified Example](#verified-example) + +--- + +## Core Entities Overview + +### Reflector's Four Primary Entities + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Room (Reflector) │ +│ - Persistent meeting template │ +│ - User-created configuration │ +│ - Example: "team-standup" │ +└────────────────────┬────────────────────────────────────────────┘ + │ 1:N + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Meeting (Reflector) │ +│ - Single session instance │ +│ - Creates NEW Daily.co room with timestamp │ +│ - Example: "team-standup-20260115120000" │ +└────────────────────┬────────────────────────────────────────────┘ + │ 1:N + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Recording (Reflector + Daily.co) │ +│ - One segment of audio/video │ +│ - New recording created on stop/restart │ +│ - track_keys: JSON array of S3 file paths │ +└────────────────────┬────────────────────────────────────────────┘ + │ 1:1 + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Transcript (Reflector) │ +│ - Processed audio with transcription │ +│ - Diarization, summaries, topics │ +│ - One transcript per recording │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Daily.co vs Reflector Terminology + +### Room + +| Aspect | Daily.co | Reflector | +|--------|----------|-----------| +| **Definition** | Virtual meeting space on Daily.co platform | User-created meeting template/configuration | +| **Lifetime** | Configurable expiration | Persistent until user deletes | +| **Creation** | API call for each meeting | Pre-created by user once | +| **Reuse** | Can host multiple sessions | Generates new Daily.co room per meeting | +| **Name Format** | `room-name` (reusable) | `room-name` (base identifier) | +| **Timestamping** | Not required | Meeting adds timestamp: `{name}-YYYYMMDDHHMMSS` | + +**Example:** +``` +Reflector Room: "daily-private-igor" (persistent config) + ↓ starts meeting +Daily.co Room: "daily-private-igor-20260110042117" +``` + +### Meeting + +| Aspect | Daily.co | Reflector | +|--------|----------|-----------| +| **Definition** | Session that starts when first participant joins | Explicit database record of a session | +| **Identifier** | `mtgSessionId` (generated by Daily.co) | `meeting.id` (UUID, generated by Reflector) | +| **Creation** | Implicit (first participant join) | Explicit API call before participants join | +| **Purpose** | Tracks active session state | Links recordings, transcripts, participants | +| **Scope** | Per room instance | Per Reflector room + timestamp | + +**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)). + +### Recording + +| Aspect | Daily.co | Reflector | +|--------|----------|-----------| +| **Definition** | Audio/video files on S3 | Metadata + processing status | +| **Types** | `cloud` (composed video), `raw-tracks` (multitrack) | Stores references + `track_keys` array | +| **Multiplicity** | One recording object per start/stop cycle | One DB row per Daily.co recording object | +| **Identifier** | Daily.co `recording_id` | Same `recording_id` (stored in DB) | +| **Multitrack** | Array of `.webm` files (one per participant) | `track_keys` JSON array with S3 paths | +| **Linkage** | Via `room_name` + `start_ts` | FK `meeting_id` (set via time-based match) | + +**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs. + +--- + +## Entity Relationships + +### Database Schema Relationships + +```sql +-- Simplified schema showing key relationships + +TABLE room ( + id VARCHAR PRIMARY KEY, + name VARCHAR UNIQUE, + platform VARCHAR -- 'whereby' | 'daily' +) + +TABLE meeting ( + id VARCHAR PRIMARY KEY, + room_id VARCHAR REFERENCES room(id) ON DELETE CASCADE, -- nullable + room_name VARCHAR, -- Daily.co room name (timestamped) + start_date TIMESTAMP, + platform VARCHAR +) + +TABLE recording ( + id VARCHAR PRIMARY KEY, -- Daily.co recording_id + meeting_id VARCHAR, -- FK to meeting (set via time-based match) + bucket_name VARCHAR, + object_key VARCHAR, -- S3 prefix + track_keys JSON, -- Array of S3 keys for multitrack + recorded_at TIMESTAMP +) + +TABLE transcript ( + id VARCHAR PRIMARY KEY, + recording_id VARCHAR, -- nullable FK + meeting_id VARCHAR, -- nullable FK + room_id VARCHAR, -- nullable FK + participants JSON, -- [{id, speaker, name, user_id}, ...] + title VARCHAR, + long_summary VARCHAR, + webvtt TEXT +) +``` + +**Relationship Cardinalities:** +``` +1 Room → N Meetings +1 Meeting → N Recordings (common: 1-21 recordings per meeting) +1 Recording → 1 Transcript +1 Meeting → N Transcripts (via recordings) +``` + +--- + +## Recording Multiplicity + +### Why Multiple Recordings Per Meeting? + +Daily.co creates a **new recording object** (new ID, new files) whenever recording stops and restarts. This happens due to: + +1. **Manual stop/start** - User clicks stop, then start recording again +2. **Network reconnection** - Participant drops, reconnects → triggers restart +3. **Participant rejoin** - Last participant leaves, new one joins → new session + +--- + +## Session Identifiers Explained + +### The Hidden Entity: Daily.co Meeting Session + +Daily.co has an **implicit ephemeral entity** that sits between Room and Recording: + +``` +Daily.co Room: "daily-private-igor-20260110042117" + │ + ├─ Daily.co Meeting Session #1 (mtgSessionId: c04334de...) + │ └─ Recording #3 (f4a50f94) - 4s, 1 track + │ + └─ Daily.co Meeting Session #2 (mtgSessionId: 4cdae3c0...) + ├─ Recording #2 (b0fa94da) - 80s, 2 tracks ← recording stopped + └─ Recording #1 (05edf519) - 62s, 1 track ← then restarted +``` + +**Daily.co Meeting Session:** +- **Lifecycle:** Starts when first participant joins, ends when last participant leaves +- **Identifier:** `mtgSessionId` (generated by Daily.co) +- **Persistence:** Ephemeral - new ID if everyone leaves and someone rejoins +- **Relationship:** 1 Session → N Recordings (if recording stops/restarts during session) + +**Key Insight:** Multiple recordings can share the same `mtgSessionId` if recording was stopped and restarted while participants remained connected. + +### mtgSessionId (Meeting Session Identifier) + +`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room). + +### session_id (Per-Participant) + +**Different concept:** Per-participant connection identifier from webhooks. + +**Reflector Tracking:** `daily_participant_session` table +```sql +TABLE daily_participant_session ( + id VARCHAR PRIMARY KEY, -- {meeting_id}:{user_id}:{joined_at_ms} + meeting_id VARCHAR, + session_id VARCHAR, -- From webhook (per-participant) + user_id VARCHAR, + user_name VARCHAR, + joined_at TIMESTAMP, + left_at TIMESTAMP +) +``` +--- + +## Time-Based Matching + +### Problem Statement + +Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers. + +**Example API response:** +```json +{ + "id": "recording-uuid", + "room_name": "daily-private-igor-20260110042117", + "start_ts": 1768018896, + "mtgSessionId": null ← Missing! +} +``` + +### Solution: Time-Based Matching + +**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()` + + +--- + +## Multitrack Recording Details + +### track_keys JSON Array + +**Schema:** `recording.track_keys` (JSON, nullable) +```sql +-- Example recording with 2 audio tracks +{ + "id": "b0fa94da-73b5-4f95-9239-5216a682a505", + "track_keys": [ + "igormonadical/daily-private-igor-20260110042117/1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565", + "igormonadical/daily-private-igor-20260110042117/1768018896877-9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-1768018899286" + ] +} +``` + +**Semantics:** +- `track_keys = null` → Not multitrack (cloud recording) +- `track_keys = []` → Multitrack recording with no audio captured (silence/muted) +- `track_keys = [...]` → Multitrack with N audio tracks + +**Property:** `recording.is_multitrack` (Python) +```python +@property +def is_multitrack(self) -> bool: + return self.track_keys is not None and len(self.track_keys) > 0 +``` + +### Track Filename Format + +Daily.co multitrack filenames encode timing and participant information: + +**Format:** `{recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}` + +**Example:** `1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565` + +**Parsed Components:** +```python +# reflector/utils/daily.py:25-60 +class DailyRecordingFilename(NamedTuple): + recording_start_ts: int # 1768018896877 (milliseconds) + participant_id: str # 890c0eae-e186-4534-a7bd-7c794b7d6d7f + track_start_ts: int # 1768018914565 (milliseconds) +``` + +**Note:** Browser downloads from S3 add `.webm` extension due to MIME headers, but S3 object keys have no extension. + +### Video Track Filtering + +Daily.co API returns both audio and video tracks, but Reflector only processes audio. + +**Filtering Logic:** `reflector/worker/process.py:660` +```python +track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"] +``` + +**Example API Response:** +```json +{ + "tracks": [ + {"type": "audio", "s3Key": "...cam-audio-1768018914565"}, + {"type": "audio", "s3Key": "...cam-audio-1768018899286"}, + {"type": "video", "s3Key": "...cam-video-1768018897095"} ← Filtered out + ] +} +``` + +**Result:** Only 2 audio tracks stored in `recording.track_keys`, video track discarded. + +**Rationale:** Reflector is audio transcription system; video not needed for processing. + +### Track-to-Participant Mapping + +**Flow:** +1. Daily.co webhook/polling provides `track_keys` array +2. Each track filename contains `participant_id` +3. Reflector queries Daily.co API: `GET /meetings/{mtgSessionId}/participants` +4. Maps `participant_id` → `user_name` +5. Stores in `transcript.participants` JSON: +```json +[ + { + "id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f", + "speaker": 0, + "name": "test2", + "user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22" + }, + { + "id": "9660e8e9-4297-4f17-951d-0b2bf2401803", + "speaker": 1, + "name": "test", + "user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22" + } +] +``` + +**Diarization:** Multitrack recordings don't need speaker diarization AI — speaker identity comes from separate audio tracks. + +--- + +## Example + +### Meeting: daily-private-igor-20260110042117 + +**Context:** User conducted test recording with start/stop cycles, producing 3 recordings. + +#### Database State + +```sql +-- Meeting +id: 034804b8-cee2-4fb4-94d7-122f6f068a61 +room_name: daily-private-igor-20260110042117 +start_date: 2026-01-10 04:21:17+00 +``` + +#### Daily.co API Response + +```json +[ + { + "id": "f4a50f94-053c-4f9d-bda6-78ad051fbc36", + "room_name": "daily-private-igor-20260110042117", + "start_ts": 1768018885, + "duration": 4, + "status": "finished", + "mtgSessionId": "c04334de-42a0-4c2a-96be-a49b068dca85", + "tracks": [ + {"type": "audio", "s3Key": "...62e8f3ae...cam-audio-1768018885417"} + ] + }, + { + "id": "b0fa94da-73b5-4f95-9239-5216a682a505", + "room_name": "daily-private-igor-20260110042117", + "start_ts": 1768018896, + "duration": 80, + "status": "finished", + "mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345", + "tracks": [ + {"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914565"}, + {"type": "audio", "s3Key": "...9660e8e9...cam-audio-1768018899286"}, + {"type": "video", "s3Key": "...9660e8e9...cam-video-1768018897095"} + ] + }, + { + "id": "05edf519-9048-4b49-9a75-73e9826fd950", + "room_name": "daily-private-igor-20260110042117", + "start_ts": 1768018914, + "duration": 62, + "status": "finished", + "mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345", + "tracks": [ + {"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914948"} + ] + } +] +``` + +**Key Observations:** +- 3 recording objects returned by Daily.co +- 2 different `mtgSessionId` values (2 different meeting instances) +- Recording #2 has 3 tracks (2 audio + 1 video) +- Timestamps: 1768018885 → 1768018896 (+11s) → 1768018914 (+18s) + +#### Reflector Database + +**Recordings:** +``` +┌──────────────────────────────────────┬──────────────┬────────────┬──────────────────────────────────────┐ +│ id │ track_count │ duration │ mtgSessionId │ +├──────────────────────────────────────┼──────────────┼────────────┼──────────────────────────────────────┤ +│ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 │ 4s │ c04334de-42a0-4c2a-96be-a49b068dca85 │ +│ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (video=0) │ 80s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │ +│ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 │ 62s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │ +└──────────────────────────────────────┴──────────────┴────────────┴──────────────────────────────────────┘ +``` +**Note:** Recording #2 has 2 audio tracks (video filtered out), not 3. + +**Transcripts:** +``` +┌──────────────────────────────────────┬──────────────────────────────────────┬──────────────┬──────────────────────────────────────────────┐ +│ id │ recording_id │ participants │ title │ +├──────────────────────────────────────┼──────────────────────────────────────┼──────────────┼──────────────────────────────────────────────┤ +│ 17149b1f-546c-4837-80a0-f8140bd16592 │ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 (test) │ (empty - no speech) │ +│ 49801332-3222-4c11-bdb2-375479fc87f2 │ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (test, │ "Examination and Validation Procedures │ +│ │ │ test2) │ Review" │ +│ e5271e12-20fb-42d2-b5a8-21438abadef9 │ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 (test2) │ "Technical Sound Check Procedure Review" │ +└──────────────────────────────────────┴──────────────────────────────────────┴──────────────┴──────────────────────────────────────────────┘ +``` + +**Transcript Content:** + +*Transcript #1* (17149b1f): Empty WebVTT (no audio captured) + +*Transcript #2* (49801332): +```webvtt +WEBVTT + +00:00:03.109 --> 00:00:05.589 +Test, test, test. Test, test, test, test, test. + +00:00:19.829 --> 00:00:22.710 +Test test test test test test test test test test test. +``` +**AI-Generated Summary:** +> "The meeting focused on the critical importance of rigorous testing for ensuring reliability and quality, with test and test2 emphasizing the need for a structured testing framework and meticulous documentation..." + +*Transcript #3* (e5271e12): +```webvtt +WEBVTT + +00:00:02.029 --> 00:00:04.910 +Test, test, test, test, test, test, test, test, test, test, test. +``` + +#### Validation: track_keys → participants + +**Recording #2 (b0fa94da) tracks:** +```json +[ + ".../890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-...", + ".../9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-..." +] +``` + +**Transcript #2 (49801332) participants:** +```json +[ + {"id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f", "speaker": 0, "name": "test2"}, + {"id": "9660e8e9-4297-4f17-951d-0b2bf2401803", "speaker": 1, "name": "test"} +] +``` + +### Data Flow + +``` +Daily.co API: 3 recordings + ↓ +Polling: _poll_raw_tracks_recordings() + ↓ +Worker: process_multitrack_recording.delay() × 3 + ↓ +DB: 3 recording rows created + ↓ +Pipeline: Audio processing + transcription × 3 + ↓ +DB: 3 transcript rows created (1:1 with recordings) + ↓ +UI: User sees 3 separate transcripts +``` + +**Result:** ✅ 1:1 Recording → Transcript relationship maintained. + + +--- +**Document Version:** 1.0 +**Last Verified:** 2026-01-15 +**Data Source:** Production database + Daily.co API inspection diff --git a/server/migrations/versions/1b1e6a6fc465_add_cloud_recording_support.py b/server/migrations/versions/1b1e6a6fc465_add_cloud_recording_support.py new file mode 100644 index 00000000..6df05b8a --- /dev/null +++ b/server/migrations/versions/1b1e6a6fc465_add_cloud_recording_support.py @@ -0,0 +1,40 @@ +"""add cloud recording support + +Revision ID: 1b1e6a6fc465 +Revises: bd3a729bb379 +Create Date: 2026-01-09 17:17:33.535620 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "1b1e6a6fc465" +down_revision: Union[str, None] = "bd3a729bb379" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("meeting", schema=None) as batch_op: + batch_op.add_column( + sa.Column("daily_composed_video_s3_key", sa.String(), nullable=True) + ) + batch_op.add_column( + sa.Column("daily_composed_video_duration", sa.Integer(), nullable=True) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("meeting", schema=None) as batch_op: + batch_op.drop_column("daily_composed_video_duration") + batch_op.drop_column("daily_composed_video_s3_key") + + # ### end Alembic commands ### diff --git a/server/migrations/versions/e69f08ead8ea_merge_cloud_recording_and_celery_heads.py b/server/migrations/versions/e69f08ead8ea_merge_cloud_recording_and_celery_heads.py new file mode 100644 index 00000000..bdf8691d --- /dev/null +++ b/server/migrations/versions/e69f08ead8ea_merge_cloud_recording_and_celery_heads.py @@ -0,0 +1,23 @@ +"""merge cloud recording and celery heads + +Revision ID: e69f08ead8ea +Revises: 1b1e6a6fc465, 80beb1ea3269 +Create Date: 2026-01-21 21:39:10.326841 + +""" + +from typing import Sequence, Union + +# revision identifiers, used by Alembic. +revision: str = "e69f08ead8ea" +down_revision: Union[str, None] = ("1b1e6a6fc465", "80beb1ea3269") +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + pass + + +def downgrade() -> None: + pass diff --git a/server/reflector/dailyco_api/__init__.py b/server/reflector/dailyco_api/__init__.py index 65be426e..69e94a08 100644 --- a/server/reflector/dailyco_api/__init__.py +++ b/server/reflector/dailyco_api/__init__.py @@ -3,7 +3,7 @@ Daily.co API Module """ # Client -from .client import DailyApiClient, DailyApiError +from .client import DailyApiClient, DailyApiError, RecordingType # Request models from .requests import ( @@ -64,6 +64,7 @@ __all__ = [ # Client "DailyApiClient", "DailyApiError", + "RecordingType", # Requests "CreateRoomRequest", "RoomProperties", diff --git a/server/reflector/dailyco_api/client.py b/server/reflector/dailyco_api/client.py index 43b76d88..8634039f 100644 --- a/server/reflector/dailyco_api/client.py +++ b/server/reflector/dailyco_api/client.py @@ -7,7 +7,8 @@ Reference: https://docs.daily.co/reference/rest-api """ from http import HTTPStatus -from typing import Any +from typing import Any, Literal +from uuid import UUID import httpx import structlog @@ -32,6 +33,8 @@ from .responses import ( logger = structlog.get_logger(__name__) +RecordingType = Literal["cloud", "raw-tracks"] + class DailyApiError(Exception): """Daily.co API error with full request/response context.""" @@ -395,6 +398,38 @@ class DailyApiClient: return [RecordingResponse(**r) for r in data["data"]] + async def start_recording( + self, + room_name: NonEmptyString, + recording_type: RecordingType, + instance_id: UUID, + ) -> dict[str, Any]: + """Start recording via REST API. + + Reference: https://docs.daily.co/reference/rest-api/rooms/recordings/start + + Args: + room_name: Daily.co room name + recording_type: Recording type + instance_id: UUID for this recording session + + Returns: + Recording start confirmation from Daily.co API + + Raises: + DailyApiError: If API request fails + """ + client = await self._get_client() + response = await client.post( + f"{self.base_url}/rooms/{room_name}/recordings/start", + headers=self.headers, + json={ + "type": recording_type, + "instanceId": str(instance_id), + }, + ) + return await self._handle_response(response, "start_recording") + # ============================================================================ # MEETING TOKENS # ============================================================================ diff --git a/server/reflector/dailyco_api/instance_id.py b/server/reflector/dailyco_api/instance_id.py new file mode 100644 index 00000000..7743229f --- /dev/null +++ b/server/reflector/dailyco_api/instance_id.py @@ -0,0 +1,37 @@ +""" +Daily.co recording instanceId generation utilities. + +Deterministic instance ID generation for cloud and raw-tracks recordings. +MUST match frontend logic +""" + +from uuid import UUID, uuid5 + +from reflector.utils.string import NonEmptyString + +# Namespace UUID for UUIDv5 generation of raw-tracks instanceIds +# DO NOT CHANGE: Breaks instanceId determinism across deployments and frontend/backend matching +RAW_TRACKS_NAMESPACE = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890") + + +def generate_cloud_instance_id(meeting_id: NonEmptyString) -> UUID: + """ + Generate instanceId for cloud recording. + + Cloud recordings use meeting ID directly as instanceId. + This ensures each meeting has one unique cloud recording. + """ + return UUID(meeting_id) + + +def generate_raw_tracks_instance_id(meeting_id: NonEmptyString) -> UUID: + """ + Generate instanceId for raw-tracks recording. + + Raw-tracks recordings use UUIDv5(meeting_id, namespace) to ensure + different instanceId from cloud while remaining deterministic. + + Daily.co requires cloud and raw-tracks to have different instanceIds + for concurrent recording. + """ + return uuid5(RAW_TRACKS_NAMESPACE, meeting_id) diff --git a/server/reflector/dailyco_api/requests.py b/server/reflector/dailyco_api/requests.py index 0adf892b..885579e0 100644 --- a/server/reflector/dailyco_api/requests.py +++ b/server/reflector/dailyco_api/requests.py @@ -88,13 +88,6 @@ class MeetingTokenProperties(BaseModel): is_owner: bool = Field( default=False, description="Grant owner privileges to token holder" ) - start_cloud_recording: bool = Field( - default=False, description="Automatically start cloud recording on join" - ) - start_cloud_recording_opts: dict | None = Field( - default=None, - description="Options for startRecording when start_cloud_recording is true (e.g., maxDuration)", - ) enable_recording_ui: bool = Field( default=True, description="Show recording controls in UI" ) diff --git a/server/reflector/dailyco_api/responses.py b/server/reflector/dailyco_api/responses.py index 6ac95188..21b8fcf0 100644 --- a/server/reflector/dailyco_api/responses.py +++ b/server/reflector/dailyco_api/responses.py @@ -116,6 +116,7 @@ class RecordingS3Info(BaseModel): bucket_name: NonEmptyString bucket_region: NonEmptyString + key: NonEmptyString | None = None endpoint: NonEmptyString | None = None @@ -132,6 +133,9 @@ class RecordingResponse(BaseModel): id: NonEmptyString = Field(description="Recording identifier") room_name: NonEmptyString = Field(description="Room where recording occurred") start_ts: int = Field(description="Recording start timestamp (Unix epoch seconds)") + type: Literal["cloud", "raw-tracks"] | None = Field( + None, description="Recording type (may be missing from API)" + ) status: RecordingStatus = Field( description="Recording status ('in-progress' or 'finished')" ) @@ -145,6 +149,9 @@ class RecordingResponse(BaseModel): None, description="Token for sharing recording" ) s3: RecordingS3Info | None = Field(None, description="S3 bucket information") + s3key: NonEmptyString | None = Field( + None, description="S3 key for cloud recordings (top-level field)" + ) tracks: list[DailyTrack] = Field( default_factory=list, description="Track list for raw-tracks recordings (always array, never null)", diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 8a80e756..02f407b2 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Literal import sqlalchemy as sa @@ -9,7 +9,7 @@ from reflector.db import get_database, metadata from reflector.db.rooms import Room from reflector.schemas.platform import WHEREBY_PLATFORM, Platform from reflector.utils import generate_uuid4 -from reflector.utils.string import assert_equal +from reflector.utils.string import NonEmptyString, assert_equal meetings = sa.Table( "meeting", @@ -63,6 +63,9 @@ meetings = sa.Table( nullable=False, server_default=assert_equal(WHEREBY_PLATFORM, "whereby"), ), + # Daily.co composed video (Brady Bunch grid layout) - Daily.co only, not Whereby + sa.Column("daily_composed_video_s3_key", sa.String, nullable=True), + sa.Column("daily_composed_video_duration", sa.Integer, nullable=True), sa.Index("idx_meeting_room_id", "room_id"), sa.Index("idx_meeting_calendar_event", "calendar_event_id"), ) @@ -110,6 +113,9 @@ class Meeting(BaseModel): calendar_event_id: str | None = None calendar_metadata: dict[str, Any] | None = None platform: Platform = WHEREBY_PLATFORM + # Daily.co composed video (Brady Bunch grid) - Daily.co only + daily_composed_video_s3_key: str | None = None + daily_composed_video_duration: int | None = None class MeetingController: @@ -171,6 +177,90 @@ class MeetingController: return None return Meeting(**result) + async def get_by_room_name_all(self, room_name: str) -> list[Meeting]: + """Get all meetings for a room name (not just most recent).""" + query = meetings.select().where(meetings.c.room_name == room_name) + results = await get_database().fetch_all(query) + return [Meeting(**r) for r in results] + + async def get_by_room_name_and_time( + self, + room_name: NonEmptyString, + recording_start: datetime, + time_window_hours: int = 168, + ) -> Meeting | None: + """ + Get meeting by room name closest to recording timestamp. + + HACK ALERT: Daily.co doesn't return instanceId in recordings API response, + and mtgSessionId is separate from our instanceId. Time-based matching is + the least-bad workaround. + + This handles edge case of duplicate room_name values in DB (race conditions, + double-clicks, etc.) by matching based on temporal proximity. + + Algorithm: + 1. Find meetings within time_window_hours of recording_start + 2. Return meeting with start_date closest to recording_start + 3. If tie, return first by meeting.id (deterministic) + + Args: + room_name: Daily.co room name from recording + recording_start: Timezone-aware datetime from recording.start_ts + time_window_hours: Search window (default 168 = 1 week) + + Returns: + Meeting closest to recording timestamp, or None if no matches + + Failure modes: + - Multiple meetings in same room within ~5 minutes: picks closest + - All meetings outside time window: returns None + - Clock skew between Daily.co and DB: 1-week window tolerates this + + Why 1 week window: + - Handles webhook failures (recording discovered days later) + - Tolerates clock skew + - Rejects unrelated meetings from weeks ago + + """ + # Validate timezone-aware datetime + if recording_start.tzinfo is None: + raise ValueError( + f"recording_start must be timezone-aware, got naive datetime: {recording_start}" + ) + + window_start = recording_start - timedelta(hours=time_window_hours) + window_end = recording_start + timedelta(hours=time_window_hours) + + query = ( + meetings.select() + .where( + sa.and_( + meetings.c.room_name == room_name, + meetings.c.start_date >= window_start, + meetings.c.start_date <= window_end, + ) + ) + .order_by(meetings.c.start_date) + ) + + results = await get_database().fetch_all(query) + if not results: + return None + + candidates = [Meeting(**r) for r in results] + + # Find meeting with start_date closest to recording_start + closest = min( + candidates, + key=lambda m: ( + abs((m.start_date - recording_start).total_seconds()), + m.id, # Tie-breaker: deterministic by UUID + ), + ) + + return closest + async def get_active(self, room: Room, current_time: datetime) -> Meeting | None: """ Get latest active meeting for a room. @@ -260,6 +350,44 @@ class MeetingController: query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs) await get_database().execute(query) + async def set_cloud_recording_if_missing( + self, + meeting_id: NonEmptyString, + s3_key: NonEmptyString, + duration: int, + ) -> bool: + """ + Set cloud recording only if not already set. + + Returns True if updated, False if already set. + Prevents webhook/polling race condition via atomic WHERE clause. + """ + # Check current value before update to detect actual change + meeting_before = await self.get_by_id(meeting_id) + if not meeting_before: + return False + + was_null = meeting_before.daily_composed_video_s3_key is None + + query = ( + meetings.update() + .where( + sa.and_( + meetings.c.id == meeting_id, + meetings.c.daily_composed_video_s3_key.is_(None), + ) + ) + .values( + daily_composed_video_s3_key=s3_key, + daily_composed_video_duration=duration, + ) + ) + await get_database().execute(query) + + # Return True only if value was NULL before (actual update occurred) + # If was_null=False, the WHERE clause prevented the update + return was_null + async def increment_num_clients(self, meeting_id: str) -> None: """Atomically increment participant count.""" query = ( diff --git a/server/reflector/db/recordings.py b/server/reflector/db/recordings.py index 82609b38..bf799561 100644 --- a/server/reflector/db/recordings.py +++ b/server/reflector/db/recordings.py @@ -7,6 +7,7 @@ from sqlalchemy import or_ from reflector.db import get_database, metadata from reflector.utils import generate_uuid4 +from reflector.utils.string import NonEmptyString recordings = sa.Table( "recording", @@ -71,6 +72,19 @@ class RecordingController: query = recordings.delete().where(recordings.c.id == id) await get_database().execute(query) + async def set_meeting_id( + self, + recording_id: NonEmptyString, + meeting_id: NonEmptyString, + ) -> None: + """Link recording to meeting.""" + query = ( + recordings.update() + .where(recordings.c.id == recording_id) + .values(meeting_id=meeting_id) + ) + await get_database().execute(query) + # no check for existence async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]: if not recording_ids: diff --git a/server/reflector/video_platforms/daily.py b/server/reflector/video_platforms/daily.py index f71e959b..cef78b4c 100644 --- a/server/reflector/video_platforms/daily.py +++ b/server/reflector/video_platforms/daily.py @@ -1,4 +1,5 @@ from datetime import datetime +from uuid import UUID from reflector.dailyco_api import ( CreateMeetingTokenRequest, @@ -12,9 +13,11 @@ from reflector.dailyco_api import ( RoomProperties, verify_webhook_signature, ) +from reflector.dailyco_api import RecordingType as DailyRecordingType from reflector.db.daily_participant_sessions import ( daily_participant_sessions_controller, ) +from reflector.db.meetings import meetings_controller from reflector.db.rooms import Room from reflector.logger import logger from reflector.storage import get_dailyco_storage @@ -58,10 +61,9 @@ class DailyClient(VideoPlatformClient): enable_recording = None if room.recording_type == self.RECORDING_LOCAL: enable_recording = "local" - elif ( - room.recording_type == self.RECORDING_CLOUD - ): # daily "cloud" is not our "cloud" - enable_recording = "raw-tracks" + elif room.recording_type == self.RECORDING_CLOUD: + # Don't set enable_recording - recordings started via REST API (not auto-start) + enable_recording = None properties = RoomProperties( enable_recording=enable_recording, @@ -106,8 +108,6 @@ class DailyClient(VideoPlatformClient): Daily.co doesn't provide historical session API, so we query our database where participant.joined/left webhooks are stored. """ - from reflector.db.meetings import meetings_controller # noqa: PLC0415 - meeting = await meetings_controller.get_by_room_name(room_name) if not meeting: return [] @@ -179,21 +179,14 @@ class DailyClient(VideoPlatformClient): async def create_meeting_token( self, room_name: DailyRoomName, - start_cloud_recording: bool, enable_recording_ui: bool, user_id: NonEmptyString | None = None, is_owner: bool = False, max_recording_duration_seconds: int | None = None, ) -> NonEmptyString: - start_cloud_recording_opts = None - if start_cloud_recording and max_recording_duration_seconds: - start_cloud_recording_opts = {"maxDuration": max_recording_duration_seconds} - properties = MeetingTokenProperties( room_name=room_name, user_id=user_id, - start_cloud_recording=start_cloud_recording, - start_cloud_recording_opts=start_cloud_recording_opts, enable_recording_ui=enable_recording_ui, is_owner=is_owner, ) @@ -201,6 +194,23 @@ class DailyClient(VideoPlatformClient): result = await self._api_client.create_meeting_token(request) return result.token + async def start_recording( + self, + room_name: DailyRoomName, + recording_type: DailyRecordingType, + instance_id: UUID, + ) -> dict: + """Start recording via Daily.co REST API. + + Args: + instance_id: UUID for this recording session - one UUID per "room" in Daily (which is "meeting" in Reflector) + """ + return await self._api_client.start_recording( + room_name=room_name, + recording_type=recording_type, + instance_id=instance_id, + ) + async def close(self): """Clean up API client resources.""" await self._api_client.close() diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index cbdac409..384290da 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -19,6 +19,7 @@ from reflector.video_platforms.factory import create_platform_client from reflector.worker.process import ( poll_daily_room_presence_task, process_multitrack_recording, + store_cloud_recording, ) router = APIRouter() @@ -174,46 +175,64 @@ async def _handle_recording_started(event: RecordingStartedEvent): async def _handle_recording_ready(event: RecordingReadyEvent): room_name = event.payload.room_name recording_id = event.payload.recording_id - tracks = event.payload.tracks - - if not tracks: - logger.warning( - "recording.ready-to-download: missing tracks", - room_name=room_name, - recording_id=recording_id, - payload=event.payload, - ) - return + recording_type = event.payload.type logger.info( "Recording ready for download", room_name=room_name, recording_id=recording_id, - num_tracks=len(tracks), + recording_type=recording_type, platform="daily", ) bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME if not bucket_name: - logger.error( - "DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; cannot process Daily recording" - ) + logger.error("DAILYCO_STORAGE_AWS_BUCKET_NAME not configured") return - track_keys = [t.s3Key for t in tracks if t.type == "audio"] + if recording_type == "cloud": + await store_cloud_recording( + recording_id=recording_id, + room_name=room_name, + s3_key=event.payload.s3_key, + duration=event.payload.duration, + start_ts=event.payload.start_ts, + source="webhook", + ) - logger.info( - "Recording webhook queuing processing", - recording_id=recording_id, - room_name=room_name, - ) + elif recording_type == "raw-tracks": + tracks = event.payload.tracks + if not tracks: + logger.warning( + "raw-tracks recording: missing tracks array", + room_name=room_name, + recording_id=recording_id, + ) + return - process_multitrack_recording.delay( - bucket_name=bucket_name, - daily_room_name=room_name, - recording_id=recording_id, - track_keys=track_keys, - ) + track_keys = [t.s3Key for t in tracks if t.type == "audio"] + + logger.info( + "Raw-tracks recording queuing processing", + recording_id=recording_id, + room_name=room_name, + num_tracks=len(track_keys), + ) + + process_multitrack_recording.delay( + bucket_name=bucket_name, + daily_room_name=room_name, + recording_id=recording_id, + track_keys=track_keys, + recording_start_ts=event.payload.start_ts, + ) + + else: + logger.warning( + "Unknown recording type", + recording_type=recording_type, + recording_id=recording_id, + ) async def _handle_recording_error(event: RecordingErrorEvent): diff --git a/server/reflector/views/meetings.py b/server/reflector/views/meetings.py index 25987e47..44adf500 100644 --- a/server/reflector/views/meetings.py +++ b/server/reflector/views/meetings.py @@ -1,16 +1,23 @@ +import json from datetime import datetime, timezone -from typing import Annotated, Optional +from typing import Annotated, Any, Optional +from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel import reflector.auth as auth +from reflector.dailyco_api import RecordingType +from reflector.dailyco_api.client import DailyApiError from reflector.db.meetings import ( MeetingConsent, meeting_consent_controller, meetings_controller, ) from reflector.db.rooms import rooms_controller +from reflector.logger import logger +from reflector.utils.string import NonEmptyString +from reflector.video_platforms.factory import create_platform_client router = APIRouter() @@ -73,3 +80,72 @@ async def meeting_deactivate( await meetings_controller.update_meeting(meeting_id, is_active=False) return {"status": "success", "meeting_id": meeting_id} + + +class StartRecordingRequest(BaseModel): + type: RecordingType + instanceId: UUID + + +@router.post("/meetings/{meeting_id}/recordings/start") +async def start_recording( + meeting_id: NonEmptyString, body: StartRecordingRequest +) -> dict[str, Any]: + """Start cloud or raw-tracks recording via Daily.co REST API. + + Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time. + Uses different instanceIds for cloud vs raw-tracks (same won't work) + + Note: No authentication required - anonymous users supported. TODO this is a DOS vector + """ + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + log = logger.bind( + meeting_id=meeting_id, + room_name=meeting.room_name, + recording_type=body.type, + instance_id=body.instanceId, + ) + + try: + client = create_platform_client("daily") + result = await client.start_recording( + room_name=meeting.room_name, + recording_type=body.type, + instance_id=body.instanceId, + ) + + log.info(f"Started {body.type} recording via REST API") + + return {"status": "ok", "result": result} + + except DailyApiError as e: + # Parse Daily.co error response to detect "has an active stream" + try: + error_body = json.loads(e.response_body) + error_info = error_body.get("info", "") + + # "has an active stream" means recording already started by another participant + # This is SUCCESS from business logic perspective - return 200 + if "has an active stream" in error_info: + log.info( + f"{body.type} recording already active (started by another participant)" + ) + return {"status": "already_active", "instanceId": str(body.instanceId)} + except (json.JSONDecodeError, KeyError): + pass # Fall through to error handling + + # All other Daily.co API errors + log.error(f"Failed to start {body.type} recording", error=str(e)) + raise HTTPException( + status_code=500, detail=f"Failed to start recording: {str(e)}" + ) + + except Exception as e: + # Non-Daily.co errors + log.error(f"Failed to start {body.type} recording", error=str(e)) + raise HTTPException( + status_code=500, detail=f"Failed to start recording: {str(e)}" + ) diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 278235b4..11e668c0 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -73,6 +73,8 @@ class Meeting(BaseModel): calendar_event_id: str | None = None calendar_metadata: dict[str, Any] | None = None platform: Platform + daily_composed_video_s3_key: str | None = None + daily_composed_video_duration: int | None = None class CreateRoom(BaseModel): @@ -586,7 +588,6 @@ async def rooms_join_meeting( ) token = await client.create_meeting_token( meeting.room_name, - start_cloud_recording=meeting.recording_type == "cloud", enable_recording_ui=enable_recording_ui, user_id=user_id, is_owner=user_id == room.user_id, diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index b1256c94..a353cf55 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -6,6 +6,11 @@ from celery.schedules import crontab from reflector.settings import settings logger = structlog.get_logger(__name__) + +# Polling intervals (seconds) +# Webhook-aware: 180s when webhook configured (backup mode), 15s when no webhook (primary discovery) +POLL_DAILY_RECORDINGS_INTERVAL_SEC = 180.0 if settings.DAILY_WEBHOOK_SECRET else 15.0 + if celery.current_app.main != "default": logger.info(f"Celery already configured ({celery.current_app})") app = celery.current_app @@ -44,7 +49,7 @@ else: }, "poll_daily_recordings": { "task": "reflector.worker.process.poll_daily_recordings", - "schedule": 180.0, # Every 3 minutes (configurable lookback window) + "schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC, }, "trigger_daily_reconciliation": { "task": "reflector.worker.process.trigger_daily_reconciliation", diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index a1fa3a3c..8d88de43 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -2,7 +2,7 @@ import json import os import re from datetime import datetime, timezone -from typing import List +from typing import List, Literal from urllib.parse import unquote import av @@ -42,6 +42,7 @@ from reflector.utils.daily import ( filter_cam_audio_tracks, recording_lock_key, ) +from reflector.utils.string import NonEmptyString from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.whereby_utils import ( parse_whereby_recording_filename, @@ -175,13 +176,18 @@ async def process_multitrack_recording( daily_room_name: DailyRoomName, recording_id: str, track_keys: list[str], + recording_start_ts: int, ): + """ + Process raw-tracks (multitrack) recording from Daily.co. + """ logger.info( "Processing multitrack recording", bucket=bucket_name, room_name=daily_room_name, recording_id=recording_id, provided_keys=len(track_keys), + recording_start_ts=recording_start_ts, ) if not track_keys: @@ -212,7 +218,7 @@ async def process_multitrack_recording( ) await _process_multitrack_recording_inner( - bucket_name, daily_room_name, recording_id, track_keys + bucket_name, daily_room_name, recording_id, track_keys, recording_start_ts ) @@ -221,8 +227,18 @@ async def _process_multitrack_recording_inner( daily_room_name: DailyRoomName, recording_id: str, track_keys: list[str], + recording_start_ts: int, ): - """Inner function containing the actual processing logic.""" + """ + Process multitrack recording (first time or reprocessing). + + For first processing (webhook/polling): + - Uses recording_start_ts for time-based meeting matching (no instanceId available) + + For reprocessing: + - Uses recording.meeting_id directly (already linked during first processing) + - recording_start_ts is ignored + """ tz = timezone.utc recorded_at = datetime.now(tz) @@ -240,7 +256,53 @@ async def _process_multitrack_recording_inner( exc_info=True, ) - meeting = await meetings_controller.get_by_room_name(daily_room_name) + # Check if recording already exists (reprocessing path) + recording = await recordings_controller.get_by_id(recording_id) + + if recording and recording.meeting_id: + # Reprocessing: recording exists with meeting already linked + meeting = await meetings_controller.get_by_id(recording.meeting_id) + if not meeting: + logger.error( + "Reprocessing: meeting not found for recording - skipping", + meeting_id=recording.meeting_id, + recording_id=recording_id, + ) + return + + logger.info( + "Reprocessing: using existing recording.meeting_id", + recording_id=recording_id, + meeting_id=meeting.id, + room_name=daily_room_name, + ) + else: + # First processing: recording doesn't exist, need time-based matching + # (Daily.co doesn't return instanceId in API, must match by timestamp) + recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc) + meeting = await meetings_controller.get_by_room_name_and_time( + room_name=daily_room_name, + recording_start=recording_start, + time_window_hours=168, # 1 week + ) + if not meeting: + logger.error( + "Raw-tracks: no meeting found within 1-week window (time-based match) - skipping", + recording_id=recording_id, + room_name=daily_room_name, + recording_start_ts=recording_start_ts, + recording_start=recording_start.isoformat(), + ) + return # Skip processing, will retry on next poll + logger.info( + "First processing: found meeting via time-based matching", + meeting_id=meeting.id, + room_name=daily_room_name, + recording_id=recording_id, + time_delta_seconds=abs( + (meeting.start_date - recording_start).total_seconds() + ), + ) room_name_base = extract_base_room_name(daily_room_name) @@ -248,18 +310,8 @@ async def _process_multitrack_recording_inner( if not room: raise Exception(f"Room not found: {room_name_base}") - if not meeting: - raise Exception(f"Meeting not found: {room_name_base}") - - logger.info( - "Found existing Meeting for recording", - meeting_id=meeting.id, - room_name=daily_room_name, - recording_id=recording_id, - ) - - recording = await recordings_controller.get_by_id(recording_id) if not recording: + # Create recording (only happens during first processing) object_key_dir = os.path.dirname(track_keys[0]) if track_keys else "" recording = await recordings_controller.create( Recording( @@ -271,7 +323,19 @@ async def _process_multitrack_recording_inner( track_keys=track_keys, ) ) - # else: Recording already exists; metadata set at creation time + elif not recording.meeting_id: + # Recording exists but meeting_id is null (failed first processing) + # Update with meeting from time-based matching + await recordings_controller.set_meeting_id( + recording_id=recording.id, + meeting_id=meeting.id, + ) + recording.meeting_id = meeting.id + logger.info( + "Updated existing recording with meeting_id", + recording_id=recording.id, + meeting_id=meeting.id, + ) transcript = await transcripts_controller.get_by_recording_id(recording.id) if not transcript: @@ -338,9 +402,11 @@ async def poll_daily_recordings(): """Poll Daily.co API for recordings and process missing ones. Fetches latest recordings from Daily.co API (default limit 100), compares with DB, - and queues processing for recordings not already in DB. + and stores/queues missing recordings: + - Cloud recordings: Store S3 key in meeting table + - Raw-tracks recordings: Queue multitrack processing - For each missing recording, uses audio tracks from API response. + Acts as fallback when webhooks active, primary discovery when webhooks unavailable. Worker-level locking provides idempotency (see process_multitrack_recording). """ @@ -381,51 +447,222 @@ async def poll_daily_recordings(): ) return - recording_ids = [rec.id for rec in finished_recordings] + # Separate cloud and raw-tracks recordings + cloud_recordings = [] + raw_tracks_recordings = [] + for rec in finished_recordings: + if rec.type: + # Daily.co API returns null type - make sure this assumption stays + # If this logs, Daily.co API changed - we can remove inference logic. + recording_type = rec.type + logger.warning( + "Recording has explicit type field from Daily.co API (unexpected, API may have changed)", + recording_id=rec.id, + room_name=rec.room_name, + recording_type=recording_type, + has_s3key=bool(rec.s3key), + tracks_count=len(rec.tracks), + ) + else: + # DAILY.CO API LIMITATION: + # GET /recordings response does NOT include type field. + # Daily.co docs mention type field exists, but API never returns it. + # Verified: 84 recordings from Nov 2025 - Jan 2026 ALL have type=None. + # + # This is not a recent API change - Daily.co has never returned type. + # Must infer from structural properties. + # + # Inference heuristic (reliable for finished recordings): + # - Has tracks array → raw-tracks + # - Has s3key but no tracks → cloud + # - Neither → failed/incomplete recording + if len(rec.tracks) > 0: + recording_type = "raw-tracks" + elif rec.s3key and len(rec.tracks) == 0: + recording_type = "cloud" + else: + logger.warning( + "Recording has no type, no s3key, and no tracks - likely failed recording", + recording_id=rec.id, + room_name=rec.room_name, + status=rec.status, + duration=rec.duration, + mtg_session_id=rec.mtgSessionId, + ) + continue + + if recording_type == "cloud": + cloud_recordings.append(rec) + else: + raw_tracks_recordings.append(rec) + + logger.debug( + "Poll results", + total=len(finished_recordings), + cloud=len(cloud_recordings), + raw_tracks=len(raw_tracks_recordings), + ) + + # Process cloud recordings + await _poll_cloud_recordings(cloud_recordings) + + # Process raw-tracks recordings + await _poll_raw_tracks_recordings(raw_tracks_recordings, bucket_name) + + +async def store_cloud_recording( + recording_id: NonEmptyString, + room_name: NonEmptyString, + s3_key: NonEmptyString, + duration: int, + start_ts: int, + source: Literal["webhook", "polling"], +) -> bool: + """ + Store cloud recording reference in meeting table. + + Common function for both webhook and polling code paths. + Uses time-based matching to handle duplicate room_name values. + + Args: + recording_id: Daily.co recording ID + room_name: Daily.co room name + s3_key: S3 key where recording is stored + duration: Recording duration in seconds + start_ts: Unix timestamp when recording started + source: "webhook" or "polling" (for logging) + + Returns: + True if stored, False if skipped/failed + """ + recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc) + + meeting = await meetings_controller.get_by_room_name_and_time( + room_name=room_name, + recording_start=recording_start, + time_window_hours=168, # 1 week + ) + + if not meeting: + logger.warning( + f"Cloud recording ({source}): no meeting found within 1-week window", + recording_id=recording_id, + room_name=room_name, + recording_start_ts=start_ts, + recording_start=recording_start.isoformat(), + ) + return False + + success = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting.id, + s3_key=s3_key, + duration=duration, + ) + + if not success: + logger.debug( + f"Cloud recording ({source}): already set (race lost)", + recording_id=recording_id, + room_name=room_name, + meeting_id=meeting.id, + ) + return False + + logger.info( + f"Cloud recording stored via {source} (time-based match)", + meeting_id=meeting.id, + recording_id=recording_id, + s3_key=s3_key, + duration=duration, + time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()), + ) + return True + + +async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]): + """ + Store cloud recordings missing from meeting table via polling. + + Uses time-based matching via store_cloud_recording(). + """ + if not cloud_recordings: + return + + stored_count = 0 + for recording in cloud_recordings: + # Extract S3 key from recording (cloud recordings use s3key field) + s3_key = recording.s3key or (recording.s3.key if recording.s3 else None) + if not s3_key: + logger.warning( + "Cloud recording: missing S3 key", + recording_id=recording.id, + room_name=recording.room_name, + ) + continue + + stored = await store_cloud_recording( + recording_id=recording.id, + room_name=recording.room_name, + s3_key=s3_key, + duration=recording.duration, + start_ts=recording.start_ts, + source="polling", + ) + if stored: + stored_count += 1 + + logger.info( + "Cloud recording polling complete", + total=len(cloud_recordings), + stored=stored_count, + ) + + +async def _poll_raw_tracks_recordings( + raw_tracks_recordings: List[FinishedRecordingResponse], + bucket_name: str, +): + """Queue raw-tracks recordings missing from DB (existing logic).""" + if not raw_tracks_recordings: + return + + recording_ids = [rec.id for rec in raw_tracks_recordings] existing_recordings = await recordings_controller.get_by_ids(recording_ids) existing_ids = {rec.id for rec in existing_recordings} missing_recordings = [ - rec for rec in finished_recordings if rec.id not in existing_ids + rec for rec in raw_tracks_recordings if rec.id not in existing_ids ] if not missing_recordings: logger.debug( - "All recordings already in DB", - api_count=len(finished_recordings), + "All raw-tracks recordings already in DB", + api_count=len(raw_tracks_recordings), existing_count=len(existing_recordings), ) return logger.info( - "Found recordings missing from DB", + "Found raw-tracks recordings missing from DB", missing_count=len(missing_recordings), - total_api_count=len(finished_recordings), + total_api_count=len(raw_tracks_recordings), existing_count=len(existing_recordings), ) for recording in missing_recordings: if not recording.tracks: - if recording.status == "finished": - logger.warning( - "Finished recording has no tracks (no audio captured)", - recording_id=recording.id, - room_name=recording.room_name, - ) - else: - logger.debug( - "No tracks in recording yet", - recording_id=recording.id, - room_name=recording.room_name, - status=recording.status, - ) + logger.warning( + "Finished raw-tracks recording has no tracks (no audio captured)", + recording_id=recording.id, + room_name=recording.room_name, + ) continue track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"] if not track_keys: logger.warning( - "No audio tracks found in recording (only video tracks)", + "No audio tracks found in raw-tracks recording", recording_id=recording.id, room_name=recording.room_name, total_tracks=len(recording.tracks), @@ -433,7 +670,7 @@ async def poll_daily_recordings(): continue logger.info( - "Queueing missing recording for processing", + "Queueing missing raw-tracks recording for processing", recording_id=recording.id, room_name=recording.room_name, track_count=len(track_keys), @@ -444,6 +681,7 @@ async def poll_daily_recordings(): daily_room_name=recording.room_name, recording_id=recording.id, track_keys=track_keys, + recording_start_ts=recording.start_ts, ) @@ -883,11 +1121,16 @@ async def reprocess_failed_daily_recordings(): transcript_status=transcript.status if transcript else None, ) + # For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner) + # Reprocessing uses recording.meeting_id directly instead of time-based matching + recording_start_ts = int(recording.recorded_at.timestamp()) + process_multitrack_recording.delay( bucket_name=bucket_name, daily_room_name=meeting.room_name, recording_id=recording.id, track_keys=recording.track_keys, + recording_start_ts=recording_start_ts, ) reprocessed_count += 1 diff --git a/server/tests/test_dailyco_instance_id.py b/server/tests/test_dailyco_instance_id.py new file mode 100644 index 00000000..d410205b --- /dev/null +++ b/server/tests/test_dailyco_instance_id.py @@ -0,0 +1,147 @@ +""" +Tests for Daily.co instanceId generation. + +Verifies deterministic behavior and frontend/backend consistency. +""" + +import pytest + +from reflector.dailyco_api.instance_id import ( + RAW_TRACKS_NAMESPACE, + generate_cloud_instance_id, + generate_raw_tracks_instance_id, +) + + +class TestInstanceIdDeterminism: + """Test deterministic generation of instanceIds.""" + + def test_cloud_instance_id_is_meeting_id(self): + """Cloud instanceId is meeting ID directly (implicitly tests determinism).""" + meeting_id = "550e8400-e29b-41d4-a716-446655440000" + result1 = generate_cloud_instance_id(meeting_id) + result2 = generate_cloud_instance_id(meeting_id) + assert str(result1) == meeting_id + assert result1 == result2 + + def test_raw_tracks_instance_id_deterministic(self): + """Raw-tracks instanceId generation is deterministic.""" + meeting_id = "550e8400-e29b-41d4-a716-446655440000" + result1 = generate_raw_tracks_instance_id(meeting_id) + result2 = generate_raw_tracks_instance_id(meeting_id) + assert result1 == result2 + + def test_raw_tracks_different_from_cloud(self): + """Raw-tracks instanceId differs from cloud instanceId.""" + meeting_id = "550e8400-e29b-41d4-a716-446655440000" + cloud_id = generate_cloud_instance_id(meeting_id) + raw_tracks_id = generate_raw_tracks_instance_id(meeting_id) + assert cloud_id != raw_tracks_id + + def test_different_meetings_different_instance_ids(self): + """Different meetings generate different instanceIds.""" + meeting_id1 = "550e8400-e29b-41d4-a716-446655440000" + meeting_id2 = "6ba7b810-9dad-11d1-80b4-00c04fd430c8" + + cloud1 = generate_cloud_instance_id(meeting_id1) + cloud2 = generate_cloud_instance_id(meeting_id2) + assert cloud1 != cloud2 + + raw1 = generate_raw_tracks_instance_id(meeting_id1) + raw2 = generate_raw_tracks_instance_id(meeting_id2) + assert raw1 != raw2 + + +class TestFrontendBackendConsistency: + """Test that backend matches frontend logic.""" + + def test_namespace_matches_frontend(self): + """Namespace UUID matches frontend RAW_TRACKS_NAMESPACE constant.""" + # From www/app/[roomName]/components/DailyRoom.tsx + frontend_namespace = "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + assert str(RAW_TRACKS_NAMESPACE) == frontend_namespace + + def test_raw_tracks_generation_matches_frontend_logic(self): + """Backend UUIDv5 generation matches frontend uuidv5() call.""" + # Example meeting ID + meeting_id = "550e8400-e29b-41d4-a716-446655440000" + + # Backend result + backend_result = generate_raw_tracks_instance_id(meeting_id) + + # Expected result from frontend: uuidv5(meeting.id, RAW_TRACKS_NAMESPACE) + # Python uuid5 uses (namespace, name) argument order + # JavaScript uuid.v5(name, namespace) - same args, different order + # Frontend: uuidv5(meeting.id, "a1b2c3d4-e5f6-7890-abcd-ef1234567890") + # Backend: uuid5(UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"), meeting.id) + + # Verify it's a valid UUID (will raise if not) + assert len(str(backend_result)) == 36 + assert backend_result.version == 5 + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_invalid_uuid_format_raises(self): + """Invalid UUID format raises ValueError.""" + with pytest.raises(ValueError): + generate_cloud_instance_id("not-a-uuid") + + def test_lowercase_uuid_normalized_for_cloud(self): + """Cloud instanceId: lowercase/uppercase UUIDs produce same result.""" + meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000" + meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000" + + cloud_lower = generate_cloud_instance_id(meeting_id_lower) + cloud_upper = generate_cloud_instance_id(meeting_id_upper) + assert cloud_lower == cloud_upper + + def test_uuid5_is_case_sensitive_warning(self): + """ + Documents uuid5 case sensitivity - different case UUIDs produce different hashes. + + Not a problem: meeting.id always lowercase from DB and API. + Frontend generates raw-tracks instanceId from lowercase meeting.id. + Backend receives lowercase meeting_id when matching. + + This test documents the behavior, not a requirement. + """ + meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000" + meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000" + + raw_lower = generate_raw_tracks_instance_id(meeting_id_lower) + raw_upper = generate_raw_tracks_instance_id(meeting_id_upper) + assert raw_lower != raw_upper + + +class TestMtgSessionIdVsInstanceId: + """ + Documents that Daily.co's mtgSessionId differs from our instanceId. + + Why this matters: We investigated using mtgSessionId for matching but discovered + it's Daily.co-generated and unrelated to instanceId we send. This test documents + that finding so we don't investigate it again. + + Production data from 2026-01-13: + - Meeting ID: 4ad503b6-8189-4910-a8f7-68cdd1b7f990 + - Cloud instanceId: 4ad503b6-8189-4910-a8f7-68cdd1b7f990 (same as meeting ID) + - Raw-tracks instanceId: 784b3af3-c7dd-57f0-ac54-2ee91c6927cb (UUIDv5 derived) + - Recording mtgSessionId: f25a2e09-740f-4932-9c0d-b1bebaa669c6 (different!) + + Conclusion: Cannot use mtgSessionId for recording-to-meeting matching. + """ + + def test_mtg_session_id_differs_from_our_instance_ids(self): + """mtgSessionId (Daily.co) != instanceId (ours) for both cloud and raw-tracks.""" + meeting_id = "4ad503b6-8189-4910-a8f7-68cdd1b7f990" + expected_raw_tracks_id = "784b3af3-c7dd-57f0-ac54-2ee91c6927cb" + mtg_session_id = "f25a2e09-740f-4932-9c0d-b1bebaa669c6" + + cloud_instance_id = generate_cloud_instance_id(meeting_id) + raw_tracks_instance_id = generate_raw_tracks_instance_id(meeting_id) + + assert str(cloud_instance_id) == meeting_id + assert str(raw_tracks_instance_id) == expected_raw_tracks_id + assert str(cloud_instance_id) != mtg_session_id + assert str(raw_tracks_instance_id) != mtg_session_id diff --git a/server/tests/test_time_based_meeting_matching.py b/server/tests/test_time_based_meeting_matching.py new file mode 100644 index 00000000..3506c183 --- /dev/null +++ b/server/tests/test_time_based_meeting_matching.py @@ -0,0 +1,374 @@ +""" +Integration tests for time-based meeting-to-recording matching. + +Tests the critical path for matching Daily.co recordings to meetings when +API doesn't return instanceId. +""" + +from datetime import datetime, timedelta, timezone + +import pytest + +from reflector.db.meetings import meetings_controller +from reflector.db.rooms import rooms_controller + + +@pytest.fixture +async def test_room(): + """Create a test room for meetings.""" + room = await rooms_controller.add( + name="test-room-time", + user_id="test-user-id", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic", + is_shared=False, + platform="daily", + ) + return room + + +@pytest.fixture +def base_time(): + """Fixed timestamp for deterministic tests.""" + return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc) + + +class TestTimeBasedMatching: + """Test get_by_room_name_and_time() matching logic.""" + + async def test_exact_time_match(self, test_room, base_time): + """Recording timestamp exactly matches meeting start_date.""" + meeting = await meetings_controller.create( + id="meeting-exact", + room_name="daily-test-20260114090000", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-20260114090000", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + assert result.id == meeting.id + + async def test_recording_slightly_after_meeting_start(self, test_room, base_time): + """Recording started 1 minute after meeting (participants joined late).""" + meeting = await meetings_controller.create( + id="meeting-late", + room_name="daily-test-20260114090100", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + recording_start = base_time + timedelta(minutes=1) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-20260114090100", + recording_start=recording_start, + time_window_hours=168, + ) + + assert result is not None + assert result.id == meeting.id + + async def test_duplicate_room_names_picks_closest(self, test_room, base_time): + """ + Two meetings with same room_name (duplicate/race condition). + Should pick closest by timestamp. + """ + meeting1 = await meetings_controller.create( + id="meeting-1-first", + room_name="daily-duplicate-room", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + meeting2 = await meetings_controller.create( + id="meeting-2-second", + room_name="daily-duplicate-room", # Same room_name! + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time + timedelta(seconds=0.99), # 0.99s later + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + # Recording started 0.5s after meeting1 + # Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer + recording_start = base_time + timedelta(seconds=0.5) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-duplicate-room", + recording_start=recording_start, + time_window_hours=168, + ) + + assert result is not None + assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s) + + async def test_outside_time_window_returns_none(self, test_room, base_time): + """Recording outside 1-week window returns None.""" + await meetings_controller.create( + id="meeting-old", + room_name="daily-test-old", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + # Recording 8 days later (outside 7-day window) + recording_start = base_time + timedelta(days=8) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-old", + recording_start=recording_start, + time_window_hours=168, + ) + + assert result is None + + async def test_tie_breaker_deterministic(self, test_room, base_time): + """When time delta identical, tie-breaker by meeting.id is deterministic.""" + meeting_z = await meetings_controller.create( + id="zzz-last-uuid", + room_name="daily-test-tie", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + meeting_a = await meetings_controller.create( + id="aaa-first-uuid", + room_name="daily-test-tie", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, # Exact same start_date + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-tie", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + # Tie-breaker: lexicographically first UUID + assert result.id == "aaa-first-uuid" + + async def test_timezone_naive_datetime_raises(self, test_room, base_time): + """Timezone-naive datetime raises ValueError.""" + await meetings_controller.create( + id="meeting-tz", + room_name="daily-test-tz", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + # Naive datetime (no timezone) + naive_dt = datetime(2026, 1, 14, 9, 0, 0) + + with pytest.raises(ValueError, match="timezone-aware"): + await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-tz", + recording_start=naive_dt, + time_window_hours=168, + ) + + async def test_one_week_boundary_after_included(self, test_room, base_time): + """Meeting 1-week AFTER recording is included (window_end boundary).""" + meeting_time = base_time + timedelta(hours=168) + + await meetings_controller.create( + id="meeting-boundary-after", + room_name="daily-test-boundary-after", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=meeting_time, + end_date=meeting_time + timedelta(hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-boundary-after", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + assert result.id == "meeting-boundary-after" + + async def test_one_week_boundary_before_included(self, test_room, base_time): + """Meeting 1-week BEFORE recording is included (window_start boundary).""" + meeting_time = base_time - timedelta(hours=168) + + await meetings_controller.create( + id="meeting-boundary-before", + room_name="daily-test-boundary-before", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=meeting_time, + end_date=meeting_time + timedelta(hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-boundary-before", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + assert result.id == "meeting-boundary-before" + + async def test_recording_before_meeting_start(self, test_room, base_time): + """Recording started before meeting (clock skew or early join).""" + await meetings_controller.create( + id="meeting-early", + room_name="daily-test-early", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + recording_start = base_time - timedelta(minutes=2) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-early", + recording_start=recording_start, + time_window_hours=168, + ) + + assert result is not None + assert result.id == "meeting-early" + + async def test_mixed_inside_outside_window(self, test_room, base_time): + """Multiple meetings, only one inside window - returns the inside one.""" + await meetings_controller.create( + id="meeting-old", + room_name="daily-test-mixed", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time - timedelta(days=10), + end_date=base_time - timedelta(days=10, hours=-1), + room=test_room, + ) + + await meetings_controller.create( + id="meeting-inside", + room_name="daily-test-mixed", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time - timedelta(days=2), + end_date=base_time - timedelta(days=2, hours=-1), + room=test_room, + ) + + await meetings_controller.create( + id="meeting-future", + room_name="daily-test-mixed", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time + timedelta(days=10), + end_date=base_time + timedelta(days=10, hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-mixed", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + assert result.id == "meeting-inside" + + +class TestAtomicCloudRecordingUpdate: + """Test atomic update prevents race conditions.""" + + async def test_first_update_succeeds(self, test_room, base_time): + """First call to set_cloud_recording_if_missing succeeds.""" + meeting = await meetings_controller.create( + id="meeting-atomic-1", + room_name="daily-test-atomic", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + success = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting.id, + s3_key="first-s3-key", + duration=100, + ) + + assert success is True + + updated = await meetings_controller.get_by_id(meeting.id) + assert updated.daily_composed_video_s3_key == "first-s3-key" + assert updated.daily_composed_video_duration == 100 + + async def test_second_update_fails_atomically(self, test_room, base_time): + """Second call to update same meeting doesn't overwrite (atomic check).""" + meeting = await meetings_controller.create( + id="meeting-atomic-2", + room_name="daily-test-atomic2", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + success1 = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting.id, + s3_key="first-s3-key", + duration=100, + ) + + assert success1 is True + + after_first = await meetings_controller.get_by_id(meeting.id) + assert after_first.daily_composed_video_s3_key == "first-s3-key" + + success2 = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting.id, + s3_key="bucket/path/should-not-overwrite", + duration=200, + ) + + assert success2 is False + + final = await meetings_controller.get_by_id(meeting.id) + assert final.daily_composed_video_s3_key == "first-s3-key" + assert final.daily_composed_video_duration == 100 diff --git a/www/app/(app)/transcripts/[transcriptId]/finalSummary.tsx b/www/app/(app)/transcripts/[transcriptId]/finalSummary.tsx index d7ba37dc..500c4cc5 100644 --- a/www/app/(app)/transcripts/[transcriptId]/finalSummary.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/finalSummary.tsx @@ -3,7 +3,8 @@ import React from "react"; import Markdown from "react-markdown"; import "../../../styles/markdown.css"; import type { components } from "../../../reflector-api"; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; import { useTranscriptUpdate } from "../../../lib/apiHooks"; import { @@ -18,7 +19,7 @@ import { LuPen } from "react-icons/lu"; import { useError } from "../../../(errors)/errorContext"; type FinalSummaryProps = { - transcript: GetTranscript; + transcript: GetTranscriptWithParticipants; topics: GetTranscriptTopic[]; onUpdate: (newSummary: string) => void; finalSummaryRef: React.Dispatch>; diff --git a/www/app/(app)/transcripts/createTranscript.ts b/www/app/(app)/transcripts/createTranscript.ts index 8a235161..0991130f 100644 --- a/www/app/(app)/transcripts/createTranscript.ts +++ b/www/app/(app)/transcripts/createTranscript.ts @@ -2,10 +2,11 @@ import type { components } from "../../reflector-api"; import { useTranscriptCreate } from "../../lib/apiHooks"; type CreateTranscript = components["schemas"]["CreateTranscript"]; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type UseCreateTranscript = { - transcript: GetTranscript | null; + transcript: GetTranscriptWithParticipants | null; loading: boolean; error: Error | null; create: (transcriptCreationDetails: CreateTranscript) => Promise; diff --git a/www/app/(app)/transcripts/shareAndPrivacy.tsx b/www/app/(app)/transcripts/shareAndPrivacy.tsx index 04cda920..207d900f 100644 --- a/www/app/(app)/transcripts/shareAndPrivacy.tsx +++ b/www/app/(app)/transcripts/shareAndPrivacy.tsx @@ -2,7 +2,8 @@ import { useEffect, useState } from "react"; import { ShareMode, toShareMode } from "../../lib/shareMode"; import type { components } from "../../reflector-api"; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; type UpdateTranscript = components["schemas"]["UpdateTranscript"]; import { @@ -27,7 +28,7 @@ import { featureEnabled } from "../../lib/features"; type ShareAndPrivacyProps = { finalSummaryElement: HTMLDivElement | null; - transcript: GetTranscript; + transcript: GetTranscriptWithParticipants; topics: GetTranscriptTopic[]; }; diff --git a/www/app/(app)/transcripts/shareZulip.tsx b/www/app/(app)/transcripts/shareZulip.tsx index c3efe3ab..96242de2 100644 --- a/www/app/(app)/transcripts/shareZulip.tsx +++ b/www/app/(app)/transcripts/shareZulip.tsx @@ -1,7 +1,8 @@ import { useState, useEffect, useMemo } from "react"; import type { components } from "../../reflector-api"; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; import { BoxProps, @@ -26,7 +27,7 @@ import { import { featureEnabled } from "../../lib/features"; type ShareZulipProps = { - transcript: GetTranscript; + transcript: GetTranscriptWithParticipants; topics: GetTranscriptTopic[]; disabled: boolean; }; diff --git a/www/app/(app)/transcripts/transcriptTitle.tsx b/www/app/(app)/transcripts/transcriptTitle.tsx index 49a22c71..ea738673 100644 --- a/www/app/(app)/transcripts/transcriptTitle.tsx +++ b/www/app/(app)/transcripts/transcriptTitle.tsx @@ -2,7 +2,8 @@ import { useState } from "react"; import type { components } from "../../reflector-api"; type UpdateTranscript = components["schemas"]["UpdateTranscript"]; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; import { useTranscriptUpdate, @@ -20,7 +21,7 @@ type TranscriptTitle = { onUpdate: (newTitle: string) => void; // share props - transcript: GetTranscript | null; + transcript: GetTranscriptWithParticipants | null; topics: GetTranscriptTopic[] | null; finalSummaryElement: HTMLDivElement | null; }; diff --git a/www/app/[roomName]/components/DailyRoom.tsx b/www/app/[roomName]/components/DailyRoom.tsx index 44fa6315..d1c00254 100644 --- a/www/app/[roomName]/components/DailyRoom.tsx +++ b/www/app/[roomName]/components/DailyRoom.tsx @@ -22,14 +22,29 @@ import DailyIframe, { import type { components } from "../../reflector-api"; import { useAuth } from "../../lib/AuthProvider"; import { useConsentDialog } from "../../lib/consent"; -import { useRoomJoinMeeting } from "../../lib/apiHooks"; +import { + useRoomJoinMeeting, + useMeetingStartRecording, +} from "../../lib/apiHooks"; import { omit } from "remeda"; -import { assertExists } from "../../lib/utils"; -import { assertMeetingId } from "../../lib/types"; +import { + assertExists, + NonEmptyString, + parseNonEmptyString, +} from "../../lib/utils"; +import { assertMeetingId, DailyRecordingType } from "../../lib/types"; +import { useUuidV5 } from "react-uuid-hook"; const CONSENT_BUTTON_ID = "recording-consent"; const RECORDING_INDICATOR_ID = "recording-indicator"; +// Namespace UUID for UUIDv5 generation of raw-tracks instanceIds +// DO NOT CHANGE: Breaks instanceId determinism across deployments +const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"; + +const RECORDING_START_DELAY_MS = 2000; +const RECORDING_START_MAX_RETRIES = 5; + type Meeting = components["schemas"]["Meeting"]; type Room = components["schemas"]["RoomDetails"]; @@ -73,9 +88,7 @@ const useFrame = ( cbs: { onLeftMeeting: () => void; onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void; - onJoinMeeting: ( - startRecording: (args: { type: "raw-tracks" }) => void, - ) => void; + onJoinMeeting: () => void; }, ) => { const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE); @@ -126,7 +139,7 @@ const useFrame = ( console.error("frame is null in joined-meeting callback"); return; } - cbs.onJoinMeeting(frame.startRecording.bind(frame)); + cbs.onJoinMeeting(); }; frame.on("joined-meeting", joinCb); return () => { @@ -173,8 +186,15 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { const authLastUserId = auth.lastUserId; const [container, setContainer] = useState(null); const joinMutation = useRoomJoinMeeting(); + const startRecordingMutation = useMeetingStartRecording(); const [joinedMeeting, setJoinedMeeting] = useState(null); + // Generate deterministic instanceIds so all participants use SAME IDs + const cloudInstanceId = parseNonEmptyString(meeting.id); + const rawTracksInstanceId = parseNonEmptyString( + useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0], + ); + const roomName = params?.roomName as string; const { @@ -228,19 +248,72 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { ], ); - const handleFrameJoinMeeting = useCallback( - (startRecording: (args: { type: "raw-tracks" }) => void) => { - try { - if (meeting.recording_type === "cloud") { - console.log("Starting cloud recording"); - startRecording({ type: "raw-tracks" }); - } - } catch (error) { - console.error("Failed to start recording:", error); - } - }, - [meeting.recording_type], - ); + const handleFrameJoinMeeting = useCallback(() => { + if (meeting.recording_type === "cloud") { + console.log("Starting dual recording via REST API", { + cloudInstanceId, + rawTracksInstanceId, + }); + + // Start both cloud and raw-tracks via backend REST API (with retry on 404) + // Daily.co needs time to register call as "hosting" for REST API + const startRecordingWithRetry = ( + type: DailyRecordingType, + instanceId: NonEmptyString, + attempt: number = 1, + ) => { + setTimeout(() => { + startRecordingMutation.mutate( + { + params: { + path: { + meeting_id: meeting.id, + }, + }, + body: { + type, + instanceId, + }, + }, + { + onError: (error: any) => { + const errorText = error?.detail || error?.message || ""; + const is404NotHosting = errorText.includes( + "does not seem to be hosting a call", + ); + const isActiveStream = errorText.includes( + "has an active stream", + ); + + if (is404NotHosting && attempt < RECORDING_START_MAX_RETRIES) { + console.log( + `${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`, + ); + startRecordingWithRetry(type, instanceId, attempt + 1); + } else if (isActiveStream) { + console.log( + `${type}: Recording already active (started by another participant)`, + ); + } else { + console.error(`Failed to start ${type} recording:`, error); + } + }, + }, + ); + }, RECORDING_START_DELAY_MS); + }; + + // Start both recordings + startRecordingWithRetry("cloud", cloudInstanceId); + startRecordingWithRetry("raw-tracks", rawTracksInstanceId); + } + }, [ + meeting.recording_type, + meeting.id, + startRecordingMutation, + cloudInstanceId, + rawTracksInstanceId, + ]); const recordingIconUrl = useMemo( () => new URL("/recording-icon.svg", window.location.origin), diff --git a/www/app/lib/apiHooks.ts b/www/app/lib/apiHooks.ts index a59c31eb..a00eb552 100644 --- a/www/app/lib/apiHooks.ts +++ b/www/app/lib/apiHooks.ts @@ -567,6 +567,20 @@ export function useTranscriptSpeakerMerge() { ); } +export function useMeetingStartRecording() { + const { setError } = useError(); + + return $api.useMutation( + "post", + "/v1/meetings/{meeting_id}/recordings/start", + { + onError: (error) => { + setError(error as Error, "Failed to start recording"); + }, + }, + ); +} + export function useMeetingAudioConsent() { const { setError } = useError(); diff --git a/www/app/lib/transcript.ts b/www/app/lib/transcript.ts index d1fd8b3d..f23a7c38 100644 --- a/www/app/lib/transcript.ts +++ b/www/app/lib/transcript.ts @@ -1,5 +1,6 @@ import { components } from "../reflector-api"; -type ApiTranscriptStatus = components["schemas"]["GetTranscript"]["status"]; +type ApiTranscriptStatus = + components["schemas"]["GetTranscriptWithParticipants"]["status"]; export type TranscriptStatus = ApiTranscriptStatus; diff --git a/www/app/lib/types.ts b/www/app/lib/types.ts index c5ab8ce7..54e2bae1 100644 --- a/www/app/lib/types.ts +++ b/www/app/lib/types.ts @@ -89,3 +89,5 @@ export const assertMeetingId = (s: string): MeetingId => { // just cast for now return nes as MeetingId; }; + +export type DailyRecordingType = "cloud" | "raw-tracks"; diff --git a/www/app/reflector-api.d.ts b/www/app/reflector-api.d.ts index 3704a9a0..12a7085c 100644 --- a/www/app/reflector-api.d.ts +++ b/www/app/reflector-api.d.ts @@ -75,6 +75,31 @@ export interface paths { patch: operations["v1_meeting_deactivate"]; trace?: never; }; + "/v1/meetings/{meeting_id}/recordings/start": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + get?: never; + put?: never; + /** + * Start Recording + * @description Start cloud or raw-tracks recording via Daily.co REST API. + * + * Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time. + * Uses different instanceIds for cloud vs raw-tracks (same won't work) + * + * Note: No authentication required - anonymous users supported. TODO this is a DOS vector + */ + post: operations["v1_start_recording"]; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/v1/rooms": { parameters: { query?: never; @@ -1544,6 +1569,10 @@ export interface components { * @enum {string} */ platform: "whereby" | "daily"; + /** Daily Composed Video S3 Key */ + daily_composed_video_s3_key?: string | null; + /** Daily Composed Video Duration */ + daily_composed_video_duration?: number | null; }; /** MeetingConsentRequest */ MeetingConsentRequest: { @@ -1818,6 +1847,19 @@ export interface components { /** Words */ words: components["schemas"]["Word"][]; }; + /** StartRecordingRequest */ + StartRecordingRequest: { + /** + * Type + * @enum {string} + */ + type: "cloud" | "raw-tracks"; + /** + * Instanceid + * Format: uuid + */ + instanceId: string; + }; /** Stream */ Stream: { /** Stream Id */ @@ -2126,6 +2168,43 @@ export interface operations { }; }; }; + v1_start_recording: { + parameters: { + query?: never; + header?: never; + path: { + meeting_id: string; + }; + cookie?: never; + }; + requestBody: { + content: { + "application/json": components["schemas"]["StartRecordingRequest"]; + }; + }; + responses: { + /** @description Successful Response */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": { + [key: string]: unknown; + }; + }; + }; + /** @description Validation Error */ + 422: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; + }; + }; v1_rooms_list: { parameters: { query?: { diff --git a/www/package.json b/www/package.json index 13895a3a..ceefbf55 100644 --- a/www/package.json +++ b/www/package.json @@ -46,6 +46,7 @@ "react-markdown": "^9.0.0", "react-qr-code": "^2.0.12", "react-select-search": "^4.1.7", + "react-uuid-hook": "^0.0.6", "redlock": "5.0.0-beta.2", "remeda": "^2.31.1", "sass": "^1.63.6", diff --git a/www/pnpm-lock.yaml b/www/pnpm-lock.yaml index 4cc219ea..cd65de55 100644 --- a/www/pnpm-lock.yaml +++ b/www/pnpm-lock.yaml @@ -106,6 +106,9 @@ importers: react-select-search: specifier: ^4.1.7 version: 4.1.8(prop-types@15.8.1)(react-dom@18.3.1(react@18.3.1))(react@18.3.1) + react-uuid-hook: + specifier: ^0.0.6 + version: 0.0.6(react@18.3.1) redlock: specifier: 5.0.0-beta.2 version: 5.0.0-beta.2 @@ -7628,6 +7631,14 @@ packages: "@types/react": optional: true + react-uuid-hook@0.0.6: + resolution: + { + integrity: sha512-u9+EvFbqpWfLE/ReYFry0vYu1BAg1fY9ekr0XLSDNnfWyrnVFytpurwz5qYsIB0psevuvrpZHIcvu7AjUwqinA==, + } + peerDependencies: + react: ">=16.8.0" + react@18.3.1: resolution: { @@ -8771,6 +8782,13 @@ packages: integrity: sha512-Fykw5U4eZESbq739BeLvEBFRuJODfrlmjx5eJux7W817LjRaq4b7/i4t2zxQmhcX+fAj4nMfRdTzO4tmwLKn0w==, } + uuid@13.0.0: + resolution: + { + integrity: sha512-XQegIaBTVUjSHliKqcnFqYypAd4S+WCYt5NIeRs6w/UAry7z8Y9j5ZwRRL4kzq9U3sD6v+85er9FvkEaBpji2w==, + } + hasBin: true + uuid@8.3.2: resolution: { @@ -14570,6 +14588,11 @@ snapshots: optionalDependencies: "@types/react": 18.2.20 + react-uuid-hook@0.0.6(react@18.3.1): + dependencies: + react: 18.3.1 + uuid: 13.0.0 + react@18.3.1: dependencies: loose-envify: 1.4.0 @@ -15401,6 +15424,8 @@ snapshots: uuid-validate@0.0.3: {} + uuid@13.0.0: {} + uuid@8.3.2: {} uuid@9.0.1: {}