Compare commits

...

9 Commits

Author SHA1 Message Date
Igor Loskutov
f345f1f122 Merge branch 'main' into brady-bunch-sync 2026-02-03 17:15:46 -05:00
8707c6694a fix: use Daily API recording.duration as master source for transcript duration (#844)
Set duration early in get_participants from Daily API (seconds -> ms),
ensuring post_zulip has the value before mixdown_tracks completes.

Removes redundant duration update from mixdown_tracks.

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-03 17:15:03 -05:00
4acde4b7fd fix: increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks (#843)
Topic detection was timing out on longer transcripts when LLM
responses are slow. This affects detect_chunk_topic and other
LLM-calling tasks that use TIMEOUT_MEDIUM.

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-03 16:05:16 -05:00
a2ed7d60d5 fix: make caddy optional (#841) 2026-02-03 00:18:47 +01:00
Igor Loskutov
15aa121727 daily-matching 2026-01-30 17:47:47 -05:00
Igor Loskutov
08f4294b36 more daily co api docs 2026-01-30 17:46:38 -05:00
a08f94a5bf chore(main): release 0.32.1 (#840) 2026-01-30 17:34:48 -05:00
Igor Loskutov
c05d1f03cd fix: match httpx pad with hatchet audio timeout 2026-01-30 15:56:18 -05:00
Igor Loskutov
23eb1371cb fix: daily multitrack pipeline finalze dependency fix 2026-01-30 15:19:27 -05:00
23 changed files with 1321 additions and 764 deletions

View File

@@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465 server/reflector/worker/process.py:generic-api-key:465
server/tests/test_recording_request_flow.py:generic-api-key:121

View File

@@ -1,5 +1,13 @@
# Changelog # Changelog
## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)
### Bug Fixes
* daily multitrack pipeline finalze dependency fix ([23eb137](https://github.com/Monadical-SAS/reflector/commit/23eb1371cb9348c4b81eb12ad506b582f8a4799e))
* match httpx pad with hatchet audio timeout ([c05d1f0](https://github.com/Monadical-SAS/reflector/commit/c05d1f03cd8369fc06efd455527e50246887efd0))
## [0.32.0](https://github.com/Monadical-SAS/reflector/compare/v0.31.0...v0.32.0) (2026-01-30) ## [0.32.0](https://github.com/Monadical-SAS/reflector/compare/v0.31.0...v0.32.0) (2026-01-30)

View File

@@ -1,6 +1,8 @@
# Reflector Caddyfile # Reflector Caddyfile (optional reverse proxy)
# Replace example.com with your actual domains # Use this only when you run Caddy via: docker compose -f docker-compose.prod.yml --profile caddy up -d
# CORS is handled by the backend - Caddy just proxies # If Coolify, Traefik, or nginx already use ports 80/443, do NOT start Caddy; point your proxy at web:3000 and server:1250.
#
# Replace example.com with your actual domains. CORS is handled by the backend - Caddy just proxies.
# #
# For environment variable substitution, set: # For environment variable substitution, set:
# FRONTEND_DOMAIN=app.example.com # FRONTEND_DOMAIN=app.example.com

View File

@@ -1,9 +1,14 @@
# Production Docker Compose configuration # Production Docker Compose configuration
# Usage: docker compose -f docker-compose.prod.yml up -d # Usage: docker compose -f docker-compose.prod.yml up -d
# #
# Caddy (reverse proxy on ports 80/443) is OPTIONAL and behind the "caddy" profile:
# - With Caddy (self-hosted, you manage SSL): docker compose -f docker-compose.prod.yml --profile caddy up -d
# - Without Caddy (Coolify/Traefik/nginx already on 80/443): docker compose -f docker-compose.prod.yml up -d
# Then point your proxy at web:3000 (frontend) and server:1250 (API).
#
# Prerequisites: # Prerequisites:
# 1. Copy .env.example to .env and configure for both server/ and www/ # 1. Copy .env.example to .env and configure for both server/ and www/
# 2. Copy Caddyfile.example to Caddyfile and edit with your domains # 2. If using Caddy: copy Caddyfile.example to Caddyfile and edit your domains
# 3. Deploy Modal GPU functions (see gpu/modal_deployments/deploy-all.sh) # 3. Deploy Modal GPU functions (see gpu/modal_deployments/deploy-all.sh)
services: services:
@@ -84,6 +89,8 @@ services:
retries: 3 retries: 3
caddy: caddy:
profiles:
- caddy
image: caddy:2-alpine image: caddy:2-alpine
restart: unless-stopped restart: unless-stopped
ports: ports:

View File

@@ -12,14 +12,14 @@ This page documents the Docker Compose configuration for Reflector. For the comp
The `docker-compose.prod.yml` includes these services: The `docker-compose.prod.yml` includes these services:
| Service | Image | Purpose | | Service | Image | Purpose |
|---------|-------|---------| | ---------- | --------------------------------- | --------------------------------------------------------------------------- |
| `web` | `monadicalsas/reflector-frontend` | Next.js frontend | | `web` | `monadicalsas/reflector-frontend` | Next.js frontend |
| `server` | `monadicalsas/reflector-backend` | FastAPI backend | | `server` | `monadicalsas/reflector-backend` | FastAPI backend |
| `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks | | `worker` | `monadicalsas/reflector-backend` | Celery worker for background tasks |
| `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler | | `beat` | `monadicalsas/reflector-backend` | Celery beat scheduler |
| `redis` | `redis:7.2-alpine` | Message broker and cache | | `redis` | `redis:7.2-alpine` | Message broker and cache |
| `postgres` | `postgres:17-alpine` | Primary database | | `postgres` | `postgres:17-alpine` | Primary database |
| `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL | | `caddy` | `caddy:2-alpine` | Reverse proxy with auto-SSL (optional; see [Caddy profile](#caddy-profile)) |
## Environment Files ## Environment Files
@@ -30,6 +30,7 @@ Reflector uses two separate environment files:
Used by: `server`, `worker`, `beat` Used by: `server`, `worker`, `beat`
Key variables: Key variables:
```env ```env
# Database connection # Database connection
DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
@@ -54,6 +55,7 @@ TRANSCRIPT_MODAL_API_KEY=...
Used by: `web` Used by: `web`
Key variables: Key variables:
```env ```env
# Domain configuration # Domain configuration
SITE_URL=https://app.example.com SITE_URL=https://app.example.com
@@ -71,7 +73,7 @@ Note: `API_URL` is used client-side (browser), `SERVER_API_URL` is used server-s
## Volumes ## Volumes
| Volume | Purpose | | Volume | Purpose |
|--------|---------| | --------------- | ----------------------------- |
| `redis_data` | Redis persistence | | `redis_data` | Redis persistence |
| `postgres_data` | PostgreSQL data | | `postgres_data` | PostgreSQL data |
| `server_data` | Uploaded files, local storage | | `server_data` | Uploaded files, local storage |
@@ -82,14 +84,30 @@ Note: `API_URL` is used client-side (browser), `SERVER_API_URL` is used server-s
All services share the default network. The network is marked `attachable: true` to allow external containers (like Authentik) to join. All services share the default network. The network is marked `attachable: true` to allow external containers (like Authentik) to join.
## Caddy profile
Caddy (ports 80 and 443) is **optional** and behind the `caddy` profile so it does not conflict with an existing reverse proxy (e.g. Coolify, Traefik, nginx).
- **With Caddy** (you want Reflector to handle SSL):
`docker compose -f docker-compose.prod.yml --profile caddy up -d`
- **Without Caddy** (Coolify or another proxy already on 80/443):
`docker compose -f docker-compose.prod.yml up -d`
Then configure your proxy to send traffic to `web:3000` (frontend) and `server:1250` (API).
## Common Commands ## Common Commands
### Start all services ### Start all services
```bash ```bash
# Without Caddy (e.g. when using Coolify)
docker compose -f docker-compose.prod.yml up -d docker compose -f docker-compose.prod.yml up -d
# With Caddy as reverse proxy
docker compose -f docker-compose.prod.yml --profile caddy up -d
``` ```
### View logs ### View logs
```bash ```bash
# All services # All services
docker compose -f docker-compose.prod.yml logs -f docker compose -f docker-compose.prod.yml logs -f
@@ -99,6 +117,7 @@ docker compose -f docker-compose.prod.yml logs server --tail 50
``` ```
### Restart a service ### Restart a service
```bash ```bash
# Quick restart (doesn't reload .env changes) # Quick restart (doesn't reload .env changes)
docker compose -f docker-compose.prod.yml restart server docker compose -f docker-compose.prod.yml restart server
@@ -108,27 +127,32 @@ docker compose -f docker-compose.prod.yml up -d server
``` ```
### Run database migrations ### Run database migrations
```bash ```bash
docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head
``` ```
### Access database ### Access database
```bash ```bash
docker compose -f docker-compose.prod.yml exec postgres psql -U reflector docker compose -f docker-compose.prod.yml exec postgres psql -U reflector
``` ```
### Pull latest images ### Pull latest images
```bash ```bash
docker compose -f docker-compose.prod.yml pull docker compose -f docker-compose.prod.yml pull
docker compose -f docker-compose.prod.yml up -d docker compose -f docker-compose.prod.yml up -d
``` ```
### Stop all services ### Stop all services
```bash ```bash
docker compose -f docker-compose.prod.yml down docker compose -f docker-compose.prod.yml down
``` ```
### Full reset (WARNING: deletes data) ### Full reset (WARNING: deletes data)
```bash ```bash
docker compose -f docker-compose.prod.yml down -v docker compose -f docker-compose.prod.yml down -v
``` ```
@@ -187,6 +211,7 @@ The Caddyfile supports environment variable substitution:
Set `FRONTEND_DOMAIN` and `API_DOMAIN` environment variables, or edit the file directly. Set `FRONTEND_DOMAIN` and `API_DOMAIN` environment variables, or edit the file directly.
### Reload Caddy after changes ### Reload Caddy after changes
```bash ```bash
docker compose -f docker-compose.prod.yml exec caddy caddy reload --config /etc/caddy/Caddyfile docker compose -f docker-compose.prod.yml exec caddy caddy reload --config /etc/caddy/Caddyfile
``` ```

View File

@@ -61,7 +61,7 @@ Type: A Name: api Value: <your-server-ip>
Reflector requires GPU processing for transcription and speaker diarization. Choose one option: Reflector requires GPU processing for transcription and speaker diarization. Choose one option:
| | **Modal.com (Cloud)** | **Self-Hosted GPU** | | | **Modal.com (Cloud)** | **Self-Hosted GPU** |
|---|---|---| | ------------ | --------------------------------- | ---------------------------- |
| **Best for** | No GPU hardware, zero maintenance | Own GPU server, full control | | **Best for** | No GPU hardware, zero maintenance | Own GPU server, full control |
| **Pricing** | Pay-per-use | Fixed infrastructure cost | | **Pricing** | Pay-per-use | Fixed infrastructure cost |
@@ -70,6 +70,7 @@ Reflector requires GPU processing for transcription and speaker diarization. Cho
#### Accept HuggingFace Licenses #### Accept HuggingFace Licenses
Visit both pages and click "Accept": Visit both pages and click "Accept":
- https://huggingface.co/pyannote/speaker-diarization-3.1 - https://huggingface.co/pyannote/speaker-diarization-3.1
- https://huggingface.co/pyannote/segmentation-3.0 - https://huggingface.co/pyannote/segmentation-3.0
@@ -179,6 +180,7 @@ Save these credentials - you'll need them in the next step.
## Configure Environment ## Configure Environment
Reflector has two env files: Reflector has two env files:
- `server/.env` - Backend configuration - `server/.env` - Backend configuration
- `www/.env` - Frontend configuration - `www/.env` - Frontend configuration
@@ -190,6 +192,7 @@ nano server/.env
``` ```
**Required settings:** **Required settings:**
```env ```env
# Database (defaults work with docker-compose.prod.yml) # Database (defaults work with docker-compose.prod.yml)
DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector DATABASE_URL=postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
@@ -249,6 +252,7 @@ nano www/.env
``` ```
**Required settings:** **Required settings:**
```env ```env
# Your domains # Your domains
SITE_URL=https://app.example.com SITE_URL=https://app.example.com
@@ -266,7 +270,11 @@ FEATURE_REQUIRE_LOGIN=false
--- ---
## Configure Caddy ## Reverse proxy (Caddy or existing)
**If Coolify, Traefik, or nginx already use ports 80/443** (e.g. Coolify on your host): skip Caddy. Start the stack without the Caddy profile (see [Start Services](#start-services) below), then point your proxy at `web:3000` (frontend) and `server:1250` (API).
**If you want Reflector to provide the reverse proxy and SSL:**
```bash ```bash
cp Caddyfile.example Caddyfile cp Caddyfile.example Caddyfile
@@ -289,10 +297,18 @@ Replace `example.com` with your domains. The `{$VAR:default}` syntax uses Caddy'
## Start Services ## Start Services
**Without Caddy** (e.g. Coolify already on 80/443):
```bash ```bash
docker compose -f docker-compose.prod.yml up -d docker compose -f docker-compose.prod.yml up -d
``` ```
**With Caddy** (Reflector handles SSL):
```bash
docker compose -f docker-compose.prod.yml --profile caddy up -d
```
Wait for containers to start (first run may take 1-2 minutes to pull images and initialize). Wait for containers to start (first run may take 1-2 minutes to pull images and initialize).
--- ---
@@ -300,18 +316,21 @@ Wait for containers to start (first run may take 1-2 minutes to pull images and
## Verify Deployment ## Verify Deployment
### Check services ### Check services
```bash ```bash
docker compose -f docker-compose.prod.yml ps docker compose -f docker-compose.prod.yml ps
# All should show "Up" # All should show "Up"
``` ```
### Test API ### Test API
```bash ```bash
curl https://api.example.com/health curl https://api.example.com/health
# Should return: {"status":"healthy"} # Should return: {"status":"healthy"}
``` ```
### Test Frontend ### Test Frontend
- Visit https://app.example.com - Visit https://app.example.com
- You should see the Reflector interface - You should see the Reflector interface
- Try uploading an audio file to test transcription - Try uploading an audio file to test transcription
@@ -327,6 +346,7 @@ By default, Reflector is open (no login required). **Authentication is required
See [Authentication Setup](./auth-setup) for full Authentik OAuth configuration. See [Authentication Setup](./auth-setup) for full Authentik OAuth configuration.
Quick summary: Quick summary:
1. Deploy Authentik on your server 1. Deploy Authentik on your server
2. Create OAuth provider in Authentik 2. Create OAuth provider in Authentik
3. Extract public key for JWT verification 3. Extract public key for JWT verification
@@ -358,6 +378,7 @@ DAILYCO_STORAGE_AWS_ROLE_ARN=<arn:aws:iam::ACCOUNT:role/DailyCo>
``` ```
Reload env and restart: Reload env and restart:
```bash ```bash
docker compose -f docker-compose.prod.yml up -d server worker docker compose -f docker-compose.prod.yml up -d server worker
``` ```
@@ -367,35 +388,43 @@ docker compose -f docker-compose.prod.yml up -d server worker
## Troubleshooting ## Troubleshooting
### Check logs for errors ### Check logs for errors
```bash ```bash
docker compose -f docker-compose.prod.yml logs server --tail 20 docker compose -f docker-compose.prod.yml logs server --tail 20
docker compose -f docker-compose.prod.yml logs worker --tail 20 docker compose -f docker-compose.prod.yml logs worker --tail 20
``` ```
### Services won't start ### Services won't start
```bash ```bash
docker compose -f docker-compose.prod.yml logs docker compose -f docker-compose.prod.yml logs
``` ```
### CORS errors in browser ### CORS errors in browser
- Verify `CORS_ORIGIN` in `server/.env` matches your frontend domain exactly (including `https://`) - Verify `CORS_ORIGIN` in `server/.env` matches your frontend domain exactly (including `https://`)
- Reload env: `docker compose -f docker-compose.prod.yml up -d server` - Reload env: `docker compose -f docker-compose.prod.yml up -d server`
### SSL certificate errors ### SSL certificate errors (when using Caddy)
- Caddy auto-provisions Let's Encrypt certificates - Caddy auto-provisions Let's Encrypt certificates
- Ensure ports 80 and 443 are open - Ensure ports 80 and 443 are open and not used by another proxy
- Check: `docker compose -f docker-compose.prod.yml logs caddy` - Check: `docker compose -f docker-compose.prod.yml logs caddy`
- If port 80 is already in use (e.g. by Coolify), run without Caddy: `docker compose -f docker-compose.prod.yml up -d` and use your existing proxy
### Transcription not working ### Transcription not working
- Check Modal dashboard: https://modal.com/apps - Check Modal dashboard: https://modal.com/apps
- Verify URLs in `server/.env` match deployed functions - Verify URLs in `server/.env` match deployed functions
- Check worker logs: `docker compose -f docker-compose.prod.yml logs worker` - Check worker logs: `docker compose -f docker-compose.prod.yml logs worker`
### "Login required" but auth not configured ### "Login required" but auth not configured
- Set `FEATURE_REQUIRE_LOGIN=false` in `www/.env` - Set `FEATURE_REQUIRE_LOGIN=false` in `www/.env`
- Rebuild frontend: `docker compose -f docker-compose.prod.yml up -d --force-recreate web` - Rebuild frontend: `docker compose -f docker-compose.prod.yml up -d --force-recreate web`
### Database migrations or connectivity issues ### Database migrations or connectivity issues
Migrations run automatically on server startup. To check database connectivity or debug migration failures: Migrations run automatically on server startup. To check database connectivity or debug migration failures:
```bash ```bash
@@ -408,4 +437,3 @@ docker compose -f docker-compose.prod.yml exec server uv run python -c "from ref
# Manually run migrations (if needed) # Manually run migrations (if needed)
docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head docker compose -f docker-compose.prod.yml exec server uv run alembic upgrade head
``` ```

View File

@@ -86,7 +86,7 @@ Daily.co Room: "daily-private-igor-20260110042117"
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants | | **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp | | **Scope** | Per room instance | Per Reflector room + timestamp |
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)). **Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId` (can be null), requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording ### Recording
@@ -101,6 +101,30 @@ Daily.co Room: "daily-private-igor-20260110042117"
**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs. **Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs.
### instanceId (Reflector-Generated)
**Definition:** UUID we generate and send when starting recording via REST API.
**Generation:** Deterministic from meeting_id
- Cloud: `instanceId = meeting_id` directly
- Raw-tracks: `instanceId = UUIDv5(meeting_id, namespace)`
**Key behaviors:**
-**Reuse allowed:** Same instanceId can be used after stop (validated 2026-01-20)
-**Not returned:** Daily.co does NOT echo instanceId back in GET /recordings response
-**Present in error webhooks:** `recording.error` webhook includes instanceId
- **Purpose:** Allows multiple concurrent recordings (cloud + raw-tracks) in same room
**Stop/restart example:**
```
Recording 1: POST /start with instanceId="779e6376..." → recording_id="ee00c4e8..."
Stop recording
Recording 2: POST /start with instanceId="779e6376..." (SAME) → recording_id="b702f509..." (DIFFERENT)
✅ Both succeed, different recording_ids returned
```
**Implication:** Cannot match recordings by instanceId (not in response) - must use recording_id.
--- ---
## Entity Relationships ## Entity Relationships
@@ -196,6 +220,19 @@ Daily.co Room: "daily-private-igor-20260110042117"
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room). `mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
**Reliability:** Can be null or present in GET /recordings response (unreliable).
**When present:** Multiple recordings from same session (stop/restart with participants connected) share same mtgSessionId.
**Example (validated 2026-01-20):**
```json
Recording 1: {"id": "ee00c4e8...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
Recording 2: {"id": "b702f509...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
// Same mtgSessionId (stop/restart in same session)
```
**When null:** Common - Daily.co API does not reliably populate this field.
### session_id (Per-Participant) ### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks. **Different concept:** Per-participant connection identifier from webhooks.
@@ -220,16 +257,24 @@ TABLE daily_participant_session (
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers. Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
**Example API response:** **Example API response (mtgSessionId can be null OR present):**
```json ```json
{ {
"id": "recording-uuid", "id": "recording-uuid",
"room_name": "daily-private-igor-20260110042117", "room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896, "start_ts": 1768018896,
"mtgSessionId": null Missing! "mtgSessionId": null // ← Often null (unreliable)
}
// OR (when present):
{
"id": "recording-uuid",
"mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6" // ← Sometimes present
} }
``` ```
**Key insight:** Cannot rely on mtgSessionId for matching (unreliable). instanceId also not returned. Only reliable identifier is recording.id.
### Solution: Time-Based Matching ### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()` **Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
@@ -491,6 +536,10 @@ UI: User sees 3 separate transcripts
--- ---
**Document Version:** 1.0 **Document Version:** 1.1
**Last Verified:** 2026-01-15 **Last Updated:** 2026-01-20
**Data Source:** Production database + Daily.co API inspection **Data Source:** Production database + Daily.co API inspection + empirical testing
**Changes in 1.1:**
- Added instanceId behavior documentation (reuse allowed, not returned in API)
- Clarified mtgSessionId reliability (can be null or present)
- Added empirical validation of stop/restart behavior

View File

@@ -0,0 +1,67 @@
"""add_daily_recording_requests
Revision ID: f5b008fa8a14
Revises: 1b1e6a6fc465
Create Date: 2026-01-20 22:32:06.697144
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f5b008fa8a14"
down_revision: Union[str, None] = "1b1e6a6fc465"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"daily_recording_request",
sa.Column("recording_id", sa.String(), nullable=False),
sa.Column("meeting_id", sa.String(), nullable=False),
sa.Column("instance_id", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("recording_id"),
)
op.create_index("idx_meeting_id", "daily_recording_request", ["meeting_id"])
op.create_index("idx_instance_id", "daily_recording_request", ["instance_id"])
# Clean up orphaned recordings before adding FK constraint
op.execute("""
UPDATE recording SET status = 'orphan', meeting_id = NULL
WHERE meeting_id IS NOT NULL
AND meeting_id NOT IN (SELECT id FROM meeting)
""")
# Add FK constraint to recording table (cascade delete recordings when meeting deleted)
op.execute("""
ALTER TABLE recording ADD CONSTRAINT fk_recording_meeting
FOREIGN KEY (meeting_id) REFERENCES meeting(id) ON DELETE CASCADE
""")
# Add CHECK constraints to enforce orphan invariants
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_orphan_no_meeting
CHECK (status != 'orphan' OR meeting_id IS NULL)
""")
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_non_orphan_has_meeting
CHECK (status = 'orphan' OR meeting_id IS NOT NULL)
""")
def downgrade() -> None:
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_orphan_no_meeting")
op.execute(
"ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_non_orphan_has_meeting"
)
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS fk_recording_meeting")
op.drop_index("idx_instance_id", table_name="daily_recording_request")
op.drop_index("idx_meeting_id", table_name="daily_recording_request")
op.drop_table("daily_recording_request")

View File

@@ -0,0 +1,56 @@
"""Utility for creating orphan recordings."""
import os
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
async def create_and_log_orphan(
recording_id: NonEmptyString,
bucket_name: str,
room_name: str,
start_ts: int,
track_keys: list[str] | None,
source: str,
) -> bool:
"""Create orphan recording and log if first occurrence.
Args:
recording_id: Daily.co recording ID
bucket_name: S3 bucket (empty string for cloud recordings)
room_name: Daily.co room name
start_ts: Unix timestamp
track_keys: Track keys for raw-tracks, None for cloud
source: "webhook" or "polling" for logging
Returns:
True if created (first poller), False if already exists
"""
if track_keys:
object_key = os.path.dirname(track_keys[0]) if track_keys else room_name
else:
object_key = room_name
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key,
recorded_at=datetime.fromtimestamp(start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=None,
status="orphan",
)
)
if created:
logger.error(
f"Orphan recording ({source})",
recording_id=recording_id,
room_name=room_name,
)
return created

View File

@@ -26,6 +26,7 @@ def get_database() -> databases.Database:
# import models # import models
import reflector.db.calendar_events # noqa import reflector.db.calendar_events # noqa
import reflector.db.daily_participant_sessions # noqa import reflector.db.daily_participant_sessions # noqa
import reflector.db.daily_recording_requests # noqa
import reflector.db.meetings # noqa import reflector.db.meetings # noqa
import reflector.db.recordings # noqa import reflector.db.recordings # noqa
import reflector.db.rooms # noqa import reflector.db.rooms # noqa

View File

@@ -0,0 +1,111 @@
from datetime import datetime
from typing import Literal
from uuid import UUID
import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils.string import NonEmptyString
daily_recording_requests = sa.Table(
"daily_recording_request",
metadata,
sa.Column("recording_id", sa.String, primary_key=True),
sa.Column(
"meeting_id",
sa.String,
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("instance_id", sa.String, nullable=False),
sa.Column("type", sa.String, nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.Index("idx_meeting_id", "meeting_id"),
sa.Index("idx_instance_id", "instance_id"),
)
class DailyRecordingRequest(BaseModel):
recording_id: NonEmptyString
meeting_id: NonEmptyString
instance_id: UUID
type: Literal["cloud", "raw-tracks"]
requested_at: datetime
class DailyRecordingRequestsController:
async def create(self, request: DailyRecordingRequest) -> None:
stmt = insert(daily_recording_requests).values(
recording_id=request.recording_id,
meeting_id=request.meeting_id,
instance_id=str(request.instance_id),
type=request.type,
requested_at=request.requested_at,
)
stmt = stmt.on_conflict_do_nothing(index_elements=["recording_id"])
await get_database().execute(stmt)
async def find_by_recording_id(
self,
recording_id: NonEmptyString,
) -> tuple[NonEmptyString, Literal["cloud", "raw-tracks"]] | None:
query = daily_recording_requests.select().where(
daily_recording_requests.c.recording_id == recording_id
)
result = await get_database().fetch_one(query)
if not result:
return None
req = DailyRecordingRequest(
recording_id=result["recording_id"],
meeting_id=result["meeting_id"],
instance_id=UUID(result["instance_id"]),
type=result["type"],
requested_at=result["requested_at"],
)
return (req.meeting_id, req.type)
async def find_by_instance_id(
self,
instance_id: UUID,
) -> list[DailyRecordingRequest]:
"""Multiple recordings can have same instance_id (stop/restart)."""
query = daily_recording_requests.select().where(
daily_recording_requests.c.instance_id == str(instance_id)
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
async def get_by_meeting_id(
self,
meeting_id: NonEmptyString,
) -> list[DailyRecordingRequest]:
query = daily_recording_requests.select().where(
daily_recording_requests.c.meeting_id == meeting_id
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
daily_recording_requests_controller = DailyRecordingRequestsController()

View File

@@ -1,4 +1,4 @@
from datetime import datetime, timedelta from datetime import datetime
from typing import Any, Literal from typing import Any, Literal
import sqlalchemy as sa import sqlalchemy as sa
@@ -183,84 +183,6 @@ class MeetingController:
results = await get_database().fetch_all(query) results = await get_database().fetch_all(query)
return [Meeting(**r) for r in results] return [Meeting(**r) for r in results]
async def get_by_room_name_and_time(
self,
room_name: NonEmptyString,
recording_start: datetime,
time_window_hours: int = 168,
) -> Meeting | None:
"""
Get meeting by room name closest to recording timestamp.
HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
and mtgSessionId is separate from our instanceId. Time-based matching is
the least-bad workaround.
This handles edge case of duplicate room_name values in DB (race conditions,
double-clicks, etc.) by matching based on temporal proximity.
Algorithm:
1. Find meetings within time_window_hours of recording_start
2. Return meeting with start_date closest to recording_start
3. If tie, return first by meeting.id (deterministic)
Args:
room_name: Daily.co room name from recording
recording_start: Timezone-aware datetime from recording.start_ts
time_window_hours: Search window (default 168 = 1 week)
Returns:
Meeting closest to recording timestamp, or None if no matches
Failure modes:
- Multiple meetings in same room within ~5 minutes: picks closest
- All meetings outside time window: returns None
- Clock skew between Daily.co and DB: 1-week window tolerates this
Why 1 week window:
- Handles webhook failures (recording discovered days later)
- Tolerates clock skew
- Rejects unrelated meetings from weeks ago
"""
# Validate timezone-aware datetime
if recording_start.tzinfo is None:
raise ValueError(
f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
)
window_start = recording_start - timedelta(hours=time_window_hours)
window_end = recording_start + timedelta(hours=time_window_hours)
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_name == room_name,
meetings.c.start_date >= window_start,
meetings.c.start_date <= window_end,
)
)
.order_by(meetings.c.start_date)
)
results = await get_database().fetch_all(query)
if not results:
return None
candidates = [Meeting(**r) for r in results]
# Find meeting with start_date closest to recording_start
closest = min(
candidates,
key=lambda m: (
abs((m.start_date - recording_start).total_seconds()),
m.id, # Tie-breaker: deterministic by UUID
),
)
return closest
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None: async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
""" """
Get latest active meeting for a room. Get latest active meeting for a room.
@@ -350,44 +272,6 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs) query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query) await get_database().execute(query)
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""
Set cloud recording only if not already set.
Returns True if updated, False if already set.
Prevents webhook/polling race condition via atomic WHERE clause.
"""
# Check current value before update to detect actual change
meeting_before = await self.get_by_id(meeting_id)
if not meeting_before:
return False
was_null = meeting_before.daily_composed_video_s3_key is None
query = (
meetings.update()
.where(
sa.and_(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
await get_database().execute(query)
# Return True only if value was NULL before (actual update occurred)
# If was_null=False, the WHERE clause prevented the update
return was_null
async def increment_num_clients(self, meeting_id: str) -> None: async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count.""" """Atomically increment participant count."""
query = ( query = (
@@ -467,6 +351,27 @@ class MeetingConsentController:
result = await get_database().fetch_one(query) result = await get_database().fetch_one(query)
return result is not None return result is not None
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""Returns True if updated, False if already set."""
query = (
meetings.update()
.where(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
result = await get_database().execute(query)
return result.rowcount > 0
meetings_controller = MeetingController() meetings_controller = MeetingController()
meeting_consent_controller = MeetingConsentController() meeting_consent_controller = MeetingConsentController()

View File

@@ -4,10 +4,10 @@ from typing import Literal
import sqlalchemy as sa import sqlalchemy as sa
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4 from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
recordings = sa.Table( recordings = sa.Table(
"recording", "recording",
@@ -31,14 +31,13 @@ recordings = sa.Table(
class Recording(BaseModel): class Recording(BaseModel):
id: str = Field(default_factory=generate_uuid4) id: str = Field(default_factory=generate_uuid4)
bucket_name: str bucket_name: str
# for single-track
object_key: str object_key: str
recorded_at: datetime recorded_at: datetime
status: Literal["pending", "processing", "completed", "failed"] = "pending" status: Literal["pending", "processing", "completed", "failed", "orphan"] = (
"pending"
)
meeting_id: str | None = None meeting_id: str | None = None
# for multitrack reprocessing # None = single-track, [] = multitrack with no audio, [keys...] = multitrack with audio
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None track_keys: list[str] | None = None
@property @property
@@ -72,20 +71,6 @@ class RecordingController:
query = recordings.delete().where(recordings.c.id == id) query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query) await get_database().execute(query)
async def set_meeting_id(
self,
recording_id: NonEmptyString,
meeting_id: NonEmptyString,
) -> None:
"""Link recording to meeting."""
query = (
recordings.update()
.where(recordings.c.id == recording_id)
.values(meeting_id=meeting_id)
)
await get_database().execute(query)
# no check for existence
async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]: async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
if not recording_ids: if not recording_ids:
return [] return []
@@ -104,9 +89,12 @@ class RecordingController:
This is more efficient than fetching all recordings and filtering in Python. This is more efficient than fetching all recordings and filtering in Python.
""" """
from reflector.db.transcripts import ( # INLINE IMPORT REQUIRED: Circular dependency
transcripts, # noqa: PLC0415 cyclic import # - recordings.py needs transcripts table for JOIN query
) # - transcripts.py imports recordings_controller
# - db/__init__.py loads recordings before transcripts (line 31 vs 33)
# - Top-level import would fail during module initialization
from reflector.db.transcripts import transcripts
query = ( query = (
recordings.select() recordings.select()
@@ -124,5 +112,27 @@ class RecordingController:
recordings_list = [Recording(**row) for row in results] recordings_list = [Recording(**row) for row in results]
return [r for r in recordings_list if r.is_multitrack] return [r for r in recordings_list if r.is_multitrack]
async def try_create_with_meeting(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.meeting_id is not None, "meeting_id required for non-orphan"
assert recording.status != "orphan", "use create_orphan for orphans"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
async def create_orphan(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.status == "orphan", "status must be 'orphan'"
assert recording.meeting_id is None, "meeting_id must be NULL for orphan"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
recordings_controller = RecordingController() recordings_controller = RecordingController()

View File

@@ -35,7 +35,9 @@ LLM_RATE_LIMIT_PER_SECOND = 10
# Task execution timeouts (seconds) # Task execution timeouts (seconds)
TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates
TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation TIMEOUT_MEDIUM = (
300 # Single LLM calls, waveform generation (5m for slow LLM responses)
)
TIMEOUT_LONG = 180 # Action items (larger context LLM) TIMEOUT_LONG = 180 # Action items (larger context LLM)
TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown
TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks

View File

@@ -322,6 +322,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
mtg_session_id = recording.mtg_session_id mtg_session_id = recording.mtg_session_id
async with fresh_db_connection(): async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415 from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptParticipant, TranscriptParticipant,
transcripts_controller, transcripts_controller,
) )
@@ -330,15 +331,26 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
if not transcript: if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found") raise ValueError(f"Transcript {input.transcript_id} not found")
# Note: title NOT cleared - preserves existing titles # Note: title NOT cleared - preserves existing titles
# Duration from Daily API (seconds -> milliseconds) - master source
duration_ms = recording.duration * 1000 if recording.duration else 0
await transcripts_controller.update( await transcripts_controller.update(
transcript, transcript,
{ {
"events": [], "events": [],
"topics": [], "topics": [],
"participants": [], "participants": [],
"duration": duration_ms,
}, },
) )
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
mtg_session_id = assert_non_none_and_non_empty( mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required" mtg_session_id, "mtg_session_id is required"
) )
@@ -1095,7 +1107,7 @@ async def identify_action_items(
@daily_multitrack_pipeline.task( @daily_multitrack_pipeline.task(
parents=[generate_title, generate_recap, identify_action_items], parents=[process_tracks, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT), execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3, retries=3,
) )
@@ -1108,12 +1120,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
""" """
ctx.log("finalize: saving transcript and setting status to 'ended'") ctx.log("finalize: saving transcript and setting status to 'ended'")
mixdown_result = ctx.task_output(mixdown_tracks)
track_result = ctx.task_output(process_tracks) track_result = ctx.task_output(process_tracks)
duration = mixdown_result.duration
all_words = track_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery) # Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = track_result.created_padded_files created_padded_files = track_result.created_padded_files
if created_padded_files: if created_padded_files:
@@ -1133,7 +1141,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
async with fresh_db_connection(): async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415 from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptText, TranscriptText,
transcripts_controller, transcripts_controller,
) )
@@ -1142,8 +1149,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
if transcript is None: if transcript is None:
raise ValueError(f"Transcript {input.transcript_id} not found in database") raise ValueError(f"Transcript {input.transcript_id} not found in database")
merged_transcript = TranscriptType(words=all_words, translation=None)
await append_event_and_broadcast( await append_event_and_broadcast(
input.transcript_id, input.transcript_id,
transcript, transcript,
@@ -1155,21 +1160,15 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
logger=logger, logger=logger,
) )
# Save duration and clear workflow_run_id (workflow completed successfully) # Clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary already saved by their callbacks # Note: title/long_summary/short_summary/duration already saved by their callbacks
await transcripts_controller.update( await transcripts_controller.update(
transcript, transcript,
{ {
"duration": duration,
"workflow_run_id": None, # Clear on success - no need to resume "workflow_run_id": None, # Clear on success - no need to resume
}, },
) )
duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast(
input.transcript_id, transcript, "DURATION", duration_data, logger=logger
)
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger) await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log( ctx.log(

View File

@@ -8,6 +8,7 @@ import os
import httpx import httpx
from pydantic import BaseModel from pydantic import BaseModel
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.logger import logger from reflector.logger import logger
@@ -63,7 +64,7 @@ class AudioPaddingModalProcessor:
headers["Authorization"] = f"Bearer {self.modal_api_key}" headers["Authorization"] = f"Bearer {self.modal_api_key}"
try: try:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient(timeout=TIMEOUT_AUDIO) as client:
response = await client.post( response = await client.post(
url, url,
headers=headers, headers=headers,

View File

@@ -1,4 +1,6 @@
import json import json
import os
from datetime import datetime, timezone
from typing import assert_never from typing import assert_never
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
@@ -12,7 +14,10 @@ from reflector.dailyco_api import (
RecordingReadyEvent, RecordingReadyEvent,
RecordingStartedEvent, RecordingStartedEvent,
) )
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger as _logger from reflector.logger import logger as _logger
from reflector.settings import settings from reflector.settings import settings
from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.factory import create_platform_client
@@ -212,10 +217,73 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
track_keys = [t.s3Key for t in tracks if t.type == "audio"] track_keys = [t.s3Key for t in tracks if t.type == "audio"]
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(
recording_id
)
if not match:
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
meeting_id, _ = match
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found (webhook)",
recording_id=recording_id,
meeting_id=meeting_id,
)
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
# Create recording atomically
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=(
os.path.dirname(track_keys[0]) if track_keys else room_name
),
recorded_at=datetime.fromtimestamp(
event.payload.start_ts, tz=timezone.utc
),
track_keys=track_keys,
meeting_id=meeting_id,
status="pending",
)
)
if not created:
# Already created (polling got it first)
logger.debug(
"Recording already exists (webhook late)",
recording_id=recording_id,
meeting_id=meeting_id,
)
return
logger.info( logger.info(
"Raw-tracks recording queuing processing", "Raw-tracks recording queuing processing (webhook)",
recording_id=recording_id, recording_id=recording_id,
room_name=room_name, room_name=room_name,
meeting_id=meeting_id,
num_tracks=len(track_keys), num_tracks=len(track_keys),
) )

View File

@@ -1,4 +1,5 @@
import json import json
import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Annotated, Any, Optional from typing import Annotated, Any, Optional
from uuid import UUID from uuid import UUID
@@ -9,16 +10,21 @@ from pydantic import BaseModel
import reflector.auth as auth import reflector.auth as auth
from reflector.dailyco_api import RecordingType from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError from reflector.dailyco_api.client import DailyApiError
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import ( from reflector.db.meetings import (
MeetingConsent, MeetingConsent,
meeting_consent_controller, meeting_consent_controller,
meetings_controller, meetings_controller,
) )
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.factory import create_platform_client
logger = logging.getLogger(__name__)
router = APIRouter() router = APIRouter()
@@ -102,13 +108,6 @@ async def start_recording(
if not meeting: if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found") raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
try: try:
client = create_platform_client("daily") client = create_platform_client("daily")
result = await client.start_recording( result = await client.start_recording(
@@ -117,9 +116,30 @@ async def start_recording(
instance_id=body.instanceId, instance_id=body.instanceId,
) )
log.info(f"Started {body.type} recording via REST API") recording_id = result["id"]
return {"status": "ok", "result": result} await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=body.instanceId,
type=body.type,
requested_at=datetime.now(timezone.utc),
)
)
logger.info(
f"Started {body.type} recording via REST API",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
"recording_id": recording_id,
},
)
return {"status": "ok", "recording_id": recording_id}
except DailyApiError as e: except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream" # Parse Daily.co error response to detect "has an active stream"
@@ -130,22 +150,42 @@ async def start_recording(
# "has an active stream" means recording already started by another participant # "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200 # This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info: if "has an active stream" in error_info:
log.info( logger.info(
f"{body.type} recording already active (started by another participant)" f"{body.type} recording already active (started by another participant)",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
},
) )
return {"status": "already_active", "instanceId": str(body.instanceId)} return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError): except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling pass # Fall through to error handling
# All other Daily.co API errors # All other Daily.co API errors
log.error(f"Failed to start {body.type} recording", error=str(e)) logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
raise HTTPException( raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}" status_code=500, detail=f"Failed to start recording: {str(e)}"
) )
except Exception as e: except Exception as e:
# Non-Daily.co errors # Non-Daily.co errors
log.error(f"Failed to start {body.type} recording", error=str(e)) logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
raise HTTPException( raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}" status_code=500, detail=f"Failed to start recording: {str(e)}"
) )

View File

@@ -1,6 +1,5 @@
import json import json
import os import os
import re
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import List, Literal from typing import List, Literal
from urllib.parse import unquote from urllib.parse import unquote
@@ -13,10 +12,12 @@ from celery.utils.log import get_task_logger
from pydantic import ValidationError from pydantic import ValidationError
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_participant_sessions import ( from reflector.db.daily_participant_sessions import (
DailyParticipantSession, DailyParticipantSession,
daily_participant_sessions_controller, daily_participant_sessions_controller,
) )
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
@@ -230,79 +231,44 @@ async def _process_multitrack_recording_inner(
recording_start_ts: int, recording_start_ts: int,
): ):
""" """
Process multitrack recording (first time or reprocessing). Process multitrack recording.
For first processing (webhook/polling): Recording must already exist with meeting_id set (created by webhook/polling before queueing).
- Uses recording_start_ts for time-based meeting matching (no instanceId available)
For reprocessing:
- Uses recording.meeting_id directly (already linked during first processing)
- recording_start_ts is ignored
""" """
tz = timezone.utc # Get recording (must exist - created by webhook/polling)
recorded_at = datetime.now(tz)
try:
if track_keys:
folder = os.path.basename(os.path.dirname(track_keys[0]))
ts_match = re.search(r"(\d{14})$", folder)
if ts_match:
ts = ts_match.group(1)
recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
except Exception as e:
logger.warning(
f"Could not parse recorded_at from keys, using now() {recorded_at}",
e,
exc_info=True,
)
# Check if recording already exists (reprocessing path)
recording = await recordings_controller.get_by_id(recording_id) recording = await recordings_controller.get_by_id(recording_id)
if recording and recording.meeting_id: if not recording:
# Reprocessing: recording exists with meeting already linked logger.error(
"Recording not found - should have been created by webhook/polling",
recording_id=recording_id,
)
return
if not recording.meeting_id:
logger.error(
"Recording has no meeting_id - orphan should not be queued",
recording_id=recording_id,
)
return
# Get meeting
meeting = await meetings_controller.get_by_id(recording.meeting_id) meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting: if not meeting:
logger.error( logger.error(
"Reprocessing: meeting not found for recording - skipping", "Meeting not found for recording",
meeting_id=recording.meeting_id, meeting_id=recording.meeting_id,
recording_id=recording_id, recording_id=recording_id,
) )
return return
logger.info( logger.info(
"Reprocessing: using existing recording.meeting_id", "Processing multitrack recording",
recording_id=recording_id, recording_id=recording_id,
meeting_id=meeting.id, meeting_id=meeting.id,
room_name=daily_room_name, room_name=daily_room_name,
) )
else:
# First processing: recording doesn't exist, need time-based matching
# (Daily.co doesn't return instanceId in API, must match by timestamp)
recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=daily_room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.error(
"Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
recording_id=recording_id,
room_name=daily_room_name,
recording_start_ts=recording_start_ts,
recording_start=recording_start.isoformat(),
)
return # Skip processing, will retry on next poll
logger.info(
"First processing: found meeting via time-based matching",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
time_delta_seconds=abs(
(meeting.start_date - recording_start).total_seconds()
),
)
room_name_base = extract_base_room_name(daily_room_name) room_name_base = extract_base_room_name(daily_room_name)
@@ -310,33 +276,6 @@ async def _process_multitrack_recording_inner(
if not room: if not room:
raise Exception(f"Room not found: {room_name_base}") raise Exception(f"Room not found: {room_name_base}")
if not recording:
# Create recording (only happens during first processing)
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key_dir,
recorded_at=recorded_at,
meeting_id=meeting.id,
track_keys=track_keys,
)
)
elif not recording.meeting_id:
# Recording exists but meeting_id is null (failed first processing)
# Update with meeting from time-based matching
await recordings_controller.set_meeting_id(
recording_id=recording.id,
meeting_id=meeting.id,
)
recording.meeting_id = meeting.id
logger.info(
"Updated existing recording with meeting_id",
recording_id=recording.id,
meeting_id=meeting.id,
)
transcript = await transcripts_controller.get_by_recording_id(recording.id) transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript: if not transcript:
transcript = await transcripts_controller.add( transcript = await transcripts_controller.add(
@@ -522,7 +461,7 @@ async def store_cloud_recording(
Store cloud recording reference in meeting table. Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths. Common function for both webhook and polling code paths.
Uses time-based matching to handle duplicate room_name values. Uses direct recording_id lookup via daily_recording_requests table.
Args: Args:
recording_id: Daily.co recording ID recording_id: Daily.co recording ID
@@ -535,154 +474,169 @@ async def store_cloud_recording(
Returns: Returns:
True if stored, False if skipped/failed True if stored, False if skipped/failed
""" """
recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc) # Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
meeting = await meetings_controller.get_by_room_name_and_time( if not match:
room_name=room_name, # ORPHAN: No request found (pre-migration recording or failed request creation)
recording_start=recording_start, await create_and_log_orphan(
time_window_hours=168, # 1 week
)
if not meeting:
logger.warning(
f"Cloud recording ({source}): no meeting found within 1-week window",
recording_id=recording_id, recording_id=recording_id,
bucket_name="",
room_name=room_name, room_name=room_name,
recording_start_ts=start_ts, start_ts=start_ts,
recording_start=recording_start.isoformat(), track_keys=None,
source=source,
) )
return False return False
meeting_id, _ = match
success = await meetings_controller.set_cloud_recording_if_missing( success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id, meeting_id=meeting_id,
s3_key=s3_key, s3_key=s3_key,
duration=duration, duration=duration,
) )
if not success: if not success:
logger.debug( logger.debug(
f"Cloud recording ({source}): already set (race lost)", f"Cloud recording ({source}): already set (stop/restart?)",
recording_id=recording_id, recording_id=recording_id,
room_name=room_name, room_name=room_name,
meeting_id=meeting.id, meeting_id=meeting_id,
) )
return False return False
logger.info( logger.info(
f"Cloud recording stored via {source} (time-based match)", f"Cloud recording stored via {source}",
meeting_id=meeting.id, meeting_id=meeting_id,
recording_id=recording_id, recording_id=recording_id,
s3_key=s3_key, s3_key=s3_key,
duration=duration, duration=duration,
time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
) )
return True return True
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]): async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
""" """Process cloud recordings (database deduplication, worker-agnostic).
Store cloud recordings missing from meeting table via polling.
Uses time-based matching via store_cloud_recording(). Cloud recordings stored in meeting.daily_composed_video_s3_key, not recording table.
Only first cloud recording per meeting is kept (existing behavior).
""" """
if not cloud_recordings: if not cloud_recordings:
return return
stored_count = 0 for rec in cloud_recordings:
for recording in cloud_recordings: # Lookup request
# Extract S3 key from recording (cloud recordings use s3key field) match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
if not s3_key: if not match:
logger.warning( await create_and_log_orphan(
"Cloud recording: missing S3 key", recording_id=rec.id,
recording_id=recording.id, bucket_name="",
room_name=recording.room_name, room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=None,
source="polling",
) )
continue continue
stored = await store_cloud_recording( meeting_id, _ = match
recording_id=recording.id,
room_name=recording.room_name,
s3_key=s3_key,
duration=recording.duration,
start_ts=recording.start_ts,
source="polling",
)
if stored:
stored_count += 1
if not rec.s3key:
logger.error("Cloud recording missing s3_key", recording_id=rec.id)
continue
# Store in meeting table (atomic, only if not already set)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key=rec.s3key,
duration=rec.duration,
)
if success:
logger.info( logger.info(
"Cloud recording polling complete", "Stored cloud recording", recording_id=rec.id, meeting_id=meeting_id
total=len(cloud_recordings), )
stored=stored_count, else:
logger.warning(
"Cloud recording already exists for meeting (stop/restart?)",
recording_id=rec.id,
meeting_id=meeting_id,
) )
async def _poll_raw_tracks_recordings( async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse], raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: str, bucket_name: NonEmptyString,
): ) -> None:
"""Queue raw-tracks recordings missing from DB (existing logic).""" """Process raw-tracks (database deduplication, worker-agnostic)."""
if not raw_tracks_recordings: if not raw_tracks_recordings:
return return
recording_ids = [rec.id for rec in raw_tracks_recordings] for rec in raw_tracks_recordings:
existing_recordings = await recordings_controller.get_by_ids(recording_ids) # Lookup request FIRST (before any DB writes)
existing_ids = {rec.id for rec in existing_recordings} match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
missing_recordings = [ if not match:
rec for rec in raw_tracks_recordings if rec.id not in existing_ids await create_and_log_orphan(
] recording_id=rec.id,
if not missing_recordings:
logger.debug(
"All raw-tracks recordings already in DB",
api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found raw-tracks recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
logger.warning(
"Finished raw-tracks recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
if not track_keys:
logger.warning(
"No audio tracks found in raw-tracks recording",
recording_id=recording.id,
room_name=recording.room_name,
total_tracks=len(recording.tracks),
)
continue
logger.info(
"Queueing missing raw-tracks recording for processing",
recording_id=recording.id,
room_name=recording.room_name,
track_count=len(track_keys),
)
process_multitrack_recording.delay(
bucket_name=bucket_name, bucket_name=bucket_name,
daily_room_name=recording.room_name, room_name=rec.room_name,
recording_id=recording.id, start_ts=rec.start_ts,
track_keys=track_keys, track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
recording_start_ts=recording.start_ts, source="polling",
) )
continue
meeting_id, _ = match
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found", recording_id=rec.id, meeting_id=meeting_id
)
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
)
continue
# DEDUPLICATION: Atomically create recording (single operation, no race window)
# ON CONFLICT → concurrent poller already got it, skip entire logic
track_keys = [t.s3Key for t in rec.tracks if t.type == "audio"]
created = await recordings_controller.try_create_with_meeting(
Recording(
id=rec.id,
bucket_name=bucket_name,
object_key=os.path.dirname(track_keys[0]) if track_keys else "",
recorded_at=datetime.fromtimestamp(rec.start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=meeting_id, # Set at creation (constraint-safe)
status="pending",
)
)
if not created:
# Conflict: another poller already created/queued this
# Skip all remaining logic (match already done by winner)
continue
# Only winner reaches here - queue processing (works with Celery or Hatchet)
process_multitrack_recording.delay(
recording_id=rec.id,
daily_room_name=rec.room_name,
recording_start_ts=rec.start_ts,
bucket_name=bucket_name,
track_keys=track_keys,
)
logger.info("Queued recording", recording_id=rec.id, meeting_id=meeting_id)
async def poll_daily_room_presence(meeting_id: str) -> None: async def poll_daily_room_presence(meeting_id: str) -> None:

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""Test script to fetch Daily.co recordings for a specific room and show raw API response."""
import asyncio
import json
from reflector.video_platforms.factory import create_platform_client
async def main():
room_name = "daily-private-igor-20260110042117"
print(f"\n=== Fetching recordings for room: {room_name} ===\n")
async with create_platform_client("daily") as client:
recordings = await client.list_recordings(room_name=room_name)
print(f"Found {len(recordings)} recording objects from Daily.co API\n")
for i, rec in enumerate(recordings, 1):
print(f"--- Recording #{i} ---")
print(f"ID: {rec.id}")
print(f"Room: {rec.room_name}")
print(f"Start TS: {rec.start_ts}")
print(f"Status: {rec.status}")
print(f"Duration: {rec.duration}")
print(f"Type: {rec.type}")
print(f"Tracks count: {len(rec.tracks)}")
if rec.tracks:
print(f"Tracks:")
for j, track in enumerate(rec.tracks, 1):
print(f" Track {j}: {track.s3Key}")
print(f"\nRaw JSON:\n{json.dumps(rec.model_dump(), indent=2, default=str)}\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,258 @@
from datetime import datetime, timezone
from uuid import UUID
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_create_request():
"""Test creating a recording request."""
# Create meeting first
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-123",
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
request = DailyRecordingRequest(
recording_id="rec-1",
meeting_id="meeting-123",
instance_id=UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"),
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
result = await daily_recording_requests_controller.find_by_recording_id("rec-1")
assert result is not None
assert result[0] == "meeting-123"
assert result[1] == "cloud"
@pytest.mark.asyncio
async def test_multiple_recordings_same_meeting():
"""Test stop/restart creates multiple request rows."""
# Create room and meeting
room = Room(
id="test-room-2", name="Test Room 2", slug="test-room-2", user_id="test-user"
)
await rooms_controller.create(room)
meeting_id = "meeting-456"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-1",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-2", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-1", "rec-2"}
@pytest.mark.asyncio
async def test_deduplication_via_database():
"""Test concurrent pollers use database for deduplication."""
# Create room and meeting
room = Room(
id="test-room-3", name="Test Room 3", slug="test-room-3", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-789",
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
recording_id = "rec-123"
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created2 is False # Conflict, skip
@pytest.mark.asyncio
async def test_orphan_logged_once():
"""Test orphan marked once, skipped on re-poll."""
# First poll
created1 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created1 is True
# Second poll (same orphan discovered again)
created2 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created2 is False # Already exists
# Verify it exists
existing = await recordings_controller.get_by_id("orphan-123")
assert existing is not None
assert existing.status == "orphan"
@pytest.mark.asyncio
async def test_orphan_constraints():
"""Test orphan invariants are enforced."""
# Can't create orphan with meeting_id
with pytest.raises(AssertionError, match="meeting_id must be NULL"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-123", # Should be None
status="orphan",
track_keys=None,
)
)
# Can't create orphan with wrong status
with pytest.raises(AssertionError, match="status must be 'orphan'"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="pending", # Should be "orphan"
track_keys=None,
)
)
@pytest.mark.asyncio
async def test_try_create_with_meeting_constraints():
"""Test try_create_with_meeting enforces constraints."""
# Create room and meeting
room = Room(
id="test-room-4", name="Test Room 4", slug="test-room-4", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-999",
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Can't create with orphan status
with pytest.raises(AssertionError, match="use create_orphan"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-999",
status="orphan", # Should not be orphan
track_keys=None,
)
)
# Can't create without meeting_id
with pytest.raises(AssertionError, match="meeting_id required"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None, # Should have meeting_id
status="pending",
track_keys=None,
)
)

View File

@@ -0,0 +1,300 @@
"""
Integration tests for recording request flow.
These tests verify the end-to-end flow of:
1. Starting a recording (creates request)
2. Webhook/polling discovering recording (matches via request)
3. Recording processing (uses existing meeting_id)
"""
from datetime import datetime, timezone
from uuid import UUID, uuid4
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_recording_request_flow_cloud(client):
"""Test full cloud recording flow: start -> webhook -> match"""
# Create room and meeting
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Simulate recording start (what endpoint does)
recording_id = "rec-cloud-123"
instance_id = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Verify request exists
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
assert match[0] == meeting_id
assert match[1] == "cloud"
# Simulate webhook/polling storing cloud recording
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key="s3://bucket/recording.mp4",
duration=120,
)
assert success is True
# Verify meeting updated
updated_meeting = await meetings_controller.get_by_id(meeting_id)
assert updated_meeting.daily_composed_video_s3_key == "s3://bucket/recording.mp4"
assert updated_meeting.daily_composed_video_duration == 120
@pytest.mark.asyncio
async def test_recording_request_flow_raw_tracks(client):
"""Test full raw-tracks recording flow: start -> webhook/polling -> process"""
# Create room and meeting
room = Room(
id="test-room-2",
name="Test Room 2",
slug="test-room-2",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Simulate recording start
recording_id = "rec-raw-456"
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Simulate webhook/polling discovering recording
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
found_meeting_id, recording_type = match
assert found_meeting_id == meeting_id
assert recording_type == "raw-tracks"
# Create recording (what webhook/polling does)
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="recordings/20260120/",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
meeting_id=meeting_id,
status="pending",
)
)
assert created is True
# Verify recording exists with meeting_id
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id
assert recording.status == "pending"
assert len(recording.track_keys) == 2
@pytest.mark.asyncio
async def test_stop_restart_creates_multiple_requests(client):
"""Test stop/restart creates multiple request rows with same instance_id"""
# Create room and meeting
room = Room(
id="test-room-3",
name="Test Room 3",
slug="test-room-3",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("c1d2e3f4-a5b6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-first",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-second", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-first", "rec-second"}
assert all(r.instance_id == instance_id for r in requests)
@pytest.mark.asyncio
async def test_orphan_recording_no_request(client):
"""Test orphan recording (no request found)"""
# Simulate polling discovering recording with no request
recording_id = "rec-orphan"
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is None # No request
# Mark as orphan
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created is True
# Verify orphan exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.status == "orphan"
assert recording.meeting_id is None
# Second poll - already exists
created_again = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created_again is False # Already exists
@pytest.mark.asyncio
async def test_concurrent_polling_deduplication(client):
"""Test concurrent pollers only queue once"""
# Create room and meeting
room = Room(
id="test-room-4",
name="Test Room 4",
slug="test-room-4",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Create request
recording_id = "rec-concurrent"
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=UUID("d1e2f3a4-b5c6-7890-abcd-ef1234567890"),
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
)
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created2 is False # Conflict, skip
# Only one recording exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id

View File

@@ -1,374 +0,0 @@
"""
Integration tests for time-based meeting-to-recording matching.
Tests the critical path for matching Daily.co recordings to meetings when
API doesn't return instanceId.
"""
from datetime import datetime, timedelta, timezone
import pytest
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
@pytest.fixture
async def test_room():
"""Create a test room for meetings."""
room = await rooms_controller.add(
name="test-room-time",
user_id="test-user-id",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=False,
platform="daily",
)
return room
@pytest.fixture
def base_time():
"""Fixed timestamp for deterministic tests."""
return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
class TestTimeBasedMatching:
"""Test get_by_room_name_and_time() matching logic."""
async def test_exact_time_match(self, test_room, base_time):
"""Recording timestamp exactly matches meeting start_date."""
meeting = await meetings_controller.create(
id="meeting-exact",
room_name="daily-test-20260114090000",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090000",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
"""Recording started 1 minute after meeting (participants joined late)."""
meeting = await meetings_controller.create(
id="meeting-late",
room_name="daily-test-20260114090100",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time + timedelta(minutes=1)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090100",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
"""
Two meetings with same room_name (duplicate/race condition).
Should pick closest by timestamp.
"""
meeting1 = await meetings_controller.create(
id="meeting-1-first",
room_name="daily-duplicate-room",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting2 = await meetings_controller.create(
id="meeting-2-second",
room_name="daily-duplicate-room", # Same room_name!
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(seconds=0.99), # 0.99s later
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording started 0.5s after meeting1
# Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
recording_start = base_time + timedelta(seconds=0.5)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-duplicate-room",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s)
async def test_outside_time_window_returns_none(self, test_room, base_time):
"""Recording outside 1-week window returns None."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-old",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording 8 days later (outside 7-day window)
recording_start = base_time + timedelta(days=8)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-old",
recording_start=recording_start,
time_window_hours=168,
)
assert result is None
async def test_tie_breaker_deterministic(self, test_room, base_time):
"""When time delta identical, tie-breaker by meeting.id is deterministic."""
meeting_z = await meetings_controller.create(
id="zzz-last-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting_a = await meetings_controller.create(
id="aaa-first-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time, # Exact same start_date
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tie",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
# Tie-breaker: lexicographically first UUID
assert result.id == "aaa-first-uuid"
async def test_timezone_naive_datetime_raises(self, test_room, base_time):
"""Timezone-naive datetime raises ValueError."""
await meetings_controller.create(
id="meeting-tz",
room_name="daily-test-tz",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Naive datetime (no timezone)
naive_dt = datetime(2026, 1, 14, 9, 0, 0)
with pytest.raises(ValueError, match="timezone-aware"):
await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tz",
recording_start=naive_dt,
time_window_hours=168,
)
async def test_one_week_boundary_after_included(self, test_room, base_time):
"""Meeting 1-week AFTER recording is included (window_end boundary)."""
meeting_time = base_time + timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-after",
room_name="daily-test-boundary-after",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-after",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-after"
async def test_one_week_boundary_before_included(self, test_room, base_time):
"""Meeting 1-week BEFORE recording is included (window_start boundary)."""
meeting_time = base_time - timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-before",
room_name="daily-test-boundary-before",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-before",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-before"
async def test_recording_before_meeting_start(self, test_room, base_time):
"""Recording started before meeting (clock skew or early join)."""
await meetings_controller.create(
id="meeting-early",
room_name="daily-test-early",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time - timedelta(minutes=2)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-early",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-early"
async def test_mixed_inside_outside_window(self, test_room, base_time):
"""Multiple meetings, only one inside window - returns the inside one."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=10),
end_date=base_time - timedelta(days=10, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-inside",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=2),
end_date=base_time - timedelta(days=2, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-future",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(days=10),
end_date=base_time + timedelta(days=10, hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-mixed",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-inside"
class TestAtomicCloudRecordingUpdate:
"""Test atomic update prevents race conditions."""
async def test_first_update_succeeds(self, test_room, base_time):
"""First call to set_cloud_recording_if_missing succeeds."""
meeting = await meetings_controller.create(
id="meeting-atomic-1",
room_name="daily-test-atomic",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success is True
updated = await meetings_controller.get_by_id(meeting.id)
assert updated.daily_composed_video_s3_key == "first-s3-key"
assert updated.daily_composed_video_duration == 100
async def test_second_update_fails_atomically(self, test_room, base_time):
"""Second call to update same meeting doesn't overwrite (atomic check)."""
meeting = await meetings_controller.create(
id="meeting-atomic-2",
room_name="daily-test-atomic2",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success1 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success1 is True
after_first = await meetings_controller.get_by_id(meeting.id)
assert after_first.daily_composed_video_s3_key == "first-s3-key"
success2 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="bucket/path/should-not-overwrite",
duration=200,
)
assert success2 is False
final = await meetings_controller.get_by_id(meeting.id)
assert final.daily_composed_video_s3_key == "first-s3-key"
assert final.daily_composed_video_duration == 100