mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 12:19:06 +00:00
fix: automatically reprocess daily recordings (#797)
* Automatically reprocess recordings * Restore the comments * Remove redundant check * Fix indent * Add comment about cyclic import
This commit is contained in:
@@ -3,6 +3,7 @@ from typing import Literal
|
||||
|
||||
import sqlalchemy as sa
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy import or_
|
||||
|
||||
from reflector.db import get_database, metadata
|
||||
from reflector.utils import generate_uuid4
|
||||
@@ -79,5 +80,35 @@ class RecordingController:
|
||||
results = await get_database().fetch_all(query)
|
||||
return [Recording(**row) for row in results]
|
||||
|
||||
async def get_multitrack_needing_reprocessing(
|
||||
self, bucket_name: str
|
||||
) -> list[Recording]:
|
||||
"""
|
||||
Get multitrack recordings that need reprocessing:
|
||||
- Have track_keys (multitrack)
|
||||
- Either have no transcript OR transcript has error status
|
||||
|
||||
This is more efficient than fetching all recordings and filtering in Python.
|
||||
"""
|
||||
from reflector.db.transcripts import (
|
||||
transcripts, # noqa: PLC0415 cyclic import
|
||||
)
|
||||
|
||||
query = (
|
||||
recordings.select()
|
||||
.outerjoin(transcripts, recordings.c.id == transcripts.c.recording_id)
|
||||
.where(
|
||||
recordings.c.bucket_name == bucket_name,
|
||||
recordings.c.track_keys.isnot(None),
|
||||
or_(
|
||||
transcripts.c.id.is_(None),
|
||||
transcripts.c.status == "error",
|
||||
),
|
||||
)
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
recordings_list = [Recording(**row) for row in results]
|
||||
return [r for r in recordings_list if r.is_multitrack]
|
||||
|
||||
|
||||
recordings_controller = RecordingController()
|
||||
|
||||
@@ -38,6 +38,10 @@ else:
|
||||
"task": "reflector.worker.process.reprocess_failed_recordings",
|
||||
"schedule": crontab(hour=5, minute=0), # Midnight EST
|
||||
},
|
||||
"reprocess_failed_daily_recordings": {
|
||||
"task": "reflector.worker.process.reprocess_failed_daily_recordings",
|
||||
"schedule": crontab(hour=5, minute=0), # Midnight EST
|
||||
},
|
||||
"poll_daily_recordings": {
|
||||
"task": "reflector.worker.process.poll_daily_recordings",
|
||||
"schedule": 180.0, # Every 3 minutes (configurable lookback window)
|
||||
|
||||
@@ -671,7 +671,7 @@ async def reprocess_failed_recordings():
|
||||
Find recordings in Whereby S3 bucket and check if they have proper transcriptions.
|
||||
If not, requeue them for processing.
|
||||
|
||||
Note: Daily.co recordings are processed via webhooks, not this cron job.
|
||||
Note: Daily.co multitrack recordings are handled by reprocess_failed_daily_recordings.
|
||||
"""
|
||||
logger.info("Checking Whereby recordings that need processing or reprocessing")
|
||||
|
||||
@@ -724,6 +724,103 @@ async def reprocess_failed_recordings():
|
||||
return reprocessed_count
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def reprocess_failed_daily_recordings():
|
||||
"""
|
||||
Find Daily.co multitrack recordings in the database and check if they have proper transcriptions.
|
||||
If not, requeue them for processing.
|
||||
"""
|
||||
logger.info(
|
||||
"Checking Daily.co multitrack recordings that need processing or reprocessing"
|
||||
)
|
||||
|
||||
if not settings.DAILYCO_STORAGE_AWS_BUCKET_NAME:
|
||||
logger.debug(
|
||||
"DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; skipping Daily recording reprocessing"
|
||||
)
|
||||
return 0
|
||||
|
||||
bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
|
||||
reprocessed_count = 0
|
||||
|
||||
try:
|
||||
multitrack_recordings = (
|
||||
await recordings_controller.get_multitrack_needing_reprocessing(bucket_name)
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Found multitrack recordings needing reprocessing",
|
||||
count=len(multitrack_recordings),
|
||||
bucket=bucket_name,
|
||||
)
|
||||
|
||||
for recording in multitrack_recordings:
|
||||
if not recording.meeting_id:
|
||||
logger.debug(
|
||||
"Skipping recording without meeting_id",
|
||||
recording_id=recording.id,
|
||||
)
|
||||
continue
|
||||
|
||||
meeting = await meetings_controller.get_by_id(recording.meeting_id)
|
||||
if not meeting:
|
||||
logger.warning(
|
||||
"Meeting not found for recording",
|
||||
recording_id=recording.id,
|
||||
meeting_id=recording.meeting_id,
|
||||
)
|
||||
continue
|
||||
|
||||
transcript = None
|
||||
try:
|
||||
transcript = await transcripts_controller.get_by_recording_id(
|
||||
recording.id
|
||||
)
|
||||
except ValidationError:
|
||||
await transcripts_controller.remove_by_recording_id(recording.id)
|
||||
logger.warning(
|
||||
"Removed invalid transcript for recording",
|
||||
recording_id=recording.id,
|
||||
)
|
||||
|
||||
if not recording.track_keys:
|
||||
logger.warning(
|
||||
"Recording has no track_keys, cannot reprocess",
|
||||
recording_id=recording.id,
|
||||
)
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
"Queueing Daily recording for reprocessing",
|
||||
recording_id=recording.id,
|
||||
room_name=meeting.room_name,
|
||||
track_count=len(recording.track_keys),
|
||||
transcript_status=transcript.status if transcript else None,
|
||||
)
|
||||
|
||||
process_multitrack_recording.delay(
|
||||
bucket_name=bucket_name,
|
||||
daily_room_name=meeting.room_name,
|
||||
recording_id=recording.id,
|
||||
track_keys=recording.track_keys,
|
||||
)
|
||||
reprocessed_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error checking Daily multitrack recordings",
|
||||
error=str(e),
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Daily reprocessing complete",
|
||||
requeued_count=reprocessed_count,
|
||||
)
|
||||
return reprocessed_count
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def trigger_daily_reconciliation() -> None:
|
||||
|
||||
Reference in New Issue
Block a user