Compare commits


1 Commit

Author SHA1 Message Date
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
28 changed files with 928 additions and 2479 deletions

View File

@@ -4,5 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465
server/tests/test_recording_request_flow.py:generic-api-key:121
server/reflector/worker/process.py:generic-api-key:594

View File

@@ -1,414 +0,0 @@
# Presence System Race Condition: Design Document
## Executive Summary
Users in the same Reflector room can end up in **different Daily.co rooms** due to race conditions in meeting lifecycle management. This document details the root cause, why current mitigations are insufficient, and proposes a solution that eliminates the race by design.
---
## Problem Statement
When a user quickly leaves and rejoins a meeting (e.g., closes tab and reopens within seconds), they may find themselves in a different Daily.co room than other participants in the same Reflector room. This breaks the core assumption that all users in a Reflector room share the same video call.
### Symptoms
- User A and User B are in the same Reflector room but different Daily.co rooms
- User reports "I can't see/hear the other participant"
- Meeting appears active but users are isolated
---
## Evidence: Hypothesis Simulation
A simulation was built to model the presence system and find race conditions through randomized action sequences.
**Location**: `server/tests/simulation/`
```bash
cd server
# Current system config - finds race conditions
uv run pytest tests/simulation/test_presence_race.py::test_presence_race_conditions_current_system
# Result: XFAIL (expected failure - race found)
# Fixed system config - no race conditions
uv run pytest tests/simulation/test_presence_race.py::test_presence_no_race_conditions_fixed_system
# Result: PASS
```
The simulation models:
- Discrete time clock for deterministic replay
- Daily.co rooms, participants, presence API with configurable lag
- Reflector meetings, sessions, webhooks
- User state machine: `idle → joining → handshaking → connected → leaving → idle`
- Background tasks: `poll_daily_room_presence`, `process_meetings`
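As a rough sketch, the modeled user lifecycle looks like the following (names are illustrative; the real state machine lives in `server/reflector/presence/model.py` and `tests/simulation/state.py`):
```python
from enum import Enum, auto


class UserState(Enum):
    IDLE = auto()
    JOINING = auto()      # decided to join; backend may know via /join
    HANDSHAKING = auto()  # WebRTC in progress - invisible to the presence API
    CONNECTED = auto()    # visible to the Daily presence API
    LEAVING = auto()


# Allowed transitions, assumed from the lifecycle above; handshaking -> idle
# is a guess at how an abandoned join (tab close mid-handshake) is modeled
TRANSITIONS = {
    UserState.IDLE: {UserState.JOINING},
    UserState.JOINING: {UserState.HANDSHAKING},
    UserState.HANDSHAKING: {UserState.CONNECTED, UserState.IDLE},
    UserState.CONNECTED: {UserState.LEAVING},
    UserState.LEAVING: {UserState.IDLE},
}


def transition(current: UserState, target: UserState) -> UserState:
    if target not in TRANSITIONS[current]:
        raise ValueError(f"invalid transition: {current.name} -> {target.name}")
    return target
```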
### Key Finding
The simulation proves that **even with the Daily API call**, a race window exists during the WebRTC handshake, when users are invisible to the presence API.
---
## Current System Analysis
### Architecture Overview
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Frontend   │────▶│   Backend   │────▶│  Daily.co   │
│  (Next.js)  │     │  (FastAPI)  │     │     API     │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │  Database   │
                    │  (Sessions) │
                    └─────────────┘
```
### Relevant Code Paths
#### 1. Meeting Join Flow
- **File**: `server/reflector/views/rooms.py`
- **Endpoint**: `POST /rooms/{room_name}/meeting`
- Returns existing active meeting or creates new one
- User then connects to Daily.co via WebRTC (frontend)
#### 2. Presence Polling
- **File**: `server/reflector/worker/process.py:642`
- **Function**: `poll_daily_room_presence()`
- Called by webhooks (`participant.joined`, `participant.left`) and `/joined`, `/leave` endpoints
- Queries Daily API for current participants
- Updates `daily_participant_sessions` table in database
#### 3. Meeting Deactivation
- **File**: `server/reflector/worker/process.py:754`
- **Function**: `process_meetings()`
- Runs periodically (every 60s via Celery beat)
- Checks if meetings should be deactivated
**Current implementation** (lines 806-833):
```python
if meeting.platform == "daily":
    try:
        presence = await client.get_room_presence(meeting.room_name)
        has_active_sessions = presence.total_count > 0
        # ...
    except Exception:
        logger_.warning("Daily.co presence API failed, falling back to DB sessions")
        room_sessions = await client.get_room_sessions(meeting.room_name)
        has_active_sessions = bool(
            room_sessions and any(s.ended_at is None for s in room_sessions)
        )
```
**Key observation**: The code already uses the Daily API (`get_room_presence`), not just the database. The race condition persists despite this.
### Endpoints from feature-leave-endpoint Branch
The `feature-leave-endpoint` branch added explicit leave/join notifications:
| Endpoint | Purpose | Trigger |
|----------|---------|---------|
| `POST /rooms/{room_name}/meetings/{meeting_id}/join` | Get meeting info | User navigates to room |
| `POST /rooms/{room_name}/meetings/{meeting_id}/joined` | Signal connection complete | After WebRTC connects |
| `POST /rooms/{room_name}/meetings/{meeting_id}/leave` | Signal user leaving | Tab close via sendBeacon |
These endpoints trigger `poll_daily_room_presence_task` to update session state faster than waiting for webhooks.
---
## Race Condition: Detailed Analysis
### The Fundamental Problem
**The backend has no knowledge of users who are in the process of joining (WebRTC handshake phase).**
Data sources available to backend:
| Source | What it knows | Limitation |
|--------|---------------|------------|
| Daily Presence API | Currently connected users | 0-500ms lag; doesn't see handshaking users |
| Database sessions | Historical join/leave events | Stale; updated by polls |
| Webhooks | Join/leave events | Delayed; can fail |
**Gap**: No source knows about users between "decided to join" and "WebRTC handshake complete".
### Race Scenario Timeline
```
T+0ms: User A connected to Meeting M1, visible in Daily presence
T+1000ms: User A closes browser tab
T+1050ms: participant.left webhook fires → poll_daily_room_presence queued
T+1500ms: User A reopens tab (quick rejoin)
T+1600ms: POST /meeting returns M1 (still active)
T+1700ms: Frontend starts WebRTC handshake
T+2000ms: User A in handshake - NOT visible to Daily presence API
T+2100ms: poll runs → sees 0 participants → marks session as left_at
T+3000ms: process_meetings runs
T+3100ms: Daily API returns 0 participants (user still handshaking)
T+3200ms: has_active_sessions=False, has_had_sessions=True
T+3300ms: Meeting deactivated, Daily room deleted
T+4000ms: User A WebRTC completes → Daily room is gone!
T+5000ms: User B joins same Reflector room → new Meeting M2 created
RESULT: User A orphaned, User B in different Daily room
```
### Why Current Mitigations Are Insufficient
#### 1. Using Daily API (already implemented)
The code already calls `get_room_presence()` instead of relying solely on database sessions. **This doesn't help** because the Daily presence API itself doesn't see users during WebRTC handshake (0-500ms consistency lag + handshake duration of 500-3000ms).
#### 2. Fallback to Database
When the Daily API fails, the code falls back to database sessions. This is **worse** because the database is even more stale than the API.
#### 3. Leave/Join Endpoints
The `/joined` and `/leave` endpoints trigger immediate polls, reducing the window but **not eliminating it**. The poll still only sees what Daily presence API reports.
---
## Proposed Solutions
### Option A: Grace Period (Not Recommended)
Add a time-based buffer before deactivation.
```python
GRACE_PERIOD_SECONDS = 10

if not has_active_sessions and has_had_sessions:
    recent_activity = await get_recent_activity(meeting_id, within_seconds=GRACE_PERIOD_SECONDS)
    if recent_activity:
        continue  # Skip deactivation
```
**Pros:**
- Simple to implement
- Low risk
**Cons:**
- Arbitrary timeout value (why 10s? why not 5s or 30s?)
- Feels like a hack ("setTimeout solution")
- Delays legitimate deactivation
- Doesn't eliminate race, just makes it less likely
### Option B: Track "Intent to Join" (Recommended)
Add explicit state tracking for users who are in the process of joining.
**New endpoint**: `POST /rooms/{room_name}/meetings/{meeting_id}/joining`
**Flow change**:
```
Current:
1. POST /join → get meeting info
2. Render Daily iframe (start WebRTC)
3. POST /joined (after connected)
Proposed:
1. POST /join → get meeting info
2. POST /joining → "I'm about to connect" ← NEW (wait for 200 OK)
3. Render Daily iframe (start WebRTC)
4. POST /joined (after connected)
```
**Backend tracking**:
```python
# On /joining endpoint
await pending_joins.create(meeting_id=meeting_id, user_id=user_id, created_at=now())

# In process_meetings
pending = await pending_joins.get_recent(meeting_id, max_age_seconds=30)
if pending:
    logger.info("Meeting has pending joins, skipping deactivation")
    continue

# On /joined endpoint or timeout
await pending_joins.delete(meeting_id=meeting_id, user_id=user_id)
```
**Pros:**
- Eliminates race by design (backend knows before Daily does)
- Explicit state machine, not time-based guessing
- Clear semantics
**Cons:**
- Adds ~50-200ms latency (one round-trip before iframe renders)
- Requires frontend changes
- Needs cleanup mechanism for abandoned joins (user closes tab during handshake)
### Option C: Optimistic Locking with Version
Track meeting "version" that must match for deactivation.
**Concept**: Each join attempt increments a version. Deactivation only proceeds if version hasn't changed since presence check.
**Cons:**
- Complex to implement correctly
- Still has edge cases with concurrent joins
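For concreteness, a minimal sketch of what the version guard could look like - the `version` and `is_active` columns are hypothetical, not part of the current schema:
```python
import sqlalchemy as sa


async def deactivate_if_unchanged(database, meetings, meeting_id: str, version_at_check: int) -> bool:
    """Deactivate only if no join attempt bumped the (hypothetical) version
    column since presence was sampled. Returns True if the row was updated."""
    query = (
        meetings.update()
        .where(
            sa.and_(
                meetings.c.id == meeting_id,
                meetings.c.version == version_at_check,  # optimistic guard
            )
        )
        .values(is_active=False)
    )
    result = await database.execute(query)
    # 0 rows updated => a concurrent join invalidated our snapshot
    return result.rowcount > 0
```
Even with this guard, a join that has not yet bumped the version when the UPDATE runs still slips through, which is why this option is listed as complex.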
---
## Recommended Approach: Option B
**Track "Intent to Join"** is the cleanest solution because it:
1. **Eliminates the race by design** - no timing windows
2. **Makes state explicit** - joining/connected/leaving are tracked, not inferred
3. **Aligns with existing patterns** - similar to `/joined` and `/leave` endpoints
4. **No arbitrary timeouts** - unlike grace period
### Data Model Change
Add tracking for pending joins. Options:
| Storage | Pros | Cons |
|---------|------|------|
| Redis key | Fast, auto-expire | Lost on Redis restart |
| Database table | Persistent, queryable | Slightly slower |
| In-memory | Fastest | Lost on server restart |
**Recommendation**: Redis with TTL (30s expiry) for simplicity. Pending joins are ephemeral - if Redis restarts, the lost keys briefly reopen the race window (a meeting could be deactivated mid-handshake), an acceptable trade-off for a rare event.
```python
# Redis key format
pending_join:{meeting_id}:{user_id} = {timestamp}
# TTL: 30 seconds
```
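A minimal sketch of the bookkeeping on top of this key format, assuming `redis.asyncio` (the helper names are illustrative, not existing code):
```python
import time

import redis.asyncio as redis

PENDING_TTL_SECONDS = 30
r = redis.Redis()  # assumes a default local Redis; use the app's client in practice


async def mark_pending_join(meeting_id: str, user_id: str) -> None:
    # EX gives the 30s auto-expiry that cleans up abandoned joins
    await r.set(f"pending_join:{meeting_id}:{user_id}", int(time.time()), ex=PENDING_TTL_SECONDS)


async def clear_pending_join(meeting_id: str, user_id: str) -> None:
    await r.delete(f"pending_join:{meeting_id}:{user_id}")


async def has_pending_joins(meeting_id: str) -> bool:
    # SCAN rather than KEYS so we never block Redis on a large keyspace
    async for _ in r.scan_iter(match=f"pending_join:{meeting_id}:*"):
        return True
    return False
```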
### Implementation Checklist
1. **Backend: Add `/joining` endpoint**
- File: `server/reflector/views/rooms.py`
- Creates Redis key with 30s TTL
- Returns 200 OK
2. **Backend: Modify `process_meetings()`**
- File: `server/reflector/worker/process.py`
- Before deactivation, check for pending joins
- If any exist, skip deactivation
3. **Backend: Modify `/joined` endpoint**
- Clear pending join on successful connection
4. **Frontend: Call `/joining` before WebRTC**
- File: `www/app/[roomName]/components/DailyRoom.tsx`
- Await response before rendering Daily iframe
5. **Update simulation**
- Add `joining` state tracking to match new design
- Verify race condition is eliminated
6. **Integration tests**
- Test quick rejoin scenario
- Test abandoned join (user closes during handshake)
- Test concurrent joins from multiple users
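Items 1-3 could wire together roughly as follows, reusing the illustrative Redis helpers sketched above (the route shape mirrors the existing `/joined` endpoint; passing `user_id` as a parameter is an assumption):
```python
from fastapi import APIRouter

router = APIRouter()


@router.post("/rooms/{room_name}/meetings/{meeting_id}/joining")
async def rooms_joining_meeting(room_name: str, meeting_id: str, user_id: str):
    """Record intent to join before the frontend starts WebRTC."""
    # Room/meeting lookups and 404 handling omitted; mirror the existing endpoints
    await mark_pending_join(meeting_id, user_id)
    return {"status": "ok"}


# In process_meetings(), before the deactivation decision:
#
#     if await has_pending_joins(meeting.id):
#         logger.info("Meeting has pending joins, skipping deactivation")
#         continue
#
# And in /joined, once WebRTC is connected:
#
#     await clear_pending_join(meeting_id, user_id)
```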
---
## Files Reference
### Core Files to Modify
| File | Purpose |
|------|---------|
| `server/reflector/views/rooms.py` | Add `/joining` endpoint |
| `server/reflector/worker/process.py` | Check pending joins before deactivation |
| `www/app/[roomName]/components/DailyRoom.tsx` | Call `/joining` before WebRTC |
### Reference Files
| File | Contains |
|------|----------|
| `server/reflector/video_platforms/daily.py:128` | `get_room_presence()` - Daily API call |
| `server/reflector/worker/process.py:642` | `poll_daily_room_presence()` - presence polling |
| `server/reflector/views/daily.py:125` | Webhook handlers |
| `server/tests/simulation/` | Hypothesis simulation proving the race |
| `server/tests/test_daily_presence_deactivation.py` | Unit tests for presence logic |
### Simulation Files
| File | Purpose |
|------|---------|
| `tests/simulation/system.py` | Main simulation engine |
| `tests/simulation/config.py` | Current vs fixed system configs |
| `tests/simulation/state.py` | State dataclasses |
| `tests/simulation/test_presence_race.py` | Hypothesis stateful tests |
| `tests/simulation/test_targeted_scenarios.py` | Specific race scenarios |
| `server/reflector/presence/model.py` | Shared state machine model |
---
## Alternative Considered: Remove DB Fallback
One simpler change discussed: remove the database fallback when Daily API fails, and "fail loudly" instead.
```python
# Current (with fallback)
try:
    presence = await client.get_room_presence(meeting.room_name)
    has_active_sessions = presence.total_count > 0
except Exception:
    # Fallback to stale DB
    room_sessions = await client.get_room_sessions(meeting.room_name)
    has_active_sessions = bool(room_sessions and any(s.ended_at is None for s in room_sessions))

# Proposed (fail loudly)
try:
    presence = await client.get_room_presence(meeting.room_name)
    has_active_sessions = presence.total_count > 0
except Exception:
    logger.error("Daily API failed, skipping deactivation check for this meeting")
    continue  # Don't deactivate if we can't verify
```
**This helps but doesn't eliminate the race** - it only removes one failure mode (stale DB). The core race (handshake invisibility) remains.
---
## Conclusion
The presence system race condition is a **data model gap**, not a timing issue that can be solved with grace periods. The backend needs explicit knowledge of users who intend to join, before they become visible to the Daily presence API.
The recommended fix is to add a `/joining` endpoint that the frontend calls before starting WebRTC. This creates a "reservation" that prevents premature meeting deactivation during the handshake window.
This approach:
- Eliminates the race by design
- Adds minimal latency (~50-200ms)
- Follows explicit state machine principles
- Avoids arbitrary timeout hacks
---
## Appendix: Simulation Test Results
```
$ uv run pytest tests/simulation/ -v
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_uses_model_states PASSED
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_respects_transitions PASSED
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_invalid_transitions_checked PASSED
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_implements_protocols PASSED
tests/simulation/test_model_conformance.py::TestModelConformance::test_simulation_uses_shared_invariants PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_state_machine_has_all_states PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_state_machine_valid_transitions PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_state_machine_invalid_transitions_raise PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_guarded_user_state_transitions PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_guarded_user_state_rejects_invalid PASSED
tests/simulation/test_model_conformance.py::TestProductionStateMachine::test_guarded_user_state_tracks_history PASSED
tests/simulation/test_model_conformance.py::TestInvariantConsistency::test_invariants_same_between_model_and_simulation PASSED
tests/simulation/test_model_conformance.py::test_quick_conformance_check PASSED
tests/simulation/test_presence_race.py::TestPresenceRaceFixed::runTest PASSED
tests/simulation/test_presence_race.py::test_presence_race_conditions_current_system XFAIL
tests/simulation/test_presence_race.py::test_presence_no_race_conditions_fixed_system PASSED
tests/simulation/test_presence_race.py::test_smoke_presence_simulation PASSED
tests/simulation/test_targeted_scenarios.py::TestQuickRejoinRace::test_quick_rejoin_causes_split PASSED
tests/simulation/test_targeted_scenarios.py::TestQuickRejoinRace::test_quick_rejoin_fixed_system PASSED
tests/simulation/test_targeted_scenarios.py::TestSimultaneousJoins::test_two_users_join_simultaneously PASSED
tests/simulation/test_targeted_scenarios.py::TestProcessMeetingsRace::test_process_meetings_during_handshake PASSED
tests/simulation/test_targeted_scenarios.py::TestPresenceLagRace::test_presence_lag_causes_incorrect_count PASSED
tests/simulation/test_targeted_scenarios.py::TestMeetingDeactivationEdgeCases::test_deactivation_with_no_sessions PASSED
tests/simulation/test_targeted_scenarios.py::TestMeetingDeactivationEdgeCases::test_deactivation_requires_had_sessions PASSED
tests/simulation/test_targeted_scenarios.py::TestEventLogTracing::test_event_log_captures_flow PASSED
tests/simulation/test_targeted_scenarios.py::test_config_presets PASSED
================== 25 passed, 1 xfailed ==================
```
The `xfail` test (`test_presence_race_conditions_current_system`) demonstrates that the current system configuration has race conditions that can be found through randomized testing.

View File

@@ -86,7 +86,7 @@ Daily.co Room: "daily-private-igor-20260110042117"
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp |
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId` (can be null), requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording
@@ -101,30 +101,6 @@ Daily.co Room: "daily-private-igor-20260110042117"
**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs.
### instanceId (Reflector-Generated)
**Definition:** UUID we generate and send when starting recording via REST API.
**Generation:** Deterministic from meeting_id
- Cloud: `instanceId = meeting_id` directly
- Raw-tracks: `instanceId = UUIDv5(meeting_id, namespace)`
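That derivation is reproducible with the standard library - a sketch, with a placeholder namespace (the real namespace constant lives in the recording code):
```python
import uuid

# Placeholder value; NOT the actual namespace used by Reflector
REFLECTOR_NAMESPACE = uuid.UUID("12345678-1234-5678-1234-567812345678")


def raw_tracks_instance_id(meeting_id: str) -> uuid.UUID:
    # Deterministic: the same meeting_id always yields the same instanceId
    return uuid.uuid5(REFLECTOR_NAMESPACE, meeting_id)
```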
**Key behaviors:**
- **Reuse allowed:** Same instanceId can be used after stop (validated 2026-01-20)
- **Not returned:** Daily.co does NOT echo instanceId back in GET /recordings response
- **Present in error webhooks:** `recording.error` webhook includes instanceId
- **Purpose:** Allows multiple concurrent recordings (cloud + raw-tracks) in same room
**Stop/restart example:**
```
Recording 1: POST /start with instanceId="779e6376..." → recording_id="ee00c4e8..."
Stop recording
Recording 2: POST /start with instanceId="779e6376..." (SAME) → recording_id="b702f509..." (DIFFERENT)
✅ Both succeed, different recording_ids returned
```
**Implication:** Cannot match recordings by instanceId (not in response) - must use recording_id.
---
## Entity Relationships
@@ -220,19 +196,6 @@ Daily.co Room: "daily-private-igor-20260110042117"
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
**Reliability:** Can be null or present in GET /recordings response (unreliable).
**When present:** Multiple recordings from same session (stop/restart with participants connected) share same mtgSessionId.
**Example (validated 2026-01-20):**
```json
Recording 1: {"id": "ee00c4e8...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
Recording 2: {"id": "b702f509...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
// Same mtgSessionId (stop/restart in same session)
```
**When null:** Common - Daily.co API does not reliably populate this field.
### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks.
@@ -257,24 +220,16 @@ TABLE daily_participant_session (
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
**Example API response (mtgSessionId can be null OR present):**
**Example API response:**
```json
{
"id": "recording-uuid",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"mtgSessionId": null // ← Often null (unreliable)
}
// OR (when present):
{
"id": "recording-uuid",
"mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6" // ← Sometimes present
"mtgSessionId": null Missing!
}
```
**Key insight:** Cannot rely on mtgSessionId for matching (unreliable). instanceId also not returned. Only reliable identifier is recording.id.
### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
@@ -536,10 +491,6 @@ UI: User sees 3 separate transcripts
---
**Document Version:** 1.1
**Last Updated:** 2026-01-20
**Data Source:** Production database + Daily.co API inspection + empirical testing
**Changes in 1.1:**
- Added instanceId behavior documentation (reuse allowed, not returned in API)
- Clarified mtgSessionId reliability (can be null or present)
- Added empirical validation of stop/restart behavior
**Document Version:** 1.0
**Last Verified:** 2026-01-15
**Data Source:** Production database + Daily.co API inspection

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    with op.batch_alter_table("room", schema=None) as batch_op:
        batch_op.drop_column("use_celery")


def downgrade() -> None:
    with op.batch_alter_table("room", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "use_celery",
                sa.Boolean(),
                server_default=sa.text("false"),
                nullable=False,
            )
        )

View File

@@ -1,67 +0,0 @@
"""add_daily_recording_requests
Revision ID: f5b008fa8a14
Revises: 1b1e6a6fc465
Create Date: 2026-01-20 22:32:06.697144
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f5b008fa8a14"
down_revision: Union[str, None] = "1b1e6a6fc465"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    op.create_table(
        "daily_recording_request",
        sa.Column("recording_id", sa.String(), nullable=False),
        sa.Column("meeting_id", sa.String(), nullable=False),
        sa.Column("instance_id", sa.String(), nullable=False),
        sa.Column("type", sa.String(), nullable=False),
        sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
        sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("recording_id"),
    )
    op.create_index("idx_meeting_id", "daily_recording_request", ["meeting_id"])
    op.create_index("idx_instance_id", "daily_recording_request", ["instance_id"])

    # Clean up orphaned recordings before adding FK constraint
    op.execute("""
        UPDATE recording SET status = 'orphan', meeting_id = NULL
        WHERE meeting_id IS NOT NULL
        AND meeting_id NOT IN (SELECT id FROM meeting)
    """)

    # Add FK constraint to recording table (cascade delete recordings when meeting deleted)
    op.execute("""
        ALTER TABLE recording ADD CONSTRAINT fk_recording_meeting
        FOREIGN KEY (meeting_id) REFERENCES meeting(id) ON DELETE CASCADE
    """)

    # Add CHECK constraints to enforce orphan invariants
    op.execute("""
        ALTER TABLE recording ADD CONSTRAINT chk_orphan_no_meeting
        CHECK (status != 'orphan' OR meeting_id IS NULL)
    """)
    op.execute("""
        ALTER TABLE recording ADD CONSTRAINT chk_non_orphan_has_meeting
        CHECK (status = 'orphan' OR meeting_id IS NOT NULL)
    """)


def downgrade() -> None:
    op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_orphan_no_meeting")
    op.execute(
        "ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_non_orphan_has_meeting"
    )
    op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS fk_recording_meeting")
    op.drop_index("idx_instance_id", table_name="daily_recording_request")
    op.drop_index("idx_meeting_id", table_name="daily_recording_request")
    op.drop_table("daily_recording_request")

View File

@@ -1,56 +0,0 @@
"""Utility for creating orphan recordings."""
import os
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
async def create_and_log_orphan(
    recording_id: NonEmptyString,
    bucket_name: str,
    room_name: str,
    start_ts: int,
    track_keys: list[str] | None,
    source: str,
) -> bool:
    """Create orphan recording and log if first occurrence.

    Args:
        recording_id: Daily.co recording ID
        bucket_name: S3 bucket (empty string for cloud recordings)
        room_name: Daily.co room name
        start_ts: Unix timestamp
        track_keys: Track keys for raw-tracks, None for cloud
        source: "webhook" or "polling" for logging

    Returns:
        True if created (first poller), False if already exists
    """
    if track_keys:
        object_key = os.path.dirname(track_keys[0]) if track_keys else room_name
    else:
        object_key = room_name

    created = await recordings_controller.create_orphan(
        Recording(
            id=recording_id,
            bucket_name=bucket_name,
            object_key=object_key,
            recorded_at=datetime.fromtimestamp(start_ts, tz=timezone.utc),
            track_keys=track_keys,
            meeting_id=None,
            status="orphan",
        )
    )
    if created:
        logger.error(
            f"Orphan recording ({source})",
            recording_id=recording_id,
            room_name=room_name,
        )
    return created

View File

@@ -26,7 +26,6 @@ def get_database() -> databases.Database:
    # import models
    import reflector.db.calendar_events  # noqa
    import reflector.db.daily_participant_sessions  # noqa
    import reflector.db.daily_recording_requests  # noqa
    import reflector.db.meetings  # noqa
    import reflector.db.recordings  # noqa
    import reflector.db.rooms  # noqa

View File

@@ -1,111 +0,0 @@
from datetime import datetime
from typing import Literal
from uuid import UUID
import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils.string import NonEmptyString
daily_recording_requests = sa.Table(
    "daily_recording_request",
    metadata,
    sa.Column("recording_id", sa.String, primary_key=True),
    sa.Column(
        "meeting_id",
        sa.String,
        sa.ForeignKey("meeting.id", ondelete="CASCADE"),
        nullable=False,
    ),
    sa.Column("instance_id", sa.String, nullable=False),
    sa.Column("type", sa.String, nullable=False),
    sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
    sa.Index("idx_meeting_id", "meeting_id"),
    sa.Index("idx_instance_id", "instance_id"),
)


class DailyRecordingRequest(BaseModel):
    recording_id: NonEmptyString
    meeting_id: NonEmptyString
    instance_id: UUID
    type: Literal["cloud", "raw-tracks"]
    requested_at: datetime


class DailyRecordingRequestsController:
    async def create(self, request: DailyRecordingRequest) -> None:
        stmt = insert(daily_recording_requests).values(
            recording_id=request.recording_id,
            meeting_id=request.meeting_id,
            instance_id=str(request.instance_id),
            type=request.type,
            requested_at=request.requested_at,
        )
        stmt = stmt.on_conflict_do_nothing(index_elements=["recording_id"])
        await get_database().execute(stmt)

    async def find_by_recording_id(
        self,
        recording_id: NonEmptyString,
    ) -> tuple[NonEmptyString, Literal["cloud", "raw-tracks"]] | None:
        query = daily_recording_requests.select().where(
            daily_recording_requests.c.recording_id == recording_id
        )
        result = await get_database().fetch_one(query)
        if not result:
            return None
        req = DailyRecordingRequest(
            recording_id=result["recording_id"],
            meeting_id=result["meeting_id"],
            instance_id=UUID(result["instance_id"]),
            type=result["type"],
            requested_at=result["requested_at"],
        )
        return (req.meeting_id, req.type)

    async def find_by_instance_id(
        self,
        instance_id: UUID,
    ) -> list[DailyRecordingRequest]:
        """Multiple recordings can have same instance_id (stop/restart)."""
        query = daily_recording_requests.select().where(
            daily_recording_requests.c.instance_id == str(instance_id)
        )
        results = await get_database().fetch_all(query)
        return [
            DailyRecordingRequest(
                recording_id=r["recording_id"],
                meeting_id=r["meeting_id"],
                instance_id=UUID(r["instance_id"]),
                type=r["type"],
                requested_at=r["requested_at"],
            )
            for r in results
        ]

    async def get_by_meeting_id(
        self,
        meeting_id: NonEmptyString,
    ) -> list[DailyRecordingRequest]:
        query = daily_recording_requests.select().where(
            daily_recording_requests.c.meeting_id == meeting_id
        )
        results = await get_database().fetch_all(query)
        return [
            DailyRecordingRequest(
                recording_id=r["recording_id"],
                meeting_id=r["meeting_id"],
                instance_id=UUID(r["instance_id"]),
                type=r["type"],
                requested_at=r["requested_at"],
            )
            for r in results
        ]


daily_recording_requests_controller = DailyRecordingRequestsController()

View File

@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Literal
import sqlalchemy as sa
@@ -183,6 +183,84 @@ class MeetingController:
        results = await get_database().fetch_all(query)
        return [Meeting(**r) for r in results]

    async def get_by_room_name_and_time(
        self,
        room_name: NonEmptyString,
        recording_start: datetime,
        time_window_hours: int = 168,
    ) -> Meeting | None:
        """
        Get meeting by room name closest to recording timestamp.

        HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
        and mtgSessionId is separate from our instanceId. Time-based matching is
        the least-bad workaround.

        This handles edge case of duplicate room_name values in DB (race conditions,
        double-clicks, etc.) by matching based on temporal proximity.

        Algorithm:
        1. Find meetings within time_window_hours of recording_start
        2. Return meeting with start_date closest to recording_start
        3. If tie, return first by meeting.id (deterministic)

        Args:
            room_name: Daily.co room name from recording
            recording_start: Timezone-aware datetime from recording.start_ts
            time_window_hours: Search window (default 168 = 1 week)

        Returns:
            Meeting closest to recording timestamp, or None if no matches

        Failure modes:
        - Multiple meetings in same room within ~5 minutes: picks closest
        - All meetings outside time window: returns None
        - Clock skew between Daily.co and DB: 1-week window tolerates this

        Why 1 week window:
        - Handles webhook failures (recording discovered days later)
        - Tolerates clock skew
        - Rejects unrelated meetings from weeks ago
        """
        # Validate timezone-aware datetime
        if recording_start.tzinfo is None:
            raise ValueError(
                f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
            )

        window_start = recording_start - timedelta(hours=time_window_hours)
        window_end = recording_start + timedelta(hours=time_window_hours)

        query = (
            meetings.select()
            .where(
                sa.and_(
                    meetings.c.room_name == room_name,
                    meetings.c.start_date >= window_start,
                    meetings.c.start_date <= window_end,
                )
            )
            .order_by(meetings.c.start_date)
        )
        results = await get_database().fetch_all(query)
        if not results:
            return None

        candidates = [Meeting(**r) for r in results]

        # Find meeting with start_date closest to recording_start
        closest = min(
            candidates,
            key=lambda m: (
                abs((m.start_date - recording_start).total_seconds()),
                m.id,  # Tie-breaker: deterministic by UUID
            ),
        )
        return closest
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
"""
Get latest active meeting for a room.
@@ -272,6 +350,44 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)
    async def set_cloud_recording_if_missing(
        self,
        meeting_id: NonEmptyString,
        s3_key: NonEmptyString,
        duration: int,
    ) -> bool:
        """
        Set cloud recording only if not already set.
        Returns True if updated, False if already set.
        Prevents webhook/polling race condition via atomic WHERE clause.
        """
        # Check current value before update to detect actual change
        meeting_before = await self.get_by_id(meeting_id)
        if not meeting_before:
            return False
        was_null = meeting_before.daily_composed_video_s3_key is None

        query = (
            meetings.update()
            .where(
                sa.and_(
                    meetings.c.id == meeting_id,
                    meetings.c.daily_composed_video_s3_key.is_(None),
                )
            )
            .values(
                daily_composed_video_s3_key=s3_key,
                daily_composed_video_duration=duration,
            )
        )
        await get_database().execute(query)

        # Return True only if value was NULL before (actual update occurred)
        # If was_null=False, the WHERE clause prevented the update
        return was_null
async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count."""
query = (
@@ -351,27 +467,6 @@ class MeetingConsentController:
        result = await get_database().fetch_one(query)
        return result is not None

    async def set_cloud_recording_if_missing(
        self,
        meeting_id: NonEmptyString,
        s3_key: NonEmptyString,
        duration: int,
    ) -> bool:
        """Returns True if updated, False if already set."""
        query = (
            meetings.update()
            .where(
                meetings.c.id == meeting_id,
                meetings.c.daily_composed_video_s3_key.is_(None),
            )
            .values(
                daily_composed_video_s3_key=s3_key,
                daily_composed_video_duration=duration,
            )
        )
        result = await get_database().execute(query)
        return result.rowcount > 0
meetings_controller = MeetingController()
meeting_consent_controller = MeetingConsentController()

View File

@@ -4,10 +4,10 @@ from typing import Literal
import sqlalchemy as sa
from pydantic import BaseModel, Field
from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
recordings = sa.Table(
"recording",
@@ -31,13 +31,14 @@ recordings = sa.Table(
class Recording(BaseModel):
id: str = Field(default_factory=generate_uuid4)
bucket_name: str
# for single-track
object_key: str
recorded_at: datetime
status: Literal["pending", "processing", "completed", "failed", "orphan"] = (
"pending"
)
status: Literal["pending", "processing", "completed", "failed"] = "pending"
meeting_id: str | None = None
# None = single-track, [] = multitrack with no audio, [keys...] = multitrack with audio
# for multitrack reprocessing
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None
@property
@@ -71,6 +72,20 @@ class RecordingController:
        query = recordings.delete().where(recordings.c.id == id)
        await get_database().execute(query)

    async def set_meeting_id(
        self,
        recording_id: NonEmptyString,
        meeting_id: NonEmptyString,
    ) -> None:
        """Link recording to meeting."""
        query = (
            recordings.update()
            .where(recordings.c.id == recording_id)
            .values(meeting_id=meeting_id)
        )
        await get_database().execute(query)
        # no check for existence

    async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
        if not recording_ids:
            return []
@@ -89,12 +104,9 @@ class RecordingController:
This is more efficient than fetching all recordings and filtering in Python.
"""
# INLINE IMPORT REQUIRED: Circular dependency
# - recordings.py needs transcripts table for JOIN query
# - transcripts.py imports recordings_controller
# - db/__init__.py loads recordings before transcripts (line 31 vs 33)
# - Top-level import would fail during module initialization
from reflector.db.transcripts import transcripts
from reflector.db.transcripts import (
transcripts, # noqa: PLC0415 cyclic import
)
query = (
recordings.select()
@@ -112,27 +124,5 @@ class RecordingController:
        recordings_list = [Recording(**row) for row in results]
        return [r for r in recordings_list if r.is_multitrack]

    async def try_create_with_meeting(self, recording: Recording) -> bool:
        """Returns True if created, False if already exists."""
        assert recording.meeting_id is not None, "meeting_id required for non-orphan"
        assert recording.status != "orphan", "use create_orphan for orphans"
        stmt = insert(recordings).values(**recording.model_dump())
        stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
        result = await get_database().execute(stmt)
        return result.rowcount > 0

    async def create_orphan(self, recording: Recording) -> bool:
        """Returns True if created, False if already exists."""
        assert recording.status == "orphan", "status must be 'orphan'"
        assert recording.meeting_id is None, "meeting_id must be NULL for orphan"
        stmt = insert(recordings).values(**recording.model_dump())
        stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
        result = await get_database().execute(stmt)
        return result.rowcount > 0
recordings_controller = RecordingController()

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
        sqlalchemy.String,
        nullable=False,
    ),
    sqlalchemy.Column(
        "use_celery",
        sqlalchemy.Boolean,
        nullable=False,
        server_default=false(),
    ),
    sqlalchemy.Column(
        "skip_consent",
        sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
    ics_last_sync: datetime | None = None
    ics_last_etag: str | None = None
    platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
    use_celery: bool = False
    skip_consent: bool = False

View File

@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
    task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
# Multitrack processing always uses Hatchet (no Celery fallback)
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
)
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
if can_replay:
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": None})
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
        None  # Webhook UUID for this environment. Not used by production code
    )

    # Platform Configuration
    DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
    DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM

    # Zulip integration
    ZULIP_REALM: str | None = None

View File

@@ -129,10 +129,6 @@ class DailyClient(VideoPlatformClient):
"""Get room presence/session data for a Daily.co room."""
return await self._api_client.get_room_presence(room_name)
async def delete_room(self, room_name: str) -> None:
"""Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
return await self._api_client.delete_room(room_name)
async def get_meeting_participants(
self, meeting_id: str
) -> MeetingParticipantsResponse:

View File

@@ -1,6 +1,4 @@
import json
import os
from datetime import datetime, timezone
from typing import assert_never
from fastapi import APIRouter, HTTPException, Request
@@ -14,10 +12,7 @@ from reflector.dailyco_api import (
    RecordingReadyEvent,
    RecordingStartedEvent,
)
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger as _logger
from reflector.settings import settings
from reflector.video_platforms.factory import create_platform_client
@@ -217,73 +212,10 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
    track_keys = [t.s3Key for t in tracks if t.type == "audio"]

    # Lookup request
    match = await daily_recording_requests_controller.find_by_recording_id(
        recording_id
    )
    if not match:
        await create_and_log_orphan(
            recording_id=recording_id,
            bucket_name=bucket_name,
            room_name=room_name,
            start_ts=event.payload.start_ts,
            track_keys=track_keys,
            source="webhook",
        )
        return

    meeting_id, _ = match

    # Verify meeting exists
    meeting = await meetings_controller.get_by_id(meeting_id)
    if not meeting:
        logger.error(
            "Meeting not found (webhook)",
            recording_id=recording_id,
            meeting_id=meeting_id,
        )
        await create_and_log_orphan(
            recording_id=recording_id,
            bucket_name=bucket_name,
            room_name=room_name,
            start_ts=event.payload.start_ts,
            track_keys=track_keys,
            source="webhook",
        )
        return

    # Create recording atomically
    created = await recordings_controller.try_create_with_meeting(
        Recording(
            id=recording_id,
            bucket_name=bucket_name,
            object_key=(
                os.path.dirname(track_keys[0]) if track_keys else room_name
            ),
            recorded_at=datetime.fromtimestamp(
                event.payload.start_ts, tz=timezone.utc
            ),
            track_keys=track_keys,
            meeting_id=meeting_id,
            status="pending",
        )
    )
    if not created:
        # Already created (polling got it first)
        logger.debug(
            "Recording already exists (webhook late)",
            recording_id=recording_id,
            meeting_id=meeting_id,
        )
        return

    logger.info(
        "Raw-tracks recording queuing processing (webhook)",
        "Raw-tracks recording queuing processing",
        recording_id=recording_id,
        room_name=room_name,
        meeting_id=meeting_id,
        num_tracks=len(track_keys),
    )

View File

@@ -1,5 +1,4 @@
import json
import logging
from datetime import datetime, timezone
from typing import Annotated, Any, Optional
from uuid import UUID
@@ -10,21 +9,16 @@ from pydantic import BaseModel
import reflector.auth as auth
from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError
from reflector.db.daily_recording_requests import (
    DailyRecordingRequest,
    daily_recording_requests_controller,
)
from reflector.db.meetings import (
    MeetingConsent,
    meeting_consent_controller,
    meetings_controller,
)
from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -108,6 +102,13 @@ async def start_recording(
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
try:
client = create_platform_client("daily")
result = await client.start_recording(
@@ -116,30 +117,9 @@ async def start_recording(
instance_id=body.instanceId,
)
recording_id = result["id"]
log.info(f"Started {body.type} recording via REST API")
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=body.instanceId,
type=body.type,
requested_at=datetime.now(timezone.utc),
)
)
logger.info(
f"Started {body.type} recording via REST API",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
"recording_id": recording_id,
},
)
return {"status": "ok", "recording_id": recording_id}
return {"status": "ok", "result": result}
except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream"
@@ -150,42 +130,22 @@ async def start_recording(
# "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info:
logger.info(
f"{body.type} recording already active (started by another participant)",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
},
log.info(
f"{body.type} recording already active (started by another participant)"
)
return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling
# All other Daily.co API errors
logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)
except Exception as e:
# Non-Daily.co errors
logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)

View File

@@ -20,7 +20,6 @@ from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings
from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import poll_daily_room_presence_task
from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__)
@@ -366,53 +365,6 @@ async def rooms_create_meeting(
return meeting
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
async def rooms_joined_meeting(
room_name: str,
meeting_id: str,
):
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.delay(meeting_id)
return {"status": "ok"}
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
async def rooms_leave_meeting(
room_name: str,
meeting_id: str,
delay_seconds: int = 2,
):
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.apply_async(
args=[meeting_id],
countdown=delay_seconds,
)
return {"status": "ok"}
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
async def rooms_test_webhook(
room_id: str,

View File

@@ -1,5 +1,6 @@
import json
import os
import re
from datetime import datetime, timezone
from typing import List, Literal
from urllib.parse import unquote
@@ -12,12 +13,10 @@ from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_participant_sessions import (
    DailyParticipantSession,
    daily_participant_sessions_controller,
)
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
@@ -28,9 +27,6 @@ from reflector.db.transcripts import (
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
    task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -231,44 +227,79 @@ async def _process_multitrack_recording_inner(
recording_start_ts: int,
):
"""
Process multitrack recording.
Process multitrack recording (first time or reprocessing).
Recording must already exist with meeting_id set (created by webhook/polling before queueing).
For first processing (webhook/polling):
- Uses recording_start_ts for time-based meeting matching (no instanceId available)
For reprocessing:
- Uses recording.meeting_id directly (already linked during first processing)
- recording_start_ts is ignored
"""
# Get recording (must exist - created by webhook/polling)
tz = timezone.utc
recorded_at = datetime.now(tz)
try:
if track_keys:
folder = os.path.basename(os.path.dirname(track_keys[0]))
ts_match = re.search(r"(\d{14})$", folder)
if ts_match:
ts = ts_match.group(1)
recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
except Exception as e:
logger.warning(
f"Could not parse recorded_at from keys, using now() {recorded_at}",
e,
exc_info=True,
)
# Check if recording already exists (reprocessing path)
recording = await recordings_controller.get_by_id(recording_id)
if not recording:
logger.error(
"Recording not found - should have been created by webhook/polling",
recording_id=recording_id,
)
return
if recording and recording.meeting_id:
# Reprocessing: recording exists with meeting already linked
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Reprocessing: meeting not found for recording - skipping",
meeting_id=recording.meeting_id,
recording_id=recording_id,
)
return
if not recording.meeting_id:
logger.error(
"Recording has no meeting_id - orphan should not be queued",
logger.info(
"Reprocessing: using existing recording.meeting_id",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
return
# Get meeting
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Meeting not found for recording",
meeting_id=recording.meeting_id,
else:
# First processing: recording doesn't exist, need time-based matching
# (Daily.co doesn't return instanceId in API, must match by timestamp)
recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=daily_room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.error(
"Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
recording_id=recording_id,
room_name=daily_room_name,
recording_start_ts=recording_start_ts,
recording_start=recording_start.isoformat(),
)
return # Skip processing, will retry on next poll
logger.info(
"First processing: found meeting via time-based matching",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
time_delta_seconds=abs(
(meeting.start_date - recording_start).total_seconds()
),
)
return
logger.info(
"Processing multitrack recording",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
room_name_base = extract_base_room_name(daily_room_name)
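
The matching above hinges on `meetings_controller.get_by_room_name_and_time()`, whose implementation is outside this diff. A minimal sketch of the closest-match query it is described as running, assuming a SQLAlchemy Core `meetings` table, an async connection, and Postgres `extract('epoch', ...)` semantics (all assumptions, not the actual controller code):

```python
from datetime import datetime, timedelta

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncConnection


async def get_by_room_name_and_time(
    conn: AsyncConnection,
    meetings: sa.Table,
    room_name: str,
    recording_start: datetime,
    time_window_hours: int = 168,
):
    # Naive datetimes compare inconsistently across drivers, so reject them.
    if recording_start.tzinfo is None:
        raise ValueError("recording_start must be timezone-aware")
    window = timedelta(hours=time_window_hours)
    # Seconds between meeting start and recording start (Postgres interval).
    delta_seconds = sa.extract("epoch", meetings.c.start_date - recording_start)
    stmt = (
        sa.select(meetings)
        .where(meetings.c.room_name == room_name)
        # between() is inclusive, which matches the 1-week boundary tests.
        .where(
            meetings.c.start_date.between(
                recording_start - window, recording_start + window
            )
        )
        # Closest meeting wins; equal deltas fall back to lexicographic id.
        .order_by(sa.func.abs(delta_seconds), meetings.c.id)
        .limit(1)
    )
    return (await conn.execute(stmt)).first()
```

This shape would explain both the duplicate-room-name tie-breaking and the inclusive window boundaries exercised by the new tests later in this diff.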
@@ -276,6 +307,33 @@ async def _process_multitrack_recording_inner(
if not room:
raise Exception(f"Room not found: {room_name_base}")
if not recording:
# Create recording (only happens during first processing)
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key_dir,
recorded_at=recorded_at,
meeting_id=meeting.id,
track_keys=track_keys,
)
)
elif not recording.meeting_id:
# Recording exists but meeting_id is null (failed first processing)
# Update with meeting from time-based matching
await recordings_controller.set_meeting_id(
recording_id=recording.id,
meeting_id=meeting.id,
)
recording.meeting_id = meeting.id
logger.info(
"Updated existing recording with meeting_id",
recording_id=recording.id,
meeting_id=meeting.id,
)
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript:
transcript = await transcripts_controller.add(
@@ -290,49 +348,29 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
# Multitrack processing always uses Hatchet (no Celery fallback)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
@shared_task
@@ -461,7 +499,7 @@ async def store_cloud_recording(
Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths.
Uses direct recording_id lookup via daily_recording_requests table.
Uses time-based matching to handle duplicate room_name values.
Args:
recording_id: Daily.co recording ID
@@ -474,170 +512,155 @@ async def store_cloud_recording(
Returns:
True if stored, False if skipped/failed
"""
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
if not match:
# ORPHAN: No request found (pre-migration recording or failed request creation)
await create_and_log_orphan(
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.warning(
f"Cloud recording ({source}): no meeting found within 1-week window",
recording_id=recording_id,
bucket_name="",
room_name=room_name,
start_ts=start_ts,
track_keys=None,
source=source,
recording_start_ts=start_ts,
recording_start=recording_start.isoformat(),
)
return False
meeting_id, _ = match
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
meeting_id=meeting.id,
s3_key=s3_key,
duration=duration,
)
if not success:
logger.debug(
f"Cloud recording ({source}): already set (stop/restart?)",
f"Cloud recording ({source}): already set (race lost)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting_id,
meeting_id=meeting.id,
)
return False
logger.info(
f"Cloud recording stored via {source}",
meeting_id=meeting_id,
f"Cloud recording stored via {source} (time-based match)",
meeting_id=meeting.id,
recording_id=recording_id,
s3_key=s3_key,
duration=duration,
time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
)
return True
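
The race-safety of this path rests on `set_cloud_recording_if_missing()`, which is not shown in the diff. The behavior it implies is a conditional UPDATE whose rowcount reveals the winner; a sketch under SQLAlchemy Core assumptions (column names are taken from the tests elsewhere in this diff):

```python
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncConnection


async def set_cloud_recording_if_missing(
    conn: AsyncConnection,
    meetings: sa.Table,
    meeting_id: str,
    s3_key: str,
    duration: int,
) -> bool:
    result = await conn.execute(
        sa.update(meetings)
        .where(meetings.c.id == meeting_id)
        # The IS NULL predicate is the whole trick: only the first writer
        # matches; every later webhook/poller updates zero rows.
        .where(meetings.c.daily_composed_video_s3_key.is_(None))
        .values(
            daily_composed_video_s3_key=s3_key,
            daily_composed_video_duration=duration,
        )
    )
    return result.rowcount == 1
```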
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
"""Process cloud recordings (database deduplication, worker-agnostic).
"""
Store cloud recordings missing from meeting table via polling.
Cloud recordings stored in meeting.daily_composed_video_s3_key, not recording table.
Only first cloud recording per meeting is kept (existing behavior).
Uses time-based matching via store_cloud_recording().
"""
if not cloud_recordings:
return
for rec in cloud_recordings:
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
if not match:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name="",
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=None,
source="polling",
)
continue
meeting_id, _ = match
if not rec.s3key:
logger.error("Cloud recording missing s3_key", recording_id=rec.id)
continue
# Store in meeting table (atomic, only if not already set)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key=rec.s3key,
duration=rec.duration,
)
if success:
logger.info(
"Stored cloud recording", recording_id=rec.id, meeting_id=meeting_id
)
else:
stored_count = 0
for recording in cloud_recordings:
# Extract S3 key from recording (cloud recordings use s3key field)
s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
if not s3_key:
logger.warning(
"Cloud recording already exists for meeting (stop/restart?)",
recording_id=rec.id,
meeting_id=meeting_id,
"Cloud recording: missing S3 key",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
stored = await store_cloud_recording(
recording_id=recording.id,
room_name=recording.room_name,
s3_key=s3_key,
duration=recording.duration,
start_ts=recording.start_ts,
source="polling",
)
if stored:
stored_count += 1
logger.info(
"Cloud recording polling complete",
total=len(cloud_recordings),
stored=stored_count,
)
async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: NonEmptyString,
) -> None:
"""Process raw-tracks (database deduplication, worker-agnostic)."""
bucket_name: str,
):
"""Queue raw-tracks recordings missing from DB (existing logic)."""
if not raw_tracks_recordings:
return
for rec in raw_tracks_recordings:
# Lookup request FIRST (before any DB writes)
match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
recording_ids = [rec.id for rec in raw_tracks_recordings]
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
existing_ids = {rec.id for rec in existing_recordings}
if not match:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
missing_recordings = [
rec for rec in raw_tracks_recordings if rec.id not in existing_ids
]
if not missing_recordings:
logger.debug(
"All raw-tracks recordings already in DB",
api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found raw-tracks recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
logger.warning(
"Finished raw-tracks recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
meeting_id, _ = match
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found", recording_id=rec.id, meeting_id=meeting_id
)
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
if not track_keys:
logger.warning(
"No audio tracks found in raw-tracks recording",
recording_id=recording.id,
room_name=recording.room_name,
total_tracks=len(recording.tracks),
)
continue
# DEDUPLICATION: Atomically create recording (single operation, no race window)
# ON CONFLICT → concurrent poller already got it, skip entire logic
track_keys = [t.s3Key for t in rec.tracks if t.type == "audio"]
created = await recordings_controller.try_create_with_meeting(
Recording(
id=rec.id,
bucket_name=bucket_name,
object_key=os.path.dirname(track_keys[0]) if track_keys else "",
recorded_at=datetime.fromtimestamp(rec.start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=meeting_id, # Set at creation (constraint-safe)
status="pending",
)
logger.info(
"Queueing missing raw-tracks recording for processing",
recording_id=recording.id,
room_name=recording.room_name,
track_count=len(track_keys),
)
if not created:
# Conflict: another poller already created/queued this
# Skip all remaining logic (match already done by winner)
continue
# Only winner reaches here - queue processing (works with Celery or Hatchet)
process_multitrack_recording.delay(
recording_id=rec.id,
daily_room_name=rec.room_name,
recording_start_ts=rec.start_ts,
bucket_name=bucket_name,
daily_room_name=recording.room_name,
recording_id=recording.id,
track_keys=track_keys,
recording_start_ts=recording.start_ts,
)
logger.info("Queued recording", recording_id=rec.id, meeting_id=meeting_id)
async def poll_daily_room_presence(meeting_id: str) -> None:
"""Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed.
@@ -799,47 +822,15 @@ async def process_meetings():
end_date = end_date.replace(tzinfo=timezone.utc)
client = create_platform_client(meeting.platform)
has_active_sessions = False
has_had_sessions = False
room_sessions = await client.get_room_sessions(meeting.room_name)
if meeting.platform == "daily":
try:
presence = await client.get_room_presence(meeting.room_name)
has_active_sessions = presence.total_count > 0
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_had_sessions = bool(room_sessions)
logger_.info(
"Daily.co presence check",
has_active_sessions=has_active_sessions,
has_had_sessions=has_had_sessions,
presence_count=presence.total_count,
)
except Exception:
logger_.warning(
"Daily.co presence API failed, falling back to DB sessions",
exc_info=True,
)
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_active_sessions = bool(
room_sessions
and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
else:
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
logger_.info(
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
)
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
logger_.info(
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
)
if has_active_sessions:
logger_.debug("Meeting still has active sessions, keep it")
@@ -858,20 +849,7 @@ async def process_meetings():
await meetings_controller.update_meeting(
meeting.id, is_active=False
)
logger_.info("Meeting deactivated in database")
if meeting.platform == "daily":
try:
await client.delete_room(meeting.room_name)
logger_.info(
"Daily.co room deleted", room_name=meeting.room_name
)
except Exception:
logger_.warning(
"Failed to delete Daily.co room",
room_name=meeting.room_name,
exc_info=True,
)
logger_.info("Meeting is deactivated")
processed_count += 1
@@ -1071,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
)
continue
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
)
continue
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
reprocessed_count += 1
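
One helper worth noting: `filter_cam_audio_tracks()` is called throughout this file but never defined in the diff. Daily.co raw-tracks object keys encode the track kind in the filename, so a plausible sketch is a simple substring filter (the exact `cam-audio` marker is an assumption):

```python
def filter_cam_audio_tracks(track_keys: list[str]) -> list[str]:
    """Keep only microphone tracks; video/screen tracks are not diarization input."""
    return [key for key in track_keys if "cam-audio" in key]
```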

View File

@@ -1,39 +0,0 @@
#!/usr/bin/env python3
"""Test script to fetch Daily.co recordings for a specific room and show raw API response."""
import asyncio
import json
from reflector.video_platforms.factory import create_platform_client
async def main():
room_name = "daily-private-igor-20260110042117"
print(f"\n=== Fetching recordings for room: {room_name} ===\n")
async with create_platform_client("daily") as client:
recordings = await client.list_recordings(room_name=room_name)
print(f"Found {len(recordings)} recording objects from Daily.co API\n")
for i, rec in enumerate(recordings, 1):
print(f"--- Recording #{i} ---")
print(f"ID: {rec.id}")
print(f"Room: {rec.room_name}")
print(f"Start TS: {rec.start_ts}")
print(f"Status: {rec.status}")
print(f"Duration: {rec.duration}")
print(f"Type: {rec.type}")
print(f"Tracks count: {len(rec.tracks)}")
if rec.tracks:
print(f"Tracks:")
for j, track in enumerate(rec.tracks, 1):
print(f" Track {j}: {track.s3Key}")
print(f"\nRaw JSON:\n{json.dumps(rec.model_dump(), indent=2, default=str)}\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest
from reflector.schemas.platform import WHEREBY_PLATFORM
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True)
@@ -14,6 +14,7 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield

View File

@@ -1,286 +0,0 @@
"""Unit tests for Daily.co presence-based meeting deactivation logic.
Tests the fix for split room race condition by verifying:
1. Real-time presence checking via Daily.co API
2. Room deletion when meetings deactivate
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.dailyco_api.responses import (
RoomPresenceParticipant,
RoomPresenceResponse,
)
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.video_platforms.daily import DailyClient
@pytest.fixture
async def daily_room_and_meeting():
"""Create test room and meeting for Daily platform."""
room = await rooms_controller.add(
name="test-daily",
user_id="test-user",
platform="daily",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
current_time = datetime.now(timezone.utc)
end_time = current_time + timedelta(hours=2)
meeting = await meetings_controller.create(
id="test-meeting-id",
room_name="test-daily-20260129120000",
room_url="https://daily.co/test",
host_room_url="https://daily.co/test",
start_date=current_time,
end_date=end_time,
room=room,
)
return room, meeting
@pytest.mark.asyncio
async def test_daily_client_has_delete_room_method():
"""Verify DailyClient has delete_room method for cleanup."""
# Create a mock DailyClient
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Verify delete_room method exists
assert hasattr(client, "delete_room")
assert callable(getattr(client, "delete_room"))
@pytest.mark.asyncio
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
"""Test that get_room_presence returns real-time participant data."""
room, meeting = daily_room_and_meeting
# Mock Daily.co API response
mock_presence = RoomPresenceResponse(
total_count=2,
data=[
RoomPresenceParticipant(
room=meeting.room_name,
id="session-1",
userId="user-1",
userName="User One",
joinTime="2026-01-29T12:00:00.000Z",
duration=120,
),
RoomPresenceParticipant(
room=meeting.room_name,
id="session-2",
userId="user-2",
userName="User Two",
joinTime="2026-01-29T12:05:00.000Z",
duration=60,
),
],
)
with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock the API client method
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Call get_room_presence
result = await client.get_room_presence(meeting.room_name)
# Verify it calls Daily.co API
client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
# Verify result contains real-time data
assert result.total_count == 2
assert len(result.data) == 2
assert result.data[0].id == "session-1"
assert result.data[1].id == "session-2"
@pytest.mark.asyncio
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
"""Test that Daily.co presence API is source of truth, not stale DB sessions."""
room, meeting = daily_room_and_meeting
current_time = datetime.now(timezone.utc)
# Create stale DB session (left_at=NULL but user actually left)
session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
await daily_participant_sessions_controller.upsert_joined(
DailyParticipantSession(
id=session_id,
meeting_id=meeting.id,
room_id=room.id,
session_id="stale-daily-session",
user_name="Stale User",
user_id="stale-user",
joined_at=current_time - timedelta(minutes=5),
left_at=None, # Stale - shows active but user left
)
)
# Verify DB shows active session
db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
assert len(db_sessions) == 1
# But Daily.co API shows room is empty
mock_presence = RoomPresenceResponse(total_count=0, data=[])
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Get real-time presence
presence = await client.get_room_presence(meeting.room_name)
# Real-time API shows no participants (truth)
assert presence.total_count == 0
assert len(presence.data) == 0
# DB shows 1 participant (stale)
assert len(db_sessions) == 1
# Implementation should trust presence API, not DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_empty():
"""Test the core deactivation decision logic when presence shows room empty."""
# This tests the logic that will be in process_meetings
# Simulate: DB shows stale active session
has_active_db_sessions = True # DB is stale
# Simulate: Daily.co presence API shows room empty
presence_count = 0 # Real-time truth
# Simulate: Meeting has been used before
has_had_sessions = True
# Decision logic (what process_meetings should do):
# - If presence API available: trust it
# - If presence shows empty AND has_had_sessions: deactivate
if presence_count == 0 and has_had_sessions:
should_deactivate = True
else:
should_deactivate = False
assert should_deactivate is True # Should deactivate despite stale DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_active():
"""Test that meetings stay active when presence shows participants."""
# Simulate: DB shows no sessions (not yet updated)
has_active_db_sessions = False # DB hasn't caught up
# Simulate: Daily.co presence API shows active participant
presence_count = 1 # Real-time truth
# Decision logic: presence shows activity, keep meeting active
if presence_count > 0:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Should stay active
@pytest.mark.asyncio
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
"""Test that Daily.co room is deleted when meeting deactivates."""
room, meeting = daily_room_and_meeting
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock delete_room API call
client._api_client.delete_room = AsyncMock()
# Simulate deactivation - should delete room
await client._api_client.delete_room(meeting.room_name)
# Verify delete was called
client._api_client.delete_room.assert_called_once_with(meeting.room_name)
@pytest.mark.asyncio
async def test_delete_room_idempotent_on_404():
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
from reflector.dailyco_api.client import DailyApiClient
# Create real client to test delete_room logic
client = DailyApiClient(api_key="test-key")
# Mock the HTTP client
mock_http_client = AsyncMock()
mock_response = AsyncMock()
mock_response.status_code = 404 # Room not found
mock_http_client.delete = AsyncMock(return_value=mock_response)
# Mock _get_client to return our mock
async def mock_get_client():
return mock_http_client
client._get_client = mock_get_client
# delete_room should succeed even on 404 (idempotent)
await client.delete_room("nonexistent-room")
# Verify delete was attempted
mock_http_client.delete.assert_called_once()
@pytest.mark.asyncio
async def test_api_failure_fallback_to_db_sessions():
"""Test that system falls back to DB sessions if Daily.co API fails."""
# Simulate: Daily.co API throws exception
api_exception = Exception("API unavailable")
# Simulate: DB shows active session
has_active_db_sessions = True
# Decision logic with fallback:
try:
presence_count = None
raise api_exception # Simulating API failure
except Exception:
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
if has_active_db_sessions:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Conservative: keep active on API failure

View File

@@ -1,258 +0,0 @@
from datetime import datetime, timezone
from uuid import UUID
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_create_request():
"""Test creating a recording request."""
# Create meeting first
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-123",
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
request = DailyRecordingRequest(
recording_id="rec-1",
meeting_id="meeting-123",
instance_id=UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"),
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
result = await daily_recording_requests_controller.find_by_recording_id("rec-1")
assert result is not None
assert result[0] == "meeting-123"
assert result[1] == "cloud"
@pytest.mark.asyncio
async def test_multiple_recordings_same_meeting():
"""Test stop/restart creates multiple request rows."""
# Create room and meeting
room = Room(
id="test-room-2", name="Test Room 2", slug="test-room-2", user_id="test-user"
)
await rooms_controller.create(room)
meeting_id = "meeting-456"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-1",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-2", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-1", "rec-2"}
@pytest.mark.asyncio
async def test_deduplication_via_database():
"""Test concurrent pollers use database for deduplication."""
# Create room and meeting
room = Room(
id="test-room-3", name="Test Room 3", slug="test-room-3", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-789",
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
recording_id = "rec-123"
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created2 is False # Conflict, skip
@pytest.mark.asyncio
async def test_orphan_logged_once():
"""Test orphan marked once, skipped on re-poll."""
# First poll
created1 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created1 is True
# Second poll (same orphan discovered again)
created2 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created2 is False # Already exists
# Verify it exists
existing = await recordings_controller.get_by_id("orphan-123")
assert existing is not None
assert existing.status == "orphan"
@pytest.mark.asyncio
async def test_orphan_constraints():
"""Test orphan invariants are enforced."""
# Can't create orphan with meeting_id
with pytest.raises(AssertionError, match="meeting_id must be NULL"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-123", # Should be None
status="orphan",
track_keys=None,
)
)
# Can't create orphan with wrong status
with pytest.raises(AssertionError, match="status must be 'orphan'"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="pending", # Should be "orphan"
track_keys=None,
)
)
@pytest.mark.asyncio
async def test_try_create_with_meeting_constraints():
"""Test try_create_with_meeting enforces constraints."""
# Create room and meeting
room = Room(
id="test-room-4", name="Test Room 4", slug="test-room-4", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-999",
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Can't create with orphan status
with pytest.raises(AssertionError, match="use create_orphan"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-999",
status="orphan", # Should not be orphan
track_keys=None,
)
)
# Can't create without meeting_id
with pytest.raises(AssertionError, match="meeting_id required"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None, # Should have meeting_id
status="pending",
track_keys=None,
)
)

View File

@@ -1,300 +0,0 @@
"""
Integration tests for recording request flow.
These tests verify the end-to-end flow of:
1. Starting a recording (creates request)
2. Webhook/polling discovering recording (matches via request)
3. Recording processing (uses existing meeting_id)
"""
from datetime import datetime, timezone
from uuid import UUID, uuid4
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_recording_request_flow_cloud(client):
"""Test full cloud recording flow: start -> webhook -> match"""
# Create room and meeting
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Simulate recording start (what endpoint does)
recording_id = "rec-cloud-123"
instance_id = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Verify request exists
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
assert match[0] == meeting_id
assert match[1] == "cloud"
# Simulate webhook/polling storing cloud recording
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key="s3://bucket/recording.mp4",
duration=120,
)
assert success is True
# Verify meeting updated
updated_meeting = await meetings_controller.get_by_id(meeting_id)
assert updated_meeting.daily_composed_video_s3_key == "s3://bucket/recording.mp4"
assert updated_meeting.daily_composed_video_duration == 120
@pytest.mark.asyncio
async def test_recording_request_flow_raw_tracks(client):
"""Test full raw-tracks recording flow: start -> webhook/polling -> process"""
# Create room and meeting
room = Room(
id="test-room-2",
name="Test Room 2",
slug="test-room-2",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Simulate recording start
recording_id = "rec-raw-456"
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Simulate webhook/polling discovering recording
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
found_meeting_id, recording_type = match
assert found_meeting_id == meeting_id
assert recording_type == "raw-tracks"
# Create recording (what webhook/polling does)
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="recordings/20260120/",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
meeting_id=meeting_id,
status="pending",
)
)
assert created is True
# Verify recording exists with meeting_id
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id
assert recording.status == "pending"
assert len(recording.track_keys) == 2
@pytest.mark.asyncio
async def test_stop_restart_creates_multiple_requests(client):
"""Test stop/restart creates multiple request rows with same instance_id"""
# Create room and meeting
room = Room(
id="test-room-3",
name="Test Room 3",
slug="test-room-3",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("c1d2e3f4-a5b6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-first",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-second", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-first", "rec-second"}
assert all(r.instance_id == instance_id for r in requests)
@pytest.mark.asyncio
async def test_orphan_recording_no_request(client):
"""Test orphan recording (no request found)"""
# Simulate polling discovering recording with no request
recording_id = "rec-orphan"
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is None # No request
# Mark as orphan
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created is True
# Verify orphan exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.status == "orphan"
assert recording.meeting_id is None
# Second poll - already exists
created_again = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created_again is False # Already exists
@pytest.mark.asyncio
async def test_concurrent_polling_deduplication(client):
"""Test concurrent pollers only queue once"""
# Create room and meeting
room = Room(
id="test-room-4",
name="Test Room 4",
slug="test-room-4",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Create request
recording_id = "rec-concurrent"
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=UUID("d1e2f3a4-b5c6-7890-abcd-ef1234567890"),
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
)
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created2 is False # Conflict, skip
# Only one recording exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id

View File

@@ -0,0 +1,374 @@
"""
Integration tests for time-based meeting-to-recording matching.
Tests the critical path for matching Daily.co recordings to meetings when
the API doesn't return instanceId.
"""
from datetime import datetime, timedelta, timezone
import pytest
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
@pytest.fixture
async def test_room():
"""Create a test room for meetings."""
room = await rooms_controller.add(
name="test-room-time",
user_id="test-user-id",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=False,
platform="daily",
)
return room
@pytest.fixture
def base_time():
"""Fixed timestamp for deterministic tests."""
return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
class TestTimeBasedMatching:
"""Test get_by_room_name_and_time() matching logic."""
async def test_exact_time_match(self, test_room, base_time):
"""Recording timestamp exactly matches meeting start_date."""
meeting = await meetings_controller.create(
id="meeting-exact",
room_name="daily-test-20260114090000",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090000",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
"""Recording started 1 minute after meeting (participants joined late)."""
meeting = await meetings_controller.create(
id="meeting-late",
room_name="daily-test-20260114090100",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time + timedelta(minutes=1)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090100",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
"""
Two meetings with same room_name (duplicate/race condition).
Should pick closest by timestamp.
"""
meeting1 = await meetings_controller.create(
id="meeting-1-first",
room_name="daily-duplicate-room",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting2 = await meetings_controller.create(
id="meeting-2-second",
room_name="daily-duplicate-room", # Same room_name!
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(seconds=0.99), # 0.99s later
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording started 0.5s after meeting1
# Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
recording_start = base_time + timedelta(seconds=0.5)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-duplicate-room",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s)
async def test_outside_time_window_returns_none(self, test_room, base_time):
"""Recording outside 1-week window returns None."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-old",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording 8 days later (outside 7-day window)
recording_start = base_time + timedelta(days=8)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-old",
recording_start=recording_start,
time_window_hours=168,
)
assert result is None
async def test_tie_breaker_deterministic(self, test_room, base_time):
"""When time delta identical, tie-breaker by meeting.id is deterministic."""
meeting_z = await meetings_controller.create(
id="zzz-last-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting_a = await meetings_controller.create(
id="aaa-first-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time, # Exact same start_date
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tie",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
# Tie-breaker: lexicographically first UUID
assert result.id == "aaa-first-uuid"
async def test_timezone_naive_datetime_raises(self, test_room, base_time):
"""Timezone-naive datetime raises ValueError."""
await meetings_controller.create(
id="meeting-tz",
room_name="daily-test-tz",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Naive datetime (no timezone)
naive_dt = datetime(2026, 1, 14, 9, 0, 0)
with pytest.raises(ValueError, match="timezone-aware"):
await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tz",
recording_start=naive_dt,
time_window_hours=168,
)
async def test_one_week_boundary_after_included(self, test_room, base_time):
"""Meeting 1-week AFTER recording is included (window_end boundary)."""
meeting_time = base_time + timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-after",
room_name="daily-test-boundary-after",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-after",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-after"
async def test_one_week_boundary_before_included(self, test_room, base_time):
"""Meeting 1-week BEFORE recording is included (window_start boundary)."""
meeting_time = base_time - timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-before",
room_name="daily-test-boundary-before",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-before",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-before"
async def test_recording_before_meeting_start(self, test_room, base_time):
"""Recording started before meeting (clock skew or early join)."""
await meetings_controller.create(
id="meeting-early",
room_name="daily-test-early",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time - timedelta(minutes=2)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-early",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-early"
async def test_mixed_inside_outside_window(self, test_room, base_time):
"""Multiple meetings, only one inside window - returns the inside one."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=10),
end_date=base_time - timedelta(days=10) + timedelta(hours=1),
room=test_room,
)
await meetings_controller.create(
id="meeting-inside",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=2),
end_date=base_time - timedelta(days=2) + timedelta(hours=1),
room=test_room,
)
await meetings_controller.create(
id="meeting-future",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(days=10),
end_date=base_time + timedelta(days=10, hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-mixed",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-inside"
class TestAtomicCloudRecordingUpdate:
"""Test atomic update prevents race conditions."""
async def test_first_update_succeeds(self, test_room, base_time):
"""First call to set_cloud_recording_if_missing succeeds."""
meeting = await meetings_controller.create(
id="meeting-atomic-1",
room_name="daily-test-atomic",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success is True
updated = await meetings_controller.get_by_id(meeting.id)
assert updated.daily_composed_video_s3_key == "first-s3-key"
assert updated.daily_composed_video_duration == 100
async def test_second_update_fails_atomically(self, test_room, base_time):
"""Second call to update same meeting doesn't overwrite (atomic check)."""
meeting = await meetings_controller.create(
id="meeting-atomic-2",
room_name="daily-test-atomic2",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success1 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success1 is True
after_first = await meetings_controller.get_by_id(meeting.id)
assert after_first.daily_composed_video_s3_key == "first-s3-key"
success2 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="bucket/path/should-not-overwrite",
duration=200,
)
assert success2 is False
final = await meetings_controller.get_by_id(meeting.id)
assert final.daily_composed_video_s3_key == "first-s3-key"
assert final.daily_composed_video_duration == 100

View File

@@ -1,6 +1,6 @@
import asyncio
import time
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline
# Whereby recordings should use file pipeline, not Hatchet
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_multitrack_pipeline.delay.assert_not_called()
mock_hatchet.start_workflow.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use multitrack pipeline
mock_multitrack_pipeline.delay.assert_called_once_with(
transcript_id=transcript.id,
bucket_name="daily-bucket",
track_keys=track_keys,
)
# Daily.co multitrack recordings should use Hatchet workflow
mock_hatchet.start_workflow.assert_called_once()
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called()

View File

@@ -24,24 +24,15 @@ import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
import {
useRoomJoinMeeting,
useRoomJoinedMeeting,
useRoomLeaveMeeting,
useMeetingStartRecording,
leaveRoomPostUrl,
LeaveRoomBody,
} from "../../lib/apiHooks";
import { omit } from "remeda";
import {
assertExists,
assertExistsAndNonEmptyString,
NonEmptyString,
parseNonEmptyString,
} from "../../lib/utils";
import {
assertMeetingId,
DailyRecordingType,
MeetingId,
} from "../../lib/types";
import { assertMeetingId, DailyRecordingType } from "../../lib/types";
import { useUuidV5 } from "react-uuid-hook";
const CONSENT_BUTTON_ID = "recording-consent";
@@ -188,58 +179,6 @@ const useFrame = (
] as const;
};
const leaveDaily = () => {
const frame = DailyIframe.getCallInstance();
frame?.leave();
};
const useDirtyDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
) => {
useEffect(() => {
if (!meetingId || !roomName) return;
const handleBeforeUnload = () => {
leaveDaily();
navigator.sendBeacon(
leaveRoomPostUrl(
{
room_name: roomName,
meeting_id: meetingId,
},
{
delay_seconds: 5,
},
),
undefined satisfies LeaveRoomBody,
);
};
window.addEventListener("beforeunload", handleBeforeUnload);
return () => window.removeEventListener("beforeunload", handleBeforeUnload);
}, [meetingId, roomName]);
};
const useDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
) => {
useDirtyDisconnects(meetingId, roomName);
useEffect(() => {
return () => {
leaveDaily();
leaveMutation.mutate({
params: {
path: { meeting_id: meetingId, room_name: roomName },
query: { delay_seconds: 5 },
},
});
};
}, [meetingId, roomName]);
};
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
@@ -247,8 +186,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const authLastUserId = auth.lastUserId;
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
const joinedMutation = useRoomJoinedMeeting();
const leaveMutation = useRoomLeaveMeeting();
const startRecordingMutation = useMeetingStartRecording();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
@@ -258,9 +195,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
);
if (typeof params.roomName === "object")
throw new Error(`Invalid room name in params. array? ${params.roomName}`);
const roomName = assertExistsAndNonEmptyString(params.roomName);
const roomName = params?.roomName as string;
const {
showConsentModal,
@@ -302,8 +237,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
router.push("/browse");
}, [router]);
useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);
const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => {
if (ev.button_id === CONSENT_BUTTON_ID) {
@@ -316,15 +249,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
);
const handleFrameJoinMeeting = useCallback(() => {
joinedMutation.mutate({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
});
if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", {
cloudInstanceId,
@@ -384,10 +308,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
}
}, [
joinedMutation,
roomName,
meeting.id,
meeting.recording_type,
meeting.id,
startRecordingMutation,
cloudInstanceId,
rawTracksInstanceId,

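For context on the pattern this commit deletes: React Query mutations ride on `fetch`, which the browser may abort once a page starts unloading, so the dirty-disconnect path notified the backend with `navigator.sendBeacon`, which is queued to outlive the page. Below is a minimal standalone sketch of that technique — `buildLeaveUrl` is a hypothetical stand-in for the real `leaveRoomPostUrl` helper, and the 5-second `delay_seconds` mirrors the value used above:

```typescript
import { useEffect } from "react";

// Hypothetical stand-in for leaveRoomPostUrl; the real helper expanded an
// OpenAPI path template via openapi-fetch instead of string interpolation.
const buildLeaveUrl = (
  roomName: string,
  meetingId: string,
  delaySeconds: number,
): string =>
  `/v1/rooms/${encodeURIComponent(roomName)}/meetings/` +
  `${encodeURIComponent(meetingId)}/leave?delay_seconds=${delaySeconds}`;

export const useLeaveBeacon = (roomName: string, meetingId: string): void => {
  useEffect(() => {
    const onBeforeUnload = () => {
      // sendBeacon enqueues the POST even while the page is tearing down;
      // a fetch() started here can be cancelled before it reaches the server.
      // delay_seconds asks the backend to wait before re-polling presence,
      // giving Daily.co time to register the disconnect.
      navigator.sendBeacon(buildLeaveUrl(roomName, meetingId, 5));
    };
    window.addEventListener("beforeunload", onBeforeUnload);
    return () => window.removeEventListener("beforeunload", onBeforeUnload);
  }, [roomName, meetingId]);
};
```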
View File

@@ -1,13 +1,12 @@
"use client";
import { $api, API_URL } from "./apiClient";
import { $api } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components, operations } from "../reflector-api";
import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
import { createFinalURL, createQuerySerializer } from "openapi-fetch";
/*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -808,44 +807,6 @@ export function useRoomJoinMeeting() {
);
}
export const LEAVE_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;
export const leaveRoomPostUrl = (
path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
): string =>
createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path, query },
querySerializer: createQuerySerializer(),
});
export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];
export function useRoomLeaveMeeting() {
return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
}
export const JOINED_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;
export const joinedRoomPostUrl = (
params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
): string =>
createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path: params },
querySerializer: () => "",
});
export type JoinedRoomBody =
operations["v1_rooms_joined_meeting"]["requestBody"];
export function useRoomJoinedMeeting() {
return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
}
export function useRoomIcsSync() {
const { setError } = useError();

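The URL-builder exports removed here existed because `navigator.sendBeacon` takes a plain URL string rather than a typed client call — the generated `$api.useMutation` hooks cannot run during unload. A hedged sketch of the same technique with illustrative values (the endpoint template matches the one above; `API_URL` here is a placeholder base URL, not the app's real one):

```typescript
import { createFinalURL, createQuerySerializer } from "openapi-fetch";

// Placeholder base URL; the app imported API_URL from apiClient.
const API_URL = "https://api.example.com";

// Expand the OpenAPI path template into a concrete URL, serializing the
// query object the same way the generated client would.
const url = createFinalURL(
  "/v1/rooms/{room_name}/meetings/{meeting_id}/leave",
  {
    baseUrl: API_URL,
    params: {
      path: { room_name: "daily-standup", meeting_id: "m_123" },
      query: { delay_seconds: 5 },
    },
    querySerializer: createQuerySerializer(), // -> "delay_seconds=5"
  },
);

// Fire-and-forget POST with no body; unlike fetch(), it survives unload.
navigator.sendBeacon(url);
```

Sharing one `as const` path template between the beacon URL builder and the `$api.useMutation` call kept both code paths pointed at the same operation, so an endpoint rename would surface as a single type error.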
View File

@@ -171,48 +171,6 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Joined Meeting
* @description Trigger presence poll (ideally when user actually joins meeting in Daily iframe)
*/
post: operations["v1_rooms_joined_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Leave Meeting
* @description Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
*
* Queues presence poll with optional delay to allow Daily.co to detect disconnect.
*/
post: operations["v1_rooms_leave_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}/webhook/test": {
parameters: {
query?: never;
@@ -2477,72 +2435,6 @@ export interface operations {
};
};
};
v1_rooms_joined_meeting: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_leave_meeting: {
parameters: {
query?: {
delay_seconds?: number;
};
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_test_webhook: {
parameters: {
query?: never;