Compare commits

...

10 Commits

Author SHA1 Message Date
ae44f5227b Merge branch 'main' into fix-room-query-batching 2026-02-05 19:42:29 -05:00
Igor Loskutov
4339ffffcf fix: use $api.queryOptions for batcher query keys
Replace custom meetingStatusKeys with $api.queryOptions()-derived keys
so cache identity matches the original per-room GET endpoints.
2026-02-05 19:40:13 -05:00
Igor Loskutov
9dc6c20ef8 fix: address code review findings
- Add max_length=100 on BulkStatusRequest.room_names to prevent abuse
- Filter bulk endpoint results to rooms user can see (owned or shared)
- Throw on bulk-status fetch error instead of silently returning empty data
- Fix room_by_id type annotation: dict[str, DbRoom] instead of Any
- Remove stale "200ms" comment in test
- Enable strict: true in jest tsconfig
- Remove working docs from tracked files
- Simplify redundant ternary in test helper
2026-02-05 19:36:17 -05:00
Igor Loskutov
931c344ddf feat: add frontend test infrastructure and fix CI workflow
- Fix pnpm version mismatch in test_next_server.yml (8 → auto-detect 10)
- Add concurrency group to cancel stale CI runs
- Remove redundant setup-node step
- Update jest.config.js for jsdom + tsx support
- Add meetingStatusBatcher integration test (3 tests)
- Extract createMeetingStatusBatcher factory for testability
2026-02-05 19:06:32 -05:00
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
Igor Loskutov
129290517e docs: add handoff report and frontend testing research
BATSHIT_REPORT.md: full context on bulk query batching — business goal,
approach, all changes, verification status, and how to run.
FRONTEND_TEST_RESEARCH.md: research on unit testing react-query hooks
with jest.mock, renderHook, and batcher testing patterns.
2026-02-05 17:49:23 -05:00
Igor Loskutov
7e072219bf feat: batch room meeting status queries into single bulk endpoint
Reduces rooms list page from 2N+2 HTTP requests to 1 POST request.
Backend: POST /v1/rooms/meetings/bulk-status with 3 DB queries total.
Frontend: @yornaath/batshit DataLoader-style batcher with 10ms window.
2026-02-05 17:47:58 -05:00
1ce1c7a910 fix: websocket tests (#825)
* fix websocket tests

* fix: restore timeout and fix celery test infrastructure

- Re-add timeout=1.0 to ws_manager pubsub loop (prevents CPU spin?)
- Use Redis for Celery tests (memory:// broker doesn't support chords)
- Add timeout param to in-memory subscriber mock
- Remove duplicate celery_includes fixture from rtc_ws tests

* fix: remove redundant inline imports in test files

* fix: update gitleaks ignore for moved s3_key line

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 14:23:31 -05:00
Rémi Pauchet
984795357e - fix nvidia repo blocked by apt (sha1) (#845)
- use build cache for apt and uv
- limit concurency for uv to prevent crashes with too many cores
2026-02-05 13:59:34 -05:00
fa3cf5da0f chore(main): release 0.32.2 (#842) 2026-02-03 22:05:22 -05:00
25 changed files with 1552 additions and 340 deletions

View File

@@ -13,6 +13,9 @@ on:
jobs: jobs:
test-next-server: test-next-server:
runs-on: ubuntu-latest runs-on: ubuntu-latest
concurrency:
group: test-next-server-${{ github.ref }}
cancel-in-progress: true
defaults: defaults:
run: run:
@@ -21,17 +24,12 @@ jobs:
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install pnpm - name: Install pnpm
uses: pnpm/action-setup@v4 uses: pnpm/action-setup@v4
with: with:
version: 8 package_json_file: './www/package.json'
- name: Setup Node.js cache - name: Setup Node.js
uses: actions/setup-node@v4 uses: actions/setup-node@v4
with: with:
node-version: '20' node-version: '20'

View File

@@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465 server/reflector/worker/process.py:generic-api-key:465
server/reflector/worker/process.py:generic-api-key:594

View File

@@ -1,5 +1,14 @@
# Changelog # Changelog
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
### Bug Fixes
* increase TIMEOUT_MEDIUM from 2m to 5m for LLM tasks ([#843](https://github.com/Monadical-SAS/reflector/issues/843)) ([4acde4b](https://github.com/Monadical-SAS/reflector/commit/4acde4b7fdef88cc02ca12cf38c9020b05ed96ac))
* make caddy optional ([#841](https://github.com/Monadical-SAS/reflector/issues/841)) ([a2ed7d6](https://github.com/Monadical-SAS/reflector/commit/a2ed7d60d557b551a5b64e4dfd909b63a791d9fc))
* use Daily API recording.duration as master source for transcript duration ([#844](https://github.com/Monadical-SAS/reflector/issues/844)) ([8707c66](https://github.com/Monadical-SAS/reflector/commit/8707c6694a80c939b6214bbc13331741f192e082))
## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30) ## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)

View File

@@ -4,27 +4,31 @@ ENV PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy \ UV_LINK_MODE=copy \
UV_NO_CACHE=1 UV_NO_CACHE=1
# patch until nvidia updates the sha1 repo
ADD sequoia.config /etc/crypto-policies/back-ends/sequoia.config
WORKDIR /tmp WORKDIR /tmp
RUN apt-get update \ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
&& apt-get install -y \ && apt-get install -y \
ffmpeg \ ffmpeg \
curl \ curl \
ca-certificates \ ca-certificates \
gnupg \ gnupg \
wget \ wget
&& apt-get clean
# Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12 # Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12
ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb
RUN dpkg -i /cuda-keyring.deb \ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
dpkg -i /cuda-keyring.deb \
&& rm /cuda-keyring.deb \ && rm /cuda-keyring.deb \
&& apt-get update \ && apt-get update \
&& apt-get install -y --no-install-recommends \ && apt-get install -y --no-install-recommends \
cuda-cudart-12-6 \ cuda-cudart-12-6 \
libcublas-12-6 \ libcublas-12-6 \
libcudnn9-cuda-12 \ libcudnn9-cuda-12 \
libcudnn9-dev-cuda-12 \ libcudnn9-dev-cuda-12
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
ADD https://astral.sh/uv/install.sh /uv-installer.sh ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh RUN sh /uv-installer.sh && rm /uv-installer.sh
ENV PATH="/root/.local/bin/:$PATH" ENV PATH="/root/.local/bin/:$PATH"
@@ -39,6 +43,13 @@ COPY ./app /app/app
COPY ./main.py /app/ COPY ./main.py /app/
COPY ./runserver.sh /app/ COPY ./runserver.sh /app/
# prevent uv failing with too many open files on big cpus
ENV UV_CONCURRENT_INSTALLS=16
# first install
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --compile-bytecode --locked
EXPOSE 8000 EXPOSE 8000
CMD ["sh", "/app/runserver.sh"] CMD ["sh", "/app/runserver.sh"]

View File

@@ -0,0 +1,2 @@
[hash_algorithms]
sha1 = "always"

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -104,6 +104,26 @@ class CalendarEventController:
results = await get_database().fetch_all(query) results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results] return [CalendarEvent(**result) for result in results]
async def get_upcoming_for_rooms(
self, room_ids: list[str], minutes_ahead: int = 120
) -> list[CalendarEvent]:
now = datetime.now(timezone.utc)
future_time = now + timedelta(minutes=minutes_ahead)
query = (
calendar_events.select()
.where(
sa.and_(
calendar_events.c.room_id.in_(room_ids),
calendar_events.c.is_deleted == False,
calendar_events.c.start_time <= future_time,
calendar_events.c.end_time >= now,
)
)
.order_by(calendar_events.c.start_time.asc())
)
results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results]
async def get_by_id(self, event_id: str) -> CalendarEvent | None: async def get_by_id(self, event_id: str) -> CalendarEvent | None:
query = calendar_events.select().where(calendar_events.c.id == event_id) query = calendar_events.select().where(calendar_events.c.id == event_id)
result = await get_database().fetch_one(query) result = await get_database().fetch_one(query)

View File

@@ -301,6 +301,23 @@ class MeetingController:
results = await get_database().fetch_all(query) results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results] return [Meeting(**result) for result in results]
async def get_all_active_for_rooms(
self, room_ids: list[str], current_time: datetime
) -> list[Meeting]:
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_id.in_(room_ids),
meetings.c.end_date > current_time,
meetings.c.is_active,
)
)
.order_by(meetings.c.end_date.desc())
)
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_active_by_calendar_event( async def get_active_by_calendar_event(
self, room: Room, calendar_event_id: str, current_time: datetime self, room: Room, calendar_event_id: str, current_time: datetime
) -> Meeting | None: ) -> Meeting | None:

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
sqlalchemy.String, sqlalchemy.String,
nullable=False, nullable=False,
), ),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column( sqlalchemy.Column(
"skip_consent", "skip_consent",
sqlalchemy.Boolean, sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None ics_last_sync: datetime | None = None
ics_last_etag: str | None = None ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False skip_consent: bool = False
@@ -245,6 +238,11 @@ class RoomController:
return room return room
async def get_by_names(self, names: list[str]) -> list[Room]:
query = rooms.select().where(rooms.c.name.in_(names))
results = await get_database().fetch_all(query)
return [Room(**r) for r in results]
async def get_ics_enabled(self) -> list[Room]: async def get_ics_enabled(self) -> list[Room]:
query = rooms.select().where( query = rooms.select().where(
rooms.c.ics_enabled == True, rooms.c.ics_url != None rooms.c.ics_enabled == True, rooms.c.ics_url != None

View File

@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows. Returns AsyncResult for Celery tasks, None for Hatchet workflows.
""" """
if isinstance(config, MultitrackProcessingConfig): if isinstance(config, MultitrackProcessingConfig):
use_celery = False # Multitrack processing always uses Hatchet (no Celery fallback)
if config.room_id: # First check if we can replay (outside transaction since it's read-only)
room = await rooms_controller.get_by_id(config.room_id) transcript = await transcripts_controller.get_by_id(config.transcript_id)
use_celery = room.use_celery if room else False if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
use_hatchet = not use_celery transcript.workflow_run_id
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
) )
if can_replay:
if use_hatchet: await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
# First check if we can replay (outside transaction since it's read-only) logger.info(
transcript = await transcripts_controller.get_by_id(config.transcript_id) "Replaying Hatchet workflow",
if transcript and transcript.workflow_run_id and not force: workflow_id=transcript.workflow_run_id,
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
) )
if can_replay: return None
await HatchetClientManager.replay_workflow( else:
transcript.workflow_run_id # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
) # Log and proceed to start new workflow
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try: try:
status = await HatchetClientManager.get_workflow_run_status( status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id transcript.workflow_run_id
) )
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED): logger.info(
logger.info( "Old workflow not replayable, starting new",
"Concurrent workflow detected, skipping dispatch", old_workflow_id=transcript.workflow_run_id,
workflow_id=transcript.workflow_run_id, old_status=status.value,
) )
return None except NotFoundException:
except ApiException: # Workflow deleted from Hatchet but ID still in DB
# Workflow might be gone (404) or API issue - proceed with new workflow logger.info(
pass "Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
workflow_id = await HatchetClientManager.start_workflow( # Force: cancel old workflow if exists
workflow_name="DiarizationPipeline", if force and transcript and transcript.workflow_run_id:
input_data={ try:
"recording_id": config.recording_id, await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
"tracks": [{"s3_key": k} for k in config.track_keys], logger.info(
"bucket_name": config.bucket_name, "Cancelled old workflow (--force)",
"transcript_id": config.transcript_id, workflow_id=transcript.workflow_run_id,
"room_id": config.room_id, )
}, except NotFoundException:
additional_metadata={ logger.info(
"transcript_id": config.transcript_id, "Old workflow already deleted (--force)",
"recording_id": config.recording_id, workflow_id=transcript.workflow_run_id,
"daily_recording_id": config.recording_id, )
}, await transcripts_controller.update(transcript, {"workflow_run_id": None})
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
) )
if transcript: logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
await transcripts_controller.update( return None
transcript, {"workflow_run_id": workflow_id}
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig): elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id) return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else: else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code None # Webhook UUID for this environment. Not used by production code
) )
# Platform Configuration # Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
# Zulip integration # Zulip integration
ZULIP_REALM: str | None = None ZULIP_REALM: str | None = None

View File

@@ -1,4 +1,6 @@
import asyncio
import logging import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from enum import Enum from enum import Enum
from typing import Annotated, Any, Literal, Optional from typing import Annotated, Any, Literal, Optional
@@ -6,13 +8,14 @@ from typing import Annotated, Any, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Page from fastapi_pagination import Page
from fastapi_pagination.ext.databases import apaginate from fastapi_pagination.ext.databases import apaginate
from pydantic import BaseModel from pydantic import BaseModel, Field
from redis.exceptions import LockError from redis.exceptions import LockError
import reflector.auth as auth import reflector.auth as auth
from reflector.db import get_database from reflector.db import get_database
from reflector.db.calendar_events import calendar_events_controller from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room as DbRoom
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
from reflector.redis_cache import RedisAsyncLock from reflector.redis_cache import RedisAsyncLock
from reflector.schemas.platform import Platform from reflector.schemas.platform import Platform
@@ -195,6 +198,69 @@ async def rooms_list(
return paginated return paginated
class BulkStatusRequest(BaseModel):
room_names: list[str] = Field(max_length=100)
class RoomMeetingStatus(BaseModel):
active_meetings: list[Meeting]
upcoming_events: list[CalendarEventResponse]
@router.post("/rooms/meetings/bulk-status", response_model=dict[str, RoomMeetingStatus])
async def rooms_bulk_meeting_status(
request: BulkStatusRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
):
user_id = user["sub"] if user else None
all_rooms = await rooms_controller.get_by_names(request.room_names)
# Filter to rooms the user can see (owned or shared), matching rooms_list behavior
rooms = [
r
for r in all_rooms
if r.is_shared or (user_id is not None and r.user_id == user_id)
]
room_by_id: dict[str, DbRoom] = {r.id: r for r in rooms}
room_ids = list(room_by_id.keys())
current_time = datetime.now(timezone.utc)
active_meetings, upcoming_events = await asyncio.gather(
meetings_controller.get_all_active_for_rooms(room_ids, current_time),
calendar_events_controller.get_upcoming_for_rooms(room_ids),
)
# Group by room name
active_by_room: dict[str, list[Meeting]] = defaultdict(list)
for m in active_meetings:
room = room_by_id.get(m.room_id)
if not room:
continue
m.platform = room.platform
if user_id != room.user_id and m.platform == "whereby":
m.host_room_url = ""
active_by_room[room.name].append(m)
upcoming_by_room: dict[str, list[CalendarEventResponse]] = defaultdict(list)
for e in upcoming_events:
room = room_by_id.get(e.room_id)
if not room:
continue
if user_id != room.user_id:
e.description = None
e.attendees = None
upcoming_by_room[room.name].append(e)
result: dict[str, RoomMeetingStatus] = {}
for name in request.room_names:
result[name] = RoomMeetingStatus(
active_meetings=active_by_room.get(name, []),
upcoming_events=upcoming_by_room.get(name, []),
)
return result
@router.get("/rooms/{room_id}", response_model=RoomDetails) @router.get("/rooms/{room_id}", response_model=RoomDetails)
async def rooms_get( async def rooms_get(
room_id: str, room_id: str,

View File

@@ -27,9 +27,6 @@ from reflector.db.transcripts import (
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -351,49 +348,29 @@ async def _process_multitrack_recording_inner(
room_id=room.id, room_id=room.id,
) )
use_celery = room and room.use_celery # Multitrack processing always uses Hatchet (no Celery fallback)
use_hatchet = not use_celery workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
if use_celery: input_data={
logger.info( "recording_id": recording_id,
"Room uses legacy Celery processing", "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
room_id=room.id, "bucket_name": bucket_name,
transcript_id=transcript.id, "transcript_id": transcript.id,
) "room_id": room.id,
},
if use_hatchet: additional_metadata={
workflow_id = await HatchetClientManager.start_workflow( "transcript_id": transcript.id,
workflow_name="DiarizationPipeline", "recording_id": recording_id,
input_data={ "daily_recording_id": recording_id,
"recording_id": recording_id, },
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
) )
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
@shared_task @shared_task
@@ -1072,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
) )
continue continue
use_celery = room and room.use_celery # Multitrack reprocessing always uses Hatchet (no Celery fallback)
use_hatchet = not use_celery if not transcript:
logger.warning(
if use_hatchet: "No transcript for Hatchet reprocessing, skipping",
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id, recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
) )
continue
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner) workflow_id = await HatchetClientManager.start_workflow(
# Reprocessing uses recording.meeting_id directly instead of time-based matching workflow_name="DiarizationPipeline",
recording_start_ts = int(recording.recorded_at.timestamp()) input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
process_multitrack_recording.delay( logger.info(
bucket_name=bucket_name, "Queued Daily recording for Hatchet reprocessing",
daily_room_name=meeting.room_name, recording_id=recording.id,
recording_id=recording.id, workflow_id=workflow_id,
track_keys=recording.track_keys, room_name=meeting.room_name,
recording_start_ts=recording_start_ts, track_count=len(recording.track_keys),
) )
reprocessed_count += 1 reprocessed_count += 1

View File

@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
import asyncio import asyncio
import json import json
import threading
import redis.asyncio as redis import redis.asyncio as redis
from fastapi import WebSocket from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
async def _pubsub_data_reader(self, pubsub_subscriber): async def _pubsub_data_reader(self, pubsub_subscriber):
while True: while True:
# timeout=1.0 prevents tight CPU loop when no messages available
message = await pubsub_subscriber.get_message( message = await pubsub_subscriber.get_message(
ignore_subscribe_messages=True ignore_subscribe_messages=True
) )
@@ -109,29 +109,38 @@ class WebsocketManager:
await socket.send_json(data) await socket.send_json(data)
# Process-global singleton to ensure only one WebsocketManager instance exists.
# Multiple instances would cause resource leaks and CPU issues.
_ws_manager: WebsocketManager | None = None
def get_ws_manager() -> WebsocketManager: def get_ws_manager() -> WebsocketManager:
""" """
Returns the WebsocketManager instance for managing websockets. Returns the global WebsocketManager singleton.
This function initializes and returns the WebsocketManager instance, Creates instance on first call, subsequent calls return cached instance.
which is responsible for managing websockets and handling websocket Thread-safe via GIL. Concurrent initialization may create duplicate
connections. instances but last write wins (acceptable for this use case).
Returns: Returns:
WebsocketManager: The initialized WebsocketManager instance. WebsocketManager: The global WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
""" """
local = threading.local() global _ws_manager
if hasattr(local, "ws_manager"):
return local.ws_manager
if _ws_manager is not None:
return _ws_manager
# No lock needed - GIL makes this safe enough
# Worst case: race creates two instances, last assignment wins
pubsub_client = RedisPubSubManager( pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST, host=settings.REDIS_HOST,
port=settings.REDIS_PORT, port=settings.REDIS_PORT,
) )
ws_manager = WebsocketManager(pubsub_client=pubsub_client) _ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager return _ws_manager
return ws_manager
def reset_ws_manager() -> None:
"""Reset singleton for testing. DO NOT use in production."""
global _ws_manager
_ws_manager = None

View File

@@ -1,11 +1,10 @@
import os import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from tempfile import NamedTemporaryFile
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
from reflector.schemas.platform import WHEREBY_PLATFORM from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True) @pytest.fixture(scope="session", autouse=True)
@@ -15,6 +14,7 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient) register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield yield
@@ -333,11 +333,14 @@ def celery_enable_logging():
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def celery_config(): def celery_config():
with NamedTemporaryFile() as f: redis_host = os.environ.get("REDIS_HOST", "localhost")
yield { redis_port = os.environ.get("REDIS_PORT", "6379")
"broker_url": "memory://", # Use db 2 to avoid conflicts with main app
"result_backend": f"db+sqlite:///{f.name}", redis_url = f"redis://{redis_host}:{redis_port}/2"
} yield {
"broker_url": redis_url,
"result_backend": redis_url,
}
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
@@ -370,9 +373,12 @@ async def ws_manager_in_memory(monkeypatch):
def __init__(self, queue: asyncio.Queue): def __init__(self, queue: asyncio.Queue):
self.queue = queue self.queue = queue
async def get_message(self, ignore_subscribe_messages: bool = True): async def get_message(
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
):
wait_timeout = timeout if timeout is not None else 0.05
try: try:
return await asyncio.wait_for(self.queue.get(), timeout=0.05) return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
except Exception: except Exception:
return None return None

View File

@@ -1,6 +1,6 @@
import asyncio import asyncio
import time import time
from unittest.mock import patch from unittest.mock import AsyncMock, patch
import pytest import pytest
from httpx import ASGITransport, AsyncClient from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process" "reflector.services.transcript_process.HatchetClientManager"
) as mock_multitrack_pipeline, ) as mock_hatchet,
): ):
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline # Whereby recordings should use file pipeline, not Hatchet
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id) mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_multitrack_pipeline.delay.assert_not_called() mock_hatchet.start_workflow.assert_not_called()
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant", recording_trigger="automatic-2nd-participant",
is_shared=False, is_shared=False,
) )
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add( transcript = await transcripts_controller.add(
"", "",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process" "reflector.services.transcript_process.HatchetClientManager"
) as mock_multitrack_pipeline, ) as mock_hatchet,
): ):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use multitrack pipeline # Daily.co multitrack recordings should use Hatchet workflow
mock_multitrack_pipeline.delay.assert_called_once_with( mock_hatchet.start_workflow.assert_called_once()
transcript_id=transcript.id, call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
bucket_name="daily-bucket", assert call_kwargs["workflow_name"] == "DiarizationPipeline"
track_keys=track_keys, assert call_kwargs["input_data"]["transcript_id"] == transcript.id
) assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called() mock_file_pipeline.delay.assert_not_called()

View File

@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
settings.DATA_DIR = DATA_DIR settings.DATA_DIR = DATA_DIR
@pytest.fixture(scope="session") # Using celery_includes from conftest.py which includes both pipelines
def celery_includes():
return ["reflector.pipelines.main_live_pipeline"]
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")

View File

@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
if server_instance: if server_instance:
server_instance.should_exit = True server_instance.should_exit = True
server_thread.join(timeout=30) server_thread.join(timeout=2.0)
# Reset global singleton for test isolation
from reflector.ws_manager import reset_ws_manager
reset_ws_manager()
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
# Connect and then trigger an event via HTTP create # Connect and then trigger an event via HTTP create
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws: async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
await asyncio.sleep(0.2)
# Emit an event to the user's room via a standard HTTP action # Emit an event to the user's room via a standard HTTP action
from httpx import AsyncClient from httpx import AsyncClient
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
"email": "user-abc@example.com", "email": "user-abc@example.com",
} }
# Use in-memory client (global singleton makes it share ws_manager)
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac: async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room # Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
resp = await ac.post("/transcripts", json={"name": "WS Test"}) resp = await ac.post("/transcripts", json={"name": "WS Test"})

View File

@@ -0,0 +1,217 @@
import "@testing-library/jest-dom";
// --- Module mocks (hoisted before imports) ---
jest.mock("../apiClient", () => ({
client: {
GET: jest.fn(),
POST: jest.fn(),
PUT: jest.fn(),
PATCH: jest.fn(),
DELETE: jest.fn(),
use: jest.fn(),
},
$api: {
useQuery: jest.fn(),
useMutation: jest.fn(),
queryOptions: (method: string, path: string, init?: unknown) =>
init === undefined
? { queryKey: [method, path] }
: { queryKey: [method, path, init] },
},
API_URL: "http://test",
WEBSOCKET_URL: "ws://test",
configureApiAuth: jest.fn(),
}));
jest.mock("../AuthProvider", () => ({
useAuth: () => ({
status: "authenticated" as const,
accessToken: "test-token",
accessTokenExpires: Date.now() + 3600000,
user: { id: "user1", name: "Test User" },
update: jest.fn(),
signIn: jest.fn(),
signOut: jest.fn(),
lastUserId: "user1",
}),
}));
// Recreate the batcher with a 0ms window. setTimeout(fn, 0) defers to the next
// macrotask boundary — after all synchronous React rendering completes. All
// useQuery queryFns fire within the same macrotask, so they all queue into one
// batch before the timer fires. This is deterministic and avoids fake timers.
jest.mock("../meetingStatusBatcher", () => {
const actual = jest.requireActual("../meetingStatusBatcher");
return {
...actual,
meetingStatusBatcher: actual.createMeetingStatusBatcher(0),
};
});
// --- Imports (after mocks) ---
import React from "react";
import { render, waitFor, screen } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { useRoomActiveMeetings, useRoomUpcomingMeetings } from "../apiHooks";
import { client } from "../apiClient";
import { ErrorProvider } from "../../(errors)/errorContext";
const mockClient = client as { POST: jest.Mock };
// --- Helpers ---
function mockBulkStatusEndpoint(
roomData?: Record<
string,
{ active_meetings: unknown[]; upcoming_events: unknown[] }
>,
) {
mockClient.POST.mockImplementation(
async (_path: string, options: { body: { room_names: string[] } }) => {
const roomNames: string[] = options.body.room_names;
const src = roomData ?? {};
const data = Object.fromEntries(
roomNames.map((name) => [
name,
src[name] ?? { active_meetings: [], upcoming_events: [] },
]),
);
return { data, error: undefined, response: {} };
},
);
}
// --- Test component: renders N room cards, each using both hooks ---
function RoomCard({ roomName }: { roomName: string }) {
const active = useRoomActiveMeetings(roomName);
const upcoming = useRoomUpcomingMeetings(roomName);
if (active.isLoading || upcoming.isLoading) {
return <div data-testid={`room-${roomName}`}>loading</div>;
}
return (
<div data-testid={`room-${roomName}`}>
{active.data?.length ?? 0} active, {upcoming.data?.length ?? 0} upcoming
</div>
);
}
function RoomList({ roomNames }: { roomNames: string[] }) {
return (
<>
{roomNames.map((name) => (
<RoomCard key={name} roomName={name} />
))}
</>
);
}
function createWrapper() {
const queryClient = new QueryClient({
defaultOptions: {
queries: { retry: false },
},
});
return function Wrapper({ children }: { children: React.ReactNode }) {
return (
<QueryClientProvider client={queryClient}>
<ErrorProvider>{children}</ErrorProvider>
</QueryClientProvider>
);
};
}
// --- Tests ---
describe("meeting status batcher integration", () => {
afterEach(() => jest.clearAllMocks());
it("batches multiple room queries into a single POST request", async () => {
const rooms = Array.from({ length: 10 }, (_, i) => `room-${i}`);
mockBulkStatusEndpoint();
render(<RoomList roomNames={rooms} />, { wrapper: createWrapper() });
await waitFor(() => {
for (const name of rooms) {
expect(screen.getByTestId(`room-${name}`)).toHaveTextContent(
"0 active, 0 upcoming",
);
}
});
const postCalls = mockClient.POST.mock.calls.filter(
([path]: [string]) => path === "/v1/rooms/meetings/bulk-status",
);
// Without batching this would be 20 calls (2 hooks x 10 rooms).
expect(postCalls).toHaveLength(1);
// The single call should contain all 10 rooms (deduplicated)
const requestedRooms: string[] = postCalls[0][1].body.room_names;
for (const name of rooms) {
expect(requestedRooms).toContain(name);
}
});
it("batcher fetcher returns room-specific data", async () => {
const {
meetingStatusBatcher: batcher,
} = require("../meetingStatusBatcher");
mockBulkStatusEndpoint({
"room-a": {
active_meetings: [{ id: "m1", room_name: "room-a" }],
upcoming_events: [],
},
"room-b": {
active_meetings: [],
upcoming_events: [{ id: "e1", title: "Standup" }],
},
});
const [resultA, resultB] = await Promise.all([
batcher.fetch("room-a"),
batcher.fetch("room-b"),
]);
expect(mockClient.POST).toHaveBeenCalledTimes(1);
expect(resultA.active_meetings).toEqual([
{ id: "m1", room_name: "room-a" },
]);
expect(resultA.upcoming_events).toEqual([]);
expect(resultB.active_meetings).toEqual([]);
expect(resultB.upcoming_events).toEqual([{ id: "e1", title: "Standup" }]);
});
it("renders room-specific meeting data through hooks", async () => {
mockBulkStatusEndpoint({
"room-a": {
active_meetings: [{ id: "m1", room_name: "room-a" }],
upcoming_events: [],
},
"room-b": {
active_meetings: [],
upcoming_events: [{ id: "e1", title: "Standup" }],
},
});
render(<RoomList roomNames={["room-a", "room-b"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("room-room-a")).toHaveTextContent(
"1 active, 0 upcoming",
);
expect(screen.getByTestId("room-room-b")).toHaveTextContent(
"0 active, 1 upcoming",
);
});
});
});

View File

@@ -2,9 +2,10 @@
import { $api } from "./apiClient"; import { $api } from "./apiClient";
import { useError } from "../(errors)/errorContext"; import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query"; import { QueryClient, useQuery, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api"; import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider"; import { useAuth } from "./AuthProvider";
import { meetingStatusBatcher } from "./meetingStatusBatcher";
import { MeetingId } from "./types"; import { MeetingId } from "./types";
import { NonEmptyString } from "./utils"; import { NonEmptyString } from "./utils";
@@ -697,15 +698,7 @@ export function useRoomsCreateMeeting() {
queryKey: $api.queryOptions("get", "/v1/rooms").queryKey, queryKey: $api.queryOptions("get", "/v1/rooms").queryKey,
}), }),
queryClient.invalidateQueries({ queryClient.invalidateQueries({
queryKey: $api.queryOptions( queryKey: meetingStatusKeys.active(roomName),
"get",
"/v1/rooms/{room_name}/meetings/active" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_ACTIVE_PATH_PARTIAL}`,
{
params: {
path: { room_name: roomName },
},
},
).queryKey,
}), }),
]); ]);
}, },
@@ -734,42 +727,39 @@ export function useRoomGetByName(roomName: string | null) {
export function useRoomUpcomingMeetings(roomName: string | null) { export function useRoomUpcomingMeetings(roomName: string | null) {
const { isAuthenticated } = useAuthReady(); const { isAuthenticated } = useAuthReady();
return $api.useQuery( return useQuery({
"get", queryKey: meetingStatusKeys.upcoming(roomName!),
"/v1/rooms/{room_name}/meetings/upcoming" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_UPCOMING_PATH_PARTIAL}`, queryFn: async () => {
{ const result = await meetingStatusBatcher.fetch(roomName!);
params: { return result.upcoming_events;
path: { room_name: roomName! },
},
}, },
{ enabled: !!roomName && isAuthenticated,
enabled: !!roomName && isAuthenticated, });
},
);
} }
const MEETINGS_PATH_PARTIAL = "meetings" as const; // Query keys reuse $api.queryOptions so cache identity matches the original
const MEETINGS_ACTIVE_PATH_PARTIAL = `${MEETINGS_PATH_PARTIAL}/active` as const; // per-room GET endpoints. The actual fetch goes through the batcher, but the
const MEETINGS_UPCOMING_PATH_PARTIAL = // keys stay consistent with the rest of the codebase.
`${MEETINGS_PATH_PARTIAL}/upcoming` as const; const meetingStatusKeys = {
const MEETING_LIST_PATH_PARTIALS = [ active: (roomName: string) =>
MEETINGS_ACTIVE_PATH_PARTIAL, $api.queryOptions("get", "/v1/rooms/{room_name}/meetings/active", {
MEETINGS_UPCOMING_PATH_PARTIAL, params: { path: { room_name: roomName } },
]; }).queryKey,
upcoming: (roomName: string) =>
$api.queryOptions("get", "/v1/rooms/{room_name}/meetings/upcoming", {
params: { path: { room_name: roomName } },
}).queryKey,
};
export function useRoomActiveMeetings(roomName: string | null) { export function useRoomActiveMeetings(roomName: string | null) {
return $api.useQuery( return useQuery({
"get", queryKey: meetingStatusKeys.active(roomName!),
"/v1/rooms/{room_name}/meetings/active" satisfies `/v1/rooms/{room_name}/${typeof MEETINGS_ACTIVE_PATH_PARTIAL}`, queryFn: async () => {
{ const result = await meetingStatusBatcher.fetch(roomName!);
params: { return result.active_meetings;
path: { room_name: roomName! },
},
}, },
{ enabled: !!roomName,
enabled: !!roomName, });
},
);
} }
export function useRoomGetMeeting( export function useRoomGetMeeting(

View File

@@ -0,0 +1,37 @@
import { create, keyResolver, windowScheduler } from "@yornaath/batshit";
import { client } from "./apiClient";
import type { components } from "../reflector-api";
type MeetingStatusResult = {
roomName: string;
active_meetings: components["schemas"]["Meeting"][];
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
const BATCH_WINDOW_MS = 10;
export function createMeetingStatusBatcher(windowMs: number = BATCH_WINDOW_MS) {
return create({
fetcher: async (roomNames: string[]): Promise<MeetingStatusResult[]> => {
const unique = [...new Set(roomNames)];
const { data, error } = await client.POST(
"/v1/rooms/meetings/bulk-status",
{ body: { room_names: unique } },
);
if (error || !data) {
throw new Error(
`bulk-status fetch failed: ${JSON.stringify(error ?? "no data")}`,
);
}
return roomNames.map((name) => ({
roomName: name,
active_meetings: data[name]?.active_meetings ?? [],
upcoming_events: data[name]?.upcoming_events ?? [],
}));
},
resolver: keyResolver("roomName"),
scheduler: windowScheduler(windowMs),
});
}
export const meetingStatusBatcher = createMeetingStatusBatcher();

View File

@@ -118,6 +118,23 @@ export interface paths {
patch?: never; patch?: never;
trace?: never; trace?: never;
}; };
"/v1/rooms/meetings/bulk-status": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/** Rooms Bulk Meeting Status */
post: operations["v1_rooms_bulk_meeting_status"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}": { "/v1/rooms/{room_id}": {
parameters: { parameters: {
query?: never; query?: never;
@@ -799,6 +816,11 @@ export interface components {
*/ */
chunk: string; chunk: string;
}; };
/** BulkStatusRequest */
BulkStatusRequest: {
/** Room Names */
room_names: string[];
};
/** CalendarEventResponse */ /** CalendarEventResponse */
CalendarEventResponse: { CalendarEventResponse: {
/** Id */ /** Id */
@@ -1735,6 +1757,13 @@ export interface components {
/** Webhook Secret */ /** Webhook Secret */
webhook_secret: string | null; webhook_secret: string | null;
}; };
/** RoomMeetingStatus */
RoomMeetingStatus: {
/** Active Meetings */
active_meetings: components["schemas"]["Meeting"][];
/** Upcoming Events */
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
/** RtcOffer */ /** RtcOffer */
RtcOffer: { RtcOffer: {
/** Sdp */ /** Sdp */
@@ -2272,6 +2301,41 @@ export interface operations {
}; };
}; };
}; };
v1_rooms_bulk_meeting_status: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["BulkStatusRequest"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": {
[key: string]: components["schemas"]["RoomMeetingStatus"];
};
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_get: { v1_rooms_get: {
parameters: { parameters: {
query?: never; query?: never;

View File

@@ -1,8 +1,22 @@
module.exports = { module.exports = {
preset: "ts-jest", testEnvironment: "jest-environment-jsdom",
testEnvironment: "node",
roots: ["<rootDir>/app"], roots: ["<rootDir>/app"],
testMatch: ["**/__tests__/**/*.test.ts"], testMatch: ["**/__tests__/**/*.test.ts", "**/__tests__/**/*.test.tsx"],
collectCoverage: true, collectCoverage: false,
collectCoverageFrom: ["app/**/*.ts", "!app/**/*.d.ts"], transform: {
"^.+\\.[jt]sx?$": [
"ts-jest",
{
tsconfig: {
jsx: "react-jsx",
module: "esnext",
moduleResolution: "bundler",
esModuleInterop: true,
strict: true,
downlevelIteration: true,
lib: ["dom", "dom.iterable", "esnext"],
},
},
],
},
}; };

View File

@@ -23,6 +23,7 @@
"@tanstack/react-query": "^5.85.9", "@tanstack/react-query": "^5.85.9",
"@types/ioredis": "^5.0.0", "@types/ioredis": "^5.0.0",
"@whereby.com/browser-sdk": "^3.3.4", "@whereby.com/browser-sdk": "^3.3.4",
"@yornaath/batshit": "^0.14.0",
"autoprefixer": "10.4.20", "autoprefixer": "10.4.20",
"axios": "^1.8.2", "axios": "^1.8.2",
"eslint": "^9.33.0", "eslint": "^9.33.0",
@@ -61,9 +62,13 @@
"author": "Andreas <andreas@monadical.com>", "author": "Andreas <andreas@monadical.com>",
"license": "All Rights Reserved", "license": "All Rights Reserved",
"devDependencies": { "devDependencies": {
"@testing-library/dom": "^10.4.1",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.2",
"@types/jest": "^30.0.0", "@types/jest": "^30.0.0",
"@types/react": "18.2.20", "@types/react": "18.2.20",
"jest": "^30.1.3", "jest": "^30.1.3",
"jest-environment-jsdom": "^30.2.0",
"openapi-typescript": "^7.9.1", "openapi-typescript": "^7.9.1",
"prettier": "^3.0.0", "prettier": "^3.0.0",
"ts-jest": "^29.4.1" "ts-jest": "^29.4.1"

808
www/pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff