Compare commits

...

13 Commits

Author SHA1 Message Date
46a10af349 chore(main): release 0.33.0 2026-02-05 17:38:40 -06:00
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
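The dispatch simplification described in #846 above boils down to: multitrack (Daily) recordings always start a Hatchet workflow, while single-track files stay on Celery. Below is a minimal sketch of that rule, using the names visible in the `worker/process.py` diff further down; replay, force-cancel, and concurrent-dispatch handling from the real function are omitted.

```python
# Sketch only - mirrors the post-#846 dispatch rule from worker/process.py below.
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
# MultitrackProcessingConfig / FileProcessingConfig live in the same module.

async def dispatch_transcript_processing(config):
    if isinstance(config, MultitrackProcessingConfig):
        # Multitrack (Daily) recordings: always Hatchet, no use_celery check anymore
        workflow_id = await HatchetClientManager.start_workflow(
            workflow_name="DiarizationPipeline",
            input_data={
                "recording_id": config.recording_id,
                "tracks": [{"s3_key": k} for k in config.track_keys],
                "bucket_name": config.bucket_name,
                "transcript_id": config.transcript_id,
                "room_id": config.room_id,
            },
        )
        logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
        return None  # Hatchet workflows return no Celery AsyncResult
    elif isinstance(config, FileProcessingConfig):
        # Single-track (Whereby) files keep the Celery pipeline
        return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
```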
1ce1c7a910 fix: websocket tests (#825)
* fix websocket tests

* fix: restore timeout and fix celery test infrastructure

- Re-add timeout=1.0 to ws_manager pubsub loop (prevents CPU spin)
- Use Redis for Celery tests (memory:// broker doesn't support chords)
- Add timeout param to in-memory subscriber mock
- Remove duplicate celery_includes fixture from rtc_ws tests

* fix: remove redundant inline imports in test files

* fix: update gitleaks ignore for moved s3_key line

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 14:23:31 -05:00
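The "use Redis for Celery tests" point in #825 above refers to the fact that Celery's in-memory broker (`memory://`) cannot execute chords. A plausible conftest.py override, assuming the project uses the `celery.contrib.pytest` plugin (the commit mentions its `celery_includes` fixture) and a local Redis instance, would look roughly like this; the URLs are assumptions.

```python
# Hypothetical conftest.py sketch: point Celery tests at Redis so chords work.
import pytest

@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": "redis://localhost:6379/0",      # assumed local Redis
        "result_backend": "redis://localhost:6379/1",  # chords need a real backend
        "task_always_eager": False,
    }
```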
Rémi Pauchet
984795357e - fix nvidia repo blocked by apt (sha1) (#845)
- use build cache for apt and uv
- limit concurrency for uv to prevent crashes on machines with many cores
2026-02-05 13:59:34 -05:00
fa3cf5da0f chore(main): release 0.32.2 (#842) 2026-02-03 22:05:22 -05:00
8707c6694a fix: use Daily API recording.duration as master source for transcript duration (#844)
Set duration early in get_participants from Daily API (seconds -> ms),
ensuring post_zulip has the value before mixdown_tracks completes.

Removes redundant duration update from mixdown_tracks.

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-03 17:15:03 -05:00
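The change in #844 above is small: the Daily API reports `recording.duration` in seconds, the transcript stores milliseconds, and the value is now written and broadcast in `get_participants` so `post_zulip` already sees it before `mixdown_tracks` completes. A minimal sketch, using the names from the pipeline diff below (the wrapper function name here is hypothetical):

```python
# Sketch of the early duration write from get_participants (see pipeline diff below).
async def set_duration_from_daily(recording, transcript, transcript_id):
    # Daily reports seconds; the transcript stores milliseconds.
    duration_ms = recording.duration * 1000 if recording.duration else 0
    await transcripts_controller.update(transcript, {"duration": duration_ms})
    # Broadcast immediately so downstream tasks have the value before mixdown finishes.
    await append_event_and_broadcast(
        transcript_id, transcript, "DURATION",
        TranscriptDuration(duration=duration_ms), logger=logger,
    )
```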
4acde4b7fd fix: increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks (#843)
Topic detection was timing out on longer transcripts when LLM
responses are slow. This affects detect_chunk_topic and other
LLM-calling tasks that use TIMEOUT_MEDIUM.

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-03 16:05:16 -05:00
a2ed7d60d5 fix: make caddy optional (#841) 2026-02-03 00:18:47 +01:00
a08f94a5bf chore(main): release 0.32.1 (#840) 2026-01-30 17:34:48 -05:00
Igor Loskutov
c05d1f03cd fix: match httpx pad with hatchet audio timeout 2026-01-30 15:56:18 -05:00
Igor Loskutov
23eb1371cb fix: daily multitrack pipeline finalize dependency fix 2026-01-30 15:19:27 -05:00
2592e369f6 chore(main): release 0.32.0 (#838) 2026-01-30 13:13:59 -05:00
7fde64e252 feat: modal padding (#837)
* Add Modal backend for audio padding

- Create reflector_padding.py Modal deployment (CPU-based)
- Add PaddingWorkflow with conditional Modal/local backend
- Update deploy-all.sh to include padding deployment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-30 13:11:51 -05:00
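The worker-to-Modal contract introduced in #837 above is a single HTTP call: POST a presigned GET URL for the source track and a presigned PUT URL for the padded output, and the Modal function returns the uploaded size (or a cancelled flag). The sketch below assumes `padding_url` points at the deployed `reflector-padding` web endpoint; the request/response shapes mirror `PaddingRequest`/`PaddingResponse` in `gpu/modal_deployments/reflector_padding.py` shown further down.

```python
# Sketch of the /pad call made by AudioPaddingModalProcessor (see diff below).
import httpx

async def pad_via_modal(padding_url: str, track_url: str, output_url: str,
                        start_time_seconds: float, track_index: int) -> int:
    async with httpx.AsyncClient(timeout=720) as client:  # 720s = TIMEOUT_AUDIO
        resp = await client.post(
            f"{padding_url}/pad",
            json={
                "track_url": track_url,          # presigned GET for the source track
                "output_url": output_url,        # presigned PUT for the padded WebM
                "start_time_seconds": start_time_seconds,
                "track_index": track_index,
            },
            follow_redirects=True,
        )
        resp.raise_for_status()
        body = resp.json()
        if body.get("cancelled"):
            raise RuntimeError("padding cancelled by disconnect detection")
        return body["size"]  # bytes uploaded to output_url
```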
29 changed files with 1033 additions and 400 deletions

.gitignore vendored
View File

@@ -1,5 +1,6 @@
.DS_Store .DS_Store
server/.env server/.env
server/.env.production
.env .env
Caddyfile Caddyfile
server/exportdanswer server/exportdanswer

View File

@@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465 server/reflector/worker/process.py:generic-api-key:465
server/reflector/worker/process.py:generic-api-key:594

View File

@@ -1,5 +1,41 @@
# Changelog # Changelog
## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
### Features
* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
### Bug Fixes
* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
### Bug Fixes
* increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks ([#843](https://github.com/Monadical-SAS/reflector/issues/843)) ([4acde4b](https://github.com/Monadical-SAS/reflector/commit/4acde4b7fdef88cc02ca12cf38c9020b05ed96ac))
* make caddy optional ([#841](https://github.com/Monadical-SAS/reflector/issues/841)) ([a2ed7d6](https://github.com/Monadical-SAS/reflector/commit/a2ed7d60d557b551a5b64e4dfd909b63a791d9fc))
* use Daily API recording.duration as master source for transcript duration ([#844](https://github.com/Monadical-SAS/reflector/issues/844)) ([8707c66](https://github.com/Monadical-SAS/reflector/commit/8707c6694a80c939b6214bbc13331741f192e082))
## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)
### Bug Fixes
* daily multitrack pipeline finalize dependency fix ([23eb137](https://github.com/Monadical-SAS/reflector/commit/23eb1371cb9348c4b81eb12ad506b582f8a4799e))
* match httpx pad with hatchet audio timeout ([c05d1f0](https://github.com/Monadical-SAS/reflector/commit/c05d1f03cd8369fc06efd455527e50246887efd0))
## [0.32.0](https://github.com/Monadical-SAS/reflector/compare/v0.31.0...v0.32.0) (2026-01-30)
### Features
* modal padding ([#837](https://github.com/Monadical-SAS/reflector/issues/837)) ([7fde64e](https://github.com/Monadical-SAS/reflector/commit/7fde64e2529a1d37b0f7507c62d983a7bd0b5b89))
## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23) ## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23)

View File

@@ -1,6 +1,8 @@
# Reflector Caddyfile # Reflector Caddyfile (optional reverse proxy)
# Replace example.com with your actual domains # Use this only when you run Caddy via: docker compose -f docker-compose.prod.yml --profile caddy up -d
# CORS is handled by the backend - Caddy just proxies # If Coolify, Traefik, or nginx already use ports 80/443, do NOT start Caddy; point your proxy at web:3000 and server:1250.
#
# Replace example.com with your actual domains. CORS is handled by the backend - Caddy just proxies.
# #
# For environment variable substitution, set: # For environment variable substitution, set:
# FRONTEND_DOMAIN=app.example.com # FRONTEND_DOMAIN=app.example.com

View File

@@ -1,9 +1,14 @@
# Production Docker Compose configuration # Production Docker Compose configuration
# Usage: docker compose -f docker-compose.prod.yml up -d # Usage: docker compose -f docker-compose.prod.yml up -d
# #
# Caddy (reverse proxy on ports 80/443) is OPTIONAL and behind the "caddy" profile:
# - With Caddy (self-hosted, you manage SSL): docker compose -f docker-compose.prod.yml --profile caddy up -d
# - Without Caddy (Coolify/Traefik/nginx already on 80/443): docker compose -f docker-compose.prod.yml up -d
# Then point your proxy at web:3000 (frontend) and server:1250 (API).
#
# Prerequisites: # Prerequisites:
# 1. Copy .env.example to .env and configure for both server/ and www/ # 1. Copy .env.example to .env and configure for both server/ and www/
# 2. Copy Caddyfile.example to Caddyfile and edit with your domains # 2. If using Caddy: copy Caddyfile.example to Caddyfile and edit your domains
# 3. Deploy Modal GPU functions (see gpu/modal_deployments/deploy-all.sh) # 3. Deploy Modal GPU functions (see gpu/modal_deployments/deploy-all.sh)
services: services:
@@ -84,6 +89,8 @@ services:
retries: 3 retries: 3
caddy: caddy:
profiles:
- caddy
image: caddy:2-alpine image: caddy:2-alpine
restart: unless-stopped restart: unless-stopped
ports: ports:

View File

@@ -11,15 +11,15 @@ This page documents the Docker Compose configuration for Reflector. For the comp
The `docker-compose.prod.yml` includes these services: The `docker-compose.prod.yml` includes these services:
| Service | Image | Purpose | | Service | Image | Purpose |
|---------|-------|---------| | ---------- | --------------------------------- | --------------------------------------------------------------------------- |
| `web` | `monadicalsas/reflector-frontend` | Next.js frontend | | `web` | `monadicalsas/reflector-frontend` | Next.js frontend |
| `server` | `monadicalsas/reflector-backend` | FastAPI backend | | `server` | `monadicalsas/reflector-backend` | FastAPI backend |
| `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks | | `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks |
| `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler | | `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler |
| `redis` | `redis:7.2-alpine` | Message broker and cache | | `redis` | `redis:7.2-alpine` | Message broker and cache |
| `postgres` | `postgres:17-alpine` | Primary database | | `postgres` | `postgres:17-alpine` | Primary database |
| `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL | | `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL (optional; see [Caddy profile](#caddy-profile)) |
## Environment Files ## Environment Files
@@ -30,6 +30,7 @@ Reflector uses two separate environment files:
Used by: `server`, `worker`, `beat` Used by: `server`, `worker`, `beat`
Key variables: Key variables:
```env ```env
# Database connection # Database connection
DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
@@ -54,6 +55,7 @@ TRANSCRIPT_MODAL_API_KEY=...
Used by: `web` Used by: `web`
Key variables: Key variables:
```env ```env
# Domain configuration # Domain configuration
SITE_URL=https://app.example.com SITE_URL=https://app.example.com
@@ -70,26 +72,42 @@ Note: `API_URL` is used client-side (browser), `SERVER_API_URL` is used server-s
## Volumes ## Volumes
| Volume | Purpose | | Volume | Purpose |
|--------|---------| | --------------- | ----------------------------- |
| `redis_data` | Redis persistence | | `redis_data` | Redis persistence |
| `postgres_data` | PostgreSQL data | | `postgres_data` | PostgreSQL data |
| `server_data` | Uploaded files, local storage | | `server_data` | Uploaded files, local storage |
| `caddy_data` | SSL certificates | | `caddy_data` | SSL certificates |
| `caddy_config` | Caddy configuration | | `caddy_config` | Caddy configuration |
## Network ## Network
All services share the default network. The network is marked `attachable: true` to allow external containers (like Authentik) to join. All services share the default network. The network is marked `attachable: true` to allow external containers (like Authentik) to join.
## Caddy profile
Caddy (ports 80 and 443) is **optional** and behind the `caddy` profile so it does not conflict with an existing reverse proxy (e.g. Coolify, Traefik, nginx).
- **With Caddy** (you want Reflector to handle SSL):
`docker compose -f docker-compose.prod.yml --profile caddy up -d`
- **Without Caddy** (Coolify or another proxy already on 80/443):
`docker compose -f docker-compose.prod.yml up -d`
Then configure your proxy to send traffic to `web:3000` (frontend) and `server:1250` (API).
## Common Commands ## Common Commands
### Start all services ### Start all services
```bash ```bash
# Without Caddy (e.g. when using Coolify)
docker compose -f docker-compose.prod.yml up -d docker compose -f docker-compose.prod.yml up -d
# With Caddy as reverse proxy
docker compose -f docker-compose.prod.yml --profile caddy up -d
``` ```
### View logs ### View logs
```bash ```bash
# All services # All services
docker compose -f docker-compose.prod.yml logs -f docker compose -f docker-compose.prod.yml logs -f
@@ -99,6 +117,7 @@ docker compose -f docker-compose.prod.yml logs server --tail 50
``` ```
### Restart a service ### Restart a service
```bash ```bash
# Quick restart (doesn't reload .env changes) # Quick restart (doesn't reload .env changes)
docker compose -f docker-compose.prod.yml restart server docker compose -f docker-compose.prod.yml restart server
@@ -108,27 +127,32 @@ docker compose -f docker-compose.prod.yml up -d server
``` ```
### Run database migrations ### Run database migrations
```bash ```bash
docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head
``` ```
### Access database ### Access database
```bash ```bash
docker compose -f docker-compose.prod.yml exec postgres psql -U reflector docker compose -f docker-compose.prod.yml exec postgres psql -U reflector
``` ```
### Pull latest images ### Pull latest images
```bash ```bash
docker compose -f docker-compose.prod.yml pull docker compose -f docker-compose.prod.yml pull
docker compose -f docker-compose.prod.yml up -d docker compose -f docker-compose.prod.yml up -d
``` ```
### Stop all services ### Stop all services
```bash ```bash
docker compose -f docker-compose.prod.yml down docker compose -f docker-compose.prod.yml down
``` ```
### Full reset (WARNING: deletes data) ### Full reset (WARNING: deletes data)
```bash ```bash
docker compose -f docker-compose.prod.yml down -v docker compose -f docker-compose.prod.yml down -v
``` ```
@@ -187,6 +211,7 @@ The Caddyfile supports environment variable substitution:
Set `FRONTEND_DOMAIN` and `API_DOMAIN` environment variables, or edit the file directly. Set `FRONTEND_DOMAIN` and `API_DOMAIN` environment variables, or edit the file directly.
### Reload Caddy after changes ### Reload Caddy after changes
```bash ```bash
docker compose -f docker-compose.prod.yml exec caddy caddy reload --config /etc/caddy/Caddyfile docker compose -f docker-compose.prod.yml exec caddy caddy reload --config /etc/caddy/Caddyfile
``` ```

View File

@@ -26,7 +26,7 @@ flowchart LR
Before starting, you need: Before starting, you need:
- **Production server** - 4+ cores, 8GB+ RAM, public IP - **Production server** - 4+ cores, 8GB+ RAM, public IP
- **Two domain names** - e.g., `app.example.com` (frontend) and `api.example.com` (backend) - **Two domain names** - e.g., `app.example.com` (frontend) and `api.example.com` (backend)
- **GPU processing** - Choose one: - **GPU processing** - Choose one:
- Modal.com account, OR - Modal.com account, OR
@@ -60,16 +60,17 @@ Type: A Name: api Value: <your-server-ip>
Reflector requires GPU processing for transcription and speaker diarization. Choose one option: Reflector requires GPU processing for transcription and speaker diarization. Choose one option:
| | **Modal.com (Cloud)** | **Self-Hosted GPU** | | | **Modal.com (Cloud)** | **Self-Hosted GPU** |
|---|---|---| | ------------ | --------------------------------- | ---------------------------- |
| **Best for** | No GPU hardware, zero maintenance | Own GPU server, full control | | **Best for** | No GPU hardware, zero maintenance | Own GPU server, full control |
| **Pricing** | Pay-per-use | Fixed infrastructure cost | | **Pricing** | Pay-per-use | Fixed infrastructure cost |
### Option A: Modal.com (Serverless Cloud GPU) ### Option A: Modal.com (Serverless Cloud GPU)
#### Accept HuggingFace Licenses #### Accept HuggingFace Licenses
Visit both pages and click "Accept": Visit both pages and click "Accept":
- https://huggingface.co/pyannote/speaker-diarization-3.1 - https://huggingface.co/pyannote/speaker-diarization-3.1
- https://huggingface.co/pyannote/segmentation-3.0 - https://huggingface.co/pyannote/segmentation-3.0
@@ -179,6 +180,7 @@ Save these credentials - you'll need them in the next step.
## Configure Environment ## Configure Environment
Reflector has two env files: Reflector has two env files:
- `server/.env` - Backend configuration - `server/.env` - Backend configuration
- `www/.env` - Frontend configuration - `www/.env` - Frontend configuration
@@ -190,6 +192,7 @@ nano server/.env
``` ```
**Required settings:** **Required settings:**
```env ```env
# Database (defaults work with docker-compose.prod.yml) # Database (defaults work with docker-compose.prod.yml)
DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
@@ -249,6 +252,7 @@ nano www/.env
``` ```
**Required settings:** **Required settings:**
```env ```env
# Your domains # Your domains
SITE_URL=https://app.example.com SITE_URL=https://app.example.com
@@ -266,7 +270,11 @@ FEATURE_REQUIRE_LOGIN=false
--- ---
## Configure Caddy ## Reverse proxy (Caddy or existing)
**If Coolify, Traefik, or nginx already use ports 80/443** (e.g. Coolify on your host): skip Caddy. Start the stack without the Caddy profile (see [Start Services](#start-services) below), then point your proxy at `web:3000` (frontend) and `server:1250` (API).
**If you want Reflector to provide the reverse proxy and SSL:**
```bash ```bash
cp Caddyfile.example Caddyfile cp Caddyfile.example Caddyfile
@@ -289,10 +297,18 @@ Replace `example.com` with your domains. The `{$VAR:default}` syntax uses Caddy'
## Start Services ## Start Services
**Without Caddy** (e.g. Coolify already on 80/443):
```bash ```bash
docker compose -f docker-compose.prod.yml up -d docker compose -f docker-compose.prod.yml up -d
``` ```
**With Caddy** (Reflector handles SSL):
```bash
docker compose -f docker-compose.prod.yml --profile caddy up -d
```
Wait for containers to start (first run may take 1-2 minutes to pull images and initialize). Wait for containers to start (first run may take 1-2 minutes to pull images and initialize).
--- ---
@@ -300,18 +316,21 @@ Wait for containers to start (first run may take 1-2 minutes to pull images and
## Verify Deployment ## Verify Deployment
### Check services ### Check services
```bash ```bash
docker compose -f docker-compose.prod.yml ps docker compose -f docker-compose.prod.yml ps
# All should show "Up" # All should show "Up"
``` ```
### Test API ### Test API
```bash ```bash
curl https://api.example.com/health curl https://api.example.com/health
# Should return: {"status":"healthy"} # Should return: {"status":"healthy"}
``` ```
### Test Frontend ### Test Frontend
- Visit https://app.example.com - Visit https://app.example.com
- You should see the Reflector interface - You should see the Reflector interface
- Try uploading an audio file to test transcription - Try uploading an audio file to test transcription
@@ -327,6 +346,7 @@ By default, Reflector is open (no login required). **Authentication is required
See [Authentication Setup](./auth-setup) for full Authentik OAuth configuration. See [Authentication Setup](./auth-setup) for full Authentik OAuth configuration.
Quick summary: Quick summary:
1. Deploy Authentik on your server 1. Deploy Authentik on your server
2. Create OAuth provider in Authentik 2. Create OAuth provider in Authentik
3. Extract public key for JWT verification 3. Extract public key for JWT verification
@@ -358,6 +378,7 @@ DAILYCO_STORAGE_AWS_ROLE_ARN=<arn:aws:iam::ACCOUNT:role/DailyCo>
``` ```
Reload env and restart: Reload env and restart:
```bash ```bash
docker compose -f docker-compose.prod.yml up -d server worker docker compose -f docker-compose.prod.yml up -d server worker
``` ```
@@ -367,35 +388,43 @@ docker compose -f docker-compose.prod.yml up -d server worker
## Troubleshooting ## Troubleshooting
### Check logs for errors ### Check logs for errors
```bash ```bash
docker compose -f docker-compose.prod.yml logs server --tail 20 docker compose -f docker-compose.prod.yml logs server --tail 20
docker compose -f docker-compose.prod.yml logs worker --tail 20 docker compose -f docker-compose.prod.yml logs worker --tail 20
``` ```
### Services won't start ### Services won't start
```bash ```bash
docker compose -f docker-compose.prod.yml logs docker compose -f docker-compose.prod.yml logs
``` ```
### CORS errors in browser ### CORS errors in browser
- Verify `CORS_ORIGIN` in `server/.env` matches your frontend domain exactly (including `https://`) - Verify `CORS_ORIGIN` in `server/.env` matches your frontend domain exactly (including `https://`)
- Reload env: `docker compose -f docker-compose.prod.yml up -d server` - Reload env: `docker compose -f docker-compose.prod.yml up -d server`
### SSL certificate errors ### SSL certificate errors (when using Caddy)
- Caddy auto-provisions Let's Encrypt certificates - Caddy auto-provisions Let's Encrypt certificates
- Ensure ports 80 and 443 are open - Ensure ports 80 and 443 are open and not used by another proxy
- Check: `docker compose -f docker-compose.prod.yml logs caddy` - Check: `docker compose -f docker-compose.prod.yml logs caddy`
- If port 80 is already in use (e.g. by Coolify), run without Caddy: `docker compose -f docker-compose.prod.yml up -d` and use your existing proxy
### Transcription not working ### Transcription not working
- Check Modal dashboard: https://modal.com/apps - Check Modal dashboard: https://modal.com/apps
- Verify URLs in `server/.env` match deployed functions - Verify URLs in `server/.env` match deployed functions
- Check worker logs: `docker compose -f docker-compose.prod.yml logs worker` - Check worker logs: `docker compose -f docker-compose.prod.yml logs worker`
### "Login required" but auth not configured ### "Login required" but auth not configured
- Set `FEATURE_REQUIRE_LOGIN=false` in `www/.env` - Set `FEATURE_REQUIRE_LOGIN=false` in `www/.env`
- Rebuild frontend: `docker compose -f docker-compose.prod.yml up -d --force-recreate web` - Rebuild frontend: `docker compose -f docker-compose.prod.yml up -d --force-recreate web`
### Database migrations or connectivity issues ### Database migrations or connectivity issues
Migrations run automatically on server startup. To check database connectivity or debug migration failures: Migrations run automatically on server startup. To check database connectivity or debug migration failures:
```bash ```bash
@@ -408,4 +437,3 @@ docker compose -f docker-compose.prod.yml exec server uv run python -c "from ref
# Manually run migrations (if needed) # Manually run migrations (if needed)
docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head
``` ```

View File

@@ -131,6 +131,15 @@ if [ -z "$DIARIZER_URL" ]; then
fi fi
echo " -> $DIARIZER_URL" echo " -> $DIARIZER_URL"
echo ""
echo "Deploying padding (CPU audio processing via Modal SDK)..."
modal deploy reflector_padding.py
if [ $? -ne 0 ]; then
echo "Error: Failed to deploy padding. Check Modal dashboard for details."
exit 1
fi
echo " -> reflector-padding.pad_track (Modal SDK function)"
# --- Output Configuration --- # --- Output Configuration ---
echo "" echo ""
echo "==========================================" echo "=========================================="
@@ -147,4 +156,6 @@ echo ""
echo "DIARIZATION_BACKEND=modal" echo "DIARIZATION_BACKEND=modal"
echo "DIARIZATION_URL=$DIARIZER_URL" echo "DIARIZATION_URL=$DIARIZER_URL"
echo "DIARIZATION_MODAL_API_KEY=$API_KEY" echo "DIARIZATION_MODAL_API_KEY=$API_KEY"
echo ""
echo "# Padding uses Modal SDK (requires MODAL_TOKEN_ID/SECRET in worker containers)"
echo "# --- End Modal Configuration ---" echo "# --- End Modal Configuration ---"

View File

@@ -0,0 +1,277 @@
"""
Reflector GPU backend - audio padding
======================================
CPU-intensive audio padding service for adding silence to audio tracks.
Uses PyAV filter graph (adelay) for precise track synchronization.
IMPORTANT: This padding logic is duplicated from server/reflector/utils/audio_padding.py
for Modal deployment isolation (Modal can't import from server/reflector/). If you modify
the PyAV filter graph or padding algorithm, you MUST update both:
- gpu/modal_deployments/reflector_padding.py (this file)
- server/reflector/utils/audio_padding.py
Constants duplicated from server/reflector/utils/audio_constants.py for same reason.
"""
import os
import tempfile
from fractions import Fraction
import math
import asyncio
import modal
S3_TIMEOUT = 60  # per S3 transfer; incurred twice (download + upload)
PADDING_TIMEOUT = 600 + (S3_TIMEOUT * 2)
SCALEDOWN_WINDOW = 60 # The maximum duration (in seconds) that individual containers can remain idle when scaling down.
DISCONNECT_CHECK_INTERVAL = 2 # Check for client disconnect
app = modal.App("reflector-padding")
# CPU-based image
image = (
modal.Image.debian_slim(python_version="3.12")
.apt_install("ffmpeg") # Required by PyAV
.pip_install(
"av==13.1.0", # PyAV for audio processing
"requests==2.32.3", # HTTP for presigned URL downloads/uploads
"fastapi==0.115.12", # API framework
)
)
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000
@app.function(
cpu=2.0,
timeout=PADDING_TIMEOUT,
scaledown_window=SCALEDOWN_WINDOW,
image=image,
)
@modal.asgi_app()
def web():
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
class PaddingRequest(BaseModel):
track_url: str
output_url: str
start_time_seconds: float
track_index: int
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
web_app = FastAPI()
@web_app.post("/pad")
async def pad_track_endpoint(request: Request, req: PaddingRequest) -> PaddingResponse:
"""Modal web endpoint for padding audio tracks with disconnect detection.
"""
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
if not req.track_url:
raise HTTPException(status_code=400, detail="track_url cannot be empty")
if not req.output_url:
raise HTTPException(status_code=400, detail="output_url cannot be empty")
if req.start_time_seconds <= 0:
raise HTTPException(status_code=400, detail=f"start_time_seconds must be positive, got {req.start_time_seconds}")
if req.start_time_seconds > 18000:
raise HTTPException(status_code=400, detail=f"start_time_seconds exceeds maximum 18000s (5 hours)")
logger.info(f"Padding request: track {req.track_index}, delay={req.start_time_seconds}s")
# Thread-safe cancellation flag shared between async disconnect checker and blocking thread
import threading
cancelled = threading.Event()
async def check_disconnect():
"""Background task to check for client disconnect every 2 seconds."""
while not cancelled.is_set():
await asyncio.sleep(DISCONNECT_CHECK_INTERVAL)
if await request.is_disconnected():
logger.warning("Client disconnected, setting cancellation flag")
cancelled.set()
break
# Start disconnect checker in background
disconnect_task = asyncio.create_task(check_disconnect())
try:
result = await asyncio.get_event_loop().run_in_executor(
None, _pad_track_blocking, req, cancelled, logger
)
return PaddingResponse(**result)
finally:
cancelled.set()
disconnect_task.cancel()
try:
await disconnect_task
except asyncio.CancelledError:
pass
def _pad_track_blocking(req, cancelled, logger) -> dict:
"""Blocking CPU-bound padding work with periodic cancellation checks.
Args:
cancelled: threading.Event for thread-safe cancellation signaling
"""
import av
import requests
from av.audio.resampler import AudioResampler
import time
temp_dir = tempfile.mkdtemp()
input_path = None
output_path = None
last_check = time.time()
try:
logger.info("Downloading track for padding")
response = requests.get(req.track_url, stream=True, timeout=S3_TIMEOUT)
response.raise_for_status()
input_path = os.path.join(temp_dir, "track.webm")
total_bytes = 0
chunk_count = 0
with open(input_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
total_bytes += len(chunk)
chunk_count += 1
# Check for cancellation periodically (every 12 chunks is an arbitrary interval)
if chunk_count % 12 == 0:
now = time.time()
if now - last_check >= DISCONNECT_CHECK_INTERVAL:
if cancelled.is_set():
logger.info("Cancelled during download, exiting early")
return {"size": 0, "cancelled": True}
last_check = now
logger.info(f"Track downloaded: {total_bytes} bytes")
if cancelled.is_set():
logger.info("Cancelled after download, exiting early")
return {"size": 0, "cancelled": True}
# Apply padding using PyAV
output_path = os.path.join(temp_dir, "padded.webm")
delay_ms = math.floor(req.start_time_seconds * 1000)
logger.info(f"Padding track {req.track_index} with {delay_ms}ms delay using PyAV")
in_container = av.open(input_path)
in_stream = next((s for s in in_container.streams if s.type == "audio"), None)
if in_stream is None:
raise ValueError("No audio stream in input")
with av.open(output_path, "w", format="webm") as out_container:
out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay")
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
for frame in in_container.decode(in_stream):
# Check for cancellation periodically
now = time.time()
if now - last_check >= DISCONNECT_CHECK_INTERVAL:
if cancelled.is_set():
logger.info("Cancelled during processing, exiting early")
in_container.close()
return {"size": 0, "cancelled": True}
last_check = now
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush filter graph
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush encoder
for packet in out_stream.encode(None):
out_container.mux(packet)
in_container.close()
file_size = os.path.getsize(output_path)
logger.info(f"Padding complete: {file_size} bytes")
logger.info("Uploading padded track to S3")
with open(output_path, "rb") as f:
upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT)
upload_response.raise_for_status()
logger.info(f"Upload complete: {file_size} bytes")
return {"size": file_size}
finally:
if input_path and os.path.exists(input_path):
try:
os.unlink(input_path)
except Exception as e:
logger.warning(f"Failed to cleanup input file: {e}")
if output_path and os.path.exists(output_path):
try:
os.unlink(output_path)
except Exception as e:
logger.warning(f"Failed to cleanup output file: {e}")
try:
os.rmdir(temp_dir)
except Exception as e:
logger.warning(f"Failed to cleanup temp directory: {e}")
return web_app

View File

@@ -4,27 +4,31 @@ ENV PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy \ UV_LINK_MODE=copy \
UV_NO_CACHE=1 UV_NO_CACHE=1
# patch until nvidia updates the sha1 repo
ADD sequoia.config /etc/crypto-policies/back-ends/sequoia.config
WORKDIR /tmp WORKDIR /tmp
RUN apt-get update \ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
&& apt-get install -y \ && apt-get install -y \
ffmpeg \ ffmpeg \
curl \ curl \
ca-certificates \ ca-certificates \
gnupg \ gnupg \
wget \ wget
&& apt-get clean
# Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12 # Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12
ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb
RUN dpkg -i /cuda-keyring.deb \ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
dpkg -i /cuda-keyring.deb \
&& rm /cuda-keyring.deb \ && rm /cuda-keyring.deb \
&& apt-get update \ && apt-get update \
&& apt-get install -y --no-install-recommends \ && apt-get install -y --no-install-recommends \
cuda-cudart-12-6 \ cuda-cudart-12-6 \
libcublas-12-6 \ libcublas-12-6 \
libcudnn9-cuda-12 \ libcudnn9-cuda-12 \
libcudnn9-dev-cuda-12 \ libcudnn9-dev-cuda-12
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
ADD https://astral.sh/uv/install.sh /uv-installer.sh ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh RUN sh /uv-installer.sh && rm /uv-installer.sh
ENV PATH="/root/.local/bin/:$PATH" ENV PATH="/root/.local/bin/:$PATH"
@@ -39,6 +43,13 @@ COPY ./app /app/app
COPY ./main.py /app/ COPY ./main.py /app/
COPY ./runserver.sh /app/ COPY ./runserver.sh /app/
# prevent uv failing with too many open files on big cpus
ENV UV_CONCURRENT_INSTALLS=16
# first install
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --compile-bytecode --locked
EXPOSE 8000 EXPOSE 8000
CMD ["sh", "/app/runserver.sh"] CMD ["sh", "/app/runserver.sh"]

View File

@@ -0,0 +1,2 @@
[hash_algorithms]
sha1 = "always"

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -8,7 +8,7 @@ readme = "README.md"
dependencies = [ dependencies = [
"aiohttp>=3.9.0", "aiohttp>=3.9.0",
"aiohttp-cors>=0.7.0", "aiohttp-cors>=0.7.0",
"av>=10.0.0", "av>=15.0.0",
"requests>=2.31.0", "requests>=2.31.0",
"aiortc>=1.5.0", "aiortc>=1.5.0",
"sortedcontainers>=2.4.0", "sortedcontainers>=2.4.0",

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
sqlalchemy.String, sqlalchemy.String,
nullable=False, nullable=False,
), ),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column( sqlalchemy.Column(
"skip_consent", "skip_consent",
sqlalchemy.Boolean, sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None ics_last_sync: datetime | None = None
ics_last_etag: str | None = None ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False skip_consent: bool = False

View File

@@ -35,7 +35,9 @@ LLM_RATE_LIMIT_PER_SECOND = 10
# Task execution timeouts (seconds) # Task execution timeouts (seconds)
TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates
TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation TIMEOUT_MEDIUM = (
300 # Single LLM calls, waveform generation (5m for slow LLM responses)
)
TIMEOUT_LONG = 180 # Action items (larger context LLM) TIMEOUT_LONG = 180 # Action items (larger context LLM)
TIMEOUT_AUDIO = 300 # Audio processing: padding, mixdown TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown
TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks

View File

@@ -322,6 +322,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
mtg_session_id = recording.mtg_session_id mtg_session_id = recording.mtg_session_id
async with fresh_db_connection(): async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415 from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptParticipant, TranscriptParticipant,
transcripts_controller, transcripts_controller,
) )
@@ -330,15 +331,26 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
if not transcript: if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found") raise ValueError(f"Transcript {input.transcript_id} not found")
# Note: title NOT cleared - preserves existing titles # Note: title NOT cleared - preserves existing titles
# Duration from Daily API (seconds -> milliseconds) - master source
duration_ms = recording.duration * 1000 if recording.duration else 0
await transcripts_controller.update( await transcripts_controller.update(
transcript, transcript,
{ {
"events": [], "events": [],
"topics": [], "topics": [],
"participants": [], "participants": [],
"duration": duration_ms,
}, },
) )
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
mtg_session_id = assert_non_none_and_non_empty( mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required" mtg_session_id, "mtg_session_id is required"
) )
@@ -1095,7 +1107,7 @@ async def identify_action_items(
@daily_multitrack_pipeline.task( @daily_multitrack_pipeline.task(
parents=[generate_title, generate_recap, identify_action_items], parents=[process_tracks, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT), execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3, retries=3,
) )
@@ -1108,12 +1120,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
""" """
ctx.log("finalize: saving transcript and setting status to 'ended'") ctx.log("finalize: saving transcript and setting status to 'ended'")
mixdown_result = ctx.task_output(mixdown_tracks)
track_result = ctx.task_output(process_tracks) track_result = ctx.task_output(process_tracks)
duration = mixdown_result.duration
all_words = track_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery) # Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = track_result.created_padded_files created_padded_files = track_result.created_padded_files
if created_padded_files: if created_padded_files:
@@ -1133,7 +1141,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
async with fresh_db_connection(): async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415 from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptText, TranscriptText,
transcripts_controller, transcripts_controller,
) )
@@ -1142,8 +1149,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
if transcript is None: if transcript is None:
raise ValueError(f"Transcript {input.transcript_id} not found in database") raise ValueError(f"Transcript {input.transcript_id} not found in database")
merged_transcript = TranscriptType(words=all_words, translation=None)
await append_event_and_broadcast( await append_event_and_broadcast(
input.transcript_id, input.transcript_id,
transcript, transcript,
@@ -1155,21 +1160,15 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
logger=logger, logger=logger,
) )
# Save duration and clear workflow_run_id (workflow completed successfully) # Clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary already saved by their callbacks # Note: title/long_summary/short_summary/duration already saved by their callbacks
await transcripts_controller.update( await transcripts_controller.update(
transcript, transcript,
{ {
"duration": duration,
"workflow_run_id": None, # Clear on success - no need to resume "workflow_run_id": None, # Clear on success - no need to resume
}, },
) )
duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast(
input.transcript_id, transcript, "DURATION", duration_data, logger=logger
)
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger) await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log( ctx.log(

View File

@@ -0,0 +1,165 @@
"""
Hatchet child workflow: PaddingWorkflow
Handles individual audio track padding via Modal.com backend.
"""
from datetime import timedelta
import av
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.hatchet.workflows.models import PadTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import extract_stream_start_time_from_container
class PaddingInput(BaseModel):
"""Input for individual track padding."""
track_index: int
s3_key: str
bucket_name: str
transcript_id: str
hatchet = HatchetClientManager.get_client()
padding_workflow = hatchet.workflow(
name="PaddingWorkflow", input_validator=PaddingInput
)
@padding_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
"""Pad audio track with silence based on WebM container start_time."""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
source_url = await storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
# Extract start_time to determine if padding needed
with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except (ValueError, TypeError, OverflowError) as e:
ctx.log(
f"pad_track: track {input.track_index}, duration error: {str(e)}"
)
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
# Presign PUT URL for output (Modal will upload directly)
output_url = await storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
import httpx # noqa: PLC0415
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
)
try:
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track: Modal returned size={file_size}")
except httpx.HTTPStatusError as e:
error_detail = e.response.text if hasattr(e.response, "text") else str(e)
logger.error(
"[Hatchet] Modal padding HTTP error",
transcript_id=input.transcript_id,
track_index=input.track_index,
status_code=e.response.status_code if hasattr(e, "response") else None,
error=error_detail,
exc_info=True,
)
raise Exception(
f"Modal padding failed: HTTP {e.response.status_code}"
) from e
except httpx.TimeoutException as e:
logger.error(
"[Hatchet] Modal padding timeout",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise Exception("Modal padding timeout") from e
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_key=storage_path,
)
return PadTrackResult(
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
except Exception as e:
logger.error(
"[Hatchet] pad_track failed",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise

View File

@@ -14,9 +14,7 @@ Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks. storage/DB connections are not shared across forks.
""" """
import tempfile
from datetime import timedelta from datetime import timedelta
from pathlib import Path
import av import av
from hatchet_sdk import Context from hatchet_sdk import Context
@@ -27,10 +25,7 @@ from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import ( from reflector.utils.audio_padding import extract_stream_start_time_from_container
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
class TrackInput(BaseModel): class TrackInput(BaseModel):
@@ -83,63 +78,44 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
) )
with av.open(source_url) as in_container: with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except Exception:
ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
start_time_seconds = extract_stream_start_time_from_container( start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger in_container, input.track_index, logger=logger
) )
# If no padding needed, return original S3 key # If no padding needed, return original S3 key
if start_time_seconds <= 0: if start_time_seconds <= 0:
logger.info( logger.info(
f"Track {input.track_index} requires no padding", f"Track {input.track_index} requires no padding",
track_index=input.track_index, track_index=input.track_index,
) )
return PadTrackResult( return PadTrackResult(
padded_key=input.s3_key, padded_key=input.s3_key,
bucket_name=input.bucket_name, bucket_name=input.bucket_name,
size=0, size=0,
track_index=input.track_index, track_index=input.track_index,
) )
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file: storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
temp_path = temp_file.name
try: # Presign PUT URL for output (Modal uploads directly)
apply_audio_padding_to_file( output_url = await storage.get_file_url(
in_container, storage_path,
temp_path, operation="put_object",
start_time_seconds, expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
input.track_index, )
logger=logger,
)
file_size = Path(temp_path).stat().st_size from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" AudioPaddingModalProcessor,
)
logger.info( processor = AudioPaddingModalProcessor()
f"About to upload padded track", result = await processor.pad_track(
key=storage_path, track_url=source_url,
size=file_size, output_url=output_url,
) start_time_seconds=start_time_seconds,
track_index=input.track_index,
with open(temp_path, "rb") as padded_file: )
await storage.put_file(storage_path, padded_file) file_size = result.size
logger.info(
f"Uploaded padded track to S3",
key=storage_path,
size=file_size,
)
finally:
Path(temp_path).unlink(missing_ok=True)
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}") ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info( logger.info(

View File

@@ -0,0 +1,113 @@
"""
Modal.com backend for audio padding.
"""
import asyncio
import os
import httpx
from pydantic import BaseModel
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.logger import logger
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
class AudioPaddingModalProcessor:
"""Audio padding processor using Modal.com CPU backend via HTTP."""
def __init__(
self, padding_url: str | None = None, modal_api_key: str | None = None
):
self.padding_url = padding_url or os.getenv("PADDING_URL")
if not self.padding_url:
raise ValueError(
"PADDING_URL required to use AudioPaddingModalProcessor. "
"Set PADDING_URL environment variable or pass padding_url parameter."
)
self.modal_api_key = modal_api_key or os.getenv("MODAL_API_KEY")
async def pad_track(
self,
track_url: str,
output_url: str,
start_time_seconds: float,
track_index: int,
) -> PaddingResponse:
"""Pad audio track with silence via Modal backend.
Args:
track_url: Presigned GET URL for source audio track
output_url: Presigned PUT URL for output WebM
start_time_seconds: Amount of silence to prepend
track_index: Track index for logging
"""
if not track_url:
raise ValueError("track_url cannot be empty")
if start_time_seconds <= 0:
raise ValueError(
f"start_time_seconds must be positive, got {start_time_seconds}"
)
log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
log.info("Sending Modal padding HTTP request")
url = f"{self.padding_url}/pad"
headers = {}
if self.modal_api_key:
headers["Authorization"] = f"Bearer {self.modal_api_key}"
try:
async with httpx.AsyncClient(timeout=TIMEOUT_AUDIO) as client:
response = await client.post(
url,
headers=headers,
json={
"track_url": track_url,
"output_url": output_url,
"start_time_seconds": start_time_seconds,
"track_index": track_index,
},
follow_redirects=True,
)
if response.status_code != 200:
error_body = response.text
log.error(
"Modal padding API error",
status_code=response.status_code,
error_body=error_body,
)
response.raise_for_status()
result = response.json()
# Check if work was cancelled
if result.get("cancelled"):
log.warning("Modal padding was cancelled by disconnect detection")
raise asyncio.CancelledError(
"Padding cancelled due to client disconnect"
)
log.info("Modal padding complete", size=result["size"])
return PaddingResponse(**result)
except asyncio.CancelledError:
log.warning(
"Modal padding cancelled (Hatchet timeout, disconnect detected on Modal side)"
)
raise
except httpx.TimeoutException as e:
log.error("Modal padding timeout", error=str(e), exc_info=True)
raise Exception(f"Modal padding timeout: {e}") from e
except httpx.HTTPStatusError as e:
log.error("Modal padding HTTP error", error=str(e), exc_info=True)
raise Exception(f"Modal padding HTTP error: {e}") from e
except Exception as e:
log.error("Modal padding unexpected error", error=str(e), exc_info=True)
raise

View File

@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
     Returns AsyncResult for Celery tasks, None for Hatchet workflows.
     """
     if isinstance(config, MultitrackProcessingConfig):
-        use_celery = False
-        if config.room_id:
-            room = await rooms_controller.get_by_id(config.room_id)
-            use_celery = room.use_celery if room else False
-
-        use_hatchet = not use_celery
-        if use_celery:
-            logger.info(
-                "Room uses legacy Celery processing",
-                room_id=config.room_id,
-                transcript_id=config.transcript_id,
-            )
-
-        if use_hatchet:
-            # First check if we can replay (outside transaction since it's read-only)
-            transcript = await transcripts_controller.get_by_id(config.transcript_id)
-            if transcript and transcript.workflow_run_id and not force:
-                can_replay = await HatchetClientManager.can_replay(
-                    transcript.workflow_run_id
-                )
-                if can_replay:
-                    await HatchetClientManager.replay_workflow(
-                        transcript.workflow_run_id
-                    )
-                    logger.info(
-                        "Replaying Hatchet workflow",
-                        workflow_id=transcript.workflow_run_id,
-                    )
-                    return None
-                else:
-                    # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
-                    # Log and proceed to start new workflow
-                    try:
-                        status = await HatchetClientManager.get_workflow_run_status(
-                            transcript.workflow_run_id
-                        )
-                        logger.info(
-                            "Old workflow not replayable, starting new",
-                            old_workflow_id=transcript.workflow_run_id,
-                            old_status=status.value,
-                        )
-                    except NotFoundException:
-                        # Workflow deleted from Hatchet but ID still in DB
-                        logger.info(
-                            "Old workflow not found in Hatchet, starting new",
-                            old_workflow_id=transcript.workflow_run_id,
-                        )
-
-            # Force: cancel old workflow if exists
-            if force and transcript and transcript.workflow_run_id:
-                try:
-                    await HatchetClientManager.cancel_workflow(
-                        transcript.workflow_run_id
-                    )
-                    logger.info(
-                        "Cancelled old workflow (--force)",
-                        workflow_id=transcript.workflow_run_id,
-                    )
-                except NotFoundException:
-                    logger.info(
-                        "Old workflow already deleted (--force)",
-                        workflow_id=transcript.workflow_run_id,
-                    )
-                await transcripts_controller.update(
-                    transcript, {"workflow_run_id": None}
-                )
-
-            # Re-fetch and check for concurrent dispatch (optimistic approach).
-            # No database lock - worst case is duplicate dispatch, but Hatchet
-            # workflows are idempotent so this is acceptable.
-            transcript = await transcripts_controller.get_by_id(config.transcript_id)
-            if transcript and transcript.workflow_run_id:
-                # Another process started a workflow between validation and now
-                try:
-                    status = await HatchetClientManager.get_workflow_run_status(
-                        transcript.workflow_run_id
-                    )
-                    if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
-                        logger.info(
-                            "Concurrent workflow detected, skipping dispatch",
-                            workflow_id=transcript.workflow_run_id,
-                        )
-                        return None
-                except ApiException:
-                    # Workflow might be gone (404) or API issue - proceed with new workflow
-                    pass
-
-            workflow_id = await HatchetClientManager.start_workflow(
-                workflow_name="DiarizationPipeline",
-                input_data={
-                    "recording_id": config.recording_id,
-                    "tracks": [{"s3_key": k} for k in config.track_keys],
-                    "bucket_name": config.bucket_name,
-                    "transcript_id": config.transcript_id,
-                    "room_id": config.room_id,
-                },
-                additional_metadata={
-                    "transcript_id": config.transcript_id,
-                    "recording_id": config.recording_id,
-                    "daily_recording_id": config.recording_id,
-                },
-            )
-            if transcript:
-                await transcripts_controller.update(
-                    transcript, {"workflow_run_id": workflow_id}
-                )
-            logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
-            return None
-
-        # Celery pipeline (durable workflows disabled)
-        return task_pipeline_multitrack_process.delay(
-            transcript_id=config.transcript_id,
-            bucket_name=config.bucket_name,
-            track_keys=config.track_keys,
-        )
+        # Multitrack processing always uses Hatchet (no Celery fallback)
+        # First check if we can replay (outside transaction since it's read-only)
+        transcript = await transcripts_controller.get_by_id(config.transcript_id)
+        if transcript and transcript.workflow_run_id and not force:
+            can_replay = await HatchetClientManager.can_replay(
+                transcript.workflow_run_id
+            )
+            if can_replay:
+                await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
+                logger.info(
+                    "Replaying Hatchet workflow",
+                    workflow_id=transcript.workflow_run_id,
+                )
+                return None
+            else:
+                # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
+                # Log and proceed to start new workflow
+                try:
+                    status = await HatchetClientManager.get_workflow_run_status(
+                        transcript.workflow_run_id
+                    )
+                    logger.info(
+                        "Old workflow not replayable, starting new",
+                        old_workflow_id=transcript.workflow_run_id,
+                        old_status=status.value,
+                    )
+                except NotFoundException:
+                    # Workflow deleted from Hatchet but ID still in DB
+                    logger.info(
+                        "Old workflow not found in Hatchet, starting new",
+                        old_workflow_id=transcript.workflow_run_id,
+                    )
+
+        # Force: cancel old workflow if exists
+        if force and transcript and transcript.workflow_run_id:
+            try:
+                await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
+                logger.info(
+                    "Cancelled old workflow (--force)",
+                    workflow_id=transcript.workflow_run_id,
+                )
+            except NotFoundException:
+                logger.info(
+                    "Old workflow already deleted (--force)",
+                    workflow_id=transcript.workflow_run_id,
+                )
+            await transcripts_controller.update(transcript, {"workflow_run_id": None})
+
+        # Re-fetch and check for concurrent dispatch (optimistic approach).
+        # No database lock - worst case is duplicate dispatch, but Hatchet
+        # workflows are idempotent so this is acceptable.
+        transcript = await transcripts_controller.get_by_id(config.transcript_id)
+        if transcript and transcript.workflow_run_id:
+            # Another process started a workflow between validation and now
+            try:
+                status = await HatchetClientManager.get_workflow_run_status(
+                    transcript.workflow_run_id
+                )
+                if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
+                    logger.info(
+                        "Concurrent workflow detected, skipping dispatch",
+                        workflow_id=transcript.workflow_run_id,
+                    )
+                    return None
+            except ApiException:
+                # Workflow might be gone (404) or API issue - proceed with new workflow
+                pass
+
+        workflow_id = await HatchetClientManager.start_workflow(
+            workflow_name="DiarizationPipeline",
+            input_data={
+                "recording_id": config.recording_id,
+                "tracks": [{"s3_key": k} for k in config.track_keys],
+                "bucket_name": config.bucket_name,
+                "transcript_id": config.transcript_id,
+                "room_id": config.room_id,
+            },
+            additional_metadata={
+                "transcript_id": config.transcript_id,
+                "recording_id": config.recording_id,
+                "daily_recording_id": config.recording_id,
+            },
+        )
+        if transcript:
+            await transcripts_controller.update(
+                transcript, {"workflow_run_id": workflow_id}
+            )
+        logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
+        return None
     elif isinstance(config, FileProcessingConfig):
         return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
     else:
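The rewritten branch above is easier to follow as a decision ladder: replay the previous Hatchet run when possible, honour --force by cancelling and clearing the stored run id, skip dispatch if a concurrent run is already active, and otherwise start a fresh DiarizationPipeline run and persist its id. A condensed sketch of that flow (editor's illustration using the names from the diff, not repository code):

    async def dispatch_sketch(config, force: bool = False) -> None:
        transcript = await transcripts_controller.get_by_id(config.transcript_id)
        if transcript and transcript.workflow_run_id and not force:
            if await HatchetClientManager.can_replay(transcript.workflow_run_id):
                # Resume the existing run instead of starting a duplicate
                await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
                return
        if force and transcript and transcript.workflow_run_id:
            # --force: drop the old run so a new one can be dispatched
            await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
            await transcripts_controller.update(transcript, {"workflow_run_id": None})
        workflow_id = await HatchetClientManager.start_workflow(
            workflow_name="DiarizationPipeline",
            input_data={"transcript_id": config.transcript_id},
        )
        if transcript:
            await transcripts_controller.update(
                transcript, {"workflow_run_id": workflow_id}
            )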

View File

@@ -1,7 +1,7 @@
 from pydantic.types import PositiveInt
 from pydantic_settings import BaseSettings, SettingsConfigDict
-from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
+from reflector.schemas.platform import DAILY_PLATFORM, Platform
 from reflector.utils.string import NonEmptyString
@@ -98,6 +98,10 @@ class Settings(BaseSettings):
     # Diarization: local pyannote.audio
     DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None
 
+    # Audio Padding (Modal.com backend)
+    PADDING_URL: str | None = None
+    PADDING_MODAL_API_KEY: str | None = None
+
     # Sentry
     SENTRY_DSN: str | None = None
@@ -151,7 +155,7 @@ class Settings(BaseSettings):
         None  # Webhook UUID for this environment. Not used by production code
     )
 
     # Platform Configuration
-    DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
+    DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
 
     # Zulip integration
     ZULIP_REALM: str | None = None
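Daily becomes the compiled-in default while Whereby stays available for rooms that set it. Because Settings is a pydantic-settings BaseSettings, a deployment can presumably still flip the default back through the environment; a minimal sketch under that assumption (the env variable name, module path, and the lowercase "whereby" value are assumptions, not repository facts):

    import os

    # Hypothetical override: assumes pydantic-settings reads DEFAULT_VIDEO_PLATFORM
    # from the environment and that the Platform value serialises as "whereby".
    os.environ["DEFAULT_VIDEO_PLATFORM"] = "whereby"

    from reflector.settings import Settings  # import path assumed

    assert Settings().DEFAULT_VIDEO_PLATFORM == "whereby"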

View File

@@ -5,7 +5,9 @@ Used by both Hatchet workflows and Celery pipelines for consistent audio encodin
""" """
# Opus codec settings # Opus codec settings
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000 OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality
# S3 presigned URL expiration # S3 presigned URL expiration
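The two constants pin every encode to Opus's native 48 kHz sample rate and a 128 kbps bit rate. As a rough illustration of what they correspond to in an encoder, here is a sketch using PyAV (which the project already depends on); the file name and surrounding usage are assumptions, not repository code:

    import av

    OPUS_STANDARD_SAMPLE_RATE = 48000
    OPUS_DEFAULT_BIT_RATE = 128000  # 128kbps for good speech quality

    with av.open("mixdown.ogg", "w") as container:
        stream = container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE)
        stream.bit_rate = OPUS_DEFAULT_BIT_RATE
        # ... encode and mux audio frames into `stream` here ...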

View File

@@ -27,9 +27,6 @@ from reflector.db.transcripts import (
 from reflector.hatchet.client import HatchetClientManager
 from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
 from reflector.pipelines.main_live_pipeline import asynctask
-from reflector.pipelines.main_multitrack_pipeline import (
-    task_pipeline_multitrack_process,
-)
 from reflector.pipelines.topic_processing import EmptyPipeline
 from reflector.processors import AudioFileWriterProcessor
 from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -351,49 +348,29 @@ async def _process_multitrack_recording_inner(
         room_id=room.id,
     )
 
-    use_celery = room and room.use_celery
-    use_hatchet = not use_celery
-
-    if use_celery:
-        logger.info(
-            "Room uses legacy Celery processing",
-            room_id=room.id,
-            transcript_id=transcript.id,
-        )
-
-    if use_hatchet:
-        workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
-            input_data={
-                "recording_id": recording_id,
-                "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
-                "bucket_name": bucket_name,
-                "transcript_id": transcript.id,
-                "room_id": room.id,
-            },
-            additional_metadata={
-                "transcript_id": transcript.id,
-                "recording_id": recording_id,
-                "daily_recording_id": recording_id,
-            },
-        )
-        logger.info(
-            "Started Hatchet workflow",
-            workflow_id=workflow_id,
-            transcript_id=transcript.id,
-        )
-        await transcripts_controller.update(
-            transcript, {"workflow_run_id": workflow_id}
-        )
-        return
-
-    # Celery pipeline (runs when durable workflows disabled)
-    task_pipeline_multitrack_process.delay(
-        transcript_id=transcript.id,
-        bucket_name=bucket_name,
-        track_keys=filter_cam_audio_tracks(track_keys),
-    )
+    # Multitrack processing always uses Hatchet (no Celery fallback)
+    workflow_id = await HatchetClientManager.start_workflow(
+        workflow_name="DiarizationPipeline",
+        input_data={
+            "recording_id": recording_id,
+            "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
+            "bucket_name": bucket_name,
+            "transcript_id": transcript.id,
+            "room_id": room.id,
+        },
+        additional_metadata={
+            "transcript_id": transcript.id,
+            "recording_id": recording_id,
+            "daily_recording_id": recording_id,
+        },
+    )
+    logger.info(
+        "Started Hatchet workflow",
+        workflow_id=workflow_id,
+        transcript_id=transcript.id,
+    )
+    await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
 
 
 @shared_task
@@ -1072,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
             )
             continue
 
-        use_celery = room and room.use_celery
-        use_hatchet = not use_celery
-
-        if use_hatchet:
-            if not transcript:
-                logger.warning(
-                    "No transcript for Hatchet reprocessing, skipping",
-                    recording_id=recording.id,
-                )
-                continue
-            workflow_id = await HatchetClientManager.start_workflow(
-                workflow_name="DiarizationPipeline",
-                input_data={
-                    "recording_id": recording.id,
-                    "tracks": [
-                        {"s3_key": k}
-                        for k in filter_cam_audio_tracks(recording.track_keys)
-                    ],
-                    "bucket_name": bucket_name,
-                    "transcript_id": transcript.id,
-                    "room_id": room.id if room else None,
-                },
-                additional_metadata={
-                    "transcript_id": transcript.id,
-                    "recording_id": recording.id,
-                    "reprocess": True,
-                },
-            )
-            await transcripts_controller.update(
-                transcript, {"workflow_run_id": workflow_id}
-            )
-            logger.info(
-                "Queued Daily recording for Hatchet reprocessing",
-                recording_id=recording.id,
-                workflow_id=workflow_id,
-                room_name=meeting.room_name,
-                track_count=len(recording.track_keys),
-            )
-        else:
-            logger.info(
-                "Queueing Daily recording for Celery reprocessing",
-                recording_id=recording.id,
-                room_name=meeting.room_name,
-                track_count=len(recording.track_keys),
-                transcript_status=transcript.status if transcript else None,
-            )
-
-            # For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
-            # Reprocessing uses recording.meeting_id directly instead of time-based matching
-            recording_start_ts = int(recording.recorded_at.timestamp())
-
-            process_multitrack_recording.delay(
-                bucket_name=bucket_name,
-                daily_room_name=meeting.room_name,
-                recording_id=recording.id,
-                track_keys=recording.track_keys,
-                recording_start_ts=recording_start_ts,
-            )
+        # Multitrack reprocessing always uses Hatchet (no Celery fallback)
+        if not transcript:
+            logger.warning(
+                "No transcript for Hatchet reprocessing, skipping",
+                recording_id=recording.id,
+            )
+            continue
+
+        workflow_id = await HatchetClientManager.start_workflow(
+            workflow_name="DiarizationPipeline",
+            input_data={
+                "recording_id": recording.id,
+                "tracks": [
+                    {"s3_key": k}
+                    for k in filter_cam_audio_tracks(recording.track_keys)
+                ],
+                "bucket_name": bucket_name,
+                "transcript_id": transcript.id,
+                "room_id": room.id if room else None,
+            },
+            additional_metadata={
+                "transcript_id": transcript.id,
+                "recording_id": recording.id,
+                "reprocess": True,
+            },
+        )
+        await transcripts_controller.update(
+            transcript, {"workflow_run_id": workflow_id}
+        )
+        logger.info(
+            "Queued Daily recording for Hatchet reprocessing",
+            recording_id=recording.id,
+            workflow_id=workflow_id,
+            room_name=meeting.room_name,
+            track_count=len(recording.track_keys),
+        )
         reprocessed_count += 1

View File

@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
 import asyncio
 import json
-import threading
 
 import redis.asyncio as redis
 from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
     async def _pubsub_data_reader(self, pubsub_subscriber):
         while True:
+            # timeout=1.0 prevents tight CPU loop when no messages available
             message = await pubsub_subscriber.get_message(
                 ignore_subscribe_messages=True
             )
@@ -109,29 +109,38 @@ class WebsocketManager:
                     await socket.send_json(data)
 
 
+# Process-global singleton to ensure only one WebsocketManager instance exists.
+# Multiple instances would cause resource leaks and CPU issues.
+_ws_manager: WebsocketManager | None = None
+
+
 def get_ws_manager() -> WebsocketManager:
     """
-    Returns the WebsocketManager instance for managing websockets.
+    Returns the global WebsocketManager singleton.
 
-    This function initializes and returns the WebsocketManager instance,
-    which is responsible for managing websockets and handling websocket
-    connections.
+    Creates instance on first call, subsequent calls return cached instance.
+    Thread-safe via GIL. Concurrent initialization may create duplicate
+    instances but last write wins (acceptable for this use case).
 
     Returns:
-        WebsocketManager: The initialized WebsocketManager instance.
-
-    Raises:
-        ImportError: If the 'reflector.settings' module cannot be imported.
-        RedisConnectionError: If there is an error connecting to the Redis server.
+        WebsocketManager: The global WebsocketManager instance.
     """
-    local = threading.local()
-    if hasattr(local, "ws_manager"):
-        return local.ws_manager
+    global _ws_manager
+    if _ws_manager is not None:
+        return _ws_manager
 
+    # No lock needed - GIL makes this safe enough
+    # Worst case: race creates two instances, last assignment wins
     pubsub_client = RedisPubSubManager(
         host=settings.REDIS_HOST,
         port=settings.REDIS_PORT,
     )
-    ws_manager = WebsocketManager(pubsub_client=pubsub_client)
-    local.ws_manager = ws_manager
-    return ws_manager
+    _ws_manager = WebsocketManager(pubsub_client=pubsub_client)
+    return _ws_manager
+
+
+def reset_ws_manager() -> None:
+    """Reset singleton for testing. DO NOT use in production."""
+    global _ws_manager
+    _ws_manager = None
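With a process-global singleton, test isolation depends on reset_ws_manager() being called between tests. A sketch of how a suite might wire that up (fixture name and placement are assumptions, not repository code):

    import pytest

    from reflector.ws_manager import get_ws_manager, reset_ws_manager

    @pytest.fixture(autouse=True)
    def isolated_ws_manager():
        reset_ws_manager()        # drop any instance left over from a previous test
        yield get_ws_manager()    # each test sees a freshly constructed singleton
        reset_ws_manager()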

View File

@@ -1,11 +1,10 @@
 import os
 from contextlib import asynccontextmanager
-from tempfile import NamedTemporaryFile
 from unittest.mock import patch
 
 import pytest
 
-from reflector.schemas.platform import WHEREBY_PLATFORM
+from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
 
 
 @pytest.fixture(scope="session", autouse=True)
@@ -15,6 +14,7 @@ def register_mock_platform():
     from reflector.video_platforms.registry import register_platform
 
     register_platform(WHEREBY_PLATFORM, MockPlatformClient)
+    register_platform(DAILY_PLATFORM, MockPlatformClient)
     yield
@@ -333,11 +333,14 @@ def celery_enable_logging():
 @pytest.fixture(scope="session")
 def celery_config():
-    with NamedTemporaryFile() as f:
-        yield {
-            "broker_url": "memory://",
-            "result_backend": f"db+sqlite:///{f.name}",
-        }
+    redis_host = os.environ.get("REDIS_HOST", "localhost")
+    redis_port = os.environ.get("REDIS_PORT", "6379")
+    # Use db 2 to avoid conflicts with main app
+    redis_url = f"redis://{redis_host}:{redis_port}/2"
+    yield {
+        "broker_url": redis_url,
+        "result_backend": redis_url,
+    }
 
 
 @pytest.fixture(scope="session")
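The fixture now points the test broker and result backend at Redis (db 2). Celery primitives that join results, such as chord, need a functioning result backend, which an in-memory broker/backend pair cannot provide; a minimal sketch of the kind of workflow this enables, not taken from this repository:

    from celery import Celery, chord

    app = Celery(broker="redis://localhost:6379/2", backend="redis://localhost:6379/2")

    @app.task
    def add(x, y):
        return x + y

    @app.task
    def total(results):
        return sum(results)

    # The header group runs in parallel; joining its results for the callback
    # is what requires a real result backend.
    async_result = chord(add.s(i, i) for i in range(3))(total.s())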
@@ -370,9 +373,12 @@ async def ws_manager_in_memory(monkeypatch):
         def __init__(self, queue: asyncio.Queue):
             self.queue = queue
 
-        async def get_message(self, ignore_subscribe_messages: bool = True):
+        async def get_message(
+            self, ignore_subscribe_messages: bool = True, timeout: float | None = None
+        ):
+            wait_timeout = timeout if timeout is not None else 0.05
             try:
-                return await asyncio.wait_for(self.queue.get(), timeout=0.05)
+                return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
             except Exception:
                 return None
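The added timeout parameter keeps the in-memory fake's signature aligned with redis-py's async PubSub.get_message, which the production reader calls; a sketch of the call shape being imitated (illustrative only, not repository code):

    async def read_once(pubsub) -> dict | None:
        # redis.asyncio's PubSub.get_message accepts both keywords below,
        # so the fake must accept them too or the reader loop would fail.
        return await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)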

View File

@@ -1,6 +1,6 @@
 import asyncio
 import time
-from unittest.mock import patch
+from unittest.mock import AsyncMock, patch
 
 import pytest
 from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process" "reflector.services.transcript_process.HatchetClientManager"
) as mock_multitrack_pipeline, ) as mock_hatchet,
): ):
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline # Whereby recordings should use file pipeline, not Hatchet
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id) mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_multitrack_pipeline.delay.assert_not_called() mock_hatchet.start_workflow.assert_not_called()
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
         recording_trigger="automatic-2nd-participant",
         is_shared=False,
     )
-    # Force Celery backend for test
-    await rooms_controller.update(room, {"use_celery": True})
 
     transcript = await transcripts_controller.add(
         "",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process" "reflector.services.transcript_process.HatchetClientManager"
) as mock_multitrack_pipeline, ) as mock_hatchet,
): ):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use multitrack pipeline # Daily.co multitrack recordings should use Hatchet workflow
mock_multitrack_pipeline.delay.assert_called_once_with( mock_hatchet.start_workflow.assert_called_once()
transcript_id=transcript.id, call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
bucket_name="daily-bucket", assert call_kwargs["workflow_name"] == "DiarizationPipeline"
track_keys=track_keys, assert call_kwargs["input_data"]["transcript_id"] == transcript.id
) assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called() mock_file_pipeline.delay.assert_not_called()
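HatchetClientManager.start_workflow is awaited by the service code, so the patched attribute must be an AsyncMock: awaiting a plain MagicMock raises TypeError, while an AsyncMock resolves to the stubbed workflow id and records the call for the assertions above. A minimal standalone sketch of that behaviour (illustration, not repository code):

    import asyncio
    from unittest.mock import AsyncMock, MagicMock

    mock_hatchet = MagicMock()
    mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")

    async def demo():
        run_id = await mock_hatchet.start_workflow(workflow_name="DiarizationPipeline")
        assert run_id == "test-workflow-id"
        mock_hatchet.start_workflow.assert_awaited_once()

    asyncio.run(demo())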

View File

@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
     settings.DATA_DIR = DATA_DIR
 
 
-@pytest.fixture(scope="session")
-def celery_includes():
-    return ["reflector.pipelines.main_live_pipeline"]
+# Using celery_includes from conftest.py which includes both pipelines
 
 
 @pytest.mark.usefixtures("setup_database")

View File

@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
     if server_instance:
         server_instance.should_exit = True
-    server_thread.join(timeout=30)
+    server_thread.join(timeout=2.0)
+
+    # Reset global singleton for test isolation
+    from reflector.ws_manager import reset_ws_manager
+
+    reset_ws_manager()
 
 
 @pytest.fixture(autouse=True)
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
     # Connect and then trigger an event via HTTP create
     async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
+        await asyncio.sleep(0.2)
+
         # Emit an event to the user's room via a standard HTTP action
         from httpx import AsyncClient
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
"email": "user-abc@example.com", "email": "user-abc@example.com",
} }
# Use in-memory client (global singleton makes it share ws_manager)
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac: async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room # Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
resp = await ac.post("/transcripts", json={"name": "WS Test"}) resp = await ac.post("/transcripts", json={"name": "WS Test"})

server/uv.lock (generated)
View File

@@ -159,21 +159,20 @@ wheels = [
 [[package]]
 name = "aiortc"
-version = "1.13.0"
+version = "1.14.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "aioice" },
     { name = "av" },
-    { name = "cffi" },
     { name = "cryptography" },
     { name = "google-crc32c" },
     { name = "pyee" },
     { name = "pylibsrtp" },
     { name = "pyopenssl" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894 }
+sdist = { url = "https://files.pythonhosted.org/packages/51/9c/4e027bfe0195de0442da301e2389329496745d40ae44d2d7c4571c4290ce/aiortc-1.14.0.tar.gz", hash = "sha256:adc8a67ace10a085721e588e06a00358ed8eaf5f6b62f0a95358ff45628dd762", size = 1180864 }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910 },
+    { url = "https://files.pythonhosted.org/packages/57/ab/31646a49209568cde3b97eeade0d28bb78b400e6645c56422c101df68932/aiortc-1.14.0-py3-none-any.whl", hash = "sha256:4b244d7e482f4e1f67e685b3468269628eca1ec91fa5b329ab517738cfca086e", size = 93183 },
 ]
 
 [[package]]
@@ -327,28 +326,24 @@ wheels = [
 [[package]]
 name = "av"
-version = "14.4.0"
+version = "16.1.0"
 source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 }
+sdist = { url = "https://files.pythonhosted.org/packages/78/cd/3a83ffbc3cc25b39721d174487fb0d51a76582f4a1703f98e46170ce83d4/av-16.1.0.tar.gz", hash = "sha256:a094b4fd87a3721dacf02794d3d2c82b8d712c85b9534437e82a8a978c175ffd", size = 4285203 }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/18/8a/d57418b686ffd05fabd5a0a9cfa97e63b38c35d7101af00e87c51c8cc43c/av-14.4.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b21d5586a88b9fce0ab78e26bd1c38f8642f8e2aad5b35e619f4d202217c701", size = 19965048 },
-    { url = "https://files.pythonhosted.org/packages/f5/aa/3f878b0301efe587e9b07bb773dd6b47ef44ca09a3cffb4af50c08a170f3/av-14.4.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:cf8762d90b0f94a20c9f6e25a94f1757db5a256707964dfd0b1d4403e7a16835", size = 23750064 },
-    { url = "https://files.pythonhosted.org/packages/9a/b4/6fe94a31f9ed3a927daa72df67c7151968587106f30f9f8fcd792b186633/av-14.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c0ac9f08920c7bbe0795319689d901e27cb3d7870b9a0acae3f26fc9daa801a6", size = 33648775 },
-    { url = "https://files.pythonhosted.org/packages/6c/f3/7f3130753521d779450c935aec3f4beefc8d4645471159f27b54e896470c/av-14.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a56d9ad2afdb638ec0404e962dc570960aae7e08ae331ad7ff70fbe99a6cf40e", size = 32216915 },
-    { url = "https://files.pythonhosted.org/packages/f8/9a/8ffabfcafb42154b4b3a67d63f9b69e68fa8c34cb39ddd5cb813dd049ed4/av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bed513cbcb3437d0ae47743edc1f5b4a113c0b66cdd4e1aafc533abf5b2fbf2", size = 35287279 },
-    { url = "https://files.pythonhosted.org/packages/ad/11/7023ba0a2ca94a57aedf3114ab8cfcecb0819b50c30982a4c5be4d31df41/av-14.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d030c2d3647931e53d51f2f6e0fcf465263e7acf9ec6e4faa8dbfc77975318c3", size = 36294683 },
-    { url = "https://files.pythonhosted.org/packages/3d/fa/b8ac9636bd5034e2b899354468bef9f4dadb067420a16d8a493a514b7817/av-14.4.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1cc21582a4f606271d8c2036ec7a6247df0831050306c55cf8a905701d0f0474", size = 34552391 },
-    { url = "https://files.pythonhosted.org/packages/fb/29/0db48079c207d1cba7a2783896db5aec3816e17de55942262c244dffbc0f/av-14.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce7c9cd452153d36f1b1478f904ed5f9ab191d76db873bdd3a597193290805d4", size = 37265250 },
-    { url = "https://files.pythonhosted.org/packages/1c/55/715858c3feb7efa4d667ce83a829c8e6ee3862e297fb2b568da3f968639d/av-14.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd261e31cc6b43ca722f80656c39934199d8f2eb391e0147e704b6226acebc29", size = 27925845 },
-    { url = "https://files.pythonhosted.org/packages/a6/75/b8641653780336c90ba89e5352cac0afa6256a86a150c7703c0b38851c6d/av-14.4.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:a53e682b239dd23b4e3bc9568cfb1168fc629ab01925fdb2e7556eb426339e94", size = 19954125 },
-    { url = "https://files.pythonhosted.org/packages/99/e6/37fe6fa5853a48d54d749526365780a63a4bc530be6abf2115e3a21e292a/av-14.4.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5aa0b901751a32703fa938d2155d56ce3faf3630e4a48d238b35d2f7e49e5395", size = 23751479 },
-    { url = "https://files.pythonhosted.org/packages/f7/75/9a5f0e6bda5f513b62bafd1cff2b495441a8b07ab7fb7b8e62f0c0d1683f/av-14.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b316fed3597675fe2aacfed34e25fc9d5bb0196dc8c0b014ae5ed4adda48de", size = 33801401 },
-    { url = "https://files.pythonhosted.org/packages/6a/c9/e4df32a2ad1cb7f3a112d0ed610c5e43c89da80b63c60d60e3dc23793ec0/av-14.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a587b5c5014c3c0e16143a0f8d99874e46b5d0c50db6111aa0b54206b5687c81", size = 32364330 },
-    { url = "https://files.pythonhosted.org/packages/ca/f0/64e7444a41817fde49a07d0239c033f7e9280bec4a4bb4784f5c79af95e6/av-14.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d53f75e8ac1ec8877a551c0db32a83c0aaeae719d05285281eaaba211bbc30", size = 35519508 },
-    { url = "https://files.pythonhosted.org/packages/c2/a8/a370099daa9033a3b6f9b9bd815304b3d8396907a14d09845f27467ba138/av-14.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c8558cfde79dd8fc92d97c70e0f0fa8c94c7a66f68ae73afdf58598f0fe5e10d", size = 36448593 },
-    { url = "https://files.pythonhosted.org/packages/27/bb/edb6ceff8fa7259cb6330c51dbfbc98dd1912bd6eb5f7bc05a4bb14a9d6e/av-14.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:455b6410dea0ab2d30234ffb28df7d62ca3cdf10708528e247bec3a4cdcced09", size = 34701485 },
-    { url = "https://files.pythonhosted.org/packages/a7/8a/957da1f581aa1faa9a5dfa8b47ca955edb47f2b76b949950933b457bfa1d/av-14.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1661efbe9d975f927b8512d654704223d936f39016fad2ddab00aee7c40f412c", size = 37521981 },
-    { url = "https://files.pythonhosted.org/packages/28/76/3f1cf0568592f100fd68eb40ed8c491ce95ca3c1378cc2d4c1f6d1bd295d/av-14.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:fbbeef1f421a3461086853d6464ad5526b56ffe8ccb0ab3fd0a1f121dfbf26ad", size = 27925944 },
+    { url = "https://files.pythonhosted.org/packages/48/d0/b71b65d1b36520dcb8291a2307d98b7fc12329a45614a303ff92ada4d723/av-16.1.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:e88ad64ee9d2b9c4c5d891f16c22ae78e725188b8926eb88187538d9dd0b232f", size = 26927747 },
+    { url = "https://files.pythonhosted.org/packages/2f/79/720a5a6ccdee06eafa211b945b0a450e3a0b8fc3d12922f0f3c454d870d2/av-16.1.0-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:cb296073fa6935724de72593800ba86ae49ed48af03960a4aee34f8a611f442b", size = 21492232 },
+    { url = "https://files.pythonhosted.org/packages/8e/4f/a1ba8d922f2f6d1a3d52419463ef26dd6c4d43ee364164a71b424b5ae204/av-16.1.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:720edd4d25aa73723c1532bb0597806d7b9af5ee34fc02358782c358cfe2f879", size = 39291737 },
+    { url = "https://files.pythonhosted.org/packages/1a/31/fc62b9fe8738d2693e18d99f040b219e26e8df894c10d065f27c6b4f07e3/av-16.1.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:c7f2bc703d0df260a1fdf4de4253c7f5500ca9fc57772ea241b0cb241bcf972e", size = 40846822 },
+    { url = "https://files.pythonhosted.org/packages/53/10/ab446583dbce730000e8e6beec6ec3c2753e628c7f78f334a35cad0317f4/av-16.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d69c393809babada7d54964d56099e4b30a3e1f8b5736ca5e27bd7be0e0f3c83", size = 40675604 },
+    { url = "https://files.pythonhosted.org/packages/31/d7/1003be685277005f6d63fd9e64904ee222fe1f7a0ea70af313468bb597db/av-16.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:441892be28582356d53f282873c5a951592daaf71642c7f20165e3ddcb0b4c63", size = 42015955 },
+    { url = "https://files.pythonhosted.org/packages/2f/4a/fa2a38ee9306bf4579f556f94ecbc757520652eb91294d2a99c7cf7623b9/av-16.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:273a3e32de64819e4a1cd96341824299fe06f70c46f2288b5dc4173944f0fd62", size = 31750339 },
+    { url = "https://files.pythonhosted.org/packages/9c/84/2535f55edcd426cebec02eb37b811b1b0c163f26b8d3f53b059e2ec32665/av-16.1.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:640f57b93f927fba8689f6966c956737ee95388a91bd0b8c8b5e0481f73513d6", size = 26945785 },
+    { url = "https://files.pythonhosted.org/packages/b6/17/ffb940c9e490bf42e86db4db1ff426ee1559cd355a69609ec1efe4d3a9eb/av-16.1.0-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:ae3fb658eec00852ebd7412fdc141f17f3ddce8afee2d2e1cf366263ad2a3b35", size = 21481147 },
+    { url = "https://files.pythonhosted.org/packages/15/c1/e0d58003d2d83c3921887d5c8c9b8f5f7de9b58dc2194356a2656a45cfdc/av-16.1.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:27ee558d9c02a142eebcbe55578a6d817fedfde42ff5676275504e16d07a7f86", size = 39517197 },
+    { url = "https://files.pythonhosted.org/packages/32/77/787797b43475d1b90626af76f80bfb0c12cfec5e11eafcfc4151b8c80218/av-16.1.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7ae547f6d5fa31763f73900d43901e8c5fa6367bb9a9840978d57b5a7ae14ed2", size = 41174337 },
+    { url = "https://files.pythonhosted.org/packages/8e/ac/d90df7f1e3b97fc5554cf45076df5045f1e0a6adf13899e10121229b826c/av-16.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8cf065f9d438e1921dc31fc7aa045790b58aee71736897866420d80b5450f62a", size = 40817720 },
+    { url = "https://files.pythonhosted.org/packages/80/6f/13c3a35f9dbcebafd03fe0c4cbd075d71ac8968ec849a3cfce406c35a9d2/av-16.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a345877a9d3cc0f08e2bc4ec163ee83176864b92587afb9d08dff50f37a9a829", size = 42267396 },
+    { url = "https://files.pythonhosted.org/packages/c8/b9/275df9607f7fb44317ccb1d4be74827185c0d410f52b6e2cd770fe209118/av-16.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:f49243b1d27c91cd8c66fdba90a674e344eb8eb917264f36117bf2b6879118fd", size = 31752045 },
 ]
 
 [[package]]
@@ -3267,7 +3262,7 @@ requires-dist = [
{ name = "aiohttp-cors", specifier = ">=0.7.0" }, { name = "aiohttp-cors", specifier = ">=0.7.0" },
{ name = "aiortc", specifier = ">=1.5.0" }, { name = "aiortc", specifier = ">=1.5.0" },
{ name = "alembic", specifier = ">=1.11.3" }, { name = "alembic", specifier = ">=1.11.3" },
{ name = "av", specifier = ">=10.0.0" }, { name = "av", specifier = ">=15.0.0" },
{ name = "celery", specifier = ">=5.3.4" }, { name = "celery", specifier = ">=5.3.4" },
{ name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" }, { name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" },
{ name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" },