From 96f05020cc65f90fb877c0df888adfdd0a2967e3 Mon Sep 17 00:00:00 2001
From: Sergey Mankovsky
Date: Fri, 17 Oct 2025 15:27:27 +0200
Subject: [PATCH] Align tracks of a multitrack recording

---
 .../pipelines/main_multitrack_pipeline.py    | 75 +++++++++++++++++--
 1 file changed, 69 insertions(+), 6 deletions(-)

diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py
index 6555c0c9..4ea7c5b9 100644
--- a/server/reflector/pipelines/main_multitrack_pipeline.py
+++ b/server/reflector/pipelines/main_multitrack_pipeline.py
@@ -57,7 +57,10 @@ class PipelineMainMultitrack(PipelineMainBase):
         self.empty_pipeline = EmptyPipeline(logger=self.logger)
 
     async def mixdown_tracks(
-        self, track_datas: list[bytes], writer: AudioFileWriterProcessor
+        self,
+        track_datas: list[bytes],
+        writer: AudioFileWriterProcessor,
+        offsets_seconds: list[float] | None = None,
     ) -> None:
         """
         Minimal multi-track mixdown using a PyAV filter graph (amix), no resampling.
@@ -85,10 +88,22 @@ class PipelineMainMultitrack(PipelineMainBase):
             self.logger.warning("Mixdown skipped - no decodable audio frames found")
             return
 
-        # Build PyAV filter graph: N abuffer (s32/stereo) -> amix (s32) -> aformat(s16) -> sink
+        # Build PyAV filter graph:
+        #   N abuffer (s32/stereo)
+        #   -> optional adelay per input (for alignment)
+        #   -> amix (s32)
+        #   -> aformat(s16)
+        #   -> sink
         graph = av.filter.Graph()
         inputs = []
-        for idx, data in enumerate([d for d in track_datas if d]):
+        valid_track_datas = [d for d in track_datas if d]
+        # Align offsets list with the filtered inputs (skip empties)
+        input_offsets_seconds = None
+        if offsets_seconds is not None:
+            input_offsets_seconds = [
+                offsets_seconds[i] for i, d in enumerate(track_datas) if d
+            ]
+        for idx, data in enumerate(valid_track_datas):
             args = (
                 f"time_base=1/{target_sample_rate}:"
                 f"sample_rate={target_sample_rate}:"
@@ -114,15 +129,36 @@ class PipelineMainMultitrack(PipelineMainBase):
 
         sink = graph.add("abuffersink", name="out")
 
+        # Optional per-input delay before mixing
+        delays_ms: list[int] = []
+        if input_offsets_seconds is not None:
+            base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
+            delays_ms = [
+                max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
+            ]
+        else:
+            delays_ms = [0 for _ in inputs]
+
         for idx, in_ctx in enumerate(inputs):
-            in_ctx.link_to(mixer, 0, idx)
+            delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
+            if delay_ms > 0:
+                # adelay requires one value per channel; use same for stereo
+                adelay = graph.add(
+                    "adelay",
+                    args=f"delays={delay_ms}|{delay_ms}:all=1",
+                    name=f"delay{idx}",
+                )
+                in_ctx.link_to(adelay)
+                adelay.link_to(mixer, 0, idx)
+            else:
+                in_ctx.link_to(mixer, 0, idx)
         mixer.link_to(fmt)
         fmt.link_to(sink)
         graph.configure()
 
         # Open containers for decoding
         containers = []
-        for i, d in enumerate([d for d in track_datas if d]):
+        for i, d in enumerate(valid_track_datas):
             try:
                 c = av.open(io.BytesIO(d))
                 containers.append(c)
@@ -216,12 +252,30 @@ class PipelineMainMultitrack(PipelineMainBase):
                 )
                 track_datas.append(b"")
 
+        # Estimate offsets from first frame PTS, aligned to track_keys
+        offsets_seconds: list[float] = []
+        for data, key in zip(track_datas, track_keys):
+            off_s = 0.0
+            if data:
+                try:
+                    c = av.open(io.BytesIO(data))
+                    try:
+                        for frame in c.decode(audio=0):
+                            if frame.pts is not None and frame.time_base:
+                                off_s = float(frame.pts * frame.time_base)
+                                break
+                    finally:
+                        c.close()
+                except Exception:
+                    pass
+            offsets_seconds.append(max(0.0, float(off_s)))
+
         # Mixdown all available tracks into transcript.audio_mp3_filename, preserving sample rate
         try:
             mp3_writer = AudioFileWriterProcessor(
                 path=str(transcript.audio_mp3_filename)
             )
-            await self.mixdown_tracks(track_datas, mp3_writer)
+            await self.mixdown_tracks(track_datas, mp3_writer, offsets_seconds)
             await mp3_writer.flush()
         except Exception as e:
             self.logger.error("Mixdown failed", error=str(e))
@@ -287,7 +341,16 @@ class PipelineMainMultitrack(PipelineMainBase):
             if not t.words:
                 continue
 
+            # Shift word timestamps by the track's offset so all are relative to 00:00
+            track_offset = offsets_seconds[idx] if idx < len(offsets_seconds) else 0.0
             for w in t.words:
+                try:
+                    if hasattr(w, "start") and w.start is not None:
+                        w.start = float(w.start) + track_offset
+                    if hasattr(w, "end") and w.end is not None:
+                        w.end = float(w.end) + track_offset
+                except Exception:
+                    pass
                 w.speaker = idx
             speaker_transcripts.append(t)
 
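
Reviewer note, not part of the patch: below is a minimal standalone sketch of the adelay/amix alignment idea used above, for trying the filter graph in isolation. It assumes PyAV >= 10 with FFmpeg's adelay, amix, and aformat filters available; the 250 ms offset, the 440/660 Hz tones, and the frame size are arbitrary illustration values, not taken from the pipeline.

import av
import numpy as np
from fractions import Fraction

sample_rate = 48000

def sine_frame(freq: float, n_samples: int, pts: int) -> av.AudioFrame:
    # One packed-s32 stereo frame of a sine tone starting at `pts` samples.
    t = (np.arange(n_samples) + pts) / sample_rate
    mono = (np.sin(2 * np.pi * freq * t) * 2**30).astype(np.int32)
    packed = np.repeat(mono, 2).reshape(1, -1)  # interleave L/R (identical channels)
    frame = av.AudioFrame.from_ndarray(packed, format="s32", layout="stereo")
    frame.sample_rate = sample_rate
    frame.pts = pts
    frame.time_base = Fraction(1, sample_rate)
    return frame

graph = av.filter.Graph()
args = (
    f"time_base=1/{sample_rate}:sample_rate={sample_rate}:"
    "sample_fmt=s32:channel_layout=stereo"
)
in0 = graph.add("abuffer", args=args, name="in0")
in1 = graph.add("abuffer", args=args, name="in1")
# Pretend input 1 started 250 ms late, so it gets delayed before mixing;
# one delay value per channel, all=1 applies the last value to any
# channels not explicitly listed.
delay = graph.add("adelay", args="delays=250|250:all=1", name="delay1")
mixer = graph.add("amix", args="inputs=2", name="mix")
fmt = graph.add("aformat", args="sample_fmts=s16:channel_layouts=stereo", name="fmt")
sink = graph.add("abuffersink", name="out")

in0.link_to(mixer, 0, 0)
in1.link_to(delay)
delay.link_to(mixer, 0, 1)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()

for i in range(10):  # 10 x 480 samples = 0.1 s per input
    in0.push(sine_frame(440.0, 480, i * 480))
    in1.push(sine_frame(660.0, 480, i * 480))
in0.push(None)  # signal EOF on each input
in1.push(None)

total = 0
while True:
    try:
        out = sink.pull()
    except (BlockingIOError, EOFError):  # PyAV errors subclass the builtins
        break
    total += out.samples
# With amix's default duration=longest, expect roughly 0.35 s of output:
# 0.1 s of audio plus the 0.25 s delay on input 1.
print(f"mixed {total} samples (~{total / sample_rate:.3f} s)")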