Compare commits

..

13 Commits

Author SHA1 Message Date
a455be047a Merge branch 'main' into fix-daily-ux 2026-02-06 11:33:35 -05:00
Igor Loskutov
2a9993ceec remove integration test with hardcoded external URL 2026-02-06 11:31:22 -05:00
Igor Loskutov
8773d9fbbd test: add integration test for ICS dedup using Max's live calendar feed 2026-02-06 11:02:20 -05:00
Igor Loskutov
6c88e05423 refactor: remove redundant fatalError ref, use state directly in handleLeave 2026-02-05 22:06:15 -05:00
Igor Loskutov
f4803c0d76 fix: use function mocks with correct signature in dedup tests 2026-02-05 19:51:27 -05:00
Igor Loskutov
2b484aec00 refactor: data-driven FatalErrorScreen, cover all DailyFatalErrorType cases 2026-02-05 19:49:14 -05:00
Igor Loskutov
d3161730ef fix: use SDK DailyFatalErrorType, add meeting-full/not-allowed/exp-token cases, remove dead end_date fallback 2026-02-05 19:43:25 -05:00
Igor Loskutov
4fd88b2fc1 fix: review feedback — literal error types, extract FatalErrorScreen, type params, fix mock signature 2026-02-05 19:35:47 -05:00
Igor Loskutov
a2694650fd chore: add REPORT.md and ref markers for ApiError cast issue 2026-02-05 19:16:58 -05:00
Igor Loskutov
238d768499 fix: address review feedback - use ApiError type, move inline imports 2026-02-05 18:50:12 -05:00
Igor Loskutov
1da687fe13 fix: prevent duplicate meetings from aggregated ICS calendar feeds
When Cal.com events appear in an aggregated ICS feed, the same event
gets two different UIDs (one from Cal.com, one from Google Calendar).
This caused duplicate meetings to be created for the same time slot.

Add time-window dedup check in create_upcoming_meetings_for_event:
after verifying no meeting exists for the calendar_event_id, also check
if a meeting already exists for the same room + start_date + end_date.
2026-02-05 18:42:42 -05:00
Igor Loskutov
e9e1676409 feat: add specific error messages for Daily.co meeting disconnections
Handle fatal errors from Daily.co SDK (connection-error, exp-room,
ejected, etc.) with user-friendly messages and appropriate actions.
Improve join failure display to show actual API error detail.
2026-02-05 18:42:42 -05:00
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
16 changed files with 532 additions and 869 deletions

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -346,6 +346,27 @@ class MeetingController:
return None
return Meeting(**result)
async def get_by_room_and_time_window(
self, room: Room, start_date: datetime, end_date: datetime
) -> Meeting | None:
"""Check if a meeting already exists for this room with the same time window."""
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_id == room.id,
meetings.c.start_date == start_date,
meetings.c.end_date == end_date,
meetings.c.is_active,
)
)
.limit(1)
)
result = await get_database().fetch_one(query)
if not result:
return None
return Meeting(**result)
async def update_meeting(self, meeting_id: str, **kwargs):
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column(
"skip_consent",
sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False

View File

@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString
@@ -181,21 +177,7 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
if use_hatchet:
# Multitrack processing always uses Hatchet (no Celery fallback)
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
@@ -203,9 +185,7 @@ async def dispatch_transcript_processing(
transcript.workflow_run_id
)
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
@@ -233,9 +213,7 @@ async def dispatch_transcript_processing(
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
@@ -245,9 +223,7 @@ async def dispatch_transcript_processing(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
await transcripts_controller.update(transcript, {"workflow_run_id": None})
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
@@ -293,12 +269,6 @@ async def dispatch_transcript_processing(
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code
)
# Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
# Zulip integration
ZULIP_REALM: str | None = None

View File

@@ -129,10 +129,6 @@ class DailyClient(VideoPlatformClient):
"""Get room presence/session data for a Daily.co room."""
return await self._api_client.get_room_presence(room_name)
async def delete_room(self, room_name: str) -> None:
"""Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
return await self._api_client.delete_room(room_name)
async def get_meeting_participants(
self, meeting_id: str
) -> MeetingParticipantsResponse:

View File

@@ -20,7 +20,6 @@ from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings
from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import poll_daily_room_presence_task
from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__)
@@ -366,53 +365,6 @@ async def rooms_create_meeting(
return meeting
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
async def rooms_joined_meeting(
room_name: str,
meeting_id: str,
):
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.delay(meeting_id)
return {"status": "ok"}
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
async def rooms_leave_meeting(
room_name: str,
meeting_id: str,
delay_seconds: int = 2,
):
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.apply_async(
args=[meeting_id],
countdown=delay_seconds,
)
return {"status": "ok"}
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
async def rooms_test_webhook(
room_id: str,

View File

@@ -5,7 +5,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger
from reflector.asynctask import asynctask
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room, rooms_controller
from reflector.redis_cache import RedisAsyncLock
@@ -83,10 +83,9 @@ def _should_sync(room) -> bool:
return time_since_sync.total_seconds() >= room.ics_fetch_interval
MEETING_DEFAULT_DURATION = timedelta(hours=1)
async def create_upcoming_meetings_for_event(event, create_window, room: Room):
async def create_upcoming_meetings_for_event(
event: CalendarEvent, create_window: datetime, room: Room
):
if event.start_time <= create_window:
return
existing_meeting = await meetings_controller.get_by_calendar_event(event.id, room)
@@ -94,6 +93,21 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room):
if existing_meeting:
return
# Prevent duplicate meetings from aggregated calendar feeds
# (e.g. same event appears with different UIDs from Cal.com and Google Calendar)
end_date = event.end_time
existing_by_time = await meetings_controller.get_by_room_and_time_window(
room, event.start_time, end_date
)
if existing_by_time:
logger.info(
"Skipping duplicate calendar event - meeting already exists for this time window",
room_id=room.id,
event_id=event.id,
existing_meeting_id=existing_by_time.id,
)
return
logger.info(
"Pre-creating meeting for calendar event",
room_id=room.id,
@@ -102,8 +116,6 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room):
)
try:
end_date = event.end_time or (event.start_time + MEETING_DEFAULT_DURATION)
client = create_platform_client(room.platform)
meeting_data = await client.create_meeting(

View File

@@ -27,9 +27,6 @@ from reflector.db.transcripts import (
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -351,17 +348,7 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
if use_hatchet:
# Multitrack processing always uses Hatchet (no Celery fallback)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
@@ -383,17 +370,7 @@ async def _process_multitrack_recording_inner(
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
@shared_task
@@ -845,40 +822,8 @@ async def process_meetings():
end_date = end_date.replace(tzinfo=timezone.utc)
client = create_platform_client(meeting.platform)
has_active_sessions = False
has_had_sessions = False
if meeting.platform == "daily":
try:
presence = await client.get_room_presence(meeting.room_name)
has_active_sessions = presence.total_count > 0
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_had_sessions = bool(room_sessions)
logger_.info(
"Daily.co presence check",
has_active_sessions=has_active_sessions,
has_had_sessions=has_had_sessions,
presence_count=presence.total_count,
)
except Exception:
logger_.warning(
"Daily.co presence API failed, falling back to DB sessions",
exc_info=True,
)
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_active_sessions = bool(
room_sessions
and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
else:
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
@@ -904,20 +849,7 @@ async def process_meetings():
await meetings_controller.update_meeting(
meeting.id, is_active=False
)
logger_.info("Meeting deactivated in database")
if meeting.platform == "daily":
try:
await client.delete_room(meeting.room_name)
logger_.info(
"Daily.co room deleted", room_name=meeting.room_name
)
except Exception:
logger_.warning(
"Failed to delete Daily.co room",
room_name=meeting.room_name,
exc_info=True,
)
logger_.info("Meeting is deactivated")
processed_count += 1
@@ -1117,10 +1049,7 @@ async def reprocess_failed_daily_recordings():
)
continue
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
@@ -1157,26 +1086,6 @@ async def reprocess_failed_daily_recordings():
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest
from reflector.schemas.platform import WHEREBY_PLATFORM
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True)
@@ -14,6 +14,7 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield

View File

@@ -1,286 +0,0 @@
"""Unit tests for Daily.co presence-based meeting deactivation logic.
Tests the fix for split room race condition by verifying:
1. Real-time presence checking via Daily.co API
2. Room deletion when meetings deactivate
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.dailyco_api.responses import (
RoomPresenceParticipant,
RoomPresenceResponse,
)
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.video_platforms.daily import DailyClient
@pytest.fixture
async def daily_room_and_meeting():
"""Create test room and meeting for Daily platform."""
room = await rooms_controller.add(
name="test-daily",
user_id="test-user",
platform="daily",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
current_time = datetime.now(timezone.utc)
end_time = current_time + timedelta(hours=2)
meeting = await meetings_controller.create(
id="test-meeting-id",
room_name="test-daily-20260129120000",
room_url="https://daily.co/test",
host_room_url="https://daily.co/test",
start_date=current_time,
end_date=end_time,
room=room,
)
return room, meeting
@pytest.mark.asyncio
async def test_daily_client_has_delete_room_method():
"""Verify DailyClient has delete_room method for cleanup."""
# Create a mock DailyClient
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Verify delete_room method exists
assert hasattr(client, "delete_room")
assert callable(getattr(client, "delete_room"))
@pytest.mark.asyncio
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
"""Test that get_room_presence returns real-time participant data."""
room, meeting = daily_room_and_meeting
# Mock Daily.co API response
mock_presence = RoomPresenceResponse(
total_count=2,
data=[
RoomPresenceParticipant(
room=meeting.room_name,
id="session-1",
userId="user-1",
userName="User One",
joinTime="2026-01-29T12:00:00.000Z",
duration=120,
),
RoomPresenceParticipant(
room=meeting.room_name,
id="session-2",
userId="user-2",
userName="User Two",
joinTime="2026-01-29T12:05:00.000Z",
duration=60,
),
],
)
with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock the API client method
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Call get_room_presence
result = await client.get_room_presence(meeting.room_name)
# Verify it calls Daily.co API
client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
# Verify result contains real-time data
assert result.total_count == 2
assert len(result.data) == 2
assert result.data[0].id == "session-1"
assert result.data[1].id == "session-2"
@pytest.mark.asyncio
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
"""Test that Daily.co presence API is source of truth, not stale DB sessions."""
room, meeting = daily_room_and_meeting
current_time = datetime.now(timezone.utc)
# Create stale DB session (left_at=NULL but user actually left)
session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
await daily_participant_sessions_controller.upsert_joined(
DailyParticipantSession(
id=session_id,
meeting_id=meeting.id,
room_id=room.id,
session_id="stale-daily-session",
user_name="Stale User",
user_id="stale-user",
joined_at=current_time - timedelta(minutes=5),
left_at=None, # Stale - shows active but user left
)
)
# Verify DB shows active session
db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
assert len(db_sessions) == 1
# But Daily.co API shows room is empty
mock_presence = RoomPresenceResponse(total_count=0, data=[])
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Get real-time presence
presence = await client.get_room_presence(meeting.room_name)
# Real-time API shows no participants (truth)
assert presence.total_count == 0
assert len(presence.data) == 0
# DB shows 1 participant (stale)
assert len(db_sessions) == 1
# Implementation should trust presence API, not DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_empty():
"""Test the core deactivation decision logic when presence shows room empty."""
# This tests the logic that will be in process_meetings
# Simulate: DB shows stale active session
has_active_db_sessions = True # DB is stale
# Simulate: Daily.co presence API shows room empty
presence_count = 0 # Real-time truth
# Simulate: Meeting has been used before
has_had_sessions = True
# Decision logic (what process_meetings should do):
# - If presence API available: trust it
# - If presence shows empty AND has_had_sessions: deactivate
if presence_count == 0 and has_had_sessions:
should_deactivate = True
else:
should_deactivate = False
assert should_deactivate is True # Should deactivate despite stale DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_active():
"""Test that meetings stay active when presence shows participants."""
# Simulate: DB shows no sessions (not yet updated)
has_active_db_sessions = False # DB hasn't caught up
# Simulate: Daily.co presence API shows active participant
presence_count = 1 # Real-time truth
# Decision logic: presence shows activity, keep meeting active
if presence_count > 0:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Should stay active
@pytest.mark.asyncio
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
"""Test that Daily.co room is deleted when meeting deactivates."""
room, meeting = daily_room_and_meeting
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock delete_room API call
client._api_client.delete_room = AsyncMock()
# Simulate deactivation - should delete room
await client._api_client.delete_room(meeting.room_name)
# Verify delete was called
client._api_client.delete_room.assert_called_once_with(meeting.room_name)
@pytest.mark.asyncio
async def test_delete_room_idempotent_on_404():
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
from reflector.dailyco_api.client import DailyApiClient
# Create real client to test delete_room logic
client = DailyApiClient(api_key="test-key")
# Mock the HTTP client
mock_http_client = AsyncMock()
mock_response = AsyncMock()
mock_response.status_code = 404 # Room not found
mock_http_client.delete = AsyncMock(return_value=mock_response)
# Mock _get_client to return our mock
async def mock_get_client():
return mock_http_client
client._get_client = mock_get_client
# delete_room should succeed even on 404 (idempotent)
await client.delete_room("nonexistent-room")
# Verify delete was attempted
mock_http_client.delete.assert_called_once()
@pytest.mark.asyncio
async def test_api_failure_fallback_to_db_sessions():
"""Test that system falls back to DB sessions if Daily.co API fails."""
# Simulate: Daily.co API throws exception
api_exception = Exception("API unavailable")
# Simulate: DB shows active session
has_active_db_sessions = True
# Decision logic with fallback:
try:
presence_count = None
raise api_exception # Simulating API failure
except Exception:
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
if has_active_db_sessions:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Conservative: keep active on API failure

View File

@@ -0,0 +1,190 @@
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db import get_database
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings
from reflector.db.rooms import rooms_controller
from reflector.worker.ics_sync import create_upcoming_meetings_for_event
@pytest.mark.asyncio
async def test_duplicate_calendar_event_does_not_create_duplicate_meeting():
"""When an aggregated ICS feed contains the same event with different UIDs
(e.g. Cal.com UID + Google Calendar UUID), only one meeting should be created."""
room = await rooms_controller.add(
name="dedup-test-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
ics_url="https://calendar.example.com/dedup.ics",
ics_enabled=True,
)
now = datetime.now(timezone.utc)
start_time = now + timedelta(hours=1)
end_time = now + timedelta(hours=2)
# Create first calendar event (Cal.com UID)
event1 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="abc123@Cal.com",
title="Team Standup",
start_time=start_time,
end_time=end_time,
)
)
# create_window must be before start_time for the function to proceed
create_window = now - timedelta(minutes=6)
# Create meeting for event1
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
async def mock_create_meeting_1(room_name_prefix, *, end_date, room):
return AsyncMock(
meeting_id="meeting-1",
room_name="dedup-test-room-abc",
room_url="https://mock.video/dedup-test-room-abc",
host_room_url="https://mock.video/dedup-test-room-abc?host=true",
)
mock_client.create_meeting = mock_create_meeting_1
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event1, create_window, room)
# Verify meeting was created
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert len(results) == 1, f"Expected 1 meeting, got {len(results)}"
# Create second calendar event with different UID but same time window (Google Calendar UUID)
event2 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="550e8400-e29b-41d4-a716-446655440000",
title="Team Standup",
start_time=start_time,
end_time=end_time,
)
)
# Try to create meeting for event2 - should be skipped due to dedup
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
create_meeting_called = False
async def mock_create_meeting_2(room_name_prefix, *, end_date, room):
nonlocal create_meeting_called
create_meeting_called = True
mock_client.create_meeting = mock_create_meeting_2
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event2, create_window, room)
# Platform client should NOT have been called for the duplicate
assert (
not create_meeting_called
), "create_meeting should not be called for duplicate"
# Verify still only 1 meeting
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert len(results) == 1, f"Expected 1 meeting after dedup, got {len(results)}"
@pytest.mark.asyncio
async def test_different_time_windows_create_separate_meetings():
"""Events at different times should create separate meetings, even if titles match."""
room = await rooms_controller.add(
name="dedup-diff-time-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
ics_url="https://calendar.example.com/dedup2.ics",
ics_enabled=True,
)
now = datetime.now(timezone.utc)
create_window = now - timedelta(minutes=6)
# Event 1: 1-2pm
event1 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="event-morning@Cal.com",
title="Team Standup",
start_time=now + timedelta(hours=1),
end_time=now + timedelta(hours=2),
)
)
# Event 2: 3-4pm (different time)
event2 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="event-afternoon@Cal.com",
title="Team Standup",
start_time=now + timedelta(hours=3),
end_time=now + timedelta(hours=4),
)
)
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
call_count = 0
async def mock_create_meeting(room_name_prefix, *, end_date, room):
nonlocal call_count
call_count += 1
return AsyncMock(
meeting_id=f"meeting-{call_count}",
room_name=f"dedup-diff-time-room-{call_count}",
room_url=f"https://mock.video/dedup-diff-time-room-{call_count}",
host_room_url=f"https://mock.video/dedup-diff-time-room-{call_count}?host=true",
)
mock_client.create_meeting = mock_create_meeting
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event1, create_window, room)
await create_upcoming_meetings_for_event(event2, create_window, room)
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert (
len(results) == 2
), f"Expected 2 meetings for different times, got {len(results)}"

View File

@@ -1,6 +1,6 @@
import asyncio
import time
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline
# Whereby recordings should use file pipeline, not Hatchet
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_multitrack_pipeline.delay.assert_not_called()
mock_hatchet.start_workflow.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use multitrack pipeline
mock_multitrack_pipeline.delay.assert_called_once_with(
transcript_id=transcript.id,
bucket_name="daily-bucket",
track_keys=track_keys,
)
# Daily.co multitrack recordings should use Hatchet workflow
mock_hatchet.start_workflow.assert_called_once()
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called()

View File

@@ -8,7 +8,7 @@ import {
useRef,
useState,
} from "react";
import { Box, Spinner, Center, Text } from "@chakra-ui/react";
import { Box, Spinner, Center, Text, Button, VStack } from "@chakra-ui/react";
import { useRouter, useParams } from "next/navigation";
import DailyIframe, {
DailyCall,
@@ -16,32 +16,26 @@ import DailyIframe, {
DailyCustomTrayButton,
DailyCustomTrayButtons,
DailyEventObjectCustomButtonClick,
DailyEventObjectFatalError,
DailyFatalErrorType,
DailyFactoryOptions,
DailyParticipantsObject,
} from "@daily-co/daily-js";
import type { components } from "../../reflector-api";
import { printApiError, ApiError } from "../../api/_error";
import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
import {
useRoomJoinMeeting,
useRoomJoinedMeeting,
useRoomLeaveMeeting,
useMeetingStartRecording,
leaveRoomPostUrl,
LeaveRoomBody,
} from "../../lib/apiHooks";
import { omit } from "remeda";
import {
assertExists,
assertExistsAndNonEmptyString,
NonEmptyString,
parseNonEmptyString,
} from "../../lib/utils";
import {
assertMeetingId,
DailyRecordingType,
MeetingId,
} from "../../lib/types";
import { assertMeetingId, DailyRecordingType } from "../../lib/types";
import { useUuidV5 } from "react-uuid-hook";
const CONSENT_BUTTON_ID = "recording-consent";
@@ -54,6 +48,63 @@ const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
const RECORDING_START_DELAY_MS = 2000;
const RECORDING_START_MAX_RETRIES = 5;
const FATAL_ERROR_MESSAGES: Partial<
Record<DailyFatalErrorType, { message: string; rejoinable?: boolean }>
> = {
"connection-error": {
message: "Connection lost. Please check your network.",
rejoinable: true,
},
"exp-room": { message: "The meeting time has ended." },
"exp-token": { message: "Your session has expired.", rejoinable: true },
ejected: { message: "You were removed from this meeting." },
"meeting-full": { message: "This meeting is full." },
"not-allowed": { message: "You are not allowed to join this meeting." },
"nbf-room": { message: "This meeting hasn't started yet." },
"nbf-token": { message: "This meeting hasn't started yet." },
"no-room": { message: "This room does not exist." },
"end-of-life": { message: "This meeting room is no longer available." },
};
function FatalErrorScreen({
error,
roomName,
}: {
error: FatalError;
roomName: string;
}) {
const router = useRouter();
const info =
error.type !== "unknown" ? FATAL_ERROR_MESSAGES[error.type] : undefined;
const message = info?.message ?? `Something went wrong: ${error.message}`;
const rejoinable = info?.rejoinable ?? false;
return (
<Center width="100vw" height="100vh">
<VStack gap={4}>
<Text color="red.500">{message}</Text>
{rejoinable ? (
<>
<Button onClick={() => window.location.reload()}>
Try Rejoining
</Button>
<Button
variant="outline"
onClick={() => router.push(`/${roomName}`)}
>
Leave
</Button>
</>
) : (
<Button onClick={() => router.push(`/${roomName}`)}>
Back to Room
</Button>
)}
</VStack>
</Center>
);
}
type Meeting = components["schemas"]["Meeting"];
type Room = components["schemas"]["RoomDetails"];
@@ -91,6 +142,8 @@ const USE_FRAME_INIT_STATE = {
joined: false as boolean,
} as const;
type FatalError = { type: DailyFatalErrorType | "unknown"; message: string };
// Daily js and not Daily react used right now because daily-js allows for prebuild interface vs. -react is customizable but has no nice defaults
const useFrame = (
container: HTMLDivElement | null,
@@ -98,6 +151,7 @@ const useFrame = (
onLeftMeeting: () => void;
onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
onJoinMeeting: () => void;
onError: (ev: DailyEventObjectFatalError) => void;
},
) => {
const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
@@ -143,6 +197,7 @@ const useFrame = (
if (!frame) return;
frame.on("left-meeting", cbs.onLeftMeeting);
frame.on("custom-button-click", cbs.onCustomButtonClick);
frame.on("error", cbs.onError);
const joinCb = () => {
if (!frame) {
console.error("frame is null in joined-meeting callback");
@@ -154,6 +209,7 @@ const useFrame = (
return () => {
frame.off("left-meeting", cbs.onLeftMeeting);
frame.off("custom-button-click", cbs.onCustomButtonClick);
frame.off("error", cbs.onError);
frame.off("joined-meeting", joinCb);
};
}, [frame, cbs]);
@@ -188,58 +244,6 @@ const useFrame = (
] as const;
};
const leaveDaily = () => {
const frame = DailyIframe.getCallInstance();
frame?.leave();
};
const useDirtyDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
) => {
useEffect(() => {
if (!meetingId || !roomName) return;
const handleBeforeUnload = () => {
leaveDaily();
navigator.sendBeacon(
leaveRoomPostUrl(
{
room_name: roomName,
meeting_id: meetingId,
},
{
delay_seconds: 5,
},
),
undefined satisfies LeaveRoomBody,
);
};
window.addEventListener("beforeunload", handleBeforeUnload);
return () => window.removeEventListener("beforeunload", handleBeforeUnload);
}, [meetingId, roomName]);
};
const useDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
) => {
useDirtyDisconnects(meetingId, roomName);
useEffect(() => {
return () => {
leaveDaily();
leaveMutation.mutate({
params: {
path: { meeting_id: meetingId, room_name: roomName },
query: { delay_seconds: 5 },
},
});
};
}, [meetingId, roomName]);
};
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
@@ -247,10 +251,9 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const authLastUserId = auth.lastUserId;
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
const joinedMutation = useRoomJoinedMeeting();
const leaveMutation = useRoomLeaveMeeting();
const startRecordingMutation = useMeetingStartRecording();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const [fatalError, setFatalError] = useState<FatalError | null>(null);
// Generate deterministic instanceIds so all participants use SAME IDs
const cloudInstanceId = parseNonEmptyString(meeting.id);
@@ -258,9 +261,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
);
if (typeof params.roomName === "object")
throw new Error(`Invalid room name in params. array? ${params.roomName}`);
const roomName = assertExistsAndNonEmptyString(params.roomName);
const roomName = params?.roomName as string;
const {
showConsentModal,
@@ -299,10 +300,18 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const roomUrl = joinedMeeting?.room_url;
const handleLeave = useCallback(() => {
// If a fatal error occurred, don't redirect — let the error UI show
if (fatalError) return;
router.push("/browse");
}, [router]);
}, [router, fatalError]);
useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);
const handleError = useCallback((ev: DailyEventObjectFatalError) => {
const error: FatalError = {
type: ev.error?.type ?? "unknown",
message: ev.errorMsg,
};
setFatalError(error);
}, []);
const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => {
@@ -316,15 +325,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
);
const handleFrameJoinMeeting = useCallback(() => {
joinedMutation.mutate({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
});
if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", {
cloudInstanceId,
@@ -384,10 +384,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
}
}, [
joinedMutation,
roomName,
meeting.id,
meeting.recording_type,
meeting.id,
startRecordingMutation,
cloudInstanceId,
rawTracksInstanceId,
@@ -402,6 +400,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
onLeftMeeting: handleLeave,
onCustomButtonClick: handleCustomButtonClick,
onJoinMeeting: handleFrameJoinMeeting,
onError: handleError,
});
useEffect(() => {
@@ -458,13 +457,27 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
}
if (joinMutation.isError) {
const apiDetail = printApiError(
joinMutation.error as /*ref 095959E6-01CC-4CF0-B3A9-F65F12F046D3*/ ApiError,
);
return (
<Center width="100vw" height="100vh">
<Text color="red.500">Failed to join meeting. Please try again.</Text>
<VStack gap={4}>
<Text color="red.500">
{apiDetail ?? "Failed to join meeting. Please try again."}
</Text>
<Button onClick={() => router.push(`/${roomName}`)}>
Back to Room
</Button>
</VStack>
</Center>
);
}
if (fatalError) {
return <FatalErrorScreen error={fatalError} roomName={roomName} />;
}
if (!roomUrl) {
return null;
}

View File

@@ -1,15 +1,15 @@
"use client";
import { $api, API_URL } from "./apiClient";
import { $api } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components, operations } from "../reflector-api";
import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
import { createFinalURL, createQuerySerializer } from "openapi-fetch";
/*
* ref 095959E6-01CC-4CF0-B3A9-F65F12F046D3
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
* this is either a limitation or incorrect usage of Python json schema generator
* or, limitation or incorrect usage of .d type generator from json schema
@@ -808,44 +808,6 @@ export function useRoomJoinMeeting() {
);
}
export const LEAVE_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;
export const leaveRoomPostUrl = (
path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
): string =>
createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path, query },
querySerializer: createQuerySerializer(),
});
export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];
export function useRoomLeaveMeeting() {
return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
}
export const JOINED_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;
export const joinedRoomPostUrl = (
params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
): string =>
createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path: params },
querySerializer: () => "",
});
export type JoinedRoomBody =
operations["v1_rooms_joined_meeting"]["requestBody"];
export function useRoomJoinedMeeting() {
return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
}
export function useRoomIcsSync() {
const { setError } = useError();

View File

@@ -171,48 +171,6 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Joined Meeting
* @description Trigger presence poll (ideally when user actually joins meeting in Daily iframe)
*/
post: operations["v1_rooms_joined_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Leave Meeting
* @description Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
*
* Queues presence poll with optional delay to allow Daily.co to detect disconnect.
*/
post: operations["v1_rooms_leave_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}/webhook/test": {
parameters: {
query?: never;
@@ -2477,72 +2435,6 @@ export interface operations {
};
};
};
v1_rooms_joined_meeting: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_leave_meeting: {
parameters: {
query?: {
delay_seconds?: number;
};
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_test_webhook: {
parameters: {
query?: never;