ws_manager: use a process-global WebsocketManager singleton

Replace the per-call threading.local() cache in get_ws_manager() with a
module-level singleton, add reset_ws_manager() for test isolation, and make
the pubsub reader block on get_message(timeout=None) instead of polling.

Note: line breaks, blank context lines and indentation in this patch were
reconstructed from the hunk line counts; verify against the target tree
before applying.

diff --git a/server/reflector/ws_manager.py b/server/reflector/ws_manager.py
index 83201ce1..c67c176b 100644
--- a/server/reflector/ws_manager.py
+++ b/server/reflector/ws_manager.py
@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
 
 import asyncio
 import json
-import threading
 
 import redis.asyncio as redis
 from fastapi import WebSocket
@@ -98,9 +97,9 @@ class WebsocketManager:
 
     async def _pubsub_data_reader(self, pubsub_subscriber):
         while True:
+            # Block until a message arrives; the default timeout=0.0 would busy-poll.
             message = await pubsub_subscriber.get_message(
-                ignore_subscribe_messages=True,
-                timeout=1.0,
+                ignore_subscribe_messages=True, timeout=None
             )
             if message is not None:
                 room_id = message["channel"].decode("utf-8")
@@ -110,29 +109,40 @@ class WebsocketManager:
                     await socket.send_json(data)
 
 
+# Process-global singleton (not thread-local).
+# The original threading.local() pattern was broken - it created a NEW
+# threading.local() object on every call, so caching never worked.
+# This caused unbounded ws_manager instances → resource leaks → CPU hog.
+_ws_manager: WebsocketManager | None = None
+
+
 def get_ws_manager() -> WebsocketManager:
     """
-    Returns the WebsocketManager instance for managing websockets.
+    Returns the global WebsocketManager singleton.
 
-    This function initializes and returns the WebsocketManager instance,
-    which is responsible for managing websockets and handling websocket
-    connections.
+    Creates the instance on first call; later calls return the cached one.
+    Not synchronized: threads racing through the first call can each build
+    a manager; the last assignment wins and earlier ones are orphaned.
 
     Returns:
-        WebsocketManager: The initialized WebsocketManager instance.
-
-    Raises:
-        ImportError: If the 'reflector.settings' module cannot be imported.
-        RedisConnectionError: If there is an error connecting to the Redis server.
+        WebsocketManager: The global WebsocketManager instance.
     """
-    local = threading.local()
-    if hasattr(local, "ws_manager"):
-        return local.ws_manager
+    global _ws_manager
+    if _ws_manager is not None:
+        return _ws_manager
+
+    # Deliberately lock-free: the check above and the assignment below are
+    # not atomic together, so a first-call race can build two instances.
 
     pubsub_client = RedisPubSubManager(
         host=settings.REDIS_HOST,
         port=settings.REDIS_PORT,
     )
-    ws_manager = WebsocketManager(pubsub_client=pubsub_client)
-    local.ws_manager = ws_manager
-    return ws_manager
+    _ws_manager = WebsocketManager(pubsub_client=pubsub_client)
+    return _ws_manager
+
+
+def reset_ws_manager() -> None:
+    """Clear the cached singleton (tests only); the old instance is not closed."""
+    global _ws_manager
+    _ws_manager = None
diff --git a/server/tests/test_user_websocket_auth.py b/server/tests/test_user_websocket_auth.py
index 5a40440f..5ea52caa 100644
--- a/server/tests/test_user_websocket_auth.py
+++ b/server/tests/test_user_websocket_auth.py
@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
 
     if server_instance:
         server_instance.should_exit = True
-        server_thread.join(timeout=30)
+        server_thread.join(timeout=2.0)
+
+    # Reset global singleton for test isolation
+    from reflector.ws_manager import reset_ws_manager
+
+    reset_ws_manager()
 
 
 @pytest.fixture(autouse=True)
@@ -133,6 +138,11 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
 
     # Connect and then trigger an event via HTTP create
     async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
+        # Give Redis pubsub time to establish subscription before publishing
+        import asyncio
+
+        await asyncio.sleep(0.2)
+
         # Emit an event to the user's room via a standard HTTP action
         from httpx import AsyncClient
 
@@ -150,6 +160,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
             "email": "user-abc@example.com",
         }
 
+        # Use in-memory client (global singleton makes it share ws_manager)
         async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
             # Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
             resp = await ac.post("/transcripts", json={"name": "WS Test"})