From 7dfb37154d8e936e28145ba6b309d8e7ca06eecf Mon Sep 17 00:00:00 2001
From: Igor Loskutov
Date: Fri, 23 Jan 2026 16:47:11 -0500
Subject: [PATCH] Fix critical data flow and concurrency bugs

- Add empty padded_tracks guard in process_transcriptions
- Fix created_padded_files: use list instead of set to preserve order for zip cleanup
- Document size=0 contract in PadTrackResult (size=0 means original key, not padded)
- Remove redundant ctx.log in padding_workflow
---
 .../hatchet/workflows/daily_multitrack_pipeline.py     | 7 +++++--
 server/reflector/hatchet/workflows/models.py           | 6 +++++-
 server/reflector/hatchet/workflows/padding_workflow.py | 1 -
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
index b829d073..dd64a799 100644
--- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
+++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
@@ -429,7 +429,7 @@ async def process_paddings(input: PipelineInput, ctx: Context) -> ProcessPadding
     results = await padding_workflow.aio_run_many(bulk_runs)
 
     padded_tracks = []
-    created_padded_files = set()
+    created_padded_files = []
 
     for result in results:
         pad_result = PadTrackResult(**result[TaskName.PAD_TRACK])
@@ -445,7 +445,7 @@
         if pad_result.size > 0:
             storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
-            created_padded_files.add(storage_path)
+            created_padded_files.append(storage_path)
 
     ctx.log(f"process_paddings complete: {len(padded_tracks)} padded tracks")
@@ -476,6 +476,9 @@ async def process_transcriptions(
     target_language = participants_result.target_language
     padded_tracks = paddings_result.padded_tracks
 
+    if not padded_tracks:
+        raise ValueError("No padded tracks available for transcription")
+
     ctx.log(
         f"process_transcriptions: spawning {len(padded_tracks)} transcription workflows"
     )
diff --git a/server/reflector/hatchet/workflows/models.py b/server/reflector/hatchet/workflows/models.py
index 4ba4a2a4..78f486e7 100644
--- a/server/reflector/hatchet/workflows/models.py
+++ b/server/reflector/hatchet/workflows/models.py
@@ -21,7 +21,11 @@ class ParticipantInfo(BaseModel):
 
 
 class PadTrackResult(BaseModel):
-    """Result from pad_track task."""
+    """Result from pad_track task.
+
+    If size=0, track required no padding and padded_key contains original S3 key.
+    If size>0, track was padded and padded_key contains new padded file S3 key.
+    """
 
     padded_key: NonEmptyString
     bucket_name: NonEmptyString | None
diff --git a/server/reflector/hatchet/workflows/padding_workflow.py b/server/reflector/hatchet/workflows/padding_workflow.py
index 2740d499..fbfba18f 100644
--- a/server/reflector/hatchet/workflows/padding_workflow.py
+++ b/server/reflector/hatchet/workflows/padding_workflow.py
@@ -120,7 +120,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
     finally:
         Path(temp_path).unlink(missing_ok=True)
 
-    ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
     logger.info(
         "[Hatchet] pad_track complete",
         track_index=input.track_index,
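
To make the size=0 contract and the list-vs-set ordering fix concrete, below is a minimal sketch of how a consumer of PadTrackResult might build the cleanup list. It is illustrative only and not part of the patch: the helper name collect_cleanup_paths, the simplified str field types, and the sample values are assumptions, while the field names (padded_key, bucket_name, track_index, size) and the storage-path pattern are taken from the diff above.

# Minimal sketch, assuming plain str instead of NonEmptyString and a
# hypothetical helper name; only field names and the path pattern come
# from the patch above.
from pydantic import BaseModel


class PadTrackResult(BaseModel):
    # size == 0: padded_key is the ORIGINAL S3 key, no padded file was written
    # size > 0:  padded_key points at a newly created padded file
    padded_key: str
    bucket_name: str | None = None
    track_index: int
    size: int


def collect_cleanup_paths(results: list[PadTrackResult], transcript_id: str) -> list[str]:
    """Collect only keys the pipeline itself wrote, in track order.

    A list (not a set) preserves insertion order, so a later zip/cleanup
    step iterates the padded files deterministically.
    """
    created: list[str] = []
    for r in results:
        if r.size > 0:  # size == 0 means the original track was reused untouched
            created.append(
                f"file_pipeline_hatchet/{transcript_id}/tracks/padded_{r.track_index}.webm"
            )
    return created


if __name__ == "__main__":
    results = [
        PadTrackResult(padded_key="tracks/raw_0.webm", track_index=0, size=0),
        PadTrackResult(
            padded_key="file_pipeline_hatchet/t-123/tracks/padded_1.webm",
            track_index=1,
            size=2048,
        ),
    ]
    # Only the padded file (size > 0) is eligible for cleanup:
    print(collect_cleanup_paths(results, "t-123"))
    # ['file_pipeline_hatchet/t-123/tracks/padded_1.webm']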