From acb6e90f28c031c76ab2594d3d3bc0604a778a04 Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Tue, 21 Oct 2025 13:33:31 +0200 Subject: [PATCH 1/2] Generate waveforms for the mixed audio --- .../pipelines/main_multitrack_pipeline.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index 4ea7c5b9..3ad3bef2 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -27,6 +27,7 @@ from reflector.processors import ( TranscriptFinalTitleProcessor, TranscriptTopicDetectorProcessor, ) +from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.file_transcript import FileTranscriptInput from reflector.processors.file_transcript_auto import FileTranscriptAutoProcessor from reflector.processors.types import TitleSummary @@ -280,6 +281,23 @@ class PipelineMainMultitrack(PipelineMainBase): except Exception as e: self.logger.error("Mixdown failed", error=str(e)) + # Generate waveform for the mixed audio if present + try: + if transcript.audio_mp3_filename.exists(): + waveform_processor = AudioWaveformProcessor( + audio_path=transcript.audio_mp3_filename, + waveform_path=transcript.audio_waveform_filename, + ) + waveform_processor.set_pipeline(self.empty_pipeline) + await waveform_processor.flush() + else: + self.logger.warning( + "Waveform skipped - mixed MP3 not found", + path=str(transcript.audio_mp3_filename), + ) + except Exception as e: + self.logger.error("Waveform generation failed", error=str(e)) + speaker_transcripts: list[TranscriptType] = [] for idx, key in enumerate(track_keys): ext = ".mp4" From d82abf65ba7cd0deb16fc7863bfc08ff087b8216 Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Tue, 21 Oct 2025 16:31:31 +0200 Subject: [PATCH 2/2] Emit multitrack pipeline events --- 
.../pipelines/main_multitrack_pipeline.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index 3ad3bef2..1efbfeea 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -11,13 +11,13 @@ from celery import chain, shared_task from reflector.asynctask import asynctask from reflector.db.transcripts import ( TranscriptStatus, - TranscriptText, transcripts_controller, ) from reflector.logger import logger from reflector.pipelines.main_file_pipeline import task_send_webhook_if_needed from reflector.pipelines.main_live_pipeline import ( PipelineMainBase, + broadcast_to_sockets, task_cleanup_consent, task_pipeline_post_to_zulip, ) @@ -225,6 +225,7 @@ class PipelineMainMultitrack(PipelineMainBase): for c in containers: c.close() + @broadcast_to_sockets async def set_status(self, transcript_id: str, status: TranscriptStatus): async with self.lock_transaction(): return await transcripts_controller.set_status(transcript_id, status) @@ -274,7 +275,8 @@ class PipelineMainMultitrack(PipelineMainBase): # Mixdown all available tracks into transcript.audio_mp3_filename, preserving sample rate try: mp3_writer = AudioFileWriterProcessor( - path=str(transcript.audio_mp3_filename) + path=str(transcript.audio_mp3_filename), + on_duration=self.on_duration, ) await self.mixdown_tracks(track_datas, mp3_writer, offsets_seconds) await mp3_writer.flush() @@ -287,6 +289,7 @@ class PipelineMainMultitrack(PipelineMainBase): waveform_processor = AudioWaveformProcessor( audio_path=transcript.audio_mp3_filename, waveform_path=transcript.audio_waveform_filename, + on_waveform=self.on_waveform, ) waveform_processor.set_pipeline(self.empty_pipeline) await waveform_processor.flush() @@ -382,13 +385,8 @@ class PipelineMainMultitrack(PipelineMainBase): merged_transcript = 
TranscriptType(words=merged_words, translation=None) - await transcripts_controller.append_event( - transcript, - event="TRANSCRIPT", - data=TranscriptText( - text=merged_transcript.text, translation=merged_transcript.translation - ), - ) + # Emit TRANSCRIPT event through the shared handler (persists and broadcasts) + await self.on_transcript(merged_transcript) topics = await self.detect_topics(merged_transcript, transcript.target_language) await asyncio.gather(