mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 12:49:06 +00:00
* feat: remove support of sqlite, 100% postgres * fix: more migration and make datetime timezone aware in postgres * fix: change how database is get, and use contextvar to have difference instance between different loops * test: properly use client fixture that handle lifetime/database connection * fix: add missing client fixture parameters to test functions This commit fixes NameError issues where test functions were trying to use the 'client' fixture but didn't have it as a parameter. The changes include: 1. Added 'client' parameter to test functions in: - test_transcripts_audio_download.py (6 functions including fixture) - test_transcripts_speaker.py (3 functions) - test_transcripts_upload.py (1 function) - test_transcripts_rtc_ws.py (2 functions + appserver fixture) 2. Resolved naming conflicts in test_transcripts_rtc_ws.py where both HTTP client and StreamClient were using variable name 'client'. StreamClient instances are now named 'stream_client' to avoid conflicts. 3. Added missing 'from reflector.app import app' import in rtc_ws tests. Background: Previously implemented contextvars solution with get_database() function resolves asyncio event loop conflicts in Celery tasks. The global client fixture was also created to replace manual AsyncClient instances, ensuring proper FastAPI application lifecycle management and database connections during tests. All tests now pass except for 2 pre-existing RTC WebSocket test failures related to asyncpg connection issues unrelated to these fixes. * fix: ensure task are correctly closed * fix: make separate event loop for the live server * fix: make default settings pointing at postgres * build: remove pytest-docker deps out of dev, just tests group
133 lines
4.2 KiB
Python
133 lines
4.2 KiB
Python
"""
|
|
Websocket manager
|
|
=================
|
|
|
|
This module contains the WebsocketManager class, which is responsible for
|
|
managing websockets and handling websocket connections.
|
|
|
|
It uses the RedisPubSubManager class to subscribe to Redis channels and
|
|
broadcast messages to all connected websockets.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import threading
|
|
|
|
import redis.asyncio as redis
|
|
from fastapi import WebSocket
|
|
|
|
from reflector.settings import settings
|
|
|
|
|
|
class RedisPubSubManager:
    """Small async wrapper around one Redis client and its pub/sub object.

    The connection is created lazily: ``send_json`` and ``subscribe`` call
    ``connect()`` themselves when needed, so callers do not have to connect
    explicitly before using the manager.

    Attributes are plain and public:
        redis_host / redis_port: where to connect.
        redis_connection: the Redis client, or ``None`` while disconnected.
        pubsub: the pub/sub object obtained from ``redis_connection``, or
            ``None`` while disconnected.
    """

    def __init__(self, host="localhost", port=6379):
        self.redis_host = host
        self.redis_port = port
        # Both stay None until connect() runs and are reset by disconnect().
        self.redis_connection = None
        self.pubsub = None

    async def get_redis_connection(self) -> "redis.Redis":
        """Build a new Redis client for the configured host and port."""
        return redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            auto_close_connection_pool=False,
        )

    async def connect(self) -> None:
        """Create the client and its pub/sub object; no-op if already done."""
        if self.redis_connection is not None:
            return
        self.redis_connection = await self.get_redis_connection()
        self.pubsub = self.redis_connection.pubsub()

    async def disconnect(self) -> None:
        """Close the pub/sub object and the client; no-op if not connected."""
        if self.redis_connection is None:
            return
        # Clear our references first so the manager is reusable (a later
        # connect() starts from scratch) even if one of the closes raises.
        pubsub, connection = self.pubsub, self.redis_connection
        self.pubsub = None
        self.redis_connection = None
        if pubsub is not None:
            await pubsub.close()
        await connection.close()

    async def send_json(self, room_id: str, message: dict) -> None:
        """JSON-encode ``message`` and publish it on channel ``room_id``."""
        if self.redis_connection is None:
            await self.connect()
        await self.redis_connection.publish(room_id, json.dumps(message))

    async def subscribe(self, room_id: str) -> "redis.client.PubSub":
        """Subscribe to channel ``room_id`` and return the pub/sub object.

        Connects first when needed, mirroring ``send_json``.
        """
        if self.pubsub is None:
            await self.connect()
        await self.pubsub.subscribe(room_id)
        return self.pubsub

    async def unsubscribe(self, room_id: str) -> None:
        """Unsubscribe from channel ``room_id``.

        Does nothing when there is no pub/sub object, since in that case
        nothing can be subscribed.
        """
        if self.pubsub is None:
            return
        await self.pubsub.unsubscribe(room_id)
|
|
|
|
|
|
class WebsocketManager:
    """Track websockets per room and relay Redis pub/sub messages to them.

    Attributes:
        rooms: maps a room id to the list of websockets currently in it.
            A room key exists only while it has at least one websocket.
        tasks: maps a room id to the asyncio task that reads pub/sub
            messages for that room. The task lives as long as the room.
        pubsub_client: the ``RedisPubSubManager`` used to publish and
            subscribe.
    """

    def __init__(self, pubsub_client: "RedisPubSubManager | None" = None):
        self.rooms: dict = {}
        self.tasks: dict = {}
        self.pubsub_client = pubsub_client

    async def add_user_to_room(self, room_id: str, websocket: "WebSocket") -> None:
        """Accept ``websocket`` and register it in ``room_id``.

        The first websocket of a room also triggers the Redis subscription
        and starts the reader task for that room; later websockets just join
        the existing list.
        """
        await websocket.accept()

        if room_id in self.rooms:
            self.rooms[room_id].append(websocket)
            return

        self.rooms[room_id] = [websocket]
        await self.pubsub_client.connect()
        pubsub_subscriber = await self.pubsub_client.subscribe(room_id)
        # One reader per room, keyed by room id, so that a single websocket
        # leaving never stops delivery to the websockets that remain.
        self.tasks[room_id] = asyncio.create_task(
            self._pubsub_data_reader(pubsub_subscriber)
        )

    async def send_json(self, room_id: str, message: dict) -> None:
        """Publish ``message`` to ``room_id`` through the pub/sub client."""
        await self.pubsub_client.send_json(room_id, message)

    async def remove_user_from_room(self, room_id: str, websocket: "WebSocket") -> None:
        """Remove ``websocket`` from ``room_id``.

        Unknown rooms or websockets are ignored, so this is safe to call from
        cleanup code even when ``add_user_to_room`` did not complete. When the
        last websocket leaves, the room is dropped, its reader task cancelled
        and the Redis channel unsubscribed.
        """
        sockets = self.rooms.get(room_id)
        if sockets is None or websocket not in sockets:
            return

        sockets.remove(websocket)
        if sockets:
            return

        del self.rooms[room_id]
        task = self.tasks.pop(room_id, None)
        if task:
            task.cancel()
        await self.pubsub_client.unsubscribe(room_id)

    async def _pubsub_data_reader(self, pubsub_subscriber):
        """Forward every pub/sub message to the websockets of its room.

        Runs until the task is cancelled. The room is taken from the
        message's channel name, so a message is routed correctly whichever
        room's reader happens to receive it.
        """
        # NOTE(review): get_message is called with its default timeout;
        # confirm against the installed redis client that this loop does not
        # spin when no messages are pending.
        while True:
            message = await pubsub_subscriber.get_message(
                ignore_subscribe_messages=True
            )
            if message is None:
                continue

            room_id = message["channel"].decode("utf-8")
            # The room may have been removed while the message was in flight.
            # Snapshot the list because it can change during the awaits below.
            sockets = list(self.rooms.get(room_id, ()))
            if not sockets:
                continue

            # Decode once per message rather than once per websocket.
            data = json.loads(message["data"].decode("utf-8"))
            for socket in sockets:
                await socket.send_json(data)
|
|
|
|
|
|
# Per-thread cache for get_ws_manager(). It must live at module level: a
# threading.local() created inside the function would be a brand-new, empty
# object on every call and could never hold a previously built manager.
_ws_manager_local = threading.local()


def get_ws_manager() -> WebsocketManager:
    """
    Returns the WebsocketManager instance for the calling thread.

    The first call made from a given thread builds a WebsocketManager backed
    by a RedisPubSubManager configured from ``settings.REDIS_HOST`` and
    ``settings.REDIS_PORT``, and caches it in thread-local storage. Later
    calls from the same thread return that same instance.

    Only the objects are constructed here; this function does not await
    anything, so any Redis connection is established later by the manager.

    Returns:
        WebsocketManager: The manager cached for the current thread.
    """
    cached = getattr(_ws_manager_local, "ws_manager", None)
    if cached is not None:
        return cached

    pubsub_client = RedisPubSubManager(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
    )
    ws_manager = WebsocketManager(pubsub_client=pubsub_client)
    _ws_manager_local.ws_manager = ws_manager
    return ws_manager
|