Compare commits

..

2 Commits

Author SHA1 Message Date
Igor Loskutov
df6916385b fix: address review feedback
- Add PUBLIC_MODE auth guard on bulk-status endpoint
- Convert DB models to view models via model_validate()
- Early return when no accessible rooms (skip DB queries)
- BulkMeetingStatusMap: Partial<Record> for type honesty
- Sort roomNames in query key for cache stability
- Remove redundant empty-guard in queryFn
- Add 7 backend tests: auth, redaction, whereby host_room_url, 401, empty
- Add 2 frontend tests: error handling, unauthenticated case
2026-02-05 20:30:26 -05:00
Igor Loskutov
083a50cbcd fix: batch room meeting status queries via prop-drilling
Alternative to the batcher approach (#848): parent fetches all room
meeting statuses in a single bulk POST and passes data down as props.
No extra dependency (@yornaath/batshit), no implicit batching magic.

Backend: POST /v1/rooms/meetings/bulk-status + bulk DB methods.
Frontend: useRoomsBulkMeetingStatus hook in RoomList, MeetingStatus
receives data as props instead of calling per-room hooks.
CI: fix pnpm 8→10 auto-detect, add concurrency group.
Tests: Jest+jsdom+testing-library for bulk hook.
2026-02-05 20:04:31 -05:00
26 changed files with 1739 additions and 1438 deletions

View File

@@ -13,6 +13,9 @@ on:
jobs: jobs:
test-next-server: test-next-server:
runs-on: ubuntu-latest runs-on: ubuntu-latest
concurrency:
group: test-next-server-${{ github.ref }}
cancel-in-progress: true
defaults: defaults:
run: run:
@@ -21,17 +24,12 @@ jobs:
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install pnpm - name: Install pnpm
uses: pnpm/action-setup@v4 uses: pnpm/action-setup@v4
with: with:
version: 8 package_json_file: './www/package.json'
- name: Setup Node.js cache - name: Setup Node.js
uses: actions/setup-node@v4 uses: actions/setup-node@v4
with: with:
node-version: '20' node-version: '20'
@@ -42,4 +40,4 @@ jobs:
run: pnpm install run: pnpm install
- name: Run tests - name: Run tests
run: pnpm test run: pnpm test

View File

@@ -1,17 +1,5 @@
# Changelog # Changelog
## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
### Features
* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
### Bug Fixes
* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03) ## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)

View File

@@ -1,35 +0,0 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -104,6 +104,26 @@ class CalendarEventController:
results = await get_database().fetch_all(query) results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results] return [CalendarEvent(**result) for result in results]
async def get_upcoming_for_rooms(
self, room_ids: list[str], minutes_ahead: int = 120
) -> list[CalendarEvent]:
now = datetime.now(timezone.utc)
future_time = now + timedelta(minutes=minutes_ahead)
query = (
calendar_events.select()
.where(
sa.and_(
calendar_events.c.room_id.in_(room_ids),
calendar_events.c.is_deleted == False,
calendar_events.c.start_time <= future_time,
calendar_events.c.end_time >= now,
)
)
.order_by(calendar_events.c.start_time.asc())
)
results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results]
async def get_by_id(self, event_id: str) -> CalendarEvent | None: async def get_by_id(self, event_id: str) -> CalendarEvent | None:
query = calendar_events.select().where(calendar_events.c.id == event_id) query = calendar_events.select().where(calendar_events.c.id == event_id)
result = await get_database().fetch_one(query) result = await get_database().fetch_one(query)

View File

@@ -301,6 +301,23 @@ class MeetingController:
results = await get_database().fetch_all(query) results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results] return [Meeting(**result) for result in results]
async def get_all_active_for_rooms(
self, room_ids: list[str], current_time: datetime
) -> list[Meeting]:
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_id.in_(room_ids),
meetings.c.end_date > current_time,
meetings.c.is_active,
)
)
.order_by(meetings.c.end_date.desc())
)
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_active_by_calendar_event( async def get_active_by_calendar_event(
self, room: Room, calendar_event_id: str, current_time: datetime self, room: Room, calendar_event_id: str, current_time: datetime
) -> Meeting | None: ) -> Meeting | None:

View File

@@ -57,6 +57,12 @@ rooms = sqlalchemy.Table(
sqlalchemy.String, sqlalchemy.String,
nullable=False, nullable=False,
), ),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column( sqlalchemy.Column(
"skip_consent", "skip_consent",
sqlalchemy.Boolean, sqlalchemy.Boolean,
@@ -91,6 +97,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None ics_last_sync: datetime | None = None
ics_last_etag: str | None = None ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False skip_consent: bool = False
@@ -238,6 +245,11 @@ class RoomController:
return room return room
async def get_by_names(self, names: list[str]) -> list[Room]:
query = rooms.select().where(rooms.c.name.in_(names))
results = await get_database().fetch_all(query)
return [Room(**r) for r in results]
async def get_ics_enabled(self) -> list[Room]: async def get_ics_enabled(self) -> list[Room]:
query = rooms.select().where( query = rooms.select().where(
rooms.c.ics_enabled == True, rooms.c.ics_url != None rooms.c.ics_enabled == True, rooms.c.ics_url != None

View File

@@ -1,144 +0,0 @@
"""
Hatchet DAG Status -> Zulip Live Updates.
Posts/updates/deletes a Zulip message showing the Hatchet workflow DAG status.
All functions are fire-and-forget (catch + warning log on failure).
Note: Uses deferred imports throughout for fork-safety,
consistent with the pipeline pattern in daily_multitrack_pipeline.py.
"""
from reflector.logger import logger
from reflector.settings import settings
def _dag_zulip_enabled() -> bool:
return bool(
settings.ZULIP_REALM and settings.ZULIP_DAG_STREAM and settings.ZULIP_DAG_TOPIC
)
async def create_dag_zulip_message(transcript_id: str, workflow_run_id: str) -> None:
"""Post initial DAG status to Zulip. Called at dispatch time (normal DB context)."""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import send_message_to_zulip # noqa: PLC0415
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)
response = await send_message_to_zulip(
settings.ZULIP_DAG_STREAM, settings.ZULIP_DAG_TOPIC, content
)
message_id = response.get("id")
if message_id:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"zulip_message_id": message_id}
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)
async def update_dag_zulip_message(
transcript_id: str,
workflow_run_id: str,
error_message: str | None = None,
) -> None:
"""Update existing DAG status in Zulip. Called from Hatchet worker (forked).
Args:
error_message: If set, appended as an error banner to the rendered DAG.
"""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows.daily_multitrack_pipeline import ( # noqa: PLC0415
fresh_db_connection,
)
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import update_zulip_message # noqa: PLC0415
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)
if error_message:
content += f"\n\n:cross_mark: **{error_message}**"
await update_zulip_message(
transcript.zulip_message_id,
settings.ZULIP_DAG_STREAM,
settings.ZULIP_DAG_TOPIC,
content,
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to update DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)
async def delete_dag_zulip_message(transcript_id: str) -> None:
"""Delete DAG Zulip message and clear zulip_message_id.
Called from post_zulip task (already inside fresh_db_connection).
Swallows InvalidMessageError (message already deleted).
"""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.zulip import ( # noqa: PLC0415
InvalidMessageError,
delete_zulip_message,
)
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return
try:
await delete_zulip_message(transcript.zulip_message_id)
except InvalidMessageError:
logger.warning(
"[DAG Zulip] Message already deleted",
transcript_id=transcript_id,
zulip_message_id=transcript.zulip_message_id,
)
await transcripts_controller.update(transcript, {"zulip_message_id": None})
except Exception:
logger.warning(
"[DAG Zulip] Failed to delete DAG message",
transcript_id=transcript_id,
exc_info=True,
)

View File

@@ -45,7 +45,6 @@ from reflector.hatchet.constants import (
TIMEOUT_SHORT, TIMEOUT_SHORT,
TaskName, TaskName,
) )
from reflector.hatchet.dag_zulip import update_dag_zulip_message
from reflector.hatchet.workflows.models import ( from reflector.hatchet.workflows.models import (
ActionItemsResult, ActionItemsResult,
ConsentResult, ConsentResult,
@@ -239,14 +238,7 @@ def with_error_handling(
@functools.wraps(func) @functools.wraps(func)
async def wrapper(input: PipelineInput, ctx: Context) -> R: async def wrapper(input: PipelineInput, ctx: Context) -> R:
try: try:
result = await func(input, ctx) return await func(input, ctx)
try:
await update_dag_zulip_message(
input.transcript_id, ctx.workflow_run_id
)
except Exception:
pass
return result
except Exception as e: except Exception as e:
logger.error( logger.error(
f"[Hatchet] {step_name} failed", f"[Hatchet] {step_name} failed",
@@ -254,14 +246,6 @@ def with_error_handling(
error=str(e), error=str(e),
exc_info=True, exc_info=True,
) )
try:
await update_dag_zulip_message(
input.transcript_id,
ctx.workflow_run_id,
error_message=f"{step_name} failed: {e}",
)
except Exception:
pass
if set_error_status: if set_error_status:
await set_workflow_error_status(input.transcript_id) await set_workflow_error_status(input.transcript_id)
raise raise
@@ -1310,11 +1294,6 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
async with fresh_db_connection(): async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.dag_zulip import ( # noqa: PLC0415
delete_dag_zulip_message,
)
await delete_dag_zulip_message(input.transcript_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id) transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript: if transcript:

View File

@@ -15,11 +15,14 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.logger import logger from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -178,108 +181,124 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows. Returns AsyncResult for Celery tasks, None for Hatchet workflows.
""" """
if isinstance(config, MultitrackProcessingConfig): if isinstance(config, MultitrackProcessingConfig):
# Multitrack processing always uses Hatchet (no Celery fallback) use_celery = False
# First check if we can replay (outside transaction since it's read-only) if config.room_id:
transcript = await transcripts_controller.get_by_id(config.transcript_id) room = await rooms_controller.get_by_id(config.room_id)
if transcript and transcript.workflow_run_id and not force: use_celery = room.use_celery if room else False
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
) )
if can_replay:
await HatchetClientManager.replay_workflow(transcript.workflow_run_id) if use_hatchet:
logger.info( # First check if we can replay (outside transaction since it's read-only)
"Replaying Hatchet workflow", transcript = await transcripts_controller.get_by_id(config.transcript_id)
workflow_id=transcript.workflow_run_id, if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
) )
return None if can_replay:
else: await HatchetClientManager.replay_workflow(
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted) transcript.workflow_run_id
# Log and proceed to start new workflow )
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try: try:
status = await HatchetClientManager.get_workflow_run_status( status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id transcript.workflow_run_id
) )
logger.info( if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
"Old workflow not replayable, starting new", logger.info(
old_workflow_id=transcript.workflow_run_id, "Concurrent workflow detected, skipping dispatch",
old_status=status.value, workflow_id=transcript.workflow_run_id,
) )
except NotFoundException: return None
# Workflow deleted from Hatchet but ID still in DB except ApiException:
logger.info( # Workflow might be gone (404) or API issue - proceed with new workflow
"Old workflow not found in Hatchet, starting new", pass
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists workflow_id = await HatchetClientManager.start_workflow(
if force and transcript and transcript.workflow_run_id: workflow_name="DiarizationPipeline",
try: input_data={
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id) "recording_id": config.recording_id,
logger.info( "tracks": [{"s3_key": k} for k in config.track_keys],
"Cancelled old workflow (--force)", "bucket_name": config.bucket_name,
workflow_id=transcript.workflow_run_id, "transcript_id": config.transcript_id,
) "room_id": config.room_id,
except NotFoundException: },
logger.info( additional_metadata={
"Old workflow already deleted (--force)", "transcript_id": config.transcript_id,
workflow_id=transcript.workflow_run_id, "recording_id": config.recording_id,
) "daily_recording_id": config.recording_id,
await transcripts_controller.update(transcript, {"workflow_run_id": None}) },
)
# Re-fetch and check for concurrent dispatch (optimistic approach). if transcript:
# No database lock - worst case is duplicate dispatch, but Hatchet await transcripts_controller.update(
# workflows are idempotent so this is acceptable. transcript, {"workflow_run_id": workflow_id}
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
) )
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow( logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
workflow_name="DiarizationPipeline", return None
input_data={
"recording_id": config.recording_id, # Celery pipeline (durable workflows disabled)
"tracks": [{"s3_key": k} for k in config.track_keys], return task_pipeline_multitrack_process.delay(
"bucket_name": config.bucket_name, transcript_id=config.transcript_id,
"transcript_id": config.transcript_id, bucket_name=config.bucket_name,
"room_id": config.room_id, track_keys=config.track_keys,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
) )
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
try:
await create_dag_zulip_message(config.transcript_id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
transcript_id=config.transcript_id,
workflow_id=workflow_id,
exc_info=True,
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
elif isinstance(config, FileProcessingConfig): elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id) return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else: else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import DAILY_PLATFORM, Platform from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -155,15 +155,12 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code None # Webhook UUID for this environment. Not used by production code
) )
# Platform Configuration # Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
# Zulip integration # Zulip integration
ZULIP_REALM: str | None = None ZULIP_REALM: str | None = None
ZULIP_API_KEY: str | None = None ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None ZULIP_BOT_EMAIL: str | None = None
ZULIP_DAG_STREAM: str | None = None
ZULIP_DAG_TOPIC: str | None = None
ZULIP_HOST_HEADER: str | None = None
# Hatchet workflow orchestration (always enabled for multitrack processing) # Hatchet workflow orchestration (always enabled for multitrack processing)
HATCHET_CLIENT_TOKEN: str | None = None HATCHET_CLIENT_TOKEN: str | None = None

View File

@@ -1,412 +0,0 @@
"""
Render Hatchet workflow runs as text DAG.
Usage:
# Show latest 5 runs (summary table)
uv run -m reflector.tools.render_hatchet_run
# Show specific run with full DAG + task details
uv run -m reflector.tools.render_hatchet_run <workflow_run_id>
# Drill into Nth run from the list (1-indexed)
uv run -m reflector.tools.render_hatchet_run --show 1
# Show latest N runs
uv run -m reflector.tools.render_hatchet_run --last 10
# Filter by status
uv run -m reflector.tools.render_hatchet_run --status FAILED
uv run -m reflector.tools.render_hatchet_run --status RUNNING
"""
import argparse
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from hatchet_sdk.clients.rest.models import (
V1TaskEvent,
V1TaskStatus,
V1TaskSummary,
V1WorkflowRunDetails,
WorkflowRunShapeItemForWorkflowRunDetails,
)
from reflector.hatchet.client import HatchetClientManager
STATUS_ICON = {
V1TaskStatus.COMPLETED: "\u2705",
V1TaskStatus.RUNNING: "\u23f3",
V1TaskStatus.FAILED: "\u274c",
V1TaskStatus.QUEUED: "\u23f8\ufe0f",
V1TaskStatus.CANCELLED: "\u26a0\ufe0f",
}
STATUS_LABEL = {
V1TaskStatus.COMPLETED: "Complete",
V1TaskStatus.RUNNING: "Running",
V1TaskStatus.FAILED: "FAILED",
V1TaskStatus.QUEUED: "Queued",
V1TaskStatus.CANCELLED: "Cancelled",
}
def _fmt_time(dt: datetime | None) -> str:
if dt is None:
return "-"
return dt.strftime("%H:%M:%S")
def _fmt_duration(ms: int | None) -> str:
if ms is None:
return "-"
secs = ms / 1000
if secs < 60:
return f"{secs:.1f}s"
mins = secs / 60
return f"{mins:.1f}m"
def _fmt_status_line(task: V1TaskSummary) -> str:
"""Format a status line like: Complete (finished 20:31:44)"""
label = STATUS_LABEL.get(task.status, task.status.value)
icon = STATUS_ICON.get(task.status, "?")
if task.status == V1TaskStatus.COMPLETED and task.finished_at:
return f"{icon} {label} (finished {_fmt_time(task.finished_at)})"
elif task.status == V1TaskStatus.RUNNING and task.started_at:
parts = [f"started {_fmt_time(task.started_at)}"]
if task.duration:
parts.append(f"{_fmt_duration(task.duration)} elapsed")
return f"{icon} {label} ({', '.join(parts)})"
elif task.status == V1TaskStatus.FAILED and task.finished_at:
return f"{icon} {label} (failed {_fmt_time(task.finished_at)})"
elif task.status == V1TaskStatus.CANCELLED:
return f"{icon} {label}"
elif task.status == V1TaskStatus.QUEUED:
return f"{icon} {label}"
return f"{icon} {label}"
def _topo_sort(
shape: list[WorkflowRunShapeItemForWorkflowRunDetails],
) -> list[str]:
"""Topological sort of step_ids from shape DAG."""
step_ids = {s.step_id for s in shape}
children_map: dict[str, list[str]] = {}
in_degree: dict[str, int] = {sid: 0 for sid in step_ids}
for s in shape:
children = [c for c in (s.children_step_ids or []) if c in step_ids]
children_map[s.step_id] = children
for c in children:
in_degree[c] += 1
queue = sorted(sid for sid, deg in in_degree.items() if deg == 0)
result: list[str] = []
while queue:
node = queue.pop(0)
result.append(node)
for c in children_map.get(node, []):
in_degree[c] -= 1
if in_degree[c] == 0:
queue.append(c)
queue.sort()
return result
def render_run_detail(details: V1WorkflowRunDetails) -> str:
"""Render a single workflow run as markdown DAG with task details."""
shape = details.shape or []
tasks = details.tasks or []
events = details.task_events or []
run = details.run
if not shape:
return f"Run {run.metadata.id}: {run.status.value} (no shape data)"
# Build lookups
step_to_shape: dict[str, WorkflowRunShapeItemForWorkflowRunDetails] = {
s.step_id: s for s in shape
}
step_to_name: dict[str, str] = {s.step_id: s.task_name for s in shape}
# Reverse edges (parents)
parents: dict[str, list[str]] = {s.step_id: [] for s in shape}
for s in shape:
for child_id in s.children_step_ids or []:
if child_id in parents:
parents[child_id].append(s.step_id)
# Join tasks by step_id
task_by_step: dict[str, V1TaskSummary] = {}
for t in tasks:
if t.step_id and t.step_id in step_to_name:
task_by_step[t.step_id] = t
# Events indexed by task_external_id
events_by_task: dict[str, list[V1TaskEvent]] = defaultdict(list)
for ev in events:
events_by_task[ev.task_id].append(ev)
ordered = _topo_sort(shape)
lines: list[str] = []
# Run header
run_icon = STATUS_ICON.get(run.status, "?")
run_name = run.display_name or run.workflow_id
dur = _fmt_duration(run.duration)
lines.append(f"**{run_name}** {run_icon} {dur}")
lines.append(f"ID: `{run.metadata.id}`")
if run.additional_metadata:
meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
lines.append(f"Meta: {', '.join(meta_parts)}")
if run.error_message:
# Take first line of error only for header
first_line = run.error_message.split("\n")[0]
lines.append(f"Error: {first_line}")
lines.append("")
# DAG Status Overview table
lines.append("**DAG Status Overview**")
lines.append("")
lines.append("| Node | Status | Duration | Dependencies |")
lines.append("|------|--------|----------|--------------|")
for step_id in ordered:
s = step_to_shape[step_id]
t = task_by_step.get(step_id)
name = step_to_name[step_id]
icon = STATUS_ICON.get(t.status, "?") if t else "?"
dur = _fmt_duration(t.duration) if t else "-"
parent_names = [step_to_name[p] for p in parents[step_id]]
child_names = [
step_to_name[c] for c in (s.children_step_ids or []) if c in step_to_name
]
deps_left = ", ".join(parent_names) if parent_names else ""
deps_right = ", ".join(child_names) if child_names else ""
if deps_left and deps_right:
deps = f"{deps_left} \u2192 {deps_right}"
elif deps_right:
deps = f"\u2192 {deps_right}"
elif deps_left:
deps = f"{deps_left} \u2192"
else:
deps = "-"
lines.append(f"| {name} | {icon} | {dur} | {deps} |")
lines.append("")
lines.append("---")
lines.append("")
# Node details
for step_id in ordered:
t = task_by_step.get(step_id)
name = step_to_name[step_id]
if not t:
lines.append(f"**\U0001f4e6 {name}**")
lines.append("Status: no task data")
lines.append("")
continue
lines.append(f"**\U0001f4e6 {name}**")
lines.append(f"Status: {_fmt_status_line(t)}")
if t.duration:
lines.append(f"Duration: {_fmt_duration(t.duration)}")
if t.retry_count and t.retry_count > 0:
lines.append(f"Retries: {t.retry_count}")
# Fan-out children
if t.num_spawned_children and t.num_spawned_children > 0:
children = t.children or []
completed = sum(1 for c in children if c.status == V1TaskStatus.COMPLETED)
failed = sum(1 for c in children if c.status == V1TaskStatus.FAILED)
running = sum(1 for c in children if c.status == V1TaskStatus.RUNNING)
lines.append(
f"Spawned children: {completed}/{t.num_spawned_children} done"
f"{f', {running} running' if running else ''}"
f"{f', {failed} failed' if failed else ''}"
)
# Error message (first meaningful line only, full trace in events)
if t.error_message:
err_lines = t.error_message.strip().split("\n")
# Find first non-empty, non-traceback line
err_summary = err_lines[0]
for line in err_lines:
stripped = line.strip()
if stripped and not stripped.startswith(
("Traceback", "File ", "{", ")")
):
err_summary = stripped
break
lines.append(f"Error: `{err_summary}`")
# Events log
task_events = sorted(
events_by_task.get(t.task_external_id, []),
key=lambda e: e.timestamp,
)
if task_events:
lines.append("Events:")
for ev in task_events:
ts = ev.timestamp.strftime("%H:%M:%S")
ev_icon = ""
if ev.event_type.value == "FINISHED":
ev_icon = "\u2705 "
elif ev.event_type.value in ("FAILED", "TIMED_OUT"):
ev_icon = "\u274c "
elif ev.event_type.value == "STARTED":
ev_icon = "\u25b6\ufe0f "
elif ev.event_type.value == "RETRYING":
ev_icon = "\U0001f504 "
elif ev.event_type.value == "CANCELLED":
ev_icon = "\u26a0\ufe0f "
msg = ev.message.strip()
if ev.error_message:
# Just first line of error in event log
err_first = ev.error_message.strip().split("\n")[0]
if msg:
msg += f" | {err_first}"
else:
msg = err_first
if msg:
lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}: {msg}")
else:
lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}")
lines.append("")
return "\n".join(lines)
def render_run_summary(idx: int, run: V1TaskSummary) -> str:
"""One-line summary for a run in the list view."""
icon = STATUS_ICON.get(run.status, "?")
name = run.display_name or run.workflow_name or "?"
run_id = run.workflow_run_external_id or "?"
dur = _fmt_duration(run.duration)
started = _fmt_time(run.started_at)
meta = ""
if run.additional_metadata:
meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
meta = f" ({', '.join(meta_parts)})"
return (
f" {idx}. {icon} **{name}** started={started} dur={dur}{meta}\n"
f" `{run_id}`"
)
async def _fetch_run_list(
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> list[V1TaskSummary]:
client = HatchetClientManager.get_client()
since = datetime.now(timezone.utc) - timedelta(days=7)
runs = await client.runs.aio_list(
since=since,
statuses=statuses,
limit=count,
)
return runs.rows or []
async def list_recent_runs(
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> str:
"""List recent workflow runs as text."""
rows = await _fetch_run_list(count, statuses)
if not rows:
return "No runs found in the last 7 days."
lines = [f"Recent runs ({len(rows)}):", ""]
for i, run in enumerate(rows, 1):
lines.append(render_run_summary(i, run))
lines.append("")
lines.append("Use `--show N` to see full DAG for run N")
return "\n".join(lines)
async def show_run(workflow_run_id: str) -> str:
"""Fetch and render a single run."""
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
return render_run_detail(details)
async def show_nth_run(
n: int,
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> str:
"""Fetch list, then drill into Nth run."""
rows = await _fetch_run_list(count, statuses)
if not rows:
return "No runs found in the last 7 days."
if n < 1 or n > len(rows):
return f"Invalid index {n}. Have {len(rows)} runs (1-{len(rows)})."
run = rows[n - 1]
return await show_run(run.workflow_run_external_id)
async def main_async(args: argparse.Namespace) -> None:
statuses = [V1TaskStatus(args.status)] if args.status else None
if args.run_id:
output = await show_run(args.run_id)
elif args.show is not None:
output = await show_nth_run(args.show, count=args.last, statuses=statuses)
else:
output = await list_recent_runs(count=args.last, statuses=statuses)
print(output)
def main() -> None:
parser = argparse.ArgumentParser(
description="Render Hatchet workflow runs as text DAG"
)
parser.add_argument(
"run_id",
nargs="?",
default=None,
help="Workflow run ID to show in detail. If omitted, lists recent runs.",
)
parser.add_argument(
"--show",
type=int,
default=None,
metavar="N",
help="Show full DAG for the Nth run in the list (1-indexed)",
)
parser.add_argument(
"--last",
type=int,
default=5,
help="Number of recent runs to list (default: 5)",
)
parser.add_argument(
"--status",
choices=["QUEUED", "RUNNING", "COMPLETED", "FAILED", "CANCELLED"],
help="Filter by status",
)
args = parser.parse_args()
asyncio.run(main_async(args))
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,6 @@
import asyncio
import logging import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from enum import Enum from enum import Enum
from typing import Annotated, Any, Literal, Optional from typing import Annotated, Any, Literal, Optional
@@ -6,13 +8,14 @@ from typing import Annotated, Any, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Page from fastapi_pagination import Page
from fastapi_pagination.ext.databases import apaginate from fastapi_pagination.ext.databases import apaginate
from pydantic import BaseModel from pydantic import BaseModel, Field
from redis.exceptions import LockError from redis.exceptions import LockError
import reflector.auth as auth import reflector.auth as auth
from reflector.db import get_database from reflector.db import get_database
from reflector.db.calendar_events import calendar_events_controller from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room as DbRoom
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
from reflector.redis_cache import RedisAsyncLock from reflector.redis_cache import RedisAsyncLock
from reflector.schemas.platform import Platform from reflector.schemas.platform import Platform
@@ -195,6 +198,82 @@ async def rooms_list(
return paginated return paginated
class BulkStatusRequest(BaseModel):
room_names: list[str] = Field(max_length=100)
class RoomMeetingStatus(BaseModel):
active_meetings: list[Meeting]
upcoming_events: list[CalendarEventResponse]
@router.post("/rooms/meetings/bulk-status", response_model=dict[str, RoomMeetingStatus])
async def rooms_bulk_meeting_status(
request: BulkStatusRequest,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
):
if not user and not settings.PUBLIC_MODE:
raise HTTPException(status_code=401, detail="Not authenticated")
user_id = user["sub"] if user else None
all_rooms = await rooms_controller.get_by_names(request.room_names)
# Filter to rooms the user can see (owned or shared), matching rooms_list behavior
rooms = [
r
for r in all_rooms
if r.is_shared or (user_id is not None and r.user_id == user_id)
]
room_by_id: dict[str, DbRoom] = {r.id: r for r in rooms}
room_ids = list(room_by_id.keys())
if not room_ids:
return {
name: RoomMeetingStatus(active_meetings=[], upcoming_events=[])
for name in request.room_names
}
current_time = datetime.now(timezone.utc)
active_meetings, upcoming_events = await asyncio.gather(
meetings_controller.get_all_active_for_rooms(room_ids, current_time),
calendar_events_controller.get_upcoming_for_rooms(room_ids),
)
# Group by room name, converting DB models to view models
active_by_room: dict[str, list[Meeting]] = defaultdict(list)
for m in active_meetings:
room = room_by_id.get(m.room_id)
if not room:
continue
m.platform = room.platform
if user_id != room.user_id and m.platform == "whereby":
m.host_room_url = ""
active_by_room[room.name].append(
Meeting.model_validate(m, from_attributes=True)
)
upcoming_by_room: dict[str, list[CalendarEventResponse]] = defaultdict(list)
for e in upcoming_events:
room = room_by_id.get(e.room_id)
if not room:
continue
if user_id != room.user_id:
e.description = None
e.attendees = None
upcoming_by_room[room.name].append(
CalendarEventResponse.model_validate(e, from_attributes=True)
)
result: dict[str, RoomMeetingStatus] = {}
for name in request.room_names:
result[name] = RoomMeetingStatus(
active_meetings=active_by_room.get(name, []),
upcoming_events=upcoming_by_room.get(name, []),
)
return result
@router.get("/rooms/{room_id}", response_model=RoomDetails) @router.get("/rooms/{room_id}", response_model=RoomDetails)
async def rooms_get( async def rooms_get(
room_id: str, room_id: str,

View File

@@ -25,9 +25,11 @@ from reflector.db.transcripts import (
transcripts_controller, transcripts_controller,
) )
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -349,40 +351,50 @@ async def _process_multitrack_recording_inner(
room_id=room.id, room_id=room.id,
) )
# Multitrack processing always uses Hatchet (no Celery fallback) use_celery = room and room.use_celery
workflow_id = await HatchetClientManager.start_workflow( use_hatchet = not use_celery
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id}) if use_celery:
logger.info(
try: "Room uses legacy Celery processing",
await create_dag_zulip_message(transcript.id, workflow_id) room_id=room.id,
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
transcript_id=transcript.id, transcript_id=transcript.id,
workflow_id=workflow_id,
exc_info=True,
) )
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
)
@shared_task @shared_task
@asynctask @asynctask
@@ -1060,53 +1072,66 @@ async def reprocess_failed_daily_recordings():
) )
continue continue
# Multitrack reprocessing always uses Hatchet (no Celery fallback) use_celery = room and room.use_celery
if not transcript: use_hatchet = not use_celery
logger.warning(
"No transcript for Hatchet reprocessing, skipping", if use_hatchet:
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id, recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
try:
await create_dag_zulip_message(transcript.id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at reprocess dispatch",
transcript_id=transcript.id,
workflow_id=workflow_id, workflow_id=workflow_id,
exc_info=True, room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
) )
logger.info( # For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
"Queued Daily recording for Hatchet reprocessing", # Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_id=recording.id, recording_start_ts = int(recording.recorded_at.timestamp())
workflow_id=workflow_id,
room_name=meeting.room_name, process_multitrack_recording.delay(
track_count=len(recording.track_keys), bucket_name=bucket_name,
) daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1 reprocessed_count += 1

View File

@@ -12,16 +12,9 @@ class InvalidMessageError(Exception):
pass pass
def _zulip_client() -> httpx.AsyncClient:
headers = {}
if settings.ZULIP_HOST_HEADER:
headers["Host"] = settings.ZULIP_HOST_HEADER
return httpx.AsyncClient(verify=False, headers=headers)
async def get_zulip_topics(stream_id: int) -> list[dict]: async def get_zulip_topics(stream_id: int) -> list[dict]:
try: try:
async with _zulip_client() as client: async with httpx.AsyncClient() as client:
response = await client.get( response = await client.get(
f"https://{settings.ZULIP_REALM}/api/v1/users/me/{stream_id}/topics", f"https://{settings.ZULIP_REALM}/api/v1/users/me/{stream_id}/topics",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY), auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
@@ -36,7 +29,7 @@ async def get_zulip_topics(stream_id: int) -> list[dict]:
async def get_zulip_streams() -> list[dict]: async def get_zulip_streams() -> list[dict]:
try: try:
async with _zulip_client() as client: async with httpx.AsyncClient() as client:
response = await client.get( response = await client.get(
f"https://{settings.ZULIP_REALM}/api/v1/streams", f"https://{settings.ZULIP_REALM}/api/v1/streams",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY), auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
@@ -51,7 +44,7 @@ async def get_zulip_streams() -> list[dict]:
async def send_message_to_zulip(stream: str, topic: str, content: str): async def send_message_to_zulip(stream: str, topic: str, content: str):
try: try:
async with _zulip_client() as client: async with httpx.AsyncClient() as client:
response = await client.post( response = await client.post(
f"https://{settings.ZULIP_REALM}/api/v1/messages", f"https://{settings.ZULIP_REALM}/api/v1/messages",
data={ data={
@@ -73,7 +66,7 @@ async def send_message_to_zulip(stream: str, topic: str, content: str):
async def update_zulip_message(message_id: int, stream: str, topic: str, content: str): async def update_zulip_message(message_id: int, stream: str, topic: str, content: str):
try: try:
async with _zulip_client() as client: async with httpx.AsyncClient() as client:
response = await client.patch( response = await client.patch(
f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}", f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}",
data={ data={
@@ -97,27 +90,6 @@ async def update_zulip_message(message_id: int, stream: str, topic: str, content
raise Exception(f"Failed to update Zulip message: {error}") raise Exception(f"Failed to update Zulip message: {error}")
async def delete_zulip_message(message_id: int):
try:
async with _zulip_client() as client:
response = await client.delete(
f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
)
if (
response.status_code == 400
and response.json()["msg"] == "Invalid message(s)"
):
raise InvalidMessageError(f"There is no message with id: {message_id}")
response.raise_for_status()
return response.json()
except httpx.RequestError as error:
raise Exception(f"Failed to delete Zulip message: {error}")
def get_zulip_message(transcript: Transcript, include_topics: bool): def get_zulip_message(transcript: Transcript, include_topics: bool):
transcript_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}" transcript_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest import pytest
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM from reflector.schemas.platform import WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True) @pytest.fixture(scope="session", autouse=True)
@@ -14,7 +14,6 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient) register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield yield

View File

@@ -1,536 +0,0 @@
"""
Tests for Hatchet DAG Status -> Zulip Live Updates.
Tests cover:
- _dag_zulip_enabled() guard logic
- create_dag_zulip_message: sends + stores message ID
- update_dag_zulip_message: updates existing; noop when no message_id
- delete_dag_zulip_message: deletes + clears; handles InvalidMessageError
- delete_zulip_message: sends HTTP DELETE; raises on 400
- with_error_handling integration: calls update after success + failure
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from reflector.db.transcripts import Transcript
@pytest.fixture
def dag_settings():
"""Patch settings for DAG Zulip tests."""
with patch("reflector.hatchet.dag_zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_DAG_STREAM = "dag-stream"
mock_settings.ZULIP_DAG_TOPIC = "dag-topic"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
yield mock_settings
@pytest.fixture
def dag_settings_disabled():
"""Patch settings with DAG Zulip disabled."""
with patch("reflector.hatchet.dag_zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_DAG_STREAM = None
mock_settings.ZULIP_DAG_TOPIC = None
yield mock_settings
@pytest.fixture
def mock_transcript():
return Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
zulip_message_id=None,
)
@pytest.fixture
def mock_transcript_with_zulip_id():
return Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
zulip_message_id=42,
)
class TestDagZulipEnabled:
def test_enabled_when_all_set(self, dag_settings):
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is True
def test_disabled_when_realm_missing(self, dag_settings):
dag_settings.ZULIP_REALM = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
def test_disabled_when_stream_missing(self, dag_settings):
dag_settings.ZULIP_DAG_STREAM = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
def test_disabled_when_topic_missing(self, dag_settings):
dag_settings.ZULIP_DAG_TOPIC = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestCreateDagZulipMessage:
async def test_sends_and_stores_message_id(self, dag_settings, mock_transcript):
mock_run_details = MagicMock()
rendered_md = "**DAG** rendered"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
return_value={"id": 99},
) as mock_send,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_update,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
from reflector.hatchet.dag_zulip import create_dag_zulip_message
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_send.assert_called_once_with("dag-stream", "dag-topic", rendered_md)
mock_update.assert_called_once_with(
mock_transcript, {"zulip_message_id": 99}
)
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
) as mock_send:
from reflector.hatchet.dag_zulip import create_dag_zulip_message
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_send.assert_not_called()
async def test_logs_warning_on_failure(self, dag_settings, mock_transcript):
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value="rendered",
),
patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
side_effect=Exception("Zulip down"),
),
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch("reflector.hatchet.dag_zulip.logger") as mock_logger,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=MagicMock())
mock_get_client.return_value = mock_client
from reflector.hatchet.dag_zulip import create_dag_zulip_message
# Should not raise
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_logger.warning.assert_called()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestUpdateDagZulipMessage:
async def test_updates_existing_message(
self, dag_settings, mock_transcript_with_zulip_id
):
mock_run_details = MagicMock()
rendered_md = "**DAG** updated"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_called_once_with(
42, "dag-stream", "dag-topic", rendered_md
)
async def test_appends_error_banner(
self, dag_settings, mock_transcript_with_zulip_id
):
mock_run_details = MagicMock()
rendered_md = "**DAG** updated"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message(
"test-transcript-id",
"workflow-run-123",
error_message="get_recording failed: connection timeout",
)
call_args = mock_update.call_args
content = call_args[0][3]
assert rendered_md in content
assert "get_recording failed: connection timeout" in content
async def test_noop_when_no_message_id(self, dag_settings, mock_transcript):
with (
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_not_called()
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update:
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestDeleteDagZulipMessage:
async def test_deletes_and_clears(
self, dag_settings, mock_transcript_with_zulip_id
):
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_tc_update,
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_called_once_with(42)
mock_tc_update.assert_called_once_with(
mock_transcript_with_zulip_id, {"zulip_message_id": None}
)
async def test_noop_when_no_message_id(self, dag_settings, mock_transcript):
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_not_called()
async def test_handles_invalid_message_error(
self, dag_settings, mock_transcript_with_zulip_id
):
from reflector.zulip import InvalidMessageError
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
side_effect=InvalidMessageError("gone"),
),
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_tc_update,
patch("reflector.hatchet.dag_zulip.logger"),
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
# Should not raise; should still clear the message_id
await delete_dag_zulip_message("test-transcript-id")
mock_tc_update.assert_called_once_with(
mock_transcript_with_zulip_id, {"zulip_message_id": None}
)
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete:
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_not_called()
@pytest.mark.asyncio
class TestDeleteZulipMessage:
async def test_sends_delete_request(self):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"result": "success"}
mock_client = AsyncMock()
mock_client.delete = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("reflector.zulip.httpx.AsyncClient", return_value=mock_client):
with patch("reflector.zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
from reflector.zulip import delete_zulip_message
result = await delete_zulip_message(123)
assert result == {"result": "success"}
mock_client.delete.assert_called_once()
call_args = mock_client.delete.call_args
assert "123" in call_args.args[0]
async def test_raises_invalid_message_on_400(self):
from reflector.zulip import InvalidMessageError
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.json.return_value = {"msg": "Invalid message(s)"}
mock_client = AsyncMock()
mock_client.delete = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("reflector.zulip.httpx.AsyncClient", return_value=mock_client):
with patch("reflector.zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
from reflector.zulip import delete_zulip_message
with pytest.raises(InvalidMessageError):
await delete_zulip_message(999)
@pytest.mark.asyncio
class TestWithErrorHandlingDagUpdate:
"""Test that with_error_handling calls update_dag_zulip_message."""
async def test_calls_update_on_success(self):
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def fake_task(input: PipelineInput, ctx) -> str:
return "ok"
with patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
) as mock_update:
result = await fake_task(input_data, mock_ctx)
assert result == "ok"
mock_update.assert_called_once_with("tid-1", "wfr-123")
async def test_calls_update_on_failure_with_error_message(self):
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def failing_task(input: PipelineInput, ctx) -> str:
raise ValueError("boom")
with (
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.set_workflow_error_status",
new_callable=AsyncMock,
),
):
with pytest.raises(ValueError, match="boom"):
await failing_task(input_data, mock_ctx)
mock_update.assert_called_once_with(
"tid-1", "wfr-123", error_message="get_recording failed: boom"
)
async def test_dag_failure_doesnt_affect_task(self):
"""DAG update failure should not prevent task from succeeding."""
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def ok_task(input: PipelineInput, ctx) -> str:
return "ok"
with patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
side_effect=Exception("zulip exploded"),
):
result = await ok_task(input_data, mock_ctx)
assert result == "ok"

View File

@@ -0,0 +1,184 @@
from datetime import datetime, timedelta, timezone
import pytest
from conftest import authenticated_client_ctx
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import Room, rooms_controller
from reflector.settings import settings
async def _create_room(name: str, user_id: str, is_shared: bool = False) -> Room:
return await rooms_controller.add(
name=name,
user_id=user_id,
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=is_shared,
webhook_url="",
webhook_secret="",
)
async def _create_meeting(room: Room, active: bool = True):
now = datetime.now(timezone.utc)
return await meetings_controller.create(
id=f"meeting-{room.name}-{now.timestamp()}",
room_name=room.name,
room_url="room-url",
host_room_url="host-url",
start_date=now - timedelta(minutes=10),
end_date=now + timedelta(minutes=50) if active else now - timedelta(minutes=1),
room=room,
)
async def _create_calendar_event(room: Room):
now = datetime.now(timezone.utc)
return await calendar_events_controller.upsert(
CalendarEvent(
room_id=room.id,
ics_uid=f"event-{room.name}",
title=f"Upcoming in {room.name}",
description="secret description",
start_time=now + timedelta(minutes=30),
end_time=now + timedelta(minutes=90),
attendees=[{"name": "Alice", "email": "alice@example.com"}],
)
)
@pytest.mark.asyncio
async def test_bulk_status_returns_empty_for_no_rooms(client):
"""Empty room_names returns empty dict."""
async with authenticated_client_ctx():
resp = await client.post("/rooms/meetings/bulk-status", json={"room_names": []})
assert resp.status_code == 200
assert resp.json() == {}
@pytest.mark.asyncio
async def test_bulk_status_returns_active_meetings_and_upcoming_events(client):
"""Owner sees active meetings and upcoming events for their rooms."""
room = await _create_room("bulk-test-room", "randomuserid")
await _create_meeting(room, active=True)
await _create_calendar_event(room)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["bulk-test-room"]},
)
assert resp.status_code == 200
data = resp.json()
assert "bulk-test-room" in data
status = data["bulk-test-room"]
assert len(status["active_meetings"]) == 1
assert len(status["upcoming_events"]) == 1
# Owner sees description
assert status["upcoming_events"][0]["description"] == "secret description"
@pytest.mark.asyncio
async def test_bulk_status_redacts_data_for_non_owner(client):
"""Non-owner of a shared room gets redacted calendar events and no whereby host_room_url."""
room = await _create_room("shared-bulk", "other-user-id", is_shared=True)
await _create_meeting(room, active=True)
await _create_calendar_event(room)
# authenticated as "randomuserid" but room owned by "other-user-id"
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["shared-bulk"]},
)
assert resp.status_code == 200
status = resp.json()["shared-bulk"]
assert len(status["active_meetings"]) == 1
assert len(status["upcoming_events"]) == 1
# Non-owner: description and attendees redacted
assert status["upcoming_events"][0]["description"] is None
assert status["upcoming_events"][0]["attendees"] is None
@pytest.mark.asyncio
async def test_bulk_status_filters_private_rooms_of_other_users(client):
"""User cannot see private rooms owned by others."""
await _create_room("private-other", "other-user-id", is_shared=False)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["private-other"]},
)
assert resp.status_code == 200
status = resp.json()["private-other"]
assert status["active_meetings"] == []
assert status["upcoming_events"] == []
@pytest.mark.asyncio
async def test_bulk_status_redacts_whereby_host_room_url_for_non_owner(client):
"""Non-owner of a shared whereby room gets empty host_room_url."""
room = await _create_room("shared-whereby", "other-user-id", is_shared=True)
# Force platform to whereby
from reflector.db import get_database
from reflector.db.rooms import rooms as rooms_table
await get_database().execute(
rooms_table.update()
.where(rooms_table.c.id == room.id)
.values(platform="whereby")
)
await _create_meeting(room, active=True)
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["shared-whereby"]},
)
assert resp.status_code == 200
status = resp.json()["shared-whereby"]
assert len(status["active_meetings"]) == 1
assert status["active_meetings"][0]["host_room_url"] == ""
@pytest.mark.asyncio
async def test_bulk_status_unauthenticated_rejected_non_public(client):
"""Unauthenticated request on non-PUBLIC_MODE instance returns 401."""
original = settings.PUBLIC_MODE
try:
settings.PUBLIC_MODE = False
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["any-room"]},
)
assert resp.status_code == 401
finally:
settings.PUBLIC_MODE = original
@pytest.mark.asyncio
async def test_bulk_status_nonexistent_room_returns_empty(client):
"""Requesting a room that doesn't exist returns empty lists."""
async with authenticated_client_ctx():
resp = await client.post(
"/rooms/meetings/bulk-status",
json={"room_names": ["does-not-exist"]},
)
assert resp.status_code == 200
status = resp.json()["does-not-exist"]
assert status["active_meetings"] == []
assert status["upcoming_events"] == []

View File

@@ -1,6 +1,6 @@
import asyncio import asyncio
import time import time
from unittest.mock import AsyncMock, patch from unittest.mock import patch
import pytest import pytest
from httpx import ASGITransport, AsyncClient from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_hatchet, ) as mock_multitrack_pipeline,
): ):
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline, not Hatchet # Whereby recordings should use file pipeline
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id) mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_hatchet.start_workflow.assert_not_called() mock_multitrack_pipeline.delay.assert_not_called()
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -177,6 +177,8 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant", recording_trigger="automatic-2nd-participant",
is_shared=False, is_shared=False,
) )
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add( transcript = await transcripts_controller.add(
"", "",
@@ -211,23 +213,18 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_hatchet, ) as mock_multitrack_pipeline,
): ):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use Hatchet workflow # Daily.co multitrack recordings should use multitrack pipeline
mock_hatchet.start_workflow.assert_called_once() mock_multitrack_pipeline.delay.assert_called_once_with(
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs transcript_id=transcript.id,
assert call_kwargs["workflow_name"] == "DiarizationPipeline" bucket_name="daily-bucket",
assert call_kwargs["input_data"]["transcript_id"] == transcript.id track_keys=track_keys,
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket" )
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called() mock_file_pipeline.delay.assert_not_called()

View File

@@ -1,5 +1,10 @@
import { useMemo } from "react";
import { Box, Heading, Text, VStack } from "@chakra-ui/react"; import { Box, Heading, Text, VStack } from "@chakra-ui/react";
import type { components } from "../../../reflector-api"; import type { components } from "../../../reflector-api";
import {
useRoomsBulkMeetingStatus,
BulkMeetingStatusMap,
} from "../../../lib/apiHooks";
type Room = components["schemas"]["Room"]; type Room = components["schemas"]["Room"];
import { RoomTable } from "./RoomTable"; import { RoomTable } from "./RoomTable";
@@ -31,6 +36,10 @@ export function RoomList({
pt, pt,
loading, loading,
}: RoomListProps) { }: RoomListProps) {
const roomNames = useMemo(() => rooms.map((r) => r.name), [rooms]);
const bulkStatusQuery = useRoomsBulkMeetingStatus(roomNames);
const meetingStatusMap: BulkMeetingStatusMap = bulkStatusQuery.data ?? {};
return ( return (
<VStack alignItems="start" gap={4} mb={mb} pt={pt}> <VStack alignItems="start" gap={4} mb={mb} pt={pt}>
<Heading size="md">{title}</Heading> <Heading size="md">{title}</Heading>
@@ -43,6 +52,8 @@ export function RoomList({
onEdit={onEdit} onEdit={onEdit}
onDelete={onDelete} onDelete={onDelete}
loading={loading} loading={loading}
meetingStatusMap={meetingStatusMap}
meetingStatusLoading={bulkStatusQuery.isLoading}
/> />
<RoomCards <RoomCards
rooms={rooms} rooms={rooms}

View File

@@ -14,11 +14,7 @@ import {
import { LuLink, LuRefreshCw } from "react-icons/lu"; import { LuLink, LuRefreshCw } from "react-icons/lu";
import { FaCalendarAlt } from "react-icons/fa"; import { FaCalendarAlt } from "react-icons/fa";
import type { components } from "../../../reflector-api"; import type { components } from "../../../reflector-api";
import { import { useRoomIcsSync, BulkMeetingStatusMap } from "../../../lib/apiHooks";
useRoomActiveMeetings,
useRoomUpcomingMeetings,
useRoomIcsSync,
} from "../../../lib/apiHooks";
type Room = components["schemas"]["Room"]; type Room = components["schemas"]["Room"];
type Meeting = components["schemas"]["Meeting"]; type Meeting = components["schemas"]["Meeting"];
@@ -62,6 +58,8 @@ interface RoomTableProps {
onEdit: (roomId: string, roomData: any) => void; onEdit: (roomId: string, roomData: any) => void;
onDelete: (roomId: string) => void; onDelete: (roomId: string) => void;
loading?: boolean; loading?: boolean;
meetingStatusMap: BulkMeetingStatusMap;
meetingStatusLoading: boolean;
} }
const getRoomModeDisplay = (mode: string): string => { const getRoomModeDisplay = (mode: string): string => {
@@ -104,14 +102,16 @@ const getZulipDisplay = (
return "Enabled"; return "Enabled";
}; };
function MeetingStatus({ roomName }: { roomName: string }) { function MeetingStatus({
const activeMeetingsQuery = useRoomActiveMeetings(roomName); activeMeetings,
const upcomingMeetingsQuery = useRoomUpcomingMeetings(roomName); upcomingMeetings,
isLoading,
const activeMeetings = activeMeetingsQuery.data || []; }: {
const upcomingMeetings = upcomingMeetingsQuery.data || []; activeMeetings: Meeting[];
upcomingMeetings: CalendarEventResponse[];
if (activeMeetingsQuery.isLoading || upcomingMeetingsQuery.isLoading) { isLoading: boolean;
}) {
if (isLoading) {
return <Spinner size="sm" />; return <Spinner size="sm" />;
} }
@@ -176,6 +176,8 @@ export function RoomTable({
onEdit, onEdit,
onDelete, onDelete,
loading, loading,
meetingStatusMap,
meetingStatusLoading,
}: RoomTableProps) { }: RoomTableProps) {
const [syncingRooms, setSyncingRooms] = useState<Set<NonEmptyString>>( const [syncingRooms, setSyncingRooms] = useState<Set<NonEmptyString>>(
new Set(), new Set(),
@@ -252,7 +254,15 @@ export function RoomTable({
<Link href={`/${room.name}`}>{room.name}</Link> <Link href={`/${room.name}`}>{room.name}</Link>
</Table.Cell> </Table.Cell>
<Table.Cell> <Table.Cell>
<MeetingStatus roomName={room.name} /> <MeetingStatus
activeMeetings={
meetingStatusMap[room.name]?.active_meetings ?? []
}
upcomingMeetings={
meetingStatusMap[room.name]?.upcoming_events ?? []
}
isLoading={meetingStatusLoading}
/>
</Table.Cell> </Table.Cell>
<Table.Cell> <Table.Cell>
{getZulipDisplay( {getZulipDisplay(

View File

@@ -0,0 +1,246 @@
import "@testing-library/jest-dom";
// --- Module mocks (hoisted before imports) ---
jest.mock("../apiClient", () => ({
client: {
GET: jest.fn(),
POST: jest.fn(),
PUT: jest.fn(),
PATCH: jest.fn(),
DELETE: jest.fn(),
use: jest.fn(),
},
$api: {
useQuery: jest.fn(),
useMutation: jest.fn(),
queryOptions: (method: string, path: string, init?: unknown) =>
init === undefined
? { queryKey: [method, path] }
: { queryKey: [method, path, init] },
},
API_URL: "http://test",
WEBSOCKET_URL: "ws://test",
configureApiAuth: jest.fn(),
}));
jest.mock("../AuthProvider", () => ({
useAuth: () => ({
status: "authenticated" as const,
accessToken: "test-token",
accessTokenExpires: Date.now() + 3600000,
user: { id: "user1", name: "Test User" },
update: jest.fn(),
signIn: jest.fn(),
signOut: jest.fn(),
lastUserId: "user1",
}),
}));
// --- Imports (after mocks) ---
import React from "react";
import { render, waitFor, screen } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { useRoomsBulkMeetingStatus, BulkMeetingStatusMap } from "../apiHooks";
import { client } from "../apiClient";
import { ErrorProvider } from "../../(errors)/errorContext";
const mockClient = client as { POST: jest.Mock };
// --- Helpers ---
function mockBulkStatusEndpoint(
roomData?: Record<
string,
{ active_meetings: unknown[]; upcoming_events: unknown[] }
>,
) {
mockClient.POST.mockImplementation(
async (_path: string, options: { body: { room_names: string[] } }) => {
const roomNames: string[] = options.body.room_names;
const src = roomData ?? {};
const data = Object.fromEntries(
roomNames.map((name) => [
name,
src[name] ?? { active_meetings: [], upcoming_events: [] },
]),
);
return { data, error: undefined, response: {} };
},
);
}
// --- Test component: uses the bulk hook and displays results ---
function BulkStatusDisplay({ roomNames }: { roomNames: string[] }) {
const { data, isLoading } = useRoomsBulkMeetingStatus(roomNames);
if (isLoading) {
return <div data-testid="status">loading</div>;
}
if (!data) {
return <div data-testid="status">no data</div>;
}
return (
<div data-testid="status">
{roomNames.map((name) => {
const status = data[name];
return (
<div key={name} data-testid={`room-${name}`}>
{status?.active_meetings?.length ?? 0} active,{" "}
{status?.upcoming_events?.length ?? 0} upcoming
</div>
);
})}
</div>
);
}
function createWrapper() {
const queryClient = new QueryClient({
defaultOptions: {
queries: { retry: false },
},
});
return function Wrapper({ children }: { children: React.ReactNode }) {
return (
<QueryClientProvider client={queryClient}>
<ErrorProvider>{children}</ErrorProvider>
</QueryClientProvider>
);
};
}
// --- Tests ---
describe("bulk meeting status (prop-drilling)", () => {
afterEach(() => jest.clearAllMocks());
it("fetches all room statuses in a single POST request", async () => {
const rooms = Array.from({ length: 10 }, (_, i) => `room-${i}`);
mockBulkStatusEndpoint();
render(<BulkStatusDisplay roomNames={rooms} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
for (const name of rooms) {
expect(screen.getByTestId(`room-${name}`)).toHaveTextContent(
"0 active, 0 upcoming",
);
}
});
const postCalls = mockClient.POST.mock.calls.filter(
([path]: [string]) => path === "/v1/rooms/meetings/bulk-status",
);
// Prop-drilling: exactly 1 POST for all rooms (no batcher needed)
expect(postCalls).toHaveLength(1);
// The single call contains all room names
const requestedRooms: string[] = postCalls[0][1].body.room_names;
expect(requestedRooms).toHaveLength(10);
for (const name of rooms) {
expect(requestedRooms).toContain(name);
}
});
it("returns room-specific data correctly", async () => {
mockBulkStatusEndpoint({
"room-a": {
active_meetings: [{ id: "m1", room_name: "room-a" }],
upcoming_events: [],
},
"room-b": {
active_meetings: [],
upcoming_events: [{ id: "e1", title: "Standup" }],
},
});
render(<BulkStatusDisplay roomNames={["room-a", "room-b"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("room-room-a")).toHaveTextContent(
"1 active, 0 upcoming",
);
expect(screen.getByTestId("room-room-b")).toHaveTextContent(
"0 active, 1 upcoming",
);
});
// Still just 1 POST
expect(mockClient.POST).toHaveBeenCalledTimes(1);
});
it("does not fetch when roomNames is empty", async () => {
mockBulkStatusEndpoint();
render(<BulkStatusDisplay roomNames={[]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("status")).toHaveTextContent("no data");
});
// No POST calls when no rooms
expect(mockClient.POST).not.toHaveBeenCalled();
});
it("surfaces error when POST fails", async () => {
mockClient.POST.mockResolvedValue({
data: undefined,
error: { detail: "server error" },
response: {},
});
function ErrorDisplay({ roomNames }: { roomNames: string[] }) {
const { error } = useRoomsBulkMeetingStatus(roomNames);
if (error) return <div data-testid="error">{error.message}</div>;
return <div data-testid="error">no error</div>;
}
render(<ErrorDisplay roomNames={["room-x"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("error")).toHaveTextContent(
"bulk-status fetch failed",
);
});
});
it("does not fetch when unauthenticated", async () => {
// Override useAuth to return unauthenticated
const authModule = jest.requireMock("../AuthProvider");
const originalUseAuth = authModule.useAuth;
authModule.useAuth = () => ({
...originalUseAuth(),
status: "unauthenticated",
});
mockBulkStatusEndpoint();
render(<BulkStatusDisplay roomNames={["room-1"]} />, {
wrapper: createWrapper(),
});
await waitFor(() => {
expect(screen.getByTestId("status")).toHaveTextContent("no data");
});
expect(mockClient.POST).not.toHaveBeenCalled();
// Restore
authModule.useAuth = originalUseAuth;
});
});

View File

@@ -1,8 +1,8 @@
"use client"; "use client";
import { $api } from "./apiClient"; import { $api, client } from "./apiClient";
import { useError } from "../(errors)/errorContext"; import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query"; import { QueryClient, useQuery, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api"; import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider"; import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types"; import { MeetingId } from "./types";
@@ -641,16 +641,21 @@ export function useMeetingDeactivate() {
setError(error as Error, "Failed to end meeting"); setError(error as Error, "Failed to end meeting");
}, },
onSuccess: () => { onSuccess: () => {
return queryClient.invalidateQueries({ return Promise.all([
predicate: (query) => { queryClient.invalidateQueries({
const key = query.queryKey; predicate: (query) => {
return key.some( const key = query.queryKey;
(k) => return key.some(
typeof k === "string" && (k) =>
!!MEETING_LIST_PATH_PARTIALS.find((e) => k.includes(e)), typeof k === "string" &&
); !!MEETING_LIST_PATH_PARTIALS.find((e) => k.includes(e)),
}, );
}); },
}),
queryClient.invalidateQueries({
queryKey: ["bulk-meeting-status"],
}),
]);
}, },
}); });
} }
@@ -707,6 +712,9 @@ export function useRoomsCreateMeeting() {
}, },
).queryKey, ).queryKey,
}), }),
queryClient.invalidateQueries({
queryKey: ["bulk-meeting-status"],
}),
]); ]);
}, },
onError: (error) => { onError: (error) => {
@@ -772,6 +780,32 @@ export function useRoomActiveMeetings(roomName: string | null) {
); );
} }
type RoomMeetingStatus = components["schemas"]["RoomMeetingStatus"];
export type BulkMeetingStatusMap = Partial<Record<string, RoomMeetingStatus>>;
export function useRoomsBulkMeetingStatus(roomNames: string[]) {
const { isAuthenticated } = useAuthReady();
const sortedNames = [...roomNames].sort();
return useQuery({
queryKey: ["bulk-meeting-status", sortedNames],
queryFn: async (): Promise<BulkMeetingStatusMap> => {
const { data, error } = await client.POST(
"/v1/rooms/meetings/bulk-status",
{ body: { room_names: roomNames } },
);
if (error || !data) {
throw new Error(
`bulk-status fetch failed: ${JSON.stringify(error ?? "no data")}`,
);
}
return data;
},
enabled: sortedNames.length > 0 && isAuthenticated,
});
}
export function useRoomGetMeeting( export function useRoomGetMeeting(
roomName: string | null, roomName: string | null,
meetingId: MeetingId | null, meetingId: MeetingId | null,

View File

@@ -118,6 +118,23 @@ export interface paths {
patch?: never; patch?: never;
trace?: never; trace?: never;
}; };
"/v1/rooms/meetings/bulk-status": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/** Rooms Bulk Meeting Status */
post: operations["v1_rooms_bulk_meeting_status"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}": { "/v1/rooms/{room_id}": {
parameters: { parameters: {
query?: never; query?: never;
@@ -799,6 +816,11 @@ export interface components {
*/ */
chunk: string; chunk: string;
}; };
/** BulkStatusRequest */
BulkStatusRequest: {
/** Room Names */
room_names: string[];
};
/** CalendarEventResponse */ /** CalendarEventResponse */
CalendarEventResponse: { CalendarEventResponse: {
/** Id */ /** Id */
@@ -1675,6 +1697,13 @@ export interface components {
*/ */
skip_consent: boolean; skip_consent: boolean;
}; };
/** RoomMeetingStatus */
RoomMeetingStatus: {
/** Active Meetings */
active_meetings: components["schemas"]["Meeting"][];
/** Upcoming Events */
upcoming_events: components["schemas"]["CalendarEventResponse"][];
};
/** RoomDetails */ /** RoomDetails */
RoomDetails: { RoomDetails: {
/** Id */ /** Id */
@@ -2272,6 +2301,41 @@ export interface operations {
}; };
}; };
}; };
v1_rooms_bulk_meeting_status: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["BulkStatusRequest"];
};
};
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": {
[key: string]: components["schemas"]["RoomMeetingStatus"];
};
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_get: { v1_rooms_get: {
parameters: { parameters: {
query?: never; query?: never;

View File

@@ -1,8 +1,22 @@
module.exports = { module.exports = {
preset: "ts-jest", testEnvironment: "jest-environment-jsdom",
testEnvironment: "node",
roots: ["<rootDir>/app"], roots: ["<rootDir>/app"],
testMatch: ["**/__tests__/**/*.test.ts"], testMatch: ["**/__tests__/**/*.test.ts", "**/__tests__/**/*.test.tsx"],
collectCoverage: true, collectCoverage: false,
collectCoverageFrom: ["app/**/*.ts", "!app/**/*.d.ts"], transform: {
"^.+\\.[jt]sx?$": [
"ts-jest",
{
tsconfig: {
jsx: "react-jsx",
module: "esnext",
moduleResolution: "bundler",
esModuleInterop: true,
strict: true,
downlevelIteration: true,
lib: ["dom", "dom.iterable", "esnext"],
},
},
],
},
}; };

View File

@@ -61,9 +61,13 @@
"author": "Andreas <andreas@monadical.com>", "author": "Andreas <andreas@monadical.com>",
"license": "All Rights Reserved", "license": "All Rights Reserved",
"devDependencies": { "devDependencies": {
"@testing-library/dom": "^10.4.1",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.2",
"@types/jest": "^30.0.0", "@types/jest": "^30.0.0",
"@types/react": "18.2.20", "@types/react": "18.2.20",
"jest": "^30.1.3", "jest": "^30.1.3",
"jest-environment-jsdom": "^30.2.0",
"openapi-typescript": "^7.9.1", "openapi-typescript": "^7.9.1",
"prettier": "^3.0.0", "prettier": "^3.0.0",
"ts-jest": "^29.4.1" "ts-jest": "^29.4.1"

787
www/pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff