From bd5df1ce2ebf35d7f3413b295e56937a9a28ef7b Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Tue, 2 Dec 2025 17:10:06 -0500 Subject: [PATCH] fix: Multitrack mixdown optimisation 2 (#764) * Revert "fix: Skip mixdown for multitrack (#760)" This reverts commit b51b7aa9176c1a53ba57ad99f5e976c804a1e80c. * multitrack mixdown optimisation * return the "good" ui part of "skip mixdown" --------- Co-authored-by: Igor Loskutov --- .../pipelines/main_multitrack_pipeline.py | 93 +++++++++---------- server/reflector/settings.py | 8 -- 2 files changed, 44 insertions(+), 57 deletions(-) diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index 2b23c7b6..579bfbd3 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -31,7 +31,6 @@ from reflector.processors import AudioFileWriterProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.types import TitleSummary from reflector.processors.types import Transcript as TranscriptType -from reflector.settings import settings from reflector.storage import Storage, get_transcripts_storage from reflector.utils.daily import ( filter_cam_audio_tracks, @@ -423,7 +422,15 @@ class PipelineMainMultitrack(PipelineMainBase): # Open all containers with cleanup guaranteed for i, url in enumerate(valid_track_urls): try: - c = av.open(url) + c = av.open( + url, + options={ + # it's trying to stream from s3 by default + "reconnect": "1", + "reconnect_streamed": "1", + "reconnect_delay_max": "5", + }, + ) containers.append(c) except Exception as e: self.logger.warning( @@ -452,6 +459,8 @@ class PipelineMainMultitrack(PipelineMainBase): frame = next(dec) except StopIteration: active[i] = False + # causes stream to move on / unclogs memory + inputs[i].push(None) continue if frame.sample_rate != target_sample_rate: @@ -471,8 +480,6 @@ class PipelineMainMultitrack(PipelineMainBase): mixed.time_base = Fraction(1, target_sample_rate) await writer.push(mixed) - for in_ctx in inputs: - in_ctx.push(None) while True: try: mixed = sink.pull() @@ -632,55 +639,43 @@ class PipelineMainMultitrack(PipelineMainBase): transcript.data_path.mkdir(parents=True, exist_ok=True) - if settings.SKIP_MIXDOWN: - self.logger.warning( - "SKIP_MIXDOWN enabled: Skipping mixdown and waveform generation. " - "UI will have no audio playback or waveform.", - num_tracks=len(padded_track_urls), - transcript_id=transcript.id, - ) - else: - mp3_writer = AudioFileWriterProcessor( - path=str(transcript.audio_mp3_filename), - on_duration=self.on_duration, - ) - await self.mixdown_tracks( - padded_track_urls, mp3_writer, offsets_seconds=None - ) - await mp3_writer.flush() + mp3_writer = AudioFileWriterProcessor( + path=str(transcript.audio_mp3_filename), + on_duration=self.on_duration, + ) + await self.mixdown_tracks(padded_track_urls, mp3_writer, offsets_seconds=None) + await mp3_writer.flush() - if not transcript.audio_mp3_filename.exists(): - raise Exception( - "Mixdown failed - no MP3 file generated. Cannot proceed without playable audio." - ) - - storage_path = f"{transcript.id}/audio.mp3" - # Use file handle streaming to avoid loading entire MP3 into memory - mp3_size = transcript.audio_mp3_filename.stat().st_size - with open(transcript.audio_mp3_filename, "rb") as mp3_file: - await transcript_storage.put_file(storage_path, mp3_file) - mp3_url = await transcript_storage.get_file_url(storage_path) - - await transcripts_controller.update( - transcript, {"audio_location": "storage"} + if not transcript.audio_mp3_filename.exists(): + raise Exception( + "Mixdown failed - no MP3 file generated. Cannot proceed without playable audio." ) - self.logger.info( - f"Uploaded mixed audio to storage", - storage_path=storage_path, - size=mp3_size, - url=mp3_url, - ) + storage_path = f"{transcript.id}/audio.mp3" + # Use file handle streaming to avoid loading entire MP3 into memory + mp3_size = transcript.audio_mp3_filename.stat().st_size + with open(transcript.audio_mp3_filename, "rb") as mp3_file: + await transcript_storage.put_file(storage_path, mp3_file) + mp3_url = await transcript_storage.get_file_url(storage_path) - self.logger.info("Generating waveform from mixed audio") - waveform_processor = AudioWaveformProcessor( - audio_path=transcript.audio_mp3_filename, - waveform_path=transcript.audio_waveform_filename, - on_waveform=self.on_waveform, - ) - waveform_processor.set_pipeline(self.empty_pipeline) - await waveform_processor.flush() - self.logger.info("Waveform generated successfully") + await transcripts_controller.update(transcript, {"audio_location": "storage"}) + + self.logger.info( + f"Uploaded mixed audio to storage", + storage_path=storage_path, + size=mp3_size, + url=mp3_url, + ) + + self.logger.info("Generating waveform from mixed audio") + waveform_processor = AudioWaveformProcessor( + audio_path=transcript.audio_mp3_filename, + waveform_path=transcript.audio_waveform_filename, + on_waveform=self.on_waveform, + ) + waveform_processor.set_pipeline(self.empty_pipeline) + await waveform_processor.flush() + self.logger.info("Waveform generated successfully") speaker_transcripts: list[TranscriptType] = [] for idx, padded_url in enumerate(padded_track_urls): diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 338e1da9..1ec46d94 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -138,14 +138,6 @@ class Settings(BaseSettings): DAILY_WEBHOOK_UUID: str | None = ( None # Webhook UUID for this environment. Not used by production code ) - - # Multitrack processing - # SKIP_MIXDOWN: When True, skips audio mixdown and waveform generation. - # Transcription still works using individual tracks. Useful for: - # - Diagnosing OOM issues in mixdown - # - Fast processing when audio playback is not needed - # Note: UI will have no audio playback or waveform when enabled. - SKIP_MIXDOWN: bool = True # Platform Configuration DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM