feat: full livekit pipeline

This commit is contained in:
Juan
2026-04-01 19:34:57 -05:00
parent 6d84794e36
commit 240f0c9000
17 changed files with 1284 additions and 150 deletions

View File

@@ -35,7 +35,7 @@ services:
image: monadicalsas/reflector-backend:latest
restart: unless-stopped
ports:
- "127.0.0.1:1250:1250"
- "${BIND_HOST:-127.0.0.1}:1250:1250"
- "40000-40100:40000-40100/udp"
env_file:
- ./server/.env
@@ -116,7 +116,7 @@ services:
image: monadicalsas/reflector-frontend:latest
restart: unless-stopped
ports:
- "127.0.0.1:3000:3000"
- "${BIND_HOST:-127.0.0.1}:3000:3000"
env_file:
- ./www/.env
environment:
@@ -300,6 +300,7 @@ services:
ports:
- "80:80"
- "443:443"
- "8888:8888" # Hatchet dashboard (proxied with TLS)
volumes:
- ./Caddyfile:/etc/caddy/Caddyfile:ro
- caddy_data:/data
@@ -339,7 +340,6 @@ services:
postgres:
condition: service_healthy
ports:
- "127.0.0.1:8888:8888"
- "127.0.0.1:7078:7077"
env_file:
- ./.env.hatchet

View File

@@ -28,6 +28,10 @@
# Optional flags:
# --livekit Enable LiveKit self-hosted video platform (generates credentials,
# starts livekit-server + livekit-egress containers)
# --ip IP Set the server's IP address for all URLs. Implies --caddy
# (self-signed HTTPS, required for browser mic/camera access).
# Mutually exclusive with --domain. Use for LAN or cloud VM access.
# On Linux, IP is auto-detected; on macOS, use --ip to specify it.
# --garage Use Garage for local S3-compatible storage
# --caddy Enable Caddy reverse proxy with auto-SSL
# --domain DOMAIN Use a real domain for Caddy (enables Let's Encrypt auto-HTTPS)
@@ -214,6 +218,7 @@ USE_GARAGE=false
USE_LIVEKIT=false
USE_CADDY=false
CUSTOM_DOMAIN="" # optional domain for Let's Encrypt HTTPS
CUSTOM_IP="" # optional --ip override (mutually exclusive with --caddy)
BUILD_IMAGES=false # build backend/frontend from source
ADMIN_PASSWORD="" # optional admin password for password auth
CUSTOM_CA="" # --custom-ca: path to dir or CA cert file
@@ -268,6 +273,14 @@ for i in "${!ARGS[@]}"; do
--garage) USE_GARAGE=true ;;
--livekit) USE_LIVEKIT=true ;;
--caddy) USE_CADDY=true ;;
--ip)
next_i=$((i + 1))
if [[ $next_i -ge ${#ARGS[@]} ]] || [[ "${ARGS[$next_i]}" == --* ]]; then
err "--ip requires an IP address (e.g. --ip 192.168.0.100)"
exit 1
fi
CUSTOM_IP="${ARGS[$next_i]}"
SKIP_NEXT=true ;;
--build) BUILD_IMAGES=true ;;
--password)
next_i=$((i + 1))
@@ -362,6 +375,16 @@ for i in "${!ARGS[@]}"; do
esac
done
# --- Validate flag combinations ---
if [[ -n "$CUSTOM_IP" ]] && [[ -n "$CUSTOM_DOMAIN" ]]; then
err "--ip and --domain are mutually exclusive. Use --ip for IP-based access, or --domain for domain-based access."
exit 1
fi
# --ip implies --caddy (browsers require HTTPS for mic/camera access on non-localhost)
if [[ -n "$CUSTOM_IP" ]]; then
USE_CADDY=true
fi
# --- Save CLI args for config memory (re-run without flags) ---
if [[ $# -gt 0 ]]; then
mkdir -p "$ROOT_DIR/data"
@@ -558,26 +581,25 @@ _generate_livekit_config() {
env_set "$SERVER_ENV" "LIVEKIT_PUBLIC_URL" "$public_lk_url"
env_set "$SERVER_ENV" "DEFAULT_VIDEO_PLATFORM" "livekit"
# LiveKit storage: reuse transcript storage credentials if not separately configured
if ! env_has_key "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_BUCKET_NAME" || [[ -z "$(env_get "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_BUCKET_NAME" || true)" ]]; then
local ts_bucket ts_region ts_key ts_secret ts_endpoint
ts_bucket=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_BUCKET_NAME" 2>/dev/null || echo "reflector-bucket")
ts_region=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_REGION" 2>/dev/null || echo "us-east-1")
ts_key=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID" 2>/dev/null || true)
ts_secret=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY" 2>/dev/null || true)
ts_endpoint=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" 2>/dev/null || true)
env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_BUCKET_NAME" "$ts_bucket"
env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_REGION" "$ts_region"
[[ -n "$ts_key" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID" "$ts_key"
[[ -n "$ts_secret" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY" "$ts_secret"
[[ -n "$ts_endpoint" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_ENDPOINT_URL" "$ts_endpoint"
if [[ -z "$ts_key" ]] || [[ -z "$ts_secret" ]]; then
warn "LiveKit storage: S3 credentials not found — Track Egress recording will fail!"
warn "Configure LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID and LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY in server/.env"
warn "Or run with --garage to auto-configure local S3 storage"
else
ok "LiveKit storage: reusing transcript storage config"
fi
# LiveKit storage: always sync from transcript storage config.
# Endpoint URL must match (changes between Caddy/no-Caddy runs).
local ts_bucket ts_region ts_key ts_secret ts_endpoint
ts_bucket=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_BUCKET_NAME" 2>/dev/null || echo "reflector-bucket")
ts_region=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_REGION" 2>/dev/null || echo "us-east-1")
ts_key=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID" 2>/dev/null || true)
ts_secret=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY" 2>/dev/null || true)
ts_endpoint=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" 2>/dev/null || true)
env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_BUCKET_NAME" "$ts_bucket"
env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_REGION" "$ts_region"
[[ -n "$ts_key" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID" "$ts_key"
[[ -n "$ts_secret" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY" "$ts_secret"
[[ -n "$ts_endpoint" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_ENDPOINT_URL" "$ts_endpoint"
if [[ -z "$ts_key" ]] || [[ -z "$ts_secret" ]]; then
warn "LiveKit storage: S3 credentials not found — Track Egress recording will fail!"
warn "Configure LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID and LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY in server/.env"
warn "Or run with --garage to auto-configure local S3 storage"
else
ok "LiveKit storage: synced from transcript storage config"
fi
# Generate livekit.yaml
@@ -850,13 +872,23 @@ step_server_env() {
fi
else
if [[ -n "$PRIMARY_IP" ]]; then
server_base_url="http://$PRIMARY_IP"
server_base_url="http://$PRIMARY_IP:1250"
else
server_base_url="http://localhost:1250"
fi
fi
env_set "$SERVER_ENV" "BASE_URL" "$server_base_url"
env_set "$SERVER_ENV" "CORS_ORIGIN" "$server_base_url"
# CORS: allow the frontend origin (port 3000, not the API port)
local cors_origin="${server_base_url}"
if [[ "$USE_CADDY" != "true" ]]; then
# Without Caddy, frontend is on port 3000, API on 1250
cors_origin="${server_base_url/:1250/:3000}"
# Safety: if substitution didn't change anything, construct explicitly
if [[ "$cors_origin" == "$server_base_url" ]] && [[ -n "$PRIMARY_IP" ]]; then
cors_origin="http://${PRIMARY_IP}:3000"
fi
fi
env_set "$SERVER_ENV" "CORS_ORIGIN" "$cors_origin"
# WebRTC: advertise host IP in ICE candidates so browsers can reach the server
if [[ -n "$PRIMARY_IP" ]]; then
@@ -1066,6 +1098,18 @@ step_server_env() {
env_set "$SERVER_ENV" "HATCHET_CLIENT_HOST_PORT" "hatchet:7077"
ok "Hatchet connectivity configured (workflow engine for processing pipelines)"
# BIND_HOST controls whether server/web ports are exposed on all interfaces
local root_env="$ROOT_DIR/.env"
touch "$root_env"
if [[ "$USE_CADDY" == "true" ]]; then
# With Caddy, services stay on localhost (Caddy is the public entry point)
env_set "$root_env" "BIND_HOST" "127.0.0.1"
elif [[ -n "$PRIMARY_IP" ]]; then
# Without Caddy + detected IP, expose on all interfaces for direct access
env_set "$root_env" "BIND_HOST" "0.0.0.0"
ok "BIND_HOST=0.0.0.0 (ports exposed for direct access)"
fi
ok "server/.env ready"
}
@@ -1093,18 +1137,26 @@ step_www_env() {
base_url="https://localhost"
fi
else
# No Caddy — user's proxy handles SSL. Use http for now, they'll override.
# No Caddy — clients connect directly to services on their ports.
if [[ -n "$PRIMARY_IP" ]]; then
base_url="http://$PRIMARY_IP"
base_url="http://$PRIMARY_IP:3000"
else
base_url="http://localhost"
base_url="http://localhost:3000"
fi
fi
# API_URL: with Caddy, same origin (443 proxies both); without Caddy, API is on port 1250
local api_url="$base_url"
if [[ "$USE_CADDY" != "true" ]]; then
api_url="${base_url/:3000/:1250}"
# fallback if no port substitution happened (e.g. localhost without port)
[[ "$api_url" == "$base_url" ]] && api_url="${base_url}:1250"
fi
env_set "$WWW_ENV" "SITE_URL" "$base_url"
env_set "$WWW_ENV" "NEXTAUTH_URL" "$base_url"
env_set "$WWW_ENV" "NEXTAUTH_SECRET" "$NEXTAUTH_SECRET"
env_set "$WWW_ENV" "API_URL" "$base_url"
env_set "$WWW_ENV" "API_URL" "$api_url"
env_set "$WWW_ENV" "WEBSOCKET_URL" "auto"
env_set "$WWW_ENV" "SERVER_API_URL" "http://server:1250"
env_set "$WWW_ENV" "KV_URL" "redis://redis:6379"
@@ -1226,7 +1278,13 @@ step_garage() {
# Write S3 credentials to server/.env
env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_BACKEND" "aws"
env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" "http://garage:3900"
# Endpoint URL: use public IP when no Caddy so presigned URLs work in the browser.
# With Caddy, internal hostname is fine (Caddy proxies or browser never sees presigned URLs directly).
if [[ "$USE_CADDY" != "true" ]] && [[ -n "$PRIMARY_IP" ]]; then
env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" "http://${PRIMARY_IP}:3900"
else
env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" "http://garage:3900"
fi
env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_BUCKET_NAME" "reflector-media"
env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_REGION" "garage"
if [[ "$created_key" == "true" ]]; then
@@ -1355,11 +1413,13 @@ CADDYEOF
ok "Created Caddyfile for $CUSTOM_DOMAIN (Let's Encrypt auto-HTTPS)"
elif [[ -n "$PRIMARY_IP" ]]; then
# No domain, IP only: catch-all :443 with self-signed cert
# (IP connections don't send SNI, so we can't match by address)
# on_demand generates certs dynamically for any hostname/IP on first request
cat > "$caddyfile" << CADDYEOF
# Generated by setup-selfhosted.sh — self-signed cert for IP access
:443 {
tls internal
tls internal {
on_demand
}
handle /v1/* {
reverse_proxy server:1250
}
@@ -1386,7 +1446,9 @@ CADDYEOF
# Hatchet workflow dashboard (Daily.co multitrack processing)
:8888 {
tls internal
tls internal {
on_demand
}
reverse_proxy hatchet:8888
}
CADDYEOF
@@ -1597,7 +1659,7 @@ step_health() {
info "Waiting for Hatchet workflow engine..."
local hatchet_ok=false
for i in $(seq 1 60); do
if curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then
if compose_cmd exec -T hatchet curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then
hatchet_ok=true
break
fi
@@ -1645,7 +1707,7 @@ step_hatchet_token() {
# Wait for hatchet to be healthy
local hatchet_ok=false
for i in $(seq 1 60); do
if curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then
if compose_cmd exec -T hatchet curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then
hatchet_ok=true
break
fi
@@ -1716,12 +1778,19 @@ main() {
[[ "$BUILD_IMAGES" == "true" ]] && echo " Build: from source"
echo ""
# Detect primary IP
PRIMARY_IP=""
if [[ "$OS" == "Linux" ]]; then
PRIMARY_IP=$(hostname -I 2>/dev/null | awk '{print $1}' || true)
if [[ "$PRIMARY_IP" == "127."* ]] || [[ -z "$PRIMARY_IP" ]]; then
PRIMARY_IP=$(ip -4 route get 1 2>/dev/null | sed -n 's/.*src \([0-9.]*\).*/\1/p' || true)
# Detect primary IP (--ip overrides auto-detection)
if [[ -n "$CUSTOM_IP" ]]; then
PRIMARY_IP="$CUSTOM_IP"
ok "Using provided IP: $PRIMARY_IP"
else
PRIMARY_IP=""
if [[ "$OS" == "Linux" ]]; then
PRIMARY_IP=$(hostname -I 2>/dev/null | awk '{print $1}' || true)
if [[ "$PRIMARY_IP" == "127."* ]] || [[ -z "$PRIMARY_IP" ]]; then
PRIMARY_IP=$(ip -4 route get 1 2>/dev/null | sed -n 's/.*src \([0-9.]*\).*/\1/p' || true)
fi
elif [[ "$OS" == "Darwin" ]]; then
PRIMARY_IP=$(detect_lan_ip)
fi
fi
@@ -1827,10 +1896,12 @@ EOF
echo " App: https://localhost (accept self-signed cert in browser)"
echo " API: https://localhost/v1/"
fi
elif [[ -n "$PRIMARY_IP" ]]; then
echo " App: http://$PRIMARY_IP:3000"
echo " API: http://$PRIMARY_IP:1250"
else
echo " No Caddy — point your reverse proxy at:"
echo " Frontend: web:3000 (or localhost:3000 from host)"
echo " API: server:1250 (or localhost:1250 from host)"
echo " App: http://localhost:3000"
echo " API: http://localhost:1250"
fi
echo ""
if [[ "$HAS_OVERRIDES" == "true" ]]; then

View File

@@ -165,6 +165,17 @@ class MeetingController:
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_all_inactive_livekit(self) -> list[Meeting]:
"""Get inactive LiveKit meetings (for multitrack processing discovery)."""
query = meetings.select().where(
sa.and_(
meetings.c.is_active == sa.false(),
meetings.c.platform == "livekit",
)
)
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_by_room_name(
self,
room_name: str,

View File

@@ -486,6 +486,14 @@ class TranscriptController:
return None
return Transcript(**result)
async def get_by_meeting_id(self, meeting_id: str) -> Transcript | None:
"""Get a transcript by meeting_id (first match)."""
query = transcripts.select().where(transcripts.c.meeting_id == meeting_id)
result = await get_database().fetch_one(query)
if not result:
return None
return Transcript(**result)
async def get_by_recording_id(
self, recording_id: str, **kwargs
) -> Transcript | None:

View File

@@ -273,8 +273,10 @@ def with_error_handling(
)
@with_error_handling(TaskName.GET_RECORDING)
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
"""Fetch recording metadata from Daily.co API."""
ctx.log(f"get_recording: starting for recording_id={input.recording_id}")
"""Fetch recording metadata. Platform-aware: Daily calls API, LiveKit skips."""
ctx.log(
f"get_recording: starting for recording_id={input.recording_id}, platform={input.source_platform}"
)
ctx.log(
f"get_recording: transcript_id={input.transcript_id}, room_id={input.room_id}"
)
@@ -299,6 +301,18 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
)
ctx.log(f"get_recording: status set to 'processing' and broadcasted")
# LiveKit: no external API call needed — metadata comes from S3 track listing
if input.source_platform == "livekit":
ctx.log(
"get_recording: LiveKit platform — skipping API call (metadata from S3)"
)
return RecordingResult(
id=input.recording_id,
mtg_session_id=None,
duration=0, # Duration calculated from tracks later
)
# Daily.co: fetch recording metadata from API
if not settings.DAILY_API_KEY:
ctx.log("get_recording: ERROR - DAILY_API_KEY not configured")
raise ValueError("DAILY_API_KEY not configured")
@@ -332,11 +346,12 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
)
@with_error_handling(TaskName.GET_PARTICIPANTS)
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
"""Fetch participant list from Daily.co API and update transcript in database."""
ctx.log(f"get_participants: transcript_id={input.transcript_id}")
"""Fetch participant list and update transcript. Platform-aware."""
ctx.log(
f"get_participants: transcript_id={input.transcript_id}, platform={input.source_platform}"
)
recording = ctx.task_output(get_recording)
mtg_session_id = recording.mtg_session_id
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
@@ -347,8 +362,8 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
# Note: title NOT cleared - preserves existing titles
# Duration from Daily API (seconds -> milliseconds) - master source
# Duration from recording metadata (seconds -> milliseconds)
duration_ms = recording.duration * 1000 if recording.duration else 0
await transcripts_controller.update(
transcript,
@@ -360,65 +375,99 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
)
daily_api_key = assert_non_none_and_non_empty(
settings.DAILY_API_KEY, "DAILY_API_KEY is required"
)
async with DailyApiClient(
api_key=daily_api_key, base_url=settings.DAILY_API_URL
) as client:
participants = await client.get_meeting_participants(mtg_session_id)
id_to_name = {}
id_to_user_id = {}
for p in participants.data:
if p.user_name:
id_to_name[p.participant_id] = p.user_name
if p.user_id:
id_to_user_id[p.participant_id] = p.user_id
track_keys = [t["s3_key"] for t in input.tracks]
cam_audio_keys = filter_cam_audio_tracks(track_keys)
if duration_ms:
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
participants_list: list[ParticipantInfo] = []
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
if input.source_platform == "livekit":
# LiveKit: participant identity is in the track dict or can be parsed from filepath
from reflector.utils.livekit import (
parse_livekit_track_filepath, # noqa: PLC0415
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
ParticipantInfo(
participant_id=participant_id,
user_name=name,
for idx, track in enumerate(input.tracks):
identity = track.get("participant_identity")
if not identity:
# Reprocess path: parse from S3 key
try:
parsed = parse_livekit_track_filepath(track["s3_key"])
identity = parsed.participant_identity
except (ValueError, KeyError):
identity = f"speaker-{idx}"
participant = TranscriptParticipant(
id=identity,
speaker=idx,
name=identity,
user_id=identity if not identity.startswith("anon-") else None,
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
ParticipantInfo(
participant_id=identity,
user_name=identity,
speaker=idx,
)
)
else:
# Daily.co: fetch participant names from API
mtg_session_id = recording.mtg_session_id
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
)
daily_api_key = assert_non_none_and_non_empty(
settings.DAILY_API_KEY, "DAILY_API_KEY is required"
)
async with DailyApiClient(
api_key=daily_api_key, base_url=settings.DAILY_API_URL
) as client:
participants = await client.get_meeting_participants(mtg_session_id)
id_to_name = {}
id_to_user_id = {}
for p in participants.data:
if p.user_name:
id_to_name[p.participant_id] = p.user_name
if p.user_id:
id_to_user_id[p.participant_id] = p.user_id
track_keys = [t["s3_key"] for t in input.tracks]
cam_audio_keys = filter_cam_audio_tracks(track_keys)
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
ParticipantInfo(
participant_id=participant_id,
user_name=name,
speaker=idx,
)
)
ctx.log(f"get_participants complete: {len(participants_list)} participants")
@@ -440,11 +489,56 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
@with_error_handling(TaskName.PROCESS_TRACKS)
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
"""Spawn child workflows for each track (dynamic fan-out)."""
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
ctx.log(
f"process_tracks: spawning {len(input.tracks)} track workflows, platform={input.source_platform}"
)
participants_result = ctx.task_output(get_participants)
source_language = participants_result.source_language
# For LiveKit: calculate padding offsets from filename timestamps.
# OGG files don't have embedded start_time metadata, so we pre-calculate.
track_padding: dict[int, float] = {}
if input.source_platform == "livekit":
from datetime import datetime # noqa: PLC0415
from reflector.utils.livekit import (
parse_livekit_track_filepath, # noqa: PLC0415
)
timestamps = []
for i, track in enumerate(input.tracks):
ts_str = track.get("timestamp")
if ts_str:
try:
ts = datetime.fromisoformat(ts_str)
timestamps.append((i, ts))
except (ValueError, TypeError):
ctx.log(
f"process_tracks: could not parse timestamp for track {i}: {ts_str}"
)
timestamps.append((i, None))
else:
# Reprocess path: parse timestamp from S3 key
try:
parsed = parse_livekit_track_filepath(track["s3_key"])
timestamps.append((i, parsed.timestamp))
ctx.log(
f"process_tracks: parsed timestamp from S3 key for track {i}: {parsed.timestamp}"
)
except (ValueError, KeyError):
timestamps.append((i, None))
valid_timestamps = [(i, ts) for i, ts in timestamps if ts is not None]
if valid_timestamps:
earliest = min(ts for _, ts in valid_timestamps)
for i, ts in valid_timestamps:
offset = (ts - earliest).total_seconds()
track_padding[i] = offset
ctx.log(
f"process_tracks: track {i} padding={offset}s (from filename timestamp)"
)
bulk_runs = [
track_workflow.create_bulk_run_item(
input=TrackInput(
@@ -454,6 +548,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
transcript_id=input.transcript_id,
language=source_language,
source_platform=input.source_platform,
padding_seconds=track_padding.get(i),
)
)
for i, track in enumerate(input.tracks)

View File

@@ -37,6 +37,9 @@ class TrackInput(BaseModel):
transcript_id: str
language: str = "en"
source_platform: str = "daily"
# Pre-calculated padding in seconds (from filename timestamps for LiveKit).
# When set, overrides container metadata extraction for start_time.
padding_seconds: float | None = None
hatchet = HatchetClientManager.get_client()
@@ -53,15 +56,19 @@ track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackI
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
"""Pad single audio track with silence for alignment.
Extracts stream.start_time from WebM container metadata and applies
silence padding using PyAV filter graph (adelay).
For Daily: extracts stream.start_time from WebM container metadata.
For LiveKit: uses pre-calculated padding_seconds from filename timestamps
(OGG files don't have embedded start_time metadata).
"""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
ctx.log(
f"pad_track: track {input.track_index}, s3_key={input.s3_key}, padding_seconds={input.padding_seconds}"
)
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
padding_seconds=input.padding_seconds,
)
try:
@@ -79,10 +86,16 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
bucket=input.bucket_name,
)
with av.open(source_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
if input.padding_seconds is not None:
# Pre-calculated offset (LiveKit: from filename timestamps)
start_time_seconds = input.padding_seconds
ctx.log(f"pad_track: using pre-calculated padding={start_time_seconds}s")
else:
# Extract from container metadata (Daily: WebM start_time)
with av.open(source_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:

View File

@@ -9,6 +9,7 @@ from datetime import timedelta
from livekit.api import (
AccessToken,
AutoTrackEgress,
CreateRoomRequest,
DeleteRoomRequest,
DirectFileOutput,
@@ -17,6 +18,7 @@ from livekit.api import (
ListParticipantsRequest,
LiveKitAPI,
Room,
RoomEgress,
S3Upload,
StopEgressRequest,
TrackEgressRequest,
@@ -55,6 +57,8 @@ class LiveKitApiClient:
name: str,
empty_timeout: int = 300,
max_participants: int = 0,
enable_auto_track_egress: bool = False,
track_egress_filepath: str = "livekit/{room_name}/{publisher_identity}-{time}",
) -> Room:
"""Create a LiveKit room.
@@ -62,11 +66,25 @@ class LiveKitApiClient:
name: Room name (unique identifier).
empty_timeout: Seconds to keep room alive after last participant leaves.
max_participants: 0 = unlimited.
enable_auto_track_egress: If True, automatically record each participant's
audio track to S3 as a separate file (OGG/Opus).
track_egress_filepath: S3 filepath template for auto track egress.
Supports {room_name}, {publisher_identity}, {time}.
"""
egress = None
if enable_auto_track_egress:
egress = RoomEgress(
tracks=AutoTrackEgress(
filepath=track_egress_filepath,
s3=self._build_s3_upload(),
),
)
req = CreateRoomRequest(
name=name,
empty_timeout=empty_timeout,
max_participants=max_participants,
egress=egress,
)
return await self._api.room.create_room(req)

View File

@@ -155,12 +155,17 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
)
if track_keys:
# Detect platform from recording ID prefix
source_platform = (
"livekit" if recording_id and recording_id.startswith("lk-") else "daily"
)
return MultitrackProcessingConfig(
bucket_name=bucket_name, # type: ignore (validated above)
track_keys=track_keys,
transcript_id=validation.transcript_id,
recording_id=recording_id,
room_id=validation.room_id,
source_platform=source_platform,
)
return FileProcessingConfig(

View File

@@ -0,0 +1,112 @@
"""
LiveKit track file utilities.
Parse participant identity and timing from Auto Track Egress S3 filepaths.
Actual filepath format from LiveKit Auto Track Egress:
livekit/{room_name}/{publisher_identity}-{ISO_timestamp}-{track_id}.{ext}
Examples:
livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg
livekit/myroom-20260401172036/juan2-63abcf-2026-04-01T195847-TR_AMyoSbM7tAQbYj.ogg
livekit/myroom-20260401172036/EG_K5sipvfB5fTM.json (manifest, skip)
livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195727-TR_VC679dgMQBdfhT.webm (video, skip)
"""
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from reflector.utils.string import NonEmptyString
@dataclass
class LiveKitTrackFile:
"""Parsed info from a LiveKit track egress filepath."""
s3_key: str
room_name: str
participant_identity: str
timestamp: datetime # Parsed from ISO timestamp in filename
track_id: str # LiveKit track ID (e.g., TR_AMR3SWs74Divho)
# Pattern: livekit/{room_name}/{identity}-{ISO_date}T{time}-{track_id}.{ext}
# The identity can contain alphanumeric, hyphens, underscores
# ISO timestamp is like 2026-04-01T195758
# Track ID starts with TR_
_TRACK_FILENAME_PATTERN = re.compile(
r"^livekit/(?P<room_name>[^/]+)/(?P<identity>.+?)-(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{6})-(?P<track_id>TR_\w+)\.(?P<ext>\w+)$"
)
def parse_livekit_track_filepath(s3_key: str) -> LiveKitTrackFile:
"""Parse a LiveKit track egress filepath into components.
Args:
s3_key: S3 key like 'livekit/myroom-20260401/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg'
Returns:
LiveKitTrackFile with parsed components.
Raises:
ValueError: If the filepath doesn't match the expected format.
"""
match = _TRACK_FILENAME_PATTERN.match(s3_key)
if not match:
raise ValueError(
f"LiveKit track filepath doesn't match expected format: {s3_key}"
)
# Parse ISO-ish timestamp (e.g., 2026-04-01T195758 → datetime)
ts_str = match.group("timestamp")
try:
ts = datetime.strptime(ts_str, "%Y-%m-%dT%H%M%S").replace(tzinfo=timezone.utc)
except ValueError:
raise ValueError(f"Cannot parse timestamp '{ts_str}' from: {s3_key}")
return LiveKitTrackFile(
s3_key=s3_key,
room_name=match.group("room_name"),
participant_identity=match.group("identity"),
timestamp=ts,
track_id=match.group("track_id"),
)
def filter_audio_tracks(s3_keys: list[str]) -> list[str]:
"""Filter S3 keys to only audio tracks (.ogg), excluding manifests and video."""
return [k for k in s3_keys if k.endswith(".ogg")]
def calculate_track_offsets(
tracks: list[LiveKitTrackFile],
) -> list[tuple[LiveKitTrackFile, float]]:
"""Calculate silence padding offset for each track.
The earliest track starts at time zero. Each subsequent track
gets (track_timestamp - earliest_timestamp) seconds of silence prepended.
Returns:
List of (track, offset_seconds) tuples.
"""
if not tracks:
return []
earliest = min(t.timestamp for t in tracks)
return [(t, (t.timestamp - earliest).total_seconds()) for t in tracks]
def extract_livekit_base_room_name(livekit_room_name: str) -> NonEmptyString:
"""Extract base room name from LiveKit timestamped room name.
LiveKit rooms use the same naming as Daily: {base_name}-YYYYMMDDHHMMSS
"""
base_name = livekit_room_name.rsplit("-", 1)[0]
assert base_name, f"Extracted base name is empty from: {livekit_room_name}"
return NonEmptyString(base_name)
def recording_lock_key(room_name: str) -> str:
"""Redis lock key for preventing duplicate processing."""
return f"livekit:processing:{room_name}"

View File

@@ -62,9 +62,25 @@ class LiveKitClient(VideoPlatformClient):
remaining = int((end_date_aware - now).total_seconds())
empty_timeout = max(300, min(remaining, 86400)) # 5 min to 24 hours
# Enable auto track egress for cloud recording (per-participant audio to S3).
# Gracefully degrade if S3 credentials are missing — room still works, just no recording.
enable_recording = room.recording_type == "cloud"
egress_enabled = False
if enable_recording:
try:
self._api_client._build_s3_upload() # Validate credentials exist
egress_enabled = True
except ValueError:
logger.warning(
"S3 credentials not configured — room created without auto track egress. "
"Set LIVEKIT_STORAGE_AWS_* to enable recording.",
room_name=room_name,
)
lk_room = await self._api_client.create_room(
name=room_name,
empty_timeout=empty_timeout,
enable_auto_track_egress=egress_enabled,
)
logger.info(
@@ -72,6 +88,7 @@ class LiveKitClient(VideoPlatformClient):
room_name=lk_room.name,
room_sid=lk_room.sid,
empty_timeout=empty_timeout,
auto_track_egress=egress_enabled,
)
# room_url includes the server URL + room name as query param.

View File

@@ -5,6 +5,10 @@ Track Egress recording completion.
LiveKit sends webhooks as POST requests with JWT authentication
in the Authorization header.
Webhooks are used as fast-path triggers and logging. Track discovery
for the multitrack pipeline uses S3 listing (source of truth), not
webhook data.
"""
from fastapi import APIRouter, HTTPException, Request
@@ -77,10 +81,7 @@ async def livekit_webhook(request: Request):
room_name=event.room.name if event.room else None,
)
case "room_finished":
logger.info(
"Room finished",
room_name=event.room.name if event.room else None,
)
await _handle_room_finished(event)
case "track_published" | "track_unpublished":
logger.debug(
f"Track event: {event_type}",
@@ -142,49 +143,75 @@ async def _handle_participant_left(event):
async def _handle_egress_started(event):
egress = event.egress_info
room_name = egress.room_name if egress else None
logger.info(
"Egress started",
room_name=room_name,
room_name=egress.room_name if egress else None,
egress_id=egress.egress_id if egress else None,
)
async def _handle_egress_ended(event):
"""Handle Track Egress completion — trigger multitrack processing."""
"""Log Track Egress completion. Files are on S3 already; pipeline uses S3 listing."""
egress = event.egress_info
if not egress:
logger.warning("egress_ended: no egress info in payload")
return
room_name = egress.room_name
# Check egress status
# EGRESS_COMPLETE = 3, EGRESS_FAILED = 4
status = egress.status
if status == 4: # EGRESS_FAILED
# EGRESS_FAILED = 4
if egress.status == 4:
logger.error(
"Egress failed",
room_name=room_name,
room_name=egress.room_name,
egress_id=egress.egress_id,
error=egress.error,
)
return
# Extract output file info from egress results
file_results = list(egress.file_results)
logger.info(
"Egress ended",
room_name=room_name,
room_name=egress.room_name,
egress_id=egress.egress_id,
status=status,
status=egress.status,
num_files=len(file_results),
filenames=[f.filename for f in file_results] if file_results else [],
)
# Track Egress produces one file per egress request.
# The multitrack pipeline will be triggered separately once all tracks
# for a room are collected (via periodic polling or explicit trigger).
# TODO: Implement track collection and pipeline trigger
async def _handle_room_finished(event):
"""Fast-path: trigger multitrack processing when room closes.
This is an optimization — if missed, the process_livekit_ended_meetings
beat task catches it within ~2 minutes.
"""
room_name = event.room.name if event.room else None
if not room_name:
logger.warning("room_finished: no room name in payload")
return
logger.info("Room finished", room_name=room_name)
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
logger.warning("room_finished: meeting not found", room_name=room_name)
return
# Deactivate the meeting — LiveKit room is destroyed, so process_meetings
# can't detect this via API (list_participants returns empty for deleted rooms).
if meeting.is_active:
await meetings_controller.update_meeting(meeting.id, is_active=False)
logger.info("room_finished: meeting deactivated", meeting_id=meeting.id)
# Import here to avoid circular imports (worker imports views)
from reflector.worker.process import process_livekit_multitrack
process_livekit_multitrack.delay(
room_name=room_name,
meeting_id=meeting.id,
)
logger.info(
"room_finished: queued multitrack processing",
meeting_id=meeting.id,
room_name=room_name,
)

View File

@@ -554,6 +554,7 @@ async def rooms_join_meeting(
room_name: str,
meeting_id: str,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
display_name: str | None = None,
):
user_id = user["sub"] if user else None
room = await rooms_controller.get_by_name(room_name)
@@ -599,13 +600,27 @@ async def rooms_join_meeting(
meeting.room_url = add_query_param(meeting.room_url, "t", token)
elif meeting.platform == "livekit":
import re
import uuid
client = create_platform_client(meeting.platform)
participant_identity = user_id or f"anon-{meeting_id[:8]}"
participant_name = (
getattr(user, "name", None) or participant_identity
if user
else participant_identity
)
# Identity must be unique per participant to avoid S3 key collisions.
# Format: {readable_name}-{short_uuid} ensures uniqueness even for same names.
uid_suffix = uuid.uuid4().hex[:6]
if display_name:
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", display_name.strip())[:40]
participant_identity = (
f"{safe_name}-{uid_suffix}" if safe_name else f"anon-{uid_suffix}"
)
elif user_id:
email = getattr(user, "email", None)
if email and "@" in email:
participant_identity = f"{email.split('@')[0]}-{uid_suffix}"
else:
participant_identity = f"{user_id[:12]}-{uid_suffix}"
else:
participant_identity = f"anon-{uid_suffix}"
participant_name = display_name or participant_identity
token = client.create_access_token(
room_name=meeting.room_name,
participant_identity=participant_identity,

View File

@@ -85,7 +85,21 @@ def build_beat_schedule(
_livekit_enabled = bool(settings.LIVEKIT_API_KEY and settings.LIVEKIT_URL)
if _livekit_enabled:
logger.info("LiveKit platform detected")
beat_schedule["process_livekit_ended_meetings"] = {
"task": "reflector.worker.process.process_livekit_ended_meetings",
"schedule": 120, # Every 2 minutes
}
beat_schedule["reprocess_failed_livekit_recordings"] = {
"task": "reflector.worker.process.reprocess_failed_livekit_recordings",
"schedule": crontab(hour=5, minute=0),
}
logger.info(
"LiveKit beat tasks enabled",
tasks=[
"process_livekit_ended_meetings",
"reprocess_failed_livekit_recordings",
],
)
_any_platform = _whereby_enabled or _daily_enabled or _livekit_enabled
if _any_platform:

View File

@@ -874,6 +874,22 @@ async def process_meetings():
logger_.info(
"Meeting deactivated - scheduled time ended with no participants",
)
elif meeting.platform == "livekit" and not has_had_sessions:
# LiveKit rooms are destroyed after empty_timeout. Once gone,
# list_participants returns [] — indistinguishable from "never used".
# Check if meeting was created >10 min ago; if so, assume room is gone.
meeting_start = meeting.start_date
if meeting_start.tzinfo is None:
meeting_start = meeting_start.replace(tzinfo=timezone.utc)
age_minutes = (current_time - meeting_start).total_seconds() / 60
if age_minutes > 10:
should_deactivate = True
logger_.info(
"LiveKit meeting deactivated - room likely destroyed (no sessions after 10 min)",
age_minutes=round(age_minutes, 1),
)
else:
logger_.debug("LiveKit meeting still young, keep it")
else:
logger_.debug("Meeting not yet started, keep it")
@@ -1170,3 +1186,278 @@ async def trigger_daily_reconciliation() -> None:
except Exception as e:
logger.error("Reconciliation trigger failed", error=str(e), exc_info=True)
# ============================================================
# LiveKit multitrack recording tasks
# ============================================================
@shared_task
@asynctask
async def process_livekit_multitrack(
room_name: str,
meeting_id: str,
):
"""
Process LiveKit multitrack recording by discovering tracks on S3.
Tracks are discovered via S3 listing (source of truth), not webhooks.
Called from room_finished webhook (fast-path) or beat task (fallback).
"""
from reflector.utils.livekit import ( # noqa: PLC0415
recording_lock_key,
)
logger.info(
"Processing LiveKit multitrack recording",
room_name=room_name,
meeting_id=meeting_id,
)
lock_key = recording_lock_key(room_name)
async with RedisAsyncLock(
key=lock_key,
timeout=600,
extend_interval=60,
skip_if_locked=True,
blocking=False,
) as lock:
if not lock.acquired:
logger.warning(
"LiveKit processing skipped - lock already held",
room_name=room_name,
lock_key=lock_key,
)
return
await _process_livekit_multitrack_inner(room_name, meeting_id)
async def _process_livekit_multitrack_inner(
room_name: str,
meeting_id: str,
):
"""Inner processing logic for LiveKit multitrack recording."""
from reflector.storage import get_source_storage # noqa: PLC0415
from reflector.utils.livekit import ( # noqa: PLC0415
extract_livekit_base_room_name,
filter_audio_tracks,
parse_livekit_track_filepath,
)
# 1. Discover tracks by listing S3 prefix
storage = get_source_storage("livekit")
s3_prefix = f"livekit/{room_name}/"
all_keys = await storage.list_objects(prefix=s3_prefix)
if not all_keys:
logger.warning(
"No track files found on S3 for LiveKit room",
room_name=room_name,
s3_prefix=s3_prefix,
)
return
# Filter to audio tracks only (.ogg) — skip .json manifests and .webm video
audio_keys = filter_audio_tracks(all_keys)
logger.info(
"Found track files on S3",
room_name=room_name,
total_files=len(all_keys),
audio_files=len(audio_keys),
)
if not audio_keys:
logger.warning(
"No audio track files found (only manifests/video)",
room_name=room_name,
)
return
# 2. Parse track info from filenames
parsed_tracks = []
for key in audio_keys:
try:
parsed = parse_livekit_track_filepath(key)
parsed_tracks.append(parsed)
except ValueError as e:
logger.warning("Skipping unparseable track file", s3_key=key, error=str(e))
if not parsed_tracks:
logger.warning(
"No valid track files found after parsing",
room_name=room_name,
raw_keys=all_keys,
)
return
track_keys = [t.s3_key for t in parsed_tracks]
# 3. Find meeting and room
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found for LiveKit recording",
meeting_id=meeting_id,
room_name=room_name,
)
return
base_room_name = extract_livekit_base_room_name(room_name)
room = await rooms_controller.get_by_name(base_room_name)
if not room:
logger.error("Room not found", room_name=base_room_name)
return
# 4. Create recording
recording_id = f"lk-{room_name}"
bucket_name = settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME or ""
existing_recording = await recordings_controller.get_by_id(recording_id)
if existing_recording and existing_recording.deleted_at is not None:
logger.info("Skipping soft-deleted recording", recording_id=recording_id)
return
if not existing_recording:
recording = await recordings_controller.create(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=s3_prefix,
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting.id,
track_keys=track_keys,
)
)
else:
recording = existing_recording
# 5. Create or get transcript
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if transcript and transcript.deleted_at is not None:
logger.info("Skipping soft-deleted transcript", recording_id=recording.id)
return
if not transcript:
transcript = await transcripts_controller.add(
"",
source_kind=SourceKind.ROOM,
source_language="en",
target_language="en",
user_id=room.user_id,
recording_id=recording.id,
share_mode="semi-private",
meeting_id=meeting.id,
room_id=room.id,
)
# 6. Start Hatchet pipeline (reuses DiarizationPipeline with source_platform="livekit")
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [
{
"s3_key": t.s3_key,
"participant_identity": t.participant_identity,
"timestamp": t.timestamp.isoformat(),
}
for t in parsed_tracks
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
"source_platform": "livekit",
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
},
)
logger.info(
"Started LiveKit Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
room_name=room_name,
num_tracks=len(parsed_tracks),
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
@shared_task
@asynctask
async def process_livekit_ended_meetings():
"""Check for inactive LiveKit meetings that need multitrack processing.
Runs on a beat schedule. Catches cases where room_finished webhook was missed.
Only processes meetings that:
- Platform is "livekit"
- is_active=False (already deactivated by process_meetings)
- No associated transcript yet
"""
from reflector.db.transcripts import transcripts_controller as tc # noqa: PLC0415
all_livekit = await meetings_controller.get_all_inactive_livekit()
queued = 0
for meeting in all_livekit:
# Skip if already has a transcript
existing = await tc.get_by_meeting_id(meeting.id)
if existing:
continue
logger.info(
"Found unprocessed inactive LiveKit meeting",
meeting_id=meeting.id,
room_name=meeting.room_name,
)
process_livekit_multitrack.delay(
room_name=meeting.room_name,
meeting_id=meeting.id,
)
queued += 1
if queued > 0:
logger.info("Queued LiveKit multitrack processing", count=queued)
@shared_task
@asynctask
async def reprocess_failed_livekit_recordings():
"""Reprocess LiveKit recordings that failed.
Runs daily at 5 AM. Finds recordings with livekit prefix and error status.
"""
bucket_name = settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME
if not bucket_name:
return
failed = await recordings_controller.get_multitrack_needing_reprocessing(
bucket_name
)
livekit_failed = [r for r in failed if r.id.startswith("lk-")]
for recording in livekit_failed:
if not recording.meeting_id:
logger.warning(
"Skipping reprocess — no meeting_id",
recording_id=recording.id,
)
continue
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
continue
logger.info(
"Reprocessing failed LiveKit recording",
recording_id=recording.id,
meeting_id=meeting.id,
)
process_livekit_multitrack.delay(
room_name=meeting.room_name,
meeting_id=meeting.id,
)

View File

@@ -0,0 +1,393 @@
"""
Tests for LiveKit track processing: filepath parsing, offset calculation,
and pad_track padding_seconds behavior.
"""
from datetime import datetime, timezone
from fractions import Fraction
import av
import pytest
from reflector.utils.livekit import (
LiveKitTrackFile,
calculate_track_offsets,
extract_livekit_base_room_name,
filter_audio_tracks,
parse_livekit_track_filepath,
)
# ── Filepath parsing ──────────────────────────────────────────
class TestParseLiveKitTrackFilepath:
def test_parses_ogg_audio_track(self):
result = parse_livekit_track_filepath(
"livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg"
)
assert result.room_name == "myroom-20260401172036"
assert result.participant_identity == "juan-4b82ed"
assert result.track_id == "TR_AMR3SWs74Divho"
assert result.timestamp == datetime(2026, 4, 1, 19, 57, 58, tzinfo=timezone.utc)
def test_parses_different_identities(self):
r1 = parse_livekit_track_filepath(
"livekit/room-20260401/alice-a1b2c3-2026-04-01T100000-TR_abc123.ogg"
)
r2 = parse_livekit_track_filepath(
"livekit/room-20260401/bob_smith-d4e5f6-2026-04-01T100030-TR_def456.ogg"
)
assert r1.participant_identity == "alice-a1b2c3"
assert r2.participant_identity == "bob_smith-d4e5f6"
def test_rejects_json_manifest(self):
with pytest.raises(ValueError, match="doesn't match expected format"):
parse_livekit_track_filepath("livekit/myroom-20260401/EG_K5sipvfB5fTM.json")
def test_rejects_webm_video(self):
# webm files match the pattern but are filtered by filter_audio_tracks
result = parse_livekit_track_filepath(
"livekit/myroom-20260401/juan-4b82ed-2026-04-01T195727-TR_VC679dgMQBdfhT.webm"
)
# webm parses successfully (TR_ prefix matches video tracks too)
assert result.track_id == "TR_VC679dgMQBdfhT"
def test_rejects_invalid_path(self):
with pytest.raises(ValueError):
parse_livekit_track_filepath("not/a/valid/path.ogg")
def test_rejects_missing_track_id(self):
with pytest.raises(ValueError):
parse_livekit_track_filepath("livekit/room/user-2026-04-01T100000.ogg")
def test_parses_timestamp_correctly(self):
result = parse_livekit_track_filepath(
"livekit/room-20260401/user-abc123-2026-12-25T235959-TR_test.ogg"
)
assert result.timestamp == datetime(
2026, 12, 25, 23, 59, 59, tzinfo=timezone.utc
)
# ── Audio track filtering ─────────────────────────────────────
class TestFilterAudioTracks:
def test_filters_to_ogg_only(self):
keys = [
"livekit/room/EG_abc.json",
"livekit/room/user-abc-2026-04-01T100000-TR_audio.ogg",
"livekit/room/user-abc-2026-04-01T100000-TR_video.webm",
"livekit/room/EG_def.json",
"livekit/room/user2-def-2026-04-01T100030-TR_audio2.ogg",
]
result = filter_audio_tracks(keys)
assert len(result) == 2
assert all(k.endswith(".ogg") for k in result)
def test_empty_input(self):
assert filter_audio_tracks([]) == []
def test_no_audio_tracks(self):
keys = ["livekit/room/EG_abc.json", "livekit/room/user-TR_v.webm"]
assert filter_audio_tracks(keys) == []
# ── Offset calculation ─────────────────────────────────────────
class TestCalculateTrackOffsets:
def test_single_track_zero_offset(self):
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
)
]
offsets = calculate_track_offsets(tracks)
assert len(offsets) == 1
assert offsets[0][1] == 0.0
def test_two_tracks_correct_offset(self):
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="bob",
timestamp=datetime(2026, 4, 1, 10, 1, 10, tzinfo=timezone.utc),
track_id="TR_2",
),
]
offsets = calculate_track_offsets(tracks)
assert offsets[0][1] == 0.0 # alice (earliest)
assert offsets[1][1] == 70.0 # bob (70 seconds later)
def test_three_tracks_earliest_is_zero(self):
tracks = [
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="bob",
timestamp=datetime(2026, 4, 1, 10, 0, 30, tzinfo=timezone.utc),
track_id="TR_2",
),
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="alice",
timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc),
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k3",
room_name="r",
participant_identity="charlie",
timestamp=datetime(2026, 4, 1, 10, 1, 0, tzinfo=timezone.utc),
track_id="TR_3",
),
]
offsets = calculate_track_offsets(tracks)
offset_map = {t.participant_identity: o for t, o in offsets}
assert offset_map["alice"] == 0.0
assert offset_map["bob"] == 30.0
assert offset_map["charlie"] == 60.0
def test_empty_tracks(self):
assert calculate_track_offsets([]) == []
def test_simultaneous_tracks_zero_offsets(self):
ts = datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc)
tracks = [
LiveKitTrackFile(
s3_key="k1",
room_name="r",
participant_identity="a",
timestamp=ts,
track_id="TR_1",
),
LiveKitTrackFile(
s3_key="k2",
room_name="r",
participant_identity="b",
timestamp=ts,
track_id="TR_2",
),
]
offsets = calculate_track_offsets(tracks)
assert all(o == 0.0 for _, o in offsets)
# ── Room name extraction ───────────────────────────────────────
class TestExtractLiveKitBaseRoomName:
def test_strips_timestamp_suffix(self):
assert extract_livekit_base_room_name("myroom-20260401172036") == "myroom"
def test_preserves_hyphenated_name(self):
assert (
extract_livekit_base_room_name("my-room-name-20260401172036")
== "my-room-name"
)
def test_single_segment(self):
assert extract_livekit_base_room_name("room-20260401") == "room"
# ── pad_track padding_seconds behavior ─────────────────────────
class TestPadTrackPaddingSeconds:
"""Test that pad_track correctly uses pre-calculated padding_seconds
for LiveKit (skipping container metadata) vs extracting from container
for Daily (when padding_seconds is None).
"""
def _make_test_ogg(self, path: str, duration_seconds: float = 5.0):
"""Create a minimal OGG/Opus file for testing."""
with av.open(path, "w", format="ogg") as out:
stream = out.add_stream("libopus", rate=48000)
stream.bit_rate = 64000
samples_per_frame = 960 # Opus standard
total_samples = int(duration_seconds * 48000)
pts = 0
while pts < total_samples:
frame = av.AudioFrame(
format="s16", layout="stereo", samples=samples_per_frame
)
# Fill with silence (zeros)
frame.planes[0].update(bytes(samples_per_frame * 2 * 2)) # s16 * stereo
frame.sample_rate = 48000
frame.pts = pts
frame.time_base = Fraction(1, 48000)
for packet in stream.encode(frame):
out.mux(packet)
pts += samples_per_frame
for packet in stream.encode(None):
out.mux(packet)
def test_ogg_has_zero_start_time(self, tmp_path):
"""Verify that OGG files (like LiveKit produces) have start_time=0,
confirming why pre-calculated padding is needed."""
ogg_path = str(tmp_path / "test.ogg")
self._make_test_ogg(ogg_path)
with av.open(ogg_path) as container:
from reflector.utils.audio_padding import (
extract_stream_start_time_from_container,
)
start_time = extract_stream_start_time_from_container(container, 0)
assert start_time <= 0.0, (
"OGG files should have start_time<=0 (no usable offset), confirming "
f"LiveKit tracks need pre-calculated padding_seconds. Got: {start_time}"
)
def test_precalculated_padding_skips_metadata_extraction(self, tmp_path):
"""When padding_seconds is set, pad_track should use it directly
and NOT call extract_stream_start_time_from_container."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="livekit/room/user-abc-2026-04-01T100000-TR_audio.ogg",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="livekit",
padding_seconds=70.0,
)
assert input_data.padding_seconds == 70.0
# The pad_track function checks: if input.padding_seconds is not None → use it
# This means extract_stream_start_time_from_container is never called for LiveKit
def test_none_padding_falls_back_to_metadata(self, tmp_path):
"""When padding_seconds is None (Daily), pad_track should extract
start_time from container metadata."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="daily/room/track.webm",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="daily",
padding_seconds=None,
)
assert input_data.padding_seconds is None
# pad_track will call extract_stream_start_time_from_container for this case
def test_zero_padding_returns_original_key(self):
"""When padding_seconds=0.0, pad_track should return the original S3 key
without applying any padding (same as start_time=0 from metadata)."""
from reflector.hatchet.workflows.track_processing import TrackInput
input_data = TrackInput(
track_index=0,
s3_key="livekit/room/earliest-track.ogg",
bucket_name="test-bucket",
transcript_id="test-transcript",
source_platform="livekit",
padding_seconds=0.0,
)
# padding_seconds=0.0 → start_time_seconds=0.0 → "no padding needed" branch
assert input_data.padding_seconds == 0.0
# ── Pipeline offset calculation (process_tracks logic) ─────────
class TestProcessTracksOffsetCalculation:
"""Test the offset calculation logic used in process_tracks
for LiveKit source_platform."""
def test_livekit_offsets_from_timestamps(self):
"""Simulate the offset calculation done in process_tracks."""
tracks = [
{
"s3_key": "track1.ogg",
"participant_identity": "admin-0129c3",
"timestamp": "2026-04-01T23:44:50+00:00",
},
{
"s3_key": "track2.ogg",
"participant_identity": "juan-5a5b41",
"timestamp": "2026-04-01T23:46:00+00:00",
},
]
# Replicate the logic from process_tracks
timestamps = []
for i, track in enumerate(tracks):
ts_str = track.get("timestamp")
if ts_str:
ts = datetime.fromisoformat(ts_str)
timestamps.append((i, ts))
earliest = min(ts for _, ts in timestamps)
track_padding = {}
for i, ts in timestamps:
track_padding[i] = (ts - earliest).total_seconds()
assert track_padding[0] == 0.0 # admin (earliest)
assert track_padding[1] == 70.0 # juan (70s later)
def test_daily_tracks_get_no_precalculated_padding(self):
"""Daily tracks should NOT get padding_seconds (use container metadata)."""
tracks = [
{"s3_key": "daily-track1.webm"},
{"s3_key": "daily-track2.webm"},
]
# Daily tracks don't have "timestamp" field
track_padding = {}
source_platform = "daily"
if source_platform == "livekit":
# This block should NOT execute for daily
pass
# Daily tracks get no pre-calculated padding
assert track_padding == {}
for i, _ in enumerate(tracks):
assert track_padding.get(i) is None
def test_livekit_missing_timestamp_graceful(self):
"""If a LiveKit track is missing timestamp, it should be skipped."""
tracks = [
{
"s3_key": "track1.ogg",
"participant_identity": "alice",
"timestamp": "2026-04-01T10:00:00+00:00",
},
{"s3_key": "track2.ogg", "participant_identity": "bob"}, # no timestamp
]
timestamps = []
for i, track in enumerate(tracks):
ts_str = track.get("timestamp")
if ts_str:
try:
ts = datetime.fromisoformat(ts_str)
timestamps.append((i, ts))
except (ValueError, TypeError):
timestamps.append((i, None))
else:
timestamps.append((i, None))
valid = [(i, ts) for i, ts in timestamps if ts is not None]
assert len(valid) == 1 # only alice has a timestamp
assert valid[0][0] == 0 # track index 0

View File

@@ -7,10 +7,9 @@ import {
LiveKitRoom as LKRoom,
VideoConference,
RoomAudioRenderer,
PreJoin,
type LocalUserChoices,
} from "@livekit/components-react";
// LiveKit component styles — imported in the global layout to avoid
// Next.js CSS import restrictions in client components.
// See: app/[roomName]/layout.tsx
import type { components } from "../../reflector-api";
import { useAuth } from "../../lib/AuthProvider";
import { useRoomJoinMeeting } from "../../lib/apiHooks";
@@ -68,6 +67,7 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) {
const joinMutation = useRoomJoinMeeting();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const [connectionError, setConnectionError] = useState(false);
const [userChoices, setUserChoices] = useState<LocalUserChoices | null>(null);
// ── Consent dialog (same hooks as Daily/Whereby) ──────────
const { showConsentButton, showRecordingIndicator } = useConsentDialog({
@@ -87,9 +87,21 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) {
});
const showEmailFeature = featureEnabled("emailTranscript");
// ── Join meeting via backend API to get token ─────────────
// ── PreJoin defaults ──────────────────────────────────────
const defaultUsername =
auth.status === "authenticated" || auth.status === "refreshing"
? auth.user.email?.split("@")[0] || auth.user.id?.slice(0, 12) || ""
: "";
// ── Join meeting via backend API after PreJoin submit ─────
useEffect(() => {
if (authLastUserId === undefined || !meeting?.id || !roomName) return;
if (
authLastUserId === undefined ||
!userChoices ||
!meeting?.id ||
!roomName
)
return;
let cancelled = false;
async function join() {
@@ -97,6 +109,7 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) {
const result = await joinMutation.mutateAsync({
params: {
path: { room_name: roomName, meeting_id: meeting.id },
query: { display_name: userChoices!.username || undefined },
},
});
if (!cancelled) setJoinedMeeting(result);
@@ -110,12 +123,41 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) {
return () => {
cancelled = true;
};
}, [meeting?.id, roomName, authLastUserId]);
}, [meeting?.id, roomName, authLastUserId, userChoices]);
const handleDisconnected = useCallback(() => {
router.push("/browse");
}, [router]);
const handlePreJoinSubmit = useCallback((choices: LocalUserChoices) => {
setUserChoices(choices);
}, []);
// ── PreJoin screen (name + device selection) ──────────────
if (!userChoices) {
return (
<Box
w="100vw"
h="100vh"
display="flex"
alignItems="center"
justifyContent="center"
bg="gray.900"
data-lk-theme="default"
>
<PreJoin
defaults={{
username: defaultUsername,
audioEnabled: true,
videoEnabled: true,
}}
onSubmit={handlePreJoinSubmit}
userLabel="Display Name"
/>
</Box>
);
}
// ── Loading / error states ────────────────────────────────
if (connectionError) {
return (
@@ -170,8 +212,8 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) {
serverUrl={serverUrl}
token={token}
connect={true}
audio={true}
video={true}
audio={userChoices.audioEnabled}
video={userChoices.videoEnabled}
onDisconnected={handleDisconnected}
data-lk-theme="default"
style={{ height: "100%" }}

View File

@@ -3259,7 +3259,9 @@ export interface operations {
};
v1_rooms_join_meeting: {
parameters: {
query?: never;
query?: {
display_name?: string | null;
};
header?: never;
path: {
room_name: string;