diff --git a/server/reflector/hatchet/workflows/topic_chunk_processing.py b/server/reflector/hatchet/workflows/topic_chunk_processing.py
index d8592872..6a062b1a 100644
--- a/server/reflector/hatchet/workflows/topic_chunk_processing.py
+++ b/server/reflector/hatchet/workflows/topic_chunk_processing.py
@@ -35,7 +35,7 @@ topic_chunk_workflow = hatchet.workflow(
     name="TopicChunkProcessing",
     input_validator=TopicChunkInput,
     concurrency=ConcurrencyExpression(
-        expression="true",  # constant CEL expression = global limit
+        expression="'global'",  # constant string = global limit across all runs
         max_runs=20,
         limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
     ),
diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py
index 8a401b19..ef8e91f4 100644
--- a/server/reflector/hatchet/workflows/track_processing.py
+++ b/server/reflector/hatchet/workflows/track_processing.py
@@ -83,6 +83,15 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
     )

     with av.open(source_url) as in_container:
+        if in_container.duration:
+            try:
+                duration = timedelta(seconds=in_container.duration // 1_000_000)
+                ctx.log(
+                    f"pad_track: track {input.track_index}, duration={duration}"
+                )
+            except Exception:
+                ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
+
         start_time_seconds = extract_stream_start_time_from_container(
             in_container, input.track_index, logger=logger
         )