fix: daily multitrack pipeline finalze dependency fix

This commit is contained in:
Igor Loskutov
2026-01-30 15:19:27 -05:00
parent 2592e369f6
commit 23eb1371cb

View File

@@ -561,13 +561,27 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
Path(output_path).unlink(missing_ok=True) Path(output_path).unlink(missing_ok=True)
duration = duration_ms_callback_capture_container[0]
async with fresh_db_connection(): async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
transcripts_controller,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id) transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript: if transcript:
await transcripts_controller.update( await transcripts_controller.update(
transcript, {"audio_location": "storage"} transcript, {"audio_location": "storage", "duration": duration}
)
duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
duration_data,
logger=logger,
) )
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}") ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
@@ -1095,7 +1109,7 @@ async def identify_action_items(
@daily_multitrack_pipeline.task( @daily_multitrack_pipeline.task(
parents=[generate_title, generate_recap, identify_action_items], parents=[process_tracks, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT), execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3, retries=3,
) )
@@ -1108,12 +1122,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
""" """
ctx.log("finalize: saving transcript and setting status to 'ended'") ctx.log("finalize: saving transcript and setting status to 'ended'")
mixdown_result = ctx.task_output(mixdown_tracks)
track_result = ctx.task_output(process_tracks) track_result = ctx.task_output(process_tracks)
duration = mixdown_result.duration
all_words = track_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery) # Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = track_result.created_padded_files created_padded_files = track_result.created_padded_files
if created_padded_files: if created_padded_files:
@@ -1133,7 +1143,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
async with fresh_db_connection(): async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415 from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptText, TranscriptText,
transcripts_controller, transcripts_controller,
) )
@@ -1142,8 +1151,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
if transcript is None: if transcript is None:
raise ValueError(f"Transcript {input.transcript_id} not found in database") raise ValueError(f"Transcript {input.transcript_id} not found in database")
merged_transcript = TranscriptType(words=all_words, translation=None)
await append_event_and_broadcast( await append_event_and_broadcast(
input.transcript_id, input.transcript_id,
transcript, transcript,
@@ -1155,21 +1162,15 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
logger=logger, logger=logger,
) )
# Save duration and clear workflow_run_id (workflow completed successfully) # Clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary already saved by their callbacks # Note: title/long_summary/short_summary/duration already saved by their callbacks
await transcripts_controller.update( await transcripts_controller.update(
transcript, transcript,
{ {
"duration": duration,
"workflow_run_id": None, # Clear on success - no need to resume "workflow_run_id": None, # Clear on success - no need to resume
}, },
) )
duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast(
input.transcript_id, transcript, "DURATION", duration_data, logger=logger
)
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger) await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log( ctx.log(