Emit multitrack pipeline events

This commit is contained in:
2025-10-21 16:31:31 +02:00
parent acb6e90f28
commit d82abf65ba

View File

@@ -11,13 +11,13 @@ from celery import chain, shared_task
from reflector.asynctask import asynctask
from reflector.db.transcripts import (
TranscriptStatus,
TranscriptText,
transcripts_controller,
)
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_send_webhook_if_needed
from reflector.pipelines.main_live_pipeline import (
PipelineMainBase,
broadcast_to_sockets,
task_cleanup_consent,
task_pipeline_post_to_zulip,
)
@@ -225,6 +225,7 @@ class PipelineMainMultitrack(PipelineMainBase):
for c in containers:
c.close()
@broadcast_to_sockets
async def set_status(self, transcript_id: str, status: TranscriptStatus):
async with self.lock_transaction():
return await transcripts_controller.set_status(transcript_id, status)
@@ -274,7 +275,8 @@ class PipelineMainMultitrack(PipelineMainBase):
# Mixdown all available tracks into transcript.audio_mp3_filename, preserving sample rate
try:
mp3_writer = AudioFileWriterProcessor(
path=str(transcript.audio_mp3_filename)
path=str(transcript.audio_mp3_filename),
on_duration=self.on_duration,
)
await self.mixdown_tracks(track_datas, mp3_writer, offsets_seconds)
await mp3_writer.flush()
@@ -287,6 +289,7 @@ class PipelineMainMultitrack(PipelineMainBase):
waveform_processor = AudioWaveformProcessor(
audio_path=transcript.audio_mp3_filename,
waveform_path=transcript.audio_waveform_filename,
on_waveform=self.on_waveform,
)
waveform_processor.set_pipeline(self.empty_pipeline)
await waveform_processor.flush()
@@ -382,13 +385,8 @@ class PipelineMainMultitrack(PipelineMainBase):
merged_transcript = TranscriptType(words=merged_words, translation=None)
await transcripts_controller.append_event(
transcript,
event="TRANSCRIPT",
data=TranscriptText(
text=merged_transcript.text, translation=merged_transcript.translation
),
)
# Emit TRANSCRIPT event through the shared handler (persists and broadcasts)
await self.on_transcript(merged_transcript)
topics = await self.detect_topics(merged_transcript, transcript.target_language)
await asyncio.gather(