Compare commits


13 Commits

Author  SHA1  Message  Date

Igor Loskutov  a455be047a  Merge branch 'main' into fix-daily-ux  2026-02-06 11:33:35 -05:00
Igor Loskutov  2a9993ceec  remove integration test with hardcoded external URL  2026-02-06 11:31:22 -05:00
Igor Loskutov  8773d9fbbd  test: add integration test for ICS dedup using Max's live calendar feed  2026-02-06 11:02:20 -05:00
Igor Loskutov  6c88e05423  refactor: remove redundant fatalError ref, use state directly in handleLeave  2026-02-05 22:06:15 -05:00
Igor Loskutov  f4803c0d76  fix: use function mocks with correct signature in dedup tests  2026-02-05 19:51:27 -05:00
Igor Loskutov  2b484aec00  refactor: data-driven FatalErrorScreen, cover all DailyFatalErrorType cases  2026-02-05 19:49:14 -05:00
Igor Loskutov  d3161730ef  fix: use SDK DailyFatalErrorType, add meeting-full/not-allowed/exp-token cases, remove dead end_date fallback  2026-02-05 19:43:25 -05:00
Igor Loskutov  4fd88b2fc1  fix: review feedback — literal error types, extract FatalErrorScreen, type params, fix mock signature  2026-02-05 19:35:47 -05:00
Igor Loskutov  a2694650fd  chore: add REPORT.md and ref markers for ApiError cast issue  2026-02-05 19:16:58 -05:00
Igor Loskutov  238d768499  fix: address review feedback - use ApiError type, move inline imports  2026-02-05 18:50:12 -05:00
Igor Loskutov  1da687fe13  fix: prevent duplicate meetings from aggregated ICS calendar feeds  2026-02-05 18:42:42 -05:00

    When Cal.com events appear in an aggregated ICS feed, the same event
    gets two different UIDs (one from Cal.com, one from Google Calendar).
    This caused duplicate meetings to be created for the same time slot.

    Add time-window dedup check in create_upcoming_meetings_for_event:
    after verifying no meeting exists for the calendar_event_id, also check
    if a meeting already exists for the same room + start_date + end_date.
    (A condensed sketch of this flow follows the commit list.)

e9e1676409  feat: add specific error messages for Daily.co meeting disconnections  2026-02-05 18:42:42 -05:00

    Handle fatal errors from Daily.co SDK (connection-error, exp-room,
    ejected, etc.) with user-friendly messages and appropriate actions.
    Improve join failure display to show actual API error detail.

15ab2e306e  feat: Daily+hatchet default (#846)  2026-02-05 18:38:08 -05:00

    * feat: set Daily as default video platform

      Daily.co has been battle-tested and is ready to be the default.
      Whereby remains available for rooms that explicitly set it.

    * feat: enforce Hatchet for all multitrack processing

      Remove use_celery option from rooms - multitrack (Daily) recordings
      now always use Hatchet workflows. Celery remains for single-track
      (Whereby) file processing only.

      - Remove use_celery column from room table
      - Simplify dispatch logic to always use Hatchet for multitracks
      - Update tests to mock Hatchet instead of Celery

    * fix: update whereby test to patch Hatchet instead of removed Celery import

    ---------

    Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
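The core of the dedup fix in 1da687fe13 condenses to two existence checks before a meeting is pre-created. A minimal sketch, not the actual implementation (controller and factory names as they appear in the diffs below; logging and the try/except around meeting creation are trimmed):

    from reflector.db.meetings import meetings_controller

    async def create_upcoming_meetings_for_event(event, create_window, room):
        """Condensed sketch of the dedup flow added in 1da687fe13."""
        if event.start_time <= create_window:
            return  # unchanged guard from the original function

        # Check 1: a meeting already exists for this exact calendar event UID.
        if await meetings_controller.get_by_calendar_event(event.id, room):
            return

        # Check 2 (new in this PR): the same event arrived under a different
        # UID from an aggregated feed; match on room + exact start/end instead.
        if await meetings_controller.get_by_room_and_time_window(
            room, event.start_time, event.end_time
        ):
            return

        client = create_platform_client(room.platform)  # platform factory used by ics_sync
        await client.create_meeting(...)  # pre-create the meeting as before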
23 changed files with 573 additions and 1747 deletions

View File

@@ -13,9 +13,6 @@ on:
jobs:
test-next-server:
runs-on: ubuntu-latest
concurrency:
group: test-next-server-${{ github.ref }}
cancel-in-progress: true
defaults:
run:
@@ -24,12 +21,17 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
package_json_file: './www/package.json'
version: 8
- name: Setup Node.js
- name: Setup Node.js cache
uses: actions/setup-node@v4
with:
node-version: '20'
@@ -40,4 +42,4 @@ jobs:
run: pnpm install
- name: Run tests
run: pnpm test
run: pnpm test

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
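One detail in the downgrade path: re-adding a NOT NULL column to a populated table only works because of the server_default, which backfills existing rows with false; batch_alter_table keeps the DDL portable to SQLite. A hypothetical round-trip check, assuming a configured alembic.ini pointing at a scratch database:

    from alembic import command
    from alembic.config import Config

    cfg = Config("alembic.ini")
    command.upgrade(cfg, "3aa20b96d963")    # drops room.use_celery
    command.downgrade(cfg, "e69f08ead8ea")  # restores it, defaulting to false
    command.upgrade(cfg, "3aa20b96d963")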

View File

@@ -104,26 +104,6 @@ class CalendarEventController:
results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results]
async def get_upcoming_for_rooms(
self, room_ids: list[str], minutes_ahead: int = 120
) -> list[CalendarEvent]:
now = datetime.now(timezone.utc)
future_time = now + timedelta(minutes=minutes_ahead)
query = (
calendar_events.select()
.where(
sa.and_(
calendar_events.c.room_id.in_(room_ids),
calendar_events.c.is_deleted == False,
calendar_events.c.start_time <= future_time,
calendar_events.c.end_time >= now,
)
)
.order_by(calendar_events.c.start_time.asc())
)
results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results]
async def get_by_id(self, event_id: str) -> CalendarEvent | None:
query = calendar_events.select().where(calendar_events.c.id == event_id)
result = await get_database().fetch_one(query)

View File

@@ -301,23 +301,6 @@ class MeetingController:
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_all_active_for_rooms(
self, room_ids: list[str], current_time: datetime
) -> list[Meeting]:
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_id.in_(room_ids),
meetings.c.end_date > current_time,
meetings.c.is_active,
)
)
.order_by(meetings.c.end_date.desc())
)
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_active_by_calendar_event(
self, room: Room, calendar_event_id: str, current_time: datetime
) -> Meeting | None:
@@ -363,6 +346,27 @@ class MeetingController:
return None
return Meeting(**result)
async def get_by_room_and_time_window(
self, room: Room, start_date: datetime, end_date: datetime
) -> Meeting | None:
"""Check if a meeting already exists for this room with the same time window."""
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_id == room.id,
meetings.c.start_date == start_date,
meetings.c.end_date == end_date,
meetings.c.is_active,
)
)
.limit(1)
)
result = await get_database().fetch_one(query)
if not result:
return None
return Meeting(**result)
async def update_meeting(self, meeting_id: str, **kwargs):
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)
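Worth noting: get_by_room_and_time_window matches on exact start_date/end_date equality, not overlap, so only true same-window duplicates of an event are deduplicated. A hypothetical probe illustrating the semantics (room assumed to exist):

    from datetime import datetime, timedelta, timezone

    from reflector.db.meetings import meetings_controller

    async def probe_dedup(room):
        start = datetime(2026, 2, 6, 15, 0, tzinfo=timezone.utc)
        end = start + timedelta(hours=1)

        # Returns a meeting only when start_date AND end_date match exactly
        # (and the meeting is_active).
        exact = await meetings_controller.get_by_room_and_time_window(room, start, end)

        # An overlapping-but-shifted window (15:30-16:30) is NOT treated as a
        # duplicate; the sync worker would still create a meeting for it.
        shifted = await meetings_controller.get_by_room_and_time_window(
            room, start + timedelta(minutes=30), end + timedelta(minutes=30)
        )
        return exact, shifted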

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column(
"skip_consent",
sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False
@@ -245,11 +238,6 @@ class RoomController:
return room
async def get_by_names(self, names: list[str]) -> list[Room]:
query = rooms.select().where(rooms.c.name.in_(names))
results = await get_database().fetch_all(query)
return [Room(**r) for r in results]
async def get_ics_enabled(self) -> list[Room]:
query = rooms.select().where(
rooms.c.ics_enabled == True, rooms.c.ics_url != None

View File

@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
# Multitrack processing always uses Hatchet (no Celery fallback)
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
)
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
if can_replay:
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": None})
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:
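The interleaved hunk above is easier to follow as a condensed sketch of the new multitrack dispatch flow (a simplification; NotFoundException/ApiException handling and logging are trimmed, names as in the diff):

    async def dispatch_multitrack(config, force=False):
        # 1. Replay the existing workflow when possible (read-only check first).
        transcript = await transcripts_controller.get_by_id(config.transcript_id)
        if transcript and transcript.workflow_run_id and not force:
            if await HatchetClientManager.can_replay(transcript.workflow_run_id):
                await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
                return None
            # Not replayable (cancelled/completed/deleted): fall through to a new run.

        # 2. --force: cancel the old workflow and clear its ID.
        if force and transcript and transcript.workflow_run_id:
            await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
            await transcripts_controller.update(transcript, {"workflow_run_id": None})

        # 3. Optimistic concurrency check: if another process attached a RUNNING
        #    or QUEUED workflow meanwhile, skip. No database lock is taken;
        #    Hatchet workflows are idempotent, so a rare duplicate is acceptable.
        transcript = await transcripts_controller.get_by_id(config.transcript_id)
        if transcript and transcript.workflow_run_id:
            status = await HatchetClientManager.get_workflow_run_status(
                transcript.workflow_run_id
            )
            if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
                return None

        # 4. Start a fresh DiarizationPipeline run and record its ID.
        workflow_id = await HatchetClientManager.start_workflow(
            workflow_name="DiarizationPipeline",
            input_data={...},  # recording, tracks, bucket, transcript, room
            additional_metadata={...},
        )
        if transcript:
            await transcripts_controller.update(
                transcript, {"workflow_run_id": workflow_id}
            )
        return None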

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code
)
# Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
# Zulip integration
ZULIP_REALM: str | None = None

View File

@@ -1,6 +1,4 @@
import asyncio
import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated, Any, Literal, Optional
@@ -8,14 +6,13 @@ from typing import Annotated, Any, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Page
from fastapi_pagination.ext.databases import apaginate
from pydantic import BaseModel, Field
from pydantic import BaseModel
from redis.exceptions import LockError
import reflector.auth as auth
from reflector.db import get_database
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room as DbRoom
from reflector.db.rooms import rooms_controller
from reflector.redis_cache import RedisAsyncLock
from reflector.schemas.platform import Platform
@@ -198,82 +195,6 @@ async def rooms_list(
return paginated
class BulkStatusRequest(BaseModel):
room_names: list[str] = Field(max_length=100)
class RoomMeetingStatus(BaseModel):
active_meetings: list[Meeting]
upcoming_events: list[CalendarEventResponse]
@router.post("/rooms/meetings/bulk-status", response_model=dict[str, RoomMeetingStatus])
async def rooms_bulk_meeting_status(
request: BulkStatusRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
):
if not user and not settings.PUBLIC_MODE:
raise HTTPException(status_code=401, detail="Not authenticated")
user_id = user["sub"] if user else None
all_rooms = await rooms_controller.get_by_names(request.room_names)
# Filter to rooms the user can see (owned or shared), matching rooms_list behavior
rooms = [
r
for r in all_rooms
if r.is_shared or (user_id is not None and r.user_id == user_id)
]
room_by_id: dict[str, DbRoom] = {r.id: r for r in rooms}
room_ids = list(room_by_id.keys())
if not room_ids:
return {
name: RoomMeetingStatus(active_meetings=[], upcoming_events=[])
for name in request.room_names
}
current_time = datetime.now(timezone.utc)
active_meetings, upcoming_events = await asyncio.gather(
meetings_controller.get_all_active_for_rooms(room_ids, current_time),
calendar_events_controller.get_upcoming_for_rooms(room_ids),
)
# Group by room name, converting DB models to view models
active_by_room: dict[str, list[Meeting]] = defaultdict(list)
for m in active_meetings:
room = room_by_id.get(m.room_id)
if not room:
continue
m.platform = room.platform
if user_id != room.user_id and m.platform == "whereby":
m.host_room_url = ""
active_by_room[room.name].append(
Meeting.model_validate(m, from_attributes=True)
)
upcoming_by_room: dict[str, list[CalendarEventResponse]] = defaultdict(list)
for e in upcoming_events:
room = room_by_id.get(e.room_id)
if not room:
continue
if user_id != room.user_id:
e.description = None
e.attendees = None
upcoming_by_room[room.name].append(
CalendarEventResponse.model_validate(e, from_attributes=True)
)
result: dict[str, RoomMeetingStatus] = {}
for name in request.room_names:
result[name] = RoomMeetingStatus(
active_meetings=active_by_room.get(name, []),
upcoming_events=upcoming_by_room.get(name, []),
)
return result
@router.get("/rooms/{room_id}", response_model=RoomDetails)
async def rooms_get(
room_id: str,

View File

@@ -5,7 +5,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger
from reflector.asynctask import asynctask
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room, rooms_controller
from reflector.redis_cache import RedisAsyncLock
@@ -83,10 +83,9 @@ def _should_sync(room) -> bool:
return time_since_sync.total_seconds() >= room.ics_fetch_interval
MEETING_DEFAULT_DURATION = timedelta(hours=1)
async def create_upcoming_meetings_for_event(event, create_window, room: Room):
async def create_upcoming_meetings_for_event(
event: CalendarEvent, create_window: datetime, room: Room
):
if event.start_time <= create_window:
return
existing_meeting = await meetings_controller.get_by_calendar_event(event.id, room)
@@ -94,6 +93,21 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room):
if existing_meeting:
return
# Prevent duplicate meetings from aggregated calendar feeds
# (e.g. same event appears with different UIDs from Cal.com and Google Calendar)
end_date = event.end_time
existing_by_time = await meetings_controller.get_by_room_and_time_window(
room, event.start_time, end_date
)
if existing_by_time:
logger.info(
"Skipping duplicate calendar event - meeting already exists for this time window",
room_id=room.id,
event_id=event.id,
existing_meeting_id=existing_by_time.id,
)
return
logger.info(
"Pre-creating meeting for calendar event",
room_id=room.id,
@@ -102,8 +116,6 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room):
)
try:
end_date = event.end_time or (event.start_time + MEETING_DEFAULT_DURATION)
client = create_platform_client(room.platform)
meeting_data = await client.create_meeting(

View File

@@ -27,9 +27,6 @@ from reflector.db.transcripts import (
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -351,49 +348,29 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
# Multitrack processing always uses Hatchet (no Celery fallback)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
@shared_task
@@ -1072,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
)
continue
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
)
continue
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
reprocessed_count += 1

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest
from reflector.schemas.platform import WHEREBY_PLATFORM
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True)
@@ -14,6 +14,7 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield

View File

@@ -0,0 +1,190 @@
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db import get_database
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings
from reflector.db.rooms import rooms_controller
from reflector.worker.ics_sync import create_upcoming_meetings_for_event
@pytest.mark.asyncio
async def test_duplicate_calendar_event_does_not_create_duplicate_meeting():
"""When an aggregated ICS feed contains the same event with different UIDs
(e.g. Cal.com UID + Google Calendar UUID), only one meeting should be created."""
room = await rooms_controller.add(
name="dedup-test-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
ics_url="https://calendar.example.com/dedup.ics",
ics_enabled=True,
)
now = datetime.now(timezone.utc)
start_time = now + timedelta(hours=1)
end_time = now + timedelta(hours=2)
# Create first calendar event (Cal.com UID)
event1 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="abc123@Cal.com",
title="Team Standup",
start_time=start_time,
end_time=end_time,
)
)
# create_window must be before start_time for the function to proceed
create_window = now - timedelta(minutes=6)
# Create meeting for event1
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
async def mock_create_meeting_1(room_name_prefix, *, end_date, room):
return AsyncMock(
meeting_id="meeting-1",
room_name="dedup-test-room-abc",
room_url="https://mock.video/dedup-test-room-abc",
host_room_url="https://mock.video/dedup-test-room-abc?host=true",
)
mock_client.create_meeting = mock_create_meeting_1
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event1, create_window, room)
# Verify meeting was created
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert len(results) == 1, f"Expected 1 meeting, got {len(results)}"
# Create second calendar event with different UID but same time window (Google Calendar UUID)
event2 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="550e8400-e29b-41d4-a716-446655440000",
title="Team Standup",
start_time=start_time,
end_time=end_time,
)
)
# Try to create meeting for event2 - should be skipped due to dedup
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
create_meeting_called = False
async def mock_create_meeting_2(room_name_prefix, *, end_date, room):
nonlocal create_meeting_called
create_meeting_called = True
mock_client.create_meeting = mock_create_meeting_2
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event2, create_window, room)
# Platform client should NOT have been called for the duplicate
assert (
not create_meeting_called
), "create_meeting should not be called for duplicate"
# Verify still only 1 meeting
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert len(results) == 1, f"Expected 1 meeting after dedup, got {len(results)}"
@pytest.mark.asyncio
async def test_different_time_windows_create_separate_meetings():
"""Events at different times should create separate meetings, even if titles match."""
room = await rooms_controller.add(
name="dedup-diff-time-room",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
ics_url="https://calendar.example.com/dedup2.ics",
ics_enabled=True,
)
now = datetime.now(timezone.utc)
create_window = now - timedelta(minutes=6)
# Event 1: 1-2pm
event1 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="event-morning@Cal.com",
title="Team Standup",
start_time=now + timedelta(hours=1),
end_time=now + timedelta(hours=2),
)
)
# Event 2: 3-4pm (different time)
event2 = await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid="event-afternoon@Cal.com",
title="Team Standup",
start_time=now + timedelta(hours=3),
end_time=now + timedelta(hours=4),
)
)
with patch(
"reflector.worker.ics_sync.create_platform_client"
) as mock_platform_factory:
mock_client = AsyncMock()
call_count = 0
async def mock_create_meeting(room_name_prefix, *, end_date, room):
nonlocal call_count
call_count += 1
return AsyncMock(
meeting_id=f"meeting-{call_count}",
room_name=f"dedup-diff-time-room-{call_count}",
room_url=f"https://mock.video/dedup-diff-time-room-{call_count}",
host_room_url=f"https://mock.video/dedup-diff-time-room-{call_count}?host=true",
)
mock_client.create_meeting = mock_create_meeting
mock_client.upload_logo = AsyncMock()
mock_platform_factory.return_value = mock_client
await create_upcoming_meetings_for_event(event1, create_window, room)
await create_upcoming_meetings_for_event(event2, create_window, room)
results = await get_database().fetch_all(
meetings.select().where(meetings.c.room_id == room.id)
)
assert (
len(results) == 2
), f"Expected 2 meetings for different times, got {len(results)}"

View File

@@ -1,184 +0,0 @@
from datetime import datetime, timedelta, timezone
import pytest
from conftest import authenticated_client_ctx
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room, rooms_controller
from reflector.settings import settings
async def _create_room(name: str, user_id: str, is_shared: bool = False) -> Room:
return await rooms_controller.add(
name=name,
user_id=user_id,
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=is_shared,
webhook_url="",
webhook_secret="",
)
async def _create_meeting(room: Room, active: bool = True):
now = datetime.now(timezone.utc)
return await meetings_controller.create(
id=f"meeting-{room.name}-{now.timestamp()}",
room_name=room.name,
room_url="room-url",
host_room_url="host-url",
start_date=now - timedelta(minutes=10),
end_date=now + timedelta(minutes=50) if active else now - timedelta(minutes=1),
room=room,
)
async def _create_calendar_event(room: Room):
now = datetime.now(timezone.utc)
return await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid=f"event-{room.name}",
title=f"Upcoming in {room.name}",
description="secret description",
start_time=now + timedelta(minutes=30),
end_time=now + timedelta(minutes=90),
attendees=[{"name": "Alice", "email": "alice@example.com"}],
)
)
@pytest.mark.asyncio
async def test_bulk_status_returns_empty_for_no_rooms(client):
"""Empty room_names returns empty dict."""
async with authenticated_client_ctx():
resp = await client.post("/rooms/meetings/bulk-status", json={"room_names": []})
assert resp.status_code == 200
assert resp.json() == {}
@pytest.mark.asyncio
async def test_bulk_status_returns_active_meetings_and_upcoming_events(client):
"""Owner sees active meetings and upcoming events for their rooms."""
room = await _create_room("bulk-test-room", "randomuserid")
await _create_meeting(room, active=True)
await _create_calendar_event(room)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["bulk-test-room"]},
)
assert resp.status_code == 200
data = resp.json()
assert "bulk-test-room" in data
status = data["bulk-test-room"]
assert len(status["active_meetings"]) == 1
assert len(status["upcoming_events"]) == 1
# Owner sees description
assert status["upcoming_events"][0]["description"] == "secret description"
@pytest.mark.asyncio
async def test_bulk_status_redacts_data_for_non_owner(client):
"""Non-owner of a shared room gets redacted calendar events and no whereby host_room_url."""
room = await _create_room("shared-bulk", "other-user-id", is_shared=True)
await _create_meeting(room, active=True)
await _create_calendar_event(room)
# authenticated as "randomuserid" but room owned by "other-user-id"
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["shared-bulk"]},
)
assert resp.status_code == 200
status = resp.json()["shared-bulk"]
assert len(status["active_meetings"]) == 1
assert len(status["upcoming_events"]) == 1
# Non-owner: description and attendees redacted
assert status["upcoming_events"][0]["description"] is None
assert status["upcoming_events"][0]["attendees"] is None
@pytest.mark.asyncio
async def test_bulk_status_filters_private_rooms_of_other_users(client):
"""User cannot see private rooms owned by others."""
await _create_room("private-other", "other-user-id", is_shared=False)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["private-other"]},
)
assert resp.status_code == 200
status = resp.json()["private-other"]
assert status["active_meetings"] == []
assert status["upcoming_events"] == []
@pytest.mark.asyncio
async def test_bulk_status_redacts_whereby_host_room_url_for_non_owner(client):
"""Non-owner of a shared whereby room gets empty host_room_url."""
room = await _create_room("shared-whereby", "other-user-id", is_shared=True)
# Force platform to whereby
from reflector.db import get_database
from reflector.db.rooms import rooms as rooms_table
await get_database().execute(
rooms_table.update()
.where(rooms_table.c.id == room.id)
.values(platform="whereby")
)
await _create_meeting(room, active=True)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["shared-whereby"]},
)
assert resp.status_code == 200
status = resp.json()["shared-whereby"]
assert len(status["active_meetings"]) == 1
assert status["active_meetings"][0]["host_room_url"] == ""
@pytest.mark.asyncio
async def test_bulk_status_unauthenticated_rejected_non_public(client):
"""Unauthenticated request on non-PUBLIC_MODE instance returns 401."""
original = settings.PUBLIC_MODE
try:
settings.PUBLIC_MODE = False
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["any-room"]},
)
assert resp.status_code == 401
finally:
settings.PUBLIC_MODE = original
@pytest.mark.asyncio
async def test_bulk_status_nonexistent_room_returns_empty(client):
"""Requesting a room that doesn't exist returns empty lists."""
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["does-not-exist"]},
)
assert resp.status_code == 200
status = resp.json()["does-not-exist"]
assert status["active_meetings"] == []
assert status["upcoming_events"] == []

View File

@@ -1,6 +1,6 @@
import asyncio
import time
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline
# Whereby recordings should use file pipeline, not Hatchet
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_multitrack_pipeline.delay.assert_not_called()
mock_hatchet.start_workflow.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use multitrack pipeline
mock_multitrack_pipeline.delay.assert_called_once_with(
transcript_id=transcript.id,
bucket_name="daily-bucket",
track_keys=track_keys,
)
# Daily.co multitrack recordings should use Hatchet workflow
mock_hatchet.start_workflow.assert_called_once()
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called()

View File

@@ -1,10 +1,5 @@
import { useMemo } from "react";
import { Box, Heading, Text, VStack } from "@chakra-ui/react";
import type { components } from "../../../reflector-api";
import {
useRoomsBulkMeetingStatus,
BulkMeetingStatusMap,
} from "../../../lib/apiHooks";
type Room = components["schemas"]["Room"];
import { RoomTable } from "./RoomTable";
@@ -36,10 +31,6 @@ export function RoomList({
pt,
loading,
}: RoomListProps) {
const roomNames = useMemo(() => rooms.map((r) => r.name), [rooms]);
const bulkStatusQuery = useRoomsBulkMeetingStatus(roomNames);
const meetingStatusMap: BulkMeetingStatusMap = bulkStatusQuery.data ?? {};
return (
<VStack alignItems="start" gap={4} mb={mb} pt={pt}>
<Heading size="md">{title}</Heading>
@@ -52,8 +43,6 @@ export function RoomList({
onEdit={onEdit}
onDelete={onDelete}
loading={loading}
meetingStatusMap={meetingStatusMap}
meetingStatusLoading={bulkStatusQuery.isLoading}
/>
<RoomCards
rooms={rooms}

View File

@@ -14,7 +14,11 @@ import {
import { LuLink, LuRefreshCw } from "react-icons/lu";
import { FaCalendarAlt } from "react-icons/fa";
import type { components } from "../../../reflector-api";
import { useRoomIcsSync, BulkMeetingStatusMap } from "../../../lib/apiHooks";
import {
useRoomActiveMeetings,
useRoomUpcomingMeetings,
useRoomIcsSync,
} from "../../../lib/apiHooks";
type Room = components["schemas"]["Room"];
type Meeting = components["schemas"]["Meeting"];
@@ -58,8 +62,6 @@ interface RoomTableProps {
onEdit: (roomId: string, roomData: any) => void;
onDelete: (roomId: string) => void;
loading?: boolean;
meetingStatusMap: BulkMeetingStatusMap;
meetingStatusLoading: boolean;
}
const getRoomModeDisplay = (mode: string): string => {
@@ -102,16 +104,14 @@ const getZulipDisplay = (
return "Enabled";
};
function MeetingStatus({
activeMeetings,
upcomingMeetings,
isLoading,
}: {
activeMeetings: Meeting[];
upcomingMeetings: CalendarEventResponse[];
isLoading: boolean;
}) {
if (isLoading) {
function MeetingStatus({ roomName }: { roomName: string }) {
const activeMeetingsQuery = useRoomActiveMeetings(roomName);
const upcomingMeetingsQuery = useRoomUpcomingMeetings(roomName);
const activeMeetings = activeMeetingsQuery.data || [];
const upcomingMeetings = upcomingMeetingsQuery.data || [];
if (activeMeetingsQuery.isLoading || upcomingMeetingsQuery.isLoading) {
return <Spinner size="sm" />;
}
@@ -176,8 +176,6 @@ export function RoomTable({
onEdit,
onDelete,
loading,
meetingStatusMap,
meetingStatusLoading,
}: RoomTableProps) {
const [syncingRooms, setSyncingRooms] = useState<Set<NonEmptyString>>(
new Set(),
@@ -254,15 +252,7 @@ export function RoomTable({
<Link href={`/${room.name}`}>{room.name}</Link>
</Table.Cell>
<Table.Cell>
<MeetingStatus
activeMeetings={
meetingStatusMap[room.name]?.active_meetings ?? []
}
upcomingMeetings={
meetingStatusMap[room.name]?.upcoming_events ?? []
}
isLoading={meetingStatusLoading}
/>
<MeetingStatus roomName={room.name} />
</Table.Cell>
<Table.Cell>
{getZulipDisplay(

View File

@@ -8,7 +8,7 @@ import {
useRef,
useState,
} from "react";
import { Box, Spinner, Center, Text } from "@chakra-ui/react";
import { Box, Spinner, Center, Text, Button, VStack } from "@chakra-ui/react";
import { useRouter, useParams } from "next/navigation";
import DailyIframe, {
DailyCall,
@@ -16,10 +16,13 @@ import DailyIframe, {
DailyCustomTrayButton,
DailyCustomTrayButtons,
DailyEventObjectCustomButtonClick,
DailyEventObjectFatalError,
DailyFatalErrorType,
DailyFactoryOptions,
DailyParticipantsObject,
} from "@daily-co/daily-js";
import type { components } from "../../reflector-api";
import { printApiError, ApiError } from "../../api/_error";
import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
import {
@@ -45,6 +48,63 @@ const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
const RECORDING_START_DELAY_MS = 2000;
const RECORDING_START_MAX_RETRIES = 5;
const FATAL_ERROR_MESSAGES: Partial<
Record<DailyFatalErrorType, { message: string; rejoinable?: boolean }>
> = {
"connection-error": {
message: "Connection lost. Please check your network.",
rejoinable: true,
},
"exp-room": { message: "The meeting time has ended." },
"exp-token": { message: "Your session has expired.", rejoinable: true },
ejected: { message: "You were removed from this meeting." },
"meeting-full": { message: "This meeting is full." },
"not-allowed": { message: "You are not allowed to join this meeting." },
"nbf-room": { message: "This meeting hasn't started yet." },
"nbf-token": { message: "This meeting hasn't started yet." },
"no-room": { message: "This room does not exist." },
"end-of-life": { message: "This meeting room is no longer available." },
};
function FatalErrorScreen({
error,
roomName,
}: {
error: FatalError;
roomName: string;
}) {
const router = useRouter();
const info =
error.type !== "unknown" ? FATAL_ERROR_MESSAGES[error.type] : undefined;
const message = info?.message ?? `Something went wrong: ${error.message}`;
const rejoinable = info?.rejoinable ?? false;
return (
<Center width="100vw" height="100vh">
<VStack gap={4}>
<Text color="red.500">{message}</Text>
{rejoinable ? (
<>
<Button onClick={() => window.location.reload()}>
Try Rejoining
</Button>
<Button
variant="outline"
onClick={() => router.push(`/${roomName}`)}
>
Leave
</Button>
</>
) : (
<Button onClick={() => router.push(`/${roomName}`)}>
Back to Room
</Button>
)}
</VStack>
</Center>
);
}
type Meeting = components["schemas"]["Meeting"];
type Room = components["schemas"]["RoomDetails"];
@@ -82,6 +142,8 @@ const USE_FRAME_INIT_STATE = {
joined: false as boolean,
} as const;
type FatalError = { type: DailyFatalErrorType | "unknown"; message: string };
// Daily js and not Daily react used right now because daily-js allows for prebuild interface vs. -react is customizable but has no nice defaults
const useFrame = (
container: HTMLDivElement | null,
@@ -89,6 +151,7 @@ const useFrame = (
onLeftMeeting: () => void;
onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
onJoinMeeting: () => void;
onError: (ev: DailyEventObjectFatalError) => void;
},
) => {
const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
@@ -134,6 +197,7 @@ const useFrame = (
if (!frame) return;
frame.on("left-meeting", cbs.onLeftMeeting);
frame.on("custom-button-click", cbs.onCustomButtonClick);
frame.on("error", cbs.onError);
const joinCb = () => {
if (!frame) {
console.error("frame is null in joined-meeting callback");
@@ -145,6 +209,7 @@ const useFrame = (
return () => {
frame.off("left-meeting", cbs.onLeftMeeting);
frame.off("custom-button-click", cbs.onCustomButtonClick);
frame.off("error", cbs.onError);
frame.off("joined-meeting", joinCb);
};
}, [frame, cbs]);
@@ -188,6 +253,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const joinMutation = useRoomJoinMeeting();
const startRecordingMutation = useMeetingStartRecording();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const [fatalError, setFatalError] = useState<FatalError | null>(null);
// Generate deterministic instanceIds so all participants use SAME IDs
const cloudInstanceId = parseNonEmptyString(meeting.id);
@@ -234,8 +300,18 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const roomUrl = joinedMeeting?.room_url;
const handleLeave = useCallback(() => {
// If a fatal error occurred, don't redirect — let the error UI show
if (fatalError) return;
router.push("/browse");
}, [router]);
}, [router, fatalError]);
const handleError = useCallback((ev: DailyEventObjectFatalError) => {
const error: FatalError = {
type: ev.error?.type ?? "unknown",
message: ev.errorMsg,
};
setFatalError(error);
}, []);
const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => {
@@ -324,6 +400,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
onLeftMeeting: handleLeave,
onCustomButtonClick: handleCustomButtonClick,
onJoinMeeting: handleFrameJoinMeeting,
onError: handleError,
});
useEffect(() => {
@@ -380,13 +457,27 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
}
if (joinMutation.isError) {
const apiDetail = printApiError(
joinMutation.error as /*ref 095959E6-01CC-4CF0-B3A9-F65F12F046D3*/ ApiError,
);
return (
<Center width="100vw" height="100vh">
<Text color="red.500">Failed to join meeting. Please try again.</Text>
<VStack gap={4}>
<Text color="red.500">
{apiDetail ?? "Failed to join meeting. Please try again."}
</Text>
<Button onClick={() => router.push(`/${roomName}`)}>
Back to Room
</Button>
</VStack>
</Center>
);
}
if (fatalError) {
return <FatalErrorScreen error={fatalError} roomName={roomName} />;
}
if (!roomUrl) {
return null;
}

View File

@@ -1,246 +0,0 @@
import "@testing-library/jest-dom";
// --- Module mocks (hoisted before imports) ---
jest.mock("../apiClient", () => ({
client: {
GET: jest.fn(),
POST: jest.fn(),
PUT: jest.fn(),
PATCH: jest.fn(),
DELETE: jest.fn(),
use: jest.fn(),
},
$api: {
useQuery: jest.fn(),
useMutation: jest.fn(),
queryOptions: (method: string, path: string, init?: unknown) =>
init === undefined
? { queryKey: [method, path] }
: { queryKey: [method, path, init] },
},
API_URL: "http://test",
WEBSOCKET_URL: "ws://test",
configureApiAuth: jest.fn(),
}));
jest.mock("../AuthProvider", () => ({
useAuth: () => ({
status: "authenticated" as const,
accessToken: "test-token",
accessTokenExpires: Date.now() + 3600000,
user: { id: "user1", name: "Test User" },
update: jest.fn(),
signIn: jest.fn(),
signOut: jest.fn(),
lastUserId: "user1",
}),
}));
// --- Imports (after mocks) ---
import React from "react";
import { render, waitFor, screen } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { useRoomsBulkMeetingStatus, BulkMeetingStatusMap } from "../apiHooks";
import { client } from "../apiClient";
import { ErrorProvider } from "../../(errors)/errorContext";
const mockClient = client as { POST: jest.Mock };
// --- Helpers ---
function mockBulkStatusEndpoint(
roomData?: Record<
string,
{ active_meetings: unknown[]; upcoming_events: unknown[] }
>,
) {
mockClient.POST.mockImplementation(
async (_path: string, options: { body: { room_names: string[] } }) => {
const roomNames: string[] = options.body.room_names;
const src = roomData ?? {};
const data = Object.fromEntries(
roomNames.map((name) => [
name,
src[name] ?? { active_meetings: [], upcoming_events: [] },
]),
);
return { data, error: undefined, response: {} };
},
);
}
// --- Test component: uses the bulk hook and displays results ---
function BulkStatusDisplay({ roomNames }: { roomNames: string[] }) {
const { data, isLoading } = useRoomsBulkMeetingStatus(roomNames);
if (isLoading) {
return <div data-testid="status">loading</div>;
}
if (!data) {
return <div data-testid="status">no data</div>;
}
return (
<div data-testid="status">
{roomNames.map((name) => {
const status = data[name];
return (
<div key={name} data-testid={`room-${name}`}>
{status?.active_meetings?.length ?? 0} active,{" "}
{status?.upcoming_events?.length ?? 0} upcoming
</div>
);
})}
</div>
);
}
function createWrapper() {
const queryClient = new QueryClient({
defaultOptions: {
queries: { retry: false },
},
});
return function Wrapper({ children }: { children: React.ReactNode }) {
return (
<QueryClientProvider client={queryClient}>
<ErrorProvider>{children}</ErrorProvider>
</QueryClientProvider>
);
};
}
// --- Tests ---
describe("bulk meeting status (prop-drilling)", () => {
afterEach(() => jest.clearAllMocks());
it("fetches all room statuses in a single POST request", async () => {
const rooms = Array.from({ length: 10 }, (_, i) => `room-${i}`);
mockBulkStatusEndpoint();
render(<BulkStatusDisplay roomNames={rooms} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
for (const name of rooms) {
expect(screen.getByTestId(`room-${name}`)).toHaveTextContent(
"0 active, 0 upcoming",
);
}
});
const postCalls = mockClient.POST.mock.calls.filter(
([path]: [string]) => path === "/v1/rooms/meetings/bulk-status",
);
// Prop-drilling: exactly 1 POST for all rooms (no batcher needed)
expect(postCalls).toHaveLength(1);
// The single call contains all room names
const requestedRooms: string[] = postCalls[0][1].body.room_names;
expect(requestedRooms).toHaveLength(10);
for (const name of rooms) {
expect(requestedRooms).toContain(name);
}
});
it("returns room-specific data correctly", async () => {
mockBulkStatusEndpoint({
"room-a": {
active_meetings: [{ id: "m1", room_name: "room-a" }],
upcoming_events: [],
},
"room-b": {
active_meetings: [],
upcoming_events: [{ id: "e1", title: "Standup" }],
},
});
render(<BulkStatusDisplay roomNames={["room-a", "room-b"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("room-room-a")).toHaveTextContent(
"1 active, 0 upcoming",
);
expect(screen.getByTestId("room-room-b")).toHaveTextContent(
"0 active, 1 upcoming",
);
});
// Still just 1 POST
expect(mockClient.POST).toHaveBeenCalledTimes(1);
});
it("does not fetch when roomNames is empty", async () => {
mockBulkStatusEndpoint();
render(<BulkStatusDisplay roomNames={[]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("status")).toHaveTextContent("no data");
});
// No POST calls when no rooms
expect(mockClient.POST).not.toHaveBeenCalled();
});
it("surfaces error when POST fails", async () => {
mockClient.POST.mockResolvedValue({
data: undefined,
error: { detail: "server error" },
response: {},
});
function ErrorDisplay({ roomNames }: { roomNames: string[] }) {
const { error } = useRoomsBulkMeetingStatus(roomNames);
if (error) return <div data-testid="error">{error.message}</div>;
return <div data-testid="error">no error</div>;
}
render(<ErrorDisplay roomNames={["room-x"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("error")).toHaveTextContent(
"bulk-status fetch failed",
);
});
});
it("does not fetch when unauthenticated", async () => {
// Override useAuth to return unauthenticated
const authModule = jest.requireMock("../AuthProvider");
const originalUseAuth = authModule.useAuth;
authModule.useAuth = () => ({
...originalUseAuth(),
status: "unauthenticated",
});
mockBulkStatusEndpoint();
render(<BulkStatusDisplay roomNames={["room-1"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("status")).toHaveTextContent("no data");
});
expect(mockClient.POST).not.toHaveBeenCalled();
// Restore
authModule.useAuth = originalUseAuth;
});
});

View File

@@ -1,14 +1,15 @@
"use client";
import { $api, client } from "./apiClient";
import { $api } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQuery, useQueryClient } from "@tanstack/react-query";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
/*
* ref 095959E6-01CC-4CF0-B3A9-F65F12F046D3
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
* this is either a limitation or incorrect usage of Python json schema generator
* or, limitation or incorrect usage of .d type generator from json schema
@@ -641,21 +642,16 @@ export function useMeetingDeactivate() {
setError(error as Error, "Failed to end meeting");
},
onSuccess: () => {
return Promise.all([
queryClient.invalidateQueries({
predicate: (query) => {
const key = query.queryKey;
return key.some(
(k) =>
typeof k === "string" &&
!!MEETING_LIST_PATH_PARTIALS.find((e) => k.includes(e)),
);
},
}),
queryClient.invalidateQueries({
queryKey: ["bulk-meeting-status"],
}),
]);
return queryClient.invalidateQueries({
predicate: (query) => {
const key = query.queryKey;
return key.some(
(k) =>
typeof k === "string" &&
!!MEETING_LIST_PATH_PARTIALS.find((e) => k.includes(e)),
);
},
});
},
});
}
@@ -712,9 +708,6 @@ export function useRoomsCreateMeeting() {
},
).queryKey,
}),
queryClient.invalidateQueries({
queryKey: ["bulk-meeting-status"],
}),
]);
},
onError: (error) => {
@@ -780,32 +773,6 @@ export function useRoomActiveMeetings(roomName: string | null) {
);
}
type RoomMeetingStatus = components["schemas"]["RoomMeetingStatus"];
export type BulkMeetingStatusMap = Partial<Record<string, RoomMeetingStatus>>;
export function useRoomsBulkMeetingStatus(roomNames: string[]) {
const { isAuthenticated } = useAuthReady();
const sortedNames = [...roomNames].sort();
return useQuery({
queryKey: ["bulk-meeting-status", sortedNames],
queryFn: async (): Promise<BulkMeetingStatusMap> => {
const { data, error } = await client.POST(
"/v1/rooms/meetings/bulk-status",
{ body: { room_names: roomNames } },
);
if (error || !data) {
throw new Error(
`bulk-status fetch failed: ${JSON.stringify(error ?? "no data")}`,
);
}
return data;
},
enabled: sortedNames.length > 0 && isAuthenticated,
});
}
export function useRoomGetMeeting(
roomName: string | null,
meetingId: MeetingId | null,

View File

@@ -118,23 +118,6 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/rooms/meetings/bulk-status": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/** Rooms Bulk Meeting Status */
post: operations["v1_rooms_bulk_meeting_status"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}": {
parameters: {
query?: never;
@@ -816,11 +799,6 @@ export interface components {
*/
chunk: string;
};
/** BulkStatusRequest */
BulkStatusRequest: {
/** Room Names */
room_names: string[];
};
/** CalendarEventResponse */
CalendarEventResponse: {
/** Id */
@@ -1697,13 +1675,6 @@ export interface components {
*/
skip_consent: boolean;
};
/** RoomMeetingStatus */
RoomMeetingStatus: {
/** Active Meetings */
active_meetings: components["schemas"]["Meeting"][];
/** Upcoming Events */
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
/** RoomDetails */
RoomDetails: {
/** Id */
@@ -2301,41 +2272,6 @@ export interface operations {
};
};
};
v1_rooms_bulk_meeting_status: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["BulkStatusRequest"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": {
[key: string]: components["schemas"]["RoomMeetingStatus"];
};
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_get: {
parameters: {
query?: never;

View File

@@ -1,22 +1,8 @@
module.exports = {
testEnvironment: "jest-environment-jsdom",
preset: "ts-jest",
testEnvironment: "node",
roots: ["<rootDir>/app"],
testMatch: ["**/__tests__/**/*.test.ts", "**/__tests__/**/*.test.tsx"],
collectCoverage: false,
transform: {
"^.+\\.[jt]sx?$": [
"ts-jest",
{
tsconfig: {
jsx: "react-jsx",
module: "esnext",
moduleResolution: "bundler",
esModuleInterop: true,
strict: true,
downlevelIteration: true,
lib: ["dom", "dom.iterable", "esnext"],
},
},
],
},
testMatch: ["**/__tests__/**/*.test.ts"],
collectCoverage: true,
collectCoverageFrom: ["app/**/*.ts", "!app/**/*.d.ts"],
};

View File

@@ -61,13 +61,9 @@
"author": "Andreas <andreas@monadical.com>",
"license": "All Rights Reserved",
"devDependencies": {
"@testing-library/dom": "^10.4.1",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.2",
"@types/jest": "^30.0.0",
"@types/react": "18.2.20",
"jest": "^30.1.3",
"jest-environment-jsdom": "^30.2.0",
"openapi-typescript": "^7.9.1",
"prettier": "^3.0.0",
"ts-jest": "^29.4.1"

www/pnpm-lock.yaml (generated, 787 changes): diff suppressed because it is too large.