feat: brady bunch (#816)

* brady bunch PRD/tasks

* clean dead daily.co code

* brady bunch prototype (no-mistakes)

* brady bunch prototype (no-mistakes) review

* self-review

* daily poll time match (no-mistakes)

* daily poll self-review (no-mistakes)

* daily poll self-review (no-mistakes)

* daily co doc

* cleanup

* cleanup

* self-review (no-mistakes)

* self-review (no-mistakes)

* self-review

* self-review

* ui typefix

* dupe calls error handling proper

* daily reflector data model doc

* logging style fix

* migration merge

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Committed by GitHub on 2026-01-23 12:33:06 -05:00
parent 6e786b7631
commit 6c175a11d8
31 changed files with 1973 additions and 123 deletions

View File

@@ -3,7 +3,7 @@ Daily.co API Module
"""
# Client
from .client import DailyApiClient, DailyApiError
from .client import DailyApiClient, DailyApiError, RecordingType
# Request models
from .requests import (
@@ -64,6 +64,7 @@ __all__ = [
# Client
"DailyApiClient",
"DailyApiError",
"RecordingType",
# Requests
"CreateRoomRequest",
"RoomProperties",

View File

@@ -7,7 +7,8 @@ Reference: https://docs.daily.co/reference/rest-api
"""
from http import HTTPStatus
from typing import Any
from typing import Any, Literal
from uuid import UUID
import httpx
import structlog
@@ -32,6 +33,8 @@ from .responses import (
logger = structlog.get_logger(__name__)
RecordingType = Literal["cloud", "raw-tracks"]
class DailyApiError(Exception):
"""Daily.co API error with full request/response context."""
@@ -395,6 +398,38 @@ class DailyApiClient:
return [RecordingResponse(**r) for r in data["data"]]
async def start_recording(
self,
room_name: NonEmptyString,
recording_type: RecordingType,
instance_id: UUID,
) -> dict[str, Any]:
"""Start recording via REST API.
Reference: https://docs.daily.co/reference/rest-api/rooms/recordings/start
Args:
room_name: Daily.co room name
recording_type: Recording type
instance_id: UUID for this recording session
Returns:
Recording start confirmation from Daily.co API
Raises:
DailyApiError: If API request fails
"""
client = await self._get_client()
response = await client.post(
f"{self.base_url}/rooms/{room_name}/recordings/start",
headers=self.headers,
json={
"type": recording_type,
"instanceId": str(instance_id),
},
)
return await self._handle_response(response, "start_recording")
# ============================================================================
# MEETING TOKENS
# ============================================================================

View File

@@ -0,0 +1,37 @@
"""
Daily.co recording instanceId generation utilities.
Deterministic instance ID generation for cloud and raw-tracks recordings.
MUST match the frontend's instanceId generation logic.
"""
from uuid import UUID, uuid5
from reflector.utils.string import NonEmptyString
# Namespace UUID for UUIDv5 generation of raw-tracks instanceIds
# DO NOT CHANGE: Breaks instanceId determinism across deployments and frontend/backend matching
RAW_TRACKS_NAMESPACE = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
def generate_cloud_instance_id(meeting_id: NonEmptyString) -> UUID:
"""
Generate instanceId for cloud recording.
Cloud recordings use meeting ID directly as instanceId.
This ensures each meeting has one unique cloud recording.
"""
return UUID(meeting_id)
def generate_raw_tracks_instance_id(meeting_id: NonEmptyString) -> UUID:
"""
Generate instanceId for raw-tracks recording.
Raw-tracks recordings use UUIDv5(meeting_id, namespace) to ensure
different instanceId from cloud while remaining deterministic.
Daily.co requires cloud and raw-tracks to have different instanceIds
for concurrent recording.
"""
return uuid5(RAW_TRACKS_NAMESPACE, meeting_id)
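# --- Example (not part of this diff): a quick check of the determinism and
# distinctness these helpers guarantee. The meeting_id value is hypothetical.
from uuid import UUID, uuid5

_meeting_id = "0b6a4f3e-8c2d-4e5f-9a1b-2c3d4e5f6a7b"  # hypothetical meeting UUID
_cloud_id = UUID(_meeting_id)                       # cloud: meeting id verbatim
_raw_id = uuid5(RAW_TRACKS_NAMESPACE, _meeting_id)  # raw-tracks: derived, stable
assert _cloud_id != _raw_id                         # Daily.co needs distinct instanceIds
assert _raw_id == uuid5(RAW_TRACKS_NAMESPACE, _meeting_id)  # same input, same output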

View File

@@ -88,13 +88,6 @@ class MeetingTokenProperties(BaseModel):
is_owner: bool = Field(
default=False, description="Grant owner privileges to token holder"
)
start_cloud_recording: bool = Field(
default=False, description="Automatically start cloud recording on join"
)
start_cloud_recording_opts: dict | None = Field(
default=None,
description="Options for startRecording when start_cloud_recording is true (e.g., maxDuration)",
)
enable_recording_ui: bool = Field(
default=True, description="Show recording controls in UI"
)

View File

@@ -116,6 +116,7 @@ class RecordingS3Info(BaseModel):
bucket_name: NonEmptyString
bucket_region: NonEmptyString
key: NonEmptyString | None = None
endpoint: NonEmptyString | None = None
@@ -132,6 +133,9 @@ class RecordingResponse(BaseModel):
id: NonEmptyString = Field(description="Recording identifier")
room_name: NonEmptyString = Field(description="Room where recording occurred")
start_ts: int = Field(description="Recording start timestamp (Unix epoch seconds)")
type: Literal["cloud", "raw-tracks"] | None = Field(
None, description="Recording type (may be missing from API)"
)
status: RecordingStatus = Field(
description="Recording status ('in-progress' or 'finished')"
)
@@ -145,6 +149,9 @@ class RecordingResponse(BaseModel):
None, description="Token for sharing recording"
)
s3: RecordingS3Info | None = Field(None, description="S3 bucket information")
s3key: NonEmptyString | None = Field(
None, description="S3 key for cloud recordings (top-level field)"
)
tracks: list[DailyTrack] = Field(
default_factory=list,
description="Track list for raw-tracks recordings (always array, never null)",

View File

@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Literal
import sqlalchemy as sa
@@ -9,7 +9,7 @@ from reflector.db import get_database, metadata
from reflector.db.rooms import Room
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils import generate_uuid4
from reflector.utils.string import assert_equal
from reflector.utils.string import NonEmptyString, assert_equal
meetings = sa.Table(
"meeting",
@@ -63,6 +63,9 @@ meetings = sa.Table(
nullable=False,
server_default=assert_equal(WHEREBY_PLATFORM, "whereby"),
),
# Daily.co composed video (Brady Bunch grid layout) - Daily.co only, not Whereby
sa.Column("daily_composed_video_s3_key", sa.String, nullable=True),
sa.Column("daily_composed_video_duration", sa.Integer, nullable=True),
sa.Index("idx_meeting_room_id", "room_id"),
sa.Index("idx_meeting_calendar_event", "calendar_event_id"),
)
@@ -110,6 +113,9 @@ class Meeting(BaseModel):
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform = WHEREBY_PLATFORM
# Daily.co composed video (Brady Bunch grid) - Daily.co only
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
class MeetingController:
@@ -171,6 +177,90 @@ class MeetingController:
return None
return Meeting(**result)
async def get_by_room_name_all(self, room_name: str) -> list[Meeting]:
"""Get all meetings for a room name (not just most recent)."""
query = meetings.select().where(meetings.c.room_name == room_name)
results = await get_database().fetch_all(query)
return [Meeting(**r) for r in results]
async def get_by_room_name_and_time(
self,
room_name: NonEmptyString,
recording_start: datetime,
time_window_hours: int = 168,
) -> Meeting | None:
"""
Get meeting by room name closest to recording timestamp.
HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
and mtgSessionId is separate from our instanceId. Time-based matching is
the least-bad workaround.
This handles edge case of duplicate room_name values in DB (race conditions,
double-clicks, etc.) by matching based on temporal proximity.
Algorithm:
1. Find meetings within time_window_hours of recording_start
2. Return meeting with start_date closest to recording_start
3. If tie, return first by meeting.id (deterministic)
Args:
room_name: Daily.co room name from recording
recording_start: Timezone-aware datetime from recording.start_ts
time_window_hours: Search window (default 168 = 1 week)
Returns:
Meeting closest to recording timestamp, or None if no matches
Failure modes:
- Multiple meetings in same room within ~5 minutes: picks closest
- All meetings outside time window: returns None
- Clock skew between Daily.co and DB: 1-week window tolerates this
Why 1 week window:
- Handles webhook failures (recording discovered days later)
- Tolerates clock skew
- Rejects unrelated meetings from weeks ago
"""
# Validate timezone-aware datetime
if recording_start.tzinfo is None:
raise ValueError(
f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
)
window_start = recording_start - timedelta(hours=time_window_hours)
window_end = recording_start + timedelta(hours=time_window_hours)
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_name == room_name,
meetings.c.start_date >= window_start,
meetings.c.start_date <= window_end,
)
)
.order_by(meetings.c.start_date)
)
results = await get_database().fetch_all(query)
if not results:
return None
candidates = [Meeting(**r) for r in results]
# Find meeting with start_date closest to recording_start
closest = min(
candidates,
key=lambda m: (
abs((m.start_date - recording_start).total_seconds()),
m.id, # Tie-breaker: deterministic by UUID
),
)
return closest
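# --- Example (not part of this diff): a toy run of the same selection rule
# with hypothetical timestamps, showing the closest candidate winning.
from datetime import datetime, timedelta, timezone

_rec_start = datetime(2026, 1, 20, 15, 0, tzinfo=timezone.utc)
_candidates = [  # (meeting id, start_date), e.g. duplicates from a double-click
    ("m-a", _rec_start - timedelta(minutes=4)),
    ("m-b", _rec_start + timedelta(minutes=2)),  # closest -> selected
    ("m-c", _rec_start - timedelta(days=2)),
]
_closest = min(
    _candidates,
    key=lambda m: (abs((m[1] - _rec_start).total_seconds()), m[0]),
)
assert _closest[0] == "m-b"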
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
"""
Get latest active meeting for a room.
@@ -260,6 +350,44 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""
Set cloud recording only if not already set.
Returns True if updated, False if already set.
Prevents webhook/polling race condition via atomic WHERE clause.
"""
# Check current value before update to detect actual change
meeting_before = await self.get_by_id(meeting_id)
if not meeting_before:
return False
was_null = meeting_before.daily_composed_video_s3_key is None
query = (
meetings.update()
.where(
sa.and_(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
await get_database().execute(query)
# Return True only if the value was NULL before our read. Best-effort: under a
# true concurrent race both callers can observe NULL and return True, though
# the guarded WHERE clause still lets only one UPDATE win.
return was_null
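# --- Sketch (not part of this diff): if the returned flag must be exact under
# concurrency, UPDATE ... RETURNING reports whether *this* statement won the
# race. Assumes Postgres and the same `meetings` table / `get_database()`
# helper; the method name is hypothetical.
async def set_cloud_recording_if_missing_atomic(
    self,
    meeting_id: NonEmptyString,
    s3_key: NonEmptyString,
    duration: int,
) -> bool:
    query = (
        meetings.update()
        .where(
            sa.and_(
                meetings.c.id == meeting_id,
                meetings.c.daily_composed_video_s3_key.is_(None),
            )
        )
        .values(
            daily_composed_video_s3_key=s3_key,
            daily_composed_video_duration=duration,
        )
        .returning(meetings.c.id)  # a row comes back only if we updated it
    )
    row = await get_database().fetch_one(query)
    return row is not None  # True only for the single winning writer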
async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count."""
query = (

View File

@@ -7,6 +7,7 @@ from sqlalchemy import or_
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
recordings = sa.Table(
"recording",
@@ -71,6 +72,19 @@ class RecordingController:
query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query)
async def set_meeting_id(
self,
recording_id: NonEmptyString,
meeting_id: NonEmptyString,
) -> None:
"""Link recording to meeting."""
query = (
recordings.update()
.where(recordings.c.id == recording_id)
.values(meeting_id=meeting_id)
)
await get_database().execute(query)
# No existence check: IDs without a matching row are silently omitted
async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
if not recording_ids:

View File

@@ -1,4 +1,5 @@
from datetime import datetime
from uuid import UUID
from reflector.dailyco_api import (
CreateMeetingTokenRequest,
@@ -12,9 +13,11 @@ from reflector.dailyco_api import (
RoomProperties,
verify_webhook_signature,
)
from reflector.dailyco_api import RecordingType as DailyRecordingType
from reflector.db.daily_participant_sessions import (
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room
from reflector.logger import logger
from reflector.storage import get_dailyco_storage
@@ -58,10 +61,9 @@ class DailyClient(VideoPlatformClient):
enable_recording = None
if room.recording_type == self.RECORDING_LOCAL:
enable_recording = "local"
elif (
room.recording_type == self.RECORDING_CLOUD
): # daily "cloud" is not our "cloud"
enable_recording = "raw-tracks"
elif room.recording_type == self.RECORDING_CLOUD:
# Don't set enable_recording - recordings started via REST API (not auto-start)
enable_recording = None
properties = RoomProperties(
enable_recording=enable_recording,
@@ -106,8 +108,6 @@ class DailyClient(VideoPlatformClient):
Daily.co doesn't provide historical session API, so we query our database
where participant.joined/left webhooks are stored.
"""
from reflector.db.meetings import meetings_controller # noqa: PLC0415
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
return []
@@ -179,21 +179,14 @@ class DailyClient(VideoPlatformClient):
async def create_meeting_token(
self,
room_name: DailyRoomName,
start_cloud_recording: bool,
enable_recording_ui: bool,
user_id: NonEmptyString | None = None,
is_owner: bool = False,
max_recording_duration_seconds: int | None = None,
) -> NonEmptyString:
start_cloud_recording_opts = None
if start_cloud_recording and max_recording_duration_seconds:
start_cloud_recording_opts = {"maxDuration": max_recording_duration_seconds}
properties = MeetingTokenProperties(
room_name=room_name,
user_id=user_id,
start_cloud_recording=start_cloud_recording,
start_cloud_recording_opts=start_cloud_recording_opts,
enable_recording_ui=enable_recording_ui,
is_owner=is_owner,
)
@@ -201,6 +194,23 @@ class DailyClient(VideoPlatformClient):
result = await self._api_client.create_meeting_token(request)
return result.token
async def start_recording(
self,
room_name: DailyRoomName,
recording_type: DailyRecordingType,
instance_id: UUID,
) -> dict:
"""Start recording via Daily.co REST API.
Args:
instance_id: UUID for this recording session - one UUID per "room" in Daily (which is "meeting" in Reflector)
"""
return await self._api_client.start_recording(
room_name=room_name,
recording_type=recording_type,
instance_id=instance_id,
)
async def close(self):
"""Clean up API client resources."""
await self._api_client.close()

View File

@@ -19,6 +19,7 @@ from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import (
poll_daily_room_presence_task,
process_multitrack_recording,
store_cloud_recording,
)
router = APIRouter()
@@ -174,46 +175,64 @@ async def _handle_recording_started(event: RecordingStartedEvent):
async def _handle_recording_ready(event: RecordingReadyEvent):
room_name = event.payload.room_name
recording_id = event.payload.recording_id
tracks = event.payload.tracks
if not tracks:
logger.warning(
"recording.ready-to-download: missing tracks",
room_name=room_name,
recording_id=recording_id,
payload=event.payload,
)
return
recording_type = event.payload.type
logger.info(
"Recording ready for download",
room_name=room_name,
recording_id=recording_id,
num_tracks=len(tracks),
recording_type=recording_type,
platform="daily",
)
bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
if not bucket_name:
logger.error(
"DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; cannot process Daily recording"
)
logger.error("DAILYCO_STORAGE_AWS_BUCKET_NAME not configured")
return
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
if recording_type == "cloud":
await store_cloud_recording(
recording_id=recording_id,
room_name=room_name,
s3_key=event.payload.s3_key,
duration=event.payload.duration,
start_ts=event.payload.start_ts,
source="webhook",
)
logger.info(
"Recording webhook queuing processing",
recording_id=recording_id,
room_name=room_name,
)
elif recording_type == "raw-tracks":
tracks = event.payload.tracks
if not tracks:
logger.warning(
"raw-tracks recording: missing tracks array",
room_name=room_name,
recording_id=recording_id,
)
return
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=room_name,
recording_id=recording_id,
track_keys=track_keys,
)
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
logger.info(
"Raw-tracks recording queuing processing",
recording_id=recording_id,
room_name=room_name,
num_tracks=len(track_keys),
)
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=room_name,
recording_id=recording_id,
track_keys=track_keys,
recording_start_ts=event.payload.start_ts,
)
else:
logger.warning(
"Unknown recording type",
recording_type=recording_type,
recording_id=recording_id,
)
async def _handle_recording_error(event: RecordingErrorEvent):

View File

@@ -1,16 +1,23 @@
import json
from datetime import datetime, timezone
from typing import Annotated, Optional
from typing import Annotated, Any, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
import reflector.auth as auth
from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError
from reflector.db.meetings import (
MeetingConsent,
meeting_consent_controller,
meetings_controller,
)
from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
router = APIRouter()
@@ -73,3 +80,72 @@ async def meeting_deactivate(
await meetings_controller.update_meeting(meeting_id, is_active=False)
return {"status": "success", "meeting_id": meeting_id}
class StartRecordingRequest(BaseModel):
type: RecordingType
instanceId: UUID
@router.post("/meetings/{meeting_id}/recordings/start")
async def start_recording(
meeting_id: NonEmptyString, body: StartRecordingRequest
) -> dict[str, Any]:
"""Start cloud or raw-tracks recording via Daily.co REST API.
Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time.
Uses different instanceIds for cloud vs raw-tracks (same won't work)
Note: No authentication required - anonymous users supported. TODO this is a DOS vector
"""
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
try:
client = create_platform_client("daily")
result = await client.start_recording(
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
log.info(f"Started {body.type} recording via REST API")
return {"status": "ok", "result": result}
except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream"
try:
error_body = json.loads(e.response_body)
error_info = error_body.get("info", "")
# "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info:
log.info(
f"{body.type} recording already active (started by another participant)"
)
return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling
# All other Daily.co API errors
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)
except Exception as e:
# Non-Daily.co errors
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)
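# --- Example (not part of this diff): how a client could drive this endpoint
# with the deterministic instanceIds generated earlier in this change. A
# sketch: httpx, the base URL, and the import path are assumptions.
import httpx

from reflector.dailyco_recording_instance_ids import (  # hypothetical path
    generate_cloud_instance_id,
    generate_raw_tracks_instance_id,
)

async def start_both_recordings(base_url: str, meeting_id: str) -> None:
    async with httpx.AsyncClient() as client:
        for rec_type, instance_id in [
            ("cloud", generate_cloud_instance_id(meeting_id)),
            ("raw-tracks", generate_raw_tracks_instance_id(meeting_id)),
        ]:
            resp = await client.post(
                f"{base_url}/meetings/{meeting_id}/recordings/start",
                json={"type": rec_type, "instanceId": str(instance_id)},
            )
            resp.raise_for_status()
            # "already_active" is also success: another participant won the race
            print(rec_type, resp.json()["status"])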

View File

@@ -73,6 +73,8 @@ class Meeting(BaseModel):
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
class CreateRoom(BaseModel):
@@ -586,7 +588,6 @@ async def rooms_join_meeting(
)
token = await client.create_meeting_token(
meeting.room_name,
start_cloud_recording=meeting.recording_type == "cloud",
enable_recording_ui=enable_recording_ui,
user_id=user_id,
is_owner=user_id == room.user_id,

View File

@@ -6,6 +6,11 @@ from celery.schedules import crontab
from reflector.settings import settings
logger = structlog.get_logger(__name__)
# Polling intervals (seconds)
# Webhook-aware: 180s when webhook configured (backup mode), 15s when no webhook (primary discovery)
POLL_DAILY_RECORDINGS_INTERVAL_SEC = 180.0 if settings.DAILY_WEBHOOK_SECRET else 15.0
if celery.current_app.main != "default":
logger.info(f"Celery already configured ({celery.current_app})")
app = celery.current_app
@@ -44,7 +49,7 @@ else:
},
"poll_daily_recordings": {
"task": "reflector.worker.process.poll_daily_recordings",
"schedule": 180.0, # Every 3 minutes (configurable lookback window)
"schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC,
},
"trigger_daily_reconciliation": {
"task": "reflector.worker.process.trigger_daily_reconciliation",

View File

@@ -2,7 +2,7 @@ import json
import os
import re
from datetime import datetime, timezone
from typing import List
from typing import List, Literal
from urllib.parse import unquote
import av
@@ -42,6 +42,7 @@ from reflector.utils.daily import (
filter_cam_audio_tracks,
recording_lock_key,
)
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
from reflector.video_platforms.whereby_utils import (
parse_whereby_recording_filename,
@@ -175,13 +176,18 @@ async def process_multitrack_recording(
daily_room_name: DailyRoomName,
recording_id: str,
track_keys: list[str],
recording_start_ts: int,
):
"""
Process raw-tracks (multitrack) recording from Daily.co.
"""
logger.info(
"Processing multitrack recording",
bucket=bucket_name,
room_name=daily_room_name,
recording_id=recording_id,
provided_keys=len(track_keys),
recording_start_ts=recording_start_ts,
)
if not track_keys:
@@ -212,7 +218,7 @@ async def process_multitrack_recording(
)
await _process_multitrack_recording_inner(
bucket_name, daily_room_name, recording_id, track_keys
bucket_name, daily_room_name, recording_id, track_keys, recording_start_ts
)
@@ -221,8 +227,18 @@ async def _process_multitrack_recording_inner(
daily_room_name: DailyRoomName,
recording_id: str,
track_keys: list[str],
recording_start_ts: int,
):
"""Inner function containing the actual processing logic."""
"""
Process multitrack recording (first time or reprocessing).
For first processing (webhook/polling):
- Uses recording_start_ts for time-based meeting matching (no instanceId available)
For reprocessing:
- Uses recording.meeting_id directly (already linked during first processing)
- recording_start_ts is ignored
"""
tz = timezone.utc
recorded_at = datetime.now(tz)
@@ -240,7 +256,53 @@ async def _process_multitrack_recording_inner(
exc_info=True,
)
meeting = await meetings_controller.get_by_room_name(daily_room_name)
# Check if recording already exists (reprocessing path)
recording = await recordings_controller.get_by_id(recording_id)
if recording and recording.meeting_id:
# Reprocessing: recording exists with meeting already linked
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Reprocessing: meeting not found for recording - skipping",
meeting_id=recording.meeting_id,
recording_id=recording_id,
)
return
logger.info(
"Reprocessing: using existing recording.meeting_id",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
else:
# First processing: recording doesn't exist, need time-based matching
# (Daily.co doesn't return instanceId in API, must match by timestamp)
recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=daily_room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.error(
"Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
recording_id=recording_id,
room_name=daily_room_name,
recording_start_ts=recording_start_ts,
recording_start=recording_start.isoformat(),
)
return # Skip processing, will retry on next poll
logger.info(
"First processing: found meeting via time-based matching",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
time_delta_seconds=abs(
(meeting.start_date - recording_start).total_seconds()
),
)
room_name_base = extract_base_room_name(daily_room_name)
@@ -248,18 +310,8 @@ async def _process_multitrack_recording_inner(
if not room:
raise Exception(f"Room not found: {room_name_base}")
if not meeting:
raise Exception(f"Meeting not found: {room_name_base}")
logger.info(
"Found existing Meeting for recording",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
)
recording = await recordings_controller.get_by_id(recording_id)
if not recording:
# Create recording (only happens during first processing)
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
@@ -271,7 +323,19 @@ async def _process_multitrack_recording_inner(
track_keys=track_keys,
)
)
# else: Recording already exists; metadata set at creation time
elif not recording.meeting_id:
# Recording exists but meeting_id is null (failed first processing)
# Update with meeting from time-based matching
await recordings_controller.set_meeting_id(
recording_id=recording.id,
meeting_id=meeting.id,
)
recording.meeting_id = meeting.id
logger.info(
"Updated existing recording with meeting_id",
recording_id=recording.id,
meeting_id=meeting.id,
)
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript:
@@ -338,9 +402,11 @@ async def poll_daily_recordings():
"""Poll Daily.co API for recordings and process missing ones.
Fetches latest recordings from Daily.co API (default limit 100), compares with DB,
and queues processing for recordings not already in DB.
and stores/queues missing recordings:
- Cloud recordings: Store S3 key in meeting table
- Raw-tracks recordings: Queue multitrack processing
For each missing recording, uses audio tracks from API response.
Acts as a fallback when webhooks are active, and as primary discovery when they are unavailable.
Worker-level locking provides idempotency (see process_multitrack_recording).
"""
@@ -381,51 +447,222 @@ async def poll_daily_recordings():
)
return
recording_ids = [rec.id for rec in finished_recordings]
# Separate cloud and raw-tracks recordings
cloud_recordings = []
raw_tracks_recordings = []
for rec in finished_recordings:
if rec.type:
# Daily.co's API currently returns null type; verify that assumption holds.
# If this warning ever fires, the API changed and the inference below can be removed.
recording_type = rec.type
logger.warning(
"Recording has explicit type field from Daily.co API (unexpected, API may have changed)",
recording_id=rec.id,
room_name=rec.room_name,
recording_type=recording_type,
has_s3key=bool(rec.s3key),
tracks_count=len(rec.tracks),
)
else:
# DAILY.CO API LIMITATION:
# GET /recordings response does NOT include type field.
# Daily.co docs mention type field exists, but API never returns it.
# Verified: 84 recordings from Nov 2025 - Jan 2026 ALL have type=None.
#
# This is not a recent API change - Daily.co has never returned type.
# Must infer from structural properties.
#
# Inference heuristic (reliable for finished recordings):
# - Has tracks array → raw-tracks
# - Has s3key but no tracks → cloud
# - Neither → failed/incomplete recording
if len(rec.tracks) > 0:
recording_type = "raw-tracks"
elif rec.s3key and len(rec.tracks) == 0:
recording_type = "cloud"
else:
logger.warning(
"Recording has no type, no s3key, and no tracks - likely failed recording",
recording_id=rec.id,
room_name=rec.room_name,
status=rec.status,
duration=rec.duration,
mtg_session_id=rec.mtgSessionId,
)
continue
if recording_type == "cloud":
cloud_recordings.append(rec)
else:
raw_tracks_recordings.append(rec)
logger.debug(
"Poll results",
total=len(finished_recordings),
cloud=len(cloud_recordings),
raw_tracks=len(raw_tracks_recordings),
)
# Process cloud recordings
await _poll_cloud_recordings(cloud_recordings)
# Process raw-tracks recordings
await _poll_raw_tracks_recordings(raw_tracks_recordings, bucket_name)
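# --- Sketch (not part of this diff): the structural heuristic above, extracted
# as a standalone function for readability.
def infer_recording_type(rec) -> Literal["cloud", "raw-tracks"] | None:
    """tracks -> raw-tracks; s3key without tracks -> cloud; neither -> failed (None)."""
    if rec.tracks:
        return "raw-tracks"
    if rec.s3key:
        return "cloud"
    return None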
async def store_cloud_recording(
recording_id: NonEmptyString,
room_name: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
start_ts: int,
source: Literal["webhook", "polling"],
) -> bool:
"""
Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths.
Uses time-based matching to handle duplicate room_name values.
Args:
recording_id: Daily.co recording ID
room_name: Daily.co room name
s3_key: S3 key where recording is stored
duration: Recording duration in seconds
start_ts: Unix timestamp when recording started
source: "webhook" or "polling" (for logging)
Returns:
True if stored, False if skipped/failed
"""
recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.warning(
f"Cloud recording ({source}): no meeting found within 1-week window",
recording_id=recording_id,
room_name=room_name,
recording_start_ts=start_ts,
recording_start=recording_start.isoformat(),
)
return False
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key=s3_key,
duration=duration,
)
if not success:
logger.debug(
f"Cloud recording ({source}): already set (race lost)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting.id,
)
return False
logger.info(
f"Cloud recording stored via {source} (time-based match)",
meeting_id=meeting.id,
recording_id=recording_id,
s3_key=s3_key,
duration=duration,
time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
)
return True
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
"""
Store cloud recordings missing from meeting table via polling.
Uses time-based matching via store_cloud_recording().
"""
if not cloud_recordings:
return
stored_count = 0
for recording in cloud_recordings:
# Extract S3 key from recording (cloud recordings use s3key field)
s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
if not s3_key:
logger.warning(
"Cloud recording: missing S3 key",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
stored = await store_cloud_recording(
recording_id=recording.id,
room_name=recording.room_name,
s3_key=s3_key,
duration=recording.duration,
start_ts=recording.start_ts,
source="polling",
)
if stored:
stored_count += 1
logger.info(
"Cloud recording polling complete",
total=len(cloud_recordings),
stored=stored_count,
)
async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: str,
):
"""Queue raw-tracks recordings missing from DB (existing logic)."""
if not raw_tracks_recordings:
return
recording_ids = [rec.id for rec in raw_tracks_recordings]
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
existing_ids = {rec.id for rec in existing_recordings}
missing_recordings = [
rec for rec in finished_recordings if rec.id not in existing_ids
rec for rec in raw_tracks_recordings if rec.id not in existing_ids
]
if not missing_recordings:
logger.debug(
"All recordings already in DB",
api_count=len(finished_recordings),
"All raw-tracks recordings already in DB",
api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found recordings missing from DB",
"Found raw-tracks recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(finished_recordings),
total_api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
if recording.status == "finished":
logger.warning(
"Finished recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
else:
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
room_name=recording.room_name,
status=recording.status,
)
logger.warning(
"Finished raw-tracks recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
if not track_keys:
logger.warning(
"No audio tracks found in recording (only video tracks)",
"No audio tracks found in raw-tracks recording",
recording_id=recording.id,
room_name=recording.room_name,
total_tracks=len(recording.tracks),
@@ -433,7 +670,7 @@ async def poll_daily_recordings():
continue
logger.info(
"Queueing missing recording for processing",
"Queueing missing raw-tracks recording for processing",
recording_id=recording.id,
room_name=recording.room_name,
track_count=len(track_keys),
@@ -444,6 +681,7 @@ async def poll_daily_recordings():
daily_room_name=recording.room_name,
recording_id=recording.id,
track_keys=track_keys,
recording_start_ts=recording.start_ts,
)
@@ -883,11 +1121,16 @@ async def reprocess_failed_daily_recordings():
transcript_status=transcript.status if transcript else None,
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1