mirror of https://github.com/Monadical-SAS/reflector.git (synced 2026-02-04 09:56:47 +00:00)

Compare commits: transcript...fix/websoc

4 Commits: dee1555807, 8a293882ad, d83c4a30b4, 3b6540eae5
@@ -1,5 +1,12 @@
 # Changelog
 
+## [0.28.0](https://github.com/Monadical-SAS/reflector/compare/v0.27.0...v0.28.0) (2026-01-20)
+
+
+### Features
+
+* worker affinity ([#819](https://github.com/Monadical-SAS/reflector/issues/819)) ([3b6540e](https://github.com/Monadical-SAS/reflector/commit/3b6540eae5b597449f98661bdf15483b77be3268))
+
+
 ## [0.27.0](https://github.com/Monadical-SAS/reflector/compare/v0.26.0...v0.27.0) (2025-12-26)
 
@@ -34,7 +34,7 @@ services:
     environment:
       ENTRYPOINT: beat
 
-  hatchet-worker:
+  hatchet-worker-cpu:
     build:
       context: server
     volumes:
@@ -43,7 +43,20 @@ services:
     env_file:
       - ./server/.env
     environment:
-      ENTRYPOINT: hatchet-worker
+      ENTRYPOINT: hatchet-worker-cpu
     depends_on:
       hatchet:
         condition: service_healthy
+  hatchet-worker-llm:
+    build:
+      context: server
+    volumes:
+      - ./server/:/app/
+      - /app/.venv
+    env_file:
+      - ./server/.env
+    environment:
+      ENTRYPOINT: hatchet-worker-llm
+    depends_on:
+      hatchet:
+        condition: service_healthy
server/reflector/hatchet/run_workers.py (deleted, 77 lines)
@@ -1,77 +0,0 @@
-"""
-Run Hatchet workers for the multitrack pipeline.
-Runs as a separate process, just like Celery workers.
-
-Usage:
-    uv run -m reflector.hatchet.run_workers
-
-    # Or via docker:
-    docker compose exec server uv run -m reflector.hatchet.run_workers
-"""
-
-import signal
-import sys
-
-from hatchet_sdk.rate_limit import RateLimitDuration
-
-from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND
-from reflector.logger import logger
-from reflector.settings import settings
-
-
-def main() -> None:
-    """Start Hatchet worker polling."""
-    if not settings.HATCHET_ENABLED:
-        logger.error("HATCHET_ENABLED is False, not starting workers")
-        sys.exit(1)
-
-    if not settings.HATCHET_CLIENT_TOKEN:
-        logger.error("HATCHET_CLIENT_TOKEN is not set")
-        sys.exit(1)
-
-    logger.info(
-        "Starting Hatchet workers",
-        debug=settings.HATCHET_DEBUG,
-    )
-
-    # Import here (not top-level) - workflow modules call HatchetClientManager.get_client()
-    # at module level because Hatchet SDK decorators (@workflow.task) bind at import time.
-    # Can't use lazy init: decorators need the client object when function is defined.
-    from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
-    from reflector.hatchet.workflows import (  # noqa: PLC0415
-        daily_multitrack_pipeline,
-        subject_workflow,
-        topic_chunk_workflow,
-        track_workflow,
-    )
-
-    hatchet = HatchetClientManager.get_client()
-
-    hatchet.rate_limits.put(
-        LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND, RateLimitDuration.SECOND
-    )
-
-    worker = hatchet.worker(
-        "reflector-pipeline-worker",
-        workflows=[
-            daily_multitrack_pipeline,
-            subject_workflow,
-            topic_chunk_workflow,
-            track_workflow,
-        ],
-    )
-
-    def shutdown_handler(signum: int, frame) -> None:
-        logger.info("Received shutdown signal, stopping workers...")
-        # Worker cleanup happens automatically on exit
-        sys.exit(0)
-
-    signal.signal(signal.SIGINT, shutdown_handler)
-    signal.signal(signal.SIGTERM, shutdown_handler)
-
-    logger.info("Starting Hatchet worker polling...")
-    worker.start()
-
-
-if __name__ == "__main__":
-    main()
server/reflector/hatchet/run_workers_cpu.py (new file, 48 lines)
@@ -0,0 +1,48 @@
+"""
+CPU-heavy worker pool for audio processing tasks.
+Handles ONLY: mixdown_tracks
+
+Configuration:
+- slots=1: Only mixdown (already serialized globally with max_runs=1)
+- Worker affinity: pool=cpu-heavy
+"""
+
+from reflector.hatchet.client import HatchetClientManager
+from reflector.hatchet.workflows.daily_multitrack_pipeline import (
+    daily_multitrack_pipeline,
+)
+from reflector.logger import logger
+from reflector.settings import settings
+
+
+def main():
+    if not settings.HATCHET_ENABLED:
+        logger.error("HATCHET_ENABLED is False, not starting CPU workers")
+        return
+
+    hatchet = HatchetClientManager.get_client()
+
+    logger.info(
+        "Starting Hatchet CPU worker pool (mixdown only)",
+        worker_name="cpu-worker-pool",
+        slots=1,
+        labels={"pool": "cpu-heavy"},
+    )
+
+    cpu_worker = hatchet.worker(
+        "cpu-worker-pool",
+        slots=1,  # Only 1 mixdown at a time (already serialized globally)
+        labels={
+            "pool": "cpu-heavy",
+        },
+        workflows=[daily_multitrack_pipeline],
+    )
+
+    try:
+        cpu_worker.start()
+    except KeyboardInterrupt:
+        logger.info("Received shutdown signal, stopping CPU workers...")
+
+
+if __name__ == "__main__":
+    main()
server/reflector/hatchet/run_workers_llm.py (new file, 56 lines)
@@ -0,0 +1,56 @@
+"""
+LLM/I/O worker pool for all non-CPU tasks.
+Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
+"""
+
+from reflector.hatchet.client import HatchetClientManager
+from reflector.hatchet.workflows.daily_multitrack_pipeline import (
+    daily_multitrack_pipeline,
+)
+from reflector.hatchet.workflows.subject_processing import subject_workflow
+from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
+from reflector.hatchet.workflows.track_processing import track_workflow
+from reflector.logger import logger
+from reflector.settings import settings
+
+SLOTS = 10
+WORKER_NAME = "llm-worker-pool"
+POOL = "llm-io"
+
+
+def main():
+    if not settings.HATCHET_ENABLED:
+        logger.error("HATCHET_ENABLED is False, not starting LLM workers")
+        return
+
+    hatchet = HatchetClientManager.get_client()
+
+    logger.info(
+        "Starting Hatchet LLM worker pool (all tasks except mixdown)",
+        worker_name=WORKER_NAME,
+        slots=SLOTS,
+        labels={"pool": POOL},
+    )
+
+    llm_worker = hatchet.worker(
+        WORKER_NAME,
+        slots=SLOTS,  # headroom: not all slots are expected to be used
+        labels={
+            "pool": POOL,
+        },
+        workflows=[
+            daily_multitrack_pipeline,
+            topic_chunk_workflow,
+            subject_workflow,
+            track_workflow,
+        ],
+    )
+
+    try:
+        llm_worker.start()
+    except KeyboardInterrupt:
+        logger.info("Received shutdown signal, stopping LLM workers...")
+
+
+if __name__ == "__main__":
+    main()
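One detail worth flagging: the deleted run_workers.py also registered the LLM rate limit at startup, and neither new entrypoint in this diff shows that call. If it has not moved elsewhere, a natural home would be the LLM pool; a hedged sketch of carrying it over (an assumption, this change does not show where the registration now lives):

# Hypothetical placement inside run_workers_llm.main(), after get_client().
from hatchet_sdk.rate_limit import RateLimitDuration

from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND

hatchet.rate_limits.put(
    LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND, RateLimitDuration.SECOND
)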
server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
@@ -23,7 +23,12 @@ from pathlib import Path
 from typing import Any, Callable, Coroutine, Protocol, TypeVar
 
 import httpx
-from hatchet_sdk import Context
+from hatchet_sdk import (
+    ConcurrencyExpression,
+    ConcurrencyLimitStrategy,
+    Context,
+)
+from hatchet_sdk.labels import DesiredWorkerLabel
 from pydantic import BaseModel
 
 from reflector.dailyco_api.client import DailyApiClient
@@ -467,6 +472,20 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
     retries=3,
+    desired_worker_labels={
+        "pool": DesiredWorkerLabel(
+            value="cpu-heavy",
+            required=True,
+            weight=100,
+        ),
+    },
+    concurrency=[
+        ConcurrencyExpression(
+            expression="'mixdown-global'",
+            max_runs=1,  # serialize mixdown to prevent resource contention
+            limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,  # Queue
+        )
+    ],
 )
 @with_error_handling(TaskName.MIXDOWN_TRACKS)
 async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
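Note that the affinity has two halves that must agree: the task side pins mixdown_tracks to a pool through desired_worker_labels, and the worker side advertises that pool through labels at registration. A minimal sketch of the pairing, using only names that appear in this diff:

# Task side (mixdown_tracks decorator): only workers labeled pool=cpu-heavy qualify.
desired_worker_labels={
    "pool": DesiredWorkerLabel(value="cpu-heavy", required=True, weight=100),
}

# Worker side (run_workers_cpu.py): the matching label, advertised at registration.
cpu_worker = hatchet.worker(
    "cpu-worker-pool",
    slots=1,
    labels={"pool": "cpu-heavy"},
    workflows=[daily_multitrack_pipeline],
)

With required=True the label is a hard constraint; weight then only ranks among workers that already qualify.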
server/reflector/hatchet/workflows/topic_chunk_processing.py
@@ -7,7 +7,11 @@ Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
 
 from datetime import timedelta
 
-from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Context
+from hatchet_sdk import (
+    ConcurrencyExpression,
+    ConcurrencyLimitStrategy,
+    Context,
+)
 from hatchet_sdk.rate_limit import RateLimit
 from pydantic import BaseModel
 
@@ -34,11 +38,13 @@ hatchet = HatchetClientManager.get_client()
 topic_chunk_workflow = hatchet.workflow(
     name="TopicChunkProcessing",
     input_validator=TopicChunkInput,
-    concurrency=ConcurrencyExpression(
-        expression="'global'",  # constant string = global limit across all runs
-        max_runs=20,
-        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
-    ),
+    concurrency=[
+        ConcurrencyExpression(
+            expression="'global'",  # constant string = global limit across all runs
+            max_runs=20,
+            limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
+        )
+    ],
 )
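The switch from a single ConcurrencyExpression to a list tracks the newer hatchet_sdk signature, and the list form also makes room for stacking several limits on one workflow. A hedged sketch (the second expression is hypothetical, not part of this change):

concurrency=[
    # From this diff: one global queue across all runs of the workflow.
    ConcurrencyExpression(
        expression="'global'",
        max_runs=20,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    ),
    # Hypothetical second limit: additionally cap parallelism per transcript,
    # keyed by a CEL expression over the workflow input.
    ConcurrencyExpression(
        expression="input.transcript_id",
        max_runs=4,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    ),
]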
server/reflector/ws_manager.py
@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
 
 import asyncio
 import json
-import threading
 
 import redis.asyncio as redis
 from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
 
     async def _pubsub_data_reader(self, pubsub_subscriber):
         while True:
+            # No timeout - global singleton prevents CPU hog from multiple instances
            message = await pubsub_subscriber.get_message(
                 ignore_subscribe_messages=True
             )
@@ -109,29 +109,40 @@ class WebsocketManager:
                     await socket.send_json(data)
 
 
+# Process-global singleton (not thread-local)
+# The original threading.local() pattern was broken - it created a NEW
+# threading.local() object on every call, so caching never worked.
+# This caused unbounded ws_manager instances → resource leaks → CPU hog.
+_ws_manager: WebsocketManager | None = None
+
+
 def get_ws_manager() -> WebsocketManager:
     """
-    Returns the WebsocketManager instance for managing websockets.
+    Returns the global WebsocketManager singleton.
 
-    This function initializes and returns the WebsocketManager instance,
-    which is responsible for managing websockets and handling websocket
-    connections.
+    Creates instance on first call, subsequent calls return cached instance.
+    Thread-safe via GIL. Concurrent initialization may create duplicate
+    instances but last write wins (acceptable for this use case).
 
     Returns:
-        WebsocketManager: The initialized WebsocketManager instance.
-
-    Raises:
-        ImportError: If the 'reflector.settings' module cannot be imported.
-        RedisConnectionError: If there is an error connecting to the Redis server.
+        WebsocketManager: The global WebsocketManager instance.
     """
-    local = threading.local()
-    if hasattr(local, "ws_manager"):
-        return local.ws_manager
+    global _ws_manager
+
+    if _ws_manager is not None:
+        return _ws_manager
+
+    # No lock needed - GIL makes this safe enough
+    # Worst case: race creates two instances, last assignment wins
     pubsub_client = RedisPubSubManager(
         host=settings.REDIS_HOST,
         port=settings.REDIS_PORT,
     )
-    ws_manager = WebsocketManager(pubsub_client=pubsub_client)
-    local.ws_manager = ws_manager
-    return ws_manager
+    _ws_manager = WebsocketManager(pubsub_client=pubsub_client)
+    return _ws_manager
+
+
+def reset_ws_manager() -> None:
+    """Reset singleton for testing. DO NOT use in production."""
+    global _ws_manager
+    _ws_manager = None
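The comment in that hunk is worth unpacking: threading.local() only keeps attributes for the lifetime of that particular threading.local object, and the old code built a fresh one on every call, so the hasattr check could never succeed. A standalone repro of the bug (not from the repo):

import threading


def get_thing():
    local = threading.local()  # brand-new object on every call
    if hasattr(local, "thing"):  # never true: nothing was ever set on *this* object
        return local.thing
    local.thing = object()  # stored on an object discarded when the function returns
    return local.thing


# The "cache" misses every time: each call builds a new instance.
assert get_thing() is not get_thing()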
@@ -7,8 +7,12 @@ elif [ "${ENTRYPOINT}" = "worker" ]; then
     uv run celery -A reflector.worker.app worker --loglevel=info
 elif [ "${ENTRYPOINT}" = "beat" ]; then
     uv run celery -A reflector.worker.app beat --loglevel=info
-elif [ "${ENTRYPOINT}" = "hatchet-worker" ]; then
-    uv run python -m reflector.hatchet.run_workers
+elif [ "${ENTRYPOINT}" = "hatchet-worker-cpu" ]; then
+    uv run python -m reflector.hatchet.run_workers_cpu
+elif [ "${ENTRYPOINT}" = "hatchet-worker-llm" ]; then
+    uv run python -m reflector.hatchet.run_workers_llm
 else
     echo "Unknown command"
 fi
@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
 
     if server_instance:
         server_instance.should_exit = True
-    server_thread.join(timeout=30)
+    server_thread.join(timeout=2.0)
+
+    # Reset global singleton for test isolation
+    from reflector.ws_manager import reset_ws_manager
+
+    reset_ws_manager()
 
 
 @pytest.fixture(autouse=True)
@@ -133,6 +138,11 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
 
     # Connect and then trigger an event via HTTP create
     async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
+        # Give Redis pubsub time to establish subscription before publishing
+        import asyncio
+
+        await asyncio.sleep(0.2)
+
         # Emit an event to the user's room via a standard HTTP action
         from httpx import AsyncClient
 
@@ -150,6 +160,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
         "email": "user-abc@example.com",
     }
 
+    # Use in-memory client (global singleton makes it share ws_manager)
    async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
         # Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
         resp = await ac.post("/transcripts", json={"name": "WS Test"})