diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 2143cffd..e57996cc 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -1,7 +1,7 @@ """Daily.co webhook handler endpoint.""" import json -from typing import Any, Dict +from typing import Any, Dict, Literal from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel @@ -13,6 +13,14 @@ from reflector.video_platforms.factory import create_platform_client router = APIRouter() +class DailyTrack(BaseModel): + """Daily.co recording track (audio or video file).""" + + type: Literal["audio", "video"] + s3Key: str + size: int + + class DailyWebhookEvent(BaseModel): """Daily webhook event structure.""" @@ -156,17 +164,28 @@ async def _handle_recording_ready(event: DailyWebhookEvent): """ room_name = _extract_room_name(event) recording_id = event.payload.get("recording_id") - tracks = event.payload.get("tracks", []) + tracks_raw = event.payload.get("tracks", []) - if not room_name or not tracks: + if not room_name or not tracks_raw: logger.warning( "recording.ready-to-download: missing room_name or tracks", room_name=room_name, - has_tracks=bool(tracks), + has_tracks=bool(tracks_raw), payload=event.payload, ) return + # Validate tracks structure + try: + tracks = [DailyTrack(**t) for t in tracks_raw] + except Exception as e: + logger.error( + "recording.ready-to-download: invalid tracks structure", + error=str(e), + tracks=tracks_raw, + ) + return + meeting = await meetings_controller.get_by_room_name(room_name) if not meeting: logger.warning( @@ -183,12 +202,14 @@ async def _handle_recording_ready(event: DailyWebhookEvent): platform="daily", ) - from reflector.worker.process import process_daily_recording + # Import at runtime to avoid circular dependency (process.py imports from daily.py) + from reflector.worker.process import process_daily_recording # noqa: PLC0415 + # Convert Pydantic models to dicts for Celery serialization process_daily_recording.delay( meeting_id=meeting.id, recording_id=recording_id or event.id, - tracks=tracks, + tracks=[t.model_dump() for t in tracks], ) diff --git a/server/reflector/worker/daily_stub_data.py b/server/reflector/worker/daily_stub_data.py index 5a3d328d..d712fa37 100644 --- a/server/reflector/worker/daily_stub_data.py +++ b/server/reflector/worker/daily_stub_data.py @@ -1,9 +1,16 @@ """Stub data for Daily.co testing - Fish conversation""" import re +from typing import Any +from reflector.processors.types import Word from reflector.utils import generate_uuid4 +# Constants for stub data generation +MIN_WORD_DURATION = 0.3 # Base duration per word in seconds +WORD_LENGTH_MULTIPLIER = 0.05 # Additional duration per character +NUM_STUB_TOPICS = 3 # Number of topics to generate + # The fish argument text - 2 speakers arguing about eating fish FISH_TEXT = """Fish for dinner are nothing wrong with you? There's nothing wrong with me. Wrong with you? Would you shut up? There's nothing wrong with me. I'm just trying to. There's nothing wrong with me. I'm trying to eat a fish. Wrong with you trying to eat a fish and it falls off the plate. Would you shut up? You're bothering me. More than a fish is bothering me. Would you shut up and leave me alone? What's your problem? I'm just trying to eat a fish is wrong with you. I'm only trying to eat a fish. Would you shut up? Wrong with you. There's nothing wrong with me. There's nothing wrong with me. Wrong with you. There's nothing wrong with me. Wrong with you. There's nothing wrong with me. Would you shut up and let me eat my fish? Wrong with you. Shut up! What is wrong with you? Would you just shut up? What's your problem? Would you shut up with you? What is wrong with you? Wrong with me? I'm just trying to get my attention. Did you shut up? You're bothering me. Would you shut up? You're beginning to bug me. What's your problem? Just trying to eat my fish. Stay on the plate. Would you shut up? Just trying to eat my fish. @@ -12,10 +19,10 @@ I'm gonna hit you with my problem. You're worse than this fish. You're more of a What is wrong with you? What's your problem? Problem? I just want to eat my fish. Wrong with you. What's wrong with you? I don't have a problem. You shut up! What's wrong with you? Just shut up! What's wrong with you? Shut up! What is wrong with you? I'm trying to eat a fish. I'm trying to eat a fish and it falls off the plate. Would you shut up? What is wrong with you? Would you shut up? Is wrong with you? Would you just shut up? What is wrong with you? Would you just shut? Is wrong with you? What's your problem? You just shut. What is wrong with you? Trying to eat my fish. Would you be quiet? What's your problem? Would you just shut up? Eat my fish. I can't even eat it. Don't stay on the plate. What's your problem? Would you shut up? What is wrong with you? What is wrong with you? Would you just shut up? What's your problem? What is wrong with you? I'm gonna hit you with my fish if you don't shut up. What's your problem? Would you shut up? What's wrong with you? What is wrong? Shut up! What's your problem?""" -def parse_fish_text(): +def parse_fish_text() -> list[Word]: """Parse fish text into words with timestamps and speakers. - Returns a list of words: [{"text": str, "start": float, "end": float, "speaker": int}] + Returns list of Word objects with text, start/end timestamps, and speaker ID. Speaker assignment heuristic: - Speaker 0 (eating fish): "fish", "eat", "trying", "problem", "I" @@ -34,13 +41,17 @@ def parse_fish_text(): + (sentences[i + 1] if i + 1 < len(sentences) else "") ) - words = [] + words: list[Word] = [] current_time = 0.0 for sentence in full_sentences: if not sentence.strip(): continue + # TODO: Delete this heuristic-based speaker detection when real diarization is implemented. + # This overly complex pattern matching is only for stub test data. + # Real implementation should use actual speaker diarization from audio processing. + # Determine speaker based on content sentence_lower = sentence.lower() @@ -88,15 +99,15 @@ def parse_fish_text(): # Split sentence into words sentence_words = sentence.split() for word in sentence_words: - word_duration = 0.3 + (len(word) * 0.05) # ~0.3-0.5s per word + word_duration = MIN_WORD_DURATION + (len(word) * WORD_LENGTH_MULTIPLIER) words.append( - { - "text": word + " ", # Add space - "start": current_time, - "end": current_time + word_duration, - "speaker": speaker, - } + Word( + text=word + " ", # Add space + start=current_time, + end=current_time + word_duration, + speaker=speaker, + ) ) current_time += word_duration @@ -104,22 +115,21 @@ def parse_fish_text(): return words -def generate_fake_topics(words): +def generate_fake_topics(words: list[Word]) -> list[dict[str, Any]]: """Generate fake topics from words. - Splits into ~3 topics based on timestamp. + Splits into equal topics based on word count. + Returns list of topic dicts for database storage. """ if not words: return [] - total_duration = words[-1]["end"] - chunk_size = len(words) // 3 + chunk_size = len(words) // NUM_STUB_TOPICS + topics: list[dict[str, Any]] = [] - topics = [] - - for i in range(3): + for i in range(NUM_STUB_TOPICS): start_idx = i * chunk_size - end_idx = (i + 1) * chunk_size if i < 2 else len(words) + end_idx = (i + 1) * chunk_size if i < NUM_STUB_TOPICS - 1 else len(words) if start_idx >= len(words): break @@ -130,10 +140,10 @@ def generate_fake_topics(words): "id": generate_uuid4(), "title": f"Fish Argument Part {i+1}", "summary": f"Argument about eating fish continues (part {i+1})", - "timestamp": chunk_words[0]["start"], - "duration": chunk_words[-1]["end"] - chunk_words[0]["start"], - "transcript": "".join(w["text"] for w in chunk_words), - "words": chunk_words, + "timestamp": chunk_words[0].start, + "duration": chunk_words[-1].end - chunk_words[0].start, + "transcript": "".join(w.text for w in chunk_words), + "words": [w.model_dump() for w in chunk_words], } topics.append(topic) @@ -141,18 +151,19 @@ def generate_fake_topics(words): return topics -def generate_fake_participants(): - """Generate fake participants.""" +def generate_fake_participants() -> list[dict[str, Any]]: + """Generate fake participants for stub transcript.""" return [ {"id": generate_uuid4(), "speaker": 0, "name": "Fish Eater"}, {"id": generate_uuid4(), "speaker": 1, "name": "Annoying Person"}, ] -def get_stub_transcript_data(): +def get_stub_transcript_data() -> dict[str, Any]: """Get complete stub transcript data for Daily.co testing. Returns dict with topics, participants, title, summaries, duration. + All data is fake/predetermined for testing webhook flow without GPU processing. """ words = parse_fish_text() topics = generate_fake_topics(words) @@ -164,5 +175,5 @@ def get_stub_transcript_data(): "title": "The Great Fish Eating Argument", "short_summary": "Two people argue about eating fish", "long_summary": "An extended argument between someone trying to eat fish and another person who won't stop asking what's wrong. The fish keeps falling off the plate.", - "duration": words[-1]["end"] if words else 0.0, + "duration": words[-1].end if words else 0.0, } diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index dbebd191..3e806fcd 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -20,6 +20,7 @@ from reflector.pipelines.main_live_pipeline import asynctask from reflector.redis_cache import get_redis_client from reflector.settings import settings from reflector.whereby import get_room_sessions +from reflector.worker.daily_stub_data import get_stub_transcript_data logger = structlog.wrap_logger(get_task_logger(__name__)) @@ -240,9 +241,14 @@ async def process_meetings(): @shared_task @asynctask -async def process_daily_recording(meeting_id: str, recording_id: str, tracks: list): +async def process_daily_recording( + meeting_id: str, recording_id: str, tracks: list[dict] +) -> None: """Stub processor for Daily.co recordings - writes fake transcription/diarization. + Handles webhook retries by checking if recording already exists. + Validates track structure before processing. + Args: meeting_id: Meeting ID recording_id: Recording ID from Daily.co webhook @@ -256,14 +262,41 @@ async def process_daily_recording(meeting_id: str, recording_id: str, tracks: li num_tracks=len(tracks), ) + # Check if recording already exists (webhook retry case) + existing_recording = await recordings_controller.get_by_id(recording_id) + if existing_recording: + logger.warning( + "Recording already exists, skipping processing (likely webhook retry)", + recording_id=recording_id, + ) + return + meeting = await meetings_controller.get_by_id(meeting_id) if not meeting: raise Exception(f"Meeting {meeting_id} not found") room = await rooms_controller.get_by_id(meeting.room_id) + # Validate bucket configuration + if not settings.AWS_DAILY_S3_BUCKET: + raise ValueError("AWS_DAILY_S3_BUCKET not configured for Daily.co processing") + + # Validate and parse tracks + # Import at runtime to avoid circular dependency (daily.py imports from process.py) + from reflector.views.daily import DailyTrack # noqa: PLC0415 + + try: + validated_tracks = [DailyTrack(**t) for t in tracks] + except Exception as e: + logger.error( + "Invalid track structure from Daily.co webhook", + error=str(e), + tracks=tracks, + ) + raise ValueError(f"Invalid track structure: {e}") + # Find first audio track for Recording entity - audio_track = next((t for t in tracks if t["type"] == "audio"), None) + audio_track = next((t for t in validated_tracks if t.type == "audio"), None) if not audio_track: raise Exception(f"No audio tracks found in {len(tracks)} tracks") @@ -272,7 +305,7 @@ async def process_daily_recording(meeting_id: str, recording_id: str, tracks: li Recording( id=recording_id, bucket_name=settings.AWS_DAILY_S3_BUCKET, - object_key=audio_track["s3Key"], + object_key=audio_track.s3Key, recorded_at=datetime.now(timezone.utc), meeting_id=meeting.id, status="completed", @@ -282,7 +315,7 @@ async def process_daily_recording(meeting_id: str, recording_id: str, tracks: li logger.info( "Created recording", recording_id=recording.id, - s3_key=audio_track["s3Key"], + s3_key=audio_track.s3Key, ) # Create Transcript entry @@ -300,9 +333,7 @@ async def process_daily_recording(meeting_id: str, recording_id: str, tracks: li logger.info("Created transcript", transcript_id=transcript.id) - # Generate fake data (fish argument) - from reflector.worker.daily_stub_data import get_stub_transcript_data - + # Generate fake data stub_data = get_stub_transcript_data() # Update transcript with fake data