Compare commits


13 Commits

Author SHA1 Message Date
a455be047a Merge branch 'main' into fix-daily-ux 2026-02-06 11:33:35 -05:00
Igor Loskutov
2a9993ceec remove integration test with hardcoded external URL 2026-02-06 11:31:22 -05:00
Igor Loskutov
8773d9fbbd test: add integration test for ICS dedup using Max's live calendar feed 2026-02-06 11:02:20 -05:00
Igor Loskutov
6c88e05423 refactor: remove redundant fatalError ref, use state directly in handleLeave 2026-02-05 22:06:15 -05:00
Igor Loskutov
f4803c0d76 fix: use function mocks with correct signature in dedup tests 2026-02-05 19:51:27 -05:00
Igor Loskutov
2b484aec00 refactor: data-driven FatalErrorScreen, cover all DailyFatalErrorType cases 2026-02-05 19:49:14 -05:00
Igor Loskutov
d3161730ef fix: use SDK DailyFatalErrorType, add meeting-full/not-allowed/exp-token cases, remove dead end_date fallback 2026-02-05 19:43:25 -05:00
Igor Loskutov
4fd88b2fc1 fix: review feedback — literal error types, extract FatalErrorScreen, type params, fix mock signature 2026-02-05 19:35:47 -05:00
Igor Loskutov
a2694650fd chore: add REPORT.md and ref markers for ApiError cast issue 2026-02-05 19:16:58 -05:00
Igor Loskutov
238d768499 fix: address review feedback - use ApiError type, move inline imports 2026-02-05 18:50:12 -05:00
Igor Loskutov
1da687fe13 fix: prevent duplicate meetings from aggregated ICS calendar feeds
When Cal.com events appear in an aggregated ICS feed, the same event
gets two different UIDs (one from Cal.com, one from Google Calendar).
This caused duplicate meetings to be created for the same time slot.

Add time-window dedup check in create_upcoming_meetings_for_event:
after verifying no meeting exists for the calendar_event_id, also check
if a meeting already exists for the same room + start_date + end_date.
2026-02-05 18:42:42 -05:00
Igor Loskutov
e9e1676409 feat: add specific error messages for Daily.co meeting disconnections
Handle fatal errors from Daily.co SDK (connection-error, exp-room,
ejected, etc.) with user-friendly messages and appropriate actions.
Improve join failure display to show actual API error detail.
2026-02-05 18:42:42 -05:00
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
30 changed files with 1252 additions and 2488 deletions

View File

@@ -4,5 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465
server/tests/test_recording_request_flow.py:generic-api-key:121
server/reflector/worker/process.py:generic-api-key:594

View File

@@ -1,414 +0,0 @@
# Presence System Race Condition: Design Document
## Executive Summary
Users in the same Reflector room can end up in **different Daily.co rooms** due to race conditions in meeting lifecycle management. This document details the root cause, why current mitigations are insufficient, and proposes a solution that eliminates the race by design.
---
## Problem Statement
When a user quickly leaves and rejoins a meeting (e.g., closes tab and reopens within seconds), they may find themselves in a different Daily.co room than other participants in the same Reflector room. This breaks the core assumption that all users in a Reflector room share the same video call.
### Symptoms
- User A and User B are in the same Reflector room but different Daily.co rooms
- User reports "I can't see/hear the other participant"
- Meeting appears active but users are isolated
---
## Evidence: Hypothesis Simulation
A simulation was built to model the presence system and find race conditions through randomized action sequences.
**Location**: `server/tests/simulation/`
```bash
cd server
# Current system config - finds race conditions
uv run pytest tests/simulation/test_presence_race.py::test_presence_race_conditions_current_system
# Result: XFAIL (expected failure - race found)
# Fixed system config - no race conditions
uv run pytest tests/simulation/test_presence_race.py::test_presence_no_race_conditions_fixed_system
# Result: PASS
```
The simulation models:
- Discrete time clock for deterministic replay
- Daily.co rooms, participants, presence API with configurable lag
- Reflector meetings, sessions, webhooks
- User state machine: `idle → joining → handshaking → connected → leaving → idle`
- Background tasks: `poll_daily_room_presence`, `process_meetings`
### Key Finding
The simulation proves that **even with the Daily API call**, a race window exists during WebRTC handshake when users are invisible to the presence API.
---
## Current System Analysis
### Architecture Overview
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Frontend   │────▶│   Backend   │────▶│  Daily.co   │
│  (Next.js)  │     │  (FastAPI)  │     │     API     │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │  Database   │
                    │ (Sessions)  │
                    └─────────────┘
```
### Relevant Code Paths
#### 1. Meeting Join Flow
- **File**: `server/reflector/views/rooms.py`
- **Endpoint**: `POST /rooms/{room_name}/meeting`
- Returns existing active meeting or creates new one
- User then connects to Daily.co via WebRTC (frontend)
#### 2. Presence Polling
- **File**: `server/reflector/worker/process.py:642`
- **Function**: `poll_daily_room_presence()`
- Called by webhooks (`participant.joined`, `participant.left`) and by the `/joined` and `/leave` endpoints
- Queries Daily API for current participants
- Updates `daily_participant_sessions` table in database
#### 3. Meeting Deactivation
- **File**: `server/reflector/worker/process.py:754`
- **Function**: `process_meetings()`
- Runs periodically (every 60s via Celery beat)
- Checks if meetings should be deactivated
**Current implementation** (lines 806-833):
```python
if meeting.platform == "daily":
try:
presence = await client.get_room_presence(meeting.room_name)
has_active_sessions = presence.total_count > 0
# ...
except Exception:
logger_.warning("Daily.co presence API failed, falling back to DB sessions")
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
```
**Key observation**: The code already uses the Daily API (`get_room_presence`), not just the database. The race condition persists despite this.
### Endpoints from feature-leave-endpoint Branch
The `feature-leave-endpoint` branch added explicit leave/join notifications:
| Endpoint | Purpose | Trigger |
|----------|---------|---------|
| `POST /rooms/{room_name}/meetings/{meeting_id}/join` | Get meeting info | User navigates to room |
| `POST /rooms/{room_name}/meetings/{meeting_id}/joined` | Signal connection complete | After WebRTC connects |
| `POST /rooms/{room_name}/meetings/{meeting_id}/leave` | Signal user leaving | Tab close via sendBeacon |
These endpoints trigger `poll_daily_room_presence_task` to update session state faster than waiting for webhooks.
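In code, the `/leave` handler's only job is to acknowledge and enqueue the poll, since `sendBeacon` never reads the response. A framework-free sketch; `FakeTaskQueue` and `handle_leave` are illustrative stand-ins, not the real `rooms.py`:

```python
from dataclasses import dataclass, field


@dataclass
class FakeTaskQueue:
    """Stands in for the Celery task (poll_daily_room_presence_task)."""

    enqueued: list = field(default_factory=list)

    def delay(self, *args) -> None:  # Celery-style enqueue
        self.enqueued.append(args)


def handle_leave(meeting_id: str, poll_task: FakeTaskQueue) -> int:
    """Body of POST .../leave: ack fast, poll asynchronously."""
    poll_task.delay(meeting_id)  # refresh daily_participant_sessions ASAP
    return 204  # empty ack; sendBeacon fires and forgets


queue = FakeTaskQueue()
assert handle_leave("meeting-123", queue) == 204
assert queue.enqueued == [("meeting-123",)]
```

The same shape applies to `/joined`: acknowledge, then trigger the poll rather than doing Daily API work inline.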
---
## Race Condition: Detailed Analysis
### The Fundamental Problem
**The backend has no knowledge of users who are in the process of joining (WebRTC handshake phase).**
Data sources available to backend:
| Source | What it knows | Limitation |
|--------|---------------|------------|
| Daily Presence API | Currently connected users | 0-500ms lag; doesn't see handshaking users |
| Database sessions | Historical join/leave events | Stale; updated by polls |
| Webhooks | Join/leave events | Delayed; can fail |
**Gap**: No source knows about users between "decided to join" and "WebRTC handshake complete".
### Race Scenario Timeline
```
T+0ms: User A connected to Meeting M1, visible in Daily presence
T+1000ms: User A closes browser tab
T+1050ms: participant.left webhook fires → poll_daily_room_presence queued
T+1500ms: User A reopens tab (quick rejoin)
T+1600ms: POST /meeting returns M1 (still active)
T+1700ms: Frontend starts WebRTC handshake
T+2000ms: User A in handshake - NOT visible to Daily presence API
T+2100ms: poll runs → sees 0 participants → marks session as left_at
T+3000ms: process_meetings runs
T+3100ms: Daily API returns 0 participants (user still handshaking)
T+3200ms: has_active_sessions=False, has_had_sessions=True
T+3300ms: Meeting deactivated, Daily room deleted
T+4000ms: User A WebRTC completes → Daily room is gone!
T+5000ms: User B joins same Reflector room → new Meeting M2 created
RESULT: User A orphaned, User B in different Daily room
```
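The timeline can be replayed deterministically with a toy model (a drastic simplification of the machinery in `server/tests/simulation/`; all names here are illustrative):

```python
class ToyPresenceSystem:
    """Tiny deterministic model of the presence race."""

    def __init__(self):
        self.user_state = "idle"  # idle → handshaking → connected
        self.daily_room_alive = True
        self.presence_count = 0  # what the Daily presence API reports

    def user_rejoins(self):
        # T+1700ms: WebRTC handshake starts; user invisible to presence API
        self.user_state = "handshaking"

    def process_meetings(self):
        # T+3300ms: deactivation rule sees 0 participants, deletes the room
        if self.presence_count == 0:
            self.daily_room_alive = False

    def handshake_completes(self):
        # T+4000ms: WebRTC finishes regardless of backend state
        self.user_state = "connected"
        if self.daily_room_alive:
            self.presence_count = 1


system = ToyPresenceSystem()
system.user_rejoins()
system.process_meetings()
system.handshake_completes()
# The race: the user is "connected" but the Daily room no longer exists
assert system.user_state == "connected" and not system.daily_room_alive
```

Swapping the order of `process_meetings()` and `handshake_completes()` makes the assertion fail to trigger, which is exactly the timing dependence the simulation explores at random.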
### Why Current Mitigations Are Insufficient
#### 1. Using Daily API (already implemented)
The code already calls `get_room_presence()` instead of relying solely on database sessions. **This doesn't help** because the Daily presence API itself doesn't see users during WebRTC handshake (0-500ms consistency lag + handshake duration of 500-3000ms).
#### 2. Fallback to Database
When the Daily API fails, the code falls back to database sessions. This is **worse**, because the database is even more stale than the API.
#### 3. Leave/Join Endpoints
The `/joined` and `/leave` endpoints trigger immediate polls, reducing the window but **not eliminating it**. The poll still only sees what Daily presence API reports.
---
## Proposed Solutions
### Option A: Grace Period (Not Recommended)
Add a time-based buffer before deactivation.
```python
GRACE_PERIOD_SECONDS = 10
if not has_active_sessions and has_had_sessions:
recent_activity = await get_recent_activity(meeting_id, within_seconds=GRACE_PERIOD_SECONDS)
if recent_activity:
continue # Skip deactivation
```
**Pros:**
- Simple to implement
- Low risk
**Cons:**
- Arbitrary timeout value (why 10s? why not 5s or 30s?)
- Feels like a hack ("setTimeout solution")
- Delays legitimate deactivation
- Doesn't eliminate race, just makes it less likely
### Option B: Track "Intent to Join" (Recommended)
Add explicit state tracking for users who are in the process of joining.
**New endpoint**: `POST /rooms/{room_name}/meetings/{meeting_id}/joining`
**Flow change**:
```
Current:
1. POST /join → get meeting info
2. Render Daily iframe (start WebRTC)
3. POST /joined (after connected)
Proposed:
1. POST /join → get meeting info
2. POST /joining → "I'm about to connect" ← NEW (wait for 200 OK)
3. Render Daily iframe (start WebRTC)
4. POST /joined (after connected)
```
**Backend tracking**:
```python
# On /joining endpoint
await pending_joins.create(meeting_id=meeting_id, user_id=user_id, created_at=now())
# In process_meetings
pending = await pending_joins.get_recent(meeting_id, max_age_seconds=30)
if pending:
logger.info("Meeting has pending joins, skipping deactivation")
continue
# On /joined endpoint or timeout
await pending_joins.delete(meeting_id=meeting_id, user_id=user_id)
```
**Pros:**
- Eliminates race by design (backend knows before Daily does)
- Explicit state machine, not time-based guessing
- Clear semantics
**Cons:**
- Adds ~50-200ms latency (one round-trip before iframe renders)
- Requires frontend changes
- Needs cleanup mechanism for abandoned joins (user closes tab during handshake)
### Option C: Optimistic Locking with Version
Track meeting "version" that must match for deactivation.
**Concept**: Each join attempt increments a version. Deactivation only proceeds if version hasn't changed since presence check.
**Cons:**
- Complex to implement correctly
- Still has edge cases with concurrent joins
---
## Recommended Approach: Option B
**Track "Intent to Join"** is the cleanest solution because it:
1. **Eliminates the race by design** - no timing windows
2. **Makes state explicit** - joining/connected/leaving are tracked, not inferred
3. **Aligns with existing patterns** - similar to `/joined` and `/leave` endpoints
4. **No arbitrary timeouts** - unlike grace period
### Data Model Change
Add tracking for pending joins. Options:
| Storage | Pros | Cons |
|---------|------|------|
| Redis key | Fast, auto-expire | Lost on Redis restart |
| Database table | Persistent, queryable | Slightly slower |
| In-memory | Fastest | Lost on server restart |
**Recommendation**: Redis with TTL (30s expiry) for simplicity. Pending joins are ephemeral; if Redis restarts and the keys are lost, the worst case is a brief reopening of the race window (an in-flight handshake loses its reservation) until fresh `/joining` calls repopulate the keys.
```python
# Redis key format
pending_join:{meeting_id}:{user_id} = {timestamp}
# TTL: 30 seconds
```
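A minimal sketch of helpers over this key scheme, assuming a `redis.asyncio`-style client (`set(..., ex=)`, `delete`, `scan_iter`); the function names are hypothetical, not existing Reflector code:

```python
PENDING_JOIN_TTL_SECONDS = 30


def _key(meeting_id: str, user_id: str) -> str:
    return f"pending_join:{meeting_id}:{user_id}"


async def mark_joining(redis, meeting_id: str, user_id: str, now_ts: float) -> None:
    # The TTL doubles as cleanup for abandoned joins: if the user closes the
    # tab mid-handshake and /joined never fires, the key simply expires.
    await redis.set(_key(meeting_id, user_id), str(now_ts), ex=PENDING_JOIN_TTL_SECONDS)


async def has_pending_joins(redis, meeting_id: str) -> bool:
    # SCAN-based check; with a handful of users per meeting this stays cheap
    async for _ in redis.scan_iter(match=f"pending_join:{meeting_id}:*"):
        return True
    return False


async def clear_pending(redis, meeting_id: str, user_id: str) -> None:
    # Called from /joined once the WebRTC connection is confirmed
    await redis.delete(_key(meeting_id, user_id))
```

Injecting the client keeps the helpers trivially testable with an in-memory fake.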
### Implementation Checklist
1. **Backend: Add `/joining` endpoint**
- File: `server/reflector/views/rooms.py`
- Creates Redis key with 30s TTL
- Returns 200 OK
2. **Backend: Modify `process_meetings()`**
- File: `server/reflector/worker/process.py`
- Before deactivation, check for pending joins
- If any exist, skip deactivation
3. **Backend: Modify `/joined` endpoint**
- Clear pending join on successful connection
4. **Frontend: Call `/joining` before WebRTC**
- File: `www/app/[roomName]/components/DailyRoom.tsx`
- Await response before rendering Daily iframe
5. **Update simulation**
- Add `joining` state tracking to match new design
- Verify race condition is eliminated
6. **Integration tests**
- Test quick rejoin scenario
- Test abandoned join (user closes during handshake)
- Test concurrent joins from multiple users
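Step 2 of the checklist could look like this inside `process_meetings()`, with the collaborators passed in explicitly for clarity (the real code would call Redis and the deactivation path directly; all names are illustrative):

```python
from typing import Awaitable, Callable


async def maybe_deactivate(
    meeting_id: str,
    presence_count: int,  # from get_room_presence()
    has_had_sessions: bool,
    has_pending_joins: Callable[[str], Awaitable[bool]],
    deactivate: Callable[[str], Awaitable[None]],
) -> bool:
    """Return True if the meeting was deactivated this pass."""
    if presence_count > 0 or not has_had_sessions:
        return False  # still active, or never used
    if await has_pending_joins(meeting_id):
        # A /joining reservation exists: the user may be mid-handshake and
        # invisible to the Daily presence API, so skip this pass.
        return False
    await deactivate(meeting_id)
    return True
```

The reservation check runs before the presence result is acted on, so a pending join always wins over a zero presence count.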
---
## Files Reference
### Core Files to Modify
| File | Purpose |
|------|---------|
| `server/reflector/views/rooms.py` | Add `/joining` endpoint |
| `server/reflector/worker/process.py` | Check pending joins before deactivation |
| `www/app/[roomName]/components/DailyRoom.tsx` | Call `/joining` before WebRTC |
### Reference Files
| File | Contains |
|------|----------|
| `server/reflector/video_platforms/daily.py:128` | `get_room_presence()` - Daily API call |
| `server/reflector/worker/process.py:642` | `poll_daily_room_presence()` - presence polling |
| `server/reflector/views/daily.py:125` | Webhook handlers |
| `server/tests/simulation/` | Hypothesis simulation proving the race |
| `server/tests/test_daily_presence_deactivation.py` | Unit tests for presence logic |
### Simulation Files
| File | Purpose |
|------|---------|
| `tests/simulation/system.py` | Main simulation engine |
| `tests/simulation/config.py` | Current vs fixed system configs |
| `tests/simulation/state.py` | State dataclasses |
| `tests/simulation/test_presence_race.py` | Hypothesis stateful tests |
| `tests/simulation/test_targeted_scenarios.py` | Specific race scenarios |
| `server/reflector/presence/model.py` | Shared state machine model |
---
## Alternative Considered: Remove DB Fallback
One simpler change discussed: remove the database fallback when Daily API fails, and "fail loudly" instead.
```python
# Current (with fallback)
try:
presence = await client.get_room_presence(meeting.room_name)
has_active_sessions = presence.total_count > 0
except Exception:
# Fallback to stale DB
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = bool(room_sessions and any(s.ended_at is None for s in room_sessions))
# Proposed (fail loudly)
try:
presence = await client.get_room_presence(meeting.room_name)
has_active_sessions = presence.total_count > 0
except Exception:
logger.error("Daily API failed, skipping deactivation check for this meeting")
continue # Don't deactivate if we can't verify
```
**This helps but doesn't eliminate the race** - it only removes one failure mode (stale DB). The core race (handshake invisibility) remains.
---
## Conclusion
The presence system race condition is a **data model gap**, not a timing issue that can be solved with grace periods. The backend needs explicit knowledge of users who intend to join, before they become visible to the Daily presence API.
The recommended fix is to add a `/joining` endpoint that the frontend calls before starting WebRTC. This creates a "reservation" that prevents premature meeting deactivation during the handshake window.
This approach:
- Eliminates the race by design
- Adds minimal latency (~50-200ms)
- Follows explicit state machine principles
- Avoids arbitrary timeout hacks
---
## Appendix: Simulation Test Results
```
$ uv run pytest tests/simulation/ -v
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_uses_model_states PASSED
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_respects_transitions PASSED
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_invalid_transitions_checked PASSED
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_implements_protocols PASSED
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_uses_shared_invariants PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_state_machine_has_all_states PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_state_machine_valid_transitions PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_state_machine_invalid_transitions_raise PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_guarded_user_state_transitions PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_guarded_user_state_rejects_invalid PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_guarded_user_state_tracks_history PASSED
tests/simulation/test_model_conformance.py::TestInvariantConsistency::test_invariants_same_between_model_and_simulation PASSED
tests/simulation/test_model_conformance.py::test_quick_conformance_check PASSED
tests/simulation/test_presence_race.py::TestPresenceRaceFixed::runTest PASSED
tests/simulation/test_presence_race.py::test_presence_race_conditions_current_system XFAIL
tests/simulation/test_presence_race.py::test_presence_no_race_conditions_fixed_system PASSED
tests/simulation/test_presence_race.py::test_smoke_presence_simulation PASSED
tests/simulation/test_targeted_scenarios.py::TestQuickRejoinRace::test_quick_rejoin_causes_split PASSED
tests/simulation/test_targeted_scenarios.py::TestQuickRejoinRace::test_quick_rejoin_fixed_system PASSED
tests/simulation/test_targeted_scenarios.py::TestSimultaneousJoins::test_two_users_join_simultaneously PASSED
tests/simulation/test_targeted_scenarios.py::TestProcessMeetingsRace::test_process_meetings_during_handshake PASSED
tests/simulation/test_targeted_scenarios.py::TestPresenceLagRace::test_presence_lag_causes_incorrect_count PASSED
tests/simulation/test_targeted_scenarios.py::TestMeetingDeactivationEdgeCases::test_deactivation_with_no_sessions PASSED
tests/simulation/test_targeted_scenarios.py::TestMeetingDeactivationEdgeCases::test_deactivation_requires_had_sessions PASSED
tests/simulation/test_targeted_scenarios.py::TestEventLogTracing::test_event_log_captures_flow PASSED
tests/simulation/test_targeted_scenarios.py::test_config_presets PASSED
================== 25 passed, 1 xfailed ==================
```
The `xfail` test (`test_presence_race_conditions_current_system`) demonstrates that the current system configuration has race conditions that can be found through randomized testing.

View File

@@ -86,7 +86,7 @@ Daily.co Room: "daily-private-igor-20260110042117"
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp |
-**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId` (can be null), requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
+**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording
@@ -101,30 +101,6 @@ Daily.co Room: "daily-private-igor-20260110042117"
**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs.
### instanceId (Reflector-Generated)
**Definition:** UUID we generate and send when starting recording via REST API.
**Generation:** Deterministic from meeting_id
- Cloud: `instanceId = meeting_id` directly
- Raw-tracks: `instanceId = UUIDv5(meeting_id, namespace)`
**Key behaviors:**
- **Reuse allowed:** Same instanceId can be used after stop (validated 2026-01-20)
- **Not returned:** Daily.co does NOT echo instanceId back in GET /recordings response
- **Present in error webhooks:** `recording.error` webhook includes instanceId
- **Purpose:** Allows multiple concurrent recordings (cloud + raw-tracks) in same room
**Stop/restart example:**
```
Recording 1: POST /start with instanceId="779e6376..." → recording_id="ee00c4e8..."
Stop recording
Recording 2: POST /start with instanceId="779e6376..." (SAME) → recording_id="b702f509..." (DIFFERENT)
✅ Both succeed, different recording_ids returned
```
**Implication:** Cannot match recordings by instanceId (not in response) - must use recording_id.
---
## Entity Relationships
@@ -220,19 +196,6 @@ Daily.co Room: "daily-private-igor-20260110042117"
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
**Reliability:** Can be null or present in GET /recordings response (unreliable).
**When present:** Multiple recordings from same session (stop/restart with participants connected) share same mtgSessionId.
**Example (validated 2026-01-20):**
```json
Recording 1: {"id": "ee00c4e8...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
Recording 2: {"id": "b702f509...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
// Same mtgSessionId (stop/restart in same session)
```
**When null:** Common - Daily.co API does not reliably populate this field.
### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks.
@@ -257,24 +220,16 @@ TABLE daily_participant_session (
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
-**Example API response (mtgSessionId can be null OR present):**
+**Example API response:**
```json
{
  "id": "recording-uuid",
  "room_name": "daily-private-igor-20260110042117",
  "start_ts": 1768018896,
-  "mtgSessionId": null  // ← Often null (unreliable)
+  "mtgSessionId": null  // Missing!
}
// OR (when present):
{
  "id": "recording-uuid",
  "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"  // ← Sometimes present
}
```
**Key insight:** Cannot rely on mtgSessionId for matching (unreliable). instanceId also not returned. Only reliable identifier is recording.id.
### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
@@ -536,10 +491,6 @@ UI: User sees 3 separate transcripts
---
-**Document Version:** 1.1
+**Document Version:** 1.0
-**Last Updated:** 2026-01-20
+**Last Verified:** 2026-01-15
-**Data Source:** Production database + Daily.co API inspection + empirical testing
+**Data Source:** Production database + Daily.co API inspection
**Changes in 1.1:**
- Added instanceId behavior documentation (reuse allowed, not returned in API)
- Clarified mtgSessionId reliability (can be null or present)
- Added empirical validation of stop/restart behavior

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -1,67 +0,0 @@
"""add_daily_recording_requests
Revision ID: f5b008fa8a14
Revises: 1b1e6a6fc465
Create Date: 2026-01-20 22:32:06.697144
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f5b008fa8a14"
down_revision: Union[str, None] = "1b1e6a6fc465"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"daily_recording_request",
sa.Column("recording_id", sa.String(), nullable=False),
sa.Column("meeting_id", sa.String(), nullable=False),
sa.Column("instance_id", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("recording_id"),
)
op.create_index("idx_meeting_id", "daily_recording_request", ["meeting_id"])
op.create_index("idx_instance_id", "daily_recording_request", ["instance_id"])
# Clean up orphaned recordings before adding FK constraint
op.execute("""
UPDATE recording SET status = 'orphan', meeting_id = NULL
WHERE meeting_id IS NOT NULL
AND meeting_id NOT IN (SELECT id FROM meeting)
""")
# Add FK constraint to recording table (cascade delete recordings when meeting deleted)
op.execute("""
ALTER TABLE recording ADD CONSTRAINT fk_recording_meeting
FOREIGN KEY (meeting_id) REFERENCES meeting(id) ON DELETE CASCADE
""")
# Add CHECK constraints to enforce orphan invariants
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_orphan_no_meeting
CHECK (status != 'orphan' OR meeting_id IS NULL)
""")
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_non_orphan_has_meeting
CHECK (status = 'orphan' OR meeting_id IS NOT NULL)
""")
def downgrade() -> None:
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_orphan_no_meeting")
op.execute(
"ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_non_orphan_has_meeting"
)
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS fk_recording_meeting")
op.drop_index("idx_instance_id", table_name="daily_recording_request")
op.drop_index("idx_meeting_id", table_name="daily_recording_request")
op.drop_table("daily_recording_request")

View File

@@ -1,56 +0,0 @@
"""Utility for creating orphan recordings."""
import os
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
async def create_and_log_orphan(
recording_id: NonEmptyString,
bucket_name: str,
room_name: str,
start_ts: int,
track_keys: list[str] | None,
source: str,
) -> bool:
"""Create orphan recording and log if first occurrence.
Args:
recording_id: Daily.co recording ID
bucket_name: S3 bucket (empty string for cloud recordings)
room_name: Daily.co room name
start_ts: Unix timestamp
track_keys: Track keys for raw-tracks, None for cloud
source: "webhook" or "polling" for logging
Returns:
True if created (first poller), False if already exists
"""
if track_keys:
object_key = os.path.dirname(track_keys[0]) if track_keys else room_name
else:
object_key = room_name
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key,
recorded_at=datetime.fromtimestamp(start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=None,
status="orphan",
)
)
if created:
logger.error(
f"Orphan recording ({source})",
recording_id=recording_id,
room_name=room_name,
)
return created

View File

@@ -26,7 +26,6 @@ def get_database() -> databases.Database:
# import models
import reflector.db.calendar_events # noqa
import reflector.db.daily_participant_sessions # noqa
import reflector.db.daily_recording_requests # noqa
import reflector.db.meetings # noqa
import reflector.db.recordings # noqa
import reflector.db.rooms # noqa

View File

@@ -1,111 +0,0 @@
from datetime import datetime
from typing import Literal
from uuid import UUID
import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert
from datetime import datetime
from typing import Literal
from uuid import UUID

import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert

from reflector.db import get_database, metadata
from reflector.utils.string import NonEmptyString

daily_recording_requests = sa.Table(
    "daily_recording_request",
    metadata,
    sa.Column("recording_id", sa.String, primary_key=True),
    sa.Column(
        "meeting_id",
        sa.String,
        sa.ForeignKey("meeting.id", ondelete="CASCADE"),
        nullable=False,
    ),
    sa.Column("instance_id", sa.String, nullable=False),
    sa.Column("type", sa.String, nullable=False),
    sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
    sa.Index("idx_meeting_id", "meeting_id"),
    sa.Index("idx_instance_id", "instance_id"),
)


class DailyRecordingRequest(BaseModel):
    recording_id: NonEmptyString
    meeting_id: NonEmptyString
    instance_id: UUID
    type: Literal["cloud", "raw-tracks"]
    requested_at: datetime


class DailyRecordingRequestsController:
    async def create(self, request: DailyRecordingRequest) -> None:
        stmt = insert(daily_recording_requests).values(
            recording_id=request.recording_id,
            meeting_id=request.meeting_id,
            instance_id=str(request.instance_id),
            type=request.type,
            requested_at=request.requested_at,
        )
        stmt = stmt.on_conflict_do_nothing(index_elements=["recording_id"])
        await get_database().execute(stmt)

    async def find_by_recording_id(
        self,
        recording_id: NonEmptyString,
    ) -> tuple[NonEmptyString, Literal["cloud", "raw-tracks"]] | None:
        query = daily_recording_requests.select().where(
            daily_recording_requests.c.recording_id == recording_id
        )
        result = await get_database().fetch_one(query)
        if not result:
            return None
        req = DailyRecordingRequest(
            recording_id=result["recording_id"],
            meeting_id=result["meeting_id"],
            instance_id=UUID(result["instance_id"]),
            type=result["type"],
            requested_at=result["requested_at"],
        )
        return (req.meeting_id, req.type)

    async def find_by_instance_id(
        self,
        instance_id: UUID,
    ) -> list[DailyRecordingRequest]:
        """Multiple recordings can have the same instance_id (stop/restart)."""
        query = daily_recording_requests.select().where(
            daily_recording_requests.c.instance_id == str(instance_id)
        )
        results = await get_database().fetch_all(query)
        return [
            DailyRecordingRequest(
                recording_id=r["recording_id"],
                meeting_id=r["meeting_id"],
                instance_id=UUID(r["instance_id"]),
                type=r["type"],
                requested_at=r["requested_at"],
            )
            for r in results
        ]

    async def get_by_meeting_id(
        self,
        meeting_id: NonEmptyString,
    ) -> list[DailyRecordingRequest]:
        query = daily_recording_requests.select().where(
            daily_recording_requests.c.meeting_id == meeting_id
        )
        results = await get_database().fetch_all(query)
        return [
            DailyRecordingRequest(
                recording_id=r["recording_id"],
                meeting_id=r["meeting_id"],
                instance_id=UUID(r["instance_id"]),
                type=r["type"],
                requested_at=r["requested_at"],
            )
            for r in results
        ]


daily_recording_requests_controller = DailyRecordingRequestsController()

View File
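The `create` method above leans on Postgres `ON CONFLICT (recording_id) DO NOTHING`, so a retried webhook delivery becomes a no-op instead of a primary-key violation. A minimal sketch of the same idempotent-insert pattern using SQLite's `INSERT OR IGNORE` analogue (table trimmed to two columns for illustration):

```python
import sqlite3

# In-memory stand-in for the daily_recording_request table.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE daily_recording_request (
        recording_id TEXT PRIMARY KEY,
        meeting_id TEXT NOT NULL
    )
    """
)

def create(recording_id: str, meeting_id: str) -> None:
    # SQLite's INSERT OR IGNORE mirrors Postgres's
    # ON CONFLICT (recording_id) DO NOTHING: a duplicate delivery
    # is silently dropped instead of raising an IntegrityError.
    conn.execute(
        "INSERT OR IGNORE INTO daily_recording_request VALUES (?, ?)",
        (recording_id, meeting_id),
    )

create("rec-1", "meet-1")
create("rec-1", "meet-2")  # duplicate recording_id: silently ignored

rows = conn.execute("SELECT * FROM daily_recording_request").fetchall()
print(rows)  # [('rec-1', 'meet-1')]
```

Note that the first write wins: the retry's payload is discarded, which is the desired behavior when retries carry identical data.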

@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import Any, Literal

 import sqlalchemy as sa
@@ -183,6 +183,84 @@ class MeetingController:
    results = await get_database().fetch_all(query)
    return [Meeting(**r) for r in results]
    async def get_by_room_name_and_time(
        self,
        room_name: NonEmptyString,
        recording_start: datetime,
        time_window_hours: int = 168,
    ) -> Meeting | None:
        """
        Get meeting by room name closest to recording timestamp.

        HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
        and mtgSessionId is separate from our instanceId. Time-based matching is
        the least-bad workaround.

        This handles the edge case of duplicate room_name values in the DB (race
        conditions, double-clicks, etc.) by matching based on temporal proximity.

        Algorithm:
            1. Find meetings within time_window_hours of recording_start
            2. Return the meeting with start_date closest to recording_start
            3. On a tie, return the first by meeting.id (deterministic)

        Args:
            room_name: Daily.co room name from recording
            recording_start: Timezone-aware datetime from recording.start_ts
            time_window_hours: Search window (default 168 = 1 week)

        Returns:
            Meeting closest to recording timestamp, or None if no matches

        Failure modes:
            - Multiple meetings in same room within ~5 minutes: picks closest
            - All meetings outside time window: returns None
            - Clock skew between Daily.co and DB: 1-week window tolerates this

        Why a 1-week window:
            - Handles webhook failures (recording discovered days later)
            - Tolerates clock skew
            - Rejects unrelated meetings from weeks ago
        """
        # Validate timezone-aware datetime
        if recording_start.tzinfo is None:
            raise ValueError(
                f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
            )

        window_start = recording_start - timedelta(hours=time_window_hours)
        window_end = recording_start + timedelta(hours=time_window_hours)

        query = (
            meetings.select()
            .where(
                sa.and_(
                    meetings.c.room_name == room_name,
                    meetings.c.start_date >= window_start,
                    meetings.c.start_date <= window_end,
                )
            )
            .order_by(meetings.c.start_date)
        )
        results = await get_database().fetch_all(query)
        if not results:
            return None

        candidates = [Meeting(**r) for r in results]

        # Find meeting with start_date closest to recording_start
        closest = min(
            candidates,
            key=lambda m: (
                abs((m.start_date - recording_start).total_seconds()),
                m.id,  # Tie-breaker: deterministic by UUID
            ),
        )
        return closest
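The closest-match selection can be exercised without a database; a sketch of the same `min` key used above (`StubMeeting` is an illustrative stand-in, not the real `Meeting` model):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class StubMeeting:
    id: str
    start_date: datetime

def pick_closest(candidates: list[StubMeeting], recording_start: datetime) -> StubMeeting:
    # Same selection rule as get_by_room_name_and_time: minimal absolute
    # distance to the recording timestamp, ties broken by id.
    return min(
        candidates,
        key=lambda m: (abs((m.start_date - recording_start).total_seconds()), m.id),
    )

t0 = datetime(2026, 2, 5, 18, 0, tzinfo=timezone.utc)
meetings_in_room = [
    StubMeeting("a", t0 - timedelta(hours=3)),
    StubMeeting("b", t0 + timedelta(minutes=5)),
    StubMeeting("c", t0 + timedelta(minutes=5)),  # tie with "b": "b" wins by id
]
print(pick_closest(meetings_in_room, t0).id)  # b
```

Because the key is a tuple, the id comparison only kicks in when two candidates are exactly equidistant, which keeps the result deterministic across runs.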
    async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
        """
        Get latest active meeting for a room.
@@ -268,10 +346,69 @@ class MeetingController:
            return None
        return Meeting(**result)
    async def get_by_room_and_time_window(
        self, room: Room, start_date: datetime, end_date: datetime
    ) -> Meeting | None:
        """Check if a meeting already exists for this room with the same time window."""
        query = (
            meetings.select()
            .where(
                sa.and_(
                    meetings.c.room_id == room.id,
                    meetings.c.start_date == start_date,
                    meetings.c.end_date == end_date,
                    meetings.c.is_active,
                )
            )
            .limit(1)
        )
        result = await get_database().fetch_one(query)
        if not result:
            return None
        return Meeting(**result)
    async def update_meeting(self, meeting_id: str, **kwargs):
        query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
        await get_database().execute(query)
    async def set_cloud_recording_if_missing(
        self,
        meeting_id: NonEmptyString,
        s3_key: NonEmptyString,
        duration: int,
    ) -> bool:
        """
        Set cloud recording only if not already set.

        Returns True if updated, False if already set.
        Prevents webhook/polling race condition via atomic WHERE clause.
        """
        # Check current value before update to detect actual change
        meeting_before = await self.get_by_id(meeting_id)
        if not meeting_before:
            return False
        was_null = meeting_before.daily_composed_video_s3_key is None

        query = (
            meetings.update()
            .where(
                sa.and_(
                    meetings.c.id == meeting_id,
                    meetings.c.daily_composed_video_s3_key.is_(None),
                )
            )
            .values(
                daily_composed_video_s3_key=s3_key,
                daily_composed_video_duration=duration,
            )
        )
        await get_database().execute(query)

        # Return True only if value was NULL before (actual update occurred)
        # If was_null=False, the WHERE clause prevented the update
        return was_null
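For comparison, when the database driver exposes `rowcount`, the conditional `UPDATE` alone can serve as the atomic test-and-set, with no read-before-write step. A SQLite sketch of that single-statement form (table and column names shortened for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE meeting (id TEXT PRIMARY KEY, s3_key TEXT)")
conn.execute("INSERT INTO meeting VALUES ('m1', NULL)")

def set_if_missing(meeting_id: str, s3_key: str) -> bool:
    # The WHERE ... IS NULL clause makes the UPDATE itself the race
    # arbiter: the first writer flips the row, any later writer matches
    # zero rows. rowcount reports which case occurred.
    cur = conn.execute(
        "UPDATE meeting SET s3_key = ? WHERE id = ? AND s3_key IS NULL",
        (s3_key, meeting_id),
    )
    return cur.rowcount > 0

first = set_if_missing("m1", "recordings/a.mp4")   # row was NULL: updates
second = set_if_missing("m1", "recordings/b.mp4")  # already set: no-op
print(first, second)  # True False
```

The read-before-write variant above approximates this but can report True from two concurrent callers that both observed NULL; the single-statement form has no such window.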
    async def increment_num_clients(self, meeting_id: str) -> None:
        """Atomically increment participant count."""
        query = (
@@ -351,27 +488,6 @@ class MeetingConsentController:
        result = await get_database().fetch_one(query)
        return result is not None
    async def set_cloud_recording_if_missing(
        self,
        meeting_id: NonEmptyString,
        s3_key: NonEmptyString,
        duration: int,
    ) -> bool:
        """Returns True if updated, False if already set."""
        query = (
            meetings.update()
            .where(
                meetings.c.id == meeting_id,
                meetings.c.daily_composed_video_s3_key.is_(None),
            )
            .values(
                daily_composed_video_s3_key=s3_key,
                daily_composed_video_duration=duration,
            )
        )
        result = await get_database().execute(query)
        return result.rowcount > 0
meetings_controller = MeetingController()
meeting_consent_controller = MeetingConsentController()

View File

@@ -4,10 +4,10 @@ from typing import Literal
import sqlalchemy as sa
from pydantic import BaseModel, Field
from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert

from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString

recordings = sa.Table(
    "recording",
@@ -31,13 +31,14 @@ recordings = sa.Table(
class Recording(BaseModel):
    id: str = Field(default_factory=generate_uuid4)
    bucket_name: str
    # for single-track
    object_key: str
    recorded_at: datetime
-    status: Literal["pending", "processing", "completed", "failed", "orphan"] = (
-        "pending"
-    )
+    status: Literal["pending", "processing", "completed", "failed"] = "pending"
    meeting_id: str | None = None
-    # None = single-track, [] = multitrack with no audio, [keys...] = multitrack with audio
+    # for multitrack reprocessing
+    # track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
+    # None means not a multitrack recording, [] means multitrack with no tracks
    track_keys: list[str] | None = None

    @property
@@ -71,6 +72,20 @@ class RecordingController:
        query = recordings.delete().where(recordings.c.id == id)
        await get_database().execute(query)

    async def set_meeting_id(
        self,
        recording_id: NonEmptyString,
        meeting_id: NonEmptyString,
    ) -> None:
        """Link recording to meeting. Does not verify that recording_id exists."""
        query = (
            recordings.update()
            .where(recordings.c.id == recording_id)
            .values(meeting_id=meeting_id)
        )
        await get_database().execute(query)

    async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
        if not recording_ids:
            return []
@@ -89,12 +104,9 @@ class RecordingController:
        This is more efficient than fetching all recordings and filtering in Python.
        """
-        # INLINE IMPORT REQUIRED: Circular dependency
-        # - recordings.py needs transcripts table for JOIN query
-        # - transcripts.py imports recordings_controller
-        # - db/__init__.py loads recordings before transcripts (line 31 vs 33)
-        # - Top-level import would fail during module initialization
-        from reflector.db.transcripts import transcripts
+        from reflector.db.transcripts import (
+            transcripts,  # noqa: PLC0415 cyclic import
+        )

        query = (
            recordings.select()
@@ -112,27 +124,5 @@ class RecordingController:
        recordings_list = [Recording(**row) for row in results]
        return [r for r in recordings_list if r.is_multitrack]
    async def try_create_with_meeting(self, recording: Recording) -> bool:
        """Returns True if created, False if already exists."""
        assert recording.meeting_id is not None, "meeting_id required for non-orphan"
        assert recording.status != "orphan", "use create_orphan for orphans"
        stmt = insert(recordings).values(**recording.model_dump())
        stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
        result = await get_database().execute(stmt)
        return result.rowcount > 0

    async def create_orphan(self, recording: Recording) -> bool:
        """Returns True if created, False if already exists."""
        assert recording.status == "orphan", "status must be 'orphan'"
        assert recording.meeting_id is None, "meeting_id must be NULL for orphan"
        stmt = insert(recordings).values(**recording.model_dump())
        stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
        result = await get_database().execute(stmt)
        return result.rowcount > 0
recordings_controller = RecordingController()

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
        sqlalchemy.String,
        nullable=False,
    ),
    sqlalchemy.Column(
        "use_celery",
        sqlalchemy.Boolean,
        nullable=False,
        server_default=false(),
    ),
    sqlalchemy.Column(
        "skip_consent",
        sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
    ics_last_sync: datetime | None = None
    ics_last_etag: str | None = None
    platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
    use_celery: bool = False
    skip_consent: bool = False

View File

@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus

from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
    task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
    Returns AsyncResult for Celery tasks, None for Hatchet workflows.
    """
    if isinstance(config, MultitrackProcessingConfig):
-        use_celery = False
-        if config.room_id:
-            room = await rooms_controller.get_by_id(config.room_id)
-            use_celery = room.use_celery if room else False
-
-        use_hatchet = not use_celery
-        if use_celery:
-            logger.info(
-                "Room uses legacy Celery processing",
-                room_id=config.room_id,
-                transcript_id=config.transcript_id,
-            )
-
-        if use_hatchet:
+        # Multitrack processing always uses Hatchet (no Celery fallback)
        # First check if we can replay (outside transaction since it's read-only)
        transcript = await transcripts_controller.get_by_id(config.transcript_id)
        if transcript and transcript.workflow_run_id and not force:
            can_replay = await HatchetClientManager.can_replay(
                transcript.workflow_run_id
            )
            if can_replay:
                await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
                logger.info(
                    "Replaying Hatchet workflow",
                    workflow_id=transcript.workflow_run_id,
                )
                return None
            else:
                # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
                # Log and proceed to start new workflow
                try:
                    status = await HatchetClientManager.get_workflow_run_status(
                        transcript.workflow_run_id
                    )
                    logger.info(
                        "Old workflow not replayable, starting new",
                        old_workflow_id=transcript.workflow_run_id,
                        old_status=status.value,
                    )
                except NotFoundException:
                    # Workflow deleted from Hatchet but ID still in DB
                    logger.info(
                        "Old workflow not found in Hatchet, starting new",
                        old_workflow_id=transcript.workflow_run_id,
                    )

        # Force: cancel old workflow if exists
        if force and transcript and transcript.workflow_run_id:
            try:
                await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
                logger.info(
                    "Cancelled old workflow (--force)",
                    workflow_id=transcript.workflow_run_id,
                )
            except NotFoundException:
                logger.info(
                    "Old workflow already deleted (--force)",
                    workflow_id=transcript.workflow_run_id,
                )
            await transcripts_controller.update(transcript, {"workflow_run_id": None})

        # Re-fetch and check for concurrent dispatch (optimistic approach).
        # No database lock - worst case is duplicate dispatch, but Hatchet
        # workflows are idempotent so this is acceptable.
        transcript = await transcripts_controller.get_by_id(config.transcript_id)
        if transcript and transcript.workflow_run_id:
            # Another process started a workflow between validation and now
            try:
                status = await HatchetClientManager.get_workflow_run_status(
                    transcript.workflow_run_id
                )
                if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
                    logger.info(
                        "Concurrent workflow detected, skipping dispatch",
                        workflow_id=transcript.workflow_run_id,
                    )
                    return None
            except ApiException:
                # Workflow might be gone (404) or API issue - proceed with new workflow
                pass

        workflow_id = await HatchetClientManager.start_workflow(
            workflow_name="DiarizationPipeline",
            input_data={
                "recording_id": config.recording_id,
                "tracks": [{"s3_key": k} for k in config.track_keys],
                "bucket_name": config.bucket_name,
                "transcript_id": config.transcript_id,
                "room_id": config.room_id,
            },
            additional_metadata={
                "transcript_id": config.transcript_id,
                "recording_id": config.recording_id,
                "daily_recording_id": config.recording_id,
            },
        )
        if transcript:
            await transcripts_controller.update(
                transcript, {"workflow_run_id": workflow_id}
            )
        logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
        return None
-
-        # Celery pipeline (durable workflows disabled)
-        return task_pipeline_multitrack_process.delay(
-            transcript_id=config.transcript_id,
-            bucket_name=config.bucket_name,
-            track_keys=config.track_keys,
-        )
    elif isinstance(config, FileProcessingConfig):
        return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
    else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict

-from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
+from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
        None  # Webhook UUID for this environment. Not used by production code
    )

    # Platform Configuration
-    DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
+    DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM

    # Zulip integration
    ZULIP_REALM: str | None = None

View File

@@ -129,10 +129,6 @@ class DailyClient(VideoPlatformClient):
        """Get room presence/session data for a Daily.co room."""
        return await self._api_client.get_room_presence(room_name)

    async def delete_room(self, room_name: str) -> None:
        """Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
        return await self._api_client.delete_room(room_name)

    async def get_meeting_participants(
        self, meeting_id: str
    ) -> MeetingParticipantsResponse:

View File

@@ -1,6 +1,4 @@
import json
import os
from datetime import datetime, timezone
from typing import assert_never

from fastapi import APIRouter, HTTPException, Request
@@ -14,10 +12,7 @@ from reflector.dailyco_api import (
    RecordingReadyEvent,
    RecordingStartedEvent,
)
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger as _logger
from reflector.settings import settings
from reflector.video_platforms.factory import create_platform_client
@@ -217,73 +212,10 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
    track_keys = [t.s3Key for t in tracks if t.type == "audio"]
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(
recording_id
)
if not match:
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
meeting_id, _ = match
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found (webhook)",
recording_id=recording_id,
meeting_id=meeting_id,
)
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
# Create recording atomically
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=(
os.path.dirname(track_keys[0]) if track_keys else room_name
),
recorded_at=datetime.fromtimestamp(
event.payload.start_ts, tz=timezone.utc
),
track_keys=track_keys,
meeting_id=meeting_id,
status="pending",
)
)
if not created:
# Already created (polling got it first)
logger.debug(
"Recording already exists (webhook late)",
recording_id=recording_id,
meeting_id=meeting_id,
)
return
    logger.info(
-        "Raw-tracks recording queuing processing (webhook)",
+        "Raw-tracks recording queuing processing",
        recording_id=recording_id,
        room_name=room_name,
-        meeting_id=meeting_id,
        num_tracks=len(track_keys),
    )

View File

@@ -1,5 +1,4 @@
import json
import logging
from datetime import datetime, timezone
from typing import Annotated, Any, Optional
from uuid import UUID
@@ -10,21 +9,16 @@ from pydantic import BaseModel
import reflector.auth as auth
from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError
from reflector.db.daily_recording_requests import (
    DailyRecordingRequest,
    daily_recording_requests_controller,
)
from reflector.db.meetings import (
    MeetingConsent,
    meeting_consent_controller,
    meetings_controller,
)
from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client

logger = logging.getLogger(__name__)

router = APIRouter()
@@ -108,6 +102,13 @@ async def start_recording(
    if not meeting:
        raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
    try:
        client = create_platform_client("daily")
        result = await client.start_recording(
@@ -116,30 +117,9 @@ async def start_recording(
            instance_id=body.instanceId,
        )

-        recording_id = result["id"]
-
-        await daily_recording_requests_controller.create(
-            DailyRecordingRequest(
-                recording_id=recording_id,
-                meeting_id=meeting_id,
-                instance_id=body.instanceId,
-                type=body.type,
-                requested_at=datetime.now(timezone.utc),
-            )
-        )
-
-        logger.info(
-            f"Started {body.type} recording via REST API",
-            extra={
-                "meeting_id": meeting_id,
-                "room_name": meeting.room_name,
-                "recording_type": body.type,
-                "instance_id": body.instanceId,
-                "recording_id": recording_id,
-            },
-        )
-        return {"status": "ok", "recording_id": recording_id}
+        log.info(f"Started {body.type} recording via REST API")
+        return {"status": "ok", "result": result}

    except DailyApiError as e:
        # Parse Daily.co error response to detect "has an active stream"
@@ -150,42 +130,22 @@ async def start_recording(
            # "has an active stream" means recording already started by another participant
            # This is SUCCESS from business logic perspective - return 200
            if "has an active stream" in error_info:
-                logger.info(
-                    f"{body.type} recording already active (started by another participant)",
-                    extra={
-                        "meeting_id": meeting_id,
-                        "room_name": meeting.room_name,
-                        "recording_type": body.type,
-                        "instance_id": body.instanceId,
-                    },
-                )
+                log.info(
+                    f"{body.type} recording already active (started by another participant)"
+                )
                return {"status": "already_active", "instanceId": str(body.instanceId)}
        except (json.JSONDecodeError, KeyError):
            pass  # Fall through to error handling

        # All other Daily.co API errors
-        logger.error(
-            f"Failed to start {body.type} recording",
-            extra={
-                "meeting_id": meeting_id,
-                "recording_type": body.type,
-                "error": str(e),
-            },
-        )
+        log.error(f"Failed to start {body.type} recording", error=str(e))
        raise HTTPException(
            status_code=500, detail=f"Failed to start recording: {str(e)}"
        )
    except Exception as e:
        # Non-Daily.co errors
-        logger.error(
-            f"Failed to start {body.type} recording",
-            extra={
-                "meeting_id": meeting_id,
-                "recording_type": body.type,
-                "error": str(e),
-            },
-        )
+        log.error(f"Failed to start {body.type} recording", error=str(e))
        raise HTTPException(
            status_code=500, detail=f"Failed to start recording: {str(e)}"
        )

View File

@@ -20,7 +20,6 @@ from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings
from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import poll_daily_room_presence_task
from reflector.worker.webhook import test_webhook

logger = logging.getLogger(__name__)
@@ -366,53 +365,6 @@ async def rooms_create_meeting(
    return meeting
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
async def rooms_joined_meeting(
room_name: str,
meeting_id: str,
):
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.delay(meeting_id)
return {"status": "ok"}
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
async def rooms_leave_meeting(
room_name: str,
meeting_id: str,
delay_seconds: int = 2,
):
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.apply_async(
args=[meeting_id],
countdown=delay_seconds,
)
return {"status": "ok"}
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
async def rooms_test_webhook(
    room_id: str,

View File

@@ -5,7 +5,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger

from reflector.asynctask import asynctask
-from reflector.db.calendar_events import calendar_events_controller
+from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room, rooms_controller
from reflector.redis_cache import RedisAsyncLock
@@ -83,10 +83,9 @@ def _should_sync(room) -> bool:
return time_since_sync.total_seconds() >= room.ics_fetch_interval return time_since_sync.total_seconds() >= room.ics_fetch_interval
-MEETING_DEFAULT_DURATION = timedelta(hours=1)
-
-
-async def create_upcoming_meetings_for_event(event, create_window, room: Room):
+async def create_upcoming_meetings_for_event(
+    event: CalendarEvent, create_window: datetime, room: Room
+):
    if event.start_time <= create_window:
        return

    existing_meeting = await meetings_controller.get_by_calendar_event(event.id, room)
@@ -94,6 +93,21 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room):
    if existing_meeting:
        return
# Prevent duplicate meetings from aggregated calendar feeds
# (e.g. same event appears with different UIDs from Cal.com and Google Calendar)
end_date = event.end_time
existing_by_time = await meetings_controller.get_by_room_and_time_window(
room, event.start_time, end_date
)
if existing_by_time:
logger.info(
"Skipping duplicate calendar event - meeting already exists for this time window",
room_id=room.id,
event_id=event.id,
existing_meeting_id=existing_by_time.id,
)
return
logger.info( logger.info(
"Pre-creating meeting for calendar event", "Pre-creating meeting for calendar event",
room_id=room.id, room_id=room.id,
@@ -102,8 +116,6 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room):
) )
try: try:
end_date = event.end_time or (event.start_time + MEETING_DEFAULT_DURATION)
client = create_platform_client(room.platform) client = create_platform_client(room.platform)
meeting_data = await client.create_meeting( meeting_data = await client.create_meeting(
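The time-window dedup added in this hunk can be illustrated with a small self-contained sketch. The dataclass and the in-memory lookup below are hypothetical stand-ins for the real `meetings_controller.get_by_room_and_time_window` query; only the matching rule (same room, same start, same end) is taken from the diff:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Meeting:
    id: str
    room_id: str
    start_date: datetime
    end_date: datetime


def find_duplicate_by_time_window(meetings, room_id, start, end):
    """Return an existing meeting occupying the same room + time slot, if any.

    Two calendar UIDs (e.g. the Cal.com and Google Calendar copies of one
    event) map to the same room/start/end, so the second create is skipped.
    """
    for m in meetings:
        if m.room_id == room_id and m.start_date == start and m.end_date == end:
            return m
    return None


start = datetime(2026, 2, 5, 18, 0, tzinfo=timezone.utc)
end = start + timedelta(hours=1)
existing = [Meeting("m1", "room-a", start, end)]

# Same slot, different calendar UID: duplicate detected, creation skipped
assert find_duplicate_by_time_window(existing, "room-a", start, end).id == "m1"
# Different slot: no duplicate, the meeting would be created
assert find_duplicate_by_time_window(existing, "room-a", end, end + timedelta(hours=1)) is None
```

The production query presumably runs this comparison in SQL rather than in Python.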

View File

@@ -1,5 +1,6 @@
 import json
 import os
+import re
 from datetime import datetime, timezone
 from typing import List, Literal
 from urllib.parse import unquote
@@ -12,12 +13,10 @@ from celery.utils.log import get_task_logger
 from pydantic import ValidationError
 from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
-from reflector.dailyco_api.recording_orphans import create_and_log_orphan
 from reflector.db.daily_participant_sessions import (
     DailyParticipantSession,
     daily_participant_sessions_controller,
 )
-from reflector.db.daily_recording_requests import daily_recording_requests_controller
 from reflector.db.meetings import meetings_controller
 from reflector.db.recordings import Recording, recordings_controller
 from reflector.db.rooms import rooms_controller
@@ -28,9 +27,6 @@ from reflector.db.transcripts import (
 from reflector.hatchet.client import HatchetClientManager
 from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
 from reflector.pipelines.main_live_pipeline import asynctask
-from reflector.pipelines.main_multitrack_pipeline import (
-    task_pipeline_multitrack_process,
-)
 from reflector.pipelines.topic_processing import EmptyPipeline
 from reflector.processors import AudioFileWriterProcessor
 from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -231,44 +227,79 @@ async def _process_multitrack_recording_inner(
     recording_start_ts: int,
 ):
     """
-    Process multitrack recording.
+    Process multitrack recording (first time or reprocessing).
 
-    Recording must already exist with meeting_id set (created by webhook/polling before queueing).
+    For first processing (webhook/polling):
+    - Uses recording_start_ts for time-based meeting matching (no instanceId available)
+
+    For reprocessing:
+    - Uses recording.meeting_id directly (already linked during first processing)
+    - recording_start_ts is ignored
     """
-    # Get recording (must exist - created by webhook/polling)
+    tz = timezone.utc
+    recorded_at = datetime.now(tz)
+    try:
+        if track_keys:
+            folder = os.path.basename(os.path.dirname(track_keys[0]))
+            ts_match = re.search(r"(\d{14})$", folder)
+            if ts_match:
+                ts = ts_match.group(1)
+                recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
+    except Exception as e:
+        logger.warning(
+            f"Could not parse recorded_at from keys, using now() {recorded_at}",
+            e,
+            exc_info=True,
+        )
+
+    # Check if recording already exists (reprocessing path)
     recording = await recordings_controller.get_by_id(recording_id)
-    if not recording:
-        logger.error(
-            "Recording not found - should have been created by webhook/polling",
-            recording_id=recording_id,
-        )
-        return
-
-    if not recording.meeting_id:
-        logger.error(
-            "Recording has no meeting_id - orphan should not be queued",
-            recording_id=recording_id,
-        )
-        return
-
-    # Get meeting
-    meeting = await meetings_controller.get_by_id(recording.meeting_id)
-    if not meeting:
-        logger.error(
-            "Meeting not found for recording",
-            meeting_id=recording.meeting_id,
-            recording_id=recording_id,
-        )
-        return
-
-    logger.info(
-        "Processing multitrack recording",
-        recording_id=recording_id,
-        meeting_id=meeting.id,
-        room_name=daily_room_name,
-    )
+    if recording and recording.meeting_id:
+        # Reprocessing: recording exists with meeting already linked
+        meeting = await meetings_controller.get_by_id(recording.meeting_id)
+        if not meeting:
+            logger.error(
+                "Reprocessing: meeting not found for recording - skipping",
+                meeting_id=recording.meeting_id,
+                recording_id=recording_id,
+            )
+            return
+        logger.info(
+            "Reprocessing: using existing recording.meeting_id",
+            recording_id=recording_id,
+            meeting_id=meeting.id,
+            room_name=daily_room_name,
+        )
+    else:
+        # First processing: recording doesn't exist, need time-based matching
+        # (Daily.co doesn't return instanceId in API, must match by timestamp)
+        recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
+        meeting = await meetings_controller.get_by_room_name_and_time(
+            room_name=daily_room_name,
+            recording_start=recording_start,
+            time_window_hours=168,  # 1 week
+        )
+        if not meeting:
+            logger.error(
+                "Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
+                recording_id=recording_id,
+                room_name=daily_room_name,
+                recording_start_ts=recording_start_ts,
+                recording_start=recording_start.isoformat(),
+            )
+            return  # Skip processing, will retry on next poll
+        logger.info(
+            "First processing: found meeting via time-based matching",
+            meeting_id=meeting.id,
+            room_name=daily_room_name,
+            recording_id=recording_id,
+            time_delta_seconds=abs(
+                (meeting.start_date - recording_start).total_seconds()
+            ),
+        )
 
     room_name_base = extract_base_room_name(daily_room_name)
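The `recorded_at` recovery added in this hunk parses a 14-digit `YYYYMMDDHHMMSS` suffix off the recording's folder name. A minimal runnable sketch of that parsing step, with a made-up S3 key in the same shape as the room names seen elsewhere in this diff:

```python
import os
import re
from datetime import datetime, timezone


def parse_recorded_at(track_key: str):
    """Parse a trailing 14-digit YYYYMMDDHHMMSS timestamp from the track's
    parent folder name; return None when the folder has no such suffix."""
    folder = os.path.basename(os.path.dirname(track_key))
    m = re.search(r"(\d{14})$", folder)
    if not m:
        return None
    return datetime.strptime(m.group(1), "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)


key = "raw-tracks/daily-private-igor-20260110042117/audio-track-1.webm"
print(parse_recorded_at(key))  # 2026-01-10 04:21:17+00:00
```

Falling back to `datetime.now()` when the suffix is absent (as the diff does) keeps the field populated even for unexpected key layouts.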
@@ -276,6 +307,33 @@ async def _process_multitrack_recording_inner(
     if not room:
         raise Exception(f"Room not found: {room_name_base}")
 
+    if not recording:
+        # Create recording (only happens during first processing)
+        object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
+        recording = await recordings_controller.create(
+            Recording(
+                id=recording_id,
+                bucket_name=bucket_name,
+                object_key=object_key_dir,
+                recorded_at=recorded_at,
+                meeting_id=meeting.id,
+                track_keys=track_keys,
+            )
+        )
+    elif not recording.meeting_id:
+        # Recording exists but meeting_id is null (failed first processing)
+        # Update with meeting from time-based matching
+        await recordings_controller.set_meeting_id(
+            recording_id=recording.id,
+            meeting_id=meeting.id,
+        )
+        recording.meeting_id = meeting.id
+        logger.info(
+            "Updated existing recording with meeting_id",
+            recording_id=recording.id,
+            meeting_id=meeting.id,
+        )
+
     transcript = await transcripts_controller.get_by_recording_id(recording.id)
     if not transcript:
         transcript = await transcripts_controller.add(
@@ -290,49 +348,29 @@ async def _process_multitrack_recording_inner(
             room_id=room.id,
         )
 
-    use_celery = room and room.use_celery
-    use_hatchet = not use_celery
-
-    if use_celery:
-        logger.info(
-            "Room uses legacy Celery processing",
-            room_id=room.id,
-            transcript_id=transcript.id,
-        )
-
-    if use_hatchet:
-        workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
-            input_data={
-                "recording_id": recording_id,
-                "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
-                "bucket_name": bucket_name,
-                "transcript_id": transcript.id,
-                "room_id": room.id,
-            },
-            additional_metadata={
-                "transcript_id": transcript.id,
-                "recording_id": recording_id,
-                "daily_recording_id": recording_id,
-            },
-        )
-        logger.info(
-            "Started Hatchet workflow",
-            workflow_id=workflow_id,
-            transcript_id=transcript.id,
-        )
-        await transcripts_controller.update(
-            transcript, {"workflow_run_id": workflow_id}
-        )
-        return
-
-    # Celery pipeline (runs when durable workflows disabled)
-    task_pipeline_multitrack_process.delay(
-        transcript_id=transcript.id,
-        bucket_name=bucket_name,
-        track_keys=filter_cam_audio_tracks(track_keys),
-    )
+    # Multitrack processing always uses Hatchet (no Celery fallback)
+    workflow_id = await HatchetClientManager.start_workflow(
+        workflow_name="DiarizationPipeline",
+        input_data={
+            "recording_id": recording_id,
+            "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
+            "bucket_name": bucket_name,
+            "transcript_id": transcript.id,
+            "room_id": room.id,
+        },
+        additional_metadata={
+            "transcript_id": transcript.id,
+            "recording_id": recording_id,
+            "daily_recording_id": recording_id,
+        },
+    )
+    logger.info(
+        "Started Hatchet workflow",
+        workflow_id=workflow_id,
+        transcript_id=transcript.id,
+    )
+    await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
 
 
 @shared_task
@@ -461,7 +499,7 @@ async def store_cloud_recording(
     Store cloud recording reference in meeting table.
 
     Common function for both webhook and polling code paths.
-    Uses direct recording_id lookup via daily_recording_requests table.
+    Uses time-based matching to handle duplicate room_name values.
 
     Args:
         recording_id: Daily.co recording ID
@@ -474,170 +512,155 @@ async def store_cloud_recording(
     Returns:
         True if stored, False if skipped/failed
     """
-    # Lookup request
-    match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
-    if not match:
-        # ORPHAN: No request found (pre-migration recording or failed request creation)
-        await create_and_log_orphan(
-            recording_id=recording_id,
-            bucket_name="",
-            room_name=room_name,
-            start_ts=start_ts,
-            track_keys=None,
-            source=source,
-        )
-        return False
-
-    meeting_id, _ = match
+    recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
+
+    meeting = await meetings_controller.get_by_room_name_and_time(
+        room_name=room_name,
+        recording_start=recording_start,
+        time_window_hours=168,  # 1 week
+    )
+    if not meeting:
+        logger.warning(
+            f"Cloud recording ({source}): no meeting found within 1-week window",
+            recording_id=recording_id,
+            room_name=room_name,
+            recording_start_ts=start_ts,
+            recording_start=recording_start.isoformat(),
+        )
+        return False
 
     success = await meetings_controller.set_cloud_recording_if_missing(
-        meeting_id=meeting_id,
+        meeting_id=meeting.id,
         s3_key=s3_key,
         duration=duration,
     )
     if not success:
         logger.debug(
-            f"Cloud recording ({source}): already set (stop/restart?)",
+            f"Cloud recording ({source}): already set (race lost)",
             recording_id=recording_id,
             room_name=room_name,
-            meeting_id=meeting_id,
+            meeting_id=meeting.id,
         )
         return False
 
     logger.info(
-        f"Cloud recording stored via {source}",
-        meeting_id=meeting_id,
+        f"Cloud recording stored via {source} (time-based match)",
+        meeting_id=meeting.id,
         recording_id=recording_id,
         s3_key=s3_key,
         duration=duration,
+        time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
     )
     return True
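A rough sketch of what the time-based matching above amounts to: pick the meeting whose `start_date` is closest to the recording start, restricted to the same `room_name` and a one-week window. The dataclass and list-based search are hypothetical; the real `get_by_room_name_and_time` presumably does this in SQL:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Meeting:
    id: str
    room_name: str
    start_date: datetime


def match_by_time(meetings, room_name, recording_start, window_hours=168):
    """Nearest meeting by start_date within +/- window_hours of the
    recording start, same room_name; None when nothing qualifies."""
    window = timedelta(hours=window_hours)
    candidates = [
        m for m in meetings
        if m.room_name == room_name and abs(m.start_date - recording_start) <= window
    ]
    return min(candidates, key=lambda m: abs(m.start_date - recording_start), default=None)


now = datetime(2026, 2, 5, 12, 0, tzinfo=timezone.utc)
meetings = [
    Meeting("old", "room-a", now - timedelta(days=10)),   # outside 1-week window
    Meeting("near", "room-a", now - timedelta(minutes=3)),
    Meeting("far", "room-a", now - timedelta(hours=5)),
]
assert match_by_time(meetings, "room-a", now).id == "near"
assert match_by_time(meetings, "room-b", now) is None
```

Logging `time_delta_seconds`, as the diff does, makes it easy to spot matches that only barely fell inside the window.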
 async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
-    """Process cloud recordings (database deduplication, worker-agnostic).
+    """
+    Store cloud recordings missing from meeting table via polling.
 
-    Cloud recordings stored in meeting.daily_composed_video_s3_key, not recording table.
+    Uses time-based matching via store_cloud_recording().
+    Only first cloud recording per meeting is kept (existing behavior).
     """
     if not cloud_recordings:
         return
 
-    for rec in cloud_recordings:
-        # Lookup request
-        match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
-
-        if not match:
-            await create_and_log_orphan(
-                recording_id=rec.id,
-                bucket_name="",
-                room_name=rec.room_name,
-                start_ts=rec.start_ts,
-                track_keys=None,
-                source="polling",
-            )
-            continue
-
-        meeting_id, _ = match
-
-        if not rec.s3key:
-            logger.error("Cloud recording missing s3_key", recording_id=rec.id)
-            continue
-
-        # Store in meeting table (atomic, only if not already set)
-        success = await meetings_controller.set_cloud_recording_if_missing(
-            meeting_id=meeting_id,
-            s3_key=rec.s3key,
-            duration=rec.duration,
-        )
-        if success:
-            logger.info(
-                "Stored cloud recording", recording_id=rec.id, meeting_id=meeting_id
-            )
-        else:
-            logger.warning(
-                "Cloud recording already exists for meeting (stop/restart?)",
-                recording_id=rec.id,
-                meeting_id=meeting_id,
-            )
+    stored_count = 0
+    for recording in cloud_recordings:
+        # Extract S3 key from recording (cloud recordings use s3key field)
+        s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
+        if not s3_key:
+            logger.warning(
+                "Cloud recording: missing S3 key",
+                recording_id=recording.id,
+                room_name=recording.room_name,
+            )
+            continue
+
+        stored = await store_cloud_recording(
+            recording_id=recording.id,
+            room_name=recording.room_name,
+            s3_key=s3_key,
+            duration=recording.duration,
+            start_ts=recording.start_ts,
+            source="polling",
+        )
+        if stored:
+            stored_count += 1
+
+    logger.info(
+        "Cloud recording polling complete",
+        total=len(cloud_recordings),
+        stored=stored_count,
+    )
 async def _poll_raw_tracks_recordings(
     raw_tracks_recordings: List[FinishedRecordingResponse],
-    bucket_name: NonEmptyString,
-) -> None:
-    """Process raw-tracks (database deduplication, worker-agnostic)."""
+    bucket_name: str,
+):
+    """Queue raw-tracks recordings missing from DB (existing logic)."""
     if not raw_tracks_recordings:
         return
 
-    for rec in raw_tracks_recordings:
-        # Lookup request FIRST (before any DB writes)
-        match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
-        if not match:
-            await create_and_log_orphan(
-                recording_id=rec.id,
-                bucket_name=bucket_name,
-                room_name=rec.room_name,
-                start_ts=rec.start_ts,
-                track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
-                source="polling",
-            )
-            continue
-
-        meeting_id, _ = match
-
-        # Verify meeting exists
-        meeting = await meetings_controller.get_by_id(meeting_id)
-        if not meeting:
-            logger.error(
-                "Meeting not found", recording_id=rec.id, meeting_id=meeting_id
-            )
-            await create_and_log_orphan(
-                recording_id=rec.id,
-                bucket_name=bucket_name,
-                room_name=rec.room_name,
-                start_ts=rec.start_ts,
-                track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
-                source="polling",
-            )
-            continue
-
-        # DEDUPLICATION: Atomically create recording (single operation, no race window)
-        # ON CONFLICT → concurrent poller already got it, skip entire logic
-        track_keys = [t.s3Key for t in rec.tracks if t.type == "audio"]
-
-        created = await recordings_controller.try_create_with_meeting(
-            Recording(
-                id=rec.id,
-                bucket_name=bucket_name,
-                object_key=os.path.dirname(track_keys[0]) if track_keys else "",
-                recorded_at=datetime.fromtimestamp(rec.start_ts, tz=timezone.utc),
-                track_keys=track_keys,
-                meeting_id=meeting_id,  # Set at creation (constraint-safe)
-                status="pending",
-            )
-        )
-        if not created:
-            # Conflict: another poller already created/queued this
-            # Skip all remaining logic (match already done by winner)
-            continue
-
-        # Only winner reaches here - queue processing (works with Celery or Hatchet)
-        process_multitrack_recording.delay(
-            recording_id=rec.id,
-            daily_room_name=rec.room_name,
-            recording_start_ts=rec.start_ts,
-            bucket_name=bucket_name,
-            track_keys=track_keys,
-        )
-        logger.info("Queued recording", recording_id=rec.id, meeting_id=meeting_id)
+    recording_ids = [rec.id for rec in raw_tracks_recordings]
+    existing_recordings = await recordings_controller.get_by_ids(recording_ids)
+    existing_ids = {rec.id for rec in existing_recordings}
+    missing_recordings = [
+        rec for rec in raw_tracks_recordings if rec.id not in existing_ids
+    ]
+
+    if not missing_recordings:
+        logger.debug(
+            "All raw-tracks recordings already in DB",
+            api_count=len(raw_tracks_recordings),
+            existing_count=len(existing_recordings),
+        )
+        return
+
+    logger.info(
+        "Found raw-tracks recordings missing from DB",
+        missing_count=len(missing_recordings),
+        total_api_count=len(raw_tracks_recordings),
+        existing_count=len(existing_recordings),
+    )
+
+    for recording in missing_recordings:
+        if not recording.tracks:
+            logger.warning(
+                "Finished raw-tracks recording has no tracks (no audio captured)",
+                recording_id=recording.id,
+                room_name=recording.room_name,
+            )
+            continue
+
+        track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
+        if not track_keys:
+            logger.warning(
+                "No audio tracks found in raw-tracks recording",
+                recording_id=recording.id,
+                room_name=recording.room_name,
+                total_tracks=len(recording.tracks),
+            )
+            continue
+
+        logger.info(
+            "Queueing missing raw-tracks recording for processing",
+            recording_id=recording.id,
+            room_name=recording.room_name,
+            track_count=len(track_keys),
+        )
+        process_multitrack_recording.delay(
+            bucket_name=bucket_name,
+            daily_room_name=recording.room_name,
+            recording_id=recording.id,
+            track_keys=track_keys,
+            recording_start_ts=recording.start_ts,
+        )
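The batched existence check in `_poll_raw_tracks_recordings` is a plain set difference: one `get_by_ids` lookup instead of a query per recording. A tiny sketch, with dict records standing in for the real Pydantic models:

```python
def missing_from_db(api_recordings, db_recordings):
    """Return API recordings whose IDs are not yet present in the DB
    (single batched lookup, then an O(1) membership test per recording)."""
    existing_ids = {rec["id"] for rec in db_recordings}
    return [rec for rec in api_recordings if rec["id"] not in existing_ids]


api = [{"id": "r1"}, {"id": "r2"}, {"id": "r3"}]
db = [{"id": "r2"}]
assert [r["id"] for r in missing_from_db(api, db)] == ["r1", "r3"]
```

Because only missing recordings are queued, polling stays idempotent across repeated runs.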
async def poll_daily_room_presence(meeting_id: str) -> None:
    """Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed.
@@ -799,47 +822,15 @@ async def process_meetings():
                 end_date = end_date.replace(tzinfo=timezone.utc)
 
             client = create_platform_client(meeting.platform)
+            room_sessions = await client.get_room_sessions(meeting.room_name)
 
-            has_active_sessions = False
-            has_had_sessions = False
-
-            if meeting.platform == "daily":
-                try:
-                    presence = await client.get_room_presence(meeting.room_name)
-                    has_active_sessions = presence.total_count > 0
-
-                    room_sessions = await client.get_room_sessions(
-                        meeting.room_name
-                    )
-                    has_had_sessions = bool(room_sessions)
-                    logger_.info(
-                        "Daily.co presence check",
-                        has_active_sessions=has_active_sessions,
-                        has_had_sessions=has_had_sessions,
-                        presence_count=presence.total_count,
-                    )
-                except Exception:
-                    logger_.warning(
-                        "Daily.co presence API failed, falling back to DB sessions",
-                        exc_info=True,
-                    )
-                    room_sessions = await client.get_room_sessions(
-                        meeting.room_name
-                    )
-                    has_active_sessions = bool(
-                        room_sessions
-                        and any(s.ended_at is None for s in room_sessions)
-                    )
-                    has_had_sessions = bool(room_sessions)
-            else:
-                room_sessions = await client.get_room_sessions(meeting.room_name)
-                has_active_sessions = bool(
-                    room_sessions and any(s.ended_at is None for s in room_sessions)
-                )
-                has_had_sessions = bool(room_sessions)
+            has_active_sessions = bool(
+                room_sessions and any(s.ended_at is None for s in room_sessions)
+            )
+            has_had_sessions = bool(room_sessions)
 
             logger_.info(
                 f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
             )
 
             if has_active_sessions:
                 logger_.debug("Meeting still has active sessions, keep it")
@@ -858,20 +849,7 @@ async def process_meetings():
                     await meetings_controller.update_meeting(
                         meeting.id, is_active=False
                     )
-                    logger_.info("Meeting deactivated in database")
-
-                    if meeting.platform == "daily":
-                        try:
-                            await client.delete_room(meeting.room_name)
-                            logger_.info(
-                                "Daily.co room deleted", room_name=meeting.room_name
-                            )
-                        except Exception:
-                            logger_.warning(
-                                "Failed to delete Daily.co room",
-                                room_name=meeting.room_name,
-                                exc_info=True,
-                            )
+                    logger_.info("Meeting is deactivated")
 
                     processed_count += 1
@@ -1071,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
             )
             continue
 
-        use_celery = room and room.use_celery
-        use_hatchet = not use_celery
-
-        if use_hatchet:
-            if not transcript:
-                logger.warning(
-                    "No transcript for Hatchet reprocessing, skipping",
-                    recording_id=recording.id,
-                )
-                continue
-
-            workflow_id = await HatchetClientManager.start_workflow(
-                workflow_name="DiarizationPipeline",
-                input_data={
-                    "recording_id": recording.id,
-                    "tracks": [
-                        {"s3_key": k}
-                        for k in filter_cam_audio_tracks(recording.track_keys)
-                    ],
-                    "bucket_name": bucket_name,
-                    "transcript_id": transcript.id,
-                    "room_id": room.id if room else None,
-                },
-                additional_metadata={
-                    "transcript_id": transcript.id,
-                    "recording_id": recording.id,
-                    "reprocess": True,
-                },
-            )
-            await transcripts_controller.update(
-                transcript, {"workflow_run_id": workflow_id}
-            )
-            logger.info(
-                "Queued Daily recording for Hatchet reprocessing",
-                recording_id=recording.id,
-                workflow_id=workflow_id,
-                room_name=meeting.room_name,
-                track_count=len(recording.track_keys),
-            )
-        else:
-            logger.info(
-                "Queueing Daily recording for Celery reprocessing",
-                recording_id=recording.id,
-                room_name=meeting.room_name,
-                track_count=len(recording.track_keys),
-                transcript_status=transcript.status if transcript else None,
-            )
-
-            # For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
-            # Reprocessing uses recording.meeting_id directly instead of time-based matching
-            recording_start_ts = int(recording.recorded_at.timestamp())
-
-            process_multitrack_recording.delay(
-                bucket_name=bucket_name,
-                daily_room_name=meeting.room_name,
-                recording_id=recording.id,
-                track_keys=recording.track_keys,
-                recording_start_ts=recording_start_ts,
-            )
+        # Multitrack reprocessing always uses Hatchet (no Celery fallback)
+        if not transcript:
+            logger.warning(
+                "No transcript for Hatchet reprocessing, skipping",
+                recording_id=recording.id,
+            )
+            continue
+
+        workflow_id = await HatchetClientManager.start_workflow(
+            workflow_name="DiarizationPipeline",
+            input_data={
+                "recording_id": recording.id,
+                "tracks": [
+                    {"s3_key": k}
+                    for k in filter_cam_audio_tracks(recording.track_keys)
+                ],
+                "bucket_name": bucket_name,
+                "transcript_id": transcript.id,
+                "room_id": room.id if room else None,
+            },
+            additional_metadata={
+                "transcript_id": transcript.id,
+                "recording_id": recording.id,
+                "reprocess": True,
+            },
+        )
+        await transcripts_controller.update(
+            transcript, {"workflow_run_id": workflow_id}
+        )
+        logger.info(
+            "Queued Daily recording for Hatchet reprocessing",
+            recording_id=recording.id,
+            workflow_id=workflow_id,
+            room_name=meeting.room_name,
+            track_count=len(recording.track_keys),
+        )
 
         reprocessed_count += 1

View File

@@ -1,39 +0,0 @@
#!/usr/bin/env python3
"""Test script to fetch Daily.co recordings for a specific room and show raw API response."""
import asyncio
import json

from reflector.video_platforms.factory import create_platform_client


async def main():
    room_name = "daily-private-igor-20260110042117"
    print(f"\n=== Fetching recordings for room: {room_name} ===\n")

    async with create_platform_client("daily") as client:
        recordings = await client.list_recordings(room_name=room_name)
        print(f"Found {len(recordings)} recording objects from Daily.co API\n")

        for i, rec in enumerate(recordings, 1):
            print(f"--- Recording #{i} ---")
            print(f"ID: {rec.id}")
            print(f"Room: {rec.room_name}")
            print(f"Start TS: {rec.start_ts}")
            print(f"Status: {rec.status}")
            print(f"Duration: {rec.duration}")
            print(f"Type: {rec.type}")
            print(f"Tracks count: {len(rec.tracks)}")
            if rec.tracks:
                print("Tracks:")
                for j, track in enumerate(rec.tracks, 1):
                    print(f"  Track {j}: {track.s3Key}")
            print(f"\nRaw JSON:\n{json.dumps(rec.model_dump(), indent=2, default=str)}\n")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
 import pytest
 
-from reflector.schemas.platform import WHEREBY_PLATFORM
+from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
 
 
 @pytest.fixture(scope="session", autouse=True)
@@ -14,6 +14,7 @@ def register_mock_platform():
     from reflector.video_platforms.registry import register_platform
 
     register_platform(WHEREBY_PLATFORM, MockPlatformClient)
+    register_platform(DAILY_PLATFORM, MockPlatformClient)
 
     yield

View File

@@ -1,286 +0,0 @@
"""Unit tests for Daily.co presence-based meeting deactivation logic.
Tests the fix for split room race condition by verifying:
1. Real-time presence checking via Daily.co API
2. Room deletion when meetings deactivate
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.dailyco_api.responses import (
RoomPresenceParticipant,
RoomPresenceResponse,
)
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.video_platforms.daily import DailyClient
@pytest.fixture
async def daily_room_and_meeting():
    """Create test room and meeting for Daily platform."""
    room = await rooms_controller.add(
        name="test-daily",
        user_id="test-user",
        platform="daily",
        zulip_auto_post=False,
        zulip_stream="",
        zulip_topic="",
        is_locked=False,
        room_mode="normal",
        recording_type="cloud",
        recording_trigger="automatic-2nd-participant",
        is_shared=False,
    )
    current_time = datetime.now(timezone.utc)
    end_time = current_time + timedelta(hours=2)
    meeting = await meetings_controller.create(
        id="test-meeting-id",
        room_name="test-daily-20260129120000",
        room_url="https://daily.co/test",
        host_room_url="https://daily.co/test",
        start_date=current_time,
        end_date=end_time,
        room=room,
    )
    return room, meeting
@pytest.mark.asyncio
async def test_daily_client_has_delete_room_method():
    """Verify DailyClient has delete_room method for cleanup."""
    # Create a mock DailyClient
    with patch("reflector.dailyco_api.client.DailyApiClient"):
        from reflector.video_platforms.models import VideoPlatformConfig

        config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
        client = DailyClient(config)

        # Verify delete_room method exists
        assert hasattr(client, "delete_room")
        assert callable(getattr(client, "delete_room"))
@pytest.mark.asyncio
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
    """Test that get_room_presence returns real-time participant data."""
    room, meeting = daily_room_and_meeting

    # Mock Daily.co API response
    mock_presence = RoomPresenceResponse(
        total_count=2,
        data=[
            RoomPresenceParticipant(
                room=meeting.room_name,
                id="session-1",
                userId="user-1",
                userName="User One",
                joinTime="2026-01-29T12:00:00.000Z",
                duration=120,
            ),
            RoomPresenceParticipant(
                room=meeting.room_name,
                id="session-2",
                userId="user-2",
                userName="User Two",
                joinTime="2026-01-29T12:05:00.000Z",
                duration=60,
            ),
        ],
    )

    with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
        from reflector.video_platforms.models import VideoPlatformConfig

        config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
        client = DailyClient(config)

        # Mock the API client method
        client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)

        # Call get_room_presence
        result = await client.get_room_presence(meeting.room_name)

        # Verify it calls Daily.co API
        client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)

        # Verify result contains real-time data
        assert result.total_count == 2
        assert len(result.data) == 2
        assert result.data[0].id == "session-1"
        assert result.data[1].id == "session-2"
@pytest.mark.asyncio
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
    """Test that Daily.co presence API is source of truth, not stale DB sessions."""
    room, meeting = daily_room_and_meeting
    current_time = datetime.now(timezone.utc)

    # Create stale DB session (left_at=NULL but user actually left)
    session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
    await daily_participant_sessions_controller.upsert_joined(
        DailyParticipantSession(
            id=session_id,
            meeting_id=meeting.id,
            room_id=room.id,
            session_id="stale-daily-session",
            user_name="Stale User",
            user_id="stale-user",
            joined_at=current_time - timedelta(minutes=5),
            left_at=None,  # Stale - shows active but user left
        )
    )

    # Verify DB shows active session
    db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
        meeting.id
    )
    assert len(db_sessions) == 1

    # But Daily.co API shows room is empty
    mock_presence = RoomPresenceResponse(total_count=0, data=[])

    with patch("reflector.dailyco_api.client.DailyApiClient"):
        from reflector.video_platforms.models import VideoPlatformConfig

        config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
        client = DailyClient(config)
        client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)

        # Get real-time presence
        presence = await client.get_room_presence(meeting.room_name)

        # Real-time API shows no participants (truth)
        assert presence.total_count == 0
        assert len(presence.data) == 0

        # DB shows 1 participant (stale)
        assert len(db_sessions) == 1
        # Implementation should trust presence API, not DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_empty():
    """Test the core deactivation decision logic when presence shows room empty."""
    # This tests the logic that will be in process_meetings
    # Simulate: DB shows stale active session
    has_active_db_sessions = True  # DB is stale

    # Simulate: Daily.co presence API shows room empty
    presence_count = 0  # Real-time truth

    # Simulate: Meeting has been used before
    has_had_sessions = True

    # Decision logic (what process_meetings should do):
    # - If presence API available: trust it
    # - If presence shows empty AND has_had_sessions: deactivate
    if presence_count == 0 and has_had_sessions:
        should_deactivate = True
    else:
        should_deactivate = False

    assert should_deactivate is True  # Should deactivate despite stale DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_active():
    """Test that meetings stay active when presence shows participants."""
    # Simulate: DB shows no sessions (not yet updated)
    has_active_db_sessions = False  # DB hasn't caught up

    # Simulate: Daily.co presence API shows active participant
    presence_count = 1  # Real-time truth

    # Decision logic: presence shows activity, keep meeting active
    if presence_count > 0:
        should_deactivate = False
    else:
        should_deactivate = True

    assert should_deactivate is False  # Should stay active
@pytest.mark.asyncio
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
    """Test that Daily.co room is deleted when meeting deactivates."""
    room, meeting = daily_room_and_meeting

    with patch("reflector.dailyco_api.client.DailyApiClient"):
        from reflector.video_platforms.models import VideoPlatformConfig

        config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
        client = DailyClient(config)

        # Mock delete_room API call
        client._api_client.delete_room = AsyncMock()

        # Simulate deactivation - should delete room
        await client._api_client.delete_room(meeting.room_name)

        # Verify delete was called
        client._api_client.delete_room.assert_called_once_with(meeting.room_name)
@pytest.mark.asyncio
async def test_delete_room_idempotent_on_404():
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
from reflector.dailyco_api.client import DailyApiClient
# Create real client to test delete_room logic
client = DailyApiClient(api_key="test-key")
# Mock the HTTP client
mock_http_client = AsyncMock()
mock_response = AsyncMock()
mock_response.status_code = 404 # Room not found
mock_http_client.delete = AsyncMock(return_value=mock_response)
# Mock _get_client to return our mock
async def mock_get_client():
return mock_http_client
client._get_client = mock_get_client
# delete_room should succeed even on 404 (idempotent)
await client.delete_room("nonexistent-room")
# Verify delete was attempted
mock_http_client.delete.assert_called_once()
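The 404 test above depends on `delete_room` swallowing "room not found". The real method is `DailyApiClient.delete_room` and its implementation isn't shown in this diff, so here is a self-contained sketch of the behavior being tested, with a fake HTTP client standing in for the real one:

```python
import asyncio


class FakeResponse:
    """Minimal stand-in for an HTTP response in this sketch."""

    def __init__(self, status_code: int):
        self.status_code = status_code

    def raise_for_status(self):
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")


class FakeHttpClient:
    """Always answers 404, as if the room were already deleted."""

    def __init__(self):
        self.deleted = []

    async def delete(self, url):
        self.deleted.append(url)
        return FakeResponse(404)


async def delete_room(http, room_name: str) -> None:
    # Treat 404 as success: the room is gone either way, so retries
    # and double-deletes are safe (idempotent).
    response = await http.delete(f"/rooms/{room_name}")
    if response.status_code == 404:
        return
    response.raise_for_status()


http = FakeHttpClient()
asyncio.run(delete_room(http, "nonexistent-room"))
```

Any other 4xx/5xx still raises, so only the "already gone" case is silenced.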
@pytest.mark.asyncio
async def test_api_failure_fallback_to_db_sessions():
"""Test that system falls back to DB sessions if Daily.co API fails."""
# Simulate: Daily.co API throws exception
api_exception = Exception("API unavailable")
# Simulate: DB shows active session
has_active_db_sessions = True
# Decision logic with fallback:
try:
presence_count = None
raise api_exception # Simulating API failure
except Exception:
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
if has_active_db_sessions:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Conservative: keep active on API failure
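The decision-logic tests in this file (presence empty, presence active, API-failure fallback) each inline the same rule. Consolidated as a standalone sketch — a hypothetical `should_deactivate` helper, since the actual shape of the logic inside `process_meetings` isn't shown in this diff — with `presence_count=None` modeling an API failure:

```python
def should_deactivate(
    presence_count,          # int from the presence API, or None if the API failed
    has_had_sessions: bool,  # meeting has been used at least once
    has_active_db_sessions: bool,  # DB view, which may be stale
) -> bool:
    """Decide whether a meeting should be deactivated."""
    if presence_count is not None:
        # Presence API available: trust it over the (stale-prone) DB.
        # Only deactivate an empty room that has actually been used.
        return presence_count == 0 and has_had_sessions
    # API failure: fall back to DB sessions, conservatively keeping the
    # meeting active whenever the DB still shows a session.
    return not has_active_db_sessions
```

This reproduces all three tested outcomes: empty room with prior sessions deactivates despite a stale DB, any live participant keeps the meeting active, and an API failure defers to the DB.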

View File

@@ -1,258 +0,0 @@
from datetime import datetime, timezone
from uuid import UUID
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_create_request():
"""Test creating a recording request."""
# Create meeting first
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-123",
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
request = DailyRecordingRequest(
recording_id="rec-1",
meeting_id="meeting-123",
instance_id=UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"),
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
result = await daily_recording_requests_controller.find_by_recording_id("rec-1")
assert result is not None
assert result[0] == "meeting-123"
assert result[1] == "cloud"
@pytest.mark.asyncio
async def test_multiple_recordings_same_meeting():
"""Test stop/restart creates multiple request rows."""
# Create room and meeting
room = Room(
id="test-room-2", name="Test Room 2", slug="test-room-2", user_id="test-user"
)
await rooms_controller.create(room)
meeting_id = "meeting-456"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-1",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-2", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-1", "rec-2"}
@pytest.mark.asyncio
async def test_deduplication_via_database():
"""Test concurrent pollers use database for deduplication."""
# Create room and meeting
room = Room(
id="test-room-3", name="Test Room 3", slug="test-room-3", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-789",
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
recording_id = "rec-123"
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created2 is False # Conflict, skip
@pytest.mark.asyncio
async def test_orphan_logged_once():
"""Test orphan marked once, skipped on re-poll."""
# First poll
created1 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created1 is True
# Second poll (same orphan discovered again)
created2 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created2 is False # Already exists
# Verify it exists
existing = await recordings_controller.get_by_id("orphan-123")
assert existing is not None
assert existing.status == "orphan"
@pytest.mark.asyncio
async def test_orphan_constraints():
"""Test orphan invariants are enforced."""
# Can't create orphan with meeting_id
with pytest.raises(AssertionError, match="meeting_id must be NULL"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-123", # Should be None
status="orphan",
track_keys=None,
)
)
# Can't create orphan with wrong status
with pytest.raises(AssertionError, match="status must be 'orphan'"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="pending", # Should be "orphan"
track_keys=None,
)
)
@pytest.mark.asyncio
async def test_try_create_with_meeting_constraints():
"""Test try_create_with_meeting enforces constraints."""
# Create room and meeting
room = Room(
id="test-room-4", name="Test Room 4", slug="test-room-4", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-999",
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Can't create with orphan status
with pytest.raises(AssertionError, match="use create_orphan"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-999",
status="orphan", # Should not be orphan
track_keys=None,
)
)
# Can't create without meeting_id
with pytest.raises(AssertionError, match="meeting_id required"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None, # Should have meeting_id
status="pending",
track_keys=None,
)
)

View File

@@ -0,0 +1,190 @@
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db import get_database
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings
from reflector.db.rooms import rooms_controller
from reflector.worker.ics_sync import create_upcoming_meetings_for_event
@pytest.mark.asyncio
async def test_duplicate_calendar_event_does_not_create_duplicate_meeting():
"""When an aggregated ICS feed contains the same event with different UIDs
(e.g. Cal.com UID + Google Calendar UUID), only one meeting should be created."""
room = await rooms_controller.add(
name="dedup-test-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
ics_url="https://calendar.example.com/dedup.ics",
ics_enabled=True,
)
now = datetime.now(timezone.utc)
start_time = now + timedelta(hours=1)
end_time = now + timedelta(hours=2)
# Create first calendar event (Cal.com UID)
event1 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="abc123@Cal.com",
title="Team Standup",
start_time=start_time,
end_time=end_time,
)
)
# create_window must be before start_time for the function to proceed
create_window = now - timedelta(minutes=6)
# Create meeting for event1
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
async def mock_create_meeting_1(room_name_prefix, *, end_date, room):
return AsyncMock(
meeting_id="meeting-1",
room_name="dedup-test-room-abc",
room_url="https://mock.video/dedup-test-room-abc",
host_room_url="https://mock.video/dedup-test-room-abc?host=true",
)
mock_client.create_meeting = mock_create_meeting_1
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event1, create_window, room)
# Verify meeting was created
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert len(results) == 1, f"Expected 1 meeting, got {len(results)}"
# Create second calendar event with different UID but same time window (Google Calendar UUID)
event2 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="550e8400-e29b-41d4-a716-446655440000",
title="Team Standup",
start_time=start_time,
end_time=end_time,
)
)
# Try to create meeting for event2 - should be skipped due to dedup
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
create_meeting_called = False
async def mock_create_meeting_2(room_name_prefix, *, end_date, room):
nonlocal create_meeting_called
create_meeting_called = True
mock_client.create_meeting = mock_create_meeting_2
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event2, create_window, room)
# Platform client should NOT have been called for the duplicate
assert (
not create_meeting_called
), "create_meeting should not be called for duplicate"
# Verify still only 1 meeting
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert len(results) == 1, f"Expected 1 meeting after dedup, got {len(results)}"
@pytest.mark.asyncio
async def test_different_time_windows_create_separate_meetings():
"""Events at different times should create separate meetings, even if titles match."""
room = await rooms_controller.add(
name="dedup-diff-time-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
ics_url="https://calendar.example.com/dedup2.ics",
ics_enabled=True,
)
now = datetime.now(timezone.utc)
create_window = now - timedelta(minutes=6)
# Event 1: starts in 1 hour (1h duration)
event1 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="event-morning@Cal.com",
title="Team Standup",
start_time=now + timedelta(hours=1),
end_time=now + timedelta(hours=2),
)
)
# Event 2: starts in 3 hours (different time window)
event2 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="event-afternoon@Cal.com",
title="Team Standup",
start_time=now + timedelta(hours=3),
end_time=now + timedelta(hours=4),
)
)
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
call_count = 0
async def mock_create_meeting(room_name_prefix, *, end_date, room):
nonlocal call_count
call_count += 1
return AsyncMock(
meeting_id=f"meeting-{call_count}",
room_name=f"dedup-diff-time-room-{call_count}",
room_url=f"https://mock.video/dedup-diff-time-room-{call_count}",
host_room_url=f"https://mock.video/dedup-diff-time-room-{call_count}?host=true",
)
mock_client.create_meeting = mock_create_meeting
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event1, create_window, room)
await create_upcoming_meetings_for_event(event2, create_window, room)
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert (
len(results) == 2
), f"Expected 2 meetings for different times, got {len(results)}"
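The commit message describes the dedup rule these two tests exercise: after checking `calendar_event_id`, also check for an existing meeting with the same room + start_date + end_date. A pure-Python sketch of that rule — a hypothetical `is_duplicate` helper, not the actual query inside `create_upcoming_meetings_for_event`:

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ExistingMeeting:
    room_id: str
    start_date: datetime
    end_date: datetime
    calendar_event_id: str


def is_duplicate(room_id: str, start: datetime, end: datetime,
                 calendar_event_id: str, existing: list) -> bool:
    for m in existing:
        if m.calendar_event_id == calendar_event_id:
            # This ICS UID already has a meeting.
            return True
        if (m.room_id, m.start_date, m.end_date) == (room_id, start, end):
            # Same room and exact time window: the same event arriving
            # under a second UID (Cal.com UID vs Google Calendar UUID).
            return True
    return False
```

Events at different times fail both checks and create separate meetings, matching the second test.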

View File

@@ -1,300 +0,0 @@
"""
Integration tests for recording request flow.
These tests verify the end-to-end flow of:
1. Starting a recording (creates request)
2. Webhook/polling discovering recording (matches via request)
3. Recording processing (uses existing meeting_id)
"""
from datetime import datetime, timezone
from uuid import UUID, uuid4
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_recording_request_flow_cloud(client):
"""Test full cloud recording flow: start -> webhook -> match"""
# Create room and meeting
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Simulate recording start (what endpoint does)
recording_id = "rec-cloud-123"
instance_id = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Verify request exists
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
assert match[0] == meeting_id
assert match[1] == "cloud"
# Simulate webhook/polling storing cloud recording
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key="s3://bucket/recording.mp4",
duration=120,
)
assert success is True
# Verify meeting updated
updated_meeting = await meetings_controller.get_by_id(meeting_id)
assert updated_meeting.daily_composed_video_s3_key == "s3://bucket/recording.mp4"
assert updated_meeting.daily_composed_video_duration == 120
@pytest.mark.asyncio
async def test_recording_request_flow_raw_tracks(client):
"""Test full raw-tracks recording flow: start -> webhook/polling -> process"""
# Create room and meeting
room = Room(
id="test-room-2",
name="Test Room 2",
slug="test-room-2",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Simulate recording start
recording_id = "rec-raw-456"
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Simulate webhook/polling discovering recording
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
found_meeting_id, recording_type = match
assert found_meeting_id == meeting_id
assert recording_type == "raw-tracks"
# Create recording (what webhook/polling does)
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="recordings/20260120/",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
meeting_id=meeting_id,
status="pending",
)
)
assert created is True
# Verify recording exists with meeting_id
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id
assert recording.status == "pending"
assert len(recording.track_keys) == 2
@pytest.mark.asyncio
async def test_stop_restart_creates_multiple_requests(client):
"""Test stop/restart creates multiple request rows with same instance_id"""
# Create room and meeting
room = Room(
id="test-room-3",
name="Test Room 3",
slug="test-room-3",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("c1d2e3f4-a5b6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-first",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-second", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-first", "rec-second"}
assert all(r.instance_id == instance_id for r in requests)
@pytest.mark.asyncio
async def test_orphan_recording_no_request(client):
"""Test orphan recording (no request found)"""
# Simulate polling discovering recording with no request
recording_id = "rec-orphan"
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is None # No request
# Mark as orphan
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created is True
# Verify orphan exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.status == "orphan"
assert recording.meeting_id is None
# Second poll - already exists
created_again = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created_again is False # Already exists
@pytest.mark.asyncio
async def test_concurrent_polling_deduplication(client):
"""Test concurrent pollers only queue once"""
# Create room and meeting
room = Room(
id="test-room-4",
name="Test Room 4",
slug="test-room-4",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Create request
recording_id = "rec-concurrent"
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=UUID("d1e2f3a4-b5c6-7890-abcd-ef1234567890"),
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
)
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created2 is False # Conflict, skip
# Only one recording exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id
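The first-wins behavior of `try_create_with_meeting` is the classic conditional-insert pattern. A minimal sqlite3 illustration of the idea (the production code uses the project's async database layer, not sqlite3, so this is a sketch of the pattern rather than the actual implementation):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE recordings (id TEXT PRIMARY KEY, meeting_id TEXT)")


def try_create(conn, recording_id, meeting_id):
    # INSERT OR IGNORE: on a primary-key conflict nothing is written and
    # rowcount reports 0 changed rows, so the losing poller backs off.
    cur = conn.execute(
        "INSERT OR IGNORE INTO recordings (id, meeting_id) VALUES (?, ?)",
        (recording_id, meeting_id),
    )
    return cur.rowcount == 1


first = try_create(conn, "rec-concurrent", "meeting-789")   # poller 1
second = try_create(conn, "rec-concurrent", "meeting-789")  # poller 2, concurrent
```

Because the database arbitrates the conflict, no application-level locking is needed between pollers.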

View File

@@ -0,0 +1,374 @@
"""
Integration tests for time-based meeting-to-recording matching.
Tests the critical path for matching Daily.co recordings to meetings when
the API doesn't return an instanceId.
"""
from datetime import datetime, timedelta, timezone
import pytest
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
@pytest.fixture
async def test_room():
"""Create a test room for meetings."""
room = await rooms_controller.add(
name="test-room-time",
user_id="test-user-id",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=False,
platform="daily",
)
return room
@pytest.fixture
def base_time():
"""Fixed timestamp for deterministic tests."""
return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
class TestTimeBasedMatching:
"""Test get_by_room_name_and_time() matching logic."""
async def test_exact_time_match(self, test_room, base_time):
"""Recording timestamp exactly matches meeting start_date."""
meeting = await meetings_controller.create(
id="meeting-exact",
room_name="daily-test-20260114090000",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090000",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
"""Recording started 1 minute after meeting (participants joined late)."""
meeting = await meetings_controller.create(
id="meeting-late",
room_name="daily-test-20260114090100",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time + timedelta(minutes=1)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090100",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
"""
Two meetings with same room_name (duplicate/race condition).
Should pick closest by timestamp.
"""
meeting1 = await meetings_controller.create(
id="meeting-1-first",
room_name="daily-duplicate-room",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting2 = await meetings_controller.create(
id="meeting-2-second",
room_name="daily-duplicate-room", # Same room_name!
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(seconds=0.99), # 0.99s later
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording started 0.5s after meeting1
# Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
recording_start = base_time + timedelta(seconds=0.5)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-duplicate-room",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s)
async def test_outside_time_window_returns_none(self, test_room, base_time):
"""Recording outside 1-week window returns None."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-old",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording 8 days later (outside 7-day window)
recording_start = base_time + timedelta(days=8)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-old",
recording_start=recording_start,
time_window_hours=168,
)
assert result is None
async def test_tie_breaker_deterministic(self, test_room, base_time):
"""When time delta identical, tie-breaker by meeting.id is deterministic."""
meeting_z = await meetings_controller.create(
id="zzz-last-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting_a = await meetings_controller.create(
id="aaa-first-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time, # Exact same start_date
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tie",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
# Tie-breaker: lexicographically first UUID
assert result.id == "aaa-first-uuid"
async def test_timezone_naive_datetime_raises(self, test_room, base_time):
"""Timezone-naive datetime raises ValueError."""
await meetings_controller.create(
id="meeting-tz",
room_name="daily-test-tz",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Naive datetime (no timezone)
naive_dt = datetime(2026, 1, 14, 9, 0, 0)
with pytest.raises(ValueError, match="timezone-aware"):
await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tz",
recording_start=naive_dt,
time_window_hours=168,
)
async def test_one_week_boundary_after_included(self, test_room, base_time):
"""Meeting 1-week AFTER recording is included (window_end boundary)."""
meeting_time = base_time + timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-after",
room_name="daily-test-boundary-after",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-after",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-after"
async def test_one_week_boundary_before_included(self, test_room, base_time):
"""Meeting 1-week BEFORE recording is included (window_start boundary)."""
meeting_time = base_time - timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-before",
room_name="daily-test-boundary-before",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-before",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-before"
async def test_recording_before_meeting_start(self, test_room, base_time):
"""Recording started before meeting (clock skew or early join)."""
await meetings_controller.create(
id="meeting-early",
room_name="daily-test-early",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time - timedelta(minutes=2)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-early",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-early"
async def test_mixed_inside_outside_window(self, test_room, base_time):
"""Multiple meetings, only one inside window - returns the inside one."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=10),
end_date=base_time - timedelta(days=10) + timedelta(hours=1),
room=test_room,
)
await meetings_controller.create(
id="meeting-inside",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=2),
end_date=base_time - timedelta(days=2) + timedelta(hours=1),
room=test_room,
)
await meetings_controller.create(
id="meeting-future",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(days=10),
end_date=base_time + timedelta(days=10, hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-mixed",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-inside"
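The `TestTimeBasedMatching` cases above pin down one selection rule: filter to a ±window around the recording start, pick the smallest time delta, tie-break on id, and reject naive datetimes. A consolidated sketch — a hypothetical `pick_closest_meeting` over `(meeting_id, start_date)` pairs, not the actual `get_by_room_name_and_time` query:

```python
from datetime import timedelta


def pick_closest_meeting(meetings, recording_start, time_window_hours=168):
    """meetings: list of (meeting_id, start_date) pairs, start_date tz-aware."""
    if recording_start.tzinfo is None:
        raise ValueError("recording_start must be timezone-aware")
    window = timedelta(hours=time_window_hours)
    # Window is inclusive on both sides (the two boundary tests).
    candidates = [
        (meeting_id, start)
        for meeting_id, start in meetings
        if abs(start - recording_start) <= window
    ]
    if not candidates:
        return None
    # Closest start_date wins; identical deltas tie-break on the
    # lexicographically smallest id so the result is deterministic.
    best = min(candidates, key=lambda m: (abs(m[1] - recording_start), m[0]))
    return best[0]
```

Using `abs()` means recordings that start slightly before their meeting (clock skew, early joiners) still match.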
class TestAtomicCloudRecordingUpdate:
"""Test atomic update prevents race conditions."""
async def test_first_update_succeeds(self, test_room, base_time):
"""First call to set_cloud_recording_if_missing succeeds."""
meeting = await meetings_controller.create(
id="meeting-atomic-1",
room_name="daily-test-atomic",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success is True
updated = await meetings_controller.get_by_id(meeting.id)
assert updated.daily_composed_video_s3_key == "first-s3-key"
assert updated.daily_composed_video_duration == 100
async def test_second_update_fails_atomically(self, test_room, base_time):
"""Second call to update same meeting doesn't overwrite (atomic check)."""
meeting = await meetings_controller.create(
id="meeting-atomic-2",
room_name="daily-test-atomic2",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success1 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success1 is True
after_first = await meetings_controller.get_by_id(meeting.id)
assert after_first.daily_composed_video_s3_key == "first-s3-key"
success2 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="bucket/path/should-not-overwrite",
duration=200,
)
assert success2 is False
final = await meetings_controller.get_by_id(meeting.id)
assert final.daily_composed_video_s3_key == "first-s3-key"
assert final.daily_composed_video_duration == 100
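A minimal sketch of the compare-and-set semantics these tests exercise (illustrative only: the dict stands in for a meeting row, and the real controller would perform the check-and-set as a single conditional UPDATE so that concurrent webhook deliveries cannot both win):

```python
def set_cloud_recording_if_missing(meeting: dict, s3_key: str, duration: int) -> bool:
    """Store the recording fields only if none are set yet.

    Returns True when this call won, False when a value already exists.
    """
    if meeting.get("daily_composed_video_s3_key") is not None:
        return False  # a recording is already stored; do not overwrite it
    meeting["daily_composed_video_s3_key"] = s3_key
    meeting["daily_composed_video_duration"] = duration
    return True


meeting = {"id": "meeting-atomic-1"}
first = set_cloud_recording_if_missing(meeting, "first-s3-key", 100)
second = set_cloud_recording_if_missing(meeting, "should-not-overwrite", 200)
# first is True, second is False, and "first-s3-key" is preserved
```

The in-memory version only illustrates the contract; in a database-backed controller the guard and the write must happen in one statement to be race-free.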

View File

@@ -1,6 +1,6 @@
 import asyncio
 import time
-from unittest.mock import patch
+from unittest.mock import AsyncMock, patch
 import pytest
 from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
             "reflector.services.transcript_process.task_pipeline_file_process"
         ) as mock_file_pipeline,
         patch(
-            "reflector.services.transcript_process.task_pipeline_multitrack_process"
-        ) as mock_multitrack_pipeline,
+            "reflector.services.transcript_process.HatchetClientManager"
+        ) as mock_hatchet,
     ):
         response = await client.post(f"/transcripts/{transcript.id}/process")
         assert response.status_code == 200
         assert response.json()["status"] == "ok"
-        # Whereby recordings should use file pipeline
+        # Whereby recordings should use file pipeline, not Hatchet
         mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
-        mock_multitrack_pipeline.delay.assert_not_called()
+        mock_hatchet.start_workflow.assert_not_called()
 @pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
         recording_trigger="automatic-2nd-participant",
         is_shared=False,
     )
-    # Force Celery backend for test
-    await rooms_controller.update(room, {"use_celery": True})
     transcript = await transcripts_controller.add(
         "",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
             "reflector.services.transcript_process.task_pipeline_file_process"
         ) as mock_file_pipeline,
         patch(
-            "reflector.services.transcript_process.task_pipeline_multitrack_process"
-        ) as mock_multitrack_pipeline,
+            "reflector.services.transcript_process.HatchetClientManager"
+        ) as mock_hatchet,
     ):
+        mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
         response = await client.post(f"/transcripts/{transcript.id}/process")
         assert response.status_code == 200
         assert response.json()["status"] == "ok"
-        # Daily.co multitrack recordings should use multitrack pipeline
-        mock_multitrack_pipeline.delay.assert_called_once_with(
-            transcript_id=transcript.id,
-            bucket_name="daily-bucket",
-            track_keys=track_keys,
-        )
+        # Daily.co multitrack recordings should use Hatchet workflow
+        mock_hatchet.start_workflow.assert_called_once()
+        call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
+        assert call_kwargs["workflow_name"] == "DiarizationPipeline"
+        assert call_kwargs["input_data"]["transcript_id"] == transcript.id
+        assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
+        assert call_kwargs["input_data"]["tracks"] == [
+            {"s3_key": k} for k in track_keys
+        ]
         mock_file_pipeline.delay.assert_not_called()
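The routing these tests pin down can be sketched as follows. This is a hypothetical stand-in, not the real service code: the function name, source labels, and payload keys mirror only what the test assertions require (Whereby recordings stay on the Celery file pipeline, Daily.co multitrack recordings start a Hatchet "DiarizationPipeline" workflow).

```python
def route_processing(source, transcript_id, bucket_name="", track_keys=()):
    """Pick a processing backend for a finished recording (illustrative)."""
    if source == "whereby":
        # Composed single-file recording: Celery file pipeline.
        return ("file_pipeline", {"transcript_id": transcript_id})
    if source == "dailyco-multitrack":
        # Per-participant tracks: hand off to the Hatchet workflow.
        return (
            "hatchet",
            {
                "workflow_name": "DiarizationPipeline",
                "input_data": {
                    "transcript_id": transcript_id,
                    "bucket_name": bucket_name,
                    "tracks": [{"s3_key": k} for k in track_keys],
                },
            },
        )
    raise ValueError(f"unknown recording source: {source!r}")


kind, payload = route_processing(
    "dailyco-multitrack", "t-1", "daily-bucket", ["a.webm", "b.webm"]
)
```

Keeping the branch exhaustive (with a `ValueError` fallback) is what lets the tests assert both that one backend fired and that the other did not.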

View File

@@ -8,7 +8,7 @@ import {
   useRef,
   useState,
 } from "react";
-import { Box, Spinner, Center, Text } from "@chakra-ui/react";
+import { Box, Spinner, Center, Text, Button, VStack } from "@chakra-ui/react";
 import { useRouter, useParams } from "next/navigation";
 import DailyIframe, {
   DailyCall,
@@ -16,32 +16,26 @@ import DailyIframe, {
   DailyCustomTrayButton,
   DailyCustomTrayButtons,
   DailyEventObjectCustomButtonClick,
+  DailyEventObjectFatalError,
+  DailyFatalErrorType,
   DailyFactoryOptions,
   DailyParticipantsObject,
 } from "@daily-co/daily-js";
 import type { components } from "../../reflector-api";
+import { printApiError, ApiError } from "../../api/_error";
 import { useAuth } from "../../lib/AuthProvider";
 import { useConsentDialog } from "../../lib/consent";
 import {
   useRoomJoinMeeting,
-  useRoomJoinedMeeting,
-  useRoomLeaveMeeting,
   useMeetingStartRecording,
-  leaveRoomPostUrl,
-  LeaveRoomBody,
 } from "../../lib/apiHooks";
 import { omit } from "remeda";
 import {
   assertExists,
-  assertExistsAndNonEmptyString,
   NonEmptyString,
   parseNonEmptyString,
 } from "../../lib/utils";
-import {
-  assertMeetingId,
-  DailyRecordingType,
-  MeetingId,
-} from "../../lib/types";
+import { assertMeetingId, DailyRecordingType } from "../../lib/types";
 import { useUuidV5 } from "react-uuid-hook";
 const CONSENT_BUTTON_ID = "recording-consent";
@@ -54,6 +48,63 @@ const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
 const RECORDING_START_DELAY_MS = 2000;
 const RECORDING_START_MAX_RETRIES = 5;
+const FATAL_ERROR_MESSAGES: Partial<
+  Record<DailyFatalErrorType, { message: string; rejoinable?: boolean }>
+> = {
+  "connection-error": {
+    message: "Connection lost. Please check your network.",
+    rejoinable: true,
+  },
+  "exp-room": { message: "The meeting time has ended." },
+  "exp-token": { message: "Your session has expired.", rejoinable: true },
+  ejected: { message: "You were removed from this meeting." },
+  "meeting-full": { message: "This meeting is full." },
+  "not-allowed": { message: "You are not allowed to join this meeting." },
+  "nbf-room": { message: "This meeting hasn't started yet." },
+  "nbf-token": { message: "This meeting hasn't started yet." },
+  "no-room": { message: "This room does not exist." },
+  "end-of-life": { message: "This meeting room is no longer available." },
+};
+function FatalErrorScreen({
+  error,
+  roomName,
+}: {
+  error: FatalError;
+  roomName: string;
+}) {
+  const router = useRouter();
+  const info =
+    error.type !== "unknown" ? FATAL_ERROR_MESSAGES[error.type] : undefined;
+  const message = info?.message ?? `Something went wrong: ${error.message}`;
+  const rejoinable = info?.rejoinable ?? false;
+  return (
+    <Center width="100vw" height="100vh">
+      <VStack gap={4}>
+        <Text color="red.500">{message}</Text>
+        {rejoinable ? (
+          <>
+            <Button onClick={() => window.location.reload()}>
+              Try Rejoining
+            </Button>
+            <Button
+              variant="outline"
+              onClick={() => router.push(`/${roomName}`)}
+            >
+              Leave
+            </Button>
+          </>
+        ) : (
+          <Button onClick={() => router.push(`/${roomName}`)}>
+            Back to Room
+          </Button>
+        )}
+      </VStack>
+    </Center>
+  );
+}
 type Meeting = components["schemas"]["Meeting"];
 type Room = components["schemas"]["RoomDetails"];
@@ -91,6 +142,8 @@ const USE_FRAME_INIT_STATE = {
   joined: false as boolean,
 } as const;
+type FatalError = { type: DailyFatalErrorType | "unknown"; message: string };
 // Daily js and not Daily react used right now because daily-js allows for prebuild interface vs. -react is customizable but has no nice defaults
 const useFrame = (
   container: HTMLDivElement | null,
@@ -98,6 +151,7 @@ const useFrame = (
     onLeftMeeting: () => void;
     onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
     onJoinMeeting: () => void;
+    onError: (ev: DailyEventObjectFatalError) => void;
   },
 ) => {
   const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
@@ -143,6 +197,7 @@ const useFrame = (
     if (!frame) return;
     frame.on("left-meeting", cbs.onLeftMeeting);
     frame.on("custom-button-click", cbs.onCustomButtonClick);
+    frame.on("error", cbs.onError);
     const joinCb = () => {
       if (!frame) {
         console.error("frame is null in joined-meeting callback");
@@ -154,6 +209,7 @@ const useFrame = (
     return () => {
       frame.off("left-meeting", cbs.onLeftMeeting);
       frame.off("custom-button-click", cbs.onCustomButtonClick);
+      frame.off("error", cbs.onError);
       frame.off("joined-meeting", joinCb);
     };
   }, [frame, cbs]);
@@ -188,58 +244,6 @@
   ] as const;
 };
-const leaveDaily = () => {
-  const frame = DailyIframe.getCallInstance();
-  frame?.leave();
-};
-const useDirtyDisconnects = (
-  meetingId: NonEmptyString,
-  roomName: NonEmptyString,
-) => {
-  useEffect(() => {
-    if (!meetingId || !roomName) return;
-    const handleBeforeUnload = () => {
-      leaveDaily();
-      navigator.sendBeacon(
-        leaveRoomPostUrl(
-          {
-            room_name: roomName,
-            meeting_id: meetingId,
-          },
-          {
-            delay_seconds: 5,
-          },
-        ),
-        undefined satisfies LeaveRoomBody,
-      );
-    };
-    window.addEventListener("beforeunload", handleBeforeUnload);
-    return () => window.removeEventListener("beforeunload", handleBeforeUnload);
-  }, [meetingId, roomName]);
-};
-const useDisconnects = (
-  meetingId: NonEmptyString,
-  roomName: NonEmptyString,
-  leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
-) => {
-  useDirtyDisconnects(meetingId, roomName);
-  useEffect(() => {
-    return () => {
-      leaveDaily();
-      leaveMutation.mutate({
-        params: {
-          path: { meeting_id: meetingId, room_name: roomName },
-          query: { delay_seconds: 5 },
-        },
-      });
-    };
-  }, [meetingId, roomName]);
-};
 export default function DailyRoom({ meeting, room }: DailyRoomProps) {
   const router = useRouter();
   const params = useParams();
@@ -247,10 +251,9 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
   const authLastUserId = auth.lastUserId;
   const [container, setContainer] = useState<HTMLDivElement | null>(null);
   const joinMutation = useRoomJoinMeeting();
-  const joinedMutation = useRoomJoinedMeeting();
-  const leaveMutation = useRoomLeaveMeeting();
   const startRecordingMutation = useMeetingStartRecording();
   const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
+  const [fatalError, setFatalError] = useState<FatalError | null>(null);
   // Generate deterministic instanceIds so all participants use SAME IDs
   const cloudInstanceId = parseNonEmptyString(meeting.id);
@@ -258,9 +261,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
     useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
   );
-  if (typeof params.roomName === "object")
-    throw new Error(`Invalid room name in params. array? ${params.roomName}`);
-  const roomName = assertExistsAndNonEmptyString(params.roomName);
+  const roomName = params?.roomName as string;
   const {
     showConsentModal,
@@ -299,10 +300,18 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
   const roomUrl = joinedMeeting?.room_url;
   const handleLeave = useCallback(() => {
+    // If a fatal error occurred, don't redirect — let the error UI show
+    if (fatalError) return;
     router.push("/browse");
-  }, [router]);
+  }, [router, fatalError]);
-  useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);
+  const handleError = useCallback((ev: DailyEventObjectFatalError) => {
+    const error: FatalError = {
+      type: ev.error?.type ?? "unknown",
+      message: ev.errorMsg,
+    };
+    setFatalError(error);
+  }, []);
   const handleCustomButtonClick = useCallback(
     (ev: DailyEventObjectCustomButtonClick) => {
@@ -316,15 +325,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
   );
   const handleFrameJoinMeeting = useCallback(() => {
-    joinedMutation.mutate({
-      params: {
-        path: {
-          room_name: roomName,
-          meeting_id: meeting.id,
-        },
-      },
-    });
     if (meeting.recording_type === "cloud") {
       console.log("Starting dual recording via REST API", {
         cloudInstanceId,
@@ -384,10 +384,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
       startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
     }
   }, [
-    joinedMutation,
-    roomName,
-    meeting.id,
     meeting.recording_type,
+    meeting.id,
     startRecordingMutation,
     cloudInstanceId,
     rawTracksInstanceId,
@@ -402,6 +400,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
     onLeftMeeting: handleLeave,
     onCustomButtonClick: handleCustomButtonClick,
     onJoinMeeting: handleFrameJoinMeeting,
+    onError: handleError,
   });
   useEffect(() => {
@@ -458,13 +457,27 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
   }
   if (joinMutation.isError) {
+    const apiDetail = printApiError(
+      joinMutation.error as /*ref 095959E6-01CC-4CF0-B3A9-F65F12F046D3*/ ApiError,
+    );
     return (
       <Center width="100vw" height="100vh">
-        <Text color="red.500">Failed to join meeting. Please try again.</Text>
+        <VStack gap={4}>
+          <Text color="red.500">
+            {apiDetail ?? "Failed to join meeting. Please try again."}
+          </Text>
+          <Button onClick={() => router.push(`/${roomName}`)}>
+            Back to Room
+          </Button>
+        </VStack>
       </Center>
     );
   }
+  if (fatalError) {
+    return <FatalErrorScreen error={fatalError} roomName={roomName} />;
+  }
   if (!roomUrl) {
     return null;
   }

View File

@@ -1,15 +1,15 @@
 "use client";
-import { $api, API_URL } from "./apiClient";
+import { $api } from "./apiClient";
 import { useError } from "../(errors)/errorContext";
 import { QueryClient, useQueryClient } from "@tanstack/react-query";
-import type { components, operations } from "../reflector-api";
+import type { components } from "../reflector-api";
 import { useAuth } from "./AuthProvider";
 import { MeetingId } from "./types";
 import { NonEmptyString } from "./utils";
-import { createFinalURL, createQuerySerializer } from "openapi-fetch";
 /*
+ * ref 095959E6-01CC-4CF0-B3A9-F65F12F046D3
  * XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
  * this is either a limitation or incorrect usage of Python json schema generator
  * or, limitation or incorrect usage of .d type generator from json schema
@@ -808,44 +808,6 @@ export function useRoomJoinMeeting() {
   );
 }
-export const LEAVE_ROOM_POST_URL_TEMPLATE =
-  "/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;
-export const leaveRoomPostUrl = (
-  path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
-  query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
-): string =>
-  createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
-    baseUrl: API_URL,
-    params: { path, query },
-    querySerializer: createQuerySerializer(),
-  });
-export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];
-export function useRoomLeaveMeeting() {
-  return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
-}
-export const JOINED_ROOM_POST_URL_TEMPLATE =
-  "/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;
-export const joinedRoomPostUrl = (
-  params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
-): string =>
-  createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
-    baseUrl: API_URL,
-    params: { path: params },
-    querySerializer: () => "",
-  });
-export type JoinedRoomBody =
-  operations["v1_rooms_joined_meeting"]["requestBody"];
-export function useRoomJoinedMeeting() {
-  return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
-}
 export function useRoomIcsSync() {
   const { setError } = useError();

View File

@@ -171,48 +171,6 @@ export interface paths {
         patch?: never;
         trace?: never;
     };
-    "/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
-        parameters: {
-            query?: never;
-            header?: never;
-            path?: never;
-            cookie?: never;
-        };
-        get?: never;
-        put?: never;
-        /**
-         * Rooms Joined Meeting
-         * @description Trigger presence poll (ideally when user actually joins meeting in Daily iframe)
-         */
-        post: operations["v1_rooms_joined_meeting"];
-        delete?: never;
-        options?: never;
-        head?: never;
-        patch?: never;
-        trace?: never;
-    };
-    "/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
-        parameters: {
-            query?: never;
-            header?: never;
-            path?: never;
-            cookie?: never;
-        };
-        get?: never;
-        put?: never;
-        /**
-         * Rooms Leave Meeting
-         * @description Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
-         *
-         * Queues presence poll with optional delay to allow Daily.co to detect disconnect.
-         */
-        post: operations["v1_rooms_leave_meeting"];
-        delete?: never;
-        options?: never;
-        head?: never;
-        patch?: never;
-        trace?: never;
-    };
     "/v1/rooms/{room_id}/webhook/test": {
         parameters: {
             query?: never;
@@ -2477,72 +2435,6 @@ export interface operations {
             };
         };
     };
-    v1_rooms_joined_meeting: {
-        parameters: {
-            query?: never;
-            header?: never;
-            path: {
-                room_name: string;
-                meeting_id: string;
-            };
-            cookie?: never;
-        };
-        requestBody?: never;
-        responses: {
-            /** @description Successful Response */
-            200: {
-                headers: {
-                    [name: string]: unknown;
-                };
-                content: {
-                    "application/json": unknown;
-                };
-            };
-            /** @description Validation Error */
-            422: {
-                headers: {
-                    [name: string]: unknown;
-                };
-                content: {
-                    "application/json": components["schemas"]["HTTPValidationError"];
-                };
-            };
-        };
-    };
-    v1_rooms_leave_meeting: {
-        parameters: {
-            query?: {
-                delay_seconds?: number;
-            };
-            header?: never;
-            path: {
-                room_name: string;
-                meeting_id: string;
-            };
-            cookie?: never;
-        };
-        requestBody?: never;
-        responses: {
-            /** @description Successful Response */
-            200: {
-                headers: {
-                    [name: string]: unknown;
-                };
-                content: {
-                    "application/json": unknown;
-                };
-            };
-            /** @description Validation Error */
-            422: {
-                headers: {
-                    [name: string]: unknown;
-                };
-                content: {
-                    "application/json": components["schemas"]["HTTPValidationError"];
-                };
-            };
-        };
-    };
     v1_rooms_test_webhook: {
         parameters: {
             query?: never;