From fc79ff31149bcc0e553a478efdc49403c9b9edfe Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Fri, 17 Oct 2025 14:42:07 +0200 Subject: [PATCH] Use explicit track keys for processing --- .../pipelines/main_multitrack_pipeline.py | 45 +------ server/reflector/views/daily.py | 20 +--- server/reflector/worker/process.py | 113 +++++++----------- 3 files changed, 55 insertions(+), 123 deletions(-) diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index e3c74b24..6555c0c9 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -192,38 +192,7 @@ class PipelineMainMultitrack(PipelineMainBase): async with self.lock_transaction(): return await transcripts_controller.set_status(transcript_id, status) - async def _list_immediate_keys( - self, s3, bucket_name: str, prefix: str - ) -> list[str]: - paginator = s3.get_paginator("list_objects_v2") - raw_prefix = prefix.rstrip("/") - prefixes = [raw_prefix, raw_prefix + "/"] - - keys: set[str] = set() - for pref in prefixes: - for page in paginator.paginate(Bucket=bucket_name, Prefix=pref): - for obj in page.get("Contents", []): - key = obj["Key"] - if not key.startswith(pref): - continue - if pref.endswith("/"): - rel = key[len(pref) :] - if not rel or rel.endswith("/") or "/" in rel: - continue - else: - if key != pref: - continue - keys.add(key) - result = sorted(keys) - self.logger.info( - "S3 list immediate files", - prefixes=prefixes, - total_keys=len(result), - sample=result[:5], - ) - return result - - async def process(self, bucket_name: str, prefix: str): + async def process(self, bucket_name: str, track_keys: list[str]): transcript = await self.get_transcript() s3 = boto3.client( @@ -233,15 +202,11 @@ class PipelineMainMultitrack(PipelineMainBase): aws_secret_access_key=settings.RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY, ) - keys = await self._list_immediate_keys(s3, bucket_name, prefix) - if not keys: - raise Exception("No audio tracks found under prefix") - storage = get_transcripts_storage() # Pre-download bytes for all tracks for mixing and transcription track_datas: list[bytes] = [] - for key in keys: + for key in track_keys: try: obj = s3.get_object(Bucket=bucket_name, Key=key) track_datas.append(obj["Body"].read()) @@ -262,7 +227,7 @@ class PipelineMainMultitrack(PipelineMainBase): self.logger.error("Mixdown failed", error=str(e)) speaker_transcripts: list[TranscriptType] = [] - for idx, key in enumerate(keys): + for idx, key in enumerate(track_keys): ext = ".mp4" try: @@ -433,12 +398,12 @@ class PipelineMainMultitrack(PipelineMainBase): @shared_task @asynctask async def task_pipeline_multitrack_process( - *, transcript_id: str, bucket_name: str, prefix: str + *, transcript_id: str, bucket_name: str, track_keys: list[str] ): pipeline = PipelineMainMultitrack(transcript_id=transcript_id) try: await pipeline.set_status(transcript_id, "processing") - await pipeline.process(bucket_name, prefix) + await pipeline.process(bucket_name, track_keys) except Exception: await pipeline.set_status(transcript_id, "error") raise diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 75b089ce..8982fa6f 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -193,16 +193,8 @@ async def _handle_recording_ready(event: DailyWebhookEvent): ) return - meeting = await meetings_controller.get_by_room_name(room_name) - if not meeting: - logger.warning( - "recording.ready-to-download: meeting not found", room_name=room_name - ) - return - logger.info( "Recording ready for download", - meeting_id=meeting.id, room_name=room_name, recording_id=recording_id, num_tracks=len(tracks), @@ -216,15 +208,13 @@ async def _handle_recording_ready(event: DailyWebhookEvent): ) return - if not settings.DAILY_SUBDOMAIN: - logger.error( - "DAILY_SUBDOMAIN not configured; cannot compute S3 prefix from room name" - ) - return - prefix = f"{settings.DAILY_SUBDOMAIN}/{room_name}" + track_keys = [t.s3Key for t in tracks if t.type == "audio"] process_multitrack_recording.delay( - bucket_name=bucket_name, prefix=prefix, meeting_id=meeting.id + bucket_name=bucket_name, + room_name=room_name, + recording_id=recording_id, + track_keys=track_keys, ) diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 0161b90a..872dc7ae 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -153,43 +153,61 @@ async def process_recording(bucket_name: str, object_key: str): @shared_task @asynctask -async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id: str): +async def process_multitrack_recording( + bucket_name: str, + room_name: str, + recording_id: str, + track_keys: list[str], +): logger.info( "Processing multitrack recording", bucket=bucket_name, - prefix=prefix, - meeting_id=meeting_id, + room_name=room_name, + recording_id=recording_id, + provided_keys=len(track_keys), ) - # Parse an approximate recorded_at from the prefix directory name + if not track_keys: + logger.warning("No audio track keys provided") + return + + recorded_at = datetime.now(timezone.utc) try: - dir_name = prefix.rstrip("/").split("/")[-1] - ts_match = re.search(r"(\d{14})$", dir_name) - if ts_match: - ts = ts_match.group(1) - recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace( - tzinfo=timezone.utc - ) - else: - try: - recorded_at = parse_datetime_with_timezone(dir_name) - except Exception: - recorded_at = datetime.now(timezone.utc) + if track_keys: + folder = os.path.basename(os.path.dirname(track_keys[0])) + ts_match = re.search(r"(\d{14})$", folder) + if ts_match: + ts = ts_match.group(1) + recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace( + tzinfo=timezone.utc + ) except Exception: - logger.warning("Could not parse recorded_at from prefix, using now()") - recorded_at = datetime.now(timezone.utc) + logger.warning("Could not parse recorded_at from keys, using now()") - meeting = await meetings_controller.get_by_id(meeting_id) - if not meeting: - raise Exception(f"Meeting not found: {meeting_id}") - room = await rooms_controller.get_by_id(meeting.room_id) + room_name = room_name.split("-", 1)[0] + room = await rooms_controller.get_by_name(room_name) + if not room: + raise Exception(f"Room not found: {room_name}") - recording = await recordings_controller.get_by_object_key(bucket_name, prefix) + meeting = await meetings_controller.create( + id=recording_id, + room_name=room_name, + room_url=room.name, + host_room_url=room.name, + start_date=recorded_at, + end_date=recorded_at, + room=room, + platform=room.platform, + ) + + recording = await recordings_controller.get_by_id(recording_id) if not recording: + object_key_dir = os.path.dirname(track_keys[0]) if track_keys else "" recording = await recordings_controller.create( Recording( + id=recording_id, bucket_name=bucket_name, - object_key=prefix, + object_key=object_key_dir, recorded_at=recorded_at, meeting_id=meeting.id, ) @@ -216,51 +234,10 @@ async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id room_id=room.id, ) - s3 = boto3.client( - "s3", - region_name=settings.RECORDING_STORAGE_AWS_REGION, - aws_access_key_id=settings.RECORDING_STORAGE_AWS_ACCESS_KEY_ID, - aws_secret_access_key=settings.RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY, - ) - - paginator = s3.get_paginator("list_objects_v2") - raw_prefix = prefix.rstrip("/") - prefixes = [raw_prefix, raw_prefix + "/"] - - all_keys_set: set[str] = set() - for pref in prefixes: - for page in paginator.paginate(Bucket=bucket_name, Prefix=pref): - contents = page.get("Contents", []) - for obj in contents: - key = obj["Key"] - if not key.startswith(pref): - continue - if pref.endswith("/"): - rel = key[len(pref) :] - if not rel or rel.endswith("/") or "/" in rel: - continue - else: - if key == pref: - all_keys_set.add(key) - continue - all_keys_set.add(key) - - all_keys = sorted(all_keys_set) - logger.info( - "S3 list immediate files", - prefixes=prefixes, - total_keys=len(all_keys), - sample=all_keys[:5], - ) - - track_keys: list[str] = all_keys[:] - - if not track_keys: - logger.info("No objects found under prefix", prefixes=prefixes) - raise Exception("No audio tracks found under prefix") - task_pipeline_multitrack_process.delay( - transcript_id=transcript.id, bucket_name=bucket_name, prefix=prefix + transcript_id=transcript.id, + bucket_name=bucket_name, + track_keys=track_keys, )