From 5f458aa4a7ec3d00ca5ec49d62fcc8ad232b138e Mon Sep 17 00:00:00 2001
From: Sergey Mankovsky
Date: Thu, 18 Dec 2025 21:10:04 +0100
Subject: [PATCH] fix: automatically reprocess daily recordings (#797)

* Automatically reprocess recordings

* Restore the comments

* Remove redundant check

* Fix indent

* Add comment about cyclic import
---
 server/reflector/db/recordings.py  | 31 ++++++++++
 server/reflector/worker/app.py     |  4 ++
 server/reflector/worker/process.py | 99 +++++++++++++++++++++++++++++-
 3 files changed, 133 insertions(+), 1 deletion(-)

diff --git a/server/reflector/db/recordings.py b/server/reflector/db/recordings.py
index 18fe358b..82609b38 100644
--- a/server/reflector/db/recordings.py
+++ b/server/reflector/db/recordings.py
@@ -3,6 +3,7 @@ from typing import Literal
 
 import sqlalchemy as sa
 from pydantic import BaseModel, Field
+from sqlalchemy import or_
 
 from reflector.db import get_database, metadata
 from reflector.utils import generate_uuid4
@@ -79,5 +80,35 @@ class RecordingController:
         results = await get_database().fetch_all(query)
         return [Recording(**row) for row in results]
 
+    async def get_multitrack_needing_reprocessing(
+        self, bucket_name: str
+    ) -> list[Recording]:
+        """
+        Get multitrack recordings that need reprocessing:
+        - Have track_keys (multitrack)
+        - Either have no transcript OR transcript has error status
+
+        This is more efficient than fetching all recordings and filtering in Python.
+        """
+        from reflector.db.transcripts import (
+            transcripts,  # noqa: PLC0415 cyclic import
+        )
+
+        query = (
+            recordings.select()
+            .outerjoin(transcripts, recordings.c.id == transcripts.c.recording_id)
+            .where(
+                recordings.c.bucket_name == bucket_name,
+                recordings.c.track_keys.isnot(None),
+                or_(
+                    transcripts.c.id.is_(None),
+                    transcripts.c.status == "error",
+                ),
+            )
+        )
+        results = await get_database().fetch_all(query)
+        recordings_list = [Recording(**row) for row in results]
+        return [r for r in recordings_list if r.is_multitrack]
+
 
 recordings_controller = RecordingController()
diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py
index c0e711ae..b1256c94 100644
--- a/server/reflector/worker/app.py
+++ b/server/reflector/worker/app.py
@@ -38,6 +38,10 @@ else:
             "task": "reflector.worker.process.reprocess_failed_recordings",
             "schedule": crontab(hour=5, minute=0),  # Midnight EST
         },
+        "reprocess_failed_daily_recordings": {
+            "task": "reflector.worker.process.reprocess_failed_daily_recordings",
+            "schedule": crontab(hour=5, minute=0),  # Midnight EST
+        },
         "poll_daily_recordings": {
             "task": "reflector.worker.process.poll_daily_recordings",
             "schedule": 180.0,  # Every 3 minutes (configurable lookback window)
diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py
index 142d8dc1..ab163fad 100644
--- a/server/reflector/worker/process.py
+++ b/server/reflector/worker/process.py
@@ -671,7 +671,7 @@ async def reprocess_failed_recordings():
     Find recordings in Whereby S3 bucket and check if they have proper transcriptions.
     If not, requeue them for processing.
 
-    Note: Daily.co recordings are processed via webhooks, not this cron job.
+    Note: Daily.co multitrack recordings are handled by reprocess_failed_daily_recordings.
""" logger.info("Checking Whereby recordings that need processing or reprocessing") @@ -724,6 +724,103 @@ async def reprocess_failed_recordings(): return reprocessed_count +@shared_task +@asynctask +async def reprocess_failed_daily_recordings(): + """ + Find Daily.co multitrack recordings in the database and check if they have proper transcriptions. + If not, requeue them for processing. + """ + logger.info( + "Checking Daily.co multitrack recordings that need processing or reprocessing" + ) + + if not settings.DAILYCO_STORAGE_AWS_BUCKET_NAME: + logger.debug( + "DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; skipping Daily recording reprocessing" + ) + return 0 + + bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME + reprocessed_count = 0 + + try: + multitrack_recordings = ( + await recordings_controller.get_multitrack_needing_reprocessing(bucket_name) + ) + + logger.info( + "Found multitrack recordings needing reprocessing", + count=len(multitrack_recordings), + bucket=bucket_name, + ) + + for recording in multitrack_recordings: + if not recording.meeting_id: + logger.debug( + "Skipping recording without meeting_id", + recording_id=recording.id, + ) + continue + + meeting = await meetings_controller.get_by_id(recording.meeting_id) + if not meeting: + logger.warning( + "Meeting not found for recording", + recording_id=recording.id, + meeting_id=recording.meeting_id, + ) + continue + + transcript = None + try: + transcript = await transcripts_controller.get_by_recording_id( + recording.id + ) + except ValidationError: + await transcripts_controller.remove_by_recording_id(recording.id) + logger.warning( + "Removed invalid transcript for recording", + recording_id=recording.id, + ) + + if not recording.track_keys: + logger.warning( + "Recording has no track_keys, cannot reprocess", + recording_id=recording.id, + ) + continue + + logger.info( + "Queueing Daily recording for reprocessing", + recording_id=recording.id, + room_name=meeting.room_name, + track_count=len(recording.track_keys), + transcript_status=transcript.status if transcript else None, + ) + + process_multitrack_recording.delay( + bucket_name=bucket_name, + daily_room_name=meeting.room_name, + recording_id=recording.id, + track_keys=recording.track_keys, + ) + reprocessed_count += 1 + + except Exception as e: + logger.error( + "Error checking Daily multitrack recordings", + error=str(e), + exc_info=True, + ) + + logger.info( + "Daily reprocessing complete", + requeued_count=reprocessed_count, + ) + return reprocessed_count + + @shared_task @asynctask async def trigger_daily_reconciliation() -> None: