Compare commits

..

2 Commits

Author SHA1 Message Date
Igor Loskutov
df6916385b fix: address review feedback
- Add PUBLIC_MODE auth guard on bulk-status endpoint
- Convert DB models to view models via model_validate()
- Early return when no accessible rooms (skip DB queries)
- BulkMeetingStatusMap: Partial<Record> for type honesty
- Sort roomNames in query key for cache stability
- Remove redundant empty-guard in queryFn
- Add 7 backend tests: auth, redaction, whereby host_room_url, 401, empty
- Add 2 frontend tests: error handling, unauthenticated case
2026-02-05 20:30:26 -05:00
Igor Loskutov
083a50cbcd fix: batch room meeting status queries via prop-drilling
Alternative to the batcher approach (#848): parent fetches all room
meeting statuses in a single bulk POST and passes data down as props.
No extra dependency (@yornaath/batshit), no implicit batching magic.

Backend: POST /v1/rooms/meetings/bulk-status + bulk DB methods.
Frontend: useRoomsBulkMeetingStatus hook in RoomList, MeetingStatus
receives data as props instead of calling per-room hooks.
CI: fix pnpm 8→10 auto-detect, add concurrency group.
Tests: Jest+jsdom+testing-library for bulk hook.
2026-02-05 20:04:31 -05:00
18 changed files with 656 additions and 384 deletions

View File

@@ -1,35 +0,0 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -57,6 +57,12 @@ rooms = sqlalchemy.Table(
sqlalchemy.String, sqlalchemy.String,
nullable=False, nullable=False,
), ),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column( sqlalchemy.Column(
"skip_consent", "skip_consent",
sqlalchemy.Boolean, sqlalchemy.Boolean,
@@ -91,6 +97,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None ics_last_sync: datetime | None = None
ics_last_etag: str | None = None ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False skip_consent: bool = False

View File

@@ -15,10 +15,14 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -177,7 +181,21 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows. Returns AsyncResult for Celery tasks, None for Hatchet workflows.
""" """
if isinstance(config, MultitrackProcessingConfig): if isinstance(config, MultitrackProcessingConfig):
# Multitrack processing always uses Hatchet (no Celery fallback) use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only) # First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id) transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force: if transcript and transcript.workflow_run_id and not force:
@@ -185,7 +203,9 @@ async def dispatch_transcript_processing(
transcript.workflow_run_id transcript.workflow_run_id
) )
if can_replay: if can_replay:
await HatchetClientManager.replay_workflow(transcript.workflow_run_id) await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info( logger.info(
"Replaying Hatchet workflow", "Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id, workflow_id=transcript.workflow_run_id,
@@ -213,7 +233,9 @@ async def dispatch_transcript_processing(
# Force: cancel old workflow if exists # Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id: if force and transcript and transcript.workflow_run_id:
try: try:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id) await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info( logger.info(
"Cancelled old workflow (--force)", "Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id, workflow_id=transcript.workflow_run_id,
@@ -223,7 +245,9 @@ async def dispatch_transcript_processing(
"Old workflow already deleted (--force)", "Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id, workflow_id=transcript.workflow_run_id,
) )
await transcripts_controller.update(transcript, {"workflow_run_id": None}) await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach). # Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet # No database lock - worst case is duplicate dispatch, but Hatchet
@@ -269,6 +293,12 @@ async def dispatch_transcript_processing(
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id) logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig): elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id) return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else: else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import DAILY_PLATFORM, Platform from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code None # Webhook UUID for this environment. Not used by production code
) )
# Platform Configuration # Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
# Zulip integration # Zulip integration
ZULIP_REALM: str | None = None ZULIP_REALM: str | None = None

View File

@@ -212,6 +212,9 @@ async def rooms_bulk_meeting_status(
request: BulkStatusRequest, request: BulkStatusRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
): ):
if not user and not settings.PUBLIC_MODE:
raise HTTPException(status_code=401, detail="Not authenticated")
user_id = user["sub"] if user else None user_id = user["sub"] if user else None
all_rooms = await rooms_controller.get_by_names(request.room_names) all_rooms = await rooms_controller.get_by_names(request.room_names)
@@ -224,13 +227,19 @@ async def rooms_bulk_meeting_status(
room_by_id: dict[str, DbRoom] = {r.id: r for r in rooms} room_by_id: dict[str, DbRoom] = {r.id: r for r in rooms}
room_ids = list(room_by_id.keys()) room_ids = list(room_by_id.keys())
if not room_ids:
return {
name: RoomMeetingStatus(active_meetings=[], upcoming_events=[])
for name in request.room_names
}
current_time = datetime.now(timezone.utc) current_time = datetime.now(timezone.utc)
active_meetings, upcoming_events = await asyncio.gather( active_meetings, upcoming_events = await asyncio.gather(
meetings_controller.get_all_active_for_rooms(room_ids, current_time), meetings_controller.get_all_active_for_rooms(room_ids, current_time),
calendar_events_controller.get_upcoming_for_rooms(room_ids), calendar_events_controller.get_upcoming_for_rooms(room_ids),
) )
# Group by room name # Group by room name, converting DB models to view models
active_by_room: dict[str, list[Meeting]] = defaultdict(list) active_by_room: dict[str, list[Meeting]] = defaultdict(list)
for m in active_meetings: for m in active_meetings:
room = room_by_id.get(m.room_id) room = room_by_id.get(m.room_id)

View File

@@ -27,6 +27,9 @@ from reflector.db.transcripts import (
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -348,7 +351,17 @@ async def _process_multitrack_recording_inner(
room_id=room.id, room_id=room.id,
) )
# Multitrack processing always uses Hatchet (no Celery fallback) use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow( workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline", workflow_name="DiarizationPipeline",
input_data={ input_data={
@@ -370,7 +383,17 @@ async def _process_multitrack_recording_inner(
transcript_id=transcript.id, transcript_id=transcript.id,
) )
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id}) await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
)
@shared_task @shared_task
@@ -1049,7 +1072,10 @@ async def reprocess_failed_daily_recordings():
) )
continue continue
# Multitrack reprocessing always uses Hatchet (no Celery fallback) use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
if not transcript: if not transcript:
logger.warning( logger.warning(
"No transcript for Hatchet reprocessing, skipping", "No transcript for Hatchet reprocessing, skipping",
@@ -1086,6 +1112,26 @@ async def reprocess_failed_daily_recordings():
room_name=meeting.room_name, room_name=meeting.room_name,
track_count=len(recording.track_keys), track_count=len(recording.track_keys),
) )
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1 reprocessed_count += 1

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest import pytest
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM from reflector.schemas.platform import WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True) @pytest.fixture(scope="session", autouse=True)
@@ -14,7 +14,6 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient) register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield yield

View File

@@ -0,0 +1,184 @@
from datetime import datetime, timedelta, timezone
import pytest
from conftest import authenticated_client_ctx
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room, rooms_controller
from reflector.settings import settings
async def _create_room(name: str, user_id: str, is_shared: bool = False) -> Room:
return await rooms_controller.add(
name=name,
user_id=user_id,
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=is_shared,
webhook_url="",
webhook_secret="",
)
async def _create_meeting(room: Room, active: bool = True):
now = datetime.now(timezone.utc)
return await meetings_controller.create(
id=f"meeting-{room.name}-{now.timestamp()}",
room_name=room.name,
room_url="room-url",
host_room_url="host-url",
start_date=now - timedelta(minutes=10),
end_date=now + timedelta(minutes=50) if active else now - timedelta(minutes=1),
room=room,
)
async def _create_calendar_event(room: Room):
now = datetime.now(timezone.utc)
return await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid=f"event-{room.name}",
title=f"Upcoming in {room.name}",
description="secret description",
start_time=now + timedelta(minutes=30),
end_time=now + timedelta(minutes=90),
attendees=[{"name": "Alice", "email": "alice@example.com"}],
)
)
@pytest.mark.asyncio
async def test_bulk_status_returns_empty_for_no_rooms(client):
"""Empty room_names returns empty dict."""
async with authenticated_client_ctx():
resp = await client.post("/rooms/meetings/bulk-status", json={"room_names": []})
assert resp.status_code == 200
assert resp.json() == {}
@pytest.mark.asyncio
async def test_bulk_status_returns_active_meetings_and_upcoming_events(client):
"""Owner sees active meetings and upcoming events for their rooms."""
room = await _create_room("bulk-test-room", "randomuserid")
await _create_meeting(room, active=True)
await _create_calendar_event(room)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["bulk-test-room"]},
)
assert resp.status_code == 200
data = resp.json()
assert "bulk-test-room" in data
status = data["bulk-test-room"]
assert len(status["active_meetings"]) == 1
assert len(status["upcoming_events"]) == 1
# Owner sees description
assert status["upcoming_events"][0]["description"] == "secret description"
@pytest.mark.asyncio
async def test_bulk_status_redacts_data_for_non_owner(client):
"""Non-owner of a shared room gets redacted calendar events and no whereby host_room_url."""
room = await _create_room("shared-bulk", "other-user-id", is_shared=True)
await _create_meeting(room, active=True)
await _create_calendar_event(room)
# authenticated as "randomuserid" but room owned by "other-user-id"
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["shared-bulk"]},
)
assert resp.status_code == 200
status = resp.json()["shared-bulk"]
assert len(status["active_meetings"]) == 1
assert len(status["upcoming_events"]) == 1
# Non-owner: description and attendees redacted
assert status["upcoming_events"][0]["description"] is None
assert status["upcoming_events"][0]["attendees"] is None
@pytest.mark.asyncio
async def test_bulk_status_filters_private_rooms_of_other_users(client):
"""User cannot see private rooms owned by others."""
await _create_room("private-other", "other-user-id", is_shared=False)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["private-other"]},
)
assert resp.status_code == 200
status = resp.json()["private-other"]
assert status["active_meetings"] == []
assert status["upcoming_events"] == []
@pytest.mark.asyncio
async def test_bulk_status_redacts_whereby_host_room_url_for_non_owner(client):
"""Non-owner of a shared whereby room gets empty host_room_url."""
room = await _create_room("shared-whereby", "other-user-id", is_shared=True)
# Force platform to whereby
from reflector.db import get_database
from reflector.db.rooms import rooms as rooms_table
await get_database().execute(
rooms_table.update()
.where(rooms_table.c.id == room.id)
.values(platform="whereby")
)
await _create_meeting(room, active=True)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["shared-whereby"]},
)
assert resp.status_code == 200
status = resp.json()["shared-whereby"]
assert len(status["active_meetings"]) == 1
assert status["active_meetings"][0]["host_room_url"] == ""
@pytest.mark.asyncio
async def test_bulk_status_unauthenticated_rejected_non_public(client):
"""Unauthenticated request on non-PUBLIC_MODE instance returns 401."""
original = settings.PUBLIC_MODE
try:
settings.PUBLIC_MODE = False
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["any-room"]},
)
assert resp.status_code == 401
finally:
settings.PUBLIC_MODE = original
@pytest.mark.asyncio
async def test_bulk_status_nonexistent_room_returns_empty(client):
"""Requesting a room that doesn't exist returns empty lists."""
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["does-not-exist"]},
)
assert resp.status_code == 200
status = resp.json()["does-not-exist"]
assert status["active_meetings"] == []
assert status["upcoming_events"] == []

View File

@@ -1,6 +1,6 @@
import asyncio import asyncio
import time import time
from unittest.mock import AsyncMock, patch from unittest.mock import patch
import pytest import pytest
from httpx import ASGITransport, AsyncClient from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_hatchet, ) as mock_multitrack_pipeline,
): ):
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline, not Hatchet # Whereby recordings should use file pipeline
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id) mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_hatchet.start_workflow.assert_not_called() mock_multitrack_pipeline.delay.assert_not_called()
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -177,6 +177,8 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant", recording_trigger="automatic-2nd-participant",
is_shared=False, is_shared=False,
) )
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add( transcript = await transcripts_controller.add(
"", "",
@@ -211,23 +213,18 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_hatchet, ) as mock_multitrack_pipeline,
): ):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use Hatchet workflow # Daily.co multitrack recordings should use multitrack pipeline
mock_hatchet.start_workflow.assert_called_once() mock_multitrack_pipeline.delay.assert_called_once_with(
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs transcript_id=transcript.id,
assert call_kwargs["workflow_name"] == "DiarizationPipeline" bucket_name="daily-bucket",
assert call_kwargs["input_data"]["transcript_id"] == transcript.id track_keys=track_keys,
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket" )
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called() mock_file_pipeline.delay.assert_not_called()

View File

@@ -1,5 +1,10 @@
import { useMemo } from "react";
import { Box, Heading, Text, VStack } from "@chakra-ui/react"; import { Box, Heading, Text, VStack } from "@chakra-ui/react";
import type { components } from "../../../reflector-api"; import type { components } from "../../../reflector-api";
import {
useRoomsBulkMeetingStatus,
BulkMeetingStatusMap,
} from "../../../lib/apiHooks";
type Room = components["schemas"]["Room"]; type Room = components["schemas"]["Room"];
import { RoomTable } from "./RoomTable"; import { RoomTable } from "./RoomTable";
@@ -31,6 +36,10 @@ export function RoomList({
pt, pt,
loading, loading,
}: RoomListProps) { }: RoomListProps) {
const roomNames = useMemo(() => rooms.map((r) => r.name), [rooms]);
const bulkStatusQuery = useRoomsBulkMeetingStatus(roomNames);
const meetingStatusMap: BulkMeetingStatusMap = bulkStatusQuery.data ?? {};
return ( return (
<VStack alignItems="start" gap={4} mb={mb} pt={pt}> <VStack alignItems="start" gap={4} mb={mb} pt={pt}>
<Heading size="md">{title}</Heading> <Heading size="md">{title}</Heading>
@@ -43,6 +52,8 @@ export function RoomList({
onEdit={onEdit} onEdit={onEdit}
onDelete={onDelete} onDelete={onDelete}
loading={loading} loading={loading}
meetingStatusMap={meetingStatusMap}
meetingStatusLoading={bulkStatusQuery.isLoading}
/> />
<RoomCards <RoomCards
rooms={rooms} rooms={rooms}

View File

@@ -14,11 +14,7 @@ import {
import { LuLink, LuRefreshCw } from "react-icons/lu"; import { LuLink, LuRefreshCw } from "react-icons/lu";
import { FaCalendarAlt } from "react-icons/fa"; import { FaCalendarAlt } from "react-icons/fa";
import type { components } from "../../../reflector-api"; import type { components } from "../../../reflector-api";
import { import { useRoomIcsSync, BulkMeetingStatusMap } from "../../../lib/apiHooks";
useRoomActiveMeetings,
useRoomUpcomingMeetings,
useRoomIcsSync,
} from "../../../lib/apiHooks";
type Room = components["schemas"]["Room"]; type Room = components["schemas"]["Room"];
type Meeting = components["schemas"]["Meeting"]; type Meeting = components["schemas"]["Meeting"];
@@ -62,6 +58,8 @@ interface RoomTableProps {
onEdit: (roomId: string, roomData: any) => void; onEdit: (roomId: string, roomData: any) => void;
onDelete: (roomId: string) => void; onDelete: (roomId: string) => void;
loading?: boolean; loading?: boolean;
meetingStatusMap: BulkMeetingStatusMap;
meetingStatusLoading: boolean;
} }
const getRoomModeDisplay = (mode: string): string => { const getRoomModeDisplay = (mode: string): string => {
@@ -104,14 +102,16 @@ const getZulipDisplay = (
return "Enabled"; return "Enabled";
}; };
function MeetingStatus({ roomName }: { roomName: string }) { function MeetingStatus({
const activeMeetingsQuery = useRoomActiveMeetings(roomName); activeMeetings,
const upcomingMeetingsQuery = useRoomUpcomingMeetings(roomName); upcomingMeetings,
isLoading,
const activeMeetings = activeMeetingsQuery.data || []; }: {
const upcomingMeetings = upcomingMeetingsQuery.data || []; activeMeetings: Meeting[];
upcomingMeetings: CalendarEventResponse[];
if (activeMeetingsQuery.isLoading || upcomingMeetingsQuery.isLoading) { isLoading: boolean;
}) {
if (isLoading) {
return <Spinner size="sm" />; return <Spinner size="sm" />;
} }
@@ -176,6 +176,8 @@ export function RoomTable({
onEdit, onEdit,
onDelete, onDelete,
loading, loading,
meetingStatusMap,
meetingStatusLoading,
}: RoomTableProps) { }: RoomTableProps) {
const [syncingRooms, setSyncingRooms] = useState<Set<NonEmptyString>>( const [syncingRooms, setSyncingRooms] = useState<Set<NonEmptyString>>(
new Set(), new Set(),
@@ -252,7 +254,15 @@ export function RoomTable({
<Link href={`/${room.name}`}>{room.name}</Link> <Link href={`/${room.name}`}>{room.name}</Link>
</Table.Cell> </Table.Cell>
<Table.Cell> <Table.Cell>
<MeetingStatus roomName={room.name} /> <MeetingStatus
activeMeetings={
meetingStatusMap[room.name]?.active_meetings ?? []
}
upcomingMeetings={
meetingStatusMap[room.name]?.upcoming_events ?? []
}
isLoading={meetingStatusLoading}
/>
</Table.Cell> </Table.Cell>
<Table.Cell> <Table.Cell>
{getZulipDisplay( {getZulipDisplay(

View File

@@ -37,24 +37,12 @@ jest.mock("../AuthProvider", () => ({
}), }),
})); }));
// Recreate the batcher with a 0ms window. setTimeout(fn, 0) defers to the next
// macrotask boundary — after all synchronous React rendering completes. All
// useQuery queryFns fire within the same macrotask, so they all queue into one
// batch before the timer fires. This is deterministic and avoids fake timers.
jest.mock("../meetingStatusBatcher", () => {
const actual = jest.requireActual("../meetingStatusBatcher");
return {
...actual,
meetingStatusBatcher: actual.createMeetingStatusBatcher(0),
};
});
// --- Imports (after mocks) --- // --- Imports (after mocks) ---
import React from "react"; import React from "react";
import { render, waitFor, screen } from "@testing-library/react"; import { render, waitFor, screen } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query"; import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { useRoomActiveMeetings, useRoomUpcomingMeetings } from "../apiHooks"; import { useRoomsBulkMeetingStatus, BulkMeetingStatusMap } from "../apiHooks";
import { client } from "../apiClient"; import { client } from "../apiClient";
import { ErrorProvider } from "../../(errors)/errorContext"; import { ErrorProvider } from "../../(errors)/errorContext";
@@ -83,30 +71,31 @@ function mockBulkStatusEndpoint(
); );
} }
// --- Test component: renders N room cards, each using both hooks --- // --- Test component: uses the bulk hook and displays results ---
function RoomCard({ roomName }: { roomName: string }) { function BulkStatusDisplay({ roomNames }: { roomNames: string[] }) {
const active = useRoomActiveMeetings(roomName); const { data, isLoading } = useRoomsBulkMeetingStatus(roomNames);
const upcoming = useRoomUpcomingMeetings(roomName);
if (active.isLoading || upcoming.isLoading) { if (isLoading) {
return <div data-testid={`room-${roomName}`}>loading</div>; return <div data-testid="status">loading</div>;
}
if (!data) {
return <div data-testid="status">no data</div>;
} }
return ( return (
<div data-testid={`room-${roomName}`}> <div data-testid="status">
{active.data?.length ?? 0} active, {upcoming.data?.length ?? 0} upcoming {roomNames.map((name) => {
const status = data[name];
return (
<div key={name} data-testid={`room-${name}`}>
{status?.active_meetings?.length ?? 0} active,{" "}
{status?.upcoming_events?.length ?? 0} upcoming
</div> </div>
); );
} })}
</div>
function RoomList({ roomNames }: { roomNames: string[] }) {
return (
<>
{roomNames.map((name) => (
<RoomCard key={name} roomName={name} />
))}
</>
); );
} }
@@ -127,15 +116,17 @@ function createWrapper() {
// --- Tests --- // --- Tests ---
describe("meeting status batcher integration", () => { describe("bulk meeting status (prop-drilling)", () => {
afterEach(() => jest.clearAllMocks()); afterEach(() => jest.clearAllMocks());
it("batches multiple room queries into a single POST request", async () => { it("fetches all room statuses in a single POST request", async () => {
const rooms = Array.from({ length: 10 }, (_, i) => `room-${i}`); const rooms = Array.from({ length: 10 }, (_, i) => `room-${i}`);
mockBulkStatusEndpoint(); mockBulkStatusEndpoint();
render(<RoomList roomNames={rooms} />, { wrapper: createWrapper() }); render(<BulkStatusDisplay roomNames={rooms} />, {
wrapper: createWrapper(),
});
await waitFor(() => { await waitFor(() => {
for (const name of rooms) { for (const name of rooms) {
@@ -149,21 +140,18 @@ describe("meeting status batcher integration", () => {
([path]: [string]) => path === "/v1/rooms/meetings/bulk-status", ([path]: [string]) => path === "/v1/rooms/meetings/bulk-status",
); );
// Without batching this would be 20 calls (2 hooks x 10 rooms). // Prop-drilling: exactly 1 POST for all rooms (no batcher needed)
expect(postCalls).toHaveLength(1); expect(postCalls).toHaveLength(1);
// The single call should contain all 10 rooms (deduplicated) // The single call contains all room names
const requestedRooms: string[] = postCalls[0][1].body.room_names; const requestedRooms: string[] = postCalls[0][1].body.room_names;
expect(requestedRooms).toHaveLength(10);
for (const name of rooms) { for (const name of rooms) {
expect(requestedRooms).toContain(name); expect(requestedRooms).toContain(name);
} }
}); });
it("batcher fetcher returns room-specific data", async () => { it("returns room-specific data correctly", async () => {
const {
meetingStatusBatcher: batcher,
} = require("../meetingStatusBatcher");
mockBulkStatusEndpoint({ mockBulkStatusEndpoint({
"room-a": { "room-a": {
active_meetings: [{ id: "m1", room_name: "room-a" }], active_meetings: [{ id: "m1", room_name: "room-a" }],
@@ -175,33 +163,7 @@ describe("meeting status batcher integration", () => {
}, },
}); });
const [resultA, resultB] = await Promise.all([ render(<BulkStatusDisplay roomNames={["room-a", "room-b"]} />, {
batcher.fetch("room-a"),
batcher.fetch("room-b"),
]);
expect(mockClient.POST).toHaveBeenCalledTimes(1);
expect(resultA.active_meetings).toEqual([
{ id: "m1", room_name: "room-a" },
]);
expect(resultA.upcoming_events).toEqual([]);
expect(resultB.active_meetings).toEqual([]);
expect(resultB.upcoming_events).toEqual([{ id: "e1", title: "Standup" }]);
});
it("renders room-specific meeting data through hooks", async () => {
mockBulkStatusEndpoint({
"room-a": {
active_meetings: [{ id: "m1", room_name: "room-a" }],
upcoming_events: [],
},
"room-b": {
active_meetings: [],
upcoming_events: [{ id: "e1", title: "Standup" }],
},
});
render(<RoomList roomNames={["room-a", "room-b"]} />, {
wrapper: createWrapper(), wrapper: createWrapper(),
}); });
@@ -213,5 +175,72 @@ describe("meeting status batcher integration", () => {
"0 active, 1 upcoming", "0 active, 1 upcoming",
); );
}); });
// Still just 1 POST
expect(mockClient.POST).toHaveBeenCalledTimes(1);
});
it("does not fetch when roomNames is empty", async () => {
mockBulkStatusEndpoint();
render(<BulkStatusDisplay roomNames={[]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("status")).toHaveTextContent("no data");
});
// No POST calls when no rooms
expect(mockClient.POST).not.toHaveBeenCalled();
});
it("surfaces error when POST fails", async () => {
mockClient.POST.mockResolvedValue({
data: undefined,
error: { detail: "server error" },
response: {},
});
function ErrorDisplay({ roomNames }: { roomNames: string[] }) {
const { error } = useRoomsBulkMeetingStatus(roomNames);
if (error) return <div data-testid="error">{error.message}</div>;
return <div data-testid="error">no error</div>;
}
render(<ErrorDisplay roomNames={["room-x"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("error")).toHaveTextContent(
"bulk-status fetch failed",
);
});
});
it("does not fetch when unauthenticated", async () => {
// Override useAuth to return unauthenticated
const authModule = jest.requireMock("../AuthProvider");
const originalUseAuth = authModule.useAuth;
authModule.useAuth = () => ({
...originalUseAuth(),
status: "unauthenticated",
});
mockBulkStatusEndpoint();
render(<BulkStatusDisplay roomNames={["room-1"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("status")).toHaveTextContent("no data");
});
expect(mockClient.POST).not.toHaveBeenCalled();
// Restore
authModule.useAuth = originalUseAuth;
}); });
}); });

View File

@@ -1,11 +1,10 @@
"use client"; "use client";
import { $api } from "./apiClient"; import { $api, client } from "./apiClient";
import { useError } from "../(errors)/errorContext"; import { useError } from "../(errors)/errorContext";
import { QueryClient, useQuery, useQueryClient } from "@tanstack/react-query"; import { QueryClient, useQuery, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api"; import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider"; import { useAuth } from "./AuthProvider";
import { meetingStatusBatcher } from "./meetingStatusBatcher";
import { MeetingId } from "./types"; import { MeetingId } from "./types";
import { NonEmptyString } from "./utils"; import { NonEmptyString } from "./utils";
@@ -642,7 +641,8 @@ export function useMeetingDeactivate() {
setError(error as Error, "Failed to end meeting"); setError(error as Error, "Failed to end meeting");
}, },
onSuccess: () => { onSuccess: () => {
return queryClient.invalidateQueries({ return Promise.all([
queryClient.invalidateQueries({
predicate: (query) => { predicate: (query) => {
const key = query.queryKey; const key = query.queryKey;
return key.some( return key.some(
@@ -651,7 +651,11 @@ export function useMeetingDeactivate() {
!!MEETING_LIST_PATH_PARTIALS.find((e) => k.includes(e)), !!MEETING_LIST_PATH_PARTIALS.find((e) => k.includes(e)),
); );
}, },
}); }),
queryClient.invalidateQueries({
queryKey: ["bulk-meeting-status"],
}),
]);
}, },
}); });
} }
@@ -698,7 +702,18 @@ export function useRoomsCreateMeeting() {
queryKey: $api.queryOptions("get", "/v1/rooms").queryKey, queryKey: $api.queryOptions("get", "/v1/rooms").queryKey,
}), }),
queryClient.invalidateQueries({ queryClient.invalidateQueries({
queryKey: meetingStatusKeys.active(roomName), queryKey: $api.queryOptions(
"get",
"/v1/rooms/{room_name}/meetings/active" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_ACTIVE_PATH_PARTIAL}`,
{
params: {
path: { room_name: roomName },
},
},
).queryKey,
}),
queryClient.invalidateQueries({
queryKey: ["bulk-meeting-status"],
}), }),
]); ]);
}, },
@@ -727,38 +742,67 @@ export function useRoomGetByName(roomName: string | null) {
export function useRoomUpcomingMeetings(roomName: string | null) { export function useRoomUpcomingMeetings(roomName: string | null) {
const { isAuthenticated } = useAuthReady(); const { isAuthenticated } = useAuthReady();
return useQuery({ return $api.useQuery(
queryKey: meetingStatusKeys.upcoming(roomName!), "get",
queryFn: async () => { "/v1/rooms/{room_name}/meetings/upcoming" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_UPCOMING_PATH_PARTIAL}`,
const result = await meetingStatusBatcher.fetch(roomName!); {
return result.upcoming_events; params: {
path: { room_name: roomName! },
}, },
},
{
enabled: !!roomName && isAuthenticated, enabled: !!roomName && isAuthenticated,
}); },
);
} }
// Query keys reuse $api.queryOptions so cache identity matches the original const MEETINGS_PATH_PARTIAL = "meetings" as const;
// per-room GET endpoints. The actual fetch goes through the batcher, but the const MEETINGS_ACTIVE_PATH_PARTIAL = `${MEETINGS_PATH_PARTIAL}/active` as const;
// keys stay consistent with the rest of the codebase. const MEETINGS_UPCOMING_PATH_PARTIAL =
const meetingStatusKeys = { `${MEETINGS_PATH_PARTIAL}/upcoming` as const;
active: (roomName: string) => const MEETING_LIST_PATH_PARTIALS = [
$api.queryOptions("get", "/v1/rooms/{room_name}/meetings/active", { MEETINGS_ACTIVE_PATH_PARTIAL,
params: { path: { room_name: roomName } }, MEETINGS_UPCOMING_PATH_PARTIAL,
}).queryKey, ];
upcoming: (roomName: string) =>
$api.queryOptions("get", "/v1/rooms/{room_name}/meetings/upcoming", {
params: { path: { room_name: roomName } },
}).queryKey,
};
export function useRoomActiveMeetings(roomName: string | null) { export function useRoomActiveMeetings(roomName: string | null) {
return useQuery({ return $api.useQuery(
queryKey: meetingStatusKeys.active(roomName!), "get",
queryFn: async () => { "/v1/rooms/{room_name}/meetings/active" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_ACTIVE_PATH_PARTIAL}`,
const result = await meetingStatusBatcher.fetch(roomName!); {
return result.active_meetings; params: {
path: { room_name: roomName! },
}, },
},
{
enabled: !!roomName, enabled: !!roomName,
},
);
}
type RoomMeetingStatus = components["schemas"]["RoomMeetingStatus"];
export type BulkMeetingStatusMap = Partial<Record<string, RoomMeetingStatus>>;
export function useRoomsBulkMeetingStatus(roomNames: string[]) {
const { isAuthenticated } = useAuthReady();
const sortedNames = [...roomNames].sort();
return useQuery({
queryKey: ["bulk-meeting-status", sortedNames],
queryFn: async (): Promise<BulkMeetingStatusMap> => {
const { data, error } = await client.POST(
"/v1/rooms/meetings/bulk-status",
{ body: { room_names: roomNames } },
);
if (error || !data) {
throw new Error(
`bulk-status fetch failed: ${JSON.stringify(error ?? "no data")}`,
);
}
return data;
},
enabled: sortedNames.length > 0 && isAuthenticated,
}); });
} }

View File

@@ -1,37 +0,0 @@
import { create, keyResolver, windowScheduler } from "@yornaath/batshit";
import { client } from "./apiClient";
import type { components } from "../reflector-api";
type MeetingStatusResult = {
roomName: string;
active_meetings: components["schemas"]["Meeting"][];
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
const BATCH_WINDOW_MS = 10;
export function createMeetingStatusBatcher(windowMs: number = BATCH_WINDOW_MS) {
return create({
fetcher: async (roomNames: string[]): Promise<MeetingStatusResult[]> => {
const unique = [...new Set(roomNames)];
const { data, error } = await client.POST(
"/v1/rooms/meetings/bulk-status",
{ body: { room_names: unique } },
);
if (error || !data) {
throw new Error(
`bulk-status fetch failed: ${JSON.stringify(error ?? "no data")}`,
);
}
return roomNames.map((name) => ({
roomName: name,
active_meetings: data[name]?.active_meetings ?? [],
upcoming_events: data[name]?.upcoming_events ?? [],
}));
},
resolver: keyResolver("roomName"),
scheduler: windowScheduler(windowMs),
});
}
export const meetingStatusBatcher = createMeetingStatusBatcher();

View File

@@ -1697,6 +1697,13 @@ export interface components {
*/ */
skip_consent: boolean; skip_consent: boolean;
}; };
/** RoomMeetingStatus */
RoomMeetingStatus: {
/** Active Meetings */
active_meetings: components["schemas"]["Meeting"][];
/** Upcoming Events */
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
/** RoomDetails */ /** RoomDetails */
RoomDetails: { RoomDetails: {
/** Id */ /** Id */
@@ -1757,13 +1764,6 @@ export interface components {
/** Webhook Secret */ /** Webhook Secret */
webhook_secret: string | null; webhook_secret: string | null;
}; };
/** RoomMeetingStatus */
RoomMeetingStatus: {
/** Active Meetings */
active_meetings: components["schemas"]["Meeting"][];
/** Upcoming Events */
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
/** RtcOffer */ /** RtcOffer */
RtcOffer: { RtcOffer: {
/** Sdp */ /** Sdp */

View File

@@ -23,7 +23,6 @@
"@tanstack/react-query": "^5.85.9", "@tanstack/react-query": "^5.85.9",
"@types/ioredis": "^5.0.0", "@types/ioredis": "^5.0.0",
"@whereby.com/browser-sdk": "^3.3.4", "@whereby.com/browser-sdk": "^3.3.4",
"@yornaath/batshit": "^0.14.0",
"autoprefixer": "10.4.20", "autoprefixer": "10.4.20",
"axios": "^1.8.2", "axios": "^1.8.2",
"eslint": "^9.33.0", "eslint": "^9.33.0",

21
www/pnpm-lock.yaml generated
View File

@@ -37,9 +37,6 @@ importers:
"@whereby.com/browser-sdk": "@whereby.com/browser-sdk":
specifier: ^3.3.4 specifier: ^3.3.4
version: 3.13.1(@types/react@18.2.20)(react-dom@18.3.1(react@18.3.1))(react@18.3.1) version: 3.13.1(@types/react@18.2.20)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
"@yornaath/batshit":
specifier: ^0.14.0
version: 0.14.0
autoprefixer: autoprefixer:
specifier: 10.4.20 specifier: 10.4.20
version: 10.4.20(postcss@8.4.31) version: 10.4.20(postcss@8.4.31)
@@ -3469,18 +3466,6 @@ packages:
integrity: sha512-NuHqBY1PB/D8xU6s/thBgOAiAP7HOYDQ32+BFZILJ8ivkUkAHQnWfn6WhL79Owj1qmUnoN/YPhktdIoucipkAQ==, integrity: sha512-NuHqBY1PB/D8xU6s/thBgOAiAP7HOYDQ32+BFZILJ8ivkUkAHQnWfn6WhL79Owj1qmUnoN/YPhktdIoucipkAQ==,
} }
"@yornaath/batshit-devtools@1.7.1":
resolution:
{
integrity: sha512-AyttV1Njj5ug+XqEWY1smV45dTWMlWKtj1B8jcFYgBKUFyUlF/qEhD+iP1E5UaRYW6hQRYD9T2WNDwFTrOMWzQ==,
}
"@yornaath/batshit@0.14.0":
resolution:
{
integrity: sha512-0I+xMi5JoRs3+qVXXhk2AmsEl43MwrG+L+VW+nqw/qQqMFtgRPszLaxhJCfsBKnjfJ0gJzTI1Q9Q9+y903HyHQ==,
}
"@zag-js/accordion@1.21.0": "@zag-js/accordion@1.21.0":
resolution: resolution:
{ {
@@ -11927,12 +11912,6 @@ snapshots:
"@xtuc/long@4.2.2": {} "@xtuc/long@4.2.2": {}
"@yornaath/batshit-devtools@1.7.1": {}
"@yornaath/batshit@0.14.0":
dependencies:
"@yornaath/batshit-devtools": 1.7.1
"@zag-js/accordion@1.21.0": "@zag-js/accordion@1.21.0":
dependencies: dependencies:
"@zag-js/anatomy": 1.21.0 "@zag-js/anatomy": 1.21.0