Compare commits

...

16 Commits

Author SHA1 Message Date
Igor Loskutov
485b455c69 race condition debug wip 2026-01-30 14:38:12 -05:00
Igor Loskutov
74c9ec2ff1 race condition debug wip 2026-01-30 14:37:53 -05:00
Igor Loskutov
aac89e8d03 rejoin tags backend 2026-01-29 15:57:09 -05:00
Igor Loskutov
13088e72f8 feat: Trigger presence poll on join endpoint for Daily meetings
Also trigger poll_daily_room_presence_task when user joins meeting via
/join endpoint, not just on /leave. Webhooks can fail or not exist
(e.g., Whereby has no participant.joined webhook), so frontend-triggered
polls needed for both join and leave events.
2026-01-26 18:05:44 -05:00
Igor Loskutov
775c9b667d feat: Add meeting leave endpoint for faster presence detection (no-mistaken)
Backend:
- Add POST /rooms/{room_name}/meetings/{meeting_id}/leave endpoint
- Triggers poll_daily_room_presence_task immediately on user disconnect
- Reduces detection latency from 0-30s (periodic poll) to ~1-2s

Frontend:
- Add useRoomLeaveMeeting() mutation hook
- Add beforeunload handler in DailyRoom that calls sendBeacon()
- Guarantees API call completion even if tab closes mid-request

Context:
- Daily.co webhooks handle clean disconnects
- This endpoint handles dirty disconnects (tab close, crash, network drop)
- Redis lock prevents spam if multiple users leave simultaneously

This commit is no-mistaken and follows user requirements for readonly research
task that was later approved for implementation.
2026-01-26 17:59:33 -05:00
5d26461477 chore(main): release 0.30.0 (#832) 2026-01-23 13:58:33 -05:00
6c175a11d8 feat: brady bunch (#816)
* brady bunch PRD/tasks

* clean dead daily.co code

* brady bunch prototype (no-mistakes)

* brady bunch prototype (no-mistakes) review

* self-review

* daily poll time match (no-mistakes)

* daily poll self-review (no-mistakes)

* daily poll self-review (no-mistakes)

* daily co doc

* cleanup

* cleanup

* self-review (no-mistakes)

* self-review (no-mistakes)

* self-review

* self-review

* ui typefix

* dupe calls error handling proper

* daily reflector data model doc

* logging style fix

* migration merge

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-23 12:33:06 -05:00
6e786b7631 hatchet processing resilience several fixes (#831)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-22 19:03:33 -05:00
8fc8d8bf4a chore(main): release 0.29.0 (#828) 2026-01-22 12:52:39 -05:00
c723752b7e feat: set hatchet as default for multitracks (#822)
* set hatchet as default for multitracks

* fix: pipeline routing tests for hatchet-default branch

- Create room with use_celery=True to force Celery backend in tests
- Link transcript to room to enable multitrack pipeline routing
- Fixes test failures caused by missing HATCHET_CLIENT_TOKEN in test env

* Update server/reflector/services/transcript_process.py

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2026-01-21 17:05:03 -05:00
4dc49e5b25 chore(main): release 0.28.1 (#827) 2026-01-21 15:30:42 -05:00
23d2bc283d fix: ics non-sync bugfix (#823)
* ics non-sync bugfix

* fix tests

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-21 15:10:19 -05:00
c8743fdf1c fix webhook tests (#826)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-21 14:31:20 -05:00
8a293882ad timeout to untighten ws python loop (#821)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-20 16:29:09 -05:00
d83c4a30b4 chore(main): release 0.28.0 (#820) 2026-01-20 12:35:26 -05:00
3b6540eae5 feat: worker affinity (#819)
* worker affinity

* worker affinity

* worker affinity

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-01-20 12:27:16 -05:00
48 changed files with 3064 additions and 419 deletions

View File

@@ -3,3 +3,4 @@ docs/docs/installation/auth-setup.md:curl-auth-header:250
docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465

View File

@@ -1,5 +1,33 @@
# Changelog
## [0.30.0](https://github.com/Monadical-SAS/reflector/compare/v0.29.0...v0.30.0) (2026-01-23)
### Features
* brady bunch ([#816](https://github.com/Monadical-SAS/reflector/issues/816)) ([6c175a1](https://github.com/Monadical-SAS/reflector/commit/6c175a11d8a3745095bfad06a4ad3ccdfd278433))
## [0.29.0](https://github.com/Monadical-SAS/reflector/compare/v0.28.1...v0.29.0) (2026-01-21)
### Features
* set hatchet as default for multitracks ([#822](https://github.com/Monadical-SAS/reflector/issues/822)) ([c723752](https://github.com/Monadical-SAS/reflector/commit/c723752b7e15aa48a41ad22856f147a5517d3f46))
## [0.28.1](https://github.com/Monadical-SAS/reflector/compare/v0.28.0...v0.28.1) (2026-01-21)
### Bug Fixes
* ics non-sync bugfix ([#823](https://github.com/Monadical-SAS/reflector/issues/823)) ([23d2bc2](https://github.com/Monadical-SAS/reflector/commit/23d2bc283d4d02187b250d2055103e0374ee93d6))
## [0.28.0](https://github.com/Monadical-SAS/reflector/compare/v0.27.0...v0.28.0) (2026-01-20)
### Features
* worker affinity ([#819](https://github.com/Monadical-SAS/reflector/issues/819)) ([3b6540e](https://github.com/Monadical-SAS/reflector/commit/3b6540eae5b597449f98661bdf15483b77be3268))
## [0.27.0](https://github.com/Monadical-SAS/reflector/compare/v0.26.0...v0.27.0) (2025-12-26)

View File

@@ -34,7 +34,7 @@ services:
environment:
ENTRYPOINT: beat
hatchet-worker:
hatchet-worker-cpu:
build:
context: server
volumes:
@@ -43,7 +43,20 @@ services:
env_file:
- ./server/.env
environment:
ENTRYPOINT: hatchet-worker
ENTRYPOINT: hatchet-worker-cpu
depends_on:
hatchet:
condition: service_healthy
hatchet-worker-llm:
build:
context: server
volumes:
- ./server/:/app/
- /app/.venv
env_file:
- ./server/.env
environment:
ENTRYPOINT: hatchet-worker-llm
depends_on:
hatchet:
condition: service_healthy

View File

@@ -0,0 +1,496 @@
# Daily.co and Reflector Data Model
This document explains the data model relationships between Daily.co's API concepts and Reflector's database schema, clarifying common sources of confusion.
---
## Table of Contents
1. [Core Entities Overview](#core-entities-overview)
2. [Daily.co vs Reflector Terminology](#dailyco-vs-reflector-terminology)
3. [Entity Relationships](#entity-relationships)
4. [Recording Multiplicity](#recording-multiplicity)
5. [Session Identifiers Explained](#session-identifiers-explained)
6. [Time-Based Matching](#time-based-matching)
7. [Multitrack Recording Details](#multitrack-recording-details)
8. [Verified Example](#verified-example)
---
## Core Entities Overview
### Reflector's Four Primary Entities
```
┌─────────────────────────────────────────────────────────────────┐
│ Room (Reflector) │
│ - Persistent meeting template │
│ - User-created configuration │
│ - Example: "team-standup" │
└────────────────────┬────────────────────────────────────────────┘
│ 1:N
┌─────────────────────────────────────────────────────────────────┐
│ Meeting (Reflector) │
│ - Single session instance │
│ - Creates NEW Daily.co room with timestamp │
│ - Example: "team-standup-20260115120000" │
└────────────────────┬────────────────────────────────────────────┘
│ 1:N
┌─────────────────────────────────────────────────────────────────┐
│ Recording (Reflector + Daily.co) │
│ - One segment of audio/video │
│ - New recording created on stop/restart │
│ - track_keys: JSON array of S3 file paths │
└────────────────────┬────────────────────────────────────────────┘
│ 1:1
┌─────────────────────────────────────────────────────────────────┐
│ Transcript (Reflector) │
│ - Processed audio with transcription │
│ - Diarization, summaries, topics │
│ - One transcript per recording │
└─────────────────────────────────────────────────────────────────┘
```
---
## Daily.co vs Reflector Terminology
### Room
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Virtual meeting space on Daily.co platform | User-created meeting template/configuration |
| **Lifetime** | Configurable expiration | Persistent until user deletes |
| **Creation** | API call for each meeting | Pre-created by user once |
| **Reuse** | Can host multiple sessions | Generates new Daily.co room per meeting |
| **Name Format** | `room-name` (reusable) | `room-name` (base identifier) |
| **Timestamping** | Not required | Meeting adds timestamp: `{name}-YYYYMMDDHHMMSS` |
**Example:**
```
Reflector Room: "daily-private-igor" (persistent config)
↓ starts meeting
Daily.co Room: "daily-private-igor-20260110042117"
```
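As a rough illustration of this naming convention (not Reflector's actual helper, which is not shown here; the function name and UTC assumption are mine), the per-meeting Daily.co room name can be derived like this:
```python
from datetime import datetime, timezone

def daily_room_name(reflector_room_name: str, now: datetime | None = None) -> str:
    """Hypothetical sketch: append a YYYYMMDDHHMMSS suffix (assumed UTC)
    to the persistent Reflector room name, per the convention above."""
    now = now or datetime.now(timezone.utc)
    return f"{reflector_room_name}-{now:%Y%m%d%H%M%S}"

# daily_room_name("daily-private-igor") -> e.g. "daily-private-igor-20260110042117"
```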
### Meeting
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Session that starts when first participant joins | Explicit database record of a session |
| **Identifier** | `mtgSessionId` (generated by Daily.co) | `meeting.id` (UUID, generated by Reflector) |
| **Creation** | Implicit (first participant join) | Explicit API call before participants join |
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp |
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording
| Aspect | Daily.co | Reflector |
|--------|----------|-----------|
| **Definition** | Audio/video files on S3 | Metadata + processing status |
| **Types** | `cloud` (composed video), `raw-tracks` (multitrack) | Stores references + `track_keys` array |
| **Multiplicity** | One recording object per start/stop cycle | One DB row per Daily.co recording object |
| **Identifier** | Daily.co `recording_id` | Same `recording_id` (stored in DB) |
| **Multitrack** | Array of `.webm` files (one per participant) | `track_keys` JSON array with S3 paths |
| **Linkage** | Via `room_name` + `start_ts` | FK `meeting_id` (set via time-based match) |
**Critical Behavior:** Each recording **stop/restart** creates a **separate recording object** with a unique ID.
---
## Entity Relationships
### Database Schema Relationships
```sql
-- Simplified schema showing key relationships
TABLE room (
id VARCHAR PRIMARY KEY,
name VARCHAR UNIQUE,
platform VARCHAR -- 'whereby' | 'daily'
)
TABLE meeting (
id VARCHAR PRIMARY KEY,
room_id VARCHAR REFERENCES room(id) ON DELETE CASCADE, -- nullable
room_name VARCHAR, -- Daily.co room name (timestamped)
start_date TIMESTAMP,
platform VARCHAR
)
TABLE recording (
id VARCHAR PRIMARY KEY, -- Daily.co recording_id
meeting_id VARCHAR, -- FK to meeting (set via time-based match)
bucket_name VARCHAR,
object_key VARCHAR, -- S3 prefix
track_keys JSON, -- Array of S3 keys for multitrack
recorded_at TIMESTAMP
)
TABLE transcript (
id VARCHAR PRIMARY KEY,
recording_id VARCHAR, -- nullable FK
meeting_id VARCHAR, -- nullable FK
room_id VARCHAR, -- nullable FK
participants JSON, -- [{id, speaker, name, user_id}, ...]
title VARCHAR,
long_summary VARCHAR,
webvtt TEXT
)
```
**Relationship Cardinalities:**
```
1 Room → N Meetings
1 Meeting → N Recordings (common: 1-21 recordings per meeting)
1 Recording → 1 Transcript
1 Meeting → N Transcripts (via recordings)
```
---
## Recording Multiplicity
### Why Multiple Recordings Per Meeting?
Daily.co creates a **new recording object** (new ID, new files) whenever recording stops and restarts. This happens due to:
1. **Manual stop/start** - User clicks stop, then start recording again
2. **Network reconnection** - Participant drops, reconnects → triggers restart
3. **Participant rejoin** - Last participant leaves, new one joins → new session
---
## Session Identifiers Explained
### The Hidden Entity: Daily.co Meeting Session
Daily.co has an **implicit ephemeral entity** that sits between Room and Recording:
```
Daily.co Room: "daily-private-igor-20260110042117"
├─ Daily.co Meeting Session #1 (mtgSessionId: c04334de...)
│ └─ Recording #3 (f4a50f94) - 4s, 1 track
└─ Daily.co Meeting Session #2 (mtgSessionId: 4cdae3c0...)
├─ Recording #2 (b0fa94da) - 80s, 2 tracks ← recording stopped
└─ Recording #1 (05edf519) - 62s, 1 track ← then restarted
```
**Daily.co Meeting Session:**
- **Lifecycle:** Starts when first participant joins, ends when last participant leaves
- **Identifier:** `mtgSessionId` (generated by Daily.co)
- **Persistence:** Ephemeral - new ID if everyone leaves and someone rejoins
- **Relationship:** 1 Session → N Recordings (if recording stops/restarts during session)
**Key Insight:** Multiple recordings can share the same `mtgSessionId` if recording was stopped and restarted while participants remained connected.
### mtgSessionId (Meeting Session Identifier)
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks.
**Reflector Tracking:** `daily_participant_session` table
```sql
TABLE daily_participant_session (
id VARCHAR PRIMARY KEY, -- {meeting_id}:{user_id}:{joined_at_ms}
meeting_id VARCHAR,
session_id VARCHAR, -- From webhook (per-participant)
user_id VARCHAR,
user_name VARCHAR,
joined_at TIMESTAMP,
left_at TIMESTAMP
)
```
---
## Time-Based Matching
### Problem Statement
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
**Example API response:**
```json
{
"id": "recording-uuid",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"mtgSessionId": null Missing!
}
```
### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
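A condensed, in-memory sketch of the selection step (the actual SQL-backed implementation appears in the `reflector/db/meetings.py` diff further down); `candidates` is assumed to be an iterable of meeting objects for the same `room_name`, each with `start_date` and `id` attributes:
```python
from datetime import datetime

def pick_closest_meeting(candidates, recording_start: datetime, window_hours: int = 168):
    """Sketch of the matching idea: among meetings whose start_date falls within
    the window around recording_start, pick the closest one, breaking ties
    deterministically by id."""
    in_window = [
        m for m in candidates
        if abs((m.start_date - recording_start).total_seconds()) <= window_hours * 3600
    ]
    if not in_window:
        return None
    return min(
        in_window,
        key=lambda m: (abs((m.start_date - recording_start).total_seconds()), m.id),
    )
```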
---
## Multitrack Recording Details
### track_keys JSON Array
**Schema:** `recording.track_keys` (JSON, nullable)
```sql
-- Example recording with 2 audio tracks
{
"id": "b0fa94da-73b5-4f95-9239-5216a682a505",
"track_keys": [
"igormonadical/daily-private-igor-20260110042117/1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565",
"igormonadical/daily-private-igor-20260110042117/1768018896877-9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-1768018899286"
]
}
```
**Semantics:**
- `track_keys = null` → Not multitrack (cloud recording)
- `track_keys = []` → Multitrack recording with no audio captured (silence/muted)
- `track_keys = [...]` → Multitrack with N audio tracks
**Property:** `recording.is_multitrack` (Python)
```python
@property
def is_multitrack(self) -> bool:
return self.track_keys is not None and len(self.track_keys) > 0
```
### Track Filename Format
Daily.co multitrack filenames encode timing and participant information:
**Format:** `{recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}`
**Example:** `1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565`
**Parsed Components:**
```python
# reflector/utils/daily.py:25-60
class DailyRecordingFilename(NamedTuple):
recording_start_ts: int # 1768018896877 (milliseconds)
participant_id: str # 890c0eae-e186-4534-a7bd-7c794b7d6d7f
track_start_ts: int # 1768018914565 (milliseconds)
```
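A standalone sketch of the split logic under the format assumption above (the real parser lives in `reflector/utils/daily.py`; the type and function names here are illustrative). Because the participant ID itself contains hyphens, the literal `-cam-audio-` marker and the first hyphen of the basename are the only safe split points:
```python
from typing import NamedTuple

class ParsedTrackFilename(NamedTuple):  # illustrative stand-in for DailyRecordingFilename
    recording_start_ts: int
    participant_id: str
    track_start_ts: int

def parse_track_key(s3_key: str) -> ParsedTrackFilename:
    """Parse '{recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}'
    from a multitrack S3 key (object keys carry no file extension)."""
    basename = s3_key.rsplit("/", 1)[-1]
    left, track_start = basename.split("-cam-audio-")
    recording_start, participant_id = left.split("-", 1)
    return ParsedTrackFilename(int(recording_start), participant_id, int(track_start))

# parse_track_key(".../1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565")
# -> (1768018896877, "890c0eae-e186-4534-a7bd-7c794b7d6d7f", 1768018914565)
```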
**Note:** Browser downloads from S3 add a `.webm` extension due to MIME headers, but S3 object keys have no extension.
### Video Track Filtering
Daily.co API returns both audio and video tracks, but Reflector only processes audio.
**Filtering Logic:** `reflector/worker/process.py:660`
```python
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
```
**Example API Response:**
```json
{
"tracks": [
{"type": "audio", "s3Key": "...cam-audio-1768018914565"},
{"type": "audio", "s3Key": "...cam-audio-1768018899286"},
{"type": "video", "s3Key": "...cam-video-1768018897095"} Filtered out
]
}
```
**Result:** Only 2 audio tracks stored in `recording.track_keys`, video track discarded.
**Rationale:** Reflector is an audio transcription system; video is not needed for processing.
### Track-to-Participant Mapping
**Flow:**
1. Daily.co webhook/polling provides `track_keys` array
2. Each track filename contains `participant_id`
3. Reflector queries Daily.co API: `GET /meetings/{mtgSessionId}/participants`
4. Maps `participant_id` → `user_name`
5. Stores in `transcript.participants` JSON:
```json
[
{
"id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f",
"speaker": 0,
"name": "test2",
"user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22"
},
{
"id": "9660e8e9-4297-4f17-951d-0b2bf2401803",
"speaker": 1,
"name": "test",
"user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22"
}
]
```
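A sketch of steps 2-5 above, showing how participant identity replaces diarization for multitrack recordings. The function name is hypothetical, `participants_by_id` is assumed to map `participant_id` → `user_name` (as returned by `GET /meetings/{mtgSessionId}/participants`), and the output is a simplified subset of the `transcript.participants` entries shown above:
```python
def map_tracks_to_participants(
    track_keys: list[str], participants_by_id: dict[str, str]
) -> list[dict]:
    """Assign one speaker index per audio track and look up the display name
    by the participant_id embedded in the track filename."""
    participants = []
    for speaker, key in enumerate(track_keys):
        # participant_id sits between the recording_start_ts prefix and '-cam-audio-'
        basename = key.rsplit("/", 1)[-1]
        participant_id = basename.split("-cam-audio-")[0].split("-", 1)[1]
        participants.append(
            {
                "id": participant_id,
                "speaker": speaker,
                "name": participants_by_id.get(participant_id, "unknown"),
            }
        )
    return participants
```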
**Diarization:** Multitrack recordings don't need speaker diarization AI — speaker identity comes from separate audio tracks.
---
## Verified Example
### Meeting: daily-private-igor-20260110042117
**Context:** The user conducted a test recording with start/stop cycles, producing 3 recordings.
#### Database State
```sql
-- Meeting
id: 034804b8-cee2-4fb4-94d7-122f6f068a61
room_name: daily-private-igor-20260110042117
start_date: 2026-01-10 04:21:17+00
```
#### Daily.co API Response
```json
[
{
"id": "f4a50f94-053c-4f9d-bda6-78ad051fbc36",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018885,
"duration": 4,
"status": "finished",
"mtgSessionId": "c04334de-42a0-4c2a-96be-a49b068dca85",
"tracks": [
{"type": "audio", "s3Key": "...62e8f3ae...cam-audio-1768018885417"}
]
},
{
"id": "b0fa94da-73b5-4f95-9239-5216a682a505",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"duration": 80,
"status": "finished",
"mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345",
"tracks": [
{"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914565"},
{"type": "audio", "s3Key": "...9660e8e9...cam-audio-1768018899286"},
{"type": "video", "s3Key": "...9660e8e9...cam-video-1768018897095"}
]
},
{
"id": "05edf519-9048-4b49-9a75-73e9826fd950",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018914,
"duration": 62,
"status": "finished",
"mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345",
"tracks": [
{"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914948"}
]
}
]
```
**Key Observations:**
- 3 recording objects returned by Daily.co
- 2 different `mtgSessionId` values (2 different Daily.co meeting sessions)
- Recording #2 has 3 tracks (2 audio + 1 video)
- Timestamps: 1768018885 → 1768018896 (+11s) → 1768018914 (+18s)
#### Reflector Database
**Recordings:**
```
┌──────────────────────────────────────┬──────────────┬────────────┬──────────────────────────────────────┐
│ id │ track_count │ duration │ mtgSessionId │
├──────────────────────────────────────┼──────────────┼────────────┼──────────────────────────────────────┤
│ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 │ 4s │ c04334de-42a0-4c2a-96be-a49b068dca85 │
│ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (video=0) │ 80s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │
│ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 │ 62s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │
└──────────────────────────────────────┴──────────────┴────────────┴──────────────────────────────────────┘
```
**Note:** Recording #2 has 2 audio tracks (video filtered out), not 3.
**Transcripts:**
```
┌──────────────────────────────────────┬──────────────────────────────────────┬──────────────┬──────────────────────────────────────────────┐
│ id │ recording_id │ participants │ title │
├──────────────────────────────────────┼──────────────────────────────────────┼──────────────┼──────────────────────────────────────────────┤
│ 17149b1f-546c-4837-80a0-f8140bd16592 │ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 (test) │ (empty - no speech) │
│ 49801332-3222-4c11-bdb2-375479fc87f2 │ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (test, │ "Examination and Validation Procedures │
│ │ │ test2) │ Review" │
│ e5271e12-20fb-42d2-b5a8-21438abadef9 │ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 (test2) │ "Technical Sound Check Procedure Review" │
└──────────────────────────────────────┴──────────────────────────────────────┴──────────────┴──────────────────────────────────────────────┘
```
**Transcript Content:**
*Transcript #1* (17149b1f): Empty WebVTT (no audio captured)
*Transcript #2* (49801332):
```webvtt
WEBVTT
00:00:03.109 --> 00:00:05.589
<v Speaker1>Test, test, test. Test, test, test, test, test.
00:00:19.829 --> 00:00:22.710
<v Speaker0>Test test test test test test test test test test test.
```
**AI-Generated Summary:**
> "The meeting focused on the critical importance of rigorous testing for ensuring reliability and quality, with test and test2 emphasizing the need for a structured testing framework and meticulous documentation..."
*Transcript #3* (e5271e12):
```webvtt
WEBVTT
00:00:02.029 --> 00:00:04.910
<v Speaker0>Test, test, test, test, test, test, test, test, test, test, test.
```
#### Validation: track_keys → participants
**Recording #2 (b0fa94da) tracks:**
```json
[
".../890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-...",
".../9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-..."
]
```
**Transcript #2 (49801332) participants:**
```json
[
{"id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f", "speaker": 0, "name": "test2"},
{"id": "9660e8e9-4297-4f17-951d-0b2bf2401803", "speaker": 1, "name": "test"}
]
```
### Data Flow
```
Daily.co API: 3 recordings
Polling: _poll_raw_tracks_recordings()
Worker: process_multitrack_recording.delay() × 3
DB: 3 recording rows created
Pipeline: Audio processing + transcription × 3
DB: 3 transcript rows created (1:1 with recordings)
UI: User sees 3 separate transcripts
```
**Result:** ✅ 1:1 Recording → Transcript relationship maintained.
---
**Document Version:** 1.0
**Last Verified:** 2026-01-15
**Data Source:** Production database + Daily.co API inspection

View File

@@ -0,0 +1,40 @@
"""add cloud recording support
Revision ID: 1b1e6a6fc465
Revises: bd3a729bb379
Create Date: 2026-01-09 17:17:33.535620
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "1b1e6a6fc465"
down_revision: Union[str, None] = "bd3a729bb379"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(
sa.Column("daily_composed_video_s3_key", sa.String(), nullable=True)
)
batch_op.add_column(
sa.Column("daily_composed_video_duration", sa.Integer(), nullable=True)
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_column("daily_composed_video_duration")
batch_op.drop_column("daily_composed_video_s3_key")
# ### end Alembic commands ###

View File

@@ -0,0 +1,44 @@
"""replace_use_hatchet_with_use_celery
Revision ID: 80beb1ea3269
Revises: bd3a729bb379
Create Date: 2026-01-20 16:26:25.555869
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "80beb1ea3269"
down_revision: Union[str, None] = "bd3a729bb379"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.drop_column("use_hatchet")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_hatchet",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.drop_column("use_celery")

View File

@@ -0,0 +1,23 @@
"""merge cloud recording and celery heads
Revision ID: e69f08ead8ea
Revises: 1b1e6a6fc465, 80beb1ea3269
Create Date: 2026-01-21 21:39:10.326841
"""
from typing import Sequence, Union
# revision identifiers, used by Alembic.
revision: str = "e69f08ead8ea"
down_revision: Union[str, None] = ("1b1e6a6fc465", "80beb1ea3269")
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
pass
def downgrade() -> None:
pass

View File

@@ -3,7 +3,7 @@ Daily.co API Module
"""
# Client
from .client import DailyApiClient, DailyApiError
from .client import DailyApiClient, DailyApiError, RecordingType
# Request models
from .requests import (
@@ -64,6 +64,7 @@ __all__ = [
# Client
"DailyApiClient",
"DailyApiError",
"RecordingType",
# Requests
"CreateRoomRequest",
"RoomProperties",

View File

@@ -7,7 +7,8 @@ Reference: https://docs.daily.co/reference/rest-api
"""
from http import HTTPStatus
from typing import Any
from typing import Any, Literal
from uuid import UUID
import httpx
import structlog
@@ -32,6 +33,8 @@ from .responses import (
logger = structlog.get_logger(__name__)
RecordingType = Literal["cloud", "raw-tracks"]
class DailyApiError(Exception):
"""Daily.co API error with full request/response context."""
@@ -395,6 +398,38 @@ class DailyApiClient:
return [RecordingResponse(**r) for r in data["data"]]
async def start_recording(
self,
room_name: NonEmptyString,
recording_type: RecordingType,
instance_id: UUID,
) -> dict[str, Any]:
"""Start recording via REST API.
Reference: https://docs.daily.co/reference/rest-api/rooms/recordings/start
Args:
room_name: Daily.co room name
recording_type: Recording type
instance_id: UUID for this recording session
Returns:
Recording start confirmation from Daily.co API
Raises:
DailyApiError: If API request fails
"""
client = await self._get_client()
response = await client.post(
f"{self.base_url}/rooms/{room_name}/recordings/start",
headers=self.headers,
json={
"type": recording_type,
"instanceId": str(instance_id),
},
)
return await self._handle_response(response, "start_recording")
# ============================================================================
# MEETING TOKENS
# ============================================================================

View File

@@ -0,0 +1,37 @@
"""
Daily.co recording instanceId generation utilities.
Deterministic instance ID generation for cloud and raw-tracks recordings.
MUST match frontend logic
"""
from uuid import UUID, uuid5
from reflector.utils.string import NonEmptyString
# Namespace UUID for UUIDv5 generation of raw-tracks instanceIds
# DO NOT CHANGE: Breaks instanceId determinism across deployments and frontend/backend matching
RAW_TRACKS_NAMESPACE = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
def generate_cloud_instance_id(meeting_id: NonEmptyString) -> UUID:
"""
Generate instanceId for cloud recording.
Cloud recordings use meeting ID directly as instanceId.
This ensures each meeting has one unique cloud recording.
"""
return UUID(meeting_id)
def generate_raw_tracks_instance_id(meeting_id: NonEmptyString) -> UUID:
"""
Generate instanceId for raw-tracks recording.
Raw-tracks recordings use UUIDv5(meeting_id, namespace) to ensure
different instanceId from cloud while remaining deterministic.
Daily.co requires cloud and raw-tracks to have different instanceIds
for concurrent recording.
"""
return uuid5(RAW_TRACKS_NAMESPACE, meeting_id)

View File

@@ -88,13 +88,6 @@ class MeetingTokenProperties(BaseModel):
is_owner: bool = Field(
default=False, description="Grant owner privileges to token holder"
)
start_cloud_recording: bool = Field(
default=False, description="Automatically start cloud recording on join"
)
start_cloud_recording_opts: dict | None = Field(
default=None,
description="Options for startRecording when start_cloud_recording is true (e.g., maxDuration)",
)
enable_recording_ui: bool = Field(
default=True, description="Show recording controls in UI"
)

View File

@@ -116,6 +116,7 @@ class RecordingS3Info(BaseModel):
bucket_name: NonEmptyString
bucket_region: NonEmptyString
key: NonEmptyString | None = None
endpoint: NonEmptyString | None = None
@@ -132,6 +133,9 @@ class RecordingResponse(BaseModel):
id: NonEmptyString = Field(description="Recording identifier")
room_name: NonEmptyString = Field(description="Room where recording occurred")
start_ts: int = Field(description="Recording start timestamp (Unix epoch seconds)")
type: Literal["cloud", "raw-tracks"] | None = Field(
None, description="Recording type (may be missing from API)"
)
status: RecordingStatus = Field(
description="Recording status ('in-progress' or 'finished')"
)
@@ -145,6 +149,9 @@ class RecordingResponse(BaseModel):
None, description="Token for sharing recording"
)
s3: RecordingS3Info | None = Field(None, description="S3 bucket information")
s3key: NonEmptyString | None = Field(
None, description="S3 key for cloud recordings (top-level field)"
)
tracks: list[DailyTrack] = Field(
default_factory=list,
description="Track list for raw-tracks recordings (always array, never null)",

View File

@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Literal
import sqlalchemy as sa
@@ -9,7 +9,7 @@ from reflector.db import get_database, metadata
from reflector.db.rooms import Room
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils import generate_uuid4
from reflector.utils.string import assert_equal
from reflector.utils.string import NonEmptyString, assert_equal
meetings = sa.Table(
"meeting",
@@ -63,6 +63,9 @@ meetings = sa.Table(
nullable=False,
server_default=assert_equal(WHEREBY_PLATFORM, "whereby"),
),
# Daily.co composed video (Brady Bunch grid layout) - Daily.co only, not Whereby
sa.Column("daily_composed_video_s3_key", sa.String, nullable=True),
sa.Column("daily_composed_video_duration", sa.Integer, nullable=True),
sa.Index("idx_meeting_room_id", "room_id"),
sa.Index("idx_meeting_calendar_event", "calendar_event_id"),
)
@@ -110,6 +113,9 @@ class Meeting(BaseModel):
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform = WHEREBY_PLATFORM
# Daily.co composed video (Brady Bunch grid) - Daily.co only
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
class MeetingController:
@@ -171,6 +177,90 @@ class MeetingController:
return None
return Meeting(**result)
async def get_by_room_name_all(self, room_name: str) -> list[Meeting]:
"""Get all meetings for a room name (not just most recent)."""
query = meetings.select().where(meetings.c.room_name == room_name)
results = await get_database().fetch_all(query)
return [Meeting(**r) for r in results]
async def get_by_room_name_and_time(
self,
room_name: NonEmptyString,
recording_start: datetime,
time_window_hours: int = 168,
) -> Meeting | None:
"""
Get meeting by room name closest to recording timestamp.
HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
and mtgSessionId is separate from our instanceId. Time-based matching is
the least-bad workaround.
This handles edge case of duplicate room_name values in DB (race conditions,
double-clicks, etc.) by matching based on temporal proximity.
Algorithm:
1. Find meetings within time_window_hours of recording_start
2. Return meeting with start_date closest to recording_start
3. If tie, return first by meeting.id (deterministic)
Args:
room_name: Daily.co room name from recording
recording_start: Timezone-aware datetime from recording.start_ts
time_window_hours: Search window (default 168 = 1 week)
Returns:
Meeting closest to recording timestamp, or None if no matches
Failure modes:
- Multiple meetings in same room within ~5 minutes: picks closest
- All meetings outside time window: returns None
- Clock skew between Daily.co and DB: 1-week window tolerates this
Why 1 week window:
- Handles webhook failures (recording discovered days later)
- Tolerates clock skew
- Rejects unrelated meetings from weeks ago
"""
# Validate timezone-aware datetime
if recording_start.tzinfo is None:
raise ValueError(
f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
)
window_start = recording_start - timedelta(hours=time_window_hours)
window_end = recording_start + timedelta(hours=time_window_hours)
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_name == room_name,
meetings.c.start_date >= window_start,
meetings.c.start_date <= window_end,
)
)
.order_by(meetings.c.start_date)
)
results = await get_database().fetch_all(query)
if not results:
return None
candidates = [Meeting(**r) for r in results]
# Find meeting with start_date closest to recording_start
closest = min(
candidates,
key=lambda m: (
abs((m.start_date - recording_start).total_seconds()),
m.id, # Tie-breaker: deterministic by UUID
),
)
return closest
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
"""
Get latest active meeting for a room.
@@ -260,6 +350,44 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""
Set cloud recording only if not already set.
Returns True if updated, False if already set.
Prevents webhook/polling race condition via atomic WHERE clause.
"""
# Check current value before update to detect actual change
meeting_before = await self.get_by_id(meeting_id)
if not meeting_before:
return False
was_null = meeting_before.daily_composed_video_s3_key is None
query = (
meetings.update()
.where(
sa.and_(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
await get_database().execute(query)
# Return True only if value was NULL before (actual update occurred)
# If was_null=False, the WHERE clause prevented the update
return was_null
async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count."""
query = (

View File

@@ -7,6 +7,7 @@ from sqlalchemy import or_
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
recordings = sa.Table(
"recording",
@@ -71,6 +72,19 @@ class RecordingController:
query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query)
async def set_meeting_id(
self,
recording_id: NonEmptyString,
meeting_id: NonEmptyString,
) -> None:
"""Link recording to meeting."""
query = (
recordings.update()
.where(recordings.c.id == recording_id)
.values(meeting_id=meeting_id)
)
await get_database().execute(query)
# no check for existence
async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
if not recording_ids:

View File

@@ -58,7 +58,7 @@ rooms = sqlalchemy.Table(
nullable=False,
),
sqlalchemy.Column(
"use_hatchet",
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
@@ -97,7 +97,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_hatchet: bool = False
use_celery: bool = False
skip_consent: bool = False

View File

@@ -1,77 +0,0 @@
"""
Run Hatchet workers for the multitrack pipeline.
Runs as a separate process, just like Celery workers.
Usage:
uv run -m reflector.hatchet.run_workers
# Or via docker:
docker compose exec server uv run -m reflector.hatchet.run_workers
"""
import signal
import sys
from hatchet_sdk.rate_limit import RateLimitDuration
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND
from reflector.logger import logger
from reflector.settings import settings
def main() -> None:
"""Start Hatchet worker polling."""
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting workers")
sys.exit(1)
if not settings.HATCHET_CLIENT_TOKEN:
logger.error("HATCHET_CLIENT_TOKEN is not set")
sys.exit(1)
logger.info(
"Starting Hatchet workers",
debug=settings.HATCHET_DEBUG,
)
# Import here (not top-level) - workflow modules call HatchetClientManager.get_client()
# at module level because Hatchet SDK decorators (@workflow.task) bind at import time.
# Can't use lazy init: decorators need the client object when function is defined.
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows import ( # noqa: PLC0415
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,
)
hatchet = HatchetClientManager.get_client()
hatchet.rate_limits.put(
LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND, RateLimitDuration.SECOND
)
worker = hatchet.worker(
"reflector-pipeline-worker",
workflows=[
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,
],
)
def shutdown_handler(signum: int, frame) -> None:
logger.info("Received shutdown signal, stopping workers...")
# Worker cleanup happens automatically on exit
sys.exit(0)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
logger.info("Starting Hatchet worker polling...")
worker.start()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,43 @@
"""
CPU-heavy worker pool for audio processing tasks.
Handles ONLY: mixdown_tracks
Configuration:
- slots=1: Only mixdown (already serialized globally with max_runs=1)
- Worker affinity: pool=cpu-heavy
"""
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
daily_multitrack_pipeline,
)
from reflector.logger import logger
def main():
hatchet = HatchetClientManager.get_client()
logger.info(
"Starting Hatchet CPU worker pool (mixdown only)",
worker_name="cpu-worker-pool",
slots=1,
labels={"pool": "cpu-heavy"},
)
cpu_worker = hatchet.worker(
"cpu-worker-pool",
slots=1, # Only 1 mixdown at a time (already serialized globally)
labels={
"pool": "cpu-heavy",
},
workflows=[daily_multitrack_pipeline],
)
try:
cpu_worker.start()
except KeyboardInterrupt:
logger.info("Received shutdown signal, stopping CPU workers...")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,51 @@
"""
LLM/I/O worker pool for all non-CPU tasks.
Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
"""
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
daily_multitrack_pipeline,
)
from reflector.hatchet.workflows.subject_processing import subject_workflow
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
from reflector.hatchet.workflows.track_processing import track_workflow
from reflector.logger import logger
SLOTS = 10
WORKER_NAME = "llm-worker-pool"
POOL = "llm-io"
def main():
hatchet = HatchetClientManager.get_client()
logger.info(
"Starting Hatchet LLM worker pool (all tasks except mixdown)",
worker_name=WORKER_NAME,
slots=SLOTS,
labels={"pool": POOL},
)
llm_worker = hatchet.worker(
WORKER_NAME,
slots=SLOTS, # not all slots are probably used
labels={
"pool": POOL,
},
workflows=[
daily_multitrack_pipeline,
topic_chunk_workflow,
subject_workflow,
track_workflow,
],
)
try:
llm_worker.start()
except KeyboardInterrupt:
logger.info("Received shutdown signal, stopping LLM workers...")
if __name__ == "__main__":
main()

View File

@@ -23,7 +23,12 @@ from pathlib import Path
from typing import Any, Callable, Coroutine, Protocol, TypeVar
import httpx
from hatchet_sdk import Context
from hatchet_sdk import (
ConcurrencyExpression,
ConcurrencyLimitStrategy,
Context,
)
from hatchet_sdk.labels import DesiredWorkerLabel
from pydantic import BaseModel
from reflector.dailyco_api.client import DailyApiClient
@@ -467,6 +472,20 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
parents=[process_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
retries=3,
desired_worker_labels={
"pool": DesiredWorkerLabel(
value="cpu-heavy",
required=True,
weight=100,
),
},
concurrency=[
ConcurrencyExpression(
expression="'mixdown-global'",
max_runs=1, # serialize mixdown to prevent resource contention
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, # Queue
)
],
)
@with_error_handling(TaskName.MIXDOWN_TRACKS)
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
@@ -1130,8 +1149,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
transcript,
"TRANSCRIPT",
TranscriptText(
text=merged_transcript.text,
translation=merged_transcript.translation,
text="",
translation=None,
),
logger=logger,
)
@@ -1328,6 +1347,7 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
f"participants={len(payload.transcript.participants)})"
)
try:
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
@@ -1337,5 +1357,24 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)
except httpx.HTTPStatusError as e:
ctx.log(
f"send_webhook failed (HTTP {e.response.status_code}), continuing anyway"
)
return WebhookResult(
webhook_sent=False, response_code=e.response.status_code
)
except httpx.ConnectError as e:
ctx.log(f"send_webhook failed (connection error), continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
except httpx.TimeoutException as e:
ctx.log(f"send_webhook failed (timeout), continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
except Exception as e:
ctx.log(f"send_webhook unexpected error, continuing anyway: {e}")
return WebhookResult(webhook_sent=False)

View File

@@ -7,7 +7,11 @@ Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
from datetime import timedelta
from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Context
from hatchet_sdk import (
ConcurrencyExpression,
ConcurrencyLimitStrategy,
Context,
)
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel
@@ -34,11 +38,13 @@ hatchet = HatchetClientManager.get_client()
topic_chunk_workflow = hatchet.workflow(
name="TopicChunkProcessing",
input_validator=TopicChunkInput,
concurrency=ConcurrencyExpression(
concurrency=[
ConcurrencyExpression(
expression="'global'", # constant string = global limit across all runs
max_runs=20,
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
),
)
],
)

View File

@@ -319,21 +319,6 @@ class ICSSyncService:
calendar = self.fetch_service.parse_ics(ics_content)
content_hash = hashlib.md5(ics_content.encode()).hexdigest()
if room.ics_last_etag == content_hash:
logger.info("No changes in ICS for room", room_id=room.id)
room_url = f"{settings.UI_BASE_URL}/{room.name}"
events, total_events = self.fetch_service.extract_room_events(
calendar, room.name, room_url
)
return {
"status": SyncStatus.UNCHANGED,
"hash": content_hash,
"events_found": len(events),
"total_events": total_events,
"events_created": 0,
"events_updated": 0,
"events_deleted": 0,
}
# Extract matching events
room_url = f"{settings.UI_BASE_URL}/{room.name}"
@@ -371,6 +356,44 @@ class ICSSyncService:
time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
return time_since_sync.total_seconds() >= room.ics_fetch_interval
def _event_data_changed(self, existing: CalendarEvent, new_data: EventData) -> bool:
"""Check if event data has changed by comparing relevant fields.
IMPORTANT: When adding fields to CalendarEvent/EventData, update this method
and the _COMPARED_FIELDS set below for runtime validation.
"""
# Fields that come from ICS and should trigger updates when changed
_COMPARED_FIELDS = {
"title",
"description",
"start_time",
"end_time",
"location",
"attendees",
"ics_raw_data",
}
# Runtime exhaustiveness check: ensure we're comparing all EventData fields
event_data_fields = set(EventData.__annotations__.keys()) - {"ics_uid"}
if event_data_fields != _COMPARED_FIELDS:
missing = event_data_fields - _COMPARED_FIELDS
extra = _COMPARED_FIELDS - event_data_fields
raise RuntimeError(
f"_event_data_changed() field mismatch: "
f"missing={missing}, extra={extra}. "
f"Update the comparison logic when adding/removing fields."
)
return (
existing.title != new_data["title"]
or existing.description != new_data["description"]
or existing.start_time != new_data["start_time"]
or existing.end_time != new_data["end_time"]
or existing.location != new_data["location"]
or existing.attendees != new_data["attendees"]
or existing.ics_raw_data != new_data["ics_raw_data"]
)
async def _sync_events_to_database(
self, room_id: str, events: list[EventData]
) -> SyncStats:
@@ -386,11 +409,14 @@ class ICSSyncService:
)
if existing:
# Only count as updated if data actually changed
if self._event_data_changed(existing, event_data):
updated += 1
await calendar_events_controller.upsert(calendar_event)
else:
created += 1
await calendar_events_controller.upsert(calendar_event)
current_ics_uids.append(event_data["ics_uid"])
# Soft delete events that are no longer in calendar

View File

@@ -11,7 +11,7 @@ from typing import Literal, Union, assert_never
import celery
from celery.result import AsyncResult
from hatchet_sdk.clients.rest.exceptions import ApiException
from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
@@ -23,7 +23,6 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
@@ -102,8 +101,8 @@ async def validate_transcript_for_processing(
if transcript.locked:
return ValidationLocked(detail="Recording is locked")
# hatchet is idempotent anyways + if it wasn't dispatched successfully
if transcript.status == "idle" and not settings.HATCHET_ENABLED:
# Check if recording is ready for processing
if transcript.status == "idle" and not transcript.workflow_run_id:
return ValidationNotReady(detail="Recording is not ready for processing")
# Check Celery tasks
@@ -116,7 +115,8 @@ async def validate_transcript_for_processing(
):
return ValidationAlreadyScheduled(detail="already running")
if settings.HATCHET_ENABLED and transcript.workflow_run_id:
# Check Hatchet workflow status if workflow_run_id exists
if transcript.workflow_run_id:
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
@@ -181,19 +181,16 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
# Check if room has use_hatchet=True (overrides env vars)
room_forces_hatchet = False
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
room_forces_hatchet = room.use_hatchet if room else False
use_celery = room.use_celery if room else False
# Start durable workflow if enabled (Hatchet)
# and if room has use_hatchet=True
use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
use_hatchet = not use_celery
if room_forces_hatchet:
if use_celery:
logger.info(
"Room forces Hatchet workflow",
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
@@ -215,8 +212,9 @@ async def dispatch_transcript_processing(
)
return None
else:
# Workflow exists but can't replay (CANCELLED, COMPLETED, etc.)
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
@@ -225,14 +223,28 @@ async def dispatch_transcript_processing(
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)

View File

@@ -158,19 +158,10 @@ class Settings(BaseSettings):
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
# Durable workflow orchestration
# Provider: "hatchet" (or "none" to disable)
DURABLE_WORKFLOW_PROVIDER: str = "none"
# Hatchet workflow orchestration
# Hatchet workflow orchestration (always enabled for multitrack processing)
HATCHET_CLIENT_TOKEN: str | None = None
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
HATCHET_DEBUG: bool = False
@property
def HATCHET_ENABLED(self) -> bool:
"""True if Hatchet is the active provider."""
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
settings = Settings()

View File

@@ -1,4 +1,5 @@
from datetime import datetime
from uuid import UUID
from reflector.dailyco_api import (
CreateMeetingTokenRequest,
@@ -12,9 +13,11 @@ from reflector.dailyco_api import (
RoomProperties,
verify_webhook_signature,
)
from reflector.dailyco_api import RecordingType as DailyRecordingType
from reflector.db.daily_participant_sessions import (
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room
from reflector.logger import logger
from reflector.storage import get_dailyco_storage
@@ -58,10 +61,9 @@ class DailyClient(VideoPlatformClient):
enable_recording = None
if room.recording_type == self.RECORDING_LOCAL:
enable_recording = "local"
elif (
room.recording_type == self.RECORDING_CLOUD
): # daily "cloud" is not our "cloud"
enable_recording = "raw-tracks"
elif room.recording_type == self.RECORDING_CLOUD:
# Don't set enable_recording - recordings started via REST API (not auto-start)
enable_recording = None
properties = RoomProperties(
enable_recording=enable_recording,
@@ -106,8 +108,6 @@ class DailyClient(VideoPlatformClient):
Daily.co doesn't provide historical session API, so we query our database
where participant.joined/left webhooks are stored.
"""
from reflector.db.meetings import meetings_controller # noqa: PLC0415
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
return []
@@ -129,6 +129,10 @@ class DailyClient(VideoPlatformClient):
"""Get room presence/session data for a Daily.co room."""
return await self._api_client.get_room_presence(room_name)
async def delete_room(self, room_name: str) -> None:
"""Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
return await self._api_client.delete_room(room_name)
async def get_meeting_participants(
self, meeting_id: str
) -> MeetingParticipantsResponse:
@@ -179,21 +183,14 @@ class DailyClient(VideoPlatformClient):
async def create_meeting_token(
self,
room_name: DailyRoomName,
start_cloud_recording: bool,
enable_recording_ui: bool,
user_id: NonEmptyString | None = None,
is_owner: bool = False,
max_recording_duration_seconds: int | None = None,
) -> NonEmptyString:
start_cloud_recording_opts = None
if start_cloud_recording and max_recording_duration_seconds:
start_cloud_recording_opts = {"maxDuration": max_recording_duration_seconds}
properties = MeetingTokenProperties(
room_name=room_name,
user_id=user_id,
start_cloud_recording=start_cloud_recording,
start_cloud_recording_opts=start_cloud_recording_opts,
enable_recording_ui=enable_recording_ui,
is_owner=is_owner,
)
@@ -201,6 +198,23 @@ class DailyClient(VideoPlatformClient):
result = await self._api_client.create_meeting_token(request)
return result.token
async def start_recording(
self,
room_name: DailyRoomName,
recording_type: DailyRecordingType,
instance_id: UUID,
) -> dict:
"""Start recording via Daily.co REST API.
Args:
instance_id: UUID for this recording session - one UUID per "room" in Daily (which is "meeting" in Reflector)
"""
return await self._api_client.start_recording(
room_name=room_name,
recording_type=recording_type,
instance_id=instance_id,
)
async def close(self):
"""Clean up API client resources."""
await self._api_client.close()

View File

@@ -19,6 +19,7 @@ from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import (
poll_daily_room_presence_task,
process_multitrack_recording,
store_cloud_recording,
)
router = APIRouter()
@@ -174,38 +175,48 @@ async def _handle_recording_started(event: RecordingStartedEvent):
async def _handle_recording_ready(event: RecordingReadyEvent):
room_name = event.payload.room_name
recording_id = event.payload.recording_id
tracks = event.payload.tracks
if not tracks:
logger.warning(
"recording.ready-to-download: missing tracks",
room_name=room_name,
recording_id=recording_id,
payload=event.payload,
)
return
recording_type = event.payload.type
logger.info(
"Recording ready for download",
room_name=room_name,
recording_id=recording_id,
num_tracks=len(tracks),
recording_type=recording_type,
platform="daily",
)
bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
if not bucket_name:
logger.error(
"DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; cannot process Daily recording"
logger.error("DAILYCO_STORAGE_AWS_BUCKET_NAME not configured")
return
if recording_type == "cloud":
await store_cloud_recording(
recording_id=recording_id,
room_name=room_name,
s3_key=event.payload.s3_key,
duration=event.payload.duration,
start_ts=event.payload.start_ts,
source="webhook",
)
elif recording_type == "raw-tracks":
tracks = event.payload.tracks
if not tracks:
logger.warning(
"raw-tracks recording: missing tracks array",
room_name=room_name,
recording_id=recording_id,
)
return
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
logger.info(
"Recording webhook queuing processing",
"Raw-tracks recording queuing processing",
recording_id=recording_id,
room_name=room_name,
num_tracks=len(track_keys),
)
process_multitrack_recording.delay(
@@ -213,6 +224,14 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
daily_room_name=room_name,
recording_id=recording_id,
track_keys=track_keys,
recording_start_ts=event.payload.start_ts,
)
else:
logger.warning(
"Unknown recording type",
recording_type=recording_type,
recording_id=recording_id,
)

View File

@@ -1,16 +1,23 @@
import json
from datetime import datetime, timezone
from typing import Annotated, Optional
from typing import Annotated, Any, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
import reflector.auth as auth
from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError
from reflector.db.meetings import (
MeetingConsent,
meeting_consent_controller,
meetings_controller,
)
from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
router = APIRouter()
@@ -73,3 +80,72 @@ async def meeting_deactivate(
await meetings_controller.update_meeting(meeting_id, is_active=False)
return {"status": "success", "meeting_id": meeting_id}
class StartRecordingRequest(BaseModel):
type: RecordingType
instanceId: UUID
@router.post("/meetings/{meeting_id}/recordings/start")
async def start_recording(
meeting_id: NonEmptyString, body: StartRecordingRequest
) -> dict[str, Any]:
"""Start cloud or raw-tracks recording via Daily.co REST API.
Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time.
Uses different instanceIds for cloud vs raw-tracks (same won't work)
Note: No authentication required - anonymous users supported. TODO this is a DOS vector
"""
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
try:
client = create_platform_client("daily")
result = await client.start_recording(
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
log.info(f"Started {body.type} recording via REST API")
return {"status": "ok", "result": result}
except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream"
try:
error_body = json.loads(e.response_body)
error_info = error_body.get("info", "")
# "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info:
log.info(
f"{body.type} recording already active (started by another participant)"
)
return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling
# All other Daily.co API errors
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)
except Exception as e:
# Non-Daily.co errors
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)

View File

@@ -20,6 +20,7 @@ from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings
from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import poll_daily_room_presence_task
from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__)
@@ -73,6 +74,8 @@ class Meeting(BaseModel):
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
class CreateRoom(BaseModel):
@@ -363,6 +366,53 @@ async def rooms_create_meeting(
return meeting
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
async def rooms_joined_meeting(
room_name: str,
meeting_id: str,
):
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.delay(meeting_id)
return {"status": "ok"}
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
async def rooms_leave_meeting(
room_name: str,
meeting_id: str,
delay_seconds: int = 2,
):
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.apply_async(
args=[meeting_id],
countdown=delay_seconds,
)
return {"status": "ok"}
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
async def rooms_test_webhook(
room_id: str,
@@ -586,7 +636,6 @@ async def rooms_join_meeting(
)
token = await client.create_meeting_token(
meeting.room_name,
start_cloud_recording=meeting.recording_type == "cloud",
enable_recording_ui=enable_recording_ui,
user_id=user_id,
is_owner=user_id == room.user_id,

View File

@@ -6,6 +6,11 @@ from celery.schedules import crontab
from reflector.settings import settings
logger = structlog.get_logger(__name__)
# Polling intervals (seconds)
# Webhook-aware: 180s when webhook configured (backup mode), 15s when no webhook (primary discovery)
POLL_DAILY_RECORDINGS_INTERVAL_SEC = 180.0 if settings.DAILY_WEBHOOK_SECRET else 15.0
if celery.current_app.main != "default":
logger.info(f"Celery already configured ({celery.current_app})")
app = celery.current_app
@@ -44,7 +49,7 @@ else:
},
"poll_daily_recordings": {
"task": "reflector.worker.process.poll_daily_recordings",
"schedule": 180.0, # Every 3 minutes (configurable lookback window)
"schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC,
},
"trigger_daily_reconciliation": {
"task": "reflector.worker.process.trigger_daily_reconciliation",

View File

@@ -2,7 +2,7 @@ import json
import os
import re
from datetime import datetime, timezone
from typing import List
from typing import List, Literal
from urllib.parse import unquote
import av
@@ -42,6 +42,7 @@ from reflector.utils.daily import (
filter_cam_audio_tracks,
recording_lock_key,
)
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
from reflector.video_platforms.whereby_utils import (
parse_whereby_recording_filename,
@@ -175,13 +176,18 @@ async def process_multitrack_recording(
daily_room_name: DailyRoomName,
recording_id: str,
track_keys: list[str],
recording_start_ts: int,
):
"""
Process raw-tracks (multitrack) recording from Daily.co.
"""
logger.info(
"Processing multitrack recording",
bucket=bucket_name,
room_name=daily_room_name,
recording_id=recording_id,
provided_keys=len(track_keys),
recording_start_ts=recording_start_ts,
)
if not track_keys:
@@ -212,7 +218,7 @@ async def process_multitrack_recording(
)
await _process_multitrack_recording_inner(
bucket_name, daily_room_name, recording_id, track_keys
bucket_name, daily_room_name, recording_id, track_keys, recording_start_ts
)
@@ -221,8 +227,18 @@ async def _process_multitrack_recording_inner(
daily_room_name: DailyRoomName,
recording_id: str,
track_keys: list[str],
recording_start_ts: int,
):
"""Inner function containing the actual processing logic."""
"""
Process multitrack recording (first time or reprocessing).
For first processing (webhook/polling):
- Uses recording_start_ts for time-based meeting matching (no instanceId available)
For reprocessing:
- Uses recording.meeting_id directly (already linked during first processing)
- recording_start_ts is ignored
"""
tz = timezone.utc
recorded_at = datetime.now(tz)
@@ -240,7 +256,53 @@ async def _process_multitrack_recording_inner(
exc_info=True,
)
meeting = await meetings_controller.get_by_room_name(daily_room_name)
# Check if recording already exists (reprocessing path)
recording = await recordings_controller.get_by_id(recording_id)
if recording and recording.meeting_id:
# Reprocessing: recording exists with meeting already linked
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Reprocessing: meeting not found for recording - skipping",
meeting_id=recording.meeting_id,
recording_id=recording_id,
)
return
logger.info(
"Reprocessing: using existing recording.meeting_id",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
else:
# First processing: recording doesn't exist, need time-based matching
# (Daily.co doesn't return instanceId in API, must match by timestamp)
recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=daily_room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.error(
"Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
recording_id=recording_id,
room_name=daily_room_name,
recording_start_ts=recording_start_ts,
recording_start=recording_start.isoformat(),
)
return # Skip processing, will retry on next poll
logger.info(
"First processing: found meeting via time-based matching",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
time_delta_seconds=abs(
(meeting.start_date - recording_start).total_seconds()
),
)
room_name_base = extract_base_room_name(daily_room_name)
@@ -248,18 +310,8 @@ async def _process_multitrack_recording_inner(
if not room:
raise Exception(f"Room not found: {room_name_base}")
if not meeting:
raise Exception(f"Meeting not found: {room_name_base}")
logger.info(
"Found existing Meeting for recording",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
)
recording = await recordings_controller.get_by_id(recording_id)
if not recording:
# Create recording (only happens during first processing)
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
@@ -271,7 +323,19 @@ async def _process_multitrack_recording_inner(
track_keys=track_keys,
)
)
# else: Recording already exists; metadata set at creation time
elif not recording.meeting_id:
# Recording exists but meeting_id is null (failed first processing)
# Update with meeting from time-based matching
await recordings_controller.set_meeting_id(
recording_id=recording.id,
meeting_id=meeting.id,
)
recording.meeting_id = meeting.id
logger.info(
"Updated existing recording with meeting_id",
recording_id=recording.id,
meeting_id=meeting.id,
)
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript:
@@ -287,11 +351,12 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
use_celery = room and room.use_celery
use_hatchet = not use_celery
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
if use_celery:
logger.info(
"Room forces Hatchet workflow",
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
@@ -337,9 +402,11 @@ async def poll_daily_recordings():
"""Poll Daily.co API for recordings and process missing ones.
Fetches latest recordings from Daily.co API (default limit 100), compares with DB,
and queues processing for recordings not already in DB.
and stores/queues missing recordings:
- Cloud recordings: Store S3 key in meeting table
- Raw-tracks recordings: Queue multitrack processing
For each missing recording, uses audio tracks from API response.
Acts as fallback when webhooks active, primary discovery when webhooks unavailable.
Worker-level locking provides idempotency (see process_multitrack_recording).
"""
@@ -380,51 +447,222 @@ async def poll_daily_recordings():
)
return
recording_ids = [rec.id for rec in finished_recordings]
# Separate cloud and raw-tracks recordings
cloud_recordings = []
raw_tracks_recordings = []
for rec in finished_recordings:
if rec.type:
# Daily.co API currently returns null type - this branch guards that assumption.
# If this warning fires, the Daily.co API changed and the inference logic below can be removed.
recording_type = rec.type
logger.warning(
"Recording has explicit type field from Daily.co API (unexpected, API may have changed)",
recording_id=rec.id,
room_name=rec.room_name,
recording_type=recording_type,
has_s3key=bool(rec.s3key),
tracks_count=len(rec.tracks),
)
else:
# DAILY.CO API LIMITATION:
# GET /recordings response does NOT include type field.
# Daily.co docs mention type field exists, but API never returns it.
# Verified: 84 recordings from Nov 2025 - Jan 2026 ALL have type=None.
#
# This is not a recent API change - Daily.co has never returned type.
# Must infer from structural properties.
#
# Inference heuristic (reliable for finished recordings):
# - Has tracks array → raw-tracks
# - Has s3key but no tracks → cloud
# - Neither → failed/incomplete recording
if len(rec.tracks) > 0:
recording_type = "raw-tracks"
elif rec.s3key and len(rec.tracks) == 0:
recording_type = "cloud"
else:
logger.warning(
"Recording has no type, no s3key, and no tracks - likely failed recording",
recording_id=rec.id,
room_name=rec.room_name,
status=rec.status,
duration=rec.duration,
mtg_session_id=rec.mtgSessionId,
)
continue
if recording_type == "cloud":
cloud_recordings.append(rec)
else:
raw_tracks_recordings.append(rec)
logger.debug(
"Poll results",
total=len(finished_recordings),
cloud=len(cloud_recordings),
raw_tracks=len(raw_tracks_recordings),
)
# Process cloud recordings
await _poll_cloud_recordings(cloud_recordings)
# Process raw-tracks recordings
await _poll_raw_tracks_recordings(raw_tracks_recordings, bucket_name)
async def store_cloud_recording(
recording_id: NonEmptyString,
room_name: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
start_ts: int,
source: Literal["webhook", "polling"],
) -> bool:
"""
Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths.
Uses time-based matching to handle duplicate room_name values.
Args:
recording_id: Daily.co recording ID
room_name: Daily.co room name
s3_key: S3 key where recording is stored
duration: Recording duration in seconds
start_ts: Unix timestamp when recording started
source: "webhook" or "polling" (for logging)
Returns:
True if stored, False if skipped/failed
"""
recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.warning(
f"Cloud recording ({source}): no meeting found within 1-week window",
recording_id=recording_id,
room_name=room_name,
recording_start_ts=start_ts,
recording_start=recording_start.isoformat(),
)
return False
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key=s3_key,
duration=duration,
)
if not success:
logger.debug(
f"Cloud recording ({source}): already set (race lost)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting.id,
)
return False
logger.info(
f"Cloud recording stored via {source} (time-based match)",
meeting_id=meeting.id,
recording_id=recording_id,
s3_key=s3_key,
duration=duration,
time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
)
return True
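# --- Illustrative sketch (not part of this diff) ---
# get_by_room_name_and_time() is called above but its implementation is not shown.
# Based on the docstrings and tests in this changeset, its matching semantics are
# roughly the following (hypothetical in-memory helper, for clarity only):
from datetime import timedelta

def pick_meeting_by_time_sketch(meetings, room_name, recording_start, time_window_hours=168):
    if recording_start.tzinfo is None:
        raise ValueError("recording_start must be timezone-aware")
    window = timedelta(hours=time_window_hours)
    candidates = [
        m for m in meetings
        if m.room_name == room_name
        and abs(m.start_date - recording_start) <= window
    ]
    if not candidates:
        return None
    # Closest start_date wins; exact ties break deterministically on meeting.id
    return min(candidates, key=lambda m: (abs(m.start_date - recording_start), m.id))
# --- end sketch ---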
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
"""
Store cloud recordings missing from meeting table via polling.
Uses time-based matching via store_cloud_recording().
"""
if not cloud_recordings:
return
stored_count = 0
for recording in cloud_recordings:
# Extract S3 key from recording (cloud recordings use s3key field)
s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
if not s3_key:
logger.warning(
"Cloud recording: missing S3 key",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
stored = await store_cloud_recording(
recording_id=recording.id,
room_name=recording.room_name,
s3_key=s3_key,
duration=recording.duration,
start_ts=recording.start_ts,
source="polling",
)
if stored:
stored_count += 1
logger.info(
"Cloud recording polling complete",
total=len(cloud_recordings),
stored=stored_count,
)
async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: str,
):
"""Queue raw-tracks recordings missing from DB (existing logic)."""
if not raw_tracks_recordings:
return
recording_ids = [rec.id for rec in raw_tracks_recordings]
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
existing_ids = {rec.id for rec in existing_recordings}
missing_recordings = [
rec for rec in finished_recordings if rec.id not in existing_ids
rec for rec in raw_tracks_recordings if rec.id not in existing_ids
]
if not missing_recordings:
logger.debug(
"All recordings already in DB",
api_count=len(finished_recordings),
"All raw-tracks recordings already in DB",
api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found recordings missing from DB",
"Found raw-tracks recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(finished_recordings),
total_api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
if recording.status == "finished":
logger.warning(
"Finished recording has no tracks (no audio captured)",
"Finished raw-tracks recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
else:
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
room_name=recording.room_name,
status=recording.status,
)
continue
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
if not track_keys:
logger.warning(
"No audio tracks found in recording (only video tracks)",
"No audio tracks found in raw-tracks recording",
recording_id=recording.id,
room_name=recording.room_name,
total_tracks=len(recording.tracks),
@@ -432,7 +670,7 @@ async def poll_daily_recordings():
continue
logger.info(
"Queueing missing recording for processing",
"Queueing missing raw-tracks recording for processing",
recording_id=recording.id,
room_name=recording.room_name,
track_count=len(track_keys),
@@ -443,6 +681,7 @@ async def poll_daily_recordings():
daily_room_name=recording.room_name,
recording_id=recording.id,
track_keys=track_keys,
recording_start_ts=recording.start_ts,
)
@@ -606,8 +845,40 @@ async def process_meetings():
end_date = end_date.replace(tzinfo=timezone.utc)
client = create_platform_client(meeting.platform)
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = False
has_had_sessions = False
if meeting.platform == "daily":
try:
presence = await client.get_room_presence(meeting.room_name)
has_active_sessions = presence.total_count > 0
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_had_sessions = bool(room_sessions)
logger_.info(
"Daily.co presence check",
has_active_sessions=has_active_sessions,
has_had_sessions=has_had_sessions,
presence_count=presence.total_count,
)
except Exception:
logger_.warning(
"Daily.co presence API failed, falling back to DB sessions",
exc_info=True,
)
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_active_sessions = bool(
room_sessions
and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
else:
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
@@ -633,7 +904,20 @@ async def process_meetings():
await meetings_controller.update_meeting(
meeting.id, is_active=False
)
logger_.info("Meeting is deactivated")
logger_.info("Meeting deactivated in database")
if meeting.platform == "daily":
try:
await client.delete_room(meeting.room_name)
logger_.info(
"Daily.co room deleted", room_name=meeting.room_name
)
except Exception:
logger_.warning(
"Failed to delete Daily.co room",
room_name=meeting.room_name,
exc_info=True,
)
processed_count += 1
@@ -810,7 +1094,6 @@ async def reprocess_failed_daily_recordings():
)
continue
# Fetch room to check use_hatchet flag
room = None
if meeting.room_id:
room = await rooms_controller.get_by_id(meeting.room_id)
@@ -834,10 +1117,10 @@ async def reprocess_failed_daily_recordings():
)
continue
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
# Hatchet requires a transcript for workflow_run_id tracking
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
@@ -883,11 +1166,16 @@ async def reprocess_failed_daily_recordings():
transcript_status=transcript.status if transcript else None,
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1

View File

@@ -7,8 +7,10 @@ elif [ "${ENTRYPOINT}" = "worker" ]; then
uv run celery -A reflector.worker.app worker --loglevel=info
elif [ "${ENTRYPOINT}" = "beat" ]; then
uv run celery -A reflector.worker.app beat --loglevel=info
elif [ "${ENTRYPOINT}" = "hatchet-worker" ]; then
uv run python -m reflector.hatchet.run_workers
elif [ "${ENTRYPOINT}" = "hatchet-worker-cpu" ]; then
uv run python -m reflector.hatchet.run_workers_cpu
elif [ "${ENTRYPOINT}" = "hatchet-worker-llm" ]; then
uv run python -m reflector.hatchet.run_workers_llm
else
echo "Unknown command"
fi

View File

@@ -0,0 +1,286 @@
"""Unit tests for Daily.co presence-based meeting deactivation logic.
Tests the fix for split room race condition by verifying:
1. Real-time presence checking via Daily.co API
2. Room deletion when meetings deactivate
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.dailyco_api.responses import (
RoomPresenceParticipant,
RoomPresenceResponse,
)
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.video_platforms.daily import DailyClient
@pytest.fixture
async def daily_room_and_meeting():
"""Create test room and meeting for Daily platform."""
room = await rooms_controller.add(
name="test-daily",
user_id="test-user",
platform="daily",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
current_time = datetime.now(timezone.utc)
end_time = current_time + timedelta(hours=2)
meeting = await meetings_controller.create(
id="test-meeting-id",
room_name="test-daily-20260129120000",
room_url="https://daily.co/test",
host_room_url="https://daily.co/test",
start_date=current_time,
end_date=end_time,
room=room,
)
return room, meeting
@pytest.mark.asyncio
async def test_daily_client_has_delete_room_method():
"""Verify DailyClient has delete_room method for cleanup."""
# Create a mock DailyClient
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Verify delete_room method exists
assert hasattr(client, "delete_room")
assert callable(getattr(client, "delete_room"))
@pytest.mark.asyncio
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
"""Test that get_room_presence returns real-time participant data."""
room, meeting = daily_room_and_meeting
# Mock Daily.co API response
mock_presence = RoomPresenceResponse(
total_count=2,
data=[
RoomPresenceParticipant(
room=meeting.room_name,
id="session-1",
userId="user-1",
userName="User One",
joinTime="2026-01-29T12:00:00.000Z",
duration=120,
),
RoomPresenceParticipant(
room=meeting.room_name,
id="session-2",
userId="user-2",
userName="User Two",
joinTime="2026-01-29T12:05:00.000Z",
duration=60,
),
],
)
with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock the API client method
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Call get_room_presence
result = await client.get_room_presence(meeting.room_name)
# Verify it calls Daily.co API
client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
# Verify result contains real-time data
assert result.total_count == 2
assert len(result.data) == 2
assert result.data[0].id == "session-1"
assert result.data[1].id == "session-2"
@pytest.mark.asyncio
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
"""Test that Daily.co presence API is source of truth, not stale DB sessions."""
room, meeting = daily_room_and_meeting
current_time = datetime.now(timezone.utc)
# Create stale DB session (left_at=NULL but user actually left)
session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
await daily_participant_sessions_controller.upsert_joined(
DailyParticipantSession(
id=session_id,
meeting_id=meeting.id,
room_id=room.id,
session_id="stale-daily-session",
user_name="Stale User",
user_id="stale-user",
joined_at=current_time - timedelta(minutes=5),
left_at=None, # Stale - shows active but user left
)
)
# Verify DB shows active session
db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
assert len(db_sessions) == 1
# But Daily.co API shows room is empty
mock_presence = RoomPresenceResponse(total_count=0, data=[])
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Get real-time presence
presence = await client.get_room_presence(meeting.room_name)
# Real-time API shows no participants (truth)
assert presence.total_count == 0
assert len(presence.data) == 0
# DB shows 1 participant (stale)
assert len(db_sessions) == 1
# Implementation should trust presence API, not DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_empty():
"""Test the core deactivation decision logic when presence shows room empty."""
# This tests the logic that will be in process_meetings
# Simulate: DB shows stale active session
has_active_db_sessions = True # DB is stale
# Simulate: Daily.co presence API shows room empty
presence_count = 0 # Real-time truth
# Simulate: Meeting has been used before
has_had_sessions = True
# Decision logic (what process_meetings should do):
# - If presence API available: trust it
# - If presence shows empty AND has_had_sessions: deactivate
if presence_count == 0 and has_had_sessions:
should_deactivate = True
else:
should_deactivate = False
assert should_deactivate is True # Should deactivate despite stale DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_active():
"""Test that meetings stay active when presence shows participants."""
# Simulate: DB shows no sessions (not yet updated)
has_active_db_sessions = False # DB hasn't caught up
# Simulate: Daily.co presence API shows active participant
presence_count = 1 # Real-time truth
# Decision logic: presence shows activity, keep meeting active
if presence_count > 0:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Should stay active
@pytest.mark.asyncio
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
"""Test that Daily.co room is deleted when meeting deactivates."""
room, meeting = daily_room_and_meeting
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock delete_room API call
client._api_client.delete_room = AsyncMock()
# Simulate deactivation - should delete room
await client._api_client.delete_room(meeting.room_name)
# Verify delete was called
client._api_client.delete_room.assert_called_once_with(meeting.room_name)
@pytest.mark.asyncio
async def test_delete_room_idempotent_on_404():
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
from reflector.dailyco_api.client import DailyApiClient
# Create real client to test delete_room logic
client = DailyApiClient(api_key="test-key")
# Mock the HTTP client
mock_http_client = AsyncMock()
mock_response = AsyncMock()
mock_response.status_code = 404 # Room not found
mock_http_client.delete = AsyncMock(return_value=mock_response)
# Mock _get_client to return our mock
async def mock_get_client():
return mock_http_client
client._get_client = mock_get_client
# delete_room should succeed even on 404 (idempotent)
await client.delete_room("nonexistent-room")
# Verify delete was attempted
mock_http_client.delete.assert_called_once()
@pytest.mark.asyncio
async def test_api_failure_fallback_to_db_sessions():
"""Test that system falls back to DB sessions if Daily.co API fails."""
# Simulate: Daily.co API throws exception
api_exception = Exception("API unavailable")
# Simulate: DB shows active session
has_active_db_sessions = True
# Decision logic with fallback:
try:
presence_count = None
raise api_exception # Simulating API failure
except Exception:
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
if has_active_db_sessions:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Conservative: keep active on API failure

View File

@@ -0,0 +1,147 @@
"""
Tests for Daily.co instanceId generation.
Verifies deterministic behavior and frontend/backend consistency.
"""
import pytest
from reflector.dailyco_api.instance_id import (
RAW_TRACKS_NAMESPACE,
generate_cloud_instance_id,
generate_raw_tracks_instance_id,
)
class TestInstanceIdDeterminism:
"""Test deterministic generation of instanceIds."""
def test_cloud_instance_id_is_meeting_id(self):
"""Cloud instanceId is meeting ID directly (implicitly tests determinism)."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
result1 = generate_cloud_instance_id(meeting_id)
result2 = generate_cloud_instance_id(meeting_id)
assert str(result1) == meeting_id
assert result1 == result2
def test_raw_tracks_instance_id_deterministic(self):
"""Raw-tracks instanceId generation is deterministic."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
result1 = generate_raw_tracks_instance_id(meeting_id)
result2 = generate_raw_tracks_instance_id(meeting_id)
assert result1 == result2
def test_raw_tracks_different_from_cloud(self):
"""Raw-tracks instanceId differs from cloud instanceId."""
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
cloud_id = generate_cloud_instance_id(meeting_id)
raw_tracks_id = generate_raw_tracks_instance_id(meeting_id)
assert cloud_id != raw_tracks_id
def test_different_meetings_different_instance_ids(self):
"""Different meetings generate different instanceIds."""
meeting_id1 = "550e8400-e29b-41d4-a716-446655440000"
meeting_id2 = "6ba7b810-9dad-11d1-80b4-00c04fd430c8"
cloud1 = generate_cloud_instance_id(meeting_id1)
cloud2 = generate_cloud_instance_id(meeting_id2)
assert cloud1 != cloud2
raw1 = generate_raw_tracks_instance_id(meeting_id1)
raw2 = generate_raw_tracks_instance_id(meeting_id2)
assert raw1 != raw2
class TestFrontendBackendConsistency:
"""Test that backend matches frontend logic."""
def test_namespace_matches_frontend(self):
"""Namespace UUID matches frontend RAW_TRACKS_NAMESPACE constant."""
# From www/app/[roomName]/components/DailyRoom.tsx
frontend_namespace = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
assert str(RAW_TRACKS_NAMESPACE) == frontend_namespace
def test_raw_tracks_generation_matches_frontend_logic(self):
"""Backend UUIDv5 generation matches frontend uuidv5() call."""
# Example meeting ID
meeting_id = "550e8400-e29b-41d4-a716-446655440000"
# Backend result
backend_result = generate_raw_tracks_instance_id(meeting_id)
# Expected result from frontend: uuidv5(meeting.id, RAW_TRACKS_NAMESPACE)
# Python uuid5 uses (namespace, name) argument order
# JavaScript uuid.v5(name, namespace) - same args, different order
# Frontend: uuidv5(meeting.id, "a1b2c3d4-e5f6-7890-abcd-ef1234567890")
# Backend: uuid5(UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"), meeting.id)
# Verify it's a valid UUID (will raise if not)
assert len(str(backend_result)) == 36
assert backend_result.version == 5
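# --- Illustrative sketch (not part of this diff) ---
# The helper under test is not shown here; per the comments above it presumably
# wraps uuid.uuid5() with the shared namespace so backend and frontend derive the
# same raw-tracks instanceId from a meeting id (hypothetical reimplementation):
from uuid import UUID, uuid5

_RAW_TRACKS_NAMESPACE_SKETCH = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")

def raw_tracks_instance_id_sketch(meeting_id: str) -> UUID:
    # Python: uuid5(namespace, name); JS frontend: uuidv5(name, namespace)
    return uuid5(_RAW_TRACKS_NAMESPACE_SKETCH, meeting_id)

# e.g. raw_tracks_instance_id_sketch("550e8400-e29b-41d4-a716-446655440000")
# --- end sketch ---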
class TestEdgeCases:
"""Test edge cases and error conditions."""
def test_invalid_uuid_format_raises(self):
"""Invalid UUID format raises ValueError."""
with pytest.raises(ValueError):
generate_cloud_instance_id("not-a-uuid")
def test_lowercase_uuid_normalized_for_cloud(self):
"""Cloud instanceId: lowercase/uppercase UUIDs produce same result."""
meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000"
meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000"
cloud_lower = generate_cloud_instance_id(meeting_id_lower)
cloud_upper = generate_cloud_instance_id(meeting_id_upper)
assert cloud_lower == cloud_upper
def test_uuid5_is_case_sensitive_warning(self):
"""
Documents uuid5 case sensitivity - different case UUIDs produce different hashes.
Not a problem: meeting.id always lowercase from DB and API.
Frontend generates raw-tracks instanceId from lowercase meeting.id.
Backend receives lowercase meeting_id when matching.
This test documents the behavior, not a requirement.
"""
meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000"
meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000"
raw_lower = generate_raw_tracks_instance_id(meeting_id_lower)
raw_upper = generate_raw_tracks_instance_id(meeting_id_upper)
assert raw_lower != raw_upper
class TestMtgSessionIdVsInstanceId:
"""
Documents that Daily.co's mtgSessionId differs from our instanceId.
Why this matters: We investigated using mtgSessionId for matching but discovered
it's generated by Daily.co and unrelated to the instanceId we send. This test documents
that finding so we don't investigate it again.
Production data from 2026-01-13:
- Meeting ID: 4ad503b6-8189-4910-a8f7-68cdd1b7f990
- Cloud instanceId: 4ad503b6-8189-4910-a8f7-68cdd1b7f990 (same as meeting ID)
- Raw-tracks instanceId: 784b3af3-c7dd-57f0-ac54-2ee91c6927cb (UUIDv5 derived)
- Recording mtgSessionId: f25a2e09-740f-4932-9c0d-b1bebaa669c6 (different!)
Conclusion: Cannot use mtgSessionId for recording-to-meeting matching.
"""
def test_mtg_session_id_differs_from_our_instance_ids(self):
"""mtgSessionId (Daily.co) != instanceId (ours) for both cloud and raw-tracks."""
meeting_id = "4ad503b6-8189-4910-a8f7-68cdd1b7f990"
expected_raw_tracks_id = "784b3af3-c7dd-57f0-ac54-2ee91c6927cb"
mtg_session_id = "f25a2e09-740f-4932-9c0d-b1bebaa669c6"
cloud_instance_id = generate_cloud_instance_id(meeting_id)
raw_tracks_instance_id = generate_raw_tracks_instance_id(meeting_id)
assert str(cloud_instance_id) == meeting_id
assert str(raw_tracks_instance_id) == expected_raw_tracks_id
assert str(cloud_instance_id) != mtg_session_id
assert str(raw_tracks_instance_id) != mtg_session_id

View File

@@ -2,10 +2,9 @@
Tests for Hatchet workflow dispatch and routing logic.
These tests verify:
1. Routing to Hatchet when HATCHET_ENABLED=True
2. Replay logic for failed workflows
3. Force flag to cancel and restart
4. Validation prevents concurrent workflows
1. Hatchet workflow validation and replay logic
2. Force flag to cancel and restart workflows
3. Validation prevents concurrent workflows
"""
from unittest.mock import AsyncMock, patch
@@ -34,9 +33,6 @@ async def test_hatchet_validation_blocks_running_workflow():
workflow_run_id="running-workflow-123",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
@@ -72,9 +68,6 @@ async def test_hatchet_validation_blocks_queued_workflow():
workflow_run_id="queued-workflow-123",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
@@ -110,9 +103,6 @@ async def test_hatchet_validation_allows_failed_workflow():
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
@@ -149,9 +139,6 @@ async def test_hatchet_validation_allows_completed_workflow():
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
@@ -187,9 +174,6 @@ async def test_hatchet_validation_allows_when_status_check_fails():
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
@@ -227,9 +211,6 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
@@ -248,38 +229,6 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_skipped_when_disabled():
"""Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="uploaded",
source_kind="room",
workflow_run_id="some-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = False # Hatchet disabled
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet at all
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_validation_locked_transcript():

View File

@@ -189,14 +189,17 @@ async def test_ics_sync_service_sync_room_calendar():
assert events[0].ics_uid == "sync-event-1"
assert events[0].title == "Sync Test Meeting"
# Second sync with same content (should be unchanged)
# Second sync with same content (calendar unchanged, but sync always runs)
# Refresh room to get updated etag and force sync by setting old sync time
room = await rooms_controller.get_by_id(room.id)
await rooms_controller.update(
room, {"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)}
)
result = await sync_service.sync_room_calendar(room)
assert result["status"] == "unchanged"
assert result["status"] == "success"
assert result["events_created"] == 0
assert result["events_updated"] == 0
assert result["events_deleted"] == 0
# Third sync with updated event
event["summary"] = "Updated Meeting Title"
@@ -288,3 +291,43 @@ async def test_ics_sync_service_error_handling():
result = await sync_service.sync_room_calendar(room)
assert result["status"] == "error"
assert "Network error" in result["error"]
@pytest.mark.asyncio
async def test_event_data_changed_exhaustiveness():
"""Test that _event_data_changed compares all EventData fields (except ics_uid).
This test ensures programmers don't forget to update the comparison logic
when adding new fields to EventData/CalendarEvent.
"""
from reflector.services.ics_sync import EventData
sync_service = ICSSyncService()
from reflector.db.calendar_events import CalendarEvent
now = datetime.now(timezone.utc)
event_data: EventData = {
"ics_uid": "test-123",
"title": "Test",
"description": "Desc",
"location": "Loc",
"start_time": now,
"end_time": now + timedelta(hours=1),
"attendees": [],
"ics_raw_data": "raw",
}
existing = CalendarEvent(
room_id="room1",
**event_data,
)
# Will raise RuntimeError if fields are missing from comparison
result = sync_service._event_data_changed(existing, event_data)
assert result is False
modified_data = event_data.copy()
modified_data["title"] = "Changed Title"
result = sync_service._event_data_changed(existing, modified_data)
assert result is True
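# --- Illustrative sketch (not part of this diff) ---
# The RuntimeError mentioned above comes from _event_data_changed in
# reflector.services.ics_sync, which is not shown here. One plausible shape of
# that exhaustiveness guard (hypothetical, for illustration only):
_COMPARED_FIELDS_SKETCH = {
    "title", "description", "location",
    "start_time", "end_time", "attendees", "ics_raw_data",
}

def event_data_changed_sketch(existing, event_data: dict) -> bool:
    uncompared = set(event_data) - _COMPARED_FIELDS_SKETCH - {"ics_uid"}
    if uncompared:
        raise RuntimeError(f"EventData fields not covered by comparison: {uncompared}")
    return any(getattr(existing, f) != event_data[f] for f in _COMPARED_FIELDS_SKETCH)
# --- end sketch ---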

View File

@@ -0,0 +1,374 @@
"""
Integration tests for time-based meeting-to-recording matching.
Tests the critical path for matching Daily.co recordings to meetings when
API doesn't return instanceId.
"""
from datetime import datetime, timedelta, timezone
import pytest
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
@pytest.fixture
async def test_room():
"""Create a test room for meetings."""
room = await rooms_controller.add(
name="test-room-time",
user_id="test-user-id",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=False,
platform="daily",
)
return room
@pytest.fixture
def base_time():
"""Fixed timestamp for deterministic tests."""
return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
class TestTimeBasedMatching:
"""Test get_by_room_name_and_time() matching logic."""
async def test_exact_time_match(self, test_room, base_time):
"""Recording timestamp exactly matches meeting start_date."""
meeting = await meetings_controller.create(
id="meeting-exact",
room_name="daily-test-20260114090000",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090000",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
"""Recording started 1 minute after meeting (participants joined late)."""
meeting = await meetings_controller.create(
id="meeting-late",
room_name="daily-test-20260114090100",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time + timedelta(minutes=1)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090100",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
"""
Two meetings with same room_name (duplicate/race condition).
Should pick closest by timestamp.
"""
meeting1 = await meetings_controller.create(
id="meeting-1-first",
room_name="daily-duplicate-room",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting2 = await meetings_controller.create(
id="meeting-2-second",
room_name="daily-duplicate-room", # Same room_name!
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(seconds=0.99), # 0.99s later
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording started 0.5s after meeting1
# Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
recording_start = base_time + timedelta(seconds=0.5)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-duplicate-room",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s)
async def test_outside_time_window_returns_none(self, test_room, base_time):
"""Recording outside 1-week window returns None."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-old",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording 8 days later (outside 7-day window)
recording_start = base_time + timedelta(days=8)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-old",
recording_start=recording_start,
time_window_hours=168,
)
assert result is None
async def test_tie_breaker_deterministic(self, test_room, base_time):
"""When time delta identical, tie-breaker by meeting.id is deterministic."""
meeting_z = await meetings_controller.create(
id="zzz-last-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting_a = await meetings_controller.create(
id="aaa-first-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time, # Exact same start_date
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tie",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
# Tie-breaker: lexicographically first UUID
assert result.id == "aaa-first-uuid"
async def test_timezone_naive_datetime_raises(self, test_room, base_time):
"""Timezone-naive datetime raises ValueError."""
await meetings_controller.create(
id="meeting-tz",
room_name="daily-test-tz",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Naive datetime (no timezone)
naive_dt = datetime(2026, 1, 14, 9, 0, 0)
with pytest.raises(ValueError, match="timezone-aware"):
await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tz",
recording_start=naive_dt,
time_window_hours=168,
)
async def test_one_week_boundary_after_included(self, test_room, base_time):
"""Meeting 1-week AFTER recording is included (window_end boundary)."""
meeting_time = base_time + timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-after",
room_name="daily-test-boundary-after",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-after",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-after"
async def test_one_week_boundary_before_included(self, test_room, base_time):
"""Meeting 1-week BEFORE recording is included (window_start boundary)."""
meeting_time = base_time - timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-before",
room_name="daily-test-boundary-before",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-before",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-before"
async def test_recording_before_meeting_start(self, test_room, base_time):
"""Recording started before meeting (clock skew or early join)."""
await meetings_controller.create(
id="meeting-early",
room_name="daily-test-early",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time - timedelta(minutes=2)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-early",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-early"
async def test_mixed_inside_outside_window(self, test_room, base_time):
"""Multiple meetings, only one inside window - returns the inside one."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=10),
end_date=base_time - timedelta(days=10, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-inside",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=2),
end_date=base_time - timedelta(days=2, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-future",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(days=10),
end_date=base_time + timedelta(days=10, hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-mixed",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-inside"
class TestAtomicCloudRecordingUpdate:
"""Test atomic update prevents race conditions."""
async def test_first_update_succeeds(self, test_room, base_time):
"""First call to set_cloud_recording_if_missing succeeds."""
meeting = await meetings_controller.create(
id="meeting-atomic-1",
room_name="daily-test-atomic",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success is True
updated = await meetings_controller.get_by_id(meeting.id)
assert updated.daily_composed_video_s3_key == "first-s3-key"
assert updated.daily_composed_video_duration == 100
async def test_second_update_fails_atomically(self, test_room, base_time):
"""Second call to update same meeting doesn't overwrite (atomic check)."""
meeting = await meetings_controller.create(
id="meeting-atomic-2",
room_name="daily-test-atomic2",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success1 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success1 is True
after_first = await meetings_controller.get_by_id(meeting.id)
assert after_first.daily_composed_video_s3_key == "first-s3-key"
success2 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="bucket/path/should-not-overwrite",
duration=200,
)
assert success2 is False
final = await meetings_controller.get_by_id(meeting.id)
assert final.daily_composed_video_s3_key == "first-s3-key"
assert final.daily_composed_video_duration == 100
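# --- Illustrative sketch (not part of this diff) ---
# set_cloud_recording_if_missing() is exercised above but not shown. The tests
# imply first-writer-wins semantics, presumably enforced atomically in SQL with
# a "WHERE daily_composed_video_s3_key IS NULL" guard; a plain-Python
# illustration of the contract (hypothetical):
def set_cloud_recording_if_missing_sketch(meeting, s3_key: str, duration: int) -> bool:
    if meeting.daily_composed_video_s3_key is not None:
        return False  # race lost: a key was already stored for this meeting
    meeting.daily_composed_video_s3_key = s3_key
    meeting.daily_composed_video_duration = duration
    return True
# --- end sketch ---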

View File

@@ -162,9 +162,24 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
# Create transcript with Daily.co multitrack recording
room = await rooms_controller.add(
name="test-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
source_kind="room",
@@ -172,6 +187,7 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
target_language="en",
user_id="test-user",
share_mode="public",
room_id=room.id,
)
track_keys = [

View File

@@ -3,7 +3,8 @@ import React from "react";
import Markdown from "react-markdown";
import "../../../styles/markdown.css";
import type { components } from "../../../reflector-api";
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import { useTranscriptUpdate } from "../../../lib/apiHooks";
import {
@@ -18,7 +19,7 @@ import { LuPen } from "react-icons/lu";
import { useError } from "../../../(errors)/errorContext";
type FinalSummaryProps = {
transcript: GetTranscript;
transcript: GetTranscriptWithParticipants;
topics: GetTranscriptTopic[];
onUpdate: (newSummary: string) => void;
finalSummaryRef: React.Dispatch<React.SetStateAction<HTMLDivElement | null>>;

View File

@@ -2,10 +2,11 @@ import type { components } from "../../reflector-api";
import { useTranscriptCreate } from "../../lib/apiHooks";
type CreateTranscript = components["schemas"]["CreateTranscript"];
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type UseCreateTranscript = {
transcript: GetTranscript | null;
transcript: GetTranscriptWithParticipants | null;
loading: boolean;
error: Error | null;
create: (transcriptCreationDetails: CreateTranscript) => Promise<void>;

View File

@@ -2,7 +2,8 @@ import { useEffect, useState } from "react";
import { ShareMode, toShareMode } from "../../lib/shareMode";
import type { components } from "../../reflector-api";
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
type UpdateTranscript = components["schemas"]["UpdateTranscript"];
import {
@@ -27,7 +28,7 @@ import { featureEnabled } from "../../lib/features";
type ShareAndPrivacyProps = {
finalSummaryElement: HTMLDivElement | null;
transcript: GetTranscript;
transcript: GetTranscriptWithParticipants;
topics: GetTranscriptTopic[];
};

View File

@@ -1,7 +1,8 @@
import { useState, useEffect, useMemo } from "react";
import type { components } from "../../reflector-api";
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import {
BoxProps,
@@ -26,7 +27,7 @@ import {
import { featureEnabled } from "../../lib/features";
type ShareZulipProps = {
transcript: GetTranscript;
transcript: GetTranscriptWithParticipants;
topics: GetTranscriptTopic[];
disabled: boolean;
};

View File

@@ -2,7 +2,8 @@ import { useState } from "react";
import type { components } from "../../reflector-api";
type UpdateTranscript = components["schemas"]["UpdateTranscript"];
type GetTranscript = components["schemas"]["GetTranscript"];
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import {
useTranscriptUpdate,
@@ -20,7 +21,7 @@ type TranscriptTitle = {
onUpdate: (newTitle: string) => void;
// share props
transcript: GetTranscript | null;
transcript: GetTranscriptWithParticipants | null;
topics: GetTranscriptTopic[] | null;
finalSummaryElement: HTMLDivElement | null;
};

View File

@@ -22,14 +22,38 @@ import DailyIframe, {
import type { components } from "../../reflector-api";
import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
import { useRoomJoinMeeting } from "../../lib/apiHooks";
import {
useRoomJoinMeeting,
useRoomJoinedMeeting,
useRoomLeaveMeeting,
useMeetingStartRecording,
leaveRoomPostUrl,
LeaveRoomBody,
} from "../../lib/apiHooks";
import { omit } from "remeda";
import { assertExists } from "../../lib/utils";
import { assertMeetingId } from "../../lib/types";
import {
assertExists,
assertExistsAndNonEmptyString,
NonEmptyString,
parseNonEmptyString,
} from "../../lib/utils";
import {
assertMeetingId,
DailyRecordingType,
MeetingId,
} from "../../lib/types";
import { useUuidV5 } from "react-uuid-hook";
const CONSENT_BUTTON_ID = "recording-consent";
const RECORDING_INDICATOR_ID = "recording-indicator";
// Namespace UUID for UUIDv5 generation of raw-tracks instanceIds
// DO NOT CHANGE: Breaks instanceId determinism across deployments
const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
const RECORDING_START_DELAY_MS = 2000;
const RECORDING_START_MAX_RETRIES = 5;
type Meeting = components["schemas"]["Meeting"];
type Room = components["schemas"]["RoomDetails"];
@@ -73,9 +97,7 @@ const useFrame = (
cbs: {
onLeftMeeting: () => void;
onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
onJoinMeeting: (
startRecording: (args: { type: "raw-tracks" }) => void,
) => void;
onJoinMeeting: () => void;
},
) => {
const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
@@ -126,7 +148,7 @@ const useFrame = (
console.error("frame is null in joined-meeting callback");
return;
}
cbs.onJoinMeeting(frame.startRecording.bind(frame));
cbs.onJoinMeeting();
};
frame.on("joined-meeting", joinCb);
return () => {
@@ -166,6 +188,58 @@ const useFrame = (
] as const;
};
const leaveDaily = () => {
const frame = DailyIframe.getCallInstance();
frame?.leave();
};
const useDirtyDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
) => {
useEffect(() => {
if (!meetingId || !roomName) return;
const handleBeforeUnload = () => {
leaveDaily();
navigator.sendBeacon(
leaveRoomPostUrl(
{
room_name: roomName,
meeting_id: meetingId,
},
{
delay_seconds: 5,
},
),
undefined satisfies LeaveRoomBody,
);
};
window.addEventListener("beforeunload", handleBeforeUnload);
return () => window.removeEventListener("beforeunload", handleBeforeUnload);
}, [meetingId, roomName]);
};
const useDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
) => {
useDirtyDisconnects(meetingId, roomName);
useEffect(() => {
return () => {
leaveDaily();
leaveMutation.mutate({
params: {
path: { meeting_id: meetingId, room_name: roomName },
query: { delay_seconds: 5 },
},
});
};
}, [meetingId, roomName]);
};
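The two hooks above carry the dirty-disconnect idea: fetch/XHR are not reliable while a page is unloading, so the leave notification is handed to navigator.sendBeacon, which the browser queues even as the tab closes. As a standalone illustration of the same pattern (hook name and URL shape below are illustrative, not the project's real builders):

import { useEffect } from "react";

// Best-effort "I left" notification that survives tab close / navigation.
// sendBeacon is fire-and-forget: no response handling and a small payload
// budget, but it is one of the few network calls still delivered on unload.
export function useLeaveBeacon(url: string | null) {
  useEffect(() => {
    if (!url) return;
    const onBeforeUnload = () => {
      // Returns false if the user agent refused to queue the request.
      navigator.sendBeacon(url);
    };
    window.addEventListener("beforeunload", onBeforeUnload);
    return () => window.removeEventListener("beforeunload", onBeforeUnload);
  }, [url]);
}

// Usage with an illustrative URL:
// useLeaveBeacon(`/v1/rooms/${roomName}/meetings/${meetingId}/leave?delay_seconds=5`);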
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
@@ -173,9 +247,20 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const authLastUserId = auth.lastUserId;
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
const joinedMutation = useRoomJoinedMeeting();
const leaveMutation = useRoomLeaveMeeting();
const startRecordingMutation = useMeetingStartRecording();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const roomName = params?.roomName as string;
// Generate deterministic instanceIds so all participants use SAME IDs
const cloudInstanceId = parseNonEmptyString(meeting.id);
const rawTracksInstanceId = parseNonEmptyString(
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
);
if (typeof params.roomName === "object")
throw new Error(`Invalid room name in params. array? ${params.roomName}`);
const roomName = assertExistsAndNonEmptyString(params.roomName);
const {
showConsentModal,
@@ -217,6 +302,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
router.push("/browse");
}, [router]);
useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);
const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => {
if (ev.button_id === CONSENT_BUTTON_ID) {
@@ -228,19 +315,83 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
],
);
const handleFrameJoinMeeting = useCallback(
(startRecording: (args: { type: "raw-tracks" }) => void) => {
try {
const handleFrameJoinMeeting = useCallback(() => {
joinedMutation.mutate({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
});
if (meeting.recording_type === "cloud") {
console.log("Starting cloud recording");
startRecording({ type: "raw-tracks" });
}
} catch (error) {
console.error("Failed to start recording:", error);
console.log("Starting dual recording via REST API", {
cloudInstanceId,
rawTracksInstanceId,
});
// Start both cloud and raw-tracks via backend REST API (with retry on 404)
// Daily.co needs time to register call as "hosting" for REST API
const startRecordingWithRetry = (
type: DailyRecordingType,
instanceId: NonEmptyString,
attempt: number = 1,
) => {
setTimeout(() => {
startRecordingMutation.mutate(
{
params: {
path: {
meeting_id: meeting.id,
},
},
body: {
type,
instanceId,
},
},
{
onError: (error: any) => {
const errorText = error?.detail || error?.message || "";
const is404NotHosting = errorText.includes(
"does not seem to be hosting a call",
);
const isActiveStream = errorText.includes(
"has an active stream",
);
if (is404NotHosting && attempt < RECORDING_START_MAX_RETRIES) {
console.log(
`${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`,
);
startRecordingWithRetry(type, instanceId, attempt + 1);
} else if (isActiveStream) {
console.log(
`${type}: Recording already active (started by another participant)`,
);
} else {
console.error(`Failed to start ${type} recording:`, error);
}
},
[meeting.recording_type],
},
);
}, RECORDING_START_DELAY_MS);
};
// Start both recordings
startRecordingWithRetry("cloud", cloudInstanceId);
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
}
}, [
joinedMutation,
roomName,
meeting.id,
meeting.recording_type,
startRecordingMutation,
cloudInstanceId,
rawTracksInstanceId,
]);
const recordingIconUrl = useMemo(
() => new URL("/recording-icon.svg", window.location.origin),

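Two ideas in the DailyRoom diff above are worth pulling out. First, every participant must derive the same recording instanceIds so that duplicate start requests collapse into a single recording: the cloud id is the meeting id itself, and the raw-tracks id is UUIDv5(meeting id, fixed namespace). Second, Daily's REST API rejects a start request until it registers the room as hosting a call, so the start is wrapped in a bounded, delayed retry. A sketch of both, assuming the `uuid` package in place of the `react-uuid-hook` hook used in the component, and a caller-supplied `start()` thunk that rejects with the backend's error text:

import { v5 as uuidv5 } from "uuid";

// Same namespace as in the component; the same meeting id always maps to the
// same raw-tracks instanceId on every participant's machine.
const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";

export const rawTracksInstanceId = (meetingId: string): string =>
  uuidv5(meetingId, RAW_TRACKS_NAMESPACE);

// Bounded retry around a recording-start call. "Not hosting a call yet" is
// retried after a delay; "already has an active stream" means another
// participant won the race and counts as success; anything else is surfaced.
export async function startWithRetry(
  start: () => Promise<void>,
  { retries = 5, delayMs = 2000 } = {},
): Promise<void> {
  for (let attempt = 1; attempt <= retries; attempt++) {
    // Daily needs a moment after join before the REST API accepts the start.
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    try {
      await start();
      return;
    } catch (err: unknown) {
      const text = String((err as any)?.detail ?? (err as any)?.message ?? "");
      if (text.includes("has an active stream")) return;
      const notHostingYet = text.includes("does not seem to be hosting a call");
      if (!notHostingYet || attempt === retries) throw err;
    }
  }
}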

@@ -1,11 +1,13 @@
"use client";
import { $api } from "./apiClient";
import { $api, API_URL } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import type { components, operations } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
import { createFinalURL, createQuerySerializer } from "openapi-fetch";
/*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -567,6 +569,20 @@ export function useTranscriptSpeakerMerge() {
);
}
export function useMeetingStartRecording() {
const { setError } = useError();
return $api.useMutation(
"post",
"/v1/meetings/{meeting_id}/recordings/start",
{
onError: (error) => {
setError(error as Error, "Failed to start recording");
},
},
);
}
export function useMeetingAudioConsent() {
const { setError } = useError();
@@ -752,6 +768,44 @@ export function useRoomJoinMeeting() {
);
}
export const LEAVE_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;
export const leaveRoomPostUrl = (
path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
): string =>
createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path, query },
querySerializer: createQuerySerializer(),
});
export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];
export function useRoomLeaveMeeting() {
return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
}
export const JOINED_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;
export const joinedRoomPostUrl = (
params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
): string =>
createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path: params },
querySerializer: () => "",
});
export type JoinedRoomBody =
operations["v1_rooms_joined_meeting"]["requestBody"];
export function useRoomJoinedMeeting() {
return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
}
export function useRoomIcsSync() {
const { setError } = useError();

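A short usage sketch for the new helpers in apiHooks.ts: leaveRoomPostUrl produces a plain URL string (so it can feed navigator.sendBeacon), while the useRoom*Meeting hooks stay on the typed React Query path. The values below ("standup", "mtg_123", the origin) are placeholders:

// Plain URL for the beacon path:
const url = leaveRoomPostUrl(
  { room_name: "standup", meeting_id: "mtg_123" },
  { delay_seconds: 5 },
);
// With API_URL = "https://api.example.com" this yields roughly:
// https://api.example.com/v1/rooms/standup/meetings/mtg_123/leave?delay_seconds=5
navigator.sendBeacon(url);

// Typed mutation path, inside a component:
const leaveMutation = useRoomLeaveMeeting();
leaveMutation.mutate({
  params: {
    path: { room_name: "standup", meeting_id: "mtg_123" },
    query: { delay_seconds: 5 },
  },
});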

@@ -1,5 +1,6 @@
import { components } from "../reflector-api";
type ApiTranscriptStatus = components["schemas"]["GetTranscript"]["status"];
type ApiTranscriptStatus =
components["schemas"]["GetTranscriptWithParticipants"]["status"];
export type TranscriptStatus = ApiTranscriptStatus;


@@ -89,3 +89,5 @@ export const assertMeetingId = (s: string): MeetingId => {
// just cast for now
return nes as MeetingId;
};
export type DailyRecordingType = "cloud" | "raw-tracks";


@@ -75,6 +75,31 @@ export interface paths {
patch: operations["v1_meeting_deactivate"];
trace?: never;
};
"/v1/meetings/{meeting_id}/recordings/start": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Start Recording
* @description Start cloud or raw-tracks recording via Daily.co REST API.
*
* Both cloud and raw-tracks recordings are started via the REST API to bypass the enable_recording limitation of allowing only one recording at a time.
* Different instanceIds are used for cloud vs raw-tracks (the same id will not work for both).
*
* Note: no authentication is required - anonymous users are supported. TODO: this is a DoS vector.
*/
post: operations["v1_start_recording"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms": {
parameters: {
query?: never;
@@ -146,6 +171,48 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Joined Meeting
* @description Trigger a presence poll (ideally when the user has actually joined the meeting in the Daily iframe)
*/
post: operations["v1_rooms_joined_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Leave Meeting
* @description Trigger a presence recheck when a user leaves the meeting (e.g., tab close or navigation).
*
* Queues a presence poll with an optional delay so Daily.co has time to detect the disconnect.
*/
post: operations["v1_rooms_leave_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}/webhook/test": {
parameters: {
query?: never;
@@ -1544,6 +1611,10 @@ export interface components {
* @enum {string}
*/
platform: "whereby" | "daily";
/** Daily Composed Video S3 Key */
daily_composed_video_s3_key?: string | null;
/** Daily Composed Video Duration */
daily_composed_video_duration?: number | null;
};
/** MeetingConsentRequest */
MeetingConsentRequest: {
@@ -1818,6 +1889,19 @@ export interface components {
/** Words */
words: components["schemas"]["Word"][];
};
/** StartRecordingRequest */
StartRecordingRequest: {
/**
* Type
* @enum {string}
*/
type: "cloud" | "raw-tracks";
/**
* Instanceid
* Format: uuid
*/
instanceId: string;
};
/** Stream */
Stream: {
/** Stream Id */
@@ -2126,6 +2210,43 @@ export interface operations {
};
};
};
v1_start_recording: {
parameters: {
query?: never;
header?: never;
path: {
meeting_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["StartRecordingRequest"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": {
[key: string]: unknown;
};
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_list: {
parameters: {
query?: {
@@ -2356,6 +2477,72 @@ export interface operations {
};
};
};
v1_rooms_joined_meeting: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_leave_meeting: {
parameters: {
query?: {
delay_seconds?: number;
};
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_test_webhook: {
parameters: {
query?: never;

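Reduced to a plain fetch, the generated v1_start_recording operation above amounts to the following; the typed useMeetingStartRecording hook is the real call site, and apiUrl / meetingId here are caller-supplied placeholders (import path for the generated types depends on file location):

import type { components } from "../reflector-api";

type StartRecordingRequest = components["schemas"]["StartRecordingRequest"];

async function startRecording(
  apiUrl: string,
  meetingId: string,
  body: StartRecordingRequest, // { type: "cloud" | "raw-tracks", instanceId: <uuid> }
): Promise<Record<string, unknown>> {
  const res = await fetch(
    `${apiUrl}/v1/meetings/${encodeURIComponent(meetingId)}/recordings/start`,
    {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
    },
  );
  if (!res.ok) {
    // 422 responses carry an HTTPValidationError payload.
    throw new Error(await res.text());
  }
  return res.json();
}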

@@ -46,6 +46,7 @@
"react-markdown": "^9.0.0",
"react-qr-code": "^2.0.12",
"react-select-search": "^4.1.7",
"react-uuid-hook": "^0.0.6",
"redlock": "5.0.0-beta.2",
"remeda": "^2.31.1",
"sass": "^1.63.6",

www/pnpm-lock.yaml (generated)

@@ -106,6 +106,9 @@ importers:
react-select-search:
specifier: ^4.1.7
version: 4.1.8(prop-types@15.8.1)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
react-uuid-hook:
specifier: ^0.0.6
version: 0.0.6(react@18.3.1)
redlock:
specifier: 5.0.0-beta.2
version: 5.0.0-beta.2
@@ -7628,6 +7631,14 @@ packages:
"@types/react":
optional: true
react-uuid-hook@0.0.6:
resolution:
{
integrity: sha512-u9+EvFbqpWfLE/ReYFry0vYu1BAg1fY9ekr0XLSDNnfWyrnVFytpurwz5qYsIB0psevuvrpZHIcvu7AjUwqinA==,
}
peerDependencies:
react: ">=16.8.0"
react@18.3.1:
resolution:
{
@@ -8771,6 +8782,13 @@ packages:
integrity: sha512-Fykw5U4eZESbq739BeLvEBFRuJODfrlmjx5eJux7W817LjRaq4b7/i4t2zxQmhcX+fAj4nMfRdTzO4tmwLKn0w==,
}
uuid@13.0.0:
resolution:
{
integrity: sha512-XQegIaBTVUjSHliKqcnFqYypAd4S+WCYt5NIeRs6w/UAry7z8Y9j5ZwRRL4kzq9U3sD6v+85er9FvkEaBpji2w==,
}
hasBin: true
uuid@8.3.2:
resolution:
{
@@ -14570,6 +14588,11 @@ snapshots:
optionalDependencies:
"@types/react": 18.2.20
react-uuid-hook@0.0.6(react@18.3.1):
dependencies:
react: 18.3.1
uuid: 13.0.0
react@18.3.1:
dependencies:
loose-envify: 1.4.0
@@ -15401,6 +15424,8 @@ snapshots:
uuid-validate@0.0.3: {}
uuid@13.0.0: {}
uuid@8.3.2: {}
uuid@9.0.1: {}