# Number of leading characters of an S3 object key used to derive the room name.
# NOTE(review): 36 is the length of a canonical UUID string — confirm against
# how the recording keys in the bucket are actually named.
_ROOM_NAME_PREFIX_LENGTH = 36


async def _recording_needs_processing(object_key):
    """Return True when the recording stored at *object_key* should be queued.

    A recording qualifies when its meeting and room both exist and the
    meeting either has no transcript or has a transcript whose status is
    ``"error"``. Recordings with no matching meeting or room are logged and
    rejected.
    """
    room_name = f"/{object_key[:_ROOM_NAME_PREFIX_LENGTH]}"
    meeting = await meetings_controller.get_by_room_name(room_name)
    if not meeting:
        logger.warning(f"No meeting found for recording: {object_key}")
        return False

    room = await rooms_controller.get_by_id(meeting.room_id)
    if not room:
        logger.warning(f"No room found for meeting: {meeting.id}")
        return False

    transcript = await transcripts_controller.get_by_meeting_id(meeting.id)
    if transcript is None:
        logger.info(f"Recording {object_key} has no transcript")
        return True
    if transcript.status == "error":
        logger.info(f"Recording {object_key} has transcript with error status")
        return True
    return False


@shared_task
@asynctask
async def reprocess_failed_recordings():
    """
    Requeue recordings from the S3 bucket that have no usable transcript.

    Lists every object in ``settings.AWS_WHEREBY_S3_BUCKET`` and, for each
    key ending in ``.mp4``, queues it through ``process_recording.delay``
    when its meeting has no transcript or a transcript in ``"error"`` status.

    Error handling is best-effort:

    * A failure while checking or queueing one recording is logged with its
      traceback and the scan continues with the next object.
    * A failure while listing the bucket is logged with its traceback and
      ends the scan early; recordings queued so far remain queued.

    Returns:
        The number of recordings queued during this run.
    """
    logger.info("Checking for recordings that need processing or reprocessing")

    s3 = boto3.client(
        "s3",
        region_name=settings.TRANSCRIPT_STORAGE_AWS_REGION,
        aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
        aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
    )

    bucket = settings.AWS_WHEREBY_S3_BUCKET
    reprocessed_count = 0

    try:
        paginator = s3.get_paginator("list_objects_v2")

        for page in paginator.paginate(Bucket=bucket):
            # Pages of an empty listing carry no "Contents" key at all.
            for obj in page.get("Contents", []):
                object_key = obj["Key"]
                if not object_key.endswith(".mp4"):
                    continue

                # Isolate each recording so that one bad record (failed
                # lookup, broker hiccup) does not abort the rest of the scan.
                try:
                    if not await _recording_needs_processing(object_key):
                        continue

                    logger.info(f"Queueing recording for processing: {object_key}")
                    process_recording.delay(bucket, object_key)
                    reprocessed_count += 1
                except Exception:
                    logger.exception("Error checking recording %s", object_key)

    except Exception as e:
        # Task boundary: listing failures are logged, not raised, so the
        # periodic task still reports how many recordings it managed to queue.
        logger.exception(f"Error checking S3 bucket: {str(e)}")

    logger.info(f"Reprocessing complete. Requeued {reprocessed_count} recordings")
    return reprocessed_count