Fix critical data flow and concurrency bugs

- Add empty padded_tracks guard in process_transcriptions
- Fix created_padded_files: use list instead of set to preserve order for zip cleanup
- Document size=0 contract in PadTrackResult (size=0 means original key, not padded)
- Remove redundant ctx.log in padding_workflow
This commit is contained in:
Igor Loskutov
2026-01-23 16:47:11 -05:00
parent 67679e90b2
commit 7dfb37154d
3 changed files with 10 additions and 4 deletions

View File

@@ -429,7 +429,7 @@ async def process_paddings(input: PipelineInput, ctx: Context) -> ProcessPadding
results = await padding_workflow.aio_run_many(bulk_runs) results = await padding_workflow.aio_run_many(bulk_runs)
padded_tracks = [] padded_tracks = []
created_padded_files = set() created_padded_files = []
for result in results: for result in results:
pad_result = PadTrackResult(**result[TaskName.PAD_TRACK]) pad_result = PadTrackResult(**result[TaskName.PAD_TRACK])
@@ -445,7 +445,7 @@ async def process_paddings(input: PipelineInput, ctx: Context) -> ProcessPadding
if pad_result.size > 0: if pad_result.size > 0:
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm" storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
created_padded_files.add(storage_path) created_padded_files.append(storage_path)
ctx.log(f"process_paddings complete: {len(padded_tracks)} padded tracks") ctx.log(f"process_paddings complete: {len(padded_tracks)} padded tracks")
@@ -476,6 +476,9 @@ async def process_transcriptions(
target_language = participants_result.target_language target_language = participants_result.target_language
padded_tracks = paddings_result.padded_tracks padded_tracks = paddings_result.padded_tracks
if not padded_tracks:
raise ValueError("No padded tracks available for transcription")
ctx.log( ctx.log(
f"process_transcriptions: spawning {len(padded_tracks)} transcription workflows" f"process_transcriptions: spawning {len(padded_tracks)} transcription workflows"
) )

View File

@@ -21,7 +21,11 @@ class ParticipantInfo(BaseModel):
class PadTrackResult(BaseModel): class PadTrackResult(BaseModel):
"""Result from pad_track task.""" """Result from pad_track task.
If size=0, track required no padding and padded_key contains original S3 key.
If size>0, track was padded and padded_key contains new padded file S3 key.
"""
padded_key: NonEmptyString padded_key: NonEmptyString
bucket_name: NonEmptyString | None bucket_name: NonEmptyString | None

View File

@@ -120,7 +120,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
finally: finally:
Path(temp_path).unlink(missing_ok=True) Path(temp_path).unlink(missing_ok=True)
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info( logger.info(
"[Hatchet] pad_track complete", "[Hatchet] pad_track complete",
track_index=input.track_index, track_index=input.track_index,