Compare commits

...

20 Commits

Author SHA1 Message Date
Igor Loskutov
f345f1f122 Merge branch 'main' into brady-bunch-sync 2026-02-03 17:15:46 -05:00
8707c6694a fix: use Daily API recording.duration as master source for transcript duration (#844)
Set duration early in get_participants from Daily API (seconds -> ms),
ensuring post_zulip has the value before mixdown_tracks completes.

Removes redundant duration update from mixdown_tracks.

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-03 17:15:03 -05:00
4acde4b7fd fix: increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks (#843)
Topic detection was timing out on longer transcripts when LLM
responses are slow. This affects detect_chunk_topic and other
LLM-calling tasks that use TIMEOUT_MEDIUM.

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-03 16:05:16 -05:00
a2ed7d60d5 fix: make caddy optional (#841) 2026-02-03 00:18:47 +01:00
Igor Loskutov
15aa121727 daily-matching 2026-01-30 17:47:47 -05:00
Igor Loskutov
08f4294b36 more daily co api docs 2026-01-30 17:46:38 -05:00
a08f94a5bf chore(main): release 0.32.1 (#840) 2026-01-30 17:34:48 -05:00
Igor Loskutov
c05d1f03cd fix: match httpx pad with hatchet audio timeout 2026-01-30 15:56:18 -05:00
Igor Loskutov
23eb1371cb fix: daily multitrack pipeline finalze dependency fix 2026-01-30 15:19:27 -05:00
2592e369f6 chore(main): release 0.32.0 (#838) 2026-01-30 13:13:59 -05:00
7fde64e252 feat: modal padding (#837)
* Add Modal backend for audio padding

- Create reflector_padding.py Modal deployment (CPU-based)
- Add PaddingWorkflow with conditional Modal/local backend
- Update deploy-all.sh to include padding deployment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-30 13:11:51 -05:00
2ca624f052 chore(main): release 0.31.0 (#835) 2026-01-26 13:07:29 -05:00
fc3ef6c893 feat: mixdown optional (#834)
* optional mixdown

* optional mixdown

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-23 15:51:18 -05:00
5d26461477 chore(main): release 0.30.0 (#832) 2026-01-23 13:58:33 -05:00
6c175a11d8 feat: brady bunch (#816)
* brady bunch PRD/tasks

* clean dead daily.co code

* brady bunch prototype (no-mistakes)

* brady bunch prototype (no-mistakes) review

* self-review

* daily poll time match (no-mistakes)

* daily poll self-review (no-mistakes)

* daily poll self-review (no-mistakes)

* daily co doc

* cleanup

* cleanup

* self-review (no-mistakes)

* self-review (no-mistakes)

* self-review

* self-review

* ui typefix

* dupe calls error handling proper

* daily reflector data model doc

* logging style fix

* migration merge

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-23 12:33:06 -05:00
6e786b7631 hatchet processing resilence several fixes (#831)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-22 19:03:33 -05:00
8fc8d8bf4a chore(main): release 0.29.0 (#828) 2026-01-22 12:52:39 -05:00
c723752b7e feat: set hatchet as default for multitracks (#822)
* set hatchet as default for multitracks

* fix: pipeline routing tests for hatchet-default branch

- Create room with use_celery=True to force Celery backend in tests
- Link transcript to room to enable multitrack pipeline routing
- Fixes test failures caused by missing HATCHET_CLIENT_TOKEN in test env

* Update server/reflector/services/transcript_process.py

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2026-01-21 17:05:03 -05:00
4dc49e5b25 chore(main): release 0.28.1 (#827) 2026-01-21 15:30:42 -05:00
23d2bc283d fix: ics non-sync bugfix (#823)
* ics non-sync bugfix

* fix tests

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-21 15:10:19 -05:00
76 changed files with 3703 additions and 575 deletions

1
.gitignore vendored
View File

@@ -1,5 +1,6 @@
.DS_Store
server/.env
server/.env.production
.env
Caddyfile
server/exportdanswer

View File

@@ -3,3 +3,5 @@ docs/docs/installation/auth-setup.md:curl-auth-header:250
docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465
server/tests/test_recording_request_flow.py:generic-api-key:121

View File

@@ -1,5 +1,48 @@
# Changelog
## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)
### Bug Fixes
* daily multitrack pipeline finalze dependency fix ([23eb137](https://github.com/Monadical-SAS/reflector/commit/23eb1371cb9348c4b81eb12ad506b582f8a4799e))
* match httpx pad with hatchet audio timeout ([c05d1f0](https://github.com/Monadical-SAS/reflector/commit/c05d1f03cd8369fc06efd455527e50246887efd0))
## [0.32.0](https://github.com/Monadical-SAS/reflector/compare/v0.31.0...v0.32.0) (2026-01-30)
### Features
* modal padding ([#837](https://github.com/Monadical-SAS/reflector/issues/837)) ([7fde64e](https://github.com/Monadical-SAS/reflector/commit/7fde64e2529a1d37b0f7507c62d983a7bd0b5b89))
## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23)
### Features
* mixdown optional ([#834](https://github.com/Monadical-SAS/reflector/issues/834)) ([fc3ef6c](https://github.com/Monadical-SAS/reflector/commit/fc3ef6c8933231c731fad84e7477a476a6220a5e))
## [0.30.0](https://github.com/Monadical-SAS/reflector/compare/v0.29.0...v0.30.0) (2026-01-23)
### Features
* brady bunch ([#816](https://github.com/Monadical-SAS/reflector/issues/816)) ([6c175a1](https://github.com/Monadical-SAS/reflector/commit/6c175a11d8a3745095bfad06a4ad3ccdfd278433))
## [0.29.0](https://github.com/Monadical-SAS/reflector/compare/v0.28.1...v0.29.0) (2026-01-21)
### Features
* set hatchet as default for multitracks ([#822](https://github.com/Monadical-SAS/reflector/issues/822)) ([c723752](https://github.com/Monadical-SAS/reflector/commit/c723752b7e15aa48a41ad22856f147a5517d3f46))
## [0.28.1](https://github.com/Monadical-SAS/reflector/compare/v0.28.0...v0.28.1) (2026-01-21)
### Bug Fixes
* ics non-sync bugfix ([#823](https://github.com/Monadical-SAS/reflector/issues/823)) ([23d2bc2](https://github.com/Monadical-SAS/reflector/commit/23d2bc283d4d02187b250d2055103e0374ee93d6))
## [0.28.0](https://github.com/Monadical-SAS/reflector/compare/v0.27.0...v0.28.0) (2026-01-20)

View File

@@ -1,6 +1,8 @@
# Reflector Caddyfile
# Replace example.com with your actual domains
# CORS is handled by the backend - Caddy just proxies
# Reflector Caddyfile (optional reverse proxy)
# Use this only when you run Caddy via: docker compose -f docker-compose.prod.yml --profile caddy up -d
# If Coolify, Traefik, or nginx already use ports 80/443, do NOT start Caddy; point your proxy at web:3000 and server:1250.
#
# Replace example.com with your actual domains. CORS is handled by the backend - Caddy just proxies.
#
# For environment variable substitution, set:
# FRONTEND_DOMAIN=app.example.com

View File

@@ -1,9 +1,14 @@
# Production Docker Compose configuration
# Usage: docker compose -f docker-compose.prod.yml up -d
#
# Caddy (reverse proxy on ports 80/443) is OPTIONAL and behind the "caddy" profile:
# - With Caddy (self-hosted, you manage SSL): docker compose -f docker-compose.prod.yml --profile caddy up -d
# - Without Caddy (Coolify/Traefik/nginx already on 80/443): docker compose -f docker-compose.prod.yml up -d
# Then point your proxy at web:3000 (frontend) and server:1250 (API).
#
# Prerequisites:
# 1. Copy .env.example to .env and configure for both server/ and www/
# 2. Copy Caddyfile.example to Caddyfile and edit with your domains
# 2. If using Caddy: copy Caddyfile.example to Caddyfile and edit your domains
# 3. Deploy Modal GPU functions (see gpu/modal_deployments/deploy-all.sh)
services:
@@ -84,6 +89,8 @@ services:
retries: 3
caddy:
profiles:
- caddy
image: caddy:2-alpine
restart: unless-stopped
ports:

View File

@@ -11,15 +11,15 @@ This page documents the Docker Compose configuration for Reflector. For the comp
The `docker-compose.prod.yml` includes these services:
| Service | Image | Purpose |
|---------|-------|---------|
| `web` | `monadicalsas/reflector-frontend` | Next.js frontend |
| `server` | `monadicalsas/reflector-backend` | FastAPI backend |
| `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks |
| `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler |
| `redis` | `redis:7.2-alpine` | Message broker and cache |
| `postgres` | `postgres:17-alpine` | Primary database |
| `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL |
| Service | Image | Purpose |
| ---------- | --------------------------------- | --------------------------------------------------------------------------- |
| `web` | `monadicalsas/reflector-frontend` | Next.js frontend |
| `server` | `monadicalsas/reflector-backend` | FastAPI backend |
| `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks |
| `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler |
| `redis` | `redis:7.2-alpine` | Message broker and cache |
| `postgres` | `postgres:17-alpine` | Primary database |
| `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL (optional; see [Caddy profile](#caddy-profile)) |
## Environment Files
@@ -30,6 +30,7 @@ Reflector uses two separate environment files:
Used by: `server`, `worker`, `beat`
Key variables:
```env
# Database connection
DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
@@ -54,6 +55,7 @@ TRANSCRIPT_MODAL_API_KEY=...
Used by: `web`
Key variables:
```env
# Domain configuration
SITE_URL=https://app.example.com
@@ -70,26 +72,42 @@ Note: `API_URL` is used client-side (browser), `SERVER_API_URL` is used server-s
## Volumes
| Volume | Purpose |
|--------|---------|
| `redis_data` | Redis persistence |
| `postgres_data` | PostgreSQL data |
| `server_data` | Uploaded files, local storage |
| `caddy_data` | SSL certificates |
| `caddy_config` | Caddy configuration |
| Volume | Purpose |
| --------------- | ----------------------------- |
| `redis_data` | Redis persistence |
| `postgres_data` | PostgreSQL data |
| `server_data` | Uploaded files, local storage |
| `caddy_data` | SSL certificates |
| `caddy_config` | Caddy configuration |
## Network
All services share the default network. The network is marked `attachable: true` to allow external containers (like Authentik) to join.
## Caddy profile
Caddy (ports 80 and 443) is **optional** and behind the `caddy` profile so it does not conflict with an existing reverse proxy (e.g. Coolify, Traefik, nginx).
- **With Caddy** (you want Reflector to handle SSL):
`docker compose -f docker-compose.prod.yml --profile caddy up -d`
- **Without Caddy** (Coolify or another proxy already on 80/443):
`docker compose -f docker-compose.prod.yml up -d`
Then configure your proxy to send traffic to `web:3000` (frontend) and `server:1250` (API).
## Common Commands
### Start all services
```bash
# Without Caddy (e.g. when using Coolify)
docker compose -f docker-compose.prod.yml up -d
# With Caddy as reverse proxy
docker compose -f docker-compose.prod.yml --profile caddy up -d
```
### View logs
```bash
# All services
docker compose -f docker-compose.prod.yml logs -f
@@ -99,6 +117,7 @@ docker compose -f docker-compose.prod.yml logs server --tail 50
```
### Restart a service
```bash
# Quick restart (doesn't reload .env changes)
docker compose -f docker-compose.prod.yml restart server
@@ -108,27 +127,32 @@ docker compose -f docker-compose.prod.yml up -d server
```
### Run database migrations
```bash
docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head
```
### Access database
```bash
docker compose -f docker-compose.prod.yml exec postgres psql -U reflector
```
### Pull latest images
```bash
docker compose -f docker-compose.prod.yml pull
docker compose -f docker-compose.prod.yml up -d
```
### Stop all services
```bash
docker compose -f docker-compose.prod.yml down
```
### Full reset (WARNING: deletes data)
```bash
docker compose -f docker-compose.prod.yml down -v
```
@@ -187,6 +211,7 @@ The Caddyfile supports environment variable substitution:
Set `FRONTEND_DOMAIN` and `API_DOMAIN` environment variables, or edit the file directly.
### Reload Caddy after changes
```bash
docker compose -f docker-compose.prod.yml exec caddy caddy reload --config /etc/caddy/Caddyfile
```

View File

@@ -26,7 +26,7 @@ flowchart LR
Before starting, you need:
- **Production server** - 4+ cores, 8GB+ RAM, public IP
- **Production server** - 4+ cores, 8GB+ RAM, public IP
- **Two domain names** - e.g., `app.example.com` (frontend) and `api.example.com` (backend)
- **GPU processing** - Choose one:
- Modal.com account, OR
@@ -60,16 +60,17 @@ Type: A Name: api Value: <your-server-ip>
Reflector requires GPU processing for transcription and speaker diarization. Choose one option:
| | **Modal.com (Cloud)** | **Self-Hosted GPU** |
|---|---|---|
| | **Modal.com (Cloud)** | **Self-Hosted GPU** |
| ------------ | --------------------------------- | ---------------------------- |
| **Best for** | No GPU hardware, zero maintenance | Own GPU server, full control |
| **Pricing** | Pay-per-use | Fixed infrastructure cost |
| **Pricing** | Pay-per-use | Fixed infrastructure cost |
### Option A: Modal.com (Serverless Cloud GPU)
#### Accept HuggingFace Licenses
Visit both pages and click "Accept":
- https://huggingface.co/pyannote/speaker-diarization-3.1
- https://huggingface.co/pyannote/segmentation-3.0
@@ -179,6 +180,7 @@ Save these credentials - you'll need them in the next step.
## Configure Environment
Reflector has two env files:
- `server/.env` - Backend configuration
- `www/.env` - Frontend configuration
@@ -190,6 +192,7 @@ nano server/.env
```
**Required settings:**
```env
# Database (defaults work with docker-compose.prod.yml)
DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
@@ -249,6 +252,7 @@ nano www/.env
```
**Required settings:**
```env
# Your domains
SITE_URL=https://app.example.com
@@ -266,7 +270,11 @@ FEATURE_REQUIRE_LOGIN=false
---
## Configure Caddy
## Reverse proxy (Caddy or existing)
**If Coolify, Traefik, or nginx already use ports 80/443** (e.g. Coolify on your host): skip Caddy. Start the stack without the Caddy profile (see [Start Services](#start-services) below), then point your proxy at `web:3000` (frontend) and `server:1250` (API).
**If you want Reflector to provide the reverse proxy and SSL:**
```bash
cp Caddyfile.example Caddyfile
@@ -289,10 +297,18 @@ Replace `example.com` with your domains. The `{$VAR:default}` syntax uses Caddy'
## Start Services
**Without Caddy** (e.g. Coolify already on 80/443):
```bash
docker compose -f docker-compose.prod.yml up -d
```
**With Caddy** (Reflector handles SSL):
```bash
docker compose -f docker-compose.prod.yml --profile caddy up -d
```
Wait for containers to start (first run may take 1-2 minutes to pull images and initialize).
---
@@ -300,18 +316,21 @@ Wait for containers to start (first run may take 1-2 minutes to pull images and
## Verify Deployment
### Check services
```bash
docker compose -f docker-compose.prod.yml ps
# All should show "Up"
```
### Test API
```bash
curl https://api.example.com/health
# Should return: {"status":"healthy"}
```
### Test Frontend
- Visit https://app.example.com
- You should see the Reflector interface
- Try uploading an audio file to test transcription
@@ -327,6 +346,7 @@ By default, Reflector is open (no login required). **Authentication is required
See [Authentication Setup](./auth-setup) for full Authentik OAuth configuration.
Quick summary:
1. Deploy Authentik on your server
2. Create OAuth provider in Authentik
3. Extract public key for JWT verification
@@ -358,6 +378,7 @@ DAILYCO_STORAGE_AWS_ROLE_ARN=<arn:aws:iam::ACCOUNT:role/DailyCo>
```
Reload env and restart:
```bash
docker compose -f docker-compose.prod.yml up -d server worker
```
@@ -367,35 +388,43 @@ docker compose -f docker-compose.prod.yml up -d server worker
## Troubleshooting
### Check logs for errors
```bash
docker compose -f docker-compose.prod.yml logs server --tail 20
docker compose -f docker-compose.prod.yml logs worker --tail 20
```
### Services won't start
```bash
docker compose -f docker-compose.prod.yml logs
```
### CORS errors in browser
- Verify `CORS_ORIGIN` in `server/.env` matches your frontend domain exactly (including `https://`)
- Reload env: `docker compose -f docker-compose.prod.yml up -d server`
### SSL certificate errors
### SSL certificate errors (when using Caddy)
- Caddy auto-provisions Let's Encrypt certificates
- Ensure ports 80 and 443 are open
- Ensure ports 80 and 443 are open and not used by another proxy
- Check: `docker compose -f docker-compose.prod.yml logs caddy`
- If port 80 is already in use (e.g. by Coolify), run without Caddy: `docker compose -f docker-compose.prod.yml up -d` and use your existing proxy
### Transcription not working
- Check Modal dashboard: https://modal.com/apps
- Verify URLs in `server/.env` match deployed functions
- Check worker logs: `docker compose -f docker-compose.prod.yml logs worker`
### "Login required" but auth not configured
- Set `FEATURE_REQUIRE_LOGIN=false` in `www/.env`
- Rebuild frontend: `docker compose -f docker-compose.prod.yml up -d --force-recreate web`
### Database migrations or connectivity issues
Migrations run automatically on server startup. To check database connectivity or debug migration failures:
```bash
@@ -408,4 +437,3 @@ docker compose -f docker-compose.prod.yml exec server uv run python -c "from ref
# Manually run migrations (if needed)
docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head
```

View File

@@ -131,6 +131,15 @@ if [ -z "$DIARIZER_URL" ]; then
fi
echo " -> $DIARIZER_URL"
echo ""
echo "Deploying padding (CPU audio processing via Modal SDK)..."
modal deploy reflector_padding.py
if [ $? -ne 0 ]; then
echo "Error: Failed to deploy padding. Check Modal dashboard for details."
exit 1
fi
echo " -> reflector-padding.pad_track (Modal SDK function)"
# --- Output Configuration ---
echo ""
echo "=========================================="
@@ -147,4 +156,6 @@ echo ""
echo "DIARIZATION_BACKEND=modal"
echo "DIARIZATION_URL=$DIARIZER_URL"
echo "DIARIZATION_MODAL_API_KEY=$API_KEY"
echo ""
echo "# Padding uses Modal SDK (requires MODAL_TOKEN_ID/SECRET in worker containers)"
echo "# --- End Modal Configuration ---"

View File

@@ -0,0 +1,277 @@
"""
Reflector GPU backend - audio padding
======================================
CPU-intensive audio padding service for adding silence to audio tracks.
Uses PyAV filter graph (adelay) for precise track synchronization.
IMPORTANT: This padding logic is duplicated from server/reflector/utils/audio_padding.py
for Modal deployment isolation (Modal can't import from server/reflector/). If you modify
the PyAV filter graph or padding algorithm, you MUST update both:
- gpu/modal_deployments/reflector_padding.py (this file)
- server/reflector/utils/audio_padding.py
Constants duplicated from server/reflector/utils/audio_constants.py for same reason.
"""
import os
import tempfile
from fractions import Fraction
import math
import asyncio
import modal
S3_TIMEOUT = 60 # happens 2 times
PADDING_TIMEOUT = 600 + (S3_TIMEOUT * 2)
SCALEDOWN_WINDOW = 60 # The maximum duration (in seconds) that individual containers can remain idle when scaling down.
DISCONNECT_CHECK_INTERVAL = 2 # Check for client disconnect
app = modal.App("reflector-padding")
# CPU-based image
image = (
modal.Image.debian_slim(python_version="3.12")
.apt_install("ffmpeg") # Required by PyAV
.pip_install(
"av==13.1.0", # PyAV for audio processing
"requests==2.32.3", # HTTP for presigned URL downloads/uploads
"fastapi==0.115.12", # API framework
)
)
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000
@app.function(
cpu=2.0,
timeout=PADDING_TIMEOUT,
scaledown_window=SCALEDOWN_WINDOW,
image=image,
)
@modal.asgi_app()
def web():
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
class PaddingRequest(BaseModel):
track_url: str
output_url: str
start_time_seconds: float
track_index: int
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
web_app = FastAPI()
@web_app.post("/pad")
async def pad_track_endpoint(request: Request, req: PaddingRequest) -> PaddingResponse:
"""Modal web endpoint for padding audio tracks with disconnect detection.
"""
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
if not req.track_url:
raise HTTPException(status_code=400, detail="track_url cannot be empty")
if not req.output_url:
raise HTTPException(status_code=400, detail="output_url cannot be empty")
if req.start_time_seconds <= 0:
raise HTTPException(status_code=400, detail=f"start_time_seconds must be positive, got {req.start_time_seconds}")
if req.start_time_seconds > 18000:
raise HTTPException(status_code=400, detail=f"start_time_seconds exceeds maximum 18000s (5 hours)")
logger.info(f"Padding request: track {req.track_index}, delay={req.start_time_seconds}s")
# Thread-safe cancellation flag shared between async disconnect checker and blocking thread
import threading
cancelled = threading.Event()
async def check_disconnect():
"""Background task to check for client disconnect every 2 seconds."""
while not cancelled.is_set():
await asyncio.sleep(DISCONNECT_CHECK_INTERVAL)
if await request.is_disconnected():
logger.warning("Client disconnected, setting cancellation flag")
cancelled.set()
break
# Start disconnect checker in background
disconnect_task = asyncio.create_task(check_disconnect())
try:
result = await asyncio.get_event_loop().run_in_executor(
None, _pad_track_blocking, req, cancelled, logger
)
return PaddingResponse(**result)
finally:
cancelled.set()
disconnect_task.cancel()
try:
await disconnect_task
except asyncio.CancelledError:
pass
def _pad_track_blocking(req, cancelled, logger) -> dict:
"""Blocking CPU-bound padding work with periodic cancellation checks.
Args:
cancelled: threading.Event for thread-safe cancellation signaling
"""
import av
import requests
from av.audio.resampler import AudioResampler
import time
temp_dir = tempfile.mkdtemp()
input_path = None
output_path = None
last_check = time.time()
try:
logger.info("Downloading track for padding")
response = requests.get(req.track_url, stream=True, timeout=S3_TIMEOUT)
response.raise_for_status()
input_path = os.path.join(temp_dir, "track.webm")
total_bytes = 0
chunk_count = 0
with open(input_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
total_bytes += len(chunk)
chunk_count += 1
# Check for cancellation every arbitrary amount of chunks
if chunk_count % 12 == 0:
now = time.time()
if now - last_check >= DISCONNECT_CHECK_INTERVAL:
if cancelled.is_set():
logger.info("Cancelled during download, exiting early")
return {"size": 0, "cancelled": True}
last_check = now
logger.info(f"Track downloaded: {total_bytes} bytes")
if cancelled.is_set():
logger.info("Cancelled after download, exiting early")
return {"size": 0, "cancelled": True}
# Apply padding using PyAV
output_path = os.path.join(temp_dir, "padded.webm")
delay_ms = math.floor(req.start_time_seconds * 1000)
logger.info(f"Padding track {req.track_index} with {delay_ms}ms delay using PyAV")
in_container = av.open(input_path)
in_stream = next((s for s in in_container.streams if s.type == "audio"), None)
if in_stream is None:
raise ValueError("No audio stream in input")
with av.open(output_path, "w", format="webm") as out_container:
out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay")
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
for frame in in_container.decode(in_stream):
# Check for cancellation periodically
now = time.time()
if now - last_check >= DISCONNECT_CHECK_INTERVAL:
if cancelled.is_set():
logger.info("Cancelled during processing, exiting early")
in_container.close()
return {"size": 0, "cancelled": True}
last_check = now
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush filter graph
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush encoder
for packet in out_stream.encode(None):
out_container.mux(packet)
in_container.close()
file_size = os.path.getsize(output_path)
logger.info(f"Padding complete: {file_size} bytes")
logger.info("Uploading padded track to S3")
with open(output_path, "rb") as f:
upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT)
upload_response.raise_for_status()
logger.info(f"Upload complete: {file_size} bytes")
return {"size": file_size}
finally:
if input_path and os.path.exists(input_path):
try:
os.unlink(input_path)
except Exception as e:
logger.warning(f"Failed to cleanup input file: {e}")
if output_path and os.path.exists(output_path):
try:
os.unlink(output_path)
except Exception as e:
logger.warning(f"Failed to cleanup output file: {e}")
try:
os.rmdir(temp_dir)
except Exception as e:
logger.warning(f"Failed to cleanup temp directory: {e}")
return web_app

View File

@@ -0,0 +1,545 @@
# Daily.co and Reflector Data Model
This document explains the data model relationships between Daily.co's API concepts and Reflector's database schema, clarifying common sources of confusion.
---
## Table of Contents
1. [Core Entities Overview](#core-entities-overview)
2. [Daily.co vs Reflector Terminology](#dailyco-vs-reflector-terminology)
3. [Entity Relationships](#entity-relationships)
4. [Recording Multiplicity](#recording-multiplicity)
5. [Session Identifiers Explained](#session-identifiers-explained)
6. [Time-Based Matching](#time-based-matching)
7. [Multitrack Recording Details](#multitrack-recording-details)
8. [Verified Example](#verified-example)
---
## Core Entities Overview
### Reflector's Four Primary Entities
```
┌─────────────────────────────────────────────────────────────────┐
│ Room (Reflector) │
│ - Persistent meeting template │
│ - User-created configuration │
│ - Example: "team-standup" │
└────────────────────┬────────────────────────────────────────────┘
│ 1:N
┌─────────────────────────────────────────────────────────────────┐
│ Meeting (Reflector) │
│ - Single session instance │
│ - Creates NEW Daily.co room with timestamp │
│ - Example: "team-standup-20260115120000" │
└────────────────────┬────────────────────────────────────────────┘
│ 1:N
┌─────────────────────────────────────────────────────────────────┐
│ Recording (Reflector + Daily.co) │
│ - One segment of audio/video │
│ - New recording created on stop/restart │
│ - track_keys: JSON array of S3 file paths │
└────────────────────┬────────────────────────────────────────────┘
│ 1:1
┌─────────────────────────────────────────────────────────────────┐
│ Transcript (Reflector) │
│ - Processed audio with transcription │
│ - Diarization, summaries, topics │
│ - One transcript per recording │
└─────────────────────────────────────────────────────────────────┘
```
---
## Daily.co vs Reflector Terminology
### Room
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Virtual meeting space on Daily.co platform | User-created meeting template/configuration |
| **Lifetime** | Configurable expiration | Persistent until user deletes |
| **Creation** | API call for each meeting | Pre-created by user once |
| **Reuse** | Can host multiple sessions | Generates new Daily.co room per meeting |
| **Name Format** | `room-name` (reusable) | `room-name` (base identifier) |
| **Timestamping** | Not required | Meeting adds timestamp: `{name}-YYYYMMDDHHMMSS` |
**Example:**
```
Reflector Room: "daily-private-igor" (persistent config)
↓ starts meeting
Daily.co Room: "daily-private-igor-20260110042117"
```
### Meeting
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Session that starts when first participant joins | Explicit database record of a session |
| **Identifier** | `mtgSessionId` (generated by Daily.co) | `meeting.id` (UUID, generated by Reflector) |
| **Creation** | Implicit (first participant join) | Explicit API call before participants join |
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp |
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId` (can be null), requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Audio/video files on S3 | Metadata + processing status |
| **Types** | `cloud` (composed video), `raw-tracks` (multitrack) | Stores references + `track_keys` array |
| **Multiplicity** | One recording object per start/stop cycle | One DB row per Daily.co recording object |
| **Identifier** | Daily.co `recording_id` | Same `recording_id` (stored in DB) |
| **Multitrack** | Array of `.webm` files (one per participant) | `track_keys` JSON array with S3 paths |
| **Linkage** | Via `room_name` + `start_ts` | FK `meeting_id` (set via time-based match) |
**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs.
### instanceId (Reflector-Generated)
**Definition:** UUID we generate and send when starting recording via REST API.
**Generation:** Deterministic from meeting_id
- Cloud: `instanceId = meeting_id` directly
- Raw-tracks: `instanceId = UUIDv5(meeting_id, namespace)`
**Key behaviors:**
-**Reuse allowed:** Same instanceId can be used after stop (validated 2026-01-20)
-**Not returned:** Daily.co does NOT echo instanceId back in GET /recordings response
-**Present in error webhooks:** `recording.error` webhook includes instanceId
- **Purpose:** Allows multiple concurrent recordings (cloud + raw-tracks) in same room
**Stop/restart example:**
```
Recording 1: POST /start with instanceId="779e6376..." → recording_id="ee00c4e8..."
Stop recording
Recording 2: POST /start with instanceId="779e6376..." (SAME) → recording_id="b702f509..." (DIFFERENT)
✅ Both succeed, different recording_ids returned
```
**Implication:** Cannot match recordings by instanceId (not in response) - must use recording_id.
---
## Entity Relationships
### Database Schema Relationships
```sql
-- Simplified schema showing key relationships
TABLE room (
id VARCHAR PRIMARY KEY,
name VARCHAR UNIQUE,
platform VARCHAR -- 'whereby' | 'daily'
)
TABLE meeting (
id VARCHAR PRIMARY KEY,
room_id VARCHAR REFERENCES room(id) ON DELETE CASCADE, -- nullable
room_name VARCHAR, -- Daily.co room name (timestamped)
start_date TIMESTAMP,
platform VARCHAR
)
TABLE recording (
id VARCHAR PRIMARY KEY, -- Daily.co recording_id
meeting_id VARCHAR, -- FK to meeting (set via time-based match)
bucket_name VARCHAR,
object_key VARCHAR, -- S3 prefix
track_keys JSON, -- Array of S3 keys for multitrack
recorded_at TIMESTAMP
)
TABLE transcript (
id VARCHAR PRIMARY KEY,
recording_id VARCHAR, -- nullable FK
meeting_id VARCHAR, -- nullable FK
room_id VARCHAR, -- nullable FK
participants JSON, -- [{id, speaker, name, user_id}, ...]
title VARCHAR,
long_summary VARCHAR,
webvtt TEXT
)
```
**Relationship Cardinalities:**
```
1 Room → N Meetings
1 Meeting → N Recordings (common: 1-21 recordings per meeting)
1 Recording → 1 Transcript
1 Meeting → N Transcripts (via recordings)
```
---
## Recording Multiplicity
### Why Multiple Recordings Per Meeting?
Daily.co creates a **new recording object** (new ID, new files) whenever recording stops and restarts. This happens due to:
1. **Manual stop/start** - User clicks stop, then start recording again
2. **Network reconnection** - Participant drops, reconnects → triggers restart
3. **Participant rejoin** - Last participant leaves, new one joins → new session
---
## Session Identifiers Explained
### The Hidden Entity: Daily.co Meeting Session
Daily.co has an **implicit ephemeral entity** that sits between Room and Recording:
```
Daily.co Room: "daily-private-igor-20260110042117"
├─ Daily.co Meeting Session #1 (mtgSessionId: c04334de...)
│ └─ Recording #3 (f4a50f94) - 4s, 1 track
└─ Daily.co Meeting Session #2 (mtgSessionId: 4cdae3c0...)
├─ Recording #2 (b0fa94da) - 80s, 2 tracks ← recording stopped
└─ Recording #1 (05edf519) - 62s, 1 track ← then restarted
```
**Daily.co Meeting Session:**
- **Lifecycle:** Starts when first participant joins, ends when last participant leaves
- **Identifier:** `mtgSessionId` (generated by Daily.co)
- **Persistence:** Ephemeral - new ID if everyone leaves and someone rejoins
- **Relationship:** 1 Session → N Recordings (if recording stops/restarts during session)
**Key Insight:** Multiple recordings can share the same `mtgSessionId` if recording was stopped and restarted while participants remained connected.
### mtgSessionId (Meeting Session Identifier)
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
**Reliability:** Can be null or present in GET /recordings response (unreliable).
**When present:** Multiple recordings from same session (stop/restart with participants connected) share same mtgSessionId.
**Example (validated 2026-01-20):**
```json
Recording 1: {"id": "ee00c4e8...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
Recording 2: {"id": "b702f509...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
// Same mtgSessionId (stop/restart in same session)
```
**When null:** Common - Daily.co API does not reliably populate this field.
### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks.
**Reflector Tracking:** `daily_participant_session` table
```sql
TABLE daily_participant_session (
id VARCHAR PRIMARY KEY, -- {meeting_id}:{user_id}:{joined_at_ms}
meeting_id VARCHAR,
session_id VARCHAR, -- From webhook (per-participant)
user_id VARCHAR,
user_name VARCHAR,
joined_at TIMESTAMP,
left_at TIMESTAMP
)
```
---
## Time-Based Matching
### Problem Statement
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
**Example API response (mtgSessionId can be null OR present):**
```json
{
"id": "recording-uuid",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"mtgSessionId": null // ← Often null (unreliable)
}
// OR (when present):
{
"id": "recording-uuid",
"mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6" // ← Sometimes present
}
```
**Key insight:** Cannot rely on mtgSessionId for matching (unreliable). instanceId also not returned. Only reliable identifier is recording.id.
### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
---
## Multitrack Recording Details
### track_keys JSON Array
**Schema:** `recording.track_keys` (JSON, nullable)
```sql
-- Example recording with 2 audio tracks
{
"id": "b0fa94da-73b5-4f95-9239-5216a682a505",
"track_keys": [
"igormonadical/daily-private-igor-20260110042117/1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565",
"igormonadical/daily-private-igor-20260110042117/1768018896877-9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-1768018899286"
]
}
```
**Semantics:**
- `track_keys = null` → Not multitrack (cloud recording)
- `track_keys = []` → Multitrack recording with no audio captured (silence/muted)
- `track_keys = [...]` → Multitrack with N audio tracks
**Property:** `recording.is_multitrack` (Python)
```python
@property
def is_multitrack(self) -> bool:
return self.track_keys is not None and len(self.track_keys) > 0
```
### Track Filename Format
Daily.co multitrack filenames encode timing and participant information:
**Format:** `{recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}`
**Example:** `1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565`
**Parsed Components:**
```python
# reflector/utils/daily.py:25-60
class DailyRecordingFilename(NamedTuple):
recording_start_ts: int # 1768018896877 (milliseconds)
participant_id: str # 890c0eae-e186-4534-a7bd-7c794b7d6d7f
track_start_ts: int # 1768018914565 (milliseconds)
```
**Note:** Browser downloads from S3 add `.webm` extension due to MIME headers, but S3 object keys have no extension.
### Video Track Filtering
Daily.co API returns both audio and video tracks, but Reflector only processes audio.
**Filtering Logic:** `reflector/worker/process.py:660`
```python
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
```
**Example API Response:**
```json
{
"tracks": [
{"type": "audio", "s3Key": "...cam-audio-1768018914565"},
{"type": "audio", "s3Key": "...cam-audio-1768018899286"},
{"type": "video", "s3Key": "...cam-video-1768018897095"} Filtered out
]
}
```
**Result:** Only 2 audio tracks stored in `recording.track_keys`, video track discarded.
**Rationale:** Reflector is audio transcription system; video not needed for processing.
### Track-to-Participant Mapping
**Flow:**
1. Daily.co webhook/polling provides `track_keys` array
2. Each track filename contains `participant_id`
3. Reflector queries Daily.co API: `GET /meetings/{mtgSessionId}/participants`
4. Maps `participant_id``user_name`
5. Stores in `transcript.participants` JSON:
```json
[
{
"id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f",
"speaker": 0,
"name": "test2",
"user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22"
},
{
"id": "9660e8e9-4297-4f17-951d-0b2bf2401803",
"speaker": 1,
"name": "test",
"user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22"
}
]
```
**Diarization:** Multitrack recordings don't need speaker diarization AI — speaker identity comes from separate audio tracks.
---
## Example
### Meeting: daily-private-igor-20260110042117
**Context:** User conducted test recording with start/stop cycles, producing 3 recordings.
#### Database State
```sql
-- Meeting
id: 034804b8-cee2-4fb4-94d7-122f6f068a61
room_name: daily-private-igor-20260110042117
start_date: 2026-01-10 04:21:17+00
```
#### Daily.co API Response
```json
[
{
"id": "f4a50f94-053c-4f9d-bda6-78ad051fbc36",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018885,
"duration": 4,
"status": "finished",
"mtgSessionId": "c04334de-42a0-4c2a-96be-a49b068dca85",
"tracks": [
{"type": "audio", "s3Key": "...62e8f3ae...cam-audio-1768018885417"}
]
},
{
"id": "b0fa94da-73b5-4f95-9239-5216a682a505",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"duration": 80,
"status": "finished",
"mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345",
"tracks": [
{"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914565"},
{"type": "audio", "s3Key": "...9660e8e9...cam-audio-1768018899286"},
{"type": "video", "s3Key": "...9660e8e9...cam-video-1768018897095"}
]
},
{
"id": "05edf519-9048-4b49-9a75-73e9826fd950",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018914,
"duration": 62,
"status": "finished",
"mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345",
"tracks": [
{"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914948"}
]
}
]
```
**Key Observations:**
- 3 recording objects returned by Daily.co
- 2 different `mtgSessionId` values (2 different meeting instances)
- Recording #2 has 3 tracks (2 audio + 1 video)
- Timestamps: 1768018885 → 1768018896 (+11s) → 1768018914 (+18s)
#### Reflector Database
**Recordings:**
```
┌──────────────────────────────────────┬──────────────┬────────────┬──────────────────────────────────────┐
│ id │ track_count │ duration │ mtgSessionId │
├──────────────────────────────────────┼──────────────┼────────────┼──────────────────────────────────────┤
│ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 │ 4s │ c04334de-42a0-4c2a-96be-a49b068dca85 │
│ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (video=0) │ 80s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │
│ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 │ 62s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │
└──────────────────────────────────────┴──────────────┴────────────┴──────────────────────────────────────┘
```
**Note:** Recording #2 has 2 audio tracks (video filtered out), not 3.
**Transcripts:**
```
┌──────────────────────────────────────┬──────────────────────────────────────┬──────────────┬──────────────────────────────────────────────┐
│ id │ recording_id │ participants │ title │
├──────────────────────────────────────┼──────────────────────────────────────┼──────────────┼──────────────────────────────────────────────┤
│ 17149b1f-546c-4837-80a0-f8140bd16592 │ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 (test) │ (empty - no speech) │
│ 49801332-3222-4c11-bdb2-375479fc87f2 │ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (test, │ "Examination and Validation Procedures │
│ │ │ test2) │ Review" │
│ e5271e12-20fb-42d2-b5a8-21438abadef9 │ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 (test2) │ "Technical Sound Check Procedure Review" │
└──────────────────────────────────────┴──────────────────────────────────────┴──────────────┴──────────────────────────────────────────────┘
```
**Transcript Content:**
*Transcript #1* (17149b1f): Empty WebVTT (no audio captured)
*Transcript #2* (49801332):
```webvtt
WEBVTT
00:00:03.109 --> 00:00:05.589
<v Speaker1>Test, test, test. Test, test, test, test, test.
00:00:19.829 --> 00:00:22.710
<v Speaker0>Test test test test test test test test test test test.
```
**AI-Generated Summary:**
> "The meeting focused on the critical importance of rigorous testing for ensuring reliability and quality, with test and test2 emphasizing the need for a structured testing framework and meticulous documentation..."
*Transcript #3* (e5271e12):
```webvtt
WEBVTT
00:00:02.029 --> 00:00:04.910
<v Speaker0>Test, test, test, test, test, test, test, test, test, test, test.
```
#### Validation: track_keys → participants
**Recording #2 (b0fa94da) tracks:**
```json
[
".../890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-...",
".../9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-..."
]
```
**Transcript #2 (49801332) participants:**
```json
[
{"id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f", "speaker": 0, "name": "test2"},
{"id": "9660e8e9-4297-4f17-951d-0b2bf2401803", "speaker": 1, "name": "test"}
]
```
### Data Flow
```
Daily.co API: 3 recordings
Polling: _poll_raw_tracks_recordings()
Worker: process_multitrack_recording.delay() × 3
DB: 3 recording rows created
Pipeline: Audio processing + transcription × 3
DB: 3 transcript rows created (1:1 with recordings)
UI: User sees 3 separate transcripts
```
**Result:** ✅ 1:1 Recording → Transcript relationship maintained.
---
**Document Version:** 1.1
**Last Updated:** 2026-01-20
**Data Source:** Production database + Daily.co API inspection + empirical testing
**Changes in 1.1:**
- Added instanceId behavior documentation (reuse allowed, not returned in API)
- Clarified mtgSessionId reliability (can be null or present)
- Added empirical validation of stop/restart behavior

View File

@@ -0,0 +1,40 @@
"""add cloud recording support
Revision ID: 1b1e6a6fc465
Revises: bd3a729bb379
Create Date: 2026-01-09 17:17:33.535620
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "1b1e6a6fc465"
down_revision: Union[str, None] = "bd3a729bb379"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(
sa.Column("daily_composed_video_s3_key", sa.String(), nullable=True)
)
batch_op.add_column(
sa.Column("daily_composed_video_duration", sa.Integer(), nullable=True)
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_column("daily_composed_video_duration")
batch_op.drop_column("daily_composed_video_s3_key")
# ### end Alembic commands ###

View File

@@ -0,0 +1,44 @@
"""replace_use_hatchet_with_use_celery
Revision ID: 80beb1ea3269
Revises: bd3a729bb379
Create Date: 2026-01-20 16:26:25.555869
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "80beb1ea3269"
down_revision: Union[str, None] = "bd3a729bb379"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.drop_column("use_hatchet")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_hatchet",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.drop_column("use_celery")

View File

@@ -0,0 +1,23 @@
"""merge cloud recording and celery heads
Revision ID: e69f08ead8ea
Revises: 1b1e6a6fc465, 80beb1ea3269
Create Date: 2026-01-21 21:39:10.326841
"""
from typing import Sequence, Union
# revision identifiers, used by Alembic.
revision: str = "e69f08ead8ea"
down_revision: Union[str, None] = ("1b1e6a6fc465", "80beb1ea3269")
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
pass
def downgrade() -> None:
pass

View File

@@ -0,0 +1,67 @@
"""add_daily_recording_requests
Revision ID: f5b008fa8a14
Revises: 1b1e6a6fc465
Create Date: 2026-01-20 22:32:06.697144
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f5b008fa8a14"
down_revision: Union[str, None] = "1b1e6a6fc465"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"daily_recording_request",
sa.Column("recording_id", sa.String(), nullable=False),
sa.Column("meeting_id", sa.String(), nullable=False),
sa.Column("instance_id", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("recording_id"),
)
op.create_index("idx_meeting_id", "daily_recording_request", ["meeting_id"])
op.create_index("idx_instance_id", "daily_recording_request", ["instance_id"])
# Clean up orphaned recordings before adding FK constraint
op.execute("""
UPDATE recording SET status = 'orphan', meeting_id = NULL
WHERE meeting_id IS NOT NULL
AND meeting_id NOT IN (SELECT id FROM meeting)
""")
# Add FK constraint to recording table (cascade delete recordings when meeting deleted)
op.execute("""
ALTER TABLE recording ADD CONSTRAINT fk_recording_meeting
FOREIGN KEY (meeting_id) REFERENCES meeting(id) ON DELETE CASCADE
""")
# Add CHECK constraints to enforce orphan invariants
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_orphan_no_meeting
CHECK (status != 'orphan' OR meeting_id IS NULL)
""")
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_non_orphan_has_meeting
CHECK (status = 'orphan' OR meeting_id IS NOT NULL)
""")
def downgrade() -> None:
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_orphan_no_meeting")
op.execute(
"ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_non_orphan_has_meeting"
)
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS fk_recording_meeting")
op.drop_index("idx_instance_id", table_name="daily_recording_request")
op.drop_index("idx_meeting_id", table_name="daily_recording_request")
op.drop_table("daily_recording_request")

View File

@@ -8,7 +8,7 @@ readme = "README.md"
dependencies = [
"aiohttp>=3.9.0",
"aiohttp-cors>=0.7.0",
"av>=10.0.0",
"av>=15.0.0",
"requests>=2.31.0",
"aiortc>=1.5.0",
"sortedcontainers>=2.4.0",

View File

@@ -3,7 +3,7 @@ Daily.co API Module
"""
# Client
from .client import DailyApiClient, DailyApiError
from .client import DailyApiClient, DailyApiError, RecordingType
# Request models
from .requests import (
@@ -64,6 +64,7 @@ __all__ = [
# Client
"DailyApiClient",
"DailyApiError",
"RecordingType",
# Requests
"CreateRoomRequest",
"RoomProperties",

View File

@@ -7,7 +7,8 @@ Reference: https://docs.daily.co/reference/rest-api
"""
from http import HTTPStatus
from typing import Any
from typing import Any, Literal
from uuid import UUID
import httpx
import structlog
@@ -32,6 +33,8 @@ from .responses import (
logger = structlog.get_logger(__name__)
RecordingType = Literal["cloud", "raw-tracks"]
class DailyApiError(Exception):
"""Daily.co API error with full request/response context."""
@@ -395,6 +398,38 @@ class DailyApiClient:
return [RecordingResponse(**r) for r in data["data"]]
async def start_recording(
self,
room_name: NonEmptyString,
recording_type: RecordingType,
instance_id: UUID,
) -> dict[str, Any]:
"""Start recording via REST API.
Reference: https://docs.daily.co/reference/rest-api/rooms/recordings/start
Args:
room_name: Daily.co room name
recording_type: Recording type
instance_id: UUID for this recording session
Returns:
Recording start confirmation from Daily.co API
Raises:
DailyApiError: If API request fails
"""
client = await self._get_client()
response = await client.post(
f"{self.base_url}/rooms/{room_name}/recordings/start",
headers=self.headers,
json={
"type": recording_type,
"instanceId": str(instance_id),
},
)
return await self._handle_response(response, "start_recording")
# ============================================================================
# MEETING TOKENS
# ============================================================================

View File

@@ -0,0 +1,37 @@
"""
Daily.co recording instanceId generation utilities.
Deterministic instance ID generation for cloud and raw-tracks recordings.
MUST match frontend logic
"""
from uuid import UUID, uuid5
from reflector.utils.string import NonEmptyString
# Namespace UUID for UUIDv5 generation of raw-tracks instanceIds
# DO NOT CHANGE: Breaks instanceId determinism across deployments and frontend/backend matching
RAW_TRACKS_NAMESPACE = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
def generate_cloud_instance_id(meeting_id: NonEmptyString) -> UUID:
"""
Generate instanceId for cloud recording.
Cloud recordings use meeting ID directly as instanceId.
This ensures each meeting has one unique cloud recording.
"""
return UUID(meeting_id)
def generate_raw_tracks_instance_id(meeting_id: NonEmptyString) -> UUID:
"""
Generate instanceId for raw-tracks recording.
Raw-tracks recordings use UUIDv5(meeting_id, namespace) to ensure
different instanceId from cloud while remaining deterministic.
Daily.co requires cloud and raw-tracks to have different instanceIds
for concurrent recording.
"""
return uuid5(RAW_TRACKS_NAMESPACE, meeting_id)

View File

@@ -0,0 +1,56 @@
"""Utility for creating orphan recordings."""
import os
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
async def create_and_log_orphan(
recording_id: NonEmptyString,
bucket_name: str,
room_name: str,
start_ts: int,
track_keys: list[str] | None,
source: str,
) -> bool:
"""Create orphan recording and log if first occurrence.
Args:
recording_id: Daily.co recording ID
bucket_name: S3 bucket (empty string for cloud recordings)
room_name: Daily.co room name
start_ts: Unix timestamp
track_keys: Track keys for raw-tracks, None for cloud
source: "webhook" or "polling" for logging
Returns:
True if created (first poller), False if already exists
"""
if track_keys:
object_key = os.path.dirname(track_keys[0]) if track_keys else room_name
else:
object_key = room_name
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key,
recorded_at=datetime.fromtimestamp(start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=None,
status="orphan",
)
)
if created:
logger.error(
f"Orphan recording ({source})",
recording_id=recording_id,
room_name=room_name,
)
return created

View File

@@ -88,13 +88,6 @@ class MeetingTokenProperties(BaseModel):
is_owner: bool = Field(
default=False, description="Grant owner privileges to token holder"
)
start_cloud_recording: bool = Field(
default=False, description="Automatically start cloud recording on join"
)
start_cloud_recording_opts: dict | None = Field(
default=None,
description="Options for startRecording when start_cloud_recording is true (e.g., maxDuration)",
)
enable_recording_ui: bool = Field(
default=True, description="Show recording controls in UI"
)

View File

@@ -116,6 +116,7 @@ class RecordingS3Info(BaseModel):
bucket_name: NonEmptyString
bucket_region: NonEmptyString
key: NonEmptyString | None = None
endpoint: NonEmptyString | None = None
@@ -132,6 +133,9 @@ class RecordingResponse(BaseModel):
id: NonEmptyString = Field(description="Recording identifier")
room_name: NonEmptyString = Field(description="Room where recording occurred")
start_ts: int = Field(description="Recording start timestamp (Unix epoch seconds)")
type: Literal["cloud", "raw-tracks"] | None = Field(
None, description="Recording type (may be missing from API)"
)
status: RecordingStatus = Field(
description="Recording status ('in-progress' or 'finished')"
)
@@ -145,6 +149,9 @@ class RecordingResponse(BaseModel):
None, description="Token for sharing recording"
)
s3: RecordingS3Info | None = Field(None, description="S3 bucket information")
s3key: NonEmptyString | None = Field(
None, description="S3 key for cloud recordings (top-level field)"
)
tracks: list[DailyTrack] = Field(
default_factory=list,
description="Track list for raw-tracks recordings (always array, never null)",

View File

@@ -26,6 +26,7 @@ def get_database() -> databases.Database:
# import models
import reflector.db.calendar_events # noqa
import reflector.db.daily_participant_sessions # noqa
import reflector.db.daily_recording_requests # noqa
import reflector.db.meetings # noqa
import reflector.db.recordings # noqa
import reflector.db.rooms # noqa

View File

@@ -0,0 +1,111 @@
from datetime import datetime
from typing import Literal
from uuid import UUID
import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils.string import NonEmptyString
daily_recording_requests = sa.Table(
"daily_recording_request",
metadata,
sa.Column("recording_id", sa.String, primary_key=True),
sa.Column(
"meeting_id",
sa.String,
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("instance_id", sa.String, nullable=False),
sa.Column("type", sa.String, nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.Index("idx_meeting_id", "meeting_id"),
sa.Index("idx_instance_id", "instance_id"),
)
class DailyRecordingRequest(BaseModel):
recording_id: NonEmptyString
meeting_id: NonEmptyString
instance_id: UUID
type: Literal["cloud", "raw-tracks"]
requested_at: datetime
class DailyRecordingRequestsController:
async def create(self, request: DailyRecordingRequest) -> None:
stmt = insert(daily_recording_requests).values(
recording_id=request.recording_id,
meeting_id=request.meeting_id,
instance_id=str(request.instance_id),
type=request.type,
requested_at=request.requested_at,
)
stmt = stmt.on_conflict_do_nothing(index_elements=["recording_id"])
await get_database().execute(stmt)
async def find_by_recording_id(
self,
recording_id: NonEmptyString,
) -> tuple[NonEmptyString, Literal["cloud", "raw-tracks"]] | None:
query = daily_recording_requests.select().where(
daily_recording_requests.c.recording_id == recording_id
)
result = await get_database().fetch_one(query)
if not result:
return None
req = DailyRecordingRequest(
recording_id=result["recording_id"],
meeting_id=result["meeting_id"],
instance_id=UUID(result["instance_id"]),
type=result["type"],
requested_at=result["requested_at"],
)
return (req.meeting_id, req.type)
async def find_by_instance_id(
self,
instance_id: UUID,
) -> list[DailyRecordingRequest]:
"""Multiple recordings can have same instance_id (stop/restart)."""
query = daily_recording_requests.select().where(
daily_recording_requests.c.instance_id == str(instance_id)
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
async def get_by_meeting_id(
self,
meeting_id: NonEmptyString,
) -> list[DailyRecordingRequest]:
query = daily_recording_requests.select().where(
daily_recording_requests.c.meeting_id == meeting_id
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
daily_recording_requests_controller = DailyRecordingRequestsController()

View File

@@ -9,7 +9,7 @@ from reflector.db import get_database, metadata
from reflector.db.rooms import Room
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils import generate_uuid4
from reflector.utils.string import assert_equal
from reflector.utils.string import NonEmptyString, assert_equal
meetings = sa.Table(
"meeting",
@@ -63,6 +63,9 @@ meetings = sa.Table(
nullable=False,
server_default=assert_equal(WHEREBY_PLATFORM, "whereby"),
),
# Daily.co composed video (Brady Bunch grid layout) - Daily.co only, not Whereby
sa.Column("daily_composed_video_s3_key", sa.String, nullable=True),
sa.Column("daily_composed_video_duration", sa.Integer, nullable=True),
sa.Index("idx_meeting_room_id", "room_id"),
sa.Index("idx_meeting_calendar_event", "calendar_event_id"),
)
@@ -110,6 +113,9 @@ class Meeting(BaseModel):
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform = WHEREBY_PLATFORM
# Daily.co composed video (Brady Bunch grid) - Daily.co only
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
class MeetingController:
@@ -171,6 +177,12 @@ class MeetingController:
return None
return Meeting(**result)
async def get_by_room_name_all(self, room_name: str) -> list[Meeting]:
"""Get all meetings for a room name (not just most recent)."""
query = meetings.select().where(meetings.c.room_name == room_name)
results = await get_database().fetch_all(query)
return [Meeting(**r) for r in results]
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
"""
Get latest active meeting for a room.
@@ -339,6 +351,27 @@ class MeetingConsentController:
result = await get_database().fetch_one(query)
return result is not None
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""Returns True if updated, False if already set."""
query = (
meetings.update()
.where(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
result = await get_database().execute(query)
return result.rowcount > 0
meetings_controller = MeetingController()
meeting_consent_controller = MeetingConsentController()

View File

@@ -4,6 +4,7 @@ from typing import Literal
import sqlalchemy as sa
from pydantic import BaseModel, Field
from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
@@ -30,14 +31,13 @@ recordings = sa.Table(
class Recording(BaseModel):
id: str = Field(default_factory=generate_uuid4)
bucket_name: str
# for single-track
object_key: str
recorded_at: datetime
status: Literal["pending", "processing", "completed", "failed"] = "pending"
status: Literal["pending", "processing", "completed", "failed", "orphan"] = (
"pending"
)
meeting_id: str | None = None
# for multitrack reprocessing
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
# None = single-track, [] = multitrack with no audio, [keys...] = multitrack with audio
track_keys: list[str] | None = None
@property
@@ -71,7 +71,6 @@ class RecordingController:
query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query)
# no check for existence
async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
if not recording_ids:
return []
@@ -90,9 +89,12 @@ class RecordingController:
This is more efficient than fetching all recordings and filtering in Python.
"""
from reflector.db.transcripts import (
transcripts, # noqa: PLC0415 cyclic import
)
# INLINE IMPORT REQUIRED: Circular dependency
# - recordings.py needs transcripts table for JOIN query
# - transcripts.py imports recordings_controller
# - db/__init__.py loads recordings before transcripts (line 31 vs 33)
# - Top-level import would fail during module initialization
from reflector.db.transcripts import transcripts
query = (
recordings.select()
@@ -110,5 +112,27 @@ class RecordingController:
recordings_list = [Recording(**row) for row in results]
return [r for r in recordings_list if r.is_multitrack]
async def try_create_with_meeting(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.meeting_id is not None, "meeting_id required for non-orphan"
assert recording.status != "orphan", "use create_orphan for orphans"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
async def create_orphan(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.status == "orphan", "status must be 'orphan'"
assert recording.meeting_id is None, "meeting_id must be NULL for orphan"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
recordings_controller = RecordingController()

View File

@@ -58,7 +58,7 @@ rooms = sqlalchemy.Table(
nullable=False,
),
sqlalchemy.Column(
"use_hatchet",
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
@@ -97,7 +97,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_hatchet: bool = False
use_celery: bool = False
skip_consent: bool = False

View File

@@ -35,7 +35,9 @@ LLM_RATE_LIMIT_PER_SECOND = 10
# Task execution timeouts (seconds)
TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates
TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation
TIMEOUT_MEDIUM = (
300 # Single LLM calls, waveform generation (5m for slow LLM responses)
)
TIMEOUT_LONG = 180 # Action items (larger context LLM)
TIMEOUT_AUDIO = 300 # Audio processing: padding, mixdown
TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown
TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks

View File

@@ -12,14 +12,9 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
daily_multitrack_pipeline,
)
from reflector.logger import logger
from reflector.settings import settings
def main():
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting CPU workers")
return
hatchet = HatchetClientManager.get_client()
logger.info(

View File

@@ -11,7 +11,6 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
from reflector.hatchet.workflows.track_processing import track_workflow
from reflector.logger import logger
from reflector.settings import settings
SLOTS = 10
WORKER_NAME = "llm-worker-pool"
@@ -19,10 +18,6 @@ POOL = "llm-io"
def main():
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting LLM workers")
return
hatchet = HatchetClientManager.get_client()
logger.info(

View File

@@ -322,6 +322,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
mtg_session_id = recording.mtg_session_id
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptParticipant,
transcripts_controller,
)
@@ -330,15 +331,26 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
# Note: title NOT cleared - preserves existing titles
# Duration from Daily API (seconds -> milliseconds) - master source
duration_ms = recording.duration * 1000 if recording.duration else 0
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
"participants": [],
"duration": duration_ms,
},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
)
@@ -1095,7 +1107,7 @@ async def identify_action_items(
@daily_multitrack_pipeline.task(
parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
parents=[process_tracks, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
)
@@ -1108,12 +1120,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
"""
ctx.log("finalize: saving transcript and setting status to 'ended'")
mixdown_result = ctx.task_output(mixdown_tracks)
track_result = ctx.task_output(process_tracks)
duration = mixdown_result.duration
all_words = track_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = track_result.created_padded_files
if created_padded_files:
@@ -1133,7 +1141,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptText,
transcripts_controller,
)
@@ -1142,34 +1149,26 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
if transcript is None:
raise ValueError(f"Transcript {input.transcript_id} not found in database")
merged_transcript = TranscriptType(words=all_words, translation=None)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"TRANSCRIPT",
TranscriptText(
text=merged_transcript.text,
translation=merged_transcript.translation,
text="",
translation=None,
),
logger=logger,
)
# Save duration and clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary already saved by their callbacks
# Clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary/duration already saved by their callbacks
await transcripts_controller.update(
transcript,
{
"duration": duration,
"workflow_run_id": None, # Clear on success - no need to resume
},
)
duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast(
input.transcript_id, transcript, "DURATION", duration_data, logger=logger
)
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log(
@@ -1347,14 +1346,34 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
f"participants={len(payload.transcript.participants)})"
)
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
timeout=30.0,
)
try:
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
timeout=30.0,
)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)
return WebhookResult(webhook_sent=True, response_code=response.status_code)
except httpx.HTTPStatusError as e:
ctx.log(
f"send_webhook failed (HTTP {e.response.status_code}), continuing anyway"
)
return WebhookResult(
webhook_sent=False, response_code=e.response.status_code
)
except httpx.ConnectError as e:
ctx.log(f"send_webhook failed (connection error), continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
except httpx.TimeoutException as e:
ctx.log(f"send_webhook failed (timeout), continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
except Exception as e:
ctx.log(f"send_webhook unexpected error, continuing anyway: {e}")
return WebhookResult(webhook_sent=False)

View File

@@ -0,0 +1,165 @@
"""
Hatchet child workflow: PaddingWorkflow
Handles individual audio track padding via Modal.com backend.
"""
from datetime import timedelta
import av
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.hatchet.workflows.models import PadTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import extract_stream_start_time_from_container
class PaddingInput(BaseModel):
"""Input for individual track padding."""
track_index: int
s3_key: str
bucket_name: str
transcript_id: str
hatchet = HatchetClientManager.get_client()
padding_workflow = hatchet.workflow(
name="PaddingWorkflow", input_validator=PaddingInput
)
@padding_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
"""Pad audio track with silence based on WebM container start_time."""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
source_url = await storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
# Extract start_time to determine if padding needed
with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except (ValueError, TypeError, OverflowError) as e:
ctx.log(
f"pad_track: track {input.track_index}, duration error: {str(e)}"
)
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
# Presign PUT URL for output (Modal will upload directly)
output_url = await storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
import httpx # noqa: PLC0415
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
)
try:
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track: Modal returned size={file_size}")
except httpx.HTTPStatusError as e:
error_detail = e.response.text if hasattr(e.response, "text") else str(e)
logger.error(
"[Hatchet] Modal padding HTTP error",
transcript_id=input.transcript_id,
track_index=input.track_index,
status_code=e.response.status_code if hasattr(e, "response") else None,
error=error_detail,
exc_info=True,
)
raise Exception(
f"Modal padding failed: HTTP {e.response.status_code}"
) from e
except httpx.TimeoutException as e:
logger.error(
"[Hatchet] Modal padding timeout",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise Exception("Modal padding timeout") from e
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_key=storage_path,
)
return PadTrackResult(
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
except Exception as e:
logger.error(
"[Hatchet] pad_track failed",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise

View File

@@ -14,9 +14,7 @@ Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks.
"""
import tempfile
from datetime import timedelta
from pathlib import Path
import av
from hatchet_sdk import Context
@@ -27,10 +25,7 @@ from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import (
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
from reflector.utils.audio_padding import extract_stream_start_time_from_container
class TrackInput(BaseModel):
@@ -83,63 +78,44 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
)
with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except Exception:
ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
temp_path = temp_file.name
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
try:
apply_audio_padding_to_file(
in_container,
temp_path,
start_time_seconds,
input.track_index,
logger=logger,
)
# Presign PUT URL for output (Modal uploads directly)
output_url = await storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
file_size = Path(temp_path).stat().st_size
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
)
logger.info(
f"About to upload padded track",
key=storage_path,
size=file_size,
)
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
logger.info(
f"Uploaded padded track to S3",
key=storage_path,
size=file_size,
)
finally:
Path(temp_path).unlink(missing_ok=True)
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info(

View File

@@ -0,0 +1,113 @@
"""
Modal.com backend for audio padding.
"""
import asyncio
import os
import httpx
from pydantic import BaseModel
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.logger import logger
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
class AudioPaddingModalProcessor:
"""Audio padding processor using Modal.com CPU backend via HTTP."""
def __init__(
self, padding_url: str | None = None, modal_api_key: str | None = None
):
self.padding_url = padding_url or os.getenv("PADDING_URL")
if not self.padding_url:
raise ValueError(
"PADDING_URL required to use AudioPaddingModalProcessor. "
"Set PADDING_URL environment variable or pass padding_url parameter."
)
self.modal_api_key = modal_api_key or os.getenv("MODAL_API_KEY")
async def pad_track(
self,
track_url: str,
output_url: str,
start_time_seconds: float,
track_index: int,
) -> PaddingResponse:
"""Pad audio track with silence via Modal backend.
Args:
track_url: Presigned GET URL for source audio track
output_url: Presigned PUT URL for output WebM
start_time_seconds: Amount of silence to prepend
track_index: Track index for logging
"""
if not track_url:
raise ValueError("track_url cannot be empty")
if start_time_seconds <= 0:
raise ValueError(
f"start_time_seconds must be positive, got {start_time_seconds}"
)
log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
log.info("Sending Modal padding HTTP request")
url = f"{self.padding_url}/pad"
headers = {}
if self.modal_api_key:
headers["Authorization"] = f"Bearer {self.modal_api_key}"
try:
async with httpx.AsyncClient(timeout=TIMEOUT_AUDIO) as client:
response = await client.post(
url,
headers=headers,
json={
"track_url": track_url,
"output_url": output_url,
"start_time_seconds": start_time_seconds,
"track_index": track_index,
},
follow_redirects=True,
)
if response.status_code != 200:
error_body = response.text
log.error(
"Modal padding API error",
status_code=response.status_code,
error_body=error_body,
)
response.raise_for_status()
result = response.json()
# Check if work was cancelled
if result.get("cancelled"):
log.warning("Modal padding was cancelled by disconnect detection")
raise asyncio.CancelledError(
"Padding cancelled due to client disconnect"
)
log.info("Modal padding complete", size=result["size"])
return PaddingResponse(**result)
except asyncio.CancelledError:
log.warning(
"Modal padding cancelled (Hatchet timeout, disconnect detected on Modal side)"
)
raise
except httpx.TimeoutException as e:
log.error("Modal padding timeout", error=str(e), exc_info=True)
raise Exception(f"Modal padding timeout: {e}") from e
except httpx.HTTPStatusError as e:
log.error("Modal padding HTTP error", error=str(e), exc_info=True)
raise Exception(f"Modal padding HTTP error: {e}") from e
except Exception as e:
log.error("Modal padding unexpected error", error=str(e), exc_info=True)
raise

View File

@@ -319,21 +319,6 @@ class ICSSyncService:
calendar = self.fetch_service.parse_ics(ics_content)
content_hash = hashlib.md5(ics_content.encode()).hexdigest()
if room.ics_last_etag == content_hash:
logger.info("No changes in ICS for room", room_id=room.id)
room_url = f"{settings.UI_BASE_URL}/{room.name}"
events, total_events = self.fetch_service.extract_room_events(
calendar, room.name, room_url
)
return {
"status": SyncStatus.UNCHANGED,
"hash": content_hash,
"events_found": len(events),
"total_events": total_events,
"events_created": 0,
"events_updated": 0,
"events_deleted": 0,
}
# Extract matching events
room_url = f"{settings.UI_BASE_URL}/{room.name}"
@@ -371,6 +356,44 @@ class ICSSyncService:
time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
return time_since_sync.total_seconds() >= room.ics_fetch_interval
def _event_data_changed(self, existing: CalendarEvent, new_data: EventData) -> bool:
"""Check if event data has changed by comparing relevant fields.
IMPORTANT: When adding fields to CalendarEvent/EventData, update this method
and the _COMPARED_FIELDS set below for runtime validation.
"""
# Fields that come from ICS and should trigger updates when changed
_COMPARED_FIELDS = {
"title",
"description",
"start_time",
"end_time",
"location",
"attendees",
"ics_raw_data",
}
# Runtime exhaustiveness check: ensure we're comparing all EventData fields
event_data_fields = set(EventData.__annotations__.keys()) - {"ics_uid"}
if event_data_fields != _COMPARED_FIELDS:
missing = event_data_fields - _COMPARED_FIELDS
extra = _COMPARED_FIELDS - event_data_fields
raise RuntimeError(
f"_event_data_changed() field mismatch: "
f"missing={missing}, extra={extra}. "
f"Update the comparison logic when adding/removing fields."
)
return (
existing.title != new_data["title"]
or existing.description != new_data["description"]
or existing.start_time != new_data["start_time"]
or existing.end_time != new_data["end_time"]
or existing.location != new_data["location"]
or existing.attendees != new_data["attendees"]
or existing.ics_raw_data != new_data["ics_raw_data"]
)
async def _sync_events_to_database(
self, room_id: str, events: list[EventData]
) -> SyncStats:
@@ -386,11 +409,14 @@ class ICSSyncService:
)
if existing:
updated += 1
# Only count as updated if data actually changed
if self._event_data_changed(existing, event_data):
updated += 1
await calendar_events_controller.upsert(calendar_event)
else:
created += 1
await calendar_events_controller.upsert(calendar_event)
await calendar_events_controller.upsert(calendar_event)
current_ics_uids.append(event_data["ics_uid"])
# Soft delete events that are no longer in calendar

View File

@@ -11,7 +11,7 @@ from typing import Literal, Union, assert_never
import celery
from celery.result import AsyncResult
from hatchet_sdk.clients.rest.exceptions import ApiException
from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
@@ -23,7 +23,6 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
@@ -102,8 +101,8 @@ async def validate_transcript_for_processing(
if transcript.locked:
return ValidationLocked(detail="Recording is locked")
# hatchet is idempotent anyways + if it wasn't dispatched successfully
if transcript.status == "idle" and not settings.HATCHET_ENABLED:
# Check if recording is ready for processing
if transcript.status == "idle" and not transcript.workflow_run_id:
return ValidationNotReady(detail="Recording is not ready for processing")
# Check Celery tasks
@@ -116,7 +115,8 @@ async def validate_transcript_for_processing(
):
return ValidationAlreadyScheduled(detail="already running")
if settings.HATCHET_ENABLED and transcript.workflow_run_id:
# Check Hatchet workflow status if workflow_run_id exists
if transcript.workflow_run_id:
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
@@ -181,19 +181,16 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
# Check if room has use_hatchet=True (overrides env vars)
room_forces_hatchet = False
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
room_forces_hatchet = room.use_hatchet if room else False
use_celery = room.use_celery if room else False
# Start durable workflow if enabled (Hatchet)
# and if room has use_hatchet=True
use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
use_hatchet = not use_celery
if room_forces_hatchet:
if use_celery:
logger.info(
"Room forces Hatchet workflow",
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
@@ -215,24 +212,39 @@ async def dispatch_transcript_processing(
)
return None
else:
# Workflow exists but can't replay (CANCELLED, COMPLETED, etc.)
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)

View File

@@ -98,6 +98,10 @@ class Settings(BaseSettings):
# Diarization: local pyannote.audio
DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None
# Audio Padding (Modal.com backend)
PADDING_URL: str | None = None
PADDING_MODAL_API_KEY: str | None = None
# Sentry
SENTRY_DSN: str | None = None
@@ -158,19 +162,10 @@ class Settings(BaseSettings):
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
# Durable workflow orchestration
# Provider: "hatchet" (or "none" to disable)
DURABLE_WORKFLOW_PROVIDER: str = "none"
# Hatchet workflow orchestration
# Hatchet workflow orchestration (always enabled for multitrack processing)
HATCHET_CLIENT_TOKEN: str | None = None
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
HATCHET_DEBUG: bool = False
@property
def HATCHET_ENABLED(self) -> bool:
"""True if Hatchet is the active provider."""
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
settings = Settings()

View File

@@ -5,7 +5,9 @@ Used by both Hatchet workflows and Celery pipelines for consistent audio encodin
"""
# Opus codec settings
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality
# S3 presigned URL expiration

View File

@@ -1,4 +1,5 @@
from datetime import datetime
from uuid import UUID
from reflector.dailyco_api import (
CreateMeetingTokenRequest,
@@ -12,9 +13,11 @@ from reflector.dailyco_api import (
RoomProperties,
verify_webhook_signature,
)
from reflector.dailyco_api import RecordingType as DailyRecordingType
from reflector.db.daily_participant_sessions import (
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room
from reflector.logger import logger
from reflector.storage import get_dailyco_storage
@@ -58,10 +61,9 @@ class DailyClient(VideoPlatformClient):
enable_recording = None
if room.recording_type == self.RECORDING_LOCAL:
enable_recording = "local"
elif (
room.recording_type == self.RECORDING_CLOUD
): # daily "cloud" is not our "cloud"
enable_recording = "raw-tracks"
elif room.recording_type == self.RECORDING_CLOUD:
# Don't set enable_recording - recordings started via REST API (not auto-start)
enable_recording = None
properties = RoomProperties(
enable_recording=enable_recording,
@@ -106,8 +108,6 @@ class DailyClient(VideoPlatformClient):
Daily.co doesn't provide historical session API, so we query our database
where participant.joined/left webhooks are stored.
"""
from reflector.db.meetings import meetings_controller # noqa: PLC0415
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
return []
@@ -179,21 +179,14 @@ class DailyClient(VideoPlatformClient):
async def create_meeting_token(
self,
room_name: DailyRoomName,
start_cloud_recording: bool,
enable_recording_ui: bool,
user_id: NonEmptyString | None = None,
is_owner: bool = False,
max_recording_duration_seconds: int | None = None,
) -> NonEmptyString:
start_cloud_recording_opts = None
if start_cloud_recording and max_recording_duration_seconds:
start_cloud_recording_opts = {"maxDuration": max_recording_duration_seconds}
properties = MeetingTokenProperties(
room_name=room_name,
user_id=user_id,
start_cloud_recording=start_cloud_recording,
start_cloud_recording_opts=start_cloud_recording_opts,
enable_recording_ui=enable_recording_ui,
is_owner=is_owner,
)
@@ -201,6 +194,23 @@ class DailyClient(VideoPlatformClient):
result = await self._api_client.create_meeting_token(request)
return result.token
async def start_recording(
self,
room_name: DailyRoomName,
recording_type: DailyRecordingType,
instance_id: UUID,
) -> dict:
"""Start recording via Daily.co REST API.
Args:
instance_id: UUID for this recording session - one UUID per "room" in Daily (which is "meeting" in Reflector)
"""
return await self._api_client.start_recording(
room_name=room_name,
recording_type=recording_type,
instance_id=instance_id,
)
async def close(self):
"""Clean up API client resources."""
await self._api_client.close()

View File

@@ -1,4 +1,6 @@
import json
import os
from datetime import datetime, timezone
from typing import assert_never
from fastapi import APIRouter, HTTPException, Request
@@ -12,13 +14,17 @@ from reflector.dailyco_api import (
RecordingReadyEvent,
RecordingStartedEvent,
)
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger as _logger
from reflector.settings import settings
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import (
poll_daily_room_presence_task,
process_multitrack_recording,
store_cloud_recording,
)
router = APIRouter()
@@ -174,46 +180,127 @@ async def _handle_recording_started(event: RecordingStartedEvent):
async def _handle_recording_ready(event: RecordingReadyEvent):
room_name = event.payload.room_name
recording_id = event.payload.recording_id
tracks = event.payload.tracks
if not tracks:
logger.warning(
"recording.ready-to-download: missing tracks",
room_name=room_name,
recording_id=recording_id,
payload=event.payload,
)
return
recording_type = event.payload.type
logger.info(
"Recording ready for download",
room_name=room_name,
recording_id=recording_id,
num_tracks=len(tracks),
recording_type=recording_type,
platform="daily",
)
bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
if not bucket_name:
logger.error(
"DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; cannot process Daily recording"
)
logger.error("DAILYCO_STORAGE_AWS_BUCKET_NAME not configured")
return
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
if recording_type == "cloud":
await store_cloud_recording(
recording_id=recording_id,
room_name=room_name,
s3_key=event.payload.s3_key,
duration=event.payload.duration,
start_ts=event.payload.start_ts,
source="webhook",
)
logger.info(
"Recording webhook queuing processing",
recording_id=recording_id,
room_name=room_name,
)
elif recording_type == "raw-tracks":
tracks = event.payload.tracks
if not tracks:
logger.warning(
"raw-tracks recording: missing tracks array",
room_name=room_name,
recording_id=recording_id,
)
return
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=room_name,
recording_id=recording_id,
track_keys=track_keys,
)
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(
recording_id
)
if not match:
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
meeting_id, _ = match
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found (webhook)",
recording_id=recording_id,
meeting_id=meeting_id,
)
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
# Create recording atomically
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=(
os.path.dirname(track_keys[0]) if track_keys else room_name
),
recorded_at=datetime.fromtimestamp(
event.payload.start_ts, tz=timezone.utc
),
track_keys=track_keys,
meeting_id=meeting_id,
status="pending",
)
)
if not created:
# Already created (polling got it first)
logger.debug(
"Recording already exists (webhook late)",
recording_id=recording_id,
meeting_id=meeting_id,
)
return
logger.info(
"Raw-tracks recording queuing processing (webhook)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting_id,
num_tracks=len(track_keys),
)
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=room_name,
recording_id=recording_id,
track_keys=track_keys,
recording_start_ts=event.payload.start_ts,
)
else:
logger.warning(
"Unknown recording type",
recording_type=recording_type,
recording_id=recording_id,
)
async def _handle_recording_error(event: RecordingErrorEvent):

View File

@@ -1,16 +1,29 @@
import json
import logging
from datetime import datetime, timezone
from typing import Annotated, Optional
from typing import Annotated, Any, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
import reflector.auth as auth
from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import (
MeetingConsent,
meeting_consent_controller,
meetings_controller,
)
from reflector.db.rooms import rooms_controller
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -73,3 +86,106 @@ async def meeting_deactivate(
await meetings_controller.update_meeting(meeting_id, is_active=False)
return {"status": "success", "meeting_id": meeting_id}
class StartRecordingRequest(BaseModel):
type: RecordingType
instanceId: UUID
@router.post("/meetings/{meeting_id}/recordings/start")
async def start_recording(
meeting_id: NonEmptyString, body: StartRecordingRequest
) -> dict[str, Any]:
"""Start cloud or raw-tracks recording via Daily.co REST API.
Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time.
Uses different instanceIds for cloud vs raw-tracks (same won't work)
Note: No authentication required - anonymous users supported. TODO this is a DOS vector
"""
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
try:
client = create_platform_client("daily")
result = await client.start_recording(
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
recording_id = result["id"]
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=body.instanceId,
type=body.type,
requested_at=datetime.now(timezone.utc),
)
)
logger.info(
f"Started {body.type} recording via REST API",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
"recording_id": recording_id,
},
)
return {"status": "ok", "recording_id": recording_id}
except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream"
try:
error_body = json.loads(e.response_body)
error_info = error_body.get("info", "")
# "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info:
logger.info(
f"{body.type} recording already active (started by another participant)",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
},
)
return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling
# All other Daily.co API errors
logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)
except Exception as e:
# Non-Daily.co errors
logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)

View File

@@ -73,6 +73,8 @@ class Meeting(BaseModel):
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
class CreateRoom(BaseModel):
@@ -586,7 +588,6 @@ async def rooms_join_meeting(
)
token = await client.create_meeting_token(
meeting.room_name,
start_cloud_recording=meeting.recording_type == "cloud",
enable_recording_ui=enable_recording_ui,
user_id=user_id,
is_owner=user_id == room.user_id,

View File

@@ -6,6 +6,11 @@ from celery.schedules import crontab
from reflector.settings import settings
logger = structlog.get_logger(__name__)
# Polling intervals (seconds)
# Webhook-aware: 180s when webhook configured (backup mode), 15s when no webhook (primary discovery)
POLL_DAILY_RECORDINGS_INTERVAL_SEC = 180.0 if settings.DAILY_WEBHOOK_SECRET else 15.0
if celery.current_app.main != "default":
logger.info(f"Celery already configured ({celery.current_app})")
app = celery.current_app
@@ -44,7 +49,7 @@ else:
},
"poll_daily_recordings": {
"task": "reflector.worker.process.poll_daily_recordings",
"schedule": 180.0, # Every 3 minutes (configurable lookback window)
"schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC,
},
"trigger_daily_reconciliation": {
"task": "reflector.worker.process.trigger_daily_reconciliation",

View File

@@ -1,8 +1,7 @@
import json
import os
import re
from datetime import datetime, timezone
from typing import List
from typing import List, Literal
from urllib.parse import unquote
import av
@@ -13,10 +12,12 @@ from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
@@ -42,6 +43,7 @@ from reflector.utils.daily import (
filter_cam_audio_tracks,
recording_lock_key,
)
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
from reflector.video_platforms.whereby_utils import (
parse_whereby_recording_filename,
@@ -175,13 +177,18 @@ async def process_multitrack_recording(
daily_room_name: DailyRoomName,
recording_id: str,
track_keys: list[str],
recording_start_ts: int,
):
"""
Process raw-tracks (multitrack) recording from Daily.co.
"""
logger.info(
"Processing multitrack recording",
bucket=bucket_name,
room_name=daily_room_name,
recording_id=recording_id,
provided_keys=len(track_keys),
recording_start_ts=recording_start_ts,
)
if not track_keys:
@@ -212,7 +219,7 @@ async def process_multitrack_recording(
)
await _process_multitrack_recording_inner(
bucket_name, daily_room_name, recording_id, track_keys
bucket_name, daily_room_name, recording_id, track_keys, recording_start_ts
)
@@ -221,26 +228,47 @@ async def _process_multitrack_recording_inner(
daily_room_name: DailyRoomName,
recording_id: str,
track_keys: list[str],
recording_start_ts: int,
):
"""Inner function containing the actual processing logic."""
"""
Process multitrack recording.
tz = timezone.utc
recorded_at = datetime.now(tz)
try:
if track_keys:
folder = os.path.basename(os.path.dirname(track_keys[0]))
ts_match = re.search(r"(\d{14})$", folder)
if ts_match:
ts = ts_match.group(1)
recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
except Exception as e:
logger.warning(
f"Could not parse recorded_at from keys, using now() {recorded_at}",
e,
exc_info=True,
Recording must already exist with meeting_id set (created by webhook/polling before queueing).
"""
# Get recording (must exist - created by webhook/polling)
recording = await recordings_controller.get_by_id(recording_id)
if not recording:
logger.error(
"Recording not found - should have been created by webhook/polling",
recording_id=recording_id,
)
return
meeting = await meetings_controller.get_by_room_name(daily_room_name)
if not recording.meeting_id:
logger.error(
"Recording has no meeting_id - orphan should not be queued",
recording_id=recording_id,
)
return
# Get meeting
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Meeting not found for recording",
meeting_id=recording.meeting_id,
recording_id=recording_id,
)
return
logger.info(
"Processing multitrack recording",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
room_name_base = extract_base_room_name(daily_room_name)
@@ -248,31 +276,6 @@ async def _process_multitrack_recording_inner(
if not room:
raise Exception(f"Room not found: {room_name_base}")
if not meeting:
raise Exception(f"Meeting not found: {room_name_base}")
logger.info(
"Found existing Meeting for recording",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
)
recording = await recordings_controller.get_by_id(recording_id)
if not recording:
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key_dir,
recorded_at=recorded_at,
meeting_id=meeting.id,
track_keys=track_keys,
)
)
# else: Recording already exists; metadata set at creation time
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript:
transcript = await transcripts_controller.add(
@@ -287,11 +290,12 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
use_celery = room and room.use_celery
use_hatchet = not use_celery
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
if use_celery:
logger.info(
"Room forces Hatchet workflow",
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
@@ -337,9 +341,11 @@ async def poll_daily_recordings():
"""Poll Daily.co API for recordings and process missing ones.
Fetches latest recordings from Daily.co API (default limit 100), compares with DB,
and queues processing for recordings not already in DB.
and stores/queues missing recordings:
- Cloud recordings: Store S3 key in meeting table
- Raw-tracks recordings: Queue multitrack processing
For each missing recording, uses audio tracks from API response.
Acts as fallback when webhooks active, primary discovery when webhooks unavailable.
Worker-level locking provides idempotency (see process_multitrack_recording).
"""
@@ -380,71 +386,258 @@ async def poll_daily_recordings():
)
return
recording_ids = [rec.id for rec in finished_recordings]
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
existing_ids = {rec.id for rec in existing_recordings}
# Separate cloud and raw-tracks recordings
cloud_recordings = []
raw_tracks_recordings = []
for rec in finished_recordings:
if rec.type:
# Daily.co API returns null type - make sure this assumption stays
# If this logs, Daily.co API changed - we can remove inference logic.
recording_type = rec.type
logger.warning(
"Recording has explicit type field from Daily.co API (unexpected, API may have changed)",
recording_id=rec.id,
room_name=rec.room_name,
recording_type=recording_type,
has_s3key=bool(rec.s3key),
tracks_count=len(rec.tracks),
)
else:
# DAILY.CO API LIMITATION:
# GET /recordings response does NOT include type field.
# Daily.co docs mention type field exists, but API never returns it.
# Verified: 84 recordings from Nov 2025 - Jan 2026 ALL have type=None.
#
# This is not a recent API change - Daily.co has never returned type.
# Must infer from structural properties.
#
# Inference heuristic (reliable for finished recordings):
# - Has tracks array → raw-tracks
# - Has s3key but no tracks → cloud
# - Neither → failed/incomplete recording
if len(rec.tracks) > 0:
recording_type = "raw-tracks"
elif rec.s3key and len(rec.tracks) == 0:
recording_type = "cloud"
else:
logger.warning(
"Recording has no type, no s3key, and no tracks - likely failed recording",
recording_id=rec.id,
room_name=rec.room_name,
status=rec.status,
duration=rec.duration,
mtg_session_id=rec.mtgSessionId,
)
continue
missing_recordings = [
rec for rec in finished_recordings if rec.id not in existing_ids
]
if recording_type == "cloud":
cloud_recordings.append(rec)
else:
raw_tracks_recordings.append(rec)
if not missing_recordings:
logger.debug(
"All recordings already in DB",
api_count=len(finished_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(finished_recordings),
existing_count=len(existing_recordings),
logger.debug(
"Poll results",
total=len(finished_recordings),
cloud=len(cloud_recordings),
raw_tracks=len(raw_tracks_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
if recording.status == "finished":
logger.warning(
"Finished recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
else:
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
room_name=recording.room_name,
status=recording.status,
)
continue
# Process cloud recordings
await _poll_cloud_recordings(cloud_recordings)
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
# Process raw-tracks recordings
await _poll_raw_tracks_recordings(raw_tracks_recordings, bucket_name)
if not track_keys:
logger.warning(
"No audio tracks found in recording (only video tracks)",
recording_id=recording.id,
room_name=recording.room_name,
total_tracks=len(recording.tracks),
async def store_cloud_recording(
recording_id: NonEmptyString,
room_name: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
start_ts: int,
source: Literal["webhook", "polling"],
) -> bool:
"""
Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths.
Uses direct recording_id lookup via daily_recording_requests table.
Args:
recording_id: Daily.co recording ID
room_name: Daily.co room name
s3_key: S3 key where recording is stored
duration: Recording duration in seconds
start_ts: Unix timestamp when recording started
source: "webhook" or "polling" (for logging)
Returns:
True if stored, False if skipped/failed
"""
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
if not match:
# ORPHAN: No request found (pre-migration recording or failed request creation)
await create_and_log_orphan(
recording_id=recording_id,
bucket_name="",
room_name=room_name,
start_ts=start_ts,
track_keys=None,
source=source,
)
return False
meeting_id, _ = match
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key=s3_key,
duration=duration,
)
if not success:
logger.debug(
f"Cloud recording ({source}): already set (stop/restart?)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting_id,
)
return False
logger.info(
f"Cloud recording stored via {source}",
meeting_id=meeting_id,
recording_id=recording_id,
s3_key=s3_key,
duration=duration,
)
return True
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
"""Process cloud recordings (database deduplication, worker-agnostic).
Cloud recordings stored in meeting.daily_composed_video_s3_key, not recording table.
Only first cloud recording per meeting is kept (existing behavior).
"""
if not cloud_recordings:
return
for rec in cloud_recordings:
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
if not match:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name="",
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=None,
source="polling",
)
continue
logger.info(
"Queueing missing recording for processing",
recording_id=recording.id,
room_name=recording.room_name,
track_count=len(track_keys),
meeting_id, _ = match
if not rec.s3key:
logger.error("Cloud recording missing s3_key", recording_id=rec.id)
continue
# Store in meeting table (atomic, only if not already set)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key=rec.s3key,
duration=rec.duration,
)
if success:
logger.info(
"Stored cloud recording", recording_id=rec.id, meeting_id=meeting_id
)
else:
logger.warning(
"Cloud recording already exists for meeting (stop/restart?)",
recording_id=rec.id,
meeting_id=meeting_id,
)
async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: NonEmptyString,
) -> None:
"""Process raw-tracks (database deduplication, worker-agnostic)."""
if not raw_tracks_recordings:
return
for rec in raw_tracks_recordings:
# Lookup request FIRST (before any DB writes)
match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
if not match:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
)
continue
meeting_id, _ = match
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found", recording_id=rec.id, meeting_id=meeting_id
)
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
)
continue
# DEDUPLICATION: Atomically create recording (single operation, no race window)
# ON CONFLICT → concurrent poller already got it, skip entire logic
track_keys = [t.s3Key for t in rec.tracks if t.type == "audio"]
created = await recordings_controller.try_create_with_meeting(
Recording(
id=rec.id,
bucket_name=bucket_name,
object_key=os.path.dirname(track_keys[0]) if track_keys else "",
recorded_at=datetime.fromtimestamp(rec.start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=meeting_id, # Set at creation (constraint-safe)
status="pending",
)
)
if not created:
# Conflict: another poller already created/queued this
# Skip all remaining logic (match already done by winner)
continue
# Only winner reaches here - queue processing (works with Celery or Hatchet)
process_multitrack_recording.delay(
recording_id=rec.id,
daily_room_name=rec.room_name,
recording_start_ts=rec.start_ts,
bucket_name=bucket_name,
daily_room_name=recording.room_name,
recording_id=recording.id,
track_keys=track_keys,
)
logger.info("Queued recording", recording_id=rec.id, meeting_id=meeting_id)
async def poll_daily_room_presence(meeting_id: str) -> None:
"""Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed.
@@ -810,7 +1003,6 @@ async def reprocess_failed_daily_recordings():
)
continue
# Fetch room to check use_hatchet flag
room = None
if meeting.room_id:
room = await rooms_controller.get_by_id(meeting.room_id)
@@ -834,10 +1026,10 @@ async def reprocess_failed_daily_recordings():
)
continue
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
# Hatchet requires a transcript for workflow_run_id tracking
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
@@ -883,11 +1075,16 @@ async def reprocess_failed_daily_recordings():
transcript_status=transcript.status if transcript else None,
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""Test script to fetch Daily.co recordings for a specific room and show raw API response."""
import asyncio
import json
from reflector.video_platforms.factory import create_platform_client
async def main():
room_name = "daily-private-igor-20260110042117"
print(f"\n=== Fetching recordings for room: {room_name} ===\n")
async with create_platform_client("daily") as client:
recordings = await client.list_recordings(room_name=room_name)
print(f"Found {len(recordings)} recording objects from Daily.co API\n")
for i, rec in enumerate(recordings, 1):
print(f"--- Recording #{i} ---")
print(f"ID: {rec.id}")
print(f"Room: {rec.room_name}")
print(f"Start TS: {rec.start_ts}")
print(f"Status: {rec.status}")
print(f"Duration: {rec.duration}")
print(f"Type: {rec.type}")
print(f"Tracks count: {len(rec.tracks)}")
if rec.tracks:
print(f"Tracks:")
for j, track in enumerate(rec.tracks, 1):
print(f" Track {j}: {track.s3Key}")
print(f"\nRaw JSON:\n{json.dumps(rec.model_dump(), indent=2, default=str)}\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,258 @@
from datetime import datetime, timezone
from uuid import UUID
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_create_request():
"""Test creating a recording request."""
# Create meeting first
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-123",
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
request = DailyRecordingRequest(
recording_id="rec-1",
meeting_id="meeting-123",
instance_id=UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"),
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
result = await daily_recording_requests_controller.find_by_recording_id("rec-1")
assert result is not None
assert result[0] == "meeting-123"
assert result[1] == "cloud"
@pytest.mark.asyncio
async def test_multiple_recordings_same_meeting():
"""Test stop/restart creates multiple request rows."""
# Create room and meeting
room = Room(
id="test-room-2", name="Test Room 2", slug="test-room-2", user_id="test-user"
)
await rooms_controller.create(room)
meeting_id = "meeting-456"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-1",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-2", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-1", "rec-2"}
@pytest.mark.asyncio
async def test_deduplication_via_database():
"""Test concurrent pollers use database for deduplication."""
# Create room and meeting
room = Room(
id="test-room-3", name="Test Room 3", slug="test-room-3", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-789",
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
recording_id = "rec-123"
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created2 is False # Conflict, skip
@pytest.mark.asyncio
async def test_orphan_logged_once():
"""Test orphan marked once, skipped on re-poll."""
# First poll
created1 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created1 is True
# Second poll (same orphan discovered again)
created2 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created2 is False # Already exists
# Verify it exists
existing = await recordings_controller.get_by_id("orphan-123")
assert existing is not None
assert existing.status == "orphan"
@pytest.mark.asyncio
async def test_orphan_constraints():
"""Test orphan invariants are enforced."""
# Can't create orphan with meeting_id
with pytest.raises(AssertionError, match="meeting_id must be NULL"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-123", # Should be None
status="orphan",
track_keys=None,
)
)
# Can't create orphan with wrong status
with pytest.raises(AssertionError, match="status must be 'orphan'"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="pending", # Should be "orphan"
track_keys=None,
)
)
@pytest.mark.asyncio
async def test_try_create_with_meeting_constraints():
"""Test try_create_with_meeting enforces constraints."""
# Create room and meeting
room = Room(
id="test-room-4", name="Test Room 4", slug="test-room-4", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-999",
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Can't create with orphan status
with pytest.raises(AssertionError, match="use create_orphan"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-999",
status="orphan", # Should not be orphan
track_keys=None,
)
)
# Can't create without meeting_id
with pytest.raises(AssertionError, match="meeting_id required"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None, # Should have meeting_id
status="pending",
track_keys=None,
)
)

View File

@@ -0,0 +1,147 @@
"""
Tests for Daily.co instanceId generation.
Verifies deterministic behavior and frontend/backend consistency.
"""
import pytest
from reflector.dailyco_api.instance_id import (
RAW_TRACKS_NAMESPACE,
generate_cloud_instance_id,
generate_raw_tracks_instance_id,
)
class TestInstanceIdDeterminism:
"""Test deterministic generation of instanceIds."""
def test_cloud_instance_id_is_meeting_id(self):
"""Cloud instanceId is meeting ID directly (implicitly tests determinism)."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
result1 = generate_cloud_instance_id(meeting_id)
result2 = generate_cloud_instance_id(meeting_id)
assert str(result1) == meeting_id
assert result1 == result2
def test_raw_tracks_instance_id_deterministic(self):
"""Raw-tracks instanceId generation is deterministic."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
result1 = generate_raw_tracks_instance_id(meeting_id)
result2 = generate_raw_tracks_instance_id(meeting_id)
assert result1 == result2
def test_raw_tracks_different_from_cloud(self):
"""Raw-tracks instanceId differs from cloud instanceId."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
cloud_id = generate_cloud_instance_id(meeting_id)
raw_tracks_id = generate_raw_tracks_instance_id(meeting_id)
assert cloud_id != raw_tracks_id
def test_different_meetings_different_instance_ids(self):
"""Different meetings generate different instanceIds."""
meeting_id1 = "550e8400-e29b-41d4-a716-446655440000"
meeting_id2 = "6ba7b810-9dad-11d1-80b4-00c04fd430c8"
cloud1 = generate_cloud_instance_id(meeting_id1)
cloud2 = generate_cloud_instance_id(meeting_id2)
assert cloud1 != cloud2
raw1 = generate_raw_tracks_instance_id(meeting_id1)
raw2 = generate_raw_tracks_instance_id(meeting_id2)
assert raw1 != raw2
class TestFrontendBackendConsistency:
"""Test that backend matches frontend logic."""
def test_namespace_matches_frontend(self):
"""Namespace UUID matches frontend RAW_TRACKS_NAMESPACE constant."""
# From www/app/[roomName]/components/DailyRoom.tsx
frontend_namespace = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
assert str(RAW_TRACKS_NAMESPACE) == frontend_namespace
def test_raw_tracks_generation_matches_frontend_logic(self):
"""Backend UUIDv5 generation matches frontend uuidv5() call."""
# Example meeting ID
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
# Backend result
backend_result = generate_raw_tracks_instance_id(meeting_id)
# Expected result from frontend: uuidv5(meeting.id, RAW_TRACKS_NAMESPACE)
# Python uuid5 uses (namespace, name) argument order
# JavaScript uuid.v5(name, namespace) - same args, different order
# Frontend: uuidv5(meeting.id, "a1b2c3d4-e5f6-7890-abcd-ef1234567890")
# Backend: uuid5(UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"), meeting.id)
# Verify it's a valid UUID (will raise if not)
assert len(str(backend_result)) == 36
assert backend_result.version == 5
class TestEdgeCases:
"""Test edge cases and error conditions."""
def test_invalid_uuid_format_raises(self):
"""Invalid UUID format raises ValueError."""
with pytest.raises(ValueError):
generate_cloud_instance_id("not-a-uuid")
def test_lowercase_uuid_normalized_for_cloud(self):
"""Cloud instanceId: lowercase/uppercase UUIDs produce same result."""
meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000"
meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000"
cloud_lower = generate_cloud_instance_id(meeting_id_lower)
cloud_upper = generate_cloud_instance_id(meeting_id_upper)
assert cloud_lower == cloud_upper
def test_uuid5_is_case_sensitive_warning(self):
"""
Documents uuid5 case sensitivity - different case UUIDs produce different hashes.
Not a problem: meeting.id always lowercase from DB and API.
Frontend generates raw-tracks instanceId from lowercase meeting.id.
Backend receives lowercase meeting_id when matching.
This test documents the behavior, not a requirement.
"""
meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000"
meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000"
raw_lower = generate_raw_tracks_instance_id(meeting_id_lower)
raw_upper = generate_raw_tracks_instance_id(meeting_id_upper)
assert raw_lower != raw_upper
class TestMtgSessionIdVsInstanceId:
"""
Documents that Daily.co's mtgSessionId differs from our instanceId.
Why this matters: We investigated using mtgSessionId for matching but discovered
it's Daily.co-generated and unrelated to instanceId we send. This test documents
that finding so we don't investigate it again.
Production data from 2026-01-13:
- Meeting ID: 4ad503b6-8189-4910-a8f7-68cdd1b7f990
- Cloud instanceId: 4ad503b6-8189-4910-a8f7-68cdd1b7f990 (same as meeting ID)
- Raw-tracks instanceId: 784b3af3-c7dd-57f0-ac54-2ee91c6927cb (UUIDv5 derived)
- Recording mtgSessionId: f25a2e09-740f-4932-9c0d-b1bebaa669c6 (different!)
Conclusion: Cannot use mtgSessionId for recording-to-meeting matching.
"""
def test_mtg_session_id_differs_from_our_instance_ids(self):
"""mtgSessionId (Daily.co) != instanceId (ours) for both cloud and raw-tracks."""
meeting_id = "4ad503b6-8189-4910-a8f7-68cdd1b7f990"
expected_raw_tracks_id = "784b3af3-c7dd-57f0-ac54-2ee91c6927cb"
mtg_session_id = "f25a2e09-740f-4932-9c0d-b1bebaa669c6"
cloud_instance_id = generate_cloud_instance_id(meeting_id)
raw_tracks_instance_id = generate_raw_tracks_instance_id(meeting_id)
assert str(cloud_instance_id) == meeting_id
assert str(raw_tracks_instance_id) == expected_raw_tracks_id
assert str(cloud_instance_id) != mtg_session_id
assert str(raw_tracks_instance_id) != mtg_session_id

View File

@@ -2,10 +2,9 @@
Tests for Hatchet workflow dispatch and routing logic.
These tests verify:
1. Routing to Hatchet when HATCHET_ENABLED=True
2. Replay logic for failed workflows
3. Force flag to cancel and restart
4. Validation prevents concurrent workflows
1. Hatchet workflow validation and replay logic
2. Force flag to cancel and restart workflows
3. Validation prevents concurrent workflows
"""
from unittest.mock import AsyncMock, patch
@@ -34,25 +33,22 @@ async def test_hatchet_validation_blocks_running_workflow():
workflow_run_id="running-workflow-123",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.RUNNING
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.RUNNING
)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationAlreadyScheduled)
assert "running" in result.detail.lower()
assert isinstance(result, ValidationAlreadyScheduled)
assert "running" in result.detail.lower()
@pytest.mark.usefixtures("setup_database")
@@ -72,24 +68,21 @@ async def test_hatchet_validation_blocks_queued_workflow():
workflow_run_id="queued-workflow-123",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.QUEUED
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.QUEUED
)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationAlreadyScheduled)
assert isinstance(result, ValidationAlreadyScheduled)
@pytest.mark.usefixtures("setup_database")
@@ -110,25 +103,22 @@ async def test_hatchet_validation_allows_failed_workflow():
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.FAILED
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.FAILED
)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationOk)
assert result.transcript_id == "test-transcript-id"
assert isinstance(result, ValidationOk)
assert result.transcript_id == "test-transcript-id"
@pytest.mark.usefixtures("setup_database")
@@ -149,24 +139,21 @@ async def test_hatchet_validation_allows_completed_workflow():
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.COMPLETED
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.COMPLETED
)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationOk)
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@@ -187,26 +174,23 @@ async def test_hatchet_validation_allows_when_status_check_fails():
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Status check fails (workflow might be deleted)
mock_hatchet.get_workflow_run_status = AsyncMock(
side_effect=ApiException("Workflow not found")
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Status check fails (workflow might be deleted)
mock_hatchet.get_workflow_run_status = AsyncMock(
side_effect=ApiException("Workflow not found")
)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
result = await validate_transcript_for_processing(mock_transcript)
# Should allow processing when we can't get status
assert isinstance(result, ValidationOk)
# Should allow processing when we can't get status
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@@ -227,47 +211,11 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Should not be called
mock_hatchet.get_workflow_run_status = AsyncMock()
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet status
mock_hatchet.get_workflow_run_status.assert_not_called()
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_skipped_when_disabled():
"""Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="uploaded",
source_kind="room",
workflow_run_id="some-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = False # Hatchet disabled
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Should not be called
mock_hatchet.get_workflow_run_status = AsyncMock()
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
@@ -276,7 +224,8 @@ async def test_hatchet_validation_skipped_when_disabled():
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet at all
# Should not check Hatchet status
mock_hatchet.get_workflow_run_status.assert_not_called()
assert isinstance(result, ValidationOk)

View File

@@ -189,14 +189,17 @@ async def test_ics_sync_service_sync_room_calendar():
assert events[0].ics_uid == "sync-event-1"
assert events[0].title == "Sync Test Meeting"
# Second sync with same content (should be unchanged)
# Second sync with same content (calendar unchanged, but sync always runs)
# Refresh room to get updated etag and force sync by setting old sync time
room = await rooms_controller.get_by_id(room.id)
await rooms_controller.update(
room, {"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)}
)
result = await sync_service.sync_room_calendar(room)
assert result["status"] == "unchanged"
assert result["status"] == "success"
assert result["events_created"] == 0
assert result["events_updated"] == 0
assert result["events_deleted"] == 0
# Third sync with updated event
event["summary"] = "Updated Meeting Title"
@@ -288,3 +291,43 @@ async def test_ics_sync_service_error_handling():
result = await sync_service.sync_room_calendar(room)
assert result["status"] == "error"
assert "Network error" in result["error"]
@pytest.mark.asyncio
async def test_event_data_changed_exhaustiveness():
"""Test that _event_data_changed compares all EventData fields (except ics_uid).
This test ensures programmers don't forget to update the comparison logic
when adding new fields to EventData/CalendarEvent.
"""
from reflector.services.ics_sync import EventData
sync_service = ICSSyncService()
from reflector.db.calendar_events import CalendarEvent
now = datetime.now(timezone.utc)
event_data: EventData = {
"ics_uid": "test-123",
"title": "Test",
"description": "Desc",
"location": "Loc",
"start_time": now,
"end_time": now + timedelta(hours=1),
"attendees": [],
"ics_raw_data": "raw",
}
existing = CalendarEvent(
room_id="room1",
**event_data,
)
# Will raise RuntimeError if fields are missing from comparison
result = sync_service._event_data_changed(existing, event_data)
assert result is False
modified_data = event_data.copy()
modified_data["title"] = "Changed Title"
result = sync_service._event_data_changed(existing, modified_data)
assert result is True

View File

@@ -0,0 +1,300 @@
"""
Integration tests for recording request flow.
These tests verify the end-to-end flow of:
1. Starting a recording (creates request)
2. Webhook/polling discovering recording (matches via request)
3. Recording processing (uses existing meeting_id)
"""
from datetime import datetime, timezone
from uuid import UUID, uuid4
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_recording_request_flow_cloud(client):
"""Test full cloud recording flow: start -> webhook -> match"""
# Create room and meeting
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Simulate recording start (what endpoint does)
recording_id = "rec-cloud-123"
instance_id = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Verify request exists
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
assert match[0] == meeting_id
assert match[1] == "cloud"
# Simulate webhook/polling storing cloud recording
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key="s3://bucket/recording.mp4",
duration=120,
)
assert success is True
# Verify meeting updated
updated_meeting = await meetings_controller.get_by_id(meeting_id)
assert updated_meeting.daily_composed_video_s3_key == "s3://bucket/recording.mp4"
assert updated_meeting.daily_composed_video_duration == 120
@pytest.mark.asyncio
async def test_recording_request_flow_raw_tracks(client):
"""Test full raw-tracks recording flow: start -> webhook/polling -> process"""
# Create room and meeting
room = Room(
id="test-room-2",
name="Test Room 2",
slug="test-room-2",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Simulate recording start
recording_id = "rec-raw-456"
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Simulate webhook/polling discovering recording
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
found_meeting_id, recording_type = match
assert found_meeting_id == meeting_id
assert recording_type == "raw-tracks"
# Create recording (what webhook/polling does)
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="recordings/20260120/",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
meeting_id=meeting_id,
status="pending",
)
)
assert created is True
# Verify recording exists with meeting_id
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id
assert recording.status == "pending"
assert len(recording.track_keys) == 2
@pytest.mark.asyncio
async def test_stop_restart_creates_multiple_requests(client):
"""Test stop/restart creates multiple request rows with same instance_id"""
# Create room and meeting
room = Room(
id="test-room-3",
name="Test Room 3",
slug="test-room-3",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("c1d2e3f4-a5b6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-first",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-second", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-first", "rec-second"}
assert all(r.instance_id == instance_id for r in requests)
@pytest.mark.asyncio
async def test_orphan_recording_no_request(client):
"""Test orphan recording (no request found)"""
# Simulate polling discovering recording with no request
recording_id = "rec-orphan"
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is None # No request
# Mark as orphan
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created is True
# Verify orphan exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.status == "orphan"
assert recording.meeting_id is None
# Second poll - already exists
created_again = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created_again is False # Already exists
@pytest.mark.asyncio
async def test_concurrent_polling_deduplication(client):
"""Test concurrent pollers only queue once"""
# Create room and meeting
room = Room(
id="test-room-4",
name="Test Room 4",
slug="test-room-4",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Create request
recording_id = "rec-concurrent"
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=UUID("d1e2f3a4-b5c6-7890-abcd-ef1234567890"),
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
)
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created2 is False # Conflict, skip
# Only one recording exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id

View File

@@ -162,9 +162,24 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
# Create transcript with Daily.co multitrack recording
room = await rooms_controller.add(
name="test-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
source_kind="room",
@@ -172,6 +187,7 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
target_language="en",
user_id="test-user",
share_mode="public",
room_id=room.id,
)
track_keys = [

45
server/uv.lock generated
View File

@@ -159,21 +159,20 @@ wheels = [
[[package]]
name = "aiortc"
version = "1.13.0"
version = "1.14.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aioice" },
{ name = "av" },
{ name = "cffi" },
{ name = "cryptography" },
{ name = "google-crc32c" },
{ name = "pyee" },
{ name = "pylibsrtp" },
{ name = "pyopenssl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894 }
sdist = { url = "https://files.pythonhosted.org/packages/51/9c/4e027bfe0195de0442da301e2389329496745d40ae44d2d7c4571c4290ce/aiortc-1.14.0.tar.gz", hash = "sha256:adc8a67ace10a085721e588e06a00358ed8eaf5f6b62f0a95358ff45628dd762", size = 1180864 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910 },
{ url = "https://files.pythonhosted.org/packages/57/ab/31646a49209568cde3b97eeade0d28bb78b400e6645c56422c101df68932/aiortc-1.14.0-py3-none-any.whl", hash = "sha256:4b244d7e482f4e1f67e685b3468269628eca1ec91fa5b329ab517738cfca086e", size = 93183 },
]
[[package]]
@@ -327,28 +326,24 @@ wheels = [
[[package]]
name = "av"
version = "14.4.0"
version = "16.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 }
sdist = { url = "https://files.pythonhosted.org/packages/78/cd/3a83ffbc3cc25b39721d174487fb0d51a76582f4a1703f98e46170ce83d4/av-16.1.0.tar.gz", hash = "sha256:a094b4fd87a3721dacf02794d3d2c82b8d712c85b9534437e82a8a978c175ffd", size = 4285203 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/8a/d57418b686ffd05fabd5a0a9cfa97e63b38c35d7101af00e87c51c8cc43c/av-14.4.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b21d5586a88b9fce0ab78e26bd1c38f8642f8e2aad5b35e619f4d202217c701", size = 19965048 },
{ url = "https://files.pythonhosted.org/packages/f5/aa/3f878b0301efe587e9b07bb773dd6b47ef44ca09a3cffb4af50c08a170f3/av-14.4.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:cf8762d90b0f94a20c9f6e25a94f1757db5a256707964dfd0b1d4403e7a16835", size = 23750064 },
{ url = "https://files.pythonhosted.org/packages/9a/b4/6fe94a31f9ed3a927daa72df67c7151968587106f30f9f8fcd792b186633/av-14.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c0ac9f08920c7bbe0795319689d901e27cb3d7870b9a0acae3f26fc9daa801a6", size = 33648775 },
{ url = "https://files.pythonhosted.org/packages/6c/f3/7f3130753521d779450c935aec3f4beefc8d4645471159f27b54e896470c/av-14.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a56d9ad2afdb638ec0404e962dc570960aae7e08ae331ad7ff70fbe99a6cf40e", size = 32216915 },
{ url = "https://files.pythonhosted.org/packages/f8/9a/8ffabfcafb42154b4b3a67d63f9b69e68fa8c34cb39ddd5cb813dd049ed4/av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bed513cbcb3437d0ae47743edc1f5b4a113c0b66cdd4e1aafc533abf5b2fbf2", size = 35287279 },
{ url = "https://files.pythonhosted.org/packages/ad/11/7023ba0a2ca94a57aedf3114ab8cfcecb0819b50c30982a4c5be4d31df41/av-14.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d030c2d3647931e53d51f2f6e0fcf465263e7acf9ec6e4faa8dbfc77975318c3", size = 36294683 },
{ url = "https://files.pythonhosted.org/packages/3d/fa/b8ac9636bd5034e2b899354468bef9f4dadb067420a16d8a493a514b7817/av-14.4.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1cc21582a4f606271d8c2036ec7a6247df0831050306c55cf8a905701d0f0474", size = 34552391 },
{ url = "https://files.pythonhosted.org/packages/fb/29/0db48079c207d1cba7a2783896db5aec3816e17de55942262c244dffbc0f/av-14.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce7c9cd452153d36f1b1478f904ed5f9ab191d76db873bdd3a597193290805d4", size = 37265250 },
{ url = "https://files.pythonhosted.org/packages/1c/55/715858c3feb7efa4d667ce83a829c8e6ee3862e297fb2b568da3f968639d/av-14.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd261e31cc6b43ca722f80656c39934199d8f2eb391e0147e704b6226acebc29", size = 27925845 },
{ url = "https://files.pythonhosted.org/packages/a6/75/b8641653780336c90ba89e5352cac0afa6256a86a150c7703c0b38851c6d/av-14.4.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:a53e682b239dd23b4e3bc9568cfb1168fc629ab01925fdb2e7556eb426339e94", size = 19954125 },
{ url = "https://files.pythonhosted.org/packages/99/e6/37fe6fa5853a48d54d749526365780a63a4bc530be6abf2115e3a21e292a/av-14.4.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5aa0b901751a32703fa938d2155d56ce3faf3630e4a48d238b35d2f7e49e5395", size = 23751479 },
{ url = "https://files.pythonhosted.org/packages/f7/75/9a5f0e6bda5f513b62bafd1cff2b495441a8b07ab7fb7b8e62f0c0d1683f/av-14.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b316fed3597675fe2aacfed34e25fc9d5bb0196dc8c0b014ae5ed4adda48de", size = 33801401 },
{ url = "https://files.pythonhosted.org/packages/6a/c9/e4df32a2ad1cb7f3a112d0ed610c5e43c89da80b63c60d60e3dc23793ec0/av-14.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a587b5c5014c3c0e16143a0f8d99874e46b5d0c50db6111aa0b54206b5687c81", size = 32364330 },
{ url = "https://files.pythonhosted.org/packages/ca/f0/64e7444a41817fde49a07d0239c033f7e9280bec4a4bb4784f5c79af95e6/av-14.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d53f75e8ac1ec8877a551c0db32a83c0aaeae719d05285281eaaba211bbc30", size = 35519508 },
{ url = "https://files.pythonhosted.org/packages/c2/a8/a370099daa9033a3b6f9b9bd815304b3d8396907a14d09845f27467ba138/av-14.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c8558cfde79dd8fc92d97c70e0f0fa8c94c7a66f68ae73afdf58598f0fe5e10d", size = 36448593 },
{ url = "https://files.pythonhosted.org/packages/27/bb/edb6ceff8fa7259cb6330c51dbfbc98dd1912bd6eb5f7bc05a4bb14a9d6e/av-14.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:455b6410dea0ab2d30234ffb28df7d62ca3cdf10708528e247bec3a4cdcced09", size = 34701485 },
{ url = "https://files.pythonhosted.org/packages/a7/8a/957da1f581aa1faa9a5dfa8b47ca955edb47f2b76b949950933b457bfa1d/av-14.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1661efbe9d975f927b8512d654704223d936f39016fad2ddab00aee7c40f412c", size = 37521981 },
{ url = "https://files.pythonhosted.org/packages/28/76/3f1cf0568592f100fd68eb40ed8c491ce95ca3c1378cc2d4c1f6d1bd295d/av-14.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:fbbeef1f421a3461086853d6464ad5526b56ffe8ccb0ab3fd0a1f121dfbf26ad", size = 27925944 },
{ url = "https://files.pythonhosted.org/packages/48/d0/b71b65d1b36520dcb8291a2307d98b7fc12329a45614a303ff92ada4d723/av-16.1.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:e88ad64ee9d2b9c4c5d891f16c22ae78e725188b8926eb88187538d9dd0b232f", size = 26927747 },
{ url = "https://files.pythonhosted.org/packages/2f/79/720a5a6ccdee06eafa211b945b0a450e3a0b8fc3d12922f0f3c454d870d2/av-16.1.0-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:cb296073fa6935724de72593800ba86ae49ed48af03960a4aee34f8a611f442b", size = 21492232 },
{ url = "https://files.pythonhosted.org/packages/8e/4f/a1ba8d922f2f6d1a3d52419463ef26dd6c4d43ee364164a71b424b5ae204/av-16.1.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:720edd4d25aa73723c1532bb0597806d7b9af5ee34fc02358782c358cfe2f879", size = 39291737 },
{ url = "https://files.pythonhosted.org/packages/1a/31/fc62b9fe8738d2693e18d99f040b219e26e8df894c10d065f27c6b4f07e3/av-16.1.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:c7f2bc703d0df260a1fdf4de4253c7f5500ca9fc57772ea241b0cb241bcf972e", size = 40846822 },
{ url = "https://files.pythonhosted.org/packages/53/10/ab446583dbce730000e8e6beec6ec3c2753e628c7f78f334a35cad0317f4/av-16.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d69c393809babada7d54964d56099e4b30a3e1f8b5736ca5e27bd7be0e0f3c83", size = 40675604 },
{ url = "https://files.pythonhosted.org/packages/31/d7/1003be685277005f6d63fd9e64904ee222fe1f7a0ea70af313468bb597db/av-16.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:441892be28582356d53f282873c5a951592daaf71642c7f20165e3ddcb0b4c63", size = 42015955 },
{ url = "https://files.pythonhosted.org/packages/2f/4a/fa2a38ee9306bf4579f556f94ecbc757520652eb91294d2a99c7cf7623b9/av-16.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:273a3e32de64819e4a1cd96341824299fe06f70c46f2288b5dc4173944f0fd62", size = 31750339 },
{ url = "https://files.pythonhosted.org/packages/9c/84/2535f55edcd426cebec02eb37b811b1b0c163f26b8d3f53b059e2ec32665/av-16.1.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:640f57b93f927fba8689f6966c956737ee95388a91bd0b8c8b5e0481f73513d6", size = 26945785 },
{ url = "https://files.pythonhosted.org/packages/b6/17/ffb940c9e490bf42e86db4db1ff426ee1559cd355a69609ec1efe4d3a9eb/av-16.1.0-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:ae3fb658eec00852ebd7412fdc141f17f3ddce8afee2d2e1cf366263ad2a3b35", size = 21481147 },
{ url = "https://files.pythonhosted.org/packages/15/c1/e0d58003d2d83c3921887d5c8c9b8f5f7de9b58dc2194356a2656a45cfdc/av-16.1.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:27ee558d9c02a142eebcbe55578a6d817fedfde42ff5676275504e16d07a7f86", size = 39517197 },
{ url = "https://files.pythonhosted.org/packages/32/77/787797b43475d1b90626af76f80bfb0c12cfec5e11eafcfc4151b8c80218/av-16.1.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7ae547f6d5fa31763f73900d43901e8c5fa6367bb9a9840978d57b5a7ae14ed2", size = 41174337 },
{ url = "https://files.pythonhosted.org/packages/8e/ac/d90df7f1e3b97fc5554cf45076df5045f1e0a6adf13899e10121229b826c/av-16.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8cf065f9d438e1921dc31fc7aa045790b58aee71736897866420d80b5450f62a", size = 40817720 },
{ url = "https://files.pythonhosted.org/packages/80/6f/13c3a35f9dbcebafd03fe0c4cbd075d71ac8968ec849a3cfce406c35a9d2/av-16.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a345877a9d3cc0f08e2bc4ec163ee83176864b92587afb9d08dff50f37a9a829", size = 42267396 },
{ url = "https://files.pythonhosted.org/packages/c8/b9/275df9607f7fb44317ccb1d4be74827185c0d410f52b6e2cd770fe209118/av-16.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:f49243b1d27c91cd8c66fdba90a674e344eb8eb917264f36117bf2b6879118fd", size = 31752045 },
]
[[package]]
@@ -3267,7 +3262,7 @@ requires-dist = [
{ name = "aiohttp-cors", specifier = ">=0.7.0" },
{ name = "aiortc", specifier = ">=1.5.0" },
{ name = "alembic", specifier = ">=1.11.3" },
{ name = "av", specifier = ">=10.0.0" },
{ name = "av", specifier = ">=15.0.0" },
{ name = "celery", specifier = ">=5.3.4" },
{ name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" },
{ name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" },

View File

@@ -302,10 +302,10 @@ export default function RoomsList() {
return;
}
const platform: "whereby" | "daily" | null =
const platform: "whereby" | "daily" =
room.platform === "whereby" || room.platform === "daily"
? room.platform
: null;
: "daily";
const roomData = {
name: room.name,

View File

@@ -16,6 +16,7 @@ import {
import { useError } from "../../../../(errors)/errorContext";
import { useRouter } from "next/navigation";
import { Box, Grid } from "@chakra-ui/react";
import { parseNonEmptyString } from "../../../../lib/utils";
export type TranscriptCorrect = {
params: Promise<{
@@ -25,8 +26,7 @@ export type TranscriptCorrect = {
export default function TranscriptCorrect(props: TranscriptCorrect) {
const params = use(props.params);
const { transcriptId } = params;
const transcriptId = parseNonEmptyString(params.transcriptId);
const updateTranscriptMutation = useTranscriptUpdate();
const transcript = useTranscriptGet(transcriptId);

View File

@@ -3,7 +3,8 @@ import React from "react";
import Markdown from "react-markdown";
import "../../../styles/markdown.css";
import type { components } from "../../../reflector-api";
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import { useTranscriptUpdate } from "../../../lib/apiHooks";
import {
@@ -18,7 +19,7 @@ import { LuPen } from "react-icons/lu";
import { useError } from "../../../(errors)/errorContext";
type FinalSummaryProps = {
transcript: GetTranscript;
transcript: GetTranscriptWithParticipants;
topics: GetTranscriptTopic[];
onUpdate: (newSummary: string) => void;
finalSummaryRef: React.Dispatch<React.SetStateAction<HTMLDivElement | null>>;

View File

@@ -9,7 +9,9 @@ import React, { useEffect, useState, use } from "react";
import FinalSummary from "./finalSummary";
import TranscriptTitle from "../transcriptTitle";
import Player from "../player";
import { useWebSockets } from "../useWebSockets";
import { useRouter } from "next/navigation";
import { parseNonEmptyString } from "../../../lib/utils";
import {
Box,
Flex,
@@ -30,7 +32,7 @@ type TranscriptDetails = {
export default function TranscriptDetails(details: TranscriptDetails) {
const params = use(details.params);
const transcriptId = params.transcriptId;
const transcriptId = parseNonEmptyString(params.transcriptId);
const router = useRouter();
const statusToRedirect = [
"idle",
@@ -49,6 +51,7 @@ export default function TranscriptDetails(details: TranscriptDetails) {
transcriptId,
waiting || mp3.audioDeleted === true,
);
useWebSockets(transcriptId);
const useActiveTopic = useState<Topic | null>(null);
const [finalSummaryElement, setFinalSummaryElement] =
useState<HTMLDivElement | null>(null);

View File

@@ -10,6 +10,7 @@ import {
} from "@chakra-ui/react";
import { useRouter } from "next/navigation";
import { useTranscriptGet } from "../../../../lib/apiHooks";
import { parseNonEmptyString } from "../../../../lib/utils";
type TranscriptProcessing = {
params: Promise<{
@@ -19,7 +20,7 @@ type TranscriptProcessing = {
export default function TranscriptProcessing(details: TranscriptProcessing) {
const params = use(details.params);
const transcriptId = params.transcriptId;
const transcriptId = parseNonEmptyString(params.transcriptId);
const router = useRouter();
const transcript = useTranscriptGet(transcriptId);

View File

@@ -12,6 +12,7 @@ import { Box, Text, Grid, Heading, VStack, Flex } from "@chakra-ui/react";
import LiveTrancription from "../../liveTranscription";
import { useTranscriptGet } from "../../../../lib/apiHooks";
import { TranscriptStatus } from "../../../../lib/transcript";
import { parseNonEmptyString } from "../../../../lib/utils";
type TranscriptDetails = {
params: Promise<{
@@ -21,13 +22,14 @@ type TranscriptDetails = {
const TranscriptRecord = (details: TranscriptDetails) => {
const params = use(details.params);
const transcript = useTranscriptGet(params.transcriptId);
const transcriptId = parseNonEmptyString(params.transcriptId);
const transcript = useTranscriptGet(transcriptId);
const [transcriptStarted, setTranscriptStarted] = useState(false);
const useActiveTopic = useState<Topic | null>(null);
const webSockets = useWebSockets(params.transcriptId);
const webSockets = useWebSockets(transcriptId);
const mp3 = useMp3(params.transcriptId, true);
const mp3 = useMp3(transcriptId, true);
const router = useRouter();

View File

@@ -7,6 +7,7 @@ import useMp3 from "../../useMp3";
import { Center, VStack, Text, Heading } from "@chakra-ui/react";
import FileUploadButton from "../../fileUploadButton";
import { useTranscriptGet } from "../../../../lib/apiHooks";
import { parseNonEmptyString } from "../../../../lib/utils";
type TranscriptUpload = {
params: Promise<{
@@ -16,12 +17,13 @@ type TranscriptUpload = {
const TranscriptUpload = (details: TranscriptUpload) => {
const params = use(details.params);
const transcript = useTranscriptGet(params.transcriptId);
const transcriptId = parseNonEmptyString(params.transcriptId);
const transcript = useTranscriptGet(transcriptId);
const [transcriptStarted, setTranscriptStarted] = useState(false);
const webSockets = useWebSockets(params.transcriptId);
const webSockets = useWebSockets(transcriptId);
const mp3 = useMp3(params.transcriptId, true);
const mp3 = useMp3(transcriptId, true);
const router = useRouter();

View File

@@ -2,10 +2,11 @@ import type { components } from "../../reflector-api";
import { useTranscriptCreate } from "../../lib/apiHooks";
type CreateTranscript = components["schemas"]["CreateTranscript"];
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type UseCreateTranscript = {
transcript: GetTranscript | null;
transcript: GetTranscriptWithParticipants | null;
loading: boolean;
error: Error | null;
create: (transcriptCreationDetails: CreateTranscript) => Promise<void>;

View File

@@ -2,7 +2,8 @@ import { useEffect, useState } from "react";
import { ShareMode, toShareMode } from "../../lib/shareMode";
import type { components } from "../../reflector-api";
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
type UpdateTranscript = components["schemas"]["UpdateTranscript"];
import {
@@ -27,7 +28,7 @@ import { featureEnabled } from "../../lib/features";
type ShareAndPrivacyProps = {
finalSummaryElement: HTMLDivElement | null;
transcript: GetTranscript;
transcript: GetTranscriptWithParticipants;
topics: GetTranscriptTopic[];
};

View File

@@ -1,7 +1,8 @@
import { useState, useEffect, useMemo } from "react";
import type { components } from "../../reflector-api";
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import {
BoxProps,
@@ -26,7 +27,7 @@ import {
import { featureEnabled } from "../../lib/features";
type ShareZulipProps = {
transcript: GetTranscript;
transcript: GetTranscriptWithParticipants;
topics: GetTranscriptTopic[];
disabled: boolean;
};

View File

@@ -1,8 +1,10 @@
import { useState } from "react";
import type { components } from "../../reflector-api";
import { parseMaybeNonEmptyString } from "../../lib/utils";
type UpdateTranscript = components["schemas"]["UpdateTranscript"];
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import {
useTranscriptUpdate,
@@ -20,7 +22,7 @@ type TranscriptTitle = {
onUpdate: (newTitle: string) => void;
// share props
transcript: GetTranscript | null;
transcript: GetTranscriptWithParticipants | null;
topics: GetTranscriptTopic[] | null;
finalSummaryElement: HTMLDivElement | null;
};
@@ -31,7 +33,7 @@ const TranscriptTitle = (props: TranscriptTitle) => {
const [isEditing, setIsEditing] = useState(false);
const updateTranscriptMutation = useTranscriptUpdate();
const participantsQuery = useTranscriptParticipants(
props.transcript?.id || null,
props.transcript?.id ? parseMaybeNonEmptyString(props.transcript.id) : null,
);
const updateTitle = async (newTitle: string, transcriptId: string) => {

View File

@@ -1,5 +1,6 @@
import { useEffect, useState } from "react";
import { useTranscriptGet } from "../../lib/apiHooks";
import { parseMaybeNonEmptyString } from "../../lib/utils";
import { useAuth } from "../../lib/AuthProvider";
import { API_URL } from "../../lib/apiClient";
@@ -27,7 +28,7 @@ const useMp3 = (transcriptId: string, waiting?: boolean): Mp3Response => {
data: transcript,
isLoading: transcriptMetadataLoading,
error: transcriptError,
} = useTranscriptGet(later ? null : transcriptId);
} = useTranscriptGet(later ? null : parseMaybeNonEmptyString(transcriptId));
const [serviceWorker, setServiceWorker] =
useState<ServiceWorkerRegistration | null>(null);

View File

@@ -1,6 +1,7 @@
import type { components } from "../../reflector-api";
type Participant = components["schemas"]["Participant"];
import { useTranscriptParticipants } from "../../lib/apiHooks";
import { parseMaybeNonEmptyString } from "../../lib/utils";
type ErrorParticipants = {
error: Error;
@@ -32,7 +33,7 @@ const useParticipants = (transcriptId: string): UseParticipants => {
isLoading: loading,
error,
refetch,
} = useTranscriptParticipants(transcriptId || null);
} = useTranscriptParticipants(parseMaybeNonEmptyString(transcriptId));
// Type-safe return based on state
if (error) {

View File

@@ -1,5 +1,6 @@
import type { components } from "../../reflector-api";
import { useTranscriptTopicsWithWordsPerSpeaker } from "../../lib/apiHooks";
import { parseMaybeNonEmptyString } from "../../lib/utils";
type GetTranscriptTopicWithWordsPerSpeaker =
components["schemas"]["GetTranscriptTopicWithWordsPerSpeaker"];
@@ -38,7 +39,7 @@ const useTopicWithWords = (
error,
refetch,
} = useTranscriptTopicsWithWordsPerSpeaker(
transcriptId || null,
parseMaybeNonEmptyString(transcriptId),
topicId || null,
);

View File

@@ -1,5 +1,6 @@
import { useTranscriptTopics } from "../../lib/apiHooks";
import type { components } from "../../reflector-api";
import { parseMaybeNonEmptyString } from "../../lib/utils";
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
@@ -10,7 +11,11 @@ type TranscriptTopics = {
};
const useTopics = (id: string): TranscriptTopics => {
const { data: topics, isLoading: loading, error } = useTranscriptTopics(id);
const {
data: topics,
isLoading: loading,
error,
} = useTranscriptTopics(parseMaybeNonEmptyString(id));
return {
topics: topics || null,

View File

@@ -1,5 +1,6 @@
import type { components } from "../../reflector-api";
import { useTranscriptWaveform } from "../../lib/apiHooks";
import { parseMaybeNonEmptyString } from "../../lib/utils";
type AudioWaveform = components["schemas"]["AudioWaveform"];
@@ -14,7 +15,7 @@ const useWaveform = (id: string, skip: boolean): AudioWaveFormResponse => {
data: waveform,
isLoading: loading,
error,
} = useTranscriptWaveform(skip ? null : id);
} = useTranscriptWaveform(skip ? null : parseMaybeNonEmptyString(id));
return {
waveform: waveform || null,

View File

@@ -7,6 +7,12 @@ type GetTranscriptSegmentTopic =
components["schemas"]["GetTranscriptSegmentTopic"];
import { useQueryClient } from "@tanstack/react-query";
import { $api, WEBSOCKET_URL } from "../../lib/apiClient";
import {
invalidateTranscript,
invalidateTranscriptTopics,
invalidateTranscriptWaveform,
} from "../../lib/apiHooks";
import { NonEmptyString } from "../../lib/utils";
export type UseWebSockets = {
transcriptTextLive: string;
@@ -369,15 +375,10 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
});
console.debug("TOPIC event:", message.data);
// Invalidate topics query to sync with WebSocket data
queryClient.invalidateQueries({
queryKey: $api.queryOptions(
"get",
"/v1/transcripts/{transcript_id}/topics",
{
params: { path: { transcript_id: transcriptId } },
},
).queryKey,
});
invalidateTranscriptTopics(
queryClient,
transcriptId as NonEmptyString,
);
break;
case "FINAL_SHORT_SUMMARY":
@@ -388,15 +389,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
if (message.data) {
setFinalSummary(message.data);
// Invalidate transcript query to sync summary
queryClient.invalidateQueries({
queryKey: $api.queryOptions(
"get",
"/v1/transcripts/{transcript_id}",
{
params: { path: { transcript_id: transcriptId } },
},
).queryKey,
});
invalidateTranscript(queryClient, transcriptId as NonEmptyString);
}
break;
@@ -405,15 +398,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
if (message.data) {
setTitle(message.data.title);
// Invalidate transcript query to sync title
queryClient.invalidateQueries({
queryKey: $api.queryOptions(
"get",
"/v1/transcripts/{transcript_id}",
{
params: { path: { transcript_id: transcriptId } },
},
).queryKey,
});
invalidateTranscript(queryClient, transcriptId as NonEmptyString);
}
break;
@@ -424,6 +409,10 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
);
if (message.data) {
setWaveForm(message.data.waveform);
invalidateTranscriptWaveform(
queryClient,
transcriptId as NonEmptyString,
);
}
break;
case "DURATION":

View File

@@ -26,7 +26,7 @@ import { useRouter } from "next/navigation";
import { formatDateTime, formatStartedAgo } from "../lib/timeUtils";
import MeetingMinimalHeader from "../components/MeetingMinimalHeader";
import { NonEmptyString } from "../lib/utils";
import { MeetingId } from "../lib/types";
import { MeetingId, assertMeetingId } from "../lib/types";
type Meeting = components["schemas"]["Meeting"];
@@ -315,7 +315,9 @@ export default function MeetingSelection({
variant="outline"
colorScheme="red"
size="md"
onClick={() => handleEndMeeting(meeting.id)}
onClick={() =>
handleEndMeeting(assertMeetingId(meeting.id))
}
loading={deactivateMeetingMutation.isPending}
>
<Icon as={LuX} me={2} />
@@ -460,7 +462,9 @@ export default function MeetingSelection({
variant="outline"
colorScheme="red"
size="md"
onClick={() => handleEndMeeting(meeting.id)}
onClick={() =>
handleEndMeeting(assertMeetingId(meeting.id))
}
loading={deactivateMeetingMutation.isPending}
>
<Icon as={LuX} me={2} />

View File

@@ -22,14 +22,29 @@ import DailyIframe, {
import type { components } from "../../reflector-api";
import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
import { useRoomJoinMeeting } from "../../lib/apiHooks";
import {
useRoomJoinMeeting,
useMeetingStartRecording,
} from "../../lib/apiHooks";
import { omit } from "remeda";
import { assertExists } from "../../lib/utils";
import { assertMeetingId } from "../../lib/types";
import {
assertExists,
NonEmptyString,
parseNonEmptyString,
} from "../../lib/utils";
import { assertMeetingId, DailyRecordingType } from "../../lib/types";
import { useUuidV5 } from "react-uuid-hook";
const CONSENT_BUTTON_ID = "recording-consent";
const RECORDING_INDICATOR_ID = "recording-indicator";
// Namespace UUID for UUIDv5 generation of raw-tracks instanceIds
// DO NOT CHANGE: Breaks instanceId determinism across deployments
const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
const RECORDING_START_DELAY_MS = 2000;
const RECORDING_START_MAX_RETRIES = 5;
type Meeting = components["schemas"]["Meeting"];
type Room = components["schemas"]["RoomDetails"];
@@ -73,9 +88,7 @@ const useFrame = (
cbs: {
onLeftMeeting: () => void;
onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
onJoinMeeting: (
startRecording: (args: { type: "raw-tracks" }) => void,
) => void;
onJoinMeeting: () => void;
},
) => {
const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
@@ -126,7 +139,7 @@ const useFrame = (
console.error("frame is null in joined-meeting callback");
return;
}
cbs.onJoinMeeting(frame.startRecording.bind(frame));
cbs.onJoinMeeting();
};
frame.on("joined-meeting", joinCb);
return () => {
@@ -173,8 +186,15 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const authLastUserId = auth.lastUserId;
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
const startRecordingMutation = useMeetingStartRecording();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
// Generate deterministic instanceIds so all participants use SAME IDs
const cloudInstanceId = parseNonEmptyString(meeting.id);
const rawTracksInstanceId = parseNonEmptyString(
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
);
const roomName = params?.roomName as string;
const {
@@ -228,19 +248,72 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
],
);
const handleFrameJoinMeeting = useCallback(
(startRecording: (args: { type: "raw-tracks" }) => void) => {
try {
if (meeting.recording_type === "cloud") {
console.log("Starting cloud recording");
startRecording({ type: "raw-tracks" });
}
} catch (error) {
console.error("Failed to start recording:", error);
}
},
[meeting.recording_type],
);
const handleFrameJoinMeeting = useCallback(() => {
if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", {
cloudInstanceId,
rawTracksInstanceId,
});
// Start both cloud and raw-tracks via backend REST API (with retry on 404)
// Daily.co needs time to register call as "hosting" for REST API
const startRecordingWithRetry = (
type: DailyRecordingType,
instanceId: NonEmptyString,
attempt: number = 1,
) => {
setTimeout(() => {
startRecordingMutation.mutate(
{
params: {
path: {
meeting_id: meeting.id,
},
},
body: {
type,
instanceId,
},
},
{
onError: (error: any) => {
const errorText = error?.detail || error?.message || "";
const is404NotHosting = errorText.includes(
"does not seem to be hosting a call",
);
const isActiveStream = errorText.includes(
"has an active stream",
);
if (is404NotHosting && attempt < RECORDING_START_MAX_RETRIES) {
console.log(
`${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`,
);
startRecordingWithRetry(type, instanceId, attempt + 1);
} else if (isActiveStream) {
console.log(
`${type}: Recording already active (started by another participant)`,
);
} else {
console.error(`Failed to start ${type} recording:`, error);
}
},
},
);
}, RECORDING_START_DELAY_MS);
};
// Start both recordings
startRecordingWithRetry("cloud", cloudInstanceId);
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
}
}, [
meeting.recording_type,
meeting.id,
startRecordingMutation,
cloudInstanceId,
rawTracksInstanceId,
]);
const recordingIconUrl = useMemo(
() => new URL("/recording-icon.svg", window.location.origin),

View File

@@ -6,6 +6,7 @@ import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
/*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -103,7 +104,7 @@ export function useTranscriptProcess() {
});
}
export function useTranscriptGet(transcriptId: string | null) {
export function useTranscriptGet(transcriptId: NonEmptyString | null) {
return $api.useQuery(
"get",
"/v1/transcripts/{transcript_id}",
@@ -120,6 +121,16 @@ export function useTranscriptGet(transcriptId: string | null) {
);
}
export const invalidateTranscript = (
queryClient: QueryClient,
transcriptId: NonEmptyString,
) =>
queryClient.invalidateQueries({
queryKey: $api.queryOptions("get", "/v1/transcripts/{transcript_id}", {
params: { path: { transcript_id: transcriptId } },
}).queryKey,
});
export function useRoomGet(roomId: string | null) {
const { isAuthenticated } = useAuthReady();
@@ -297,7 +308,7 @@ export function useTranscriptUploadAudio() {
);
}
export function useTranscriptWaveform(transcriptId: string | null) {
export function useTranscriptWaveform(transcriptId: NonEmptyString | null) {
return $api.useQuery(
"get",
"/v1/transcripts/{transcript_id}/audio/waveform",
@@ -312,7 +323,21 @@ export function useTranscriptWaveform(transcriptId: string | null) {
);
}
export function useTranscriptMP3(transcriptId: string | null) {
export const invalidateTranscriptWaveform = (
queryClient: QueryClient,
transcriptId: NonEmptyString,
) =>
queryClient.invalidateQueries({
queryKey: $api.queryOptions(
"get",
"/v1/transcripts/{transcript_id}/audio/waveform",
{
params: { path: { transcript_id: transcriptId } },
},
).queryKey,
});
export function useTranscriptMP3(transcriptId: NonEmptyString | null) {
const { isAuthenticated } = useAuthReady();
return $api.useQuery(
@@ -329,7 +354,7 @@ export function useTranscriptMP3(transcriptId: string | null) {
);
}
export function useTranscriptTopics(transcriptId: string | null) {
export function useTranscriptTopics(transcriptId: NonEmptyString | null) {
return $api.useQuery(
"get",
"/v1/transcripts/{transcript_id}/topics",
@@ -344,7 +369,23 @@ export function useTranscriptTopics(transcriptId: string | null) {
);
}
export function useTranscriptTopicsWithWords(transcriptId: string | null) {
export const invalidateTranscriptTopics = (
queryClient: QueryClient,
transcriptId: NonEmptyString,
) =>
queryClient.invalidateQueries({
queryKey: $api.queryOptions(
"get",
"/v1/transcripts/{transcript_id}/topics",
{
params: { path: { transcript_id: transcriptId } },
},
).queryKey,
});
export function useTranscriptTopicsWithWords(
transcriptId: NonEmptyString | null,
) {
const { isAuthenticated } = useAuthReady();
return $api.useQuery(
@@ -362,7 +403,7 @@ export function useTranscriptTopicsWithWords(transcriptId: string | null) {
}
export function useTranscriptTopicsWithWordsPerSpeaker(
transcriptId: string | null,
transcriptId: NonEmptyString | null,
topicId: string | null,
) {
const { isAuthenticated } = useAuthReady();
@@ -384,7 +425,7 @@ export function useTranscriptTopicsWithWordsPerSpeaker(
);
}
export function useTranscriptParticipants(transcriptId: string | null) {
export function useTranscriptParticipants(transcriptId: NonEmptyString | null) {
const { isAuthenticated } = useAuthReady();
return $api.useQuery(
@@ -567,6 +608,20 @@ export function useTranscriptSpeakerMerge() {
);
}
export function useMeetingStartRecording() {
const { setError } = useError();
return $api.useMutation(
"post",
"/v1/meetings/{meeting_id}/recordings/start",
{
onError: (error) => {
setError(error as Error, "Failed to start recording");
},
},
);
}
export function useMeetingAudioConsent() {
const { setError } = useError();

View File

@@ -1,5 +1,6 @@
import { components } from "../reflector-api";
type ApiTranscriptStatus = components["schemas"]["GetTranscript"]["status"];
type ApiTranscriptStatus =
components["schemas"]["GetTranscriptWithParticipants"]["status"];
export type TranscriptStatus = ApiTranscriptStatus;

View File

@@ -89,3 +89,5 @@ export const assertMeetingId = (s: string): MeetingId => {
// just cast for now
return nes as MeetingId;
};
export type DailyRecordingType = "cloud" | "raw-tracks";

View File

@@ -75,6 +75,31 @@ export interface paths {
patch: operations["v1_meeting_deactivate"];
trace?: never;
};
"/v1/meetings/{meeting_id}/recordings/start": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Start Recording
* @description Start cloud or raw-tracks recording via Daily.co REST API.
*
* Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time.
* Uses different instanceIds for cloud vs raw-tracks (same won't work)
*
* Note: No authentication required - anonymous users supported. TODO this is a DOS vector
*/
post: operations["v1_start_recording"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms": {
parameters: {
query?: never;
@@ -1544,6 +1569,10 @@ export interface components {
* @enum {string}
*/
platform: "whereby" | "daily";
/** Daily Composed Video S3 Key */
daily_composed_video_s3_key?: string | null;
/** Daily Composed Video Duration */
daily_composed_video_duration?: number | null;
};
/** MeetingConsentRequest */
MeetingConsentRequest: {
@@ -1818,6 +1847,19 @@ export interface components {
/** Words */
words: components["schemas"]["Word"][];
};
/** StartRecordingRequest */
StartRecordingRequest: {
/**
* Type
* @enum {string}
*/
type: "cloud" | "raw-tracks";
/**
* Instanceid
* Format: uuid
*/
instanceId: string;
};
/** Stream */
Stream: {
/** Stream Id */
@@ -2126,6 +2168,43 @@ export interface operations {
};
};
};
v1_start_recording: {
parameters: {
query?: never;
header?: never;
path: {
meeting_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["StartRecordingRequest"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": {
[key: string]: unknown;
};
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_list: {
parameters: {
query?: {

View File

@@ -46,6 +46,7 @@
"react-markdown": "^9.0.0",
"react-qr-code": "^2.0.12",
"react-select-search": "^4.1.7",
"react-uuid-hook": "^0.0.6",
"redlock": "5.0.0-beta.2",
"remeda": "^2.31.1",
"sass": "^1.63.6",

25
www/pnpm-lock.yaml generated
View File

@@ -106,6 +106,9 @@ importers:
react-select-search:
specifier: ^4.1.7
version: 4.1.8(prop-types@15.8.1)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
react-uuid-hook:
specifier: ^0.0.6
version: 0.0.6(react@18.3.1)
redlock:
specifier: 5.0.0-beta.2
version: 5.0.0-beta.2
@@ -7628,6 +7631,14 @@ packages:
"@types/react":
optional: true
react-uuid-hook@0.0.6:
resolution:
{
integrity: sha512-u9+EvFbqpWfLE/ReYFry0vYu1BAg1fY9ekr0XLSDNnfWyrnVFytpurwz5qYsIB0psevuvrpZHIcvu7AjUwqinA==,
}
peerDependencies:
react: ">=16.8.0"
react@18.3.1:
resolution:
{
@@ -8771,6 +8782,13 @@ packages:
integrity: sha512-Fykw5U4eZESbq739BeLvEBFRuJODfrlmjx5eJux7W817LjRaq4b7/i4t2zxQmhcX+fAj4nMfRdTzO4tmwLKn0w==,
}
uuid@13.0.0:
resolution:
{
integrity: sha512-XQegIaBTVUjSHliKqcnFqYypAd4S+WCYt5NIeRs6w/UAry7z8Y9j5ZwRRL4kzq9U3sD6v+85er9FvkEaBpji2w==,
}
hasBin: true
uuid@8.3.2:
resolution:
{
@@ -14570,6 +14588,11 @@ snapshots:
optionalDependencies:
"@types/react": 18.2.20
react-uuid-hook@0.0.6(react@18.3.1):
dependencies:
react: 18.3.1
uuid: 13.0.0
react@18.3.1:
dependencies:
loose-envify: 1.4.0
@@ -15401,6 +15424,8 @@ snapshots:
uuid-validate@0.0.3: {}
uuid@13.0.0: {}
uuid@8.3.2: {}
uuid@9.0.1: {}