From e886153ae1aca6a9a2c59e05263d63c55e6ec4df Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Tue, 23 Dec 2025 18:45:06 -0500 Subject: [PATCH] fix hatchet parallel syntax (#810) Co-authored-by: Igor Loskutov --- .../hatchet/workflows/topic_chunk_processing.py | 2 +- server/reflector/hatchet/workflows/track_processing.py | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/server/reflector/hatchet/workflows/topic_chunk_processing.py b/server/reflector/hatchet/workflows/topic_chunk_processing.py index d8592872..6a062b1a 100644 --- a/server/reflector/hatchet/workflows/topic_chunk_processing.py +++ b/server/reflector/hatchet/workflows/topic_chunk_processing.py @@ -35,7 +35,7 @@ topic_chunk_workflow = hatchet.workflow( name="TopicChunkProcessing", input_validator=TopicChunkInput, concurrency=ConcurrencyExpression( - expression="true", # constant CEL expression = global limit + expression="'global'", # constant string = global limit across all runs max_runs=20, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, ), diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py index 8a401b19..ef8e91f4 100644 --- a/server/reflector/hatchet/workflows/track_processing.py +++ b/server/reflector/hatchet/workflows/track_processing.py @@ -83,6 +83,15 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: ) with av.open(source_url) as in_container: + if in_container.duration: + try: + duration = timedelta(seconds=in_container.duration // 1_000_000) + ctx.log( + f"pad_track: track {input.track_index}, duration={duration}" + ) + except Exception: + ctx.log(f"pad_track: track {input.track_index}, duration=ERROR") + start_time_seconds = extract_stream_start_time_from_container( in_container, input.track_index, logger=logger )