From 1b33fba3ba6996e9dedb28abd96cc31dc3656d37 Mon Sep 17 00:00:00 2001
From: Igor Loskutov
Date: Fri, 23 Jan 2026 16:05:43 -0500
Subject: [PATCH] Fix: Move padding_workflow to LLM worker for parallel execution
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Critical bug fix: padding_workflow was registered on CPU worker (slots=1),
causing all padding tasks to run serially instead of in parallel.

Changes:
- Moved padding_workflow from run_workers_cpu.py to run_workers_llm.py
- LLM worker has slots=10, allowing up to 10 parallel padding operations
- Padding is I/O-bound (S3 download/upload), not CPU-intensive
- CPU worker now handles only mixdown_tracks (compute-heavy, serialized)

Impact:
- Before: 4 tracks × 5s padding = 20s serial execution
- After: 4 tracks × 5s padding = ~5s parallel execution (4 concurrent)
- Restores intended performance benefit of the refactoring
---
 server/reflector/hatchet/run_workers_cpu.py | 9 ++++-----
 server/reflector/hatchet/run_workers_llm.py | 6 ++++--
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/server/reflector/hatchet/run_workers_cpu.py b/server/reflector/hatchet/run_workers_cpu.py
index 39de3a1b..380f8403 100644
--- a/server/reflector/hatchet/run_workers_cpu.py
+++ b/server/reflector/hatchet/run_workers_cpu.py
@@ -1,9 +1,9 @@
 """
 CPU-heavy worker pool for audio processing tasks.
 
-Handles: mixdown_tracks (serialized), padding workflows (parallel child workflows)
+Handles: mixdown_tracks only (serialized with max_runs=1)
 
 Configuration:
-- slots=1: Mixdown serialized globally with max_runs=1
+- slots=1: Only one mixdown at a time
 - Worker affinity: pool=cpu-heavy
 """
@@ -11,7 +11,6 @@
 from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     daily_multitrack_pipeline,
 )
-from reflector.hatchet.workflows.padding_workflow import padding_workflow
 from reflector.logger import logger
 from reflector.settings import settings
@@ -24,19 +23,19 @@ def main():
     hatchet = HatchetClientManager.get_client()
 
     logger.info(
-        "Starting Hatchet CPU worker pool (mixdown + padding)",
+        "Starting Hatchet CPU worker pool (mixdown only)",
         worker_name="cpu-worker-pool",
         slots=1,
         labels={"pool": "cpu-heavy"},
     )
 
     worker = hatchet.worker(
         "cpu-worker-pool",
         slots=1,
         labels={
             "pool": "cpu-heavy",
         },
-        workflows=[daily_multitrack_pipeline, padding_workflow],
+        workflows=[daily_multitrack_pipeline],
     )
 
     try:
diff --git a/server/reflector/hatchet/run_workers_llm.py b/server/reflector/hatchet/run_workers_llm.py
index a3b44bb8..ecec6906 100644
--- a/server/reflector/hatchet/run_workers_llm.py
+++ b/server/reflector/hatchet/run_workers_llm.py
@@ -1,12 +1,13 @@
 """
 LLM/I/O worker pool for all non-CPU tasks.
 
-Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
+Handles: all tasks except mixdown_tracks (padding, transcription, LLM inference, orchestration)
 """
 from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     daily_multitrack_pipeline,
 )
+from reflector.hatchet.workflows.padding_workflow import padding_workflow
 from reflector.hatchet.workflows.subject_processing import subject_workflow
 from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
 from reflector.hatchet.workflows.transcription_workflow import transcription_workflow
@@ -34,7 +35,7 @@ def main():
 
     llm_worker = hatchet.worker(
         WORKER_NAME,
-        slots=SLOTS,  # not all slots are probably used
+        slots=SLOTS,
         labels={
             "pool": POOL,
         },
@@ -42,6 +43,7 @@
             daily_multitrack_pipeline,
             topic_chunk_workflow,
             subject_workflow,
+            padding_workflow,
             transcription_workflow,
         ],
     )
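
Note: the before/after numbers in the commit message are slot arithmetic,
and can be sanity-checked with a toy model. The sketch below is not
Hatchet code: asyncio.Semaphore stands in for worker slots, and pad_track
and run_with_slots are hypothetical names invented for the illustration.
With one slot, four 5-second paddings run back to back (~20s); with ten
slots they overlap (~5s).

    import asyncio
    import time

    async def pad_track(track: int) -> None:
        # Stand-in for the real padding task: ~5s of S3 download/upload.
        await asyncio.sleep(5)

    async def run_with_slots(slots: int, tracks: int) -> float:
        # One semaphore permit per worker slot; holding a permit models a
        # task occupying a slot for its full duration.
        sem = asyncio.Semaphore(slots)

        async def limited(track: int) -> None:
            async with sem:
                await pad_track(track)

        start = time.monotonic()
        await asyncio.gather(*(limited(t) for t in range(tracks)))
        return time.monotonic() - start

    async def main() -> None:
        # slots=1 (the old CPU worker): 4 tracks run serially, ~20s.
        print("slots=1: ", round(await run_with_slots(1, 4), 1), "s")
        # slots=10 (the LLM worker): 4 tracks run concurrently, ~5s.
        print("slots=10:", round(await run_with_slots(10, 4), 1), "s")

    asyncio.run(main())

The same arithmetic motivates the split itself: padding spends nearly all
its time waiting on S3, so it scales with available slots, while mixdown
saturates a core and stays pinned to the slots=1 cpu-heavy pool.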