diff --git a/.gitignore b/.gitignore index 1eb3a8bb..2cebdf5c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .DS_Store server/.env +server/.env.production .env Caddyfile server/exportdanswer diff --git a/.gitleaksignore b/.gitleaksignore index 141c82d5..8f2af36e 100644 --- a/.gitleaksignore +++ b/.gitleaksignore @@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83 server/reflector/worker/process.py:generic-api-key:465 +server/reflector/worker/process.py:generic-api-key:594 diff --git a/CHANGELOG.md b/CHANGELOG.md index fefb45c3..b73abd7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,36 @@ # Changelog +## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03) + + +### Bug Fixes + +* increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks ([#843](https://github.com/Monadical-SAS/reflector/issues/843)) ([4acde4b](https://github.com/Monadical-SAS/reflector/commit/4acde4b7fdef88cc02ca12cf38c9020b05ed96ac)) +* make caddy optional ([#841](https://github.com/Monadical-SAS/reflector/issues/841)) ([a2ed7d6](https://github.com/Monadical-SAS/reflector/commit/a2ed7d60d557b551a5b64e4dfd909b63a791d9fc)) +* use Daily API recording.duration as master source for transcript duration ([#844](https://github.com/Monadical-SAS/reflector/issues/844)) ([8707c66](https://github.com/Monadical-SAS/reflector/commit/8707c6694a80c939b6214bbc13331741f192e082)) + +## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30) + + +### Bug Fixes + +* daily multitrack pipeline finalze dependency fix ([23eb137](https://github.com/Monadical-SAS/reflector/commit/23eb1371cb9348c4b81eb12ad506b582f8a4799e)) +* match httpx pad with hatchet audio timeout ([c05d1f0](https://github.com/Monadical-SAS/reflector/commit/c05d1f03cd8369fc06efd455527e50246887efd0)) + +## [0.32.0](https://github.com/Monadical-SAS/reflector/compare/v0.31.0...v0.32.0) (2026-01-30) + + +### Features + +* modal padding ([#837](https://github.com/Monadical-SAS/reflector/issues/837)) ([7fde64e](https://github.com/Monadical-SAS/reflector/commit/7fde64e2529a1d37b0f7507c62d983a7bd0b5b89)) + +## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23) + + +### Features + +* mixdown optional ([#834](https://github.com/Monadical-SAS/reflector/issues/834)) ([fc3ef6c](https://github.com/Monadical-SAS/reflector/commit/fc3ef6c8933231c731fad84e7477a476a6220a5e)) + ## [0.30.0](https://github.com/Monadical-SAS/reflector/compare/v0.29.0...v0.30.0) (2026-01-23) diff --git a/Caddyfile.example b/Caddyfile.example index ebbaabdf..f99f6336 100644 --- a/Caddyfile.example +++ b/Caddyfile.example @@ -1,6 +1,8 @@ -# Reflector Caddyfile -# Replace example.com with your actual domains -# CORS is handled by the backend - Caddy just proxies +# Reflector Caddyfile (optional reverse proxy) +# Use this only when you run Caddy via: docker compose -f docker-compose.prod.yml --profile caddy up -d +# If Coolify, Traefik, or nginx already use ports 80/443, do NOT start Caddy; point your proxy at web:3000 and server:1250. +# +# Replace example.com with your actual domains. CORS is handled by the backend - Caddy just proxies. # # For environment variable substitution, set: # FRONTEND_DOMAIN=app.example.com diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index f897a624..db87264b 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -1,9 +1,14 @@ # Production Docker Compose configuration # Usage: docker compose -f docker-compose.prod.yml up -d # +# Caddy (reverse proxy on ports 80/443) is OPTIONAL and behind the "caddy" profile: +# - With Caddy (self-hosted, you manage SSL): docker compose -f docker-compose.prod.yml --profile caddy up -d +# - Without Caddy (Coolify/Traefik/nginx already on 80/443): docker compose -f docker-compose.prod.yml up -d +# Then point your proxy at web:3000 (frontend) and server:1250 (API). +# # Prerequisites: # 1. Copy .env.example to .env and configure for both server/ and www/ -# 2. Copy Caddyfile.example to Caddyfile and edit with your domains +# 2. If using Caddy: copy Caddyfile.example to Caddyfile and edit your domains # 3. Deploy Modal GPU functions (see gpu/modal_deployments/deploy-all.sh) services: @@ -84,6 +89,8 @@ services: retries: 3 caddy: + profiles: + - caddy image: caddy:2-alpine restart: unless-stopped ports: diff --git a/docs/docs/installation/docker-setup.md b/docs/docs/installation/docker-setup.md index 701ad15e..499ce92d 100644 --- a/docs/docs/installation/docker-setup.md +++ b/docs/docs/installation/docker-setup.md @@ -11,15 +11,15 @@ This page documents the Docker Compose configuration for Reflector. For the comp The `docker-compose.prod.yml` includes these services: -| Service | Image | Purpose | -|---------|-------|---------| -| `web` | `monadicalsas/reflector-frontend` | Next.js frontend | -| `server` | `monadicalsas/reflector-backend` | FastAPI backend | -| `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks | -| `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler | -| `redis` | `redis:7.2-alpine` | Message broker and cache | -| `postgres` | `postgres:17-alpine` | Primary database | -| `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL | +| Service | Image | Purpose | +| ---------- | --------------------------------- | --------------------------------------------------------------------------- | +| `web` | `monadicalsas/reflector-frontend` | Next.js frontend | +| `server` | `monadicalsas/reflector-backend` | FastAPI backend | +| `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks | +| `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler | +| `redis` | `redis:7.2-alpine` | Message broker and cache | +| `postgres` | `postgres:17-alpine` | Primary database | +| `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL (optional; see [Caddy profile](#caddy-profile)) | ## Environment Files @@ -30,6 +30,7 @@ Reflector uses two separate environment files: Used by: `server`, `worker`, `beat` Key variables: + ```env # Database connection DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector @@ -54,6 +55,7 @@ TRANSCRIPT_MODAL_API_KEY=... Used by: `web` Key variables: + ```env # Domain configuration SITE_URL=https://app.example.com @@ -70,26 +72,42 @@ Note: `API_URL` is used client-side (browser), `SERVER_API_URL` is used server-s ## Volumes -| Volume | Purpose | -|--------|---------| -| `redis_data` | Redis persistence | -| `postgres_data` | PostgreSQL data | -| `server_data` | Uploaded files, local storage | -| `caddy_data` | SSL certificates | -| `caddy_config` | Caddy configuration | +| Volume | Purpose | +| --------------- | ----------------------------- | +| `redis_data` | Redis persistence | +| `postgres_data` | PostgreSQL data | +| `server_data` | Uploaded files, local storage | +| `caddy_data` | SSL certificates | +| `caddy_config` | Caddy configuration | ## Network All services share the default network. The network is marked `attachable: true` to allow external containers (like Authentik) to join. +## Caddy profile + +Caddy (ports 80 and 443) is **optional** and behind the `caddy` profile so it does not conflict with an existing reverse proxy (e.g. Coolify, Traefik, nginx). + +- **With Caddy** (you want Reflector to handle SSL): + `docker compose -f docker-compose.prod.yml --profile caddy up -d` +- **Without Caddy** (Coolify or another proxy already on 80/443): + `docker compose -f docker-compose.prod.yml up -d` + Then configure your proxy to send traffic to `web:3000` (frontend) and `server:1250` (API). + ## Common Commands ### Start all services + ```bash +# Without Caddy (e.g. when using Coolify) docker compose -f docker-compose.prod.yml up -d + +# With Caddy as reverse proxy +docker compose -f docker-compose.prod.yml --profile caddy up -d ``` ### View logs + ```bash # All services docker compose -f docker-compose.prod.yml logs -f @@ -99,6 +117,7 @@ docker compose -f docker-compose.prod.yml logs server --tail 50 ``` ### Restart a service + ```bash # Quick restart (doesn't reload .env changes) docker compose -f docker-compose.prod.yml restart server @@ -108,27 +127,32 @@ docker compose -f docker-compose.prod.yml up -d server ``` ### Run database migrations + ```bash docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head ``` ### Access database + ```bash docker compose -f docker-compose.prod.yml exec postgres psql -U reflector ``` ### Pull latest images + ```bash docker compose -f docker-compose.prod.yml pull docker compose -f docker-compose.prod.yml up -d ``` ### Stop all services + ```bash docker compose -f docker-compose.prod.yml down ``` ### Full reset (WARNING: deletes data) + ```bash docker compose -f docker-compose.prod.yml down -v ``` @@ -187,6 +211,7 @@ The Caddyfile supports environment variable substitution: Set `FRONTEND_DOMAIN` and `API_DOMAIN` environment variables, or edit the file directly. ### Reload Caddy after changes + ```bash docker compose -f docker-compose.prod.yml exec caddy caddy reload --config /etc/caddy/Caddyfile ``` diff --git a/docs/docs/installation/overview.md b/docs/docs/installation/overview.md index f6218d64..9dca5ed7 100644 --- a/docs/docs/installation/overview.md +++ b/docs/docs/installation/overview.md @@ -26,7 +26,7 @@ flowchart LR Before starting, you need: -- **Production server** - 4+ cores, 8GB+ RAM, public IP +- **Production server** - 4+ cores, 8GB+ RAM, public IP - **Two domain names** - e.g., `app.example.com` (frontend) and `api.example.com` (backend) - **GPU processing** - Choose one: - Modal.com account, OR @@ -60,16 +60,17 @@ Type: A Name: api Value: Reflector requires GPU processing for transcription and speaker diarization. Choose one option: -| | **Modal.com (Cloud)** | **Self-Hosted GPU** | -|---|---|---| +| | **Modal.com (Cloud)** | **Self-Hosted GPU** | +| ------------ | --------------------------------- | ---------------------------- | | **Best for** | No GPU hardware, zero maintenance | Own GPU server, full control | -| **Pricing** | Pay-per-use | Fixed infrastructure cost | +| **Pricing** | Pay-per-use | Fixed infrastructure cost | ### Option A: Modal.com (Serverless Cloud GPU) #### Accept HuggingFace Licenses Visit both pages and click "Accept": + - https://huggingface.co/pyannote/speaker-diarization-3.1 - https://huggingface.co/pyannote/segmentation-3.0 @@ -179,6 +180,7 @@ Save these credentials - you'll need them in the next step. ## Configure Environment Reflector has two env files: + - `server/.env` - Backend configuration - `www/.env` - Frontend configuration @@ -190,6 +192,7 @@ nano server/.env ``` **Required settings:** + ```env # Database (defaults work with docker-compose.prod.yml) DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector @@ -249,6 +252,7 @@ nano www/.env ``` **Required settings:** + ```env # Your domains SITE_URL=https://app.example.com @@ -266,7 +270,11 @@ FEATURE_REQUIRE_LOGIN=false --- -## Configure Caddy +## Reverse proxy (Caddy or existing) + +**If Coolify, Traefik, or nginx already use ports 80/443** (e.g. Coolify on your host): skip Caddy. Start the stack without the Caddy profile (see [Start Services](#start-services) below), then point your proxy at `web:3000` (frontend) and `server:1250` (API). + +**If you want Reflector to provide the reverse proxy and SSL:** ```bash cp Caddyfile.example Caddyfile @@ -289,10 +297,18 @@ Replace `example.com` with your domains. The `{$VAR:default}` syntax uses Caddy' ## Start Services +**Without Caddy** (e.g. Coolify already on 80/443): + ```bash docker compose -f docker-compose.prod.yml up -d ``` +**With Caddy** (Reflector handles SSL): + +```bash +docker compose -f docker-compose.prod.yml --profile caddy up -d +``` + Wait for containers to start (first run may take 1-2 minutes to pull images and initialize). --- @@ -300,18 +316,21 @@ Wait for containers to start (first run may take 1-2 minutes to pull images and ## Verify Deployment ### Check services + ```bash docker compose -f docker-compose.prod.yml ps # All should show "Up" ``` ### Test API + ```bash curl https://api.example.com/health # Should return: {"status":"healthy"} ``` ### Test Frontend + - Visit https://app.example.com - You should see the Reflector interface - Try uploading an audio file to test transcription @@ -327,6 +346,7 @@ By default, Reflector is open (no login required). **Authentication is required See [Authentication Setup](./auth-setup) for full Authentik OAuth configuration. Quick summary: + 1. Deploy Authentik on your server 2. Create OAuth provider in Authentik 3. Extract public key for JWT verification @@ -358,6 +378,7 @@ DAILYCO_STORAGE_AWS_ROLE_ARN= ``` Reload env and restart: + ```bash docker compose -f docker-compose.prod.yml up -d server worker ``` @@ -367,35 +388,43 @@ docker compose -f docker-compose.prod.yml up -d server worker ## Troubleshooting ### Check logs for errors + ```bash docker compose -f docker-compose.prod.yml logs server --tail 20 docker compose -f docker-compose.prod.yml logs worker --tail 20 ``` ### Services won't start + ```bash docker compose -f docker-compose.prod.yml logs ``` ### CORS errors in browser + - Verify `CORS_ORIGIN` in `server/.env` matches your frontend domain exactly (including `https://`) - Reload env: `docker compose -f docker-compose.prod.yml up -d server` -### SSL certificate errors +### SSL certificate errors (when using Caddy) + - Caddy auto-provisions Let's Encrypt certificates -- Ensure ports 80 and 443 are open +- Ensure ports 80 and 443 are open and not used by another proxy - Check: `docker compose -f docker-compose.prod.yml logs caddy` +- If port 80 is already in use (e.g. by Coolify), run without Caddy: `docker compose -f docker-compose.prod.yml up -d` and use your existing proxy ### Transcription not working + - Check Modal dashboard: https://modal.com/apps - Verify URLs in `server/.env` match deployed functions - Check worker logs: `docker compose -f docker-compose.prod.yml logs worker` ### "Login required" but auth not configured + - Set `FEATURE_REQUIRE_LOGIN=false` in `www/.env` - Rebuild frontend: `docker compose -f docker-compose.prod.yml up -d --force-recreate web` ### Database migrations or connectivity issues + Migrations run automatically on server startup. To check database connectivity or debug migration failures: ```bash @@ -408,4 +437,3 @@ docker compose -f docker-compose.prod.yml exec server uv run python -c "from ref # Manually run migrations (if needed) docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head ``` - diff --git a/gpu/modal_deployments/deploy-all.sh b/gpu/modal_deployments/deploy-all.sh index 42e589a2..8eae3e66 100755 --- a/gpu/modal_deployments/deploy-all.sh +++ b/gpu/modal_deployments/deploy-all.sh @@ -131,6 +131,15 @@ if [ -z "$DIARIZER_URL" ]; then fi echo " -> $DIARIZER_URL" +echo "" +echo "Deploying padding (CPU audio processing via Modal SDK)..." +modal deploy reflector_padding.py +if [ $? -ne 0 ]; then + echo "Error: Failed to deploy padding. Check Modal dashboard for details." + exit 1 +fi +echo " -> reflector-padding.pad_track (Modal SDK function)" + # --- Output Configuration --- echo "" echo "==========================================" @@ -147,4 +156,6 @@ echo "" echo "DIARIZATION_BACKEND=modal" echo "DIARIZATION_URL=$DIARIZER_URL" echo "DIARIZATION_MODAL_API_KEY=$API_KEY" +echo "" +echo "# Padding uses Modal SDK (requires MODAL_TOKEN_ID/SECRET in worker containers)" echo "# --- End Modal Configuration ---" diff --git a/gpu/modal_deployments/reflector_padding.py b/gpu/modal_deployments/reflector_padding.py new file mode 100644 index 00000000..809131e6 --- /dev/null +++ b/gpu/modal_deployments/reflector_padding.py @@ -0,0 +1,277 @@ +""" +Reflector GPU backend - audio padding +====================================== + +CPU-intensive audio padding service for adding silence to audio tracks. +Uses PyAV filter graph (adelay) for precise track synchronization. + +IMPORTANT: This padding logic is duplicated from server/reflector/utils/audio_padding.py +for Modal deployment isolation (Modal can't import from server/reflector/). If you modify +the PyAV filter graph or padding algorithm, you MUST update both: + - gpu/modal_deployments/reflector_padding.py (this file) + - server/reflector/utils/audio_padding.py + +Constants duplicated from server/reflector/utils/audio_constants.py for same reason. +""" + +import os +import tempfile +from fractions import Fraction +import math +import asyncio + +import modal + +S3_TIMEOUT = 60 # happens 2 times +PADDING_TIMEOUT = 600 + (S3_TIMEOUT * 2) +SCALEDOWN_WINDOW = 60 # The maximum duration (in seconds) that individual containers can remain idle when scaling down. +DISCONNECT_CHECK_INTERVAL = 2 # Check for client disconnect + + +app = modal.App("reflector-padding") + +# CPU-based image +image = ( + modal.Image.debian_slim(python_version="3.12") + .apt_install("ffmpeg") # Required by PyAV + .pip_install( + "av==13.1.0", # PyAV for audio processing + "requests==2.32.3", # HTTP for presigned URL downloads/uploads + "fastapi==0.115.12", # API framework + ) +) + +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 +OPUS_STANDARD_SAMPLE_RATE = 48000 +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 +OPUS_DEFAULT_BIT_RATE = 128000 + + +@app.function( + cpu=2.0, + timeout=PADDING_TIMEOUT, + scaledown_window=SCALEDOWN_WINDOW, + image=image, +) +@modal.asgi_app() +def web(): + from fastapi import FastAPI, Request, HTTPException + from pydantic import BaseModel + + class PaddingRequest(BaseModel): + track_url: str + output_url: str + start_time_seconds: float + track_index: int + + class PaddingResponse(BaseModel): + size: int + cancelled: bool = False + + web_app = FastAPI() + + @web_app.post("/pad") + async def pad_track_endpoint(request: Request, req: PaddingRequest) -> PaddingResponse: + """Modal web endpoint for padding audio tracks with disconnect detection. + """ + import logging + + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logger = logging.getLogger(__name__) + + if not req.track_url: + raise HTTPException(status_code=400, detail="track_url cannot be empty") + if not req.output_url: + raise HTTPException(status_code=400, detail="output_url cannot be empty") + if req.start_time_seconds <= 0: + raise HTTPException(status_code=400, detail=f"start_time_seconds must be positive, got {req.start_time_seconds}") + if req.start_time_seconds > 18000: + raise HTTPException(status_code=400, detail=f"start_time_seconds exceeds maximum 18000s (5 hours)") + + logger.info(f"Padding request: track {req.track_index}, delay={req.start_time_seconds}s") + + # Thread-safe cancellation flag shared between async disconnect checker and blocking thread + import threading + cancelled = threading.Event() + + async def check_disconnect(): + """Background task to check for client disconnect every 2 seconds.""" + while not cancelled.is_set(): + await asyncio.sleep(DISCONNECT_CHECK_INTERVAL) + if await request.is_disconnected(): + logger.warning("Client disconnected, setting cancellation flag") + cancelled.set() + break + + # Start disconnect checker in background + disconnect_task = asyncio.create_task(check_disconnect()) + + try: + result = await asyncio.get_event_loop().run_in_executor( + None, _pad_track_blocking, req, cancelled, logger + ) + return PaddingResponse(**result) + finally: + cancelled.set() + disconnect_task.cancel() + try: + await disconnect_task + except asyncio.CancelledError: + pass + + def _pad_track_blocking(req, cancelled, logger) -> dict: + """Blocking CPU-bound padding work with periodic cancellation checks. + + Args: + cancelled: threading.Event for thread-safe cancellation signaling + """ + import av + import requests + from av.audio.resampler import AudioResampler + import time + + temp_dir = tempfile.mkdtemp() + input_path = None + output_path = None + last_check = time.time() + + try: + logger.info("Downloading track for padding") + response = requests.get(req.track_url, stream=True, timeout=S3_TIMEOUT) + response.raise_for_status() + + input_path = os.path.join(temp_dir, "track.webm") + total_bytes = 0 + chunk_count = 0 + with open(input_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + total_bytes += len(chunk) + chunk_count += 1 + + # Check for cancellation every arbitrary amount of chunks + if chunk_count % 12 == 0: + now = time.time() + if now - last_check >= DISCONNECT_CHECK_INTERVAL: + if cancelled.is_set(): + logger.info("Cancelled during download, exiting early") + return {"size": 0, "cancelled": True} + last_check = now + logger.info(f"Track downloaded: {total_bytes} bytes") + + if cancelled.is_set(): + logger.info("Cancelled after download, exiting early") + return {"size": 0, "cancelled": True} + + # Apply padding using PyAV + output_path = os.path.join(temp_dir, "padded.webm") + delay_ms = math.floor(req.start_time_seconds * 1000) + logger.info(f"Padding track {req.track_index} with {delay_ms}ms delay using PyAV") + + in_container = av.open(input_path) + in_stream = next((s for s in in_container.streams if s.type == "audio"), None) + if in_stream is None: + raise ValueError("No audio stream in input") + + with av.open(output_path, "w", format="webm") as out_container: + out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE) + out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE + graph = av.filter.Graph() + + abuf_args = ( + f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:" + f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:" + f"sample_fmt=s16:" + f"channel_layout=stereo" + ) + src = graph.add("abuffer", args=abuf_args, name="src") + aresample_f = graph.add("aresample", args="async=1", name="ares") + delays_arg = f"{delay_ms}|{delay_ms}" + adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay") + sink = graph.add("abuffersink", name="sink") + + src.link_to(aresample_f) + aresample_f.link_to(adelay_f) + adelay_f.link_to(sink) + graph.configure() + + resampler = AudioResampler( + format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE + ) + + for frame in in_container.decode(in_stream): + # Check for cancellation periodically + now = time.time() + if now - last_check >= DISCONNECT_CHECK_INTERVAL: + if cancelled.is_set(): + logger.info("Cancelled during processing, exiting early") + in_container.close() + return {"size": 0, "cancelled": True} + last_check = now + + out_frames = resampler.resample(frame) or [] + for rframe in out_frames: + rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE + rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + src.push(rframe) + + while True: + try: + f_out = sink.pull() + except Exception: + break + f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE + f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + for packet in out_stream.encode(f_out): + out_container.mux(packet) + + # Flush filter graph + src.push(None) + while True: + try: + f_out = sink.pull() + except Exception: + break + f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE + f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + for packet in out_stream.encode(f_out): + out_container.mux(packet) + + # Flush encoder + for packet in out_stream.encode(None): + out_container.mux(packet) + + in_container.close() + + file_size = os.path.getsize(output_path) + logger.info(f"Padding complete: {file_size} bytes") + + logger.info("Uploading padded track to S3") + + with open(output_path, "rb") as f: + upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT) + + upload_response.raise_for_status() + logger.info(f"Upload complete: {file_size} bytes") + + return {"size": file_size} + + finally: + if input_path and os.path.exists(input_path): + try: + os.unlink(input_path) + except Exception as e: + logger.warning(f"Failed to cleanup input file: {e}") + if output_path and os.path.exists(output_path): + try: + os.unlink(output_path) + except Exception as e: + logger.warning(f"Failed to cleanup output file: {e}") + try: + os.rmdir(temp_dir) + except Exception as e: + logger.warning(f"Failed to cleanup temp directory: {e}") + + return web_app + diff --git a/gpu/self_hosted/Dockerfile b/gpu/self_hosted/Dockerfile index 4865fcc0..8fd56b66 100644 --- a/gpu/self_hosted/Dockerfile +++ b/gpu/self_hosted/Dockerfile @@ -4,27 +4,31 @@ ENV PYTHONUNBUFFERED=1 \ UV_LINK_MODE=copy \ UV_NO_CACHE=1 +# patch until nvidia updates the sha1 repo +ADD sequoia.config /etc/crypto-policies/back-ends/sequoia.config + WORKDIR /tmp -RUN apt-get update \ +RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + apt-get update \ && apt-get install -y \ ffmpeg \ curl \ ca-certificates \ gnupg \ - wget \ - && apt-get clean + wget # Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12 ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb -RUN dpkg -i /cuda-keyring.deb \ +RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + dpkg -i /cuda-keyring.deb \ && rm /cuda-keyring.deb \ && apt-get update \ && apt-get install -y --no-install-recommends \ cuda-cudart-12-6 \ libcublas-12-6 \ libcudnn9-cuda-12 \ - libcudnn9-dev-cuda-12 \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* + libcudnn9-dev-cuda-12 ADD https://astral.sh/uv/install.sh /uv-installer.sh RUN sh /uv-installer.sh && rm /uv-installer.sh ENV PATH="/root/.local/bin/:$PATH" @@ -39,6 +43,13 @@ COPY ./app /app/app COPY ./main.py /app/ COPY ./runserver.sh /app/ +# prevent uv failing with too many open files on big cpus +ENV UV_CONCURRENT_INSTALLS=16 + +# first install +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --compile-bytecode --locked + EXPOSE 8000 CMD ["sh", "/app/runserver.sh"] diff --git a/gpu/self_hosted/sequoia.config b/gpu/self_hosted/sequoia.config new file mode 100644 index 00000000..bced077b --- /dev/null +++ b/gpu/self_hosted/sequoia.config @@ -0,0 +1,2 @@ +[hash_algorithms] +sha1 = "always" diff --git a/server/pyproject.toml b/server/pyproject.toml index df6cf536..1cbb5320 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -8,7 +8,7 @@ readme = "README.md" dependencies = [ "aiohttp>=3.9.0", "aiohttp-cors>=0.7.0", - "av>=10.0.0", + "av>=15.0.0", "requests>=2.31.0", "aiortc>=1.5.0", "sortedcontainers>=2.4.0", diff --git a/server/reflector/hatchet/constants.py b/server/reflector/hatchet/constants.py index fbe6d25b..b3810ad6 100644 --- a/server/reflector/hatchet/constants.py +++ b/server/reflector/hatchet/constants.py @@ -35,7 +35,9 @@ LLM_RATE_LIMIT_PER_SECOND = 10 # Task execution timeouts (seconds) TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates -TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation +TIMEOUT_MEDIUM = ( + 300 # Single LLM calls, waveform generation (5m for slow LLM responses) +) TIMEOUT_LONG = 180 # Action items (larger context LLM) -TIMEOUT_AUDIO = 300 # Audio processing: padding, mixdown +TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index 2d1ab194..188133c7 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -322,6 +322,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe mtg_session_id = recording.mtg_session_id async with fresh_db_connection(): from reflector.db.transcripts import ( # noqa: PLC0415 + TranscriptDuration, TranscriptParticipant, transcripts_controller, ) @@ -330,15 +331,26 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe if not transcript: raise ValueError(f"Transcript {input.transcript_id} not found") # Note: title NOT cleared - preserves existing titles + # Duration from Daily API (seconds -> milliseconds) - master source + duration_ms = recording.duration * 1000 if recording.duration else 0 await transcripts_controller.update( transcript, { "events": [], "topics": [], "participants": [], + "duration": duration_ms, }, ) + await append_event_and_broadcast( + input.transcript_id, + transcript, + "DURATION", + TranscriptDuration(duration=duration_ms), + logger=logger, + ) + mtg_session_id = assert_non_none_and_non_empty( mtg_session_id, "mtg_session_id is required" ) @@ -1095,7 +1107,7 @@ async def identify_action_items( @daily_multitrack_pipeline.task( - parents=[generate_waveform, generate_title, generate_recap, identify_action_items], + parents=[process_tracks, generate_title, generate_recap, identify_action_items], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3, ) @@ -1108,12 +1120,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult: """ ctx.log("finalize: saving transcript and setting status to 'ended'") - mixdown_result = ctx.task_output(mixdown_tracks) track_result = ctx.task_output(process_tracks) - duration = mixdown_result.duration - all_words = track_result.all_words - # Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery) created_padded_files = track_result.created_padded_files if created_padded_files: @@ -1133,7 +1141,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult: async with fresh_db_connection(): from reflector.db.transcripts import ( # noqa: PLC0415 - TranscriptDuration, TranscriptText, transcripts_controller, ) @@ -1142,8 +1149,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult: if transcript is None: raise ValueError(f"Transcript {input.transcript_id} not found in database") - merged_transcript = TranscriptType(words=all_words, translation=None) - await append_event_and_broadcast( input.transcript_id, transcript, @@ -1155,21 +1160,15 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult: logger=logger, ) - # Save duration and clear workflow_run_id (workflow completed successfully) - # Note: title/long_summary/short_summary already saved by their callbacks + # Clear workflow_run_id (workflow completed successfully) + # Note: title/long_summary/short_summary/duration already saved by their callbacks await transcripts_controller.update( transcript, { - "duration": duration, "workflow_run_id": None, # Clear on success - no need to resume }, ) - duration_data = TranscriptDuration(duration=duration) - await append_event_and_broadcast( - input.transcript_id, transcript, "DURATION", duration_data, logger=logger - ) - await set_status_and_broadcast(input.transcript_id, "ended", logger=logger) ctx.log( diff --git a/server/reflector/hatchet/workflows/padding_workflow.py b/server/reflector/hatchet/workflows/padding_workflow.py new file mode 100644 index 00000000..229da5e2 --- /dev/null +++ b/server/reflector/hatchet/workflows/padding_workflow.py @@ -0,0 +1,165 @@ +""" +Hatchet child workflow: PaddingWorkflow +Handles individual audio track padding via Modal.com backend. +""" + +from datetime import timedelta + +import av +from hatchet_sdk import Context +from pydantic import BaseModel + +from reflector.hatchet.client import HatchetClientManager +from reflector.hatchet.constants import TIMEOUT_AUDIO +from reflector.hatchet.workflows.models import PadTrackResult +from reflector.logger import logger +from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS +from reflector.utils.audio_padding import extract_stream_start_time_from_container + + +class PaddingInput(BaseModel): + """Input for individual track padding.""" + + track_index: int + s3_key: str + bucket_name: str + transcript_id: str + + +hatchet = HatchetClientManager.get_client() + +padding_workflow = hatchet.workflow( + name="PaddingWorkflow", input_validator=PaddingInput +) + + +@padding_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3) +async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult: + """Pad audio track with silence based on WebM container start_time.""" + ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}") + logger.info( + "[Hatchet] pad_track", + track_index=input.track_index, + s3_key=input.s3_key, + transcript_id=input.transcript_id, + ) + + try: + # Create fresh storage instance to avoid aioboto3 fork issues + from reflector.settings import settings # noqa: PLC0415 + from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415 + + storage = AwsStorage( + aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, + aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, + aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY, + ) + + source_url = await storage.get_file_url( + input.s3_key, + operation="get_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + bucket=input.bucket_name, + ) + + # Extract start_time to determine if padding needed + with av.open(source_url) as in_container: + if in_container.duration: + try: + duration = timedelta(seconds=in_container.duration // 1_000_000) + ctx.log( + f"pad_track: track {input.track_index}, duration={duration}" + ) + except (ValueError, TypeError, OverflowError) as e: + ctx.log( + f"pad_track: track {input.track_index}, duration error: {str(e)}" + ) + + start_time_seconds = extract_stream_start_time_from_container( + in_container, input.track_index, logger=logger + ) + + if start_time_seconds <= 0: + logger.info( + f"Track {input.track_index} requires no padding", + track_index=input.track_index, + ) + return PadTrackResult( + padded_key=input.s3_key, + bucket_name=input.bucket_name, + size=0, + track_index=input.track_index, + ) + + storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" + + # Presign PUT URL for output (Modal will upload directly) + output_url = await storage.get_file_url( + storage_path, + operation="put_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + ) + + import httpx # noqa: PLC0415 + + from reflector.processors.audio_padding_modal import ( # noqa: PLC0415 + AudioPaddingModalProcessor, + ) + + try: + processor = AudioPaddingModalProcessor() + result = await processor.pad_track( + track_url=source_url, + output_url=output_url, + start_time_seconds=start_time_seconds, + track_index=input.track_index, + ) + file_size = result.size + + ctx.log(f"pad_track: Modal returned size={file_size}") + except httpx.HTTPStatusError as e: + error_detail = e.response.text if hasattr(e.response, "text") else str(e) + logger.error( + "[Hatchet] Modal padding HTTP error", + transcript_id=input.transcript_id, + track_index=input.track_index, + status_code=e.response.status_code if hasattr(e, "response") else None, + error=error_detail, + exc_info=True, + ) + raise Exception( + f"Modal padding failed: HTTP {e.response.status_code}" + ) from e + except httpx.TimeoutException as e: + logger.error( + "[Hatchet] Modal padding timeout", + transcript_id=input.transcript_id, + track_index=input.track_index, + error=str(e), + exc_info=True, + ) + raise Exception("Modal padding timeout") from e + + logger.info( + "[Hatchet] pad_track complete", + track_index=input.track_index, + padded_key=storage_path, + ) + + return PadTrackResult( + padded_key=storage_path, + bucket_name=None, # None = use default transcript storage bucket + size=file_size, + track_index=input.track_index, + ) + + except Exception as e: + logger.error( + "[Hatchet] pad_track failed", + transcript_id=input.transcript_id, + track_index=input.track_index, + error=str(e), + exc_info=True, + ) + raise diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py index dd3aea3a..5a9e7505 100644 --- a/server/reflector/hatchet/workflows/track_processing.py +++ b/server/reflector/hatchet/workflows/track_processing.py @@ -14,9 +14,7 @@ Hatchet workers run in forked processes; fresh imports per task ensure storage/DB connections are not shared across forks. """ -import tempfile from datetime import timedelta -from pathlib import Path import av from hatchet_sdk import Context @@ -27,10 +25,7 @@ from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult from reflector.logger import logger from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS -from reflector.utils.audio_padding import ( - apply_audio_padding_to_file, - extract_stream_start_time_from_container, -) +from reflector.utils.audio_padding import extract_stream_start_time_from_container class TrackInput(BaseModel): @@ -83,63 +78,44 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: ) with av.open(source_url) as in_container: - if in_container.duration: - try: - duration = timedelta(seconds=in_container.duration // 1_000_000) - ctx.log( - f"pad_track: track {input.track_index}, duration={duration}" - ) - except Exception: - ctx.log(f"pad_track: track {input.track_index}, duration=ERROR") - start_time_seconds = extract_stream_start_time_from_container( in_container, input.track_index, logger=logger ) - # If no padding needed, return original S3 key - if start_time_seconds <= 0: - logger.info( - f"Track {input.track_index} requires no padding", - track_index=input.track_index, - ) - return PadTrackResult( - padded_key=input.s3_key, - bucket_name=input.bucket_name, - size=0, - track_index=input.track_index, - ) + # If no padding needed, return original S3 key + if start_time_seconds <= 0: + logger.info( + f"Track {input.track_index} requires no padding", + track_index=input.track_index, + ) + return PadTrackResult( + padded_key=input.s3_key, + bucket_name=input.bucket_name, + size=0, + track_index=input.track_index, + ) - with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file: - temp_path = temp_file.name + storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" - try: - apply_audio_padding_to_file( - in_container, - temp_path, - start_time_seconds, - input.track_index, - logger=logger, - ) + # Presign PUT URL for output (Modal uploads directly) + output_url = await storage.get_file_url( + storage_path, + operation="put_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + ) - file_size = Path(temp_path).stat().st_size - storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" + from reflector.processors.audio_padding_modal import ( # noqa: PLC0415 + AudioPaddingModalProcessor, + ) - logger.info( - f"About to upload padded track", - key=storage_path, - size=file_size, - ) - - with open(temp_path, "rb") as padded_file: - await storage.put_file(storage_path, padded_file) - - logger.info( - f"Uploaded padded track to S3", - key=storage_path, - size=file_size, - ) - finally: - Path(temp_path).unlink(missing_ok=True) + processor = AudioPaddingModalProcessor() + result = await processor.pad_track( + track_url=source_url, + output_url=output_url, + start_time_seconds=start_time_seconds, + track_index=input.track_index, + ) + file_size = result.size ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}") logger.info( diff --git a/server/reflector/processors/audio_padding_modal.py b/server/reflector/processors/audio_padding_modal.py new file mode 100644 index 00000000..49227f1d --- /dev/null +++ b/server/reflector/processors/audio_padding_modal.py @@ -0,0 +1,113 @@ +""" +Modal.com backend for audio padding. +""" + +import asyncio +import os + +import httpx +from pydantic import BaseModel + +from reflector.hatchet.constants import TIMEOUT_AUDIO +from reflector.logger import logger + + +class PaddingResponse(BaseModel): + size: int + cancelled: bool = False + + +class AudioPaddingModalProcessor: + """Audio padding processor using Modal.com CPU backend via HTTP.""" + + def __init__( + self, padding_url: str | None = None, modal_api_key: str | None = None + ): + self.padding_url = padding_url or os.getenv("PADDING_URL") + if not self.padding_url: + raise ValueError( + "PADDING_URL required to use AudioPaddingModalProcessor. " + "Set PADDING_URL environment variable or pass padding_url parameter." + ) + + self.modal_api_key = modal_api_key or os.getenv("MODAL_API_KEY") + + async def pad_track( + self, + track_url: str, + output_url: str, + start_time_seconds: float, + track_index: int, + ) -> PaddingResponse: + """Pad audio track with silence via Modal backend. + + Args: + track_url: Presigned GET URL for source audio track + output_url: Presigned PUT URL for output WebM + start_time_seconds: Amount of silence to prepend + track_index: Track index for logging + """ + if not track_url: + raise ValueError("track_url cannot be empty") + if start_time_seconds <= 0: + raise ValueError( + f"start_time_seconds must be positive, got {start_time_seconds}" + ) + + log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds) + log.info("Sending Modal padding HTTP request") + + url = f"{self.padding_url}/pad" + + headers = {} + if self.modal_api_key: + headers["Authorization"] = f"Bearer {self.modal_api_key}" + + try: + async with httpx.AsyncClient(timeout=TIMEOUT_AUDIO) as client: + response = await client.post( + url, + headers=headers, + json={ + "track_url": track_url, + "output_url": output_url, + "start_time_seconds": start_time_seconds, + "track_index": track_index, + }, + follow_redirects=True, + ) + + if response.status_code != 200: + error_body = response.text + log.error( + "Modal padding API error", + status_code=response.status_code, + error_body=error_body, + ) + + response.raise_for_status() + result = response.json() + + # Check if work was cancelled + if result.get("cancelled"): + log.warning("Modal padding was cancelled by disconnect detection") + raise asyncio.CancelledError( + "Padding cancelled due to client disconnect" + ) + + log.info("Modal padding complete", size=result["size"]) + return PaddingResponse(**result) + except asyncio.CancelledError: + log.warning( + "Modal padding cancelled (Hatchet timeout, disconnect detected on Modal side)" + ) + raise + except httpx.TimeoutException as e: + log.error("Modal padding timeout", error=str(e), exc_info=True) + raise Exception(f"Modal padding timeout: {e}") from e + except httpx.HTTPStatusError as e: + log.error("Modal padding HTTP error", error=str(e), exc_info=True) + raise Exception(f"Modal padding HTTP error: {e}") from e + except Exception as e: + log.error("Modal padding unexpected error", error=str(e), exc_info=True) + raise diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 6fc808d2..3a200d4a 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -98,6 +98,10 @@ class Settings(BaseSettings): # Diarization: local pyannote.audio DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None + # Audio Padding (Modal.com backend) + PADDING_URL: str | None = None + PADDING_MODAL_API_KEY: str | None = None + # Sentry SENTRY_DSN: str | None = None diff --git a/server/reflector/utils/audio_constants.py b/server/reflector/utils/audio_constants.py index 0ad964a9..03270976 100644 --- a/server/reflector/utils/audio_constants.py +++ b/server/reflector/utils/audio_constants.py @@ -5,7 +5,9 @@ Used by both Hatchet workflows and Celery pipelines for consistent audio encodin """ # Opus codec settings +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 OPUS_STANDARD_SAMPLE_RATE = 48000 +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality # S3 presigned URL expiration diff --git a/server/reflector/ws_manager.py b/server/reflector/ws_manager.py index a1f620c4..fc3653bb 100644 --- a/server/reflector/ws_manager.py +++ b/server/reflector/ws_manager.py @@ -11,7 +11,6 @@ broadcast messages to all connected websockets. import asyncio import json -import threading import redis.asyncio as redis from fastapi import WebSocket @@ -98,6 +97,7 @@ class WebsocketManager: async def _pubsub_data_reader(self, pubsub_subscriber): while True: + # timeout=1.0 prevents tight CPU loop when no messages available message = await pubsub_subscriber.get_message( ignore_subscribe_messages=True ) @@ -109,29 +109,38 @@ class WebsocketManager: await socket.send_json(data) +# Process-global singleton to ensure only one WebsocketManager instance exists. +# Multiple instances would cause resource leaks and CPU issues. +_ws_manager: WebsocketManager | None = None + + def get_ws_manager() -> WebsocketManager: """ - Returns the WebsocketManager instance for managing websockets. + Returns the global WebsocketManager singleton. - This function initializes and returns the WebsocketManager instance, - which is responsible for managing websockets and handling websocket - connections. + Creates instance on first call, subsequent calls return cached instance. + Thread-safe via GIL. Concurrent initialization may create duplicate + instances but last write wins (acceptable for this use case). Returns: - WebsocketManager: The initialized WebsocketManager instance. - - Raises: - ImportError: If the 'reflector.settings' module cannot be imported. - RedisConnectionError: If there is an error connecting to the Redis server. + WebsocketManager: The global WebsocketManager instance. """ - local = threading.local() - if hasattr(local, "ws_manager"): - return local.ws_manager + global _ws_manager + if _ws_manager is not None: + return _ws_manager + + # No lock needed - GIL makes this safe enough + # Worst case: race creates two instances, last assignment wins pubsub_client = RedisPubSubManager( host=settings.REDIS_HOST, port=settings.REDIS_PORT, ) - ws_manager = WebsocketManager(pubsub_client=pubsub_client) - local.ws_manager = ws_manager - return ws_manager + _ws_manager = WebsocketManager(pubsub_client=pubsub_client) + return _ws_manager + + +def reset_ws_manager() -> None: + """Reset singleton for testing. DO NOT use in production.""" + global _ws_manager + _ws_manager = None diff --git a/server/tests/conftest.py b/server/tests/conftest.py index 24d2103f..1f4469ea 100644 --- a/server/tests/conftest.py +++ b/server/tests/conftest.py @@ -1,6 +1,5 @@ import os from contextlib import asynccontextmanager -from tempfile import NamedTemporaryFile from unittest.mock import patch import pytest @@ -333,11 +332,14 @@ def celery_enable_logging(): @pytest.fixture(scope="session") def celery_config(): - with NamedTemporaryFile() as f: - yield { - "broker_url": "memory://", - "result_backend": f"db+sqlite:///{f.name}", - } + redis_host = os.environ.get("REDIS_HOST", "localhost") + redis_port = os.environ.get("REDIS_PORT", "6379") + # Use db 2 to avoid conflicts with main app + redis_url = f"redis://{redis_host}:{redis_port}/2" + yield { + "broker_url": redis_url, + "result_backend": redis_url, + } @pytest.fixture(scope="session") @@ -370,9 +372,12 @@ async def ws_manager_in_memory(monkeypatch): def __init__(self, queue: asyncio.Queue): self.queue = queue - async def get_message(self, ignore_subscribe_messages: bool = True): + async def get_message( + self, ignore_subscribe_messages: bool = True, timeout: float | None = None + ): + wait_timeout = timeout if timeout is not None else 0.05 try: - return await asyncio.wait_for(self.queue.get(), timeout=0.05) + return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout) except Exception: return None diff --git a/server/tests/test_transcripts_rtc_ws.py b/server/tests/test_transcripts_rtc_ws.py index 35b00912..8c015791 100644 --- a/server/tests/test_transcripts_rtc_ws.py +++ b/server/tests/test_transcripts_rtc_ws.py @@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker) settings.DATA_DIR = DATA_DIR -@pytest.fixture(scope="session") -def celery_includes(): - return ["reflector.pipelines.main_live_pipeline"] +# Using celery_includes from conftest.py which includes both pipelines @pytest.mark.usefixtures("setup_database") diff --git a/server/tests/test_user_websocket_auth.py b/server/tests/test_user_websocket_auth.py index 5a40440f..6ecc87b9 100644 --- a/server/tests/test_user_websocket_auth.py +++ b/server/tests/test_user_websocket_auth.py @@ -56,7 +56,12 @@ def appserver_ws_user(setup_database): if server_instance: server_instance.should_exit = True - server_thread.join(timeout=30) + server_thread.join(timeout=2.0) + + # Reset global singleton for test isolation + from reflector.ws_manager import reset_ws_manager + + reset_ws_manager() @pytest.fixture(autouse=True) @@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user # Connect and then trigger an event via HTTP create async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws: + await asyncio.sleep(0.2) + # Emit an event to the user's room via a standard HTTP action from httpx import AsyncClient @@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user "email": "user-abc@example.com", } + # Use in-memory client (global singleton makes it share ws_manager) async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac: # Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room resp = await ac.post("/transcripts", json={"name": "WS Test"}) diff --git a/server/uv.lock b/server/uv.lock index fd8389d3..ac2849f1 100644 --- a/server/uv.lock +++ b/server/uv.lock @@ -159,21 +159,20 @@ wheels = [ [[package]] name = "aiortc" -version = "1.13.0" +version = "1.14.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aioice" }, { name = "av" }, - { name = "cffi" }, { name = "cryptography" }, { name = "google-crc32c" }, { name = "pyee" }, { name = "pylibsrtp" }, { name = "pyopenssl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894 } +sdist = { url = "https://files.pythonhosted.org/packages/51/9c/4e027bfe0195de0442da301e2389329496745d40ae44d2d7c4571c4290ce/aiortc-1.14.0.tar.gz", hash = "sha256:adc8a67ace10a085721e588e06a00358ed8eaf5f6b62f0a95358ff45628dd762", size = 1180864 } wheels = [ - { url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910 }, + { url = "https://files.pythonhosted.org/packages/57/ab/31646a49209568cde3b97eeade0d28bb78b400e6645c56422c101df68932/aiortc-1.14.0-py3-none-any.whl", hash = "sha256:4b244d7e482f4e1f67e685b3468269628eca1ec91fa5b329ab517738cfca086e", size = 93183 }, ] [[package]] @@ -327,28 +326,24 @@ wheels = [ [[package]] name = "av" -version = "14.4.0" +version = "16.1.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 } +sdist = { url = "https://files.pythonhosted.org/packages/78/cd/3a83ffbc3cc25b39721d174487fb0d51a76582f4a1703f98e46170ce83d4/av-16.1.0.tar.gz", hash = "sha256:a094b4fd87a3721dacf02794d3d2c82b8d712c85b9534437e82a8a978c175ffd", size = 4285203 } wheels = [ - { url = "https://files.pythonhosted.org/packages/18/8a/d57418b686ffd05fabd5a0a9cfa97e63b38c35d7101af00e87c51c8cc43c/av-14.4.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b21d5586a88b9fce0ab78e26bd1c38f8642f8e2aad5b35e619f4d202217c701", size = 19965048 }, - { url = "https://files.pythonhosted.org/packages/f5/aa/3f878b0301efe587e9b07bb773dd6b47ef44ca09a3cffb4af50c08a170f3/av-14.4.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:cf8762d90b0f94a20c9f6e25a94f1757db5a256707964dfd0b1d4403e7a16835", size = 23750064 }, - { url = "https://files.pythonhosted.org/packages/9a/b4/6fe94a31f9ed3a927daa72df67c7151968587106f30f9f8fcd792b186633/av-14.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c0ac9f08920c7bbe0795319689d901e27cb3d7870b9a0acae3f26fc9daa801a6", size = 33648775 }, - { url = "https://files.pythonhosted.org/packages/6c/f3/7f3130753521d779450c935aec3f4beefc8d4645471159f27b54e896470c/av-14.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a56d9ad2afdb638ec0404e962dc570960aae7e08ae331ad7ff70fbe99a6cf40e", size = 32216915 }, - { url = "https://files.pythonhosted.org/packages/f8/9a/8ffabfcafb42154b4b3a67d63f9b69e68fa8c34cb39ddd5cb813dd049ed4/av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bed513cbcb3437d0ae47743edc1f5b4a113c0b66cdd4e1aafc533abf5b2fbf2", size = 35287279 }, - { url = "https://files.pythonhosted.org/packages/ad/11/7023ba0a2ca94a57aedf3114ab8cfcecb0819b50c30982a4c5be4d31df41/av-14.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d030c2d3647931e53d51f2f6e0fcf465263e7acf9ec6e4faa8dbfc77975318c3", size = 36294683 }, - { url = "https://files.pythonhosted.org/packages/3d/fa/b8ac9636bd5034e2b899354468bef9f4dadb067420a16d8a493a514b7817/av-14.4.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1cc21582a4f606271d8c2036ec7a6247df0831050306c55cf8a905701d0f0474", size = 34552391 }, - { url = "https://files.pythonhosted.org/packages/fb/29/0db48079c207d1cba7a2783896db5aec3816e17de55942262c244dffbc0f/av-14.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce7c9cd452153d36f1b1478f904ed5f9ab191d76db873bdd3a597193290805d4", size = 37265250 }, - { url = "https://files.pythonhosted.org/packages/1c/55/715858c3feb7efa4d667ce83a829c8e6ee3862e297fb2b568da3f968639d/av-14.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd261e31cc6b43ca722f80656c39934199d8f2eb391e0147e704b6226acebc29", size = 27925845 }, - { url = "https://files.pythonhosted.org/packages/a6/75/b8641653780336c90ba89e5352cac0afa6256a86a150c7703c0b38851c6d/av-14.4.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:a53e682b239dd23b4e3bc9568cfb1168fc629ab01925fdb2e7556eb426339e94", size = 19954125 }, - { url = "https://files.pythonhosted.org/packages/99/e6/37fe6fa5853a48d54d749526365780a63a4bc530be6abf2115e3a21e292a/av-14.4.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5aa0b901751a32703fa938d2155d56ce3faf3630e4a48d238b35d2f7e49e5395", size = 23751479 }, - { url = "https://files.pythonhosted.org/packages/f7/75/9a5f0e6bda5f513b62bafd1cff2b495441a8b07ab7fb7b8e62f0c0d1683f/av-14.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b316fed3597675fe2aacfed34e25fc9d5bb0196dc8c0b014ae5ed4adda48de", size = 33801401 }, - { url = "https://files.pythonhosted.org/packages/6a/c9/e4df32a2ad1cb7f3a112d0ed610c5e43c89da80b63c60d60e3dc23793ec0/av-14.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a587b5c5014c3c0e16143a0f8d99874e46b5d0c50db6111aa0b54206b5687c81", size = 32364330 }, - { url = "https://files.pythonhosted.org/packages/ca/f0/64e7444a41817fde49a07d0239c033f7e9280bec4a4bb4784f5c79af95e6/av-14.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d53f75e8ac1ec8877a551c0db32a83c0aaeae719d05285281eaaba211bbc30", size = 35519508 }, - { url = "https://files.pythonhosted.org/packages/c2/a8/a370099daa9033a3b6f9b9bd815304b3d8396907a14d09845f27467ba138/av-14.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c8558cfde79dd8fc92d97c70e0f0fa8c94c7a66f68ae73afdf58598f0fe5e10d", size = 36448593 }, - { url = "https://files.pythonhosted.org/packages/27/bb/edb6ceff8fa7259cb6330c51dbfbc98dd1912bd6eb5f7bc05a4bb14a9d6e/av-14.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:455b6410dea0ab2d30234ffb28df7d62ca3cdf10708528e247bec3a4cdcced09", size = 34701485 }, - { url = "https://files.pythonhosted.org/packages/a7/8a/957da1f581aa1faa9a5dfa8b47ca955edb47f2b76b949950933b457bfa1d/av-14.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1661efbe9d975f927b8512d654704223d936f39016fad2ddab00aee7c40f412c", size = 37521981 }, - { url = "https://files.pythonhosted.org/packages/28/76/3f1cf0568592f100fd68eb40ed8c491ce95ca3c1378cc2d4c1f6d1bd295d/av-14.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:fbbeef1f421a3461086853d6464ad5526b56ffe8ccb0ab3fd0a1f121dfbf26ad", size = 27925944 }, + { url = "https://files.pythonhosted.org/packages/48/d0/b71b65d1b36520dcb8291a2307d98b7fc12329a45614a303ff92ada4d723/av-16.1.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:e88ad64ee9d2b9c4c5d891f16c22ae78e725188b8926eb88187538d9dd0b232f", size = 26927747 }, + { url = "https://files.pythonhosted.org/packages/2f/79/720a5a6ccdee06eafa211b945b0a450e3a0b8fc3d12922f0f3c454d870d2/av-16.1.0-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:cb296073fa6935724de72593800ba86ae49ed48af03960a4aee34f8a611f442b", size = 21492232 }, + { url = "https://files.pythonhosted.org/packages/8e/4f/a1ba8d922f2f6d1a3d52419463ef26dd6c4d43ee364164a71b424b5ae204/av-16.1.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:720edd4d25aa73723c1532bb0597806d7b9af5ee34fc02358782c358cfe2f879", size = 39291737 }, + { url = "https://files.pythonhosted.org/packages/1a/31/fc62b9fe8738d2693e18d99f040b219e26e8df894c10d065f27c6b4f07e3/av-16.1.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:c7f2bc703d0df260a1fdf4de4253c7f5500ca9fc57772ea241b0cb241bcf972e", size = 40846822 }, + { url = "https://files.pythonhosted.org/packages/53/10/ab446583dbce730000e8e6beec6ec3c2753e628c7f78f334a35cad0317f4/av-16.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d69c393809babada7d54964d56099e4b30a3e1f8b5736ca5e27bd7be0e0f3c83", size = 40675604 }, + { url = "https://files.pythonhosted.org/packages/31/d7/1003be685277005f6d63fd9e64904ee222fe1f7a0ea70af313468bb597db/av-16.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:441892be28582356d53f282873c5a951592daaf71642c7f20165e3ddcb0b4c63", size = 42015955 }, + { url = "https://files.pythonhosted.org/packages/2f/4a/fa2a38ee9306bf4579f556f94ecbc757520652eb91294d2a99c7cf7623b9/av-16.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:273a3e32de64819e4a1cd96341824299fe06f70c46f2288b5dc4173944f0fd62", size = 31750339 }, + { url = "https://files.pythonhosted.org/packages/9c/84/2535f55edcd426cebec02eb37b811b1b0c163f26b8d3f53b059e2ec32665/av-16.1.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:640f57b93f927fba8689f6966c956737ee95388a91bd0b8c8b5e0481f73513d6", size = 26945785 }, + { url = "https://files.pythonhosted.org/packages/b6/17/ffb940c9e490bf42e86db4db1ff426ee1559cd355a69609ec1efe4d3a9eb/av-16.1.0-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:ae3fb658eec00852ebd7412fdc141f17f3ddce8afee2d2e1cf366263ad2a3b35", size = 21481147 }, + { url = "https://files.pythonhosted.org/packages/15/c1/e0d58003d2d83c3921887d5c8c9b8f5f7de9b58dc2194356a2656a45cfdc/av-16.1.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:27ee558d9c02a142eebcbe55578a6d817fedfde42ff5676275504e16d07a7f86", size = 39517197 }, + { url = "https://files.pythonhosted.org/packages/32/77/787797b43475d1b90626af76f80bfb0c12cfec5e11eafcfc4151b8c80218/av-16.1.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7ae547f6d5fa31763f73900d43901e8c5fa6367bb9a9840978d57b5a7ae14ed2", size = 41174337 }, + { url = "https://files.pythonhosted.org/packages/8e/ac/d90df7f1e3b97fc5554cf45076df5045f1e0a6adf13899e10121229b826c/av-16.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8cf065f9d438e1921dc31fc7aa045790b58aee71736897866420d80b5450f62a", size = 40817720 }, + { url = "https://files.pythonhosted.org/packages/80/6f/13c3a35f9dbcebafd03fe0c4cbd075d71ac8968ec849a3cfce406c35a9d2/av-16.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a345877a9d3cc0f08e2bc4ec163ee83176864b92587afb9d08dff50f37a9a829", size = 42267396 }, + { url = "https://files.pythonhosted.org/packages/c8/b9/275df9607f7fb44317ccb1d4be74827185c0d410f52b6e2cd770fe209118/av-16.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:f49243b1d27c91cd8c66fdba90a674e344eb8eb917264f36117bf2b6879118fd", size = 31752045 }, ] [[package]] @@ -3267,7 +3262,7 @@ requires-dist = [ { name = "aiohttp-cors", specifier = ">=0.7.0" }, { name = "aiortc", specifier = ">=1.5.0" }, { name = "alembic", specifier = ">=1.11.3" }, - { name = "av", specifier = ">=10.0.0" }, + { name = "av", specifier = ">=15.0.0" }, { name = "celery", specifier = ">=5.3.4" }, { name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" }, diff --git a/www/app/(app)/rooms/page.tsx b/www/app/(app)/rooms/page.tsx index f542e8e8..e5349bab 100644 --- a/www/app/(app)/rooms/page.tsx +++ b/www/app/(app)/rooms/page.tsx @@ -302,10 +302,10 @@ export default function RoomsList() { return; } - const platform: "whereby" | "daily" | null = + const platform: "whereby" | "daily" = room.platform === "whereby" || room.platform === "daily" ? room.platform - : null; + : "daily"; const roomData = { name: room.name, diff --git a/www/app/(app)/transcripts/[transcriptId]/correct/page.tsx b/www/app/(app)/transcripts/[transcriptId]/correct/page.tsx index c4d5a9fc..10ea2f82 100644 --- a/www/app/(app)/transcripts/[transcriptId]/correct/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/correct/page.tsx @@ -16,6 +16,7 @@ import { import { useError } from "../../../../(errors)/errorContext"; import { useRouter } from "next/navigation"; import { Box, Grid } from "@chakra-ui/react"; +import { parseNonEmptyString } from "../../../../lib/utils"; export type TranscriptCorrect = { params: Promise<{ @@ -25,8 +26,7 @@ export type TranscriptCorrect = { export default function TranscriptCorrect(props: TranscriptCorrect) { const params = use(props.params); - - const { transcriptId } = params; + const transcriptId = parseNonEmptyString(params.transcriptId); const updateTranscriptMutation = useTranscriptUpdate(); const transcript = useTranscriptGet(transcriptId); diff --git a/www/app/(app)/transcripts/[transcriptId]/page.tsx b/www/app/(app)/transcripts/[transcriptId]/page.tsx index ead2d259..523f8072 100644 --- a/www/app/(app)/transcripts/[transcriptId]/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/page.tsx @@ -9,7 +9,9 @@ import React, { useEffect, useState, use } from "react"; import FinalSummary from "./finalSummary"; import TranscriptTitle from "../transcriptTitle"; import Player from "../player"; +import { useWebSockets } from "../useWebSockets"; import { useRouter } from "next/navigation"; +import { parseNonEmptyString } from "../../../lib/utils"; import { Box, Flex, @@ -30,7 +32,7 @@ type TranscriptDetails = { export default function TranscriptDetails(details: TranscriptDetails) { const params = use(details.params); - const transcriptId = params.transcriptId; + const transcriptId = parseNonEmptyString(params.transcriptId); const router = useRouter(); const statusToRedirect = [ "idle", @@ -49,6 +51,7 @@ export default function TranscriptDetails(details: TranscriptDetails) { transcriptId, waiting || mp3.audioDeleted === true, ); + useWebSockets(transcriptId); const useActiveTopic = useState(null); const [finalSummaryElement, setFinalSummaryElement] = useState(null); diff --git a/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx b/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx index 4422e077..0b7affaf 100644 --- a/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx @@ -10,6 +10,7 @@ import { } from "@chakra-ui/react"; import { useRouter } from "next/navigation"; import { useTranscriptGet } from "../../../../lib/apiHooks"; +import { parseNonEmptyString } from "../../../../lib/utils"; type TranscriptProcessing = { params: Promise<{ @@ -19,7 +20,7 @@ type TranscriptProcessing = { export default function TranscriptProcessing(details: TranscriptProcessing) { const params = use(details.params); - const transcriptId = params.transcriptId; + const transcriptId = parseNonEmptyString(params.transcriptId); const router = useRouter(); const transcript = useTranscriptGet(transcriptId); diff --git a/www/app/(app)/transcripts/[transcriptId]/record/page.tsx b/www/app/(app)/transcripts/[transcriptId]/record/page.tsx index d93b34b6..cc6fbbc0 100644 --- a/www/app/(app)/transcripts/[transcriptId]/record/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/record/page.tsx @@ -12,6 +12,7 @@ import { Box, Text, Grid, Heading, VStack, Flex } from "@chakra-ui/react"; import LiveTrancription from "../../liveTranscription"; import { useTranscriptGet } from "../../../../lib/apiHooks"; import { TranscriptStatus } from "../../../../lib/transcript"; +import { parseNonEmptyString } from "../../../../lib/utils"; type TranscriptDetails = { params: Promise<{ @@ -21,13 +22,14 @@ type TranscriptDetails = { const TranscriptRecord = (details: TranscriptDetails) => { const params = use(details.params); - const transcript = useTranscriptGet(params.transcriptId); + const transcriptId = parseNonEmptyString(params.transcriptId); + const transcript = useTranscriptGet(transcriptId); const [transcriptStarted, setTranscriptStarted] = useState(false); const useActiveTopic = useState(null); - const webSockets = useWebSockets(params.transcriptId); + const webSockets = useWebSockets(transcriptId); - const mp3 = useMp3(params.transcriptId, true); + const mp3 = useMp3(transcriptId, true); const router = useRouter(); diff --git a/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx b/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx index 9fc6a687..76722d6f 100644 --- a/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx @@ -7,6 +7,7 @@ import useMp3 from "../../useMp3"; import { Center, VStack, Text, Heading } from "@chakra-ui/react"; import FileUploadButton from "../../fileUploadButton"; import { useTranscriptGet } from "../../../../lib/apiHooks"; +import { parseNonEmptyString } from "../../../../lib/utils"; type TranscriptUpload = { params: Promise<{ @@ -16,12 +17,13 @@ type TranscriptUpload = { const TranscriptUpload = (details: TranscriptUpload) => { const params = use(details.params); - const transcript = useTranscriptGet(params.transcriptId); + const transcriptId = parseNonEmptyString(params.transcriptId); + const transcript = useTranscriptGet(transcriptId); const [transcriptStarted, setTranscriptStarted] = useState(false); - const webSockets = useWebSockets(params.transcriptId); + const webSockets = useWebSockets(transcriptId); - const mp3 = useMp3(params.transcriptId, true); + const mp3 = useMp3(transcriptId, true); const router = useRouter(); diff --git a/www/app/(app)/transcripts/transcriptTitle.tsx b/www/app/(app)/transcripts/transcriptTitle.tsx index ea738673..9eb6f375 100644 --- a/www/app/(app)/transcripts/transcriptTitle.tsx +++ b/www/app/(app)/transcripts/transcriptTitle.tsx @@ -1,5 +1,6 @@ import { useState } from "react"; import type { components } from "../../reflector-api"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type UpdateTranscript = components["schemas"]["UpdateTranscript"]; type GetTranscriptWithParticipants = @@ -32,7 +33,7 @@ const TranscriptTitle = (props: TranscriptTitle) => { const [isEditing, setIsEditing] = useState(false); const updateTranscriptMutation = useTranscriptUpdate(); const participantsQuery = useTranscriptParticipants( - props.transcript?.id || null, + props.transcript?.id ? parseMaybeNonEmptyString(props.transcript.id) : null, ); const updateTitle = async (newTitle: string, transcriptId: string) => { diff --git a/www/app/(app)/transcripts/useMp3.ts b/www/app/(app)/transcripts/useMp3.ts index cc0635ec..cfeafb90 100644 --- a/www/app/(app)/transcripts/useMp3.ts +++ b/www/app/(app)/transcripts/useMp3.ts @@ -1,5 +1,6 @@ import { useEffect, useState } from "react"; import { useTranscriptGet } from "../../lib/apiHooks"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; import { useAuth } from "../../lib/AuthProvider"; import { API_URL } from "../../lib/apiClient"; @@ -27,7 +28,7 @@ const useMp3 = (transcriptId: string, waiting?: boolean): Mp3Response => { data: transcript, isLoading: transcriptMetadataLoading, error: transcriptError, - } = useTranscriptGet(later ? null : transcriptId); + } = useTranscriptGet(later ? null : parseMaybeNonEmptyString(transcriptId)); const [serviceWorker, setServiceWorker] = useState(null); diff --git a/www/app/(app)/transcripts/useParticipants.ts b/www/app/(app)/transcripts/useParticipants.ts index a3674597..0230075b 100644 --- a/www/app/(app)/transcripts/useParticipants.ts +++ b/www/app/(app)/transcripts/useParticipants.ts @@ -1,6 +1,7 @@ import type { components } from "../../reflector-api"; type Participant = components["schemas"]["Participant"]; import { useTranscriptParticipants } from "../../lib/apiHooks"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type ErrorParticipants = { error: Error; @@ -32,7 +33,7 @@ const useParticipants = (transcriptId: string): UseParticipants => { isLoading: loading, error, refetch, - } = useTranscriptParticipants(transcriptId || null); + } = useTranscriptParticipants(parseMaybeNonEmptyString(transcriptId)); // Type-safe return based on state if (error) { diff --git a/www/app/(app)/transcripts/useTopicWithWords.ts b/www/app/(app)/transcripts/useTopicWithWords.ts index 31e184cc..dcf2dd60 100644 --- a/www/app/(app)/transcripts/useTopicWithWords.ts +++ b/www/app/(app)/transcripts/useTopicWithWords.ts @@ -1,5 +1,6 @@ import type { components } from "../../reflector-api"; import { useTranscriptTopicsWithWordsPerSpeaker } from "../../lib/apiHooks"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type GetTranscriptTopicWithWordsPerSpeaker = components["schemas"]["GetTranscriptTopicWithWordsPerSpeaker"]; @@ -38,7 +39,7 @@ const useTopicWithWords = ( error, refetch, } = useTranscriptTopicsWithWordsPerSpeaker( - transcriptId || null, + parseMaybeNonEmptyString(transcriptId), topicId || null, ); diff --git a/www/app/(app)/transcripts/useTopics.ts b/www/app/(app)/transcripts/useTopics.ts index 7f337582..faafcf9a 100644 --- a/www/app/(app)/transcripts/useTopics.ts +++ b/www/app/(app)/transcripts/useTopics.ts @@ -1,5 +1,6 @@ import { useTranscriptTopics } from "../../lib/apiHooks"; import type { components } from "../../reflector-api"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; @@ -10,7 +11,11 @@ type TranscriptTopics = { }; const useTopics = (id: string): TranscriptTopics => { - const { data: topics, isLoading: loading, error } = useTranscriptTopics(id); + const { + data: topics, + isLoading: loading, + error, + } = useTranscriptTopics(parseMaybeNonEmptyString(id)); return { topics: topics || null, diff --git a/www/app/(app)/transcripts/useWaveform.ts b/www/app/(app)/transcripts/useWaveform.ts index 8bb8c4c9..896aa002 100644 --- a/www/app/(app)/transcripts/useWaveform.ts +++ b/www/app/(app)/transcripts/useWaveform.ts @@ -1,5 +1,6 @@ import type { components } from "../../reflector-api"; import { useTranscriptWaveform } from "../../lib/apiHooks"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type AudioWaveform = components["schemas"]["AudioWaveform"]; @@ -14,7 +15,7 @@ const useWaveform = (id: string, skip: boolean): AudioWaveFormResponse => { data: waveform, isLoading: loading, error, - } = useTranscriptWaveform(skip ? null : id); + } = useTranscriptWaveform(skip ? null : parseMaybeNonEmptyString(id)); return { waveform: waveform || null, diff --git a/www/app/(app)/transcripts/useWebSockets.ts b/www/app/(app)/transcripts/useWebSockets.ts index ed44577e..47c036b8 100644 --- a/www/app/(app)/transcripts/useWebSockets.ts +++ b/www/app/(app)/transcripts/useWebSockets.ts @@ -7,6 +7,12 @@ type GetTranscriptSegmentTopic = components["schemas"]["GetTranscriptSegmentTopic"]; import { useQueryClient } from "@tanstack/react-query"; import { $api, WEBSOCKET_URL } from "../../lib/apiClient"; +import { + invalidateTranscript, + invalidateTranscriptTopics, + invalidateTranscriptWaveform, +} from "../../lib/apiHooks"; +import { NonEmptyString } from "../../lib/utils"; export type UseWebSockets = { transcriptTextLive: string; @@ -369,15 +375,10 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { }); console.debug("TOPIC event:", message.data); // Invalidate topics query to sync with WebSocket data - queryClient.invalidateQueries({ - queryKey: $api.queryOptions( - "get", - "/v1/transcripts/{transcript_id}/topics", - { - params: { path: { transcript_id: transcriptId } }, - }, - ).queryKey, - }); + invalidateTranscriptTopics( + queryClient, + transcriptId as NonEmptyString, + ); break; case "FINAL_SHORT_SUMMARY": @@ -388,15 +389,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { if (message.data) { setFinalSummary(message.data); // Invalidate transcript query to sync summary - queryClient.invalidateQueries({ - queryKey: $api.queryOptions( - "get", - "/v1/transcripts/{transcript_id}", - { - params: { path: { transcript_id: transcriptId } }, - }, - ).queryKey, - }); + invalidateTranscript(queryClient, transcriptId as NonEmptyString); } break; @@ -405,15 +398,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { if (message.data) { setTitle(message.data.title); // Invalidate transcript query to sync title - queryClient.invalidateQueries({ - queryKey: $api.queryOptions( - "get", - "/v1/transcripts/{transcript_id}", - { - params: { path: { transcript_id: transcriptId } }, - }, - ).queryKey, - }); + invalidateTranscript(queryClient, transcriptId as NonEmptyString); } break; @@ -424,6 +409,10 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { ); if (message.data) { setWaveForm(message.data.waveform); + invalidateTranscriptWaveform( + queryClient, + transcriptId as NonEmptyString, + ); } break; case "DURATION": diff --git a/www/app/[roomName]/MeetingSelection.tsx b/www/app/[roomName]/MeetingSelection.tsx index 63743668..e2356810 100644 --- a/www/app/[roomName]/MeetingSelection.tsx +++ b/www/app/[roomName]/MeetingSelection.tsx @@ -26,7 +26,7 @@ import { useRouter } from "next/navigation"; import { formatDateTime, formatStartedAgo } from "../lib/timeUtils"; import MeetingMinimalHeader from "../components/MeetingMinimalHeader"; import { NonEmptyString } from "../lib/utils"; -import { MeetingId } from "../lib/types"; +import { MeetingId, assertMeetingId } from "../lib/types"; type Meeting = components["schemas"]["Meeting"]; @@ -315,7 +315,9 @@ export default function MeetingSelection({ variant="outline" colorScheme="red" size="md" - onClick={() => handleEndMeeting(meeting.id)} + onClick={() => + handleEndMeeting(assertMeetingId(meeting.id)) + } loading={deactivateMeetingMutation.isPending} > @@ -460,7 +462,9 @@ export default function MeetingSelection({ variant="outline" colorScheme="red" size="md" - onClick={() => handleEndMeeting(meeting.id)} + onClick={() => + handleEndMeeting(assertMeetingId(meeting.id)) + } loading={deactivateMeetingMutation.isPending} > diff --git a/www/app/lib/apiHooks.ts b/www/app/lib/apiHooks.ts index cde7c98e..b80cde48 100644 --- a/www/app/lib/apiHooks.ts +++ b/www/app/lib/apiHooks.ts @@ -105,7 +105,7 @@ export function useTranscriptProcess() { }); } -export function useTranscriptGet(transcriptId: string | null) { +export function useTranscriptGet(transcriptId: NonEmptyString | null) { return $api.useQuery( "get", "/v1/transcripts/{transcript_id}", @@ -122,6 +122,16 @@ export function useTranscriptGet(transcriptId: string | null) { ); } +export const invalidateTranscript = ( + queryClient: QueryClient, + transcriptId: NonEmptyString, +) => + queryClient.invalidateQueries({ + queryKey: $api.queryOptions("get", "/v1/transcripts/{transcript_id}", { + params: { path: { transcript_id: transcriptId } }, + }).queryKey, + }); + export function useRoomGet(roomId: string | null) { const { isAuthenticated } = useAuthReady(); @@ -299,7 +309,7 @@ export function useTranscriptUploadAudio() { ); } -export function useTranscriptWaveform(transcriptId: string | null) { +export function useTranscriptWaveform(transcriptId: NonEmptyString | null) { return $api.useQuery( "get", "/v1/transcripts/{transcript_id}/audio/waveform", @@ -314,7 +324,21 @@ export function useTranscriptWaveform(transcriptId: string | null) { ); } -export function useTranscriptMP3(transcriptId: string | null) { +export const invalidateTranscriptWaveform = ( + queryClient: QueryClient, + transcriptId: NonEmptyString, +) => + queryClient.invalidateQueries({ + queryKey: $api.queryOptions( + "get", + "/v1/transcripts/{transcript_id}/audio/waveform", + { + params: { path: { transcript_id: transcriptId } }, + }, + ).queryKey, + }); + +export function useTranscriptMP3(transcriptId: NonEmptyString | null) { const { isAuthenticated } = useAuthReady(); return $api.useQuery( @@ -331,7 +355,7 @@ export function useTranscriptMP3(transcriptId: string | null) { ); } -export function useTranscriptTopics(transcriptId: string | null) { +export function useTranscriptTopics(transcriptId: NonEmptyString | null) { return $api.useQuery( "get", "/v1/transcripts/{transcript_id}/topics", @@ -346,7 +370,23 @@ export function useTranscriptTopics(transcriptId: string | null) { ); } -export function useTranscriptTopicsWithWords(transcriptId: string | null) { +export const invalidateTranscriptTopics = ( + queryClient: QueryClient, + transcriptId: NonEmptyString, +) => + queryClient.invalidateQueries({ + queryKey: $api.queryOptions( + "get", + "/v1/transcripts/{transcript_id}/topics", + { + params: { path: { transcript_id: transcriptId } }, + }, + ).queryKey, + }); + +export function useTranscriptTopicsWithWords( + transcriptId: NonEmptyString | null, +) { const { isAuthenticated } = useAuthReady(); return $api.useQuery( @@ -364,7 +404,7 @@ export function useTranscriptTopicsWithWords(transcriptId: string | null) { } export function useTranscriptTopicsWithWordsPerSpeaker( - transcriptId: string | null, + transcriptId: NonEmptyString | null, topicId: string | null, ) { const { isAuthenticated } = useAuthReady(); @@ -386,7 +426,7 @@ export function useTranscriptTopicsWithWordsPerSpeaker( ); } -export function useTranscriptParticipants(transcriptId: string | null) { +export function useTranscriptParticipants(transcriptId: NonEmptyString | null) { const { isAuthenticated } = useAuthReady(); return $api.useQuery(