Compare commits

..

4 Commits

Author SHA1 Message Date
Igor Loskutov
1aabe4f880 feat: post Hatchet DAG status to Zulip with live updates
Post a rendered DAG status message to a configurable Zulip stream/topic
when a Hatchet workflow is dispatched. Update it after each task
completes (success or failure), and delete it when post_zulip runs
so the final transcript notification replaces it.

On pipeline failure, appends an error banner (:cross_mark:) with the
failing step name and error message.

New settings: ZULIP_DAG_STREAM, ZULIP_DAG_TOPIC, ZULIP_HOST_HEADER.
New module: reflector/hatchet/dag_zulip.py (create/update/delete).
2026-02-09 10:03:31 -05:00
Igor Loskutov
42e7c0b8fd feat: add CLI tool to render Hatchet workflow runs as text DAG
Cherry-picked from feat/hatchet-run-renderer branch.
2026-02-06 23:25:44 -05:00
cd2255cfbc chore(main): release 0.33.0 (#847) 2026-02-06 18:12:06 -05:00
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
19 changed files with 1400 additions and 864 deletions

View File

@@ -1,5 +1,17 @@
# Changelog # Changelog
## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
### Features
* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
### Bug Fixes
* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03) ## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
sqlalchemy.String, sqlalchemy.String,
nullable=False, nullable=False,
), ),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column( sqlalchemy.Column(
"skip_consent", "skip_consent",
sqlalchemy.Boolean, sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None ics_last_sync: datetime | None = None
ics_last_etag: str | None = None ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False skip_consent: bool = False

View File

@@ -0,0 +1,144 @@
"""
Hatchet DAG Status -> Zulip Live Updates.
Posts/updates/deletes a Zulip message showing the Hatchet workflow DAG status.
All functions are fire-and-forget (catch + warning log on failure).
Note: Uses deferred imports throughout for fork-safety,
consistent with the pipeline pattern in daily_multitrack_pipeline.py.
"""
from reflector.logger import logger
from reflector.settings import settings
def _dag_zulip_enabled() -> bool:
return bool(
settings.ZULIP_REALM and settings.ZULIP_DAG_STREAM and settings.ZULIP_DAG_TOPIC
)
async def create_dag_zulip_message(transcript_id: str, workflow_run_id: str) -> None:
"""Post initial DAG status to Zulip. Called at dispatch time (normal DB context)."""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import send_message_to_zulip # noqa: PLC0415
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)
response = await send_message_to_zulip(
settings.ZULIP_DAG_STREAM, settings.ZULIP_DAG_TOPIC, content
)
message_id = response.get("id")
if message_id:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"zulip_message_id": message_id}
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)
async def update_dag_zulip_message(
transcript_id: str,
workflow_run_id: str,
error_message: str | None = None,
) -> None:
"""Update existing DAG status in Zulip. Called from Hatchet worker (forked).
Args:
error_message: If set, appended as an error banner to the rendered DAG.
"""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows.daily_multitrack_pipeline import ( # noqa: PLC0415
fresh_db_connection,
)
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import update_zulip_message # noqa: PLC0415
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)
if error_message:
content += f"\n\n:cross_mark: **{error_message}**"
await update_zulip_message(
transcript.zulip_message_id,
settings.ZULIP_DAG_STREAM,
settings.ZULIP_DAG_TOPIC,
content,
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to update DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)
async def delete_dag_zulip_message(transcript_id: str) -> None:
"""Delete DAG Zulip message and clear zulip_message_id.
Called from post_zulip task (already inside fresh_db_connection).
Swallows InvalidMessageError (message already deleted).
"""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.zulip import ( # noqa: PLC0415
InvalidMessageError,
delete_zulip_message,
)
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return
try:
await delete_zulip_message(transcript.zulip_message_id)
except InvalidMessageError:
logger.warning(
"[DAG Zulip] Message already deleted",
transcript_id=transcript_id,
zulip_message_id=transcript.zulip_message_id,
)
await transcripts_controller.update(transcript, {"zulip_message_id": None})
except Exception:
logger.warning(
"[DAG Zulip] Failed to delete DAG message",
transcript_id=transcript_id,
exc_info=True,
)

View File

@@ -45,6 +45,7 @@ from reflector.hatchet.constants import (
TIMEOUT_SHORT, TIMEOUT_SHORT,
TaskName, TaskName,
) )
from reflector.hatchet.dag_zulip import update_dag_zulip_message
from reflector.hatchet.workflows.models import ( from reflector.hatchet.workflows.models import (
ActionItemsResult, ActionItemsResult,
ConsentResult, ConsentResult,
@@ -238,7 +239,14 @@ def with_error_handling(
@functools.wraps(func) @functools.wraps(func)
async def wrapper(input: PipelineInput, ctx: Context) -> R: async def wrapper(input: PipelineInput, ctx: Context) -> R:
try: try:
return await func(input, ctx) result = await func(input, ctx)
try:
await update_dag_zulip_message(
input.transcript_id, ctx.workflow_run_id
)
except Exception:
pass
return result
except Exception as e: except Exception as e:
logger.error( logger.error(
f"[Hatchet] {step_name} failed", f"[Hatchet] {step_name} failed",
@@ -246,6 +254,14 @@ def with_error_handling(
error=str(e), error=str(e),
exc_info=True, exc_info=True,
) )
try:
await update_dag_zulip_message(
input.transcript_id,
ctx.workflow_run_id,
error_message=f"{step_name} failed: {e}",
)
except Exception:
pass
if set_error_status: if set_error_status:
await set_workflow_error_status(input.transcript_id) await set_workflow_error_status(input.transcript_id)
raise raise
@@ -1294,6 +1310,11 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
async with fresh_db_connection(): async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.dag_zulip import ( # noqa: PLC0415
delete_dag_zulip_message,
)
await delete_dag_zulip_message(input.transcript_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id) transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript: if transcript:

View File

@@ -15,14 +15,11 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.logger import logger from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -181,124 +178,108 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows. Returns AsyncResult for Celery tasks, None for Hatchet workflows.
""" """
if isinstance(config, MultitrackProcessingConfig): if isinstance(config, MultitrackProcessingConfig):
use_celery = False # Multitrack processing always uses Hatchet (no Celery fallback)
if config.room_id: # First check if we can replay (outside transaction since it's read-only)
room = await rooms_controller.get_by_id(config.room_id) transcript = await transcripts_controller.get_by_id(config.transcript_id)
use_celery = room.use_celery if room else False if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
use_hatchet = not use_celery transcript.workflow_run_id
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
) )
if can_replay:
if use_hatchet: await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
# First check if we can replay (outside transaction since it's read-only) logger.info(
transcript = await transcripts_controller.get_by_id(config.transcript_id) "Replaying Hatchet workflow",
if transcript and transcript.workflow_run_id and not force: workflow_id=transcript.workflow_run_id,
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
) )
if can_replay: return None
await HatchetClientManager.replay_workflow( else:
transcript.workflow_run_id # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
) # Log and proceed to start new workflow
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try: try:
status = await HatchetClientManager.get_workflow_run_status( status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id transcript.workflow_run_id
) )
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED): logger.info(
logger.info( "Old workflow not replayable, starting new",
"Concurrent workflow detected, skipping dispatch", old_workflow_id=transcript.workflow_run_id,
workflow_id=transcript.workflow_run_id, old_status=status.value,
) )
return None except NotFoundException:
except ApiException: # Workflow deleted from Hatchet but ID still in DB
# Workflow might be gone (404) or API issue - proceed with new workflow logger.info(
pass "Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
workflow_id = await HatchetClientManager.start_workflow( # Force: cancel old workflow if exists
workflow_name="DiarizationPipeline", if force and transcript and transcript.workflow_run_id:
input_data={ try:
"recording_id": config.recording_id, await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
"tracks": [{"s3_key": k} for k in config.track_keys], logger.info(
"bucket_name": config.bucket_name, "Cancelled old workflow (--force)",
"transcript_id": config.transcript_id, workflow_id=transcript.workflow_run_id,
"room_id": config.room_id, )
}, except NotFoundException:
additional_metadata={ logger.info(
"transcript_id": config.transcript_id, "Old workflow already deleted (--force)",
"recording_id": config.recording_id, workflow_id=transcript.workflow_run_id,
"daily_recording_id": config.recording_id, )
}, await transcripts_controller.update(transcript, {"workflow_run_id": None})
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
) )
if transcript: try:
await transcripts_controller.update( await create_dag_zulip_message(config.transcript_id, workflow_id)
transcript, {"workflow_run_id": workflow_id} except Exception:
) logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
transcript_id=config.transcript_id,
workflow_id=workflow_id,
exc_info=True,
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id) logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig): elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id) return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else: else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
@@ -155,12 +155,15 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code None # Webhook UUID for this environment. Not used by production code
) )
# Platform Configuration # Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
# Zulip integration # Zulip integration
ZULIP_REALM: str | None = None ZULIP_REALM: str | None = None
ZULIP_API_KEY: str | None = None ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None ZULIP_BOT_EMAIL: str | None = None
ZULIP_DAG_STREAM: str | None = None
ZULIP_DAG_TOPIC: str | None = None
ZULIP_HOST_HEADER: str | None = None
# Hatchet workflow orchestration (always enabled for multitrack processing) # Hatchet workflow orchestration (always enabled for multitrack processing)
HATCHET_CLIENT_TOKEN: str | None = None HATCHET_CLIENT_TOKEN: str | None = None

View File

@@ -0,0 +1,412 @@
"""
Render Hatchet workflow runs as text DAG.
Usage:
# Show latest 5 runs (summary table)
uv run -m reflector.tools.render_hatchet_run
# Show specific run with full DAG + task details
uv run -m reflector.tools.render_hatchet_run <workflow_run_id>
# Drill into Nth run from the list (1-indexed)
uv run -m reflector.tools.render_hatchet_run --show 1
# Show latest N runs
uv run -m reflector.tools.render_hatchet_run --last 10
# Filter by status
uv run -m reflector.tools.render_hatchet_run --status FAILED
uv run -m reflector.tools.render_hatchet_run --status RUNNING
"""
import argparse
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from hatchet_sdk.clients.rest.models import (
V1TaskEvent,
V1TaskStatus,
V1TaskSummary,
V1WorkflowRunDetails,
WorkflowRunShapeItemForWorkflowRunDetails,
)
from reflector.hatchet.client import HatchetClientManager
STATUS_ICON = {
V1TaskStatus.COMPLETED: "\u2705",
V1TaskStatus.RUNNING: "\u23f3",
V1TaskStatus.FAILED: "\u274c",
V1TaskStatus.QUEUED: "\u23f8\ufe0f",
V1TaskStatus.CANCELLED: "\u26a0\ufe0f",
}
STATUS_LABEL = {
V1TaskStatus.COMPLETED: "Complete",
V1TaskStatus.RUNNING: "Running",
V1TaskStatus.FAILED: "FAILED",
V1TaskStatus.QUEUED: "Queued",
V1TaskStatus.CANCELLED: "Cancelled",
}
def _fmt_time(dt: datetime | None) -> str:
if dt is None:
return "-"
return dt.strftime("%H:%M:%S")
def _fmt_duration(ms: int | None) -> str:
if ms is None:
return "-"
secs = ms / 1000
if secs < 60:
return f"{secs:.1f}s"
mins = secs / 60
return f"{mins:.1f}m"
def _fmt_status_line(task: V1TaskSummary) -> str:
"""Format a status line like: Complete (finished 20:31:44)"""
label = STATUS_LABEL.get(task.status, task.status.value)
icon = STATUS_ICON.get(task.status, "?")
if task.status == V1TaskStatus.COMPLETED and task.finished_at:
return f"{icon} {label} (finished {_fmt_time(task.finished_at)})"
elif task.status == V1TaskStatus.RUNNING and task.started_at:
parts = [f"started {_fmt_time(task.started_at)}"]
if task.duration:
parts.append(f"{_fmt_duration(task.duration)} elapsed")
return f"{icon} {label} ({', '.join(parts)})"
elif task.status == V1TaskStatus.FAILED and task.finished_at:
return f"{icon} {label} (failed {_fmt_time(task.finished_at)})"
elif task.status == V1TaskStatus.CANCELLED:
return f"{icon} {label}"
elif task.status == V1TaskStatus.QUEUED:
return f"{icon} {label}"
return f"{icon} {label}"
def _topo_sort(
shape: list[WorkflowRunShapeItemForWorkflowRunDetails],
) -> list[str]:
"""Topological sort of step_ids from shape DAG."""
step_ids = {s.step_id for s in shape}
children_map: dict[str, list[str]] = {}
in_degree: dict[str, int] = {sid: 0 for sid in step_ids}
for s in shape:
children = [c for c in (s.children_step_ids or []) if c in step_ids]
children_map[s.step_id] = children
for c in children:
in_degree[c] += 1
queue = sorted(sid for sid, deg in in_degree.items() if deg == 0)
result: list[str] = []
while queue:
node = queue.pop(0)
result.append(node)
for c in children_map.get(node, []):
in_degree[c] -= 1
if in_degree[c] == 0:
queue.append(c)
queue.sort()
return result
def render_run_detail(details: V1WorkflowRunDetails) -> str:
"""Render a single workflow run as markdown DAG with task details."""
shape = details.shape or []
tasks = details.tasks or []
events = details.task_events or []
run = details.run
if not shape:
return f"Run {run.metadata.id}: {run.status.value} (no shape data)"
# Build lookups
step_to_shape: dict[str, WorkflowRunShapeItemForWorkflowRunDetails] = {
s.step_id: s for s in shape
}
step_to_name: dict[str, str] = {s.step_id: s.task_name for s in shape}
# Reverse edges (parents)
parents: dict[str, list[str]] = {s.step_id: [] for s in shape}
for s in shape:
for child_id in s.children_step_ids or []:
if child_id in parents:
parents[child_id].append(s.step_id)
# Join tasks by step_id
task_by_step: dict[str, V1TaskSummary] = {}
for t in tasks:
if t.step_id and t.step_id in step_to_name:
task_by_step[t.step_id] = t
# Events indexed by task_external_id
events_by_task: dict[str, list[V1TaskEvent]] = defaultdict(list)
for ev in events:
events_by_task[ev.task_id].append(ev)
ordered = _topo_sort(shape)
lines: list[str] = []
# Run header
run_icon = STATUS_ICON.get(run.status, "?")
run_name = run.display_name or run.workflow_id
dur = _fmt_duration(run.duration)
lines.append(f"**{run_name}** {run_icon} {dur}")
lines.append(f"ID: `{run.metadata.id}`")
if run.additional_metadata:
meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
lines.append(f"Meta: {', '.join(meta_parts)}")
if run.error_message:
# Take first line of error only for header
first_line = run.error_message.split("\n")[0]
lines.append(f"Error: {first_line}")
lines.append("")
# DAG Status Overview table
lines.append("**DAG Status Overview**")
lines.append("")
lines.append("| Node | Status | Duration | Dependencies |")
lines.append("|------|--------|----------|--------------|")
for step_id in ordered:
s = step_to_shape[step_id]
t = task_by_step.get(step_id)
name = step_to_name[step_id]
icon = STATUS_ICON.get(t.status, "?") if t else "?"
dur = _fmt_duration(t.duration) if t else "-"
parent_names = [step_to_name[p] for p in parents[step_id]]
child_names = [
step_to_name[c] for c in (s.children_step_ids or []) if c in step_to_name
]
deps_left = ", ".join(parent_names) if parent_names else ""
deps_right = ", ".join(child_names) if child_names else ""
if deps_left and deps_right:
deps = f"{deps_left} \u2192 {deps_right}"
elif deps_right:
deps = f"\u2192 {deps_right}"
elif deps_left:
deps = f"{deps_left} \u2192"
else:
deps = "-"
lines.append(f"| {name} | {icon} | {dur} | {deps} |")
lines.append("")
lines.append("---")
lines.append("")
# Node details
for step_id in ordered:
t = task_by_step.get(step_id)
name = step_to_name[step_id]
if not t:
lines.append(f"**\U0001f4e6 {name}**")
lines.append("Status: no task data")
lines.append("")
continue
lines.append(f"**\U0001f4e6 {name}**")
lines.append(f"Status: {_fmt_status_line(t)}")
if t.duration:
lines.append(f"Duration: {_fmt_duration(t.duration)}")
if t.retry_count and t.retry_count > 0:
lines.append(f"Retries: {t.retry_count}")
# Fan-out children
if t.num_spawned_children and t.num_spawned_children > 0:
children = t.children or []
completed = sum(1 for c in children if c.status == V1TaskStatus.COMPLETED)
failed = sum(1 for c in children if c.status == V1TaskStatus.FAILED)
running = sum(1 for c in children if c.status == V1TaskStatus.RUNNING)
lines.append(
f"Spawned children: {completed}/{t.num_spawned_children} done"
f"{f', {running} running' if running else ''}"
f"{f', {failed} failed' if failed else ''}"
)
# Error message (first meaningful line only, full trace in events)
if t.error_message:
err_lines = t.error_message.strip().split("\n")
# Find first non-empty, non-traceback line
err_summary = err_lines[0]
for line in err_lines:
stripped = line.strip()
if stripped and not stripped.startswith(
("Traceback", "File ", "{", ")")
):
err_summary = stripped
break
lines.append(f"Error: `{err_summary}`")
# Events log
task_events = sorted(
events_by_task.get(t.task_external_id, []),
key=lambda e: e.timestamp,
)
if task_events:
lines.append("Events:")
for ev in task_events:
ts = ev.timestamp.strftime("%H:%M:%S")
ev_icon = ""
if ev.event_type.value == "FINISHED":
ev_icon = "\u2705 "
elif ev.event_type.value in ("FAILED", "TIMED_OUT"):
ev_icon = "\u274c "
elif ev.event_type.value == "STARTED":
ev_icon = "\u25b6\ufe0f "
elif ev.event_type.value == "RETRYING":
ev_icon = "\U0001f504 "
elif ev.event_type.value == "CANCELLED":
ev_icon = "\u26a0\ufe0f "
msg = ev.message.strip()
if ev.error_message:
# Just first line of error in event log
err_first = ev.error_message.strip().split("\n")[0]
if msg:
msg += f" | {err_first}"
else:
msg = err_first
if msg:
lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}: {msg}")
else:
lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}")
lines.append("")
return "\n".join(lines)
def render_run_summary(idx: int, run: V1TaskSummary) -> str:
"""One-line summary for a run in the list view."""
icon = STATUS_ICON.get(run.status, "?")
name = run.display_name or run.workflow_name or "?"
run_id = run.workflow_run_external_id or "?"
dur = _fmt_duration(run.duration)
started = _fmt_time(run.started_at)
meta = ""
if run.additional_metadata:
meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
meta = f" ({', '.join(meta_parts)})"
return (
f" {idx}. {icon} **{name}** started={started} dur={dur}{meta}\n"
f" `{run_id}`"
)
async def _fetch_run_list(
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> list[V1TaskSummary]:
client = HatchetClientManager.get_client()
since = datetime.now(timezone.utc) - timedelta(days=7)
runs = await client.runs.aio_list(
since=since,
statuses=statuses,
limit=count,
)
return runs.rows or []
async def list_recent_runs(
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> str:
"""List recent workflow runs as text."""
rows = await _fetch_run_list(count, statuses)
if not rows:
return "No runs found in the last 7 days."
lines = [f"Recent runs ({len(rows)}):", ""]
for i, run in enumerate(rows, 1):
lines.append(render_run_summary(i, run))
lines.append("")
lines.append("Use `--show N` to see full DAG for run N")
return "\n".join(lines)
async def show_run(workflow_run_id: str) -> str:
"""Fetch and render a single run."""
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
return render_run_detail(details)
async def show_nth_run(
n: int,
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> str:
"""Fetch list, then drill into Nth run."""
rows = await _fetch_run_list(count, statuses)
if not rows:
return "No runs found in the last 7 days."
if n < 1 or n > len(rows):
return f"Invalid index {n}. Have {len(rows)} runs (1-{len(rows)})."
run = rows[n - 1]
return await show_run(run.workflow_run_external_id)
async def main_async(args: argparse.Namespace) -> None:
statuses = [V1TaskStatus(args.status)] if args.status else None
if args.run_id:
output = await show_run(args.run_id)
elif args.show is not None:
output = await show_nth_run(args.show, count=args.last, statuses=statuses)
else:
output = await list_recent_runs(count=args.last, statuses=statuses)
print(output)
def main() -> None:
parser = argparse.ArgumentParser(
description="Render Hatchet workflow runs as text DAG"
)
parser.add_argument(
"run_id",
nargs="?",
default=None,
help="Workflow run ID to show in detail. If omitted, lists recent runs.",
)
parser.add_argument(
"--show",
type=int,
default=None,
metavar="N",
help="Show full DAG for the Nth run in the list (1-indexed)",
)
parser.add_argument(
"--last",
type=int,
default=5,
help="Number of recent runs to list (default: 5)",
)
parser.add_argument(
"--status",
choices=["QUEUED", "RUNNING", "COMPLETED", "FAILED", "CANCELLED"],
help="Filter by status",
)
args = parser.parse_args()
asyncio.run(main_async(args))
if __name__ == "__main__":
main()

View File

@@ -129,10 +129,6 @@ class DailyClient(VideoPlatformClient):
"""Get room presence/session data for a Daily.co room.""" """Get room presence/session data for a Daily.co room."""
return await self._api_client.get_room_presence(room_name) return await self._api_client.get_room_presence(room_name)
async def delete_room(self, room_name: str) -> None:
"""Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
return await self._api_client.delete_room(room_name)
async def get_meeting_participants( async def get_meeting_participants(
self, meeting_id: str self, meeting_id: str
) -> MeetingParticipantsResponse: ) -> MeetingParticipantsResponse:

View File

@@ -20,7 +20,6 @@ from reflector.services.ics_sync import ics_sync_service
from reflector.settings import settings from reflector.settings import settings
from reflector.utils.url import add_query_param from reflector.utils.url import add_query_param
from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import poll_daily_room_presence_task
from reflector.worker.webhook import test_webhook from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -366,53 +365,6 @@ async def rooms_create_meeting(
return meeting return meeting
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
async def rooms_joined_meeting(
room_name: str,
meeting_id: str,
):
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.delay(meeting_id)
return {"status": "ok"}
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
async def rooms_leave_meeting(
room_name: str,
meeting_id: str,
delay_seconds: int = 2,
):
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
"""
room = await rooms_controller.get_by_name(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if meeting.platform == "daily":
poll_daily_room_presence_task.apply_async(
args=[meeting_id],
countdown=delay_seconds,
)
return {"status": "ok"}
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult) @router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
async def rooms_test_webhook( async def rooms_test_webhook(
room_id: str, room_id: str,

View File

@@ -25,11 +25,9 @@ from reflector.db.transcripts import (
transcripts_controller, transcripts_controller,
) )
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -351,49 +349,39 @@ async def _process_multitrack_recording_inner(
room_id=room.id, room_id=room.id,
) )
use_celery = room and room.use_celery # Multitrack processing always uses Hatchet (no Celery fallback)
use_hatchet = not use_celery workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
if use_celery: input_data={
logger.info( "recording_id": recording_id,
"Room uses legacy Celery processing", "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
room_id=room.id, "bucket_name": bucket_name,
transcript_id=transcript.id, "transcript_id": transcript.id,
) "room_id": room.id,
},
if use_hatchet: additional_metadata={
workflow_id = await HatchetClientManager.start_workflow( "transcript_id": transcript.id,
workflow_name="DiarizationPipeline", "recording_id": recording_id,
input_data={ "daily_recording_id": recording_id,
"recording_id": recording_id, },
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
) )
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
try:
await create_dag_zulip_message(transcript.id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
transcript_id=transcript.id,
workflow_id=workflow_id,
exc_info=True,
)
@shared_task @shared_task
@@ -845,47 +833,15 @@ async def process_meetings():
end_date = end_date.replace(tzinfo=timezone.utc) end_date = end_date.replace(tzinfo=timezone.utc)
client = create_platform_client(meeting.platform) client = create_platform_client(meeting.platform)
has_active_sessions = False room_sessions = await client.get_room_sessions(meeting.room_name)
has_had_sessions = False
if meeting.platform == "daily": has_active_sessions = bool(
try: room_sessions and any(s.ended_at is None for s in room_sessions)
presence = await client.get_room_presence(meeting.room_name) )
has_active_sessions = presence.total_count > 0 has_had_sessions = bool(room_sessions)
logger_.info(
room_sessions = await client.get_room_sessions( f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
meeting.room_name )
)
has_had_sessions = bool(room_sessions)
logger_.info(
"Daily.co presence check",
has_active_sessions=has_active_sessions,
has_had_sessions=has_had_sessions,
presence_count=presence.total_count,
)
except Exception:
logger_.warning(
"Daily.co presence API failed, falling back to DB sessions",
exc_info=True,
)
room_sessions = await client.get_room_sessions(
meeting.room_name
)
has_active_sessions = bool(
room_sessions
and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
else:
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
logger_.info(
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
)
if has_active_sessions: if has_active_sessions:
logger_.debug("Meeting still has active sessions, keep it") logger_.debug("Meeting still has active sessions, keep it")
@@ -904,20 +860,7 @@ async def process_meetings():
await meetings_controller.update_meeting( await meetings_controller.update_meeting(
meeting.id, is_active=False meeting.id, is_active=False
) )
logger_.info("Meeting deactivated in database") logger_.info("Meeting is deactivated")
if meeting.platform == "daily":
try:
await client.delete_room(meeting.room_name)
logger_.info(
"Daily.co room deleted", room_name=meeting.room_name
)
except Exception:
logger_.warning(
"Failed to delete Daily.co room",
room_name=meeting.room_name,
exc_info=True,
)
processed_count += 1 processed_count += 1
@@ -1117,66 +1060,53 @@ async def reprocess_failed_daily_recordings():
) )
continue continue
use_celery = room and room.use_celery # Multitrack reprocessing always uses Hatchet (no Celery fallback)
use_hatchet = not use_celery if not transcript:
logger.warning(
if use_hatchet: "No transcript for Hatchet reprocessing, skipping",
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id, recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
try:
await create_dag_zulip_message(transcript.id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at reprocess dispatch",
transcript_id=transcript.id,
workflow_id=workflow_id, workflow_id=workflow_id,
room_name=meeting.room_name, exc_info=True,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
) )
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner) logger.info(
# Reprocessing uses recording.meeting_id directly instead of time-based matching "Queued Daily recording for Hatchet reprocessing",
recording_start_ts = int(recording.recorded_at.timestamp()) recording_id=recording.id,
workflow_id=workflow_id,
process_multitrack_recording.delay( room_name=meeting.room_name,
bucket_name=bucket_name, track_count=len(recording.track_keys),
daily_room_name=meeting.room_name, )
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
reprocessed_count += 1 reprocessed_count += 1

View File

@@ -12,9 +12,16 @@ class InvalidMessageError(Exception):
pass pass
def _zulip_client() -> httpx.AsyncClient:
headers = {}
if settings.ZULIP_HOST_HEADER:
headers["Host"] = settings.ZULIP_HOST_HEADER
return httpx.AsyncClient(verify=False, headers=headers)
async def get_zulip_topics(stream_id: int) -> list[dict]: async def get_zulip_topics(stream_id: int) -> list[dict]:
try: try:
async with httpx.AsyncClient() as client: async with _zulip_client() as client:
response = await client.get( response = await client.get(
f"https://{settings.ZULIP_REALM}/api/v1/users/me/{stream_id}/topics", f"https://{settings.ZULIP_REALM}/api/v1/users/me/{stream_id}/topics",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY), auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
@@ -29,7 +36,7 @@ async def get_zulip_topics(stream_id: int) -> list[dict]:
async def get_zulip_streams() -> list[dict]: async def get_zulip_streams() -> list[dict]:
try: try:
async with httpx.AsyncClient() as client: async with _zulip_client() as client:
response = await client.get( response = await client.get(
f"https://{settings.ZULIP_REALM}/api/v1/streams", f"https://{settings.ZULIP_REALM}/api/v1/streams",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY), auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
@@ -44,7 +51,7 @@ async def get_zulip_streams() -> list[dict]:
async def send_message_to_zulip(stream: str, topic: str, content: str): async def send_message_to_zulip(stream: str, topic: str, content: str):
try: try:
async with httpx.AsyncClient() as client: async with _zulip_client() as client:
response = await client.post( response = await client.post(
f"https://{settings.ZULIP_REALM}/api/v1/messages", f"https://{settings.ZULIP_REALM}/api/v1/messages",
data={ data={
@@ -66,7 +73,7 @@ async def send_message_to_zulip(stream: str, topic: str, content: str):
async def update_zulip_message(message_id: int, stream: str, topic: str, content: str): async def update_zulip_message(message_id: int, stream: str, topic: str, content: str):
try: try:
async with httpx.AsyncClient() as client: async with _zulip_client() as client:
response = await client.patch( response = await client.patch(
f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}", f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}",
data={ data={
@@ -90,6 +97,27 @@ async def update_zulip_message(message_id: int, stream: str, topic: str, content
raise Exception(f"Failed to update Zulip message: {error}") raise Exception(f"Failed to update Zulip message: {error}")
async def delete_zulip_message(message_id: int):
try:
async with _zulip_client() as client:
response = await client.delete(
f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
)
if (
response.status_code == 400
and response.json()["msg"] == "Invalid message(s)"
):
raise InvalidMessageError(f"There is no message with id: {message_id}")
response.raise_for_status()
return response.json()
except httpx.RequestError as error:
raise Exception(f"Failed to delete Zulip message: {error}")
def get_zulip_message(transcript: Transcript, include_topics: bool): def get_zulip_message(transcript: Transcript, include_topics: bool):
transcript_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}" transcript_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"

View File

@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest import pytest
from reflector.schemas.platform import WHEREBY_PLATFORM from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True) @pytest.fixture(scope="session", autouse=True)
@@ -14,6 +14,7 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient) register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield yield

View File

@@ -0,0 +1,536 @@
"""
Tests for Hatchet DAG Status -> Zulip Live Updates.
Tests cover:
- _dag_zulip_enabled() guard logic
- create_dag_zulip_message: sends + stores message ID
- update_dag_zulip_message: updates existing; noop when no message_id
- delete_dag_zulip_message: deletes + clears; handles InvalidMessageError
- delete_zulip_message: sends HTTP DELETE; raises on 400
- with_error_handling integration: calls update after success + failure
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from reflector.db.transcripts import Transcript
@pytest.fixture
def dag_settings():
"""Patch settings for DAG Zulip tests."""
with patch("reflector.hatchet.dag_zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_DAG_STREAM = "dag-stream"
mock_settings.ZULIP_DAG_TOPIC = "dag-topic"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
yield mock_settings
@pytest.fixture
def dag_settings_disabled():
"""Patch settings with DAG Zulip disabled."""
with patch("reflector.hatchet.dag_zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_DAG_STREAM = None
mock_settings.ZULIP_DAG_TOPIC = None
yield mock_settings
@pytest.fixture
def mock_transcript():
return Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
zulip_message_id=None,
)
@pytest.fixture
def mock_transcript_with_zulip_id():
return Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
zulip_message_id=42,
)
class TestDagZulipEnabled:
def test_enabled_when_all_set(self, dag_settings):
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is True
def test_disabled_when_realm_missing(self, dag_settings):
dag_settings.ZULIP_REALM = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
def test_disabled_when_stream_missing(self, dag_settings):
dag_settings.ZULIP_DAG_STREAM = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
def test_disabled_when_topic_missing(self, dag_settings):
dag_settings.ZULIP_DAG_TOPIC = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestCreateDagZulipMessage:
async def test_sends_and_stores_message_id(self, dag_settings, mock_transcript):
mock_run_details = MagicMock()
rendered_md = "**DAG** rendered"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
return_value={"id": 99},
) as mock_send,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_update,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
from reflector.hatchet.dag_zulip import create_dag_zulip_message
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_send.assert_called_once_with("dag-stream", "dag-topic", rendered_md)
mock_update.assert_called_once_with(
mock_transcript, {"zulip_message_id": 99}
)
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
) as mock_send:
from reflector.hatchet.dag_zulip import create_dag_zulip_message
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_send.assert_not_called()
async def test_logs_warning_on_failure(self, dag_settings, mock_transcript):
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value="rendered",
),
patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
side_effect=Exception("Zulip down"),
),
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch("reflector.hatchet.dag_zulip.logger") as mock_logger,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=MagicMock())
mock_get_client.return_value = mock_client
from reflector.hatchet.dag_zulip import create_dag_zulip_message
# Should not raise
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_logger.warning.assert_called()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestUpdateDagZulipMessage:
async def test_updates_existing_message(
self, dag_settings, mock_transcript_with_zulip_id
):
mock_run_details = MagicMock()
rendered_md = "**DAG** updated"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_called_once_with(
42, "dag-stream", "dag-topic", rendered_md
)
async def test_appends_error_banner(
self, dag_settings, mock_transcript_with_zulip_id
):
mock_run_details = MagicMock()
rendered_md = "**DAG** updated"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message(
"test-transcript-id",
"workflow-run-123",
error_message="get_recording failed: connection timeout",
)
call_args = mock_update.call_args
content = call_args[0][3]
assert rendered_md in content
assert "get_recording failed: connection timeout" in content
async def test_noop_when_no_message_id(self, dag_settings, mock_transcript):
with (
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_not_called()
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update:
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestDeleteDagZulipMessage:
async def test_deletes_and_clears(
self, dag_settings, mock_transcript_with_zulip_id
):
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_tc_update,
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_called_once_with(42)
mock_tc_update.assert_called_once_with(
mock_transcript_with_zulip_id, {"zulip_message_id": None}
)
async def test_noop_when_no_message_id(self, dag_settings, mock_transcript):
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_not_called()
async def test_handles_invalid_message_error(
self, dag_settings, mock_transcript_with_zulip_id
):
from reflector.zulip import InvalidMessageError
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
side_effect=InvalidMessageError("gone"),
),
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_tc_update,
patch("reflector.hatchet.dag_zulip.logger"),
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
# Should not raise; should still clear the message_id
await delete_dag_zulip_message("test-transcript-id")
mock_tc_update.assert_called_once_with(
mock_transcript_with_zulip_id, {"zulip_message_id": None}
)
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete:
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_not_called()
@pytest.mark.asyncio
class TestDeleteZulipMessage:
async def test_sends_delete_request(self):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"result": "success"}
mock_client = AsyncMock()
mock_client.delete = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("reflector.zulip.httpx.AsyncClient", return_value=mock_client):
with patch("reflector.zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
from reflector.zulip import delete_zulip_message
result = await delete_zulip_message(123)
assert result == {"result": "success"}
mock_client.delete.assert_called_once()
call_args = mock_client.delete.call_args
assert "123" in call_args.args[0]
async def test_raises_invalid_message_on_400(self):
from reflector.zulip import InvalidMessageError
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.json.return_value = {"msg": "Invalid message(s)"}
mock_client = AsyncMock()
mock_client.delete = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("reflector.zulip.httpx.AsyncClient", return_value=mock_client):
with patch("reflector.zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
from reflector.zulip import delete_zulip_message
with pytest.raises(InvalidMessageError):
await delete_zulip_message(999)
@pytest.mark.asyncio
class TestWithErrorHandlingDagUpdate:
"""Test that with_error_handling calls update_dag_zulip_message."""
async def test_calls_update_on_success(self):
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def fake_task(input: PipelineInput, ctx) -> str:
return "ok"
with patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
) as mock_update:
result = await fake_task(input_data, mock_ctx)
assert result == "ok"
mock_update.assert_called_once_with("tid-1", "wfr-123")
async def test_calls_update_on_failure_with_error_message(self):
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def failing_task(input: PipelineInput, ctx) -> str:
raise ValueError("boom")
with (
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.set_workflow_error_status",
new_callable=AsyncMock,
),
):
with pytest.raises(ValueError, match="boom"):
await failing_task(input_data, mock_ctx)
mock_update.assert_called_once_with(
"tid-1", "wfr-123", error_message="get_recording failed: boom"
)
async def test_dag_failure_doesnt_affect_task(self):
"""DAG update failure should not prevent task from succeeding."""
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def ok_task(input: PipelineInput, ctx) -> str:
return "ok"
with patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
side_effect=Exception("zulip exploded"),
):
result = await ok_task(input_data, mock_ctx)
assert result == "ok"

View File

@@ -1,286 +0,0 @@
"""Unit tests for Daily.co presence-based meeting deactivation logic.
Tests the fix for split room race condition by verifying:
1. Real-time presence checking via Daily.co API
2. Room deletion when meetings deactivate
"""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.dailyco_api.responses import (
RoomPresenceParticipant,
RoomPresenceResponse,
)
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.video_platforms.daily import DailyClient
@pytest.fixture
async def daily_room_and_meeting():
"""Create test room and meeting for Daily platform."""
room = await rooms_controller.add(
name="test-daily",
user_id="test-user",
platform="daily",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
current_time = datetime.now(timezone.utc)
end_time = current_time + timedelta(hours=2)
meeting = await meetings_controller.create(
id="test-meeting-id",
room_name="test-daily-20260129120000",
room_url="https://daily.co/test",
host_room_url="https://daily.co/test",
start_date=current_time,
end_date=end_time,
room=room,
)
return room, meeting
@pytest.mark.asyncio
async def test_daily_client_has_delete_room_method():
"""Verify DailyClient has delete_room method for cleanup."""
# Create a mock DailyClient
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Verify delete_room method exists
assert hasattr(client, "delete_room")
assert callable(getattr(client, "delete_room"))
@pytest.mark.asyncio
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
"""Test that get_room_presence returns real-time participant data."""
room, meeting = daily_room_and_meeting
# Mock Daily.co API response
mock_presence = RoomPresenceResponse(
total_count=2,
data=[
RoomPresenceParticipant(
room=meeting.room_name,
id="session-1",
userId="user-1",
userName="User One",
joinTime="2026-01-29T12:00:00.000Z",
duration=120,
),
RoomPresenceParticipant(
room=meeting.room_name,
id="session-2",
userId="user-2",
userName="User Two",
joinTime="2026-01-29T12:05:00.000Z",
duration=60,
),
],
)
with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock the API client method
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Call get_room_presence
result = await client.get_room_presence(meeting.room_name)
# Verify it calls Daily.co API
client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
# Verify result contains real-time data
assert result.total_count == 2
assert len(result.data) == 2
assert result.data[0].id == "session-1"
assert result.data[1].id == "session-2"
@pytest.mark.asyncio
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
"""Test that Daily.co presence API is source of truth, not stale DB sessions."""
room, meeting = daily_room_and_meeting
current_time = datetime.now(timezone.utc)
# Create stale DB session (left_at=NULL but user actually left)
session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
await daily_participant_sessions_controller.upsert_joined(
DailyParticipantSession(
id=session_id,
meeting_id=meeting.id,
room_id=room.id,
session_id="stale-daily-session",
user_name="Stale User",
user_id="stale-user",
joined_at=current_time - timedelta(minutes=5),
left_at=None, # Stale - shows active but user left
)
)
# Verify DB shows active session
db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
meeting.id
)
assert len(db_sessions) == 1
# But Daily.co API shows room is empty
mock_presence = RoomPresenceResponse(total_count=0, data=[])
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
# Get real-time presence
presence = await client.get_room_presence(meeting.room_name)
# Real-time API shows no participants (truth)
assert presence.total_count == 0
assert len(presence.data) == 0
# DB shows 1 participant (stale)
assert len(db_sessions) == 1
# Implementation should trust presence API, not DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_empty():
"""Test the core deactivation decision logic when presence shows room empty."""
# This tests the logic that will be in process_meetings
# Simulate: DB shows stale active session
has_active_db_sessions = True # DB is stale
# Simulate: Daily.co presence API shows room empty
presence_count = 0 # Real-time truth
# Simulate: Meeting has been used before
has_had_sessions = True
# Decision logic (what process_meetings should do):
# - If presence API available: trust it
# - If presence shows empty AND has_had_sessions: deactivate
if presence_count == 0 and has_had_sessions:
should_deactivate = True
else:
should_deactivate = False
assert should_deactivate is True # Should deactivate despite stale DB
@pytest.mark.asyncio
async def test_meeting_deactivation_logic_with_presence_active():
"""Test that meetings stay active when presence shows participants."""
# Simulate: DB shows no sessions (not yet updated)
has_active_db_sessions = False # DB hasn't caught up
# Simulate: Daily.co presence API shows active participant
presence_count = 1 # Real-time truth
# Decision logic: presence shows activity, keep meeting active
if presence_count > 0:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Should stay active
@pytest.mark.asyncio
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
"""Test that Daily.co room is deleted when meeting deactivates."""
room, meeting = daily_room_and_meeting
with patch("reflector.dailyco_api.client.DailyApiClient"):
from reflector.video_platforms.models import VideoPlatformConfig
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
client = DailyClient(config)
# Mock delete_room API call
client._api_client.delete_room = AsyncMock()
# Simulate deactivation - should delete room
await client._api_client.delete_room(meeting.room_name)
# Verify delete was called
client._api_client.delete_room.assert_called_once_with(meeting.room_name)
@pytest.mark.asyncio
async def test_delete_room_idempotent_on_404():
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
from reflector.dailyco_api.client import DailyApiClient
# Create real client to test delete_room logic
client = DailyApiClient(api_key="test-key")
# Mock the HTTP client
mock_http_client = AsyncMock()
mock_response = AsyncMock()
mock_response.status_code = 404 # Room not found
mock_http_client.delete = AsyncMock(return_value=mock_response)
# Mock _get_client to return our mock
async def mock_get_client():
return mock_http_client
client._get_client = mock_get_client
# delete_room should succeed even on 404 (idempotent)
await client.delete_room("nonexistent-room")
# Verify delete was attempted
mock_http_client.delete.assert_called_once()
@pytest.mark.asyncio
async def test_api_failure_fallback_to_db_sessions():
"""Test that system falls back to DB sessions if Daily.co API fails."""
# Simulate: Daily.co API throws exception
api_exception = Exception("API unavailable")
# Simulate: DB shows active session
has_active_db_sessions = True
# Decision logic with fallback:
try:
presence_count = None
raise api_exception # Simulating API failure
except Exception:
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
if has_active_db_sessions:
should_deactivate = False
else:
should_deactivate = True
assert should_deactivate is False # Conservative: keep active on API failure

View File

@@ -1,6 +1,6 @@
import asyncio import asyncio
import time import time
from unittest.mock import patch from unittest.mock import AsyncMock, patch
import pytest import pytest
from httpx import ASGITransport, AsyncClient from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process" "reflector.services.transcript_process.HatchetClientManager"
) as mock_multitrack_pipeline, ) as mock_hatchet,
): ):
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline # Whereby recordings should use file pipeline, not Hatchet
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id) mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_multitrack_pipeline.delay.assert_not_called() mock_hatchet.start_workflow.assert_not_called()
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant", recording_trigger="automatic-2nd-participant",
is_shared=False, is_shared=False,
) )
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add( transcript = await transcripts_controller.add(
"", "",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process" "reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline, ) as mock_file_pipeline,
patch( patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process" "reflector.services.transcript_process.HatchetClientManager"
) as mock_multitrack_pipeline, ) as mock_hatchet,
): ):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process") response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["status"] == "ok" assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use multitrack pipeline # Daily.co multitrack recordings should use Hatchet workflow
mock_multitrack_pipeline.delay.assert_called_once_with( mock_hatchet.start_workflow.assert_called_once()
transcript_id=transcript.id, call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
bucket_name="daily-bucket", assert call_kwargs["workflow_name"] == "DiarizationPipeline"
track_keys=track_keys, assert call_kwargs["input_data"]["transcript_id"] == transcript.id
) assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called() mock_file_pipeline.delay.assert_not_called()

View File

@@ -24,24 +24,15 @@ import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent"; import { useConsentDialog } from "../../lib/consent";
import { import {
useRoomJoinMeeting, useRoomJoinMeeting,
useRoomJoinedMeeting,
useRoomLeaveMeeting,
useMeetingStartRecording, useMeetingStartRecording,
leaveRoomPostUrl,
LeaveRoomBody,
} from "../../lib/apiHooks"; } from "../../lib/apiHooks";
import { omit } from "remeda"; import { omit } from "remeda";
import { import {
assertExists, assertExists,
assertExistsAndNonEmptyString,
NonEmptyString, NonEmptyString,
parseNonEmptyString, parseNonEmptyString,
} from "../../lib/utils"; } from "../../lib/utils";
import { import { assertMeetingId, DailyRecordingType } from "../../lib/types";
assertMeetingId,
DailyRecordingType,
MeetingId,
} from "../../lib/types";
import { useUuidV5 } from "react-uuid-hook"; import { useUuidV5 } from "react-uuid-hook";
const CONSENT_BUTTON_ID = "recording-consent"; const CONSENT_BUTTON_ID = "recording-consent";
@@ -188,58 +179,6 @@ const useFrame = (
] as const; ] as const;
}; };
const leaveDaily = () => {
const frame = DailyIframe.getCallInstance();
frame?.leave();
};
const useDirtyDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
) => {
useEffect(() => {
if (!meetingId || !roomName) return;
const handleBeforeUnload = () => {
leaveDaily();
navigator.sendBeacon(
leaveRoomPostUrl(
{
room_name: roomName,
meeting_id: meetingId,
},
{
delay_seconds: 5,
},
),
undefined satisfies LeaveRoomBody,
);
};
window.addEventListener("beforeunload", handleBeforeUnload);
return () => window.removeEventListener("beforeunload", handleBeforeUnload);
}, [meetingId, roomName]);
};
const useDisconnects = (
meetingId: NonEmptyString,
roomName: NonEmptyString,
leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
) => {
useDirtyDisconnects(meetingId, roomName);
useEffect(() => {
return () => {
leaveDaily();
leaveMutation.mutate({
params: {
path: { meeting_id: meetingId, room_name: roomName },
query: { delay_seconds: 5 },
},
});
};
}, [meetingId, roomName]);
};
export default function DailyRoom({ meeting, room }: DailyRoomProps) { export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const router = useRouter(); const router = useRouter();
const params = useParams(); const params = useParams();
@@ -247,8 +186,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const authLastUserId = auth.lastUserId; const authLastUserId = auth.lastUserId;
const [container, setContainer] = useState<HTMLDivElement | null>(null); const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting(); const joinMutation = useRoomJoinMeeting();
const joinedMutation = useRoomJoinedMeeting();
const leaveMutation = useRoomLeaveMeeting();
const startRecordingMutation = useMeetingStartRecording(); const startRecordingMutation = useMeetingStartRecording();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null); const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
@@ -258,9 +195,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0], useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
); );
if (typeof params.roomName === "object") const roomName = params?.roomName as string;
throw new Error(`Invalid room name in params. array? ${params.roomName}`);
const roomName = assertExistsAndNonEmptyString(params.roomName);
const { const {
showConsentModal, showConsentModal,
@@ -302,8 +237,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
router.push("/browse"); router.push("/browse");
}, [router]); }, [router]);
useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);
const handleCustomButtonClick = useCallback( const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => { (ev: DailyEventObjectCustomButtonClick) => {
if (ev.button_id === CONSENT_BUTTON_ID) { if (ev.button_id === CONSENT_BUTTON_ID) {
@@ -316,15 +249,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
); );
const handleFrameJoinMeeting = useCallback(() => { const handleFrameJoinMeeting = useCallback(() => {
joinedMutation.mutate({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
});
if (meeting.recording_type === "cloud") { if (meeting.recording_type === "cloud") {
console.log("Starting dual recording via REST API", { console.log("Starting dual recording via REST API", {
cloudInstanceId, cloudInstanceId,
@@ -384,10 +308,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
startRecordingWithRetry("raw-tracks", rawTracksInstanceId); startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
} }
}, [ }, [
joinedMutation,
roomName,
meeting.id,
meeting.recording_type, meeting.recording_type,
meeting.id,
startRecordingMutation, startRecordingMutation,
cloudInstanceId, cloudInstanceId,
rawTracksInstanceId, rawTracksInstanceId,

View File

@@ -1,13 +1,12 @@
"use client"; "use client";
import { $api, API_URL } from "./apiClient"; import { $api } from "./apiClient";
import { useError } from "../(errors)/errorContext"; import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query"; import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components, operations } from "../reflector-api"; import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider"; import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types"; import { MeetingId } from "./types";
import { NonEmptyString } from "./utils"; import { NonEmptyString } from "./utils";
import { createFinalURL, createQuerySerializer } from "openapi-fetch";
/* /*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other * XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -808,44 +807,6 @@ export function useRoomJoinMeeting() {
); );
} }
export const LEAVE_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;
export const leaveRoomPostUrl = (
path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
): string =>
createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path, query },
querySerializer: createQuerySerializer(),
});
export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];
export function useRoomLeaveMeeting() {
return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
}
export const JOINED_ROOM_POST_URL_TEMPLATE =
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;
export const joinedRoomPostUrl = (
params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
): string =>
createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
baseUrl: API_URL,
params: { path: params },
querySerializer: () => "",
});
export type JoinedRoomBody =
operations["v1_rooms_joined_meeting"]["requestBody"];
export function useRoomJoinedMeeting() {
return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
}
export function useRoomIcsSync() { export function useRoomIcsSync() {
const { setError } = useError(); const { setError } = useError();

View File

@@ -171,48 +171,6 @@ export interface paths {
patch?: never; patch?: never;
trace?: never; trace?: never;
}; };
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Joined Meeting
* @description Trigger presence poll (ideally when user actually joins meeting in Daily iframe)
*/
post: operations["v1_rooms_joined_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
/**
* Rooms Leave Meeting
* @description Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
*
* Queues presence poll with optional delay to allow Daily.co to detect disconnect.
*/
post: operations["v1_rooms_leave_meeting"];
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/rooms/{room_id}/webhook/test": { "/v1/rooms/{room_id}/webhook/test": {
parameters: { parameters: {
query?: never; query?: never;
@@ -2477,72 +2435,6 @@ export interface operations {
}; };
}; };
}; };
v1_rooms_joined_meeting: {
parameters: {
query?: never;
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_leave_meeting: {
parameters: {
query?: {
delay_seconds?: number;
};
header?: never;
path: {
room_name: string;
meeting_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
v1_rooms_test_webhook: { v1_rooms_test_webhook: {
parameters: { parameters: {
query?: never; query?: never;