diff --git a/docs/Dockerfile b/docs/Dockerfile
index 03898c6c..78267a29 100644
--- a/docs/Dockerfile
+++ b/docs/Dockerfile
@@ -1,15 +1,22 @@
 FROM node:18-alpine AS builder
 
 WORKDIR /app
 
+# Install curl for fetching OpenAPI spec
+RUN apk add --no-cache curl
+
 # Copy package files
 COPY package*.json ./
 
 # Install dependencies
 RUN npm ci
 
-# Copy source (includes static/openapi.json if pre-fetched)
+# Copy source
 COPY . .
 
+# Fetch OpenAPI spec from production API
+ARG OPENAPI_URL=https://api-reflector.monadical.com/openapi.json
+RUN mkdir -p ./static && curl -sf "${OPENAPI_URL}" -o ./static/openapi.json || echo '{}' > ./static/openapi.json
+
 # Fix docusaurus config: change onBrokenLinks to 'warn' for Docker build
 RUN sed -i "s/onBrokenLinks: 'throw'/onBrokenLinks: 'warn'/g" docusaurus.config.ts
diff --git a/docs/docs/pipelines/live-pipeline.md b/docs/docs/pipelines/live-pipeline.md
index 8fbb95e5..66854fb4 100644
--- a/docs/docs/pipelines/live-pipeline.md
+++ b/docs/docs/pipelines/live-pipeline.md
@@ -1,7 +1,424 @@
 ---
-title: live pipeline
+title: Live pipeline
 ---
 
-# live pipeline
+# Live pipeline
 
-Documentation coming soon. See [TODO.md](/docs/TODO) for required information.

# Daily.co

This document details every external call, storage operation, and database write that occurs when a new Daily.co recording is discovered.

## Trigger

Two entry points, both converging on the same handler:

1. **Webhook**: Daily.co sends `POST /v1/daily/webhook` with `recording.ready-to-download`
2. **Polling**: `GET /recordings` (paginated, max 100/call) → filter out already-processed recordings → convert to the same payload format

Both produce a `RecordingReadyPayload` and call `handleRecordingReady(payload)`.

```
┌─────────────────┐      ┌──────────────────────────┐
│  Daily Webhook  │─────▶│  RecordingReadyPayload   │
│     (push)      │      │ {room_name, recording_id│
└─────────────────┘      │  tracks[], ...}          │
                         └────────────┬─────────────┘
┌─────────────────┐                   │
│ GET /recordings │                   ▼
│     (poll)      │──▶ convert ──▶ handleRecordingReady()
└─────────────────┘                   │
                                      ▼
                         ┌────────────────────────┐
                         │  process_multitrack_   │
                         │  recording pipeline    │
                         └────────────────────────┘
```

**Polling API**: `GET https://api.daily.co/v1/recordings`
- Pagination: `limit` (max 100), `starting_after`, `ending_before`
- Rate limit: ~2 req/sec
- Response: `{total_count, data: Recording[]}`

```mermaid
flowchart TB
    subgraph Trigger["1. Recording Discovery - Daily.co Webhook"]
        DAILY_WEBHOOK["Daily.co sends POST /v1/daily/webhook
type: recording.ready-to-download"] + VERIFY["Verify X-Webhook-Signature (HMAC)"] + PARSE["Parse DailyWebhookEvent
Extract tracks[], room_name, recording_id"] + FILTER["Filter audio tracks only
track_keys = [t.s3Key for t in tracks if t.type == 'audio']"] + DISPATCH["process_multitrack_recording.delay()"] + + DAILY_WEBHOOK --> VERIFY --> PARSE --> FILTER --> DISPATCH + end + + subgraph Init["2. Recording Initialization"] + FETCH_MEETING[DB READ: meetings_controller.get_by_room_name] + FETCH_ROOM[DB READ: rooms_controller.get_by_name] + DAILY_API_REC[Daily API: GET /recordings/recording_id] + DAILY_API_PART[Daily API: GET /meetings/mtgSessionId/participants] + CREATE_RECORDING[DB WRITE: recordings_controller.create] + CREATE_TRANSCRIPT[DB WRITE: transcripts_controller.add] + MAP_PARTICIPANTS[DB WRITE: transcript.participants upsert] + end + + subgraph Pipeline["3. Processing Pipeline"] + direction TB + PAD[Track Padding & Mixdown] + TRANSCRIBE[GPU: Transcription per track] + TOPICS[LLM: Topic Detection] + TITLE[LLM: Title Generation] + SUMMARY[LLM: Summary Generation] + end + + subgraph Storage["4. S3 Operations"] + S3_PRESIGN[S3: generate_presigned_url for tracks] + S3_UPLOAD_PADDED[S3 UPLOAD: padded tracks temp] + S3_UPLOAD_MP3[S3 UPLOAD: audio.mp3] + S3_DELETE_TEMP[S3 DELETE: cleanup temp files] + end + + subgraph PostProcess["5. Post-Processing"] + CONSENT[Consent check & cleanup] + ZULIP[Zulip: send/update message] + WEBHOOK_OUT[Webhook: POST to room.webhook_url] + end + + Trigger --> Init --> Pipeline + Pipeline --> Storage + Pipeline --> PostProcess +``` + +## Detailed Sequence: Daily.co Multitrack Recording + +```mermaid +sequenceDiagram + participant DailyCo as Daily.co + participant API as FastAPI /v1/daily/webhook + participant Worker as Celery Worker + participant DB as PostgreSQL + participant DailyAPI as Daily.co REST API + participant S3 as AWS S3 + participant GPU as Modal.com GPU + participant LLM as LLM Service + participant WS as WebSocket + participant Zulip as Zulip + participant ExtWH as External Webhook + + Note over DailyCo,API: Phase 0: Webhook Receipt + DailyCo->>API: POST /v1/daily/webhook + Note right of DailyCo: X-Webhook-Signature, X-Webhook-Timestamp + API->>API: verify_webhook_signature() + API->>API: Extract audio track s3Keys from payload.tracks[] + API->>Worker: process_multitrack_recording.delay() + API-->>DailyCo: 200 OK + + Note over Worker,DailyAPI: Phase 1: Recording Initialization + Worker->>DB: SELECT meeting WHERE room_name=? + Worker->>DB: SELECT room WHERE name=? + Worker->>DailyAPI: GET /recordings/{recording_id} + DailyAPI-->>Worker: {mtgSessionId, ...} + Worker->>DailyAPI: GET /meetings/{mtgSessionId}/participants + DailyAPI-->>Worker: [{participant_id, user_name}, ...] 
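    %% Note: speaker identity for multitrack recordings comes from the data gathered above.
    %% Each raw track filename embeds the Daily participant_id, and the participants response
    %% maps participant_id to user_name, so track index N is stored as speaker N with that
    %% participant's name. No diarization model is involved for multitrack recordings.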
+ Worker->>DB: INSERT INTO recording + Worker->>DB: INSERT INTO transcript (status='idle') + loop For each track_key (parse participant_id from filename) + Worker->>DB: UPSERT transcript.participants[speaker=idx, name=X] + end + + Note over Worker,S3: Phase 2: Track Padding + Worker->>DB: UPDATE transcript SET status='processing' + Worker->>WS: broadcast STATUS='processing' + loop For each track in track_keys (N tracks) + Worker->>S3: generate_presigned_url(track_key, DAILYCO_BUCKET) + S3-->>Worker: presigned_url (2hr) + Note over Worker: PyAV: read WebM, extract start_time + Note over Worker: PyAV: adelay filter (pad silence) + Worker->>S3: PUT file_pipeline/{id}/tracks/padded_{idx}.webm + Worker->>S3: generate_presigned_url(padded_{idx}.webm) + end + + Note over Worker,S3: Phase 3: Audio Mixdown + Note over Worker: PyAV: amix filter → stereo MP3 + Worker->>DB: UPDATE transcript SET duration=X + Worker->>WS: broadcast DURATION + Worker->>S3: PUT {transcript_id}/audio.mp3 + Worker->>DB: UPDATE transcript SET audio_location='storage' + + Note over Worker: Phase 4: Waveform + Note over Worker: Generate peaks from MP3 + Worker->>DB: UPDATE events+=WAVEFORM + Worker->>WS: broadcast WAVEFORM + + Note over Worker,GPU: Phase 5: Transcription (N GPU calls) + loop For each padded track URL (N tracks) + Worker->>GPU: POST /v1/audio/transcriptions-from-url + Note right of GPU: {audio_file_url, language, batch:true} + GPU-->>Worker: {words: [{word, start, end}, ...]} + Note over Worker: Assign speaker=track_idx to words + end + Note over Worker: Merge all words, sort by start time + Worker->>DB: UPDATE events+=TRANSCRIPT + Worker->>WS: broadcast TRANSCRIPT + + Note over Worker,S3: Cleanup temp files + loop For each padded file + Worker->>S3: DELETE padded_{idx}.webm + end + + Note over Worker,LLM: Phase 6: Topic Detection (C LLM calls) + Note over Worker: C = ceil(total_words / 300) + loop For each 300-word chunk (C chunks) + Worker->>LLM: TOPIC_PROMPT + words[i:i+300] + Note right of LLM: "Extract main topic title + 2-sentence summary" + LLM-->>Worker: TitleSummary{title, summary} + Worker->>DB: UPSERT topics[] + Worker->>DB: UPDATE events+=TOPIC + Worker->>WS: broadcast TOPIC + end + + Note over Worker,LLM: Phase 7a: Title Generation (1 LLM call) + Note over Worker: Input: all TitleSummary[].title joined + Worker->>LLM: TITLE_PROMPT + Note right of LLM: "Generate concise title from topic titles" + LLM-->>Worker: "Meeting Title" + Worker->>DB: UPDATE transcript SET title=X + Worker->>DB: UPDATE events+=FINAL_TITLE + Worker->>WS: broadcast FINAL_TITLE + + Note over Worker,LLM: Phase 7b: Summary Generation (2+2M LLM calls) + Note over Worker: Reconstruct full transcript from TitleSummary[].transcript + opt If participants unknown + Worker->>LLM: PARTICIPANTS_PROMPT + LLM-->>Worker: ParticipantsResponse + end + Worker->>LLM: SUBJECTS_PROMPT (call #1) + Note right of LLM: "Main high-level topics? 
Max 6" + LLM-->>Worker: SubjectsResponse{subjects: ["A", "B", ...]} + + loop For each subject (M subjects, max 6) + Worker->>LLM: DETAILED_SUBJECT_PROMPT (call #2..#1+M) + Note right of LLM: "Info about 'A': decisions, actions, deadlines" + LLM-->>Worker: detailed_response (discarded after next call) + Worker->>LLM: PARAGRAPH_SUMMARY_PROMPT (call #2+M..#1+2M) + Note right of LLM: "Summarize in 1 paragraph" + LLM-->>Worker: paragraph → summaries[] + end + + Worker->>LLM: RECAP_PROMPT (call #2+2M) + Note right of LLM: "High-level quick recap, 1 paragraph" + LLM-->>Worker: recap + Note over Worker: long_summary = "# Quick recap\n{recap}\n# Summary\n**A**\n{para1}..." + Note over Worker: short_summary = recap only + Worker->>DB: UPDATE long_summary, short_summary + Worker->>DB: UPDATE events+=FINAL_LONG_SUMMARY + Worker->>WS: broadcast FINAL_LONG_SUMMARY + Worker->>DB: UPDATE events+=FINAL_SHORT_SUMMARY + Worker->>WS: broadcast FINAL_SHORT_SUMMARY + + Note over Worker,DB: Phase 8: Finalize + Worker->>DB: UPDATE transcript SET status='ended' + Worker->>DB: UPDATE events+=STATUS + Worker->>WS: broadcast STATUS='ended' + + Note over Worker,ExtWH: Phase 9: Post-Processing Chain + Worker->>DB: SELECT meeting_consent WHERE meeting_id=? + alt Any consent denied + Worker->>S3: DELETE tracks from DAILYCO_BUCKET + Worker->>S3: DELETE audio.mp3 from TRANSCRIPT_BUCKET + Worker->>DB: UPDATE transcript SET audio_deleted=true + end + + opt Room has zulip_auto_post=true + alt Existing zulip_message_id + Worker->>Zulip: PATCH /api/v1/messages/{id} + else New + Worker->>Zulip: POST /api/v1/messages + Zulip-->>Worker: {id} + Worker->>DB: UPDATE transcript SET zulip_message_id=X + end + end + + opt Room has webhook_url + Worker->>ExtWH: POST {webhook_url} + Note right of ExtWH: X-Webhook-Signature: HMAC-SHA256 + Note right of ExtWH: Body: {transcript_id, room_id, ...} + end +``` + +## Title & Summary Generation Data Flow + +```mermaid +flowchart TB + subgraph Input["Input: TitleSummary[] from Topic Detection"] + TS1["TitleSummary 1
title: 'Q1 Budget'
transcript: words[0:300]"] + TS2["TitleSummary 2
title: 'Product Launch'
transcript: words[300:600]"] + TS3["TitleSummary N..."] + end + + subgraph TitleGen["Title Generation"] + T1["Extract .title from each TitleSummary"] + T2["Concatenate: '- Q1 Budget\n- Product Launch\n...'"] + T3["LLM: TITLE_PROMPT\n'Generate concise title from topic titles'"] + T4["Output: FinalTitle"] + + T1 --> T2 --> T3 --> T4 + end + + subgraph SummaryGen["Summary Generation"] + direction TB + + subgraph Reconstruct["1. Reconstruct Full Transcript"] + S1["For each TitleSummary.transcript.as_segments()"] + S2["Map speaker ID → name"] + S3["Build: 'Alice: hello\nBob: hi\n...'"] + S1 --> S2 --> S3 + end + + subgraph Subjects["2. Extract Subjects - LLM call #1"] + S4["LLM: SUBJECTS_PROMPT\n'Main high-level topics? Max 6'"] + S5["subjects[] = ['Budget Review', ...]"] + S4 --> S5 + end + + subgraph DetailedSum["3. Per-Subject Summary - LLM calls #2 to #(1+2M)"] + S6["For each subject:"] + S7["LLM: DETAILED_SUBJECT_PROMPT\n'Info about subject: decisions, actions...'"] + S8["detailed_response - NOT STORED"] + S9["LLM: PARAGRAPH_SUMMARY_PROMPT\n'Summarize in 1 paragraph'"] + S10["paragraph → summaries[]"] + + S6 --> S7 --> S8 --> S9 --> S10 + end + + subgraph Recap["4. Generate Recap - LLM call #(2+2M)"] + S11["Concatenate paragraph summaries"] + S12["LLM: RECAP_PROMPT\n'High-level recap, 1 paragraph'"] + S13["recap"] + S11 --> S12 --> S13 + end + + subgraph Output["5. Output"] + S14["long_summary = markdown:\n# Quick recap\n[recap]\n# Summary\n**Subject 1**\n[para1]..."] + S15["short_summary = recap only"] + S14 --> S15 + end + + Reconstruct --> Subjects --> DetailedSum --> Recap --> Output + end + + Input --> TitleGen + Input --> SummaryGen +``` + +### topics[] vs subjects[] + +| | topics[] | subjects[] | +|-|----------|------------| +| **Source** | 300-word chunk splitting | LLM extraction from full text | +| **Count** | Variable (words / 300) | Max 6 | +| **Purpose** | Timeline segmentation | Summary structure | +| **Has timestamp?** | Yes | No | + +## External API Calls Summary + +### 1. Daily.co REST API (called during initialization) + +| Endpoint | Method | When | Purpose | +|----------|--------|------|---------| +| `GET /recordings/{recording_id}` | GET | After webhook | Get mtgSessionId for participant lookup | +| `GET /meetings/{mtgSessionId}/participants` | GET | After above | Map participant_id → user_name | + +### 2. GPU Service (Modal.com or Self-Hosted) + +| Endpoint | Method | Count | Request | +|----------|--------|-------|---------| +| `{TRANSCRIPT_URL}/v1/audio/transcriptions-from-url` | POST | **N** (N = num tracks) | `{audio_file_url, language, batch: true}` | + +**Note**: Diarization is NOT called for multitrack - speaker identification comes from separate tracks. + +### 3. 
LLM Service (OpenAI-compatible via LlamaIndex) + +| Phase | Operation | Input | LLM Calls | Output | +|-------|-----------|-------|-----------|--------| +| Topic Detection | TOPIC_PROMPT per 300-word chunk | words[i:i+300] | **C** = ceil(words/300) | TitleSummary{title, summary, timestamp} | +| Title Generation | TITLE_PROMPT | All topic titles joined | **1** | FinalTitle | +| Participant ID | PARTICIPANTS_PROMPT | Full transcript | **0-1** (skipped if known) | ParticipantsResponse | +| Subject Extraction | SUBJECTS_PROMPT | Full transcript | **1** | SubjectsResponse{subjects[]} | +| Subject Detail | DETAILED_SUBJECT_PROMPT | Full transcript + subject name | **M** (M = subjects, max 6) | detailed text (discarded) | +| Subject Paragraph | PARAGRAPH_SUMMARY_PROMPT | Detailed text | **M** | paragraph text → summaries[] | +| Recap | RECAP_PROMPT | All paragraph summaries | **1** | recap text | + +**Total LLM calls**: C + 2M + 3 (+ 1 if participants unknown) +- Short meeting (1000 words, 3 subjects): ~4 + 6 + 3 = **13 calls** +- Long meeting (5000 words, 6 subjects): ~17 + 12 + 3 = **32 calls** + +## S3 Operations Summary + +### Source Bucket: `DAILYCO_STORAGE_AWS_BUCKET_NAME` +Daily.co uploads raw-tracks recordings here. + +| Operation | Key Pattern | When | +|-----------|-------------|------| +| **READ** (presign) | `{domain}/{room_name}/{ts}/{participant_id}-cam-audio-{ts}.webm` | Track acquisition | +| **DELETE** | Same as above | Consent denied cleanup | + +### Transcript Storage Bucket: `TRANSCRIPT_STORAGE_AWS_BUCKET_NAME` +Reflector's own storage. + +| Operation | Key Pattern | When | +|-----------|-------------|------| +| **PUT** | `file_pipeline/{transcript_id}/tracks/padded_{idx}.webm` | After track padding | +| **READ** (presign) | Same | For GPU transcription | +| **DELETE** | Same | After transcription complete | +| **PUT** | `{transcript_id}/audio.mp3` | After mixdown | +| **DELETE** | Same | Consent denied cleanup | + +## Database Operations + +### Tables Written + +| Table | Operation | When | +|-------|-----------|------| +| `recording` | INSERT | Initialization | +| `transcript` | INSERT | Initialization | +| `transcript` | UPDATE (participants) | After Daily API participant fetch | +| `transcript` | UPDATE (status, events, duration, topics, title, summaries, etc.) | Throughout pipeline | + +### Transcript Update Sequence + +``` +1. INSERT: id, name, status='idle', source_kind='room', user_id, recording_id, room_id, meeting_id +2. UPDATE: participants[] (speaker index → participant name mapping) +3. UPDATE: status='processing', events+=[{event:'STATUS', data:{value:'processing'}}] +4. UPDATE: duration=X, events+=[{event:'DURATION', data:{duration:X}}] +5. UPDATE: audio_location='storage' +6. UPDATE: events+=[{event:'WAVEFORM', data:{waveform:[...]}}] +7. UPDATE: events+=[{event:'TRANSCRIPT', data:{text, translation}}] +8. UPDATE: topics[]+=topic, events+=[{event:'TOPIC'}] -- repeated per chunk +9. UPDATE: title=X, events+=[{event:'FINAL_TITLE'}] +10. UPDATE: long_summary=X, events+=[{event:'FINAL_LONG_SUMMARY'}] +11. UPDATE: short_summary=X, events+=[{event:'FINAL_SHORT_SUMMARY'}] +12. UPDATE: status='ended', events+=[{event:'STATUS', data:{value:'ended'}}] +13. UPDATE: zulip_message_id=X -- if Zulip enabled +14. 
UPDATE: audio_deleted=true -- if consent denied +``` + +## WebSocket Events + +All broadcast to room `ts:{transcript_id}`: + +| Event | Payload | Trigger | +|-------|---------|---------| +| STATUS | `{value: "processing"\|"ended"\|"error"}` | Status transitions | +| DURATION | `{duration: float}` | After audio processing | +| WAVEFORM | `{waveform: float[]}` | After waveform generation | +| TRANSCRIPT | `{text: string, translation: string\|null}` | After transcription merge | +| TOPIC | `{id, title, summary, timestamp, duration, transcript, words}` | Per topic detected | +| FINAL_TITLE | `{title: string}` | After LLM title generation | +| FINAL_LONG_SUMMARY | `{long_summary: string}` | After LLM summary | +| FINAL_SHORT_SUMMARY | `{short_summary: string}` | After LLM recap | + +User-room broadcasts to `user:{user_id}`: +- `TRANSCRIPT_STATUS` +- `TRANSCRIPT_FINAL_TITLE` +- `TRANSCRIPT_DURATION`
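
### Example: following a transcript's events (sketch)

As a rough illustration of how a consumer might follow these events, the sketch below assumes a WebSocket endpoint that subscribes the caller to the `ts:{transcript_id}` room and that each frame is a JSON object of the form `{"event": ..., "data": ...}`, mirroring the `events` entries written to the transcript row. The endpoint path, the frame shape, and the client library are assumptions for illustration, not part of the pipeline described above.

```python
"""Minimal sketch: follow a transcript's pipeline progress over WebSocket.

Assumptions (not confirmed by this document): the server exposes the room at a
path like wss://<host>/v1/transcripts/{transcript_id}/events, and each frame is
a JSON object {"event": "...", "data": {...}}.
"""
import asyncio
import json

import websockets  # pip install websockets


async def follow_transcript(base_url: str, transcript_id: str) -> None:
    # Hypothetical endpoint; check the Reflector API for the real path.
    url = f"{base_url}/v1/transcripts/{transcript_id}/events"
    async with websockets.connect(url) as ws:
        async for frame in ws:
            msg = json.loads(frame)
            event, data = msg.get("event"), msg.get("data", {})

            if event == "STATUS":
                print("status:", data["value"])
                if data["value"] in ("ended", "error"):
                    break  # pipeline finished
            elif event == "TOPIC":
                print(f"topic @ {data['timestamp']}s: {data['title']}")
            elif event == "FINAL_TITLE":
                print("title:", data["title"])
            elif event == "FINAL_SHORT_SUMMARY":
                print("recap:", data["short_summary"])
            # DURATION, WAVEFORM, TRANSCRIPT, FINAL_LONG_SUMMARY arrive the same way.


if __name__ == "__main__":
    asyncio.run(follow_transcript("wss://reflector.example.com", "some-transcript-id"))
```

A browser client would do the same with a native `WebSocket` and a switch on `event`; the key point is that every UI-visible field (status, duration, waveform, transcript text, topics, title, summaries) arrives as one of the events in the table above, in the order the pipeline produces them.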