Compare commits

3 Commits

Author SHA1 Message Date
Igor Loskutov   d5ca2345d4   wip              2026-02-06 18:10:33 -05:00
Igor Loskutov   f6594448e6   regenerate api   2026-02-06 14:02:08 -05:00
Igor Loskutov   08462338de   fix: prevent presence race condition during WebRTC handshake
Add /joining, /joined, and /leave endpoints to track user join intent
and trigger presence updates (the call sequence is sketched below).

Backend:
- Add pending_joins Redis module with 30s TTL
- Add /joining endpoint (before WebRTC handshake)
- Add /joined endpoint (after connection, triggers presence poll)
- Add /leave endpoint (on tab close, triggers presence poll)
- Check for pending joins before deactivating meetings in worker

Frontend:
- Generate unique connectionId per browser tab
- Call /joining before Daily.co join, /joined after connection
- Add beforeunload handler calling /leave via sendBeacon
2026-02-06 12:36:03 -05:00
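A minimal sketch of that call sequence, simulated with Python and httpx rather than the browser's fetch/sendBeacon (the base URL and the placeholder session_id are assumptions; the endpoint paths and payloads match the diffs below):

import uuid

import httpx

async def join_flow(base_url: str, room_name: str, meeting_id: str) -> None:
    # One connection_id per browser tab (the frontend uses crypto.randomUUID())
    connection_id = str(uuid.uuid4())
    async with httpx.AsyncClient(base_url=base_url) as client:
        # 1. Before the WebRTC handshake: register join intent (30s TTL in Redis)
        await client.post(
            f"/rooms/{room_name}/meetings/{meeting_id}/joining",
            json={"connection_id": connection_id},
        )
        # ... Daily.co join happens here and yields a session_id ...
        session_id = "session-from-daily"  # placeholder; supplied by Daily.co
        # 2. After the connection is established: clear the pending join and,
        #    when session_id is available, create the session record directly
        await client.post(
            f"/rooms/{room_name}/meetings/{meeting_id}/joined",
            json={"connection_id": connection_id, "session_id": session_id},
        )
        # 3. On tab close the frontend fires /leave via sendBeacon
        await client.post(
            f"/rooms/{room_name}/meetings/{meeting_id}/leave",
            json={"session_id": session_id},
        )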
22 changed files with 2364 additions and 350 deletions

View File

@@ -1,17 +1,5 @@
# Changelog
## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
### Features
* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
### Bug Fixes
* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)

View File

@@ -1,35 +0,0 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -57,6 +57,12 @@ rooms = sqlalchemy.Table(
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column(
"skip_consent",
sqlalchemy.Boolean,
@@ -91,6 +97,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False

View File

@@ -206,12 +206,6 @@ class LLM:
"""Configure llamaindex Settings with OpenAILike LLM"""
session_id = llm_session_id.get() or f"fallback-{uuid4().hex}"
extra_body: dict = {"litellm_session_id": session_id}
# Only send enable_thinking when explicitly set (not None/unset).
# Models that don't support it will ignore the param.
if self.settings_obj.LLM_ENABLE_THINKING is not None:
extra_body["enable_thinking"] = self.settings_obj.LLM_ENABLE_THINKING
Settings.llm = OpenAILike(
model=self.model_name,
api_base=self.url,
@@ -221,7 +215,7 @@ class LLM:
is_function_calling_model=False,
temperature=self.temperature,
max_tokens=self.max_tokens,
additional_kwargs={"extra_body": extra_body},
additional_kwargs={"extra_body": {"litellm_session_id": session_id}},
)
async def get_response(

View File

@@ -0,0 +1,17 @@
"""Presence tracking for meetings."""
from reflector.presence.pending_joins import (
PENDING_JOIN_PREFIX,
PENDING_JOIN_TTL,
create_pending_join,
delete_pending_join,
has_pending_joins,
)
__all__ = [
"PENDING_JOIN_PREFIX",
"PENDING_JOIN_TTL",
"create_pending_join",
"delete_pending_join",
"has_pending_joins",
]

View File

@@ -0,0 +1,59 @@
"""Track pending join intents in Redis.
When a user signals intent to join a meeting (before WebRTC handshake completes),
we store a pending join record. This prevents the meeting from being deactivated
while users are still connecting.
"""
import time
from redis.asyncio import Redis
from reflector.logger import logger
PENDING_JOIN_TTL = 30 # seconds
PENDING_JOIN_PREFIX = "pending_join"
# Max keys to scan per Redis SCAN iteration
SCAN_BATCH_SIZE = 100
async def create_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None:
"""Create a pending join record. Called before WebRTC handshake."""
key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
log = logger.bind(meeting_id=meeting_id, user_id=user_id, key=key)
await redis.setex(key, PENDING_JOIN_TTL, str(time.time()))
log.debug("Created pending join")
async def delete_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None:
"""Delete pending join. Called after WebRTC connection established."""
key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
log = logger.bind(meeting_id=meeting_id, user_id=user_id, key=key)
await redis.delete(key)
log.debug("Deleted pending join")
async def has_pending_joins(redis: Redis, meeting_id: str) -> bool:
"""Check if meeting has any pending joins.
Uses Redis SCAN to iterate through all keys matching the pattern.
Properly iterates until cursor returns 0 to ensure all keys are checked.
"""
pattern = f"{PENDING_JOIN_PREFIX}:{meeting_id}:*"
log = logger.bind(meeting_id=meeting_id, pattern=pattern)
cursor = 0
iterations = 0
while True:
cursor, keys = await redis.scan(
cursor=cursor, match=pattern, count=SCAN_BATCH_SIZE
)
iterations += 1
if keys:
log.debug("Found pending joins", count=len(keys), iterations=iterations)
return True
if cursor == 0:
break
log.debug("No pending joins found", iterations=iterations)
return False
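The helpers above compose into the full pending-join lifecycle; a minimal usage sketch, assuming a local Redis and redis-py 5+ for aclose():

import asyncio

from redis.asyncio import Redis

from reflector.presence.pending_joins import (
    create_pending_join,
    delete_pending_join,
    has_pending_joins,
)

async def demo() -> None:
    redis = Redis()  # assumes redis://localhost:6379
    try:
        # /joining: record intent before the WebRTC handshake (30s TTL)
        await create_pending_join(redis, "meeting-123", "anon:conn-1")
        # Worker: skip deactivation while any pending join exists
        assert await has_pending_joins(redis, "meeting-123")
        # /joined: clear the record once the connection is established
        await delete_pending_join(redis, "meeting-123", "anon:conn-1")
        assert not await has_pending_joins(redis, "meeting-123")
    finally:
        await redis.aclose()

asyncio.run(demo())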

View File

@@ -15,10 +15,14 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString
@@ -177,98 +181,124 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
# Multitrack processing always uses Hatchet (no Celery fallback)
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
if can_replay:
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": None})
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:
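The re-fetch guard above reduces to a small optimistic-dispatch pattern. A distilled sketch, with load_run_id, get_status, and start as hypothetical stand-ins for the transcripts controller and HatchetClientManager calls:

from typing import Awaitable, Callable

async def dispatch_optimistically(
    load_run_id: Callable[[], Awaitable[str | None]],
    get_status: Callable[[str], Awaitable[str]],
    start: Callable[[], Awaitable[str]],
) -> str | None:
    # Re-fetch just before dispatching: another process may have started a
    # workflow between validation and now. No DB lock is taken; the worst
    # case is a duplicate dispatch, which is acceptable because the
    # workflow itself is idempotent.
    run_id = await load_run_id()
    if run_id is not None and await get_status(run_id) in ("RUNNING", "QUEUED"):
        # Concurrent workflow detected, skip dispatch
        return None
    return await start()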

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -75,7 +75,6 @@ class Settings(BaseSettings):
LLM_URL: str | None = None
LLM_API_KEY: str | None = None
LLM_CONTEXT_WINDOW: int = 16000
LLM_ENABLE_THINKING: bool | None = None
LLM_PARSE_MAX_RETRIES: int = (
3 # Max retries for JSON/validation errors (total attempts = retries + 1)
@@ -156,7 +155,7 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code
)
# Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
# Zulip integration
ZULIP_REALM: str | None = None

View File

@@ -1,4 +1,5 @@
import json
from datetime import datetime, timezone
from typing import assert_never
from fastapi import APIRouter, HTTPException, Request
@@ -12,6 +13,9 @@ from reflector.dailyco_api import (
RecordingReadyEvent,
RecordingStartedEvent,
)
from reflector.db.daily_participant_sessions import (
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.logger import logger as _logger
from reflector.settings import settings
@@ -141,15 +145,57 @@ async def _handle_participant_joined(event: ParticipantJoinedEvent):
async def _handle_participant_left(event: ParticipantLeftEvent):
"""Queue poll task for presence reconciliation."""
await _queue_poll_for_room(
event.payload.room_name,
"participant.left",
event.payload.user_id,
event.payload.session_id,
duration=event.payload.duration,
"""Close session directly on webhook and update num_clients.
The webhook IS the authoritative signal that a participant left.
We close the session immediately rather than polling Daily.co API,
which avoids the race where the API still shows the participant.
A delayed reconciliation poll is queued as a safety net.
"""
room_name = event.payload.room_name
if not room_name:
logger.warning("participant.left: no room in payload")
return
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
logger.warning("participant.left: meeting not found", room_name=room_name)
return
log = logger.bind(
meeting_id=meeting.id,
room_name=room_name,
session_id=event.payload.session_id,
user_id=event.payload.user_id,
)
existing = await daily_participant_sessions_controller.get_open_session(
meeting.id, event.payload.session_id
)
if existing:
now = datetime.now(timezone.utc)
await daily_participant_sessions_controller.batch_close_sessions(
[existing.id], left_at=now
)
active = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
await meetings_controller.update_meeting(meeting.id, num_clients=len(active))
log.info(
"Participant left - session closed",
remaining_clients=len(active),
duration=event.payload.duration,
)
else:
log.info(
"Participant left - no open session found, skipping direct close",
duration=event.payload.duration,
)
# Delayed reconciliation poll as safety net
poll_daily_room_presence_task.apply_async(args=[meeting.id], countdown=5)
async def _handle_recording_started(event: RecordingStartedEvent):
room_name = event.payload.room_name

View File

@@ -1,9 +1,9 @@
import logging
import json
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated, Any, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi_pagination import Page
from fastapi_pagination.ext.databases import apaginate
from pydantic import BaseModel
@@ -12,18 +12,24 @@ from redis.exceptions import LockError
import reflector.auth as auth
from reflector.db import get_database
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.redis_cache import RedisAsyncLock
from reflector.logger import logger
from reflector.presence.pending_joins import create_pending_join, delete_pending_join
from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
from reflector.schemas.platform import Platform
from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import poll_daily_room_presence_task
from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__)
class Room(BaseModel):
id: str
@@ -597,3 +603,221 @@ async def rooms_join_meeting(
meeting.room_url = add_query_param(meeting.room_url, "t", token)
return meeting
class JoiningRequest(BaseModel):
"""Request body for /joining endpoint (before WebRTC handshake)."""
connection_id: NonEmptyString
"""Unique identifier for this connection. Generated by client via crypto.randomUUID()."""
class JoiningResponse(BaseModel):
status: Literal["ok"]
class JoinedRequest(BaseModel):
"""Request body for /joined endpoint (after WebRTC connection established)."""
connection_id: NonEmptyString
"""Must match the connection_id sent to /joining."""
session_id: NonEmptyString | None = None
"""Daily.co session_id for direct session creation. Optional for backward compat."""
user_name: str | None = None
"""Display name from Daily.co participant data."""
class JoinedResponse(BaseModel):
status: Literal["ok"]
def _get_pending_join_key(
user: Optional[auth.UserInfo], connection_id: NonEmptyString
) -> str:
"""Get a unique key for pending join tracking.
Uses user ID for authenticated users, connection_id for anonymous users.
This ensures each browser tab has its own unique pending join record.
"""
if user:
return f"{user['sub']}:{connection_id}"
return f"anon:{connection_id}"
@router.post(
"/rooms/{room_name}/meetings/{meeting_id}/joining", response_model=JoiningResponse
)
async def meeting_joining(
room_name: str,
meeting_id: str,
body: JoiningRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
) -> JoiningResponse:
"""Signal intent to join meeting. Called before WebRTC handshake starts.
This creates a pending join record that prevents the meeting from being
deactivated while the WebRTC handshake is in progress. The record expires
automatically after 30 seconds if the connection is not established.
"""
log = logger.bind(
room_name=room_name, meeting_id=meeting_id, connection_id=body.connection_id
)
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if not meeting.is_active:
raise HTTPException(status_code=400, detail="Meeting is not active")
join_key = _get_pending_join_key(user, body.connection_id)
redis = await get_async_redis_client()
try:
await create_pending_join(redis, meeting_id, join_key)
log.debug("Created pending join intent", join_key=join_key)
finally:
await redis.aclose()
return JoiningResponse(status="ok")
@router.post(
"/rooms/{room_name}/meetings/{meeting_id}/joined", response_model=JoinedResponse
)
async def meeting_joined(
room_name: str,
meeting_id: str,
body: JoinedRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
) -> JoinedResponse:
"""Signal that WebRTC connection is established.
This clears the pending join record, confirming the user has successfully
connected to the meeting. Safe to call even if meeting was deactivated
during the handshake (idempotent cleanup).
"""
log = logger.bind(
room_name=room_name, meeting_id=meeting_id, connection_id=body.connection_id
)
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
# Note: We don't check is_active here - the /joined call is a cleanup operation
# and should succeed even if the meeting was deactivated during the handshake
join_key = _get_pending_join_key(user, body.connection_id)
redis = await get_async_redis_client()
try:
await delete_pending_join(redis, meeting_id, join_key)
log.debug("Cleared pending join intent", join_key=join_key)
finally:
await redis.aclose()
# Create session directly when session_id provided (instant presence update)
if body.session_id and meeting.platform == "daily":
session = DailyParticipantSession(
id=f"{meeting.id}:{body.session_id}",
meeting_id=meeting.id,
room_id=room.id,
session_id=body.session_id,
user_id=user["sub"] if user else None,
user_name=body.user_name or "Anonymous",
joined_at=datetime.now(timezone.utc),
)
await daily_participant_sessions_controller.batch_upsert_sessions([session])
active = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
await meetings_controller.update_meeting(meeting.id, num_clients=len(active))
log.info(
"Session created directly",
session_id=body.session_id,
num_clients=len(active),
)
# Trigger presence poll as reconciliation safety net
if meeting.platform == "daily":
poll_daily_room_presence_task.apply_async(args=[meeting_id], countdown=3)
return JoinedResponse(status="ok")
class LeaveResponse(BaseModel):
status: Literal["ok"]
@router.post(
"/rooms/{room_name}/meetings/{meeting_id}/leave", response_model=LeaveResponse
)
async def meeting_leave(
room_name: str,
meeting_id: str,
request: Request,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
) -> LeaveResponse:
"""Trigger presence update when user leaves meeting.
When session_id is provided in the body, closes the session directly
for instant presence update. Falls back to polling when session_id
is not available (e.g., sendBeacon without frame access).
Called on tab close/navigation via sendBeacon().
"""
# Parse session_id from body (sendBeacon may send text/plain or no body)
session_id: str | None = None
try:
body_bytes = await request.body()
if body_bytes:
data = json.loads(body_bytes)
raw = data.get("session_id")
if isinstance(raw, str) and raw.strip():
session_id = raw.strip()
except Exception:
pass
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
# Close session directly when session_id provided
session_closed = False
if session_id and meeting.platform == "daily":
existing = await daily_participant_sessions_controller.get_open_session(
meeting.id, session_id
)
if existing:
await daily_participant_sessions_controller.batch_close_sessions(
[existing.id], left_at=datetime.now(timezone.utc)
)
active = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
await meetings_controller.update_meeting(
meeting.id, num_clients=len(active)
)
session_closed = True
# Only queue poll if we couldn't close directly — the poll runs before
# Daily.co API removes the participant, which would undo our correct count
if meeting.platform == "daily" and not session_closed:
poll_daily_room_presence_task.apply_async(args=[meeting_id], countdown=3)
return LeaveResponse(status="ok")

View File

@@ -27,10 +27,14 @@ from reflector.db.transcripts import (
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.presence.pending_joins import has_pending_joins
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.redis_cache import RedisAsyncLock
from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.utils.daily import (
@@ -348,29 +352,49 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
# Multitrack processing always uses Hatchet (no Celery fallback)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
use_celery = room and room.use_celery
use_hatchet = not use_celery
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
)
@shared_task
@@ -846,6 +870,18 @@ async def process_meetings():
logger_.debug("Meeting not yet started, keep it")
if should_deactivate:
# Check for pending joins before deactivating
# Users might be in the process of connecting via WebRTC
redis = await get_async_redis_client()
try:
if await has_pending_joins(redis, meeting.id):
logger_.info(
"Meeting has pending joins, skipping deactivation"
)
continue
finally:
await redis.aclose()
await meetings_controller.update_meeting(
meeting.id, is_active=False
)
@@ -1049,43 +1085,66 @@ async def reprocess_failed_daily_recordings():
)
continue
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
from reflector.schemas.platform import WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True)
@@ -14,7 +14,6 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield

View File

@@ -0,0 +1,213 @@
"""Tests for direct session close on participant.left webhook.
Verifies that _handle_participant_left:
1. Closes the session directly (authoritative signal)
2. Updates num_clients from remaining active sessions
3. Queues a delayed reconciliation poll as safety net
4. Handles missing session gracefully
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.dailyco_api.webhooks import ParticipantLeftEvent, ParticipantLeftPayload
from reflector.db.daily_participant_sessions import DailyParticipantSession
from reflector.db.meetings import Meeting
from reflector.views.daily import _handle_participant_left
@pytest.fixture
def mock_meeting():
return Meeting(
id="meeting-123",
room_id="room-456",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room-20251118120000",
host_room_url="https://daily.co/test-room-20251118120000?t=host-token",
platform="daily",
num_clients=2,
is_active=True,
start_date=datetime.now(timezone.utc),
end_date=datetime.now(timezone.utc),
)
@pytest.fixture
def participant_left_event():
now = datetime.now(timezone.utc)
return ParticipantLeftEvent(
version="1.0.0",
type="participant.left",
id="evt-left-abc123",
payload=ParticipantLeftPayload(
room_name="test-room-20251118120000",
session_id="session-alice",
user_id="user-alice",
user_name="Alice",
joined_at=int((now - timedelta(minutes=10)).timestamp()),
duration=600,
),
event_ts=int(now.timestamp()),
)
@pytest.fixture
def existing_session():
now = datetime.now(timezone.utc)
return DailyParticipantSession(
id="meeting-123:session-alice",
meeting_id="meeting-123",
room_id="room-456",
session_id="session-alice",
user_id="user-alice",
user_name="Alice",
joined_at=now - timedelta(minutes=10),
left_at=None,
)
@pytest.mark.asyncio
@patch("reflector.views.daily.poll_daily_room_presence_task")
@patch("reflector.views.daily.meetings_controller")
@patch("reflector.views.daily.daily_participant_sessions_controller")
async def test_closes_session_and_updates_num_clients(
mock_sessions_ctrl,
mock_meetings_ctrl,
mock_poll_task,
mock_meeting,
participant_left_event,
existing_session,
):
"""Webhook directly closes session and updates num_clients from remaining active count."""
mock_meetings_ctrl.get_by_room_name = AsyncMock(return_value=mock_meeting)
mock_sessions_ctrl.get_open_session = AsyncMock(return_value=existing_session)
mock_sessions_ctrl.batch_close_sessions = AsyncMock()
# One remaining active session after close
remaining = DailyParticipantSession(
id="meeting-123:session-bob",
meeting_id="meeting-123",
room_id="room-456",
session_id="session-bob",
user_id="user-bob",
user_name="Bob",
joined_at=datetime.now(timezone.utc),
left_at=None,
)
mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[remaining])
mock_meetings_ctrl.update_meeting = AsyncMock()
await _handle_participant_left(participant_left_event)
# Session closed
mock_sessions_ctrl.batch_close_sessions.assert_called_once()
closed_ids = mock_sessions_ctrl.batch_close_sessions.call_args.args[0]
assert closed_ids == [existing_session.id]
# num_clients updated to remaining count
mock_meetings_ctrl.update_meeting.assert_called_once_with(
mock_meeting.id, num_clients=1
)
# Delayed reconciliation poll queued
mock_poll_task.apply_async.assert_called_once()
call_kwargs = mock_poll_task.apply_async.call_args.kwargs
assert call_kwargs["countdown"] == 5
assert call_kwargs["args"] == [mock_meeting.id]
@pytest.mark.asyncio
@patch("reflector.views.daily.poll_daily_room_presence_task")
@patch("reflector.views.daily.meetings_controller")
@patch("reflector.views.daily.daily_participant_sessions_controller")
async def test_handles_missing_session(
mock_sessions_ctrl,
mock_meetings_ctrl,
mock_poll_task,
mock_meeting,
participant_left_event,
):
"""No crash when session not found in DB — still queues reconciliation poll."""
mock_meetings_ctrl.get_by_room_name = AsyncMock(return_value=mock_meeting)
mock_sessions_ctrl.get_open_session = AsyncMock(return_value=None)
await _handle_participant_left(participant_left_event)
# No session close attempted
mock_sessions_ctrl.batch_close_sessions.assert_not_called()
# No num_clients update (no authoritative data without session)
mock_meetings_ctrl.update_meeting.assert_not_called()
# Still queues reconciliation poll
mock_poll_task.apply_async.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.views.daily.poll_daily_room_presence_task")
@patch("reflector.views.daily.meetings_controller")
@patch("reflector.views.daily.daily_participant_sessions_controller")
async def test_updates_num_clients_to_zero_when_last_participant_leaves(
mock_sessions_ctrl,
mock_meetings_ctrl,
mock_poll_task,
mock_meeting,
participant_left_event,
existing_session,
):
"""num_clients set to 0 when no active sessions remain."""
mock_meetings_ctrl.get_by_room_name = AsyncMock(return_value=mock_meeting)
mock_sessions_ctrl.get_open_session = AsyncMock(return_value=existing_session)
mock_sessions_ctrl.batch_close_sessions = AsyncMock()
mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[])
mock_meetings_ctrl.update_meeting = AsyncMock()
await _handle_participant_left(participant_left_event)
mock_meetings_ctrl.update_meeting.assert_called_once_with(
mock_meeting.id, num_clients=0
)
@pytest.mark.asyncio
@patch("reflector.views.daily.poll_daily_room_presence_task")
@patch("reflector.views.daily.meetings_controller")
async def test_no_room_name_in_event(
mock_meetings_ctrl,
mock_poll_task,
):
"""No crash when room_name is missing from webhook payload."""
event = ParticipantLeftEvent(
version="1.0.0",
type="participant.left",
id="evt-left-no-room",
payload=ParticipantLeftPayload(
room_name=None,
session_id="session-x",
user_id="user-x",
user_name="X",
joined_at=int(datetime.now(timezone.utc).timestamp()),
duration=0,
),
event_ts=int(datetime.now(timezone.utc).timestamp()),
)
await _handle_participant_left(event)
mock_meetings_ctrl.get_by_room_name.assert_not_called()
mock_poll_task.apply_async.assert_not_called()
@pytest.mark.asyncio
@patch("reflector.views.daily.poll_daily_room_presence_task")
@patch("reflector.views.daily.meetings_controller")
async def test_meeting_not_found(
mock_meetings_ctrl,
mock_poll_task,
participant_left_event,
):
"""No crash when meeting not found for room_name."""
mock_meetings_ctrl.get_by_room_name = AsyncMock(return_value=None)
await _handle_participant_left(participant_left_event)
mock_poll_task.apply_async.assert_not_called()

View File

@@ -0,0 +1,339 @@
"""Tests for direct session management via /joined and /leave endpoints.
Verifies that:
1. /joined with session_id creates session directly, updates num_clients
2. /joined without session_id (backward compat) still works, queues poll
3. /leave with session_id closes session, updates num_clients
4. /leave without session_id falls back to poll
5. Duplicate /joined calls are idempotent (upsert)
6. /leave for already-closed session is a no-op
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from reflector.db.daily_participant_sessions import DailyParticipantSession
from reflector.db.meetings import Meeting
from reflector.views.rooms import (
JoinedRequest,
meeting_joined,
meeting_leave,
)
@pytest.fixture
def mock_room():
room = MagicMock()
room.id = "room-456"
room.name = "test-room"
room.platform = "daily"
return room
@pytest.fixture
def mock_meeting():
return Meeting(
id="meeting-123",
room_id="room-456",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room",
host_room_url="https://daily.co/test-room?t=host",
platform="daily",
num_clients=0,
is_active=True,
start_date=datetime.now(timezone.utc),
end_date=datetime.now(timezone.utc) + timedelta(hours=8),
)
@pytest.fixture
def mock_redis():
redis = AsyncMock()
redis.aclose = AsyncMock()
return redis
@pytest.fixture
def mock_request_with_session_id():
"""Mock Request with session_id in JSON body."""
request = AsyncMock()
request.body = AsyncMock(return_value=b'{"session_id": "session-abc"}')
return request
@pytest.fixture
def mock_request_empty_body():
"""Mock Request with empty JSON body (old frontend / no frame access)."""
request = AsyncMock()
request.body = AsyncMock(return_value=b"{}")
return request
@pytest.mark.asyncio
@patch("reflector.views.rooms.poll_daily_room_presence_task")
@patch("reflector.views.rooms.delete_pending_join")
@patch("reflector.views.rooms.get_async_redis_client")
@patch("reflector.views.rooms.meetings_controller")
@patch("reflector.views.rooms.rooms_controller")
@patch("reflector.views.rooms.daily_participant_sessions_controller")
async def test_joined_with_session_id_creates_session(
mock_sessions_ctrl,
mock_rooms_ctrl,
mock_meetings_ctrl,
mock_redis_client,
mock_delete_pending,
mock_poll_task,
mock_room,
mock_meeting,
mock_redis,
):
"""session_id in /joined -> create session + update num_clients."""
mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room)
mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting)
mock_redis_client.return_value = mock_redis
mock_sessions_ctrl.batch_upsert_sessions = AsyncMock()
mock_sessions_ctrl.get_active_by_meeting = AsyncMock(
return_value=[MagicMock()] # 1 active session
)
mock_meetings_ctrl.update_meeting = AsyncMock()
body = JoinedRequest(
connection_id="conn-1",
session_id="session-abc",
user_name="Alice",
)
result = await meeting_joined(
"test-room", "meeting-123", body, user={"sub": "user-1"}
)
assert result.status == "ok"
# Session created via upsert
mock_sessions_ctrl.batch_upsert_sessions.assert_called_once()
sessions = mock_sessions_ctrl.batch_upsert_sessions.call_args.args[0]
assert len(sessions) == 1
assert sessions[0].session_id == "session-abc"
assert sessions[0].meeting_id == "meeting-123"
assert sessions[0].room_id == "room-456"
assert sessions[0].user_name == "Alice"
assert sessions[0].user_id == "user-1"
assert sessions[0].id == "meeting-123:session-abc"
# num_clients updated
mock_meetings_ctrl.update_meeting.assert_called_once_with(
"meeting-123", num_clients=1
)
# Reconciliation poll still queued
mock_poll_task.apply_async.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.views.rooms.poll_daily_room_presence_task")
@patch("reflector.views.rooms.delete_pending_join")
@patch("reflector.views.rooms.get_async_redis_client")
@patch("reflector.views.rooms.meetings_controller")
@patch("reflector.views.rooms.rooms_controller")
@patch("reflector.views.rooms.daily_participant_sessions_controller")
async def test_joined_without_session_id_backward_compat(
mock_sessions_ctrl,
mock_rooms_ctrl,
mock_meetings_ctrl,
mock_redis_client,
mock_delete_pending,
mock_poll_task,
mock_room,
mock_meeting,
mock_redis,
):
"""No session_id in /joined -> no session create, still queues poll."""
mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room)
mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting)
mock_redis_client.return_value = mock_redis
body = JoinedRequest(connection_id="conn-1")
result = await meeting_joined(
"test-room", "meeting-123", body, user={"sub": "user-1"}
)
assert result.status == "ok"
mock_sessions_ctrl.batch_upsert_sessions.assert_not_called()
mock_poll_task.apply_async.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.views.rooms.poll_daily_room_presence_task")
@patch("reflector.views.rooms.delete_pending_join")
@patch("reflector.views.rooms.get_async_redis_client")
@patch("reflector.views.rooms.meetings_controller")
@patch("reflector.views.rooms.rooms_controller")
@patch("reflector.views.rooms.daily_participant_sessions_controller")
async def test_joined_anonymous_user_sets_null_user_id(
mock_sessions_ctrl,
mock_rooms_ctrl,
mock_meetings_ctrl,
mock_redis_client,
mock_delete_pending,
mock_poll_task,
mock_room,
mock_meeting,
mock_redis,
):
"""Anonymous user -> session.user_id is None, user_name defaults to 'Anonymous'."""
mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room)
mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting)
mock_redis_client.return_value = mock_redis
mock_sessions_ctrl.batch_upsert_sessions = AsyncMock()
mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[MagicMock()])
mock_meetings_ctrl.update_meeting = AsyncMock()
body = JoinedRequest(connection_id="conn-1", session_id="session-abc")
result = await meeting_joined("test-room", "meeting-123", body, user=None)
assert result.status == "ok"
sessions = mock_sessions_ctrl.batch_upsert_sessions.call_args.args[0]
assert sessions[0].user_id is None
assert sessions[0].user_name == "Anonymous"
@pytest.mark.asyncio
@patch("reflector.views.rooms.poll_daily_room_presence_task")
@patch("reflector.views.rooms.meetings_controller")
@patch("reflector.views.rooms.rooms_controller")
@patch("reflector.views.rooms.daily_participant_sessions_controller")
async def test_leave_with_session_id_closes_session(
mock_sessions_ctrl,
mock_rooms_ctrl,
mock_meetings_ctrl,
mock_poll_task,
mock_room,
mock_meeting,
mock_request_with_session_id,
):
"""session_id in /leave -> close session + update num_clients."""
mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room)
mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting)
existing_session = DailyParticipantSession(
id="meeting-123:session-abc",
meeting_id="meeting-123",
room_id="room-456",
session_id="session-abc",
user_id="user-1",
user_name="Alice",
joined_at=datetime.now(timezone.utc) - timedelta(minutes=5),
left_at=None,
)
mock_sessions_ctrl.get_open_session = AsyncMock(return_value=existing_session)
mock_sessions_ctrl.batch_close_sessions = AsyncMock()
mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[])
mock_meetings_ctrl.update_meeting = AsyncMock()
result = await meeting_leave(
"test-room", "meeting-123", mock_request_with_session_id, user={"sub": "user-1"}
)
assert result.status == "ok"
# Session closed
mock_sessions_ctrl.batch_close_sessions.assert_called_once()
closed_ids = mock_sessions_ctrl.batch_close_sessions.call_args.args[0]
assert closed_ids == ["meeting-123:session-abc"]
# num_clients updated
mock_meetings_ctrl.update_meeting.assert_called_once_with(
"meeting-123", num_clients=0
)
# No poll — direct close is authoritative, poll would race with API latency
mock_poll_task.apply_async.assert_not_called()
@pytest.mark.asyncio
@patch("reflector.views.rooms.poll_daily_room_presence_task")
@patch("reflector.views.rooms.meetings_controller")
@patch("reflector.views.rooms.rooms_controller")
async def test_leave_without_session_id_falls_back_to_poll(
mock_rooms_ctrl,
mock_meetings_ctrl,
mock_poll_task,
mock_room,
mock_meeting,
mock_request_empty_body,
):
"""No session_id in /leave -> just queues poll as before."""
mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room)
mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting)
result = await meeting_leave(
"test-room", "meeting-123", mock_request_empty_body, user=None
)
assert result.status == "ok"
mock_poll_task.apply_async.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.views.rooms.poll_daily_room_presence_task")
@patch("reflector.views.rooms.delete_pending_join")
@patch("reflector.views.rooms.get_async_redis_client")
@patch("reflector.views.rooms.meetings_controller")
@patch("reflector.views.rooms.rooms_controller")
@patch("reflector.views.rooms.daily_participant_sessions_controller")
async def test_duplicate_joined_is_idempotent(
mock_sessions_ctrl,
mock_rooms_ctrl,
mock_meetings_ctrl,
mock_redis_client,
mock_delete_pending,
mock_poll_task,
mock_room,
mock_meeting,
mock_redis,
):
"""Calling /joined twice with same session_id -> upsert both times, no error."""
mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room)
mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting)
mock_redis_client.return_value = mock_redis
mock_sessions_ctrl.batch_upsert_sessions = AsyncMock()
mock_sessions_ctrl.get_active_by_meeting = AsyncMock(return_value=[MagicMock()])
mock_meetings_ctrl.update_meeting = AsyncMock()
body = JoinedRequest(
connection_id="conn-1", session_id="session-abc", user_name="Alice"
)
await meeting_joined("test-room", "meeting-123", body, user={"sub": "user-1"})
await meeting_joined("test-room", "meeting-123", body, user={"sub": "user-1"})
assert mock_sessions_ctrl.batch_upsert_sessions.call_count == 2
@pytest.mark.asyncio
@patch("reflector.views.rooms.poll_daily_room_presence_task")
@patch("reflector.views.rooms.meetings_controller")
@patch("reflector.views.rooms.rooms_controller")
@patch("reflector.views.rooms.daily_participant_sessions_controller")
async def test_leave_already_closed_session_is_noop(
mock_sessions_ctrl,
mock_rooms_ctrl,
mock_meetings_ctrl,
mock_poll_task,
mock_room,
mock_meeting,
mock_request_with_session_id,
):
"""/leave for already-closed session -> no close attempted, just poll."""
mock_rooms_ctrl.get_by_name = AsyncMock(return_value=mock_room)
mock_meetings_ctrl.get_by_id = AsyncMock(return_value=mock_meeting)
mock_sessions_ctrl.get_open_session = AsyncMock(return_value=None)
result = await meeting_leave(
"test-room", "meeting-123", mock_request_with_session_id, user=None
)
assert result.status == "ok"
mock_sessions_ctrl.batch_close_sessions.assert_not_called()
mock_meetings_ctrl.update_meeting.assert_not_called()
mock_poll_task.apply_async.assert_called_once()

View File

@@ -0,0 +1,367 @@
"""Integration tests for /joining and /joined endpoints.
Tests for the join intent tracking to prevent race conditions during
WebRTC handshake when users join meetings.
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db.meetings import Meeting
from reflector.presence.pending_joins import PENDING_JOIN_PREFIX
TEST_CONNECTION_ID = "test-connection-uuid-12345"
@pytest.fixture
def mock_room():
"""Mock room object."""
from reflector.db.rooms import Room
return Room(
id="room-123",
name="test-room",
user_id="owner-user",
created_at=datetime.now(timezone.utc),
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic",
is_shared=True,
platform="daily",
skip_consent=False,
)
@pytest.fixture
def mock_meeting():
"""Mock meeting object."""
now = datetime.now(timezone.utc)
return Meeting(
id="meeting-456",
room_id="room-123",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room-20251118120000",
host_room_url="https://daily.co/test-room-20251118120000?t=host",
platform="daily",
num_clients=0,
is_active=True,
start_date=now,
end_date=now + timedelta(hours=1),
)
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_creates_pending_join(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joining endpoint creates pending join in Redis."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis setex was called with correct key pattern
mock_redis.setex.assert_called_once()
call_args = mock_redis.setex.call_args[0]
assert call_args[0].startswith(f"{PENDING_JOIN_PREFIX}:{mock_meeting.id}:")
assert TEST_CONNECTION_ID in call_args[0]
@pytest.mark.asyncio
@patch("reflector.views.rooms.poll_daily_room_presence_task")
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joined_endpoint_deletes_pending_join(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_poll_task,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joined endpoint deletes pending join from Redis."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.delete = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joined",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis delete was called with correct key pattern
mock_redis.delete.assert_called_once()
call_args = mock_redis.delete.call_args[0]
assert call_args[0].startswith(f"{PENDING_JOIN_PREFIX}:{mock_meeting.id}:")
assert TEST_CONNECTION_ID in call_args[0]
# Verify presence poll was queued for Daily meetings (the endpoint uses
# apply_async with a countdown, not delay)
mock_poll_task.apply_async.assert_called_once_with(
args=[mock_meeting.id], countdown=3
)
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
async def test_joining_endpoint_room_not_found(
mock_get_room,
client,
authenticated_client,
):
"""Test that /joining returns 404 when room not found."""
mock_get_room.return_value = None
response = await client.post(
"/rooms/nonexistent-room/meetings/meeting-123/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 404
assert response.json()["detail"] == "Room not found"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
async def test_joining_endpoint_meeting_not_found(
mock_get_meeting,
mock_get_room,
mock_room,
client,
authenticated_client,
):
"""Test that /joining returns 404 when meeting not found."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = None
response = await client.post(
f"/rooms/{mock_room.name}/meetings/nonexistent-meeting/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 404
assert response.json()["detail"] == "Meeting not found"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
async def test_joining_endpoint_meeting_not_active(
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that /joining returns 400 when meeting is not active."""
mock_get_room.return_value = mock_room
inactive_meeting = mock_meeting.model_copy(update={"is_active": False})
mock_get_meeting.return_value = inactive_meeting
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 400
assert response.json()["detail"] == "Meeting is not active"
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_anonymous_user(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
):
"""Test that /joining works for anonymous users with unique connection_id."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
response = await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify Redis setex was called with "anon:" prefix and connection_id
call_args = mock_redis.setex.call_args[0]
assert ":anon:" in call_args[0]
assert TEST_CONNECTION_ID in call_args[0]
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_redis_closed_on_success(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that Redis connection is closed after successful operation."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_joining_endpoint_redis_closed_on_error(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
authenticated_client,
):
"""Test that Redis connection is closed even when operation fails."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock(side_effect=Exception("Redis error"))
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
with pytest.raises(Exception):
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": TEST_CONNECTION_ID},
)
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
async def test_joining_endpoint_requires_connection_id(
client,
):
"""Test that /joining returns 422 when connection_id is missing."""
response = await client.post(
"/rooms/test-room/meetings/meeting-123/joining",
json={},
)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_joining_endpoint_rejects_empty_connection_id(
client,
):
"""Test that /joining returns 422 when connection_id is empty string."""
response = await client.post(
"/rooms/test-room/meetings/meeting-123/joining",
json={"connection_id": ""},
)
assert response.status_code == 422 # Validation error (NonEmptyString)
@pytest.mark.asyncio
@patch("reflector.views.rooms.rooms_controller.get_by_name")
@patch("reflector.views.rooms.meetings_controller.get_by_id")
@patch("reflector.views.rooms.get_async_redis_client")
async def test_different_connection_ids_create_different_keys(
mock_get_redis,
mock_get_meeting,
mock_get_room,
mock_room,
mock_meeting,
client,
):
"""Test that different connection_ids create different Redis keys."""
mock_get_room.return_value = mock_room
mock_get_meeting.return_value = mock_meeting
mock_redis = AsyncMock()
mock_redis.setex = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# First connection
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": "connection-1"},
)
key1 = mock_redis.setex.call_args[0][0]
mock_redis.setex.reset_mock()
# Second connection (different tab)
await client.post(
f"/rooms/{mock_room.name}/meetings/{mock_meeting.id}/joining",
json={"connection_id": "connection-2"},
)
key2 = mock_redis.setex.call_args[0][0]
# Keys should be different
assert key1 != key2
assert "connection-1" in key1
assert "connection-2" in key2

View File

@@ -8,7 +8,6 @@ from pydantic import BaseModel, Field
from workflows.errors import WorkflowRuntimeError, WorkflowTimeoutError
from reflector.llm import LLM, LLMParseError, StructuredOutputWorkflow
from reflector.settings import Settings
from reflector.utils.retry import RetryException
@@ -27,57 +26,6 @@ def make_completion_response(text: str):
return response
class TestLLMEnableThinking:
"""Test that LLM_ENABLE_THINKING setting is passed through to OpenAILike"""
def test_enable_thinking_false_passed_in_extra_body(self):
"""enable_thinking=False should be in extra_body when LLM_ENABLE_THINKING=False"""
settings = Settings(
LLM_ENABLE_THINKING=False,
LLM_URL="http://fake",
LLM_API_KEY="fake",
)
with (
patch("reflector.llm.OpenAILike") as mock_openai,
patch("reflector.llm.Settings"),
):
LLM(settings=settings)
extra_body = mock_openai.call_args.kwargs["additional_kwargs"]["extra_body"]
assert extra_body["enable_thinking"] is False
def test_enable_thinking_true_passed_in_extra_body(self):
"""enable_thinking=True should be in extra_body when LLM_ENABLE_THINKING=True"""
settings = Settings(
LLM_ENABLE_THINKING=True,
LLM_URL="http://fake",
LLM_API_KEY="fake",
)
with (
patch("reflector.llm.OpenAILike") as mock_openai,
patch("reflector.llm.Settings"),
):
LLM(settings=settings)
extra_body = mock_openai.call_args.kwargs["additional_kwargs"]["extra_body"]
assert extra_body["enable_thinking"] is True
def test_enable_thinking_none_not_in_extra_body(self):
"""enable_thinking should not be in extra_body when LLM_ENABLE_THINKING is None (default)"""
settings = Settings(
LLM_URL="http://fake",
LLM_API_KEY="fake",
)
with (
patch("reflector.llm.OpenAILike") as mock_openai,
patch("reflector.llm.Settings"),
):
LLM(settings=settings)
extra_body = mock_openai.call_args.kwargs["additional_kwargs"]["extra_body"]
assert "enable_thinking" not in extra_body
class TestLLMParseErrorRecovery:
"""Test parse error recovery with Workflow feedback loop"""

View File

@@ -0,0 +1,153 @@
"""Tests for pending joins Redis helper functions.
TDD tests for tracking join intent to prevent race conditions during the
WebRTC handshake when users join meetings.
"""
from unittest.mock import AsyncMock
import pytest
from reflector.presence.pending_joins import (
PENDING_JOIN_PREFIX,
PENDING_JOIN_TTL,
create_pending_join,
delete_pending_join,
has_pending_joins,
)
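
The module under test is small enough to sketch from these imports and the assertions below. The stored timestamp value and SCAN batch size come straight from the tests; everything else is an assumption.

```python
# Hypothetical reconstruction of reflector/presence/pending_joins.py, inferred
# from this test file; treat it as a sketch, not the shipped implementation.
import time

from redis.asyncio import Redis

PENDING_JOIN_PREFIX = "pending_join"
PENDING_JOIN_TTL = 30  # seconds: long enough for a WebRTC handshake


def _key(meeting_id: str, user_id: str) -> str:
    return f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"


async def create_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None:
    # Store a timestamp as the value; the TTL alone handles expiry.
    await redis.setex(_key(meeting_id, user_id), PENDING_JOIN_TTL, str(time.time()))


async def delete_pending_join(redis: Redis, meeting_id: str, user_id: str) -> None:
    await redis.delete(_key(meeting_id, user_id))


async def has_pending_joins(redis: Redis, meeting_id: str) -> bool:
    # SCAN rather than KEYS so large keyspaces never block Redis; iterate
    # until the cursor wraps back to 0 or a matching key turns up.
    cursor = 0
    while True:
        cursor, keys = await redis.scan(
            cursor=cursor, match=f"{PENDING_JOIN_PREFIX}:{meeting_id}:*", count=100
        )
        if keys:
            return True
        if cursor == 0:
            return False
```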
@pytest.fixture
def mock_redis():
"""Mock async Redis client."""
redis = AsyncMock()
redis.setex = AsyncMock()
redis.delete = AsyncMock()
redis.scan = AsyncMock(return_value=(0, []))
return redis
@pytest.mark.asyncio
async def test_create_pending_join_sets_key_with_ttl(mock_redis):
"""Test that create_pending_join stores key with correct TTL."""
meeting_id = "meeting-123"
user_id = "user-456"
await create_pending_join(mock_redis, meeting_id, user_id)
expected_key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
mock_redis.setex.assert_called_once()
call_args = mock_redis.setex.call_args
assert call_args[0][0] == expected_key
assert call_args[0][1] == PENDING_JOIN_TTL
# Value should be a timestamp string
assert call_args[0][2] is not None
@pytest.mark.asyncio
async def test_delete_pending_join_removes_key(mock_redis):
"""Test that delete_pending_join removes the key."""
meeting_id = "meeting-123"
user_id = "user-456"
await delete_pending_join(mock_redis, meeting_id, user_id)
expected_key = f"{PENDING_JOIN_PREFIX}:{meeting_id}:{user_id}"
mock_redis.delete.assert_called_once_with(expected_key)
@pytest.mark.asyncio
async def test_has_pending_joins_returns_false_when_no_keys(mock_redis):
"""Test has_pending_joins returns False when no matching keys."""
mock_redis.scan.return_value = (0, [])
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is False
mock_redis.scan.assert_called_once()
call_kwargs = mock_redis.scan.call_args.kwargs
assert call_kwargs["match"] == f"{PENDING_JOIN_PREFIX}:meeting-123:*"
@pytest.mark.asyncio
async def test_has_pending_joins_returns_true_when_keys_exist(mock_redis):
"""Test has_pending_joins returns True when matching keys found."""
mock_redis.scan.return_value = (0, [b"pending_join:meeting-123:user-1"])
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is True
@pytest.mark.asyncio
async def test_has_pending_joins_scans_with_correct_pattern(mock_redis):
"""Test has_pending_joins uses correct scan pattern."""
meeting_id = "meeting-abc-def"
mock_redis.scan.return_value = (0, [])
await has_pending_joins(mock_redis, meeting_id)
expected_pattern = f"{PENDING_JOIN_PREFIX}:{meeting_id}:*"
mock_redis.scan.assert_called_once()
call_kwargs = mock_redis.scan.call_args.kwargs
assert call_kwargs["match"] == expected_pattern
assert call_kwargs["count"] == 100
@pytest.mark.asyncio
async def test_multiple_users_pending_joins(mock_redis):
"""Test that multiple users can have pending joins for same meeting."""
meeting_id = "meeting-123"
# Simulate two pending joins
mock_redis.scan.return_value = (
0,
[b"pending_join:meeting-123:user-1", b"pending_join:meeting-123:user-2"],
)
result = await has_pending_joins(mock_redis, meeting_id)
assert result is True
@pytest.mark.asyncio
async def test_pending_join_ttl_value():
"""Test that PENDING_JOIN_TTL has expected value."""
# 30 seconds should be enough for the WebRTC handshake without lingering too long
assert PENDING_JOIN_TTL == 30
@pytest.mark.asyncio
async def test_pending_join_prefix_value():
"""Test that PENDING_JOIN_PREFIX has expected value."""
assert PENDING_JOIN_PREFIX == "pending_join"
@pytest.mark.asyncio
async def test_has_pending_joins_multi_iteration_scan_no_keys(mock_redis):
"""Test has_pending_joins iterates until cursor returns 0."""
# Simulate multi-iteration scan: cursor 100 -> cursor 50 -> cursor 0
mock_redis.scan.side_effect = [
(100, []), # First iteration, no keys, continue
(50, []), # Second iteration, no keys, continue
(0, []), # Third iteration, cursor 0, done
]
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is False
assert mock_redis.scan.call_count == 3
@pytest.mark.asyncio
async def test_has_pending_joins_multi_iteration_finds_key_later(mock_redis):
"""Test has_pending_joins finds key on second iteration."""
# Simulate finding key on second scan iteration
mock_redis.scan.side_effect = [
(100, []), # First iteration, no keys
(0, [b"pending_join:meeting-123:user-1"]), # Second iteration, found key
]
result = await has_pending_joins(mock_redis, "meeting-123")
assert result is True
assert mock_redis.scan.call_count == 2

View File

@@ -0,0 +1,241 @@
"""Tests for process_meetings pending joins check.
Tests that process_meetings correctly skips deactivation when
pending joins exist for a meeting.
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db.meetings import Meeting
def _get_process_meetings_fn():
"""Get the underlying async function without Celery/asynctask decorators."""
from reflector.worker import process
fn = process.process_meetings
# Unwrap both decorator layers (@shared_task and @asynctask)
if hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
if hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
return fn
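
The behavior exercised below reduces to one guard on the deactivation path: consult Redis only when every session has ended, and skip deactivation while any pending join exists. A simplified sketch follows, with the surrounding loop, locking, and import paths assumed.

```python
# Hypothetical core of process_meetings' deactivation guard; the real worker
# also handles the Redis lock, iteration over meetings, and platform clients.
from reflector.db.meetings import meetings_controller  # import path assumed
from reflector.presence.pending_joins import has_pending_joins
from reflector.redis import get_async_redis_client  # import path assumed


async def maybe_deactivate(meeting, sessions) -> None:
    if any(session.ended_at is None for session in sessions):
        return  # someone is still connected: no pending-joins check needed
    redis = get_async_redis_client()
    try:
        # A pending join means a WebRTC handshake is in flight; skip this
        # cycle and let the 30s TTL bound how long deactivation can stall.
        if await has_pending_joins(redis, meeting.id):
            return
    finally:
        await redis.aclose()  # closed whether we deactivate or skip
    await meetings_controller.update_meeting(meeting.id, is_active=False)
```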
@pytest.fixture
def mock_active_meeting():
"""Mock an active meeting that should be considered for deactivation."""
now = datetime.now(timezone.utc)
return Meeting(
id="meeting-123",
room_id="room-456",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room-20251118120000",
host_room_url="https://daily.co/test-room-20251118120000?t=host",
platform="daily",
num_clients=0,
is_active=True,
start_date=now - timedelta(hours=1),
end_date=now - timedelta(minutes=30), # Already ended
)
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_skips_deactivation_with_pending_joins(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that process_meetings skips deactivation when pending joins exist."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions, but had sessions (triggers deactivation)
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc) # Session ended
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock pending joins exist
mock_has_pending_joins.return_value = True
await process_meetings()
# Verify has_pending_joins was called
mock_has_pending_joins.assert_called_once_with(mock_redis, mock_active_meeting.id)
# Verify meeting was NOT deactivated
mock_update_meeting.assert_not_called()
# Verify Redis was closed
mock_redis.aclose.assert_called_once()
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_deactivates_without_pending_joins(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that process_meetings deactivates when no pending joins."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions, but had sessions
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc)
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock no pending joins
mock_has_pending_joins.return_value = False
await process_meetings()
# Verify meeting was deactivated
mock_update_meeting.assert_called_once_with(mock_active_meeting.id, is_active=False)
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
async def test_process_meetings_no_check_when_active_sessions(
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that pending joins check is skipped when there are active sessions."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - has active session
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = None # Still active
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
with (
patch("reflector.worker.process.get_async_redis_client") as mock_get_redis,
patch("reflector.worker.process.has_pending_joins") as mock_has_pending_joins,
patch(
"reflector.worker.process.meetings_controller.update_meeting"
) as mock_update_meeting,
):
await process_meetings()
# Verify pending joins check was NOT called (no need - active sessions exist)
mock_has_pending_joins.assert_not_called()
# Verify meeting was NOT deactivated
mock_update_meeting.assert_not_called()
@pytest.mark.asyncio
@patch("reflector.worker.process.meetings_controller.get_all_active")
@patch("reflector.worker.process.RedisAsyncLock")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.get_async_redis_client")
@patch("reflector.worker.process.has_pending_joins")
@patch("reflector.worker.process.meetings_controller.update_meeting")
async def test_process_meetings_closes_redis_even_on_continue(
mock_update_meeting,
mock_has_pending_joins,
mock_get_redis,
mock_create_client,
mock_redis_lock_class,
mock_get_all_active,
mock_active_meeting,
):
"""Test that Redis connection is always closed, even when skipping deactivation."""
process_meetings = _get_process_meetings_fn()
mock_get_all_active.return_value = [mock_active_meeting]
# Mock lock acquired
mock_lock_instance = AsyncMock()
mock_lock_instance.acquired = True
mock_lock_instance.__aenter__ = AsyncMock(return_value=mock_lock_instance)
mock_lock_instance.__aexit__ = AsyncMock()
mock_redis_lock_class.return_value = mock_lock_instance
# Mock platform client - no active sessions
mock_daily_client = AsyncMock()
mock_session = AsyncMock()
mock_session.ended_at = datetime.now(timezone.utc)
mock_daily_client.get_room_sessions = AsyncMock(return_value=[mock_session])
mock_create_client.return_value = mock_daily_client
# Mock Redis client
mock_redis = AsyncMock()
mock_redis.aclose = AsyncMock()
mock_get_redis.return_value = mock_redis
# Mock pending joins exist (will trigger continue)
mock_has_pending_joins.return_value = True
await process_meetings()
# Verify Redis was closed
mock_redis.aclose.assert_called_once()

View File

@@ -1,6 +1,6 @@
import asyncio
import time
from unittest.mock import AsyncMock, patch
from unittest.mock import patch
import pytest
from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
):
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline, not Hatchet
# Whereby recordings should use file pipeline
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_hatchet.start_workflow.assert_not_called()
mock_multitrack_pipeline.delay.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@@ -177,6 +177,8 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
@@ -211,23 +213,18 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use Hatchet workflow
mock_hatchet.start_workflow.assert_called_once()
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
# Daily.co multitrack recordings should use multitrack pipeline
mock_multitrack_pipeline.delay.assert_called_once_with(
transcript_id=transcript.id,
bucket_name="daily-bucket",
track_keys=track_keys,
)
mock_file_pipeline.delay.assert_not_called()
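
With Hatchet gone, both recording types dispatch through Celery: Whereby single-file recordings go to the file pipeline, and Daily.co multitrack recordings go to the multitrack pipeline with bucket and track keys. A sketch of the dispatch these assertions imply; the selection predicate and task import paths are assumptions.

```python
# Hypothetical dispatch in reflector.services.transcript_process after the
# Hatchet removal; only the two .delay() signatures are taken from the tests.
from reflector.pipelines import (  # task import paths assumed
    task_pipeline_file_process,
    task_pipeline_multitrack_process,
)


def dispatch_recording_pipeline(transcript_id: str, recording) -> None:
    if recording.platform == "daily" and recording.track_keys:
        # Daily.co raw-tracks recordings: one S3 object per participant track.
        task_pipeline_multitrack_process.delay(
            transcript_id=transcript_id,
            bucket_name=recording.bucket_name,
            track_keys=recording.track_keys,
        )
    else:
        # Whereby (and any single-file recording) stays on the file pipeline.
        task_pipeline_file_process.delay(transcript_id=transcript_id)
```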

View File

@@ -25,6 +25,9 @@ import { useConsentDialog } from "../../lib/consent";
import {
useRoomJoinMeeting,
useMeetingStartRecording,
useMeetingJoining,
useMeetingJoined,
buildMeetingLeaveUrl,
} from "../../lib/apiHooks";
import { omit } from "remeda";
import {
@@ -88,7 +91,7 @@ const useFrame = (
cbs: {
onLeftMeeting: () => void;
onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
onJoinMeeting: () => void;
onJoinMeeting: (sessionId: string | null) => void;
},
) => {
const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
@@ -139,7 +142,8 @@ const useFrame = (
console.error("frame is null in joined-meeting callback");
return;
}
cbs.onJoinMeeting();
const local = frame.participants()?.local;
cbs.onJoinMeeting(local?.session_id ?? null);
};
frame.on("joined-meeting", joinCb);
return () => {
@@ -187,7 +191,14 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
const startRecordingMutation = useMeetingStartRecording();
const joiningMutation = useMeetingJoining();
const joinedMutation = useMeetingJoined();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const sessionIdRef = useRef<string | null>(null);
// Generate a stable connection ID for this component instance
// Used to track pending joins per browser tab (prevents key collision for anonymous users)
const connectionId = useMemo(() => crypto.randomUUID(), []);
// Generate deterministic instanceIds so all participants use SAME IDs
const cloudInstanceId = parseNonEmptyString(meeting.id);
@@ -234,8 +245,34 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const roomUrl = joinedMeeting?.room_url;
const handleLeave = useCallback(() => {
if (meeting?.id && roomName) {
const payload = sessionIdRef.current
? { session_id: sessionIdRef.current }
: {};
navigator.sendBeacon(
buildMeetingLeaveUrl(roomName, meeting.id),
JSON.stringify(payload),
);
}
router.push("/browse");
}, [router]);
}, [router, roomName, meeting?.id]);
// Trigger presence recheck on dirty disconnects (tab close, navigation away)
useEffect(() => {
if (!meeting?.id || !roomName) return;
const handleBeforeUnload = () => {
// sendBeacon hands the request to the browser so it can complete after the tab closes (best-effort, not guaranteed)
const url = buildMeetingLeaveUrl(roomName, meeting.id);
const payload = sessionIdRef.current
? { session_id: sessionIdRef.current }
: {};
navigator.sendBeacon(url, JSON.stringify(payload));
};
window.addEventListener("beforeunload", handleBeforeUnload);
return () => window.removeEventListener("beforeunload", handleBeforeUnload);
}, [meeting?.id, roomName]);
const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => {
@@ -248,72 +285,106 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
],
);
const handleFrameJoinMeeting = useCallback(() => {
if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", {
cloudInstanceId,
rawTracksInstanceId,
});
const handleFrameJoinMeeting = useCallback(
(sessionId: string | null) => {
sessionIdRef.current = sessionId;
// Start both cloud and raw-tracks via backend REST API (with retry on 404)
// Daily.co needs time to register call as "hosting" for REST API
const startRecordingWithRetry = (
type: DailyRecordingType,
instanceId: NonEmptyString,
attempt: number = 1,
) => {
setTimeout(() => {
startRecordingMutation.mutate(
{
params: {
path: {
meeting_id: meeting.id,
// Signal that the WebRTC connection is established
// This clears the pending join intent and creates the session record directly
joinedMutation.mutate(
{
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
body: {
connection_id: connectionId,
session_id: sessionId,
},
},
{
onError: (error: unknown) => {
// Non-blocking: log but don't fail - this is cleanup, not critical
console.warn("Failed to signal joined:", error);
},
},
);
if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", {
cloudInstanceId,
rawTracksInstanceId,
});
// Start both cloud and raw-tracks via backend REST API (with retry on 404)
// Daily.co needs time to register call as "hosting" for REST API
const startRecordingWithRetry = (
type: DailyRecordingType,
instanceId: NonEmptyString,
attempt: number = 1,
) => {
setTimeout(() => {
startRecordingMutation.mutate(
{
params: {
path: {
meeting_id: meeting.id,
},
},
body: {
type,
instanceId,
},
},
body: {
type,
instanceId,
},
},
{
onError: (error: any) => {
const errorText = error?.detail || error?.message || "";
const is404NotHosting = errorText.includes(
"does not seem to be hosting a call",
);
const isActiveStream = errorText.includes(
"has an active stream",
);
if (is404NotHosting && attempt < RECORDING_START_MAX_RETRIES) {
console.log(
`${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`,
{
onError: (error: any) => {
const errorText = error?.detail || error?.message || "";
const is404NotHosting = errorText.includes(
"does not seem to be hosting a call",
);
startRecordingWithRetry(type, instanceId, attempt + 1);
} else if (isActiveStream) {
console.log(
`${type}: Recording already active (started by another participant)`,
const isActiveStream = errorText.includes(
"has an active stream",
);
} else {
console.error(`Failed to start ${type} recording:`, error);
}
},
},
);
}, RECORDING_START_DELAY_MS);
};
// Start both recordings
startRecordingWithRetry("cloud", cloudInstanceId);
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
}
}, [
meeting.recording_type,
meeting.id,
startRecordingMutation,
cloudInstanceId,
rawTracksInstanceId,
]);
if (
is404NotHosting &&
attempt < RECORDING_START_MAX_RETRIES
) {
console.log(
`${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`,
);
startRecordingWithRetry(type, instanceId, attempt + 1);
} else if (isActiveStream) {
console.log(
`${type}: Recording already active (started by another participant)`,
);
} else {
console.error(`Failed to start ${type} recording:`, error);
}
},
},
);
}, RECORDING_START_DELAY_MS);
};
// Start both recordings
startRecordingWithRetry("cloud", cloudInstanceId);
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
}
},
[
meeting.recording_type,
meeting.id,
roomName,
connectionId,
joinedMutation,
startRecordingMutation,
cloudInstanceId,
rawTracksInstanceId,
],
);
const recordingIconUrl = useMemo(
() => new URL("/recording-icon.svg", window.location.origin),
@@ -328,8 +399,28 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
useEffect(() => {
if (!frame || !roomUrl) return;
frame
.join({
const joinRoom = async () => {
// Signal intent to join before the WebRTC handshake starts
// This prevents a race condition where the meeting is deactivated mid-handshake
try {
await joiningMutation.mutateAsync({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
body: {
connection_id: connectionId,
},
});
} catch (error) {
// Non-blocking: log but continue with join
console.warn("Failed to signal joining intent:", error);
}
await frame.join({
url: roomUrl,
sendSettings: {
video: {
@@ -341,9 +432,13 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
},
// Note: screenVideo intentionally not configured to preserve full quality for screen shares
},
})
.catch(console.error.bind(console, "Failed to join daily room:"));
}, [frame, roomUrl]);
});
};
joinRoom().catch(console.error.bind(console, "Failed to join daily room:"));
// joiningMutation excluded from deps - it's a stable hook reference
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [frame, roomUrl, roomName, meeting.id, connectionId]);
useEffect(() => {
setCustomTrayButton(

View File

@@ -1,9 +1,10 @@
"use client";
import { $api } from "./apiClient";
import { createFinalURL, createQuerySerializer } from "openapi-fetch";
import { $api, API_URL } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import type { components, paths } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
@@ -14,6 +15,10 @@ import { NonEmptyString } from "./utils";
* or a limitation or incorrect usage of the .d type generator from the JSON schema
* */
/*
If expected endpoints are missing from $api, try regenerating the API client.
*/
export const useAuthReady = () => {
const auth = useAuth();
@@ -768,6 +773,7 @@ export function useRoomActiveMeetings(roomName: string | null) {
},
{
enabled: !!roomName,
refetchInterval: 5000,
},
);
}
@@ -807,6 +813,47 @@ export function useRoomJoinMeeting() {
);
}
// Presence race-fix endpoints: signal join intent to prevent race conditions during the WebRTC handshake
export function useMeetingJoining() {
return $api.useMutation(
"post",
"/v1/rooms/{room_name}/meetings/{meeting_id}/joining",
{},
);
}
export function useMeetingJoined() {
return $api.useMutation(
"post",
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined",
{},
);
}
export function useMeetingLeave() {
return $api.useMutation(
"post",
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave",
{},
);
}
/**
* Build absolute URL for /leave endpoint (for sendBeacon which can't use hooks).
*/
export function buildMeetingLeaveUrl(
roomName: string,
meetingId: string,
): string {
return createFinalURL("/v1/rooms/{room_name}/meetings/{meeting_id}/leave", {
baseUrl: API_URL,
params: {
path: { room_name: roomName, meeting_id: meetingId },
},
querySerializer: createQuerySerializer(),
});
}
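
buildMeetingLeaveUrl exists because sendBeacon fires outside React's lifecycle, so the server must accept a bare POST. For orientation, here is a sketch of the server half of the /joined and /leave signals, matching the endpoint descriptions in the regenerated API types below; the request models mirror the generated schemas, while handler internals and helper names are assumptions.

```python
# Hypothetical /joined and /leave handlers; behavior follows the endpoint
# descriptions in the regenerated API types, helpers are assumed.
from fastapi import APIRouter
from pydantic import BaseModel

from reflector.presence.pending_joins import delete_pending_join
from reflector.redis import get_async_redis_client  # import path assumed

router = APIRouter()


class JoinedRequest(BaseModel):
    connection_id: str
    session_id: str | None = None
    user_name: str | None = None


class LeaveRequest(BaseModel):
    session_id: str | None = None


@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
async def meeting_joined(room_name: str, meeting_id: str, body: JoinedRequest) -> dict:
    redis = get_async_redis_client()
    try:
        # Idempotent cleanup: deleting an expired or absent key is a no-op, so
        # this is safe even if the meeting was deactivated mid-handshake.
        # (Authenticated users would be keyed by user id instead - assumed.)
        await delete_pending_join(redis, meeting_id, f"anon:{body.connection_id}")
    finally:
        await redis.aclose()
    if body.session_id:
        await record_meeting_session(meeting_id, body.session_id)  # helper assumed
    return {"status": "ok"}


@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
async def meeting_leave(
    room_name: str,
    meeting_id: str,
    body: LeaveRequest | None = None,  # sendBeacon may post an empty body
) -> dict:
    if body and body.session_id:
        # Instant presence update: close the known session directly.
        await close_meeting_session(meeting_id, body.session_id)  # helper assumed
    else:
        # No session_id available: fall back to scheduling a presence poll.
        await trigger_presence_poll(meeting_id)  # helper assumed
    return {"status": "ok"}
```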
export function useRoomIcsSync() {
const { setError } = useError();

View File

@@ -313,6 +313,79 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/joining": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Meeting Joining
* @description Signal intent to join meeting. Called before WebRTC handshake starts.
*
* This creates a pending join record that prevents the meeting from being
* deactivated while the WebRTC handshake is in progress. The record expires
* automatically after 30 seconds if the connection is not established.
*/
post: operations["v1_meeting_joining"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Meeting Joined
* @description Signal that WebRTC connection is established.
*
* This clears the pending join record, confirming the user has successfully
* connected to the meeting. Safe to call even if meeting was deactivated
* during the handshake (idempotent cleanup).
*/
post: operations["v1_meeting_joined"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Meeting Leave
* @description Trigger presence update when user leaves meeting.
*
* When session_id is provided in the body, closes the session directly
* for instant presence update. Falls back to polling when session_id
* is not available (e.g., sendBeacon without frame access).
* Called on tab close/navigation via sendBeacon().
*/
post: operations["v1_meeting_leave"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/transcripts": {
parameters: {
query?: never;
@@ -1497,6 +1570,56 @@ export interface components {
/** Reason */
reason?: string | null;
};
/**
* JoinedRequest
* @description Request body for /joined endpoint (after WebRTC connection established).
*/
JoinedRequest: {
/**
* Connection Id
* @description A non-empty string
*/
connection_id: string;
/** Session Id */
session_id?: string | null;
/** User Name */
user_name?: string | null;
};
/** JoinedResponse */
JoinedResponse: {
/**
* Status
* @constant
*/
status: "ok";
};
/**
* JoiningRequest
* @description Request body for /joining endpoint (before WebRTC handshake).
*/
JoiningRequest: {
/**
* Connection Id
* @description A non-empty string
*/
connection_id: string;
};
/** JoiningResponse */
JoiningResponse: {
/**
* Status
* @constant
*/
status: "ok";
};
/** LeaveResponse */
LeaveResponse: {
/**
* Status
* @constant
*/
status: "ok";
};
/** Meeting */
Meeting: {
/** Id */
@@ -2687,6 +2810,110 @@ export interface operations {
};
};
};
v1_meeting_joining: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["JoiningRequest"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["JoiningResponse"];
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_meeting_joined: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["JoinedRequest"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["JoinedResponse"];
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_meeting_leave: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["LeaveResponse"];
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_transcripts_list: {
parameters: {
query?: {