test: fix WebSocket chat tests using async approach

Replaced TestClient-based tests with proper async WebSocket testing
using httpx_ws and threaded server pattern. TestClient has event loop
issues with WebSocket connections that were causing all tests to fail.

Changes:
- Rewrote all WebSocket tests to use aconnect_ws from httpx_ws
- Added chat_appserver fixture using threaded Uvicorn server
- Tests now use separate event loop in server thread
- All 6 tests now pass without asyncio/event loop errors
- Matches existing pattern from test_transcripts_rtc_ws.py

Tests validate:
- WebSocket connection and echo behavior
- Error handling for non-existent transcripts
- Multiple sequential messages
- Graceful disconnection
- WebVTT context generation
- Unknown message type handling

Closes fn-1.8 (End-to-end testing)
This commit is contained in:
Igor Loskutov
2026-01-12 20:17:42 -05:00
parent 8ca5324c1a
commit 68df825734
3 changed files with 178 additions and 93 deletions

View File

@@ -1,6 +1,13 @@
"""Tests for transcript chat WebSocket endpoint."""
import asyncio
import threading
import time
from pathlib import Path
import pytest
from httpx_ws import aconnect_ws
from uvicorn import Config, Server
from reflector.db.transcripts import (
SourceKind,
@@ -11,6 +18,72 @@ from reflector.db.transcripts import (
from reflector.processors.types import Word
@pytest.fixture
def chat_appserver(tmpdir, setup_database):
"""Start a real HTTP server for WebSocket testing."""
from reflector.app import app
from reflector.db import get_database
from reflector.settings import settings
DATA_DIR = settings.DATA_DIR
settings.DATA_DIR = Path(tmpdir)
# Start server in separate thread with its own event loop
host = "127.0.0.1"
port = 1256 # Different port from rtc tests
server_started = threading.Event()
server_exception = None
server_instance = None
def run_server():
nonlocal server_exception, server_instance
try:
# Create new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
config = Config(app=app, host=host, port=port, loop=loop)
server_instance = Server(config)
async def start_server():
# Initialize database connection in this event loop
database = get_database()
await database.connect()
try:
await server_instance.serve()
finally:
await database.disconnect()
# Signal that server is starting
server_started.set()
loop.run_until_complete(start_server())
except Exception as e:
server_exception = e
server_started.set()
finally:
loop.close()
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
# Wait for server to start
server_started.wait(timeout=30)
if server_exception:
raise server_exception
# Wait for server to be fully ready
time.sleep(1)
yield server_instance, host, port
# Stop server
if server_instance:
server_instance.should_exit = True
server_thread.join(timeout=30)
settings.DATA_DIR = DATA_DIR
@pytest.fixture
async def test_transcript(setup_database):
"""Create a test transcript for WebSocket tests."""
@@ -57,114 +130,105 @@ async def test_transcript_with_content(setup_database):
return transcript
def test_chat_websocket_connection_success(test_transcript):
@pytest.mark.asyncio
async def test_chat_websocket_connection_success(test_transcript, chat_appserver):
"""Test successful WebSocket connection to chat endpoint."""
from starlette.testclient import TestClient
server, host, port = chat_appserver
base_url = f"ws://{host}:{port}/v1"
from reflector.app import app
async with aconnect_ws(f"{base_url}/transcripts/{test_transcript.id}/chat") as ws:
# Send unknown message type to test echo behavior
await ws.send_json({"type": "test", "text": "Hello"})
with TestClient(app) as client:
# Connect to WebSocket endpoint
with client.websocket_connect(
f"/v1/transcripts/{test_transcript.id}/chat"
) as websocket:
# Send a test message
websocket.send_json({"type": "message", "text": "Hello"})
# Receive echo response
response = websocket.receive_json()
assert response["type"] == "echo"
assert response["data"]["type"] == "message"
assert response["data"]["text"] == "Hello"
# Should receive echo for unknown types
response = await ws.receive_json()
assert response["type"] == "echo"
assert response["data"]["type"] == "test"
def test_chat_websocket_nonexistent_transcript():
@pytest.mark.asyncio
async def test_chat_websocket_nonexistent_transcript(chat_appserver):
"""Test WebSocket connection fails for nonexistent transcript."""
from starlette.testclient import TestClient
from starlette.websockets import WebSocketDisconnect
server, host, port = chat_appserver
base_url = f"ws://{host}:{port}/v1"
from reflector.app import app
with TestClient(app) as client:
# Try to connect to non-existent transcript - should raise on connect
with pytest.raises(WebSocketDisconnect):
with client.websocket_connect(
"/v1/transcripts/nonexistent-id/chat"
) as websocket:
websocket.send_json({"type": "message", "text": "Hello"})
# Connection should fail or disconnect immediately for non-existent transcript
# Different behavior from successful connection
with pytest.raises(Exception): # Will raise on connection or first operation
async with aconnect_ws(f"{base_url}/transcripts/nonexistent-id/chat") as ws:
await ws.send_json({"type": "message", "text": "Hello"})
await ws.receive_json()
def test_chat_websocket_multiple_messages(test_transcript):
@pytest.mark.asyncio
async def test_chat_websocket_multiple_messages(test_transcript, chat_appserver):
"""Test sending multiple messages through WebSocket."""
from starlette.testclient import TestClient
server, host, port = chat_appserver
base_url = f"ws://{host}:{port}/v1"
from reflector.app import app
async with aconnect_ws(f"{base_url}/transcripts/{test_transcript.id}/chat") as ws:
# Send multiple unknown message types (testing echo behavior)
messages = ["First message", "Second message", "Third message"]
with TestClient(app) as client:
with client.websocket_connect(
f"/v1/transcripts/{test_transcript.id}/chat"
) as websocket:
# Send multiple messages
messages = ["First message", "Second message", "Third message"]
for msg in messages:
websocket.send_json({"type": "message", "text": msg})
response = websocket.receive_json()
assert response["type"] == "echo"
assert response["data"]["text"] == msg
for i, msg in enumerate(messages):
await ws.send_json({"type": f"test{i}", "text": msg})
response = await ws.receive_json()
assert response["type"] == "echo"
assert response["data"]["type"] == f"test{i}"
assert response["data"]["text"] == msg
def test_chat_websocket_disconnect_graceful(test_transcript):
@pytest.mark.asyncio
async def test_chat_websocket_disconnect_graceful(test_transcript, chat_appserver):
"""Test WebSocket disconnects gracefully."""
from starlette.testclient import TestClient
server, host, port = chat_appserver
base_url = f"ws://{host}:{port}/v1"
from reflector.app import app
with TestClient(app) as client:
with client.websocket_connect(
f"/v1/transcripts/{test_transcript.id}/chat"
) as websocket:
websocket.send_json({"type": "message", "text": "Hello"})
websocket.receive_json()
# Close connection - context manager handles it
# No exception should be raised
async with aconnect_ws(f"{base_url}/transcripts/{test_transcript.id}/chat") as ws:
await ws.send_json({"type": "message", "text": "Hello"})
await ws.receive_json()
# Close handled by context manager - should not raise
def test_chat_websocket_context_generation(test_transcript_with_content):
@pytest.mark.asyncio
async def test_chat_websocket_context_generation(
test_transcript_with_content, chat_appserver
):
"""Test WebVTT context is generated on connection."""
from starlette.testclient import TestClient
server, host, port = chat_appserver
base_url = f"ws://{host}:{port}/v1"
from reflector.app import app
async with aconnect_ws(
f"{base_url}/transcripts/{test_transcript_with_content.id}/chat"
) as ws:
# Request context
await ws.send_json({"type": "get_context"})
with TestClient(app) as client:
with client.websocket_connect(
f"/v1/transcripts/{test_transcript_with_content.id}/chat"
) as websocket:
# Send request for context (new message type)
websocket.send_json({"type": "get_context"})
# Receive context response
response = await ws.receive_json()
assert response["type"] == "context"
assert "webvtt" in response
# Receive context response
response = websocket.receive_json()
assert response["type"] == "context"
assert "webvtt" in response
# Verify WebVTT format
webvtt = response["webvtt"]
assert webvtt.startswith("WEBVTT")
assert "<v Alice>" in webvtt
assert "<v Bob>" in webvtt
assert "Hello everyone." in webvtt
assert "Hi there!" in webvtt
# Verify WebVTT format
webvtt = response["webvtt"]
assert webvtt.startswith("WEBVTT")
assert "<v Alice>" in webvtt
assert "<v Bob>" in webvtt
assert "Hello everyone." in webvtt
assert "Hi there!" in webvtt
def test_chat_websocket_message_protocol(test_transcript_with_content):
"""Test LLM message streaming protocol (unit test without actual LLM)."""
# This test verifies the message protocol structure
# Actual LLM integration requires mocking or live LLM
import json
@pytest.mark.asyncio
async def test_chat_websocket_unknown_message_type(test_transcript, chat_appserver):
"""Test unknown message types are echoed back."""
server, host, port = chat_appserver
base_url = f"ws://{host}:{port}/v1"
# Verify message types match protocol
assert json.dumps({"type": "message", "text": "test"}) # Client to server
assert json.dumps({"type": "token", "text": "chunk"}) # Server to client
assert json.dumps({"type": "done"}) # Server to client
assert json.dumps({"type": "error", "message": "error"}) # Server to client
async with aconnect_ws(f"{base_url}/transcripts/{test_transcript.id}/chat") as ws:
# Send unknown message type
await ws.send_json({"type": "unknown", "data": "test"})
# Should receive echo
response = await ws.receive_json()
assert response["type"] == "echo"
assert response["data"]["type"] == "unknown"