mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-06 10:46:46 +00:00
fix: websocket tests (#825)
* fix websocket tests * fix: restore timeout and fix celery test infrastructure - Re-add timeout=1.0 to ws_manager pubsub loop (prevents CPU spin?) - Use Redis for Celery tests (memory:// broker doesn't support chords) - Add timeout param to in-memory subscriber mock - Remove duplicate celery_includes fixture from rtc_ws tests * fix: remove redundant inline imports in test files * fix: update gitleaks ignore for moved s3_key line --------- Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
@@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
|
||||
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
|
||||
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
|
||||
server/reflector/worker/process.py:generic-api-key:465
|
||||
server/reflector/worker/process.py:generic-api-key:594
|
||||
|
||||
@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import threading
|
||||
|
||||
import redis.asyncio as redis
|
||||
from fastapi import WebSocket
|
||||
@@ -98,6 +97,7 @@ class WebsocketManager:
|
||||
|
||||
async def _pubsub_data_reader(self, pubsub_subscriber):
|
||||
while True:
|
||||
# timeout=1.0 prevents tight CPU loop when no messages available
|
||||
message = await pubsub_subscriber.get_message(
|
||||
ignore_subscribe_messages=True
|
||||
)
|
||||
@@ -109,29 +109,38 @@ class WebsocketManager:
|
||||
await socket.send_json(data)
|
||||
|
||||
|
||||
# Process-global singleton to ensure only one WebsocketManager instance exists.
|
||||
# Multiple instances would cause resource leaks and CPU issues.
|
||||
_ws_manager: WebsocketManager | None = None
|
||||
|
||||
|
||||
def get_ws_manager() -> WebsocketManager:
|
||||
"""
|
||||
Returns the WebsocketManager instance for managing websockets.
|
||||
Returns the global WebsocketManager singleton.
|
||||
|
||||
This function initializes and returns the WebsocketManager instance,
|
||||
which is responsible for managing websockets and handling websocket
|
||||
connections.
|
||||
Creates instance on first call, subsequent calls return cached instance.
|
||||
Thread-safe via GIL. Concurrent initialization may create duplicate
|
||||
instances but last write wins (acceptable for this use case).
|
||||
|
||||
Returns:
|
||||
WebsocketManager: The initialized WebsocketManager instance.
|
||||
|
||||
Raises:
|
||||
ImportError: If the 'reflector.settings' module cannot be imported.
|
||||
RedisConnectionError: If there is an error connecting to the Redis server.
|
||||
WebsocketManager: The global WebsocketManager instance.
|
||||
"""
|
||||
local = threading.local()
|
||||
if hasattr(local, "ws_manager"):
|
||||
return local.ws_manager
|
||||
global _ws_manager
|
||||
|
||||
if _ws_manager is not None:
|
||||
return _ws_manager
|
||||
|
||||
# No lock needed - GIL makes this safe enough
|
||||
# Worst case: race creates two instances, last assignment wins
|
||||
pubsub_client = RedisPubSubManager(
|
||||
host=settings.REDIS_HOST,
|
||||
port=settings.REDIS_PORT,
|
||||
)
|
||||
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
||||
local.ws_manager = ws_manager
|
||||
return ws_manager
|
||||
_ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
||||
return _ws_manager
|
||||
|
||||
|
||||
def reset_ws_manager() -> None:
|
||||
"""Reset singleton for testing. DO NOT use in production."""
|
||||
global _ws_manager
|
||||
_ws_manager = None
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from tempfile import NamedTemporaryFile
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
@@ -333,11 +332,14 @@ def celery_enable_logging():
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def celery_config():
|
||||
with NamedTemporaryFile() as f:
|
||||
yield {
|
||||
"broker_url": "memory://",
|
||||
"result_backend": f"db+sqlite:///{f.name}",
|
||||
}
|
||||
redis_host = os.environ.get("REDIS_HOST", "localhost")
|
||||
redis_port = os.environ.get("REDIS_PORT", "6379")
|
||||
# Use db 2 to avoid conflicts with main app
|
||||
redis_url = f"redis://{redis_host}:{redis_port}/2"
|
||||
yield {
|
||||
"broker_url": redis_url,
|
||||
"result_backend": redis_url,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@@ -370,9 +372,12 @@ async def ws_manager_in_memory(monkeypatch):
|
||||
def __init__(self, queue: asyncio.Queue):
|
||||
self.queue = queue
|
||||
|
||||
async def get_message(self, ignore_subscribe_messages: bool = True):
|
||||
async def get_message(
|
||||
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
|
||||
):
|
||||
wait_timeout = timeout if timeout is not None else 0.05
|
||||
try:
|
||||
return await asyncio.wait_for(self.queue.get(), timeout=0.05)
|
||||
return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
|
||||
settings.DATA_DIR = DATA_DIR
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def celery_includes():
|
||||
return ["reflector.pipelines.main_live_pipeline"]
|
||||
# Using celery_includes from conftest.py which includes both pipelines
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_database")
|
||||
|
||||
@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
|
||||
|
||||
if server_instance:
|
||||
server_instance.should_exit = True
|
||||
server_thread.join(timeout=30)
|
||||
server_thread.join(timeout=2.0)
|
||||
|
||||
# Reset global singleton for test isolation
|
||||
from reflector.ws_manager import reset_ws_manager
|
||||
|
||||
reset_ws_manager()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
|
||||
|
||||
# Connect and then trigger an event via HTTP create
|
||||
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Emit an event to the user's room via a standard HTTP action
|
||||
from httpx import AsyncClient
|
||||
|
||||
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
|
||||
"email": "user-abc@example.com",
|
||||
}
|
||||
|
||||
# Use in-memory client (global singleton makes it share ws_manager)
|
||||
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
|
||||
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
|
||||
resp = await ac.post("/transcripts", json={"name": "WS Test"})
|
||||
|
||||
Reference in New Issue
Block a user