diff --git a/server/reflector/hatchet/workflows/topic_chunk_processing.py b/server/reflector/hatchet/workflows/topic_chunk_processing.py index c199e7a1..d8592872 100644 --- a/server/reflector/hatchet/workflows/topic_chunk_processing.py +++ b/server/reflector/hatchet/workflows/topic_chunk_processing.py @@ -7,7 +7,7 @@ Spawned dynamically by detect_topics via aio_run_many() for parallel processing. from datetime import timedelta -from hatchet_sdk import ConcurrencyExpression, Context +from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Context from hatchet_sdk.rate_limit import RateLimit from pydantic import BaseModel @@ -37,6 +37,7 @@ topic_chunk_workflow = hatchet.workflow( concurrency=ConcurrencyExpression( expression="true", # constant CEL expression = global limit max_runs=20, + limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, ), ) diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 1c36d0e9..0e0ebc7c 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -188,8 +188,8 @@ async def dispatch_transcript_processing( room_forces_hatchet = room.use_hatchet if room else False # Start durable workflow if enabled (Hatchet) - # or if room has use_hatchet=True - use_hatchet = settings.HATCHET_ENABLED or room_forces_hatchet + # and if room has use_hatchet=True + use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet if room_forces_hatchet: logger.info( diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 95a75464..466cdef0 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -287,9 +287,7 @@ async def _process_multitrack_recording_inner( room_id=room.id, ) - # Start durable workflow if enabled (Hatchet) or room overrides it - durable_started = False - use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet) + use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet if room and room.use_hatchet and not settings.HATCHET_ENABLED: logger.info( @@ -836,7 +834,7 @@ async def reprocess_failed_daily_recordings(): ) continue - use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet) + use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet if use_hatchet: # Hatchet requires a transcript for workflow_run_id tracking