# Mirror of https://github.com/Monadical-SAS/reflector.git (synced 2026-03-22 07:06:47 +00:00)
"""
Integration test: WebRTC stream → LivePostProcessingPipeline → full processing.

Exercises: WebRTC SDP exchange → live audio streaming → connection close →
Hatchet LivePostPipeline → whisper transcription → LLM summarization/topics → status "ended".
"""

import asyncio
import json
import os

import httpx
import pytest
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer

# Base URL of the backend under test; override via the SERVER_URL env var
# (e.g. for local runs outside the docker-compose network).
SERVER_URL = os.environ.get("SERVER_URL", "http://server:1250")
|
@pytest.mark.asyncio
async def test_live_pipeline_end_to_end(
    api_client, test_records_dir, poll_transcript_status
):
    """Stream audio via WebRTC and verify the full post-processing pipeline completes.

    Flow:
      1. Create a transcript via the API.
      2. Stream a short WAV file to the server over a WebRTC peer connection.
      3. Send a STOP command on the data channel and close the connection.
      4. Poll until the post-processing pipeline reports status "ended".
      5. Assert title, summaries, topics, and duration were produced.

    Fixtures (project-provided): api_client (async HTTP client bound to the API),
    test_records_dir (Path to test audio fixtures), poll_transcript_status
    (async poller for transcript status transitions).
    """
    # 1. Create transcript
    resp = await api_client.post(
        "/transcripts",
        json={"name": "integration-live-test"},
    )
    assert resp.status_code == 200, f"Failed to create transcript: {resp.text}"
    transcript_id = resp.json()["id"]

    # 2. Set up WebRTC peer connection with audio from test file
    audio_path = test_records_dir / "test_short.wav"
    assert audio_path.exists(), f"Test audio file not found: {audio_path}"

    pc = RTCPeerConnection()
    player = MediaPlayer(audio_path.as_posix())
    try:
        # Add audio track
        audio_track = player.audio
        pc.addTrack(audio_track)

        # Create data channel (server expects this for STOP command)
        channel = pc.createDataChannel("data-channel")

        # 3. Generate SDP offer
        offer = await pc.createOffer()
        await pc.setLocalDescription(offer)

        sdp_payload = {
            "sdp": pc.localDescription.sdp,
            "type": pc.localDescription.type,
        }

        # 4. Send offer to server and get answer
        webrtc_url = f"{SERVER_URL}/v1/transcripts/{transcript_id}/record/webrtc"
        async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
            resp = await client.post(webrtc_url, json=sdp_payload)
            assert resp.status_code == 200, f"WebRTC offer failed: {resp.text}"

        answer_data = resp.json()
        answer = RTCSessionDescription(sdp=answer_data["sdp"], type=answer_data["type"])
        await pc.setRemoteDescription(answer)

        # 5. Wait (bounded) for audio playback to finish
        max_stream_wait = 60  # seconds
        elapsed = 0
        while elapsed < max_stream_wait:
            if audio_track.readyState == "ended":
                break
            await asyncio.sleep(0.5)
            elapsed += 0.5

        # 6. Send STOP command; best-effort — the channel may never have
        # opened if the track ended quickly, so swallow send failures.
        try:
            channel.send(json.dumps({"cmd": "STOP"}))
            await asyncio.sleep(1)
        except Exception:
            pass  # Channel may not be open if track ended quickly
    finally:
        # Always release the peer connection, even if an assertion or HTTP
        # call above failed — otherwise the connection (and its media
        # resources) leak for the rest of the test session.
        await pc.close()

    # 7. Poll until post-processing pipeline completes
    data = await poll_transcript_status(
        api_client, transcript_id, target="ended", max_wait=300
    )

    # 8. Assertions on the finished transcript
    assert data["status"] == "ended"
    assert data.get("title") and len(data["title"]) > 0, "Title should be non-empty"
    assert (
        data.get("long_summary") and len(data["long_summary"]) > 0
    ), "Long summary should be non-empty"
    assert (
        data.get("short_summary") and len(data["short_summary"]) > 0
    ), "Short summary should be non-empty"

    # Topics are served from a separate endpoint
    topics_resp = await api_client.get(f"/transcripts/{transcript_id}/topics")
    assert topics_resp.status_code == 200, f"Failed to get topics: {topics_resp.text}"
    topics = topics_resp.json()
    assert len(topics) >= 1, "Should have at least 1 topic"
    for topic in topics:
        assert topic.get("title"), "Each topic should have a title"
        assert topic.get("summary"), "Each topic should have a summary"

    assert data.get("duration", 0) > 0, "Duration should be positive"