Compare commits

..

17 Commits

Author SHA1 Message Date
Igor Loskutov
ad64f43202 Merge main into feature-leave-endpoint
Resolve conflict in apiHooks.ts by keeping import for createFinalURL
and createQuerySerializer which are used by leave/join room functions.
2026-02-05 14:34:01 -05:00
1ce1c7a910 fix: websocket tests (#825)
* fix websocket tests

* fix: restore timeout and fix celery test infrastructure

- Re-add timeout=1.0 to ws_manager pubsub loop (prevents CPU spin?)
- Use Redis for Celery tests (memory:// broker doesn't support chords)
- Add timeout param to in-memory subscriber mock
- Remove duplicate celery_includes fixture from rtc_ws tests

* fix: remove redundant inline imports in test files

* fix: update gitleaks ignore for moved s3_key line

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 14:23:31 -05:00
Rémi Pauchet
984795357e - fix nvidia repo blocked by apt (sha1) (#845)
- use build cache for apt and uv
- limit concurency for uv to prevent crashes with too many cores
2026-02-05 13:59:34 -05:00
fa3cf5da0f chore(main): release 0.32.2 (#842) 2026-02-03 22:05:22 -05:00
8707c6694a fix: use Daily API recording.duration as master source for transcript duration (#844)
Set duration early in get_participants from Daily API (seconds -> ms),
ensuring post_zulip has the value before mixdown_tracks completes.

Removes redundant duration update from mixdown_tracks.

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-03 17:15:03 -05:00
4acde4b7fd fix: increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks (#843)
Topic detection was timing out on longer transcripts when LLM
responses are slow. This affects detect_chunk_topic and other
LLM-calling tasks that use TIMEOUT_MEDIUM.

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-03 16:05:16 -05:00
a2ed7d60d5 fix: make caddy optional (#841) 2026-02-03 00:18:47 +01:00
a08f94a5bf chore(main): release 0.32.1 (#840) 2026-01-30 17:34:48 -05:00
Igor Loskutov
c05d1f03cd fix: match httpx pad with hatchet audio timeout 2026-01-30 15:56:18 -05:00
Igor Loskutov
23eb1371cb fix: daily multitrack pipeline finalze dependency fix 2026-01-30 15:19:27 -05:00
Igor Loskutov
485b455c69 race condition debug wip 2026-01-30 14:38:12 -05:00
Igor Loskutov
74c9ec2ff1 race condition debug wip 2026-01-30 14:37:53 -05:00
2592e369f6 chore(main): release 0.32.0 (#838) 2026-01-30 13:13:59 -05:00
7fde64e252 feat: modal padding (#837)
* Add Modal backend for audio padding

- Create reflector_padding.py Modal deployment (CPU-based)
- Add PaddingWorkflow with conditional Modal/local backend
- Update deploy-all.sh to include padding deployment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-30 13:11:51 -05:00
Igor Loskutov
aac89e8d03 rejoin tags backend 2026-01-29 15:57:09 -05:00
Igor Loskutov
13088e72f8 feat: Trigger presence poll on join endpoint for Daily meetings
Also trigger poll_daily_room_presence_task when user joins meeting via
/join endpoint, not just on /leave. Webhooks can fail or not exist
(e.g., Whereby has no participant.joined webhook), so frontend-triggered
polls needed for both join and leave events.
2026-01-26 18:05:44 -05:00
Igor Loskutov
775c9b667d feat: Add meeting leave endpoint for faster presence detection (no-mistaken)
Backend:
- Add POST /rooms/{room_name}/meetings/{meeting_id}/leave endpoint
- Triggers poll_daily_room_presence_task immediately on user disconnect
- Reduces detection latency from 0-30s (periodic poll) to ~1-2s

Frontend:
- Add useRoomLeaveMeeting() mutation hook
- Add beforeunload handler in DailyRoom that calls sendBeacon()
- Guarantees API call completion even if tab closes mid-request

Context:
- Daily.co webhooks handle clean disconnects
- This endpoint handles dirty disconnects (tab close, crash, network drop)
- Redis lock prevents spam if multiple users leave simultaneously

This commit is no-mistaken and follows user requirements for readonly research
task that was later approved for implementation.
2026-01-26 17:59:33 -05:00
31 changed files with 1449 additions and 176 deletions

1
.gitignore vendored
View File

@@ -1,5 +1,6 @@
.DS_Store
server/.env
server/.env.production
.env
Caddyfile
server/exportdanswer

View File

@@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465
server/reflector/worker/process.py:generic-api-key:594

View File

@@ -1,5 +1,29 @@
# Changelog
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
### Bug Fixes
* increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks ([#843](https://github.com/Monadical-SAS/reflector/issues/843)) ([4acde4b](https://github.com/Monadical-SAS/reflector/commit/4acde4b7fdef88cc02ca12cf38c9020b05ed96ac))
* make caddy optional ([#841](https://github.com/Monadical-SAS/reflector/issues/841)) ([a2ed7d6](https://github.com/Monadical-SAS/reflector/commit/a2ed7d60d557b551a5b64e4dfd909b63a791d9fc))
* use Daily API recording.duration as master source for transcript duration ([#844](https://github.com/Monadical-SAS/reflector/issues/844)) ([8707c66](https://github.com/Monadical-SAS/reflector/commit/8707c6694a80c939b6214bbc13331741f192e082))
## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)
### Bug Fixes
* daily multitrack pipeline finalze dependency fix ([23eb137](https://github.com/Monadical-SAS/reflector/commit/23eb1371cb9348c4b81eb12ad506b582f8a4799e))
* match httpx pad with hatchet audio timeout ([c05d1f0](https://github.com/Monadical-SAS/reflector/commit/c05d1f03cd8369fc06efd455527e50246887efd0))
## [0.32.0](https://github.com/Monadical-SAS/reflector/compare/v0.31.0...v0.32.0) (2026-01-30)
### Features
* modal padding ([#837](https://github.com/Monadical-SAS/reflector/issues/837)) ([7fde64e](https://github.com/Monadical-SAS/reflector/commit/7fde64e2529a1d37b0f7507c62d983a7bd0b5b89))
## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23)

View File

@@ -1,6 +1,8 @@
# Reflector Caddyfile
# Replace example.com with your actual domains
# CORS is handled by the backend - Caddy just proxies
# Reflector Caddyfile (optional reverse proxy)
# Use this only when you run Caddy via: docker compose -f docker-compose.prod.yml --profile caddy up -d
# If Coolify, Traefik, or nginx already use ports 80/443, do NOT start Caddy; point your proxy at web:3000 and server:1250.
#
# Replace example.com with your actual domains. CORS is handled by the backend - Caddy just proxies.
#
# For environment variable substitution, set:
# FRONTEND_DOMAIN=app.example.com

View File

@@ -1,9 +1,14 @@
# Production Docker Compose configuration
# Usage: docker compose -f docker-compose.prod.yml up -d
#
# Caddy (reverse proxy on ports 80/443) is OPTIONAL and behind the "caddy" profile:
# - With Caddy (self-hosted, you manage SSL): docker compose -f docker-compose.prod.yml --profile caddy up -d
# - Without Caddy (Coolify/Traefik/nginx already on 80/443): docker compose -f docker-compose.prod.yml up -d
# Then point your proxy at web:3000 (frontend) and server:1250 (API).
#
# Prerequisites:
# 1. Copy .env.example to .env and configure for both server/ and www/
# 2. Copy Caddyfile.example to Caddyfile and edit with your domains
# 2. If using Caddy: copy Caddyfile.example to Caddyfile and edit your domains
# 3. Deploy Modal GPU functions (see gpu/modal_deployments/deploy-all.sh)
services:
@@ -84,6 +89,8 @@ services:
retries: 3
caddy:
profiles:
- caddy
image: caddy:2-alpine
restart: unless-stopped
ports:

View File

@@ -11,15 +11,15 @@ This page documents the Docker Compose configuration for Reflector. For the comp
The `docker-compose.prod.yml` includes these services:
| Service | Image | Purpose |
|---------|-------|---------|
| `web` | `monadicalsas/reflector-frontend` | Next.js frontend |
| `server` | `monadicalsas/reflector-backend` | FastAPI backend |
| `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks |
| `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler |
| `redis` | `redis:7.2-alpine` | Message broker and cache |
| `postgres` | `postgres:17-alpine` | Primary database |
| `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL |
| Service | Image | Purpose |
| ---------- | --------------------------------- | --------------------------------------------------------------------------- |
| `web` | `monadicalsas/reflector-frontend` | Next.js frontend |
| `server` | `monadicalsas/reflector-backend` | FastAPI backend |
| `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks |
| `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler |
| `redis` | `redis:7.2-alpine` | Message broker and cache |
| `postgres` | `postgres:17-alpine` | Primary database |
| `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL (optional; see [Caddy profile](#caddy-profile)) |
## Environment Files
@@ -30,6 +30,7 @@ Reflector uses two separate environment files:
Used by: `server`, `worker`, `beat`
Key variables:
```env
# Database connection
DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
@@ -54,6 +55,7 @@ TRANSCRIPT_MODAL_API_KEY=...
Used by: `web`
Key variables:
```env
# Domain configuration
SITE_URL=https://app.example.com
@@ -70,26 +72,42 @@ Note: `API_URL` is used client-side (browser), `SERVER_API_URL` is used server-s
## Volumes
| Volume | Purpose |
|--------|---------|
| `redis_data` | Redis persistence |
| `postgres_data` | PostgreSQL data |
| `server_data` | Uploaded files, local storage |
| `caddy_data` | SSL certificates |
| `caddy_config` | Caddy configuration |
| Volume | Purpose |
| --------------- | ----------------------------- |
| `redis_data` | Redis persistence |
| `postgres_data` | PostgreSQL data |
| `server_data` | Uploaded files, local storage |
| `caddy_data` | SSL certificates |
| `caddy_config` | Caddy configuration |
## Network
All services share the default network. The network is marked `attachable: true` to allow external containers (like Authentik) to join.
## Caddy profile
Caddy (ports 80 and 443) is **optional** and behind the `caddy` profile so it does not conflict with an existing reverse proxy (e.g. Coolify, Traefik, nginx).
- **With Caddy** (you want Reflector to handle SSL):
`docker compose -f docker-compose.prod.yml --profile caddy up -d`
- **Without Caddy** (Coolify or another proxy already on 80/443):
`docker compose -f docker-compose.prod.yml up -d`
Then configure your proxy to send traffic to `web:3000` (frontend) and `server:1250` (API).
## Common Commands
### Start all services
```bash
# Without Caddy (e.g. when using Coolify)
docker compose -f docker-compose.prod.yml up -d
# With Caddy as reverse proxy
docker compose -f docker-compose.prod.yml --profile caddy up -d
```
### View logs
```bash
# All services
docker compose -f docker-compose.prod.yml logs -f
@@ -99,6 +117,7 @@ docker compose -f docker-compose.prod.yml logs server --tail 50
```
### Restart a service
```bash
# Quick restart (doesn't reload .env changes)
docker compose -f docker-compose.prod.yml restart server
@@ -108,27 +127,32 @@ docker compose -f docker-compose.prod.yml up -d server
```
### Run database migrations
```bash
docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head
```
### Access database
```bash
docker compose -f docker-compose.prod.yml exec postgres psql -U reflector
```
### Pull latest images
```bash
docker compose -f docker-compose.prod.yml pull
docker compose -f docker-compose.prod.yml up -d
```
### Stop all services
```bash
docker compose -f docker-compose.prod.yml down
```
### Full reset (WARNING: deletes data)
```bash
docker compose -f docker-compose.prod.yml down -v
```
@@ -187,6 +211,7 @@ The Caddyfile supports environment variable substitution:
Set `FRONTEND_DOMAIN` and `API_DOMAIN` environment variables, or edit the file directly.
### Reload Caddy after changes
```bash
docker compose -f docker-compose.prod.yml exec caddy caddy reload --config /etc/caddy/Caddyfile
```

View File

@@ -26,7 +26,7 @@ flowchart LR
Before starting, you need:
- **Production server** - 4+ cores, 8GB+ RAM, public IP
- **Production server** - 4+ cores, 8GB+ RAM, public IP
- **Two domain names** - e.g., `app.example.com` (frontend) and `api.example.com` (backend)
- **GPU processing** - Choose one:
- Modal.com account, OR
@@ -60,16 +60,17 @@ Type: A Name: api Value: <your-server-ip>
Reflector requires GPU processing for transcription and speaker diarization. Choose one option:
| | **Modal.com (Cloud)** | **Self-Hosted GPU** |
|---|---|---|
| | **Modal.com (Cloud)** | **Self-Hosted GPU** |
| ------------ | --------------------------------- | ---------------------------- |
| **Best for** | No GPU hardware, zero maintenance | Own GPU server, full control |
| **Pricing** | Pay-per-use | Fixed infrastructure cost |
| **Pricing** | Pay-per-use | Fixed infrastructure cost |
### Option A: Modal.com (Serverless Cloud GPU)
#### Accept HuggingFace Licenses
Visit both pages and click "Accept":
- https://huggingface.co/pyannote/speaker-diarization-3.1
- https://huggingface.co/pyannote/segmentation-3.0
@@ -179,6 +180,7 @@ Save these credentials - you'll need them in the next step.
## Configure Environment
Reflector has two env files:
- `server/.env` - Backend configuration
- `www/.env` - Frontend configuration
@@ -190,6 +192,7 @@ nano server/.env
```
**Required settings:**
```env
# Database (defaults work with docker-compose.prod.yml)
DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
@@ -249,6 +252,7 @@ nano www/.env
```
**Required settings:**
```env
# Your domains
SITE_URL=https://app.example.com
@@ -266,7 +270,11 @@ FEATURE_REQUIRE_LOGIN=false
---
## Configure Caddy
## Reverse proxy (Caddy or existing)
**If Coolify, Traefik, or nginx already use ports 80/443** (e.g. Coolify on your host): skip Caddy. Start the stack without the Caddy profile (see [Start Services](#start-services) below), then point your proxy at `web:3000` (frontend) and `server:1250` (API).
**If you want Reflector to provide the reverse proxy and SSL:**
```bash
cp Caddyfile.example Caddyfile
@@ -289,10 +297,18 @@ Replace `example.com` with your domains. The `{$VAR:default}` syntax uses Caddy'
## Start Services
**Without Caddy** (e.g. Coolify already on 80/443):
```bash
docker compose -f docker-compose.prod.yml up -d
```
**With Caddy** (Reflector handles SSL):
```bash
docker compose -f docker-compose.prod.yml --profile caddy up -d
```
Wait for containers to start (first run may take 1-2 minutes to pull images and initialize).
---
@@ -300,18 +316,21 @@ Wait for containers to start (first run may take 1-2 minutes to pull images and
## Verify Deployment
### Check services
```bash
docker compose -f docker-compose.prod.yml ps
# All should show "Up"
```
### Test API
```bash
curl https://api.example.com/health
# Should return: {"status":"healthy"}
```
### Test Frontend
- Visit https://app.example.com
- You should see the Reflector interface
- Try uploading an audio file to test transcription
@@ -327,6 +346,7 @@ By default, Reflector is open (no login required). **Authentication is required
See [Authentication Setup](./auth-setup) for full Authentik OAuth configuration.
Quick summary:
1. Deploy Authentik on your server
2. Create OAuth provider in Authentik
3. Extract public key for JWT verification
@@ -358,6 +378,7 @@ DAILYCO_STORAGE_AWS_ROLE_ARN=<arn:aws:iam::ACCOUNT:role/DailyCo>
```
Reload env and restart:
```bash
docker compose -f docker-compose.prod.yml up -d server worker
```
@@ -367,35 +388,43 @@ docker compose -f docker-compose.prod.yml up -d server worker
## Troubleshooting
### Check logs for errors
```bash
docker compose -f docker-compose.prod.yml logs server --tail 20
docker compose -f docker-compose.prod.yml logs worker --tail 20
```
### Services won't start
```bash
docker compose -f docker-compose.prod.yml logs
```
### CORS errors in browser
- Verify `CORS_ORIGIN` in `server/.env` matches your frontend domain exactly (including `https://`)
- Reload env: `docker compose -f docker-compose.prod.yml up -d server`
### SSL certificate errors
### SSL certificate errors (when using Caddy)
- Caddy auto-provisions Let's Encrypt certificates
- Ensure ports 80 and 443 are open
- Ensure ports 80 and 443 are open and not used by another proxy
- Check: `docker compose -f docker-compose.prod.yml logs caddy`
- If port 80 is already in use (e.g. by Coolify), run without Caddy: `docker compose -f docker-compose.prod.yml up -d` and use your existing proxy
### Transcription not working
- Check Modal dashboard: https://modal.com/apps
- Verify URLs in `server/.env` match deployed functions
- Check worker logs: `docker compose -f docker-compose.prod.yml logs worker`
### "Login required" but auth not configured
- Set `FEATURE_REQUIRE_LOGIN=false` in `www/.env`
- Rebuild frontend: `docker compose -f docker-compose.prod.yml up -d --force-recreate web`
### Database migrations or connectivity issues
Migrations run automatically on server startup. To check database connectivity or debug migration failures:
```bash
@@ -408,4 +437,3 @@ docker compose -f docker-compose.prod.yml exec server uv run python -c "from ref
# Manually run migrations (if needed)
docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head
```

View File

@@ -131,6 +131,15 @@ if [ -z "$DIARIZER_URL" ]; then
fi
echo " -> $DIARIZER_URL"
echo ""
echo "Deploying padding (CPU audio processing via Modal SDK)..."
modal deploy reflector_padding.py
if [ $? -ne 0 ]; then
echo "Error: Failed to deploy padding. Check Modal dashboard for details."
exit 1
fi
echo " -> reflector-padding.pad_track (Modal SDK function)"
# --- Output Configuration ---
echo ""
echo "=========================================="
@@ -147,4 +156,6 @@ echo ""
echo "DIARIZATION_BACKEND=modal"
echo "DIARIZATION_URL=$DIARIZER_URL"
echo "DIARIZATION_MODAL_API_KEY=$API_KEY"
echo ""
echo "# Padding uses Modal SDK (requires MODAL_TOKEN_ID/SECRET in worker containers)"
echo "# --- End Modal Configuration ---"

View File

@@ -0,0 +1,277 @@
"""
Reflector GPU backend - audio padding
======================================
CPU-intensive audio padding service for adding silence to audio tracks.
Uses PyAV filter graph (adelay) for precise track synchronization.
IMPORTANT: This padding logic is duplicated from server/reflector/utils/audio_padding.py
for Modal deployment isolation (Modal can't import from server/reflector/). If you modify
the PyAV filter graph or padding algorithm, you MUST update both:
- gpu/modal_deployments/reflector_padding.py (this file)
- server/reflector/utils/audio_padding.py
Constants duplicated from server/reflector/utils/audio_constants.py for same reason.
"""
import os
import tempfile
from fractions import Fraction
import math
import asyncio
import modal
S3_TIMEOUT = 60 # happens 2 times
PADDING_TIMEOUT = 600 + (S3_TIMEOUT * 2)
SCALEDOWN_WINDOW = 60 # The maximum duration (in seconds) that individual containers can remain idle when scaling down.
DISCONNECT_CHECK_INTERVAL = 2 # Check for client disconnect
app = modal.App("reflector-padding")
# CPU-based image
image = (
modal.Image.debian_slim(python_version="3.12")
.apt_install("ffmpeg") # Required by PyAV
.pip_install(
"av==13.1.0", # PyAV for audio processing
"requests==2.32.3", # HTTP for presigned URL downloads/uploads
"fastapi==0.115.12", # API framework
)
)
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000
@app.function(
cpu=2.0,
timeout=PADDING_TIMEOUT,
scaledown_window=SCALEDOWN_WINDOW,
image=image,
)
@modal.asgi_app()
def web():
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
class PaddingRequest(BaseModel):
track_url: str
output_url: str
start_time_seconds: float
track_index: int
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
web_app = FastAPI()
@web_app.post("/pad")
async def pad_track_endpoint(request: Request, req: PaddingRequest) -> PaddingResponse:
"""Modal web endpoint for padding audio tracks with disconnect detection.
"""
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
if not req.track_url:
raise HTTPException(status_code=400, detail="track_url cannot be empty")
if not req.output_url:
raise HTTPException(status_code=400, detail="output_url cannot be empty")
if req.start_time_seconds <= 0:
raise HTTPException(status_code=400, detail=f"start_time_seconds must be positive, got {req.start_time_seconds}")
if req.start_time_seconds > 18000:
raise HTTPException(status_code=400, detail=f"start_time_seconds exceeds maximum 18000s (5 hours)")
logger.info(f"Padding request: track {req.track_index}, delay={req.start_time_seconds}s")
# Thread-safe cancellation flag shared between async disconnect checker and blocking thread
import threading
cancelled = threading.Event()
async def check_disconnect():
"""Background task to check for client disconnect every 2 seconds."""
while not cancelled.is_set():
await asyncio.sleep(DISCONNECT_CHECK_INTERVAL)
if await request.is_disconnected():
logger.warning("Client disconnected, setting cancellation flag")
cancelled.set()
break
# Start disconnect checker in background
disconnect_task = asyncio.create_task(check_disconnect())
try:
result = await asyncio.get_event_loop().run_in_executor(
None, _pad_track_blocking, req, cancelled, logger
)
return PaddingResponse(**result)
finally:
cancelled.set()
disconnect_task.cancel()
try:
await disconnect_task
except asyncio.CancelledError:
pass
def _pad_track_blocking(req, cancelled, logger) -> dict:
"""Blocking CPU-bound padding work with periodic cancellation checks.
Args:
cancelled: threading.Event for thread-safe cancellation signaling
"""
import av
import requests
from av.audio.resampler import AudioResampler
import time
temp_dir = tempfile.mkdtemp()
input_path = None
output_path = None
last_check = time.time()
try:
logger.info("Downloading track for padding")
response = requests.get(req.track_url, stream=True, timeout=S3_TIMEOUT)
response.raise_for_status()
input_path = os.path.join(temp_dir, "track.webm")
total_bytes = 0
chunk_count = 0
with open(input_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
total_bytes += len(chunk)
chunk_count += 1
# Check for cancellation every arbitrary amount of chunks
if chunk_count % 12 == 0:
now = time.time()
if now - last_check >= DISCONNECT_CHECK_INTERVAL:
if cancelled.is_set():
logger.info("Cancelled during download, exiting early")
return {"size": 0, "cancelled": True}
last_check = now
logger.info(f"Track downloaded: {total_bytes} bytes")
if cancelled.is_set():
logger.info("Cancelled after download, exiting early")
return {"size": 0, "cancelled": True}
# Apply padding using PyAV
output_path = os.path.join(temp_dir, "padded.webm")
delay_ms = math.floor(req.start_time_seconds * 1000)
logger.info(f"Padding track {req.track_index} with {delay_ms}ms delay using PyAV")
in_container = av.open(input_path)
in_stream = next((s for s in in_container.streams if s.type == "audio"), None)
if in_stream is None:
raise ValueError("No audio stream in input")
with av.open(output_path, "w", format="webm") as out_container:
out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay")
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
for frame in in_container.decode(in_stream):
# Check for cancellation periodically
now = time.time()
if now - last_check >= DISCONNECT_CHECK_INTERVAL:
if cancelled.is_set():
logger.info("Cancelled during processing, exiting early")
in_container.close()
return {"size": 0, "cancelled": True}
last_check = now
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush filter graph
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush encoder
for packet in out_stream.encode(None):
out_container.mux(packet)
in_container.close()
file_size = os.path.getsize(output_path)
logger.info(f"Padding complete: {file_size} bytes")
logger.info("Uploading padded track to S3")
with open(output_path, "rb") as f:
upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT)
upload_response.raise_for_status()
logger.info(f"Upload complete: {file_size} bytes")
return {"size": file_size}
finally:
if input_path and os.path.exists(input_path):
try:
os.unlink(input_path)
except Exception as e:
logger.warning(f"Failed to cleanup input file: {e}")
if output_path and os.path.exists(output_path):
try:
os.unlink(output_path)
except Exception as e:
logger.warning(f"Failed to cleanup output file: {e}")
try:
os.rmdir(temp_dir)
except Exception as e:
logger.warning(f"Failed to cleanup temp directory: {e}")
return web_app

View File

@@ -4,27 +4,31 @@ ENV PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy \
UV_NO_CACHE=1
# patch until nvidia updates the sha1 repo
ADD sequoia.config /etc/crypto-policies/back-ends/sequoia.config
WORKDIR /tmp
RUN apt-get update \
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
&& apt-get install -y \
ffmpeg \
curl \
ca-certificates \
gnupg \
wget \
&& apt-get clean
wget
# Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12
ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb
RUN dpkg -i /cuda-keyring.deb \
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
dpkg -i /cuda-keyring.deb \
&& rm /cuda-keyring.deb \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
cuda-cudart-12-6 \
libcublas-12-6 \
libcudnn9-cuda-12 \
libcudnn9-dev-cuda-12 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
libcudnn9-dev-cuda-12
ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh
ENV PATH="/root/.local/bin/:$PATH"
@@ -39,6 +43,13 @@ COPY ./app /app/app
COPY ./main.py /app/
COPY ./runserver.sh /app/
# prevent uv failing with too many open files on big cpus
ENV UV_CONCURRENT_INSTALLS=16
# first install
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --compile-bytecode --locked
EXPOSE 8000
CMD ["sh", "/app/runserver.sh"]

View File

@@ -0,0 +1,2 @@
[hash_algorithms]
sha1 = "always"

View File

@@ -8,7 +8,7 @@ readme = "README.md"
dependencies = [
"aiohttp>=3.9.0",
"aiohttp-cors>=0.7.0",
"av>=10.0.0",
"av>=15.0.0",
"requests>=2.31.0",
"aiortc>=1.5.0",
"sortedcontainers>=2.4.0",

View File

@@ -35,7 +35,9 @@ LLM_RATE_LIMIT_PER_SECOND = 10
# Task execution timeouts (seconds)
TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates
TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation
TIMEOUT_MEDIUM = (
300 # Single LLM calls, waveform generation (5m for slow LLM responses)
)
TIMEOUT_LONG = 180 # Action items (larger context LLM)
TIMEOUT_AUDIO = 300 # Audio processing: padding, mixdown
TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown
TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks

View File

@@ -322,6 +322,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
mtg_session_id = recording.mtg_session_id
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptParticipant,
transcripts_controller,
)
@@ -330,15 +331,26 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
# Note: title NOT cleared - preserves existing titles
# Duration from Daily API (seconds -> milliseconds) - master source
duration_ms = recording.duration * 1000 if recording.duration else 0
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
"participants": [],
"duration": duration_ms,
},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
)
@@ -1095,7 +1107,7 @@ async def identify_action_items(
@daily_multitrack_pipeline.task(
parents=[generate_title, generate_recap, identify_action_items],
parents=[process_tracks, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
)
@@ -1108,12 +1120,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
"""
ctx.log("finalize: saving transcript and setting status to 'ended'")
mixdown_result = ctx.task_output(mixdown_tracks)
track_result = ctx.task_output(process_tracks)
duration = mixdown_result.duration
all_words = track_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = track_result.created_padded_files
if created_padded_files:
@@ -1133,7 +1141,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptText,
transcripts_controller,
)
@@ -1142,8 +1149,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
if transcript is None:
raise ValueError(f"Transcript {input.transcript_id} not found in database")
merged_transcript = TranscriptType(words=all_words, translation=None)
await append_event_and_broadcast(
input.transcript_id,
transcript,
@@ -1155,21 +1160,15 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
logger=logger,
)
# Save duration and clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary already saved by their callbacks
# Clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary/duration already saved by their callbacks
await transcripts_controller.update(
transcript,
{
"duration": duration,
"workflow_run_id": None, # Clear on success - no need to resume
},
)
duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast(
input.transcript_id, transcript, "DURATION", duration_data, logger=logger
)
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log(

View File

@@ -0,0 +1,165 @@
"""
Hatchet child workflow: PaddingWorkflow
Handles individual audio track padding via Modal.com backend.
"""
from datetime import timedelta
import av
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.hatchet.workflows.models import PadTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import extract_stream_start_time_from_container
class PaddingInput(BaseModel):
"""Input for individual track padding."""
track_index: int
s3_key: str
bucket_name: str
transcript_id: str
hatchet = HatchetClientManager.get_client()
padding_workflow = hatchet.workflow(
name="PaddingWorkflow", input_validator=PaddingInput
)
@padding_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
"""Pad audio track with silence based on WebM container start_time."""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
source_url = await storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
# Extract start_time to determine if padding needed
with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except (ValueError, TypeError, OverflowError) as e:
ctx.log(
f"pad_track: track {input.track_index}, duration error: {str(e)}"
)
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
# Presign PUT URL for output (Modal will upload directly)
output_url = await storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
import httpx # noqa: PLC0415
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
)
try:
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track: Modal returned size={file_size}")
except httpx.HTTPStatusError as e:
error_detail = e.response.text if hasattr(e.response, "text") else str(e)
logger.error(
"[Hatchet] Modal padding HTTP error",
transcript_id=input.transcript_id,
track_index=input.track_index,
status_code=e.response.status_code if hasattr(e, "response") else None,
error=error_detail,
exc_info=True,
)
raise Exception(
f"Modal padding failed: HTTP {e.response.status_code}"
) from e
except httpx.TimeoutException as e:
logger.error(
"[Hatchet] Modal padding timeout",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise Exception("Modal padding timeout") from e
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_key=storage_path,
)
return PadTrackResult(
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
except Exception as e:
logger.error(
"[Hatchet] pad_track failed",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise

View File

@@ -14,9 +14,7 @@ Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks.
"""
import tempfile
from datetime import timedelta
from pathlib import Path
import av
from hatchet_sdk import Context
@@ -27,10 +25,7 @@ from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import (
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
from reflector.utils.audio_padding import extract_stream_start_time_from_container
class TrackInput(BaseModel):
@@ -83,63 +78,44 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
)
with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except Exception:
ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
temp_path = temp_file.name
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
try:
apply_audio_padding_to_file(
in_container,
temp_path,
start_time_seconds,
input.track_index,
logger=logger,
)
# Presign PUT URL for output (Modal uploads directly)
output_url = await storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
file_size = Path(temp_path).stat().st_size
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
)
logger.info(
f"About to upload padded track",
key=storage_path,
size=file_size,
)
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
logger.info(
f"Uploaded padded track to S3",
key=storage_path,
size=file_size,
)
finally:
Path(temp_path).unlink(missing_ok=True)
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info(

View File

@@ -0,0 +1,113 @@
"""
Modal.com backend for audio padding.
"""
import asyncio
import os
import httpx
from pydantic import BaseModel
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.logger import logger
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
class AudioPaddingModalProcessor:
"""Audio padding processor using Modal.com CPU backend via HTTP."""
def __init__(
self, padding_url: str | None = None, modal_api_key: str | None = None
):
self.padding_url = padding_url or os.getenv("PADDING_URL")
if not self.padding_url:
raise ValueError(
"PADDING_URL required to use AudioPaddingModalProcessor. "
"Set PADDING_URL environment variable or pass padding_url parameter."
)
self.modal_api_key = modal_api_key or os.getenv("MODAL_API_KEY")
async def pad_track(
self,
track_url: str,
output_url: str,
start_time_seconds: float,
track_index: int,
) -> PaddingResponse:
"""Pad audio track with silence via Modal backend.
Args:
track_url: Presigned GET URL for source audio track
output_url: Presigned PUT URL for output WebM
start_time_seconds: Amount of silence to prepend
track_index: Track index for logging
"""
if not track_url:
raise ValueError("track_url cannot be empty")
if start_time_seconds <= 0:
raise ValueError(
f"start_time_seconds must be positive, got {start_time_seconds}"
)
log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
log.info("Sending Modal padding HTTP request")
url = f"{self.padding_url}/pad"
headers = {}
if self.modal_api_key:
headers["Authorization"] = f"Bearer {self.modal_api_key}"
try:
async with httpx.AsyncClient(timeout=TIMEOUT_AUDIO) as client:
response = await client.post(
url,
headers=headers,
json={
"track_url": track_url,
"output_url": output_url,
"start_time_seconds": start_time_seconds,
"track_index": track_index,
},
follow_redirects=True,
)
if response.status_code != 200:
error_body = response.text
log.error(
"Modal padding API error",
status_code=response.status_code,
error_body=error_body,
)
response.raise_for_status()
result = response.json()
# Check if work was cancelled
if result.get("cancelled"):
log.warning("Modal padding was cancelled by disconnect detection")
raise asyncio.CancelledError(
"Padding cancelled due to client disconnect"
)
log.info("Modal padding complete", size=result["size"])
return PaddingResponse(**result)
except asyncio.CancelledError:
log.warning(
"Modal padding cancelled (Hatchet timeout, disconnect detected on Modal side)"
)
raise
except httpx.TimeoutException as e:
log.error("Modal padding timeout", error=str(e), exc_info=True)
raise Exception(f"Modal padding timeout: {e}") from e
except httpx.HTTPStatusError as e:
log.error("Modal padding HTTP error", error=str(e), exc_info=True)
raise Exception(f"Modal padding HTTP error: {e}") from e
except Exception as e:
log.error("Modal padding unexpected error", error=str(e), exc_info=True)
raise

View File

@@ -98,6 +98,10 @@ class Settings(BaseSettings):
# Diarization: local pyannote.audio
DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None
# Audio Padding (Modal.com backend)
PADDING_URL: str | None = None
PADDING_MODAL_API_KEY: str | None = None
# Sentry
SENTRY_DSN: str | None = None

View File

@@ -5,7 +5,9 @@ Used by both Hatchet workflows and Celery pipelines for consistent audio encodin
"""
# Opus codec settings
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality
# S3 presigned URL expiration

View File

@@ -129,6 +129,10 @@ class DailyClient(VideoPlatformClient):
"""Get room presence/session data for a Daily.co room."""
return await self._api_client.get_room_presence(room_name)
async def delete_room(self, room_name: str) -> None:
"""Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
return await self._api_client.delete_room(room_name)
async def get_meeting_participants(
self, meeting_id: str
) -> MeetingParticipantsResponse:

View File

@@ -20,6 +20,7 @@ from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings
from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import poll_daily_room_presence_task
from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__)
@@ -365,6 +366,53 @@ async def rooms_create_meeting(
return meeting
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
async def rooms_joined_meeting(
room_name: str,
meeting_id: str,
):
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.delay(meeting_id)
return {"status": "ok"}
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
async def rooms_leave_meeting(
room_name: str,
meeting_id: str,
delay_seconds: int = 2,
):
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.apply_async(
args=[meeting_id],
countdown=delay_seconds,
)
return {"status": "ok"}
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
async def rooms_test_webhook(
room_id: str,

View File

@@ -845,15 +845,47 @@ async def process_meetings():
end_date = end_date.replace(tzinfo=timezone.utc)
client = create_platform_client(meeting.platform)
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = False
has_had_sessions = False
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
logger_.info(
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
)
if meeting.platform == "daily":
try:
presence = await client.get_room_presence(meeting.room_name)
has_active_sessions = presence.total_count > 0
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_had_sessions = bool(room_sessions)
logger_.info(
"Daily.co presence check",
has_active_sessions=has_active_sessions,
has_had_sessions=has_had_sessions,
presence_count=presence.total_count,
)
except Exception:
logger_.warning(
"Daily.co presence API failed, falling back to DB sessions",
exc_info=True,
)
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_active_sessions = bool(
room_sessions
and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
else:
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
logger_.info(
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
)
if has_active_sessions:
logger_.debug("Meeting still has active sessions, keep it")
@@ -872,7 +904,20 @@ async def process_meetings():
await meetings_controller.update_meeting(
meeting.id, is_active=False
)
logger_.info("Meeting is deactivated")
logger_.info("Meeting deactivated in database")
if meeting.platform == "daily":
try:
await client.delete_room(meeting.room_name)
logger_.info(
"Daily.co room deleted", room_name=meeting.room_name
)
except Exception:
logger_.warning(
"Failed to delete Daily.co room",
room_name=meeting.room_name,
exc_info=True,
)
processed_count += 1

View File

@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
import asyncio
import json
import threading
import redis.asyncio as redis
from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
async def _pubsub_data_reader(self, pubsub_subscriber):
while True:
# timeout=1.0 prevents tight CPU loop when no messages available
message = await pubsub_subscriber.get_message(
ignore_subscribe_messages=True
)
@@ -109,29 +109,38 @@ class WebsocketManager:
await socket.send_json(data)
# Process-global singleton to ensure only one WebsocketManager instance exists.
# Multiple instances would cause resource leaks and CPU issues.
_ws_manager: WebsocketManager | None = None
def get_ws_manager() -> WebsocketManager:
"""
Returns the WebsocketManager instance for managing websockets.
Returns the global WebsocketManager singleton.
This function initializes and returns the WebsocketManager instance,
which is responsible for managing websockets and handling websocket
connections.
Creates instance on first call, subsequent calls return cached instance.
Thread-safe via GIL. Concurrent initialization may create duplicate
instances but last write wins (acceptable for this use case).
Returns:
WebsocketManager: The initialized WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
WebsocketManager: The global WebsocketManager instance.
"""
local = threading.local()
if hasattr(local, "ws_manager"):
return local.ws_manager
global _ws_manager
if _ws_manager is not None:
return _ws_manager
# No lock needed - GIL makes this safe enough
# Worst case: race creates two instances, last assignment wins
pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
)
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager
return ws_manager
_ws_manager = WebsocketManager(pubsub_client=pubsub_client)
return _ws_manager
def reset_ws_manager() -> None:
"""Reset singleton for testing. DO NOT use in production."""
global _ws_manager
_ws_manager = None

View File

@@ -1,6 +1,5 @@
import os
from contextlib import asynccontextmanager
from tempfile import NamedTemporaryFile
from unittest.mock import patch
import pytest
@@ -333,11 +332,14 @@ def celery_enable_logging():
@pytest.fixture(scope="session")
def celery_config():
with NamedTemporaryFile() as f:
yield {
"broker_url": "memory://",
"result_backend": f"db+sqlite:///{f.name}",
}
redis_host = os.environ.get("REDIS_HOST", "localhost")
redis_port = os.environ.get("REDIS_PORT", "6379")
# Use db 2 to avoid conflicts with main app
redis_url = f"redis://{redis_host}:{redis_port}/2"
yield {
"broker_url": redis_url,
"result_backend": redis_url,
}
@pytest.fixture(scope="session")
@@ -370,9 +372,12 @@ async def ws_manager_in_memory(monkeypatch):
def __init__(self, queue: asyncio.Queue):
self.queue = queue
async def get_message(self, ignore_subscribe_messages: bool = True):
async def get_message(
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
):
wait_timeout = timeout if timeout is not None else 0.05
try:
return await asyncio.wait_for(self.queue.get(), timeout=0.05)
return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
except Exception:
return None

View File

@@ -0,0 +1,286 @@
"""Unit tests for Daily.co presence-based meeting deactivation logic.
Tests the fix for split room race condition by verifying:
1. Real-time presence checking via Daily.co API
2. Room deletion when meetings deactivate
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.dailyco_api.responses import (
RoomPresenceParticipant,
RoomPresenceResponse,
)
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.video_platforms.daily import DailyClient
@pytest.fixture
async def daily_room_and_meeting():
"""Create test room and meeting for Daily platform."""
room = await rooms_controller.add(
name="test-daily",
user_id="test-user",
platform="daily",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
current_time = datetime.now(timezone.utc)
end_time = current_time + timedelta(hours=2)
meeting = await meetings_controller.create(
id="test-meeting-id",
room_name="test-daily-20260129120000",
room_url="https://daily.co/test",
host_room_url="https://daily.co/test",
start_date=current_time,
end_date=end_time,
room=room,
)
return room, meeting
@pytest.mark.asyncio
async def test_daily_client_has_delete_room_method():
"""Verify DailyClient has delete_room method for cleanup."""
# Create a mock DailyClient
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Verify delete_room method exists
assert hasattr(client, "delete_room")
assert callable(getattr(client, "delete_room"))
@pytest.mark.asyncio
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
"""Test that get_room_presence returns real-time participant data."""
room, meeting = daily_room_and_meeting
# Mock Daily.co API response
mock_presence = RoomPresenceResponse(
total_count=2,
data=[
RoomPresenceParticipant(
room=meeting.room_name,
id="session-1",
userId="user-1",
userName="User One",
joinTime="2026-01-29T12:00:00.000Z",
duration=120,
),
RoomPresenceParticipant(
room=meeting.room_name,
id="session-2",
userId="user-2",
userName="User Two",
joinTime="2026-01-29T12:05:00.000Z",
duration=60,
),
],
)
with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock the API client method
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Call get_room_presence
result = await client.get_room_presence(meeting.room_name)
# Verify it calls Daily.co API
client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
# Verify result contains real-time data
assert result.total_count == 2
assert len(result.data) == 2
assert result.data[0].id == "session-1"
assert result.data[1].id == "session-2"
@pytest.mark.asyncio
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
"""Test that Daily.co presence API is source of truth, not stale DB sessions."""
room, meeting = daily_room_and_meeting
current_time = datetime.now(timezone.utc)
# Create stale DB session (left_at=NULL but user actually left)
session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
await daily_participant_sessions_controller.upsert_joined(
DailyParticipantSession(
id=session_id,
meeting_id=meeting.id,
room_id=room.id,
session_id="stale-daily-session",
user_name="Stale User",
user_id="stale-user",
joined_at=current_time - timedelta(minutes=5),
left_at=None, # Stale - shows active but user left
)
)
# Verify DB shows active session
db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
assert len(db_sessions) == 1
# But Daily.co API shows room is empty
mock_presence = RoomPresenceResponse(total_count=0, data=[])
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Get real-time presence
presence = await client.get_room_presence(meeting.room_name)
# Real-time API shows no participants (truth)
assert presence.total_count == 0
assert len(presence.data) == 0
# DB shows 1 participant (stale)
assert len(db_sessions) == 1
# Implementation should trust presence API, not DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_empty():
"""Test the core deactivation decision logic when presence shows room empty."""
# This tests the logic that will be in process_meetings
# Simulate: DB shows stale active session
has_active_db_sessions = True # DB is stale
# Simulate: Daily.co presence API shows room empty
presence_count = 0 # Real-time truth
# Simulate: Meeting has been used before
has_had_sessions = True
# Decision logic (what process_meetings should do):
# - If presence API available: trust it
# - If presence shows empty AND has_had_sessions: deactivate
if presence_count == 0 and has_had_sessions:
should_deactivate = True
else:
should_deactivate = False
assert should_deactivate is True # Should deactivate despite stale DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_active():
"""Test that meetings stay active when presence shows participants."""
# Simulate: DB shows no sessions (not yet updated)
has_active_db_sessions = False # DB hasn't caught up
# Simulate: Daily.co presence API shows active participant
presence_count = 1 # Real-time truth
# Decision logic: presence shows activity, keep meeting active
if presence_count > 0:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Should stay active
@pytest.mark.asyncio
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
"""Test that Daily.co room is deleted when meeting deactivates."""
room, meeting = daily_room_and_meeting
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock delete_room API call
client._api_client.delete_room = AsyncMock()
# Simulate deactivation - should delete room
await client._api_client.delete_room(meeting.room_name)
# Verify delete was called
client._api_client.delete_room.assert_called_once_with(meeting.room_name)
@pytest.mark.asyncio
async def test_delete_room_idempotent_on_404():
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
from reflector.dailyco_api.client import DailyApiClient
# Create real client to test delete_room logic
client = DailyApiClient(api_key="test-key")
# Mock the HTTP client
mock_http_client = AsyncMock()
mock_response = AsyncMock()
mock_response.status_code = 404 # Room not found
mock_http_client.delete = AsyncMock(return_value=mock_response)
# Mock _get_client to return our mock
async def mock_get_client():
return mock_http_client
client._get_client = mock_get_client
# delete_room should succeed even on 404 (idempotent)
await client.delete_room("nonexistent-room")
# Verify delete was attempted
mock_http_client.delete.assert_called_once()
@pytest.mark.asyncio
async def test_api_failure_fallback_to_db_sessions():
"""Test that system falls back to DB sessions if Daily.co API fails."""
# Simulate: Daily.co API throws exception
api_exception = Exception("API unavailable")
# Simulate: DB shows active session
has_active_db_sessions = True
# Decision logic with fallback:
try:
presence_count = None
raise api_exception # Simulating API failure
except Exception:
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
if has_active_db_sessions:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Conservative: keep active on API failure

View File

@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
settings.DATA_DIR = DATA_DIR
@pytest.fixture(scope="session")
def celery_includes():
return ["reflector.pipelines.main_live_pipeline"]
# Using celery_includes from conftest.py which includes both pipelines
@pytest.mark.usefixtures("setup_database")

View File

@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
if server_instance:
server_instance.should_exit = True
server_thread.join(timeout=30)
server_thread.join(timeout=2.0)
# Reset global singleton for test isolation
from reflector.ws_manager import reset_ws_manager
reset_ws_manager()
@pytest.fixture(autouse=True)
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
# Connect and then trigger an event via HTTP create
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
await asyncio.sleep(0.2)
# Emit an event to the user's room via a standard HTTP action
from httpx import AsyncClient
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
"email": "user-abc@example.com",
}
# Use in-memory client (global singleton makes it share ws_manager)
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
resp = await ac.post("/transcripts", json={"name": "WS Test"})

45
server/uv.lock generated
View File

@@ -159,21 +159,20 @@ wheels = [
[[package]]
name = "aiortc"
version = "1.13.0"
version = "1.14.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aioice" },
{ name = "av" },
{ name = "cffi" },
{ name = "cryptography" },
{ name = "google-crc32c" },
{ name = "pyee" },
{ name = "pylibsrtp" },
{ name = "pyopenssl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894 }
sdist = { url = "https://files.pythonhosted.org/packages/51/9c/4e027bfe0195de0442da301e2389329496745d40ae44d2d7c4571c4290ce/aiortc-1.14.0.tar.gz", hash = "sha256:adc8a67ace10a085721e588e06a00358ed8eaf5f6b62f0a95358ff45628dd762", size = 1180864 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910 },
{ url = "https://files.pythonhosted.org/packages/57/ab/31646a49209568cde3b97eeade0d28bb78b400e6645c56422c101df68932/aiortc-1.14.0-py3-none-any.whl", hash = "sha256:4b244d7e482f4e1f67e685b3468269628eca1ec91fa5b329ab517738cfca086e", size = 93183 },
]
[[package]]
@@ -327,28 +326,24 @@ wheels = [
[[package]]
name = "av"
version = "14.4.0"
version = "16.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 }
sdist = { url = "https://files.pythonhosted.org/packages/78/cd/3a83ffbc3cc25b39721d174487fb0d51a76582f4a1703f98e46170ce83d4/av-16.1.0.tar.gz", hash = "sha256:a094b4fd87a3721dacf02794d3d2c82b8d712c85b9534437e82a8a978c175ffd", size = 4285203 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/8a/d57418b686ffd05fabd5a0a9cfa97e63b38c35d7101af00e87c51c8cc43c/av-14.4.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b21d5586a88b9fce0ab78e26bd1c38f8642f8e2aad5b35e619f4d202217c701", size = 19965048 },
{ url = "https://files.pythonhosted.org/packages/f5/aa/3f878b0301efe587e9b07bb773dd6b47ef44ca09a3cffb4af50c08a170f3/av-14.4.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:cf8762d90b0f94a20c9f6e25a94f1757db5a256707964dfd0b1d4403e7a16835", size = 23750064 },
{ url = "https://files.pythonhosted.org/packages/9a/b4/6fe94a31f9ed3a927daa72df67c7151968587106f30f9f8fcd792b186633/av-14.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c0ac9f08920c7bbe0795319689d901e27cb3d7870b9a0acae3f26fc9daa801a6", size = 33648775 },
{ url = "https://files.pythonhosted.org/packages/6c/f3/7f3130753521d779450c935aec3f4beefc8d4645471159f27b54e896470c/av-14.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a56d9ad2afdb638ec0404e962dc570960aae7e08ae331ad7ff70fbe99a6cf40e", size = 32216915 },
{ url = "https://files.pythonhosted.org/packages/f8/9a/8ffabfcafb42154b4b3a67d63f9b69e68fa8c34cb39ddd5cb813dd049ed4/av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bed513cbcb3437d0ae47743edc1f5b4a113c0b66cdd4e1aafc533abf5b2fbf2", size = 35287279 },
{ url = "https://files.pythonhosted.org/packages/ad/11/7023ba0a2ca94a57aedf3114ab8cfcecb0819b50c30982a4c5be4d31df41/av-14.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d030c2d3647931e53d51f2f6e0fcf465263e7acf9ec6e4faa8dbfc77975318c3", size = 36294683 },
{ url = "https://files.pythonhosted.org/packages/3d/fa/b8ac9636bd5034e2b899354468bef9f4dadb067420a16d8a493a514b7817/av-14.4.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1cc21582a4f606271d8c2036ec7a6247df0831050306c55cf8a905701d0f0474", size = 34552391 },
{ url = "https://files.pythonhosted.org/packages/fb/29/0db48079c207d1cba7a2783896db5aec3816e17de55942262c244dffbc0f/av-14.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce7c9cd452153d36f1b1478f904ed5f9ab191d76db873bdd3a597193290805d4", size = 37265250 },
{ url = "https://files.pythonhosted.org/packages/1c/55/715858c3feb7efa4d667ce83a829c8e6ee3862e297fb2b568da3f968639d/av-14.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd261e31cc6b43ca722f80656c39934199d8f2eb391e0147e704b6226acebc29", size = 27925845 },
{ url = "https://files.pythonhosted.org/packages/a6/75/b8641653780336c90ba89e5352cac0afa6256a86a150c7703c0b38851c6d/av-14.4.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:a53e682b239dd23b4e3bc9568cfb1168fc629ab01925fdb2e7556eb426339e94", size = 19954125 },
{ url = "https://files.pythonhosted.org/packages/99/e6/37fe6fa5853a48d54d749526365780a63a4bc530be6abf2115e3a21e292a/av-14.4.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5aa0b901751a32703fa938d2155d56ce3faf3630e4a48d238b35d2f7e49e5395", size = 23751479 },
{ url = "https://files.pythonhosted.org/packages/f7/75/9a5f0e6bda5f513b62bafd1cff2b495441a8b07ab7fb7b8e62f0c0d1683f/av-14.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b316fed3597675fe2aacfed34e25fc9d5bb0196dc8c0b014ae5ed4adda48de", size = 33801401 },
{ url = "https://files.pythonhosted.org/packages/6a/c9/e4df32a2ad1cb7f3a112d0ed610c5e43c89da80b63c60d60e3dc23793ec0/av-14.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a587b5c5014c3c0e16143a0f8d99874e46b5d0c50db6111aa0b54206b5687c81", size = 32364330 },
{ url = "https://files.pythonhosted.org/packages/ca/f0/64e7444a41817fde49a07d0239c033f7e9280bec4a4bb4784f5c79af95e6/av-14.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d53f75e8ac1ec8877a551c0db32a83c0aaeae719d05285281eaaba211bbc30", size = 35519508 },
{ url = "https://files.pythonhosted.org/packages/c2/a8/a370099daa9033a3b6f9b9bd815304b3d8396907a14d09845f27467ba138/av-14.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c8558cfde79dd8fc92d97c70e0f0fa8c94c7a66f68ae73afdf58598f0fe5e10d", size = 36448593 },
{ url = "https://files.pythonhosted.org/packages/27/bb/edb6ceff8fa7259cb6330c51dbfbc98dd1912bd6eb5f7bc05a4bb14a9d6e/av-14.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:455b6410dea0ab2d30234ffb28df7d62ca3cdf10708528e247bec3a4cdcced09", size = 34701485 },
{ url = "https://files.pythonhosted.org/packages/a7/8a/957da1f581aa1faa9a5dfa8b47ca955edb47f2b76b949950933b457bfa1d/av-14.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1661efbe9d975f927b8512d654704223d936f39016fad2ddab00aee7c40f412c", size = 37521981 },
{ url = "https://files.pythonhosted.org/packages/28/76/3f1cf0568592f100fd68eb40ed8c491ce95ca3c1378cc2d4c1f6d1bd295d/av-14.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:fbbeef1f421a3461086853d6464ad5526b56ffe8ccb0ab3fd0a1f121dfbf26ad", size = 27925944 },
{ url = "https://files.pythonhosted.org/packages/48/d0/b71b65d1b36520dcb8291a2307d98b7fc12329a45614a303ff92ada4d723/av-16.1.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:e88ad64ee9d2b9c4c5d891f16c22ae78e725188b8926eb88187538d9dd0b232f", size = 26927747 },
{ url = "https://files.pythonhosted.org/packages/2f/79/720a5a6ccdee06eafa211b945b0a450e3a0b8fc3d12922f0f3c454d870d2/av-16.1.0-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:cb296073fa6935724de72593800ba86ae49ed48af03960a4aee34f8a611f442b", size = 21492232 },
{ url = "https://files.pythonhosted.org/packages/8e/4f/a1ba8d922f2f6d1a3d52419463ef26dd6c4d43ee364164a71b424b5ae204/av-16.1.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:720edd4d25aa73723c1532bb0597806d7b9af5ee34fc02358782c358cfe2f879", size = 39291737 },
{ url = "https://files.pythonhosted.org/packages/1a/31/fc62b9fe8738d2693e18d99f040b219e26e8df894c10d065f27c6b4f07e3/av-16.1.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:c7f2bc703d0df260a1fdf4de4253c7f5500ca9fc57772ea241b0cb241bcf972e", size = 40846822 },
{ url = "https://files.pythonhosted.org/packages/53/10/ab446583dbce730000e8e6beec6ec3c2753e628c7f78f334a35cad0317f4/av-16.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d69c393809babada7d54964d56099e4b30a3e1f8b5736ca5e27bd7be0e0f3c83", size = 40675604 },
{ url = "https://files.pythonhosted.org/packages/31/d7/1003be685277005f6d63fd9e64904ee222fe1f7a0ea70af313468bb597db/av-16.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:441892be28582356d53f282873c5a951592daaf71642c7f20165e3ddcb0b4c63", size = 42015955 },
{ url = "https://files.pythonhosted.org/packages/2f/4a/fa2a38ee9306bf4579f556f94ecbc757520652eb91294d2a99c7cf7623b9/av-16.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:273a3e32de64819e4a1cd96341824299fe06f70c46f2288b5dc4173944f0fd62", size = 31750339 },
{ url = "https://files.pythonhosted.org/packages/9c/84/2535f55edcd426cebec02eb37b811b1b0c163f26b8d3f53b059e2ec32665/av-16.1.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:640f57b93f927fba8689f6966c956737ee95388a91bd0b8c8b5e0481f73513d6", size = 26945785 },
{ url = "https://files.pythonhosted.org/packages/b6/17/ffb940c9e490bf42e86db4db1ff426ee1559cd355a69609ec1efe4d3a9eb/av-16.1.0-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:ae3fb658eec00852ebd7412fdc141f17f3ddce8afee2d2e1cf366263ad2a3b35", size = 21481147 },
{ url = "https://files.pythonhosted.org/packages/15/c1/e0d58003d2d83c3921887d5c8c9b8f5f7de9b58dc2194356a2656a45cfdc/av-16.1.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:27ee558d9c02a142eebcbe55578a6d817fedfde42ff5676275504e16d07a7f86", size = 39517197 },
{ url = "https://files.pythonhosted.org/packages/32/77/787797b43475d1b90626af76f80bfb0c12cfec5e11eafcfc4151b8c80218/av-16.1.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7ae547f6d5fa31763f73900d43901e8c5fa6367bb9a9840978d57b5a7ae14ed2", size = 41174337 },
{ url = "https://files.pythonhosted.org/packages/8e/ac/d90df7f1e3b97fc5554cf45076df5045f1e0a6adf13899e10121229b826c/av-16.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8cf065f9d438e1921dc31fc7aa045790b58aee71736897866420d80b5450f62a", size = 40817720 },
{ url = "https://files.pythonhosted.org/packages/80/6f/13c3a35f9dbcebafd03fe0c4cbd075d71ac8968ec849a3cfce406c35a9d2/av-16.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a345877a9d3cc0f08e2bc4ec163ee83176864b92587afb9d08dff50f37a9a829", size = 42267396 },
{ url = "https://files.pythonhosted.org/packages/c8/b9/275df9607f7fb44317ccb1d4be74827185c0d410f52b6e2cd770fe209118/av-16.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:f49243b1d27c91cd8c66fdba90a674e344eb8eb917264f36117bf2b6879118fd", size = 31752045 },
]
[[package]]
@@ -3267,7 +3262,7 @@ requires-dist = [
{ name = "aiohttp-cors", specifier = ">=0.7.0" },
{ name = "aiortc", specifier = ">=1.5.0" },
{ name = "alembic", specifier = ">=1.11.3" },
{ name = "av", specifier = ">=10.0.0" },
{ name = "av", specifier = ">=15.0.0" },
{ name = "celery", specifier = ">=5.3.4" },
{ name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" },
{ name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" },

View File

@@ -24,15 +24,24 @@ import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
import {
useRoomJoinMeeting,
useRoomJoinedMeeting,
useRoomLeaveMeeting,
useMeetingStartRecording,
leaveRoomPostUrl,
LeaveRoomBody,
} from "../../lib/apiHooks";
import { omit } from "remeda";
import {
assertExists,
assertExistsAndNonEmptyString,
NonEmptyString,
parseNonEmptyString,
} from "../../lib/utils";
import { assertMeetingId, DailyRecordingType } from "../../lib/types";
import {
assertMeetingId,
DailyRecordingType,
MeetingId,
} from "../../lib/types";
import { useUuidV5 } from "react-uuid-hook";
const CONSENT_BUTTON_ID = "recording-consent";
@@ -179,6 +188,58 @@ const useFrame = (
] as const;
};
const leaveDaily = () => {
const frame = DailyIframe.getCallInstance();
frame?.leave();
};
const useDirtyDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
) => {
useEffect(() => {
if (!meetingId || !roomName) return;
const handleBeforeUnload = () => {
leaveDaily();
navigator.sendBeacon(
leaveRoomPostUrl(
{
room_name: roomName,
meeting_id: meetingId,
},
{
delay_seconds: 5,
},
),
undefined satisfies LeaveRoomBody,
);
};
window.addEventListener("beforeunload", handleBeforeUnload);
return () => window.removeEventListener("beforeunload", handleBeforeUnload);
}, [meetingId, roomName]);
};
const useDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
) => {
useDirtyDisconnects(meetingId, roomName);
useEffect(() => {
return () => {
leaveDaily();
leaveMutation.mutate({
params: {
path: { meeting_id: meetingId, room_name: roomName },
query: { delay_seconds: 5 },
},
});
};
}, [meetingId, roomName]);
};
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
@@ -186,6 +247,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const authLastUserId = auth.lastUserId;
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
const joinedMutation = useRoomJoinedMeeting();
const leaveMutation = useRoomLeaveMeeting();
const startRecordingMutation = useMeetingStartRecording();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
@@ -195,7 +258,9 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
);
const roomName = params?.roomName as string;
if (typeof params.roomName === "object")
throw new Error(`Invalid room name in params. array? ${params.roomName}`);
const roomName = assertExistsAndNonEmptyString(params.roomName);
const {
showConsentModal,
@@ -237,6 +302,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
router.push("/browse");
}, [router]);
useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);
const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => {
if (ev.button_id === CONSENT_BUTTON_ID) {
@@ -249,6 +316,15 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
);
const handleFrameJoinMeeting = useCallback(() => {
joinedMutation.mutate({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
});
if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", {
cloudInstanceId,
@@ -308,8 +384,10 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
}
}, [
meeting.recording_type,
joinedMutation,
roomName,
meeting.id,
meeting.recording_type,
startRecordingMutation,
cloudInstanceId,
rawTracksInstanceId,

View File

@@ -1,12 +1,13 @@
"use client";
import { $api } from "./apiClient";
import { $api, API_URL } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import type { components, operations } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
import { createFinalURL, createQuerySerializer } from "openapi-fetch";
/*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -807,6 +808,44 @@ export function useRoomJoinMeeting() {
);
}
export const LEAVE_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;
export const leaveRoomPostUrl = (
path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
): string =>
createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path, query },
querySerializer: createQuerySerializer(),
});
export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];
export function useRoomLeaveMeeting() {
return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
}
export const JOINED_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;
export const joinedRoomPostUrl = (
params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
): string =>
createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path: params },
querySerializer: () => "",
});
export type JoinedRoomBody =
operations["v1_rooms_joined_meeting"]["requestBody"];
export function useRoomJoinedMeeting() {
return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
}
export function useRoomIcsSync() {
const { setError } = useError();

View File

@@ -171,6 +171,48 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Joined Meeting
* @description Trigger presence poll (ideally when user actually joins meeting in Daily iframe)
*/
post: operations["v1_rooms_joined_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Leave Meeting
* @description Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
*
* Queues presence poll with optional delay to allow Daily.co to detect disconnect.
*/
post: operations["v1_rooms_leave_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}/webhook/test": {
parameters: {
query?: never;
@@ -2435,6 +2477,72 @@ export interface operations {
};
};
};
v1_rooms_joined_meeting: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_leave_meeting: {
parameters: {
query?: {
delay_seconds?: number;
};
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_test_webhook: {
parameters: {
query?: never;