diff --git a/docs/docs/pipelines/live-pipeline.md b/docs/docs/pipelines/live-pipeline.md index 66854fb4..758fd945 100644 --- a/docs/docs/pipelines/live-pipeline.md +++ b/docs/docs/pipelines/live-pipeline.md @@ -4,421 +4,4 @@ title: Live pipeline # Live pipeline -# Daily.co - -This document details every external call, storage operation, and database write that occurs when a new Daily.co recording is discovered. - -## Trigger - -Two entry points, both converging to the same handler: - -1. **Webhook**: Daily.co sends `POST /v1/daily/webhook` with `recording.ready-to-download` -2. **Polling**: `GET /recordings` (paginated, max 100/call) → filter new → convert to same payload format - -Both produce `RecordingReadyPayload` and call `handleRecordingReady(payload)`. - -``` -┌─────────────────┐ ┌──────────────────────────┐ -│ Daily Webhook │────▶│ RecordingReadyPayload │ -│ (push) │ │ {room_name, recording_id│ -└─────────────────┘ │ tracks[], ...} │ - └────────────┬─────────────┘ -┌─────────────────┐ │ -│ GET /recordings│ ▼ -│ (poll) │────▶ convert ──▶ handleRecordingReady() -└─────────────────┘ │ - ▼ - ┌────────────────────────┐ - │ process_multitrack_ │ - │ recording pipeline │ - └────────────────────────┘ -``` - -**Polling API**: `GET https://api.daily.co/v1/recordings` -- Pagination: `limit` (max 100), `starting_after`, `ending_before` -- Rate limit: ~2 req/sec -- Response: `{total_count, data: Recording[]}` - -```mermaid -flowchart TB - subgraph Trigger["1. Recording Discovery - Daily.co Webhook"] - DAILY_WEBHOOK["Daily.co sends POST /v1/daily/webhook
type: recording.ready-to-download"] - VERIFY["Verify X-Webhook-Signature (HMAC)"] - PARSE["Parse DailyWebhookEvent
Extract tracks[], room_name, recording_id"] - FILTER["Filter audio tracks only
track_keys = [t.s3Key for t in tracks if t.type == 'audio']"] - DISPATCH["process_multitrack_recording.delay()"] - - DAILY_WEBHOOK --> VERIFY --> PARSE --> FILTER --> DISPATCH - end - - subgraph Init["2. Recording Initialization"] - FETCH_MEETING[DB READ: meetings_controller.get_by_room_name] - FETCH_ROOM[DB READ: rooms_controller.get_by_name] - DAILY_API_REC[Daily API: GET /recordings/recording_id] - DAILY_API_PART[Daily API: GET /meetings/mtgSessionId/participants] - CREATE_RECORDING[DB WRITE: recordings_controller.create] - CREATE_TRANSCRIPT[DB WRITE: transcripts_controller.add] - MAP_PARTICIPANTS[DB WRITE: transcript.participants upsert] - end - - subgraph Pipeline["3. Processing Pipeline"] - direction TB - PAD[Track Padding & Mixdown] - TRANSCRIBE[GPU: Transcription per track] - TOPICS[LLM: Topic Detection] - TITLE[LLM: Title Generation] - SUMMARY[LLM: Summary Generation] - end - - subgraph Storage["4. S3 Operations"] - S3_PRESIGN[S3: generate_presigned_url for tracks] - S3_UPLOAD_PADDED[S3 UPLOAD: padded tracks temp] - S3_UPLOAD_MP3[S3 UPLOAD: audio.mp3] - S3_DELETE_TEMP[S3 DELETE: cleanup temp files] - end - - subgraph PostProcess["5. Post-Processing"] - CONSENT[Consent check & cleanup] - ZULIP[Zulip: send/update message] - WEBHOOK_OUT[Webhook: POST to room.webhook_url] - end - - Trigger --> Init --> Pipeline - Pipeline --> Storage - Pipeline --> PostProcess -``` - -## Detailed Sequence: Daily.co Multitrack Recording - -```mermaid -sequenceDiagram - participant DailyCo as Daily.co - participant API as FastAPI /v1/daily/webhook - participant Worker as Celery Worker - participant DB as PostgreSQL - participant DailyAPI as Daily.co REST API - participant S3 as AWS S3 - participant GPU as Modal.com GPU - participant LLM as LLM Service - participant WS as WebSocket - participant Zulip as Zulip - participant ExtWH as External Webhook - - Note over DailyCo,API: Phase 0: Webhook Receipt - DailyCo->>API: POST /v1/daily/webhook - Note right of DailyCo: X-Webhook-Signature, X-Webhook-Timestamp - API->>API: verify_webhook_signature() - API->>API: Extract audio track s3Keys from payload.tracks[] - API->>Worker: process_multitrack_recording.delay() - API-->>DailyCo: 200 OK - - Note over Worker,DailyAPI: Phase 1: Recording Initialization - Worker->>DB: SELECT meeting WHERE room_name=? - Worker->>DB: SELECT room WHERE name=? - Worker->>DailyAPI: GET /recordings/{recording_id} - DailyAPI-->>Worker: {mtgSessionId, ...} - Worker->>DailyAPI: GET /meetings/{mtgSessionId}/participants - DailyAPI-->>Worker: [{participant_id, user_name}, ...] 
- Worker->>DB: INSERT INTO recording - Worker->>DB: INSERT INTO transcript (status='idle') - loop For each track_key (parse participant_id from filename) - Worker->>DB: UPSERT transcript.participants[speaker=idx, name=X] - end - - Note over Worker,S3: Phase 2: Track Padding - Worker->>DB: UPDATE transcript SET status='processing' - Worker->>WS: broadcast STATUS='processing' - loop For each track in track_keys (N tracks) - Worker->>S3: generate_presigned_url(track_key, DAILYCO_BUCKET) - S3-->>Worker: presigned_url (2hr) - Note over Worker: PyAV: read WebM, extract start_time - Note over Worker: PyAV: adelay filter (pad silence) - Worker->>S3: PUT file_pipeline/{id}/tracks/padded_{idx}.webm - Worker->>S3: generate_presigned_url(padded_{idx}.webm) - end - - Note over Worker,S3: Phase 3: Audio Mixdown - Note over Worker: PyAV: amix filter → stereo MP3 - Worker->>DB: UPDATE transcript SET duration=X - Worker->>WS: broadcast DURATION - Worker->>S3: PUT {transcript_id}/audio.mp3 - Worker->>DB: UPDATE transcript SET audio_location='storage' - - Note over Worker: Phase 4: Waveform - Note over Worker: Generate peaks from MP3 - Worker->>DB: UPDATE events+=WAVEFORM - Worker->>WS: broadcast WAVEFORM - - Note over Worker,GPU: Phase 5: Transcription (N GPU calls) - loop For each padded track URL (N tracks) - Worker->>GPU: POST /v1/audio/transcriptions-from-url - Note right of GPU: {audio_file_url, language, batch:true} - GPU-->>Worker: {words: [{word, start, end}, ...]} - Note over Worker: Assign speaker=track_idx to words - end - Note over Worker: Merge all words, sort by start time - Worker->>DB: UPDATE events+=TRANSCRIPT - Worker->>WS: broadcast TRANSCRIPT - - Note over Worker,S3: Cleanup temp files - loop For each padded file - Worker->>S3: DELETE padded_{idx}.webm - end - - Note over Worker,LLM: Phase 6: Topic Detection (C LLM calls) - Note over Worker: C = ceil(total_words / 300) - loop For each 300-word chunk (C chunks) - Worker->>LLM: TOPIC_PROMPT + words[i:i+300] - Note right of LLM: "Extract main topic title + 2-sentence summary" - LLM-->>Worker: TitleSummary{title, summary} - Worker->>DB: UPSERT topics[] - Worker->>DB: UPDATE events+=TOPIC - Worker->>WS: broadcast TOPIC - end - - Note over Worker,LLM: Phase 7a: Title Generation (1 LLM call) - Note over Worker: Input: all TitleSummary[].title joined - Worker->>LLM: TITLE_PROMPT - Note right of LLM: "Generate concise title from topic titles" - LLM-->>Worker: "Meeting Title" - Worker->>DB: UPDATE transcript SET title=X - Worker->>DB: UPDATE events+=FINAL_TITLE - Worker->>WS: broadcast FINAL_TITLE - - Note over Worker,LLM: Phase 7b: Summary Generation (2+2M LLM calls) - Note over Worker: Reconstruct full transcript from TitleSummary[].transcript - opt If participants unknown - Worker->>LLM: PARTICIPANTS_PROMPT - LLM-->>Worker: ParticipantsResponse - end - Worker->>LLM: SUBJECTS_PROMPT (call #1) - Note right of LLM: "Main high-level topics? 
Max 6" - LLM-->>Worker: SubjectsResponse{subjects: ["A", "B", ...]} - - loop For each subject (M subjects, max 6) - Worker->>LLM: DETAILED_SUBJECT_PROMPT (call #2..#1+M) - Note right of LLM: "Info about 'A': decisions, actions, deadlines" - LLM-->>Worker: detailed_response (discarded after next call) - Worker->>LLM: PARAGRAPH_SUMMARY_PROMPT (call #2+M..#1+2M) - Note right of LLM: "Summarize in 1 paragraph" - LLM-->>Worker: paragraph → summaries[] - end - - Worker->>LLM: RECAP_PROMPT (call #2+2M) - Note right of LLM: "High-level quick recap, 1 paragraph" - LLM-->>Worker: recap - Note over Worker: long_summary = "# Quick recap\n{recap}\n# Summary\n**A**\n{para1}..." - Note over Worker: short_summary = recap only - Worker->>DB: UPDATE long_summary, short_summary - Worker->>DB: UPDATE events+=FINAL_LONG_SUMMARY - Worker->>WS: broadcast FINAL_LONG_SUMMARY - Worker->>DB: UPDATE events+=FINAL_SHORT_SUMMARY - Worker->>WS: broadcast FINAL_SHORT_SUMMARY - - Note over Worker,DB: Phase 8: Finalize - Worker->>DB: UPDATE transcript SET status='ended' - Worker->>DB: UPDATE events+=STATUS - Worker->>WS: broadcast STATUS='ended' - - Note over Worker,ExtWH: Phase 9: Post-Processing Chain - Worker->>DB: SELECT meeting_consent WHERE meeting_id=? - alt Any consent denied - Worker->>S3: DELETE tracks from DAILYCO_BUCKET - Worker->>S3: DELETE audio.mp3 from TRANSCRIPT_BUCKET - Worker->>DB: UPDATE transcript SET audio_deleted=true - end - - opt Room has zulip_auto_post=true - alt Existing zulip_message_id - Worker->>Zulip: PATCH /api/v1/messages/{id} - else New - Worker->>Zulip: POST /api/v1/messages - Zulip-->>Worker: {id} - Worker->>DB: UPDATE transcript SET zulip_message_id=X - end - end - - opt Room has webhook_url - Worker->>ExtWH: POST {webhook_url} - Note right of ExtWH: X-Webhook-Signature: HMAC-SHA256 - Note right of ExtWH: Body: {transcript_id, room_id, ...} - end -``` - -## Title & Summary Generation Data Flow - -```mermaid -flowchart TB - subgraph Input["Input: TitleSummary[] from Topic Detection"] - TS1["TitleSummary 1
title: 'Q1 Budget'
transcript: words[0:300]"] - TS2["TitleSummary 2
title: 'Product Launch'
transcript: words[300:600]"] - TS3["TitleSummary N..."] - end - - subgraph TitleGen["Title Generation"] - T1["Extract .title from each TitleSummary"] - T2["Concatenate: '- Q1 Budget\n- Product Launch\n...'"] - T3["LLM: TITLE_PROMPT\n'Generate concise title from topic titles'"] - T4["Output: FinalTitle"] - - T1 --> T2 --> T3 --> T4 - end - - subgraph SummaryGen["Summary Generation"] - direction TB - - subgraph Reconstruct["1. Reconstruct Full Transcript"] - S1["For each TitleSummary.transcript.as_segments()"] - S2["Map speaker ID → name"] - S3["Build: 'Alice: hello\nBob: hi\n...'"] - S1 --> S2 --> S3 - end - - subgraph Subjects["2. Extract Subjects - LLM call #1"] - S4["LLM: SUBJECTS_PROMPT\n'Main high-level topics? Max 6'"] - S5["subjects[] = ['Budget Review', ...]"] - S4 --> S5 - end - - subgraph DetailedSum["3. Per-Subject Summary - LLM calls #2 to #(1+2M)"] - S6["For each subject:"] - S7["LLM: DETAILED_SUBJECT_PROMPT\n'Info about subject: decisions, actions...'"] - S8["detailed_response - NOT STORED"] - S9["LLM: PARAGRAPH_SUMMARY_PROMPT\n'Summarize in 1 paragraph'"] - S10["paragraph → summaries[]"] - - S6 --> S7 --> S8 --> S9 --> S10 - end - - subgraph Recap["4. Generate Recap - LLM call #(2+2M)"] - S11["Concatenate paragraph summaries"] - S12["LLM: RECAP_PROMPT\n'High-level recap, 1 paragraph'"] - S13["recap"] - S11 --> S12 --> S13 - end - - subgraph Output["5. Output"] - S14["long_summary = markdown:\n# Quick recap\n[recap]\n# Summary\n**Subject 1**\n[para1]..."] - S15["short_summary = recap only"] - S14 --> S15 - end - - Reconstruct --> Subjects --> DetailedSum --> Recap --> Output - end - - Input --> TitleGen - Input --> SummaryGen -``` - -### topics[] vs subjects[] - -| | topics[] | subjects[] | -|-|----------|------------| -| **Source** | 300-word chunk splitting | LLM extraction from full text | -| **Count** | Variable (words / 300) | Max 6 | -| **Purpose** | Timeline segmentation | Summary structure | -| **Has timestamp?** | Yes | No | - -## External API Calls Summary - -### 1. Daily.co REST API (called during initialization) - -| Endpoint | Method | When | Purpose | -|----------|--------|------|---------| -| `GET /recordings/{recording_id}` | GET | After webhook | Get mtgSessionId for participant lookup | -| `GET /meetings/{mtgSessionId}/participants` | GET | After above | Map participant_id → user_name | - -### 2. GPU Service (Modal.com or Self-Hosted) - -| Endpoint | Method | Count | Request | -|----------|--------|-------|---------| -| `{TRANSCRIPT_URL}/v1/audio/transcriptions-from-url` | POST | **N** (N = num tracks) | `{audio_file_url, language, batch: true}` | - -**Note**: Diarization is NOT called for multitrack - speaker identification comes from separate tracks. - -### 3. 
LLM Service (OpenAI-compatible via LlamaIndex) - -| Phase | Operation | Input | LLM Calls | Output | -|-------|-----------|-------|-----------|--------| -| Topic Detection | TOPIC_PROMPT per 300-word chunk | words[i:i+300] | **C** = ceil(words/300) | TitleSummary{title, summary, timestamp} | -| Title Generation | TITLE_PROMPT | All topic titles joined | **1** | FinalTitle | -| Participant ID | PARTICIPANTS_PROMPT | Full transcript | **0-1** (skipped if known) | ParticipantsResponse | -| Subject Extraction | SUBJECTS_PROMPT | Full transcript | **1** | SubjectsResponse{subjects[]} | -| Subject Detail | DETAILED_SUBJECT_PROMPT | Full transcript + subject name | **M** (M = subjects, max 6) | detailed text (discarded) | -| Subject Paragraph | PARAGRAPH_SUMMARY_PROMPT | Detailed text | **M** | paragraph text → summaries[] | -| Recap | RECAP_PROMPT | All paragraph summaries | **1** | recap text | - -**Total LLM calls**: C + 2M + 3 (+ 1 if participants unknown) -- Short meeting (1000 words, 3 subjects): ~4 + 6 + 3 = **13 calls** -- Long meeting (5000 words, 6 subjects): ~17 + 12 + 3 = **32 calls** - -## S3 Operations Summary - -### Source Bucket: `DAILYCO_STORAGE_AWS_BUCKET_NAME` -Daily.co uploads raw-tracks recordings here. - -| Operation | Key Pattern | When | -|-----------|-------------|------| -| **READ** (presign) | `{domain}/{room_name}/{ts}/{participant_id}-cam-audio-{ts}.webm` | Track acquisition | -| **DELETE** | Same as above | Consent denied cleanup | - -### Transcript Storage Bucket: `TRANSCRIPT_STORAGE_AWS_BUCKET_NAME` -Reflector's own storage. - -| Operation | Key Pattern | When | -|-----------|-------------|------| -| **PUT** | `file_pipeline/{transcript_id}/tracks/padded_{idx}.webm` | After track padding | -| **READ** (presign) | Same | For GPU transcription | -| **DELETE** | Same | After transcription complete | -| **PUT** | `{transcript_id}/audio.mp3` | After mixdown | -| **DELETE** | Same | Consent denied cleanup | - -## Database Operations - -### Tables Written - -| Table | Operation | When | -|-------|-----------|------| -| `recording` | INSERT | Initialization | -| `transcript` | INSERT | Initialization | -| `transcript` | UPDATE (participants) | After Daily API participant fetch | -| `transcript` | UPDATE (status, events, duration, topics, title, summaries, etc.) | Throughout pipeline | - -### Transcript Update Sequence - -``` -1. INSERT: id, name, status='idle', source_kind='room', user_id, recording_id, room_id, meeting_id -2. UPDATE: participants[] (speaker index → participant name mapping) -3. UPDATE: status='processing', events+=[{event:'STATUS', data:{value:'processing'}}] -4. UPDATE: duration=X, events+=[{event:'DURATION', data:{duration:X}}] -5. UPDATE: audio_location='storage' -6. UPDATE: events+=[{event:'WAVEFORM', data:{waveform:[...]}}] -7. UPDATE: events+=[{event:'TRANSCRIPT', data:{text, translation}}] -8. UPDATE: topics[]+=topic, events+=[{event:'TOPIC'}] -- repeated per chunk -9. UPDATE: title=X, events+=[{event:'FINAL_TITLE'}] -10. UPDATE: long_summary=X, events+=[{event:'FINAL_LONG_SUMMARY'}] -11. UPDATE: short_summary=X, events+=[{event:'FINAL_SHORT_SUMMARY'}] -12. UPDATE: status='ended', events+=[{event:'STATUS', data:{value:'ended'}}] -13. UPDATE: zulip_message_id=X -- if Zulip enabled -14. 
UPDATE: audio_deleted=true -- if consent denied
-```
-
-## WebSocket Events
-
-All broadcast to room `ts:{transcript_id}`:
-
-| Event | Payload | Trigger |
-|-------|---------|---------|
-| STATUS | `{value: "processing"\|"ended"\|"error"}` | Status transitions |
-| DURATION | `{duration: float}` | After audio processing |
-| WAVEFORM | `{waveform: float[]}` | After waveform generation |
-| TRANSCRIPT | `{text: string, translation: string\|null}` | After transcription merge |
-| TOPIC | `{id, title, summary, timestamp, duration, transcript, words}` | Per topic detected |
-| FINAL_TITLE | `{title: string}` | After LLM title generation |
-| FINAL_LONG_SUMMARY | `{long_summary: string}` | After LLM summary |
-| FINAL_SHORT_SUMMARY | `{short_summary: string}` | After LLM recap |
-
-User-room broadcasts to `user:{user_id}`:
-- `TRANSCRIPT_STATUS`
-- `TRANSCRIPT_FINAL_TITLE`
-- `TRANSCRIPT_DURATION`
+Documentation coming soon.
\ No newline at end of file
diff --git a/server/docs/daily_pipeline.md b/server/docs/daily_pipeline.md
new file mode 100644
index 00000000..dab9263e
--- /dev/null
+++ b/server/docs/daily_pipeline.md
@@ -0,0 +1,421 @@
+# Daily.co pipeline
+
+This document details every external call, storage operation, and database write that occurs when a new Daily.co recording is discovered.
+It also covers common logic shared with other pipelines, so not everything here is Daily-specific.
+
+**This document was generated on 2025-12-12; details may have changed since.**
+
+## Trigger
+
+Two entry points, both converging to the same handler:
+
+1. **Webhook**: Daily.co sends `POST /v1/daily/webhook` with `recording.ready-to-download`
+2. **Polling**: `GET /recordings` (paginated, max 100/call) → filter new → convert to same payload format
+
+Both produce `RecordingReadyPayload` and call `handleRecordingReady(payload)`.
+
+```
+┌─────────────────┐      ┌──────────────────────────┐
+│  Daily Webhook  │────▶│  RecordingReadyPayload   │
+│  (push)         │      │  {room_name, recording_id│
+└─────────────────┘      │   tracks[], ...}         │
+                         └────────────┬─────────────┘
+┌─────────────────┐                   │
+│  GET /recordings│                   ▼
+│  (poll)         │────▶ convert ──▶ handleRecordingReady()
+└─────────────────┘                   │
+                                      ▼
+                         ┌────────────────────────┐
+                         │  process_multitrack_   │
+                         │  recording pipeline    │
+                         └────────────────────────┘
+```
+
+**Polling API**: `GET https://api.daily.co/v1/recordings`
+- Pagination: `limit` (max 100), `starting_after`, `ending_before`
+- Rate limit: ~2 req/sec
+- Response: `{total_count, data: Recording[]}`
+
+```mermaid
+flowchart TB
+    subgraph Trigger["1. Recording Discovery - Daily.co Webhook"]
+        DAILY_WEBHOOK["Daily.co sends POST /v1/daily/webhook
type: recording.ready-to-download"] + VERIFY["Verify X-Webhook-Signature (HMAC)"] + PARSE["Parse DailyWebhookEvent
Extract tracks[], room_name, recording_id"] + FILTER["Filter audio tracks only
track_keys = [t.s3Key for t in tracks if t.type == 'audio']"] + DISPATCH["process_multitrack_recording.delay()"] + + DAILY_WEBHOOK --> VERIFY --> PARSE --> FILTER --> DISPATCH + end + + subgraph Init["2. Recording Initialization"] + FETCH_MEETING[DB READ: meetings_controller.get_by_room_name] + FETCH_ROOM[DB READ: rooms_controller.get_by_name] + DAILY_API_REC[Daily API: GET /recordings/recording_id] + DAILY_API_PART[Daily API: GET /meetings/mtgSessionId/participants] + CREATE_RECORDING[DB WRITE: recordings_controller.create] + CREATE_TRANSCRIPT[DB WRITE: transcripts_controller.add] + MAP_PARTICIPANTS[DB WRITE: transcript.participants upsert] + end + + subgraph Pipeline["3. Processing Pipeline"] + direction TB + PAD[Track Padding & Mixdown] + TRANSCRIBE[GPU: Transcription per track] + TOPICS[LLM: Topic Detection] + TITLE[LLM: Title Generation] + SUMMARY[LLM: Summary Generation] + end + + subgraph Storage["4. S3 Operations"] + S3_PRESIGN[S3: generate_presigned_url for tracks] + S3_UPLOAD_PADDED[S3 UPLOAD: padded tracks temp] + S3_UPLOAD_MP3[S3 UPLOAD: audio.mp3] + S3_DELETE_TEMP[S3 DELETE: cleanup temp files] + end + + subgraph PostProcess["5. Post-Processing"] + CONSENT[Consent check & cleanup] + ZULIP[Zulip: send/update message] + WEBHOOK_OUT[Webhook: POST to room.webhook_url] + end + + Trigger --> Init --> Pipeline + Pipeline --> Storage + Pipeline --> PostProcess +``` + +## Detailed Sequence: Daily.co Multitrack Recording + +```mermaid +sequenceDiagram + participant DailyCo as Daily.co + participant API as FastAPI /v1/daily/webhook + participant Worker as Celery Worker + participant DB as PostgreSQL + participant DailyAPI as Daily.co REST API + participant S3 as AWS S3 + participant GPU as Modal.com GPU + participant LLM as LLM Service + participant WS as WebSocket + participant Zulip as Zulip + participant ExtWH as External Webhook + + Note over DailyCo,API: Phase 0: Webhook Receipt + DailyCo->>API: POST /v1/daily/webhook + Note right of DailyCo: X-Webhook-Signature, X-Webhook-Timestamp + API->>API: verify_webhook_signature() + API->>API: Extract audio track s3Keys from payload.tracks[] + API->>Worker: process_multitrack_recording.delay() + API-->>DailyCo: 200 OK + + Note over Worker,DailyAPI: Phase 1: Recording Initialization + Worker->>DB: SELECT meeting WHERE room_name=? + Worker->>DB: SELECT room WHERE name=? + Worker->>DailyAPI: GET /recordings/{recording_id} + DailyAPI-->>Worker: {mtgSessionId, ...} + Worker->>DailyAPI: GET /meetings/{mtgSessionId}/participants + DailyAPI-->>Worker: [{participant_id, user_name}, ...] 
+ Worker->>DB: INSERT INTO recording + Worker->>DB: INSERT INTO transcript (status='idle') + loop For each track_key (parse participant_id from filename) + Worker->>DB: UPSERT transcript.participants[speaker=idx, name=X] + end + + Note over Worker,S3: Phase 2: Track Padding + Worker->>DB: UPDATE transcript SET status='processing' + Worker->>WS: broadcast STATUS='processing' + loop For each track in track_keys (N tracks) + Worker->>S3: generate_presigned_url(track_key, DAILYCO_BUCKET) + S3-->>Worker: presigned_url (2hr) + Note over Worker: PyAV: read WebM, extract start_time + Note over Worker: PyAV: adelay filter (pad silence) + Worker->>S3: PUT file_pipeline/{id}/tracks/padded_{idx}.webm + Worker->>S3: generate_presigned_url(padded_{idx}.webm) + end + + Note over Worker,S3: Phase 3: Audio Mixdown + Note over Worker: PyAV: amix filter → stereo MP3 + Worker->>DB: UPDATE transcript SET duration=X + Worker->>WS: broadcast DURATION + Worker->>S3: PUT {transcript_id}/audio.mp3 + Worker->>DB: UPDATE transcript SET audio_location='storage' + + Note over Worker: Phase 4: Waveform + Note over Worker: Generate peaks from MP3 + Worker->>DB: UPDATE events+=WAVEFORM + Worker->>WS: broadcast WAVEFORM + + Note over Worker,GPU: Phase 5: Transcription (N GPU calls) + loop For each padded track URL (N tracks) + Worker->>GPU: POST /v1/audio/transcriptions-from-url + Note right of GPU: {audio_file_url, language, batch:true} + GPU-->>Worker: {words: [{word, start, end}, ...]} + Note over Worker: Assign speaker=track_idx to words + end + Note over Worker: Merge all words, sort by start time + Worker->>DB: UPDATE events+=TRANSCRIPT + Worker->>WS: broadcast TRANSCRIPT + + Note over Worker,S3: Cleanup temp files + loop For each padded file + Worker->>S3: DELETE padded_{idx}.webm + end + + Note over Worker,LLM: Phase 6: Topic Detection (C LLM calls) + Note over Worker: C = ceil(total_words / 300) + loop For each 300-word chunk (C chunks) + Worker->>LLM: TOPIC_PROMPT + words[i:i+300] + Note right of LLM: "Extract main topic title + 2-sentence summary" + LLM-->>Worker: TitleSummary{title, summary} + Worker->>DB: UPSERT topics[] + Worker->>DB: UPDATE events+=TOPIC + Worker->>WS: broadcast TOPIC + end + + Note over Worker,LLM: Phase 7a: Title Generation (1 LLM call) + Note over Worker: Input: all TitleSummary[].title joined + Worker->>LLM: TITLE_PROMPT + Note right of LLM: "Generate concise title from topic titles" + LLM-->>Worker: "Meeting Title" + Worker->>DB: UPDATE transcript SET title=X + Worker->>DB: UPDATE events+=FINAL_TITLE + Worker->>WS: broadcast FINAL_TITLE + + Note over Worker,LLM: Phase 7b: Summary Generation (2+2M LLM calls) + Note over Worker: Reconstruct full transcript from TitleSummary[].transcript + opt If participants unknown + Worker->>LLM: PARTICIPANTS_PROMPT + LLM-->>Worker: ParticipantsResponse + end + Worker->>LLM: SUBJECTS_PROMPT (call #1) + Note right of LLM: "Main high-level topics? 
Max 6" + LLM-->>Worker: SubjectsResponse{subjects: ["A", "B", ...]} + + loop For each subject (M subjects, max 6) + Worker->>LLM: DETAILED_SUBJECT_PROMPT (call #2..#1+M) + Note right of LLM: "Info about 'A': decisions, actions, deadlines" + LLM-->>Worker: detailed_response (discarded after next call) + Worker->>LLM: PARAGRAPH_SUMMARY_PROMPT (call #2+M..#1+2M) + Note right of LLM: "Summarize in 1 paragraph" + LLM-->>Worker: paragraph → summaries[] + end + + Worker->>LLM: RECAP_PROMPT (call #2+2M) + Note right of LLM: "High-level quick recap, 1 paragraph" + LLM-->>Worker: recap + Note over Worker: long_summary = "# Quick recap\n{recap}\n# Summary\n**A**\n{para1}..." + Note over Worker: short_summary = recap only + Worker->>DB: UPDATE long_summary, short_summary + Worker->>DB: UPDATE events+=FINAL_LONG_SUMMARY + Worker->>WS: broadcast FINAL_LONG_SUMMARY + Worker->>DB: UPDATE events+=FINAL_SHORT_SUMMARY + Worker->>WS: broadcast FINAL_SHORT_SUMMARY + + Note over Worker,DB: Phase 8: Finalize + Worker->>DB: UPDATE transcript SET status='ended' + Worker->>DB: UPDATE events+=STATUS + Worker->>WS: broadcast STATUS='ended' + + Note over Worker,ExtWH: Phase 9: Post-Processing Chain + Worker->>DB: SELECT meeting_consent WHERE meeting_id=? + alt Any consent denied + Worker->>S3: DELETE tracks from DAILYCO_BUCKET + Worker->>S3: DELETE audio.mp3 from TRANSCRIPT_BUCKET + Worker->>DB: UPDATE transcript SET audio_deleted=true + end + + opt Room has zulip_auto_post=true + alt Existing zulip_message_id + Worker->>Zulip: PATCH /api/v1/messages/{id} + else New + Worker->>Zulip: POST /api/v1/messages + Zulip-->>Worker: {id} + Worker->>DB: UPDATE transcript SET zulip_message_id=X + end + end + + opt Room has webhook_url + Worker->>ExtWH: POST {webhook_url} + Note right of ExtWH: X-Webhook-Signature: HMAC-SHA256 + Note right of ExtWH: Body: {transcript_id, room_id, ...} + end +``` + +## Title & Summary Generation Data Flow + +```mermaid +flowchart TB + subgraph Input["Input: TitleSummary[] from Topic Detection"] + TS1["TitleSummary 1
title: 'Q1 Budget'
transcript: words[0:300]"] + TS2["TitleSummary 2
title: 'Product Launch'
transcript: words[300:600]"] + TS3["TitleSummary N..."] + end + + subgraph TitleGen["Title Generation"] + T1["Extract .title from each TitleSummary"] + T2["Concatenate: '- Q1 Budget\n- Product Launch\n...'"] + T3["LLM: TITLE_PROMPT\n'Generate concise title from topic titles'"] + T4["Output: FinalTitle"] + + T1 --> T2 --> T3 --> T4 + end + + subgraph SummaryGen["Summary Generation"] + direction TB + + subgraph Reconstruct["1. Reconstruct Full Transcript"] + S1["For each TitleSummary.transcript.as_segments()"] + S2["Map speaker ID → name"] + S3["Build: 'Alice: hello\nBob: hi\n...'"] + S1 --> S2 --> S3 + end + + subgraph Subjects["2. Extract Subjects - LLM call #1"] + S4["LLM: SUBJECTS_PROMPT\n'Main high-level topics? Max 6'"] + S5["subjects[] = ['Budget Review', ...]"] + S4 --> S5 + end + + subgraph DetailedSum["3. Per-Subject Summary - LLM calls #2 to #(1+2M)"] + S6["For each subject:"] + S7["LLM: DETAILED_SUBJECT_PROMPT\n'Info about subject: decisions, actions...'"] + S8["detailed_response - NOT STORED"] + S9["LLM: PARAGRAPH_SUMMARY_PROMPT\n'Summarize in 1 paragraph'"] + S10["paragraph → summaries[]"] + + S6 --> S7 --> S8 --> S9 --> S10 + end + + subgraph Recap["4. Generate Recap - LLM call #(2+2M)"] + S11["Concatenate paragraph summaries"] + S12["LLM: RECAP_PROMPT\n'High-level recap, 1 paragraph'"] + S13["recap"] + S11 --> S12 --> S13 + end + + subgraph Output["5. Output"] + S14["long_summary = markdown:\n# Quick recap\n[recap]\n# Summary\n**Subject 1**\n[para1]..."] + S15["short_summary = recap only"] + S14 --> S15 + end + + Reconstruct --> Subjects --> DetailedSum --> Recap --> Output + end + + Input --> TitleGen + Input --> SummaryGen +``` + +### topics[] vs subjects[] + +| | topics[] | subjects[] | +|-|----------|------------| +| **Source** | 300-word chunk splitting | LLM extraction from full text | +| **Count** | Variable (words / 300) | Max 6 | +| **Purpose** | Timeline segmentation | Summary structure | +| **Has timestamp?** | Yes | No | + +## External API Calls Summary + +### 1. Daily.co REST API (called during initialization) + +| Endpoint | Method | When | Purpose | +|----------|--------|------|---------| +| `GET /recordings/{recording_id}` | GET | After webhook | Get mtgSessionId for participant lookup | +| `GET /meetings/{mtgSessionId}/participants` | GET | After above | Map participant_id → user_name | + +### 2. GPU Service (Modal.com or Self-Hosted) + +| Endpoint | Method | Count | Request | +|----------|--------|-------|---------| +| `{TRANSCRIPT_URL}/v1/audio/transcriptions-from-url` | POST | **N** (N = num tracks) | `{audio_file_url, language, batch: true}` | + +**Note**: Diarization is NOT called for multitrack - speaker identification comes from separate tracks. + +### 3. 
LLM Service (OpenAI-compatible via LlamaIndex) + +| Phase | Operation | Input | LLM Calls | Output | +|-------|-----------|-------|-----------|--------| +| Topic Detection | TOPIC_PROMPT per 300-word chunk | words[i:i+300] | **C** = ceil(words/300) | TitleSummary{title, summary, timestamp} | +| Title Generation | TITLE_PROMPT | All topic titles joined | **1** | FinalTitle | +| Participant ID | PARTICIPANTS_PROMPT | Full transcript | **0-1** (skipped if known) | ParticipantsResponse | +| Subject Extraction | SUBJECTS_PROMPT | Full transcript | **1** | SubjectsResponse{subjects[]} | +| Subject Detail | DETAILED_SUBJECT_PROMPT | Full transcript + subject name | **M** (M = subjects, max 6) | detailed text (discarded) | +| Subject Paragraph | PARAGRAPH_SUMMARY_PROMPT | Detailed text | **M** | paragraph text → summaries[] | +| Recap | RECAP_PROMPT | All paragraph summaries | **1** | recap text | + +**Total LLM calls**: C + 2M + 3 (+ 1 if participants unknown) +- Short meeting (1000 words, 3 subjects): ~4 + 6 + 3 = **13 calls** +- Long meeting (5000 words, 6 subjects): ~17 + 12 + 3 = **32 calls** + +## S3 Operations Summary + +### Source Bucket: `DAILYCO_STORAGE_AWS_BUCKET_NAME` +Daily.co uploads raw-tracks recordings here. + +| Operation | Key Pattern | When | +|-----------|-------------|------| +| **READ** (presign) | `{domain}/{room_name}/{ts}/{participant_id}-cam-audio-{ts}.webm` | Track acquisition | +| **DELETE** | Same as above | Consent denied cleanup | + +### Transcript Storage Bucket: `TRANSCRIPT_STORAGE_AWS_BUCKET_NAME` +Reflector's own storage. + +| Operation | Key Pattern | When | +|-----------|-------------|------| +| **PUT** | `file_pipeline/{transcript_id}/tracks/padded_{idx}.webm` | After track padding | +| **READ** (presign) | Same | For GPU transcription | +| **DELETE** | Same | After transcription complete | +| **PUT** | `{transcript_id}/audio.mp3` | After mixdown | +| **DELETE** | Same | Consent denied cleanup | + +## Database Operations + +### Tables Written + +| Table | Operation | When | +|-------|-----------|------| +| `recording` | INSERT | Initialization | +| `transcript` | INSERT | Initialization | +| `transcript` | UPDATE (participants) | After Daily API participant fetch | +| `transcript` | UPDATE (status, events, duration, topics, title, summaries, etc.) | Throughout pipeline | + +### Transcript Update Sequence + +``` +1. INSERT: id, name, status='idle', source_kind='room', user_id, recording_id, room_id, meeting_id +2. UPDATE: participants[] (speaker index → participant name mapping) +3. UPDATE: status='processing', events+=[{event:'STATUS', data:{value:'processing'}}] +4. UPDATE: duration=X, events+=[{event:'DURATION', data:{duration:X}}] +5. UPDATE: audio_location='storage' +6. UPDATE: events+=[{event:'WAVEFORM', data:{waveform:[...]}}] +7. UPDATE: events+=[{event:'TRANSCRIPT', data:{text, translation}}] +8. UPDATE: topics[]+=topic, events+=[{event:'TOPIC'}] -- repeated per chunk +9. UPDATE: title=X, events+=[{event:'FINAL_TITLE'}] +10. UPDATE: long_summary=X, events+=[{event:'FINAL_LONG_SUMMARY'}] +11. UPDATE: short_summary=X, events+=[{event:'FINAL_SHORT_SUMMARY'}] +12. UPDATE: status='ended', events+=[{event:'STATUS', data:{value:'ended'}}] +13. UPDATE: zulip_message_id=X -- if Zulip enabled +14. 
UPDATE: audio_deleted=true -- if consent denied +``` + +## WebSocket Events + +All broadcast to room `ts:{transcript_id}`: + +| Event | Payload | Trigger | +|-------|---------|---------| +| STATUS | `{value: "processing"\|"ended"\|"error"}` | Status transitions | +| DURATION | `{duration: float}` | After audio processing | +| WAVEFORM | `{waveform: float[]}` | After waveform generation | +| TRANSCRIPT | `{text: string, translation: string\|null}` | After transcription merge | +| TOPIC | `{id, title, summary, timestamp, duration, transcript, words}` | Per topic detected | +| FINAL_TITLE | `{title: string}` | After LLM title generation | +| FINAL_LONG_SUMMARY | `{long_summary: string}` | After LLM summary | +| FINAL_SHORT_SUMMARY | `{short_summary: string}` | After LLM recap | + +User-room broadcasts to `user:{user_id}`: +- `TRANSCRIPT_STATUS` +- `TRANSCRIPT_FINAL_TITLE` +- `TRANSCRIPT_DURATION`
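+
+## Appendix: LLM Call Budget (Sketch)
+
+A minimal sketch of the call-count arithmetic summarized in the LLM service table above. `estimate_llm_calls` is a hypothetical helper added for illustration, not a function in the pipeline code; it only restates C = ceil(words / 300), the 6-subject cap, and the C + 2M + 3 total.
+
+```python
+import math
+
+
+def estimate_llm_calls(total_words: int, num_subjects: int, participants_known: bool = True) -> int:
+    """Estimate LLM calls for one recording, following the tables above."""
+    C = math.ceil(total_words / 300)  # topic detection: one call per 300-word chunk
+    M = min(num_subjects, 6)          # subjects are capped at 6
+    # C topic calls + 1 title + 1 subjects + M detail + M paragraph + 1 recap
+    calls = C + 2 * M + 3
+    if not participants_known:
+        calls += 1                    # optional PARTICIPANTS_PROMPT call
+    return calls
+
+
+# Matches the worked examples above: 13 calls for the short meeting, 32 for the long one.
+assert estimate_llm_calls(1000, 3) == 13
+assert estimate_llm_calls(5000, 6) == 32
+```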