Merge branch 'main' into fix/pipeline-fixes-whereby-preparation

Author: Igor Loskutov
Date: 2025-12-23 17:40:42 -05:00

@@ -7,12 +7,12 @@ Spawned dynamically by detect_topics via aio_run_many() for parallel processing.

 from datetime import timedelta
-from hatchet_sdk import Context
+from hatchet_sdk import ConcurrencyExpression, Context
 from hatchet_sdk.rate_limit import RateLimit
 from pydantic import BaseModel
 from reflector.hatchet.client import HatchetClientManager
-from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_SHORT
+from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
 from reflector.hatchet.workflows.models import TopicChunkResult
 from reflector.logger import logger
 from reflector.processors.prompts import TOPIC_PROMPT
@@ -32,12 +32,17 @@ class TopicChunkInput(BaseModel):

 hatchet = HatchetClientManager.get_client()
 topic_chunk_workflow = hatchet.workflow(
-    name="TopicChunkProcessing", input_validator=TopicChunkInput
+    name="TopicChunkProcessing",
+    input_validator=TopicChunkInput,
+    concurrency=ConcurrencyExpression(
+        expression="true",  # constant CEL expression = global limit
+        max_runs=20,
+    ),
 )

 @topic_chunk_workflow.task(
-    execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
+    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
     rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
 )
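
For context: Hatchet groups runs by the value of the concurrency CEL expression and applies max_runs per group. Because the expression here is the constant "true", every TopicChunkProcessing run falls into one group, so max_runs=20 acts as a global cap on parallel topic-chunk workflows. A minimal sketch of the distinction, assuming only the ConcurrencyExpression API as used in the diff above (the per-key variant and the input.transcript_id field are illustrative, not from this commit):

# Sketch, assuming the hatchet_sdk ConcurrencyExpression API shown in the diff.
from hatchet_sdk import ConcurrencyExpression

# Constant CEL expression: all runs share one concurrency group, so
# max_runs is a global limit (what this commit does).
global_limit = ConcurrencyExpression(
    expression="true",
    max_runs=20,
)

# Hypothetical per-key variant: each transcript_id hashes to its own
# group, so max_runs would apply per transcript instead of globally.
per_transcript_limit = ConcurrencyExpression(
    expression="input.transcript_id",
    max_runs=20,
)

Note that this workflow-level cap composes with the task-level RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1) already present: the former bounds how many topic-chunk runs execute at once, while the latter throttles how fast LLM-calling tasks consume the shared rate-limit budget.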