From c23518d2e3d3e9388d22a833d0c804313fe12e8a Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Thu, 16 Oct 2025 20:05:26 +0200 Subject: [PATCH] Trigger multitrack processing for daily recordings --- server/reflector/views/daily.py | 24 +++++++++++++------ server/reflector/worker/process.py | 38 +++++++----------------------- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index b1542848..75b089ce 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -8,7 +8,9 @@ from pydantic import BaseModel from reflector.db.meetings import meetings_controller from reflector.logger import logger +from reflector.settings import settings from reflector.video_platforms.factory import create_platform_client +from reflector.worker.process import process_multitrack_recording router = APIRouter() @@ -207,14 +209,22 @@ async def _handle_recording_ready(event: DailyWebhookEvent): platform="daily", ) - # Import at runtime to avoid circular dependency (process.py imports from daily.py) - from reflector.worker.process import process_daily_recording # noqa: PLC0415 + bucket_name = settings.AWS_DAILY_S3_BUCKET + if not bucket_name: + logger.error( + "AWS_DAILY_S3_BUCKET not configured; cannot process Daily recording" + ) + return - # Convert Pydantic models to dicts for Celery serialization - process_daily_recording.delay( - meeting_id=meeting.id, - recording_id=recording_id or event.id, - tracks=[t.model_dump() for t in tracks], + if not settings.DAILY_SUBDOMAIN: + logger.error( + "DAILY_SUBDOMAIN not configured; cannot compute S3 prefix from room name" + ) + return + prefix = f"{settings.DAILY_SUBDOMAIN}/{room_name}" + + process_multitrack_recording.delay( + bucket_name=bucket_name, prefix=prefix, meeting_id=meeting.id ) diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 6ea0029d..0161b90a 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -153,16 +153,16 @@ async def process_recording(bucket_name: str, object_key: str): @shared_task @asynctask -async def process_multitrack_recording(bucket_name: str, prefix: str): +async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id: str): logger.info( "Processing multitrack recording", bucket=bucket_name, prefix=prefix, - room_name="daily", + meeting_id=meeting_id, ) + # Parse an approximate recorded_at from the prefix directory name try: - effective_room_name = "/daily" dir_name = prefix.rstrip("/").split("/")[-1] ts_match = re.search(r"(\d{14})$", dir_name) if ts_match: @@ -177,32 +177,12 @@ async def process_multitrack_recording(bucket_name: str, prefix: str): recorded_at = datetime.now(timezone.utc) except Exception: logger.warning("Could not parse recorded_at from prefix, using now()") - effective_room_name = "/daily" recorded_at = datetime.now(timezone.utc) - meeting = await meetings_controller.get_by_room_name(effective_room_name) - if meeting: - room = await rooms_controller.get_by_id(meeting.room_id) - else: - room = await rooms_controller.get_by_name(effective_room_name.lstrip("/")) - if not room: - raise Exception(f"Room not found: {effective_room_name}") - start_date = recorded_at - end_date = recorded_at - try: - dummy = await meetings_controller.create( - id=room.id + "-" + recorded_at.strftime("%Y%m%d%H%M%S"), - room_name=effective_room_name, - room_url=f"{effective_room_name}", - host_room_url=f"{effective_room_name}", - start_date=start_date, - end_date=end_date, - room=room, - ) - meeting = dummy - except Exception as e: - logger.warning("Failed to create dummy meeting", error=str(e)) - meeting = None + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + raise Exception(f"Meeting not found: {meeting_id}") + room = await rooms_controller.get_by_id(meeting.room_id) recording = await recordings_controller.get_by_object_key(bucket_name, prefix) if not recording: @@ -211,7 +191,7 @@ async def process_multitrack_recording(bucket_name: str, prefix: str): bucket_name=bucket_name, object_key=prefix, recorded_at=recorded_at, - meeting_id=meeting.id if meeting else None, + meeting_id=meeting.id, ) ) @@ -232,7 +212,7 @@ async def process_multitrack_recording(bucket_name: str, prefix: str): user_id=room.user_id, recording_id=recording.id, share_mode="public", - meeting_id=meeting.id if meeting else None, + meeting_id=meeting.id, room_id=room.id, )