mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-04 09:56:47 +00:00
Compare commits
1 Commits
v0.32.1
...
fix/websoc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dee1555807 |
@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import threading
|
|
||||||
|
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
from fastapi import WebSocket
|
from fastapi import WebSocket
|
||||||
@@ -98,9 +97,9 @@ class WebsocketManager:
|
|||||||
|
|
||||||
async def _pubsub_data_reader(self, pubsub_subscriber):
|
async def _pubsub_data_reader(self, pubsub_subscriber):
|
||||||
while True:
|
while True:
|
||||||
|
# No timeout - global singleton prevents CPU hog from multiple instances
|
||||||
message = await pubsub_subscriber.get_message(
|
message = await pubsub_subscriber.get_message(
|
||||||
ignore_subscribe_messages=True,
|
ignore_subscribe_messages=True
|
||||||
timeout=1.0,
|
|
||||||
)
|
)
|
||||||
if message is not None:
|
if message is not None:
|
||||||
room_id = message["channel"].decode("utf-8")
|
room_id = message["channel"].decode("utf-8")
|
||||||
@@ -110,29 +109,40 @@ class WebsocketManager:
|
|||||||
await socket.send_json(data)
|
await socket.send_json(data)
|
||||||
|
|
||||||
|
|
||||||
|
# Process-global singleton (not thread-local)
|
||||||
|
# The original threading.local() pattern was broken - it created a NEW
|
||||||
|
# threading.local() object on every call, so caching never worked.
|
||||||
|
# This caused infinite ws_manager instances → resource leaks → CPU hog.
|
||||||
|
_ws_manager: WebsocketManager | None = None
|
||||||
|
|
||||||
|
|
||||||
def get_ws_manager() -> WebsocketManager:
|
def get_ws_manager() -> WebsocketManager:
|
||||||
"""
|
"""
|
||||||
Returns the WebsocketManager instance for managing websockets.
|
Returns the global WebsocketManager singleton.
|
||||||
|
|
||||||
This function initializes and returns the WebsocketManager instance,
|
Creates instance on first call, subsequent calls return cached instance.
|
||||||
which is responsible for managing websockets and handling websocket
|
Thread-safe via GIL. Concurrent initialization may create duplicate
|
||||||
connections.
|
instances but last write wins (acceptable for this use case).
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
WebsocketManager: The initialized WebsocketManager instance.
|
WebsocketManager: The global WebsocketManager instance.
|
||||||
|
|
||||||
Raises:
|
|
||||||
ImportError: If the 'reflector.settings' module cannot be imported.
|
|
||||||
RedisConnectionError: If there is an error connecting to the Redis server.
|
|
||||||
"""
|
"""
|
||||||
local = threading.local()
|
global _ws_manager
|
||||||
if hasattr(local, "ws_manager"):
|
|
||||||
return local.ws_manager
|
|
||||||
|
|
||||||
|
if _ws_manager is not None:
|
||||||
|
return _ws_manager
|
||||||
|
|
||||||
|
# No lock needed - GIL makes this safe enough
|
||||||
|
# Worst case: race creates two instances, last assignment wins
|
||||||
pubsub_client = RedisPubSubManager(
|
pubsub_client = RedisPubSubManager(
|
||||||
host=settings.REDIS_HOST,
|
host=settings.REDIS_HOST,
|
||||||
port=settings.REDIS_PORT,
|
port=settings.REDIS_PORT,
|
||||||
)
|
)
|
||||||
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
_ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
||||||
local.ws_manager = ws_manager
|
return _ws_manager
|
||||||
return ws_manager
|
|
||||||
|
|
||||||
|
def reset_ws_manager() -> None:
|
||||||
|
"""Reset singleton for testing. DO NOT use in production."""
|
||||||
|
global _ws_manager
|
||||||
|
_ws_manager = None
|
||||||
|
|||||||
@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
|
|||||||
|
|
||||||
if server_instance:
|
if server_instance:
|
||||||
server_instance.should_exit = True
|
server_instance.should_exit = True
|
||||||
server_thread.join(timeout=30)
|
server_thread.join(timeout=2.0)
|
||||||
|
|
||||||
|
# Reset global singleton for test isolation
|
||||||
|
from reflector.ws_manager import reset_ws_manager
|
||||||
|
|
||||||
|
reset_ws_manager()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
@@ -133,6 +138,11 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
|
|||||||
|
|
||||||
# Connect and then trigger an event via HTTP create
|
# Connect and then trigger an event via HTTP create
|
||||||
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
|
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
|
||||||
|
# Give Redis pubsub time to establish subscription before publishing
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
await asyncio.sleep(0.2)
|
||||||
|
|
||||||
# Emit an event to the user's room via a standard HTTP action
|
# Emit an event to the user's room via a standard HTTP action
|
||||||
from httpx import AsyncClient
|
from httpx import AsyncClient
|
||||||
|
|
||||||
@@ -150,6 +160,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
|
|||||||
"email": "user-abc@example.com",
|
"email": "user-abc@example.com",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Use in-memory client (global singleton makes it share ws_manager)
|
||||||
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
|
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
|
||||||
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
|
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
|
||||||
resp = await ac.post("/transcripts", json={"name": "WS Test"})
|
resp = await ac.post("/transcripts", json={"name": "WS Test"})
|
||||||
|
|||||||
Reference in New Issue
Block a user