feat: LiveKit - Self-hosted video room solution (#946)

* feat: Livekit bare no recording nor pipeline

* feat: full livekit pipeline

* fix: caddy hatchet with livekit

* fix: caddy livekit

* fix: hatchet tls

* fix: ogg to webm for no padding

* fix: reflector user id on participants and duration fix

* fix: better docs and internal review fixes

* fix: remove video files livekit
Juan Diego García
2026-04-07 11:55:16 -05:00
committed by GitHub
parent b570d202dc
commit bc8338fa4f
41 changed files with 3731 additions and 146 deletions

View File

@@ -42,6 +42,7 @@ dependencies = [
"pydantic>=2.12.5",
"aiosmtplib>=3.0.0",
"email-validator>=2.0.0",
"livekit-api>=1.1.0",
]
[dependency-groups]

View File

@@ -15,6 +15,7 @@ from reflector.metrics import metrics_init
from reflector.settings import settings
from reflector.views.config import router as config_router
from reflector.views.daily import router as daily_router
from reflector.views.livekit import router as livekit_router
from reflector.views.meetings import router as meetings_router
from reflector.views.rooms import router as rooms_router
from reflector.views.rtc_offer import router as rtc_offer_router
@@ -112,6 +113,7 @@ app.include_router(config_router, prefix="/v1")
app.include_router(zulip_router, prefix="/v1")
app.include_router(whereby_router, prefix="/v1")
app.include_router(daily_router, prefix="/v1/daily")
app.include_router(livekit_router, prefix="/v1/livekit")
if auth_router:
app.include_router(auth_router, prefix="/v1")
add_pagination(app)

View File

@@ -165,6 +165,17 @@ class MeetingController:
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_all_inactive_livekit(self) -> list[Meeting]:
"""Get inactive LiveKit meetings (for multitrack processing discovery)."""
query = meetings.select().where(
sa.and_(
meetings.c.is_active == sa.false(),
meetings.c.platform == "livekit",
)
)
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_by_room_name(
self,
room_name: str,

View File

@@ -486,6 +486,14 @@ class TranscriptController:
return None
return Transcript(**result)
async def get_by_meeting_id(self, meeting_id: str) -> Transcript | None:
"""Get a transcript by meeting_id (first match)."""
query = transcripts.select().where(transcripts.c.meeting_id == meeting_id)
result = await get_database().fetch_one(query)
if not result:
return None
return Transcript(**result)
async def get_by_recording_id(
self, recording_id: str, **kwargs
) -> Transcript | None:

View File

@@ -273,8 +273,10 @@ def with_error_handling(
)
@with_error_handling(TaskName.GET_RECORDING)
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
"""Fetch recording metadata from Daily.co API."""
ctx.log(f"get_recording: starting for recording_id={input.recording_id}")
"""Fetch recording metadata. Platform-aware: Daily calls API, LiveKit skips."""
ctx.log(
f"get_recording: starting for recording_id={input.recording_id}, platform={input.source_platform}"
)
ctx.log(
f"get_recording: transcript_id={input.transcript_id}, room_id={input.room_id}"
)
@@ -299,6 +301,18 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
)
ctx.log(f"get_recording: status set to 'processing' and broadcasted")
# LiveKit: no external API call needed — metadata comes from S3 track listing
if input.source_platform == "livekit":
ctx.log(
"get_recording: LiveKit platform — skipping API call (metadata from S3)"
)
return RecordingResult(
id=input.recording_id,
mtg_session_id=None,
duration=0, # Duration calculated from tracks later
)
# Daily.co: fetch recording metadata from API
if not settings.DAILY_API_KEY:
ctx.log("get_recording: ERROR - DAILY_API_KEY not configured")
raise ValueError("DAILY_API_KEY not configured")
@@ -332,11 +346,12 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
)
@with_error_handling(TaskName.GET_PARTICIPANTS)
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
"""Fetch participant list from Daily.co API and update transcript in database."""
ctx.log(f"get_participants: transcript_id={input.transcript_id}")
"""Fetch participant list and update transcript. Platform-aware."""
ctx.log(
f"get_participants: transcript_id={input.transcript_id}, platform={input.source_platform}"
)
recording = ctx.task_output(get_recording)
mtg_session_id = recording.mtg_session_id
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
@@ -347,8 +362,8 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
# Note: title NOT cleared - preserves existing titles
# Duration from Daily API (seconds -> milliseconds) - master source
# Duration from recording metadata (seconds -> milliseconds)
duration_ms = recording.duration * 1000 if recording.duration else 0
await transcripts_controller.update(
transcript,
@@ -360,65 +375,141 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
)
daily_api_key = assert_non_none_and_non_empty(
settings.DAILY_API_KEY, "DAILY_API_KEY is required"
)
async with DailyApiClient(
api_key=daily_api_key, base_url=settings.DAILY_API_URL
) as client:
participants = await client.get_meeting_participants(mtg_session_id)
id_to_name = {}
id_to_user_id = {}
for p in participants.data:
if p.user_name:
id_to_name[p.participant_id] = p.user_name
if p.user_id:
id_to_user_id[p.participant_id] = p.user_id
track_keys = [t["s3_key"] for t in input.tracks]
cam_audio_keys = filter_cam_audio_tracks(track_keys)
if duration_ms:
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
participants_list: list[ParticipantInfo] = []
for idx, key in enumerate(cam_audio_keys):
if input.source_platform == "livekit":
# LiveKit: participant identity is in the track dict or can be parsed from filepath
from reflector.utils.livekit import (
parse_livekit_track_filepath, # noqa: PLC0415
)
# Look up identity → Reflector user_id mapping from Redis
# (stored at join time in rooms.py)
identity_to_user_id: dict[str, str] = {}
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
from reflector.db.meetings import (
meetings_controller as mc, # noqa: PLC0415
)
from reflector.redis_cache import (
get_async_redis_client, # noqa: PLC0415
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
meeting = (
await mc.get_by_id(transcript.meeting_id)
if transcript.meeting_id
else None
)
if meeting:
redis_client = await get_async_redis_client()
mapping_key = f"livekit:participant_map:{meeting.room_name}"
raw_map = await redis_client.hgetall(mapping_key)
identity_to_user_id = {
k.decode() if isinstance(k, bytes) else k: v.decode()
if isinstance(v, bytes)
else v
for k, v in raw_map.items()
}
ctx.log(
f"get_participants: loaded {len(identity_to_user_id)} identity→user_id mappings from Redis"
)
except Exception as e:
ctx.log(
f"get_participants: could not load identity map from Redis: {e}"
)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
ParticipantInfo(
participant_id=participant_id,
user_name=name,
for idx, track in enumerate(input.tracks):
identity = track.get("participant_identity")
if not identity:
# Reprocess path: parse from S3 key
try:
parsed = parse_livekit_track_filepath(track["s3_key"])
identity = parsed.participant_identity
except (ValueError, KeyError):
identity = f"speaker-{idx}"
# Strip the uuid suffix from identity for display name
# e.g., "Juan-2bcea0" → "Juan"
display_name = (
identity.rsplit("-", 1)[0] if "-" in identity else identity
)
reflector_user_id = identity_to_user_id.get(identity)
participant = TranscriptParticipant(
id=identity,
speaker=idx,
name=display_name,
user_id=reflector_user_id,
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
ParticipantInfo(
participant_id=identity,
user_name=display_name,
speaker=idx,
)
)
else:
# Daily.co: fetch participant names from API
mtg_session_id = recording.mtg_session_id
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
)
daily_api_key = assert_non_none_and_non_empty(
settings.DAILY_API_KEY, "DAILY_API_KEY is required"
)
async with DailyApiClient(
api_key=daily_api_key, base_url=settings.DAILY_API_URL
) as client:
participants = await client.get_meeting_participants(mtg_session_id)
id_to_name = {}
id_to_user_id = {}
for p in participants.data:
if p.user_name:
id_to_name[p.participant_id] = p.user_name
if p.user_id:
id_to_user_id[p.participant_id] = p.user_id
track_keys = [t["s3_key"] for t in input.tracks]
cam_audio_keys = filter_cam_audio_tracks(track_keys)
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
ParticipantInfo(
participant_id=participant_id,
user_name=name,
speaker=idx,
)
)
ctx.log(f"get_participants complete: {len(participants_list)} participants")
@@ -440,11 +531,66 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
@with_error_handling(TaskName.PROCESS_TRACKS)
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
"""Spawn child workflows for each track (dynamic fan-out)."""
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
ctx.log(
f"process_tracks: spawning {len(input.tracks)} track workflows, platform={input.source_platform}"
)
participants_result = ctx.task_output(get_participants)
source_language = participants_result.source_language
# For LiveKit: calculate padding offsets from filename timestamps.
# OGG files don't have embedded start_time metadata, so we pre-calculate.
track_padding: dict[int, float] = {}
if input.source_platform == "livekit":
from datetime import datetime # noqa: PLC0415
from reflector.utils.livekit import (
parse_livekit_track_filepath, # noqa: PLC0415
)
timestamps = []
for i, track in enumerate(input.tracks):
ts_str = track.get("timestamp")
if ts_str:
try:
ts = datetime.fromisoformat(ts_str)
timestamps.append((i, ts))
except (ValueError, TypeError):
ctx.log(
f"process_tracks: could not parse timestamp for track {i}: {ts_str}"
)
timestamps.append((i, None))
else:
# Reprocess path: parse timestamp from S3 key
try:
parsed = parse_livekit_track_filepath(track["s3_key"])
timestamps.append((i, parsed.timestamp))
ctx.log(
f"process_tracks: parsed timestamp from S3 key for track {i}: {parsed.timestamp}"
)
except (ValueError, KeyError):
timestamps.append((i, None))
valid_timestamps = [(i, ts) for i, ts in timestamps if ts is not None]
if valid_timestamps:
earliest = min(ts for _, ts in valid_timestamps)
# LiveKit Track Egress outputs OGG/Opus files, but the transcription
# service only accepts WebM. The padding step converts OGG→WebM as a
# side effect of applying the adelay filter. For the earliest track
# (offset=0), we use a minimal padding to force this conversion.
LIVEKIT_MIN_PADDING_SECONDS = (
0.001 # 1ms — inaudible, forces OGG→WebM conversion
)
for i, ts in valid_timestamps:
offset = (ts - earliest).total_seconds()
if offset == 0.0:
offset = LIVEKIT_MIN_PADDING_SECONDS
track_padding[i] = offset
ctx.log(
f"process_tracks: track {i} padding={offset}s (from filename timestamp)"
)
bulk_runs = [
track_workflow.create_bulk_run_item(
input=TrackInput(
@@ -454,6 +600,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
transcript_id=input.transcript_id,
language=source_language,
source_platform=input.source_platform,
padding_seconds=track_padding.get(i),
)
)
for i, track in enumerate(input.tracks)
@@ -605,13 +752,31 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
# else: modal backend already uploaded to output_url
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
transcripts_controller,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"audio_location": "storage"}
)
update_data = {"audio_location": "storage"}
# Set duration from mixdown if not already set (LiveKit: duration starts at 0)
if not transcript.duration or transcript.duration == 0:
update_data["duration"] = result.duration_ms
await transcripts_controller.update(transcript, update_data)
# Broadcast duration update if it was missing
if not transcript.duration or transcript.duration == 0:
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=result.duration_ms),
logger=logger,
)
ctx.log(
f"mixdown_tracks: set duration={result.duration_ms}ms from mixdown"
)
ctx.log(f"mixdown_tracks complete: {result.size} bytes to {storage_path}")

View File

@@ -37,6 +37,9 @@ class TrackInput(BaseModel):
transcript_id: str
language: str = "en"
source_platform: str = "daily"
# Pre-calculated padding in seconds (from filename timestamps for LiveKit).
# When set, overrides container metadata extraction for start_time.
padding_seconds: float | None = None
hatchet = HatchetClientManager.get_client()
@@ -53,15 +56,19 @@ track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackI
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
"""Pad single audio track with silence for alignment.
Extracts stream.start_time from WebM container metadata and applies
silence padding using PyAV filter graph (adelay).
For Daily: extracts stream.start_time from WebM container metadata.
For LiveKit: uses pre-calculated padding_seconds from filename timestamps
(OGG files don't have embedded start_time metadata).
"""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
ctx.log(
f"pad_track: track {input.track_index}, s3_key={input.s3_key}, padding_seconds={input.padding_seconds}"
)
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
padding_seconds=input.padding_seconds,
)
try:
@@ -79,10 +86,16 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
bucket=input.bucket_name,
)
with av.open(source_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
if input.padding_seconds is not None:
# Pre-calculated offset (LiveKit: from filename timestamps)
start_time_seconds = input.padding_seconds
ctx.log(f"pad_track: using pre-calculated padding={start_time_seconds}s")
else:
# Extract from container metadata (Daily: WebM start_time)
with av.open(source_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
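For reference, the container-metadata path (the Daily branch above) boils down to something like this sketch, assuming PyAV semantics where stream.start_time is expressed in time_base units:

import av  # PyAV

def container_start_time_seconds(source_url: str) -> float:
    """Seconds of silence to prepend, read from WebM stream metadata."""
    with av.open(source_url) as container:
        stream = container.streams.audio[0]
        if stream.start_time is None:
            return 0.0
        return float(stream.start_time * stream.time_base)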

View File

@@ -0,0 +1,12 @@
"""
LiveKit API Module — thin wrapper around the livekit-api SDK.
"""
from .client import LiveKitApiClient
from .webhooks import create_webhook_receiver, verify_webhook
__all__ = [
"LiveKitApiClient",
"create_webhook_receiver",
"verify_webhook",
]

View File

@@ -0,0 +1,195 @@
"""
LiveKit API client wrapping the official livekit-api Python SDK.
Handles room management, access tokens, and Track Egress for
per-participant audio recording to S3-compatible storage.
"""
from datetime import timedelta
from livekit.api import (
AccessToken,
AutoTrackEgress,
CreateRoomRequest,
DeleteRoomRequest,
DirectFileOutput,
EgressInfo,
ListEgressRequest,
ListParticipantsRequest,
LiveKitAPI,
Room,
RoomEgress,
S3Upload,
StopEgressRequest,
TrackEgressRequest,
VideoGrants,
)
class LiveKitApiClient:
"""Thin wrapper around LiveKitAPI for Reflector's needs."""
def __init__(
self,
url: str,
api_key: str,
api_secret: str,
s3_bucket: str | None = None,
s3_region: str | None = None,
s3_access_key: str | None = None,
s3_secret_key: str | None = None,
s3_endpoint: str | None = None,
):
self._url = url
self._api_key = api_key
self._api_secret = api_secret
self._s3_bucket = s3_bucket
self._s3_region = s3_region or "us-east-1"
self._s3_access_key = s3_access_key
self._s3_secret_key = s3_secret_key
self._s3_endpoint = s3_endpoint
self._api = LiveKitAPI(url=url, api_key=api_key, api_secret=api_secret)
# ── Room management ──────────────────────────────────────────
async def create_room(
self,
name: str,
empty_timeout: int = 300,
max_participants: int = 0,
enable_auto_track_egress: bool = False,
track_egress_filepath: str = "livekit/{room_name}/{publisher_identity}-{time}",
) -> Room:
"""Create a LiveKit room.
Args:
name: Room name (unique identifier).
empty_timeout: Seconds to keep room alive after last participant leaves.
max_participants: 0 = unlimited.
enable_auto_track_egress: If True, automatically record each participant's
audio track to S3 as a separate file (OGG/Opus).
track_egress_filepath: S3 filepath template for auto track egress.
Supports {room_name}, {publisher_identity}, {time}.
"""
egress = None
if enable_auto_track_egress:
egress = RoomEgress(
tracks=AutoTrackEgress(
filepath=track_egress_filepath,
s3=self._build_s3_upload(),
),
)
req = CreateRoomRequest(
name=name,
empty_timeout=empty_timeout,
max_participants=max_participants,
egress=egress,
)
return await self._api.room.create_room(req)
async def delete_room(self, room_name: str) -> None:
await self._api.room.delete_room(DeleteRoomRequest(room=room_name))
async def list_participants(self, room_name: str):
resp = await self._api.room.list_participants(
ListParticipantsRequest(room=room_name)
)
return resp.participants
# ── Access tokens ────────────────────────────────────────────
def create_access_token(
self,
room_name: str,
participant_identity: str,
participant_name: str | None = None,
can_publish: bool = True,
can_subscribe: bool = True,
room_admin: bool = False,
ttl_seconds: int = 86400,
) -> str:
"""Generate a JWT access token for a participant."""
token = AccessToken(
api_key=self._api_key,
api_secret=self._api_secret,
)
token.identity = participant_identity
token.name = participant_name or participant_identity
token.ttl = timedelta(seconds=ttl_seconds)
token.with_grants(
VideoGrants(
room_join=True,
room=room_name,
can_publish=can_publish,
can_subscribe=can_subscribe,
room_admin=room_admin,
)
)
return token.to_jwt()
# ── Track Egress (per-participant audio recording) ───────────
def _build_s3_upload(self) -> S3Upload:
"""Build S3Upload config for egress output."""
if not all([self._s3_bucket, self._s3_access_key, self._s3_secret_key]):
raise ValueError(
"S3 storage not configured for LiveKit egress. "
"Set LIVEKIT_STORAGE_AWS_* environment variables."
)
kwargs = {
"access_key": self._s3_access_key,
"secret": self._s3_secret_key,
"bucket": self._s3_bucket,
"region": self._s3_region,
"force_path_style": True, # Required for Garage/MinIO
}
if self._s3_endpoint:
kwargs["endpoint"] = self._s3_endpoint
return S3Upload(**kwargs)
async def start_track_egress(
self,
room_name: str,
track_sid: str,
s3_filepath: str,
) -> EgressInfo:
"""Start Track Egress for a single audio track (writes OGG/Opus to S3).
Args:
room_name: LiveKit room name.
track_sid: Track SID to record.
s3_filepath: S3 key path for the output file.
"""
req = TrackEgressRequest(
room_name=room_name,
track_id=track_sid,
file=DirectFileOutput(
filepath=s3_filepath,
s3=self._build_s3_upload(),
),
)
return await self._api.egress.start_track_egress(req)
async def list_egress(self, room_name: str | None = None) -> list[EgressInfo]:
req = ListEgressRequest()
if room_name:
req.room_name = room_name
resp = await self._api.egress.list_egress(req)
return list(resp.items)
async def stop_egress(self, egress_id: str) -> EgressInfo:
return await self._api.egress.stop_egress(
StopEgressRequest(egress_id=egress_id)
)
# ── Cleanup ──────────────────────────────────────────────────
async def close(self):
await self._api.aclose()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
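As a usage sketch (URL and credentials here are placeholders, not real configuration), the wrapper doubles as an async context manager:

import asyncio

async def demo() -> None:
    async with LiveKitApiClient(
        url="http://localhost:7880",
        api_key="devkey",
        api_secret="devsecret",
    ) as client:
        room = await client.create_room("demo-20260401172036", empty_timeout=600)
        token = client.create_access_token(
            room_name=room.name,
            participant_identity="alice-4b82ed",
        )
        print(room.sid, token[:16])

asyncio.run(demo())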

View File

@@ -0,0 +1,52 @@
"""
LiveKit webhook verification and event parsing.
LiveKit authenticates each webhook with a JWT signed using the API secret.
The WebhookReceiver from the SDK handles verification.
"""
from livekit.api import TokenVerifier, WebhookEvent, WebhookReceiver
from reflector.logger import logger
def create_webhook_receiver(api_key: str, api_secret: str) -> WebhookReceiver:
"""Create a WebhookReceiver for verifying LiveKit webhook signatures."""
return WebhookReceiver(
token_verifier=TokenVerifier(api_key=api_key, api_secret=api_secret)
)
def verify_webhook(
receiver: WebhookReceiver,
body: str | bytes,
auth_header: str,
) -> WebhookEvent | None:
"""Verify and parse a LiveKit webhook event.
Returns the parsed WebhookEvent if valid, None if verification fails.
Logs at different levels depending on failure type:
- WARNING: invalid signature, expired token, malformed JWT (expected rejections)
- ERROR: unexpected exceptions (potential bugs or attacks)
"""
if isinstance(body, bytes):
body = body.decode("utf-8")
try:
return receiver.receive(body, auth_header)
except (ValueError, KeyError) as e:
# Expected verification failures (bad JWT, wrong key, expired, malformed)
logger.warning(
"LiveKit webhook verification failed",
error=str(e),
error_type=type(e).__name__,
)
return None
except Exception as e:
# Unexpected errors — log at ERROR for visibility (potential attack or SDK bug)
logger.error(
"Unexpected error during LiveKit webhook verification",
error=str(e),
error_type=type(e).__name__,
exc_info=True,
)
return None
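A minimal verification sketch (credentials are placeholders; the /webhook view later in this diff wires the same two calls into a FastAPI route):

from livekit.api import WebhookEvent

def parse_livekit_event(body: bytes, auth_header: str) -> WebhookEvent | None:
    receiver = create_webhook_receiver(
        api_key="devkey",
        api_secret="a-secret-long-enough-for-hmac",
    )
    return verify_webhook(receiver, body, auth_header)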

View File

@@ -1,5 +1,6 @@
from typing import Literal
Platform = Literal["whereby", "daily"]
Platform = Literal["whereby", "daily", "livekit"]
WHEREBY_PLATFORM: Platform = "whereby"
DAILY_PLATFORM: Platform = "daily"
LIVEKIT_PLATFORM: Platform = "livekit"

View File

@@ -155,12 +155,17 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
)
if track_keys:
# Detect platform from recording ID prefix
source_platform = (
"livekit" if recording_id and recording_id.startswith("lk-") else "daily"
)
return MultitrackProcessingConfig(
bucket_name=bucket_name, # type: ignore (validated above)
track_keys=track_keys,
transcript_id=validation.transcript_id,
recording_id=recording_id,
room_id=validation.room_id,
source_platform=source_platform,
)
return FileProcessingConfig(

View File

@@ -195,6 +195,23 @@ class Settings(BaseSettings):
DAILY_WEBHOOK_UUID: str | None = (
None # Webhook UUID for this environment. Not used by production code
)
# LiveKit integration (self-hosted open-source video platform)
LIVEKIT_URL: str | None = (
None # e.g. ws://livekit:7880 (internal) or wss://livekit.example.com
)
LIVEKIT_API_KEY: str | None = None
LIVEKIT_API_SECRET: str | None = None
LIVEKIT_WEBHOOK_SECRET: str | None = None # Defaults to API_SECRET if not set
# LiveKit egress S3 storage (Track Egress writes per-participant audio here)
LIVEKIT_STORAGE_AWS_BUCKET_NAME: str | None = None
LIVEKIT_STORAGE_AWS_REGION: str | None = None
LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID: str | None = None
LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY: str | None = None
LIVEKIT_STORAGE_AWS_ENDPOINT_URL: str | None = None # For Garage/MinIO
# Public URL for LiveKit (used in frontend room_url, e.g. wss://livekit.example.com)
LIVEKIT_PUBLIC_URL: str | None = None
# Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM

View File

@@ -57,6 +57,22 @@ def get_source_storage(platform: str) -> Storage:
aws_secret_access_key=settings.WHEREBY_STORAGE_AWS_SECRET_ACCESS_KEY,
)
elif platform == "livekit":
if (
settings.LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID
and settings.LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY
and settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME
):
from reflector.storage.storage_aws import AwsStorage
return AwsStorage(
aws_bucket_name=settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.LIVEKIT_STORAGE_AWS_REGION or "us-east-1",
aws_access_key_id=settings.LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY,
aws_endpoint_url=settings.LIVEKIT_STORAGE_AWS_ENDPOINT_URL,
)
return get_transcripts_storage()

View File

@@ -0,0 +1,112 @@
"""
LiveKit track file utilities.
Parse participant identity and timing from Auto Track Egress S3 filepaths.
Actual filepath format from LiveKit Auto Track Egress:
livekit/{room_name}/{publisher_identity}-{ISO_timestamp}-{track_id}.{ext}
Examples:
livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg
livekit/myroom-20260401172036/juan2-63abcf-2026-04-01T195847-TR_AMyoSbM7tAQbYj.ogg
livekit/myroom-20260401172036/EG_K5sipvfB5fTM.json (manifest, skip)
livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195727-TR_VC679dgMQBdfhT.webm (video, skip)
"""
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from reflector.utils.string import NonEmptyString
@dataclass
class LiveKitTrackFile:
"""Parsed info from a LiveKit track egress filepath."""
s3_key: str
room_name: str
participant_identity: str
timestamp: datetime # Parsed from ISO timestamp in filename
track_id: str # LiveKit track ID (e.g., TR_AMR3SWs74Divho)
# Pattern: livekit/{room_name}/{identity}-{ISO_date}T{time}-{track_id}.{ext}
# The identity can contain alphanumeric, hyphens, underscores
# ISO timestamp is like 2026-04-01T195758
# Track ID starts with TR_
_TRACK_FILENAME_PATTERN = re.compile(
r"^livekit/(?P<room_name>[^/]+)/(?P<identity>.+?)-(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{6})-(?P<track_id>TR_\w+)\.(?P<ext>\w+)$"
)
def parse_livekit_track_filepath(s3_key: str) -> LiveKitTrackFile:
"""Parse a LiveKit track egress filepath into components.
Args:
s3_key: S3 key like 'livekit/myroom-20260401/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg'
Returns:
LiveKitTrackFile with parsed components.
Raises:
ValueError: If the filepath doesn't match the expected format.
"""
match = _TRACK_FILENAME_PATTERN.match(s3_key)
if not match:
raise ValueError(
f"LiveKit track filepath doesn't match expected format: {s3_key}"
)
# Parse ISO-ish timestamp (e.g., 2026-04-01T195758 → datetime)
ts_str = match.group("timestamp")
try:
ts = datetime.strptime(ts_str, "%Y-%m-%dT%H%M%S").replace(tzinfo=timezone.utc)
except ValueError:
raise ValueError(f"Cannot parse timestamp '{ts_str}' from: {s3_key}")
return LiveKitTrackFile(
s3_key=s3_key,
room_name=match.group("room_name"),
participant_identity=match.group("identity"),
timestamp=ts,
track_id=match.group("track_id"),
)
def filter_audio_tracks(s3_keys: list[str]) -> list[str]:
"""Filter S3 keys to only audio tracks (.ogg), excluding manifests and video."""
return [k for k in s3_keys if k.endswith(".ogg")]
def calculate_track_offsets(
tracks: list[LiveKitTrackFile],
) -> list[tuple[LiveKitTrackFile, float]]:
"""Calculate silence padding offset for each track.
The earliest track starts at time zero. Each subsequent track
gets (track_timestamp - earliest_timestamp) seconds of silence prepended.
Returns:
List of (track, offset_seconds) tuples.
"""
if not tracks:
return []
earliest = min(t.timestamp for t in tracks)
return [(t, (t.timestamp - earliest).total_seconds()) for t in tracks]
def extract_livekit_base_room_name(livekit_room_name: str) -> NonEmptyString:
"""Extract base room name from LiveKit timestamped room name.
LiveKit rooms use the same naming as Daily: {base_name}-YYYYMMDDHHMMSS
"""
base_name = livekit_room_name.rsplit("-", 1)[0]
assert base_name, f"Extracted base name is empty from: {livekit_room_name}"
return NonEmptyString(base_name)
def recording_lock_key(room_name: str) -> str:
"""Redis lock key for preventing duplicate processing."""
return f"livekit:processing:{room_name}"

View File

@@ -1,7 +1,7 @@
from reflector.settings import settings
from reflector.storage import get_dailyco_storage, get_whereby_storage
from ..schemas.platform import WHEREBY_PLATFORM, Platform
from ..schemas.platform import LIVEKIT_PLATFORM, WHEREBY_PLATFORM, Platform
from .base import VideoPlatformClient, VideoPlatformConfig
from .registry import get_platform_client
@@ -44,6 +44,27 @@ def get_platform_config(platform: Platform) -> VideoPlatformConfig:
s3_region=daily_storage.region,
aws_role_arn=daily_storage.role_credential,
)
elif platform == LIVEKIT_PLATFORM:
if not settings.LIVEKIT_URL:
raise ValueError(
"LIVEKIT_URL is required when platform='livekit'. "
"Set LIVEKIT_URL environment variable."
)
if not settings.LIVEKIT_API_KEY or not settings.LIVEKIT_API_SECRET:
raise ValueError(
"LIVEKIT_API_KEY and LIVEKIT_API_SECRET are required when platform='livekit'. "
"Set LIVEKIT_API_KEY and LIVEKIT_API_SECRET environment variables."
)
return VideoPlatformConfig(
api_key=settings.LIVEKIT_API_KEY,
webhook_secret=settings.LIVEKIT_WEBHOOK_SECRET
or settings.LIVEKIT_API_SECRET,
api_url=settings.LIVEKIT_URL,
s3_bucket=settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME,
s3_region=settings.LIVEKIT_STORAGE_AWS_REGION,
aws_access_key_id=settings.LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID,
aws_access_key_secret=settings.LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
else:
raise ValueError(f"Unknown platform: {platform}")

View File

@@ -0,0 +1,192 @@
"""
LiveKit video platform client for Reflector.
Self-hosted, open-source alternative to Daily.co.
Uses Track Egress for per-participant audio recording (no composite video).
"""
from datetime import datetime, timezone
from urllib.parse import urlencode
from uuid import uuid4
from reflector.db.rooms import Room
from reflector.livekit_api.client import LiveKitApiClient
from reflector.livekit_api.webhooks import create_webhook_receiver, verify_webhook
from reflector.logger import logger
from reflector.settings import settings
from ..schemas.platform import Platform
from ..utils.string import NonEmptyString
from .base import ROOM_PREFIX_SEPARATOR, VideoPlatformClient
from .models import MeetingData, SessionData, VideoPlatformConfig
class LiveKitClient(VideoPlatformClient):
PLATFORM_NAME: Platform = "livekit"
TIMESTAMP_FORMAT = "%Y%m%d%H%M%S"
def __init__(self, config: VideoPlatformConfig):
super().__init__(config)
self._api_client = LiveKitApiClient(
url=config.api_url or "",
api_key=config.api_key,
api_secret=config.webhook_secret, # LiveKit uses API secret for both auth and webhooks
s3_bucket=config.s3_bucket,
s3_region=config.s3_region,
s3_access_key=config.aws_access_key_id,
s3_secret_key=config.aws_access_key_secret,
s3_endpoint=settings.LIVEKIT_STORAGE_AWS_ENDPOINT_URL,
)
self._webhook_receiver = create_webhook_receiver(
api_key=config.api_key,
api_secret=config.webhook_secret,
)
async def create_meeting(
self, room_name_prefix: NonEmptyString, end_date: datetime, room: Room
) -> MeetingData:
"""Create a LiveKit room for this meeting.
LiveKit rooms are created explicitly via API. A new room is created
for each Reflector meeting (same pattern as Daily.co).
"""
now = datetime.now(timezone.utc)
timestamp = now.strftime(self.TIMESTAMP_FORMAT)
room_name = f"{room_name_prefix}{ROOM_PREFIX_SEPARATOR}{timestamp}"
# Calculate empty_timeout from end_date (seconds until expiry)
# Ensure end_date is timezone-aware for subtraction
end_date_aware = (
end_date if end_date.tzinfo else end_date.replace(tzinfo=timezone.utc)
)
remaining = int((end_date_aware - now).total_seconds())
empty_timeout = max(300, min(remaining, 86400)) # 5 min to 24 hours
# Enable auto track egress for cloud recording (per-participant audio to S3).
# Gracefully degrade if S3 credentials are missing — room still works, just no recording.
enable_recording = room.recording_type == "cloud"
egress_enabled = False
if enable_recording:
try:
self._api_client._build_s3_upload() # Validate credentials exist
egress_enabled = True
except ValueError:
logger.warning(
"S3 credentials not configured — room created without auto track egress. "
"Set LIVEKIT_STORAGE_AWS_* to enable recording.",
room_name=room_name,
)
lk_room = await self._api_client.create_room(
name=room_name,
empty_timeout=empty_timeout,
enable_auto_track_egress=egress_enabled,
)
logger.info(
"LiveKit room created",
room_name=lk_room.name,
room_sid=lk_room.sid,
empty_timeout=empty_timeout,
auto_track_egress=egress_enabled,
)
# room_url includes the server URL + room name as query param.
# The join endpoint in rooms.py appends the token as another query param.
# Frontend parses: ws://host:7880?room=<name>&token=<jwt>
public_url = settings.LIVEKIT_PUBLIC_URL or settings.LIVEKIT_URL or ""
room_url = f"{public_url}?{urlencode({'room': lk_room.name})}"
return MeetingData(
meeting_id=lk_room.sid or str(uuid4()),
room_name=lk_room.name,
room_url=room_url,
host_room_url=room_url,
platform=self.PLATFORM_NAME,
extra_data={"livekit_room_sid": lk_room.sid},
)
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
"""Get current participants in a LiveKit room.
For historical sessions, we rely on webhook-stored data (same as Daily).
This returns currently-connected participants.
"""
try:
participants = await self._api_client.list_participants(room_name)
return [
SessionData(
session_id=p.sid,
started_at=datetime.fromtimestamp(
p.joined_at if p.joined_at else 0, tz=timezone.utc
),
ended_at=None, # Still active
)
for p in participants
if p.sid # Skip empty entries
]
except Exception as e:
logger.debug(
"Could not list LiveKit participants (room may not exist)",
room_name=room_name,
error=str(e),
)
return []
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
# LiveKit doesn't have a logo upload concept; handled in frontend theming
return True
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: str | None = None
) -> bool:
"""Verify LiveKit webhook signature.
LiveKit sends the JWT in the Authorization header. The `signature`
param here receives the Authorization header value.
"""
event = verify_webhook(self._webhook_receiver, body, signature)
return event is not None
def create_access_token(
self,
room_name: str,
participant_identity: str,
participant_name: str | None = None,
is_admin: bool = False,
) -> str:
"""Generate a LiveKit access token for a participant."""
return self._api_client.create_access_token(
room_name=room_name,
participant_identity=participant_identity,
participant_name=participant_name,
room_admin=is_admin,
)
async def start_track_egress(
self,
room_name: str,
track_sid: str,
s3_filepath: str,
):
"""Start Track Egress for a single audio track."""
return await self._api_client.start_track_egress(
room_name=room_name,
track_sid=track_sid,
s3_filepath=s3_filepath,
)
async def list_egress(self, room_name: str | None = None):
return await self._api_client.list_egress(room_name=room_name)
async def stop_egress(self, egress_id: str):
return await self._api_client.stop_egress(egress_id=egress_id)
async def close(self):
await self._api_client.close()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
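A sketch of how a consumer might split the room_url built above back into its parts (illustrative only, not the actual frontend code):

from urllib.parse import parse_qs, urlsplit

def split_room_url(room_url: str) -> tuple[str, str | None, str | None]:
    """'wss://host?room=<name>&token=<jwt>' -> (server, room, token)."""
    parts = urlsplit(room_url)
    params = parse_qs(parts.query)
    server = parts._replace(query="").geturl()
    return (
        server,
        params.get("room", [None])[0],
        params.get("token", [None])[0],
    )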

View File

@@ -1,6 +1,11 @@
from typing import Dict, Type
from ..schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM, Platform
from ..schemas.platform import (
DAILY_PLATFORM,
LIVEKIT_PLATFORM,
WHEREBY_PLATFORM,
Platform,
)
from .base import VideoPlatformClient, VideoPlatformConfig
_PLATFORMS: Dict[Platform, Type[VideoPlatformClient]] = {}
@@ -26,10 +31,12 @@ def get_available_platforms() -> list[Platform]:
def _register_builtin_platforms():
from .daily import DailyClient # noqa: PLC0415
from .livekit import LiveKitClient # noqa: PLC0415
from .whereby import WherebyClient # noqa: PLC0415
register_platform(WHEREBY_PLATFORM, WherebyClient)
register_platform(DAILY_PLATFORM, DailyClient)
register_platform(LIVEKIT_PLATFORM, LiveKitClient)
_register_builtin_platforms()

View File

@@ -0,0 +1,246 @@
"""LiveKit webhook handler.
Processes LiveKit webhook events for participant tracking and
Track Egress recording completion.
LiveKit sends webhooks as POST requests with JWT authentication
in the Authorization header.
Webhooks are used as fast-path triggers and logging. Track discovery
for the multitrack pipeline uses S3 listing (source of truth), not
webhook data.
"""
from fastapi import APIRouter, HTTPException, Request
from reflector.db.meetings import meetings_controller
from reflector.livekit_api.webhooks import create_webhook_receiver, verify_webhook
from reflector.logger import logger as _logger
from reflector.settings import settings
router = APIRouter()
logger = _logger.bind(platform="livekit")
# Module-level receiver, lazily initialized on first webhook
_webhook_receiver = None
def _get_webhook_receiver():
global _webhook_receiver
if _webhook_receiver is None:
if not settings.LIVEKIT_API_KEY or not settings.LIVEKIT_API_SECRET:
raise ValueError("LiveKit not configured")
_webhook_receiver = create_webhook_receiver(
api_key=settings.LIVEKIT_API_KEY,
api_secret=settings.LIVEKIT_WEBHOOK_SECRET or settings.LIVEKIT_API_SECRET,
)
return _webhook_receiver
@router.post("/webhook")
async def livekit_webhook(request: Request):
"""Handle LiveKit webhook events.
LiveKit webhook events include:
- participant_joined / participant_left
- egress_started / egress_updated / egress_ended
- room_started / room_finished
- track_published / track_unpublished
"""
if not settings.LIVEKIT_API_KEY or not settings.LIVEKIT_API_SECRET:
raise HTTPException(status_code=500, detail="LiveKit not configured")
body = await request.body()
auth_header = request.headers.get("Authorization", "")
receiver = _get_webhook_receiver()
event = verify_webhook(receiver, body, auth_header)
if event is None:
logger.warning(
"Invalid LiveKit webhook signature",
has_auth=bool(auth_header),
has_body=bool(body),
)
raise HTTPException(status_code=401, detail="Invalid webhook signature")
event_type = event.event
match event_type:
case "participant_joined":
await _handle_participant_joined(event)
case "participant_left":
await _handle_participant_left(event)
case "egress_started":
await _handle_egress_started(event)
case "egress_ended":
await _handle_egress_ended(event)
case "room_started":
logger.info(
"Room started",
room_name=event.room.name if event.room else None,
)
case "room_finished":
await _handle_room_finished(event)
case "track_published" | "track_unpublished":
logger.debug(
f"Track event: {event_type}",
room_name=event.room.name if event.room else None,
participant=event.participant.identity if event.participant else None,
)
case _:
logger.debug(
"Unhandled LiveKit webhook event",
event_type=event_type,
)
return {"status": "ok"}
async def _handle_participant_joined(event):
room_name = event.room.name if event.room else None
participant = event.participant
if not room_name or not participant:
logger.warning("participant_joined: missing room or participant data")
return
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
logger.warning("participant_joined: meeting not found", room_name=room_name)
return
logger.info(
"Participant joined",
meeting_id=meeting.id,
room_name=room_name,
participant_identity=participant.identity,
participant_sid=participant.sid,
)
async def _handle_participant_left(event):
room_name = event.room.name if event.room else None
participant = event.participant
if not room_name or not participant:
logger.warning("participant_left: missing room or participant data")
return
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
logger.warning("participant_left: meeting not found", room_name=room_name)
return
logger.info(
"Participant left",
meeting_id=meeting.id,
room_name=room_name,
participant_identity=participant.identity,
participant_sid=participant.sid,
)
async def _handle_egress_started(event):
egress = event.egress_info
logger.info(
"Egress started",
room_name=egress.room_name if egress else None,
egress_id=egress.egress_id if egress else None,
)
async def _handle_egress_ended(event):
"""Handle Track Egress completion. Delete video files immediately to save storage.
AutoTrackEgress records ALL tracks (audio + video). Audio is kept for the
transcription pipeline. Video files are unused and deleted on completion.
This cuts storage roughly 50x (video is ~98% of egress output for HD cameras).
"""
egress = event.egress_info
if not egress:
logger.warning("egress_ended: no egress info in payload")
return
# EGRESS_FAILED = 4
if egress.status == 4:
logger.error(
"Egress failed",
room_name=egress.room_name,
egress_id=egress.egress_id,
error=egress.error,
)
return
file_results = list(egress.file_results)
logger.info(
"Egress ended",
room_name=egress.room_name,
egress_id=egress.egress_id,
status=egress.status,
num_files=len(file_results),
filenames=[f.filename for f in file_results] if file_results else [],
)
# Delete video files (.webm) immediately — only audio (.ogg) is needed for transcription.
# Video tracks are 50-90x larger than audio and unused by the pipeline.
# JSON manifests are kept (lightweight metadata, ~430 bytes each).
for file_result in file_results:
filename = file_result.filename
if filename and filename.endswith(".webm"):
try:
from reflector.storage import get_source_storage # noqa: PLC0415
storage = get_source_storage("livekit")
await storage.delete_file(filename)
logger.info(
"Deleted video egress file",
filename=filename,
room_name=egress.room_name,
)
except Exception as e:
# Non-critical — pipeline filters these out anyway
logger.warning(
"Failed to delete video egress file",
filename=filename,
error=str(e),
)
async def _handle_room_finished(event):
"""Fast-path: trigger multitrack processing when room closes.
This is an optimization — if missed, the process_livekit_ended_meetings
beat task catches it within ~2 minutes.
"""
room_name = event.room.name if event.room else None
if not room_name:
logger.warning("room_finished: no room name in payload")
return
logger.info("Room finished", room_name=room_name)
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
logger.warning("room_finished: meeting not found", room_name=room_name)
return
# Deactivate the meeting — LiveKit room is destroyed, so process_meetings
# can't detect this via API (list_participants returns empty for deleted rooms).
if meeting.is_active:
await meetings_controller.update_meeting(meeting.id, is_active=False)
logger.info("room_finished: meeting deactivated", meeting_id=meeting.id)
# Import here to avoid circular imports (worker imports views)
from reflector.worker.process import process_livekit_multitrack
process_livekit_multitrack.delay(
room_name=room_name,
meeting_id=meeting.id,
)
logger.info(
"room_finished: queued multitrack processing",
meeting_id=meeting.id,
room_name=room_name,
)

View File

@@ -554,6 +554,7 @@ async def rooms_join_meeting(
room_name: str,
meeting_id: str,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
display_name: str | None = None,
):
user_id = user["sub"] if user else None
room = await rooms_controller.get_by_name(room_name)
@@ -598,4 +599,51 @@ async def rooms_join_meeting(
meeting = meeting.model_copy()
meeting.room_url = add_query_param(meeting.room_url, "t", token)
elif meeting.platform == "livekit":
import re
import uuid
client = create_platform_client(meeting.platform)
# Identity must be unique per participant to avoid S3 key collisions.
# Format: {readable_name}-{short_uuid} ensures uniqueness even for same names.
uid_suffix = uuid.uuid4().hex[:6]
if display_name:
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", display_name.strip())[:40]
participant_identity = (
f"{safe_name}-{uid_suffix}" if safe_name else f"anon-{uid_suffix}"
)
elif user_id:
email = user.get("email")
if email and "@" in email:
participant_identity = f"{email.split('@')[0]}-{uid_suffix}"
else:
participant_identity = f"{user_id[:12]}-{uid_suffix}"
else:
participant_identity = f"anon-{uid_suffix}"
participant_name = display_name or participant_identity
# Store identity → Reflector user_id mapping for the pipeline
# (so TranscriptParticipant.user_id can be set correctly)
if user_id:
from reflector.redis_cache import get_async_redis_client # noqa: PLC0415
redis_client = await get_async_redis_client()
mapping_key = f"livekit:participant_map:{meeting.room_name}"
await redis_client.hset(mapping_key, participant_identity, user_id)
await redis_client.expire(mapping_key, 7 * 86400) # 7 day TTL
token = client.create_access_token(
room_name=meeting.room_name,
participant_identity=participant_identity,
participant_name=participant_name,
is_admin=user_id == room.user_id if user_id else False,
)
# Close the platform client to release aiohttp session
if hasattr(client, "close"):
await client.close()
meeting = meeting.model_copy()
# For LiveKit, room_url is the WS URL; token goes as a query param
meeting.room_url = add_query_param(meeting.room_url, "token", token)
return meeting
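For illustration, the pipeline-side read of this mapping reduces to a helper like this (hypothetical; get_participants in the Hatchet pipeline does the bulk hgetall variant shown earlier):

from reflector.redis_cache import get_async_redis_client

async def lookup_reflector_user_id(room_name: str, identity: str) -> str | None:
    """Read back one identity -> user_id entry written at join time."""
    redis_client = await get_async_redis_client()
    raw = await redis_client.hget(f"livekit:participant_map:{room_name}", identity)
    if raw is None:
        return None
    return raw.decode() if isinstance(raw, bytes) else raw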

View File

@@ -83,7 +83,25 @@ def build_beat_schedule(
else:
logger.info("Daily.co beat tasks disabled (no DAILY_API_KEY)")
_any_platform = _whereby_enabled or _daily_enabled
_livekit_enabled = bool(settings.LIVEKIT_API_KEY and settings.LIVEKIT_URL)
if _livekit_enabled:
beat_schedule["process_livekit_ended_meetings"] = {
"task": "reflector.worker.process.process_livekit_ended_meetings",
"schedule": 120, # Every 2 minutes
}
beat_schedule["reprocess_failed_livekit_recordings"] = {
"task": "reflector.worker.process.reprocess_failed_livekit_recordings",
"schedule": crontab(hour=5, minute=0),
}
logger.info(
"LiveKit beat tasks enabled",
tasks=[
"process_livekit_ended_meetings",
"reprocess_failed_livekit_recordings",
],
)
_any_platform = _whereby_enabled or _daily_enabled or _livekit_enabled
if _any_platform:
beat_schedule["process_meetings"] = {
"task": "reflector.worker.process.process_meetings",

View File

@@ -874,6 +874,22 @@ async def process_meetings():
logger_.info(
"Meeting deactivated - scheduled time ended with no participants",
)
elif meeting.platform == "livekit" and not has_had_sessions:
# LiveKit rooms are destroyed after empty_timeout. Once gone,
# list_participants returns [] — indistinguishable from "never used".
# Check if meeting was created >10 min ago; if so, assume room is gone.
meeting_start = meeting.start_date
if meeting_start.tzinfo is None:
meeting_start = meeting_start.replace(tzinfo=timezone.utc)
age_minutes = (current_time - meeting_start).total_seconds() / 60
if age_minutes > 10:
should_deactivate = True
logger_.info(
"LiveKit meeting deactivated - room likely destroyed (no sessions after 10 min)",
age_minutes=round(age_minutes, 1),
)
else:
logger_.debug("LiveKit meeting still young, keep it")
else:
logger_.debug("Meeting not yet started, keep it")
@@ -1170,3 +1186,311 @@ async def trigger_daily_reconciliation() -> None:
except Exception as e:
logger.error("Reconciliation trigger failed", error=str(e), exc_info=True)
# ============================================================
# LiveKit multitrack recording tasks
# ============================================================
@shared_task
@asynctask
async def process_livekit_multitrack(
room_name: str,
meeting_id: str,
):
"""
Process LiveKit multitrack recording by discovering tracks on S3.
Tracks are discovered via S3 listing (source of truth), not webhooks.
Called from room_finished webhook (fast-path) or beat task (fallback).
"""
from reflector.utils.livekit import ( # noqa: PLC0415
recording_lock_key,
)
logger.info(
"Processing LiveKit multitrack recording",
room_name=room_name,
meeting_id=meeting_id,
)
lock_key = recording_lock_key(room_name)
async with RedisAsyncLock(
key=lock_key,
timeout=600,
extend_interval=60,
skip_if_locked=True,
blocking=False,
) as lock:
if not lock.acquired:
logger.warning(
"LiveKit processing skipped - lock already held",
room_name=room_name,
lock_key=lock_key,
)
return
await _process_livekit_multitrack_inner(room_name, meeting_id)
async def _process_livekit_multitrack_inner(
room_name: str,
meeting_id: str,
):
"""Inner processing logic for LiveKit multitrack recording."""
# 1. Discover tracks by listing S3 prefix.
# Wait briefly for egress files to finish flushing to S3 — the room_finished
# webhook fires after empty_timeout, but egress finalization may still be in progress.
import asyncio as _asyncio # noqa: PLC0415
from reflector.storage import get_source_storage # noqa: PLC0415
from reflector.utils.livekit import ( # noqa: PLC0415
extract_livekit_base_room_name,
filter_audio_tracks,
parse_livekit_track_filepath,
)
EGRESS_FLUSH_DELAY = 10 # seconds — egress typically flushes within a few seconds
EGRESS_RETRY_DELAY = 30 # seconds — retry if first listing finds nothing
await _asyncio.sleep(EGRESS_FLUSH_DELAY)
storage = get_source_storage("livekit")
s3_prefix = f"livekit/{room_name}/"
all_keys = await storage.list_objects(prefix=s3_prefix)
# Filter to audio tracks only (.ogg) — skip .json manifests and .webm video
audio_keys = filter_audio_tracks(all_keys) if all_keys else []
if not audio_keys:
# Retry once after a longer delay — egress may still be flushing
logger.info(
"No audio tracks found yet, retrying after delay",
room_name=room_name,
retry_delay=EGRESS_RETRY_DELAY,
)
await _asyncio.sleep(EGRESS_RETRY_DELAY)
all_keys = await storage.list_objects(prefix=s3_prefix)
audio_keys = filter_audio_tracks(all_keys) if all_keys else []
# Sanity check: compare audio tracks against egress manifests.
# Each Track Egress (audio or video) produces a .json manifest.
# Video tracks produce .webm files. So expected audio count ≈ manifests - video files.
if all_keys:
manifest_count = sum(1 for k in all_keys if k.endswith(".json"))
video_count = sum(1 for k in all_keys if k.endswith(".webm"))
expected_audio = manifest_count - video_count
if expected_audio > len(audio_keys) and expected_audio > 0:
# Some audio tracks may still be flushing — wait and retry
logger.info(
"Expected more audio tracks based on manifests, waiting for late flushes",
room_name=room_name,
expected=expected_audio,
found=len(audio_keys),
)
await _asyncio.sleep(EGRESS_RETRY_DELAY)
all_keys = await storage.list_objects(prefix=s3_prefix)
audio_keys = filter_audio_tracks(all_keys) if all_keys else []
logger.info(
"S3 track discovery complete",
room_name=room_name,
total_files=len(all_keys) if all_keys else 0,
audio_files=len(audio_keys),
)
if not audio_keys:
logger.warning(
"No audio track files found on S3 after retries",
room_name=room_name,
s3_prefix=s3_prefix,
)
return
# 2. Parse track info from filenames
parsed_tracks = []
for key in audio_keys:
try:
parsed = parse_livekit_track_filepath(key)
parsed_tracks.append(parsed)
except ValueError as e:
logger.warning("Skipping unparseable track file", s3_key=key, error=str(e))
if not parsed_tracks:
logger.warning(
"No valid track files found after parsing",
room_name=room_name,
raw_keys=all_keys,
)
return
track_keys = [t.s3_key for t in parsed_tracks]
# 3. Find meeting and room
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found for LiveKit recording",
meeting_id=meeting_id,
room_name=room_name,
)
return
base_room_name = extract_livekit_base_room_name(room_name)
room = await rooms_controller.get_by_name(base_room_name)
if not room:
logger.error("Room not found", room_name=base_room_name)
return
# 4. Create recording
recording_id = f"lk-{room_name}"
bucket_name = settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME or ""
existing_recording = await recordings_controller.get_by_id(recording_id)
if existing_recording and existing_recording.deleted_at is not None:
logger.info("Skipping soft-deleted recording", recording_id=recording_id)
return
if not existing_recording:
recording = await recordings_controller.create(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=s3_prefix,
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting.id,
track_keys=track_keys,
)
)
else:
recording = existing_recording
# 5. Create or get transcript
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if transcript and transcript.deleted_at is not None:
logger.info("Skipping soft-deleted transcript", recording_id=recording.id)
return
if not transcript:
transcript = await transcripts_controller.add(
"",
source_kind=SourceKind.ROOM,
source_language="en",
target_language="en",
user_id=room.user_id,
recording_id=recording.id,
share_mode="semi-private",
meeting_id=meeting.id,
room_id=room.id,
)
# 6. Start Hatchet pipeline (reuses DiarizationPipeline with source_platform="livekit")
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [
{
"s3_key": t.s3_key,
"participant_identity": t.participant_identity,
"timestamp": t.timestamp.isoformat(),
}
for t in parsed_tracks
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
"source_platform": "livekit",
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
},
)
logger.info(
"Started LiveKit Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
room_name=room_name,
num_tracks=len(parsed_tracks),
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
@shared_task
@asynctask
async def process_livekit_ended_meetings():
"""Check for inactive LiveKit meetings that need multitrack processing.
Runs on a beat schedule. Catches cases where room_finished webhook was missed.
Only processes meetings where:
- platform is "livekit"
- is_active=False (already deactivated by process_meetings)
- no transcript exists yet
"""
from reflector.db.transcripts import transcripts_controller as tc # noqa: PLC0415
all_livekit = await meetings_controller.get_all_inactive_livekit()
queued = 0
for meeting in all_livekit:
# Skip if already has a transcript
existing = await tc.get_by_meeting_id(meeting.id)
if existing:
continue
logger.info(
"Found unprocessed inactive LiveKit meeting",
meeting_id=meeting.id,
room_name=meeting.room_name,
)
process_livekit_multitrack.delay(
room_name=meeting.room_name,
meeting_id=meeting.id,
)
queued += 1
if queued > 0:
logger.info("Queued LiveKit multitrack processing", count=queued)
@shared_task
@asynctask
async def reprocess_failed_livekit_recordings():
"""Reprocess LiveKit recordings that failed.
Runs daily at 5 AM. Finds recordings with the "lk-" id prefix and an error status.
"""
bucket_name = settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME
if not bucket_name:
return
failed = await recordings_controller.get_multitrack_needing_reprocessing(
bucket_name
)
livekit_failed = [r for r in failed if r.id.startswith("lk-")]
for recording in livekit_failed:
if not recording.meeting_id:
logger.warning(
"Skipping reprocess — no meeting_id",
recording_id=recording.id,
)
continue
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
continue
logger.info(
"Reprocessing failed LiveKit recording",
recording_id=recording.id,
meeting_id=meeting.id,
)
process_livekit_multitrack.delay(
room_name=meeting.room_name,
meeting_id=meeting.id,
)

View File

@@ -0,0 +1,408 @@
"""
Tests for LiveKit backend: webhook verification, token generation,
display_name sanitization, and platform client behavior.
"""
import re
import pytest
from reflector.livekit_api.webhooks import create_webhook_receiver, verify_webhook
# ── Webhook verification ──────────────────────────────────────
class TestWebhookVerification:
def _make_receiver(self):
"""Create a receiver with test credentials."""
return create_webhook_receiver(
api_key="test_key",
api_secret="test_secret_that_is_long_enough_for_hmac",
)
def test_rejects_empty_auth_header(self):
receiver = self._make_receiver()
result = verify_webhook(receiver, b'{"event":"test"}', "")
assert result is None
def test_rejects_garbage_auth_header(self):
receiver = self._make_receiver()
result = verify_webhook(receiver, b'{"event":"test"}', "not-a-jwt")
assert result is None
def test_rejects_empty_body(self):
receiver = self._make_receiver()
result = verify_webhook(receiver, b"", "Bearer some.jwt.token")
assert result is None
def test_handles_bytes_body(self):
receiver = self._make_receiver()
# Should not crash on bytes input
result = verify_webhook(receiver, b'{"event":"test"}', "invalid")
assert result is None
def test_handles_string_body(self):
receiver = self._make_receiver()
result = verify_webhook(receiver, '{"event":"test"}', "invalid")
assert result is None
def test_rejects_wrong_secret(self):
"""Webhook signed with different secret should be rejected."""
receiver = self._make_receiver()
# A JWT signed with a different secret
fake_jwt = "eyJhbGciOiJIUzI1NiJ9.eyJ0ZXN0IjoxfQ.wrong_signature"
result = verify_webhook(receiver, b"{}", fake_jwt)
assert result is None
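    def test_signing_shape_sketch(self):
        """Hedged sketch of a well-formed webhook token, signed with PyJWT.

        Assumption: LiveKit webhook JWTs carry a base64 SHA-256 digest of the
        body in a "sha256" claim with the API key as issuer; the receiver may
        validate additional claims, so no acceptance is asserted here.
        """
        import base64
        import hashlib

        import jwt  # PyJWT, a dependency of livekit-api

        body = b'{"event":"room_finished"}'
        digest = base64.b64encode(hashlib.sha256(body).digest()).decode()
        token = jwt.encode(
            {"sha256": digest, "iss": "test_key"},
            "test_secret_that_is_long_enough_for_hmac",
            algorithm="HS256",
        )
        # A structurally valid, correctly signed token; acceptance still
        # depends on the receiver's full claim validation.
        verify_webhook(self._make_receiver(), body, token)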
# ── Token generation ──────────────────────────────────────────
class TestTokenGeneration:
"""Test token generation using the LiveKit SDK directly (no client instantiation)."""
def _generate_token(
self, room_name="room", identity="user", name=None, admin=False, ttl=86400
):
"""Generate a token using the SDK directly, avoiding LiveKitAPI client session."""
from datetime import timedelta
from livekit.api import AccessToken, VideoGrants
token = AccessToken(
api_key="test_key", api_secret="test_secret_that_is_long_enough_for_hmac"
)
token.identity = identity
token.name = name or identity
token.ttl = timedelta(seconds=ttl)
token.with_grants(
VideoGrants(
room_join=True,
room=room_name,
can_publish=True,
can_subscribe=True,
room_admin=admin,
)
)
return token.to_jwt()
    def _decode_claims(self, token):
        import base64
        import json

        # JWT payloads are base64url-encoded; pad to a multiple of 4
        # (adding 0-3 "=") and decode with the URL-safe alphabet
        payload = token.split(".")[1]
        payload += "=" * (-len(payload) % 4)
        return json.loads(base64.urlsafe_b64decode(payload))
def test_creates_valid_jwt(self):
token = self._generate_token(
room_name="test-room", identity="user123", name="Test User"
)
assert isinstance(token, str)
assert len(token.split(".")) == 3
def test_token_includes_room_name(self):
token = self._generate_token(room_name="my-room-20260401", identity="alice")
claims = self._decode_claims(token)
assert claims.get("video", {}).get("room") == "my-room-20260401"
assert claims.get("sub") == "alice"
def test_token_respects_admin_flag(self):
token = self._generate_token(identity="admin", admin=True)
claims = self._decode_claims(token)
assert claims["video"]["roomAdmin"] is True
def test_token_non_admin_by_default(self):
token = self._generate_token(identity="user")
claims = self._decode_claims(token)
assert claims.get("video", {}).get("roomAdmin") in (None, False)
def test_ttl_is_timedelta(self):
"""Verify ttl as timedelta works (previous bug: int caused TypeError)."""
token = self._generate_token(ttl=3600)
assert isinstance(token, str)
# ── Display name sanitization ─────────────────────────────────
class TestDisplayNameSanitization:
"""Test the sanitization logic from rooms.py join endpoint."""
def _sanitize(self, display_name: str) -> str:
"""Replicate the sanitization from rooms_join_meeting."""
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", display_name.strip())[:40]
return safe_name
def test_normal_name(self):
assert self._sanitize("Alice") == "Alice"
def test_name_with_spaces(self):
assert self._sanitize("John Doe") == "John_Doe"
def test_name_with_special_chars(self):
assert self._sanitize("user@email.com") == "user_email_com"
def test_name_with_unicode(self):
result = self._sanitize("José García")
assert result == "Jos__Garc_a"
assert all(
c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"
for c in result
)
def test_name_with_emoji(self):
result = self._sanitize("👋 Hello")
assert "_" in result # Emoji replaced with underscore
assert "Hello" in result
def test_very_long_name(self):
long_name = "A" * 100
result = self._sanitize(long_name)
assert len(result) == 40
def test_empty_name(self):
result = self._sanitize("")
assert result == ""
def test_only_special_chars(self):
result = self._sanitize("!!!")
assert result == "___"
def test_whitespace_stripped(self):
result = self._sanitize(" Alice ")
assert result == "Alice"
def test_hyphens_preserved(self):
assert self._sanitize("first-last") == "first-last"
def test_underscores_preserved(self):
assert self._sanitize("first_last") == "first_last"
def test_html_injection(self):
result = self._sanitize("<script>alert('xss')</script>")
assert "<" not in result
assert ">" not in result
assert "'" not in result
# ── S3 egress configuration ───────────────────────────────────
class TestS3EgressConfig:
"""Test S3Upload construction using the SDK directly."""
def test_build_s3_upload_requires_all_fields(self):
# Missing fields should raise or produce invalid config
# The validation happens in our client wrapper, not the SDK
# Test the validation logic directly
s3_bucket = None
s3_access_key = "AKID"
s3_secret_key = "secret"
assert not all([s3_bucket, s3_access_key, s3_secret_key])
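    def test_wrapper_validation_sketch(self):
        """Hedged sketch of the wrapper-side validation implied above; the
        real client wrapper may raise a different error type or message."""

        def build_egress_config(bucket, access_key, secret_key):
            # Hypothetical helper, not the repo's code
            if not all([bucket, access_key, secret_key]):
                raise ValueError("S3 egress requires bucket, access key, and secret")
            return {"bucket": bucket}

        with pytest.raises(ValueError):
            build_egress_config(None, "AKID", "secret")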
def test_s3_upload_with_credentials(self):
from livekit.api import S3Upload
upload = S3Upload(
access_key="AKID",
secret="secret123",
bucket="test-bucket",
region="us-east-1",
force_path_style=True,
)
assert upload.bucket == "test-bucket"
assert upload.force_path_style is True
def test_s3_upload_with_endpoint(self):
from livekit.api import S3Upload
upload = S3Upload(
access_key="AKID",
secret="secret",
bucket="bucket",
region="us-east-1",
force_path_style=True,
endpoint="http://garage:3900",
)
assert upload.endpoint == "http://garage:3900"
# ── Redis participant mapping ──────────────────────────────
class TestParticipantIdentityMapping:
"""Test the identity → user_id Redis mapping pattern."""
def test_mapping_key_format(self):
room_name = "myroom-20260401172036"
mapping_key = f"livekit:participant_map:{room_name}"
assert mapping_key == "livekit:participant_map:myroom-20260401172036"
def test_identity_with_uuid_suffix_is_unique(self):
import uuid
name = "Juan"
id1 = f"{name}-{uuid.uuid4().hex[:6]}"
id2 = f"{name}-{uuid.uuid4().hex[:6]}"
assert id1 != id2
assert id1.startswith("Juan-")
assert id2.startswith("Juan-")
def test_strip_uuid_suffix_for_display(self):
"""Pipeline strips UUID suffix for display name."""
identity = "Juan-2bcea0"
display_name = identity.rsplit("-", 1)[0] if "-" in identity else identity
assert display_name == "Juan"
def test_strip_uuid_preserves_hyphenated_names(self):
identity = "Mary-Jane-abc123"
display_name = identity.rsplit("-", 1)[0] if "-" in identity else identity
assert display_name == "Mary-Jane"
def test_anon_identity_no_user_id(self):
"""Anonymous participants should not have a user_id mapping."""
identity = "anon-abc123"
# In the pipeline, anon identities don't get looked up
assert identity.startswith("anon-")
    @pytest.mark.asyncio
    async def test_redis_hset_hgetall_roundtrip(self):
        """Test the actual Redis operations used for participant mapping.

        Connection problems skip the test; assertion failures still fail it.
        """
        from reflector.redis_cache import get_async_redis_client

        test_key = "livekit:participant_map:__test_room__"
        try:
            redis_client = await get_async_redis_client()
            # Write
            await redis_client.hset(test_key, "Juan-abc123", "user-id-1")
            await redis_client.hset(test_key, "Alice-def456", "user-id-2")
            # Read
            raw_map = await redis_client.hgetall(test_key)
        except Exception:
            pytest.skip("Redis not available")
        decoded = {
            k.decode() if isinstance(k, bytes) else k: v.decode()
            if isinstance(v, bytes)
            else v
            for k, v in raw_map.items()
        }
        assert decoded["Juan-abc123"] == "user-id-1"
        assert decoded["Alice-def456"] == "user-id-2"
        # Cleanup
        await redis_client.delete(test_key)
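    def test_identity_resolution_sketch(self):
        """Hedged sketch of pipeline-side resolution: anon-* identities and
        unmapped identities resolve to no user_id. The helper is illustrative,
        not the repo's code."""
        participant_map = {"Juan-abc123": "user-id-1"}

        def resolve_user_id(identity: str) -> str | None:
            if identity.startswith("anon-"):
                return None  # anonymous participants are never looked up
            return participant_map.get(identity)

        assert resolve_user_id("Juan-abc123") == "user-id-1"
        assert resolve_user_id("anon-xyz123") is None
        assert resolve_user_id("Ghost-000000") is None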
# ── Egress video cleanup safety ────────────────────────────────
class TestEgressVideoCleanup:
"""Ensure video cleanup logic NEVER deletes audio files."""
AUDIO_FILES = [
"livekit/room-20260401/juan-abc123-2026-04-01T100000-TR_AMR3SWs74Divho.ogg",
"livekit/room-20260401/alice-def456-2026-04-01T100030-TR_AMirKjdAvLteAZ.ogg",
"livekit/room-20260401/bob-789abc-2026-04-01T100100-TR_AMyoSbM7tAQbYj.ogg",
]
VIDEO_FILES = [
"livekit/room-20260401/juan-abc123-2026-04-01T100000-TR_VC679dgMQBdfhT.webm",
"livekit/room-20260401/alice-def456-2026-04-01T100030-TR_VCLsuRuxLp4eik.webm",
]
MANIFEST_FILES = [
"livekit/room-20260401/EG_K5sipvfB5fTM.json",
"livekit/room-20260401/EG_nzwBsH9xzgoj.json",
]
def _should_delete(self, filename: str) -> bool:
"""Replicate the deletion logic from _handle_egress_ended."""
return filename.endswith(".webm")
def test_audio_files_never_deleted(self):
"""CRITICAL: Audio files must NEVER be marked for deletion."""
for f in self.AUDIO_FILES:
assert not self._should_delete(f), f"Audio file would be deleted: {f}"
def test_video_files_are_deleted(self):
for f in self.VIDEO_FILES:
assert self._should_delete(f), f"Video file NOT marked for deletion: {f}"
def test_manifests_are_kept(self):
for f in self.MANIFEST_FILES:
assert not self._should_delete(f), f"Manifest would be deleted: {f}"
def test_ogg_extension_never_matches_delete(self):
"""Double-check: no .ogg file ever matches the deletion condition."""
test_names = [
"anything.ogg",
"livekit/room/track.ogg",
"video.ogg", # Even if someone names it "video.ogg"
".ogg",
"TR_VC_fake_video.ogg", # Video-like track ID but .ogg extension
]
for f in test_names:
assert not self._should_delete(f), f".ogg file would be deleted: {f}"
def test_webm_always_matches_delete(self):
test_names = [
"anything.webm",
"livekit/room/track.webm",
"audio.webm", # Even if someone names it "audio.webm"
".webm",
]
for f in test_names:
assert self._should_delete(f), f".webm file NOT marked for deletion: {f}"
def test_unknown_extensions_are_kept(self):
"""Unknown file types should NOT be deleted (safe by default)."""
test_names = [
"file.mp4",
"file.wav",
"file.mp3",
"file.txt",
"file",
"",
]
for f in test_names:
assert not self._should_delete(
f
), f"Unknown file type would be deleted: {f}"
# ── Platform detection ────────────────────────────────────────
class TestSourcePlatformDetection:
"""Test the recording ID prefix-based platform detection from transcript_process.py."""
def test_livekit_prefix(self):
recording_id = "lk-livekit-20260401234423"
platform = "livekit" if recording_id.startswith("lk-") else "daily"
assert platform == "livekit"
def test_daily_no_prefix(self):
recording_id = "08fa0b24-9220-44c5-846c-3f116cf8e738"
platform = "livekit" if recording_id.startswith("lk-") else "daily"
assert platform == "daily"
def test_none_recording_id(self):
recording_id = None
platform = (
"livekit" if recording_id and recording_id.startswith("lk-") else "daily"
)
assert platform == "daily"
def test_empty_recording_id(self):
recording_id = ""
platform = (
"livekit" if recording_id and recording_id.startswith("lk-") else "daily"
)
assert platform == "daily"

View File

@@ -0,0 +1,393 @@
"""
Tests for LiveKit track processing: filepath parsing, offset calculation,
and pad_track padding_seconds behavior.
"""
from datetime import datetime, timezone
from fractions import Fraction
import av
import pytest
from reflector.utils.livekit import (
LiveKitTrackFile,
calculate_track_offsets,
extract_livekit_base_room_name,
filter_audio_tracks,
parse_livekit_track_filepath,
)
# ── Filepath parsing ──────────────────────────────────────────
class TestParseLiveKitTrackFilepath:
def test_parses_ogg_audio_track(self):
result = parse_livekit_track_filepath(
"livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg"
)
assert result.room_name == "myroom-20260401172036"
assert result.participant_identity == "juan-4b82ed"
assert result.track_id == "TR_AMR3SWs74Divho"
assert result.timestamp == datetime(2026, 4, 1, 19, 57, 58, tzinfo=timezone.utc)
def test_parses_different_identities(self):
r1 = parse_livekit_track_filepath(
"livekit/room-20260401/alice-a1b2c3-2026-04-01T100000-TR_abc123.ogg"
)
r2 = parse_livekit_track_filepath(
"livekit/room-20260401/bob_smith-d4e5f6-2026-04-01T100030-TR_def456.ogg"
)
assert r1.participant_identity == "alice-a1b2c3"
assert r2.participant_identity == "bob_smith-d4e5f6"
def test_rejects_json_manifest(self):
with pytest.raises(ValueError, match="doesn't match expected format"):
parse_livekit_track_filepath("livekit/myroom-20260401/EG_K5sipvfB5fTM.json")
    def test_parses_webm_video_track(self):
        # webm files match the filename pattern; they are excluded later by
        # filter_audio_tracks, not rejected by the parser itself
        result = parse_livekit_track_filepath(
            "livekit/myroom-20260401/juan-4b82ed-2026-04-01T195727-TR_VC679dgMQBdfhT.webm"
        )
        assert result.track_id == "TR_VC679dgMQBdfhT"
def test_rejects_invalid_path(self):
with pytest.raises(ValueError):
parse_livekit_track_filepath("not/a/valid/path.ogg")
def test_rejects_missing_track_id(self):
with pytest.raises(ValueError):
parse_livekit_track_filepath("livekit/room/user-2026-04-01T100000.ogg")
def test_parses_timestamp_correctly(self):
result = parse_livekit_track_filepath(
"livekit/room-20260401/user-abc123-2026-12-25T235959-TR_test.ogg"
)
assert result.timestamp == datetime(
2026, 12, 25, 23, 59, 59, tzinfo=timezone.utc
)
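    def test_filename_pattern_sketch(self):
        """Hedged sketch: one regex consistent with the cases above; the
        actual parser's pattern may differ in strictness."""
        import re

        pattern = re.compile(
            r"^livekit/(?P<room>[^/]+)/"
            r"(?P<identity>.+)-"
            r"(?P<ts>\d{4}-\d{2}-\d{2}T\d{6})-"
            r"(?P<track_id>TR_\w+)\.\w+$"
        )
        match = pattern.match(
            "livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg"
        )
        assert match and match.group("identity") == "juan-4b82ed"
        assert pattern.match("livekit/myroom-20260401/EG_K5sipvfB5fTM.json") is None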
# ── Audio track filtering ─────────────────────────────────────
class TestFilterAudioTracks:
def test_filters_to_ogg_only(self):
keys = [
"livekit/room/EG_abc.json",
"livekit/room/user-abc-2026-04-01T100000-TR_audio.ogg",
"livekit/room/user-abc-2026-04-01T100000-TR_video.webm",
"livekit/room/EG_def.json",
"livekit/room/user2-def-2026-04-01T100030-TR_audio2.ogg",
]
result = filter_audio_tracks(keys)
assert len(result) == 2
assert all(k.endswith(".ogg") for k in result)
def test_empty_input(self):
assert filter_audio_tracks([]) == []
def test_no_audio_tracks(self):
keys = ["livekit/room/EG_abc.json", "livekit/room/user-TR_v.webm"]
assert filter_audio_tracks(keys) == []
# ── Offset calculation ─────────────────────────────────────────
class TestCalculateTrackOffsets:
def test_single_track_zero_offset(self):
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
)
]
offsets = calculate_track_offsets(tracks)
assert len(offsets) == 1
assert offsets[0][1] == 0.0
def test_two_tracks_correct_offset(self):
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="bob",
timestamp=datetime(2026, 4, 1, 10, 1, 10, tzinfo=timezone.utc),
track_id="TR_2",
),
]
offsets = calculate_track_offsets(tracks)
assert offsets[0][1] == 0.0 # alice (earliest)
assert offsets[1][1] == 70.0 # bob (70 seconds later)
def test_three_tracks_earliest_is_zero(self):
tracks = [
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="bob",
timestamp=datetime(2026, 4, 1, 10, 0, 30, tzinfo=timezone.utc),
track_id="TR_2",
),
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k3",
room_name="r",
participant_identity="charlie",
timestamp=datetime(2026, 4, 1, 10, 1, 0, tzinfo=timezone.utc),
track_id="TR_3",
),
]
offsets = calculate_track_offsets(tracks)
offset_map = {t.participant_identity: o for t, o in offsets}
assert offset_map["alice"] == 0.0
assert offset_map["bob"] == 30.0
assert offset_map["charlie"] == 60.0
def test_empty_tracks(self):
assert calculate_track_offsets([]) == []
def test_simultaneous_tracks_zero_offsets(self):
ts = datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc)
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="a",
timestamp=ts,
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="b",
timestamp=ts,
track_id="TR_2",
),
]
offsets = calculate_track_offsets(tracks)
assert all(o == 0.0 for _, o in offsets)
# ── Room name extraction ───────────────────────────────────────
class TestExtractLiveKitBaseRoomName:
def test_strips_timestamp_suffix(self):
assert extract_livekit_base_room_name("myroom-20260401172036") == "myroom"
def test_preserves_hyphenated_name(self):
assert (
extract_livekit_base_room_name("my-room-name-20260401172036")
== "my-room-name"
)
def test_single_segment(self):
assert extract_livekit_base_room_name("room-20260401") == "room"
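    def test_suffix_strip_sketch(self):
        """Hedged sketch: dropping the final "-<digits>" segment reproduces
        the cases above; the real implementation may validate the timestamp."""
        import re

        def strip_timestamp(room_name: str) -> str:
            return re.sub(r"-\d+$", "", room_name)

        assert strip_timestamp("myroom-20260401172036") == "myroom"
        assert strip_timestamp("my-room-name-20260401172036") == "my-room-name"
        assert strip_timestamp("room-20260401") == "room"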
# ── pad_track padding_seconds behavior ─────────────────────────
class TestPadTrackPaddingSeconds:
"""Test that pad_track correctly uses pre-calculated padding_seconds
for LiveKit (skipping container metadata) vs extracting from container
for Daily (when padding_seconds is None).
"""
def _make_test_ogg(self, path: str, duration_seconds: float = 5.0):
"""Create a minimal OGG/Opus file for testing."""
with av.open(path, "w", format="ogg") as out:
stream = out.add_stream("libopus", rate=48000)
stream.bit_rate = 64000
samples_per_frame = 960 # Opus standard
total_samples = int(duration_seconds * 48000)
pts = 0
while pts < total_samples:
frame = av.AudioFrame(
format="s16", layout="stereo", samples=samples_per_frame
)
# Fill with silence (zeros)
frame.planes[0].update(bytes(samples_per_frame * 2 * 2)) # s16 * stereo
frame.sample_rate = 48000
frame.pts = pts
frame.time_base = Fraction(1, 48000)
for packet in stream.encode(frame):
out.mux(packet)
pts += samples_per_frame
for packet in stream.encode(None):
out.mux(packet)
def test_ogg_has_zero_start_time(self, tmp_path):
"""Verify that OGG files (like LiveKit produces) have start_time=0,
confirming why pre-calculated padding is needed."""
ogg_path = str(tmp_path / "test.ogg")
self._make_test_ogg(ogg_path)
with av.open(ogg_path) as container:
from reflector.utils.audio_padding import (
extract_stream_start_time_from_container,
)
start_time = extract_stream_start_time_from_container(container, 0)
assert start_time <= 0.0, (
"OGG files should have start_time<=0 (no usable offset), confirming "
f"LiveKit tracks need pre-calculated padding_seconds. Got: {start_time}"
)
def test_precalculated_padding_skips_metadata_extraction(self, tmp_path):
"""When padding_seconds is set, pad_track should use it directly
and NOT call extract_stream_start_time_from_container."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="livekit/room/user-abc-2026-04-01T100000-TR_audio.ogg",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="livekit",
padding_seconds=70.0,
)
assert input_data.padding_seconds == 70.0
# The pad_track function checks: if input.padding_seconds is not None → use it
# This means extract_stream_start_time_from_container is never called for LiveKit
def test_none_padding_falls_back_to_metadata(self, tmp_path):
"""When padding_seconds is None (Daily), pad_track should extract
start_time from container metadata."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="daily/room/track.webm",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="daily",
padding_seconds=None,
)
assert input_data.padding_seconds is None
# pad_track will call extract_stream_start_time_from_container for this case
def test_zero_padding_returns_original_key(self):
"""When padding_seconds=0.0, pad_track should return the original S3 key
without applying any padding (same as start_time=0 from metadata)."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="livekit/room/earliest-track.ogg",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="livekit",
padding_seconds=0.0,
)
# padding_seconds=0.0 → start_time_seconds=0.0 → "no padding needed" branch
assert input_data.padding_seconds == 0.0
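    def test_padding_branch_sketch(self):
        """Hedged sketch of the branch described in the tests above:
        pre-calculated padding wins; None falls back to container metadata.
        Illustrative stand-in, not the repo's pad_track."""

        def effective_start_time(padding_seconds, container_start_time):
            if padding_seconds is not None:
                return padding_seconds  # LiveKit: from track timestamps
            return container_start_time  # Daily: from container metadata

        assert effective_start_time(70.0, 2.5) == 70.0
        assert effective_start_time(None, 2.5) == 2.5
        assert effective_start_time(0.0, 2.5) == 0.0  # zero → "no padding" branch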
# ── Pipeline offset calculation (process_tracks logic) ─────────
class TestProcessTracksOffsetCalculation:
"""Test the offset calculation logic used in process_tracks
for LiveKit source_platform."""
def test_livekit_offsets_from_timestamps(self):
"""Simulate the offset calculation done in process_tracks."""
tracks = [
{
"s3_key": "track1.ogg",
"participant_identity": "admin-0129c3",
"timestamp": "2026-04-01T23:44:50+00:00",
},
{
"s3_key": "track2.ogg",
"participant_identity": "juan-5a5b41",
"timestamp": "2026-04-01T23:46:00+00:00",
},
]
# Replicate the logic from process_tracks
timestamps = []
for i, track in enumerate(tracks):
ts_str = track.get("timestamp")
if ts_str:
ts = datetime.fromisoformat(ts_str)
timestamps.append((i, ts))
earliest = min(ts for _, ts in timestamps)
track_padding = {}
for i, ts in timestamps:
track_padding[i] = (ts - earliest).total_seconds()
assert track_padding[0] == 0.0 # admin (earliest)
assert track_padding[1] == 70.0 # juan (70s later)
def test_daily_tracks_get_no_precalculated_padding(self):
"""Daily tracks should NOT get padding_seconds (use container metadata)."""
tracks = [
{"s3_key": "daily-track1.webm"},
{"s3_key": "daily-track2.webm"},
]
# Daily tracks don't have "timestamp" field
track_padding = {}
source_platform = "daily"
if source_platform == "livekit":
# This block should NOT execute for daily
pass
# Daily tracks get no pre-calculated padding
assert track_padding == {}
for i, _ in enumerate(tracks):
assert track_padding.get(i) is None
def test_livekit_missing_timestamp_graceful(self):
"""If a LiveKit track is missing timestamp, it should be skipped."""
tracks = [
{
"s3_key": "track1.ogg",
"participant_identity": "alice",
"timestamp": "2026-04-01T10:00:00+00:00",
},
{"s3_key": "track2.ogg", "participant_identity": "bob"}, # no timestamp
]
timestamps = []
for i, track in enumerate(tracks):
ts_str = track.get("timestamp")
if ts_str:
try:
ts = datetime.fromisoformat(ts_str)
timestamps.append((i, ts))
except (ValueError, TypeError):
timestamps.append((i, None))
else:
timestamps.append((i, None))
valid = [(i, ts) for i, ts in timestamps if ts is not None]
assert len(valid) == 1 # only alice has a timestamp
assert valid[0][0] == 0 # track index 0

server/uv.lock generated
View File

@@ -1805,6 +1805,35 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/25/f4/ead6e0e37209b07c9baa3e984ccdb0348ca370b77cea3aaea8ddbb097e00/lightning_utilities-0.15.3-py3-none-any.whl", hash = "sha256:6c55f1bee70084a1cbeaa41ada96e4b3a0fea5909e844dd335bd80f5a73c5f91", size = 31906, upload-time = "2026-02-22T14:48:52.488Z" },
]
[[package]]
name = "livekit-api"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
{ name = "livekit-protocol" },
{ name = "protobuf" },
{ name = "pyjwt" },
{ name = "types-protobuf" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b5/0a/ad3cce124e608c056d6390244ec4dd18c8a4b5f055693a95831da2119af7/livekit_api-1.1.0.tar.gz", hash = "sha256:f94c000534d3a9b506e6aed2f35eb88db1b23bdea33bb322f0144c4e9f73934e", size = 16649, upload-time = "2025-12-02T19:37:11.452Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d3/b9/8d8515e3e0e629ab07d399cf858b8fc7e0a02bbf6384a6592b285264b4b9/livekit_api-1.1.0-py3-none-any.whl", hash = "sha256:bfc1c2c65392eb3f580a2c28108269f0e79873f053578a677eee7bb1de8aa8fb", size = 19620, upload-time = "2025-12-02T19:37:10.075Z" },
]
[[package]]
name = "livekit-protocol"
version = "1.1.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "protobuf" },
{ name = "types-protobuf" },
]
sdist = { url = "https://files.pythonhosted.org/packages/8e/ca/d15e2a2cc8c8aa4ba621fe5f9ffd1806d88ac91c7b8fa4c09a3c0304dd92/livekit_protocol-1.1.3.tar.gz", hash = "sha256:cb4948d2513e81d91583f4a795bf80faa9026cedda509c5714999c7e33564287", size = 88746, upload-time = "2026-03-18T05:25:43.562Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5f/0e/f3d3e48628294df4559cffd0f8e1adf030127029e5a8da9beff9979090a0/livekit_protocol-1.1.3-py3-none-any.whl", hash = "sha256:fdae5640e064ab6549ec3d62d8bac75a3ef44d7ea73716069b419cbe8b360a5c", size = 107498, upload-time = "2026-03-18T05:25:42.077Z" },
]
[[package]]
name = "llama-cloud"
version = "0.1.35"
@@ -3364,6 +3393,7 @@ dependencies = [
{ name = "httpx" },
{ name = "icalendar" },
{ name = "jsonschema" },
{ name = "livekit-api" },
{ name = "llama-index" },
{ name = "llama-index-llms-openai-like" },
{ name = "openai" },
@@ -3445,6 +3475,7 @@ requires-dist = [
{ name = "httpx", specifier = ">=0.24.1" },
{ name = "icalendar", specifier = ">=6.0.0" },
{ name = "jsonschema", specifier = ">=4.23.0" },
{ name = "livekit-api", specifier = ">=1.1.0" },
{ name = "llama-index", specifier = ">=0.12.52" },
{ name = "llama-index-llms-openai-like", specifier = ">=0.4.0" },
{ name = "openai", specifier = ">=1.59.7" },
@@ -4399,6 +4430,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/76/42/3efaf858001d2c2913de7f354563e3a3a2f0decae3efe98427125a8f441e/typer-0.16.0-py3-none-any.whl", hash = "sha256:1f79bed11d4d02d4310e3c1b7ba594183bcedb0ac73b27a9e5f28f6fb5b98855", size = 46317, upload-time = "2025-05-26T14:30:30.523Z" },
]
[[package]]
name = "types-protobuf"
version = "6.32.1.20260221"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/5f/e2/9aa4a3b2469508bd7b4e2ae11cbedaf419222a09a1b94daffcd5efca4023/types_protobuf-6.32.1.20260221.tar.gz", hash = "sha256:6d5fb060a616bfb076cbb61b4b3c3969f5fc8bec5810f9a2f7e648ee5cbcbf6e", size = 64408, upload-time = "2026-02-21T03:55:13.916Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/e8/1fd38926f9cf031188fbc5a96694203ea6f24b0e34bd64a225ec6f6291ba/types_protobuf-6.32.1.20260221-py3-none-any.whl", hash = "sha256:da7cdd947975964a93c30bfbcc2c6841ee646b318d3816b033adc2c4eb6448e4", size = 77956, upload-time = "2026-02-21T03:55:12.894Z" },
]
[[package]]
name = "typing-extensions"
version = "4.14.1"