diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index 2e7eb3f0..e9941a07 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -11,7 +11,6 @@ from celery import chain, shared_task from reflector.asynctask import asynctask from reflector.db.transcripts import ( TranscriptStatus, - TranscriptText, TranscriptWaveform, transcripts_controller, ) @@ -19,6 +18,7 @@ from reflector.logger import logger from reflector.pipelines.main_file_pipeline import task_send_webhook_if_needed from reflector.pipelines.main_live_pipeline import ( PipelineMainBase, + broadcast_to_sockets, task_cleanup_consent, task_pipeline_post_to_zulip, ) @@ -365,6 +365,7 @@ class PipelineMainMultitrack(PipelineMainBase): for c in containers: c.close() + @broadcast_to_sockets async def set_status(self, transcript_id: str, status: TranscriptStatus): async with self.lock_transaction(): return await transcripts_controller.set_status(transcript_id, status) @@ -423,7 +424,8 @@ class PipelineMainMultitrack(PipelineMainBase): transcript.data_path.mkdir(parents=True, exist_ok=True) mp3_writer = AudioFileWriterProcessor( - path=str(transcript.audio_mp3_filename) + path=str(transcript.audio_mp3_filename), + on_duration=self.on_duration, ) # Use PADDED tracks with NO additional offsets (already aligned by padding) await self.mixdown_tracks( @@ -542,13 +544,8 @@ class PipelineMainMultitrack(PipelineMainBase): merged_transcript = TranscriptType(words=merged_words, translation=None) - await transcripts_controller.append_event( - transcript, - event="TRANSCRIPT", - data=TranscriptText( - text=merged_transcript.text, translation=merged_transcript.translation - ), - ) + # Emit TRANSCRIPT event through the shared handler (persists and broadcasts) + await self.on_transcript(merged_transcript) topics = await self.detect_topics(merged_transcript, transcript.target_language) await asyncio.gather(