Compare commits

..

3 Commits

Author SHA1 Message Date
Igor Loskutov
f345f1f122 Merge branch 'main' into brady-bunch-sync 2026-02-03 17:15:46 -05:00
Igor Loskutov
15aa121727 daily-matching 2026-01-30 17:47:47 -05:00
Igor Loskutov
08f4294b36 more daily co api docs 2026-01-30 17:46:38 -05:00
32 changed files with 1478 additions and 2180 deletions

View File

@@ -4,4 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465
server/reflector/worker/process.py:generic-api-key:594
server/tests/test_recording_request_flow.py:generic-api-key:121

View File

@@ -1,26 +1,5 @@
# Changelog
## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
### Features
* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
### Bug Fixes
* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
### Bug Fixes
* increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks ([#843](https://github.com/Monadical-SAS/reflector/issues/843)) ([4acde4b](https://github.com/Monadical-SAS/reflector/commit/4acde4b7fdef88cc02ca12cf38c9020b05ed96ac))
* make caddy optional ([#841](https://github.com/Monadical-SAS/reflector/issues/841)) ([a2ed7d6](https://github.com/Monadical-SAS/reflector/commit/a2ed7d60d557b551a5b64e4dfd909b63a791d9fc))
* use Daily API recording.duration as master source for transcript duration ([#844](https://github.com/Monadical-SAS/reflector/issues/844)) ([8707c66](https://github.com/Monadical-SAS/reflector/commit/8707c6694a80c939b6214bbc13331741f192e082))
## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)

View File

@@ -4,31 +4,27 @@ ENV PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy \
UV_NO_CACHE=1
# patch until nvidia updates the sha1 repo
ADD sequoia.config /etc/crypto-policies/back-ends/sequoia.config
WORKDIR /tmp
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
RUN apt-get update \
&& apt-get install -y \
ffmpeg \
curl \
ca-certificates \
gnupg \
wget
wget \
&& apt-get clean
# Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12
ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
dpkg -i /cuda-keyring.deb \
RUN dpkg -i /cuda-keyring.deb \
&& rm /cuda-keyring.deb \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
cuda-cudart-12-6 \
libcublas-12-6 \
libcudnn9-cuda-12 \
libcudnn9-dev-cuda-12
libcudnn9-dev-cuda-12 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh
ENV PATH="/root/.local/bin/:$PATH"
@@ -43,13 +39,6 @@ COPY ./app /app/app
COPY ./main.py /app/
COPY ./runserver.sh /app/
# prevent uv failing with too many open files on big cpus
ENV UV_CONCURRENT_INSTALLS=16
# first install
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --compile-bytecode --locked
EXPOSE 8000
CMD ["sh", "/app/runserver.sh"]

View File

@@ -1,2 +0,0 @@
[hash_algorithms]
sha1 = "always"

View File

@@ -86,7 +86,7 @@ Daily.co Room: "daily-private-igor-20260110042117"
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp |
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId` (can be null), requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording
@@ -101,6 +101,30 @@ Daily.co Room: "daily-private-igor-20260110042117"
**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs.
### instanceId (Reflector-Generated)
**Definition:** UUID we generate and send when starting recording via REST API.
**Generation:** Deterministic from meeting_id
- Cloud: `instanceId = meeting_id` directly
- Raw-tracks: `instanceId = UUIDv5(meeting_id, namespace)`
**Key behaviors:**
-**Reuse allowed:** Same instanceId can be used after stop (validated 2026-01-20)
-**Not returned:** Daily.co does NOT echo instanceId back in GET /recordings response
-**Present in error webhooks:** `recording.error` webhook includes instanceId
- **Purpose:** Allows multiple concurrent recordings (cloud + raw-tracks) in same room
**Stop/restart example:**
```
Recording 1: POST /start with instanceId="779e6376..." → recording_id="ee00c4e8..."
Stop recording
Recording 2: POST /start with instanceId="779e6376..." (SAME) → recording_id="b702f509..." (DIFFERENT)
✅ Both succeed, different recording_ids returned
```
**Implication:** Cannot match recordings by instanceId (not in response) - must use recording_id.
---
## Entity Relationships
@@ -196,6 +220,19 @@ Daily.co Room: "daily-private-igor-20260110042117"
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
**Reliability:** Can be null or present in GET /recordings response (unreliable).
**When present:** Multiple recordings from same session (stop/restart with participants connected) share same mtgSessionId.
**Example (validated 2026-01-20):**
```json
Recording 1: {"id": "ee00c4e8...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
Recording 2: {"id": "b702f509...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
// Same mtgSessionId (stop/restart in same session)
```
**When null:** Common - Daily.co API does not reliably populate this field.
### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks.
@@ -220,16 +257,24 @@ TABLE daily_participant_session (
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
**Example API response:**
**Example API response (mtgSessionId can be null OR present):**
```json
{
"id": "recording-uuid",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"mtgSessionId": null Missing!
"mtgSessionId": null // ← Often null (unreliable)
}
// OR (when present):
{
"id": "recording-uuid",
"mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6" // ← Sometimes present
}
```
**Key insight:** Cannot rely on mtgSessionId for matching (unreliable). instanceId also not returned. Only reliable identifier is recording.id.
### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
@@ -491,6 +536,10 @@ UI: User sees 3 separate transcripts
---
**Document Version:** 1.0
**Last Verified:** 2026-01-15
**Data Source:** Production database + Daily.co API inspection
**Document Version:** 1.1
**Last Updated:** 2026-01-20
**Data Source:** Production database + Daily.co API inspection + empirical testing
**Changes in 1.1:**
- Added instanceId behavior documentation (reuse allowed, not returned in API)
- Clarified mtgSessionId reliability (can be null or present)
- Added empirical validation of stop/restart behavior

View File

@@ -1,35 +0,0 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -0,0 +1,67 @@
"""add_daily_recording_requests
Revision ID: f5b008fa8a14
Revises: 1b1e6a6fc465
Create Date: 2026-01-20 22:32:06.697144
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f5b008fa8a14"
down_revision: Union[str, None] = "1b1e6a6fc465"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"daily_recording_request",
sa.Column("recording_id", sa.String(), nullable=False),
sa.Column("meeting_id", sa.String(), nullable=False),
sa.Column("instance_id", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("recording_id"),
)
op.create_index("idx_meeting_id", "daily_recording_request", ["meeting_id"])
op.create_index("idx_instance_id", "daily_recording_request", ["instance_id"])
# Clean up orphaned recordings before adding FK constraint
op.execute("""
UPDATE recording SET status = 'orphan', meeting_id = NULL
WHERE meeting_id IS NOT NULL
AND meeting_id NOT IN (SELECT id FROM meeting)
""")
# Add FK constraint to recording table (cascade delete recordings when meeting deleted)
op.execute("""
ALTER TABLE recording ADD CONSTRAINT fk_recording_meeting
FOREIGN KEY (meeting_id) REFERENCES meeting(id) ON DELETE CASCADE
""")
# Add CHECK constraints to enforce orphan invariants
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_orphan_no_meeting
CHECK (status != 'orphan' OR meeting_id IS NULL)
""")
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_non_orphan_has_meeting
CHECK (status = 'orphan' OR meeting_id IS NOT NULL)
""")
def downgrade() -> None:
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_orphan_no_meeting")
op.execute(
"ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_non_orphan_has_meeting"
)
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS fk_recording_meeting")
op.drop_index("idx_instance_id", table_name="daily_recording_request")
op.drop_index("idx_meeting_id", table_name="daily_recording_request")
op.drop_table("daily_recording_request")

View File

@@ -0,0 +1,56 @@
"""Utility for creating orphan recordings."""
import os
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
async def create_and_log_orphan(
recording_id: NonEmptyString,
bucket_name: str,
room_name: str,
start_ts: int,
track_keys: list[str] | None,
source: str,
) -> bool:
"""Create orphan recording and log if first occurrence.
Args:
recording_id: Daily.co recording ID
bucket_name: S3 bucket (empty string for cloud recordings)
room_name: Daily.co room name
start_ts: Unix timestamp
track_keys: Track keys for raw-tracks, None for cloud
source: "webhook" or "polling" for logging
Returns:
True if created (first poller), False if already exists
"""
if track_keys:
object_key = os.path.dirname(track_keys[0]) if track_keys else room_name
else:
object_key = room_name
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key,
recorded_at=datetime.fromtimestamp(start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=None,
status="orphan",
)
)
if created:
logger.error(
f"Orphan recording ({source})",
recording_id=recording_id,
room_name=room_name,
)
return created

View File

@@ -26,6 +26,7 @@ def get_database() -> databases.Database:
# import models
import reflector.db.calendar_events # noqa
import reflector.db.daily_participant_sessions # noqa
import reflector.db.daily_recording_requests # noqa
import reflector.db.meetings # noqa
import reflector.db.recordings # noqa
import reflector.db.rooms # noqa

View File

@@ -0,0 +1,111 @@
from datetime import datetime
from typing import Literal
from uuid import UUID
import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils.string import NonEmptyString
daily_recording_requests = sa.Table(
"daily_recording_request",
metadata,
sa.Column("recording_id", sa.String, primary_key=True),
sa.Column(
"meeting_id",
sa.String,
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("instance_id", sa.String, nullable=False),
sa.Column("type", sa.String, nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.Index("idx_meeting_id", "meeting_id"),
sa.Index("idx_instance_id", "instance_id"),
)
class DailyRecordingRequest(BaseModel):
recording_id: NonEmptyString
meeting_id: NonEmptyString
instance_id: UUID
type: Literal["cloud", "raw-tracks"]
requested_at: datetime
class DailyRecordingRequestsController:
async def create(self, request: DailyRecordingRequest) -> None:
stmt = insert(daily_recording_requests).values(
recording_id=request.recording_id,
meeting_id=request.meeting_id,
instance_id=str(request.instance_id),
type=request.type,
requested_at=request.requested_at,
)
stmt = stmt.on_conflict_do_nothing(index_elements=["recording_id"])
await get_database().execute(stmt)
async def find_by_recording_id(
self,
recording_id: NonEmptyString,
) -> tuple[NonEmptyString, Literal["cloud", "raw-tracks"]] | None:
query = daily_recording_requests.select().where(
daily_recording_requests.c.recording_id == recording_id
)
result = await get_database().fetch_one(query)
if not result:
return None
req = DailyRecordingRequest(
recording_id=result["recording_id"],
meeting_id=result["meeting_id"],
instance_id=UUID(result["instance_id"]),
type=result["type"],
requested_at=result["requested_at"],
)
return (req.meeting_id, req.type)
async def find_by_instance_id(
self,
instance_id: UUID,
) -> list[DailyRecordingRequest]:
"""Multiple recordings can have same instance_id (stop/restart)."""
query = daily_recording_requests.select().where(
daily_recording_requests.c.instance_id == str(instance_id)
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
async def get_by_meeting_id(
self,
meeting_id: NonEmptyString,
) -> list[DailyRecordingRequest]:
query = daily_recording_requests.select().where(
daily_recording_requests.c.meeting_id == meeting_id
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
daily_recording_requests_controller = DailyRecordingRequestsController()

View File

@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime
from typing import Any, Literal
import sqlalchemy as sa
@@ -183,84 +183,6 @@ class MeetingController:
results = await get_database().fetch_all(query)
return [Meeting(**r) for r in results]
async def get_by_room_name_and_time(
self,
room_name: NonEmptyString,
recording_start: datetime,
time_window_hours: int = 168,
) -> Meeting | None:
"""
Get meeting by room name closest to recording timestamp.
HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
and mtgSessionId is separate from our instanceId. Time-based matching is
the least-bad workaround.
This handles edge case of duplicate room_name values in DB (race conditions,
double-clicks, etc.) by matching based on temporal proximity.
Algorithm:
1. Find meetings within time_window_hours of recording_start
2. Return meeting with start_date closest to recording_start
3. If tie, return first by meeting.id (deterministic)
Args:
room_name: Daily.co room name from recording
recording_start: Timezone-aware datetime from recording.start_ts
time_window_hours: Search window (default 168 = 1 week)
Returns:
Meeting closest to recording timestamp, or None if no matches
Failure modes:
- Multiple meetings in same room within ~5 minutes: picks closest
- All meetings outside time window: returns None
- Clock skew between Daily.co and DB: 1-week window tolerates this
Why 1 week window:
- Handles webhook failures (recording discovered days later)
- Tolerates clock skew
- Rejects unrelated meetings from weeks ago
"""
# Validate timezone-aware datetime
if recording_start.tzinfo is None:
raise ValueError(
f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
)
window_start = recording_start - timedelta(hours=time_window_hours)
window_end = recording_start + timedelta(hours=time_window_hours)
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_name == room_name,
meetings.c.start_date >= window_start,
meetings.c.start_date <= window_end,
)
)
.order_by(meetings.c.start_date)
)
results = await get_database().fetch_all(query)
if not results:
return None
candidates = [Meeting(**r) for r in results]
# Find meeting with start_date closest to recording_start
closest = min(
candidates,
key=lambda m: (
abs((m.start_date - recording_start).total_seconds()),
m.id, # Tie-breaker: deterministic by UUID
),
)
return closest
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
"""
Get latest active meeting for a room.
@@ -350,44 +272,6 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""
Set cloud recording only if not already set.
Returns True if updated, False if already set.
Prevents webhook/polling race condition via atomic WHERE clause.
"""
# Check current value before update to detect actual change
meeting_before = await self.get_by_id(meeting_id)
if not meeting_before:
return False
was_null = meeting_before.daily_composed_video_s3_key is None
query = (
meetings.update()
.where(
sa.and_(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
await get_database().execute(query)
# Return True only if value was NULL before (actual update occurred)
# If was_null=False, the WHERE clause prevented the update
return was_null
async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count."""
query = (
@@ -467,6 +351,27 @@ class MeetingConsentController:
result = await get_database().fetch_one(query)
return result is not None
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""Returns True if updated, False if already set."""
query = (
meetings.update()
.where(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
result = await get_database().execute(query)
return result.rowcount > 0
meetings_controller = MeetingController()
meeting_consent_controller = MeetingConsentController()

View File

@@ -4,10 +4,10 @@ from typing import Literal
import sqlalchemy as sa
from pydantic import BaseModel, Field
from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
recordings = sa.Table(
"recording",
@@ -31,14 +31,13 @@ recordings = sa.Table(
class Recording(BaseModel):
id: str = Field(default_factory=generate_uuid4)
bucket_name: str
# for single-track
object_key: str
recorded_at: datetime
status: Literal["pending", "processing", "completed", "failed"] = "pending"
status: Literal["pending", "processing", "completed", "failed", "orphan"] = (
"pending"
)
meeting_id: str | None = None
# for multitrack reprocessing
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
# None = single-track, [] = multitrack with no audio, [keys...] = multitrack with audio
track_keys: list[str] | None = None
@property
@@ -72,20 +71,6 @@ class RecordingController:
query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query)
async def set_meeting_id(
self,
recording_id: NonEmptyString,
meeting_id: NonEmptyString,
) -> None:
"""Link recording to meeting."""
query = (
recordings.update()
.where(recordings.c.id == recording_id)
.values(meeting_id=meeting_id)
)
await get_database().execute(query)
# no check for existence
async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
if not recording_ids:
return []
@@ -104,9 +89,12 @@ class RecordingController:
This is more efficient than fetching all recordings and filtering in Python.
"""
from reflector.db.transcripts import (
transcripts, # noqa: PLC0415 cyclic import
)
# INLINE IMPORT REQUIRED: Circular dependency
# - recordings.py needs transcripts table for JOIN query
# - transcripts.py imports recordings_controller
# - db/__init__.py loads recordings before transcripts (line 31 vs 33)
# - Top-level import would fail during module initialization
from reflector.db.transcripts import transcripts
query = (
recordings.select()
@@ -124,5 +112,27 @@ class RecordingController:
recordings_list = [Recording(**row) for row in results]
return [r for r in recordings_list if r.is_multitrack]
async def try_create_with_meeting(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.meeting_id is not None, "meeting_id required for non-orphan"
assert recording.status != "orphan", "use create_orphan for orphans"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
async def create_orphan(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.status == "orphan", "status must be 'orphan'"
assert recording.meeting_id is None, "meeting_id must be NULL for orphan"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
recordings_controller = RecordingController()

View File

@@ -57,6 +57,12 @@ rooms = sqlalchemy.Table(
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column(
"skip_consent",
sqlalchemy.Boolean,
@@ -91,6 +97,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False

View File

@@ -1,144 +0,0 @@
"""
Hatchet DAG Status -> Zulip Live Updates.
Posts/updates/deletes a Zulip message showing the Hatchet workflow DAG status.
All functions are fire-and-forget (catch + warning log on failure).
Note: Uses deferred imports throughout for fork-safety,
consistent with the pipeline pattern in daily_multitrack_pipeline.py.
"""
from reflector.logger import logger
from reflector.settings import settings
def _dag_zulip_enabled() -> bool:
return bool(
settings.ZULIP_REALM and settings.ZULIP_DAG_STREAM and settings.ZULIP_DAG_TOPIC
)
async def create_dag_zulip_message(transcript_id: str, workflow_run_id: str) -> None:
"""Post initial DAG status to Zulip. Called at dispatch time (normal DB context)."""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import send_message_to_zulip # noqa: PLC0415
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)
response = await send_message_to_zulip(
settings.ZULIP_DAG_STREAM, settings.ZULIP_DAG_TOPIC, content
)
message_id = response.get("id")
if message_id:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"zulip_message_id": message_id}
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)
async def update_dag_zulip_message(
transcript_id: str,
workflow_run_id: str,
error_message: str | None = None,
) -> None:
"""Update existing DAG status in Zulip. Called from Hatchet worker (forked).
Args:
error_message: If set, appended as an error banner to the rendered DAG.
"""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows.daily_multitrack_pipeline import ( # noqa: PLC0415
fresh_db_connection,
)
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import update_zulip_message # noqa: PLC0415
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)
if error_message:
content += f"\n\n:cross_mark: **{error_message}**"
await update_zulip_message(
transcript.zulip_message_id,
settings.ZULIP_DAG_STREAM,
settings.ZULIP_DAG_TOPIC,
content,
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to update DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)
async def delete_dag_zulip_message(transcript_id: str) -> None:
"""Delete DAG Zulip message and clear zulip_message_id.
Called from post_zulip task (already inside fresh_db_connection).
Swallows InvalidMessageError (message already deleted).
"""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.zulip import ( # noqa: PLC0415
InvalidMessageError,
delete_zulip_message,
)
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return
try:
await delete_zulip_message(transcript.zulip_message_id)
except InvalidMessageError:
logger.warning(
"[DAG Zulip] Message already deleted",
transcript_id=transcript_id,
zulip_message_id=transcript.zulip_message_id,
)
await transcripts_controller.update(transcript, {"zulip_message_id": None})
except Exception:
logger.warning(
"[DAG Zulip] Failed to delete DAG message",
transcript_id=transcript_id,
exc_info=True,
)

View File

@@ -45,7 +45,6 @@ from reflector.hatchet.constants import (
TIMEOUT_SHORT,
TaskName,
)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
from reflector.hatchet.workflows.models import (
ActionItemsResult,
ConsentResult,
@@ -239,14 +238,7 @@ def with_error_handling(
@functools.wraps(func)
async def wrapper(input: PipelineInput, ctx: Context) -> R:
try:
result = await func(input, ctx)
try:
await update_dag_zulip_message(
input.transcript_id, ctx.workflow_run_id
)
except Exception:
pass
return result
return await func(input, ctx)
except Exception as e:
logger.error(
f"[Hatchet] {step_name} failed",
@@ -254,14 +246,6 @@ def with_error_handling(
error=str(e),
exc_info=True,
)
try:
await update_dag_zulip_message(
input.transcript_id,
ctx.workflow_run_id,
error_message=f"{step_name} failed: {e}",
)
except Exception:
pass
if set_error_status:
await set_workflow_error_status(input.transcript_id)
raise
@@ -1310,11 +1294,6 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.dag_zulip import ( # noqa: PLC0415
delete_dag_zulip_message,
)
await delete_dag_zulip_message(input.transcript_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:

View File

@@ -15,11 +15,14 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString
@@ -178,108 +181,124 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
# Multitrack processing always uses Hatchet (no Celery fallback)
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
if can_replay:
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": None})
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
try:
await create_dag_zulip_message(config.transcript_id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
transcript_id=config.transcript_id,
workflow_id=workflow_id,
exc_info=True,
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -155,15 +155,12 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code
)
# Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
# Zulip integration
ZULIP_REALM: str | None = None
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
ZULIP_DAG_STREAM: str | None = None
ZULIP_DAG_TOPIC: str | None = None
ZULIP_HOST_HEADER: str | None = None
# Hatchet workflow orchestration (always enabled for multitrack processing)
HATCHET_CLIENT_TOKEN: str | None = None

View File

@@ -1,412 +0,0 @@
"""
Render Hatchet workflow runs as text DAG.
Usage:
# Show latest 5 runs (summary table)
uv run -m reflector.tools.render_hatchet_run
# Show specific run with full DAG + task details
uv run -m reflector.tools.render_hatchet_run <workflow_run_id>
# Drill into Nth run from the list (1-indexed)
uv run -m reflector.tools.render_hatchet_run --show 1
# Show latest N runs
uv run -m reflector.tools.render_hatchet_run --last 10
# Filter by status
uv run -m reflector.tools.render_hatchet_run --status FAILED
uv run -m reflector.tools.render_hatchet_run --status RUNNING
"""
import argparse
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from hatchet_sdk.clients.rest.models import (
V1TaskEvent,
V1TaskStatus,
V1TaskSummary,
V1WorkflowRunDetails,
WorkflowRunShapeItemForWorkflowRunDetails,
)
from reflector.hatchet.client import HatchetClientManager
STATUS_ICON = {
V1TaskStatus.COMPLETED: "\u2705",
V1TaskStatus.RUNNING: "\u23f3",
V1TaskStatus.FAILED: "\u274c",
V1TaskStatus.QUEUED: "\u23f8\ufe0f",
V1TaskStatus.CANCELLED: "\u26a0\ufe0f",
}
STATUS_LABEL = {
V1TaskStatus.COMPLETED: "Complete",
V1TaskStatus.RUNNING: "Running",
V1TaskStatus.FAILED: "FAILED",
V1TaskStatus.QUEUED: "Queued",
V1TaskStatus.CANCELLED: "Cancelled",
}
def _fmt_time(dt: datetime | None) -> str:
if dt is None:
return "-"
return dt.strftime("%H:%M:%S")
def _fmt_duration(ms: int | None) -> str:
if ms is None:
return "-"
secs = ms / 1000
if secs < 60:
return f"{secs:.1f}s"
mins = secs / 60
return f"{mins:.1f}m"
def _fmt_status_line(task: V1TaskSummary) -> str:
"""Format a status line like: Complete (finished 20:31:44)"""
label = STATUS_LABEL.get(task.status, task.status.value)
icon = STATUS_ICON.get(task.status, "?")
if task.status == V1TaskStatus.COMPLETED and task.finished_at:
return f"{icon} {label} (finished {_fmt_time(task.finished_at)})"
elif task.status == V1TaskStatus.RUNNING and task.started_at:
parts = [f"started {_fmt_time(task.started_at)}"]
if task.duration:
parts.append(f"{_fmt_duration(task.duration)} elapsed")
return f"{icon} {label} ({', '.join(parts)})"
elif task.status == V1TaskStatus.FAILED and task.finished_at:
return f"{icon} {label} (failed {_fmt_time(task.finished_at)})"
elif task.status == V1TaskStatus.CANCELLED:
return f"{icon} {label}"
elif task.status == V1TaskStatus.QUEUED:
return f"{icon} {label}"
return f"{icon} {label}"
def _topo_sort(
shape: list[WorkflowRunShapeItemForWorkflowRunDetails],
) -> list[str]:
"""Topological sort of step_ids from shape DAG."""
step_ids = {s.step_id for s in shape}
children_map: dict[str, list[str]] = {}
in_degree: dict[str, int] = {sid: 0 for sid in step_ids}
for s in shape:
children = [c for c in (s.children_step_ids or []) if c in step_ids]
children_map[s.step_id] = children
for c in children:
in_degree[c] += 1
queue = sorted(sid for sid, deg in in_degree.items() if deg == 0)
result: list[str] = []
while queue:
node = queue.pop(0)
result.append(node)
for c in children_map.get(node, []):
in_degree[c] -= 1
if in_degree[c] == 0:
queue.append(c)
queue.sort()
return result
def render_run_detail(details: V1WorkflowRunDetails) -> str:
"""Render a single workflow run as markdown DAG with task details."""
shape = details.shape or []
tasks = details.tasks or []
events = details.task_events or []
run = details.run
if not shape:
return f"Run {run.metadata.id}: {run.status.value} (no shape data)"
# Build lookups
step_to_shape: dict[str, WorkflowRunShapeItemForWorkflowRunDetails] = {
s.step_id: s for s in shape
}
step_to_name: dict[str, str] = {s.step_id: s.task_name for s in shape}
# Reverse edges (parents)
parents: dict[str, list[str]] = {s.step_id: [] for s in shape}
for s in shape:
for child_id in s.children_step_ids or []:
if child_id in parents:
parents[child_id].append(s.step_id)
# Join tasks by step_id
task_by_step: dict[str, V1TaskSummary] = {}
for t in tasks:
if t.step_id and t.step_id in step_to_name:
task_by_step[t.step_id] = t
# Events indexed by task_external_id
events_by_task: dict[str, list[V1TaskEvent]] = defaultdict(list)
for ev in events:
events_by_task[ev.task_id].append(ev)
ordered = _topo_sort(shape)
lines: list[str] = []
# Run header
run_icon = STATUS_ICON.get(run.status, "?")
run_name = run.display_name or run.workflow_id
dur = _fmt_duration(run.duration)
lines.append(f"**{run_name}** {run_icon} {dur}")
lines.append(f"ID: `{run.metadata.id}`")
if run.additional_metadata:
meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
lines.append(f"Meta: {', '.join(meta_parts)}")
if run.error_message:
# Take first line of error only for header
first_line = run.error_message.split("\n")[0]
lines.append(f"Error: {first_line}")
lines.append("")
# DAG Status Overview table
lines.append("**DAG Status Overview**")
lines.append("")
lines.append("| Node | Status | Duration | Dependencies |")
lines.append("|------|--------|----------|--------------|")
for step_id in ordered:
s = step_to_shape[step_id]
t = task_by_step.get(step_id)
name = step_to_name[step_id]
icon = STATUS_ICON.get(t.status, "?") if t else "?"
dur = _fmt_duration(t.duration) if t else "-"
parent_names = [step_to_name[p] for p in parents[step_id]]
child_names = [
step_to_name[c] for c in (s.children_step_ids or []) if c in step_to_name
]
deps_left = ", ".join(parent_names) if parent_names else ""
deps_right = ", ".join(child_names) if child_names else ""
if deps_left and deps_right:
deps = f"{deps_left} \u2192 {deps_right}"
elif deps_right:
deps = f"\u2192 {deps_right}"
elif deps_left:
deps = f"{deps_left} \u2192"
else:
deps = "-"
lines.append(f"| {name} | {icon} | {dur} | {deps} |")
lines.append("")
lines.append("---")
lines.append("")
# Node details
for step_id in ordered:
t = task_by_step.get(step_id)
name = step_to_name[step_id]
if not t:
lines.append(f"**\U0001f4e6 {name}**")
lines.append("Status: no task data")
lines.append("")
continue
lines.append(f"**\U0001f4e6 {name}**")
lines.append(f"Status: {_fmt_status_line(t)}")
if t.duration:
lines.append(f"Duration: {_fmt_duration(t.duration)}")
if t.retry_count and t.retry_count > 0:
lines.append(f"Retries: {t.retry_count}")
# Fan-out children
if t.num_spawned_children and t.num_spawned_children > 0:
children = t.children or []
completed = sum(1 for c in children if c.status == V1TaskStatus.COMPLETED)
failed = sum(1 for c in children if c.status == V1TaskStatus.FAILED)
running = sum(1 for c in children if c.status == V1TaskStatus.RUNNING)
lines.append(
f"Spawned children: {completed}/{t.num_spawned_children} done"
f"{f', {running} running' if running else ''}"
f"{f', {failed} failed' if failed else ''}"
)
# Error message (first meaningful line only, full trace in events)
if t.error_message:
err_lines = t.error_message.strip().split("\n")
# Find first non-empty, non-traceback line
err_summary = err_lines[0]
for line in err_lines:
stripped = line.strip()
if stripped and not stripped.startswith(
("Traceback", "File ", "{", ")")
):
err_summary = stripped
break
lines.append(f"Error: `{err_summary}`")
# Events log
task_events = sorted(
events_by_task.get(t.task_external_id, []),
key=lambda e: e.timestamp,
)
if task_events:
lines.append("Events:")
for ev in task_events:
ts = ev.timestamp.strftime("%H:%M:%S")
ev_icon = ""
if ev.event_type.value == "FINISHED":
ev_icon = "\u2705 "
elif ev.event_type.value in ("FAILED", "TIMED_OUT"):
ev_icon = "\u274c "
elif ev.event_type.value == "STARTED":
ev_icon = "\u25b6\ufe0f "
elif ev.event_type.value == "RETRYING":
ev_icon = "\U0001f504 "
elif ev.event_type.value == "CANCELLED":
ev_icon = "\u26a0\ufe0f "
msg = ev.message.strip()
if ev.error_message:
# Just first line of error in event log
err_first = ev.error_message.strip().split("\n")[0]
if msg:
msg += f" | {err_first}"
else:
msg = err_first
if msg:
lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}: {msg}")
else:
lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}")
lines.append("")
return "\n".join(lines)
def render_run_summary(idx: int, run: V1TaskSummary) -> str:
"""One-line summary for a run in the list view."""
icon = STATUS_ICON.get(run.status, "?")
name = run.display_name or run.workflow_name or "?"
run_id = run.workflow_run_external_id or "?"
dur = _fmt_duration(run.duration)
started = _fmt_time(run.started_at)
meta = ""
if run.additional_metadata:
meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
meta = f" ({', '.join(meta_parts)})"
return (
f" {idx}. {icon} **{name}** started={started} dur={dur}{meta}\n"
f" `{run_id}`"
)
async def _fetch_run_list(
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> list[V1TaskSummary]:
client = HatchetClientManager.get_client()
since = datetime.now(timezone.utc) - timedelta(days=7)
runs = await client.runs.aio_list(
since=since,
statuses=statuses,
limit=count,
)
return runs.rows or []
async def list_recent_runs(
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> str:
"""List recent workflow runs as text."""
rows = await _fetch_run_list(count, statuses)
if not rows:
return "No runs found in the last 7 days."
lines = [f"Recent runs ({len(rows)}):", ""]
for i, run in enumerate(rows, 1):
lines.append(render_run_summary(i, run))
lines.append("")
lines.append("Use `--show N` to see full DAG for run N")
return "\n".join(lines)
async def show_run(workflow_run_id: str) -> str:
"""Fetch and render a single run."""
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
return render_run_detail(details)
async def show_nth_run(
n: int,
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> str:
"""Fetch list, then drill into Nth run."""
rows = await _fetch_run_list(count, statuses)
if not rows:
return "No runs found in the last 7 days."
if n < 1 or n > len(rows):
return f"Invalid index {n}. Have {len(rows)} runs (1-{len(rows)})."
run = rows[n - 1]
return await show_run(run.workflow_run_external_id)
async def main_async(args: argparse.Namespace) -> None:
statuses = [V1TaskStatus(args.status)] if args.status else None
if args.run_id:
output = await show_run(args.run_id)
elif args.show is not None:
output = await show_nth_run(args.show, count=args.last, statuses=statuses)
else:
output = await list_recent_runs(count=args.last, statuses=statuses)
print(output)
def main() -> None:
parser = argparse.ArgumentParser(
description="Render Hatchet workflow runs as text DAG"
)
parser.add_argument(
"run_id",
nargs="?",
default=None,
help="Workflow run ID to show in detail. If omitted, lists recent runs.",
)
parser.add_argument(
"--show",
type=int,
default=None,
metavar="N",
help="Show full DAG for the Nth run in the list (1-indexed)",
)
parser.add_argument(
"--last",
type=int,
default=5,
help="Number of recent runs to list (default: 5)",
)
parser.add_argument(
"--status",
choices=["QUEUED", "RUNNING", "COMPLETED", "FAILED", "CANCELLED"],
help="Filter by status",
)
args = parser.parse_args()
asyncio.run(main_async(args))
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,6 @@
import json
import os
from datetime import datetime, timezone
from typing import assert_never
from fastapi import APIRouter, HTTPException, Request
@@ -12,7 +14,10 @@ from reflector.dailyco_api import (
RecordingReadyEvent,
RecordingStartedEvent,
)
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger as _logger
from reflector.settings import settings
from reflector.video_platforms.factory import create_platform_client
@@ -212,10 +217,73 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(
recording_id
)
if not match:
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
meeting_id, _ = match
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found (webhook)",
recording_id=recording_id,
meeting_id=meeting_id,
)
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
# Create recording atomically
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=(
os.path.dirname(track_keys[0]) if track_keys else room_name
),
recorded_at=datetime.fromtimestamp(
event.payload.start_ts, tz=timezone.utc
),
track_keys=track_keys,
meeting_id=meeting_id,
status="pending",
)
)
if not created:
# Already created (polling got it first)
logger.debug(
"Recording already exists (webhook late)",
recording_id=recording_id,
meeting_id=meeting_id,
)
return
logger.info(
"Raw-tracks recording queuing processing",
"Raw-tracks recording queuing processing (webhook)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting_id,
num_tracks=len(track_keys),
)

View File

@@ -1,4 +1,5 @@
import json
import logging
from datetime import datetime, timezone
from typing import Annotated, Any, Optional
from uuid import UUID
@@ -9,16 +10,21 @@ from pydantic import BaseModel
import reflector.auth as auth
from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import (
MeetingConsent,
meeting_consent_controller,
meetings_controller,
)
from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -102,13 +108,6 @@ async def start_recording(
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
try:
client = create_platform_client("daily")
result = await client.start_recording(
@@ -117,9 +116,30 @@ async def start_recording(
instance_id=body.instanceId,
)
log.info(f"Started {body.type} recording via REST API")
recording_id = result["id"]
return {"status": "ok", "result": result}
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=body.instanceId,
type=body.type,
requested_at=datetime.now(timezone.utc),
)
)
logger.info(
f"Started {body.type} recording via REST API",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
"recording_id": recording_id,
},
)
return {"status": "ok", "recording_id": recording_id}
except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream"
@@ -130,22 +150,42 @@ async def start_recording(
# "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info:
log.info(
f"{body.type} recording already active (started by another participant)"
logger.info(
f"{body.type} recording already active (started by another participant)",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
},
)
return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling
# All other Daily.co API errors
log.error(f"Failed to start {body.type} recording", error=str(e))
logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)
except Exception as e:
# Non-Daily.co errors
log.error(f"Failed to start {body.type} recording", error=str(e))
logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)

View File

@@ -1,6 +1,5 @@
import json
import os
import re
from datetime import datetime, timezone
from typing import List, Literal
from urllib.parse import unquote
@@ -13,10 +12,12 @@ from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
@@ -25,9 +26,11 @@ from reflector.db.transcripts import (
transcripts_controller,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -228,79 +231,44 @@ async def _process_multitrack_recording_inner(
recording_start_ts: int,
):
"""
Process multitrack recording (first time or reprocessing).
Process multitrack recording.
For first processing (webhook/polling):
- Uses recording_start_ts for time-based meeting matching (no instanceId available)
For reprocessing:
- Uses recording.meeting_id directly (already linked during first processing)
- recording_start_ts is ignored
Recording must already exist with meeting_id set (created by webhook/polling before queueing).
"""
tz = timezone.utc
recorded_at = datetime.now(tz)
try:
if track_keys:
folder = os.path.basename(os.path.dirname(track_keys[0]))
ts_match = re.search(r"(\d{14})$", folder)
if ts_match:
ts = ts_match.group(1)
recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
except Exception as e:
logger.warning(
f"Could not parse recorded_at from keys, using now() {recorded_at}",
e,
exc_info=True,
)
# Check if recording already exists (reprocessing path)
# Get recording (must exist - created by webhook/polling)
recording = await recordings_controller.get_by_id(recording_id)
if recording and recording.meeting_id:
# Reprocessing: recording exists with meeting already linked
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Reprocessing: meeting not found for recording - skipping",
meeting_id=recording.meeting_id,
recording_id=recording_id,
)
return
if not recording:
logger.error(
"Recording not found - should have been created by webhook/polling",
recording_id=recording_id,
)
return
logger.info(
"Reprocessing: using existing recording.meeting_id",
if not recording.meeting_id:
logger.error(
"Recording has no meeting_id - orphan should not be queued",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
else:
# First processing: recording doesn't exist, need time-based matching
# (Daily.co doesn't return instanceId in API, must match by timestamp)
recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=daily_room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.error(
"Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
recording_id=recording_id,
room_name=daily_room_name,
recording_start_ts=recording_start_ts,
recording_start=recording_start.isoformat(),
)
return # Skip processing, will retry on next poll
logger.info(
"First processing: found meeting via time-based matching",
meeting_id=meeting.id,
room_name=daily_room_name,
return
# Get meeting
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Meeting not found for recording",
meeting_id=recording.meeting_id,
recording_id=recording_id,
time_delta_seconds=abs(
(meeting.start_date - recording_start).total_seconds()
),
)
return
logger.info(
"Processing multitrack recording",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
room_name_base = extract_base_room_name(daily_room_name)
@@ -308,33 +276,6 @@ async def _process_multitrack_recording_inner(
if not room:
raise Exception(f"Room not found: {room_name_base}")
if not recording:
# Create recording (only happens during first processing)
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key_dir,
recorded_at=recorded_at,
meeting_id=meeting.id,
track_keys=track_keys,
)
)
elif not recording.meeting_id:
# Recording exists but meeting_id is null (failed first processing)
# Update with meeting from time-based matching
await recordings_controller.set_meeting_id(
recording_id=recording.id,
meeting_id=meeting.id,
)
recording.meeting_id = meeting.id
logger.info(
"Updated existing recording with meeting_id",
recording_id=recording.id,
meeting_id=meeting.id,
)
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript:
transcript = await transcripts_controller.add(
@@ -349,40 +290,50 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
# Multitrack processing always uses Hatchet (no Celery fallback)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
use_celery = room and room.use_celery
use_hatchet = not use_celery
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
try:
await create_dag_zulip_message(transcript.id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
workflow_id=workflow_id,
exc_info=True,
)
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
)
@shared_task
@asynctask
@@ -510,7 +461,7 @@ async def store_cloud_recording(
Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths.
Uses time-based matching to handle duplicate room_name values.
Uses direct recording_id lookup via daily_recording_requests table.
Args:
recording_id: Daily.co recording ID
@@ -523,155 +474,170 @@ async def store_cloud_recording(
Returns:
True if stored, False if skipped/failed
"""
recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.warning(
f"Cloud recording ({source}): no meeting found within 1-week window",
if not match:
# ORPHAN: No request found (pre-migration recording or failed request creation)
await create_and_log_orphan(
recording_id=recording_id,
bucket_name="",
room_name=room_name,
recording_start_ts=start_ts,
recording_start=recording_start.isoformat(),
start_ts=start_ts,
track_keys=None,
source=source,
)
return False
meeting_id, _ = match
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
meeting_id=meeting_id,
s3_key=s3_key,
duration=duration,
)
if not success:
logger.debug(
f"Cloud recording ({source}): already set (race lost)",
f"Cloud recording ({source}): already set (stop/restart?)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting.id,
meeting_id=meeting_id,
)
return False
logger.info(
f"Cloud recording stored via {source} (time-based match)",
meeting_id=meeting.id,
f"Cloud recording stored via {source}",
meeting_id=meeting_id,
recording_id=recording_id,
s3_key=s3_key,
duration=duration,
time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
)
return True
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
"""
Store cloud recordings missing from meeting table via polling.
"""Process cloud recordings (database deduplication, worker-agnostic).
Uses time-based matching via store_cloud_recording().
Cloud recordings stored in meeting.daily_composed_video_s3_key, not recording table.
Only first cloud recording per meeting is kept (existing behavior).
"""
if not cloud_recordings:
return
stored_count = 0
for recording in cloud_recordings:
# Extract S3 key from recording (cloud recordings use s3key field)
s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
if not s3_key:
logger.warning(
"Cloud recording: missing S3 key",
recording_id=recording.id,
room_name=recording.room_name,
for rec in cloud_recordings:
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
if not match:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name="",
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=None,
source="polling",
)
continue
stored = await store_cloud_recording(
recording_id=recording.id,
room_name=recording.room_name,
s3_key=s3_key,
duration=recording.duration,
start_ts=recording.start_ts,
source="polling",
)
if stored:
stored_count += 1
meeting_id, _ = match
logger.info(
"Cloud recording polling complete",
total=len(cloud_recordings),
stored=stored_count,
)
if not rec.s3key:
logger.error("Cloud recording missing s3_key", recording_id=rec.id)
continue
# Store in meeting table (atomic, only if not already set)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key=rec.s3key,
duration=rec.duration,
)
if success:
logger.info(
"Stored cloud recording", recording_id=rec.id, meeting_id=meeting_id
)
else:
logger.warning(
"Cloud recording already exists for meeting (stop/restart?)",
recording_id=rec.id,
meeting_id=meeting_id,
)
async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: str,
):
"""Queue raw-tracks recordings missing from DB (existing logic)."""
bucket_name: NonEmptyString,
) -> None:
"""Process raw-tracks (database deduplication, worker-agnostic)."""
if not raw_tracks_recordings:
return
recording_ids = [rec.id for rec in raw_tracks_recordings]
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
existing_ids = {rec.id for rec in existing_recordings}
for rec in raw_tracks_recordings:
# Lookup request FIRST (before any DB writes)
match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
missing_recordings = [
rec for rec in raw_tracks_recordings if rec.id not in existing_ids
]
if not missing_recordings:
logger.debug(
"All raw-tracks recordings already in DB",
api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found raw-tracks recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
logger.warning(
"Finished raw-tracks recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
if not match:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
)
continue
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
meeting_id, _ = match
if not track_keys:
logger.warning(
"No audio tracks found in raw-tracks recording",
recording_id=recording.id,
room_name=recording.room_name,
total_tracks=len(recording.tracks),
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found", recording_id=rec.id, meeting_id=meeting_id
)
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
)
continue
logger.info(
"Queueing missing raw-tracks recording for processing",
recording_id=recording.id,
room_name=recording.room_name,
track_count=len(track_keys),
# DEDUPLICATION: Atomically create recording (single operation, no race window)
# ON CONFLICT → concurrent poller already got it, skip entire logic
track_keys = [t.s3Key for t in rec.tracks if t.type == "audio"]
created = await recordings_controller.try_create_with_meeting(
Recording(
id=rec.id,
bucket_name=bucket_name,
object_key=os.path.dirname(track_keys[0]) if track_keys else "",
recorded_at=datetime.fromtimestamp(rec.start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=meeting_id, # Set at creation (constraint-safe)
status="pending",
)
)
if not created:
# Conflict: another poller already created/queued this
# Skip all remaining logic (match already done by winner)
continue
# Only winner reaches here - queue processing (works with Celery or Hatchet)
process_multitrack_recording.delay(
recording_id=rec.id,
daily_room_name=rec.room_name,
recording_start_ts=rec.start_ts,
bucket_name=bucket_name,
daily_room_name=recording.room_name,
recording_id=recording.id,
track_keys=track_keys,
recording_start_ts=recording.start_ts,
)
logger.info("Queued recording", recording_id=rec.id, meeting_id=meeting_id)
async def poll_daily_room_presence(meeting_id: str) -> None:
"""Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed.
@@ -1060,53 +1026,66 @@ async def reprocess_failed_daily_recordings():
)
continue
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
try:
await create_dag_zulip_message(transcript.id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at reprocess dispatch",
transcript_id=transcript.id,
workflow_id=workflow_id,
exc_info=True,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1

View File

@@ -11,6 +11,7 @@ broadcast messages to all connected websockets.
import asyncio
import json
import threading
import redis.asyncio as redis
from fastapi import WebSocket
@@ -97,7 +98,6 @@ class WebsocketManager:
async def _pubsub_data_reader(self, pubsub_subscriber):
while True:
# timeout=1.0 prevents tight CPU loop when no messages available
message = await pubsub_subscriber.get_message(
ignore_subscribe_messages=True
)
@@ -109,38 +109,29 @@ class WebsocketManager:
await socket.send_json(data)
# Process-global singleton to ensure only one WebsocketManager instance exists.
# Multiple instances would cause resource leaks and CPU issues.
_ws_manager: WebsocketManager | None = None
def get_ws_manager() -> WebsocketManager:
"""
Returns the global WebsocketManager singleton.
Returns the WebsocketManager instance for managing websockets.
Creates instance on first call, subsequent calls return cached instance.
Thread-safe via GIL. Concurrent initialization may create duplicate
instances but last write wins (acceptable for this use case).
This function initializes and returns the WebsocketManager instance,
which is responsible for managing websockets and handling websocket
connections.
Returns:
WebsocketManager: The global WebsocketManager instance.
WebsocketManager: The initialized WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
"""
global _ws_manager
local = threading.local()
if hasattr(local, "ws_manager"):
return local.ws_manager
if _ws_manager is not None:
return _ws_manager
# No lock needed - GIL makes this safe enough
# Worst case: race creates two instances, last assignment wins
pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
)
_ws_manager = WebsocketManager(pubsub_client=pubsub_client)
return _ws_manager
def reset_ws_manager() -> None:
"""Reset singleton for testing. DO NOT use in production."""
global _ws_manager
_ws_manager = None
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager
return ws_manager

View File

@@ -12,16 +12,9 @@ class InvalidMessageError(Exception):
pass
def _zulip_client() -> httpx.AsyncClient:
headers = {}
if settings.ZULIP_HOST_HEADER:
headers["Host"] = settings.ZULIP_HOST_HEADER
return httpx.AsyncClient(verify=False, headers=headers)
async def get_zulip_topics(stream_id: int) -> list[dict]:
try:
async with _zulip_client() as client:
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://{settings.ZULIP_REALM}/api/v1/users/me/{stream_id}/topics",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
@@ -36,7 +29,7 @@ async def get_zulip_topics(stream_id: int) -> list[dict]:
async def get_zulip_streams() -> list[dict]:
try:
async with _zulip_client() as client:
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://{settings.ZULIP_REALM}/api/v1/streams",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
@@ -51,7 +44,7 @@ async def get_zulip_streams() -> list[dict]:
async def send_message_to_zulip(stream: str, topic: str, content: str):
try:
async with _zulip_client() as client:
async with httpx.AsyncClient() as client:
response = await client.post(
f"https://{settings.ZULIP_REALM}/api/v1/messages",
data={
@@ -73,7 +66,7 @@ async def send_message_to_zulip(stream: str, topic: str, content: str):
async def update_zulip_message(message_id: int, stream: str, topic: str, content: str):
try:
async with _zulip_client() as client:
async with httpx.AsyncClient() as client:
response = await client.patch(
f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}",
data={
@@ -97,27 +90,6 @@ async def update_zulip_message(message_id: int, stream: str, topic: str, content
raise Exception(f"Failed to update Zulip message: {error}")
async def delete_zulip_message(message_id: int):
try:
async with _zulip_client() as client:
response = await client.delete(
f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
)
if (
response.status_code == 400
and response.json()["msg"] == "Invalid message(s)"
):
raise InvalidMessageError(f"There is no message with id: {message_id}")
response.raise_for_status()
return response.json()
except httpx.RequestError as error:
raise Exception(f"Failed to delete Zulip message: {error}")
def get_zulip_message(transcript: Transcript, include_topics: bool):
transcript_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""Test script to fetch Daily.co recordings for a specific room and show raw API response."""
import asyncio
import json
from reflector.video_platforms.factory import create_platform_client
async def main():
room_name = "daily-private-igor-20260110042117"
print(f"\n=== Fetching recordings for room: {room_name} ===\n")
async with create_platform_client("daily") as client:
recordings = await client.list_recordings(room_name=room_name)
print(f"Found {len(recordings)} recording objects from Daily.co API\n")
for i, rec in enumerate(recordings, 1):
print(f"--- Recording #{i} ---")
print(f"ID: {rec.id}")
print(f"Room: {rec.room_name}")
print(f"Start TS: {rec.start_ts}")
print(f"Status: {rec.status}")
print(f"Duration: {rec.duration}")
print(f"Type: {rec.type}")
print(f"Tracks count: {len(rec.tracks)}")
if rec.tracks:
print(f"Tracks:")
for j, track in enumerate(rec.tracks, 1):
print(f" Track {j}: {track.s3Key}")
print(f"\nRaw JSON:\n{json.dumps(rec.model_dump(), indent=2, default=str)}\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,10 +1,11 @@
import os
from contextlib import asynccontextmanager
from tempfile import NamedTemporaryFile
from unittest.mock import patch
import pytest
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
from reflector.schemas.platform import WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True)
@@ -14,7 +15,6 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield
@@ -333,14 +333,11 @@ def celery_enable_logging():
@pytest.fixture(scope="session")
def celery_config():
redis_host = os.environ.get("REDIS_HOST", "localhost")
redis_port = os.environ.get("REDIS_PORT", "6379")
# Use db 2 to avoid conflicts with main app
redis_url = f"redis://{redis_host}:{redis_port}/2"
yield {
"broker_url": redis_url,
"result_backend": redis_url,
}
with NamedTemporaryFile() as f:
yield {
"broker_url": "memory://",
"result_backend": f"db+sqlite:///{f.name}",
}
@pytest.fixture(scope="session")
@@ -373,12 +370,9 @@ async def ws_manager_in_memory(monkeypatch):
def __init__(self, queue: asyncio.Queue):
self.queue = queue
async def get_message(
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
):
wait_timeout = timeout if timeout is not None else 0.05
async def get_message(self, ignore_subscribe_messages: bool = True):
try:
return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
return await asyncio.wait_for(self.queue.get(), timeout=0.05)
except Exception:
return None

View File

@@ -1,536 +0,0 @@
"""
Tests for Hatchet DAG Status -> Zulip Live Updates.
Tests cover:
- _dag_zulip_enabled() guard logic
- create_dag_zulip_message: sends + stores message ID
- update_dag_zulip_message: updates existing; noop when no message_id
- delete_dag_zulip_message: deletes + clears; handles InvalidMessageError
- delete_zulip_message: sends HTTP DELETE; raises on 400
- with_error_handling integration: calls update after success + failure
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from reflector.db.transcripts import Transcript
@pytest.fixture
def dag_settings():
"""Patch settings for DAG Zulip tests."""
with patch("reflector.hatchet.dag_zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_DAG_STREAM = "dag-stream"
mock_settings.ZULIP_DAG_TOPIC = "dag-topic"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
yield mock_settings
@pytest.fixture
def dag_settings_disabled():
"""Patch settings with DAG Zulip disabled."""
with patch("reflector.hatchet.dag_zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_DAG_STREAM = None
mock_settings.ZULIP_DAG_TOPIC = None
yield mock_settings
@pytest.fixture
def mock_transcript():
return Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
zulip_message_id=None,
)
@pytest.fixture
def mock_transcript_with_zulip_id():
return Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
zulip_message_id=42,
)
class TestDagZulipEnabled:
def test_enabled_when_all_set(self, dag_settings):
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is True
def test_disabled_when_realm_missing(self, dag_settings):
dag_settings.ZULIP_REALM = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
def test_disabled_when_stream_missing(self, dag_settings):
dag_settings.ZULIP_DAG_STREAM = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
def test_disabled_when_topic_missing(self, dag_settings):
dag_settings.ZULIP_DAG_TOPIC = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestCreateDagZulipMessage:
async def test_sends_and_stores_message_id(self, dag_settings, mock_transcript):
mock_run_details = MagicMock()
rendered_md = "**DAG** rendered"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
return_value={"id": 99},
) as mock_send,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_update,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
from reflector.hatchet.dag_zulip import create_dag_zulip_message
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_send.assert_called_once_with("dag-stream", "dag-topic", rendered_md)
mock_update.assert_called_once_with(
mock_transcript, {"zulip_message_id": 99}
)
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
) as mock_send:
from reflector.hatchet.dag_zulip import create_dag_zulip_message
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_send.assert_not_called()
async def test_logs_warning_on_failure(self, dag_settings, mock_transcript):
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value="rendered",
),
patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
side_effect=Exception("Zulip down"),
),
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch("reflector.hatchet.dag_zulip.logger") as mock_logger,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=MagicMock())
mock_get_client.return_value = mock_client
from reflector.hatchet.dag_zulip import create_dag_zulip_message
# Should not raise
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_logger.warning.assert_called()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestUpdateDagZulipMessage:
async def test_updates_existing_message(
self, dag_settings, mock_transcript_with_zulip_id
):
mock_run_details = MagicMock()
rendered_md = "**DAG** updated"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_called_once_with(
42, "dag-stream", "dag-topic", rendered_md
)
async def test_appends_error_banner(
self, dag_settings, mock_transcript_with_zulip_id
):
mock_run_details = MagicMock()
rendered_md = "**DAG** updated"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message(
"test-transcript-id",
"workflow-run-123",
error_message="get_recording failed: connection timeout",
)
call_args = mock_update.call_args
content = call_args[0][3]
assert rendered_md in content
assert "get_recording failed: connection timeout" in content
async def test_noop_when_no_message_id(self, dag_settings, mock_transcript):
with (
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_not_called()
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update:
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestDeleteDagZulipMessage:
async def test_deletes_and_clears(
self, dag_settings, mock_transcript_with_zulip_id
):
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_tc_update,
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_called_once_with(42)
mock_tc_update.assert_called_once_with(
mock_transcript_with_zulip_id, {"zulip_message_id": None}
)
async def test_noop_when_no_message_id(self, dag_settings, mock_transcript):
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_not_called()
async def test_handles_invalid_message_error(
self, dag_settings, mock_transcript_with_zulip_id
):
from reflector.zulip import InvalidMessageError
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
side_effect=InvalidMessageError("gone"),
),
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_tc_update,
patch("reflector.hatchet.dag_zulip.logger"),
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
# Should not raise; should still clear the message_id
await delete_dag_zulip_message("test-transcript-id")
mock_tc_update.assert_called_once_with(
mock_transcript_with_zulip_id, {"zulip_message_id": None}
)
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete:
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_not_called()
@pytest.mark.asyncio
class TestDeleteZulipMessage:
async def test_sends_delete_request(self):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"result": "success"}
mock_client = AsyncMock()
mock_client.delete = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("reflector.zulip.httpx.AsyncClient", return_value=mock_client):
with patch("reflector.zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
from reflector.zulip import delete_zulip_message
result = await delete_zulip_message(123)
assert result == {"result": "success"}
mock_client.delete.assert_called_once()
call_args = mock_client.delete.call_args
assert "123" in call_args.args[0]
async def test_raises_invalid_message_on_400(self):
from reflector.zulip import InvalidMessageError
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.json.return_value = {"msg": "Invalid message(s)"}
mock_client = AsyncMock()
mock_client.delete = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("reflector.zulip.httpx.AsyncClient", return_value=mock_client):
with patch("reflector.zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
from reflector.zulip import delete_zulip_message
with pytest.raises(InvalidMessageError):
await delete_zulip_message(999)
@pytest.mark.asyncio
class TestWithErrorHandlingDagUpdate:
"""Test that with_error_handling calls update_dag_zulip_message."""
async def test_calls_update_on_success(self):
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def fake_task(input: PipelineInput, ctx) -> str:
return "ok"
with patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
) as mock_update:
result = await fake_task(input_data, mock_ctx)
assert result == "ok"
mock_update.assert_called_once_with("tid-1", "wfr-123")
async def test_calls_update_on_failure_with_error_message(self):
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def failing_task(input: PipelineInput, ctx) -> str:
raise ValueError("boom")
with (
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.set_workflow_error_status",
new_callable=AsyncMock,
),
):
with pytest.raises(ValueError, match="boom"):
await failing_task(input_data, mock_ctx)
mock_update.assert_called_once_with(
"tid-1", "wfr-123", error_message="get_recording failed: boom"
)
async def test_dag_failure_doesnt_affect_task(self):
"""DAG update failure should not prevent task from succeeding."""
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def ok_task(input: PipelineInput, ctx) -> str:
return "ok"
with patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
side_effect=Exception("zulip exploded"),
):
result = await ok_task(input_data, mock_ctx)
assert result == "ok"

View File

@@ -0,0 +1,258 @@
from datetime import datetime, timezone
from uuid import UUID
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_create_request():
"""Test creating a recording request."""
# Create meeting first
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-123",
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
request = DailyRecordingRequest(
recording_id="rec-1",
meeting_id="meeting-123",
instance_id=UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"),
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
result = await daily_recording_requests_controller.find_by_recording_id("rec-1")
assert result is not None
assert result[0] == "meeting-123"
assert result[1] == "cloud"
@pytest.mark.asyncio
async def test_multiple_recordings_same_meeting():
"""Test stop/restart creates multiple request rows."""
# Create room and meeting
room = Room(
id="test-room-2", name="Test Room 2", slug="test-room-2", user_id="test-user"
)
await rooms_controller.create(room)
meeting_id = "meeting-456"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-1",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-2", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-1", "rec-2"}
@pytest.mark.asyncio
async def test_deduplication_via_database():
"""Test concurrent pollers use database for deduplication."""
# Create room and meeting
room = Room(
id="test-room-3", name="Test Room 3", slug="test-room-3", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-789",
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
recording_id = "rec-123"
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created2 is False # Conflict, skip
@pytest.mark.asyncio
async def test_orphan_logged_once():
"""Test orphan marked once, skipped on re-poll."""
# First poll
created1 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created1 is True
# Second poll (same orphan discovered again)
created2 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created2 is False # Already exists
# Verify it exists
existing = await recordings_controller.get_by_id("orphan-123")
assert existing is not None
assert existing.status == "orphan"
@pytest.mark.asyncio
async def test_orphan_constraints():
"""Test orphan invariants are enforced."""
# Can't create orphan with meeting_id
with pytest.raises(AssertionError, match="meeting_id must be NULL"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-123", # Should be None
status="orphan",
track_keys=None,
)
)
# Can't create orphan with wrong status
with pytest.raises(AssertionError, match="status must be 'orphan'"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="pending", # Should be "orphan"
track_keys=None,
)
)
@pytest.mark.asyncio
async def test_try_create_with_meeting_constraints():
"""Test try_create_with_meeting enforces constraints."""
# Create room and meeting
room = Room(
id="test-room-4", name="Test Room 4", slug="test-room-4", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-999",
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Can't create with orphan status
with pytest.raises(AssertionError, match="use create_orphan"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-999",
status="orphan", # Should not be orphan
track_keys=None,
)
)
# Can't create without meeting_id
with pytest.raises(AssertionError, match="meeting_id required"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None, # Should have meeting_id
status="pending",
track_keys=None,
)
)

View File

@@ -0,0 +1,300 @@
"""
Integration tests for recording request flow.
These tests verify the end-to-end flow of:
1. Starting a recording (creates request)
2. Webhook/polling discovering recording (matches via request)
3. Recording processing (uses existing meeting_id)
"""
from datetime import datetime, timezone
from uuid import UUID, uuid4
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_recording_request_flow_cloud(client):
"""Test full cloud recording flow: start -> webhook -> match"""
# Create room and meeting
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Simulate recording start (what endpoint does)
recording_id = "rec-cloud-123"
instance_id = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Verify request exists
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
assert match[0] == meeting_id
assert match[1] == "cloud"
# Simulate webhook/polling storing cloud recording
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key="s3://bucket/recording.mp4",
duration=120,
)
assert success is True
# Verify meeting updated
updated_meeting = await meetings_controller.get_by_id(meeting_id)
assert updated_meeting.daily_composed_video_s3_key == "s3://bucket/recording.mp4"
assert updated_meeting.daily_composed_video_duration == 120
@pytest.mark.asyncio
async def test_recording_request_flow_raw_tracks(client):
"""Test full raw-tracks recording flow: start -> webhook/polling -> process"""
# Create room and meeting
room = Room(
id="test-room-2",
name="Test Room 2",
slug="test-room-2",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Simulate recording start
recording_id = "rec-raw-456"
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Simulate webhook/polling discovering recording
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
found_meeting_id, recording_type = match
assert found_meeting_id == meeting_id
assert recording_type == "raw-tracks"
# Create recording (what webhook/polling does)
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="recordings/20260120/",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
meeting_id=meeting_id,
status="pending",
)
)
assert created is True
# Verify recording exists with meeting_id
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id
assert recording.status == "pending"
assert len(recording.track_keys) == 2
@pytest.mark.asyncio
async def test_stop_restart_creates_multiple_requests(client):
"""Test stop/restart creates multiple request rows with same instance_id"""
# Create room and meeting
room = Room(
id="test-room-3",
name="Test Room 3",
slug="test-room-3",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("c1d2e3f4-a5b6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-first",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-second", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-first", "rec-second"}
assert all(r.instance_id == instance_id for r in requests)
@pytest.mark.asyncio
async def test_orphan_recording_no_request(client):
"""Test orphan recording (no request found)"""
# Simulate polling discovering recording with no request
recording_id = "rec-orphan"
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is None # No request
# Mark as orphan
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created is True
# Verify orphan exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.status == "orphan"
assert recording.meeting_id is None
# Second poll - already exists
created_again = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created_again is False # Already exists
@pytest.mark.asyncio
async def test_concurrent_polling_deduplication(client):
"""Test concurrent pollers only queue once"""
# Create room and meeting
room = Room(
id="test-room-4",
name="Test Room 4",
slug="test-room-4",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Create request
recording_id = "rec-concurrent"
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=UUID("d1e2f3a4-b5c6-7890-abcd-ef1234567890"),
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
)
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created2 is False # Conflict, skip
# Only one recording exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id

View File

@@ -1,374 +0,0 @@
"""
Integration tests for time-based meeting-to-recording matching.
Tests the critical path for matching Daily.co recordings to meetings when
API doesn't return instanceId.
"""
from datetime import datetime, timedelta, timezone
import pytest
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
@pytest.fixture
async def test_room():
"""Create a test room for meetings."""
room = await rooms_controller.add(
name="test-room-time",
user_id="test-user-id",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=False,
platform="daily",
)
return room
@pytest.fixture
def base_time():
"""Fixed timestamp for deterministic tests."""
return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
class TestTimeBasedMatching:
"""Test get_by_room_name_and_time() matching logic."""
async def test_exact_time_match(self, test_room, base_time):
"""Recording timestamp exactly matches meeting start_date."""
meeting = await meetings_controller.create(
id="meeting-exact",
room_name="daily-test-20260114090000",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090000",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
"""Recording started 1 minute after meeting (participants joined late)."""
meeting = await meetings_controller.create(
id="meeting-late",
room_name="daily-test-20260114090100",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time + timedelta(minutes=1)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090100",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
"""
Two meetings with same room_name (duplicate/race condition).
Should pick closest by timestamp.
"""
meeting1 = await meetings_controller.create(
id="meeting-1-first",
room_name="daily-duplicate-room",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting2 = await meetings_controller.create(
id="meeting-2-second",
room_name="daily-duplicate-room", # Same room_name!
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(seconds=0.99), # 0.99s later
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording started 0.5s after meeting1
# Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
recording_start = base_time + timedelta(seconds=0.5)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-duplicate-room",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s)
async def test_outside_time_window_returns_none(self, test_room, base_time):
"""Recording outside 1-week window returns None."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-old",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording 8 days later (outside 7-day window)
recording_start = base_time + timedelta(days=8)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-old",
recording_start=recording_start,
time_window_hours=168,
)
assert result is None
async def test_tie_breaker_deterministic(self, test_room, base_time):
"""When time delta identical, tie-breaker by meeting.id is deterministic."""
meeting_z = await meetings_controller.create(
id="zzz-last-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting_a = await meetings_controller.create(
id="aaa-first-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time, # Exact same start_date
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tie",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
# Tie-breaker: lexicographically first UUID
assert result.id == "aaa-first-uuid"
async def test_timezone_naive_datetime_raises(self, test_room, base_time):
"""Timezone-naive datetime raises ValueError."""
await meetings_controller.create(
id="meeting-tz",
room_name="daily-test-tz",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Naive datetime (no timezone)
naive_dt = datetime(2026, 1, 14, 9, 0, 0)
with pytest.raises(ValueError, match="timezone-aware"):
await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tz",
recording_start=naive_dt,
time_window_hours=168,
)
async def test_one_week_boundary_after_included(self, test_room, base_time):
"""Meeting 1-week AFTER recording is included (window_end boundary)."""
meeting_time = base_time + timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-after",
room_name="daily-test-boundary-after",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-after",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-after"
async def test_one_week_boundary_before_included(self, test_room, base_time):
"""Meeting 1-week BEFORE recording is included (window_start boundary)."""
meeting_time = base_time - timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-before",
room_name="daily-test-boundary-before",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-before",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-before"
async def test_recording_before_meeting_start(self, test_room, base_time):
"""Recording started before meeting (clock skew or early join)."""
await meetings_controller.create(
id="meeting-early",
room_name="daily-test-early",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time - timedelta(minutes=2)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-early",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-early"
async def test_mixed_inside_outside_window(self, test_room, base_time):
"""Multiple meetings, only one inside window - returns the inside one."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=10),
end_date=base_time - timedelta(days=10, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-inside",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=2),
end_date=base_time - timedelta(days=2, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-future",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(days=10),
end_date=base_time + timedelta(days=10, hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-mixed",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-inside"
class TestAtomicCloudRecordingUpdate:
"""Test atomic update prevents race conditions."""
async def test_first_update_succeeds(self, test_room, base_time):
"""First call to set_cloud_recording_if_missing succeeds."""
meeting = await meetings_controller.create(
id="meeting-atomic-1",
room_name="daily-test-atomic",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success is True
updated = await meetings_controller.get_by_id(meeting.id)
assert updated.daily_composed_video_s3_key == "first-s3-key"
assert updated.daily_composed_video_duration == 100
async def test_second_update_fails_atomically(self, test_room, base_time):
"""Second call to update same meeting doesn't overwrite (atomic check)."""
meeting = await meetings_controller.create(
id="meeting-atomic-2",
room_name="daily-test-atomic2",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success1 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success1 is True
after_first = await meetings_controller.get_by_id(meeting.id)
assert after_first.daily_composed_video_s3_key == "first-s3-key"
success2 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="bucket/path/should-not-overwrite",
duration=200,
)
assert success2 is False
final = await meetings_controller.get_by_id(meeting.id)
assert final.daily_composed_video_s3_key == "first-s3-key"
assert final.daily_composed_video_duration == 100

View File

@@ -1,6 +1,6 @@
import asyncio
import time
from unittest.mock import AsyncMock, patch
from unittest.mock import patch
import pytest
from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
):
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline, not Hatchet
# Whereby recordings should use file pipeline
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_hatchet.start_workflow.assert_not_called()
mock_multitrack_pipeline.delay.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@@ -177,6 +177,8 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
@@ -211,23 +213,18 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use Hatchet workflow
mock_hatchet.start_workflow.assert_called_once()
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
# Daily.co multitrack recordings should use multitrack pipeline
mock_multitrack_pipeline.delay.assert_called_once_with(
transcript_id=transcript.id,
bucket_name="daily-bucket",
track_keys=track_keys,
)
mock_file_pipeline.delay.assert_not_called()

View File

@@ -115,7 +115,9 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
settings.DATA_DIR = DATA_DIR
# Using celery_includes from conftest.py which includes both pipelines
@pytest.fixture(scope="session")
def celery_includes():
return ["reflector.pipelines.main_live_pipeline"]
@pytest.mark.usefixtures("setup_database")

View File

@@ -56,12 +56,7 @@ def appserver_ws_user(setup_database):
if server_instance:
server_instance.should_exit = True
server_thread.join(timeout=2.0)
# Reset global singleton for test isolation
from reflector.ws_manager import reset_ws_manager
reset_ws_manager()
server_thread.join(timeout=30)
@pytest.fixture(autouse=True)
@@ -138,8 +133,6 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
# Connect and then trigger an event via HTTP create
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
await asyncio.sleep(0.2)
# Emit an event to the user's room via a standard HTTP action
from httpx import AsyncClient
@@ -157,7 +150,6 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
"email": "user-abc@example.com",
}
# Use in-memory client (global singleton makes it share ws_manager)
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
resp = await ac.post("/transcripts", json={"name": "WS Test"})