Compare commits

...

4 Commits

Author SHA1 Message Date
Igor Loskutov
dee1555807 fix websocket tests 2026-01-21 14:20:39 -05:00
8a293882ad timeout to untighten ws python loop (#821)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-20 16:29:09 -05:00
d83c4a30b4 chore(main): release 0.28.0 (#820) 2026-01-20 12:35:26 -05:00
3b6540eae5 feat: worker affinity (#819)
* worker affinity

* worker affinity

* worker affinity

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-20 12:27:16 -05:00
10 changed files with 201 additions and 105 deletions

View File

@@ -1,5 +1,12 @@
# Changelog # Changelog
## [0.28.0](https://github.com/Monadical-SAS/reflector/compare/v0.27.0...v0.28.0) (2026-01-20)
### Features
* worker affinity ([#819](https://github.com/Monadical-SAS/reflector/issues/819)) ([3b6540e](https://github.com/Monadical-SAS/reflector/commit/3b6540eae5b597449f98661bdf15483b77be3268))
## [0.27.0](https://github.com/Monadical-SAS/reflector/compare/v0.26.0...v0.27.0) (2025-12-26) ## [0.27.0](https://github.com/Monadical-SAS/reflector/compare/v0.26.0...v0.27.0) (2025-12-26)

View File

@@ -34,7 +34,7 @@ services:
environment: environment:
ENTRYPOINT: beat ENTRYPOINT: beat
hatchet-worker: hatchet-worker-cpu:
build: build:
context: server context: server
volumes: volumes:
@@ -43,7 +43,20 @@ services:
env_file: env_file:
- ./server/.env - ./server/.env
environment: environment:
ENTRYPOINT: hatchet-worker ENTRYPOINT: hatchet-worker-cpu
depends_on:
hatchet:
condition: service_healthy
hatchet-worker-llm:
build:
context: server
volumes:
- ./server/:/app/
- /app/.venv
env_file:
- ./server/.env
environment:
ENTRYPOINT: hatchet-worker-llm
depends_on: depends_on:
hatchet: hatchet:
condition: service_healthy condition: service_healthy

View File

@@ -1,77 +0,0 @@
"""
Run Hatchet workers for the multitrack pipeline.
Runs as a separate process, just like Celery workers.
Usage:
uv run -m reflector.hatchet.run_workers
# Or via docker:
docker compose exec server uv run -m reflector.hatchet.run_workers
"""
import signal
import sys
from hatchet_sdk.rate_limit import RateLimitDuration
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND
from reflector.logger import logger
from reflector.settings import settings
def main() -> None:
"""Start Hatchet worker polling."""
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting workers")
sys.exit(1)
if not settings.HATCHET_CLIENT_TOKEN:
logger.error("HATCHET_CLIENT_TOKEN is not set")
sys.exit(1)
logger.info(
"Starting Hatchet workers",
debug=settings.HATCHET_DEBUG,
)
# Import here (not top-level) - workflow modules call HatchetClientManager.get_client()
# at module level because Hatchet SDK decorators (@workflow.task) bind at import time.
# Can't use lazy init: decorators need the client object when function is defined.
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows import ( # noqa: PLC0415
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,
)
hatchet = HatchetClientManager.get_client()
hatchet.rate_limits.put(
LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND, RateLimitDuration.SECOND
)
worker = hatchet.worker(
"reflector-pipeline-worker",
workflows=[
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,
],
)
def shutdown_handler(signum: int, frame) -> None:
logger.info("Received shutdown signal, stopping workers...")
# Worker cleanup happens automatically on exit
sys.exit(0)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
logger.info("Starting Hatchet worker polling...")
worker.start()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,48 @@
"""
CPU-heavy worker pool for audio processing tasks.
Handles ONLY: mixdown_tracks
Configuration:
- slots=1: Only mixdown (already serialized globally with max_runs=1)
- Worker affinity: pool=cpu-heavy
"""
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
daily_multitrack_pipeline,
)
from reflector.logger import logger
from reflector.settings import settings
def main():
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting CPU workers")
return
hatchet = HatchetClientManager.get_client()
logger.info(
"Starting Hatchet CPU worker pool (mixdown only)",
worker_name="cpu-worker-pool",
slots=1,
labels={"pool": "cpu-heavy"},
)
cpu_worker = hatchet.worker(
"cpu-worker-pool",
slots=1, # Only 1 mixdown at a time (already serialized globally)
labels={
"pool": "cpu-heavy",
},
workflows=[daily_multitrack_pipeline],
)
try:
cpu_worker.start()
except KeyboardInterrupt:
logger.info("Received shutdown signal, stopping CPU workers...")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,56 @@
"""
LLM/I/O worker pool for all non-CPU tasks.
Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
"""
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
daily_multitrack_pipeline,
)
from reflector.hatchet.workflows.subject_processing import subject_workflow
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
from reflector.hatchet.workflows.track_processing import track_workflow
from reflector.logger import logger
from reflector.settings import settings
SLOTS = 10
WORKER_NAME = "llm-worker-pool"
POOL = "llm-io"
def main():
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting LLM workers")
return
hatchet = HatchetClientManager.get_client()
logger.info(
"Starting Hatchet LLM worker pool (all tasks except mixdown)",
worker_name=WORKER_NAME,
slots=SLOTS,
labels={"pool": POOL},
)
llm_worker = hatchet.worker(
WORKER_NAME,
slots=SLOTS, # not all slots are probably used
labels={
"pool": POOL,
},
workflows=[
daily_multitrack_pipeline,
topic_chunk_workflow,
subject_workflow,
track_workflow,
],
)
try:
llm_worker.start()
except KeyboardInterrupt:
logger.info("Received shutdown signal, stopping LLM workers...")
if __name__ == "__main__":
main()

View File

@@ -23,7 +23,12 @@ from pathlib import Path
from typing import Any, Callable, Coroutine, Protocol, TypeVar from typing import Any, Callable, Coroutine, Protocol, TypeVar
import httpx import httpx
from hatchet_sdk import Context from hatchet_sdk import (
ConcurrencyExpression,
ConcurrencyLimitStrategy,
Context,
)
from hatchet_sdk.labels import DesiredWorkerLabel
from pydantic import BaseModel from pydantic import BaseModel
from reflector.dailyco_api.client import DailyApiClient from reflector.dailyco_api.client import DailyApiClient
@@ -467,6 +472,20 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
parents=[process_tracks], parents=[process_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
retries=3, retries=3,
desired_worker_labels={
"pool": DesiredWorkerLabel(
value="cpu-heavy",
required=True,
weight=100,
),
},
concurrency=[
ConcurrencyExpression(
expression="'mixdown-global'",
max_runs=1, # serialize mixdown to prevent resource contention
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, # Queue
)
],
) )
@with_error_handling(TaskName.MIXDOWN_TRACKS) @with_error_handling(TaskName.MIXDOWN_TRACKS)
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult: async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:

View File

@@ -7,7 +7,11 @@ Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
from datetime import timedelta from datetime import timedelta
from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Context from hatchet_sdk import (
ConcurrencyExpression,
ConcurrencyLimitStrategy,
Context,
)
from hatchet_sdk.rate_limit import RateLimit from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel from pydantic import BaseModel
@@ -34,11 +38,13 @@ hatchet = HatchetClientManager.get_client()
topic_chunk_workflow = hatchet.workflow( topic_chunk_workflow = hatchet.workflow(
name="TopicChunkProcessing", name="TopicChunkProcessing",
input_validator=TopicChunkInput, input_validator=TopicChunkInput,
concurrency=ConcurrencyExpression( concurrency=[
expression="'global'", # constant string = global limit across all runs ConcurrencyExpression(
max_runs=20, expression="'global'", # constant string = global limit across all runs
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, max_runs=20,
), limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
)
],
) )

View File

@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
import asyncio import asyncio
import json import json
import threading
import redis.asyncio as redis import redis.asyncio as redis
from fastapi import WebSocket from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
async def _pubsub_data_reader(self, pubsub_subscriber): async def _pubsub_data_reader(self, pubsub_subscriber):
while True: while True:
# No timeout - global singleton prevents CPU hog from multiple instances
message = await pubsub_subscriber.get_message( message = await pubsub_subscriber.get_message(
ignore_subscribe_messages=True ignore_subscribe_messages=True
) )
@@ -109,29 +109,40 @@ class WebsocketManager:
await socket.send_json(data) await socket.send_json(data)
# Process-global singleton (not thread-local)
# The original threading.local() pattern was broken - it created a NEW
# threading.local() object on every call, so caching never worked.
# This caused infinite ws_manager instances → resource leaks → CPU hog.
_ws_manager: WebsocketManager | None = None
def get_ws_manager() -> WebsocketManager: def get_ws_manager() -> WebsocketManager:
""" """
Returns the WebsocketManager instance for managing websockets. Returns the global WebsocketManager singleton.
This function initializes and returns the WebsocketManager instance, Creates instance on first call, subsequent calls return cached instance.
which is responsible for managing websockets and handling websocket Thread-safe via GIL. Concurrent initialization may create duplicate
connections. instances but last write wins (acceptable for this use case).
Returns: Returns:
WebsocketManager: The initialized WebsocketManager instance. WebsocketManager: The global WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
""" """
local = threading.local() global _ws_manager
if hasattr(local, "ws_manager"):
return local.ws_manager
if _ws_manager is not None:
return _ws_manager
# No lock needed - GIL makes this safe enough
# Worst case: race creates two instances, last assignment wins
pubsub_client = RedisPubSubManager( pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST, host=settings.REDIS_HOST,
port=settings.REDIS_PORT, port=settings.REDIS_PORT,
) )
ws_manager = WebsocketManager(pubsub_client=pubsub_client) _ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager return _ws_manager
return ws_manager
def reset_ws_manager() -> None:
"""Reset singleton for testing. DO NOT use in production."""
global _ws_manager
_ws_manager = None

View File

@@ -7,8 +7,10 @@ elif [ "${ENTRYPOINT}" = "worker" ]; then
uv run celery -A reflector.worker.app worker --loglevel=info uv run celery -A reflector.worker.app worker --loglevel=info
elif [ "${ENTRYPOINT}" = "beat" ]; then elif [ "${ENTRYPOINT}" = "beat" ]; then
uv run celery -A reflector.worker.app beat --loglevel=info uv run celery -A reflector.worker.app beat --loglevel=info
elif [ "${ENTRYPOINT}" = "hatchet-worker" ]; then elif [ "${ENTRYPOINT}" = "hatchet-worker-cpu" ]; then
uv run python -m reflector.hatchet.run_workers uv run python -m reflector.hatchet.run_workers_cpu
elif [ "${ENTRYPOINT}" = "hatchet-worker-llm" ]; then
uv run python -m reflector.hatchet.run_workers_llm
else else
echo "Unknown command" echo "Unknown command"
fi fi

View File

@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
if server_instance: if server_instance:
server_instance.should_exit = True server_instance.should_exit = True
server_thread.join(timeout=30) server_thread.join(timeout=2.0)
# Reset global singleton for test isolation
from reflector.ws_manager import reset_ws_manager
reset_ws_manager()
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -133,6 +138,11 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
# Connect and then trigger an event via HTTP create # Connect and then trigger an event via HTTP create
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws: async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
# Give Redis pubsub time to establish subscription before publishing
import asyncio
await asyncio.sleep(0.2)
# Emit an event to the user's room via a standard HTTP action # Emit an event to the user's room via a standard HTTP action
from httpx import AsyncClient from httpx import AsyncClient
@@ -150,6 +160,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
"email": "user-abc@example.com", "email": "user-abc@example.com",
} }
# Use in-memory client (global singleton makes it share ws_manager)
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac: async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room # Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
resp = await ac.post("/transcripts", json={"name": "WS Test"}) resp = await ac.post("/transcripts", json={"name": "WS Test"})