mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-06 10:46:46 +00:00
Merge feature-leave-endpoint into hypothesis-model
Brings in race condition reproduction code from feature-leave-endpoint to work with hypothesis model simulation.
This commit is contained in:
@@ -35,7 +35,9 @@ LLM_RATE_LIMIT_PER_SECOND = 10
|
||||
|
||||
# Task execution timeouts (seconds)
|
||||
TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates
|
||||
TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation
|
||||
TIMEOUT_MEDIUM = (
|
||||
300 # Single LLM calls, waveform generation (5m for slow LLM responses)
|
||||
)
|
||||
TIMEOUT_LONG = 180 # Action items (larger context LLM)
|
||||
TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown
|
||||
TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks
|
||||
|
||||
@@ -322,6 +322,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
|
||||
mtg_session_id = recording.mtg_session_id
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
||||
TranscriptDuration,
|
||||
TranscriptParticipant,
|
||||
transcripts_controller,
|
||||
)
|
||||
@@ -330,15 +331,26 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
|
||||
if not transcript:
|
||||
raise ValueError(f"Transcript {input.transcript_id} not found")
|
||||
# Note: title NOT cleared - preserves existing titles
|
||||
# Duration from Daily API (seconds -> milliseconds) - master source
|
||||
duration_ms = recording.duration * 1000 if recording.duration else 0
|
||||
await transcripts_controller.update(
|
||||
transcript,
|
||||
{
|
||||
"events": [],
|
||||
"topics": [],
|
||||
"participants": [],
|
||||
"duration": duration_ms,
|
||||
},
|
||||
)
|
||||
|
||||
await append_event_and_broadcast(
|
||||
input.transcript_id,
|
||||
transcript,
|
||||
"DURATION",
|
||||
TranscriptDuration(duration=duration_ms),
|
||||
logger=logger,
|
||||
)
|
||||
|
||||
mtg_session_id = assert_non_none_and_non_empty(
|
||||
mtg_session_id, "mtg_session_id is required"
|
||||
)
|
||||
@@ -561,27 +573,13 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
|
||||
|
||||
Path(output_path).unlink(missing_ok=True)
|
||||
|
||||
duration = duration_ms_callback_capture_container[0]
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
||||
TranscriptDuration,
|
||||
transcripts_controller,
|
||||
)
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
if transcript:
|
||||
await transcripts_controller.update(
|
||||
transcript, {"audio_location": "storage", "duration": duration}
|
||||
)
|
||||
|
||||
duration_data = TranscriptDuration(duration=duration)
|
||||
await append_event_and_broadcast(
|
||||
input.transcript_id,
|
||||
transcript,
|
||||
"DURATION",
|
||||
duration_data,
|
||||
logger=logger,
|
||||
transcript, {"audio_location": "storage"}
|
||||
)
|
||||
|
||||
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
|
||||
|
||||
@@ -129,6 +129,10 @@ class DailyClient(VideoPlatformClient):
|
||||
"""Get room presence/session data for a Daily.co room."""
|
||||
return await self._api_client.get_room_presence(room_name)
|
||||
|
||||
async def delete_room(self, room_name: str) -> None:
|
||||
"""Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
|
||||
return await self._api_client.delete_room(room_name)
|
||||
|
||||
async def get_meeting_participants(
|
||||
self, meeting_id: str
|
||||
) -> MeetingParticipantsResponse:
|
||||
|
||||
@@ -20,6 +20,7 @@ from reflector.services.ics_sync import ics_sync_service
|
||||
from reflector.settings import settings
|
||||
from reflector.utils.url import add_query_param
|
||||
from reflector.video_platforms.factory import create_platform_client
|
||||
from reflector.worker.process import poll_daily_room_presence_task
|
||||
from reflector.worker.webhook import test_webhook
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -365,6 +366,53 @@ async def rooms_create_meeting(
|
||||
return meeting
|
||||
|
||||
|
||||
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
|
||||
async def rooms_joined_meeting(
|
||||
room_name: str,
|
||||
meeting_id: str,
|
||||
):
|
||||
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
|
||||
if not meeting:
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
if meeting.platform == "daily":
|
||||
poll_daily_room_presence_task.delay(meeting_id)
|
||||
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
|
||||
async def rooms_leave_meeting(
|
||||
room_name: str,
|
||||
meeting_id: str,
|
||||
delay_seconds: int = 2,
|
||||
):
|
||||
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
|
||||
|
||||
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
|
||||
"""
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
if not room:
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
|
||||
if not meeting:
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
if meeting.platform == "daily":
|
||||
poll_daily_room_presence_task.apply_async(
|
||||
args=[meeting_id],
|
||||
countdown=delay_seconds,
|
||||
)
|
||||
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
|
||||
async def rooms_test_webhook(
|
||||
room_id: str,
|
||||
|
||||
@@ -799,15 +799,47 @@ async def process_meetings():
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
|
||||
client = create_platform_client(meeting.platform)
|
||||
room_sessions = await client.get_room_sessions(meeting.room_name)
|
||||
has_active_sessions = False
|
||||
has_had_sessions = False
|
||||
|
||||
has_active_sessions = bool(
|
||||
room_sessions and any(s.ended_at is None for s in room_sessions)
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
logger_.info(
|
||||
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
|
||||
)
|
||||
if meeting.platform == "daily":
|
||||
try:
|
||||
presence = await client.get_room_presence(meeting.room_name)
|
||||
has_active_sessions = presence.total_count > 0
|
||||
|
||||
room_sessions = await client.get_room_sessions(
|
||||
meeting.room_name
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
|
||||
logger_.info(
|
||||
"Daily.co presence check",
|
||||
has_active_sessions=has_active_sessions,
|
||||
has_had_sessions=has_had_sessions,
|
||||
presence_count=presence.total_count,
|
||||
)
|
||||
except Exception:
|
||||
logger_.warning(
|
||||
"Daily.co presence API failed, falling back to DB sessions",
|
||||
exc_info=True,
|
||||
)
|
||||
room_sessions = await client.get_room_sessions(
|
||||
meeting.room_name
|
||||
)
|
||||
has_active_sessions = bool(
|
||||
room_sessions
|
||||
and any(s.ended_at is None for s in room_sessions)
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
else:
|
||||
room_sessions = await client.get_room_sessions(meeting.room_name)
|
||||
has_active_sessions = bool(
|
||||
room_sessions and any(s.ended_at is None for s in room_sessions)
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
logger_.info(
|
||||
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
|
||||
)
|
||||
|
||||
if has_active_sessions:
|
||||
logger_.debug("Meeting still has active sessions, keep it")
|
||||
@@ -826,7 +858,20 @@ async def process_meetings():
|
||||
await meetings_controller.update_meeting(
|
||||
meeting.id, is_active=False
|
||||
)
|
||||
logger_.info("Meeting is deactivated")
|
||||
logger_.info("Meeting deactivated in database")
|
||||
|
||||
if meeting.platform == "daily":
|
||||
try:
|
||||
await client.delete_room(meeting.room_name)
|
||||
logger_.info(
|
||||
"Daily.co room deleted", room_name=meeting.room_name
|
||||
)
|
||||
except Exception:
|
||||
logger_.warning(
|
||||
"Failed to delete Daily.co room",
|
||||
room_name=meeting.room_name,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
processed_count += 1
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import threading
|
||||
|
||||
import redis.asyncio as redis
|
||||
from fastapi import WebSocket
|
||||
@@ -98,6 +97,7 @@ class WebsocketManager:
|
||||
|
||||
async def _pubsub_data_reader(self, pubsub_subscriber):
|
||||
while True:
|
||||
# timeout=1.0 prevents tight CPU loop when no messages available
|
||||
message = await pubsub_subscriber.get_message(
|
||||
ignore_subscribe_messages=True
|
||||
)
|
||||
@@ -109,29 +109,38 @@ class WebsocketManager:
|
||||
await socket.send_json(data)
|
||||
|
||||
|
||||
# Process-global singleton to ensure only one WebsocketManager instance exists.
|
||||
# Multiple instances would cause resource leaks and CPU issues.
|
||||
_ws_manager: WebsocketManager | None = None
|
||||
|
||||
|
||||
def get_ws_manager() -> WebsocketManager:
|
||||
"""
|
||||
Returns the WebsocketManager instance for managing websockets.
|
||||
Returns the global WebsocketManager singleton.
|
||||
|
||||
This function initializes and returns the WebsocketManager instance,
|
||||
which is responsible for managing websockets and handling websocket
|
||||
connections.
|
||||
Creates instance on first call, subsequent calls return cached instance.
|
||||
Thread-safe via GIL. Concurrent initialization may create duplicate
|
||||
instances but last write wins (acceptable for this use case).
|
||||
|
||||
Returns:
|
||||
WebsocketManager: The initialized WebsocketManager instance.
|
||||
|
||||
Raises:
|
||||
ImportError: If the 'reflector.settings' module cannot be imported.
|
||||
RedisConnectionError: If there is an error connecting to the Redis server.
|
||||
WebsocketManager: The global WebsocketManager instance.
|
||||
"""
|
||||
local = threading.local()
|
||||
if hasattr(local, "ws_manager"):
|
||||
return local.ws_manager
|
||||
global _ws_manager
|
||||
|
||||
if _ws_manager is not None:
|
||||
return _ws_manager
|
||||
|
||||
# No lock needed - GIL makes this safe enough
|
||||
# Worst case: race creates two instances, last assignment wins
|
||||
pubsub_client = RedisPubSubManager(
|
||||
host=settings.REDIS_HOST,
|
||||
port=settings.REDIS_PORT,
|
||||
)
|
||||
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
||||
local.ws_manager = ws_manager
|
||||
return ws_manager
|
||||
_ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
||||
return _ws_manager
|
||||
|
||||
|
||||
def reset_ws_manager() -> None:
|
||||
"""Reset singleton for testing. DO NOT use in production."""
|
||||
global _ws_manager
|
||||
_ws_manager = None
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from tempfile import NamedTemporaryFile
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
@@ -333,11 +332,14 @@ def celery_enable_logging():
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def celery_config():
|
||||
with NamedTemporaryFile() as f:
|
||||
yield {
|
||||
"broker_url": "memory://",
|
||||
"result_backend": f"db+sqlite:///{f.name}",
|
||||
}
|
||||
redis_host = os.environ.get("REDIS_HOST", "localhost")
|
||||
redis_port = os.environ.get("REDIS_PORT", "6379")
|
||||
# Use db 2 to avoid conflicts with main app
|
||||
redis_url = f"redis://{redis_host}:{redis_port}/2"
|
||||
yield {
|
||||
"broker_url": redis_url,
|
||||
"result_backend": redis_url,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@@ -370,9 +372,12 @@ async def ws_manager_in_memory(monkeypatch):
|
||||
def __init__(self, queue: asyncio.Queue):
|
||||
self.queue = queue
|
||||
|
||||
async def get_message(self, ignore_subscribe_messages: bool = True):
|
||||
async def get_message(
|
||||
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
|
||||
):
|
||||
wait_timeout = timeout if timeout is not None else 0.05
|
||||
try:
|
||||
return await asyncio.wait_for(self.queue.get(), timeout=0.05)
|
||||
return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
286
server/tests/test_daily_presence_deactivation.py
Normal file
286
server/tests/test_daily_presence_deactivation.py
Normal file
@@ -0,0 +1,286 @@
|
||||
"""Unit tests for Daily.co presence-based meeting deactivation logic.
|
||||
|
||||
Tests the fix for split room race condition by verifying:
|
||||
1. Real-time presence checking via Daily.co API
|
||||
2. Room deletion when meetings deactivate
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from reflector.dailyco_api.responses import (
|
||||
RoomPresenceParticipant,
|
||||
RoomPresenceResponse,
|
||||
)
|
||||
from reflector.db.daily_participant_sessions import (
|
||||
DailyParticipantSession,
|
||||
daily_participant_sessions_controller,
|
||||
)
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.video_platforms.daily import DailyClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def daily_room_and_meeting():
|
||||
"""Create test room and meeting for Daily platform."""
|
||||
room = await rooms_controller.add(
|
||||
name="test-daily",
|
||||
user_id="test-user",
|
||||
platform="daily",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
)
|
||||
|
||||
current_time = datetime.now(timezone.utc)
|
||||
end_time = current_time + timedelta(hours=2)
|
||||
|
||||
meeting = await meetings_controller.create(
|
||||
id="test-meeting-id",
|
||||
room_name="test-daily-20260129120000",
|
||||
room_url="https://daily.co/test",
|
||||
host_room_url="https://daily.co/test",
|
||||
start_date=current_time,
|
||||
end_date=end_time,
|
||||
room=room,
|
||||
)
|
||||
|
||||
return room, meeting
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_daily_client_has_delete_room_method():
|
||||
"""Verify DailyClient has delete_room method for cleanup."""
|
||||
# Create a mock DailyClient
|
||||
with patch("reflector.dailyco_api.client.DailyApiClient"):
|
||||
from reflector.video_platforms.models import VideoPlatformConfig
|
||||
|
||||
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
|
||||
client = DailyClient(config)
|
||||
|
||||
# Verify delete_room method exists
|
||||
assert hasattr(client, "delete_room")
|
||||
assert callable(getattr(client, "delete_room"))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
|
||||
"""Test that get_room_presence returns real-time participant data."""
|
||||
room, meeting = daily_room_and_meeting
|
||||
|
||||
# Mock Daily.co API response
|
||||
mock_presence = RoomPresenceResponse(
|
||||
total_count=2,
|
||||
data=[
|
||||
RoomPresenceParticipant(
|
||||
room=meeting.room_name,
|
||||
id="session-1",
|
||||
userId="user-1",
|
||||
userName="User One",
|
||||
joinTime="2026-01-29T12:00:00.000Z",
|
||||
duration=120,
|
||||
),
|
||||
RoomPresenceParticipant(
|
||||
room=meeting.room_name,
|
||||
id="session-2",
|
||||
userId="user-2",
|
||||
userName="User Two",
|
||||
joinTime="2026-01-29T12:05:00.000Z",
|
||||
duration=60,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
|
||||
from reflector.video_platforms.models import VideoPlatformConfig
|
||||
|
||||
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
|
||||
client = DailyClient(config)
|
||||
|
||||
# Mock the API client method
|
||||
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
|
||||
|
||||
# Call get_room_presence
|
||||
result = await client.get_room_presence(meeting.room_name)
|
||||
|
||||
# Verify it calls Daily.co API
|
||||
client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
|
||||
|
||||
# Verify result contains real-time data
|
||||
assert result.total_count == 2
|
||||
assert len(result.data) == 2
|
||||
assert result.data[0].id == "session-1"
|
||||
assert result.data[1].id == "session-2"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
|
||||
"""Test that Daily.co presence API is source of truth, not stale DB sessions."""
|
||||
room, meeting = daily_room_and_meeting
|
||||
current_time = datetime.now(timezone.utc)
|
||||
|
||||
# Create stale DB session (left_at=NULL but user actually left)
|
||||
session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
|
||||
await daily_participant_sessions_controller.upsert_joined(
|
||||
DailyParticipantSession(
|
||||
id=session_id,
|
||||
meeting_id=meeting.id,
|
||||
room_id=room.id,
|
||||
session_id="stale-daily-session",
|
||||
user_name="Stale User",
|
||||
user_id="stale-user",
|
||||
joined_at=current_time - timedelta(minutes=5),
|
||||
left_at=None, # Stale - shows active but user left
|
||||
)
|
||||
)
|
||||
|
||||
# Verify DB shows active session
|
||||
db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
|
||||
meeting.id
|
||||
)
|
||||
assert len(db_sessions) == 1
|
||||
|
||||
# But Daily.co API shows room is empty
|
||||
mock_presence = RoomPresenceResponse(total_count=0, data=[])
|
||||
|
||||
with patch("reflector.dailyco_api.client.DailyApiClient"):
|
||||
from reflector.video_platforms.models import VideoPlatformConfig
|
||||
|
||||
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
|
||||
client = DailyClient(config)
|
||||
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
|
||||
|
||||
# Get real-time presence
|
||||
presence = await client.get_room_presence(meeting.room_name)
|
||||
|
||||
# Real-time API shows no participants (truth)
|
||||
assert presence.total_count == 0
|
||||
assert len(presence.data) == 0
|
||||
|
||||
# DB shows 1 participant (stale)
|
||||
assert len(db_sessions) == 1
|
||||
|
||||
# Implementation should trust presence API, not DB
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_meeting_deactivation_logic_with_presence_empty():
|
||||
"""Test the core deactivation decision logic when presence shows room empty."""
|
||||
# This tests the logic that will be in process_meetings
|
||||
|
||||
# Simulate: DB shows stale active session
|
||||
has_active_db_sessions = True # DB is stale
|
||||
|
||||
# Simulate: Daily.co presence API shows room empty
|
||||
presence_count = 0 # Real-time truth
|
||||
|
||||
# Simulate: Meeting has been used before
|
||||
has_had_sessions = True
|
||||
|
||||
# Decision logic (what process_meetings should do):
|
||||
# - If presence API available: trust it
|
||||
# - If presence shows empty AND has_had_sessions: deactivate
|
||||
|
||||
if presence_count == 0 and has_had_sessions:
|
||||
should_deactivate = True
|
||||
else:
|
||||
should_deactivate = False
|
||||
|
||||
assert should_deactivate is True # Should deactivate despite stale DB
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_meeting_deactivation_logic_with_presence_active():
|
||||
"""Test that meetings stay active when presence shows participants."""
|
||||
# Simulate: DB shows no sessions (not yet updated)
|
||||
has_active_db_sessions = False # DB hasn't caught up
|
||||
|
||||
# Simulate: Daily.co presence API shows active participant
|
||||
presence_count = 1 # Real-time truth
|
||||
|
||||
# Decision logic: presence shows activity, keep meeting active
|
||||
if presence_count > 0:
|
||||
should_deactivate = False
|
||||
else:
|
||||
should_deactivate = True
|
||||
|
||||
assert should_deactivate is False # Should stay active
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
|
||||
"""Test that Daily.co room is deleted when meeting deactivates."""
|
||||
room, meeting = daily_room_and_meeting
|
||||
|
||||
with patch("reflector.dailyco_api.client.DailyApiClient"):
|
||||
from reflector.video_platforms.models import VideoPlatformConfig
|
||||
|
||||
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
|
||||
client = DailyClient(config)
|
||||
|
||||
# Mock delete_room API call
|
||||
client._api_client.delete_room = AsyncMock()
|
||||
|
||||
# Simulate deactivation - should delete room
|
||||
await client._api_client.delete_room(meeting.room_name)
|
||||
|
||||
# Verify delete was called
|
||||
client._api_client.delete_room.assert_called_once_with(meeting.room_name)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_room_idempotent_on_404():
|
||||
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
|
||||
from reflector.dailyco_api.client import DailyApiClient
|
||||
|
||||
# Create real client to test delete_room logic
|
||||
client = DailyApiClient(api_key="test-key")
|
||||
|
||||
# Mock the HTTP client
|
||||
mock_http_client = AsyncMock()
|
||||
mock_response = AsyncMock()
|
||||
mock_response.status_code = 404 # Room not found
|
||||
mock_http_client.delete = AsyncMock(return_value=mock_response)
|
||||
|
||||
# Mock _get_client to return our mock
|
||||
async def mock_get_client():
|
||||
return mock_http_client
|
||||
|
||||
client._get_client = mock_get_client
|
||||
|
||||
# delete_room should succeed even on 404 (idempotent)
|
||||
await client.delete_room("nonexistent-room")
|
||||
|
||||
# Verify delete was attempted
|
||||
mock_http_client.delete.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_failure_fallback_to_db_sessions():
|
||||
"""Test that system falls back to DB sessions if Daily.co API fails."""
|
||||
# Simulate: Daily.co API throws exception
|
||||
api_exception = Exception("API unavailable")
|
||||
|
||||
# Simulate: DB shows active session
|
||||
has_active_db_sessions = True
|
||||
|
||||
# Decision logic with fallback:
|
||||
try:
|
||||
presence_count = None
|
||||
raise api_exception # Simulating API failure
|
||||
except Exception:
|
||||
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
|
||||
if has_active_db_sessions:
|
||||
should_deactivate = False
|
||||
else:
|
||||
should_deactivate = True
|
||||
|
||||
assert should_deactivate is False # Conservative: keep active on API failure
|
||||
@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
|
||||
settings.DATA_DIR = DATA_DIR
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def celery_includes():
|
||||
return ["reflector.pipelines.main_live_pipeline"]
|
||||
# Using celery_includes from conftest.py which includes both pipelines
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_database")
|
||||
|
||||
@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
|
||||
|
||||
if server_instance:
|
||||
server_instance.should_exit = True
|
||||
server_thread.join(timeout=30)
|
||||
server_thread.join(timeout=2.0)
|
||||
|
||||
# Reset global singleton for test isolation
|
||||
from reflector.ws_manager import reset_ws_manager
|
||||
|
||||
reset_ws_manager()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
|
||||
|
||||
# Connect and then trigger an event via HTTP create
|
||||
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Emit an event to the user's room via a standard HTTP action
|
||||
from httpx import AsyncClient
|
||||
|
||||
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
|
||||
"email": "user-abc@example.com",
|
||||
}
|
||||
|
||||
# Use in-memory client (global singleton makes it share ws_manager)
|
||||
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
|
||||
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
|
||||
resp = await ac.post("/transcripts", json={"name": "WS Test"})
|
||||
|
||||
Reference in New Issue
Block a user