From c23518d2e3d3e9388d22a833d0c804313fe12e8a Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Thu, 16 Oct 2025 20:05:26 +0200 Subject: [PATCH 1/3] Trigger multitrack processing for daily recordings --- server/reflector/views/daily.py | 24 +++++++++++++------ server/reflector/worker/process.py | 38 +++++++----------------------- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index b1542848..75b089ce 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -8,7 +8,9 @@ from pydantic import BaseModel from reflector.db.meetings import meetings_controller from reflector.logger import logger +from reflector.settings import settings from reflector.video_platforms.factory import create_platform_client +from reflector.worker.process import process_multitrack_recording router = APIRouter() @@ -207,14 +209,22 @@ async def _handle_recording_ready(event: DailyWebhookEvent): platform="daily", ) - # Import at runtime to avoid circular dependency (process.py imports from daily.py) - from reflector.worker.process import process_daily_recording # noqa: PLC0415 + bucket_name = settings.AWS_DAILY_S3_BUCKET + if not bucket_name: + logger.error( + "AWS_DAILY_S3_BUCKET not configured; cannot process Daily recording" + ) + return - # Convert Pydantic models to dicts for Celery serialization - process_daily_recording.delay( - meeting_id=meeting.id, - recording_id=recording_id or event.id, - tracks=[t.model_dump() for t in tracks], + if not settings.DAILY_SUBDOMAIN: + logger.error( + "DAILY_SUBDOMAIN not configured; cannot compute S3 prefix from room name" + ) + return + prefix = f"{settings.DAILY_SUBDOMAIN}/{room_name}" + + process_multitrack_recording.delay( + bucket_name=bucket_name, prefix=prefix, meeting_id=meeting.id ) diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 6ea0029d..0161b90a 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -153,16 +153,16 @@ async def process_recording(bucket_name: str, object_key: str): @shared_task @asynctask -async def process_multitrack_recording(bucket_name: str, prefix: str): +async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id: str): logger.info( "Processing multitrack recording", bucket=bucket_name, prefix=prefix, - room_name="daily", + meeting_id=meeting_id, ) + # Parse an approximate recorded_at from the prefix directory name try: - effective_room_name = "/daily" dir_name = prefix.rstrip("/").split("/")[-1] ts_match = re.search(r"(\d{14})$", dir_name) if ts_match: @@ -177,32 +177,12 @@ async def process_multitrack_recording(bucket_name: str, prefix: str): recorded_at = datetime.now(timezone.utc) except Exception: logger.warning("Could not parse recorded_at from prefix, using now()") - effective_room_name = "/daily" recorded_at = datetime.now(timezone.utc) - meeting = await meetings_controller.get_by_room_name(effective_room_name) - if meeting: - room = await rooms_controller.get_by_id(meeting.room_id) - else: - room = await rooms_controller.get_by_name(effective_room_name.lstrip("/")) - if not room: - raise Exception(f"Room not found: {effective_room_name}") - start_date = recorded_at - end_date = recorded_at - try: - dummy = await meetings_controller.create( - id=room.id + "-" + recorded_at.strftime("%Y%m%d%H%M%S"), - room_name=effective_room_name, - room_url=f"{effective_room_name}", - host_room_url=f"{effective_room_name}", - 
start_date=start_date, - end_date=end_date, - room=room, - ) - meeting = dummy - except Exception as e: - logger.warning("Failed to create dummy meeting", error=str(e)) - meeting = None + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + raise Exception(f"Meeting not found: {meeting_id}") + room = await rooms_controller.get_by_id(meeting.room_id) recording = await recordings_controller.get_by_object_key(bucket_name, prefix) if not recording: @@ -211,7 +191,7 @@ async def process_multitrack_recording(bucket_name: str, prefix: str): bucket_name=bucket_name, object_key=prefix, recorded_at=recorded_at, - meeting_id=meeting.id if meeting else None, + meeting_id=meeting.id, ) ) @@ -232,7 +212,7 @@ async def process_multitrack_recording(bucket_name: str, prefix: str): user_id=room.user_id, recording_id=recording.id, share_mode="public", - meeting_id=meeting.id if meeting else None, + meeting_id=meeting.id, room_id=room.id, ) From fc79ff31149bcc0e553a478efdc49403c9b9edfe Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Fri, 17 Oct 2025 14:42:07 +0200 Subject: [PATCH 2/3] Use explicit track keys for processing --- .../pipelines/main_multitrack_pipeline.py | 45 +------ server/reflector/views/daily.py | 20 +--- server/reflector/worker/process.py | 113 +++++++----------- 3 files changed, 55 insertions(+), 123 deletions(-) diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index e3c74b24..6555c0c9 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -192,38 +192,7 @@ class PipelineMainMultitrack(PipelineMainBase): async with self.lock_transaction(): return await transcripts_controller.set_status(transcript_id, status) - async def _list_immediate_keys( - self, s3, bucket_name: str, prefix: str - ) -> list[str]: - paginator = s3.get_paginator("list_objects_v2") - raw_prefix = prefix.rstrip("/") - prefixes = [raw_prefix, raw_prefix + "/"] - - keys: set[str] = set() - for pref in prefixes: - for page in paginator.paginate(Bucket=bucket_name, Prefix=pref): - for obj in page.get("Contents", []): - key = obj["Key"] - if not key.startswith(pref): - continue - if pref.endswith("/"): - rel = key[len(pref) :] - if not rel or rel.endswith("/") or "/" in rel: - continue - else: - if key != pref: - continue - keys.add(key) - result = sorted(keys) - self.logger.info( - "S3 list immediate files", - prefixes=prefixes, - total_keys=len(result), - sample=result[:5], - ) - return result - - async def process(self, bucket_name: str, prefix: str): + async def process(self, bucket_name: str, track_keys: list[str]): transcript = await self.get_transcript() s3 = boto3.client( @@ -233,15 +202,11 @@ class PipelineMainMultitrack(PipelineMainBase): aws_secret_access_key=settings.RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY, ) - keys = await self._list_immediate_keys(s3, bucket_name, prefix) - if not keys: - raise Exception("No audio tracks found under prefix") - storage = get_transcripts_storage() # Pre-download bytes for all tracks for mixing and transcription track_datas: list[bytes] = [] - for key in keys: + for key in track_keys: try: obj = s3.get_object(Bucket=bucket_name, Key=key) track_datas.append(obj["Body"].read()) @@ -262,7 +227,7 @@ class PipelineMainMultitrack(PipelineMainBase): self.logger.error("Mixdown failed", error=str(e)) speaker_transcripts: list[TranscriptType] = [] - for idx, key in enumerate(keys): + for idx, key in 
enumerate(track_keys): ext = ".mp4" try: @@ -433,12 +398,12 @@ class PipelineMainMultitrack(PipelineMainBase): @shared_task @asynctask async def task_pipeline_multitrack_process( - *, transcript_id: str, bucket_name: str, prefix: str + *, transcript_id: str, bucket_name: str, track_keys: list[str] ): pipeline = PipelineMainMultitrack(transcript_id=transcript_id) try: await pipeline.set_status(transcript_id, "processing") - await pipeline.process(bucket_name, prefix) + await pipeline.process(bucket_name, track_keys) except Exception: await pipeline.set_status(transcript_id, "error") raise diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 75b089ce..8982fa6f 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -193,16 +193,8 @@ async def _handle_recording_ready(event: DailyWebhookEvent): ) return - meeting = await meetings_controller.get_by_room_name(room_name) - if not meeting: - logger.warning( - "recording.ready-to-download: meeting not found", room_name=room_name - ) - return - logger.info( "Recording ready for download", - meeting_id=meeting.id, room_name=room_name, recording_id=recording_id, num_tracks=len(tracks), @@ -216,15 +208,13 @@ async def _handle_recording_ready(event: DailyWebhookEvent): ) return - if not settings.DAILY_SUBDOMAIN: - logger.error( - "DAILY_SUBDOMAIN not configured; cannot compute S3 prefix from room name" - ) - return - prefix = f"{settings.DAILY_SUBDOMAIN}/{room_name}" + track_keys = [t.s3Key for t in tracks if t.type == "audio"] process_multitrack_recording.delay( - bucket_name=bucket_name, prefix=prefix, meeting_id=meeting.id + bucket_name=bucket_name, + room_name=room_name, + recording_id=recording_id, + track_keys=track_keys, ) diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 0161b90a..872dc7ae 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -153,43 +153,61 @@ async def process_recording(bucket_name: str, object_key: str): @shared_task @asynctask -async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id: str): +async def process_multitrack_recording( + bucket_name: str, + room_name: str, + recording_id: str, + track_keys: list[str], +): logger.info( "Processing multitrack recording", bucket=bucket_name, - prefix=prefix, - meeting_id=meeting_id, + room_name=room_name, + recording_id=recording_id, + provided_keys=len(track_keys), ) - # Parse an approximate recorded_at from the prefix directory name + if not track_keys: + logger.warning("No audio track keys provided") + return + + recorded_at = datetime.now(timezone.utc) try: - dir_name = prefix.rstrip("/").split("/")[-1] - ts_match = re.search(r"(\d{14})$", dir_name) - if ts_match: - ts = ts_match.group(1) - recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace( - tzinfo=timezone.utc - ) - else: - try: - recorded_at = parse_datetime_with_timezone(dir_name) - except Exception: - recorded_at = datetime.now(timezone.utc) + if track_keys: + folder = os.path.basename(os.path.dirname(track_keys[0])) + ts_match = re.search(r"(\d{14})$", folder) + if ts_match: + ts = ts_match.group(1) + recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace( + tzinfo=timezone.utc + ) except Exception: - logger.warning("Could not parse recorded_at from prefix, using now()") - recorded_at = datetime.now(timezone.utc) + logger.warning("Could not parse recorded_at from keys, using now()") - meeting = await meetings_controller.get_by_id(meeting_id) - if 
not meeting: - raise Exception(f"Meeting not found: {meeting_id}") - room = await rooms_controller.get_by_id(meeting.room_id) + room_name = room_name.split("-", 1)[0] + room = await rooms_controller.get_by_name(room_name) + if not room: + raise Exception(f"Room not found: {room_name}") - recording = await recordings_controller.get_by_object_key(bucket_name, prefix) + meeting = await meetings_controller.create( + id=recording_id, + room_name=room_name, + room_url=room.name, + host_room_url=room.name, + start_date=recorded_at, + end_date=recorded_at, + room=room, + platform=room.platform, + ) + + recording = await recordings_controller.get_by_id(recording_id) if not recording: + object_key_dir = os.path.dirname(track_keys[0]) if track_keys else "" recording = await recordings_controller.create( Recording( + id=recording_id, bucket_name=bucket_name, - object_key=prefix, + object_key=object_key_dir, recorded_at=recorded_at, meeting_id=meeting.id, ) @@ -216,51 +234,10 @@ async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id room_id=room.id, ) - s3 = boto3.client( - "s3", - region_name=settings.RECORDING_STORAGE_AWS_REGION, - aws_access_key_id=settings.RECORDING_STORAGE_AWS_ACCESS_KEY_ID, - aws_secret_access_key=settings.RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY, - ) - - paginator = s3.get_paginator("list_objects_v2") - raw_prefix = prefix.rstrip("/") - prefixes = [raw_prefix, raw_prefix + "/"] - - all_keys_set: set[str] = set() - for pref in prefixes: - for page in paginator.paginate(Bucket=bucket_name, Prefix=pref): - contents = page.get("Contents", []) - for obj in contents: - key = obj["Key"] - if not key.startswith(pref): - continue - if pref.endswith("/"): - rel = key[len(pref) :] - if not rel or rel.endswith("/") or "/" in rel: - continue - else: - if key == pref: - all_keys_set.add(key) - continue - all_keys_set.add(key) - - all_keys = sorted(all_keys_set) - logger.info( - "S3 list immediate files", - prefixes=prefixes, - total_keys=len(all_keys), - sample=all_keys[:5], - ) - - track_keys: list[str] = all_keys[:] - - if not track_keys: - logger.info("No objects found under prefix", prefixes=prefixes) - raise Exception("No audio tracks found under prefix") - task_pipeline_multitrack_process.delay( - transcript_id=transcript.id, bucket_name=bucket_name, prefix=prefix + transcript_id=transcript.id, + bucket_name=bucket_name, + track_keys=track_keys, ) From 96f05020cc65f90fb877c0df888adfdd0a2967e3 Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Fri, 17 Oct 2025 15:27:27 +0200 Subject: [PATCH 3/3] Align tracks of a multitrack recording --- .../pipelines/main_multitrack_pipeline.py | 75 +++++++++++++++++-- 1 file changed, 69 insertions(+), 6 deletions(-) diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index 6555c0c9..4ea7c5b9 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -57,7 +57,10 @@ class PipelineMainMultitrack(PipelineMainBase): self.empty_pipeline = EmptyPipeline(logger=self.logger) async def mixdown_tracks( - self, track_datas: list[bytes], writer: AudioFileWriterProcessor + self, + track_datas: list[bytes], + writer: AudioFileWriterProcessor, + offsets_seconds: list[float] | None = None, ) -> None: """ Minimal multi-track mixdown using a PyAV filter graph (amix), no resampling. 
@@ -85,10 +88,22 @@ class PipelineMainMultitrack(PipelineMainBase): self.logger.warning("Mixdown skipped - no decodable audio frames found") return - # Build PyAV filter graph: N abuffer (s32/stereo) -> amix (s32) -> aformat(s16) -> sink + # Build PyAV filter graph: + # N abuffer (s32/stereo) + # -> optional adelay per input (for alignment) + # -> amix (s32) + # -> aformat(s16) + # -> sink graph = av.filter.Graph() inputs = [] - for idx, data in enumerate([d for d in track_datas if d]): + valid_track_datas = [d for d in track_datas if d] + # Align offsets list with the filtered inputs (skip empties) + input_offsets_seconds = None + if offsets_seconds is not None: + input_offsets_seconds = [ + offsets_seconds[i] for i, d in enumerate(track_datas) if d + ] + for idx, data in enumerate(valid_track_datas): args = ( f"time_base=1/{target_sample_rate}:" f"sample_rate={target_sample_rate}:" @@ -114,15 +129,36 @@ class PipelineMainMultitrack(PipelineMainBase): sink = graph.add("abuffersink", name="out") + # Optional per-input delay before mixing + delays_ms: list[int] = [] + if input_offsets_seconds is not None: + base = min(input_offsets_seconds) if input_offsets_seconds else 0.0 + delays_ms = [ + max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds + ] + else: + delays_ms = [0 for _ in inputs] + for idx, in_ctx in enumerate(inputs): - in_ctx.link_to(mixer, 0, idx) + delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0 + if delay_ms > 0: + # adelay requires one value per channel; use same for stereo + adelay = graph.add( + "adelay", + args=f"delays={delay_ms}|{delay_ms}:all=1", + name=f"delay{idx}", + ) + in_ctx.link_to(adelay) + adelay.link_to(mixer, 0, idx) + else: + in_ctx.link_to(mixer, 0, idx) mixer.link_to(fmt) fmt.link_to(sink) graph.configure() # Open containers for decoding containers = [] - for i, d in enumerate([d for d in track_datas if d]): + for i, d in enumerate(valid_track_datas): try: c = av.open(io.BytesIO(d)) containers.append(c) @@ -216,12 +252,30 @@ class PipelineMainMultitrack(PipelineMainBase): ) track_datas.append(b"") + # Estimate offsets from first frame PTS, aligned to track_keys + offsets_seconds: list[float] = [] + for data, key in zip(track_datas, track_keys): + off_s = 0.0 + if data: + try: + c = av.open(io.BytesIO(data)) + try: + for frame in c.decode(audio=0): + if frame.pts is not None and frame.time_base: + off_s = float(frame.pts * frame.time_base) + break + finally: + c.close() + except Exception: + pass + offsets_seconds.append(max(0.0, float(off_s))) + # Mixdown all available tracks into transcript.audio_mp3_filename, preserving sample rate try: mp3_writer = AudioFileWriterProcessor( path=str(transcript.audio_mp3_filename) ) - await self.mixdown_tracks(track_datas, mp3_writer) + await self.mixdown_tracks(track_datas, mp3_writer, offsets_seconds) await mp3_writer.flush() except Exception as e: self.logger.error("Mixdown failed", error=str(e)) @@ -287,7 +341,16 @@ class PipelineMainMultitrack(PipelineMainBase): if not t.words: continue + # Shift word timestamps by the track's offset so all are relative to 00:00 + track_offset = offsets_seconds[idx] if idx < len(offsets_seconds) else 0.0 for w in t.words: + try: + if hasattr(w, "start") and w.start is not None: + w.start = float(w.start) + track_offset + if hasattr(w, "end") and w.end is not None: + w.end = float(w.end) + track_offset + except Exception: + pass w.speaker = idx speaker_transcripts.append(t)
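
A minimal standalone sketch of the recorded_at extraction introduced by PATCH 2 in
process_multitrack_recording. It assumes only that the folder containing a track's
S3 key ends in a 14-digit UTC timestamp; the sample key layout and the helper name
recorded_at_from_track_key are illustrative, not taken from the patches.

    import os
    import re
    from datetime import datetime, timezone

    def recorded_at_from_track_key(track_key: str) -> datetime:
        # Folder that holds the track, e.g. "20251017144207" in the assumed
        # layout "<subdomain>/<room>/20251017144207/audio-track-0.m4a".
        folder = os.path.basename(os.path.dirname(track_key))
        match = re.search(r"(\d{14})$", folder)
        if match:
            return datetime.strptime(match.group(1), "%Y%m%d%H%M%S").replace(
                tzinfo=timezone.utc
            )
        # No timestamp in the folder name: fall back to "now", as the task does.
        return datetime.now(timezone.utc)

    # recorded_at_from_track_key("sub/room/20251017144207/audio-track-0.m4a")
    # -> datetime(2025, 10, 17, 14, 42, 7, tzinfo=timezone.utc)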
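
A minimal sketch of the alignment idea from PATCH 3, assuming each Daily track is an
independently started file whose first audio frame PTS reflects when that participant's
audio begins: the earliest-starting track gets no delay, and every other track is padded
by its offset relative to that base before amix. first_frame_offset_seconds and
adelay_values_ms are illustrative helper names, not functions from the patches.

    import io

    import av  # PyAV, as used by the multitrack pipeline

    def first_frame_offset_seconds(data: bytes) -> float:
        # Presentation time of the first decodable audio frame, in seconds.
        # Assumes the container carries the track's start offset in its PTS.
        with av.open(io.BytesIO(data)) as container:
            for frame in container.decode(audio=0):
                if frame.pts is not None and frame.time_base:
                    return float(frame.pts * frame.time_base)
        return 0.0

    def adelay_values_ms(offsets_seconds: list[float]) -> list[int]:
        # Per-track delays in milliseconds, relative to the earliest track.
        base = min(offsets_seconds) if offsets_seconds else 0.0
        return [max(0, round((o - base) * 1000)) for o in offsets_seconds]

    # offsets = [first_frame_offset_seconds(d) for d in track_datas if d]
    # adelay_values_ms(offsets) -> e.g. [0, 430, 1250]

In ffmpeg filtergraph terms, the mixdown PATCH 3 builds is roughly
[1:a]adelay=430|430:all=1[a1];[0:a][a1]amix=inputs=2, with each delay repeated
once per stereo channel, which is what the adelay args in the patch encode.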