diff --git a/.gitleaksignore b/.gitleaksignore
index 141c82d5..8f2af36e 100644
--- a/.gitleaksignore
+++ b/.gitleaksignore
@@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
 server/reflector/worker/process.py:generic-api-key:465
+server/reflector/worker/process.py:generic-api-key:594
diff --git a/server/reflector/ws_manager.py b/server/reflector/ws_manager.py
index a1f620c4..fc3653bb 100644
--- a/server/reflector/ws_manager.py
+++ b/server/reflector/ws_manager.py
@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
 
 import asyncio
 import json
-import threading
 
 import redis.asyncio as redis
 from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
 
     async def _pubsub_data_reader(self, pubsub_subscriber):
         while True:
+            # timeout=1.0 prevents tight CPU loop when no messages available
            message = await pubsub_subscriber.get_message(
                 ignore_subscribe_messages=True
             )
@@ -109,29 +109,38 @@ class WebsocketManager:
                     await socket.send_json(data)
 
 
+# Process-global singleton to ensure only one WebsocketManager instance exists.
+# Multiple instances would cause resource leaks and CPU issues.
+_ws_manager: WebsocketManager | None = None
+
+
 def get_ws_manager() -> WebsocketManager:
     """
-    Returns the WebsocketManager instance for managing websockets.
+    Returns the global WebsocketManager singleton.
 
-    This function initializes and returns the WebsocketManager instance,
-    which is responsible for managing websockets and handling websocket
-    connections.
+    Creates instance on first call, subsequent calls return cached instance.
+    Thread-safe via GIL. Concurrent initialization may create duplicate
+    instances but last write wins (acceptable for this use case).
 
     Returns:
-        WebsocketManager: The initialized WebsocketManager instance.
-
-    Raises:
-        ImportError: If the 'reflector.settings' module cannot be imported.
-        RedisConnectionError: If there is an error connecting to the Redis server.
+        WebsocketManager: The global WebsocketManager instance.
     """
-    local = threading.local()
-    if hasattr(local, "ws_manager"):
-        return local.ws_manager
+    global _ws_manager
+    if _ws_manager is not None:
+        return _ws_manager
+
+    # No lock needed - GIL makes this safe enough
+    # Worst case: race creates two instances, last assignment wins
 
     pubsub_client = RedisPubSubManager(
         host=settings.REDIS_HOST,
         port=settings.REDIS_PORT,
     )
-    ws_manager = WebsocketManager(pubsub_client=pubsub_client)
-    local.ws_manager = ws_manager
-    return ws_manager
+    _ws_manager = WebsocketManager(pubsub_client=pubsub_client)
+    return _ws_manager
+
+
+def reset_ws_manager() -> None:
+    """Reset singleton for testing. DO NOT use in production."""
+    global _ws_manager
+    _ws_manager = None
diff --git a/server/tests/conftest.py b/server/tests/conftest.py
index 24d2103f..1f4469ea 100644
--- a/server/tests/conftest.py
+++ b/server/tests/conftest.py
@@ -1,6 +1,5 @@
 import os
 from contextlib import asynccontextmanager
-from tempfile import NamedTemporaryFile
 from unittest.mock import patch
 
 import pytest
@@ -333,11 +332,14 @@ def celery_enable_logging():
 
 @pytest.fixture(scope="session")
 def celery_config():
-    with NamedTemporaryFile() as f:
-        yield {
-            "broker_url": "memory://",
-            "result_backend": f"db+sqlite:///{f.name}",
-        }
+    redis_host = os.environ.get("REDIS_HOST", "localhost")
+    redis_port = os.environ.get("REDIS_PORT", "6379")
+    # Use db 2 to avoid conflicts with main app
+    redis_url = f"redis://{redis_host}:{redis_port}/2"
+    yield {
+        "broker_url": redis_url,
+        "result_backend": redis_url,
+    }
 
 
 @pytest.fixture(scope="session")
@@ -370,9 +372,12 @@ async def ws_manager_in_memory(monkeypatch):
         def __init__(self, queue: asyncio.Queue):
             self.queue = queue
 
-        async def get_message(self, ignore_subscribe_messages: bool = True):
+        async def get_message(
+            self, ignore_subscribe_messages: bool = True, timeout: float | None = None
+        ):
+            wait_timeout = timeout if timeout is not None else 0.05
             try:
-                return await asyncio.wait_for(self.queue.get(), timeout=0.05)
+                return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
             except Exception:
                 return None
 
diff --git a/server/tests/test_transcripts_rtc_ws.py b/server/tests/test_transcripts_rtc_ws.py
index 35b00912..8c015791 100644
--- a/server/tests/test_transcripts_rtc_ws.py
+++ b/server/tests/test_transcripts_rtc_ws.py
@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
     settings.DATA_DIR = DATA_DIR
 
 
-@pytest.fixture(scope="session")
-def celery_includes():
-    return ["reflector.pipelines.main_live_pipeline"]
+# Using celery_includes from conftest.py which includes both pipelines
 
 
 @pytest.mark.usefixtures("setup_database")
diff --git a/server/tests/test_user_websocket_auth.py b/server/tests/test_user_websocket_auth.py
index 5a40440f..6ecc87b9 100644
--- a/server/tests/test_user_websocket_auth.py
+++ b/server/tests/test_user_websocket_auth.py
@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
 
     if server_instance:
         server_instance.should_exit = True
-    server_thread.join(timeout=30)
+    server_thread.join(timeout=2.0)
+
+    # Reset global singleton for test isolation
+    from reflector.ws_manager import reset_ws_manager
+
+    reset_ws_manager()
 
 
 @pytest.fixture(autouse=True)
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
     # Connect and then trigger an event via HTTP create
     async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
+        await asyncio.sleep(0.2)
+
         # Emit an event to the user's room via a standard HTTP action
         from httpx import AsyncClient
 
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
             "email": "user-abc@example.com",
         }
 
+        # Use in-memory client (global singleton makes it share ws_manager)
        async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
             # Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
             resp = await ac.post("/transcripts", json={"name": "WS Test"})