Fix: Move padding_workflow to LLM worker for parallel execution

Critical bug fix: padding_workflow was registered on the CPU worker (slots=1),
causing all padding tasks to run serially instead of in parallel.

Changes:
- Moved padding_workflow from run_workers_cpu.py to run_workers_llm.py
- LLM worker has slots=10, allowing up to 10 parallel padding operations
- Padding is I/O-bound (S3 download/upload), not CPU-intensive
- CPU worker now handles only mixdown_tracks (compute-heavy, serialized)

Impact:
- Before: 4 tracks × 5s padding = 20s serial execution
- After: 4 tracks × 5s padding = ~5s parallel execution (4 concurrent)
- Restores the intended performance benefit of the refactoring (see the sketch below)
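
The timing above follows from letting the four I/O-bound tasks overlap. As a
rough illustration (plain asyncio standing in for Hatchet's slot-limited
scheduling; pad_track and the 5s sleep are stand-ins, not the real workflow
code):

    import asyncio
    import time

    async def pad_track(track: int) -> None:
        # Stand-in for one I/O-bound padding task (S3 download, pad, upload).
        await asyncio.sleep(5)

    async def run_all(slots: int) -> float:
        sem = asyncio.Semaphore(slots)  # analogous to the worker's slot count

        async def guarded(track: int) -> None:
            async with sem:
                await pad_track(track)

        start = time.monotonic()
        await asyncio.gather(*(guarded(t) for t in range(4)))
        return time.monotonic() - start

    print(asyncio.run(run_all(slots=1)))   # ~20s: old behavior (CPU worker, slots=1)
    print(asyncio.run(run_all(slots=10)))  # ~5s: new behavior (LLM worker, slots=10)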
commit 1b33fba3ba
parent 3ce279daa4
Author: Igor Loskutov
Date:   2026-01-23 16:05:43 -05:00

2 changed files with 8 additions and 7 deletions

--- a/run_workers_cpu.py
+++ b/run_workers_cpu.py

@@ -1,9 +1,9 @@
 """
 CPU-heavy worker pool for audio processing tasks.
 
-Handles: mixdown_tracks (serialized), padding workflows (parallel child workflows)
+Handles: mixdown_tracks only (serialized with max_runs=1)
 
 Configuration:
-- slots=1: Mixdown serialized globally with max_runs=1
+- slots=1: Only one mixdown at a time
 - Worker affinity: pool=cpu-heavy
 """
@@ -11,7 +11,6 @@ from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     daily_multitrack_pipeline,
 )
-from reflector.hatchet.workflows.padding_workflow import padding_workflow
 from reflector.logger import logger
 from reflector.settings import settings
 
@@ -24,7 +23,7 @@ def main():
     hatchet = HatchetClientManager.get_client()
 
     logger.info(
-        "Starting Hatchet CPU worker pool (mixdown + padding)",
+        "Starting Hatchet CPU worker pool (mixdown only)",
         worker_name="cpu-worker-pool",
         slots=1,
         labels={"pool": "cpu-heavy"},
@@ -36,7 +35,7 @@
         labels={
             "pool": "cpu-heavy",
         },
-        workflows=[daily_multitrack_pipeline, padding_workflow],
+        workflows=[daily_multitrack_pipeline],
     )
 
     try:

--- a/run_workers_llm.py
+++ b/run_workers_llm.py

@@ -1,12 +1,13 @@
 """
 LLM/I/O worker pool for all non-CPU tasks.
-Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
+Handles: all tasks except mixdown_tracks (padding, transcription, LLM inference, orchestration)
 """
 
 from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     daily_multitrack_pipeline,
 )
+from reflector.hatchet.workflows.padding_workflow import padding_workflow
 from reflector.hatchet.workflows.subject_processing import subject_workflow
 from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
 from reflector.hatchet.workflows.transcription_workflow import transcription_workflow
@@ -34,7 +35,7 @@ def main():
 
     llm_worker = hatchet.worker(
         WORKER_NAME,
-        slots=SLOTS,  # not all slots are probably used
+        slots=SLOTS,
         labels={
             "pool": POOL,
         },
@@ -42,6 +43,7 @@ def main():
             daily_multitrack_pipeline,
             topic_chunk_workflow,
             subject_workflow,
+            padding_workflow,
             transcription_workflow,
         ],
     )
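
For reference, a condensed sketch of the two entrypoints after this commit.
The hatchet.worker(...) signature and import paths are taken from the diffs;
the literal worker names and pool label for the LLM worker are assumptions
(only slots=10 is stated in the commit message), since the real WORKER_NAME,
SLOTS, and POOL constants are defined in run_workers_llm.py and not shown here:

    from reflector.hatchet.client import HatchetClientManager
    from reflector.hatchet.workflows.daily_multitrack_pipeline import (
        daily_multitrack_pipeline,
    )
    from reflector.hatchet.workflows.padding_workflow import padding_workflow

    hatchet = HatchetClientManager.get_client()

    # run_workers_cpu.py: compute-heavy pool, one run at a time
    cpu_worker = hatchet.worker(
        "cpu-worker-pool",  # assumed to match the worker_name logged above
        slots=1,
        labels={"pool": "cpu-heavy"},
        workflows=[daily_multitrack_pipeline],
    )

    # run_workers_llm.py: I/O/LLM pool; padding_workflow now registers here,
    # so up to 10 padding runs can execute concurrently
    SLOTS = 10  # per the commit message; the real constant lives in the module
    llm_worker = hatchet.worker(
        "llm-worker-pool",   # assumption; actual WORKER_NAME not shown in the diff
        slots=SLOTS,
        labels={"pool": "llm"},  # assumption; actual POOL value not shown
        workflows=[daily_multitrack_pipeline, padding_workflow],  # plus the other LLM workflows
    )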