mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 20:29:06 +00:00
feat: postgresql migration and removal of sqlite in pytest (#546)
* feat: remove support of sqlite, 100% postgres * fix: more migration and make datetime timezone aware in postgres * fix: change how database is get, and use contextvar to have difference instance between different loops * test: properly use client fixture that handle lifetime/database connection * fix: add missing client fixture parameters to test functions This commit fixes NameError issues where test functions were trying to use the 'client' fixture but didn't have it as a parameter. The changes include: 1. Added 'client' parameter to test functions in: - test_transcripts_audio_download.py (6 functions including fixture) - test_transcripts_speaker.py (3 functions) - test_transcripts_upload.py (1 function) - test_transcripts_rtc_ws.py (2 functions + appserver fixture) 2. Resolved naming conflicts in test_transcripts_rtc_ws.py where both HTTP client and StreamClient were using variable name 'client'. StreamClient instances are now named 'stream_client' to avoid conflicts. 3. Added missing 'from reflector.app import app' import in rtc_ws tests. Background: Previously implemented contextvars solution with get_database() function resolves asyncio event loop conflicts in Celery tasks. The global client fixture was also created to replace manual AsyncClient instances, ensuring proper FastAPI application lifecycle management and database connections during tests. All tests now pass except for 2 pre-existing RTC WebSocket test failures related to asyncpg connection issues unrelated to these fixes. * fix: ensure task are correctly closed * fix: make separate event loop for the live server * fix: make default settings pointing at postgres * build: remove pytest-docker deps out of dev, just tests group
This commit is contained in:
@@ -10,7 +10,6 @@ import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
from httpx_ws import aconnect_ws
|
||||
from uvicorn import Config, Server
|
||||
|
||||
@@ -50,23 +49,69 @@ class ThreadedUvicorn:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker):
|
||||
def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker):
|
||||
import threading
|
||||
|
||||
from reflector.app import app
|
||||
from reflector.db import get_database
|
||||
from reflector.settings import settings
|
||||
|
||||
DATA_DIR = settings.DATA_DIR
|
||||
settings.DATA_DIR = Path(tmpdir)
|
||||
|
||||
# start server
|
||||
# start server in a separate thread with its own event loop
|
||||
host = "127.0.0.1"
|
||||
port = 1255
|
||||
config = Config(app=app, host=host, port=port)
|
||||
server = ThreadedUvicorn(config)
|
||||
await server.start()
|
||||
server_started = threading.Event()
|
||||
server_exception = None
|
||||
server_instance = None
|
||||
|
||||
yield (server, host, port)
|
||||
def run_server():
|
||||
nonlocal server_exception, server_instance
|
||||
try:
|
||||
# Create a new event loop for this thread
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
config = Config(app=app, host=host, port=port, loop=loop)
|
||||
server_instance = Server(config)
|
||||
|
||||
async def start_server():
|
||||
# Initialize database connection in this event loop
|
||||
database = get_database()
|
||||
await database.connect()
|
||||
try:
|
||||
await server_instance.serve()
|
||||
finally:
|
||||
await database.disconnect()
|
||||
|
||||
# Signal that server is starting
|
||||
server_started.set()
|
||||
loop.run_until_complete(start_server())
|
||||
except Exception as e:
|
||||
server_exception = e
|
||||
server_started.set()
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
server_thread = threading.Thread(target=run_server, daemon=True)
|
||||
server_thread.start()
|
||||
|
||||
# Wait for server to start
|
||||
server_started.wait(timeout=30)
|
||||
if server_exception:
|
||||
raise server_exception
|
||||
|
||||
# Wait a bit more for the server to be fully ready
|
||||
time.sleep(1)
|
||||
|
||||
yield server_instance, host, port
|
||||
|
||||
# Stop server
|
||||
if server_instance:
|
||||
server_instance.should_exit = True
|
||||
server_thread.join(timeout=30)
|
||||
|
||||
server.stop()
|
||||
settings.DATA_DIR = DATA_DIR
|
||||
|
||||
|
||||
@@ -89,6 +134,7 @@ async def test_transcript_rtc_and_websocket(
|
||||
dummy_storage,
|
||||
fake_mp3_upload,
|
||||
appserver,
|
||||
client,
|
||||
):
|
||||
# goal: start the server, exchange RTC, receive websocket events
|
||||
# because of that, we need to start the server in a thread
|
||||
@@ -97,8 +143,7 @@ async def test_transcript_rtc_and_websocket(
|
||||
|
||||
# create a transcript
|
||||
base_url = f"http://{host}:{port}/v1"
|
||||
ac = AsyncClient(base_url=base_url)
|
||||
response = await ac.post("/transcripts", json={"name": "Test RTC"})
|
||||
response = await client.post("/transcripts", json={"name": "Test RTC"})
|
||||
assert response.status_code == 200
|
||||
tid = response.json()["id"]
|
||||
|
||||
@@ -143,11 +188,11 @@ async def test_transcript_rtc_and_websocket(
|
||||
|
||||
url = f"{base_url}/transcripts/{tid}/record/webrtc"
|
||||
path = Path(__file__).parent / "records" / "test_short.wav"
|
||||
client = StreamClient(signaling, url=url, play_from=path.as_posix())
|
||||
await client.start()
|
||||
stream_client = StreamClient(signaling, url=url, play_from=path.as_posix())
|
||||
await stream_client.start()
|
||||
|
||||
timeout = 20
|
||||
while not client.is_ended():
|
||||
timeout = 120
|
||||
while not stream_client.is_ended():
|
||||
await asyncio.sleep(1)
|
||||
timeout -= 1
|
||||
if timeout < 0:
|
||||
@@ -155,14 +200,14 @@ async def test_transcript_rtc_and_websocket(
|
||||
|
||||
# XXX aiortc is long to close the connection
|
||||
# instead of waiting a long time, we just send a STOP
|
||||
client.channel.send(json.dumps({"cmd": "STOP"}))
|
||||
await client.stop()
|
||||
stream_client.channel.send(json.dumps({"cmd": "STOP"}))
|
||||
await stream_client.stop()
|
||||
|
||||
# wait the processing to finish
|
||||
timeout = 20
|
||||
timeout = 120
|
||||
while True:
|
||||
# fetch the transcript and check if it is ended
|
||||
resp = await ac.get(f"/transcripts/{tid}")
|
||||
resp = await client.get(f"/transcripts/{tid}")
|
||||
assert resp.status_code == 200
|
||||
if resp.json()["status"] in ("ended", "error"):
|
||||
break
|
||||
@@ -215,7 +260,7 @@ async def test_transcript_rtc_and_websocket(
|
||||
ev = events[eventnames.index("WAVEFORM")]
|
||||
assert isinstance(ev["data"]["waveform"], list)
|
||||
assert len(ev["data"]["waveform"]) >= 250
|
||||
waveform_resp = await ac.get(f"/transcripts/{tid}/audio/waveform")
|
||||
waveform_resp = await client.get(f"/transcripts/{tid}/audio/waveform")
|
||||
assert waveform_resp.status_code == 200
|
||||
assert waveform_resp.headers["content-type"] == "application/json"
|
||||
assert isinstance(waveform_resp.json()["data"], list)
|
||||
@@ -235,7 +280,7 @@ async def test_transcript_rtc_and_websocket(
|
||||
assert "DURATION" in eventnames
|
||||
|
||||
# check that audio/mp3 is available
|
||||
audio_resp = await ac.get(f"/transcripts/{tid}/audio/mp3")
|
||||
audio_resp = await client.get(f"/transcripts/{tid}/audio/mp3")
|
||||
assert audio_resp.status_code == 200
|
||||
assert audio_resp.headers["Content-Type"] == "audio/mpeg"
|
||||
|
||||
@@ -254,6 +299,7 @@ async def test_transcript_rtc_and_websocket_and_fr(
|
||||
dummy_storage,
|
||||
fake_mp3_upload,
|
||||
appserver,
|
||||
client,
|
||||
):
|
||||
# goal: start the server, exchange RTC, receive websocket events
|
||||
# because of that, we need to start the server in a thread
|
||||
@@ -263,8 +309,7 @@ async def test_transcript_rtc_and_websocket_and_fr(
|
||||
|
||||
# create a transcript
|
||||
base_url = f"http://{host}:{port}/v1"
|
||||
ac = AsyncClient(base_url=base_url)
|
||||
response = await ac.post(
|
||||
response = await client.post(
|
||||
"/transcripts", json={"name": "Test RTC", "target_language": "fr"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
@@ -311,11 +356,11 @@ async def test_transcript_rtc_and_websocket_and_fr(
|
||||
|
||||
url = f"{base_url}/transcripts/{tid}/record/webrtc"
|
||||
path = Path(__file__).parent / "records" / "test_short.wav"
|
||||
client = StreamClient(signaling, url=url, play_from=path.as_posix())
|
||||
await client.start()
|
||||
stream_client = StreamClient(signaling, url=url, play_from=path.as_posix())
|
||||
await stream_client.start()
|
||||
|
||||
timeout = 20
|
||||
while not client.is_ended():
|
||||
timeout = 120
|
||||
while not stream_client.is_ended():
|
||||
await asyncio.sleep(1)
|
||||
timeout -= 1
|
||||
if timeout < 0:
|
||||
@@ -323,18 +368,18 @@ async def test_transcript_rtc_and_websocket_and_fr(
|
||||
|
||||
# XXX aiortc is long to close the connection
|
||||
# instead of waiting a long time, we just send a STOP
|
||||
client.channel.send(json.dumps({"cmd": "STOP"}))
|
||||
stream_client.channel.send(json.dumps({"cmd": "STOP"}))
|
||||
|
||||
# wait the processing to finish
|
||||
await asyncio.sleep(2)
|
||||
|
||||
await client.stop()
|
||||
await stream_client.stop()
|
||||
|
||||
# wait the processing to finish
|
||||
timeout = 20
|
||||
timeout = 120
|
||||
while True:
|
||||
# fetch the transcript and check if it is ended
|
||||
resp = await ac.get(f"/transcripts/{tid}")
|
||||
resp = await client.get(f"/transcripts/{tid}")
|
||||
assert resp.status_code == 200
|
||||
if resp.json()["status"] == "ended":
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user