diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
index 3fd64c2c..32c45fdb 100644
--- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
+++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
@@ -561,13 +561,27 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
 
         Path(output_path).unlink(missing_ok=True)
 
+    duration = duration_ms_callback_capture_container[0]
+
     async with fresh_db_connection():
-        from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+        from reflector.db.transcripts import (  # noqa: PLC0415
+            TranscriptDuration,
+            transcripts_controller,
+        )
 
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if transcript:
             await transcripts_controller.update(
-                transcript, {"audio_location": "storage"}
+                transcript, {"audio_location": "storage", "duration": duration}
+            )
+
+            duration_data = TranscriptDuration(duration=duration)
+            await append_event_and_broadcast(
+                input.transcript_id,
+                transcript,
+                "DURATION",
+                duration_data,
+                logger=logger,
             )
 
     ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
@@ -1095,7 +1109,7 @@ async def identify_action_items(
 
 
 @daily_multitrack_pipeline.task(
-    parents=[generate_title, generate_recap, identify_action_items],
+    parents=[process_tracks, generate_title, generate_recap, identify_action_items],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
 )
@@ -1108,12 +1122,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     """
     ctx.log("finalize: saving transcript and setting status to 'ended'")
 
-    mixdown_result = ctx.task_output(mixdown_tracks)
     track_result = ctx.task_output(process_tracks)
-    duration = mixdown_result.duration
-
     all_words = track_result.all_words
-
     # Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
     created_padded_files = track_result.created_padded_files
     if created_padded_files:
@@ -1133,7 +1143,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
 
     async with fresh_db_connection():
         from reflector.db.transcripts import (  # noqa: PLC0415
-            TranscriptDuration,
             TranscriptText,
             transcripts_controller,
         )
@@ -1142,8 +1151,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
         if transcript is None:
             raise ValueError(f"Transcript {input.transcript_id} not found in database")
 
-        merged_transcript = TranscriptType(words=all_words, translation=None)
-
         await append_event_and_broadcast(
             input.transcript_id,
             transcript,
@@ -1155,21 +1162,15 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
             logger=logger,
         )
 
-        # Save duration and clear workflow_run_id (workflow completed successfully)
-        # Note: title/long_summary/short_summary already saved by their callbacks
+        # Clear workflow_run_id (workflow completed successfully)
+        # Note: title/long_summary/short_summary/duration already saved by their callbacks
         await transcripts_controller.update(
            transcript,
             {
-                "duration": duration,
                 "workflow_run_id": None,  # Clear on success - no need to resume
             },
         )
 
-        duration_data = TranscriptDuration(duration=duration)
-        await append_event_and_broadcast(
-            input.transcript_id, transcript, "DURATION", duration_data, logger=logger
-        )
-
     await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
 
     ctx.log(