From 240f0c90008eeb3ba829134efd2e3bf0c2f2623e Mon Sep 17 00:00:00 2001 From: Juan Date: Wed, 1 Apr 2026 19:34:57 -0500 Subject: [PATCH] feat: full livekit pipeline --- docker-compose.selfhosted.yml | 6 +- scripts/setup-selfhosted.sh | 153 +++++-- server/reflector/db/meetings.py | 11 + server/reflector/db/transcripts.py | 8 + .../workflows/daily_multitrack_pipeline.py | 215 +++++++--- .../hatchet/workflows/track_processing.py | 27 +- server/reflector/livekit_api/client.py | 18 + .../reflector/services/transcript_process.py | 5 + server/reflector/utils/livekit.py | 112 +++++ server/reflector/video_platforms/livekit.py | 17 + server/reflector/views/livekit.py | 73 +++- server/reflector/views/rooms.py | 27 +- server/reflector/worker/app.py | 16 +- server/reflector/worker/process.py | 291 +++++++++++++ server/tests/test_livekit_track_processing.py | 393 ++++++++++++++++++ www/app/[roomName]/components/LiveKitRoom.tsx | 58 ++- www/app/reflector-api.d.ts | 4 +- 17 files changed, 1284 insertions(+), 150 deletions(-) create mode 100644 server/reflector/utils/livekit.py create mode 100644 server/tests/test_livekit_track_processing.py diff --git a/docker-compose.selfhosted.yml b/docker-compose.selfhosted.yml index 49d49061..bcf2a95f 100644 --- a/docker-compose.selfhosted.yml +++ b/docker-compose.selfhosted.yml @@ -35,7 +35,7 @@ services: image: monadicalsas/reflector-backend:latest restart: unless-stopped ports: - - "127.0.0.1:1250:1250" + - "${BIND_HOST:-127.0.0.1}:1250:1250" - "40000-40100:40000-40100/udp" env_file: - ./server/.env @@ -116,7 +116,7 @@ services: image: monadicalsas/reflector-frontend:latest restart: unless-stopped ports: - - "127.0.0.1:3000:3000" + - "${BIND_HOST:-127.0.0.1}:3000:3000" env_file: - ./www/.env environment: @@ -300,6 +300,7 @@ services: ports: - "80:80" - "443:443" + - "8888:8888" # Hatchet dashboard (proxied with TLS) volumes: - ./Caddyfile:/etc/caddy/Caddyfile:ro - caddy_data:/data @@ -339,7 +340,6 @@ services: postgres: condition: service_healthy ports: - - "127.0.0.1:8888:8888" - "127.0.0.1:7078:7077" env_file: - ./.env.hatchet diff --git a/scripts/setup-selfhosted.sh b/scripts/setup-selfhosted.sh index 4fa78a17..6ce75515 100755 --- a/scripts/setup-selfhosted.sh +++ b/scripts/setup-selfhosted.sh @@ -28,6 +28,10 @@ # Optional flags: # --livekit Enable LiveKit self-hosted video platform (generates credentials, # starts livekit-server + livekit-egress containers) +# --ip IP Set the server's IP address for all URLs. Implies --caddy +# (self-signed HTTPS, required for browser mic/camera access). +# Mutually exclusive with --domain. Use for LAN or cloud VM access. +# On Linux, IP is auto-detected; on macOS, use --ip to specify it. # --garage Use Garage for local S3-compatible storage # --caddy Enable Caddy reverse proxy with auto-SSL # --domain DOMAIN Use a real domain for Caddy (enables Let's Encrypt auto-HTTPS) @@ -214,6 +218,7 @@ USE_GARAGE=false USE_LIVEKIT=false USE_CADDY=false CUSTOM_DOMAIN="" # optional domain for Let's Encrypt HTTPS +CUSTOM_IP="" # optional --ip override (mutually exclusive with --caddy) BUILD_IMAGES=false # build backend/frontend from source ADMIN_PASSWORD="" # optional admin password for password auth CUSTOM_CA="" # --custom-ca: path to dir or CA cert file @@ -268,6 +273,14 @@ for i in "${!ARGS[@]}"; do --garage) USE_GARAGE=true ;; --livekit) USE_LIVEKIT=true ;; --caddy) USE_CADDY=true ;; + --ip) + next_i=$((i + 1)) + if [[ $next_i -ge ${#ARGS[@]} ]] || [[ "${ARGS[$next_i]}" == --* ]]; then + err "--ip requires an IP address (e.g. --ip 192.168.0.100)" + exit 1 + fi + CUSTOM_IP="${ARGS[$next_i]}" + SKIP_NEXT=true ;; --build) BUILD_IMAGES=true ;; --password) next_i=$((i + 1)) @@ -362,6 +375,16 @@ for i in "${!ARGS[@]}"; do esac done +# --- Validate flag combinations --- +if [[ -n "$CUSTOM_IP" ]] && [[ -n "$CUSTOM_DOMAIN" ]]; then + err "--ip and --domain are mutually exclusive. Use --ip for IP-based access, or --domain for domain-based access." + exit 1 +fi +# --ip implies --caddy (browsers require HTTPS for mic/camera access on non-localhost) +if [[ -n "$CUSTOM_IP" ]]; then + USE_CADDY=true +fi + # --- Save CLI args for config memory (re-run without flags) --- if [[ $# -gt 0 ]]; then mkdir -p "$ROOT_DIR/data" @@ -558,26 +581,25 @@ _generate_livekit_config() { env_set "$SERVER_ENV" "LIVEKIT_PUBLIC_URL" "$public_lk_url" env_set "$SERVER_ENV" "DEFAULT_VIDEO_PLATFORM" "livekit" - # LiveKit storage: reuse transcript storage credentials if not separately configured - if ! env_has_key "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_BUCKET_NAME" || [[ -z "$(env_get "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_BUCKET_NAME" || true)" ]]; then - local ts_bucket ts_region ts_key ts_secret ts_endpoint - ts_bucket=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_BUCKET_NAME" 2>/dev/null || echo "reflector-bucket") - ts_region=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_REGION" 2>/dev/null || echo "us-east-1") - ts_key=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID" 2>/dev/null || true) - ts_secret=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY" 2>/dev/null || true) - ts_endpoint=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" 2>/dev/null || true) - env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_BUCKET_NAME" "$ts_bucket" - env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_REGION" "$ts_region" - [[ -n "$ts_key" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID" "$ts_key" - [[ -n "$ts_secret" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY" "$ts_secret" - [[ -n "$ts_endpoint" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_ENDPOINT_URL" "$ts_endpoint" - if [[ -z "$ts_key" ]] || [[ -z "$ts_secret" ]]; then - warn "LiveKit storage: S3 credentials not found — Track Egress recording will fail!" - warn "Configure LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID and LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY in server/.env" - warn "Or run with --garage to auto-configure local S3 storage" - else - ok "LiveKit storage: reusing transcript storage config" - fi + # LiveKit storage: always sync from transcript storage config. + # Endpoint URL must match (changes between Caddy/no-Caddy runs). + local ts_bucket ts_region ts_key ts_secret ts_endpoint + ts_bucket=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_BUCKET_NAME" 2>/dev/null || echo "reflector-bucket") + ts_region=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_REGION" 2>/dev/null || echo "us-east-1") + ts_key=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID" 2>/dev/null || true) + ts_secret=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY" 2>/dev/null || true) + ts_endpoint=$(env_get "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" 2>/dev/null || true) + env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_BUCKET_NAME" "$ts_bucket" + env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_REGION" "$ts_region" + [[ -n "$ts_key" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID" "$ts_key" + [[ -n "$ts_secret" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY" "$ts_secret" + [[ -n "$ts_endpoint" ]] && env_set "$SERVER_ENV" "LIVEKIT_STORAGE_AWS_ENDPOINT_URL" "$ts_endpoint" + if [[ -z "$ts_key" ]] || [[ -z "$ts_secret" ]]; then + warn "LiveKit storage: S3 credentials not found — Track Egress recording will fail!" + warn "Configure LIVEKIT_STORAGE_AWS_ACCESS_KEY_ID and LIVEKIT_STORAGE_AWS_SECRET_ACCESS_KEY in server/.env" + warn "Or run with --garage to auto-configure local S3 storage" + else + ok "LiveKit storage: synced from transcript storage config" fi # Generate livekit.yaml @@ -850,13 +872,23 @@ step_server_env() { fi else if [[ -n "$PRIMARY_IP" ]]; then - server_base_url="http://$PRIMARY_IP" + server_base_url="http://$PRIMARY_IP:1250" else server_base_url="http://localhost:1250" fi fi env_set "$SERVER_ENV" "BASE_URL" "$server_base_url" - env_set "$SERVER_ENV" "CORS_ORIGIN" "$server_base_url" + # CORS: allow the frontend origin (port 3000, not the API port) + local cors_origin="${server_base_url}" + if [[ "$USE_CADDY" != "true" ]]; then + # Without Caddy, frontend is on port 3000, API on 1250 + cors_origin="${server_base_url/:1250/:3000}" + # Safety: if substitution didn't change anything, construct explicitly + if [[ "$cors_origin" == "$server_base_url" ]] && [[ -n "$PRIMARY_IP" ]]; then + cors_origin="http://${PRIMARY_IP}:3000" + fi + fi + env_set "$SERVER_ENV" "CORS_ORIGIN" "$cors_origin" # WebRTC: advertise host IP in ICE candidates so browsers can reach the server if [[ -n "$PRIMARY_IP" ]]; then @@ -1066,6 +1098,18 @@ step_server_env() { env_set "$SERVER_ENV" "HATCHET_CLIENT_HOST_PORT" "hatchet:7077" ok "Hatchet connectivity configured (workflow engine for processing pipelines)" + # BIND_HOST controls whether server/web ports are exposed on all interfaces + local root_env="$ROOT_DIR/.env" + touch "$root_env" + if [[ "$USE_CADDY" == "true" ]]; then + # With Caddy, services stay on localhost (Caddy is the public entry point) + env_set "$root_env" "BIND_HOST" "127.0.0.1" + elif [[ -n "$PRIMARY_IP" ]]; then + # Without Caddy + detected IP, expose on all interfaces for direct access + env_set "$root_env" "BIND_HOST" "0.0.0.0" + ok "BIND_HOST=0.0.0.0 (ports exposed for direct access)" + fi + ok "server/.env ready" } @@ -1093,18 +1137,26 @@ step_www_env() { base_url="https://localhost" fi else - # No Caddy — user's proxy handles SSL. Use http for now, they'll override. + # No Caddy — clients connect directly to services on their ports. if [[ -n "$PRIMARY_IP" ]]; then - base_url="http://$PRIMARY_IP" + base_url="http://$PRIMARY_IP:3000" else - base_url="http://localhost" + base_url="http://localhost:3000" fi fi + # API_URL: with Caddy, same origin (443 proxies both); without Caddy, API is on port 1250 + local api_url="$base_url" + if [[ "$USE_CADDY" != "true" ]]; then + api_url="${base_url/:3000/:1250}" + # fallback if no port substitution happened (e.g. localhost without port) + [[ "$api_url" == "$base_url" ]] && api_url="${base_url}:1250" + fi + env_set "$WWW_ENV" "SITE_URL" "$base_url" env_set "$WWW_ENV" "NEXTAUTH_URL" "$base_url" env_set "$WWW_ENV" "NEXTAUTH_SECRET" "$NEXTAUTH_SECRET" - env_set "$WWW_ENV" "API_URL" "$base_url" + env_set "$WWW_ENV" "API_URL" "$api_url" env_set "$WWW_ENV" "WEBSOCKET_URL" "auto" env_set "$WWW_ENV" "SERVER_API_URL" "http://server:1250" env_set "$WWW_ENV" "KV_URL" "redis://redis:6379" @@ -1226,7 +1278,13 @@ step_garage() { # Write S3 credentials to server/.env env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_BACKEND" "aws" - env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" "http://garage:3900" + # Endpoint URL: use public IP when no Caddy so presigned URLs work in the browser. + # With Caddy, internal hostname is fine (Caddy proxies or browser never sees presigned URLs directly). + if [[ "$USE_CADDY" != "true" ]] && [[ -n "$PRIMARY_IP" ]]; then + env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" "http://${PRIMARY_IP}:3900" + else + env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL" "http://garage:3900" + fi env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_BUCKET_NAME" "reflector-media" env_set "$SERVER_ENV" "TRANSCRIPT_STORAGE_AWS_REGION" "garage" if [[ "$created_key" == "true" ]]; then @@ -1355,11 +1413,13 @@ CADDYEOF ok "Created Caddyfile for $CUSTOM_DOMAIN (Let's Encrypt auto-HTTPS)" elif [[ -n "$PRIMARY_IP" ]]; then # No domain, IP only: catch-all :443 with self-signed cert - # (IP connections don't send SNI, so we can't match by address) + # on_demand generates certs dynamically for any hostname/IP on first request cat > "$caddyfile" << CADDYEOF # Generated by setup-selfhosted.sh — self-signed cert for IP access :443 { - tls internal + tls internal { + on_demand + } handle /v1/* { reverse_proxy server:1250 } @@ -1386,7 +1446,9 @@ CADDYEOF # Hatchet workflow dashboard (Daily.co multitrack processing) :8888 { - tls internal + tls internal { + on_demand + } reverse_proxy hatchet:8888 } CADDYEOF @@ -1597,7 +1659,7 @@ step_health() { info "Waiting for Hatchet workflow engine..." local hatchet_ok=false for i in $(seq 1 60); do - if curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then + if compose_cmd exec -T hatchet curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then hatchet_ok=true break fi @@ -1645,7 +1707,7 @@ step_hatchet_token() { # Wait for hatchet to be healthy local hatchet_ok=false for i in $(seq 1 60); do - if curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then + if compose_cmd exec -T hatchet curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then hatchet_ok=true break fi @@ -1716,12 +1778,19 @@ main() { [[ "$BUILD_IMAGES" == "true" ]] && echo " Build: from source" echo "" - # Detect primary IP - PRIMARY_IP="" - if [[ "$OS" == "Linux" ]]; then - PRIMARY_IP=$(hostname -I 2>/dev/null | awk '{print $1}' || true) - if [[ "$PRIMARY_IP" == "127."* ]] || [[ -z "$PRIMARY_IP" ]]; then - PRIMARY_IP=$(ip -4 route get 1 2>/dev/null | sed -n 's/.*src \([0-9.]*\).*/\1/p' || true) + # Detect primary IP (--ip overrides auto-detection) + if [[ -n "$CUSTOM_IP" ]]; then + PRIMARY_IP="$CUSTOM_IP" + ok "Using provided IP: $PRIMARY_IP" + else + PRIMARY_IP="" + if [[ "$OS" == "Linux" ]]; then + PRIMARY_IP=$(hostname -I 2>/dev/null | awk '{print $1}' || true) + if [[ "$PRIMARY_IP" == "127."* ]] || [[ -z "$PRIMARY_IP" ]]; then + PRIMARY_IP=$(ip -4 route get 1 2>/dev/null | sed -n 's/.*src \([0-9.]*\).*/\1/p' || true) + fi + elif [[ "$OS" == "Darwin" ]]; then + PRIMARY_IP=$(detect_lan_ip) fi fi @@ -1827,10 +1896,12 @@ EOF echo " App: https://localhost (accept self-signed cert in browser)" echo " API: https://localhost/v1/" fi + elif [[ -n "$PRIMARY_IP" ]]; then + echo " App: http://$PRIMARY_IP:3000" + echo " API: http://$PRIMARY_IP:1250" else - echo " No Caddy — point your reverse proxy at:" - echo " Frontend: web:3000 (or localhost:3000 from host)" - echo " API: server:1250 (or localhost:1250 from host)" + echo " App: http://localhost:3000" + echo " API: http://localhost:1250" fi echo "" if [[ "$HAS_OVERRIDES" == "true" ]]; then diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index ba7b8a3a..212877f0 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -165,6 +165,17 @@ class MeetingController: results = await get_database().fetch_all(query) return [Meeting(**result) for result in results] + async def get_all_inactive_livekit(self) -> list[Meeting]: + """Get inactive LiveKit meetings (for multitrack processing discovery).""" + query = meetings.select().where( + sa.and_( + meetings.c.is_active == sa.false(), + meetings.c.platform == "livekit", + ) + ) + results = await get_database().fetch_all(query) + return [Meeting(**result) for result in results] + async def get_by_room_name( self, room_name: str, diff --git a/server/reflector/db/transcripts.py b/server/reflector/db/transcripts.py index d903c263..3d137413 100644 --- a/server/reflector/db/transcripts.py +++ b/server/reflector/db/transcripts.py @@ -486,6 +486,14 @@ class TranscriptController: return None return Transcript(**result) + async def get_by_meeting_id(self, meeting_id: str) -> Transcript | None: + """Get a transcript by meeting_id (first match).""" + query = transcripts.select().where(transcripts.c.meeting_id == meeting_id) + result = await get_database().fetch_one(query) + if not result: + return None + return Transcript(**result) + async def get_by_recording_id( self, recording_id: str, **kwargs ) -> Transcript | None: diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index ef8a5c16..ec99f47b 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -273,8 +273,10 @@ def with_error_handling( ) @with_error_handling(TaskName.GET_RECORDING) async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult: - """Fetch recording metadata from Daily.co API.""" - ctx.log(f"get_recording: starting for recording_id={input.recording_id}") + """Fetch recording metadata. Platform-aware: Daily calls API, LiveKit skips.""" + ctx.log( + f"get_recording: starting for recording_id={input.recording_id}, platform={input.source_platform}" + ) ctx.log( f"get_recording: transcript_id={input.transcript_id}, room_id={input.room_id}" ) @@ -299,6 +301,18 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult: ) ctx.log(f"get_recording: status set to 'processing' and broadcasted") + # LiveKit: no external API call needed — metadata comes from S3 track listing + if input.source_platform == "livekit": + ctx.log( + "get_recording: LiveKit platform — skipping API call (metadata from S3)" + ) + return RecordingResult( + id=input.recording_id, + mtg_session_id=None, + duration=0, # Duration calculated from tracks later + ) + + # Daily.co: fetch recording metadata from API if not settings.DAILY_API_KEY: ctx.log("get_recording: ERROR - DAILY_API_KEY not configured") raise ValueError("DAILY_API_KEY not configured") @@ -332,11 +346,12 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult: ) @with_error_handling(TaskName.GET_PARTICIPANTS) async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult: - """Fetch participant list from Daily.co API and update transcript in database.""" - ctx.log(f"get_participants: transcript_id={input.transcript_id}") + """Fetch participant list and update transcript. Platform-aware.""" + ctx.log( + f"get_participants: transcript_id={input.transcript_id}, platform={input.source_platform}" + ) recording = ctx.task_output(get_recording) - mtg_session_id = recording.mtg_session_id async with fresh_db_connection(): from reflector.db.transcripts import ( # noqa: PLC0415 TranscriptDuration, @@ -347,8 +362,8 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe transcript = await transcripts_controller.get_by_id(input.transcript_id) if not transcript: raise ValueError(f"Transcript {input.transcript_id} not found") - # Note: title NOT cleared - preserves existing titles - # Duration from Daily API (seconds -> milliseconds) - master source + + # Duration from recording metadata (seconds -> milliseconds) duration_ms = recording.duration * 1000 if recording.duration else 0 await transcripts_controller.update( transcript, @@ -360,65 +375,99 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe }, ) - await append_event_and_broadcast( - input.transcript_id, - transcript, - "DURATION", - TranscriptDuration(duration=duration_ms), - logger=logger, - ) - - mtg_session_id = assert_non_none_and_non_empty( - mtg_session_id, "mtg_session_id is required" - ) - daily_api_key = assert_non_none_and_non_empty( - settings.DAILY_API_KEY, "DAILY_API_KEY is required" - ) - - async with DailyApiClient( - api_key=daily_api_key, base_url=settings.DAILY_API_URL - ) as client: - participants = await client.get_meeting_participants(mtg_session_id) - - id_to_name = {} - id_to_user_id = {} - for p in participants.data: - if p.user_name: - id_to_name[p.participant_id] = p.user_name - if p.user_id: - id_to_user_id[p.participant_id] = p.user_id - - track_keys = [t["s3_key"] for t in input.tracks] - cam_audio_keys = filter_cam_audio_tracks(track_keys) + if duration_ms: + await append_event_and_broadcast( + input.transcript_id, + transcript, + "DURATION", + TranscriptDuration(duration=duration_ms), + logger=logger, + ) participants_list: list[ParticipantInfo] = [] - for idx, key in enumerate(cam_audio_keys): - try: - parsed = parse_daily_recording_filename(key) - participant_id = parsed.participant_id - except ValueError as e: - logger.error( - "Failed to parse Daily recording filename", - error=str(e), - key=key, - ) - continue - default_name = f"Speaker {idx}" - name = id_to_name.get(participant_id, default_name) - user_id = id_to_user_id.get(participant_id) - - participant = TranscriptParticipant( - id=participant_id, speaker=idx, name=name, user_id=user_id + if input.source_platform == "livekit": + # LiveKit: participant identity is in the track dict or can be parsed from filepath + from reflector.utils.livekit import ( + parse_livekit_track_filepath, # noqa: PLC0415 ) - await transcripts_controller.upsert_participant(transcript, participant) - participants_list.append( - ParticipantInfo( - participant_id=participant_id, - user_name=name, + + for idx, track in enumerate(input.tracks): + identity = track.get("participant_identity") + if not identity: + # Reprocess path: parse from S3 key + try: + parsed = parse_livekit_track_filepath(track["s3_key"]) + identity = parsed.participant_identity + except (ValueError, KeyError): + identity = f"speaker-{idx}" + participant = TranscriptParticipant( + id=identity, speaker=idx, + name=identity, + user_id=identity if not identity.startswith("anon-") else None, ) + await transcripts_controller.upsert_participant(transcript, participant) + participants_list.append( + ParticipantInfo( + participant_id=identity, + user_name=identity, + speaker=idx, + ) + ) + else: + # Daily.co: fetch participant names from API + mtg_session_id = recording.mtg_session_id + mtg_session_id = assert_non_none_and_non_empty( + mtg_session_id, "mtg_session_id is required" ) + daily_api_key = assert_non_none_and_non_empty( + settings.DAILY_API_KEY, "DAILY_API_KEY is required" + ) + + async with DailyApiClient( + api_key=daily_api_key, base_url=settings.DAILY_API_URL + ) as client: + participants = await client.get_meeting_participants(mtg_session_id) + + id_to_name = {} + id_to_user_id = {} + for p in participants.data: + if p.user_name: + id_to_name[p.participant_id] = p.user_name + if p.user_id: + id_to_user_id[p.participant_id] = p.user_id + + track_keys = [t["s3_key"] for t in input.tracks] + cam_audio_keys = filter_cam_audio_tracks(track_keys) + + for idx, key in enumerate(cam_audio_keys): + try: + parsed = parse_daily_recording_filename(key) + participant_id = parsed.participant_id + except ValueError as e: + logger.error( + "Failed to parse Daily recording filename", + error=str(e), + key=key, + ) + continue + + default_name = f"Speaker {idx}" + name = id_to_name.get(participant_id, default_name) + user_id = id_to_user_id.get(participant_id) + + participant = TranscriptParticipant( + id=participant_id, speaker=idx, name=name, user_id=user_id + ) + await transcripts_controller.upsert_participant(transcript, participant) + participants_list.append( + ParticipantInfo( + participant_id=participant_id, + user_name=name, + speaker=idx, + ) + ) ctx.log(f"get_participants complete: {len(participants_list)} participants") @@ -440,11 +489,56 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe @with_error_handling(TaskName.PROCESS_TRACKS) async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult: """Spawn child workflows for each track (dynamic fan-out).""" - ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows") + ctx.log( + f"process_tracks: spawning {len(input.tracks)} track workflows, platform={input.source_platform}" + ) participants_result = ctx.task_output(get_participants) source_language = participants_result.source_language + # For LiveKit: calculate padding offsets from filename timestamps. + # OGG files don't have embedded start_time metadata, so we pre-calculate. + track_padding: dict[int, float] = {} + if input.source_platform == "livekit": + from datetime import datetime # noqa: PLC0415 + + from reflector.utils.livekit import ( + parse_livekit_track_filepath, # noqa: PLC0415 + ) + + timestamps = [] + for i, track in enumerate(input.tracks): + ts_str = track.get("timestamp") + if ts_str: + try: + ts = datetime.fromisoformat(ts_str) + timestamps.append((i, ts)) + except (ValueError, TypeError): + ctx.log( + f"process_tracks: could not parse timestamp for track {i}: {ts_str}" + ) + timestamps.append((i, None)) + else: + # Reprocess path: parse timestamp from S3 key + try: + parsed = parse_livekit_track_filepath(track["s3_key"]) + timestamps.append((i, parsed.timestamp)) + ctx.log( + f"process_tracks: parsed timestamp from S3 key for track {i}: {parsed.timestamp}" + ) + except (ValueError, KeyError): + timestamps.append((i, None)) + + valid_timestamps = [(i, ts) for i, ts in timestamps if ts is not None] + if valid_timestamps: + earliest = min(ts for _, ts in valid_timestamps) + for i, ts in valid_timestamps: + offset = (ts - earliest).total_seconds() + track_padding[i] = offset + ctx.log( + f"process_tracks: track {i} padding={offset}s (from filename timestamp)" + ) + bulk_runs = [ track_workflow.create_bulk_run_item( input=TrackInput( @@ -454,6 +548,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes transcript_id=input.transcript_id, language=source_language, source_platform=input.source_platform, + padding_seconds=track_padding.get(i), ) ) for i, track in enumerate(input.tracks) diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py index 2458ee0c..b2b477f2 100644 --- a/server/reflector/hatchet/workflows/track_processing.py +++ b/server/reflector/hatchet/workflows/track_processing.py @@ -37,6 +37,9 @@ class TrackInput(BaseModel): transcript_id: str language: str = "en" source_platform: str = "daily" + # Pre-calculated padding in seconds (from filename timestamps for LiveKit). + # When set, overrides container metadata extraction for start_time. + padding_seconds: float | None = None hatchet = HatchetClientManager.get_client() @@ -53,15 +56,19 @@ track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackI async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: """Pad single audio track with silence for alignment. - Extracts stream.start_time from WebM container metadata and applies - silence padding using PyAV filter graph (adelay). + For Daily: extracts stream.start_time from WebM container metadata. + For LiveKit: uses pre-calculated padding_seconds from filename timestamps + (OGG files don't have embedded start_time metadata). """ - ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}") + ctx.log( + f"pad_track: track {input.track_index}, s3_key={input.s3_key}, padding_seconds={input.padding_seconds}" + ) logger.info( "[Hatchet] pad_track", track_index=input.track_index, s3_key=input.s3_key, transcript_id=input.transcript_id, + padding_seconds=input.padding_seconds, ) try: @@ -79,10 +86,16 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: bucket=input.bucket_name, ) - with av.open(source_url) as in_container: - start_time_seconds = extract_stream_start_time_from_container( - in_container, input.track_index, logger=logger - ) + if input.padding_seconds is not None: + # Pre-calculated offset (LiveKit: from filename timestamps) + start_time_seconds = input.padding_seconds + ctx.log(f"pad_track: using pre-calculated padding={start_time_seconds}s") + else: + # Extract from container metadata (Daily: WebM start_time) + with av.open(source_url) as in_container: + start_time_seconds = extract_stream_start_time_from_container( + in_container, input.track_index, logger=logger + ) # If no padding needed, return original S3 key if start_time_seconds <= 0: diff --git a/server/reflector/livekit_api/client.py b/server/reflector/livekit_api/client.py index 48f533d0..963482e8 100644 --- a/server/reflector/livekit_api/client.py +++ b/server/reflector/livekit_api/client.py @@ -9,6 +9,7 @@ from datetime import timedelta from livekit.api import ( AccessToken, + AutoTrackEgress, CreateRoomRequest, DeleteRoomRequest, DirectFileOutput, @@ -17,6 +18,7 @@ from livekit.api import ( ListParticipantsRequest, LiveKitAPI, Room, + RoomEgress, S3Upload, StopEgressRequest, TrackEgressRequest, @@ -55,6 +57,8 @@ class LiveKitApiClient: name: str, empty_timeout: int = 300, max_participants: int = 0, + enable_auto_track_egress: bool = False, + track_egress_filepath: str = "livekit/{room_name}/{publisher_identity}-{time}", ) -> Room: """Create a LiveKit room. @@ -62,11 +66,25 @@ class LiveKitApiClient: name: Room name (unique identifier). empty_timeout: Seconds to keep room alive after last participant leaves. max_participants: 0 = unlimited. + enable_auto_track_egress: If True, automatically record each participant's + audio track to S3 as a separate file (OGG/Opus). + track_egress_filepath: S3 filepath template for auto track egress. + Supports {room_name}, {publisher_identity}, {time}. """ + egress = None + if enable_auto_track_egress: + egress = RoomEgress( + tracks=AutoTrackEgress( + filepath=track_egress_filepath, + s3=self._build_s3_upload(), + ), + ) + req = CreateRoomRequest( name=name, empty_timeout=empty_timeout, max_participants=max_participants, + egress=egress, ) return await self._api.room.create_room(req) diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index d15df299..b4040753 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -155,12 +155,17 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu ) if track_keys: + # Detect platform from recording ID prefix + source_platform = ( + "livekit" if recording_id and recording_id.startswith("lk-") else "daily" + ) return MultitrackProcessingConfig( bucket_name=bucket_name, # type: ignore (validated above) track_keys=track_keys, transcript_id=validation.transcript_id, recording_id=recording_id, room_id=validation.room_id, + source_platform=source_platform, ) return FileProcessingConfig( diff --git a/server/reflector/utils/livekit.py b/server/reflector/utils/livekit.py new file mode 100644 index 00000000..de05967a --- /dev/null +++ b/server/reflector/utils/livekit.py @@ -0,0 +1,112 @@ +""" +LiveKit track file utilities. + +Parse participant identity and timing from Auto Track Egress S3 filepaths. + +Actual filepath format from LiveKit Auto Track Egress: + livekit/{room_name}/{publisher_identity}-{ISO_timestamp}-{track_id}.{ext} + +Examples: + livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg + livekit/myroom-20260401172036/juan2-63abcf-2026-04-01T195847-TR_AMyoSbM7tAQbYj.ogg + livekit/myroom-20260401172036/EG_K5sipvfB5fTM.json (manifest, skip) + livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195727-TR_VC679dgMQBdfhT.webm (video, skip) +""" + +import re +from dataclasses import dataclass +from datetime import datetime, timezone + +from reflector.utils.string import NonEmptyString + + +@dataclass +class LiveKitTrackFile: + """Parsed info from a LiveKit track egress filepath.""" + + s3_key: str + room_name: str + participant_identity: str + timestamp: datetime # Parsed from ISO timestamp in filename + track_id: str # LiveKit track ID (e.g., TR_AMR3SWs74Divho) + + +# Pattern: livekit/{room_name}/{identity}-{ISO_date}T{time}-{track_id}.{ext} +# The identity can contain alphanumeric, hyphens, underscores +# ISO timestamp is like 2026-04-01T195758 +# Track ID starts with TR_ +_TRACK_FILENAME_PATTERN = re.compile( + r"^livekit/(?P[^/]+)/(?P.+?)-(?P\d{4}-\d{2}-\d{2}T\d{6})-(?PTR_\w+)\.(?P\w+)$" +) + + +def parse_livekit_track_filepath(s3_key: str) -> LiveKitTrackFile: + """Parse a LiveKit track egress filepath into components. + + Args: + s3_key: S3 key like 'livekit/myroom-20260401/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg' + + Returns: + LiveKitTrackFile with parsed components. + + Raises: + ValueError: If the filepath doesn't match the expected format. + """ + match = _TRACK_FILENAME_PATTERN.match(s3_key) + if not match: + raise ValueError( + f"LiveKit track filepath doesn't match expected format: {s3_key}" + ) + + # Parse ISO-ish timestamp (e.g., 2026-04-01T195758 → datetime) + ts_str = match.group("timestamp") + try: + ts = datetime.strptime(ts_str, "%Y-%m-%dT%H%M%S").replace(tzinfo=timezone.utc) + except ValueError: + raise ValueError(f"Cannot parse timestamp '{ts_str}' from: {s3_key}") + + return LiveKitTrackFile( + s3_key=s3_key, + room_name=match.group("room_name"), + participant_identity=match.group("identity"), + timestamp=ts, + track_id=match.group("track_id"), + ) + + +def filter_audio_tracks(s3_keys: list[str]) -> list[str]: + """Filter S3 keys to only audio tracks (.ogg), excluding manifests and video.""" + return [k for k in s3_keys if k.endswith(".ogg")] + + +def calculate_track_offsets( + tracks: list[LiveKitTrackFile], +) -> list[tuple[LiveKitTrackFile, float]]: + """Calculate silence padding offset for each track. + + The earliest track starts at time zero. Each subsequent track + gets (track_timestamp - earliest_timestamp) seconds of silence prepended. + + Returns: + List of (track, offset_seconds) tuples. + """ + if not tracks: + return [] + + earliest = min(t.timestamp for t in tracks) + return [(t, (t.timestamp - earliest).total_seconds()) for t in tracks] + + +def extract_livekit_base_room_name(livekit_room_name: str) -> NonEmptyString: + """Extract base room name from LiveKit timestamped room name. + + LiveKit rooms use the same naming as Daily: {base_name}-YYYYMMDDHHMMSS + """ + base_name = livekit_room_name.rsplit("-", 1)[0] + assert base_name, f"Extracted base name is empty from: {livekit_room_name}" + return NonEmptyString(base_name) + + +def recording_lock_key(room_name: str) -> str: + """Redis lock key for preventing duplicate processing.""" + return f"livekit:processing:{room_name}" diff --git a/server/reflector/video_platforms/livekit.py b/server/reflector/video_platforms/livekit.py index ab579bfd..babc3eed 100644 --- a/server/reflector/video_platforms/livekit.py +++ b/server/reflector/video_platforms/livekit.py @@ -62,9 +62,25 @@ class LiveKitClient(VideoPlatformClient): remaining = int((end_date_aware - now).total_seconds()) empty_timeout = max(300, min(remaining, 86400)) # 5 min to 24 hours + # Enable auto track egress for cloud recording (per-participant audio to S3). + # Gracefully degrade if S3 credentials are missing — room still works, just no recording. + enable_recording = room.recording_type == "cloud" + egress_enabled = False + if enable_recording: + try: + self._api_client._build_s3_upload() # Validate credentials exist + egress_enabled = True + except ValueError: + logger.warning( + "S3 credentials not configured — room created without auto track egress. " + "Set LIVEKIT_STORAGE_AWS_* to enable recording.", + room_name=room_name, + ) + lk_room = await self._api_client.create_room( name=room_name, empty_timeout=empty_timeout, + enable_auto_track_egress=egress_enabled, ) logger.info( @@ -72,6 +88,7 @@ class LiveKitClient(VideoPlatformClient): room_name=lk_room.name, room_sid=lk_room.sid, empty_timeout=empty_timeout, + auto_track_egress=egress_enabled, ) # room_url includes the server URL + room name as query param. diff --git a/server/reflector/views/livekit.py b/server/reflector/views/livekit.py index a02fae43..a89e8f17 100644 --- a/server/reflector/views/livekit.py +++ b/server/reflector/views/livekit.py @@ -5,6 +5,10 @@ Track Egress recording completion. LiveKit sends webhooks as POST requests with JWT authentication in the Authorization header. + +Webhooks are used as fast-path triggers and logging. Track discovery +for the multitrack pipeline uses S3 listing (source of truth), not +webhook data. """ from fastapi import APIRouter, HTTPException, Request @@ -77,10 +81,7 @@ async def livekit_webhook(request: Request): room_name=event.room.name if event.room else None, ) case "room_finished": - logger.info( - "Room finished", - room_name=event.room.name if event.room else None, - ) + await _handle_room_finished(event) case "track_published" | "track_unpublished": logger.debug( f"Track event: {event_type}", @@ -142,49 +143,75 @@ async def _handle_participant_left(event): async def _handle_egress_started(event): egress = event.egress_info - room_name = egress.room_name if egress else None - logger.info( "Egress started", - room_name=room_name, + room_name=egress.room_name if egress else None, egress_id=egress.egress_id if egress else None, ) async def _handle_egress_ended(event): - """Handle Track Egress completion — trigger multitrack processing.""" + """Log Track Egress completion. Files are on S3 already; pipeline uses S3 listing.""" egress = event.egress_info if not egress: logger.warning("egress_ended: no egress info in payload") return - room_name = egress.room_name - - # Check egress status - # EGRESS_COMPLETE = 3, EGRESS_FAILED = 4 - status = egress.status - if status == 4: # EGRESS_FAILED + # EGRESS_FAILED = 4 + if egress.status == 4: logger.error( "Egress failed", - room_name=room_name, + room_name=egress.room_name, egress_id=egress.egress_id, error=egress.error, ) return - # Extract output file info from egress results file_results = list(egress.file_results) - logger.info( "Egress ended", - room_name=room_name, + room_name=egress.room_name, egress_id=egress.egress_id, - status=status, + status=egress.status, num_files=len(file_results), filenames=[f.filename for f in file_results] if file_results else [], ) - # Track Egress produces one file per egress request. - # The multitrack pipeline will be triggered separately once all tracks - # for a room are collected (via periodic polling or explicit trigger). - # TODO: Implement track collection and pipeline trigger + +async def _handle_room_finished(event): + """Fast-path: trigger multitrack processing when room closes. + + This is an optimization — if missed, the process_livekit_ended_meetings + beat task catches it within ~2 minutes. + """ + room_name = event.room.name if event.room else None + if not room_name: + logger.warning("room_finished: no room name in payload") + return + + logger.info("Room finished", room_name=room_name) + + meeting = await meetings_controller.get_by_room_name(room_name) + if not meeting: + logger.warning("room_finished: meeting not found", room_name=room_name) + return + + # Deactivate the meeting — LiveKit room is destroyed, so process_meetings + # can't detect this via API (list_participants returns empty for deleted rooms). + if meeting.is_active: + await meetings_controller.update_meeting(meeting.id, is_active=False) + logger.info("room_finished: meeting deactivated", meeting_id=meeting.id) + + # Import here to avoid circular imports (worker imports views) + from reflector.worker.process import process_livekit_multitrack + + process_livekit_multitrack.delay( + room_name=room_name, + meeting_id=meeting.id, + ) + + logger.info( + "room_finished: queued multitrack processing", + meeting_id=meeting.id, + room_name=room_name, + ) diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index c9ac2f2f..5ff90c97 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -554,6 +554,7 @@ async def rooms_join_meeting( room_name: str, meeting_id: str, user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], + display_name: str | None = None, ): user_id = user["sub"] if user else None room = await rooms_controller.get_by_name(room_name) @@ -599,13 +600,27 @@ async def rooms_join_meeting( meeting.room_url = add_query_param(meeting.room_url, "t", token) elif meeting.platform == "livekit": + import re + import uuid + client = create_platform_client(meeting.platform) - participant_identity = user_id or f"anon-{meeting_id[:8]}" - participant_name = ( - getattr(user, "name", None) or participant_identity - if user - else participant_identity - ) + # Identity must be unique per participant to avoid S3 key collisions. + # Format: {readable_name}-{short_uuid} ensures uniqueness even for same names. + uid_suffix = uuid.uuid4().hex[:6] + if display_name: + safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", display_name.strip())[:40] + participant_identity = ( + f"{safe_name}-{uid_suffix}" if safe_name else f"anon-{uid_suffix}" + ) + elif user_id: + email = getattr(user, "email", None) + if email and "@" in email: + participant_identity = f"{email.split('@')[0]}-{uid_suffix}" + else: + participant_identity = f"{user_id[:12]}-{uid_suffix}" + else: + participant_identity = f"anon-{uid_suffix}" + participant_name = display_name or participant_identity token = client.create_access_token( room_name=meeting.room_name, participant_identity=participant_identity, diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index 9bd05491..5e2f730f 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -85,7 +85,21 @@ def build_beat_schedule( _livekit_enabled = bool(settings.LIVEKIT_API_KEY and settings.LIVEKIT_URL) if _livekit_enabled: - logger.info("LiveKit platform detected") + beat_schedule["process_livekit_ended_meetings"] = { + "task": "reflector.worker.process.process_livekit_ended_meetings", + "schedule": 120, # Every 2 minutes + } + beat_schedule["reprocess_failed_livekit_recordings"] = { + "task": "reflector.worker.process.reprocess_failed_livekit_recordings", + "schedule": crontab(hour=5, minute=0), + } + logger.info( + "LiveKit beat tasks enabled", + tasks=[ + "process_livekit_ended_meetings", + "reprocess_failed_livekit_recordings", + ], + ) _any_platform = _whereby_enabled or _daily_enabled or _livekit_enabled if _any_platform: diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index aa7d8042..9df8088a 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -874,6 +874,22 @@ async def process_meetings(): logger_.info( "Meeting deactivated - scheduled time ended with no participants", ) + elif meeting.platform == "livekit" and not has_had_sessions: + # LiveKit rooms are destroyed after empty_timeout. Once gone, + # list_participants returns [] — indistinguishable from "never used". + # Check if meeting was created >10 min ago; if so, assume room is gone. + meeting_start = meeting.start_date + if meeting_start.tzinfo is None: + meeting_start = meeting_start.replace(tzinfo=timezone.utc) + age_minutes = (current_time - meeting_start).total_seconds() / 60 + if age_minutes > 10: + should_deactivate = True + logger_.info( + "LiveKit meeting deactivated - room likely destroyed (no sessions after 10 min)", + age_minutes=round(age_minutes, 1), + ) + else: + logger_.debug("LiveKit meeting still young, keep it") else: logger_.debug("Meeting not yet started, keep it") @@ -1170,3 +1186,278 @@ async def trigger_daily_reconciliation() -> None: except Exception as e: logger.error("Reconciliation trigger failed", error=str(e), exc_info=True) + + +# ============================================================ +# LiveKit multitrack recording tasks +# ============================================================ + + +@shared_task +@asynctask +async def process_livekit_multitrack( + room_name: str, + meeting_id: str, +): + """ + Process LiveKit multitrack recording by discovering tracks on S3. + + Tracks are discovered via S3 listing (source of truth), not webhooks. + Called from room_finished webhook (fast-path) or beat task (fallback). + """ + from reflector.utils.livekit import ( # noqa: PLC0415 + recording_lock_key, + ) + + logger.info( + "Processing LiveKit multitrack recording", + room_name=room_name, + meeting_id=meeting_id, + ) + + lock_key = recording_lock_key(room_name) + async with RedisAsyncLock( + key=lock_key, + timeout=600, + extend_interval=60, + skip_if_locked=True, + blocking=False, + ) as lock: + if not lock.acquired: + logger.warning( + "LiveKit processing skipped - lock already held", + room_name=room_name, + lock_key=lock_key, + ) + return + + await _process_livekit_multitrack_inner(room_name, meeting_id) + + +async def _process_livekit_multitrack_inner( + room_name: str, + meeting_id: str, +): + """Inner processing logic for LiveKit multitrack recording.""" + from reflector.storage import get_source_storage # noqa: PLC0415 + from reflector.utils.livekit import ( # noqa: PLC0415 + extract_livekit_base_room_name, + filter_audio_tracks, + parse_livekit_track_filepath, + ) + + # 1. Discover tracks by listing S3 prefix + storage = get_source_storage("livekit") + s3_prefix = f"livekit/{room_name}/" + all_keys = await storage.list_objects(prefix=s3_prefix) + + if not all_keys: + logger.warning( + "No track files found on S3 for LiveKit room", + room_name=room_name, + s3_prefix=s3_prefix, + ) + return + + # Filter to audio tracks only (.ogg) — skip .json manifests and .webm video + audio_keys = filter_audio_tracks(all_keys) + logger.info( + "Found track files on S3", + room_name=room_name, + total_files=len(all_keys), + audio_files=len(audio_keys), + ) + + if not audio_keys: + logger.warning( + "No audio track files found (only manifests/video)", + room_name=room_name, + ) + return + + # 2. Parse track info from filenames + parsed_tracks = [] + for key in audio_keys: + try: + parsed = parse_livekit_track_filepath(key) + parsed_tracks.append(parsed) + except ValueError as e: + logger.warning("Skipping unparseable track file", s3_key=key, error=str(e)) + + if not parsed_tracks: + logger.warning( + "No valid track files found after parsing", + room_name=room_name, + raw_keys=all_keys, + ) + return + + track_keys = [t.s3_key for t in parsed_tracks] + + # 3. Find meeting and room + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + logger.error( + "Meeting not found for LiveKit recording", + meeting_id=meeting_id, + room_name=room_name, + ) + return + + base_room_name = extract_livekit_base_room_name(room_name) + room = await rooms_controller.get_by_name(base_room_name) + if not room: + logger.error("Room not found", room_name=base_room_name) + return + + # 4. Create recording + recording_id = f"lk-{room_name}" + bucket_name = settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME or "" + + existing_recording = await recordings_controller.get_by_id(recording_id) + if existing_recording and existing_recording.deleted_at is not None: + logger.info("Skipping soft-deleted recording", recording_id=recording_id) + return + + if not existing_recording: + recording = await recordings_controller.create( + Recording( + id=recording_id, + bucket_name=bucket_name, + object_key=s3_prefix, + recorded_at=datetime.now(timezone.utc), + meeting_id=meeting.id, + track_keys=track_keys, + ) + ) + else: + recording = existing_recording + + # 5. Create or get transcript + transcript = await transcripts_controller.get_by_recording_id(recording.id) + if transcript and transcript.deleted_at is not None: + logger.info("Skipping soft-deleted transcript", recording_id=recording.id) + return + if not transcript: + transcript = await transcripts_controller.add( + "", + source_kind=SourceKind.ROOM, + source_language="en", + target_language="en", + user_id=room.user_id, + recording_id=recording.id, + share_mode="semi-private", + meeting_id=meeting.id, + room_id=room.id, + ) + + # 6. Start Hatchet pipeline (reuses DiarizationPipeline with source_platform="livekit") + workflow_id = await HatchetClientManager.start_workflow( + workflow_name="DiarizationPipeline", + input_data={ + "recording_id": recording_id, + "tracks": [ + { + "s3_key": t.s3_key, + "participant_identity": t.participant_identity, + "timestamp": t.timestamp.isoformat(), + } + for t in parsed_tracks + ], + "bucket_name": bucket_name, + "transcript_id": transcript.id, + "room_id": room.id, + "source_platform": "livekit", + }, + additional_metadata={ + "transcript_id": transcript.id, + "recording_id": recording_id, + }, + ) + logger.info( + "Started LiveKit Hatchet workflow", + workflow_id=workflow_id, + transcript_id=transcript.id, + room_name=room_name, + num_tracks=len(parsed_tracks), + ) + + await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id}) + + +@shared_task +@asynctask +async def process_livekit_ended_meetings(): + """Check for inactive LiveKit meetings that need multitrack processing. + + Runs on a beat schedule. Catches cases where room_finished webhook was missed. + Only processes meetings that: + - Platform is "livekit" + - is_active=False (already deactivated by process_meetings) + - No associated transcript yet + """ + from reflector.db.transcripts import transcripts_controller as tc # noqa: PLC0415 + + all_livekit = await meetings_controller.get_all_inactive_livekit() + + queued = 0 + for meeting in all_livekit: + # Skip if already has a transcript + existing = await tc.get_by_meeting_id(meeting.id) + if existing: + continue + + logger.info( + "Found unprocessed inactive LiveKit meeting", + meeting_id=meeting.id, + room_name=meeting.room_name, + ) + + process_livekit_multitrack.delay( + room_name=meeting.room_name, + meeting_id=meeting.id, + ) + queued += 1 + + if queued > 0: + logger.info("Queued LiveKit multitrack processing", count=queued) + + +@shared_task +@asynctask +async def reprocess_failed_livekit_recordings(): + """Reprocess LiveKit recordings that failed. + + Runs daily at 5 AM. Finds recordings with livekit prefix and error status. + """ + bucket_name = settings.LIVEKIT_STORAGE_AWS_BUCKET_NAME + if not bucket_name: + return + + failed = await recordings_controller.get_multitrack_needing_reprocessing( + bucket_name + ) + livekit_failed = [r for r in failed if r.id.startswith("lk-")] + + for recording in livekit_failed: + if not recording.meeting_id: + logger.warning( + "Skipping reprocess — no meeting_id", + recording_id=recording.id, + ) + continue + + meeting = await meetings_controller.get_by_id(recording.meeting_id) + if not meeting: + continue + + logger.info( + "Reprocessing failed LiveKit recording", + recording_id=recording.id, + meeting_id=meeting.id, + ) + + process_livekit_multitrack.delay( + room_name=meeting.room_name, + meeting_id=meeting.id, + ) diff --git a/server/tests/test_livekit_track_processing.py b/server/tests/test_livekit_track_processing.py new file mode 100644 index 00000000..4e2e98b7 --- /dev/null +++ b/server/tests/test_livekit_track_processing.py @@ -0,0 +1,393 @@ +""" +Tests for LiveKit track processing: filepath parsing, offset calculation, +and pad_track padding_seconds behavior. +""" + +from datetime import datetime, timezone +from fractions import Fraction + +import av +import pytest + +from reflector.utils.livekit import ( + LiveKitTrackFile, + calculate_track_offsets, + extract_livekit_base_room_name, + filter_audio_tracks, + parse_livekit_track_filepath, +) + +# ── Filepath parsing ────────────────────────────────────────── + + +class TestParseLiveKitTrackFilepath: + def test_parses_ogg_audio_track(self): + result = parse_livekit_track_filepath( + "livekit/myroom-20260401172036/juan-4b82ed-2026-04-01T195758-TR_AMR3SWs74Divho.ogg" + ) + assert result.room_name == "myroom-20260401172036" + assert result.participant_identity == "juan-4b82ed" + assert result.track_id == "TR_AMR3SWs74Divho" + assert result.timestamp == datetime(2026, 4, 1, 19, 57, 58, tzinfo=timezone.utc) + + def test_parses_different_identities(self): + r1 = parse_livekit_track_filepath( + "livekit/room-20260401/alice-a1b2c3-2026-04-01T100000-TR_abc123.ogg" + ) + r2 = parse_livekit_track_filepath( + "livekit/room-20260401/bob_smith-d4e5f6-2026-04-01T100030-TR_def456.ogg" + ) + assert r1.participant_identity == "alice-a1b2c3" + assert r2.participant_identity == "bob_smith-d4e5f6" + + def test_rejects_json_manifest(self): + with pytest.raises(ValueError, match="doesn't match expected format"): + parse_livekit_track_filepath("livekit/myroom-20260401/EG_K5sipvfB5fTM.json") + + def test_rejects_webm_video(self): + # webm files match the pattern but are filtered by filter_audio_tracks + result = parse_livekit_track_filepath( + "livekit/myroom-20260401/juan-4b82ed-2026-04-01T195727-TR_VC679dgMQBdfhT.webm" + ) + # webm parses successfully (TR_ prefix matches video tracks too) + assert result.track_id == "TR_VC679dgMQBdfhT" + + def test_rejects_invalid_path(self): + with pytest.raises(ValueError): + parse_livekit_track_filepath("not/a/valid/path.ogg") + + def test_rejects_missing_track_id(self): + with pytest.raises(ValueError): + parse_livekit_track_filepath("livekit/room/user-2026-04-01T100000.ogg") + + def test_parses_timestamp_correctly(self): + result = parse_livekit_track_filepath( + "livekit/room-20260401/user-abc123-2026-12-25T235959-TR_test.ogg" + ) + assert result.timestamp == datetime( + 2026, 12, 25, 23, 59, 59, tzinfo=timezone.utc + ) + + +# ── Audio track filtering ───────────────────────────────────── + + +class TestFilterAudioTracks: + def test_filters_to_ogg_only(self): + keys = [ + "livekit/room/EG_abc.json", + "livekit/room/user-abc-2026-04-01T100000-TR_audio.ogg", + "livekit/room/user-abc-2026-04-01T100000-TR_video.webm", + "livekit/room/EG_def.json", + "livekit/room/user2-def-2026-04-01T100030-TR_audio2.ogg", + ] + result = filter_audio_tracks(keys) + assert len(result) == 2 + assert all(k.endswith(".ogg") for k in result) + + def test_empty_input(self): + assert filter_audio_tracks([]) == [] + + def test_no_audio_tracks(self): + keys = ["livekit/room/EG_abc.json", "livekit/room/user-TR_v.webm"] + assert filter_audio_tracks(keys) == [] + + +# ── Offset calculation ───────────────────────────────────────── + + +class TestCalculateTrackOffsets: + def test_single_track_zero_offset(self): + tracks = [ + LiveKitTrackFile( + s3_key="k1", + room_name="r", + participant_identity="alice", + timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc), + track_id="TR_1", + ) + ] + offsets = calculate_track_offsets(tracks) + assert len(offsets) == 1 + assert offsets[0][1] == 0.0 + + def test_two_tracks_correct_offset(self): + tracks = [ + LiveKitTrackFile( + s3_key="k1", + room_name="r", + participant_identity="alice", + timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc), + track_id="TR_1", + ), + LiveKitTrackFile( + s3_key="k2", + room_name="r", + participant_identity="bob", + timestamp=datetime(2026, 4, 1, 10, 1, 10, tzinfo=timezone.utc), + track_id="TR_2", + ), + ] + offsets = calculate_track_offsets(tracks) + assert offsets[0][1] == 0.0 # alice (earliest) + assert offsets[1][1] == 70.0 # bob (70 seconds later) + + def test_three_tracks_earliest_is_zero(self): + tracks = [ + LiveKitTrackFile( + s3_key="k2", + room_name="r", + participant_identity="bob", + timestamp=datetime(2026, 4, 1, 10, 0, 30, tzinfo=timezone.utc), + track_id="TR_2", + ), + LiveKitTrackFile( + s3_key="k1", + room_name="r", + participant_identity="alice", + timestamp=datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc), + track_id="TR_1", + ), + LiveKitTrackFile( + s3_key="k3", + room_name="r", + participant_identity="charlie", + timestamp=datetime(2026, 4, 1, 10, 1, 0, tzinfo=timezone.utc), + track_id="TR_3", + ), + ] + offsets = calculate_track_offsets(tracks) + offset_map = {t.participant_identity: o for t, o in offsets} + assert offset_map["alice"] == 0.0 + assert offset_map["bob"] == 30.0 + assert offset_map["charlie"] == 60.0 + + def test_empty_tracks(self): + assert calculate_track_offsets([]) == [] + + def test_simultaneous_tracks_zero_offsets(self): + ts = datetime(2026, 4, 1, 10, 0, 0, tzinfo=timezone.utc) + tracks = [ + LiveKitTrackFile( + s3_key="k1", + room_name="r", + participant_identity="a", + timestamp=ts, + track_id="TR_1", + ), + LiveKitTrackFile( + s3_key="k2", + room_name="r", + participant_identity="b", + timestamp=ts, + track_id="TR_2", + ), + ] + offsets = calculate_track_offsets(tracks) + assert all(o == 0.0 for _, o in offsets) + + +# ── Room name extraction ─────────────────────────────────────── + + +class TestExtractLiveKitBaseRoomName: + def test_strips_timestamp_suffix(self): + assert extract_livekit_base_room_name("myroom-20260401172036") == "myroom" + + def test_preserves_hyphenated_name(self): + assert ( + extract_livekit_base_room_name("my-room-name-20260401172036") + == "my-room-name" + ) + + def test_single_segment(self): + assert extract_livekit_base_room_name("room-20260401") == "room" + + +# ── pad_track padding_seconds behavior ───────────────────────── + + +class TestPadTrackPaddingSeconds: + """Test that pad_track correctly uses pre-calculated padding_seconds + for LiveKit (skipping container metadata) vs extracting from container + for Daily (when padding_seconds is None). + """ + + def _make_test_ogg(self, path: str, duration_seconds: float = 5.0): + """Create a minimal OGG/Opus file for testing.""" + with av.open(path, "w", format="ogg") as out: + stream = out.add_stream("libopus", rate=48000) + stream.bit_rate = 64000 + samples_per_frame = 960 # Opus standard + total_samples = int(duration_seconds * 48000) + pts = 0 + while pts < total_samples: + frame = av.AudioFrame( + format="s16", layout="stereo", samples=samples_per_frame + ) + # Fill with silence (zeros) + frame.planes[0].update(bytes(samples_per_frame * 2 * 2)) # s16 * stereo + frame.sample_rate = 48000 + frame.pts = pts + frame.time_base = Fraction(1, 48000) + for packet in stream.encode(frame): + out.mux(packet) + pts += samples_per_frame + for packet in stream.encode(None): + out.mux(packet) + + def test_ogg_has_zero_start_time(self, tmp_path): + """Verify that OGG files (like LiveKit produces) have start_time=0, + confirming why pre-calculated padding is needed.""" + ogg_path = str(tmp_path / "test.ogg") + self._make_test_ogg(ogg_path) + + with av.open(ogg_path) as container: + from reflector.utils.audio_padding import ( + extract_stream_start_time_from_container, + ) + + start_time = extract_stream_start_time_from_container(container, 0) + + assert start_time <= 0.0, ( + "OGG files should have start_time<=0 (no usable offset), confirming " + f"LiveKit tracks need pre-calculated padding_seconds. Got: {start_time}" + ) + + def test_precalculated_padding_skips_metadata_extraction(self, tmp_path): + """When padding_seconds is set, pad_track should use it directly + and NOT call extract_stream_start_time_from_container.""" + from reflector.hatchet.workflows.track_processing import TrackInput + + input_data = TrackInput( + track_index=0, + s3_key="livekit/room/user-abc-2026-04-01T100000-TR_audio.ogg", + bucket_name="test-bucket", + transcript_id="test-transcript", + source_platform="livekit", + padding_seconds=70.0, + ) + + assert input_data.padding_seconds == 70.0 + # The pad_track function checks: if input.padding_seconds is not None → use it + # This means extract_stream_start_time_from_container is never called for LiveKit + + def test_none_padding_falls_back_to_metadata(self, tmp_path): + """When padding_seconds is None (Daily), pad_track should extract + start_time from container metadata.""" + from reflector.hatchet.workflows.track_processing import TrackInput + + input_data = TrackInput( + track_index=0, + s3_key="daily/room/track.webm", + bucket_name="test-bucket", + transcript_id="test-transcript", + source_platform="daily", + padding_seconds=None, + ) + + assert input_data.padding_seconds is None + # pad_track will call extract_stream_start_time_from_container for this case + + def test_zero_padding_returns_original_key(self): + """When padding_seconds=0.0, pad_track should return the original S3 key + without applying any padding (same as start_time=0 from metadata).""" + from reflector.hatchet.workflows.track_processing import TrackInput + + input_data = TrackInput( + track_index=0, + s3_key="livekit/room/earliest-track.ogg", + bucket_name="test-bucket", + transcript_id="test-transcript", + source_platform="livekit", + padding_seconds=0.0, + ) + + # padding_seconds=0.0 → start_time_seconds=0.0 → "no padding needed" branch + assert input_data.padding_seconds == 0.0 + + +# ── Pipeline offset calculation (process_tracks logic) ───────── + + +class TestProcessTracksOffsetCalculation: + """Test the offset calculation logic used in process_tracks + for LiveKit source_platform.""" + + def test_livekit_offsets_from_timestamps(self): + """Simulate the offset calculation done in process_tracks.""" + tracks = [ + { + "s3_key": "track1.ogg", + "participant_identity": "admin-0129c3", + "timestamp": "2026-04-01T23:44:50+00:00", + }, + { + "s3_key": "track2.ogg", + "participant_identity": "juan-5a5b41", + "timestamp": "2026-04-01T23:46:00+00:00", + }, + ] + + # Replicate the logic from process_tracks + timestamps = [] + for i, track in enumerate(tracks): + ts_str = track.get("timestamp") + if ts_str: + ts = datetime.fromisoformat(ts_str) + timestamps.append((i, ts)) + + earliest = min(ts for _, ts in timestamps) + track_padding = {} + for i, ts in timestamps: + track_padding[i] = (ts - earliest).total_seconds() + + assert track_padding[0] == 0.0 # admin (earliest) + assert track_padding[1] == 70.0 # juan (70s later) + + def test_daily_tracks_get_no_precalculated_padding(self): + """Daily tracks should NOT get padding_seconds (use container metadata).""" + tracks = [ + {"s3_key": "daily-track1.webm"}, + {"s3_key": "daily-track2.webm"}, + ] + + # Daily tracks don't have "timestamp" field + track_padding = {} + source_platform = "daily" + + if source_platform == "livekit": + # This block should NOT execute for daily + pass + + # Daily tracks get no pre-calculated padding + assert track_padding == {} + for i, _ in enumerate(tracks): + assert track_padding.get(i) is None + + def test_livekit_missing_timestamp_graceful(self): + """If a LiveKit track is missing timestamp, it should be skipped.""" + tracks = [ + { + "s3_key": "track1.ogg", + "participant_identity": "alice", + "timestamp": "2026-04-01T10:00:00+00:00", + }, + {"s3_key": "track2.ogg", "participant_identity": "bob"}, # no timestamp + ] + + timestamps = [] + for i, track in enumerate(tracks): + ts_str = track.get("timestamp") + if ts_str: + try: + ts = datetime.fromisoformat(ts_str) + timestamps.append((i, ts)) + except (ValueError, TypeError): + timestamps.append((i, None)) + else: + timestamps.append((i, None)) + + valid = [(i, ts) for i, ts in timestamps if ts is not None] + assert len(valid) == 1 # only alice has a timestamp + assert valid[0][0] == 0 # track index 0 diff --git a/www/app/[roomName]/components/LiveKitRoom.tsx b/www/app/[roomName]/components/LiveKitRoom.tsx index 7cdfed0c..2689bf92 100644 --- a/www/app/[roomName]/components/LiveKitRoom.tsx +++ b/www/app/[roomName]/components/LiveKitRoom.tsx @@ -7,10 +7,9 @@ import { LiveKitRoom as LKRoom, VideoConference, RoomAudioRenderer, + PreJoin, + type LocalUserChoices, } from "@livekit/components-react"; -// LiveKit component styles — imported in the global layout to avoid -// Next.js CSS import restrictions in client components. -// See: app/[roomName]/layout.tsx import type { components } from "../../reflector-api"; import { useAuth } from "../../lib/AuthProvider"; import { useRoomJoinMeeting } from "../../lib/apiHooks"; @@ -68,6 +67,7 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) { const joinMutation = useRoomJoinMeeting(); const [joinedMeeting, setJoinedMeeting] = useState(null); const [connectionError, setConnectionError] = useState(false); + const [userChoices, setUserChoices] = useState(null); // ── Consent dialog (same hooks as Daily/Whereby) ────────── const { showConsentButton, showRecordingIndicator } = useConsentDialog({ @@ -87,9 +87,21 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) { }); const showEmailFeature = featureEnabled("emailTranscript"); - // ── Join meeting via backend API to get token ───────────── + // ── PreJoin defaults ────────────────────────────────────── + const defaultUsername = + auth.status === "authenticated" || auth.status === "refreshing" + ? auth.user.email?.split("@")[0] || auth.user.id?.slice(0, 12) || "" + : ""; + + // ── Join meeting via backend API after PreJoin submit ───── useEffect(() => { - if (authLastUserId === undefined || !meeting?.id || !roomName) return; + if ( + authLastUserId === undefined || + !userChoices || + !meeting?.id || + !roomName + ) + return; let cancelled = false; async function join() { @@ -97,6 +109,7 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) { const result = await joinMutation.mutateAsync({ params: { path: { room_name: roomName, meeting_id: meeting.id }, + query: { display_name: userChoices!.username || undefined }, }, }); if (!cancelled) setJoinedMeeting(result); @@ -110,12 +123,41 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) { return () => { cancelled = true; }; - }, [meeting?.id, roomName, authLastUserId]); + }, [meeting?.id, roomName, authLastUserId, userChoices]); const handleDisconnected = useCallback(() => { router.push("/browse"); }, [router]); + const handlePreJoinSubmit = useCallback((choices: LocalUserChoices) => { + setUserChoices(choices); + }, []); + + // ── PreJoin screen (name + device selection) ────────────── + if (!userChoices) { + return ( + + + + ); + } + // ── Loading / error states ──────────────────────────────── if (connectionError) { return ( @@ -170,8 +212,8 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) { serverUrl={serverUrl} token={token} connect={true} - audio={true} - video={true} + audio={userChoices.audioEnabled} + video={userChoices.videoEnabled} onDisconnected={handleDisconnected} data-lk-theme="default" style={{ height: "100%" }} diff --git a/www/app/reflector-api.d.ts b/www/app/reflector-api.d.ts index f759b513..dd41259f 100644 --- a/www/app/reflector-api.d.ts +++ b/www/app/reflector-api.d.ts @@ -3259,7 +3259,9 @@ export interface operations { }; v1_rooms_join_meeting: { parameters: { - query?: never; + query?: { + display_name?: string | null; + }; header?: never; path: { room_name: string;