Compare commits


1 Commit

Author SHA1 Message Date
Igor Loskutov
e1b790c5a8 Add Modal backend for audio mixdown 2026-01-21 17:06:17 -05:00
59 changed files with 973 additions and 2431 deletions

View File

@@ -3,4 +3,3 @@ docs/docs/installation/auth-setup.md:curl-auth-header:250
docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465

View File

@@ -1,33 +1,5 @@
# Changelog
## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23)
### Features
* mixdown optional ([#834](https://github.com/Monadical-SAS/reflector/issues/834)) ([fc3ef6c](https://github.com/Monadical-SAS/reflector/commit/fc3ef6c8933231c731fad84e7477a476a6220a5e))
## [0.30.0](https://github.com/Monadical-SAS/reflector/compare/v0.29.0...v0.30.0) (2026-01-23)
### Features
* brady bunch ([#816](https://github.com/Monadical-SAS/reflector/issues/816)) ([6c175a1](https://github.com/Monadical-SAS/reflector/commit/6c175a11d8a3745095bfad06a4ad3ccdfd278433))
## [0.29.0](https://github.com/Monadical-SAS/reflector/compare/v0.28.1...v0.29.0) (2026-01-21)
### Features
* set hatchet as default for multitracks ([#822](https://github.com/Monadical-SAS/reflector/issues/822)) ([c723752](https://github.com/Monadical-SAS/reflector/commit/c723752b7e15aa48a41ad22856f147a5517d3f46))
## [0.28.1](https://github.com/Monadical-SAS/reflector/compare/v0.28.0...v0.28.1) (2026-01-21)
### Bug Fixes
* ics non-sync bugfix ([#823](https://github.com/Monadical-SAS/reflector/issues/823)) ([23d2bc2](https://github.com/Monadical-SAS/reflector/commit/23d2bc283d4d02187b250d2055103e0374ee93d6))
## [0.28.0](https://github.com/Monadical-SAS/reflector/compare/v0.27.0...v0.28.0) (2026-01-20)

View File

@@ -131,6 +131,15 @@ if [ -z "$DIARIZER_URL" ]; then
fi
echo " -> $DIARIZER_URL"
echo ""
echo "Deploying mixdown (CPU audio processing)..."
MIXDOWN_URL=$(modal deploy reflector_mixdown.py 2>&1 | grep -o 'https://[^ ]*web.modal.run' | head -1)
if [ -z "$MIXDOWN_URL" ]; then
echo "Error: Failed to deploy mixdown. Check Modal dashboard for details."
exit 1
fi
echo " -> $MIXDOWN_URL"
# --- Output Configuration ---
echo ""
echo "=========================================="
@@ -147,4 +156,8 @@ echo ""
echo "DIARIZATION_BACKEND=modal"
echo "DIARIZATION_URL=$DIARIZER_URL"
echo "DIARIZATION_MODAL_API_KEY=$API_KEY"
echo ""
echo "MIXDOWN_BACKEND=modal"
echo "MIXDOWN_URL=$MIXDOWN_URL"
echo "MIXDOWN_MODAL_API_KEY=$API_KEY"
echo "# --- End Modal Configuration ---"

View File

@@ -0,0 +1,379 @@
"""
Reflector GPU backend - audio mixdown
======================================
CPU-intensive audio mixdown service for combining multiple audio tracks.
Uses PyAV filter graph (amix) for high-quality audio mixing.
"""
import os
import tempfile
import time
from fractions import Fraction
import modal
MIXDOWN_TIMEOUT = 900 # 15 minutes
SCALEDOWN_WINDOW = 60 # 1 minute idle before shutdown
app = modal.App("reflector-mixdown")
# CPU-based image (no GPU needed for audio processing)
image = (
modal.Image.debian_slim(python_version="3.12")
.apt_install("ffmpeg") # Required by PyAV
.pip_install(
"av==13.1.0", # PyAV for audio processing
"requests==2.32.3", # HTTP for presigned URL downloads/uploads
"fastapi==0.115.12", # API framework
)
)
@app.function(
cpu=4.0, # 4 CPU cores for audio processing
timeout=MIXDOWN_TIMEOUT,
scaledown_window=SCALEDOWN_WINDOW,
secrets=[modal.Secret.from_name("reflector-gpu")],
image=image,
)
@modal.concurrent(max_inputs=10)
@modal.asgi_app()
def web():
import logging
import secrets
import shutil
import av
import requests
from av.audio.resampler import AudioResampler
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
# Setup logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Validate API key exists at startup
API_KEY = os.environ.get("REFLECTOR_GPU_APIKEY")
if not API_KEY:
raise RuntimeError("REFLECTOR_GPU_APIKEY not configured in Modal secrets")
def apikey_auth(apikey: str = Depends(oauth2_scheme)):
# Use constant-time comparison to prevent timing attacks
if secrets.compare_digest(apikey, API_KEY):
return
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key",
headers={"WWW-Authenticate": "Bearer"},
)
class MixdownRequest(BaseModel):
track_urls: list[str]
output_url: str
target_sample_rate: int = 48000
expected_duration_sec: float | None = None
class MixdownResponse(BaseModel):
duration_ms: float
tracks_mixed: int
audio_uploaded: bool
def download_track(url: str, temp_dir: str, index: int) -> str:
"""Download track from presigned URL to temp file using streaming."""
logger.info(f"Downloading track {index + 1}")
response = requests.get(url, stream=True, timeout=300)
if response.status_code == 404:
raise HTTPException(status_code=404, detail=f"Track {index} not found")
if response.status_code == 403:
raise HTTPException(
status_code=403, detail=f"Track {index} presigned URL expired"
)
response.raise_for_status()
temp_path = os.path.join(temp_dir, f"track_{index}.webm")
total_bytes = 0
with open(temp_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
total_bytes += len(chunk)
logger.info(f"Track {index + 1} downloaded: {total_bytes} bytes")
return temp_path
def mixdown_tracks_modal(
track_paths: list[str],
output_path: str,
target_sample_rate: int,
expected_duration_sec: float | None,
logger,
) -> float:
"""Mix multiple audio tracks using PyAV filter graph.
Args:
track_paths: List of local file paths to audio tracks
output_path: Local path for output MP3 file
target_sample_rate: Sample rate for output (Hz)
expected_duration_sec: Optional fallback duration if container metadata unavailable
logger: Logger instance for progress tracking
Returns:
Duration in milliseconds
"""
logger.info(f"Starting mixdown of {len(track_paths)} tracks")
# Build PyAV filter graph: N abuffer -> amix -> aformat -> sink
graph = av.filter.Graph()
inputs = []
for idx in range(len(track_paths)):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}",
name="fmt",
)
sink = graph.add("abuffersink", name="out")
# Connect inputs to mixer (no delays for Modal implementation)
for idx, in_ctx in enumerate(inputs):
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
# Open all containers
containers = []
try:
for i, path in enumerate(track_paths):
try:
c = av.open(path)
containers.append(c)
except Exception as e:
logger.warning(
f"Failed to open container {i}: {e}",
)
if not containers:
raise ValueError("Could not open any track containers")
# Calculate total duration for progress reporting
max_duration_sec = 0.0
for c in containers:
if c.duration is not None:
dur_sec = c.duration / av.time_base
max_duration_sec = max(max_duration_sec, dur_sec)
if max_duration_sec == 0.0 and expected_duration_sec:
max_duration_sec = expected_duration_sec
# Setup output container
out_container = av.open(output_path, "w", format="mp3")
out_stream = out_container.add_stream("libmp3lame", rate=target_sample_rate)
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
for _ in decoders
]
current_max_time = 0.0
last_log_time = time.monotonic()
start_time = time.monotonic()
total_duration = 0
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
inputs[i].push(None) # Signal end of stream
continue
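# Frames at a different sample rate are dropped rather than pushed:
# the abuffer inputs above are configured for target_sample_rate only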
if frame.sample_rate != target_sample_rate:
continue
# Progress logging (every 5 seconds)
if frame.time is not None:
current_max_time = max(current_max_time, frame.time)
now = time.monotonic()
if now - last_log_time >= 5.0:
elapsed = now - start_time
if max_duration_sec > 0:
progress_pct = min(
100.0, (current_max_time / max_duration_sec) * 100
)
logger.info(
f"Mixdown progress: {progress_pct:.1f}% @ {current_max_time:.1f}s (elapsed: {elapsed:.1f}s)"
)
else:
logger.info(
f"Mixdown progress: @ {current_max_time:.1f}s (elapsed: {elapsed:.1f}s)"
)
last_log_time = now
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
# Pull mixed frames from sink and encode
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
# Encode and mux
for packet in out_stream.encode(mixed):
out_container.mux(packet)
total_duration += packet.duration
# Flush remaining frames from filter graph
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
for packet in out_stream.encode(mixed):
out_container.mux(packet)
total_duration += packet.duration
# Flush encoder
for packet in out_stream.encode():
out_container.mux(packet)
total_duration += packet.duration
# Calculate duration in milliseconds
if total_duration > 0:
# Use the same calculation as AudioFileWriterProcessor
duration_ms = round(
float(total_duration * out_stream.time_base * 1000), 2
)
else:
duration_ms = 0.0
out_container.close()
logger.info(f"Mixdown complete: duration={duration_ms}ms")
finally:
# Cleanup all containers
for c in containers:
if c is not None:
try:
c.close()
except Exception:
pass
return duration_ms
@app.post("/v1/audio/mixdown", dependencies=[Depends(apikey_auth)])
def mixdown(request: MixdownRequest) -> MixdownResponse:
"""Mix multiple audio tracks into a single MP3 file.
Tracks are downloaded from presigned S3 URLs, mixed using PyAV,
and uploaded to a presigned S3 PUT URL.
"""
if not request.track_urls:
raise HTTPException(status_code=400, detail="No track URLs provided")
logger.info(f"Mixdown request: {len(request.track_urls)} tracks")
temp_dir = tempfile.mkdtemp()
temp_files = []
output_mp3_path = None
try:
# Download all tracks
for i, url in enumerate(request.track_urls):
temp_path = download_track(url, temp_dir, i)
temp_files.append(temp_path)
# Mix tracks
output_mp3_path = os.path.join(temp_dir, "mixed.mp3")
duration_ms = mixdown_tracks_modal(
temp_files,
output_mp3_path,
request.target_sample_rate,
request.expected_duration_sec,
logger,
)
# Upload result to S3
logger.info("Uploading result to S3")
file_size = os.path.getsize(output_mp3_path)
with open(output_mp3_path, "rb") as f:
upload_response = requests.put(
request.output_url, data=f, timeout=300
)
if upload_response.status_code == 403:
raise HTTPException(
status_code=403, detail="Output presigned URL expired"
)
upload_response.raise_for_status()
logger.info(f"Upload complete: {file_size} bytes")
return MixdownResponse(
duration_ms=duration_ms,
tracks_mixed=len(request.track_urls),
audio_uploaded=True,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Mixdown failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Mixdown failed: {str(e)}")
finally:
# Cleanup temp files
for temp_path in temp_files:
try:
os.unlink(temp_path)
except Exception as e:
logger.warning(f"Failed to cleanup temp file {temp_path}: {e}")
if output_mp3_path and os.path.exists(output_mp3_path):
try:
os.unlink(output_mp3_path)
except Exception as e:
logger.warning(f"Failed to cleanup output file {output_mp3_path}: {e}")
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.warning(f"Failed to cleanup temp directory {temp_dir}: {e}")
return app
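
For reference, a minimal client sketch against this endpoint; the path, bearer auth, and payload shape come from the handlers above, while the deployment URL, API key, and presigned URLs are placeholders:

```python
import requests

MIXDOWN_URL = "https://example--reflector-mixdown-web.modal.run"  # placeholder deploy URL
API_KEY = "..."  # value of REFLECTOR_GPU_APIKEY

resp = requests.post(
    f"{MIXDOWN_URL}/v1/audio/mixdown",
    headers={"Authorization": f"Bearer {API_KEY}"},
    json={
        "track_urls": [
            "https://s3.example/presigned-get-1",  # presigned GET URLs
            "https://s3.example/presigned-get-2",
        ],
        "output_url": "https://s3.example/presigned-put",  # presigned PUT URL
        "target_sample_rate": 48000,
        "expected_duration_sec": None,
    },
    timeout=900,
)
resp.raise_for_status()
print(resp.json())  # {"duration_ms": ..., "tracks_mixed": 2, "audio_uploaded": true}
```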

View File

@@ -1,496 +0,0 @@
# Daily.co and Reflector Data Model
This document explains the data model relationships between Daily.co's API concepts and Reflector's database schema, clarifying common sources of confusion.
---
## Table of Contents
1. [Core Entities Overview](#core-entities-overview)
2. [Daily.co vs Reflector Terminology](#dailyco-vs-reflector-terminology)
3. [Entity Relationships](#entity-relationships)
4. [Recording Multiplicity](#recording-multiplicity)
5. [Session Identifiers Explained](#session-identifiers-explained)
6. [Time-Based Matching](#time-based-matching)
7. [Multitrack Recording Details](#multitrack-recording-details)
8. [Verified Example](#verified-example)
---
## Core Entities Overview
### Reflector's Four Primary Entities
```
┌─────────────────────────────────────────────────────────────────┐
│ Room (Reflector) │
│ - Persistent meeting template │
│ - User-created configuration │
│ - Example: "team-standup" │
└────────────────────┬────────────────────────────────────────────┘
│ 1:N
┌─────────────────────────────────────────────────────────────────┐
│ Meeting (Reflector) │
│ - Single session instance │
│ - Creates NEW Daily.co room with timestamp │
│ - Example: "team-standup-20260115120000" │
└────────────────────┬────────────────────────────────────────────┘
│ 1:N
┌─────────────────────────────────────────────────────────────────┐
│ Recording (Reflector + Daily.co) │
│ - One segment of audio/video │
│ - New recording created on stop/restart │
│ - track_keys: JSON array of S3 file paths │
└────────────────────┬────────────────────────────────────────────┘
│ 1:1
┌─────────────────────────────────────────────────────────────────┐
│ Transcript (Reflector) │
│ - Processed audio with transcription │
│ - Diarization, summaries, topics │
│ - One transcript per recording │
└─────────────────────────────────────────────────────────────────┘
```
---
## Daily.co vs Reflector Terminology
### Room
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Virtual meeting space on Daily.co platform | User-created meeting template/configuration |
| **Lifetime** | Configurable expiration | Persistent until user deletes |
| **Creation** | API call for each meeting | Pre-created by user once |
| **Reuse** | Can host multiple sessions | Generates new Daily.co room per meeting |
| **Name Format** | `room-name` (reusable) | `room-name` (base identifier) |
| **Timestamping** | Not required | Meeting adds timestamp: `{name}-YYYYMMDDHHMMSS` |
**Example:**
```
Reflector Room: "daily-private-igor" (persistent config)
↓ starts meeting
Daily.co Room: "daily-private-igor-20260110042117"
```
### Meeting
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Session that starts when first participant joins | Explicit database record of a session |
| **Identifier** | `mtgSessionId` (generated by Daily.co) | `meeting.id` (UUID, generated by Reflector) |
| **Creation** | Implicit (first participant join) | Explicit API call before participants join |
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp |
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Audio/video files on S3 | Metadata + processing status |
| **Types** | `cloud` (composed video), `raw-tracks` (multitrack) | Stores references + `track_keys` array |
| **Multiplicity** | One recording object per start/stop cycle | One DB row per Daily.co recording object |
| **Identifier** | Daily.co `recording_id` | Same `recording_id` (stored in DB) |
| **Multitrack** | Array of `.webm` files (one per participant) | `track_keys` JSON array with S3 paths |
| **Linkage** | Via `room_name` + `start_ts` | FK `meeting_id` (set via time-based match) |
**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs.
---
## Entity Relationships
### Database Schema Relationships
```sql
-- Simplified schema showing key relationships
TABLE room (
id VARCHAR PRIMARY KEY,
name VARCHAR UNIQUE,
platform VARCHAR -- 'whereby' | 'daily'
)
TABLE meeting (
id VARCHAR PRIMARY KEY,
room_id VARCHAR REFERENCES room(id) ON DELETE CASCADE, -- nullable
room_name VARCHAR, -- Daily.co room name (timestamped)
start_date TIMESTAMP,
platform VARCHAR
)
TABLE recording (
id VARCHAR PRIMARY KEY, -- Daily.co recording_id
meeting_id VARCHAR, -- FK to meeting (set via time-based match)
bucket_name VARCHAR,
object_key VARCHAR, -- S3 prefix
track_keys JSON, -- Array of S3 keys for multitrack
recorded_at TIMESTAMP
)
TABLE transcript (
id VARCHAR PRIMARY KEY,
recording_id VARCHAR, -- nullable FK
meeting_id VARCHAR, -- nullable FK
room_id VARCHAR, -- nullable FK
participants JSON, -- [{id, speaker, name, user_id}, ...]
title VARCHAR,
long_summary VARCHAR,
webvtt TEXT
)
```
**Relationship Cardinalities:**
```
1 Room → N Meetings
1 Meeting → N Recordings (common: 1-21 recordings per meeting)
1 Recording → 1 Transcript
1 Meeting → N Transcripts (via recordings)
```
---
## Recording Multiplicity
### Why Multiple Recordings Per Meeting?
Daily.co creates a **new recording object** (new ID, new files) whenever recording stops and restarts. This happens due to:
1. **Manual stop/start** - User clicks stop, then start recording again
2. **Network reconnection** - Participant drops, reconnects → triggers restart
3. **Participant rejoin** - Last participant leaves, new one joins → new session
---
## Session Identifiers Explained
### The Hidden Entity: Daily.co Meeting Session
Daily.co has an **implicit ephemeral entity** that sits between Room and Recording:
```
Daily.co Room: "daily-private-igor-20260110042117"
├─ Daily.co Meeting Session #1 (mtgSessionId: c04334de...)
│ └─ Recording #3 (f4a50f94) - 4s, 1 track
└─ Daily.co Meeting Session #2 (mtgSessionId: 4cdae3c0...)
├─ Recording #2 (b0fa94da) - 80s, 2 tracks ← recording stopped
└─ Recording #1 (05edf519) - 62s, 1 track ← then restarted
```
**Daily.co Meeting Session:**
- **Lifecycle:** Starts when first participant joins, ends when last participant leaves
- **Identifier:** `mtgSessionId` (generated by Daily.co)
- **Persistence:** Ephemeral - new ID if everyone leaves and someone rejoins
- **Relationship:** 1 Session → N Recordings (if recording stops/restarts during session)
**Key Insight:** Multiple recordings can share the same `mtgSessionId` if recording was stopped and restarted while participants remained connected.
### mtgSessionId (Meeting Session Identifier)
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks.
**Reflector Tracking:** `daily_participant_session` table
```sql
TABLE daily_participant_session (
id VARCHAR PRIMARY KEY, -- {meeting_id}:{user_id}:{joined_at_ms}
meeting_id VARCHAR,
session_id VARCHAR, -- From webhook (per-participant)
user_id VARCHAR,
user_name VARCHAR,
joined_at TIMESTAMP,
left_at TIMESTAMP
)
```
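A small illustrative helper (not the actual Reflector code) showing how that composite key is assembled:

```python
from datetime import datetime, timezone

def participant_session_id(meeting_id: str, user_id: str, joined_at: datetime) -> str:
    """Build the composite key {meeting_id}:{user_id}:{joined_at_ms}."""
    joined_at_ms = int(joined_at.timestamp() * 1000)
    return f"{meeting_id}:{user_id}:{joined_at_ms}"

# participant_session_id("034804b8-...", "907f2cc1-...", datetime.now(timezone.utc))
```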
---
## Time-Based Matching
### Problem Statement
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
**Example API response:**
```json
{
"id": "recording-uuid",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"mtgSessionId": null Missing!
}
```
### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
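The SQL-backed implementation appears in the `meetings.py` diff further down; as a sketch, the selection logic reduces to the following (assuming timezone-aware datetimes and Meeting-like objects with `room_name`, `start_date`, and `id`):

```python
from datetime import datetime, timedelta

def match_meeting(meetings, room_name: str, recording_start: datetime,
                  time_window_hours: int = 168):
    """Pick the meeting whose start_date is closest to recording_start."""
    window = timedelta(hours=time_window_hours)
    candidates = [
        m for m in meetings
        if m.room_name == room_name
        and abs(m.start_date - recording_start) <= window
    ]
    if not candidates:
        return None
    # Closest start_date wins; tie-break deterministically by meeting id
    return min(
        candidates,
        key=lambda m: (abs((m.start_date - recording_start).total_seconds()), m.id),
    )
```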
---
## Multitrack Recording Details
### track_keys JSON Array
**Schema:** `recording.track_keys` (JSON, nullable)
```sql
-- Example recording with 2 audio tracks
{
"id": "b0fa94da-73b5-4f95-9239-5216a682a505",
"track_keys": [
"igormonadical/daily-private-igor-20260110042117/1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565",
"igormonadical/daily-private-igor-20260110042117/1768018896877-9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-1768018899286"
]
}
```
**Semantics:**
- `track_keys = null` → Not multitrack (cloud recording)
- `track_keys = []` → Multitrack recording with no audio captured (silence/muted)
- `track_keys = [...]` → Multitrack with N audio tracks
**Property:** `recording.is_multitrack` (Python)
```python
@property
def is_multitrack(self) -> bool:
return self.track_keys is not None and len(self.track_keys) > 0
```
### Track Filename Format
Daily.co multitrack filenames encode timing and participant information:
**Format:** `{recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}`
**Example:** `1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565`
**Parsed Components:**
```python
# reflector/utils/daily.py:25-60
class DailyRecordingFilename(NamedTuple):
recording_start_ts: int # 1768018896877 (milliseconds)
participant_id: str # 890c0eae-e186-4534-a7bd-7c794b7d6d7f
track_start_ts: int # 1768018914565 (milliseconds)
```
**Note:** Browser downloads from S3 add `.webm` extension due to MIME headers, but S3 object keys have no extension.
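A sketch of parsing this format; the real parser lives in `reflector/utils/daily.py`, and this version assumes `-cam-audio-` reliably separates the participant ID from the track timestamp:

```python
from typing import NamedTuple

class ParsedTrackName(NamedTuple):
    recording_start_ts: int  # milliseconds
    participant_id: str
    track_start_ts: int      # milliseconds

def parse_track_filename(name: str) -> ParsedTrackName:
    # Strip any directory prefix from the S3 key first
    base = name.rsplit("/", 1)[-1]
    left, _, track_ts = base.rpartition("-cam-audio-")
    rec_ts, _, participant_id = left.partition("-")
    return ParsedTrackName(int(rec_ts), participant_id, int(track_ts))

# parse_track_filename("1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565")
# -> ParsedTrackName(1768018896877, '890c0eae-e186-4534-a7bd-7c794b7d6d7f', 1768018914565)
```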
### Video Track Filtering
Daily.co API returns both audio and video tracks, but Reflector only processes audio.
**Filtering Logic:** `reflector/worker/process.py:660`
```python
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
```
**Example API Response:**
```json
{
"tracks": [
{"type": "audio", "s3Key": "...cam-audio-1768018914565"},
{"type": "audio", "s3Key": "...cam-audio-1768018899286"},
{"type": "video", "s3Key": "...cam-video-1768018897095"} Filtered out
]
}
```
**Result:** Only 2 audio tracks stored in `recording.track_keys`, video track discarded.
**Rationale:** Reflector is an audio transcription system; video is not needed for processing.
### Track-to-Participant Mapping
**Flow:**
1. Daily.co webhook/polling provides `track_keys` array
2. Each track filename contains `participant_id`
3. Reflector queries Daily.co API: `GET /meetings/{mtgSessionId}/participants`
4. Maps `participant_id` → `user_name`
5. Stores in `transcript.participants` JSON:
```json
[
{
"id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f",
"speaker": 0,
"name": "test2",
"user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22"
},
{
"id": "9660e8e9-4297-4f17-951d-0b2bf2401803",
"speaker": 1,
"name": "test",
"user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22"
}
]
```
**Diarization:** Multitrack recordings don't need speaker diarization AI — speaker identity comes from separate audio tracks.
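Putting the pieces together, a sketch of the mapping step (the participants API response shape is assumed from the examples in this document):

```python
def build_participants(track_keys: list[str], names_by_id: dict[str, str]) -> list[dict]:
    """Derive transcript.participants entries from multitrack keys.

    names_by_id maps participant_id -> user_name, e.g. built from
    GET /meetings/{mtgSessionId}/participants (response shape assumed).
    """
    participants = []
    for speaker, key in enumerate(track_keys):
        # Extract participant_id from {recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}
        left, _, _ = key.rsplit("/", 1)[-1].rpartition("-cam-audio-")
        participant_id = left.partition("-")[2]
        participants.append({
            "id": participant_id,
            "speaker": speaker,  # track index doubles as speaker label
            "name": names_by_id.get(participant_id, "unknown"),
        })
    return participants
```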
---
## Verified Example
### Meeting: daily-private-igor-20260110042117
**Context:** User conducted test recording with start/stop cycles, producing 3 recordings.
#### Database State
```sql
-- Meeting
id: 034804b8-cee2-4fb4-94d7-122f6f068a61
room_name: daily-private-igor-20260110042117
start_date: 2026-01-10 04:21:17+00
```
#### Daily.co API Response
```json
[
{
"id": "f4a50f94-053c-4f9d-bda6-78ad051fbc36",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018885,
"duration": 4,
"status": "finished",
"mtgSessionId": "c04334de-42a0-4c2a-96be-a49b068dca85",
"tracks": [
{"type": "audio", "s3Key": "...62e8f3ae...cam-audio-1768018885417"}
]
},
{
"id": "b0fa94da-73b5-4f95-9239-5216a682a505",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"duration": 80,
"status": "finished",
"mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345",
"tracks": [
{"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914565"},
{"type": "audio", "s3Key": "...9660e8e9...cam-audio-1768018899286"},
{"type": "video", "s3Key": "...9660e8e9...cam-video-1768018897095"}
]
},
{
"id": "05edf519-9048-4b49-9a75-73e9826fd950",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018914,
"duration": 62,
"status": "finished",
"mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345",
"tracks": [
{"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914948"}
]
}
]
```
**Key Observations:**
- 3 recording objects returned by Daily.co
- 2 different `mtgSessionId` values (2 different meeting sessions)
- Recording #2 has 3 tracks (2 audio + 1 video)
- Timestamps: 1768018885 → 1768018896 (+11s) → 1768018914 (+18s)
#### Reflector Database
**Recordings:**
```
┌──────────────────────────────────────┬──────────────┬────────────┬──────────────────────────────────────┐
│ id │ track_count │ duration │ mtgSessionId │
├──────────────────────────────────────┼──────────────┼────────────┼──────────────────────────────────────┤
│ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 │ 4s │ c04334de-42a0-4c2a-96be-a49b068dca85 │
│ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (video=0) │ 80s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │
│ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 │ 62s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │
└──────────────────────────────────────┴──────────────┴────────────┴──────────────────────────────────────┘
```
**Note:** Recording #2 has 2 audio tracks (video filtered out), not 3.
**Transcripts:**
```
┌──────────────────────────────────────┬──────────────────────────────────────┬──────────────┬──────────────────────────────────────────────┐
│ id │ recording_id │ participants │ title │
├──────────────────────────────────────┼──────────────────────────────────────┼──────────────┼──────────────────────────────────────────────┤
│ 17149b1f-546c-4837-80a0-f8140bd16592 │ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 (test) │ (empty - no speech) │
│ 49801332-3222-4c11-bdb2-375479fc87f2 │ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (test, │ "Examination and Validation Procedures │
│ │ │ test2) │ Review" │
│ e5271e12-20fb-42d2-b5a8-21438abadef9 │ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 (test2) │ "Technical Sound Check Procedure Review" │
└──────────────────────────────────────┴──────────────────────────────────────┴──────────────┴──────────────────────────────────────────────┘
```
**Transcript Content:**
*Transcript #1* (17149b1f): Empty WebVTT (no audio captured)
*Transcript #2* (49801332):
```webvtt
WEBVTT
00:00:03.109 --> 00:00:05.589
<v Speaker1>Test, test, test. Test, test, test, test, test.
00:00:19.829 --> 00:00:22.710
<v Speaker0>Test test test test test test test test test test test.
```
**AI-Generated Summary:**
> "The meeting focused on the critical importance of rigorous testing for ensuring reliability and quality, with test and test2 emphasizing the need for a structured testing framework and meticulous documentation..."
*Transcript #3* (e5271e12):
```webvtt
WEBVTT
00:00:02.029 --> 00:00:04.910
<v Speaker0>Test, test, test, test, test, test, test, test, test, test, test.
```
#### Validation: track_keys → participants
**Recording #2 (b0fa94da) tracks:**
```json
[
".../890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-...",
".../9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-..."
]
```
**Transcript #2 (49801332) participants:**
```json
[
{"id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f", "speaker": 0, "name": "test2"},
{"id": "9660e8e9-4297-4f17-951d-0b2bf2401803", "speaker": 1, "name": "test"}
]
```
### Data Flow
```
Daily.co API: 3 recordings
Polling: _poll_raw_tracks_recordings()
Worker: process_multitrack_recording.delay() × 3
DB: 3 recording rows created
Pipeline: Audio processing + transcription × 3
DB: 3 transcript rows created (1:1 with recordings)
UI: User sees 3 separate transcripts
```
**Result:** ✅ 1:1 Recording → Transcript relationship maintained.
---
**Document Version:** 1.0
**Last Verified:** 2026-01-15
**Data Source:** Production database + Daily.co API inspection

View File

@@ -1,40 +0,0 @@
"""add cloud recording support
Revision ID: 1b1e6a6fc465
Revises: bd3a729bb379
Create Date: 2026-01-09 17:17:33.535620
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "1b1e6a6fc465"
down_revision: Union[str, None] = "bd3a729bb379"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(
sa.Column("daily_composed_video_s3_key", sa.String(), nullable=True)
)
batch_op.add_column(
sa.Column("daily_composed_video_duration", sa.Integer(), nullable=True)
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_column("daily_composed_video_duration")
batch_op.drop_column("daily_composed_video_s3_key")
# ### end Alembic commands ###

View File

@@ -1,44 +0,0 @@
"""replace_use_hatchet_with_use_celery
Revision ID: 80beb1ea3269
Revises: bd3a729bb379
Create Date: 2026-01-20 16:26:25.555869
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "80beb1ea3269"
down_revision: Union[str, None] = "bd3a729bb379"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.drop_column("use_hatchet")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_hatchet",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.drop_column("use_celery")

View File

@@ -1,23 +0,0 @@
"""merge cloud recording and celery heads
Revision ID: e69f08ead8ea
Revises: 1b1e6a6fc465, 80beb1ea3269
Create Date: 2026-01-21 21:39:10.326841
"""
from typing import Sequence, Union
# revision identifiers, used by Alembic.
revision: str = "e69f08ead8ea"
down_revision: Union[str, None] = ("1b1e6a6fc465", "80beb1ea3269")
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
pass
def downgrade() -> None:
pass

View File

@@ -3,7 +3,7 @@ Daily.co API Module
"""
# Client
from .client import DailyApiClient, DailyApiError, RecordingType
from .client import DailyApiClient, DailyApiError
# Request models
from .requests import (
@@ -64,7 +64,6 @@ __all__ = [
# Client
"DailyApiClient",
"DailyApiError",
"RecordingType",
# Requests
"CreateRoomRequest",
"RoomProperties",

View File

@@ -7,8 +7,7 @@ Reference: https://docs.daily.co/reference/rest-api
"""
from http import HTTPStatus
from typing import Any, Literal
from uuid import UUID
from typing import Any
import httpx
import structlog
@@ -33,8 +32,6 @@ from .responses import (
logger = structlog.get_logger(__name__)
RecordingType = Literal["cloud", "raw-tracks"]
class DailyApiError(Exception):
"""Daily.co API error with full request/response context."""
@@ -398,38 +395,6 @@ class DailyApiClient:
return [RecordingResponse(**r) for r in data["data"]]
async def start_recording(
self,
room_name: NonEmptyString,
recording_type: RecordingType,
instance_id: UUID,
) -> dict[str, Any]:
"""Start recording via REST API.
Reference: https://docs.daily.co/reference/rest-api/rooms/recordings/start
Args:
room_name: Daily.co room name
recording_type: Recording type
instance_id: UUID for this recording session
Returns:
Recording start confirmation from Daily.co API
Raises:
DailyApiError: If API request fails
"""
client = await self._get_client()
response = await client.post(
f"{self.base_url}/rooms/{room_name}/recordings/start",
headers=self.headers,
json={
"type": recording_type,
"instanceId": str(instance_id),
},
)
return await self._handle_response(response, "start_recording")
# ============================================================================
# MEETING TOKENS
# ============================================================================

View File

@@ -1,37 +0,0 @@
"""
Daily.co recording instanceId generation utilities.
Deterministic instance ID generation for cloud and raw-tracks recordings.
MUST match frontend logic
"""
from uuid import UUID, uuid5
from reflector.utils.string import NonEmptyString
# Namespace UUID for UUIDv5 generation of raw-tracks instanceIds
# DO NOT CHANGE: Breaks instanceId determinism across deployments and frontend/backend matching
RAW_TRACKS_NAMESPACE = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
def generate_cloud_instance_id(meeting_id: NonEmptyString) -> UUID:
"""
Generate instanceId for cloud recording.
Cloud recordings use meeting ID directly as instanceId.
This ensures each meeting has one unique cloud recording.
"""
return UUID(meeting_id)
def generate_raw_tracks_instance_id(meeting_id: NonEmptyString) -> UUID:
"""
Generate instanceId for raw-tracks recording.
Raw-tracks recordings use UUIDv5(meeting_id, namespace) to ensure
different instanceId from cloud while remaining deterministic.
Daily.co requires cloud and raw-tracks to have different instanceIds
for concurrent recording.
"""
return uuid5(RAW_TRACKS_NAMESPACE, meeting_id)
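
A quick check of the determinism property these helpers guarantee, assuming the module above is importable:

```python
meeting_id = "034804b8-cee2-4fb4-94d7-122f6f068a61"

cloud_a = generate_cloud_instance_id(meeting_id)
raw_a = generate_raw_tracks_instance_id(meeting_id)
raw_b = generate_raw_tracks_instance_id(meeting_id)

assert cloud_a != raw_a  # Daily.co requires distinct instanceIds for concurrent recording
assert raw_a == raw_b    # deterministic across calls and deployments
```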

View File

@@ -88,6 +88,13 @@ class MeetingTokenProperties(BaseModel):
is_owner: bool = Field(
default=False, description="Grant owner privileges to token holder"
)
start_cloud_recording: bool = Field(
default=False, description="Automatically start cloud recording on join"
)
start_cloud_recording_opts: dict | None = Field(
default=None,
description="Options for startRecording when start_cloud_recording is true (e.g., maxDuration)",
)
enable_recording_ui: bool = Field(
default=True, description="Show recording controls in UI"
)

View File

@@ -116,7 +116,6 @@ class RecordingS3Info(BaseModel):
bucket_name: NonEmptyString
bucket_region: NonEmptyString
key: NonEmptyString | None = None
endpoint: NonEmptyString | None = None
@@ -133,9 +132,6 @@ class RecordingResponse(BaseModel):
id: NonEmptyString = Field(description="Recording identifier")
room_name: NonEmptyString = Field(description="Room where recording occurred")
start_ts: int = Field(description="Recording start timestamp (Unix epoch seconds)")
type: Literal["cloud", "raw-tracks"] | None = Field(
None, description="Recording type (may be missing from API)"
)
status: RecordingStatus = Field(
description="Recording status ('in-progress' or 'finished')"
)
@@ -149,9 +145,6 @@ class RecordingResponse(BaseModel):
None, description="Token for sharing recording"
)
s3: RecordingS3Info | None = Field(None, description="S3 bucket information")
s3key: NonEmptyString | None = Field(
None, description="S3 key for cloud recordings (top-level field)"
)
tracks: list[DailyTrack] = Field(
default_factory=list,
description="Track list for raw-tracks recordings (always array, never null)",

View File

@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime
from typing import Any, Literal
import sqlalchemy as sa
@@ -9,7 +9,7 @@ from reflector.db import get_database, metadata
from reflector.db.rooms import Room
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString, assert_equal
from reflector.utils.string import assert_equal
meetings = sa.Table(
"meeting",
@@ -63,9 +63,6 @@ meetings = sa.Table(
nullable=False,
server_default=assert_equal(WHEREBY_PLATFORM, "whereby"),
),
# Daily.co composed video (Brady Bunch grid layout) - Daily.co only, not Whereby
sa.Column("daily_composed_video_s3_key", sa.String, nullable=True),
sa.Column("daily_composed_video_duration", sa.Integer, nullable=True),
sa.Index("idx_meeting_room_id", "room_id"),
sa.Index("idx_meeting_calendar_event", "calendar_event_id"),
)
@@ -113,9 +110,6 @@ class Meeting(BaseModel):
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform = WHEREBY_PLATFORM
# Daily.co composed video (Brady Bunch grid) - Daily.co only
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
class MeetingController:
@@ -177,90 +171,6 @@ class MeetingController:
return None
return Meeting(**result)
async def get_by_room_name_all(self, room_name: str) -> list[Meeting]:
"""Get all meetings for a room name (not just most recent)."""
query = meetings.select().where(meetings.c.room_name == room_name)
results = await get_database().fetch_all(query)
return [Meeting(**r) for r in results]
async def get_by_room_name_and_time(
self,
room_name: NonEmptyString,
recording_start: datetime,
time_window_hours: int = 168,
) -> Meeting | None:
"""
Get meeting by room name closest to recording timestamp.
HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
and mtgSessionId is separate from our instanceId. Time-based matching is
the least-bad workaround.
This handles edge case of duplicate room_name values in DB (race conditions,
double-clicks, etc.) by matching based on temporal proximity.
Algorithm:
1. Find meetings within time_window_hours of recording_start
2. Return meeting with start_date closest to recording_start
3. If tie, return first by meeting.id (deterministic)
Args:
room_name: Daily.co room name from recording
recording_start: Timezone-aware datetime from recording.start_ts
time_window_hours: Search window (default 168 = 1 week)
Returns:
Meeting closest to recording timestamp, or None if no matches
Failure modes:
- Multiple meetings in same room within ~5 minutes: picks closest
- All meetings outside time window: returns None
- Clock skew between Daily.co and DB: 1-week window tolerates this
Why 1 week window:
- Handles webhook failures (recording discovered days later)
- Tolerates clock skew
- Rejects unrelated meetings from weeks ago
"""
# Validate timezone-aware datetime
if recording_start.tzinfo is None:
raise ValueError(
f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
)
window_start = recording_start - timedelta(hours=time_window_hours)
window_end = recording_start + timedelta(hours=time_window_hours)
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_name == room_name,
meetings.c.start_date >= window_start,
meetings.c.start_date <= window_end,
)
)
.order_by(meetings.c.start_date)
)
results = await get_database().fetch_all(query)
if not results:
return None
candidates = [Meeting(**r) for r in results]
# Find meeting with start_date closest to recording_start
closest = min(
candidates,
key=lambda m: (
abs((m.start_date - recording_start).total_seconds()),
m.id, # Tie-breaker: deterministic by UUID
),
)
return closest
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
"""
Get latest active meeting for a room.
@@ -350,44 +260,6 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""
Set cloud recording only if not already set.
Returns True if updated, False if already set.
Prevents webhook/polling race condition via atomic WHERE clause.
"""
# Check current value before update to detect actual change
meeting_before = await self.get_by_id(meeting_id)
if not meeting_before:
return False
was_null = meeting_before.daily_composed_video_s3_key is None
query = (
meetings.update()
.where(
sa.and_(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
await get_database().execute(query)
# Return True only if value was NULL before (actual update occurred)
# If was_null=False, the WHERE clause prevented the update
return was_null
async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count."""
query = (

View File

@@ -7,7 +7,6 @@ from sqlalchemy import or_
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
recordings = sa.Table(
"recording",
@@ -72,19 +71,6 @@ class RecordingController:
query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query)
async def set_meeting_id(
self,
recording_id: NonEmptyString,
meeting_id: NonEmptyString,
) -> None:
"""Link recording to meeting."""
query = (
recordings.update()
.where(recordings.c.id == recording_id)
.values(meeting_id=meeting_id)
)
await get_database().execute(query)
# no check for existence
async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
if not recording_ids:

View File

@@ -58,7 +58,7 @@ rooms = sqlalchemy.Table(
nullable=False,
),
sqlalchemy.Column(
"use_celery",
"use_hatchet",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
@@ -97,7 +97,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
use_hatchet: bool = False
skip_consent: bool = False

View File

@@ -12,9 +12,14 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
daily_multitrack_pipeline,
)
from reflector.logger import logger
from reflector.settings import settings
def main():
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting CPU workers")
return
hatchet = HatchetClientManager.get_client()
logger.info(

View File

@@ -11,6 +11,7 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
from reflector.hatchet.workflows.track_processing import track_workflow
from reflector.logger import logger
from reflector.settings import settings
SLOTS = 10
WORKER_NAME = "llm-worker-pool"
@@ -18,6 +19,10 @@ POOL = "llm-io"
def main():
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting LLM workers")
return
hatchet = HatchetClientManager.get_client()
logger.info(

View File

@@ -489,7 +489,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
)
@with_error_handling(TaskName.MIXDOWN_TRACKS)
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
"""Mix all padded tracks into single audio file using PyAV or Modal backend."""
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
track_result = ctx.task_output(process_tracks)
@@ -513,7 +513,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
storage = _spawn_storage()
# Presign URLs on demand (avoids stale URLs on workflow replay)
# Presign URLs for padded tracks (same expiration for both backends)
padded_urls = []
for track_info in padded_tracks:
if track_info.key:
@@ -534,33 +534,104 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
logger.error("Mixdown failed - no decodable audio frames found")
raise ValueError("No decodable audio frames in any track")
output_path = tempfile.mktemp(suffix=".mp3")
duration_ms_callback_capture_container = [0.0]
output_key = f"{input.transcript_id}/audio.mp3"
async def capture_duration(d):
duration_ms_callback_capture_container[0] = d
# Conditional: Modal or local backend
if settings.MIXDOWN_BACKEND == "modal":
ctx.log("mixdown_tracks: using Modal backend")
writer = AudioFileWriterProcessor(path=output_path, on_duration=capture_duration)
# Presign PUT URL for output (Modal will upload directly)
output_url = await storage.get_file_url(
output_key,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
await mixdown_tracks_pyav(
valid_urls,
writer,
target_sample_rate,
offsets_seconds=None,
logger=logger,
progress_callback=make_audio_progress_logger(ctx, TaskName.MIXDOWN_TRACKS),
expected_duration_sec=recording_duration if recording_duration > 0 else None,
)
await writer.flush()
from reflector.processors.audio_mixdown_modal import ( # noqa: PLC0415
AudioMixdownModalProcessor,
)
file_size = Path(output_path).stat().st_size
storage_path = f"{input.transcript_id}/audio.mp3"
try:
processor = AudioMixdownModalProcessor()
result = await processor.mixdown(
track_urls=valid_urls,
output_url=output_url,
target_sample_rate=target_sample_rate,
expected_duration_sec=recording_duration
if recording_duration > 0
else None,
)
duration_ms = result.duration_ms
tracks_mixed = result.tracks_mixed
with open(output_path, "rb") as mixed_file:
await storage.put_file(storage_path, mixed_file)
ctx.log(
f"mixdown_tracks: Modal returned duration={duration_ms}ms, tracks={tracks_mixed}"
)
except httpx.HTTPStatusError as e:
error_detail = e.response.text if hasattr(e.response, "text") else str(e)
logger.error(
"[Hatchet] Modal mixdown HTTP error",
transcript_id=input.transcript_id,
status_code=e.response.status_code if hasattr(e, "response") else None,
error=error_detail,
)
raise RuntimeError(
f"Modal mixdown failed with HTTP {e.response.status_code}: {error_detail}"
)
except httpx.TimeoutException:
logger.error(
"[Hatchet] Modal mixdown timeout",
transcript_id=input.transcript_id,
timeout=settings.MIXDOWN_TIMEOUT,
)
raise RuntimeError(
f"Modal mixdown timeout after {settings.MIXDOWN_TIMEOUT}s"
)
except ValueError as e:
logger.error(
"[Hatchet] Modal mixdown validation error",
transcript_id=input.transcript_id,
error=str(e),
)
raise
else:
ctx.log("mixdown_tracks: using local backend")
Path(output_path).unlink(missing_ok=True)
# Existing local implementation
output_path = tempfile.mktemp(suffix=".mp3")
duration_ms_callback_capture_container = [0.0]
async def capture_duration(d):
duration_ms_callback_capture_container[0] = d
writer = AudioFileWriterProcessor(
path=output_path, on_duration=capture_duration
)
await mixdown_tracks_pyav(
valid_urls,
writer,
target_sample_rate,
offsets_seconds=None,
logger=logger,
progress_callback=make_audio_progress_logger(ctx, TaskName.MIXDOWN_TRACKS),
expected_duration_sec=recording_duration
if recording_duration > 0
else None,
)
await writer.flush()
file_size = Path(output_path).stat().st_size
with open(output_path, "rb") as mixed_file:
await storage.put_file(output_key, mixed_file)
Path(output_path).unlink(missing_ok=True)
duration_ms = duration_ms_callback_capture_container[0]
tracks_mixed = len(valid_urls)
ctx.log(f"mixdown_tracks: local mixdown uploaded {file_size} bytes")
# Update DB (same for both backends)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
@@ -570,12 +641,12 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
transcript, {"audio_location": "storage"}
)
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
ctx.log(f"mixdown_tracks complete: uploaded to {output_key}")
return MixdownResult(
audio_key=storage_path,
duration=duration_ms_callback_capture_container[0],
tracks_mixed=len(valid_urls),
audio_key=output_key,
duration=duration_ms,
tracks_mixed=tracks_mixed,
)
@@ -1095,7 +1166,7 @@ async def identify_action_items(
@daily_multitrack_pipeline.task(
parents=[generate_title, generate_recap, identify_action_items],
parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
)
@@ -1149,8 +1220,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
transcript,
"TRANSCRIPT",
TranscriptText(
text="",
translation=None,
text=merged_transcript.text,
translation=merged_transcript.translation,
),
logger=logger,
)
@@ -1347,34 +1418,14 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
f"participants={len(payload.transcript.participants)})"
)
try:
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
timeout=30.0,
)
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
timeout=30.0,
)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
except httpx.HTTPStatusError as e:
ctx.log(
f"send_webhook failed (HTTP {e.response.status_code}), continuing anyway"
)
return WebhookResult(
webhook_sent=False, response_code=e.response.status_code
)
except httpx.ConnectError as e:
ctx.log(f"send_webhook failed (connection error), continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
except httpx.TimeoutException as e:
ctx.log(f"send_webhook failed (timeout), continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
except Exception as e:
ctx.log(f"send_webhook unexpected error, continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
return WebhookResult(webhook_sent=True, response_code=response.status_code)

View File

@@ -0,0 +1,89 @@
"""
Modal.com backend for audio mixdown.
Uses Modal's CPU containers to offload audio mixing from Hatchet workers.
Communicates via presigned S3 URLs for both input and output.
"""
import httpx
from pydantic import BaseModel
from reflector.settings import settings
class MixdownResponse(BaseModel):
"""Response from Modal mixdown endpoint."""
duration_ms: float
tracks_mixed: int
audio_uploaded: bool
class AudioMixdownModalProcessor:
"""Audio mixdown processor using Modal.com CPU backend.
Sends track URLs (presigned GET) and output URL (presigned PUT) to Modal.
Modal handles download, mixdown via PyAV, and upload.
"""
def __init__(self, modal_api_key: str | None = None):
if not settings.MIXDOWN_URL:
raise ValueError("MIXDOWN_URL required to use AudioMixdownModalProcessor")
self.mixdown_url = settings.MIXDOWN_URL + "/v1"
self.timeout = settings.MIXDOWN_TIMEOUT
self.modal_api_key = modal_api_key or settings.MIXDOWN_MODAL_API_KEY
if not self.modal_api_key:
raise ValueError(
"MIXDOWN_MODAL_API_KEY required to use AudioMixdownModalProcessor"
)
async def mixdown(
self,
track_urls: list[str],
output_url: str,
target_sample_rate: int,
expected_duration_sec: float | None = None,
) -> MixdownResponse:
"""Mix multiple audio tracks via Modal backend.
Args:
track_urls: List of presigned GET URLs for audio tracks (non-empty)
output_url: Presigned PUT URL for output MP3
target_sample_rate: Sample rate for output (Hz, must be positive)
expected_duration_sec: Optional fallback duration if container metadata unavailable
Returns:
MixdownResponse with duration_ms, tracks_mixed, audio_uploaded
Raises:
ValueError: If track_urls is empty or target_sample_rate invalid
httpx.HTTPStatusError: On HTTP errors (404, 403, 500, etc.)
httpx.TimeoutException: On timeout
"""
# Validate inputs
if not track_urls:
raise ValueError("track_urls cannot be empty")
if target_sample_rate <= 0:
raise ValueError(
f"target_sample_rate must be positive, got {target_sample_rate}"
)
if expected_duration_sec is not None and expected_duration_sec < 0:
raise ValueError(
f"expected_duration_sec cannot be negative, got {expected_duration_sec}"
)
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(
f"{self.mixdown_url}/audio/mixdown",
headers={"Authorization": f"Bearer {self.modal_api_key}"},
json={
"track_urls": track_urls,
"output_url": output_url,
"target_sample_rate": target_sample_rate,
"expected_duration_sec": expected_duration_sec,
},
)
response.raise_for_status()
return MixdownResponse(**response.json())
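
A hypothetical call site, assuming `MIXDOWN_URL` and `MIXDOWN_MODAL_API_KEY` are configured and presigned URLs are already in hand (the Hatchet task shows the real wiring):

```python
import asyncio

async def run_mixdown(track_urls: list[str], output_url: str) -> None:
    # track_urls: presigned GET URLs; output_url: presigned PUT URL
    processor = AudioMixdownModalProcessor()
    result = await processor.mixdown(
        track_urls=track_urls,
        output_url=output_url,
        target_sample_rate=48000,
    )
    print(result.duration_ms, result.tracks_mixed, result.audio_uploaded)

# asyncio.run(run_mixdown([...], "https://s3.example/presigned-put"))
```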

View File

@@ -319,6 +319,21 @@ class ICSSyncService:
calendar = self.fetch_service.parse_ics(ics_content)
content_hash = hashlib.md5(ics_content.encode()).hexdigest()
if room.ics_last_etag == content_hash:
logger.info("No changes in ICS for room", room_id=room.id)
room_url = f"{settings.UI_BASE_URL}/{room.name}"
events, total_events = self.fetch_service.extract_room_events(
calendar, room.name, room_url
)
return {
"status": SyncStatus.UNCHANGED,
"hash": content_hash,
"events_found": len(events),
"total_events": total_events,
"events_created": 0,
"events_updated": 0,
"events_deleted": 0,
}
# Extract matching events
room_url = f"{settings.UI_BASE_URL}/{room.name}"
@@ -356,44 +371,6 @@ class ICSSyncService:
time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
return time_since_sync.total_seconds() >= room.ics_fetch_interval
def _event_data_changed(self, existing: CalendarEvent, new_data: EventData) -> bool:
"""Check if event data has changed by comparing relevant fields.
IMPORTANT: When adding fields to CalendarEvent/EventData, update this method
and the _COMPARED_FIELDS set below for runtime validation.
"""
# Fields that come from ICS and should trigger updates when changed
_COMPARED_FIELDS = {
"title",
"description",
"start_time",
"end_time",
"location",
"attendees",
"ics_raw_data",
}
# Runtime exhaustiveness check: ensure we're comparing all EventData fields
event_data_fields = set(EventData.__annotations__.keys()) - {"ics_uid"}
if event_data_fields != _COMPARED_FIELDS:
missing = event_data_fields - _COMPARED_FIELDS
extra = _COMPARED_FIELDS - event_data_fields
raise RuntimeError(
f"_event_data_changed() field mismatch: "
f"missing={missing}, extra={extra}. "
f"Update the comparison logic when adding/removing fields."
)
return (
existing.title != new_data["title"]
or existing.description != new_data["description"]
or existing.start_time != new_data["start_time"]
or existing.end_time != new_data["end_time"]
or existing.location != new_data["location"]
or existing.attendees != new_data["attendees"]
or existing.ics_raw_data != new_data["ics_raw_data"]
)
async def _sync_events_to_database(
self, room_id: str, events: list[EventData]
) -> SyncStats:
@@ -409,14 +386,11 @@ class ICSSyncService:
)
if existing:
# Only count as updated if data actually changed
if self._event_data_changed(existing, event_data):
updated += 1
await calendar_events_controller.upsert(calendar_event)
updated += 1
else:
created += 1
await calendar_events_controller.upsert(calendar_event)
await calendar_events_controller.upsert(calendar_event)
current_ics_uids.append(event_data["ics_uid"])
# Soft delete events that are no longer in calendar

View File

@@ -11,7 +11,7 @@ from typing import Literal, Union, assert_never
import celery
from celery.result import AsyncResult
from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.exceptions import ApiException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
@@ -23,6 +23,7 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
@@ -101,8 +102,8 @@ async def validate_transcript_for_processing(
if transcript.locked:
return ValidationLocked(detail="Recording is locked")
# Check if recording is ready for processing
if transcript.status == "idle" and not transcript.workflow_run_id:
# Hatchet dispatch is idempotent anyway, and this also covers the case
# where the workflow wasn't dispatched successfully
if transcript.status == "idle" and not settings.HATCHET_ENABLED:
return ValidationNotReady(detail="Recording is not ready for processing")
# Check Celery tasks
@@ -115,8 +116,7 @@ async def validate_transcript_for_processing(
):
return ValidationAlreadyScheduled(detail="already running")
# Check Hatchet workflow status if workflow_run_id exists
if transcript.workflow_run_id:
if settings.HATCHET_ENABLED and transcript.workflow_run_id:
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
@@ -181,16 +181,19 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
use_celery = False
# Check if room has use_hatchet=True (overrides env vars)
room_forces_hatchet = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
room_forces_hatchet = room.use_hatchet if room else False
use_hatchet = not use_celery
# Start durable workflow if enabled (Hatchet)
# and if room has use_hatchet=True
use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
if use_celery:
if room_forces_hatchet:
logger.info(
"Room uses legacy Celery processing",
"Room forces Hatchet workflow",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
@@ -212,39 +215,24 @@ async def dispatch_transcript_processing(
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Workflow exists but can't replay (CANCELLED, COMPLETED, etc.)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)

View File

@@ -98,6 +98,17 @@ class Settings(BaseSettings):
# Diarization: local pyannote.audio
DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None
# Audio Mixdown
# backends:
# - local: in-process PyAV mixdown (runs in same process as Hatchet worker)
# - modal: HTTP API client to Modal.com CPU container
MIXDOWN_BACKEND: str = "local"
MIXDOWN_URL: str | None = None
MIXDOWN_TIMEOUT: int = 900 # 15 minutes
# Mixdown: modal backend
MIXDOWN_MODAL_API_KEY: str | None = None
# Sentry
SENTRY_DSN: str | None = None
@@ -158,10 +169,19 @@ class Settings(BaseSettings):
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
# Hatchet workflow orchestration (always enabled for multitrack processing)
# Durable workflow orchestration
# Provider: "hatchet" (or "none" to disable)
DURABLE_WORKFLOW_PROVIDER: str = "none"
# Hatchet workflow orchestration
HATCHET_CLIENT_TOKEN: str | None = None
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
HATCHET_DEBUG: bool = False
@property
def HATCHET_ENABLED(self) -> bool:
"""True if Hatchet is the active provider."""
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
settings = Settings()
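A sketch of how these settings are likely wired together when picking a mixdown backend; the helper and client classes below are hypothetical stand-ins, and only the MIXDOWN_* fields come from the Settings class above:

from dataclasses import dataclass

@dataclass
class ModalMixdownClient:  # hypothetical: HTTP client for the Modal CPU container
    url: str
    api_key: str
    timeout: int

class LocalPyAVMixdown:  # hypothetical: in-process PyAV mixdown
    pass

def get_mixdown_backend(settings):
    if settings.MIXDOWN_BACKEND == "modal":
        if not (settings.MIXDOWN_URL and settings.MIXDOWN_MODAL_API_KEY):
            raise ValueError(
                "MIXDOWN_BACKEND=modal requires MIXDOWN_URL and MIXDOWN_MODAL_API_KEY"
            )
        return ModalMixdownClient(
            url=settings.MIXDOWN_URL,
            api_key=settings.MIXDOWN_MODAL_API_KEY,
            timeout=settings.MIXDOWN_TIMEOUT,
        )
    return LocalPyAVMixdown()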

View File

@@ -1,5 +1,4 @@
from datetime import datetime
from uuid import UUID
from reflector.dailyco_api import (
CreateMeetingTokenRequest,
@@ -13,11 +12,9 @@ from reflector.dailyco_api import (
RoomProperties,
verify_webhook_signature,
)
from reflector.dailyco_api import RecordingType as DailyRecordingType
from reflector.db.daily_participant_sessions import (
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room
from reflector.logger import logger
from reflector.storage import get_dailyco_storage
@@ -61,9 +58,10 @@ class DailyClient(VideoPlatformClient):
enable_recording = None
if room.recording_type == self.RECORDING_LOCAL:
enable_recording = "local"
elif room.recording_type == self.RECORDING_CLOUD:
# Don't set enable_recording - recordings started via REST API (not auto-start)
enable_recording = None
elif (
room.recording_type == self.RECORDING_CLOUD
): # daily "cloud" is not our "cloud"
enable_recording = "raw-tracks"
properties = RoomProperties(
enable_recording=enable_recording,
@@ -108,6 +106,8 @@ class DailyClient(VideoPlatformClient):
Daily.co doesn't provide historical session API, so we query our database
where participant.joined/left webhooks are stored.
"""
from reflector.db.meetings import meetings_controller # noqa: PLC0415
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
return []
@@ -179,14 +179,21 @@ class DailyClient(VideoPlatformClient):
async def create_meeting_token(
self,
room_name: DailyRoomName,
start_cloud_recording: bool,
enable_recording_ui: bool,
user_id: NonEmptyString | None = None,
is_owner: bool = False,
max_recording_duration_seconds: int | None = None,
) -> NonEmptyString:
start_cloud_recording_opts = None
if start_cloud_recording and max_recording_duration_seconds:
start_cloud_recording_opts = {"maxDuration": max_recording_duration_seconds}
properties = MeetingTokenProperties(
room_name=room_name,
user_id=user_id,
start_cloud_recording=start_cloud_recording,
start_cloud_recording_opts=start_cloud_recording_opts,
enable_recording_ui=enable_recording_ui,
is_owner=is_owner,
)
@@ -194,23 +201,6 @@ class DailyClient(VideoPlatformClient):
result = await self._api_client.create_meeting_token(request)
return result.token
async def start_recording(
self,
room_name: DailyRoomName,
recording_type: DailyRecordingType,
instance_id: UUID,
) -> dict:
"""Start recording via Daily.co REST API.
Args:
instance_id: UUID for this recording session - one UUID per "room" in Daily (which is "meeting" in Reflector)
"""
return await self._api_client.start_recording(
room_name=room_name,
recording_type=recording_type,
instance_id=instance_id,
)
async def close(self):
"""Clean up API client resources."""
await self._api_client.close()

View File

@@ -19,7 +19,6 @@ from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import (
poll_daily_room_presence_task,
process_multitrack_recording,
store_cloud_recording,
)
router = APIRouter()
@@ -175,64 +174,46 @@ async def _handle_recording_started(event: RecordingStartedEvent):
async def _handle_recording_ready(event: RecordingReadyEvent):
room_name = event.payload.room_name
recording_id = event.payload.recording_id
recording_type = event.payload.type
tracks = event.payload.tracks
if not tracks:
logger.warning(
"recording.ready-to-download: missing tracks",
room_name=room_name,
recording_id=recording_id,
payload=event.payload,
)
return
logger.info(
"Recording ready for download",
room_name=room_name,
recording_id=recording_id,
recording_type=recording_type,
num_tracks=len(tracks),
platform="daily",
)
bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
if not bucket_name:
logger.error("DAILYCO_STORAGE_AWS_BUCKET_NAME not configured")
logger.error(
"DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; cannot process Daily recording"
)
return
if recording_type == "cloud":
await store_cloud_recording(
recording_id=recording_id,
room_name=room_name,
s3_key=event.payload.s3_key,
duration=event.payload.duration,
start_ts=event.payload.start_ts,
source="webhook",
)
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
elif recording_type == "raw-tracks":
tracks = event.payload.tracks
if not tracks:
logger.warning(
"raw-tracks recording: missing tracks array",
room_name=room_name,
recording_id=recording_id,
)
return
logger.info(
"Recording webhook queuing processing",
recording_id=recording_id,
room_name=room_name,
)
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
logger.info(
"Raw-tracks recording queuing processing",
recording_id=recording_id,
room_name=room_name,
num_tracks=len(track_keys),
)
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=room_name,
recording_id=recording_id,
track_keys=track_keys,
recording_start_ts=event.payload.start_ts,
)
else:
logger.warning(
"Unknown recording type",
recording_type=recording_type,
recording_id=recording_id,
)
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=room_name,
recording_id=recording_id,
track_keys=track_keys,
)
async def _handle_recording_error(event: RecordingErrorEvent):

View File

@@ -1,23 +1,16 @@
import json
from datetime import datetime, timezone
from typing import Annotated, Any, Optional
from uuid import UUID
from typing import Annotated, Optional
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
import reflector.auth as auth
from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError
from reflector.db.meetings import (
MeetingConsent,
meeting_consent_controller,
meetings_controller,
)
from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
router = APIRouter()
@@ -80,72 +73,3 @@ async def meeting_deactivate(
await meetings_controller.update_meeting(meeting_id, is_active=False)
return {"status": "success", "meeting_id": meeting_id}
class StartRecordingRequest(BaseModel):
type: RecordingType
instanceId: UUID
@router.post("/meetings/{meeting_id}/recordings/start")
async def start_recording(
meeting_id: NonEmptyString, body: StartRecordingRequest
) -> dict[str, Any]:
"""Start cloud or raw-tracks recording via Daily.co REST API.
Both cloud and raw-tracks recordings are started via the REST API to bypass the enable_recording limitation of allowing only one recording at a time.
Uses different instanceIds for cloud vs raw-tracks (the same one won't work).
Note: No authentication required - anonymous users are supported. TODO: this is a DoS vector
"""
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
try:
client = create_platform_client("daily")
result = await client.start_recording(
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
log.info(f"Started {body.type} recording via REST API")
return {"status": "ok", "result": result}
except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream"
try:
error_body = json.loads(e.response_body)
error_info = error_body.get("info", "")
# "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info:
log.info(
f"{body.type} recording already active (started by another participant)"
)
return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling
# All other Daily.co API errors
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)
except Exception as e:
# Non-Daily.co errors
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)

View File

@@ -73,8 +73,6 @@ class Meeting(BaseModel):
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
class CreateRoom(BaseModel):
@@ -588,6 +586,7 @@ async def rooms_join_meeting(
)
token = await client.create_meeting_token(
meeting.room_name,
start_cloud_recording=meeting.recording_type == "cloud",
enable_recording_ui=enable_recording_ui,
user_id=user_id,
is_owner=user_id == room.user_id,

View File

@@ -6,11 +6,6 @@ from celery.schedules import crontab
from reflector.settings import settings
logger = structlog.get_logger(__name__)
# Polling intervals (seconds)
# Webhook-aware: 180s when webhook configured (backup mode), 15s when no webhook (primary discovery)
POLL_DAILY_RECORDINGS_INTERVAL_SEC = 180.0 if settings.DAILY_WEBHOOK_SECRET else 15.0
if celery.current_app.main != "default":
logger.info(f"Celery already configured ({celery.current_app})")
app = celery.current_app
@@ -49,7 +44,7 @@ else:
},
"poll_daily_recordings": {
"task": "reflector.worker.process.poll_daily_recordings",
"schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC,
"schedule": 180.0, # Every 3 minutes (configurable lookback window)
},
"trigger_daily_reconciliation": {
"task": "reflector.worker.process.trigger_daily_reconciliation",

View File

@@ -2,7 +2,7 @@ import json
import os
import re
from datetime import datetime, timezone
from typing import List, Literal
from typing import List
from urllib.parse import unquote
import av
@@ -42,7 +42,6 @@ from reflector.utils.daily import (
filter_cam_audio_tracks,
recording_lock_key,
)
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
from reflector.video_platforms.whereby_utils import (
parse_whereby_recording_filename,
@@ -176,18 +175,13 @@ async def process_multitrack_recording(
daily_room_name: DailyRoomName,
recording_id: str,
track_keys: list[str],
recording_start_ts: int,
):
"""
Process raw-tracks (multitrack) recording from Daily.co.
"""
logger.info(
"Processing multitrack recording",
bucket=bucket_name,
room_name=daily_room_name,
recording_id=recording_id,
provided_keys=len(track_keys),
recording_start_ts=recording_start_ts,
)
if not track_keys:
@@ -218,7 +212,7 @@ async def process_multitrack_recording(
)
await _process_multitrack_recording_inner(
bucket_name, daily_room_name, recording_id, track_keys, recording_start_ts
bucket_name, daily_room_name, recording_id, track_keys
)
@@ -227,18 +221,8 @@ async def _process_multitrack_recording_inner(
daily_room_name: DailyRoomName,
recording_id: str,
track_keys: list[str],
recording_start_ts: int,
):
"""
Process multitrack recording (first time or reprocessing).
For first processing (webhook/polling):
- Uses recording_start_ts for time-based meeting matching (no instanceId available)
For reprocessing:
- Uses recording.meeting_id directly (already linked during first processing)
- recording_start_ts is ignored
"""
"""Inner function containing the actual processing logic."""
tz = timezone.utc
recorded_at = datetime.now(tz)
@@ -256,53 +240,7 @@ async def _process_multitrack_recording_inner(
exc_info=True,
)
# Check if recording already exists (reprocessing path)
recording = await recordings_controller.get_by_id(recording_id)
if recording and recording.meeting_id:
# Reprocessing: recording exists with meeting already linked
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Reprocessing: meeting not found for recording - skipping",
meeting_id=recording.meeting_id,
recording_id=recording_id,
)
return
logger.info(
"Reprocessing: using existing recording.meeting_id",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
else:
# First processing: recording doesn't exist, need time-based matching
# (Daily.co doesn't return instanceId in API, must match by timestamp)
recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=daily_room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.error(
"Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
recording_id=recording_id,
room_name=daily_room_name,
recording_start_ts=recording_start_ts,
recording_start=recording_start.isoformat(),
)
return # Skip processing, will retry on next poll
logger.info(
"First processing: found meeting via time-based matching",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
time_delta_seconds=abs(
(meeting.start_date - recording_start).total_seconds()
),
)
meeting = await meetings_controller.get_by_room_name(daily_room_name)
room_name_base = extract_base_room_name(daily_room_name)
@@ -310,8 +248,18 @@ async def _process_multitrack_recording_inner(
if not room:
raise Exception(f"Room not found: {room_name_base}")
if not meeting:
raise Exception(f"Meeting not found: {room_name_base}")
logger.info(
"Found existing Meeting for recording",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
)
recording = await recordings_controller.get_by_id(recording_id)
if not recording:
# Create recording (only happens during first processing)
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
@@ -323,19 +271,7 @@ async def _process_multitrack_recording_inner(
track_keys=track_keys,
)
)
elif not recording.meeting_id:
# Recording exists but meeting_id is null (failed first processing)
# Update with meeting from time-based matching
await recordings_controller.set_meeting_id(
recording_id=recording.id,
meeting_id=meeting.id,
)
recording.meeting_id = meeting.id
logger.info(
"Updated existing recording with meeting_id",
recording_id=recording.id,
meeting_id=meeting.id,
)
# else: Recording already exists; metadata set at creation time
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript:
@@ -351,12 +287,11 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
use_celery = room and room.use_celery
use_hatchet = not use_celery
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
if use_celery:
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
logger.info(
"Room uses legacy Celery processing",
"Room forces Hatchet workflow",
room_id=room.id,
transcript_id=transcript.id,
)
@@ -402,11 +337,9 @@ async def poll_daily_recordings():
"""Poll Daily.co API for recordings and process missing ones.
Fetches latest recordings from Daily.co API (default limit 100), compares with DB,
and stores/queues missing recordings:
- Cloud recordings: Store S3 key in meeting table
- Raw-tracks recordings: Queue multitrack processing
and queues processing for recordings not already in DB.
Acts as fallback when webhooks active, primary discovery when webhooks unavailable.
For each missing recording, uses the audio tracks from the API response.
Worker-level locking provides idempotency (see process_multitrack_recording).
"""
@@ -447,222 +380,51 @@ async def poll_daily_recordings():
)
return
# Separate cloud and raw-tracks recordings
cloud_recordings = []
raw_tracks_recordings = []
for rec in finished_recordings:
if rec.type:
# Daily.co API returns null type - verify this assumption still holds.
# If this logs, the Daily.co API changed - we can remove the inference logic.
recording_type = rec.type
logger.warning(
"Recording has explicit type field from Daily.co API (unexpected, API may have changed)",
recording_id=rec.id,
room_name=rec.room_name,
recording_type=recording_type,
has_s3key=bool(rec.s3key),
tracks_count=len(rec.tracks),
)
else:
# DAILY.CO API LIMITATION:
# GET /recordings response does NOT include type field.
# Daily.co docs mention type field exists, but API never returns it.
# Verified: 84 recordings from Nov 2025 - Jan 2026 ALL have type=None.
#
# This is not a recent API change - Daily.co has never returned type.
# Must infer from structural properties.
#
# Inference heuristic (reliable for finished recordings):
# - Has tracks array → raw-tracks
# - Has s3key but no tracks → cloud
# - Neither → failed/incomplete recording
if len(rec.tracks) > 0:
recording_type = "raw-tracks"
elif rec.s3key and len(rec.tracks) == 0:
recording_type = "cloud"
else:
logger.warning(
"Recording has no type, no s3key, and no tracks - likely failed recording",
recording_id=rec.id,
room_name=rec.room_name,
status=rec.status,
duration=rec.duration,
mtg_session_id=rec.mtgSessionId,
)
continue
if recording_type == "cloud":
cloud_recordings.append(rec)
else:
raw_tracks_recordings.append(rec)
logger.debug(
"Poll results",
total=len(finished_recordings),
cloud=len(cloud_recordings),
raw_tracks=len(raw_tracks_recordings),
)
# Process cloud recordings
await _poll_cloud_recordings(cloud_recordings)
# Process raw-tracks recordings
await _poll_raw_tracks_recordings(raw_tracks_recordings, bucket_name)
async def store_cloud_recording(
recording_id: NonEmptyString,
room_name: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
start_ts: int,
source: Literal["webhook", "polling"],
) -> bool:
"""
Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths.
Uses time-based matching to handle duplicate room_name values.
Args:
recording_id: Daily.co recording ID
room_name: Daily.co room name
s3_key: S3 key where recording is stored
duration: Recording duration in seconds
start_ts: Unix timestamp when recording started
source: "webhook" or "polling" (for logging)
Returns:
True if stored, False if skipped/failed
"""
recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.warning(
f"Cloud recording ({source}): no meeting found within 1-week window",
recording_id=recording_id,
room_name=room_name,
recording_start_ts=start_ts,
recording_start=recording_start.isoformat(),
)
return False
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key=s3_key,
duration=duration,
)
if not success:
logger.debug(
f"Cloud recording ({source}): already set (race lost)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting.id,
)
return False
logger.info(
f"Cloud recording stored via {source} (time-based match)",
meeting_id=meeting.id,
recording_id=recording_id,
s3_key=s3_key,
duration=duration,
time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
)
return True
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
"""
Store cloud recordings missing from meeting table via polling.
Uses time-based matching via store_cloud_recording().
"""
if not cloud_recordings:
return
stored_count = 0
for recording in cloud_recordings:
# Extract S3 key from recording (cloud recordings use s3key field)
s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
if not s3_key:
logger.warning(
"Cloud recording: missing S3 key",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
stored = await store_cloud_recording(
recording_id=recording.id,
room_name=recording.room_name,
s3_key=s3_key,
duration=recording.duration,
start_ts=recording.start_ts,
source="polling",
)
if stored:
stored_count += 1
logger.info(
"Cloud recording polling complete",
total=len(cloud_recordings),
stored=stored_count,
)
async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: str,
):
"""Queue raw-tracks recordings missing from DB (existing logic)."""
if not raw_tracks_recordings:
return
recording_ids = [rec.id for rec in raw_tracks_recordings]
recording_ids = [rec.id for rec in finished_recordings]
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
existing_ids = {rec.id for rec in existing_recordings}
missing_recordings = [
rec for rec in raw_tracks_recordings if rec.id not in existing_ids
rec for rec in finished_recordings if rec.id not in existing_ids
]
if not missing_recordings:
logger.debug(
"All raw-tracks recordings already in DB",
api_count=len(raw_tracks_recordings),
"All recordings already in DB",
api_count=len(finished_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found raw-tracks recordings missing from DB",
"Found recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(raw_tracks_recordings),
total_api_count=len(finished_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
logger.warning(
"Finished raw-tracks recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
if recording.status == "finished":
logger.warning(
"Finished recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
else:
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
room_name=recording.room_name,
status=recording.status,
)
continue
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
if not track_keys:
logger.warning(
"No audio tracks found in raw-tracks recording",
"No audio tracks found in recording (only video tracks)",
recording_id=recording.id,
room_name=recording.room_name,
total_tracks=len(recording.tracks),
@@ -670,7 +432,7 @@ async def _poll_raw_tracks_recordings(
continue
logger.info(
"Queueing missing raw-tracks recording for processing",
"Queueing missing recording for processing",
recording_id=recording.id,
room_name=recording.room_name,
track_count=len(track_keys),
@@ -681,7 +443,6 @@ async def _poll_raw_tracks_recordings(
daily_room_name=recording.room_name,
recording_id=recording.id,
track_keys=track_keys,
recording_start_ts=recording.start_ts,
)
@@ -1049,6 +810,7 @@ async def reprocess_failed_daily_recordings():
)
continue
# Fetch room to check use_hatchet flag
room = None
if meeting.room_id:
room = await rooms_controller.get_by_id(meeting.room_id)
@@ -1072,10 +834,10 @@ async def reprocess_failed_daily_recordings():
)
continue
use_celery = room and room.use_celery
use_hatchet = not use_celery
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
if use_hatchet:
# Hatchet requires a transcript for workflow_run_id tracking
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
@@ -1121,16 +883,11 @@ async def reprocess_failed_daily_recordings():
transcript_status=transcript.status if transcript else None,
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1

View File

@@ -1,147 +0,0 @@
"""
Tests for Daily.co instanceId generation.
Verifies deterministic behavior and frontend/backend consistency.
"""
import pytest
from reflector.dailyco_api.instance_id import (
RAW_TRACKS_NAMESPACE,
generate_cloud_instance_id,
generate_raw_tracks_instance_id,
)
class TestInstanceIdDeterminism:
"""Test deterministic generation of instanceIds."""
def test_cloud_instance_id_is_meeting_id(self):
"""Cloud instanceId is meeting ID directly (implicitly tests determinism)."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
result1 = generate_cloud_instance_id(meeting_id)
result2 = generate_cloud_instance_id(meeting_id)
assert str(result1) == meeting_id
assert result1 == result2
def test_raw_tracks_instance_id_deterministic(self):
"""Raw-tracks instanceId generation is deterministic."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
result1 = generate_raw_tracks_instance_id(meeting_id)
result2 = generate_raw_tracks_instance_id(meeting_id)
assert result1 == result2
def test_raw_tracks_different_from_cloud(self):
"""Raw-tracks instanceId differs from cloud instanceId."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
cloud_id = generate_cloud_instance_id(meeting_id)
raw_tracks_id = generate_raw_tracks_instance_id(meeting_id)
assert cloud_id != raw_tracks_id
def test_different_meetings_different_instance_ids(self):
"""Different meetings generate different instanceIds."""
meeting_id1 = "550e8400-e29b-41d4-a716-446655440000"
meeting_id2 = "6ba7b810-9dad-11d1-80b4-00c04fd430c8"
cloud1 = generate_cloud_instance_id(meeting_id1)
cloud2 = generate_cloud_instance_id(meeting_id2)
assert cloud1 != cloud2
raw1 = generate_raw_tracks_instance_id(meeting_id1)
raw2 = generate_raw_tracks_instance_id(meeting_id2)
assert raw1 != raw2
class TestFrontendBackendConsistency:
"""Test that backend matches frontend logic."""
def test_namespace_matches_frontend(self):
"""Namespace UUID matches frontend RAW_TRACKS_NAMESPACE constant."""
# From www/app/[roomName]/components/DailyRoom.tsx
frontend_namespace = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
assert str(RAW_TRACKS_NAMESPACE) == frontend_namespace
def test_raw_tracks_generation_matches_frontend_logic(self):
"""Backend UUIDv5 generation matches frontend uuidv5() call."""
# Example meeting ID
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
# Backend result
backend_result = generate_raw_tracks_instance_id(meeting_id)
# Expected result from frontend: uuidv5(meeting.id, RAW_TRACKS_NAMESPACE)
# Python uuid5 uses (namespace, name) argument order
# JavaScript uuid.v5(name, namespace) - same args, different order
# Frontend: uuidv5(meeting.id, "a1b2c3d4-e5f6-7890-abcd-ef1234567890")
# Backend: uuid5(UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"), meeting.id)
# Verify it's a valid UUID (will raise if not)
assert len(str(backend_result)) == 36
assert backend_result.version == 5
class TestEdgeCases:
"""Test edge cases and error conditions."""
def test_invalid_uuid_format_raises(self):
"""Invalid UUID format raises ValueError."""
with pytest.raises(ValueError):
generate_cloud_instance_id("not-a-uuid")
def test_lowercase_uuid_normalized_for_cloud(self):
"""Cloud instanceId: lowercase/uppercase UUIDs produce same result."""
meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000"
meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000"
cloud_lower = generate_cloud_instance_id(meeting_id_lower)
cloud_upper = generate_cloud_instance_id(meeting_id_upper)
assert cloud_lower == cloud_upper
def test_uuid5_is_case_sensitive_warning(self):
"""
Documents uuid5 case sensitivity - different case UUIDs produce different hashes.
Not a problem in practice: meeting.id is always lowercase from the DB and API.
Frontend generates raw-tracks instanceId from lowercase meeting.id.
Backend receives lowercase meeting_id when matching.
This test documents the behavior, not a requirement.
"""
meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000"
meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000"
raw_lower = generate_raw_tracks_instance_id(meeting_id_lower)
raw_upper = generate_raw_tracks_instance_id(meeting_id_upper)
assert raw_lower != raw_upper
class TestMtgSessionIdVsInstanceId:
"""
Documents that Daily.co's mtgSessionId differs from our instanceId.
Why this matters: We investigated using mtgSessionId for matching but discovered
it's Daily.co-generated and unrelated to instanceId we send. This test documents
that finding so we don't investigate it again.
Production data from 2026-01-13:
- Meeting ID: 4ad503b6-8189-4910-a8f7-68cdd1b7f990
- Cloud instanceId: 4ad503b6-8189-4910-a8f7-68cdd1b7f990 (same as meeting ID)
- Raw-tracks instanceId: 784b3af3-c7dd-57f0-ac54-2ee91c6927cb (UUIDv5 derived)
- Recording mtgSessionId: f25a2e09-740f-4932-9c0d-b1bebaa669c6 (different!)
Conclusion: Cannot use mtgSessionId for recording-to-meeting matching.
"""
def test_mtg_session_id_differs_from_our_instance_ids(self):
"""mtgSessionId (Daily.co) != instanceId (ours) for both cloud and raw-tracks."""
meeting_id = "4ad503b6-8189-4910-a8f7-68cdd1b7f990"
expected_raw_tracks_id = "784b3af3-c7dd-57f0-ac54-2ee91c6927cb"
mtg_session_id = "f25a2e09-740f-4932-9c0d-b1bebaa669c6"
cloud_instance_id = generate_cloud_instance_id(meeting_id)
raw_tracks_instance_id = generate_raw_tracks_instance_id(meeting_id)
assert str(cloud_instance_id) == meeting_id
assert str(raw_tracks_instance_id) == expected_raw_tracks_id
assert str(cloud_instance_id) != mtg_session_id
assert str(raw_tracks_instance_id) != mtg_session_id
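For reference, the deleted contract above reduces to a few lines; the namespace value and argument-order note are taken from the tests, and the function name matches the deleted import:

import uuid

RAW_TRACKS_NAMESPACE = uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")

def generate_raw_tracks_instance_id(meeting_id: str) -> uuid.UUID:
    # Python: uuid5(namespace, name); JavaScript: uuidv5(name, namespace) - same inputs.
    return uuid.uuid5(RAW_TRACKS_NAMESPACE, meeting_id)

# Matches the production data documented in the deleted test:
assert (
    str(generate_raw_tracks_instance_id("4ad503b6-8189-4910-a8f7-68cdd1b7f990"))
    == "784b3af3-c7dd-57f0-ac54-2ee91c6927cb"
)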

View File

@@ -2,9 +2,10 @@
Tests for Hatchet workflow dispatch and routing logic.
These tests verify:
1. Hatchet workflow validation and replay logic
2. Force flag to cancel and restart workflows
3. Validation prevents concurrent workflows
1. Routing to Hatchet when HATCHET_ENABLED=True
2. Replay logic for failed workflows
3. Force flag to cancel and restart
4. Validation prevents concurrent workflows
"""
from unittest.mock import AsyncMock, patch
@@ -33,22 +34,25 @@ async def test_hatchet_validation_blocks_running_workflow():
workflow_run_id="running-workflow-123",
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.RUNNING
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.RUNNING
)
result = await validate_transcript_for_processing(mock_transcript)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
assert isinstance(result, ValidationAlreadyScheduled)
assert "running" in result.detail.lower()
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationAlreadyScheduled)
assert "running" in result.detail.lower()
@pytest.mark.usefixtures("setup_database")
@@ -68,21 +72,24 @@ async def test_hatchet_validation_blocks_queued_workflow():
workflow_run_id="queued-workflow-123",
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.QUEUED
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.QUEUED
)
result = await validate_transcript_for_processing(mock_transcript)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
assert isinstance(result, ValidationAlreadyScheduled)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationAlreadyScheduled)
@pytest.mark.usefixtures("setup_database")
@@ -103,22 +110,25 @@ async def test_hatchet_validation_allows_failed_workflow():
recording_id="test-recording-id",
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.FAILED
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.FAILED
)
result = await validate_transcript_for_processing(mock_transcript)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
assert isinstance(result, ValidationOk)
assert result.transcript_id == "test-transcript-id"
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationOk)
assert result.transcript_id == "test-transcript-id"
@pytest.mark.usefixtures("setup_database")
@@ -139,21 +149,24 @@ async def test_hatchet_validation_allows_completed_workflow():
recording_id="test-recording-id",
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.COMPLETED
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.COMPLETED
)
result = await validate_transcript_for_processing(mock_transcript)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
assert isinstance(result, ValidationOk)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@@ -174,23 +187,26 @@ async def test_hatchet_validation_allows_when_status_check_fails():
recording_id="test-recording-id",
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Status check fails (workflow might be deleted)
mock_hatchet.get_workflow_run_status = AsyncMock(
side_effect=ApiException("Workflow not found")
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Status check fails (workflow might be deleted)
mock_hatchet.get_workflow_run_status = AsyncMock(
side_effect=ApiException("Workflow not found")
)
result = await validate_transcript_for_processing(mock_transcript)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
# Should allow processing when we can't get status
assert isinstance(result, ValidationOk)
result = await validate_transcript_for_processing(mock_transcript)
# Should allow processing when we can't get status
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@@ -211,11 +227,47 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
recording_id="test-recording-id",
)
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Should not be called
mock_hatchet.get_workflow_run_status = AsyncMock()
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Should not be called
mock_hatchet.get_workflow_run_status = AsyncMock()
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet status
mock_hatchet.get_workflow_run_status.assert_not_called()
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_skipped_when_disabled():
"""Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="uploaded",
source_kind="room",
workflow_run_id="some-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = False # Hatchet disabled
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
@@ -224,8 +276,7 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet status
mock_hatchet.get_workflow_run_status.assert_not_called()
# Should not check Hatchet at all
assert isinstance(result, ValidationOk)

View File

@@ -189,17 +189,14 @@ async def test_ics_sync_service_sync_room_calendar():
assert events[0].ics_uid == "sync-event-1"
assert events[0].title == "Sync Test Meeting"
# Second sync with same content (calendar unchanged, but sync always runs)
# Second sync with same content (should be unchanged)
# Refresh room to get updated etag and force sync by setting old sync time
room = await rooms_controller.get_by_id(room.id)
await rooms_controller.update(
room, {"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)}
)
result = await sync_service.sync_room_calendar(room)
assert result["status"] == "success"
assert result["events_created"] == 0
assert result["events_updated"] == 0
assert result["events_deleted"] == 0
assert result["status"] == "unchanged"
# Third sync with updated event
event["summary"] = "Updated Meeting Title"
@@ -291,43 +288,3 @@ async def test_ics_sync_service_error_handling():
result = await sync_service.sync_room_calendar(room)
assert result["status"] == "error"
assert "Network error" in result["error"]
@pytest.mark.asyncio
async def test_event_data_changed_exhaustiveness():
"""Test that _event_data_changed compares all EventData fields (except ics_uid).
This test ensures programmers don't forget to update the comparison logic
when adding new fields to EventData/CalendarEvent.
"""
from reflector.services.ics_sync import EventData
sync_service = ICSSyncService()
from reflector.db.calendar_events import CalendarEvent
now = datetime.now(timezone.utc)
event_data: EventData = {
"ics_uid": "test-123",
"title": "Test",
"description": "Desc",
"location": "Loc",
"start_time": now,
"end_time": now + timedelta(hours=1),
"attendees": [],
"ics_raw_data": "raw",
}
existing = CalendarEvent(
room_id="room1",
**event_data,
)
# Will raise RuntimeError if fields are missing from comparison
result = sync_service._event_data_changed(existing, event_data)
assert result is False
modified_data = event_data.copy()
modified_data["title"] = "Changed Title"
result = sync_service._event_data_changed(existing, modified_data)
assert result is True
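The "unchanged" status asserted in the second sync above implies the service fingerprints the fetched calendar and short-circuits when it matches the room's stored etag. A minimal sketch of that check, with the hashing scheme assumed:

import hashlib

def is_calendar_unchanged(ics_text: str, stored_etag: str | None) -> bool:
    # Content fingerprint; the real service may use the HTTP ETag header instead.
    etag = hashlib.sha256(ics_text.encode("utf-8")).hexdigest()
    return stored_etag is not None and etag == stored_etag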

View File

@@ -1,374 +0,0 @@
"""
Integration tests for time-based meeting-to-recording matching.
Tests the critical path for matching Daily.co recordings to meetings when
the API doesn't return instanceId.
"""
from datetime import datetime, timedelta, timezone
import pytest
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
@pytest.fixture
async def test_room():
"""Create a test room for meetings."""
room = await rooms_controller.add(
name="test-room-time",
user_id="test-user-id",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=False,
platform="daily",
)
return room
@pytest.fixture
def base_time():
"""Fixed timestamp for deterministic tests."""
return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
class TestTimeBasedMatching:
"""Test get_by_room_name_and_time() matching logic."""
async def test_exact_time_match(self, test_room, base_time):
"""Recording timestamp exactly matches meeting start_date."""
meeting = await meetings_controller.create(
id="meeting-exact",
room_name="daily-test-20260114090000",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090000",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
"""Recording started 1 minute after meeting (participants joined late)."""
meeting = await meetings_controller.create(
id="meeting-late",
room_name="daily-test-20260114090100",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time + timedelta(minutes=1)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090100",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
"""
Two meetings with same room_name (duplicate/race condition).
Should pick closest by timestamp.
"""
meeting1 = await meetings_controller.create(
id="meeting-1-first",
room_name="daily-duplicate-room",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting2 = await meetings_controller.create(
id="meeting-2-second",
room_name="daily-duplicate-room", # Same room_name!
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(seconds=0.99), # 0.99s later
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording started 0.5s after meeting1
# Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
recording_start = base_time + timedelta(seconds=0.5)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-duplicate-room",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s)
async def test_outside_time_window_returns_none(self, test_room, base_time):
"""Recording outside 1-week window returns None."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-old",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording 8 days later (outside 7-day window)
recording_start = base_time + timedelta(days=8)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-old",
recording_start=recording_start,
time_window_hours=168,
)
assert result is None
async def test_tie_breaker_deterministic(self, test_room, base_time):
"""When time delta identical, tie-breaker by meeting.id is deterministic."""
meeting_z = await meetings_controller.create(
id="zzz-last-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting_a = await meetings_controller.create(
id="aaa-first-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time, # Exact same start_date
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tie",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
# Tie-breaker: lexicographically first UUID
assert result.id == "aaa-first-uuid"
async def test_timezone_naive_datetime_raises(self, test_room, base_time):
"""Timezone-naive datetime raises ValueError."""
await meetings_controller.create(
id="meeting-tz",
room_name="daily-test-tz",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Naive datetime (no timezone)
naive_dt = datetime(2026, 1, 14, 9, 0, 0)
with pytest.raises(ValueError, match="timezone-aware"):
await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tz",
recording_start=naive_dt,
time_window_hours=168,
)
async def test_one_week_boundary_after_included(self, test_room, base_time):
"""Meeting 1-week AFTER recording is included (window_end boundary)."""
meeting_time = base_time + timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-after",
room_name="daily-test-boundary-after",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-after",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-after"
async def test_one_week_boundary_before_included(self, test_room, base_time):
"""Meeting 1-week BEFORE recording is included (window_start boundary)."""
meeting_time = base_time - timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-before",
room_name="daily-test-boundary-before",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-before",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-before"
async def test_recording_before_meeting_start(self, test_room, base_time):
"""Recording started before meeting (clock skew or early join)."""
await meetings_controller.create(
id="meeting-early",
room_name="daily-test-early",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time - timedelta(minutes=2)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-early",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-early"
async def test_mixed_inside_outside_window(self, test_room, base_time):
"""Multiple meetings, only one inside window - returns the inside one."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=10),
end_date=base_time - timedelta(days=10, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-inside",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=2),
end_date=base_time - timedelta(days=2, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-future",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(days=10),
end_date=base_time + timedelta(days=10, hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-mixed",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-inside"
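Taken together, these tests pin down the selection rule. A self-contained sketch, under the assumption that candidates are already filtered by room_name (the controller internals are not shown in this diff):

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class _Meeting:  # minimal stand-in for the ORM model
    id: str
    start_date: datetime

def pick_closest_meeting(
    candidates: list[_Meeting],
    recording_start: datetime,
    time_window_hours: int = 168,
) -> _Meeting | None:
    if recording_start.tzinfo is None:
        raise ValueError("recording_start must be timezone-aware")
    window = timedelta(hours=time_window_hours)
    in_window = [
        m for m in candidates if abs(m.start_date - recording_start) <= window
    ]
    if not in_window:
        return None
    # Closest start_date wins; equal deltas tie-break on lexicographically smallest id.
    return min(
        in_window, key=lambda m: (abs(m.start_date - recording_start), m.id)
    )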
class TestAtomicCloudRecordingUpdate:
"""Test atomic update prevents race conditions."""
async def test_first_update_succeeds(self, test_room, base_time):
"""First call to set_cloud_recording_if_missing succeeds."""
meeting = await meetings_controller.create(
id="meeting-atomic-1",
room_name="daily-test-atomic",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success is True
updated = await meetings_controller.get_by_id(meeting.id)
assert updated.daily_composed_video_s3_key == "first-s3-key"
assert updated.daily_composed_video_duration == 100
async def test_second_update_fails_atomically(self, test_room, base_time):
"""Second call to update same meeting doesn't overwrite (atomic check)."""
meeting = await meetings_controller.create(
id="meeting-atomic-2",
room_name="daily-test-atomic2",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success1 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success1 is True
after_first = await meetings_controller.get_by_id(meeting.id)
assert after_first.daily_composed_video_s3_key == "first-s3-key"
success2 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="bucket/path/should-not-overwrite",
duration=200,
)
assert success2 is False
final = await meetings_controller.get_by_id(meeting.id)
assert final.daily_composed_video_s3_key == "first-s3-key"
assert final.daily_composed_video_duration == 100
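A sketch of the atomic check-and-set these two tests imply, written against SQLAlchemy Core with an assumed table shape; the real controller may differ:

from sqlalchemy import Column, Integer, MetaData, String, Table, update

metadata = MetaData()
meetings = Table(  # assumed shape; only the columns used here
    "meetings",
    metadata,
    Column("id", String, primary_key=True),
    Column("daily_composed_video_s3_key", String, nullable=True),
    Column("daily_composed_video_duration", Integer, nullable=True),
)

async def set_cloud_recording_if_missing(
    db, meeting_id: str, s3_key: str, duration: int
) -> bool:
    stmt = (
        update(meetings)
        .where(meetings.c.id == meeting_id)
        # NULL guard makes the update atomic: a concurrent second writer matches zero rows.
        .where(meetings.c.daily_composed_video_s3_key.is_(None))
        .values(
            daily_composed_video_s3_key=s3_key,
            daily_composed_video_duration=duration,
        )
    )
    result = await db.execute(stmt)
    return result.rowcount == 1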

View File

@@ -162,24 +162,9 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
room = await rooms_controller.add(
name="test-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
# Create transcript with Daily.co multitrack recording
transcript = await transcripts_controller.add(
"",
source_kind="room",
@@ -187,7 +172,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
target_language="en",
user_id="test-user",
share_mode="public",
room_id=room.id,
)
track_keys = [

View File

@@ -302,10 +302,10 @@ export default function RoomsList() {
return;
}
const platform: "whereby" | "daily" =
const platform: "whereby" | "daily" | null =
room.platform === "whereby" || room.platform === "daily"
? room.platform
: "daily";
: null;
const roomData = {
name: room.name,

View File

@@ -16,7 +16,6 @@ import {
import { useError } from "../../../../(errors)/errorContext";
import { useRouter } from "next/navigation";
import { Box, Grid } from "@chakra-ui/react";
import { parseNonEmptyString } from "../../../../lib/utils";
export type TranscriptCorrect = {
params: Promise<{
@@ -26,7 +25,8 @@ export type TranscriptCorrect = {
export default function TranscriptCorrect(props: TranscriptCorrect) {
const params = use(props.params);
const transcriptId = parseNonEmptyString(params.transcriptId);
const { transcriptId } = params;
const updateTranscriptMutation = useTranscriptUpdate();
const transcript = useTranscriptGet(transcriptId);

View File

@@ -3,8 +3,7 @@ import React from "react";
import Markdown from "react-markdown";
import "../../../styles/markdown.css";
import type { components } from "../../../reflector-api";
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import { useTranscriptUpdate } from "../../../lib/apiHooks";
import {
@@ -19,7 +18,7 @@ import { LuPen } from "react-icons/lu";
import { useError } from "../../../(errors)/errorContext";
type FinalSummaryProps = {
transcript: GetTranscriptWithParticipants;
transcript: GetTranscript;
topics: GetTranscriptTopic[];
onUpdate: (newSummary: string) => void;
finalSummaryRef: React.Dispatch<React.SetStateAction<HTMLDivElement | null>>;

View File

@@ -9,9 +9,7 @@ import React, { useEffect, useState, use } from "react";
import FinalSummary from "./finalSummary";
import TranscriptTitle from "../transcriptTitle";
import Player from "../player";
import { useWebSockets } from "../useWebSockets";
import { useRouter } from "next/navigation";
import { parseNonEmptyString } from "../../../lib/utils";
import {
Box,
Flex,
@@ -32,7 +30,7 @@ type TranscriptDetails = {
export default function TranscriptDetails(details: TranscriptDetails) {
const params = use(details.params);
const transcriptId = parseNonEmptyString(params.transcriptId);
const transcriptId = params.transcriptId;
const router = useRouter();
const statusToRedirect = [
"idle",
@@ -51,7 +49,6 @@ export default function TranscriptDetails(details: TranscriptDetails) {
transcriptId,
waiting || mp3.audioDeleted === true,
);
useWebSockets(transcriptId);
const useActiveTopic = useState<Topic | null>(null);
const [finalSummaryElement, setFinalSummaryElement] =
useState<HTMLDivElement | null>(null);

View File

@@ -10,7 +10,6 @@ import {
} from "@chakra-ui/react";
import { useRouter } from "next/navigation";
import { useTranscriptGet } from "../../../../lib/apiHooks";
-import { parseNonEmptyString } from "../../../../lib/utils";
type TranscriptProcessing = {
params: Promise<{
@@ -20,7 +19,7 @@ type TranscriptProcessing = {
export default function TranscriptProcessing(details: TranscriptProcessing) {
const params = use(details.params);
-  const transcriptId = parseNonEmptyString(params.transcriptId);
+  const transcriptId = params.transcriptId;
const router = useRouter();
const transcript = useTranscriptGet(transcriptId);


@@ -12,7 +12,6 @@ import { Box, Text, Grid, Heading, VStack, Flex } from "@chakra-ui/react";
import LiveTrancription from "../../liveTranscription";
import { useTranscriptGet } from "../../../../lib/apiHooks";
import { TranscriptStatus } from "../../../../lib/transcript";
-import { parseNonEmptyString } from "../../../../lib/utils";
type TranscriptDetails = {
params: Promise<{
@@ -22,14 +21,13 @@ type TranscriptDetails = {
const TranscriptRecord = (details: TranscriptDetails) => {
const params = use(details.params);
-  const transcriptId = parseNonEmptyString(params.transcriptId);
-  const transcript = useTranscriptGet(transcriptId);
+  const transcript = useTranscriptGet(params.transcriptId);
const [transcriptStarted, setTranscriptStarted] = useState(false);
const useActiveTopic = useState<Topic | null>(null);
-  const webSockets = useWebSockets(transcriptId);
+  const webSockets = useWebSockets(params.transcriptId);
-  const mp3 = useMp3(transcriptId, true);
+  const mp3 = useMp3(params.transcriptId, true);
const router = useRouter();


@@ -7,7 +7,6 @@ import useMp3 from "../../useMp3";
import { Center, VStack, Text, Heading } from "@chakra-ui/react";
import FileUploadButton from "../../fileUploadButton";
import { useTranscriptGet } from "../../../../lib/apiHooks";
-import { parseNonEmptyString } from "../../../../lib/utils";
type TranscriptUpload = {
params: Promise<{
@@ -17,13 +16,12 @@ type TranscriptUpload = {
const TranscriptUpload = (details: TranscriptUpload) => {
const params = use(details.params);
-  const transcriptId = parseNonEmptyString(params.transcriptId);
-  const transcript = useTranscriptGet(transcriptId);
+  const transcript = useTranscriptGet(params.transcriptId);
const [transcriptStarted, setTranscriptStarted] = useState(false);
-  const webSockets = useWebSockets(transcriptId);
+  const webSockets = useWebSockets(params.transcriptId);
-  const mp3 = useMp3(transcriptId, true);
+  const mp3 = useMp3(params.transcriptId, true);
const router = useRouter();


@@ -2,11 +2,10 @@ import type { components } from "../../reflector-api";
import { useTranscriptCreate } from "../../lib/apiHooks";
type CreateTranscript = components["schemas"]["CreateTranscript"];
-type GetTranscriptWithParticipants =
-  components["schemas"]["GetTranscriptWithParticipants"];
+type GetTranscript = components["schemas"]["GetTranscript"];
type UseCreateTranscript = {
-  transcript: GetTranscriptWithParticipants | null;
+  transcript: GetTranscript | null;
loading: boolean;
error: Error | null;
create: (transcriptCreationDetails: CreateTranscript) => Promise<void>;


@@ -2,8 +2,7 @@ import { useEffect, useState } from "react";
import { ShareMode, toShareMode } from "../../lib/shareMode";
import type { components } from "../../reflector-api";
-type GetTranscriptWithParticipants =
-  components["schemas"]["GetTranscriptWithParticipants"];
+type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
type UpdateTranscript = components["schemas"]["UpdateTranscript"];
import {
@@ -28,7 +27,7 @@ import { featureEnabled } from "../../lib/features";
type ShareAndPrivacyProps = {
finalSummaryElement: HTMLDivElement | null;
-  transcript: GetTranscriptWithParticipants;
+  transcript: GetTranscript;
topics: GetTranscriptTopic[];
};


@@ -1,8 +1,7 @@
import { useState, useEffect, useMemo } from "react";
import type { components } from "../../reflector-api";
-type GetTranscriptWithParticipants =
-  components["schemas"]["GetTranscriptWithParticipants"];
+type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import {
BoxProps,
@@ -27,7 +26,7 @@ import {
import { featureEnabled } from "../../lib/features";
type ShareZulipProps = {
-  transcript: GetTranscriptWithParticipants;
+  transcript: GetTranscript;
topics: GetTranscriptTopic[];
disabled: boolean;
};


@@ -1,10 +1,8 @@
import { useState } from "react";
import type { components } from "../../reflector-api";
-import { parseMaybeNonEmptyString } from "../../lib/utils";
type UpdateTranscript = components["schemas"]["UpdateTranscript"];
-type GetTranscriptWithParticipants =
-  components["schemas"]["GetTranscriptWithParticipants"];
+type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import {
useTranscriptUpdate,
@@ -22,7 +20,7 @@ type TranscriptTitle = {
onUpdate: (newTitle: string) => void;
// share props
-  transcript: GetTranscriptWithParticipants | null;
+  transcript: GetTranscript | null;
topics: GetTranscriptTopic[] | null;
finalSummaryElement: HTMLDivElement | null;
};
@@ -33,7 +31,7 @@ const TranscriptTitle = (props: TranscriptTitle) => {
const [isEditing, setIsEditing] = useState(false);
const updateTranscriptMutation = useTranscriptUpdate();
const participantsQuery = useTranscriptParticipants(
-    props.transcript?.id ? parseMaybeNonEmptyString(props.transcript.id) : null,
+    props.transcript?.id || null,
);
const updateTitle = async (newTitle: string, transcriptId: string) => {


@@ -1,6 +1,5 @@
import { useEffect, useState } from "react";
import { useTranscriptGet } from "../../lib/apiHooks";
-import { parseMaybeNonEmptyString } from "../../lib/utils";
import { useAuth } from "../../lib/AuthProvider";
import { API_URL } from "../../lib/apiClient";
@@ -28,7 +27,7 @@ const useMp3 = (transcriptId: string, waiting?: boolean): Mp3Response => {
data: transcript,
isLoading: transcriptMetadataLoading,
error: transcriptError,
-  } = useTranscriptGet(later ? null : parseMaybeNonEmptyString(transcriptId));
+  } = useTranscriptGet(later ? null : transcriptId);
const [serviceWorker, setServiceWorker] =
useState<ServiceWorkerRegistration | null>(null);


@@ -1,7 +1,6 @@
import type { components } from "../../reflector-api";
type Participant = components["schemas"]["Participant"];
import { useTranscriptParticipants } from "../../lib/apiHooks";
-import { parseMaybeNonEmptyString } from "../../lib/utils";
type ErrorParticipants = {
error: Error;
@@ -33,7 +32,7 @@ const useParticipants = (transcriptId: string): UseParticipants => {
isLoading: loading,
error,
refetch,
-  } = useTranscriptParticipants(parseMaybeNonEmptyString(transcriptId));
+  } = useTranscriptParticipants(transcriptId || null);
// Type-safe return based on state
if (error) {


@@ -1,6 +1,5 @@
import type { components } from "../../reflector-api";
import { useTranscriptTopicsWithWordsPerSpeaker } from "../../lib/apiHooks";
-import { parseMaybeNonEmptyString } from "../../lib/utils";
type GetTranscriptTopicWithWordsPerSpeaker =
components["schemas"]["GetTranscriptTopicWithWordsPerSpeaker"];
@@ -39,7 +38,7 @@ const useTopicWithWords = (
error,
refetch,
} = useTranscriptTopicsWithWordsPerSpeaker(
-    parseMaybeNonEmptyString(transcriptId),
+    transcriptId || null,
topicId || null,
);


@@ -1,6 +1,5 @@
import { useTranscriptTopics } from "../../lib/apiHooks";
import type { components } from "../../reflector-api";
-import { parseMaybeNonEmptyString } from "../../lib/utils";
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
@@ -11,11 +10,7 @@ type TranscriptTopics = {
};
const useTopics = (id: string): TranscriptTopics => {
-  const {
-    data: topics,
-    isLoading: loading,
-    error,
-  } = useTranscriptTopics(parseMaybeNonEmptyString(id));
+  const { data: topics, isLoading: loading, error } = useTranscriptTopics(id);
return {
topics: topics || null,


@@ -1,6 +1,5 @@
import type { components } from "../../reflector-api";
import { useTranscriptWaveform } from "../../lib/apiHooks";
-import { parseMaybeNonEmptyString } from "../../lib/utils";
type AudioWaveform = components["schemas"]["AudioWaveform"];
@@ -15,7 +14,7 @@ const useWaveform = (id: string, skip: boolean): AudioWaveFormResponse => {
data: waveform,
isLoading: loading,
error,
-  } = useTranscriptWaveform(skip ? null : parseMaybeNonEmptyString(id));
+  } = useTranscriptWaveform(skip ? null : id);
return {
waveform: waveform || null,


@@ -7,12 +7,6 @@ type GetTranscriptSegmentTopic =
components["schemas"]["GetTranscriptSegmentTopic"];
import { useQueryClient } from "@tanstack/react-query";
import { $api, WEBSOCKET_URL } from "../../lib/apiClient";
-import {
-  invalidateTranscript,
-  invalidateTranscriptTopics,
-  invalidateTranscriptWaveform,
-} from "../../lib/apiHooks";
-import { NonEmptyString } from "../../lib/utils";
export type UseWebSockets = {
transcriptTextLive: string;
@@ -375,10 +369,15 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
});
console.debug("TOPIC event:", message.data);
// Invalidate topics query to sync with WebSocket data
-          invalidateTranscriptTopics(
-            queryClient,
-            transcriptId as NonEmptyString,
-          );
+          queryClient.invalidateQueries({
+            queryKey: $api.queryOptions(
+              "get",
+              "/v1/transcripts/{transcript_id}/topics",
+              {
+                params: { path: { transcript_id: transcriptId } },
+              },
+            ).queryKey,
+          });
break;
case "FINAL_SHORT_SUMMARY":
@@ -389,7 +388,15 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
if (message.data) {
setFinalSummary(message.data);
// Invalidate transcript query to sync summary
-            invalidateTranscript(queryClient, transcriptId as NonEmptyString);
+            queryClient.invalidateQueries({
+              queryKey: $api.queryOptions(
+                "get",
+                "/v1/transcripts/{transcript_id}",
+                {
+                  params: { path: { transcript_id: transcriptId } },
+                },
+              ).queryKey,
+            });
}
break;
@@ -398,7 +405,15 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
if (message.data) {
setTitle(message.data.title);
// Invalidate transcript query to sync title
-            invalidateTranscript(queryClient, transcriptId as NonEmptyString);
+            queryClient.invalidateQueries({
+              queryKey: $api.queryOptions(
+                "get",
+                "/v1/transcripts/{transcript_id}",
+                {
+                  params: { path: { transcript_id: transcriptId } },
+                },
+              ).queryKey,
+            });
}
break;
@@ -409,10 +424,6 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
);
if (message.data) {
setWaveForm(message.data.waveform);
-            invalidateTranscriptWaveform(
-              queryClient,
-              transcriptId as NonEmptyString,
-            );
}
break;
case "DURATION":


@@ -26,7 +26,7 @@ import { useRouter } from "next/navigation";
import { formatDateTime, formatStartedAgo } from "../lib/timeUtils";
import MeetingMinimalHeader from "../components/MeetingMinimalHeader";
import { NonEmptyString } from "../lib/utils";
-import { MeetingId, assertMeetingId } from "../lib/types";
+import { MeetingId } from "../lib/types";
type Meeting = components["schemas"]["Meeting"];
@@ -315,9 +315,7 @@ export default function MeetingSelection({
variant="outline"
colorScheme="red"
size="md"
-                      onClick={() =>
-                        handleEndMeeting(assertMeetingId(meeting.id))
-                      }
+                      onClick={() => handleEndMeeting(meeting.id)}
loading={deactivateMeetingMutation.isPending}
>
<Icon as={LuX} me={2} />
@@ -462,9 +460,7 @@ export default function MeetingSelection({
variant="outline"
colorScheme="red"
size="md"
-                      onClick={() =>
-                        handleEndMeeting(assertMeetingId(meeting.id))
-                      }
+                      onClick={() => handleEndMeeting(meeting.id)}
loading={deactivateMeetingMutation.isPending}
>
<Icon as={LuX} me={2} />


@@ -22,29 +22,14 @@ import DailyIframe, {
import type { components } from "../../reflector-api";
import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
-import {
-  useRoomJoinMeeting,
-  useMeetingStartRecording,
-} from "../../lib/apiHooks";
+import { useRoomJoinMeeting } from "../../lib/apiHooks";
import { omit } from "remeda";
-import {
-  assertExists,
-  NonEmptyString,
-  parseNonEmptyString,
-} from "../../lib/utils";
-import { assertMeetingId, DailyRecordingType } from "../../lib/types";
-import { useUuidV5 } from "react-uuid-hook";
+import { assertExists } from "../../lib/utils";
+import { assertMeetingId } from "../../lib/types";
const CONSENT_BUTTON_ID = "recording-consent";
const RECORDING_INDICATOR_ID = "recording-indicator";
-// Namespace UUID for UUIDv5 generation of raw-tracks instanceIds
-// DO NOT CHANGE: Breaks instanceId determinism across deployments
-const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
-const RECORDING_START_DELAY_MS = 2000;
-const RECORDING_START_MAX_RETRIES = 5;
type Meeting = components["schemas"]["Meeting"];
type Room = components["schemas"]["RoomDetails"];
@@ -88,7 +73,9 @@ const useFrame = (
cbs: {
onLeftMeeting: () => void;
onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
-    onJoinMeeting: () => void;
+    onJoinMeeting: (
+      startRecording: (args: { type: "raw-tracks" }) => void,
+    ) => void;
},
) => {
const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
@@ -139,7 +126,7 @@ const useFrame = (
console.error("frame is null in joined-meeting callback");
return;
}
-      cbs.onJoinMeeting();
+      cbs.onJoinMeeting(frame.startRecording.bind(frame));
};
frame.on("joined-meeting", joinCb);
return () => {
@@ -186,15 +173,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const authLastUserId = auth.lastUserId;
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
-  const startRecordingMutation = useMeetingStartRecording();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
-  // Generate deterministic instanceIds so all participants use SAME IDs
-  const cloudInstanceId = parseNonEmptyString(meeting.id);
-  const rawTracksInstanceId = parseNonEmptyString(
-    useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
-  );
const roomName = params?.roomName as string;
const {
@@ -248,72 +228,19 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
],
);
-  const handleFrameJoinMeeting = useCallback(() => {
-    if (meeting.recording_type === "cloud") {
-      console.log("Starting dual recording via REST API", {
-        cloudInstanceId,
-        rawTracksInstanceId,
-      });
-      // Start both cloud and raw-tracks via backend REST API (with retry on 404)
-      // Daily.co needs time to register call as "hosting" for REST API
-      const startRecordingWithRetry = (
-        type: DailyRecordingType,
-        instanceId: NonEmptyString,
-        attempt: number = 1,
-      ) => {
-        setTimeout(() => {
-          startRecordingMutation.mutate(
-            {
-              params: {
-                path: {
-                  meeting_id: meeting.id,
-                },
-              },
-              body: {
-                type,
-                instanceId,
-              },
-            },
-            {
-              onError: (error: any) => {
-                const errorText = error?.detail || error?.message || "";
-                const is404NotHosting = errorText.includes(
-                  "does not seem to be hosting a call",
-                );
-                const isActiveStream = errorText.includes(
-                  "has an active stream",
-                );
-                if (is404NotHosting && attempt < RECORDING_START_MAX_RETRIES) {
-                  console.log(
-                    `${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`,
-                  );
-                  startRecordingWithRetry(type, instanceId, attempt + 1);
-                } else if (isActiveStream) {
-                  console.log(
-                    `${type}: Recording already active (started by another participant)`,
-                  );
-                } else {
-                  console.error(`Failed to start ${type} recording:`, error);
-                }
-              },
-            },
-          );
-        }, RECORDING_START_DELAY_MS);
-      };
-      // Start both recordings
-      startRecordingWithRetry("cloud", cloudInstanceId);
-      startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
-    }
-  }, [
-    meeting.recording_type,
-    meeting.id,
-    startRecordingMutation,
-    cloudInstanceId,
-    rawTracksInstanceId,
-  ]);
+  const handleFrameJoinMeeting = useCallback(
+    (startRecording: (args: { type: "raw-tracks" }) => void) => {
+      try {
+        if (meeting.recording_type === "cloud") {
+          console.log("Starting cloud recording");
+          startRecording({ type: "raw-tracks" });
+        }
+      } catch (error) {
+        console.error("Failed to start recording:", error);
+      }
+    },
+    [meeting.recording_type],
+  );
const recordingIconUrl = useMemo(
() => new URL("/recording-icon.svg", window.location.origin),


@@ -6,7 +6,6 @@ import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
-import { NonEmptyString } from "./utils";
/*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -104,7 +103,7 @@ export function useTranscriptProcess() {
});
}
-export function useTranscriptGet(transcriptId: NonEmptyString | null) {
+export function useTranscriptGet(transcriptId: string | null) {
return $api.useQuery(
"get",
"/v1/transcripts/{transcript_id}",
@@ -121,16 +120,6 @@ export function useTranscriptGet(transcriptId: NonEmptyString | null) {
);
}
-export const invalidateTranscript = (
-  queryClient: QueryClient,
-  transcriptId: NonEmptyString,
-) =>
-  queryClient.invalidateQueries({
-    queryKey: $api.queryOptions("get", "/v1/transcripts/{transcript_id}", {
-      params: { path: { transcript_id: transcriptId } },
-    }).queryKey,
-  });
export function useRoomGet(roomId: string | null) {
const { isAuthenticated } = useAuthReady();
@@ -308,7 +297,7 @@ export function useTranscriptUploadAudio() {
);
}
-export function useTranscriptWaveform(transcriptId: NonEmptyString | null) {
+export function useTranscriptWaveform(transcriptId: string | null) {
return $api.useQuery(
"get",
"/v1/transcripts/{transcript_id}/audio/waveform",
@@ -323,21 +312,7 @@ export function useTranscriptWaveform(transcriptId: NonEmptyString | null) {
);
}
-export const invalidateTranscriptWaveform = (
-  queryClient: QueryClient,
-  transcriptId: NonEmptyString,
-) =>
-  queryClient.invalidateQueries({
-    queryKey: $api.queryOptions(
-      "get",
-      "/v1/transcripts/{transcript_id}/audio/waveform",
-      {
-        params: { path: { transcript_id: transcriptId } },
-      },
-    ).queryKey,
-  });
-export function useTranscriptMP3(transcriptId: NonEmptyString | null) {
+export function useTranscriptMP3(transcriptId: string | null) {
const { isAuthenticated } = useAuthReady();
return $api.useQuery(
@@ -354,7 +329,7 @@ export function useTranscriptMP3(transcriptId: NonEmptyString | null) {
);
}
-export function useTranscriptTopics(transcriptId: NonEmptyString | null) {
+export function useTranscriptTopics(transcriptId: string | null) {
return $api.useQuery(
"get",
"/v1/transcripts/{transcript_id}/topics",
@@ -369,23 +344,7 @@ export function useTranscriptTopics(transcriptId: NonEmptyString | null) {
);
}
-export const invalidateTranscriptTopics = (
-  queryClient: QueryClient,
-  transcriptId: NonEmptyString,
-) =>
-  queryClient.invalidateQueries({
-    queryKey: $api.queryOptions(
-      "get",
-      "/v1/transcripts/{transcript_id}/topics",
-      {
-        params: { path: { transcript_id: transcriptId } },
-      },
-    ).queryKey,
-  });
-export function useTranscriptTopicsWithWords(
-  transcriptId: NonEmptyString | null,
-) {
+export function useTranscriptTopicsWithWords(transcriptId: string | null) {
const { isAuthenticated } = useAuthReady();
return $api.useQuery(
@@ -403,7 +362,7 @@ export function useTranscriptTopicsWithWords(
}
export function useTranscriptTopicsWithWordsPerSpeaker(
-  transcriptId: NonEmptyString | null,
+  transcriptId: string | null,
topicId: string | null,
) {
const { isAuthenticated } = useAuthReady();
@@ -425,7 +384,7 @@ export function useTranscriptTopicsWithWordsPerSpeaker(
);
}
-export function useTranscriptParticipants(transcriptId: NonEmptyString | null) {
+export function useTranscriptParticipants(transcriptId: string | null) {
const { isAuthenticated } = useAuthReady();
return $api.useQuery(
@@ -608,20 +567,6 @@ export function useTranscriptSpeakerMerge() {
);
}
-export function useMeetingStartRecording() {
-  const { setError } = useError();
-  return $api.useMutation(
-    "post",
-    "/v1/meetings/{meeting_id}/recordings/start",
-    {
-      onError: (error) => {
-        setError(error as Error, "Failed to start recording");
-      },
-    },
-  );
-}
export function useMeetingAudioConsent() {
const { setError } = useError();
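
With `NonEmptyString` gone from the hook layer, every transcript hook takes `string | null`, and passing null is what keeps the underlying query disabled until an id exists. A sketch of the resulting shape; the `enabled` gating is an assumption, since the hunks truncate the hook bodies:

```typescript
import { $api } from "./apiClient";

// Null disables the query; a plain string enables it. The enabled flag is
// assumed from the truncated hook bodies, not shown in this diff.
export function useTranscriptGetSketch(transcriptId: string | null) {
  return $api.useQuery(
    "get",
    "/v1/transcripts/{transcript_id}",
    { params: { path: { transcript_id: transcriptId ?? "" } } },
    { enabled: transcriptId !== null },
  );
}
```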


@@ -1,6 +1,5 @@
import { components } from "../reflector-api";
-type ApiTranscriptStatus =
-  components["schemas"]["GetTranscriptWithParticipants"]["status"];
+type ApiTranscriptStatus = components["schemas"]["GetTranscript"]["status"];
export type TranscriptStatus = ApiTranscriptStatus;


@@ -89,5 +89,3 @@ export const assertMeetingId = (s: string): MeetingId => {
// just cast for now
return nes as MeetingId;
};
-export type DailyRecordingType = "cloud" | "raw-tracks";


@@ -75,31 +75,6 @@ export interface paths {
patch: operations["v1_meeting_deactivate"];
trace?: never;
};
"/v1/meetings/{meeting_id}/recordings/start": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Start Recording
* @description Start cloud or raw-tracks recording via Daily.co REST API.
*
* Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time.
* Uses different instanceIds for cloud vs raw-tracks (same won't work)
*
* Note: No authentication required - anonymous users supported. TODO this is a DOS vector
*/
post: operations["v1_start_recording"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms": {
parameters: {
query?: never;
@@ -1569,10 +1544,6 @@ export interface components {
* @enum {string}
*/
platform: "whereby" | "daily";
-      /** Daily Composed Video S3 Key */
-      daily_composed_video_s3_key?: string | null;
-      /** Daily Composed Video Duration */
-      daily_composed_video_duration?: number | null;
};
/** MeetingConsentRequest */
MeetingConsentRequest: {
@@ -1847,19 +1818,6 @@ export interface components {
/** Words */
words: components["schemas"]["Word"][];
};
-    /** StartRecordingRequest */
-    StartRecordingRequest: {
-      /**
-       * Type
-       * @enum {string}
-       */
-      type: "cloud" | "raw-tracks";
-      /**
-       * Instanceid
-       * Format: uuid
-       */
-      instanceId: string;
-    };
/** Stream */
Stream: {
/** Stream Id */
@@ -2168,43 +2126,6 @@ export interface operations {
};
};
};
-  v1_start_recording: {
-    parameters: {
-      query?: never;
-      header?: never;
-      path: {
-        meeting_id: string;
-      };
-      cookie?: never;
-    };
-    requestBody: {
-      content: {
-        "application/json": components["schemas"]["StartRecordingRequest"];
-      };
-    };
-    responses: {
-      /** @description Successful Response */
-      200: {
-        headers: {
-          [name: string]: unknown;
-        };
-        content: {
-          "application/json": {
-            [key: string]: unknown;
-          };
-        };
-      };
-      /** @description Validation Error */
-      422: {
-        headers: {
-          [name: string]: unknown;
-        };
-        content: {
-          "application/json": components["schemas"]["HTTPValidationError"];
-        };
-      };
-    };
-  };
v1_rooms_list: {
parameters: {
query?: {


@@ -46,7 +46,6 @@
"react-markdown": "^9.0.0",
"react-qr-code": "^2.0.12",
"react-select-search": "^4.1.7",
"react-uuid-hook": "^0.0.6",
"redlock": "5.0.0-beta.2",
"remeda": "^2.31.1",
"sass": "^1.63.6",

www/pnpm-lock.yaml (generated)

@@ -106,9 +106,6 @@ importers:
react-select-search:
specifier: ^4.1.7
version: 4.1.8(prop-types@15.8.1)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
-      react-uuid-hook:
-        specifier: ^0.0.6
-        version: 0.0.6(react@18.3.1)
redlock:
specifier: 5.0.0-beta.2
version: 5.0.0-beta.2
@@ -7631,14 +7628,6 @@ packages:
"@types/react":
optional: true
-  react-uuid-hook@0.0.6:
-    resolution:
-      {
-        integrity: sha512-u9+EvFbqpWfLE/ReYFry0vYu1BAg1fY9ekr0XLSDNnfWyrnVFytpurwz5qYsIB0psevuvrpZHIcvu7AjUwqinA==,
-      }
-    peerDependencies:
-      react: ">=16.8.0"
react@18.3.1:
resolution:
{
@@ -8782,13 +8771,6 @@ packages:
integrity: sha512-Fykw5U4eZESbq739BeLvEBFRuJODfrlmjx5eJux7W817LjRaq4b7/i4t2zxQmhcX+fAj4nMfRdTzO4tmwLKn0w==,
}
-  uuid@13.0.0:
-    resolution:
-      {
-        integrity: sha512-XQegIaBTVUjSHliKqcnFqYypAd4S+WCYt5NIeRs6w/UAry7z8Y9j5ZwRRL4kzq9U3sD6v+85er9FvkEaBpji2w==,
-      }
-    hasBin: true
uuid@8.3.2:
resolution:
{
@@ -14588,11 +14570,6 @@ snapshots:
optionalDependencies:
"@types/react": 18.2.20
-  react-uuid-hook@0.0.6(react@18.3.1):
-    dependencies:
-      react: 18.3.1
-      uuid: 13.0.0
react@18.3.1:
dependencies:
loose-envify: 1.4.0
@@ -15424,8 +15401,6 @@ snapshots:
uuid-validate@0.0.3: {}
-  uuid@13.0.0: {}
uuid@8.3.2: {}
uuid@9.0.1: {}