diff --git a/docsv2/firewall-ports.md b/docsv2/firewall-ports.md new file mode 100644 index 00000000..f2d091e8 --- /dev/null +++ b/docsv2/firewall-ports.md @@ -0,0 +1,97 @@ +# Firewall & Port Requirements + +Ports that need to be open on your server firewall, organized by deployment mode. + +## With Caddy (--caddy or --ip or --domain) + +Caddy acts as the reverse proxy. Most services are only accessible through Caddy on port 443. + +| Port | Protocol | Direction | Service | Required? | +|------|----------|-----------|---------|-----------| +| 443 | TCP | Inbound | Caddy HTTPS — web app, API, LiveKit signaling (`/lk-ws`) | Yes | +| 80 | TCP | Inbound | Caddy HTTP — redirects to HTTPS | Yes | +| 44200-44300 | UDP | Inbound | LiveKit WebRTC media (audio/video) | Yes (if LiveKit enabled) | +| 7881 | TCP | Inbound | LiveKit TCP media fallback (when UDP is blocked by client network) | Recommended | +| 8888 | TCP | Inbound | Hatchet dashboard (plain HTTP, no TLS) | Optional (admin only) | + +Ports that do NOT need to be open (proxied through Caddy): +- 1250 (backend API) +- 3000 (frontend) +- 7880 (LiveKit signaling — proxied via `/lk-ws`) +- 3900 (Garage S3) + +## Without Caddy (direct access) + +All services need direct port access. Use this only for local development or trusted networks. + +| Port | Protocol | Direction | Service | Required? 
| +|------|----------|-----------|---------|-----------| +| 3000 | TCP | Inbound | Frontend (Next.js) | Yes | +| 1250 | TCP | Inbound | Backend API (FastAPI) | Yes | +| 7880 | TCP | Inbound | LiveKit signaling (WebSocket) | Yes (if LiveKit enabled) | +| 7881 | TCP | Inbound | LiveKit TCP media fallback | Recommended | +| 44200-44300 | UDP | Inbound | LiveKit WebRTC media | Yes (if LiveKit enabled) | +| 40000-40100 | UDP | Inbound | Reflector WebRTC (browser recording) | Yes (if using browser WebRTC) | +| 3900 | TCP | Inbound | Garage S3 (for presigned URLs in browser) | Yes (if using Garage) | +| 8888 | TCP | Inbound | Hatchet dashboard | Optional | + +> **Important:** Without Caddy, all traffic is plain HTTP. Browsers block microphone/camera access on non-HTTPS pages (except `localhost`). Use `--ip` (which implies Caddy) for any non-localhost deployment. + +## Internal-Only Ports (never expose) + +These ports are used between Docker containers and should NOT be open on the firewall: + +| Port | Service | Purpose | +|------|---------|---------| +| 5432 | PostgreSQL | Database | +| 6379 | Redis | Cache + message broker | +| 7077 | Hatchet gRPC | Worker communication | + +## Cloud Provider Firewall Examples + +### DigitalOcean (with Caddy + LiveKit) + +```bash +# Create firewall +doctl compute firewall create \ + --name reflector \ + --inbound-rules "protocol:tcp,ports:443,address:0.0.0.0/0 protocol:tcp,ports:80,address:0.0.0.0/0 protocol:udp,ports:44200-44300,address:0.0.0.0/0 protocol:tcp,ports:7881,address:0.0.0.0/0 protocol:tcp,ports:22,address:0.0.0.0/0" \ + --outbound-rules "protocol:tcp,ports:all,address:0.0.0.0/0 protocol:udp,ports:all,address:0.0.0.0/0" \ + --droplet-ids <your-droplet-id> +``` + +### AWS Security Group (with Caddy + LiveKit) + +| Type | Port Range | Source | Description | +|------|-----------|--------|-------------| +| HTTPS | 443 | 0.0.0.0/0 | Web app + API + LiveKit signaling | +| HTTP | 80 | 0.0.0.0/0 | Redirect to HTTPS | +| Custom UDP | 44200-44300 |
0.0.0.0/0 | LiveKit WebRTC media | +| Custom TCP | 7881 | 0.0.0.0/0 | LiveKit TCP fallback | +| SSH | 22 | Your IP | Admin access | + +### Ubuntu UFW (with Caddy + LiveKit) + +```bash +sudo ufw allow 443/tcp # Caddy HTTPS +sudo ufw allow 80/tcp # HTTP redirect +sudo ufw allow 7881/tcp # LiveKit TCP fallback +sudo ufw allow 44200:44300/udp # LiveKit WebRTC media +sudo ufw allow 22/tcp # SSH +sudo ufw enable +``` + +## Port Ranges Explained + +### Why 44200-44300 for LiveKit? + +LiveKit's WebRTC ICE candidates use UDP. The port range was chosen to avoid collisions: +- **40000-40100** — Reflector's own WebRTC (browser recording) +- **44200-44300** — LiveKit WebRTC +- **49152-65535** — macOS ephemeral ports (reserved by OS) + +The range is configurable in `livekit.yaml` under `rtc.port_range_start` / `rtc.port_range_end`. If changed, update `docker-compose.selfhosted.yml` port mapping to match. + +### Why 101 ports? + +The 101 UDP ports (44200-44300 inclusive) support ~100 concurrent WebRTC connections (roughly 50 participants with audio + video). For larger deployments, increase the range in both `livekit.yaml` and `docker-compose.selfhosted.yml`. diff --git a/docsv2/livekit-setup.md b/docsv2/livekit-setup.md index a88374de..ea8a28ae 100644 --- a/docsv2/livekit-setup.md +++ b/docsv2/livekit-setup.md @@ -202,6 +202,17 @@ This avoids mixed-content blocking (browsers reject `ws://` connections on `http Without `--caddy`, browsers connect directly to LiveKit on port 7880 via `ws://`. +### Security Note: on_demand TLS + +When using `--ip` (Caddy with self-signed certs), the Caddyfile uses `tls internal { on_demand }`. This generates certificates dynamically for any hostname/IP on first TLS request. + +**Risk:** An attacker can trigger certificate generation for arbitrary hostnames by sending TLS requests with spoofed SNI values, causing disk and CPU usage. This is a low-severity resource exhaustion risk, not a data theft risk.
+ +**Mitigations:** +- For LAN/development use: not a concern (not internet-exposed) +- For cloud VMs: restrict port 443 access via firewall to trusted IPs +- For production: use `--domain` with a real domain name instead of `--ip` — Caddy uses Let's Encrypt (no `on_demand` needed) + | Deployment | `LIVEKIT_PUBLIC_URL` | How it works | |---|---|---| | localhost, no Caddy | `ws://localhost:7880` | Direct connection | diff --git a/docsv2/migrate-daily-to-livekit.md b/docsv2/migrate-daily-to-livekit.md new file mode 100644 index 00000000..5c764462 --- /dev/null +++ b/docsv2/migrate-daily-to-livekit.md @@ -0,0 +1,73 @@ +# Migrating from Daily.co to LiveKit + +This guide covers running LiveKit alongside Daily.co or fully replacing it. + +## Both Platforms Run Simultaneously + +LiveKit and Daily.co coexist — the platform is selected **per room**. You don't need to migrate all rooms at once. + +- Existing Daily rooms continue to work as-is +- New rooms can use LiveKit +- Each room's `platform` field determines which video service is used +- Transcripts, topics, summaries work identically regardless of platform + +## Step 1: Enable LiveKit + +Add `--livekit` to your setup command: + +```bash +# If currently running: +./scripts/setup-selfhosted.sh --gpu --ollama-gpu --garage --caddy + +# Add --livekit: +./scripts/setup-selfhosted.sh --gpu --ollama-gpu --livekit --garage --caddy +``` + +This starts `livekit-server` + `livekit-egress` containers alongside your existing stack. + +## Step 2: Set Default Platform + +The setup script automatically sets `DEFAULT_VIDEO_PLATFORM=livekit` in `server/.env`. This means **new rooms** default to LiveKit. Existing rooms keep their current platform. + +To keep Daily as the default for new rooms: +```bash +# In server/.env, change: +DEFAULT_VIDEO_PLATFORM=daily +``` + +## Step 3: Switch Individual Rooms + +In the Rooms admin page, edit any room and change the **Platform** dropdown from "Daily" to "LiveKit". 
The next meeting in that room will use LiveKit. + +Previously recorded Daily transcripts for that room are unaffected. + +## Step 4: (Optional) Remove Daily.co + +Once all rooms use LiveKit and you no longer need Daily.co: + +1. Remove `DAILY_API_KEY` and related Daily settings from `server/.env` +2. Re-run the setup script — it won't activate the `dailyco` profile +3. Hatchet workers are shared between Daily and LiveKit, so they continue running + +Daily-specific services that stop: +- `hatchet-worker-cpu` with `dailyco` profile (but continues if `livekit` profile is active) +- Daily webhook polling tasks (`poll_daily_recordings`, etc.) + +## What Changes for Users + +| Feature | Daily.co | LiveKit | +|---------|---------|---------| +| Video/audio quality | Daily.co SFU | LiveKit SFU (comparable) | +| Pre-join screen | Daily's built-in iframe | LiveKit PreJoin component (name + device selection) | +| Recording | Starts via REST API from frontend | Auto Track Egress (automatic, no user action) | +| Multitrack audio | Per-participant WebM tracks | Per-participant OGG tracks | +| Transcript quality | Same pipeline | Same pipeline | +| Self-hosted | No (SaaS only) | Yes (fully self-hosted) | + +## Database Changes + +None required. The `platform` field on rooms and meetings already supports `"livekit"`. LiveKit recordings use recording IDs prefixed with `lk-` to distinguish them from Daily recordings. + +## Rollback + +To revert a room back to Daily, just change the Platform dropdown back to "Daily" in the Rooms admin page. No data migration needed. 
diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index acc15f6b..8fc912ed 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -399,7 +399,9 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe from reflector.db.meetings import ( meetings_controller as mc, # noqa: PLC0415 ) - from reflector.redis_cache import get_redis # noqa: PLC0415 + from reflector.redis_cache import ( + get_async_redis_client, # noqa: PLC0415 + ) meeting = ( await mc.get_by_id(transcript.meeting_id) @@ -407,9 +409,9 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe else None ) if meeting: - redis = await get_redis() + redis_client = await get_async_redis_client() mapping_key = f"livekit:participant_map:{meeting.room_name}" - raw_map = await redis.hgetall(mapping_key) + raw_map = await redis_client.hgetall(mapping_key) identity_to_user_id = { k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) @@ -572,13 +574,18 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes valid_timestamps = [(i, ts) for i, ts in timestamps if ts is not None] if valid_timestamps: earliest = min(ts for _, ts in valid_timestamps) + # LiveKit Track Egress outputs OGG/Opus files, but the transcription + # service only accepts WebM. The padding step converts OGG→WebM as a + # side effect of applying the adelay filter. For the earliest track + # (offset=0), we use a minimal padding to force this conversion. 
+ LIVEKIT_MIN_PADDING_SECONDS = ( + 0.001 # 1ms — inaudible, forces OGG→WebM conversion + ) + for i, ts in valid_timestamps: offset = (ts - earliest).total_seconds() - # LiveKit tracks are OGG format; even the earliest track (offset=0) - # must go through padding step to convert OGG→WebM for the - # transcription service. Use a tiny padding to force conversion. if offset == 0.0: - offset = 0.001 + offset = LIVEKIT_MIN_PADDING_SECONDS track_padding[i] = offset ctx.log( f"process_tracks: track {i} padding={offset}s (from filename timestamp)" diff --git a/server/reflector/livekit_api/webhooks.py b/server/reflector/livekit_api/webhooks.py index 89fd61f8..59605c7b 100644 --- a/server/reflector/livekit_api/webhooks.py +++ b/server/reflector/livekit_api/webhooks.py @@ -25,11 +25,28 @@ def verify_webhook( """Verify and parse a LiveKit webhook event. Returns the parsed WebhookEvent if valid, None if verification fails. + Logs at different levels depending on failure type: + - WARNING: invalid signature, expired token, malformed JWT (expected rejections) + - ERROR: unexpected exceptions (potential bugs or attacks) """ if isinstance(body, bytes): body = body.decode("utf-8") try: return receiver.receive(body, auth_header) - except Exception as e: - logger.warning("LiveKit webhook verification failed", error=str(e)) + except (ValueError, KeyError) as e: + # Expected verification failures (bad JWT, wrong key, expired, malformed) + logger.warning( + "LiveKit webhook verification failed", + error=str(e), + error_type=type(e).__name__, + ) + return None + except Exception as e: + # Unexpected errors — log at ERROR for visibility (potential attack or SDK bug) + logger.error( + "Unexpected error during LiveKit webhook verification", + error=str(e), + error_type=type(e).__name__, + exc_info=True, + ) return None diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 166a28a1..3ba55f63 100644 --- a/server/reflector/views/rooms.py +++ 
b/server/reflector/views/rooms.py @@ -625,12 +625,12 @@ async def rooms_join_meeting( # Store identity → Reflector user_id mapping for the pipeline # (so TranscriptParticipant.user_id can be set correctly) if user_id: - from reflector.redis_cache import get_redis # noqa: PLC0415 + from reflector.redis_cache import get_async_redis_client # noqa: PLC0415 - redis = await get_redis() + redis_client = await get_async_redis_client() mapping_key = f"livekit:participant_map:{meeting.room_name}" - await redis.hset(mapping_key, participant_identity, user_id) - await redis.expire(mapping_key, 7 * 86400) # 7 day TTL + await redis_client.hset(mapping_key, participant_identity, user_id) + await redis_client.expire(mapping_key, 7 * 86400) # 7 day TTL token = client.create_access_token( room_name=meeting.room_name, @@ -638,6 +638,10 @@ async def rooms_join_meeting( participant_name=participant_name, is_admin=user_id == room.user_id if user_id else False, ) + # Close the platform client to release aiohttp session + if hasattr(client, "close"): + await client.close() + meeting = meeting.model_copy() # For LiveKit, room_url is the WS URL; token goes as a query param meeting.room_url = add_query_param(meeting.room_url, "token", token) diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 9df8088a..a9d31c74 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -1239,6 +1239,11 @@ async def _process_livekit_multitrack_inner( meeting_id: str, ): """Inner processing logic for LiveKit multitrack recording.""" + # 1. Discover tracks by listing S3 prefix. + # Wait briefly for egress files to finish flushing to S3 — the room_finished + # webhook fires after empty_timeout, but egress finalization may still be in progress. 
+ import asyncio as _asyncio # noqa: PLC0415 + from reflector.storage import get_source_storage # noqa: PLC0415 from reflector.utils.livekit import ( # noqa: PLC0415 extract_livekit_base_room_name, @@ -1246,32 +1251,60 @@ async def _process_livekit_multitrack_inner( parse_livekit_track_filepath, ) - # 1. Discover tracks by listing S3 prefix + EGRESS_FLUSH_DELAY = 10 # seconds — egress typically flushes within a few seconds + EGRESS_RETRY_DELAY = 30 # seconds — retry if first listing finds nothing + + await _asyncio.sleep(EGRESS_FLUSH_DELAY) + storage = get_source_storage("livekit") s3_prefix = f"livekit/{room_name}/" all_keys = await storage.list_objects(prefix=s3_prefix) - if not all_keys: - logger.warning( - "No track files found on S3 for LiveKit room", - room_name=room_name, - s3_prefix=s3_prefix, - ) - return - # Filter to audio tracks only (.ogg) — skip .json manifests and .webm video - audio_keys = filter_audio_tracks(all_keys) + audio_keys = filter_audio_tracks(all_keys) if all_keys else [] + + if not audio_keys: + # Retry once after a longer delay — egress may still be flushing + logger.info( + "No audio tracks found yet, retrying after delay", + room_name=room_name, + retry_delay=EGRESS_RETRY_DELAY, + ) + await _asyncio.sleep(EGRESS_RETRY_DELAY) + all_keys = await storage.list_objects(prefix=s3_prefix) + audio_keys = filter_audio_tracks(all_keys) if all_keys else [] + + # Sanity check: compare audio tracks against egress manifests. + # Each Track Egress (audio or video) produces a .json manifest. + # Video tracks produce .webm files. So expected audio count ≈ manifests - video files. 
+ if all_keys: + manifest_count = sum(1 for k in all_keys if k.endswith(".json")) + video_count = sum(1 for k in all_keys if k.endswith(".webm")) + expected_audio = manifest_count - video_count + if expected_audio > len(audio_keys) and expected_audio > 0: + # Some audio tracks may still be flushing — wait and retry + logger.info( + "Expected more audio tracks based on manifests, waiting for late flushes", + room_name=room_name, + expected=expected_audio, + found=len(audio_keys), + ) + await _asyncio.sleep(EGRESS_RETRY_DELAY) + all_keys = await storage.list_objects(prefix=s3_prefix) + audio_keys = filter_audio_tracks(all_keys) if all_keys else [] + logger.info( - "Found track files on S3", + "S3 track discovery complete", room_name=room_name, - total_files=len(all_keys), + total_files=len(all_keys) if all_keys else 0, audio_files=len(audio_keys), ) if not audio_keys: logger.warning( - "No audio track files found (only manifests/video)", + "No audio track files found on S3 after retries", room_name=room_name, + s3_prefix=s3_prefix, ) return diff --git a/server/tests/test_livekit_backend.py b/server/tests/test_livekit_backend.py new file mode 100644 index 00000000..be3ecaed --- /dev/null +++ b/server/tests/test_livekit_backend.py @@ -0,0 +1,331 @@ +""" +Tests for LiveKit backend: webhook verification, token generation, +display_name sanitization, and platform client behavior. 
+""" + +import re + +import pytest + +from reflector.livekit_api.webhooks import create_webhook_receiver, verify_webhook + +# ── Webhook verification ────────────────────────────────────── + + +class TestWebhookVerification: + def _make_receiver(self): + """Create a receiver with test credentials.""" + return create_webhook_receiver( + api_key="test_key", + api_secret="test_secret_that_is_long_enough_for_hmac", + ) + + def test_rejects_empty_auth_header(self): + receiver = self._make_receiver() + result = verify_webhook(receiver, b'{"event":"test"}', "") + assert result is None + + def test_rejects_garbage_auth_header(self): + receiver = self._make_receiver() + result = verify_webhook(receiver, b'{"event":"test"}', "not-a-jwt") + assert result is None + + def test_rejects_empty_body(self): + receiver = self._make_receiver() + result = verify_webhook(receiver, b"", "Bearer some.jwt.token") + assert result is None + + def test_handles_bytes_body(self): + receiver = self._make_receiver() + # Should not crash on bytes input + result = verify_webhook(receiver, b'{"event":"test"}', "invalid") + assert result is None + + def test_handles_string_body(self): + receiver = self._make_receiver() + result = verify_webhook(receiver, '{"event":"test"}', "invalid") + assert result is None + + def test_rejects_wrong_secret(self): + """Webhook signed with different secret should be rejected.""" + receiver = self._make_receiver() + # A JWT signed with a different secret + fake_jwt = "eyJhbGciOiJIUzI1NiJ9.eyJ0ZXN0IjoxfQ.wrong_signature" + result = verify_webhook(receiver, b"{}", fake_jwt) + assert result is None + + +# ── Token generation ────────────────────────────────────────── + + +class TestTokenGeneration: + """Test token generation using the LiveKit SDK directly (no client instantiation).""" + + def _generate_token( + self, room_name="room", identity="user", name=None, admin=False, ttl=86400 + ): + """Generate a token using the SDK directly, avoiding LiveKitAPI client 
session.""" + from datetime import timedelta + + from livekit.api import AccessToken, VideoGrants + + token = AccessToken( + api_key="test_key", api_secret="test_secret_that_is_long_enough_for_hmac" + ) + token.identity = identity + token.name = name or identity + token.ttl = timedelta(seconds=ttl) + token.with_grants( + VideoGrants( + room_join=True, + room=room_name, + can_publish=True, + can_subscribe=True, + room_admin=admin, + ) + ) + return token.to_jwt() + + def _decode_claims(self, token): + import base64 + import json + + payload = token.split(".")[1] + payload += "=" * (4 - len(payload) % 4) + return json.loads(base64.b64decode(payload)) + + def test_creates_valid_jwt(self): + token = self._generate_token( + room_name="test-room", identity="user123", name="Test User" + ) + assert isinstance(token, str) + assert len(token.split(".")) == 3 + + def test_token_includes_room_name(self): + token = self._generate_token(room_name="my-room-20260401", identity="alice") + claims = self._decode_claims(token) + assert claims.get("video", {}).get("room") == "my-room-20260401" + assert claims.get("sub") == "alice" + + def test_token_respects_admin_flag(self): + token = self._generate_token(identity="admin", admin=True) + claims = self._decode_claims(token) + assert claims["video"]["roomAdmin"] is True + + def test_token_non_admin_by_default(self): + token = self._generate_token(identity="user") + claims = self._decode_claims(token) + assert claims.get("video", {}).get("roomAdmin") in (None, False) + + def test_ttl_is_timedelta(self): + """Verify ttl as timedelta works (previous bug: int caused TypeError).""" + token = self._generate_token(ttl=3600) + assert isinstance(token, str) + + +# ── Display name sanitization ───────────────────────────────── + + +class TestDisplayNameSanitization: + """Test the sanitization logic from rooms.py join endpoint.""" + + def _sanitize(self, display_name: str) -> str: + """Replicate the sanitization from rooms_join_meeting.""" + 
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", display_name.strip())[:40] + return safe_name + + def test_normal_name(self): + assert self._sanitize("Alice") == "Alice" + + def test_name_with_spaces(self): + assert self._sanitize("John Doe") == "John_Doe" + + def test_name_with_special_chars(self): + assert self._sanitize("user@email.com") == "user_email_com" + + def test_name_with_unicode(self): + result = self._sanitize("José García") + assert result == "Jos__Garc_a" + assert all( + c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-" + for c in result + ) + + def test_name_with_emoji(self): + result = self._sanitize("👋 Hello") + assert "_" in result # Emoji replaced with underscore + assert "Hello" in result + + def test_very_long_name(self): + long_name = "A" * 100 + result = self._sanitize(long_name) + assert len(result) == 40 + + def test_empty_name(self): + result = self._sanitize("") + assert result == "" + + def test_only_special_chars(self): + result = self._sanitize("!!!") + assert result == "___" + + def test_whitespace_stripped(self): + result = self._sanitize(" Alice ") + assert result == "Alice" + + def test_hyphens_preserved(self): + assert self._sanitize("first-last") == "first-last" + + def test_underscores_preserved(self): + assert self._sanitize("first_last") == "first_last" + + def test_html_injection(self): + result = self._sanitize("") + assert "<" not in result + assert ">" not in result + assert "'" not in result + + +# ── S3 egress configuration ─────────────────────────────────── + + +class TestS3EgressConfig: + """Test S3Upload construction using the SDK directly.""" + + def test_build_s3_upload_requires_all_fields(self): + # Missing fields should raise or produce invalid config + # The validation happens in our client wrapper, not the SDK + # Test the validation logic directly + s3_bucket = None + s3_access_key = "AKID" + s3_secret_key = "secret" + assert not all([s3_bucket, s3_access_key, s3_secret_key]) + + def 
test_s3_upload_with_credentials(self): + from livekit.api import S3Upload + + upload = S3Upload( + access_key="AKID", + secret="secret123", + bucket="test-bucket", + region="us-east-1", + force_path_style=True, + ) + assert upload.bucket == "test-bucket" + assert upload.force_path_style is True + + def test_s3_upload_with_endpoint(self): + from livekit.api import S3Upload + + upload = S3Upload( + access_key="AKID", + secret="secret", + bucket="bucket", + region="us-east-1", + force_path_style=True, + endpoint="http://garage:3900", + ) + assert upload.endpoint == "http://garage:3900" + + +# ── Platform detection ──────────────────────────────────────── + + +# ── Redis participant mapping ────────────────────────────── + + +class TestParticipantIdentityMapping: + """Test the identity → user_id Redis mapping pattern.""" + + def test_mapping_key_format(self): + room_name = "myroom-20260401172036" + mapping_key = f"livekit:participant_map:{room_name}" + assert mapping_key == "livekit:participant_map:myroom-20260401172036" + + def test_identity_with_uuid_suffix_is_unique(self): + import uuid + + name = "Juan" + id1 = f"{name}-{uuid.uuid4().hex[:6]}" + id2 = f"{name}-{uuid.uuid4().hex[:6]}" + assert id1 != id2 + assert id1.startswith("Juan-") + assert id2.startswith("Juan-") + + def test_strip_uuid_suffix_for_display(self): + """Pipeline strips UUID suffix for display name.""" + identity = "Juan-2bcea0" + display_name = identity.rsplit("-", 1)[0] if "-" in identity else identity + assert display_name == "Juan" + + def test_strip_uuid_preserves_hyphenated_names(self): + identity = "Mary-Jane-abc123" + display_name = identity.rsplit("-", 1)[0] if "-" in identity else identity + assert display_name == "Mary-Jane" + + def test_anon_identity_no_user_id(self): + """Anonymous participants should not have a user_id mapping.""" + identity = "anon-abc123" + # In the pipeline, anon identities don't get looked up + assert identity.startswith("anon-") + + @pytest.mark.asyncio + async 
def test_redis_hset_hgetall_roundtrip(self): + """Test the actual Redis operations used for participant mapping.""" + try: + from reflector.redis_cache import get_async_redis_client + + redis_client = await get_async_redis_client() + test_key = "livekit:participant_map:__test_room__" + + # Write + await redis_client.hset(test_key, "Juan-abc123", "user-id-1") + await redis_client.hset(test_key, "Alice-def456", "user-id-2") + + # Read + raw_map = await redis_client.hgetall(test_key) + decoded = { + k.decode() if isinstance(k, bytes) else k: v.decode() + if isinstance(v, bytes) + else v + for k, v in raw_map.items() + } + + assert decoded["Juan-abc123"] == "user-id-1" + assert decoded["Alice-def456"] == "user-id-2" + + # Cleanup + await redis_client.delete(test_key) + except Exception: + pytest.skip("Redis not available") + + +# ── Platform detection ──────────────────────────────────────── + + +class TestSourcePlatformDetection: + """Test the recording ID prefix-based platform detection from transcript_process.py.""" + + def test_livekit_prefix(self): + recording_id = "lk-livekit-20260401234423" + platform = "livekit" if recording_id.startswith("lk-") else "daily" + assert platform == "livekit" + + def test_daily_no_prefix(self): + recording_id = "08fa0b24-9220-44c5-846c-3f116cf8e738" + platform = "livekit" if recording_id.startswith("lk-") else "daily" + assert platform == "daily" + + def test_none_recording_id(self): + recording_id = None + platform = ( + "livekit" if recording_id and recording_id.startswith("lk-") else "daily" + ) + assert platform == "daily" + + def test_empty_recording_id(self): + recording_id = "" + platform = ( + "livekit" if recording_id and recording_id.startswith("lk-") else "daily" + ) + assert platform == "daily" diff --git a/www/app/[roomName]/components/LiveKitRoom.tsx b/www/app/[roomName]/components/LiveKitRoom.tsx index 2689bf92..e6d419fa 100644 --- a/www/app/[roomName]/components/LiveKitRoom.tsx +++ 
b/www/app/[roomName]/components/LiveKitRoom.tsx @@ -87,11 +87,19 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) { }); const showEmailFeature = featureEnabled("emailTranscript"); - // ── PreJoin defaults ────────────────────────────────────── - const defaultUsername = - auth.status === "authenticated" || auth.status === "refreshing" - ? auth.user.email?.split("@")[0] || auth.user.id?.slice(0, 12) || "" - : ""; + // ── PreJoin defaults (persisted to localStorage for page refresh) ── + const STORAGE_KEY = `livekit-username-${roomName}`; + const defaultUsername = (() => { + if (typeof window !== "undefined") { + const saved = localStorage.getItem(STORAGE_KEY); + if (saved) return saved; + } + if (auth.status === "authenticated" || auth.status === "refreshing") { + return auth.user.email?.split("@")[0] || auth.user.id?.slice(0, 12) || ""; + } + return ""; + })(); + const isJoining = !!userChoices && !joinedMeeting && !connectionError; // ── Join meeting via backend API after PreJoin submit ───── useEffect(() => { @@ -129,9 +137,16 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) { router.push("/browse"); }, [router]); - const handlePreJoinSubmit = useCallback((choices: LocalUserChoices) => { - setUserChoices(choices); - }, []); + const handlePreJoinSubmit = useCallback( + (choices: LocalUserChoices) => { + // Persist username for page refresh + if (choices.username) { + localStorage.setItem(STORAGE_KEY, choices.username); + } + setUserChoices(choices); + }, + [STORAGE_KEY], + ); // ── PreJoin screen (name + device selection) ────────────── if (!userChoices) { @@ -159,6 +174,14 @@ export default function LiveKitRoom({ meeting, room }: LiveKitRoomProps) { } // ── Loading / error states ──────────────────────────────── + if (isJoining) { + return ( +
+ +
+ ); + } + if (connectionError) { return (