Files
reflector/server/tests/test_livekit_track_processing.py
2026-04-01 19:34:57 -05:00

394 lines
14 KiB
Python

"""
Tests for LiveKit track processing: filepath parsing, offset calculation,
and pad_track padding_seconds behavior.
"""
from datetime import datetime, timezone
from fractions import Fraction
import av
import pytest
from reflector.utils.livekit import (
LiveKitTrackFile,
calculate_track_offsets,
extract_livekit_base_room_name,
filter_audio_tracks,
parse_livekit_track_filepath,
)
# ── Filepath parsing ──────────────────────────────────────────
class TestParseLiveKitTrackFilepath:
def test_parses_ogg_audio_track(self):
result = parse_livekit_track_filepath(
"livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg"
)
assert result.room_name == "myroom-20260401172036"
assert result.participant_identity == "juan-4b82ed"
assert result.track_id == "TR_AMR3SWs74Divho"
assert result.timestamp == datetime(2026, 4, 1, 19, 57, 58, tzinfo=timezone.utc)
def test_parses_different_identities(self):
r1 = parse_livekit_track_filepath(
"livekit/room-20260401/alice-a1b2c3-2026-04-01T100000-TR_abc123.ogg"
)
r2 = parse_livekit_track_filepath(
"livekit/room-20260401/bob_smith-d4e5f6-2026-04-01T100030-TR_def456.ogg"
)
assert r1.participant_identity == "alice-a1b2c3"
assert r2.participant_identity == "bob_smith-d4e5f6"
def test_rejects_json_manifest(self):
with pytest.raises(ValueError, match="doesn't match expected format"):
parse_livekit_track_filepath("livekit/myroom-20260401/EG_K5sipvfB5fTM.json")
def test_rejects_webm_video(self):
# webm files match the pattern but are filtered by filter_audio_tracks
result = parse_livekit_track_filepath(
"livekit/myroom-20260401/juan-4b82ed-2026-04-01T195727-TR_VC679dgMQBdfhT.webm"
)
# webm parses successfully (TR_ prefix matches video tracks too)
assert result.track_id == "TR_VC679dgMQBdfhT"
def test_rejects_invalid_path(self):
with pytest.raises(ValueError):
parse_livekit_track_filepath("not/a/valid/path.ogg")
def test_rejects_missing_track_id(self):
with pytest.raises(ValueError):
parse_livekit_track_filepath("livekit/room/user-2026-04-01T100000.ogg")
def test_parses_timestamp_correctly(self):
result = parse_livekit_track_filepath(
"livekit/room-20260401/user-abc123-2026-12-25T235959-TR_test.ogg"
)
assert result.timestamp == datetime(
2026, 12, 25, 23, 59, 59, tzinfo=timezone.utc
)
# ── Audio track filtering ─────────────────────────────────────
class TestFilterAudioTracks:
def test_filters_to_ogg_only(self):
keys = [
"livekit/room/EG_abc.json",
"livekit/room/user-abc-2026-04-01T100000-TR_audio.ogg",
"livekit/room/user-abc-2026-04-01T100000-TR_video.webm",
"livekit/room/EG_def.json",
"livekit/room/user2-def-2026-04-01T100030-TR_audio2.ogg",
]
result = filter_audio_tracks(keys)
assert len(result) == 2
assert all(k.endswith(".ogg") for k in result)
def test_empty_input(self):
assert filter_audio_tracks([]) == []
def test_no_audio_tracks(self):
keys = ["livekit/room/EG_abc.json", "livekit/room/user-TR_v.webm"]
assert filter_audio_tracks(keys) == []
# ── Offset calculation ─────────────────────────────────────────
class TestCalculateTrackOffsets:
def test_single_track_zero_offset(self):
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
)
]
offsets = calculate_track_offsets(tracks)
assert len(offsets) == 1
assert offsets[0][1] == 0.0
def test_two_tracks_correct_offset(self):
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="bob",
timestamp=datetime(2026, 4, 1, 10, 1, 10, tzinfo=timezone.utc),
track_id="TR_2",
),
]
offsets = calculate_track_offsets(tracks)
assert offsets[0][1] == 0.0 # alice (earliest)
assert offsets[1][1] == 70.0 # bob (70 seconds later)
def test_three_tracks_earliest_is_zero(self):
tracks = [
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="bob",
timestamp=datetime(2026, 4, 1, 10, 0, 30, tzinfo=timezone.utc),
track_id="TR_2",
),
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k3",
room_name="r",
participant_identity="charlie",
timestamp=datetime(2026, 4, 1, 10, 1, 0, tzinfo=timezone.utc),
track_id="TR_3",
),
]
offsets = calculate_track_offsets(tracks)
offset_map = {t.participant_identity: o for t, o in offsets}
assert offset_map["alice"] == 0.0
assert offset_map["bob"] == 30.0
assert offset_map["charlie"] == 60.0
def test_empty_tracks(self):
assert calculate_track_offsets([]) == []
def test_simultaneous_tracks_zero_offsets(self):
ts = datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc)
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="a",
timestamp=ts,
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="b",
timestamp=ts,
track_id="TR_2",
),
]
offsets = calculate_track_offsets(tracks)
assert all(o == 0.0 for _, o in offsets)
# ── Room name extraction ───────────────────────────────────────
class TestExtractLiveKitBaseRoomName:
def test_strips_timestamp_suffix(self):
assert extract_livekit_base_room_name("myroom-20260401172036") == "myroom"
def test_preserves_hyphenated_name(self):
assert (
extract_livekit_base_room_name("my-room-name-20260401172036")
== "my-room-name"
)
def test_single_segment(self):
assert extract_livekit_base_room_name("room-20260401") == "room"
# ── pad_track padding_seconds behavior ─────────────────────────
class TestPadTrackPaddingSeconds:
"""Test that pad_track correctly uses pre-calculated padding_seconds
for LiveKit (skipping container metadata) vs extracting from container
for Daily (when padding_seconds is None).
"""
def _make_test_ogg(self, path: str, duration_seconds: float = 5.0):
"""Create a minimal OGG/Opus file for testing."""
with av.open(path, "w", format="ogg") as out:
stream = out.add_stream("libopus", rate=48000)
stream.bit_rate = 64000
samples_per_frame = 960 # Opus standard
total_samples = int(duration_seconds * 48000)
pts = 0
while pts < total_samples:
frame = av.AudioFrame(
format="s16", layout="stereo", samples=samples_per_frame
)
# Fill with silence (zeros)
frame.planes[0].update(bytes(samples_per_frame * 2 * 2)) # s16 * stereo
frame.sample_rate = 48000
frame.pts = pts
frame.time_base = Fraction(1, 48000)
for packet in stream.encode(frame):
out.mux(packet)
pts += samples_per_frame
for packet in stream.encode(None):
out.mux(packet)
def test_ogg_has_zero_start_time(self, tmp_path):
"""Verify that OGG files (like LiveKit produces) have start_time=0,
confirming why pre-calculated padding is needed."""
ogg_path = str(tmp_path / "test.ogg")
self._make_test_ogg(ogg_path)
with av.open(ogg_path) as container:
from reflector.utils.audio_padding import (
extract_stream_start_time_from_container,
)
start_time = extract_stream_start_time_from_container(container, 0)
assert start_time <= 0.0, (
"OGG files should have start_time<=0 (no usable offset), confirming "
f"LiveKit tracks need pre-calculated padding_seconds. Got: {start_time}"
)
def test_precalculated_padding_skips_metadata_extraction(self, tmp_path):
"""When padding_seconds is set, pad_track should use it directly
and NOT call extract_stream_start_time_from_container."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="livekit/room/user-abc-2026-04-01T100000-TR_audio.ogg",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="livekit",
padding_seconds=70.0,
)
assert input_data.padding_seconds == 70.0
# The pad_track function checks: if input.padding_seconds is not None → use it
# This means extract_stream_start_time_from_container is never called for LiveKit
def test_none_padding_falls_back_to_metadata(self, tmp_path):
"""When padding_seconds is None (Daily), pad_track should extract
start_time from container metadata."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="daily/room/track.webm",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="daily",
padding_seconds=None,
)
assert input_data.padding_seconds is None
# pad_track will call extract_stream_start_time_from_container for this case
def test_zero_padding_returns_original_key(self):
"""When padding_seconds=0.0, pad_track should return the original S3 key
without applying any padding (same as start_time=0 from metadata)."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="livekit/room/earliest-track.ogg",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="livekit",
padding_seconds=0.0,
)
# padding_seconds=0.0 → start_time_seconds=0.0 → "no padding needed" branch
assert input_data.padding_seconds == 0.0
# ── Pipeline offset calculation (process_tracks logic) ─────────
class TestProcessTracksOffsetCalculation:
"""Test the offset calculation logic used in process_tracks
for LiveKit source_platform."""
def test_livekit_offsets_from_timestamps(self):
"""Simulate the offset calculation done in process_tracks."""
tracks = [
{
"s3_key": "track1.ogg",
"participant_identity": "admin-0129c3",
"timestamp": "2026-04-01T23:44:50+00:00",
},
{
"s3_key": "track2.ogg",
"participant_identity": "juan-5a5b41",
"timestamp": "2026-04-01T23:46:00+00:00",
},
]
# Replicate the logic from process_tracks
timestamps = []
for i, track in enumerate(tracks):
ts_str = track.get("timestamp")
if ts_str:
ts = datetime.fromisoformat(ts_str)
timestamps.append((i, ts))
earliest = min(ts for _, ts in timestamps)
track_padding = {}
for i, ts in timestamps:
track_padding[i] = (ts - earliest).total_seconds()
assert track_padding[0] == 0.0 # admin (earliest)
assert track_padding[1] == 70.0 # juan (70s later)
def test_daily_tracks_get_no_precalculated_padding(self):
"""Daily tracks should NOT get padding_seconds (use container metadata)."""
tracks = [
{"s3_key": "daily-track1.webm"},
{"s3_key": "daily-track2.webm"},
]
# Daily tracks don't have "timestamp" field
track_padding = {}
source_platform = "daily"
if source_platform == "livekit":
# This block should NOT execute for daily
pass
# Daily tracks get no pre-calculated padding
assert track_padding == {}
for i, _ in enumerate(tracks):
assert track_padding.get(i) is None
def test_livekit_missing_timestamp_graceful(self):
"""If a LiveKit track is missing timestamp, it should be skipped."""
tracks = [
{
"s3_key": "track1.ogg",
"participant_identity": "alice",
"timestamp": "2026-04-01T10:00:00+00:00",
},
{"s3_key": "track2.ogg", "participant_identity": "bob"}, # no timestamp
]
timestamps = []
for i, track in enumerate(tracks):
ts_str = track.get("timestamp")
if ts_str:
try:
ts = datetime.fromisoformat(ts_str)
timestamps.append((i, ts))
except (ValueError, TypeError):
timestamps.append((i, None))
else:
timestamps.append((i, None))
valid = [(i, ts) for i, ts in timestamps if ts is not None]
assert len(valid) == 1 # only alice has a timestamp
assert valid[0][0] == 0 # track index 0