Compare commits

..

10 Commits

Author SHA1 Message Date
ae44f5227b Merge branch 'main' into fix-room-query-batching 2026-02-05 19:42:29 -05:00
Igor Loskutov
4339ffffcf fix: use $api.queryOptions for batcher query keys
Replace custom meetingStatusKeys with $api.queryOptions()-derived keys
so cache identity matches the original per-room GET endpoints.
2026-02-05 19:40:13 -05:00
Igor Loskutov
9dc6c20ef8 fix: address code review findings
- Add max_length=100 on BulkStatusRequest.room_names to prevent abuse
- Filter bulk endpoint results to rooms user can see (owned or shared)
- Throw on bulk-status fetch error instead of silently returning empty data
- Fix room_by_id type annotation: dict[str, DbRoom] instead of Any
- Remove stale "200ms" comment in test
- Enable strict: true in jest tsconfig
- Remove working docs from tracked files
- Simplify redundant ternary in test helper
2026-02-05 19:36:17 -05:00
Igor Loskutov
931c344ddf feat: add frontend test infrastructure and fix CI workflow
- Fix pnpm version mismatch in test_next_server.yml (8 → auto-detect 10)
- Add concurrency group to cancel stale CI runs
- Remove redundant setup-node step
- Update jest.config.js for jsdom + tsx support
- Add meetingStatusBatcher integration test (3 tests)
- Extract createMeetingStatusBatcher factory for testability
2026-02-05 19:06:32 -05:00
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
Igor Loskutov
129290517e docs: add handoff report and frontend testing research
BATSHIT_REPORT.md: full context on bulk query batching — business goal,
approach, all changes, verification status, and how to run.
FRONTEND_TEST_RESEARCH.md: research on unit testing react-query hooks
with jest.mock, renderHook, and batcher testing patterns.
2026-02-05 17:49:23 -05:00
Igor Loskutov
7e072219bf feat: batch room meeting status queries into single bulk endpoint
Reduces rooms list page from 2N+2 HTTP requests to 1 POST request.
Backend: POST /v1/rooms/meetings/bulk-status with 3 DB queries total.
Frontend: @yornaath/batshit DataLoader-style batcher with 10ms window.
2026-02-05 17:47:58 -05:00
1ce1c7a910 fix: websocket tests (#825)
* fix websocket tests

* fix: restore timeout and fix celery test infrastructure

- Re-add timeout=1.0 to ws_manager pubsub loop (prevents CPU spin?)
- Use Redis for Celery tests (memory:// broker doesn't support chords)
- Add timeout param to in-memory subscriber mock
- Remove duplicate celery_includes fixture from rtc_ws tests

* fix: remove redundant inline imports in test files

* fix: update gitleaks ignore for moved s3_key line

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 14:23:31 -05:00
Rémi Pauchet
984795357e - fix nvidia repo blocked by apt (sha1) (#845)
- use build cache for apt and uv
- limit concurency for uv to prevent crashes with too many cores
2026-02-05 13:59:34 -05:00
fa3cf5da0f chore(main): release 0.32.2 (#842) 2026-02-03 22:05:22 -05:00
37 changed files with 2272 additions and 1545 deletions

View File

@@ -13,6 +13,9 @@ on:
jobs:
test-next-server:
runs-on: ubuntu-latest
concurrency:
group: test-next-server-${{ github.ref }}
cancel-in-progress: true
defaults:
run:
@@ -21,17 +24,12 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
version: 8
package_json_file: './www/package.json'
- name: Setup Node.js cache
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'

View File

@@ -4,4 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465
server/tests/test_recording_request_flow.py:generic-api-key:121
server/reflector/worker/process.py:generic-api-key:594

View File

@@ -1,5 +1,14 @@
# Changelog
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
### Bug Fixes
* increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks ([#843](https://github.com/Monadical-SAS/reflector/issues/843)) ([4acde4b](https://github.com/Monadical-SAS/reflector/commit/4acde4b7fdef88cc02ca12cf38c9020b05ed96ac))
* make caddy optional ([#841](https://github.com/Monadical-SAS/reflector/issues/841)) ([a2ed7d6](https://github.com/Monadical-SAS/reflector/commit/a2ed7d60d557b551a5b64e4dfd909b63a791d9fc))
* use Daily API recording.duration as master source for transcript duration ([#844](https://github.com/Monadical-SAS/reflector/issues/844)) ([8707c66](https://github.com/Monadical-SAS/reflector/commit/8707c6694a80c939b6214bbc13331741f192e082))
## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)

View File

@@ -4,27 +4,31 @@ ENV PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy \
UV_NO_CACHE=1
# patch until nvidia updates the sha1 repo
ADD sequoia.config /etc/crypto-policies/back-ends/sequoia.config
WORKDIR /tmp
RUN apt-get update \
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
&& apt-get install -y \
ffmpeg \
curl \
ca-certificates \
gnupg \
wget \
&& apt-get clean
wget
# Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12
ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb
RUN dpkg -i /cuda-keyring.deb \
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
dpkg -i /cuda-keyring.deb \
&& rm /cuda-keyring.deb \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
cuda-cudart-12-6 \
libcublas-12-6 \
libcudnn9-cuda-12 \
libcudnn9-dev-cuda-12 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
libcudnn9-dev-cuda-12
ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh
ENV PATH="/root/.local/bin/:$PATH"
@@ -39,6 +43,13 @@ COPY ./app /app/app
COPY ./main.py /app/
COPY ./runserver.sh /app/
# prevent uv failing with too many open files on big cpus
ENV UV_CONCURRENT_INSTALLS=16
# first install
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --compile-bytecode --locked
EXPOSE 8000
CMD ["sh", "/app/runserver.sh"]

View File

@@ -0,0 +1,2 @@
[hash_algorithms]
sha1 = "always"

View File

@@ -86,7 +86,7 @@ Daily.co Room: "daily-private-igor-20260110042117"
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp |
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId` (can be null), requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording
@@ -101,30 +101,6 @@ Daily.co Room: "daily-private-igor-20260110042117"
**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs.
### instanceId (Reflector-Generated)
**Definition:** UUID we generate and send when starting recording via REST API.
**Generation:** Deterministic from meeting_id
- Cloud: `instanceId = meeting_id` directly
- Raw-tracks: `instanceId = UUIDv5(meeting_id, namespace)`
**Key behaviors:**
-**Reuse allowed:** Same instanceId can be used after stop (validated 2026-01-20)
-**Not returned:** Daily.co does NOT echo instanceId back in GET /recordings response
-**Present in error webhooks:** `recording.error` webhook includes instanceId
- **Purpose:** Allows multiple concurrent recordings (cloud + raw-tracks) in same room
**Stop/restart example:**
```
Recording 1: POST /start with instanceId="779e6376..." → recording_id="ee00c4e8..."
Stop recording
Recording 2: POST /start with instanceId="779e6376..." (SAME) → recording_id="b702f509..." (DIFFERENT)
✅ Both succeed, different recording_ids returned
```
**Implication:** Cannot match recordings by instanceId (not in response) - must use recording_id.
---
## Entity Relationships
@@ -220,19 +196,6 @@ Daily.co Room: "daily-private-igor-20260110042117"
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
**Reliability:** Can be null or present in GET /recordings response (unreliable).
**When present:** Multiple recordings from same session (stop/restart with participants connected) share same mtgSessionId.
**Example (validated 2026-01-20):**
```json
Recording 1: {"id": "ee00c4e8...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
Recording 2: {"id": "b702f509...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
// Same mtgSessionId (stop/restart in same session)
```
**When null:** Common - Daily.co API does not reliably populate this field.
### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks.
@@ -257,24 +220,16 @@ TABLE daily_participant_session (
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
**Example API response (mtgSessionId can be null OR present):**
**Example API response:**
```json
{
"id": "recording-uuid",
"room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896,
"mtgSessionId": null // ← Often null (unreliable)
}
// OR (when present):
{
"id": "recording-uuid",
"mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6" // ← Sometimes present
"mtgSessionId": null Missing!
}
```
**Key insight:** Cannot rely on mtgSessionId for matching (unreliable). instanceId also not returned. Only reliable identifier is recording.id.
### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
@@ -536,10 +491,6 @@ UI: User sees 3 separate transcripts
---
**Document Version:** 1.1
**Last Updated:** 2026-01-20
**Data Source:** Production database + Daily.co API inspection + empirical testing
**Changes in 1.1:**
- Added instanceId behavior documentation (reuse allowed, not returned in API)
- Clarified mtgSessionId reliability (can be null or present)
- Added empirical validation of stop/restart behavior
**Document Version:** 1.0
**Last Verified:** 2026-01-15
**Data Source:** Production database + Daily.co API inspection

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -1,67 +0,0 @@
"""add_daily_recording_requests
Revision ID: f5b008fa8a14
Revises: 1b1e6a6fc465
Create Date: 2026-01-20 22:32:06.697144
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f5b008fa8a14"
down_revision: Union[str, None] = "1b1e6a6fc465"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"daily_recording_request",
sa.Column("recording_id", sa.String(), nullable=False),
sa.Column("meeting_id", sa.String(), nullable=False),
sa.Column("instance_id", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("recording_id"),
)
op.create_index("idx_meeting_id", "daily_recording_request", ["meeting_id"])
op.create_index("idx_instance_id", "daily_recording_request", ["instance_id"])
# Clean up orphaned recordings before adding FK constraint
op.execute("""
UPDATE recording SET status = 'orphan', meeting_id = NULL
WHERE meeting_id IS NOT NULL
AND meeting_id NOT IN (SELECT id FROM meeting)
""")
# Add FK constraint to recording table (cascade delete recordings when meeting deleted)
op.execute("""
ALTER TABLE recording ADD CONSTRAINT fk_recording_meeting
FOREIGN KEY (meeting_id) REFERENCES meeting(id) ON DELETE CASCADE
""")
# Add CHECK constraints to enforce orphan invariants
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_orphan_no_meeting
CHECK (status != 'orphan' OR meeting_id IS NULL)
""")
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_non_orphan_has_meeting
CHECK (status = 'orphan' OR meeting_id IS NOT NULL)
""")
def downgrade() -> None:
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_orphan_no_meeting")
op.execute(
"ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_non_orphan_has_meeting"
)
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS fk_recording_meeting")
op.drop_index("idx_instance_id", table_name="daily_recording_request")
op.drop_index("idx_meeting_id", table_name="daily_recording_request")
op.drop_table("daily_recording_request")

View File

@@ -1,56 +0,0 @@
"""Utility for creating orphan recordings."""
import os
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
async def create_and_log_orphan(
recording_id: NonEmptyString,
bucket_name: str,
room_name: str,
start_ts: int,
track_keys: list[str] | None,
source: str,
) -> bool:
"""Create orphan recording and log if first occurrence.
Args:
recording_id: Daily.co recording ID
bucket_name: S3 bucket (empty string for cloud recordings)
room_name: Daily.co room name
start_ts: Unix timestamp
track_keys: Track keys for raw-tracks, None for cloud
source: "webhook" or "polling" for logging
Returns:
True if created (first poller), False if already exists
"""
if track_keys:
object_key = os.path.dirname(track_keys[0]) if track_keys else room_name
else:
object_key = room_name
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key,
recorded_at=datetime.fromtimestamp(start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=None,
status="orphan",
)
)
if created:
logger.error(
f"Orphan recording ({source})",
recording_id=recording_id,
room_name=room_name,
)
return created

View File

@@ -26,7 +26,6 @@ def get_database() -> databases.Database:
# import models
import reflector.db.calendar_events # noqa
import reflector.db.daily_participant_sessions # noqa
import reflector.db.daily_recording_requests # noqa
import reflector.db.meetings # noqa
import reflector.db.recordings # noqa
import reflector.db.rooms # noqa

View File

@@ -104,6 +104,26 @@ class CalendarEventController:
results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results]
async def get_upcoming_for_rooms(
self, room_ids: list[str], minutes_ahead: int = 120
) -> list[CalendarEvent]:
now = datetime.now(timezone.utc)
future_time = now + timedelta(minutes=minutes_ahead)
query = (
calendar_events.select()
.where(
sa.and_(
calendar_events.c.room_id.in_(room_ids),
calendar_events.c.is_deleted == False,
calendar_events.c.start_time <= future_time,
calendar_events.c.end_time >= now,
)
)
.order_by(calendar_events.c.start_time.asc())
)
results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results]
async def get_by_id(self, event_id: str) -> CalendarEvent | None:
query = calendar_events.select().where(calendar_events.c.id == event_id)
result = await get_database().fetch_one(query)

View File

@@ -1,111 +0,0 @@
from datetime import datetime
from typing import Literal
from uuid import UUID
import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils.string import NonEmptyString
daily_recording_requests = sa.Table(
"daily_recording_request",
metadata,
sa.Column("recording_id", sa.String, primary_key=True),
sa.Column(
"meeting_id",
sa.String,
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("instance_id", sa.String, nullable=False),
sa.Column("type", sa.String, nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.Index("idx_meeting_id", "meeting_id"),
sa.Index("idx_instance_id", "instance_id"),
)
class DailyRecordingRequest(BaseModel):
recording_id: NonEmptyString
meeting_id: NonEmptyString
instance_id: UUID
type: Literal["cloud", "raw-tracks"]
requested_at: datetime
class DailyRecordingRequestsController:
async def create(self, request: DailyRecordingRequest) -> None:
stmt = insert(daily_recording_requests).values(
recording_id=request.recording_id,
meeting_id=request.meeting_id,
instance_id=str(request.instance_id),
type=request.type,
requested_at=request.requested_at,
)
stmt = stmt.on_conflict_do_nothing(index_elements=["recording_id"])
await get_database().execute(stmt)
async def find_by_recording_id(
self,
recording_id: NonEmptyString,
) -> tuple[NonEmptyString, Literal["cloud", "raw-tracks"]] | None:
query = daily_recording_requests.select().where(
daily_recording_requests.c.recording_id == recording_id
)
result = await get_database().fetch_one(query)
if not result:
return None
req = DailyRecordingRequest(
recording_id=result["recording_id"],
meeting_id=result["meeting_id"],
instance_id=UUID(result["instance_id"]),
type=result["type"],
requested_at=result["requested_at"],
)
return (req.meeting_id, req.type)
async def find_by_instance_id(
self,
instance_id: UUID,
) -> list[DailyRecordingRequest]:
"""Multiple recordings can have same instance_id (stop/restart)."""
query = daily_recording_requests.select().where(
daily_recording_requests.c.instance_id == str(instance_id)
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
async def get_by_meeting_id(
self,
meeting_id: NonEmptyString,
) -> list[DailyRecordingRequest]:
query = daily_recording_requests.select().where(
daily_recording_requests.c.meeting_id == meeting_id
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
daily_recording_requests_controller = DailyRecordingRequestsController()

View File

@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Literal
import sqlalchemy as sa
@@ -183,6 +183,84 @@ class MeetingController:
results = await get_database().fetch_all(query)
return [Meeting(**r) for r in results]
async def get_by_room_name_and_time(
self,
room_name: NonEmptyString,
recording_start: datetime,
time_window_hours: int = 168,
) -> Meeting | None:
"""
Get meeting by room name closest to recording timestamp.
HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
and mtgSessionId is separate from our instanceId. Time-based matching is
the least-bad workaround.
This handles edge case of duplicate room_name values in DB (race conditions,
double-clicks, etc.) by matching based on temporal proximity.
Algorithm:
1. Find meetings within time_window_hours of recording_start
2. Return meeting with start_date closest to recording_start
3. If tie, return first by meeting.id (deterministic)
Args:
room_name: Daily.co room name from recording
recording_start: Timezone-aware datetime from recording.start_ts
time_window_hours: Search window (default 168 = 1 week)
Returns:
Meeting closest to recording timestamp, or None if no matches
Failure modes:
- Multiple meetings in same room within ~5 minutes: picks closest
- All meetings outside time window: returns None
- Clock skew between Daily.co and DB: 1-week window tolerates this
Why 1 week window:
- Handles webhook failures (recording discovered days later)
- Tolerates clock skew
- Rejects unrelated meetings from weeks ago
"""
# Validate timezone-aware datetime
if recording_start.tzinfo is None:
raise ValueError(
f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
)
window_start = recording_start - timedelta(hours=time_window_hours)
window_end = recording_start + timedelta(hours=time_window_hours)
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_name == room_name,
meetings.c.start_date >= window_start,
meetings.c.start_date <= window_end,
)
)
.order_by(meetings.c.start_date)
)
results = await get_database().fetch_all(query)
if not results:
return None
candidates = [Meeting(**r) for r in results]
# Find meeting with start_date closest to recording_start
closest = min(
candidates,
key=lambda m: (
abs((m.start_date - recording_start).total_seconds()),
m.id, # Tie-breaker: deterministic by UUID
),
)
return closest
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
"""
Get latest active meeting for a room.
@@ -223,6 +301,23 @@ class MeetingController:
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_all_active_for_rooms(
self, room_ids: list[str], current_time: datetime
) -> list[Meeting]:
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_id.in_(room_ids),
meetings.c.end_date > current_time,
meetings.c.is_active,
)
)
.order_by(meetings.c.end_date.desc())
)
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_active_by_calendar_event(
self, room: Room, calendar_event_id: str, current_time: datetime
) -> Meeting | None:
@@ -272,6 +367,44 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""
Set cloud recording only if not already set.
Returns True if updated, False if already set.
Prevents webhook/polling race condition via atomic WHERE clause.
"""
# Check current value before update to detect actual change
meeting_before = await self.get_by_id(meeting_id)
if not meeting_before:
return False
was_null = meeting_before.daily_composed_video_s3_key is None
query = (
meetings.update()
.where(
sa.and_(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
await get_database().execute(query)
# Return True only if value was NULL before (actual update occurred)
# If was_null=False, the WHERE clause prevented the update
return was_null
async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count."""
query = (
@@ -351,27 +484,6 @@ class MeetingConsentController:
result = await get_database().fetch_one(query)
return result is not None
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""Returns True if updated, False if already set."""
query = (
meetings.update()
.where(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
result = await get_database().execute(query)
return result.rowcount > 0
meetings_controller = MeetingController()
meeting_consent_controller = MeetingConsentController()

View File

@@ -4,10 +4,10 @@ from typing import Literal
import sqlalchemy as sa
from pydantic import BaseModel, Field
from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
recordings = sa.Table(
"recording",
@@ -31,13 +31,14 @@ recordings = sa.Table(
class Recording(BaseModel):
id: str = Field(default_factory=generate_uuid4)
bucket_name: str
# for single-track
object_key: str
recorded_at: datetime
status: Literal["pending", "processing", "completed", "failed", "orphan"] = (
"pending"
)
status: Literal["pending", "processing", "completed", "failed"] = "pending"
meeting_id: str | None = None
# None = single-track, [] = multitrack with no audio, [keys...] = multitrack with audio
# for multitrack reprocessing
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None
@property
@@ -71,6 +72,20 @@ class RecordingController:
query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query)
async def set_meeting_id(
self,
recording_id: NonEmptyString,
meeting_id: NonEmptyString,
) -> None:
"""Link recording to meeting."""
query = (
recordings.update()
.where(recordings.c.id == recording_id)
.values(meeting_id=meeting_id)
)
await get_database().execute(query)
# no check for existence
async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
if not recording_ids:
return []
@@ -89,12 +104,9 @@ class RecordingController:
This is more efficient than fetching all recordings and filtering in Python.
"""
# INLINE IMPORT REQUIRED: Circular dependency
# - recordings.py needs transcripts table for JOIN query
# - transcripts.py imports recordings_controller
# - db/__init__.py loads recordings before transcripts (line 31 vs 33)
# - Top-level import would fail during module initialization
from reflector.db.transcripts import transcripts
from reflector.db.transcripts import (
transcripts, # noqa: PLC0415 cyclic import
)
query = (
recordings.select()
@@ -112,27 +124,5 @@ class RecordingController:
recordings_list = [Recording(**row) for row in results]
return [r for r in recordings_list if r.is_multitrack]
async def try_create_with_meeting(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.meeting_id is not None, "meeting_id required for non-orphan"
assert recording.status != "orphan", "use create_orphan for orphans"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
async def create_orphan(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.status == "orphan", "status must be 'orphan'"
assert recording.meeting_id is None, "meeting_id must be NULL for orphan"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
recordings_controller = RecordingController()

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column(
"skip_consent",
sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False
@@ -245,6 +238,11 @@ class RoomController:
return room
async def get_by_names(self, names: list[str]) -> list[Room]:
query = rooms.select().where(rooms.c.name.in_(names))
results = await get_database().fetch_all(query)
return [Room(**r) for r in results]
async def get_ics_enabled(self) -> list[Room]:
query = rooms.select().where(
rooms.c.ics_enabled == True, rooms.c.ics_url != None

View File

@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
# Multitrack processing always uses Hatchet (no Celery fallback)
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
)
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
if can_replay:
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": None})
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code
)
# Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
# Zulip integration
ZULIP_REALM: str | None = None

View File

@@ -1,6 +1,4 @@
import json
import os
from datetime import datetime, timezone
from typing import assert_never
from fastapi import APIRouter, HTTPException, Request
@@ -14,10 +12,7 @@ from reflector.dailyco_api import (
RecordingReadyEvent,
RecordingStartedEvent,
)
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger as _logger
from reflector.settings import settings
from reflector.video_platforms.factory import create_platform_client
@@ -217,73 +212,10 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(
recording_id
)
if not match:
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
meeting_id, _ = match
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found (webhook)",
recording_id=recording_id,
meeting_id=meeting_id,
)
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
# Create recording atomically
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=(
os.path.dirname(track_keys[0]) if track_keys else room_name
),
recorded_at=datetime.fromtimestamp(
event.payload.start_ts, tz=timezone.utc
),
track_keys=track_keys,
meeting_id=meeting_id,
status="pending",
)
)
if not created:
# Already created (polling got it first)
logger.debug(
"Recording already exists (webhook late)",
recording_id=recording_id,
meeting_id=meeting_id,
)
return
logger.info(
"Raw-tracks recording queuing processing (webhook)",
"Raw-tracks recording queuing processing",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting_id,
num_tracks=len(track_keys),
)

View File

@@ -1,5 +1,4 @@
import json
import logging
from datetime import datetime, timezone
from typing import Annotated, Any, Optional
from uuid import UUID
@@ -10,21 +9,16 @@ from pydantic import BaseModel
import reflector.auth as auth
from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import (
MeetingConsent,
meeting_consent_controller,
meetings_controller,
)
from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -108,6 +102,13 @@ async def start_recording(
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
try:
client = create_platform_client("daily")
result = await client.start_recording(
@@ -116,30 +117,9 @@ async def start_recording(
instance_id=body.instanceId,
)
recording_id = result["id"]
log.info(f"Started {body.type} recording via REST API")
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=body.instanceId,
type=body.type,
requested_at=datetime.now(timezone.utc),
)
)
logger.info(
f"Started {body.type} recording via REST API",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
"recording_id": recording_id,
},
)
return {"status": "ok", "recording_id": recording_id}
return {"status": "ok", "result": result}
except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream"
@@ -150,42 +130,22 @@ async def start_recording(
# "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info:
logger.info(
f"{body.type} recording already active (started by another participant)",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
},
log.info(
f"{body.type} recording already active (started by another participant)"
)
return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling
# All other Daily.co API errors
logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)
except Exception as e:
# Non-Daily.co errors
logger.error(
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
log.error(f"Failed to start {body.type} recording", error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}"
)

View File

@@ -1,4 +1,6 @@
import asyncio
import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated, Any, Literal, Optional
@@ -6,13 +8,14 @@ from typing import Annotated, Any, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Page
from fastapi_pagination.ext.databases import apaginate
from pydantic import BaseModel
from pydantic import BaseModel, Field
from redis.exceptions import LockError
import reflector.auth as auth
from reflector.db import get_database
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room as DbRoom
from reflector.db.rooms import rooms_controller
from reflector.redis_cache import RedisAsyncLock
from reflector.schemas.platform import Platform
@@ -195,6 +198,69 @@ async def rooms_list(
return paginated
class BulkStatusRequest(BaseModel):
room_names: list[str] = Field(max_length=100)
class RoomMeetingStatus(BaseModel):
active_meetings: list[Meeting]
upcoming_events: list[CalendarEventResponse]
@router.post("/rooms/meetings/bulk-status", response_model=dict[str, RoomMeetingStatus])
async def rooms_bulk_meeting_status(
request: BulkStatusRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
):
user_id = user["sub"] if user else None
all_rooms = await rooms_controller.get_by_names(request.room_names)
# Filter to rooms the user can see (owned or shared), matching rooms_list behavior
rooms = [
r
for r in all_rooms
if r.is_shared or (user_id is not None and r.user_id == user_id)
]
room_by_id: dict[str, DbRoom] = {r.id: r for r in rooms}
room_ids = list(room_by_id.keys())
current_time = datetime.now(timezone.utc)
active_meetings, upcoming_events = await asyncio.gather(
meetings_controller.get_all_active_for_rooms(room_ids, current_time),
calendar_events_controller.get_upcoming_for_rooms(room_ids),
)
# Group by room name
active_by_room: dict[str, list[Meeting]] = defaultdict(list)
for m in active_meetings:
room = room_by_id.get(m.room_id)
if not room:
continue
m.platform = room.platform
if user_id != room.user_id and m.platform == "whereby":
m.host_room_url = ""
active_by_room[room.name].append(m)
upcoming_by_room: dict[str, list[CalendarEventResponse]] = defaultdict(list)
for e in upcoming_events:
room = room_by_id.get(e.room_id)
if not room:
continue
if user_id != room.user_id:
e.description = None
e.attendees = None
upcoming_by_room[room.name].append(e)
result: dict[str, RoomMeetingStatus] = {}
for name in request.room_names:
result[name] = RoomMeetingStatus(
active_meetings=active_by_room.get(name, []),
upcoming_events=upcoming_by_room.get(name, []),
)
return result
@router.get("/rooms/{room_id}", response_model=RoomDetails)
async def rooms_get(
room_id: str,

View File

@@ -1,5 +1,6 @@
import json
import os
import re
from datetime import datetime, timezone
from typing import List, Literal
from urllib.parse import unquote
@@ -12,12 +13,10 @@ from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
@@ -28,9 +27,6 @@ from reflector.db.transcripts import (
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -231,44 +227,79 @@ async def _process_multitrack_recording_inner(
recording_start_ts: int,
):
"""
Process multitrack recording.
Process multitrack recording (first time or reprocessing).
Recording must already exist with meeting_id set (created by webhook/polling before queueing).
For first processing (webhook/polling):
- Uses recording_start_ts for time-based meeting matching (no instanceId available)
For reprocessing:
- Uses recording.meeting_id directly (already linked during first processing)
- recording_start_ts is ignored
"""
# Get recording (must exist - created by webhook/polling)
tz = timezone.utc
recorded_at = datetime.now(tz)
try:
if track_keys:
folder = os.path.basename(os.path.dirname(track_keys[0]))
ts_match = re.search(r"(\d{14})$", folder)
if ts_match:
ts = ts_match.group(1)
recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
except Exception as e:
logger.warning(
f"Could not parse recorded_at from keys, using now() {recorded_at}",
e,
exc_info=True,
)
# Check if recording already exists (reprocessing path)
recording = await recordings_controller.get_by_id(recording_id)
if not recording:
logger.error(
"Recording not found - should have been created by webhook/polling",
recording_id=recording_id,
)
return
if recording and recording.meeting_id:
# Reprocessing: recording exists with meeting already linked
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Reprocessing: meeting not found for recording - skipping",
meeting_id=recording.meeting_id,
recording_id=recording_id,
)
return
if not recording.meeting_id:
logger.error(
"Recording has no meeting_id - orphan should not be queued",
logger.info(
"Reprocessing: using existing recording.meeting_id",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
return
# Get meeting
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.error(
"Meeting not found for recording",
meeting_id=recording.meeting_id,
else:
# First processing: recording doesn't exist, need time-based matching
# (Daily.co doesn't return instanceId in API, must match by timestamp)
recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=daily_room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.error(
"Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
recording_id=recording_id,
room_name=daily_room_name,
recording_start_ts=recording_start_ts,
recording_start=recording_start.isoformat(),
)
return # Skip processing, will retry on next poll
logger.info(
"First processing: found meeting via time-based matching",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id,
time_delta_seconds=abs(
(meeting.start_date - recording_start).total_seconds()
),
)
return
logger.info(
"Processing multitrack recording",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
room_name_base = extract_base_room_name(daily_room_name)
@@ -276,6 +307,33 @@ async def _process_multitrack_recording_inner(
if not room:
raise Exception(f"Room not found: {room_name_base}")
if not recording:
# Create recording (only happens during first processing)
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key_dir,
recorded_at=recorded_at,
meeting_id=meeting.id,
track_keys=track_keys,
)
)
elif not recording.meeting_id:
# Recording exists but meeting_id is null (failed first processing)
# Update with meeting from time-based matching
await recordings_controller.set_meeting_id(
recording_id=recording.id,
meeting_id=meeting.id,
)
recording.meeting_id = meeting.id
logger.info(
"Updated existing recording with meeting_id",
recording_id=recording.id,
meeting_id=meeting.id,
)
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript:
transcript = await transcripts_controller.add(
@@ -290,49 +348,29 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
# Multitrack processing always uses Hatchet (no Celery fallback)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
@shared_task
@@ -461,7 +499,7 @@ async def store_cloud_recording(
Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths.
Uses direct recording_id lookup via daily_recording_requests table.
Uses time-based matching to handle duplicate room_name values.
Args:
recording_id: Daily.co recording ID
@@ -474,170 +512,155 @@ async def store_cloud_recording(
Returns:
True if stored, False if skipped/failed
"""
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
if not match:
# ORPHAN: No request found (pre-migration recording or failed request creation)
await create_and_log_orphan(
meeting = await meetings_controller.get_by_room_name_and_time(
room_name=room_name,
recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.warning(
f"Cloud recording ({source}): no meeting found within 1-week window",
recording_id=recording_id,
bucket_name="",
room_name=room_name,
start_ts=start_ts,
track_keys=None,
source=source,
recording_start_ts=start_ts,
recording_start=recording_start.isoformat(),
)
return False
meeting_id, _ = match
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
meeting_id=meeting.id,
s3_key=s3_key,
duration=duration,
)
if not success:
logger.debug(
f"Cloud recording ({source}): already set (stop/restart?)",
f"Cloud recording ({source}): already set (race lost)",
recording_id=recording_id,
room_name=room_name,
meeting_id=meeting_id,
meeting_id=meeting.id,
)
return False
logger.info(
f"Cloud recording stored via {source}",
meeting_id=meeting_id,
f"Cloud recording stored via {source} (time-based match)",
meeting_id=meeting.id,
recording_id=recording_id,
s3_key=s3_key,
duration=duration,
time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
)
return True
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
"""Process cloud recordings (database deduplication, worker-agnostic).
"""
Store cloud recordings missing from meeting table via polling.
Cloud recordings stored in meeting.daily_composed_video_s3_key, not recording table.
Only first cloud recording per meeting is kept (existing behavior).
Uses time-based matching via store_cloud_recording().
"""
if not cloud_recordings:
return
for rec in cloud_recordings:
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
if not match:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name="",
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=None,
source="polling",
)
continue
meeting_id, _ = match
if not rec.s3key:
logger.error("Cloud recording missing s3_key", recording_id=rec.id)
continue
# Store in meeting table (atomic, only if not already set)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key=rec.s3key,
duration=rec.duration,
)
if success:
logger.info(
"Stored cloud recording", recording_id=rec.id, meeting_id=meeting_id
)
else:
stored_count = 0
for recording in cloud_recordings:
# Extract S3 key from recording (cloud recordings use s3key field)
s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
if not s3_key:
logger.warning(
"Cloud recording already exists for meeting (stop/restart?)",
recording_id=rec.id,
meeting_id=meeting_id,
"Cloud recording: missing S3 key",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
stored = await store_cloud_recording(
recording_id=recording.id,
room_name=recording.room_name,
s3_key=s3_key,
duration=recording.duration,
start_ts=recording.start_ts,
source="polling",
)
if stored:
stored_count += 1
logger.info(
"Cloud recording polling complete",
total=len(cloud_recordings),
stored=stored_count,
)
async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: NonEmptyString,
) -> None:
"""Process raw-tracks (database deduplication, worker-agnostic)."""
bucket_name: str,
):
"""Queue raw-tracks recordings missing from DB (existing logic)."""
if not raw_tracks_recordings:
return
for rec in raw_tracks_recordings:
# Lookup request FIRST (before any DB writes)
match = await daily_recording_requests_controller.find_by_recording_id(rec.id)
recording_ids = [rec.id for rec in raw_tracks_recordings]
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
existing_ids = {rec.id for rec in existing_recordings}
if not match:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
missing_recordings = [
rec for rec in raw_tracks_recordings if rec.id not in existing_ids
]
if not missing_recordings:
logger.debug(
"All raw-tracks recordings already in DB",
api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found raw-tracks recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
logger.warning(
"Finished raw-tracks recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
meeting_id, _ = match
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found", recording_id=rec.id, meeting_id=meeting_id
)
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
if not track_keys:
logger.warning(
"No audio tracks found in raw-tracks recording",
recording_id=recording.id,
room_name=recording.room_name,
total_tracks=len(recording.tracks),
)
continue
# DEDUPLICATION: Atomically create recording (single operation, no race window)
# ON CONFLICT → concurrent poller already got it, skip entire logic
track_keys = [t.s3Key for t in rec.tracks if t.type == "audio"]
created = await recordings_controller.try_create_with_meeting(
Recording(
id=rec.id,
bucket_name=bucket_name,
object_key=os.path.dirname(track_keys[0]) if track_keys else "",
recorded_at=datetime.fromtimestamp(rec.start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=meeting_id, # Set at creation (constraint-safe)
status="pending",
)
logger.info(
"Queueing missing raw-tracks recording for processing",
recording_id=recording.id,
room_name=recording.room_name,
track_count=len(track_keys),
)
if not created:
# Conflict: another poller already created/queued this
# Skip all remaining logic (match already done by winner)
continue
# Only winner reaches here - queue processing (works with Celery or Hatchet)
process_multitrack_recording.delay(
recording_id=rec.id,
daily_room_name=rec.room_name,
recording_start_ts=rec.start_ts,
bucket_name=bucket_name,
daily_room_name=recording.room_name,
recording_id=recording.id,
track_keys=track_keys,
recording_start_ts=recording.start_ts,
)
logger.info("Queued recording", recording_id=rec.id, meeting_id=meeting_id)
async def poll_daily_room_presence(meeting_id: str) -> None:
"""Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed.
@@ -1026,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
)
continue
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
)
continue
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
reprocessed_count += 1

View File

@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
import asyncio
import json
import threading
import redis.asyncio as redis
from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
async def _pubsub_data_reader(self, pubsub_subscriber):
while True:
# timeout=1.0 prevents tight CPU loop when no messages available
message = await pubsub_subscriber.get_message(
ignore_subscribe_messages=True
)
@@ -109,29 +109,38 @@ class WebsocketManager:
await socket.send_json(data)
# Process-global singleton to ensure only one WebsocketManager instance exists.
# Multiple instances would cause resource leaks and CPU issues.
_ws_manager: WebsocketManager | None = None
def get_ws_manager() -> WebsocketManager:
"""
Returns the WebsocketManager instance for managing websockets.
Returns the global WebsocketManager singleton.
This function initializes and returns the WebsocketManager instance,
which is responsible for managing websockets and handling websocket
connections.
Creates instance on first call, subsequent calls return cached instance.
Thread-safe via GIL. Concurrent initialization may create duplicate
instances but last write wins (acceptable for this use case).
Returns:
WebsocketManager: The initialized WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
WebsocketManager: The global WebsocketManager instance.
"""
local = threading.local()
if hasattr(local, "ws_manager"):
return local.ws_manager
global _ws_manager
if _ws_manager is not None:
return _ws_manager
# No lock needed - GIL makes this safe enough
# Worst case: race creates two instances, last assignment wins
pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
)
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager
return ws_manager
_ws_manager = WebsocketManager(pubsub_client=pubsub_client)
return _ws_manager
def reset_ws_manager() -> None:
"""Reset singleton for testing. DO NOT use in production."""
global _ws_manager
_ws_manager = None

View File

@@ -1,39 +0,0 @@
#!/usr/bin/env python3
"""Test script to fetch Daily.co recordings for a specific room and show raw API response."""
import asyncio
import json
from reflector.video_platforms.factory import create_platform_client
async def main():
room_name = "daily-private-igor-20260110042117"
print(f"\n=== Fetching recordings for room: {room_name} ===\n")
async with create_platform_client("daily") as client:
recordings = await client.list_recordings(room_name=room_name)
print(f"Found {len(recordings)} recording objects from Daily.co API\n")
for i, rec in enumerate(recordings, 1):
print(f"--- Recording #{i} ---")
print(f"ID: {rec.id}")
print(f"Room: {rec.room_name}")
print(f"Start TS: {rec.start_ts}")
print(f"Status: {rec.status}")
print(f"Duration: {rec.duration}")
print(f"Type: {rec.type}")
print(f"Tracks count: {len(rec.tracks)}")
if rec.tracks:
print(f"Tracks:")
for j, track in enumerate(rec.tracks, 1):
print(f" Track {j}: {track.s3Key}")
print(f"\nRaw JSON:\n{json.dumps(rec.model_dump(), indent=2, default=str)}\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,11 +1,10 @@
import os
from contextlib import asynccontextmanager
from tempfile import NamedTemporaryFile
from unittest.mock import patch
import pytest
from reflector.schemas.platform import WHEREBY_PLATFORM
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True)
@@ -15,6 +14,7 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield
@@ -333,11 +333,14 @@ def celery_enable_logging():
@pytest.fixture(scope="session")
def celery_config():
with NamedTemporaryFile() as f:
yield {
"broker_url": "memory://",
"result_backend": f"db+sqlite:///{f.name}",
}
redis_host = os.environ.get("REDIS_HOST", "localhost")
redis_port = os.environ.get("REDIS_PORT", "6379")
# Use db 2 to avoid conflicts with main app
redis_url = f"redis://{redis_host}:{redis_port}/2"
yield {
"broker_url": redis_url,
"result_backend": redis_url,
}
@pytest.fixture(scope="session")
@@ -370,9 +373,12 @@ async def ws_manager_in_memory(monkeypatch):
def __init__(self, queue: asyncio.Queue):
self.queue = queue
async def get_message(self, ignore_subscribe_messages: bool = True):
async def get_message(
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
):
wait_timeout = timeout if timeout is not None else 0.05
try:
return await asyncio.wait_for(self.queue.get(), timeout=0.05)
return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
except Exception:
return None

View File

@@ -1,258 +0,0 @@
from datetime import datetime, timezone
from uuid import UUID
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_create_request():
"""Test creating a recording request."""
# Create meeting first
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-123",
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
request = DailyRecordingRequest(
recording_id="rec-1",
meeting_id="meeting-123",
instance_id=UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"),
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
result = await daily_recording_requests_controller.find_by_recording_id("rec-1")
assert result is not None
assert result[0] == "meeting-123"
assert result[1] == "cloud"
@pytest.mark.asyncio
async def test_multiple_recordings_same_meeting():
"""Test stop/restart creates multiple request rows."""
# Create room and meeting
room = Room(
id="test-room-2", name="Test Room 2", slug="test-room-2", user_id="test-user"
)
await rooms_controller.create(room)
meeting_id = "meeting-456"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-1",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-2", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-1", "rec-2"}
@pytest.mark.asyncio
async def test_deduplication_via_database():
"""Test concurrent pollers use database for deduplication."""
# Create room and meeting
room = Room(
id="test-room-3", name="Test Room 3", slug="test-room-3", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-789",
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
recording_id = "rec-123"
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created2 is False # Conflict, skip
@pytest.mark.asyncio
async def test_orphan_logged_once():
"""Test orphan marked once, skipped on re-poll."""
# First poll
created1 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created1 is True
# Second poll (same orphan discovered again)
created2 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created2 is False # Already exists
# Verify it exists
existing = await recordings_controller.get_by_id("orphan-123")
assert existing is not None
assert existing.status == "orphan"
@pytest.mark.asyncio
async def test_orphan_constraints():
"""Test orphan invariants are enforced."""
# Can't create orphan with meeting_id
with pytest.raises(AssertionError, match="meeting_id must be NULL"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-123", # Should be None
status="orphan",
track_keys=None,
)
)
# Can't create orphan with wrong status
with pytest.raises(AssertionError, match="status must be 'orphan'"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="pending", # Should be "orphan"
track_keys=None,
)
)
@pytest.mark.asyncio
async def test_try_create_with_meeting_constraints():
"""Test try_create_with_meeting enforces constraints."""
# Create room and meeting
room = Room(
id="test-room-4", name="Test Room 4", slug="test-room-4", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-999",
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Can't create with orphan status
with pytest.raises(AssertionError, match="use create_orphan"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-999",
status="orphan", # Should not be orphan
track_keys=None,
)
)
# Can't create without meeting_id
with pytest.raises(AssertionError, match="meeting_id required"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None, # Should have meeting_id
status="pending",
track_keys=None,
)
)

View File

@@ -1,300 +0,0 @@
"""
Integration tests for recording request flow.
These tests verify the end-to-end flow of:
1. Starting a recording (creates request)
2. Webhook/polling discovering recording (matches via request)
3. Recording processing (uses existing meeting_id)
"""
from datetime import datetime, timezone
from uuid import UUID, uuid4
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_recording_request_flow_cloud(client):
"""Test full cloud recording flow: start -> webhook -> match"""
# Create room and meeting
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Simulate recording start (what endpoint does)
recording_id = "rec-cloud-123"
instance_id = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Verify request exists
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
assert match[0] == meeting_id
assert match[1] == "cloud"
# Simulate webhook/polling storing cloud recording
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key="s3://bucket/recording.mp4",
duration=120,
)
assert success is True
# Verify meeting updated
updated_meeting = await meetings_controller.get_by_id(meeting_id)
assert updated_meeting.daily_composed_video_s3_key == "s3://bucket/recording.mp4"
assert updated_meeting.daily_composed_video_duration == 120
@pytest.mark.asyncio
async def test_recording_request_flow_raw_tracks(client):
"""Test full raw-tracks recording flow: start -> webhook/polling -> process"""
# Create room and meeting
room = Room(
id="test-room-2",
name="Test Room 2",
slug="test-room-2",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Simulate recording start
recording_id = "rec-raw-456"
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Simulate webhook/polling discovering recording
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
found_meeting_id, recording_type = match
assert found_meeting_id == meeting_id
assert recording_type == "raw-tracks"
# Create recording (what webhook/polling does)
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="recordings/20260120/",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
meeting_id=meeting_id,
status="pending",
)
)
assert created is True
# Verify recording exists with meeting_id
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id
assert recording.status == "pending"
assert len(recording.track_keys) == 2
@pytest.mark.asyncio
async def test_stop_restart_creates_multiple_requests(client):
"""Test stop/restart creates multiple request rows with same instance_id"""
# Create room and meeting
room = Room(
id="test-room-3",
name="Test Room 3",
slug="test-room-3",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("c1d2e3f4-a5b6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-first",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-second", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-first", "rec-second"}
assert all(r.instance_id == instance_id for r in requests)
@pytest.mark.asyncio
async def test_orphan_recording_no_request(client):
"""Test orphan recording (no request found)"""
# Simulate polling discovering recording with no request
recording_id = "rec-orphan"
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is None # No request
# Mark as orphan
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created is True
# Verify orphan exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.status == "orphan"
assert recording.meeting_id is None
# Second poll - already exists
created_again = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created_again is False # Already exists
@pytest.mark.asyncio
async def test_concurrent_polling_deduplication(client):
"""Test concurrent pollers only queue once"""
# Create room and meeting
room = Room(
id="test-room-4",
name="Test Room 4",
slug="test-room-4",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Create request
recording_id = "rec-concurrent"
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=UUID("d1e2f3a4-b5c6-7890-abcd-ef1234567890"),
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
)
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created2 is False # Conflict, skip
# Only one recording exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id

View File

@@ -0,0 +1,374 @@
"""
Integration tests for time-based meeting-to-recording matching.
Tests the critical path for matching Daily.co recordings to meetings when
API doesn't return instanceId.
"""
from datetime import datetime, timedelta, timezone
import pytest
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
@pytest.fixture
async def test_room():
"""Create a test room for meetings."""
room = await rooms_controller.add(
name="test-room-time",
user_id="test-user-id",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=False,
platform="daily",
)
return room
@pytest.fixture
def base_time():
"""Fixed timestamp for deterministic tests."""
return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
class TestTimeBasedMatching:
"""Test get_by_room_name_and_time() matching logic."""
async def test_exact_time_match(self, test_room, base_time):
"""Recording timestamp exactly matches meeting start_date."""
meeting = await meetings_controller.create(
id="meeting-exact",
room_name="daily-test-20260114090000",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090000",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
"""Recording started 1 minute after meeting (participants joined late)."""
meeting = await meetings_controller.create(
id="meeting-late",
room_name="daily-test-20260114090100",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time + timedelta(minutes=1)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090100",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
"""
Two meetings with same room_name (duplicate/race condition).
Should pick closest by timestamp.
"""
meeting1 = await meetings_controller.create(
id="meeting-1-first",
room_name="daily-duplicate-room",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting2 = await meetings_controller.create(
id="meeting-2-second",
room_name="daily-duplicate-room", # Same room_name!
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(seconds=0.99), # 0.99s later
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording started 0.5s after meeting1
# Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
recording_start = base_time + timedelta(seconds=0.5)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-duplicate-room",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s)
async def test_outside_time_window_returns_none(self, test_room, base_time):
"""Recording outside 1-week window returns None."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-old",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording 8 days later (outside 7-day window)
recording_start = base_time + timedelta(days=8)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-old",
recording_start=recording_start,
time_window_hours=168,
)
assert result is None
async def test_tie_breaker_deterministic(self, test_room, base_time):
"""When time delta identical, tie-breaker by meeting.id is deterministic."""
meeting_z = await meetings_controller.create(
id="zzz-last-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting_a = await meetings_controller.create(
id="aaa-first-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time, # Exact same start_date
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tie",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
# Tie-breaker: lexicographically first UUID
assert result.id == "aaa-first-uuid"
async def test_timezone_naive_datetime_raises(self, test_room, base_time):
"""Timezone-naive datetime raises ValueError."""
await meetings_controller.create(
id="meeting-tz",
room_name="daily-test-tz",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Naive datetime (no timezone)
naive_dt = datetime(2026, 1, 14, 9, 0, 0)
with pytest.raises(ValueError, match="timezone-aware"):
await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tz",
recording_start=naive_dt,
time_window_hours=168,
)
async def test_one_week_boundary_after_included(self, test_room, base_time):
"""Meeting 1-week AFTER recording is included (window_end boundary)."""
meeting_time = base_time + timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-after",
room_name="daily-test-boundary-after",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-after",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-after"
async def test_one_week_boundary_before_included(self, test_room, base_time):
"""Meeting 1-week BEFORE recording is included (window_start boundary)."""
meeting_time = base_time - timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-before",
room_name="daily-test-boundary-before",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-before",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-before"
async def test_recording_before_meeting_start(self, test_room, base_time):
"""Recording started before meeting (clock skew or early join)."""
await meetings_controller.create(
id="meeting-early",
room_name="daily-test-early",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time - timedelta(minutes=2)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-early",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-early"
async def test_mixed_inside_outside_window(self, test_room, base_time):
"""Multiple meetings, only one inside window - returns the inside one."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=10),
end_date=base_time - timedelta(days=10, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-inside",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=2),
end_date=base_time - timedelta(days=2, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-future",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(days=10),
end_date=base_time + timedelta(days=10, hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-mixed",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-inside"
class TestAtomicCloudRecordingUpdate:
"""Test atomic update prevents race conditions."""
async def test_first_update_succeeds(self, test_room, base_time):
"""First call to set_cloud_recording_if_missing succeeds."""
meeting = await meetings_controller.create(
id="meeting-atomic-1",
room_name="daily-test-atomic",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success is True
updated = await meetings_controller.get_by_id(meeting.id)
assert updated.daily_composed_video_s3_key == "first-s3-key"
assert updated.daily_composed_video_duration == 100
async def test_second_update_fails_atomically(self, test_room, base_time):
"""Second call to update same meeting doesn't overwrite (atomic check)."""
meeting = await meetings_controller.create(
id="meeting-atomic-2",
room_name="daily-test-atomic2",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success1 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success1 is True
after_first = await meetings_controller.get_by_id(meeting.id)
assert after_first.daily_composed_video_s3_key == "first-s3-key"
success2 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="bucket/path/should-not-overwrite",
duration=200,
)
assert success2 is False
final = await meetings_controller.get_by_id(meeting.id)
assert final.daily_composed_video_s3_key == "first-s3-key"
assert final.daily_composed_video_duration == 100

View File

@@ -1,6 +1,6 @@
import asyncio
import time
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline
# Whereby recordings should use file pipeline, not Hatchet
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_multitrack_pipeline.delay.assert_not_called()
mock_hatchet.start_workflow.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use multitrack pipeline
mock_multitrack_pipeline.delay.assert_called_once_with(
transcript_id=transcript.id,
bucket_name="daily-bucket",
track_keys=track_keys,
)
# Daily.co multitrack recordings should use Hatchet workflow
mock_hatchet.start_workflow.assert_called_once()
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called()

View File

@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
settings.DATA_DIR = DATA_DIR
@pytest.fixture(scope="session")
def celery_includes():
return ["reflector.pipelines.main_live_pipeline"]
# Using celery_includes from conftest.py which includes both pipelines
@pytest.mark.usefixtures("setup_database")

View File

@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
if server_instance:
server_instance.should_exit = True
server_thread.join(timeout=30)
server_thread.join(timeout=2.0)
# Reset global singleton for test isolation
from reflector.ws_manager import reset_ws_manager
reset_ws_manager()
@pytest.fixture(autouse=True)
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
# Connect and then trigger an event via HTTP create
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
await asyncio.sleep(0.2)
# Emit an event to the user's room via a standard HTTP action
from httpx import AsyncClient
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
"email": "user-abc@example.com",
}
# Use in-memory client (global singleton makes it share ws_manager)
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
resp = await ac.post("/transcripts", json={"name": "WS Test"})

View File

@@ -0,0 +1,217 @@
import "@testing-library/jest-dom";
// --- Module mocks (hoisted before imports) ---
jest.mock("../apiClient", () => ({
client: {
GET: jest.fn(),
POST: jest.fn(),
PUT: jest.fn(),
PATCH: jest.fn(),
DELETE: jest.fn(),
use: jest.fn(),
},
$api: {
useQuery: jest.fn(),
useMutation: jest.fn(),
queryOptions: (method: string, path: string, init?: unknown) =>
init === undefined
? { queryKey: [method, path] }
: { queryKey: [method, path, init] },
},
API_URL: "http://test",
WEBSOCKET_URL: "ws://test",
configureApiAuth: jest.fn(),
}));
jest.mock("../AuthProvider", () => ({
useAuth: () => ({
status: "authenticated" as const,
accessToken: "test-token",
accessTokenExpires: Date.now() + 3600000,
user: { id: "user1", name: "Test User" },
update: jest.fn(),
signIn: jest.fn(),
signOut: jest.fn(),
lastUserId: "user1",
}),
}));
// Recreate the batcher with a 0ms window. setTimeout(fn, 0) defers to the next
// macrotask boundary — after all synchronous React rendering completes. All
// useQuery queryFns fire within the same macrotask, so they all queue into one
// batch before the timer fires. This is deterministic and avoids fake timers.
jest.mock("../meetingStatusBatcher", () => {
const actual = jest.requireActual("../meetingStatusBatcher");
return {
...actual,
meetingStatusBatcher: actual.createMeetingStatusBatcher(0),
};
});
// --- Imports (after mocks) ---
import React from "react";
import { render, waitFor, screen } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { useRoomActiveMeetings, useRoomUpcomingMeetings } from "../apiHooks";
import { client } from "../apiClient";
import { ErrorProvider } from "../../(errors)/errorContext";
const mockClient = client as { POST: jest.Mock };
// --- Helpers ---
function mockBulkStatusEndpoint(
roomData?: Record<
string,
{ active_meetings: unknown[]; upcoming_events: unknown[] }
>,
) {
mockClient.POST.mockImplementation(
async (_path: string, options: { body: { room_names: string[] } }) => {
const roomNames: string[] = options.body.room_names;
const src = roomData ?? {};
const data = Object.fromEntries(
roomNames.map((name) => [
name,
src[name] ?? { active_meetings: [], upcoming_events: [] },
]),
);
return { data, error: undefined, response: {} };
},
);
}
// --- Test component: renders N room cards, each using both hooks ---
function RoomCard({ roomName }: { roomName: string }) {
const active = useRoomActiveMeetings(roomName);
const upcoming = useRoomUpcomingMeetings(roomName);
if (active.isLoading || upcoming.isLoading) {
return <div data-testid={`room-${roomName}`}>loading</div>;
}
return (
<div data-testid={`room-${roomName}`}>
{active.data?.length ?? 0} active, {upcoming.data?.length ?? 0} upcoming
</div>
);
}
function RoomList({ roomNames }: { roomNames: string[] }) {
return (
<>
{roomNames.map((name) => (
<RoomCard key={name} roomName={name} />
))}
</>
);
}
function createWrapper() {
const queryClient = new QueryClient({
defaultOptions: {
queries: { retry: false },
},
});
return function Wrapper({ children }: { children: React.ReactNode }) {
return (
<QueryClientProvider client={queryClient}>
<ErrorProvider>{children}</ErrorProvider>
</QueryClientProvider>
);
};
}
// --- Tests ---
describe("meeting status batcher integration", () => {
afterEach(() => jest.clearAllMocks());
it("batches multiple room queries into a single POST request", async () => {
const rooms = Array.from({ length: 10 }, (_, i) => `room-${i}`);
mockBulkStatusEndpoint();
render(<RoomList roomNames={rooms} />, { wrapper: createWrapper() });
await waitFor(() => {
for (const name of rooms) {
expect(screen.getByTestId(`room-${name}`)).toHaveTextContent(
"0 active, 0 upcoming",
);
}
});
const postCalls = mockClient.POST.mock.calls.filter(
([path]: [string]) => path === "/v1/rooms/meetings/bulk-status",
);
// Without batching this would be 20 calls (2 hooks x 10 rooms).
expect(postCalls).toHaveLength(1);
// The single call should contain all 10 rooms (deduplicated)
const requestedRooms: string[] = postCalls[0][1].body.room_names;
for (const name of rooms) {
expect(requestedRooms).toContain(name);
}
});
it("batcher fetcher returns room-specific data", async () => {
const {
meetingStatusBatcher: batcher,
} = require("../meetingStatusBatcher");
mockBulkStatusEndpoint({
"room-a": {
active_meetings: [{ id: "m1", room_name: "room-a" }],
upcoming_events: [],
},
"room-b": {
active_meetings: [],
upcoming_events: [{ id: "e1", title: "Standup" }],
},
});
const [resultA, resultB] = await Promise.all([
batcher.fetch("room-a"),
batcher.fetch("room-b"),
]);
expect(mockClient.POST).toHaveBeenCalledTimes(1);
expect(resultA.active_meetings).toEqual([
{ id: "m1", room_name: "room-a" },
]);
expect(resultA.upcoming_events).toEqual([]);
expect(resultB.active_meetings).toEqual([]);
expect(resultB.upcoming_events).toEqual([{ id: "e1", title: "Standup" }]);
});
it("renders room-specific meeting data through hooks", async () => {
mockBulkStatusEndpoint({
"room-a": {
active_meetings: [{ id: "m1", room_name: "room-a" }],
upcoming_events: [],
},
"room-b": {
active_meetings: [],
upcoming_events: [{ id: "e1", title: "Standup" }],
},
});
render(<RoomList roomNames={["room-a", "room-b"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("room-room-a")).toHaveTextContent(
"1 active, 0 upcoming",
);
expect(screen.getByTestId("room-room-b")).toHaveTextContent(
"0 active, 1 upcoming",
);
});
});
});

View File

@@ -2,9 +2,10 @@
import { $api } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import { QueryClient, useQuery, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { meetingStatusBatcher } from "./meetingStatusBatcher";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
@@ -697,15 +698,7 @@ export function useRoomsCreateMeeting() {
queryKey: $api.queryOptions("get", "/v1/rooms").queryKey,
}),
queryClient.invalidateQueries({
queryKey: $api.queryOptions(
"get",
"/v1/rooms/{room_name}/meetings/active" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_ACTIVE_PATH_PARTIAL}`,
{
params: {
path: { room_name: roomName },
},
},
).queryKey,
queryKey: meetingStatusKeys.active(roomName),
}),
]);
},
@@ -734,42 +727,39 @@ export function useRoomGetByName(roomName: string | null) {
export function useRoomUpcomingMeetings(roomName: string | null) {
const { isAuthenticated } = useAuthReady();
return $api.useQuery(
"get",
"/v1/rooms/{room_name}/meetings/upcoming" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_UPCOMING_PATH_PARTIAL}`,
{
params: {
path: { room_name: roomName! },
},
return useQuery({
queryKey: meetingStatusKeys.upcoming(roomName!),
queryFn: async () => {
const result = await meetingStatusBatcher.fetch(roomName!);
return result.upcoming_events;
},
{
enabled: !!roomName && isAuthenticated,
},
);
enabled: !!roomName && isAuthenticated,
});
}
const MEETINGS_PATH_PARTIAL = "meetings" as const;
const MEETINGS_ACTIVE_PATH_PARTIAL = `${MEETINGS_PATH_PARTIAL}/active` as const;
const MEETINGS_UPCOMING_PATH_PARTIAL =
`${MEETINGS_PATH_PARTIAL}/upcoming` as const;
const MEETING_LIST_PATH_PARTIALS = [
MEETINGS_ACTIVE_PATH_PARTIAL,
MEETINGS_UPCOMING_PATH_PARTIAL,
];
// Query keys reuse $api.queryOptions so cache identity matches the original
// per-room GET endpoints. The actual fetch goes through the batcher, but the
// keys stay consistent with the rest of the codebase.
const meetingStatusKeys = {
active: (roomName: string) =>
$api.queryOptions("get", "/v1/rooms/{room_name}/meetings/active", {
params: { path: { room_name: roomName } },
}).queryKey,
upcoming: (roomName: string) =>
$api.queryOptions("get", "/v1/rooms/{room_name}/meetings/upcoming", {
params: { path: { room_name: roomName } },
}).queryKey,
};
export function useRoomActiveMeetings(roomName: string | null) {
return $api.useQuery(
"get",
"/v1/rooms/{room_name}/meetings/active" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_ACTIVE_PATH_PARTIAL}`,
{
params: {
path: { room_name: roomName! },
},
return useQuery({
queryKey: meetingStatusKeys.active(roomName!),
queryFn: async () => {
const result = await meetingStatusBatcher.fetch(roomName!);
return result.active_meetings;
},
{
enabled: !!roomName,
},
);
enabled: !!roomName,
});
}
export function useRoomGetMeeting(

View File

@@ -0,0 +1,37 @@
import { create, keyResolver, windowScheduler } from "@yornaath/batshit";
import { client } from "./apiClient";
import type { components } from "../reflector-api";
type MeetingStatusResult = {
roomName: string;
active_meetings: components["schemas"]["Meeting"][];
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
const BATCH_WINDOW_MS = 10;
export function createMeetingStatusBatcher(windowMs: number = BATCH_WINDOW_MS) {
return create({
fetcher: async (roomNames: string[]): Promise<MeetingStatusResult[]> => {
const unique = [...new Set(roomNames)];
const { data, error } = await client.POST(
"/v1/rooms/meetings/bulk-status",
{ body: { room_names: unique } },
);
if (error || !data) {
throw new Error(
`bulk-status fetch failed: ${JSON.stringify(error ?? "no data")}`,
);
}
return roomNames.map((name) => ({
roomName: name,
active_meetings: data[name]?.active_meetings ?? [],
upcoming_events: data[name]?.upcoming_events ?? [],
}));
},
resolver: keyResolver("roomName"),
scheduler: windowScheduler(windowMs),
});
}
export const meetingStatusBatcher = createMeetingStatusBatcher();

View File

@@ -118,6 +118,23 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/rooms/meetings/bulk-status": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/** Rooms Bulk Meeting Status */
post: operations["v1_rooms_bulk_meeting_status"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}": {
parameters: {
query?: never;
@@ -799,6 +816,11 @@ export interface components {
*/
chunk: string;
};
/** BulkStatusRequest */
BulkStatusRequest: {
/** Room Names */
room_names: string[];
};
/** CalendarEventResponse */
CalendarEventResponse: {
/** Id */
@@ -1735,6 +1757,13 @@ export interface components {
/** Webhook Secret */
webhook_secret: string | null;
};
/** RoomMeetingStatus */
RoomMeetingStatus: {
/** Active Meetings */
active_meetings: components["schemas"]["Meeting"][];
/** Upcoming Events */
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
/** RtcOffer */
RtcOffer: {
/** Sdp */
@@ -2272,6 +2301,41 @@ export interface operations {
};
};
};
v1_rooms_bulk_meeting_status: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["BulkStatusRequest"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": {
[key: string]: components["schemas"]["RoomMeetingStatus"];
};
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_get: {
parameters: {
query?: never;

View File

@@ -1,8 +1,22 @@
module.exports = {
preset: "ts-jest",
testEnvironment: "node",
testEnvironment: "jest-environment-jsdom",
roots: ["<rootDir>/app"],
testMatch: ["**/__tests__/**/*.test.ts"],
collectCoverage: true,
collectCoverageFrom: ["app/**/*.ts", "!app/**/*.d.ts"],
testMatch: ["**/__tests__/**/*.test.ts", "**/__tests__/**/*.test.tsx"],
collectCoverage: false,
transform: {
"^.+\\.[jt]sx?$": [
"ts-jest",
{
tsconfig: {
jsx: "react-jsx",
module: "esnext",
moduleResolution: "bundler",
esModuleInterop: true,
strict: true,
downlevelIteration: true,
lib: ["dom", "dom.iterable", "esnext"],
},
},
],
},
};

View File

@@ -23,6 +23,7 @@
"@tanstack/react-query": "^5.85.9",
"@types/ioredis": "^5.0.0",
"@whereby.com/browser-sdk": "^3.3.4",
"@yornaath/batshit": "^0.14.0",
"autoprefixer": "10.4.20",
"axios": "^1.8.2",
"eslint": "^9.33.0",
@@ -61,9 +62,13 @@
"author": "Andreas <andreas@monadical.com>",
"license": "All Rights Reserved",
"devDependencies": {
"@testing-library/dom": "^10.4.1",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.2",
"@types/jest": "^30.0.0",
"@types/react": "18.2.20",
"jest": "^30.1.3",
"jest-environment-jsdom": "^30.2.0",
"openapi-typescript": "^7.9.1",
"prettier": "^3.0.0",
"ts-jest": "^29.4.1"

808
www/pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff