diff --git a/server/reflector/hatchet/workflows/topic_chunk_processing.py b/server/reflector/hatchet/workflows/topic_chunk_processing.py
index 1f8e6c47..c199e7a1 100644
--- a/server/reflector/hatchet/workflows/topic_chunk_processing.py
+++ b/server/reflector/hatchet/workflows/topic_chunk_processing.py
@@ -7,12 +7,12 @@ Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
 
 from datetime import timedelta
 
-from hatchet_sdk import Context
+from hatchet_sdk import ConcurrencyExpression, Context
 from hatchet_sdk.rate_limit import RateLimit
 from pydantic import BaseModel
 
 from reflector.hatchet.client import HatchetClientManager
-from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_SHORT
+from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
 from reflector.hatchet.workflows.models import TopicChunkResult
 from reflector.logger import logger
 from reflector.processors.prompts import TOPIC_PROMPT
@@ -32,12 +32,17 @@ class TopicChunkInput(BaseModel):
 hatchet = HatchetClientManager.get_client()
 
 topic_chunk_workflow = hatchet.workflow(
-    name="TopicChunkProcessing", input_validator=TopicChunkInput
+    name="TopicChunkProcessing",
+    input_validator=TopicChunkInput,
+    concurrency=ConcurrencyExpression(
+        expression="true",  # constant CEL expression = global limit
+        max_runs=20,
+    ),
 )
 
 
 @topic_chunk_workflow.task(
-    execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
+    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
     rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
 )