Compare commits

..

4 Commits

Author SHA1 Message Date
Igor Loskutov
0c4c1387c1 fix: prevent presence race condition during WebRTC handshake
Add /joining and /joined endpoints to track user join intent before
WebRTC handshake completes. This prevents meetings from being
deactivated while users are still connecting.

- Add pending_joins Redis module with 30s TTL
- Add /joining endpoint (called before WebRTC handshake)
- Add /joined endpoint (called after connection established)
- Check for pending joins before deactivating meetings in worker
- Frontend integration with connectionId per browser tab
2026-02-05 20:58:34 -05:00
1ce1c7a910 fix: websocket tests (#825)
* fix websocket tests

* fix: restore timeout and fix celery test infrastructure

- Re-add timeout=1.0 to ws_manager pubsub loop (prevents CPU spin?)
- Use Redis for Celery tests (memory:// broker doesn't support chords)
- Add timeout param to in-memory subscriber mock
- Remove duplicate celery_includes fixture from rtc_ws tests

* fix: remove redundant inline imports in test files

* fix: update gitleaks ignore for moved s3_key line

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 14:23:31 -05:00
Rémi Pauchet
984795357e - fix nvidia repo blocked by apt (sha1) (#845)
- use build cache for apt and uv
- limit concurency for uv to prevent crashes with too many cores
2026-02-05 13:59:34 -05:00
fa3cf5da0f chore(main): release 0.32.2 (#842) 2026-02-03 22:05:22 -05:00
30 changed files with 1839 additions and 1250 deletions

View File

@@ -4,4 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465 server/reflector/worker/process.py:generic-api-key:465
server/tests/test_recording_request_flow.py:generic-api-key:121 server/reflector/worker/process.py:generic-api-key:594

View File

@@ -1,5 +1,14 @@
# Changelog # Changelog
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
### Bug Fixes
* increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks ([#843](https://github.com/Monadical-SAS/reflector/issues/843)) ([4acde4b](https://github.com/Monadical-SAS/reflector/commit/4acde4b7fdef88cc02ca12cf38c9020b05ed96ac))
* make caddy optional ([#841](https://github.com/Monadical-SAS/reflector/issues/841)) ([a2ed7d6](https://github.com/Monadical-SAS/reflector/commit/a2ed7d60d557b551a5b64e4dfd909b63a791d9fc))
* use Daily API recording.duration as master source for transcript duration ([#844](https://github.com/Monadical-SAS/reflector/issues/844)) ([8707c66](https://github.com/Monadical-SAS/reflector/commit/8707c6694a80c939b6214bbc13331741f192e082))
## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30) ## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)

View File

@@ -4,27 +4,31 @@ ENV PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy \ UV_LINK_MODE=copy \
UV_NO_CACHE=1 UV_NO_CACHE=1
# patch until nvidia updates the sha1 repo
ADD sequoia.config /etc/crypto-policies/back-ends/sequoia.config
WORKDIR /tmp WORKDIR /tmp
RUN apt-get update \ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
&& apt-get install -y \ && apt-get install -y \
ffmpeg \ ffmpeg \
curl \ curl \
ca-certificates \ ca-certificates \
gnupg \ gnupg \
wget \ wget
&& apt-get clean
# Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12 # Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12
ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb
RUN dpkg -i /cuda-keyring.deb \ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
dpkg -i /cuda-keyring.deb \
&& rm /cuda-keyring.deb \ && rm /cuda-keyring.deb \
&& apt-get update \ && apt-get update \
&& apt-get install -y --no-install-recommends \ && apt-get install -y --no-install-recommends \
cuda-cudart-12-6 \ cuda-cudart-12-6 \
libcublas-12-6 \ libcublas-12-6 \
libcudnn9-cuda-12 \ libcudnn9-cuda-12 \
libcudnn9-dev-cuda-12 \ libcudnn9-dev-cuda-12
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
ADD https://astral.sh/uv/install.sh /uv-installer.sh ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh RUN sh /uv-installer.sh && rm /uv-installer.sh
ENV PATH="/root/.local/bin/:$PATH" ENV PATH="/root/.local/bin/:$PATH"
@@ -39,6 +43,13 @@ COPY ./app /app/app
COPY ./main.py /app/ COPY ./main.py /app/
COPY ./runserver.sh /app/ COPY ./runserver.sh /app/
# prevent uv failing with too many open files on big cpus
ENV UV_CONCURRENT_INSTALLS=16
# first install
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --compile-bytecode --locked
EXPOSE 8000 EXPOSE 8000
CMD ["sh", "/app/runserver.sh"] CMD ["sh", "/app/runserver.sh"]

View File

@@ -0,0 +1,2 @@
[hash_algorithms]
sha1 = "always"

View File

@@ -86,7 +86,7 @@ Daily.co Room: "daily-private-igor-20260110042117"
| **Purpose** | Tracks active session state | Links recordings, transcripts, participants | | **Purpose** | Tracks active session state | Links recordings, transcripts, participants |
| **Scope** | Per room instance | Per Reflector room + timestamp | | **Scope** | Per room instance | Per Reflector room + timestamp |
**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId` (can be null), requiring time-based matching (see [Time-Based Matching](#time-based-matching)). **Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)).
### Recording ### Recording
@@ -101,30 +101,6 @@ Daily.co Room: "daily-private-igor-20260110042117"
**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs. **Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs.
### instanceId (Reflector-Generated)
**Definition:** UUID we generate and send when starting recording via REST API.
**Generation:** Deterministic from meeting_id
- Cloud: `instanceId = meeting_id` directly
- Raw-tracks: `instanceId = UUIDv5(meeting_id, namespace)`
**Key behaviors:**
-**Reuse allowed:** Same instanceId can be used after stop (validated 2026-01-20)
-**Not returned:** Daily.co does NOT echo instanceId back in GET /recordings response
-**Present in error webhooks:** `recording.error` webhook includes instanceId
- **Purpose:** Allows multiple concurrent recordings (cloud + raw-tracks) in same room
**Stop/restart example:**
```
Recording 1: POST /start with instanceId="779e6376..." → recording_id="ee00c4e8..."
Stop recording
Recording 2: POST /start with instanceId="779e6376..." (SAME) → recording_id="b702f509..." (DIFFERENT)
✅ Both succeed, different recording_ids returned
```
**Implication:** Cannot match recordings by instanceId (not in response) - must use recording_id.
--- ---
## Entity Relationships ## Entity Relationships
@@ -220,19 +196,6 @@ Daily.co Room: "daily-private-igor-20260110042117"
`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room). `mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room).
**Reliability:** Can be null or present in GET /recordings response (unreliable).
**When present:** Multiple recordings from same session (stop/restart with participants connected) share same mtgSessionId.
**Example (validated 2026-01-20):**
```json
Recording 1: {"id": "ee00c4e8...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
Recording 2: {"id": "b702f509...", "mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6"}
// Same mtgSessionId (stop/restart in same session)
```
**When null:** Common - Daily.co API does not reliably populate this field.
### session_id (Per-Participant) ### session_id (Per-Participant)
**Different concept:** Per-participant connection identifier from webhooks. **Different concept:** Per-participant connection identifier from webhooks.
@@ -257,24 +220,16 @@ TABLE daily_participant_session (
Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers. Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers.
**Example API response (mtgSessionId can be null OR present):** **Example API response:**
```json ```json
{ {
"id": "recording-uuid", "id": "recording-uuid",
"room_name": "daily-private-igor-20260110042117", "room_name": "daily-private-igor-20260110042117",
"start_ts": 1768018896, "start_ts": 1768018896,
"mtgSessionId": null // ← Often null (unreliable) "mtgSessionId": null Missing!
}
// OR (when present):
{
"id": "recording-uuid",
"mtgSessionId": "92c4136a-a8da-41c5-9c45-e9a2baae6bd6" // ← Sometimes present
} }
``` ```
**Key insight:** Cannot rely on mtgSessionId for matching (unreliable). instanceId also not returned. Only reliable identifier is recording.id.
### Solution: Time-Based Matching ### Solution: Time-Based Matching
**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()` **Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()`
@@ -536,10 +491,6 @@ UI: User sees 3 separate transcripts
--- ---
**Document Version:** 1.1 **Document Version:** 1.0
**Last Updated:** 2026-01-20 **Last Verified:** 2026-01-15
**Data Source:** Production database + Daily.co API inspection + empirical testing **Data Source:** Production database + Daily.co API inspection
**Changes in 1.1:**
- Added instanceId behavior documentation (reuse allowed, not returned in API)
- Clarified mtgSessionId reliability (can be null or present)
- Added empirical validation of stop/restart behavior

View File

@@ -1,67 +0,0 @@
"""add_daily_recording_requests
Revision ID: f5b008fa8a14
Revises: 1b1e6a6fc465
Create Date: 2026-01-20 22:32:06.697144
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f5b008fa8a14"
down_revision: Union[str, None] = "1b1e6a6fc465"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"daily_recording_request",
sa.Column("recording_id", sa.String(), nullable=False),
sa.Column("meeting_id", sa.String(), nullable=False),
sa.Column("instance_id", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("recording_id"),
)
op.create_index("idx_meeting_id", "daily_recording_request", ["meeting_id"])
op.create_index("idx_instance_id", "daily_recording_request", ["instance_id"])
# Clean up orphaned recordings before adding FK constraint
op.execute("""
UPDATE recording SET status = 'orphan', meeting_id = NULL
WHERE meeting_id IS NOT NULL
AND meeting_id NOT IN (SELECT id FROM meeting)
""")
# Add FK constraint to recording table (cascade delete recordings when meeting deleted)
op.execute("""
ALTER TABLE recording ADD CONSTRAINT fk_recording_meeting
FOREIGN KEY (meeting_id) REFERENCES meeting(id) ON DELETE CASCADE
""")
# Add CHECK constraints to enforce orphan invariants
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_orphan_no_meeting
CHECK (status != 'orphan' OR meeting_id IS NULL)
""")
op.execute("""
ALTER TABLE recording ADD CONSTRAINT chk_non_orphan_has_meeting
CHECK (status = 'orphan' OR meeting_id IS NOT NULL)
""")
def downgrade() -> None:
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_orphan_no_meeting")
op.execute(
"ALTER TABLE recording DROP CONSTRAINT IF EXISTS chk_non_orphan_has_meeting"
)
op.execute("ALTER TABLE recording DROP CONSTRAINT IF EXISTS fk_recording_meeting")
op.drop_index("idx_instance_id", table_name="daily_recording_request")
op.drop_index("idx_meeting_id", table_name="daily_recording_request")
op.drop_table("daily_recording_request")

View File

@@ -1,56 +0,0 @@
"""Utility for creating orphan recordings."""
import os
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString
async def create_and_log_orphan(
recording_id: NonEmptyString,
bucket_name: str,
room_name: str,
start_ts: int,
track_keys: list[str] | None,
source: str,
) -> bool:
"""Create orphan recording and log if first occurrence.
Args:
recording_id: Daily.co recording ID
bucket_name: S3 bucket (empty string for cloud recordings)
room_name: Daily.co room name
start_ts: Unix timestamp
track_keys: Track keys for raw-tracks, None for cloud
source: "webhook" or "polling" for logging
Returns:
True if created (first poller), False if already exists
"""
if track_keys:
object_key = os.path.dirname(track_keys[0]) if track_keys else room_name
else:
object_key = room_name
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key,
recorded_at=datetime.fromtimestamp(start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=None,
status="orphan",
)
)
if created:
logger.error(
f"Orphan recording ({source})",
recording_id=recording_id,
room_name=room_name,
)
return created

View File

@@ -26,7 +26,6 @@ def get_database() -> databases.Database:
# import models # import models
import reflector.db.calendar_events # noqa import reflector.db.calendar_events # noqa
import reflector.db.daily_participant_sessions # noqa import reflector.db.daily_participant_sessions # noqa
import reflector.db.daily_recording_requests # noqa
import reflector.db.meetings # noqa import reflector.db.meetings # noqa
import reflector.db.recordings # noqa import reflector.db.recordings # noqa
import reflector.db.rooms # noqa import reflector.db.rooms # noqa

View File

@@ -1,111 +0,0 @@
from datetime import datetime
from typing import Literal
from uuid import UUID
import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils.string import NonEmptyString
daily_recording_requests = sa.Table(
"daily_recording_request",
metadata,
sa.Column("recording_id", sa.String, primary_key=True),
sa.Column(
"meeting_id",
sa.String,
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("instance_id", sa.String, nullable=False),
sa.Column("type", sa.String, nullable=False),
sa.Column("requested_at", sa.DateTime(timezone=True), nullable=False),
sa.Index("idx_meeting_id", "meeting_id"),
sa.Index("idx_instance_id", "instance_id"),
)
class DailyRecordingRequest(BaseModel):
recording_id: NonEmptyString
meeting_id: NonEmptyString
instance_id: UUID
type: Literal["cloud", "raw-tracks"]
requested_at: datetime
class DailyRecordingRequestsController:
async def create(self, request: DailyRecordingRequest) -> None:
stmt = insert(daily_recording_requests).values(
recording_id=request.recording_id,
meeting_id=request.meeting_id,
instance_id=str(request.instance_id),
type=request.type,
requested_at=request.requested_at,
)
stmt = stmt.on_conflict_do_nothing(index_elements=["recording_id"])
await get_database().execute(stmt)
async def find_by_recording_id(
self,
recording_id: NonEmptyString,
) -> tuple[NonEmptyString, Literal["cloud", "raw-tracks"]] | None:
query = daily_recording_requests.select().where(
daily_recording_requests.c.recording_id == recording_id
)
result = await get_database().fetch_one(query)
if not result:
return None
req = DailyRecordingRequest(
recording_id=result["recording_id"],
meeting_id=result["meeting_id"],
instance_id=UUID(result["instance_id"]),
type=result["type"],
requested_at=result["requested_at"],
)
return (req.meeting_id, req.type)
async def find_by_instance_id(
self,
instance_id: UUID,
) -> list[DailyRecordingRequest]:
"""Multiple recordings can have same instance_id (stop/restart)."""
query = daily_recording_requests.select().where(
daily_recording_requests.c.instance_id == str(instance_id)
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
async def get_by_meeting_id(
self,
meeting_id: NonEmptyString,
) -> list[DailyRecordingRequest]:
query = daily_recording_requests.select().where(
daily_recording_requests.c.meeting_id == meeting_id
)
results = await get_database().fetch_all(query)
return [
DailyRecordingRequest(
recording_id=r["recording_id"],
meeting_id=r["meeting_id"],
instance_id=UUID(r["instance_id"]),
type=r["type"],
requested_at=r["requested_at"],
)
for r in results
]
daily_recording_requests_controller = DailyRecordingRequestsController()

View File

@@ -1,4 +1,4 @@
from datetime import datetime from datetime import datetime, timedelta
from typing import Any, Literal from typing import Any, Literal
import sqlalchemy as sa import sqlalchemy as sa
@@ -183,6 +183,84 @@ class MeetingController:
results = await get_database().fetch_all(query) results = await get_database().fetch_all(query)
return [Meeting(**r) for r in results] return [Meeting(**r) for r in results]
async def get_by_room_name_and_time(
self,
room_name: NonEmptyString,
recording_start: datetime,
time_window_hours: int = 168,
) -> Meeting | None:
"""
Get meeting by room name closest to recording timestamp.
HACK ALERT: Daily.co doesn't return instanceId in recordings API response,
and mtgSessionId is separate from our instanceId. Time-based matching is
the least-bad workaround.
This handles edge case of duplicate room_name values in DB (race conditions,
double-clicks, etc.) by matching based on temporal proximity.
Algorithm:
1. Find meetings within time_window_hours of recording_start
2. Return meeting with start_date closest to recording_start
3. If tie, return first by meeting.id (deterministic)
Args:
room_name: Daily.co room name from recording
recording_start: Timezone-aware datetime from recording.start_ts
time_window_hours: Search window (default 168 = 1 week)
Returns:
Meeting closest to recording timestamp, or None if no matches
Failure modes:
- Multiple meetings in same room within ~5 minutes: picks closest
- All meetings outside time window: returns None
- Clock skew between Daily.co and DB: 1-week window tolerates this
Why 1 week window:
- Handles webhook failures (recording discovered days later)
- Tolerates clock skew
- Rejects unrelated meetings from weeks ago
"""
# Validate timezone-aware datetime
if recording_start.tzinfo is None:
raise ValueError(
f"recording_start must be timezone-aware, got naive datetime: {recording_start}"
)
window_start = recording_start - timedelta(hours=time_window_hours)
window_end = recording_start + timedelta(hours=time_window_hours)
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_name == room_name,
meetings.c.start_date >= window_start,
meetings.c.start_date <= window_end,
)
)
.order_by(meetings.c.start_date)
)
results = await get_database().fetch_all(query)
if not results:
return None
candidates = [Meeting(**r) for r in results]
# Find meeting with start_date closest to recording_start
closest = min(
candidates,
key=lambda m: (
abs((m.start_date - recording_start).total_seconds()),
m.id, # Tie-breaker: deterministic by UUID
),
)
return closest
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None: async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
""" """
Get latest active meeting for a room. Get latest active meeting for a room.
@@ -272,6 +350,44 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs) query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query) await get_database().execute(query)
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""
Set cloud recording only if not already set.
Returns True if updated, False if already set.
Prevents webhook/polling race condition via atomic WHERE clause.
"""
# Check current value before update to detect actual change
meeting_before = await self.get_by_id(meeting_id)
if not meeting_before:
return False
was_null = meeting_before.daily_composed_video_s3_key is None
query = (
meetings.update()
.where(
sa.and_(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
await get_database().execute(query)
# Return True only if value was NULL before (actual update occurred)
# If was_null=False, the WHERE clause prevented the update
return was_null
async def increment_num_clients(self, meeting_id: str) -> None: async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count.""" """Atomically increment participant count."""
query = ( query = (
@@ -351,27 +467,6 @@ class MeetingConsentController:
result = await get_database().fetch_one(query) result = await get_database().fetch_one(query)
return result is not None return result is not None
async def set_cloud_recording_if_missing(
self,
meeting_id: NonEmptyString,
s3_key: NonEmptyString,
duration: int,
) -> bool:
"""Returns True if updated, False if already set."""
query = (
meetings.update()
.where(
meetings.c.id == meeting_id,
meetings.c.daily_composed_video_s3_key.is_(None),
)
.values(
daily_composed_video_s3_key=s3_key,
daily_composed_video_duration=duration,
)
)
result = await get_database().execute(query)
return result.rowcount > 0
meetings_controller = MeetingController() meetings_controller = MeetingController()
meeting_consent_controller = MeetingConsentController() meeting_consent_controller = MeetingConsentController()

View File

@@ -4,10 +4,10 @@ from typing import Literal
import sqlalchemy as sa import sqlalchemy as sa
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4 from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
recordings = sa.Table( recordings = sa.Table(
"recording", "recording",
@@ -31,13 +31,14 @@ recordings = sa.Table(
class Recording(BaseModel): class Recording(BaseModel):
id: str = Field(default_factory=generate_uuid4) id: str = Field(default_factory=generate_uuid4)
bucket_name: str bucket_name: str
# for single-track
object_key: str object_key: str
recorded_at: datetime recorded_at: datetime
status: Literal["pending", "processing", "completed", "failed", "orphan"] = ( status: Literal["pending", "processing", "completed", "failed"] = "pending"
"pending"
)
meeting_id: str | None = None meeting_id: str | None = None
# None = single-track, [] = multitrack with no audio, [keys...] = multitrack with audio # for multitrack reprocessing
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None track_keys: list[str] | None = None
@property @property
@@ -71,6 +72,20 @@ class RecordingController:
query = recordings.delete().where(recordings.c.id == id) query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query) await get_database().execute(query)
async def set_meeting_id(
self,
recording_id: NonEmptyString,
meeting_id: NonEmptyString,
) -> None:
"""Link recording to meeting."""
query = (
recordings.update()
.where(recordings.c.id == recording_id)
.values(meeting_id=meeting_id)
)
await get_database().execute(query)
# no check for existence
async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]: async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]:
if not recording_ids: if not recording_ids:
return [] return []
@@ -89,12 +104,9 @@ class RecordingController:
This is more efficient than fetching all recordings and filtering in Python. This is more efficient than fetching all recordings and filtering in Python.
""" """
# INLINE IMPORT REQUIRED: Circular dependency from reflector.db.transcripts import (
# - recordings.py needs transcripts table for JOIN query transcripts, # noqa: PLC0415 cyclic import
# - transcripts.py imports recordings_controller )
# - db/__init__.py loads recordings before transcripts (line 31 vs 33)
# - Top-level import would fail during module initialization
from reflector.db.transcripts import transcripts
query = ( query = (
recordings.select() recordings.select()
@@ -112,27 +124,5 @@ class RecordingController:
recordings_list = [Recording(**row) for row in results] recordings_list = [Recording(**row) for row in results]
return [r for r in recordings_list if r.is_multitrack] return [r for r in recordings_list if r.is_multitrack]
async def try_create_with_meeting(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.meeting_id is not None, "meeting_id required for non-orphan"
assert recording.status != "orphan", "use create_orphan for orphans"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
async def create_orphan(self, recording: Recording) -> bool:
"""Returns True if created, False if already exists."""
assert recording.status == "orphan", "status must be 'orphan'"
assert recording.meeting_id is None, "meeting_id must be NULL for orphan"
stmt = insert(recordings).values(**recording.model_dump())
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
result = await get_database().execute(stmt)
return result.rowcount > 0
recordings_controller = RecordingController() recordings_controller = RecordingController()

View File

@@ -0,0 +1,17 @@
"""Presence tracking for meetings."""
from reflector.presence.pending_joins import (
PENDING_JOIN_PREFIX,
PENDING_JOIN_TTL,
create_pending_join,
delete_pending_join,
has_pending_joins,
)
__all__ = [
"PENDING_JOIN_PREFIX",
"PENDING_JOIN_TTL",
"create_pending_join",
"delete_pending_join",
"has_pending_joins",
]

View File

@@ -0,0 +1,59 @@
"""Track pending join intents in Redis.
When a user signals intent to join a meeting (before WebRTC handshake completes),
we store a pending join record. This prevents the meeting from being deactivated
while users are still connecting.
"""
import time
from redis.asyncio import Redis
from reflector.logger import logger
PENDING_JOIN_TTL = 30 # seconds
PENDING_JOIN_PREFIX = "pending_join"
# Max keys to scan per Redis SCAN iteration
SCAN_BATCH_SIZE = 100
async def create_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None:
"""Create a pending join record. Called before WebRTC handshake."""
key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
log = logger.bind(meeting_id=meeting_id, user_id=user_id, key=key)
await redis.setex(key, PENDING_JOIN_TTL, str(time.time()))
log.debug("Created pending join")
async def delete_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None:
"""Delete pending join. Called after WebRTC connection established."""
key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
log = logger.bind(meeting_id=meeting_id, user_id=user_id, key=key)
await redis.delete(key)
log.debug("Deleted pending join")
async def has_pending_joins(redis: Redis, meeting_id: str) -> bool:
"""Check if meeting has any pending joins.
Uses Redis SCAN to iterate through all keys matching the pattern.
Properly iterates until cursor returns 0 to ensure all keys are checked.
"""
pattern = f"{PENDING_JOIN_PREFIX}:{meeting_id}:*"
log = logger.bind(meeting_id=meeting_id, pattern=pattern)
cursor = 0
iterations = 0
while True:
cursor, keys = await redis.scan(
cursor=cursor, match=pattern, count=SCAN_BATCH_SIZE
)
iterations += 1
if keys:
log.debug("Found pending joins", count=len(keys), iterations=iterations)
return True
if cursor == 0:
break
log.debug("No pending joins found", iterations=iterations)
return False

View File

@@ -1,6 +1,4 @@
import json import json
import os
from datetime import datetime, timezone
from typing import assert_never from typing import assert_never
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
@@ -14,10 +12,7 @@ from reflector.dailyco_api import (
RecordingReadyEvent, RecordingReadyEvent,
RecordingStartedEvent, RecordingStartedEvent,
) )
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.logger import logger as _logger from reflector.logger import logger as _logger
from reflector.settings import settings from reflector.settings import settings
from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.factory import create_platform_client
@@ -217,73 +212,10 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
track_keys = [t.s3Key for t in tracks if t.type == "audio"] track_keys = [t.s3Key for t in tracks if t.type == "audio"]
# Lookup request
match = await daily_recording_requests_controller.find_by_recording_id(
recording_id
)
if not match:
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
meeting_id, _ = match
# Verify meeting exists
meeting = await meetings_controller.get_by_id(meeting_id)
if not meeting:
logger.error(
"Meeting not found (webhook)",
recording_id=recording_id,
meeting_id=meeting_id,
)
await create_and_log_orphan(
recording_id=recording_id,
bucket_name=bucket_name,
room_name=room_name,
start_ts=event.payload.start_ts,
track_keys=track_keys,
source="webhook",
)
return
# Create recording atomically
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=(
os.path.dirname(track_keys[0]) if track_keys else room_name
),
recorded_at=datetime.fromtimestamp(
event.payload.start_ts, tz=timezone.utc
),
track_keys=track_keys,
meeting_id=meeting_id,
status="pending",
)
)
if not created:
# Already created (polling got it first)
logger.debug(
"Recording already exists (webhook late)",
recording_id=recording_id,
meeting_id=meeting_id,
)
return
logger.info( logger.info(
"Raw-tracks recording queuing processing (webhook)", "Raw-tracks recording queuing processing",
recording_id=recording_id, recording_id=recording_id,
room_name=room_name, room_name=room_name,
meeting_id=meeting_id,
num_tracks=len(track_keys), num_tracks=len(track_keys),
) )

View File

@@ -1,5 +1,4 @@
import json import json
import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Annotated, Any, Optional from typing import Annotated, Any, Optional
from uuid import UUID from uuid import UUID
@@ -10,21 +9,16 @@ from pydantic import BaseModel
import reflector.auth as auth import reflector.auth as auth
from reflector.dailyco_api import RecordingType from reflector.dailyco_api import RecordingType
from reflector.dailyco_api.client import DailyApiError from reflector.dailyco_api.client import DailyApiError
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import ( from reflector.db.meetings import (
MeetingConsent, MeetingConsent,
meeting_consent_controller, meeting_consent_controller,
meetings_controller, meetings_controller,
) )
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.factory import create_platform_client
logger = logging.getLogger(__name__)
router = APIRouter() router = APIRouter()
@@ -108,6 +102,13 @@ async def start_recording(
if not meeting: if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found") raise HTTPException(status_code=404, detail="Meeting not found")
log = logger.bind(
meeting_id=meeting_id,
room_name=meeting.room_name,
recording_type=body.type,
instance_id=body.instanceId,
)
try: try:
client = create_platform_client("daily") client = create_platform_client("daily")
result = await client.start_recording( result = await client.start_recording(
@@ -116,30 +117,9 @@ async def start_recording(
instance_id=body.instanceId, instance_id=body.instanceId,
) )
recording_id = result["id"] log.info(f"Started {body.type} recording via REST API")
await daily_recording_requests_controller.create( return {"status": "ok", "result": result}
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=body.instanceId,
type=body.type,
requested_at=datetime.now(timezone.utc),
)
)
logger.info(
f"Started {body.type} recording via REST API",
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
"recording_id": recording_id,
},
)
return {"status": "ok", "recording_id": recording_id}
except DailyApiError as e: except DailyApiError as e:
# Parse Daily.co error response to detect "has an active stream" # Parse Daily.co error response to detect "has an active stream"
@@ -150,42 +130,22 @@ async def start_recording(
# "has an active stream" means recording already started by another participant # "has an active stream" means recording already started by another participant
# This is SUCCESS from business logic perspective - return 200 # This is SUCCESS from business logic perspective - return 200
if "has an active stream" in error_info: if "has an active stream" in error_info:
logger.info( log.info(
f"{body.type} recording already active (started by another participant)", f"{body.type} recording already active (started by another participant)"
extra={
"meeting_id": meeting_id,
"room_name": meeting.room_name,
"recording_type": body.type,
"instance_id": body.instanceId,
},
) )
return {"status": "already_active", "instanceId": str(body.instanceId)} return {"status": "already_active", "instanceId": str(body.instanceId)}
except (json.JSONDecodeError, KeyError): except (json.JSONDecodeError, KeyError):
pass # Fall through to error handling pass # Fall through to error handling
# All other Daily.co API errors # All other Daily.co API errors
logger.error( log.error(f"Failed to start {body.type} recording", error=str(e))
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
raise HTTPException( raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}" status_code=500, detail=f"Failed to start recording: {str(e)}"
) )
except Exception as e: except Exception as e:
# Non-Daily.co errors # Non-Daily.co errors
logger.error( log.error(f"Failed to start {body.type} recording", error=str(e))
f"Failed to start {body.type} recording",
extra={
"meeting_id": meeting_id,
"recording_type": body.type,
"error": str(e),
},
)
raise HTTPException( raise HTTPException(
status_code=500, detail=f"Failed to start recording: {str(e)}" status_code=500, detail=f"Failed to start recording: {str(e)}"
) )

View File

@@ -1,4 +1,3 @@
import logging
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from enum import Enum from enum import Enum
from typing import Annotated, Any, Literal, Optional from typing import Annotated, Any, Literal, Optional
@@ -14,16 +13,17 @@ from reflector.db import get_database
from reflector.db.calendar_events import calendar_events_controller from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
from reflector.redis_cache import RedisAsyncLock from reflector.logger import logger
from reflector.presence.pending_joins import create_pending_join, delete_pending_join
from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
from reflector.schemas.platform import Platform from reflector.schemas.platform import Platform
from reflector.services.ics_sync import ics_sync_service from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings from reflector.settings import settings
from reflector.utils.string import NonEmptyString
from reflector.utils.url import add_query_param from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.factory import create_platform_client
from reflector.worker.webhook import test_webhook from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__)
class Room(BaseModel): class Room(BaseModel):
id: str id: str
@@ -597,3 +597,112 @@ async def rooms_join_meeting(
meeting.room_url = add_query_param(meeting.room_url, "t", token) meeting.room_url = add_query_param(meeting.room_url, "t", token)
return meeting return meeting
class JoiningRequest(BaseModel):
"""Request body for /joining and /joined endpoints."""
connection_id: NonEmptyString
"""Unique identifier for this connection. Should be a UUID generated by the client.
Must be the same value for both /joining and /joined calls from the same tab."""
class JoiningResponse(BaseModel):
status: Literal["ok"]
def _get_pending_join_key(
user: Optional[auth.UserInfo], connection_id: NonEmptyString
) -> str:
"""Get a unique key for pending join tracking.
Uses user ID for authenticated users, connection_id for anonymous users.
This ensures each browser tab has its own unique pending join record.
"""
if user:
return f"{user['sub']}:{connection_id}"
return f"anon:{connection_id}"
@router.post(
"/rooms/{room_name}/meetings/{meeting_id}/joining", response_model=JoiningResponse
)
async def meeting_joining(
room_name: str,
meeting_id: str,
body: JoiningRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
) -> JoiningResponse:
"""Signal intent to join meeting. Called before WebRTC handshake starts.
This creates a pending join record that prevents the meeting from being
deactivated while the WebRTC handshake is in progress. The record expires
automatically after 30 seconds if the connection is not established.
"""
log = logger.bind(
room_name=room_name, meeting_id=meeting_id, connection_id=body.connection_id
)
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if not meeting.is_active:
raise HTTPException(status_code=400, detail="Meeting is not active")
join_key = _get_pending_join_key(user, body.connection_id)
redis = await get_async_redis_client()
try:
await create_pending_join(redis, meeting_id, join_key)
log.debug("Created pending join intent", join_key=join_key)
finally:
await redis.aclose()
return JoiningResponse(status="ok")
@router.post(
"/rooms/{room_name}/meetings/{meeting_id}/joined", response_model=JoiningResponse
)
async def meeting_joined(
room_name: str,
meeting_id: str,
body: JoiningRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
) -> JoiningResponse:
"""Signal that WebRTC connection is established.
This clears the pending join record, confirming the user has successfully
connected to the meeting. Safe to call even if meeting was deactivated
during the handshake (idempotent cleanup).
"""
log = logger.bind(
room_name=room_name, meeting_id=meeting_id, connection_id=body.connection_id
)
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
# Note: We don't check is_active here - the /joined call is a cleanup operation
# and should succeed even if the meeting was deactivated during the handshake
join_key = _get_pending_join_key(user, body.connection_id)
redis = await get_async_redis_client()
try:
await delete_pending_join(redis, meeting_id, join_key)
log.debug("Cleared pending join intent", join_key=join_key)
finally:
await redis.aclose()
return JoiningResponse(status="ok")

View File

@@ -1,5 +1,6 @@
import json import json
import os import os
import re
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import List, Literal from typing import List, Literal
from urllib.parse import unquote from urllib.parse import unquote
@@ -12,12 +13,10 @@ from celery.utils.log import get_task_logger
from pydantic import ValidationError from pydantic import ValidationError
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.dailyco_api.recording_orphans import create_and_log_orphan
from reflector.db.daily_participant_sessions import ( from reflector.db.daily_participant_sessions import (
DailyParticipantSession, DailyParticipantSession,
daily_participant_sessions_controller, daily_participant_sessions_controller,
) )
from reflector.db.daily_recording_requests import daily_recording_requests_controller
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
@@ -32,9 +31,10 @@ from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process, task_pipeline_multitrack_process,
) )
from reflector.pipelines.topic_processing import EmptyPipeline from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.presence.pending_joins import has_pending_joins
from reflector.processors import AudioFileWriterProcessor from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.redis_cache import RedisAsyncLock from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
from reflector.settings import settings from reflector.settings import settings
from reflector.storage import get_transcripts_storage from reflector.storage import get_transcripts_storage
from reflector.utils.daily import ( from reflector.utils.daily import (
@@ -231,44 +231,79 @@ async def _process_multitrack_recording_inner(
recording_start_ts: int, recording_start_ts: int,
): ):
""" """
Process multitrack recording. Process multitrack recording (first time or reprocessing).
Recording must already exist with meeting_id set (created by webhook/polling before queueing). For first processing (webhook/polling):
- Uses recording_start_ts for time-based meeting matching (no instanceId available)
For reprocessing:
- Uses recording.meeting_id directly (already linked during first processing)
- recording_start_ts is ignored
""" """
# Get recording (must exist - created by webhook/polling) tz = timezone.utc
recorded_at = datetime.now(tz)
try:
if track_keys:
folder = os.path.basename(os.path.dirname(track_keys[0]))
ts_match = re.search(r"(\d{14})$", folder)
if ts_match:
ts = ts_match.group(1)
recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
except Exception as e:
logger.warning(
f"Could not parse recorded_at from keys, using now() {recorded_at}",
e,
exc_info=True,
)
# Check if recording already exists (reprocessing path)
recording = await recordings_controller.get_by_id(recording_id) recording = await recordings_controller.get_by_id(recording_id)
if not recording: if recording and recording.meeting_id:
logger.error( # Reprocessing: recording exists with meeting already linked
"Recording not found - should have been created by webhook/polling", meeting = await meetings_controller.get_by_id(recording.meeting_id)
recording_id=recording_id, if not meeting:
) logger.error(
return "Reprocessing: meeting not found for recording - skipping",
meeting_id=recording.meeting_id,
recording_id=recording_id,
)
return
if not recording.meeting_id: logger.info(
logger.error( "Reprocessing: using existing recording.meeting_id",
"Recording has no meeting_id - orphan should not be queued",
recording_id=recording_id, recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
) )
return else:
# First processing: recording doesn't exist, need time-based matching
# Get meeting # (Daily.co doesn't return instanceId in API, must match by timestamp)
meeting = await meetings_controller.get_by_id(recording.meeting_id) recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
if not meeting: meeting = await meetings_controller.get_by_room_name_and_time(
logger.error( room_name=daily_room_name,
"Meeting not found for recording", recording_start=recording_start,
meeting_id=recording.meeting_id, time_window_hours=168, # 1 week
)
if not meeting:
logger.error(
"Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
recording_id=recording_id,
room_name=daily_room_name,
recording_start_ts=recording_start_ts,
recording_start=recording_start.isoformat(),
)
return # Skip processing, will retry on next poll
logger.info(
"First processing: found meeting via time-based matching",
meeting_id=meeting.id,
room_name=daily_room_name,
recording_id=recording_id, recording_id=recording_id,
time_delta_seconds=abs(
(meeting.start_date - recording_start).total_seconds()
),
) )
return
logger.info(
"Processing multitrack recording",
recording_id=recording_id,
meeting_id=meeting.id,
room_name=daily_room_name,
)
room_name_base = extract_base_room_name(daily_room_name) room_name_base = extract_base_room_name(daily_room_name)
@@ -276,6 +311,33 @@ async def _process_multitrack_recording_inner(
if not room: if not room:
raise Exception(f"Room not found: {room_name_base}") raise Exception(f"Room not found: {room_name_base}")
if not recording:
# Create recording (only happens during first processing)
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
recording = await recordings_controller.create(
Recording(
id=recording_id,
bucket_name=bucket_name,
object_key=object_key_dir,
recorded_at=recorded_at,
meeting_id=meeting.id,
track_keys=track_keys,
)
)
elif not recording.meeting_id:
# Recording exists but meeting_id is null (failed first processing)
# Update with meeting from time-based matching
await recordings_controller.set_meeting_id(
recording_id=recording.id,
meeting_id=meeting.id,
)
recording.meeting_id = meeting.id
logger.info(
"Updated existing recording with meeting_id",
recording_id=recording.id,
meeting_id=meeting.id,
)
transcript = await transcripts_controller.get_by_recording_id(recording.id) transcript = await transcripts_controller.get_by_recording_id(recording.id)
if not transcript: if not transcript:
transcript = await transcripts_controller.add( transcript = await transcripts_controller.add(
@@ -461,7 +523,7 @@ async def store_cloud_recording(
Store cloud recording reference in meeting table. Store cloud recording reference in meeting table.
Common function for both webhook and polling code paths. Common function for both webhook and polling code paths.
Uses direct recording_id lookup via daily_recording_requests table. Uses time-based matching to handle duplicate room_name values.
Args: Args:
recording_id: Daily.co recording ID recording_id: Daily.co recording ID
@@ -474,170 +536,155 @@ async def store_cloud_recording(
Returns: Returns:
True if stored, False if skipped/failed True if stored, False if skipped/failed
""" """
# Lookup request recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
if not match: meeting = await meetings_controller.get_by_room_name_and_time(
# ORPHAN: No request found (pre-migration recording or failed request creation) room_name=room_name,
await create_and_log_orphan( recording_start=recording_start,
time_window_hours=168, # 1 week
)
if not meeting:
logger.warning(
f"Cloud recording ({source}): no meeting found within 1-week window",
recording_id=recording_id, recording_id=recording_id,
bucket_name="",
room_name=room_name, room_name=room_name,
start_ts=start_ts, recording_start_ts=start_ts,
track_keys=None, recording_start=recording_start.isoformat(),
source=source,
) )
return False return False
meeting_id, _ = match
success = await meetings_controller.set_cloud_recording_if_missing( success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id, meeting_id=meeting.id,
s3_key=s3_key, s3_key=s3_key,
duration=duration, duration=duration,
) )
if not success: if not success:
logger.debug( logger.debug(
f"Cloud recording ({source}): already set (stop/restart?)", f"Cloud recording ({source}): already set (race lost)",
recording_id=recording_id, recording_id=recording_id,
room_name=room_name, room_name=room_name,
meeting_id=meeting_id, meeting_id=meeting.id,
) )
return False return False
logger.info( logger.info(
f"Cloud recording stored via {source}", f"Cloud recording stored via {source} (time-based match)",
meeting_id=meeting_id, meeting_id=meeting.id,
recording_id=recording_id, recording_id=recording_id,
s3_key=s3_key, s3_key=s3_key,
duration=duration, duration=duration,
time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()),
) )
return True return True
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]): async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
"""Process cloud recordings (database deduplication, worker-agnostic). """
Store cloud recordings missing from meeting table via polling.
Cloud recordings stored in meeting.daily_composed_video_s3_key, not recording table. Uses time-based matching via store_cloud_recording().
Only first cloud recording per meeting is kept (existing behavior).
""" """
if not cloud_recordings: if not cloud_recordings:
return return
for rec in cloud_recordings: stored_count = 0
# Lookup request for recording in cloud_recordings:
match = await daily_recording_requests_controller.find_by_recording_id(rec.id) # Extract S3 key from recording (cloud recordings use s3key field)
s3_key = recording.s3key or (recording.s3.key if recording.s3 else None)
if not match: if not s3_key:
await create_and_log_orphan(
recording_id=rec.id,
bucket_name="",
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=None,
source="polling",
)
continue
meeting_id, _ = match
if not rec.s3key:
logger.error("Cloud recording missing s3_key", recording_id=rec.id)
continue
# Store in meeting table (atomic, only if not already set)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key=rec.s3key,
duration=rec.duration,
)
if success:
logger.info(
"Stored cloud recording", recording_id=rec.id, meeting_id=meeting_id
)
else:
logger.warning( logger.warning(
"Cloud recording already exists for meeting (stop/restart?)", "Cloud recording: missing S3 key",
recording_id=rec.id, recording_id=recording.id,
meeting_id=meeting_id, room_name=recording.room_name,
) )
continue
stored = await store_cloud_recording(
recording_id=recording.id,
room_name=recording.room_name,
s3_key=s3_key,
duration=recording.duration,
start_ts=recording.start_ts,
source="polling",
)
if stored:
stored_count += 1
logger.info(
"Cloud recording polling complete",
total=len(cloud_recordings),
stored=stored_count,
)
async def _poll_raw_tracks_recordings( async def _poll_raw_tracks_recordings(
raw_tracks_recordings: List[FinishedRecordingResponse], raw_tracks_recordings: List[FinishedRecordingResponse],
bucket_name: NonEmptyString, bucket_name: str,
) -> None: ):
"""Process raw-tracks (database deduplication, worker-agnostic).""" """Queue raw-tracks recordings missing from DB (existing logic)."""
if not raw_tracks_recordings: if not raw_tracks_recordings:
return return
for rec in raw_tracks_recordings: recording_ids = [rec.id for rec in raw_tracks_recordings]
# Lookup request FIRST (before any DB writes) existing_recordings = await recordings_controller.get_by_ids(recording_ids)
match = await daily_recording_requests_controller.find_by_recording_id(rec.id) existing_ids = {rec.id for rec in existing_recordings}
if not match: missing_recordings = [
await create_and_log_orphan( rec for rec in raw_tracks_recordings if rec.id not in existing_ids
recording_id=rec.id, ]
bucket_name=bucket_name,
room_name=rec.room_name, if not missing_recordings:
start_ts=rec.start_ts, logger.debug(
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"], "All raw-tracks recordings already in DB",
source="polling", api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
return
logger.info(
"Found raw-tracks recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(raw_tracks_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
logger.warning(
"Finished raw-tracks recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
) )
continue continue
meeting_id, _ = match track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
# Verify meeting exists if not track_keys:
meeting = await meetings_controller.get_by_id(meeting_id) logger.warning(
if not meeting: "No audio tracks found in raw-tracks recording",
logger.error( recording_id=recording.id,
"Meeting not found", recording_id=rec.id, meeting_id=meeting_id room_name=recording.room_name,
) total_tracks=len(recording.tracks),
await create_and_log_orphan(
recording_id=rec.id,
bucket_name=bucket_name,
room_name=rec.room_name,
start_ts=rec.start_ts,
track_keys=[t.s3Key for t in rec.tracks if t.type == "audio"],
source="polling",
) )
continue continue
# DEDUPLICATION: Atomically create recording (single operation, no race window) logger.info(
# ON CONFLICT → concurrent poller already got it, skip entire logic "Queueing missing raw-tracks recording for processing",
track_keys = [t.s3Key for t in rec.tracks if t.type == "audio"] recording_id=recording.id,
room_name=recording.room_name,
created = await recordings_controller.try_create_with_meeting( track_count=len(track_keys),
Recording(
id=rec.id,
bucket_name=bucket_name,
object_key=os.path.dirname(track_keys[0]) if track_keys else "",
recorded_at=datetime.fromtimestamp(rec.start_ts, tz=timezone.utc),
track_keys=track_keys,
meeting_id=meeting_id, # Set at creation (constraint-safe)
status="pending",
)
) )
if not created:
# Conflict: another poller already created/queued this
# Skip all remaining logic (match already done by winner)
continue
# Only winner reaches here - queue processing (works with Celery or Hatchet)
process_multitrack_recording.delay( process_multitrack_recording.delay(
recording_id=rec.id,
daily_room_name=rec.room_name,
recording_start_ts=rec.start_ts,
bucket_name=bucket_name, bucket_name=bucket_name,
daily_room_name=recording.room_name,
recording_id=recording.id,
track_keys=track_keys, track_keys=track_keys,
recording_start_ts=recording.start_ts,
) )
logger.info("Queued recording", recording_id=rec.id, meeting_id=meeting_id)
async def poll_daily_room_presence(meeting_id: str) -> None: async def poll_daily_room_presence(meeting_id: str) -> None:
"""Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed. """Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed.
@@ -823,6 +870,18 @@ async def process_meetings():
logger_.debug("Meeting not yet started, keep it") logger_.debug("Meeting not yet started, keep it")
if should_deactivate: if should_deactivate:
# Check for pending joins before deactivating
# Users might be in the process of connecting via WebRTC
redis = await get_async_redis_client()
try:
if await has_pending_joins(redis, meeting.id):
logger_.info(
"Meeting has pending joins, skipping deactivation"
)
continue
finally:
await redis.aclose()
await meetings_controller.update_meeting( await meetings_controller.update_meeting(
meeting.id, is_active=False meeting.id, is_active=False
) )

View File

@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
import asyncio import asyncio
import json import json
import threading
import redis.asyncio as redis import redis.asyncio as redis
from fastapi import WebSocket from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
async def _pubsub_data_reader(self, pubsub_subscriber): async def _pubsub_data_reader(self, pubsub_subscriber):
while True: while True:
# timeout=1.0 prevents tight CPU loop when no messages available
message = await pubsub_subscriber.get_message( message = await pubsub_subscriber.get_message(
ignore_subscribe_messages=True ignore_subscribe_messages=True
) )
@@ -109,29 +109,38 @@ class WebsocketManager:
await socket.send_json(data) await socket.send_json(data)
# Process-global singleton to ensure only one WebsocketManager instance exists.
# Multiple instances would cause resource leaks and CPU issues.
_ws_manager: WebsocketManager | None = None
def get_ws_manager() -> WebsocketManager: def get_ws_manager() -> WebsocketManager:
""" """
Returns the WebsocketManager instance for managing websockets. Returns the global WebsocketManager singleton.
This function initializes and returns the WebsocketManager instance, Creates instance on first call, subsequent calls return cached instance.
which is responsible for managing websockets and handling websocket Thread-safe via GIL. Concurrent initialization may create duplicate
connections. instances but last write wins (acceptable for this use case).
Returns: Returns:
WebsocketManager: The initialized WebsocketManager instance. WebsocketManager: The global WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
""" """
local = threading.local() global _ws_manager
if hasattr(local, "ws_manager"):
return local.ws_manager
if _ws_manager is not None:
return _ws_manager
# No lock needed - GIL makes this safe enough
# Worst case: race creates two instances, last assignment wins
pubsub_client = RedisPubSubManager( pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST, host=settings.REDIS_HOST,
port=settings.REDIS_PORT, port=settings.REDIS_PORT,
) )
ws_manager = WebsocketManager(pubsub_client=pubsub_client) _ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager return _ws_manager
return ws_manager
def reset_ws_manager() -> None:
"""Reset singleton for testing. DO NOT use in production."""
global _ws_manager
_ws_manager = None

View File

@@ -1,39 +0,0 @@
#!/usr/bin/env python3
"""Test script to fetch Daily.co recordings for a specific room and show raw API response."""
import asyncio
import json
from reflector.video_platforms.factory import create_platform_client
async def main():
room_name = "daily-private-igor-20260110042117"
print(f"\n=== Fetching recordings for room: {room_name} ===\n")
async with create_platform_client("daily") as client:
recordings = await client.list_recordings(room_name=room_name)
print(f"Found {len(recordings)} recording objects from Daily.co API\n")
for i, rec in enumerate(recordings, 1):
print(f"--- Recording #{i} ---")
print(f"ID: {rec.id}")
print(f"Room: {rec.room_name}")
print(f"Start TS: {rec.start_ts}")
print(f"Status: {rec.status}")
print(f"Duration: {rec.duration}")
print(f"Type: {rec.type}")
print(f"Tracks count: {len(rec.tracks)}")
if rec.tracks:
print(f"Tracks:")
for j, track in enumerate(rec.tracks, 1):
print(f" Track {j}: {track.s3Key}")
print(f"\nRaw JSON:\n{json.dumps(rec.model_dump(), indent=2, default=str)}\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,6 +1,5 @@
import os import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from tempfile import NamedTemporaryFile
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
@@ -333,11 +332,14 @@ def celery_enable_logging():
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def celery_config(): def celery_config():
with NamedTemporaryFile() as f: redis_host = os.environ.get("REDIS_HOST", "localhost")
yield { redis_port = os.environ.get("REDIS_PORT", "6379")
"broker_url": "memory://", # Use db 2 to avoid conflicts with main app
"result_backend": f"db+sqlite:///{f.name}", redis_url = f"redis://{redis_host}:{redis_port}/2"
} yield {
"broker_url": redis_url,
"result_backend": redis_url,
}
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
@@ -370,9 +372,12 @@ async def ws_manager_in_memory(monkeypatch):
def __init__(self, queue: asyncio.Queue): def __init__(self, queue: asyncio.Queue):
self.queue = queue self.queue = queue
async def get_message(self, ignore_subscribe_messages: bool = True): async def get_message(
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
):
wait_timeout = timeout if timeout is not None else 0.05
try: try:
return await asyncio.wait_for(self.queue.get(), timeout=0.05) return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
except Exception: except Exception:
return None return None

View File

@@ -1,258 +0,0 @@
from datetime import datetime, timezone
from uuid import UUID
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_create_request():
"""Test creating a recording request."""
# Create meeting first
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-123",
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
request = DailyRecordingRequest(
recording_id="rec-1",
meeting_id="meeting-123",
instance_id=UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"),
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
result = await daily_recording_requests_controller.find_by_recording_id("rec-1")
assert result is not None
assert result[0] == "meeting-123"
assert result[1] == "cloud"
@pytest.mark.asyncio
async def test_multiple_recordings_same_meeting():
"""Test stop/restart creates multiple request rows."""
# Create room and meeting
room = Room(
id="test-room-2", name="Test Room 2", slug="test-room-2", user_id="test-user"
)
await rooms_controller.create(room)
meeting_id = "meeting-456"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-1",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-2", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-1", "rec-2"}
@pytest.mark.asyncio
async def test_deduplication_via_database():
"""Test concurrent pollers use database for deduplication."""
# Create room and meeting
room = Room(
id="test-room-3", name="Test Room 3", slug="test-room-3", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-789",
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
recording_id = "rec-123"
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-789",
status="pending",
track_keys=["track1.webm", "track2.webm"],
)
)
assert created2 is False # Conflict, skip
@pytest.mark.asyncio
async def test_orphan_logged_once():
"""Test orphan marked once, skipped on re-poll."""
# First poll
created1 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created1 is True
# Second poll (same orphan discovered again)
created2 = await recordings_controller.create_orphan(
Recording(
id="orphan-123",
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created2 is False # Already exists
# Verify it exists
existing = await recordings_controller.get_by_id("orphan-123")
assert existing is not None
assert existing.status == "orphan"
@pytest.mark.asyncio
async def test_orphan_constraints():
"""Test orphan invariants are enforced."""
# Can't create orphan with meeting_id
with pytest.raises(AssertionError, match="meeting_id must be NULL"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-123", # Should be None
status="orphan",
track_keys=None,
)
)
# Can't create orphan with wrong status
with pytest.raises(AssertionError, match="status must be 'orphan'"):
await recordings_controller.create_orphan(
Recording(
id="bad-orphan-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="pending", # Should be "orphan"
track_keys=None,
)
)
@pytest.mark.asyncio
async def test_try_create_with_meeting_constraints():
"""Test try_create_with_meeting enforces constraints."""
# Create room and meeting
room = Room(
id="test-room-4", name="Test Room 4", slug="test-room-4", user_id="test-user"
)
await rooms_controller.create(room)
meeting = Meeting(
id="meeting-999",
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Can't create with orphan status
with pytest.raises(AssertionError, match="use create_orphan"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-1",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id="meeting-999",
status="orphan", # Should not be orphan
track_keys=None,
)
)
# Can't create without meeting_id
with pytest.raises(AssertionError, match="meeting_id required"):
await recordings_controller.try_create_with_meeting(
Recording(
id="bad-rec-2",
bucket_name="test",
object_key="test",
recorded_at=datetime.now(timezone.utc),
meeting_id=None, # Should have meeting_id
status="pending",
track_keys=None,
)
)

View File

@@ -0,0 +1,362 @@
"""Integration tests for /joining and /joined endpoints.
Tests for the join intent tracking to prevent race conditions during
WebRTC handshake when users join meetings.
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db.meetings import Meeting
from reflector.presence.pending_joins import PENDING_JOIN_PREFIX
TEST_CONNECTION_ID = "test-connection-uuid-12345"
@pytest.fixture
def mock_room():
"""Mock room object."""
from reflector.db.rooms import Room
return Room(
id="room-123",
name="test-room",
user_id="owner-user",
created_at=datetime.now(timezone.utc),
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=True,
platform="daily",
skip_consent=False,
)
@pytest.fixture
def mock_meeting():
"""Mock meeting object."""
now = datetime.now(timezone.utc)
return Meeting(
id="meeting-456",
room_id="room-123",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room-20251118120000",
host_room_url="https://daily.co/test-room-20251118120000?t=host",
platform="daily",
num_clients=0,
is_active=True,
start_date=now,
end_date=now + timedelta(hours=1),
)
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_creates_pending_join(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joining endpoint creates pending join in Redis."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis setex was called with correct key pattern
mock_redis.setex.assert_called_once()
call_args = mock_redis.setex.call_args[0]
assert call_args[0].startswith(f"{PENDING_JOIN_PREFIX}:{mock_meeting.id}:")
assert TEST_CONNECTION_ID in call_args[0]
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joined_endpoint_deletes_pending_join(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joined endpoint deletes pending join from Redis."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.delete = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joined",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis delete was called with correct key pattern
mock_redis.delete.assert_called_once()
call_args = mock_redis.delete.call_args[0]
assert call_args[0].startswith(f"{PENDING_JOIN_PREFIX}:{mock_meeting.id}:")
assert TEST_CONNECTION_ID in call_args[0]
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
async def test_joining_endpoint_room_not_found(
mock_get_room,
client,
authenticated_client,
):
"""Test that /joining returns 404 when room not found."""
mock_get_room.return_value = None
response = await client.post(
"/rooms/nonexistent-room/meetings/meeting-123/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 404
assert response.json()["detail"] == "Room not found"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
async def test_joining_endpoint_meeting_not_found(
mock_get_meeting,
mock_get_room,
mock_room,
client,
authenticated_client,
):
"""Test that /joining returns 404 when meeting not found."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = None
response = await client.post(
f"/rooms/{mock_room.name}/meetings/nonexistent-meeting/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 404
assert response.json()["detail"] == "Meeting not found"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
async def test_joining_endpoint_meeting_not_active(
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joining returns 400 when meeting is not active."""
mock_get_room.return_value = mock_room
inactive_meeting = mock_meeting.model_copy(update={"is_active": False})
mock_get_meeting.return_value = inactive_meeting
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 400
assert response.json()["detail"] == "Meeting is not active"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_anonymous_user(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
):
"""Test that /joining works for anonymous users with unique connection_id."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis setex was called with "anon:" prefix and connection_id
call_args = mock_redis.setex.call_args[0]
assert ":anon:" in call_args[0]
assert TEST_CONNECTION_ID in call_args[0]
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_redis_closed_on_success(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that Redis connection is closed after successful operation."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_redis_closed_on_error(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that Redis connection is closed even when operation fails."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock(side_effect=Exception("Redis error"))
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
with pytest.raises(Exception):
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
async def test_joining_endpoint_requires_connection_id(
client,
):
"""Test that /joining returns 422 when connection_id is missing."""
response = await client.post(
"/rooms/test-room/meetings/meeting-123/joining",
json={},
)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_joining_endpoint_rejects_empty_connection_id(
client,
):
"""Test that /joining returns 422 when connection_id is empty string."""
response = await client.post(
"/rooms/test-room/meetings/meeting-123/joining",
json={"connection_id": ""},
)
assert response.status_code == 422 # Validation error (NonEmptyString)
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_different_connection_ids_create_different_keys(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
):
"""Test that different connection_ids create different Redis keys."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# First connection
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": "connection-1"},
)
key1 = mock_redis.setex.call_args[0][0]
mock_redis.setex.reset_mock()
# Second connection (different tab)
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": "connection-2"},
)
key2 = mock_redis.setex.call_args[0][0]
# Keys should be different
assert key1 != key2
assert "connection-1" in key1
assert "connection-2" in key2

View File

@@ -0,0 +1,153 @@
"""Tests for pending joins Redis helper functions.
TDD tests for tracking join intent to prevent race conditions during
WebRTC handshake when users join meetings.
"""
from unittest.mock import AsyncMock
import pytest
from reflector.presence.pending_joins import (
PENDING_JOIN_PREFIX,
PENDING_JOIN_TTL,
create_pending_join,
delete_pending_join,
has_pending_joins,
)
@pytest.fixture
def mock_redis():
"""Mock async Redis client."""
redis = AsyncMock()
redis.setex = AsyncMock()
redis.delete = AsyncMock()
redis.scan = AsyncMock(return_value=(0, []))
return redis
@pytest.mark.asyncio
async def test_create_pending_join_sets_key_with_ttl(mock_redis):
"""Test that create_pending_join stores key with correct TTL."""
meeting_id = "meeting-123"
user_id = "user-456"
await create_pending_join(mock_redis, meeting_id, user_id)
expected_key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
mock_redis.setex.assert_called_once()
call_args = mock_redis.setex.call_args
assert call_args[0][0] == expected_key
assert call_args[0][1] == PENDING_JOIN_TTL
# Value should be a timestamp string
assert call_args[0][2] is not None
@pytest.mark.asyncio
async def test_delete_pending_join_removes_key(mock_redis):
"""Test that delete_pending_join removes the key."""
meeting_id = "meeting-123"
user_id = "user-456"
await delete_pending_join(mock_redis, meeting_id, user_id)
expected_key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
mock_redis.delete.assert_called_once_with(expected_key)
@pytest.mark.asyncio
async def test_has_pending_joins_returns_false_when_no_keys(mock_redis):
"""Test has_pending_joins returns False when no matching keys."""
mock_redis.scan.return_value = (0, [])
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is False
mock_redis.scan.assert_called_once()
call_kwargs = mock_redis.scan.call_args.kwargs
assert call_kwargs["match"] == f"{PENDING_JOIN_PREFIX}:meeting-123:*"
@pytest.mark.asyncio
async def test_has_pending_joins_returns_true_when_keys_exist(mock_redis):
"""Test has_pending_joins returns True when matching keys found."""
mock_redis.scan.return_value = (0, [b"pending_join:meeting-123:user-1"])
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is True
@pytest.mark.asyncio
async def test_has_pending_joins_scans_with_correct_pattern(mock_redis):
"""Test has_pending_joins uses correct scan pattern."""
meeting_id = "meeting-abc-def"
mock_redis.scan.return_value = (0, [])
await has_pending_joins(mock_redis, meeting_id)
expected_pattern = f"{PENDING_JOIN_PREFIX}:{meeting_id}:*"
mock_redis.scan.assert_called_once()
call_kwargs = mock_redis.scan.call_args.kwargs
assert call_kwargs["match"] == expected_pattern
assert call_kwargs["count"] == 100
@pytest.mark.asyncio
async def test_multiple_users_pending_joins(mock_redis):
"""Test that multiple users can have pending joins for same meeting."""
meeting_id = "meeting-123"
# Simulate two pending joins
mock_redis.scan.return_value = (
0,
[b"pending_join:meeting-123:user-1", b"pending_join:meeting-123:user-2"],
)
result = await has_pending_joins(mock_redis, meeting_id)
assert result is True
@pytest.mark.asyncio
async def test_pending_join_ttl_value():
"""Test that PENDING_JOIN_TTL has expected value."""
# 30 seconds should be enough for WebRTC handshake but not too long
assert PENDING_JOIN_TTL == 30
@pytest.mark.asyncio
async def test_pending_join_prefix_value():
"""Test that PENDING_JOIN_PREFIX has expected value."""
assert PENDING_JOIN_PREFIX == "pending_join"
@pytest.mark.asyncio
async def test_has_pending_joins_multi_iteration_scan_no_keys(mock_redis):
"""Test has_pending_joins iterates until cursor returns 0."""
# Simulate multi-iteration scan: cursor 100 -> cursor 50 -> cursor 0
mock_redis.scan.side_effect = [
(100, []), # First iteration, no keys, continue
(50, []), # Second iteration, no keys, continue
(0, []), # Third iteration, cursor 0, done
]
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is False
assert mock_redis.scan.call_count == 3
@pytest.mark.asyncio
async def test_has_pending_joins_multi_iteration_finds_key_later(mock_redis):
"""Test has_pending_joins finds key on second iteration."""
# Simulate finding key on second scan iteration
mock_redis.scan.side_effect = [
(100, []), # First iteration, no keys
(0, [b"pending_join:meeting-123:user-1"]), # Second iteration, found key
]
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is True
assert mock_redis.scan.call_count == 2

View File

@@ -0,0 +1,241 @@
"""Tests for process_meetings pending joins check.
Tests that process_meetings correctly skips deactivation when
pending joins exist for a meeting.
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db.meetings import Meeting
def _get_process_meetings_fn():
"""Get the underlying async function without Celery/asynctask decorators."""
from reflector.worker import process
fn = process.process_meetings
# Get through both decorator layers (@shared_task and @asynctask)
if hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
if hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
return fn
@pytest.fixture
def mock_active_meeting():
"""Mock an active meeting that should be considered for deactivation."""
now = datetime.now(timezone.utc)
return Meeting(
id="meeting-123",
room_id="room-456",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room-20251118120000",
host_room_url="https://daily.co/test-room-20251118120000?t=host",
platform="daily",
num_clients=0,
is_active=True,
start_date=now - timedelta(hours=1),
end_date=now - timedelta(minutes=30), # Already ended
)
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_skips_deactivation_with_pending_joins(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that process_meetings skips deactivation when pending joins exist."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions, but had sessions (triggers deactivation)
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc) # Session ended
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock pending joins exist
mock_has_pending_joins.return_value = True
await process_meetings()
# Verify has_pending_joins was called
mock_has_pending_joins.assert_called_once_with(mock_redis, mock_active_meeting.id)
# Verify meeting was NOT deactivated
mock_update_meeting.assert_not_called()
# Verify Redis was closed
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_deactivates_without_pending_joins(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that process_meetings deactivates when no pending joins."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions, but had sessions
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc)
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock no pending joins
mock_has_pending_joins.return_value = False
await process_meetings()
# Verify meeting was deactivated
mock_update_meeting.assert_called_once_with(mock_active_meeting.id, is_active=False)
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
async def test_process_meetings_no_check_when_active_sessions(
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that pending joins check is skipped when there are active sessions."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - has active session
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = None # Still active
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
with (
patch("reflector.worker.process.get_async_redis_client") as mock_get_redis,
patch("reflector.worker.process.has_pending_joins") as mock_has_pending_joins,
patch(
"reflector.worker.process.meetings_controller.update_meeting"
) as mock_update_meeting,
):
await process_meetings()
# Verify pending joins check was NOT called (no need - active sessions exist)
mock_has_pending_joins.assert_not_called()
# Verify meeting was NOT deactivated
mock_update_meeting.assert_not_called()
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_closes_redis_even_on_continue(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that Redis connection is always closed, even when skipping deactivation."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc)
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock pending joins exist (will trigger continue)
mock_has_pending_joins.return_value = True
await process_meetings()
# Verify Redis was closed
mock_redis.aclose.assert_called_once()

View File

@@ -1,300 +0,0 @@
"""
Integration tests for recording request flow.
These tests verify the end-to-end flow of:
1. Starting a recording (creates request)
2. Webhook/polling discovering recording (matches via request)
3. Recording processing (uses existing meeting_id)
"""
from datetime import datetime, timezone
from uuid import UUID, uuid4
import pytest
from reflector.db.daily_recording_requests import (
DailyRecordingRequest,
daily_recording_requests_controller,
)
from reflector.db.meetings import Meeting, meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import Room, rooms_controller
@pytest.mark.asyncio
async def test_recording_request_flow_cloud(client):
"""Test full cloud recording flow: start -> webhook -> match"""
# Create room and meeting
room = Room(id="test-room", name="Test Room", slug="test-room", user_id="test-user")
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
# Simulate recording start (what endpoint does)
recording_id = "rec-cloud-123"
instance_id = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Verify request exists
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
assert match[0] == meeting_id
assert match[1] == "cloud"
# Simulate webhook/polling storing cloud recording
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting_id,
s3_key="s3://bucket/recording.mp4",
duration=120,
)
assert success is True
# Verify meeting updated
updated_meeting = await meetings_controller.get_by_id(meeting_id)
assert updated_meeting.daily_composed_video_s3_key == "s3://bucket/recording.mp4"
assert updated_meeting.daily_composed_video_duration == 120
@pytest.mark.asyncio
async def test_recording_request_flow_raw_tracks(client):
"""Test full raw-tracks recording flow: start -> webhook/polling -> process"""
# Create room and meeting
room = Room(
id="test-room-2",
name="Test Room 2",
slug="test-room-2",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-2",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Simulate recording start
recording_id = "rec-raw-456"
instance_id = UUID("b1c2d3e4-f5a6-7890-abcd-ef1234567890")
request = DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=instance_id,
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
await daily_recording_requests_controller.create(request)
# Simulate webhook/polling discovering recording
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is not None
found_meeting_id, recording_type = match
assert found_meeting_id == meeting_id
assert recording_type == "raw-tracks"
# Create recording (what webhook/polling does)
created = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="recordings/20260120/",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
meeting_id=meeting_id,
status="pending",
)
)
assert created is True
# Verify recording exists with meeting_id
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id
assert recording.status == "pending"
assert len(recording.track_keys) == 2
@pytest.mark.asyncio
async def test_stop_restart_creates_multiple_requests(client):
"""Test stop/restart creates multiple request rows with same instance_id"""
# Create room and meeting
room = Room(
id="test-room-3",
name="Test Room 3",
slug="test-room-3",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-3",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="cloud",
)
await meetings_controller.create(meeting)
instance_id = UUID("c1d2e3f4-a5b6-7890-abcd-ef1234567890")
# First recording
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-first",
meeting_id=meeting_id,
instance_id=instance_id,
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Stop, then restart (new recording_id, same instance_id)
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id="rec-second", # DIFFERENT
meeting_id=meeting_id,
instance_id=instance_id, # SAME
type="cloud",
requested_at=datetime.now(timezone.utc),
)
)
# Both exist
requests = await daily_recording_requests_controller.get_by_meeting_id(meeting_id)
assert len(requests) == 2
assert {r.recording_id for r in requests} == {"rec-first", "rec-second"}
assert all(r.instance_id == instance_id for r in requests)
@pytest.mark.asyncio
async def test_orphan_recording_no_request(client):
"""Test orphan recording (no request found)"""
# Simulate polling discovering recording with no request
recording_id = "rec-orphan"
match = await daily_recording_requests_controller.find_by_recording_id(recording_id)
assert match is None # No request
# Mark as orphan
created = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created is True
# Verify orphan exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.status == "orphan"
assert recording.meeting_id is None
# Second poll - already exists
created_again = await recordings_controller.create_orphan(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="orphan-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=None,
status="orphan",
track_keys=None,
)
)
assert created_again is False # Already exists
@pytest.mark.asyncio
async def test_concurrent_polling_deduplication(client):
"""Test concurrent pollers only queue once"""
# Create room and meeting
room = Room(
id="test-room-4",
name="Test Room 4",
slug="test-room-4",
user_id="test-user",
)
await rooms_controller.create(room)
meeting_id = f"meeting-{uuid4()}"
meeting = Meeting(
id=meeting_id,
room_name="test-room-4",
start_date=datetime.now(timezone.utc),
end_date=None,
recording_type="raw-tracks",
)
await meetings_controller.create(meeting)
# Create request
recording_id = "rec-concurrent"
await daily_recording_requests_controller.create(
DailyRecordingRequest(
recording_id=recording_id,
meeting_id=meeting_id,
instance_id=UUID("d1e2f3a4-b5c6-7890-abcd-ef1234567890"),
type="raw-tracks",
requested_at=datetime.now(timezone.utc),
)
)
# Poller 1
created1 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created1 is True # First wins
# Poller 2 (concurrent)
created2 = await recordings_controller.try_create_with_meeting(
Recording(
id=recording_id,
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
meeting_id=meeting_id,
status="pending",
track_keys=["track1.webm"],
)
)
assert created2 is False # Conflict, skip
# Only one recording exists
recording = await recordings_controller.get_by_id(recording_id)
assert recording is not None
assert recording.meeting_id == meeting_id

View File

@@ -0,0 +1,374 @@
"""
Integration tests for time-based meeting-to-recording matching.
Tests the critical path for matching Daily.co recordings to meetings when
API doesn't return instanceId.
"""
from datetime import datetime, timedelta, timezone
import pytest
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
@pytest.fixture
async def test_room():
"""Create a test room for meetings."""
room = await rooms_controller.add(
name="test-room-time",
user_id="test-user-id",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=False,
platform="daily",
)
return room
@pytest.fixture
def base_time():
"""Fixed timestamp for deterministic tests."""
return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc)
class TestTimeBasedMatching:
"""Test get_by_room_name_and_time() matching logic."""
async def test_exact_time_match(self, test_room, base_time):
"""Recording timestamp exactly matches meeting start_date."""
meeting = await meetings_controller.create(
id="meeting-exact",
room_name="daily-test-20260114090000",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090000",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_recording_slightly_after_meeting_start(self, test_room, base_time):
"""Recording started 1 minute after meeting (participants joined late)."""
meeting = await meetings_controller.create(
id="meeting-late",
room_name="daily-test-20260114090100",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time + timedelta(minutes=1)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-20260114090100",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting.id
async def test_duplicate_room_names_picks_closest(self, test_room, base_time):
"""
Two meetings with same room_name (duplicate/race condition).
Should pick closest by timestamp.
"""
meeting1 = await meetings_controller.create(
id="meeting-1-first",
room_name="daily-duplicate-room",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting2 = await meetings_controller.create(
id="meeting-2-second",
room_name="daily-duplicate-room", # Same room_name!
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(seconds=0.99), # 0.99s later
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording started 0.5s after meeting1
# Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer
recording_start = base_time + timedelta(seconds=0.5)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-duplicate-room",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s)
async def test_outside_time_window_returns_none(self, test_room, base_time):
"""Recording outside 1-week window returns None."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-old",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Recording 8 days later (outside 7-day window)
recording_start = base_time + timedelta(days=8)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-old",
recording_start=recording_start,
time_window_hours=168,
)
assert result is None
async def test_tie_breaker_deterministic(self, test_room, base_time):
"""When time delta identical, tie-breaker by meeting.id is deterministic."""
meeting_z = await meetings_controller.create(
id="zzz-last-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
meeting_a = await meetings_controller.create(
id="aaa-first-uuid",
room_name="daily-test-tie",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time, # Exact same start_date
end_date=base_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tie",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
# Tie-breaker: lexicographically first UUID
assert result.id == "aaa-first-uuid"
async def test_timezone_naive_datetime_raises(self, test_room, base_time):
"""Timezone-naive datetime raises ValueError."""
await meetings_controller.create(
id="meeting-tz",
room_name="daily-test-tz",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
# Naive datetime (no timezone)
naive_dt = datetime(2026, 1, 14, 9, 0, 0)
with pytest.raises(ValueError, match="timezone-aware"):
await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-tz",
recording_start=naive_dt,
time_window_hours=168,
)
async def test_one_week_boundary_after_included(self, test_room, base_time):
"""Meeting 1-week AFTER recording is included (window_end boundary)."""
meeting_time = base_time + timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-after",
room_name="daily-test-boundary-after",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-after",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-after"
async def test_one_week_boundary_before_included(self, test_room, base_time):
"""Meeting 1-week BEFORE recording is included (window_start boundary)."""
meeting_time = base_time - timedelta(hours=168)
await meetings_controller.create(
id="meeting-boundary-before",
room_name="daily-test-boundary-before",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=meeting_time,
end_date=meeting_time + timedelta(hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-boundary-before",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-boundary-before"
async def test_recording_before_meeting_start(self, test_room, base_time):
"""Recording started before meeting (clock skew or early join)."""
await meetings_controller.create(
id="meeting-early",
room_name="daily-test-early",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
recording_start = base_time - timedelta(minutes=2)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-early",
recording_start=recording_start,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-early"
async def test_mixed_inside_outside_window(self, test_room, base_time):
"""Multiple meetings, only one inside window - returns the inside one."""
await meetings_controller.create(
id="meeting-old",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=10),
end_date=base_time - timedelta(days=10, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-inside",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time - timedelta(days=2),
end_date=base_time - timedelta(days=2, hours=-1),
room=test_room,
)
await meetings_controller.create(
id="meeting-future",
room_name="daily-test-mixed",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time + timedelta(days=10),
end_date=base_time + timedelta(days=10, hours=1),
room=test_room,
)
result = await meetings_controller.get_by_room_name_and_time(
room_name="daily-test-mixed",
recording_start=base_time,
time_window_hours=168,
)
assert result is not None
assert result.id == "meeting-inside"
class TestAtomicCloudRecordingUpdate:
"""Test atomic update prevents race conditions."""
async def test_first_update_succeeds(self, test_room, base_time):
"""First call to set_cloud_recording_if_missing succeeds."""
meeting = await meetings_controller.create(
id="meeting-atomic-1",
room_name="daily-test-atomic",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success is True
updated = await meetings_controller.get_by_id(meeting.id)
assert updated.daily_composed_video_s3_key == "first-s3-key"
assert updated.daily_composed_video_duration == 100
async def test_second_update_fails_atomically(self, test_room, base_time):
"""Second call to update same meeting doesn't overwrite (atomic check)."""
meeting = await meetings_controller.create(
id="meeting-atomic-2",
room_name="daily-test-atomic2",
room_url="https://example.daily.co/test",
host_room_url="https://example.daily.co/test?t=host",
start_date=base_time,
end_date=base_time + timedelta(hours=1),
room=test_room,
)
success1 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="first-s3-key",
duration=100,
)
assert success1 is True
after_first = await meetings_controller.get_by_id(meeting.id)
assert after_first.daily_composed_video_s3_key == "first-s3-key"
success2 = await meetings_controller.set_cloud_recording_if_missing(
meeting_id=meeting.id,
s3_key="bucket/path/should-not-overwrite",
duration=200,
)
assert success2 is False
final = await meetings_controller.get_by_id(meeting.id)
assert final.daily_composed_video_s3_key == "first-s3-key"
assert final.daily_composed_video_duration == 100

View File

@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
settings.DATA_DIR = DATA_DIR settings.DATA_DIR = DATA_DIR
@pytest.fixture(scope="session") # Using celery_includes from conftest.py which includes both pipelines
def celery_includes():
return ["reflector.pipelines.main_live_pipeline"]
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")

View File

@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
if server_instance: if server_instance:
server_instance.should_exit = True server_instance.should_exit = True
server_thread.join(timeout=30) server_thread.join(timeout=2.0)
# Reset global singleton for test isolation
from reflector.ws_manager import reset_ws_manager
reset_ws_manager()
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
# Connect and then trigger an event via HTTP create # Connect and then trigger an event via HTTP create
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws: async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
await asyncio.sleep(0.2)
# Emit an event to the user's room via a standard HTTP action # Emit an event to the user's room via a standard HTTP action
from httpx import AsyncClient from httpx import AsyncClient
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
"email": "user-abc@example.com", "email": "user-abc@example.com",
} }
# Use in-memory client (global singleton makes it share ws_manager)
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac: async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room # Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
resp = await ac.post("/transcripts", json={"name": "WS Test"}) resp = await ac.post("/transcripts", json={"name": "WS Test"})

View File

@@ -25,6 +25,8 @@ import { useConsentDialog } from "../../lib/consent";
import { import {
useRoomJoinMeeting, useRoomJoinMeeting,
useMeetingStartRecording, useMeetingStartRecording,
useMeetingJoining,
useMeetingJoined,
} from "../../lib/apiHooks"; } from "../../lib/apiHooks";
import { omit } from "remeda"; import { omit } from "remeda";
import { import {
@@ -187,8 +189,14 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const [container, setContainer] = useState<HTMLDivElement | null>(null); const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting(); const joinMutation = useRoomJoinMeeting();
const startRecordingMutation = useMeetingStartRecording(); const startRecordingMutation = useMeetingStartRecording();
const joiningMutation = useMeetingJoining();
const joinedMutation = useMeetingJoined();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null); const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
// Generate a stable connection ID for this component instance
// Used to track pending joins per browser tab (prevents key collision for anonymous users)
const connectionId = useMemo(() => crypto.randomUUID(), []);
// Generate deterministic instanceIds so all participants use SAME IDs // Generate deterministic instanceIds so all participants use SAME IDs
const cloudInstanceId = parseNonEmptyString(meeting.id); const cloudInstanceId = parseNonEmptyString(meeting.id);
const rawTracksInstanceId = parseNonEmptyString( const rawTracksInstanceId = parseNonEmptyString(
@@ -249,6 +257,28 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
); );
const handleFrameJoinMeeting = useCallback(() => { const handleFrameJoinMeeting = useCallback(() => {
// Signal that WebRTC connection is established
// This clears the pending join intent, confirming successful connection
joinedMutation.mutate(
{
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
body: {
connection_id: connectionId,
},
},
{
onError: (error: unknown) => {
// Non-blocking: log but don't fail - this is cleanup, not critical
console.warn("Failed to signal joined:", error);
},
},
);
if (meeting.recording_type === "cloud") { if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", { console.log("Starting dual recording via REST API", {
cloudInstanceId, cloudInstanceId,
@@ -310,6 +340,9 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
}, [ }, [
meeting.recording_type, meeting.recording_type,
meeting.id, meeting.id,
roomName,
connectionId,
joinedMutation,
startRecordingMutation, startRecordingMutation,
cloudInstanceId, cloudInstanceId,
rawTracksInstanceId, rawTracksInstanceId,
@@ -328,8 +361,28 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
useEffect(() => { useEffect(() => {
if (!frame || !roomUrl) return; if (!frame || !roomUrl) return;
frame
.join({ const joinRoom = async () => {
// Signal intent to join before WebRTC handshake starts
// This prevents race condition where meeting is deactivated during handshake
try {
await joiningMutation.mutateAsync({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
body: {
connection_id: connectionId,
},
});
} catch (error) {
// Non-blocking: log but continue with join
console.warn("Failed to signal joining intent:", error);
}
await frame.join({
url: roomUrl, url: roomUrl,
sendSettings: { sendSettings: {
video: { video: {
@@ -341,9 +394,13 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
}, },
// Note: screenVideo intentionally not configured to preserve full quality for screen shares // Note: screenVideo intentionally not configured to preserve full quality for screen shares
}, },
}) });
.catch(console.error.bind(console, "Failed to join daily room:")); };
}, [frame, roomUrl]);
joinRoom().catch(console.error.bind(console, "Failed to join daily room:"));
// joiningMutation excluded from deps - it's a stable hook reference
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [frame, roomUrl, roomName, meeting.id, connectionId]);
useEffect(() => { useEffect(() => {
setCustomTrayButton( setCustomTrayButton(

View File

@@ -807,6 +807,26 @@ export function useRoomJoinMeeting() {
); );
} }
// Presence race fix endpoints (not yet in OpenAPI spec)
// These signal join intent to prevent race conditions during WebRTC handshake
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function useMeetingJoining(): any {
return ($api as any).useMutation(
"post",
"/v1/rooms/{room_name}/meetings/{meeting_id}/joining",
{},
);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function useMeetingJoined(): any {
return ($api as any).useMutation(
"post",
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined",
{},
);
}
export function useRoomIcsSync() { export function useRoomIcsSync() {
const { setError } = useError(); const { setError } = useError();