mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 20:29:06 +00:00
95
server/tests/test_transcripts_audio_download.py
Normal file
95
server/tests/test_transcripts_audio_download.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import pytest
|
||||
import shutil
|
||||
from httpx import AsyncClient
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def fake_transcript(tmpdir):
|
||||
from reflector.settings import settings
|
||||
from reflector.app import app
|
||||
from reflector.views.transcripts import transcripts_controller
|
||||
|
||||
settings.DATA_DIR = Path(tmpdir)
|
||||
|
||||
# create a transcript
|
||||
ac = AsyncClient(app=app, base_url="http://test/v1")
|
||||
response = await ac.post("/transcripts", json={"name": "Test audio download"})
|
||||
assert response.status_code == 200
|
||||
tid = response.json()["id"]
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(tid)
|
||||
assert transcript is not None
|
||||
|
||||
await transcripts_controller.update(transcript, {"status": "finished"})
|
||||
|
||||
# manually copy a file at the expected location
|
||||
audio_filename = transcript.audio_filename
|
||||
path = Path(__file__).parent / "records" / "test_mathieu_hello.wav"
|
||||
audio_filename.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy(path, audio_filename)
|
||||
yield transcript
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"url_suffix,content_type",
|
||||
[
|
||||
["", "audio/wav"],
|
||||
["/mp3", "audio/mp3"],
|
||||
],
|
||||
)
|
||||
async def test_transcript_audio_download(fake_transcript, url_suffix, content_type):
|
||||
from reflector.app import app
|
||||
|
||||
ac = AsyncClient(app=app, base_url="http://test/v1")
|
||||
response = await ac.get(f"/transcripts/{fake_transcript.id}/audio{url_suffix}")
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"] == content_type
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"url_suffix,content_type",
|
||||
[
|
||||
["", "audio/wav"],
|
||||
["/mp3", "audio/mp3"],
|
||||
],
|
||||
)
|
||||
async def test_transcript_audio_download_range(
|
||||
fake_transcript, url_suffix, content_type
|
||||
):
|
||||
from reflector.app import app
|
||||
|
||||
ac = AsyncClient(app=app, base_url="http://test/v1")
|
||||
response = await ac.get(
|
||||
f"/transcripts/{fake_transcript.id}/audio{url_suffix}",
|
||||
headers={"range": "bytes=0-100"},
|
||||
)
|
||||
assert response.status_code == 206
|
||||
assert response.headers["content-type"] == content_type
|
||||
assert response.headers["content-range"].startswith("bytes 0-100/")
|
||||
assert response.headers["content-length"] == "101"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"url_suffix,content_type",
|
||||
[
|
||||
["", "audio/wav"],
|
||||
["/mp3", "audio/mp3"],
|
||||
],
|
||||
)
|
||||
async def test_transcript_audio_download_range_with_seek(
|
||||
fake_transcript, url_suffix, content_type
|
||||
):
|
||||
from reflector.app import app
|
||||
|
||||
ac = AsyncClient(app=app, base_url="http://test/v1")
|
||||
response = await ac.get(
|
||||
f"/transcripts/{fake_transcript.id}/audio{url_suffix}",
|
||||
headers={"range": "bytes=100-"},
|
||||
)
|
||||
assert response.status_code == 206
|
||||
assert response.headers["content-type"] == content_type
|
||||
assert response.headers["content-range"].startswith("bytes 100-")
|
||||
Reference in New Issue
Block a user