diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index 3ad3bef2..1efbfeea 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -11,13 +11,13 @@ from celery import chain, shared_task from reflector.asynctask import asynctask from reflector.db.transcripts import ( TranscriptStatus, - TranscriptText, transcripts_controller, ) from reflector.logger import logger from reflector.pipelines.main_file_pipeline import task_send_webhook_if_needed from reflector.pipelines.main_live_pipeline import ( PipelineMainBase, + broadcast_to_sockets, task_cleanup_consent, task_pipeline_post_to_zulip, ) @@ -225,6 +225,7 @@ class PipelineMainMultitrack(PipelineMainBase): for c in containers: c.close() + @broadcast_to_sockets async def set_status(self, transcript_id: str, status: TranscriptStatus): async with self.lock_transaction(): return await transcripts_controller.set_status(transcript_id, status) @@ -274,7 +275,8 @@ class PipelineMainMultitrack(PipelineMainBase): # Mixdown all available tracks into transcript.audio_mp3_filename, preserving sample rate try: mp3_writer = AudioFileWriterProcessor( - path=str(transcript.audio_mp3_filename) + path=str(transcript.audio_mp3_filename), + on_duration=self.on_duration, ) await self.mixdown_tracks(track_datas, mp3_writer, offsets_seconds) await mp3_writer.flush() @@ -287,6 +289,7 @@ class PipelineMainMultitrack(PipelineMainBase): waveform_processor = AudioWaveformProcessor( audio_path=transcript.audio_mp3_filename, waveform_path=transcript.audio_waveform_filename, + on_waveform=self.on_waveform, ) waveform_processor.set_pipeline(self.empty_pipeline) await waveform_processor.flush() @@ -382,13 +385,8 @@ class PipelineMainMultitrack(PipelineMainBase): merged_transcript = TranscriptType(words=merged_words, translation=None) - await transcripts_controller.append_event( - transcript, - event="TRANSCRIPT", - data=TranscriptText( - text=merged_transcript.text, translation=merged_transcript.translation - ), - ) + # Emit TRANSCRIPT event through the shared handler (persists and broadcasts) + await self.on_transcript(merged_transcript) topics = await self.detect_topics(merged_transcript, transcript.target_language) await asyncio.gather(