Compare commits

...

6 Commits

Author SHA1 Message Date
Igor Loskutov
1aabe4f880 feat: post Hatchet DAG status to Zulip with live updates
Post a rendered DAG status message to a configurable Zulip stream/topic
when a Hatchet workflow is dispatched. Update it after each task
completes (success or failure), and delete it when post_zulip runs
so the final transcript notification replaces it.

On pipeline failure, appends an error banner (:cross_mark:) with the
failing step name and error message.

New settings: ZULIP_DAG_STREAM, ZULIP_DAG_TOPIC, ZULIP_HOST_HEADER.
New module: reflector/hatchet/dag_zulip.py (create/update/delete).
2026-02-09 10:03:31 -05:00
Igor Loskutov
42e7c0b8fd feat: add CLI tool to render Hatchet workflow runs as text DAG
Cherry-picked from feat/hatchet-run-renderer branch.
2026-02-06 23:25:44 -05:00
cd2255cfbc chore(main): release 0.33.0 (#847) 2026-02-06 18:12:06 -05:00
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
1ce1c7a910 fix: websocket tests (#825)
* fix websocket tests

* fix: restore timeout and fix celery test infrastructure

- Re-add timeout=1.0 to ws_manager pubsub loop (prevents CPU spin?)
- Use Redis for Celery tests (memory:// broker doesn't support chords)
- Add timeout param to in-memory subscriber mock
- Remove duplicate celery_includes fixture from rtc_ws tests

* fix: remove redundant inline imports in test files

* fix: update gitleaks ignore for moved s3_key line

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 14:23:31 -05:00
Rémi Pauchet
984795357e - fix nvidia repo blocked by apt (sha1) (#845)
- use build cache for apt and uv
- limit concurency for uv to prevent crashes with too many cores
2026-02-05 13:59:34 -05:00
19 changed files with 1455 additions and 277 deletions

View File

@@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
server/reflector/worker/process.py:generic-api-key:465
server/reflector/worker/process.py:generic-api-key:594

View File

@@ -1,5 +1,17 @@
# Changelog
## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
### Features
* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
### Bug Fixes
* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)

View File

@@ -4,27 +4,31 @@ ENV PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy \
UV_NO_CACHE=1
# patch until nvidia updates the sha1 repo
ADD sequoia.config /etc/crypto-policies/back-ends/sequoia.config
WORKDIR /tmp
RUN apt-get update \
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update \
&& apt-get install -y \
ffmpeg \
curl \
ca-certificates \
gnupg \
wget \
&& apt-get clean
wget
# Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12
ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb
RUN dpkg -i /cuda-keyring.deb \
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
dpkg -i /cuda-keyring.deb \
&& rm /cuda-keyring.deb \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
cuda-cudart-12-6 \
libcublas-12-6 \
libcudnn9-cuda-12 \
libcudnn9-dev-cuda-12 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
libcudnn9-dev-cuda-12
ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh
ENV PATH="/root/.local/bin/:$PATH"
@@ -39,6 +43,13 @@ COPY ./app /app/app
COPY ./main.py /app/
COPY ./runserver.sh /app/
# prevent uv failing with too many open files on big cpus
ENV UV_CONCURRENT_INSTALLS=16
# first install
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --compile-bytecode --locked
EXPOSE 8000
CMD ["sh", "/app/runserver.sh"]

View File

@@ -0,0 +1,2 @@
[hash_algorithms]
sha1 = "always"

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)

View File

@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"use_celery",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Column(
"skip_consent",
sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_celery: bool = False
skip_consent: bool = False

View File

@@ -0,0 +1,144 @@
"""
Hatchet DAG Status -> Zulip Live Updates.
Posts/updates/deletes a Zulip message showing the Hatchet workflow DAG status.
All functions are fire-and-forget (catch + warning log on failure).
Note: Uses deferred imports throughout for fork-safety,
consistent with the pipeline pattern in daily_multitrack_pipeline.py.
"""
from reflector.logger import logger
from reflector.settings import settings
def _dag_zulip_enabled() -> bool:
return bool(
settings.ZULIP_REALM and settings.ZULIP_DAG_STREAM and settings.ZULIP_DAG_TOPIC
)
async def create_dag_zulip_message(transcript_id: str, workflow_run_id: str) -> None:
"""Post initial DAG status to Zulip. Called at dispatch time (normal DB context)."""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import send_message_to_zulip # noqa: PLC0415
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)
response = await send_message_to_zulip(
settings.ZULIP_DAG_STREAM, settings.ZULIP_DAG_TOPIC, content
)
message_id = response.get("id")
if message_id:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"zulip_message_id": message_id}
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)
async def update_dag_zulip_message(
transcript_id: str,
workflow_run_id: str,
error_message: str | None = None,
) -> None:
"""Update existing DAG status in Zulip. Called from Hatchet worker (forked).
Args:
error_message: If set, appended as an error banner to the rendered DAG.
"""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows.daily_multitrack_pipeline import ( # noqa: PLC0415
fresh_db_connection,
)
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import update_zulip_message # noqa: PLC0415
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)
if error_message:
content += f"\n\n:cross_mark: **{error_message}**"
await update_zulip_message(
transcript.zulip_message_id,
settings.ZULIP_DAG_STREAM,
settings.ZULIP_DAG_TOPIC,
content,
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to update DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)
async def delete_dag_zulip_message(transcript_id: str) -> None:
"""Delete DAG Zulip message and clear zulip_message_id.
Called from post_zulip task (already inside fresh_db_connection).
Swallows InvalidMessageError (message already deleted).
"""
if not _dag_zulip_enabled():
return
try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.zulip import ( # noqa: PLC0415
InvalidMessageError,
delete_zulip_message,
)
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return
try:
await delete_zulip_message(transcript.zulip_message_id)
except InvalidMessageError:
logger.warning(
"[DAG Zulip] Message already deleted",
transcript_id=transcript_id,
zulip_message_id=transcript.zulip_message_id,
)
await transcripts_controller.update(transcript, {"zulip_message_id": None})
except Exception:
logger.warning(
"[DAG Zulip] Failed to delete DAG message",
transcript_id=transcript_id,
exc_info=True,
)

View File

@@ -45,6 +45,7 @@ from reflector.hatchet.constants import (
TIMEOUT_SHORT,
TaskName,
)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
from reflector.hatchet.workflows.models import (
ActionItemsResult,
ConsentResult,
@@ -238,7 +239,14 @@ def with_error_handling(
@functools.wraps(func)
async def wrapper(input: PipelineInput, ctx: Context) -> R:
try:
return await func(input, ctx)
result = await func(input, ctx)
try:
await update_dag_zulip_message(
input.transcript_id, ctx.workflow_run_id
)
except Exception:
pass
return result
except Exception as e:
logger.error(
f"[Hatchet] {step_name} failed",
@@ -246,6 +254,14 @@ def with_error_handling(
error=str(e),
exc_info=True,
)
try:
await update_dag_zulip_message(
input.transcript_id,
ctx.workflow_run_id,
error_message=f"{step_name} failed: {e}",
)
except Exception:
pass
if set_error_status:
await set_workflow_error_status(input.transcript_id)
raise
@@ -1294,6 +1310,11 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.dag_zulip import ( # noqa: PLC0415
delete_dag_zulip_message,
)
await delete_dag_zulip_message(input.transcript_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:

View File

@@ -15,14 +15,11 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.string import NonEmptyString
@@ -181,124 +178,108 @@ async def dispatch_transcript_processing(
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
use_celery = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
use_celery = room.use_celery if room else False
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=config.room_id,
transcript_id=config.transcript_id,
# Multitrack processing always uses Hatchet (no Celery fallback)
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
)
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
if can_replay:
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(
transcript.workflow_run_id
)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
return None
else:
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
logger.info(
"Old workflow not replayable, starting new",
old_workflow_id=transcript.workflow_run_id,
old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
try:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": None})
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
try:
await create_dag_zulip_message(config.transcript_id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
transcript_id=config.transcript_id,
workflow_id=workflow_id,
exc_info=True,
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:

View File

@@ -1,7 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.schemas.platform import DAILY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -155,12 +155,15 @@ class Settings(BaseSettings):
None # Webhook UUID for this environment. Not used by production code
)
# Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
# Zulip integration
ZULIP_REALM: str | None = None
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
ZULIP_DAG_STREAM: str | None = None
ZULIP_DAG_TOPIC: str | None = None
ZULIP_HOST_HEADER: str | None = None
# Hatchet workflow orchestration (always enabled for multitrack processing)
HATCHET_CLIENT_TOKEN: str | None = None

View File

@@ -0,0 +1,412 @@
"""
Render Hatchet workflow runs as text DAG.
Usage:
# Show latest 5 runs (summary table)
uv run -m reflector.tools.render_hatchet_run
# Show specific run with full DAG + task details
uv run -m reflector.tools.render_hatchet_run <workflow_run_id>
# Drill into Nth run from the list (1-indexed)
uv run -m reflector.tools.render_hatchet_run --show 1
# Show latest N runs
uv run -m reflector.tools.render_hatchet_run --last 10
# Filter by status
uv run -m reflector.tools.render_hatchet_run --status FAILED
uv run -m reflector.tools.render_hatchet_run --status RUNNING
"""
import argparse
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from hatchet_sdk.clients.rest.models import (
V1TaskEvent,
V1TaskStatus,
V1TaskSummary,
V1WorkflowRunDetails,
WorkflowRunShapeItemForWorkflowRunDetails,
)
from reflector.hatchet.client import HatchetClientManager
STATUS_ICON = {
V1TaskStatus.COMPLETED: "\u2705",
V1TaskStatus.RUNNING: "\u23f3",
V1TaskStatus.FAILED: "\u274c",
V1TaskStatus.QUEUED: "\u23f8\ufe0f",
V1TaskStatus.CANCELLED: "\u26a0\ufe0f",
}
STATUS_LABEL = {
V1TaskStatus.COMPLETED: "Complete",
V1TaskStatus.RUNNING: "Running",
V1TaskStatus.FAILED: "FAILED",
V1TaskStatus.QUEUED: "Queued",
V1TaskStatus.CANCELLED: "Cancelled",
}
def _fmt_time(dt: datetime | None) -> str:
if dt is None:
return "-"
return dt.strftime("%H:%M:%S")
def _fmt_duration(ms: int | None) -> str:
if ms is None:
return "-"
secs = ms / 1000
if secs < 60:
return f"{secs:.1f}s"
mins = secs / 60
return f"{mins:.1f}m"
def _fmt_status_line(task: V1TaskSummary) -> str:
"""Format a status line like: Complete (finished 20:31:44)"""
label = STATUS_LABEL.get(task.status, task.status.value)
icon = STATUS_ICON.get(task.status, "?")
if task.status == V1TaskStatus.COMPLETED and task.finished_at:
return f"{icon} {label} (finished {_fmt_time(task.finished_at)})"
elif task.status == V1TaskStatus.RUNNING and task.started_at:
parts = [f"started {_fmt_time(task.started_at)}"]
if task.duration:
parts.append(f"{_fmt_duration(task.duration)} elapsed")
return f"{icon} {label} ({', '.join(parts)})"
elif task.status == V1TaskStatus.FAILED and task.finished_at:
return f"{icon} {label} (failed {_fmt_time(task.finished_at)})"
elif task.status == V1TaskStatus.CANCELLED:
return f"{icon} {label}"
elif task.status == V1TaskStatus.QUEUED:
return f"{icon} {label}"
return f"{icon} {label}"
def _topo_sort(
shape: list[WorkflowRunShapeItemForWorkflowRunDetails],
) -> list[str]:
"""Topological sort of step_ids from shape DAG."""
step_ids = {s.step_id for s in shape}
children_map: dict[str, list[str]] = {}
in_degree: dict[str, int] = {sid: 0 for sid in step_ids}
for s in shape:
children = [c for c in (s.children_step_ids or []) if c in step_ids]
children_map[s.step_id] = children
for c in children:
in_degree[c] += 1
queue = sorted(sid for sid, deg in in_degree.items() if deg == 0)
result: list[str] = []
while queue:
node = queue.pop(0)
result.append(node)
for c in children_map.get(node, []):
in_degree[c] -= 1
if in_degree[c] == 0:
queue.append(c)
queue.sort()
return result
def render_run_detail(details: V1WorkflowRunDetails) -> str:
"""Render a single workflow run as markdown DAG with task details."""
shape = details.shape or []
tasks = details.tasks or []
events = details.task_events or []
run = details.run
if not shape:
return f"Run {run.metadata.id}: {run.status.value} (no shape data)"
# Build lookups
step_to_shape: dict[str, WorkflowRunShapeItemForWorkflowRunDetails] = {
s.step_id: s for s in shape
}
step_to_name: dict[str, str] = {s.step_id: s.task_name for s in shape}
# Reverse edges (parents)
parents: dict[str, list[str]] = {s.step_id: [] for s in shape}
for s in shape:
for child_id in s.children_step_ids or []:
if child_id in parents:
parents[child_id].append(s.step_id)
# Join tasks by step_id
task_by_step: dict[str, V1TaskSummary] = {}
for t in tasks:
if t.step_id and t.step_id in step_to_name:
task_by_step[t.step_id] = t
# Events indexed by task_external_id
events_by_task: dict[str, list[V1TaskEvent]] = defaultdict(list)
for ev in events:
events_by_task[ev.task_id].append(ev)
ordered = _topo_sort(shape)
lines: list[str] = []
# Run header
run_icon = STATUS_ICON.get(run.status, "?")
run_name = run.display_name or run.workflow_id
dur = _fmt_duration(run.duration)
lines.append(f"**{run_name}** {run_icon} {dur}")
lines.append(f"ID: `{run.metadata.id}`")
if run.additional_metadata:
meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
lines.append(f"Meta: {', '.join(meta_parts)}")
if run.error_message:
# Take first line of error only for header
first_line = run.error_message.split("\n")[0]
lines.append(f"Error: {first_line}")
lines.append("")
# DAG Status Overview table
lines.append("**DAG Status Overview**")
lines.append("")
lines.append("| Node | Status | Duration | Dependencies |")
lines.append("|------|--------|----------|--------------|")
for step_id in ordered:
s = step_to_shape[step_id]
t = task_by_step.get(step_id)
name = step_to_name[step_id]
icon = STATUS_ICON.get(t.status, "?") if t else "?"
dur = _fmt_duration(t.duration) if t else "-"
parent_names = [step_to_name[p] for p in parents[step_id]]
child_names = [
step_to_name[c] for c in (s.children_step_ids or []) if c in step_to_name
]
deps_left = ", ".join(parent_names) if parent_names else ""
deps_right = ", ".join(child_names) if child_names else ""
if deps_left and deps_right:
deps = f"{deps_left} \u2192 {deps_right}"
elif deps_right:
deps = f"\u2192 {deps_right}"
elif deps_left:
deps = f"{deps_left} \u2192"
else:
deps = "-"
lines.append(f"| {name} | {icon} | {dur} | {deps} |")
lines.append("")
lines.append("---")
lines.append("")
# Node details
for step_id in ordered:
t = task_by_step.get(step_id)
name = step_to_name[step_id]
if not t:
lines.append(f"**\U0001f4e6 {name}**")
lines.append("Status: no task data")
lines.append("")
continue
lines.append(f"**\U0001f4e6 {name}**")
lines.append(f"Status: {_fmt_status_line(t)}")
if t.duration:
lines.append(f"Duration: {_fmt_duration(t.duration)}")
if t.retry_count and t.retry_count > 0:
lines.append(f"Retries: {t.retry_count}")
# Fan-out children
if t.num_spawned_children and t.num_spawned_children > 0:
children = t.children or []
completed = sum(1 for c in children if c.status == V1TaskStatus.COMPLETED)
failed = sum(1 for c in children if c.status == V1TaskStatus.FAILED)
running = sum(1 for c in children if c.status == V1TaskStatus.RUNNING)
lines.append(
f"Spawned children: {completed}/{t.num_spawned_children} done"
f"{f', {running} running' if running else ''}"
f"{f', {failed} failed' if failed else ''}"
)
# Error message (first meaningful line only, full trace in events)
if t.error_message:
err_lines = t.error_message.strip().split("\n")
# Find first non-empty, non-traceback line
err_summary = err_lines[0]
for line in err_lines:
stripped = line.strip()
if stripped and not stripped.startswith(
("Traceback", "File ", "{", ")")
):
err_summary = stripped
break
lines.append(f"Error: `{err_summary}`")
# Events log
task_events = sorted(
events_by_task.get(t.task_external_id, []),
key=lambda e: e.timestamp,
)
if task_events:
lines.append("Events:")
for ev in task_events:
ts = ev.timestamp.strftime("%H:%M:%S")
ev_icon = ""
if ev.event_type.value == "FINISHED":
ev_icon = "\u2705 "
elif ev.event_type.value in ("FAILED", "TIMED_OUT"):
ev_icon = "\u274c "
elif ev.event_type.value == "STARTED":
ev_icon = "\u25b6\ufe0f "
elif ev.event_type.value == "RETRYING":
ev_icon = "\U0001f504 "
elif ev.event_type.value == "CANCELLED":
ev_icon = "\u26a0\ufe0f "
msg = ev.message.strip()
if ev.error_message:
# Just first line of error in event log
err_first = ev.error_message.strip().split("\n")[0]
if msg:
msg += f" | {err_first}"
else:
msg = err_first
if msg:
lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}: {msg}")
else:
lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}")
lines.append("")
return "\n".join(lines)
def render_run_summary(idx: int, run: V1TaskSummary) -> str:
"""One-line summary for a run in the list view."""
icon = STATUS_ICON.get(run.status, "?")
name = run.display_name or run.workflow_name or "?"
run_id = run.workflow_run_external_id or "?"
dur = _fmt_duration(run.duration)
started = _fmt_time(run.started_at)
meta = ""
if run.additional_metadata:
meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
meta = f" ({', '.join(meta_parts)})"
return (
f" {idx}. {icon} **{name}** started={started} dur={dur}{meta}\n"
f" `{run_id}`"
)
async def _fetch_run_list(
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> list[V1TaskSummary]:
client = HatchetClientManager.get_client()
since = datetime.now(timezone.utc) - timedelta(days=7)
runs = await client.runs.aio_list(
since=since,
statuses=statuses,
limit=count,
)
return runs.rows or []
async def list_recent_runs(
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> str:
"""List recent workflow runs as text."""
rows = await _fetch_run_list(count, statuses)
if not rows:
return "No runs found in the last 7 days."
lines = [f"Recent runs ({len(rows)}):", ""]
for i, run in enumerate(rows, 1):
lines.append(render_run_summary(i, run))
lines.append("")
lines.append("Use `--show N` to see full DAG for run N")
return "\n".join(lines)
async def show_run(workflow_run_id: str) -> str:
"""Fetch and render a single run."""
client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
return render_run_detail(details)
async def show_nth_run(
n: int,
count: int = 5,
statuses: list[V1TaskStatus] | None = None,
) -> str:
"""Fetch list, then drill into Nth run."""
rows = await _fetch_run_list(count, statuses)
if not rows:
return "No runs found in the last 7 days."
if n < 1 or n > len(rows):
return f"Invalid index {n}. Have {len(rows)} runs (1-{len(rows)})."
run = rows[n - 1]
return await show_run(run.workflow_run_external_id)
async def main_async(args: argparse.Namespace) -> None:
statuses = [V1TaskStatus(args.status)] if args.status else None
if args.run_id:
output = await show_run(args.run_id)
elif args.show is not None:
output = await show_nth_run(args.show, count=args.last, statuses=statuses)
else:
output = await list_recent_runs(count=args.last, statuses=statuses)
print(output)
def main() -> None:
parser = argparse.ArgumentParser(
description="Render Hatchet workflow runs as text DAG"
)
parser.add_argument(
"run_id",
nargs="?",
default=None,
help="Workflow run ID to show in detail. If omitted, lists recent runs.",
)
parser.add_argument(
"--show",
type=int,
default=None,
metavar="N",
help="Show full DAG for the Nth run in the list (1-indexed)",
)
parser.add_argument(
"--last",
type=int,
default=5,
help="Number of recent runs to list (default: 5)",
)
parser.add_argument(
"--status",
choices=["QUEUED", "RUNNING", "COMPLETED", "FAILED", "CANCELLED"],
help="Filter by status",
)
args = parser.parse_args()
asyncio.run(main_async(args))
if __name__ == "__main__":
main()

View File

@@ -25,11 +25,9 @@ from reflector.db.transcripts import (
transcripts_controller,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -351,49 +349,39 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_celery:
logger.info(
"Room uses legacy Celery processing",
room_id=room.id,
transcript_id=transcript.id,
)
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=filter_cam_audio_tracks(track_keys),
# Multitrack processing always uses Hatchet (no Celery fallback)
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
try:
await create_dag_zulip_message(transcript.id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
transcript_id=transcript.id,
workflow_id=workflow_id,
exc_info=True,
)
@shared_task
@@ -1072,66 +1060,53 @@ async def reprocess_failed_daily_recordings():
)
continue
use_celery = room and room.use_celery
use_hatchet = not use_celery
if use_hatchet:
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
if not transcript:
logger.warning(
"No transcript for Hatchet reprocessing, skipping",
recording_id=recording.id,
)
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording.id,
"tracks": [
{"s3_key": k}
for k in filter_cam_audio_tracks(recording.track_keys)
],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording.id,
"reprocess": True,
},
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
try:
await create_dag_zulip_message(transcript.id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at reprocess dispatch",
transcript_id=transcript.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
else:
logger.info(
"Queueing Daily recording for Celery reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
exc_info=True,
)
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
# Reprocessing uses recording.meeting_id directly instead of time-based matching
recording_start_ts = int(recording.recorded_at.timestamp())
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
recording_start_ts=recording_start_ts,
)
logger.info(
"Queued Daily recording for Hatchet reprocessing",
recording_id=recording.id,
workflow_id=workflow_id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
)
reprocessed_count += 1

View File

@@ -11,7 +11,6 @@ broadcast messages to all connected websockets.
import asyncio
import json
import threading
import redis.asyncio as redis
from fastapi import WebSocket
@@ -98,6 +97,7 @@ class WebsocketManager:
async def _pubsub_data_reader(self, pubsub_subscriber):
while True:
# timeout=1.0 prevents tight CPU loop when no messages available
message = await pubsub_subscriber.get_message(
ignore_subscribe_messages=True
)
@@ -109,29 +109,38 @@ class WebsocketManager:
await socket.send_json(data)
# Process-global singleton to ensure only one WebsocketManager instance exists.
# Multiple instances would cause resource leaks and CPU issues.
_ws_manager: WebsocketManager | None = None
def get_ws_manager() -> WebsocketManager:
"""
Returns the WebsocketManager instance for managing websockets.
Returns the global WebsocketManager singleton.
This function initializes and returns the WebsocketManager instance,
which is responsible for managing websockets and handling websocket
connections.
Creates instance on first call, subsequent calls return cached instance.
Thread-safe via GIL. Concurrent initialization may create duplicate
instances but last write wins (acceptable for this use case).
Returns:
WebsocketManager: The initialized WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
WebsocketManager: The global WebsocketManager instance.
"""
local = threading.local()
if hasattr(local, "ws_manager"):
return local.ws_manager
global _ws_manager
if _ws_manager is not None:
return _ws_manager
# No lock needed - GIL makes this safe enough
# Worst case: race creates two instances, last assignment wins
pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
)
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager
return ws_manager
_ws_manager = WebsocketManager(pubsub_client=pubsub_client)
return _ws_manager
def reset_ws_manager() -> None:
"""Reset singleton for testing. DO NOT use in production."""
global _ws_manager
_ws_manager = None

View File

@@ -12,9 +12,16 @@ class InvalidMessageError(Exception):
pass
def _zulip_client() -> httpx.AsyncClient:
headers = {}
if settings.ZULIP_HOST_HEADER:
headers["Host"] = settings.ZULIP_HOST_HEADER
return httpx.AsyncClient(verify=False, headers=headers)
async def get_zulip_topics(stream_id: int) -> list[dict]:
try:
async with httpx.AsyncClient() as client:
async with _zulip_client() as client:
response = await client.get(
f"https://{settings.ZULIP_REALM}/api/v1/users/me/{stream_id}/topics",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
@@ -29,7 +36,7 @@ async def get_zulip_topics(stream_id: int) -> list[dict]:
async def get_zulip_streams() -> list[dict]:
try:
async with httpx.AsyncClient() as client:
async with _zulip_client() as client:
response = await client.get(
f"https://{settings.ZULIP_REALM}/api/v1/streams",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
@@ -44,7 +51,7 @@ async def get_zulip_streams() -> list[dict]:
async def send_message_to_zulip(stream: str, topic: str, content: str):
try:
async with httpx.AsyncClient() as client:
async with _zulip_client() as client:
response = await client.post(
f"https://{settings.ZULIP_REALM}/api/v1/messages",
data={
@@ -66,7 +73,7 @@ async def send_message_to_zulip(stream: str, topic: str, content: str):
async def update_zulip_message(message_id: int, stream: str, topic: str, content: str):
try:
async with httpx.AsyncClient() as client:
async with _zulip_client() as client:
response = await client.patch(
f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}",
data={
@@ -90,6 +97,27 @@ async def update_zulip_message(message_id: int, stream: str, topic: str, content
raise Exception(f"Failed to update Zulip message: {error}")
async def delete_zulip_message(message_id: int):
try:
async with _zulip_client() as client:
response = await client.delete(
f"https://{settings.ZULIP_REALM}/api/v1/messages/{message_id}",
auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
)
if (
response.status_code == 400
and response.json()["msg"] == "Invalid message(s)"
):
raise InvalidMessageError(f"There is no message with id: {message_id}")
response.raise_for_status()
return response.json()
except httpx.RequestError as error:
raise Exception(f"Failed to delete Zulip message: {error}")
def get_zulip_message(transcript: Transcript, include_topics: bool):
transcript_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"

View File

@@ -1,11 +1,10 @@
import os
from contextlib import asynccontextmanager
from tempfile import NamedTemporaryFile
from unittest.mock import patch
import pytest
from reflector.schemas.platform import WHEREBY_PLATFORM
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
@pytest.fixture(scope="session", autouse=True)
@@ -15,6 +14,7 @@ def register_mock_platform():
from reflector.video_platforms.registry import register_platform
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
register_platform(DAILY_PLATFORM, MockPlatformClient)
yield
@@ -333,11 +333,14 @@ def celery_enable_logging():
@pytest.fixture(scope="session")
def celery_config():
with NamedTemporaryFile() as f:
yield {
"broker_url": "memory://",
"result_backend": f"db+sqlite:///{f.name}",
}
redis_host = os.environ.get("REDIS_HOST", "localhost")
redis_port = os.environ.get("REDIS_PORT", "6379")
# Use db 2 to avoid conflicts with main app
redis_url = f"redis://{redis_host}:{redis_port}/2"
yield {
"broker_url": redis_url,
"result_backend": redis_url,
}
@pytest.fixture(scope="session")
@@ -370,9 +373,12 @@ async def ws_manager_in_memory(monkeypatch):
def __init__(self, queue: asyncio.Queue):
self.queue = queue
async def get_message(self, ignore_subscribe_messages: bool = True):
async def get_message(
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
):
wait_timeout = timeout if timeout is not None else 0.05
try:
return await asyncio.wait_for(self.queue.get(), timeout=0.05)
return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
except Exception:
return None

View File

@@ -0,0 +1,536 @@
"""
Tests for Hatchet DAG Status -> Zulip Live Updates.
Tests cover:
- _dag_zulip_enabled() guard logic
- create_dag_zulip_message: sends + stores message ID
- update_dag_zulip_message: updates existing; noop when no message_id
- delete_dag_zulip_message: deletes + clears; handles InvalidMessageError
- delete_zulip_message: sends HTTP DELETE; raises on 400
- with_error_handling integration: calls update after success + failure
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from reflector.db.transcripts import Transcript
@pytest.fixture
def dag_settings():
"""Patch settings for DAG Zulip tests."""
with patch("reflector.hatchet.dag_zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_DAG_STREAM = "dag-stream"
mock_settings.ZULIP_DAG_TOPIC = "dag-topic"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
yield mock_settings
@pytest.fixture
def dag_settings_disabled():
"""Patch settings with DAG Zulip disabled."""
with patch("reflector.hatchet.dag_zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_DAG_STREAM = None
mock_settings.ZULIP_DAG_TOPIC = None
yield mock_settings
@pytest.fixture
def mock_transcript():
return Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
zulip_message_id=None,
)
@pytest.fixture
def mock_transcript_with_zulip_id():
return Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
zulip_message_id=42,
)
class TestDagZulipEnabled:
def test_enabled_when_all_set(self, dag_settings):
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is True
def test_disabled_when_realm_missing(self, dag_settings):
dag_settings.ZULIP_REALM = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
def test_disabled_when_stream_missing(self, dag_settings):
dag_settings.ZULIP_DAG_STREAM = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
def test_disabled_when_topic_missing(self, dag_settings):
dag_settings.ZULIP_DAG_TOPIC = None
from reflector.hatchet.dag_zulip import _dag_zulip_enabled
assert _dag_zulip_enabled() is False
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestCreateDagZulipMessage:
async def test_sends_and_stores_message_id(self, dag_settings, mock_transcript):
mock_run_details = MagicMock()
rendered_md = "**DAG** rendered"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
return_value={"id": 99},
) as mock_send,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_update,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
from reflector.hatchet.dag_zulip import create_dag_zulip_message
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_send.assert_called_once_with("dag-stream", "dag-topic", rendered_md)
mock_update.assert_called_once_with(
mock_transcript, {"zulip_message_id": 99}
)
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
) as mock_send:
from reflector.hatchet.dag_zulip import create_dag_zulip_message
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_send.assert_not_called()
async def test_logs_warning_on_failure(self, dag_settings, mock_transcript):
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value="rendered",
),
patch(
"reflector.zulip.send_message_to_zulip",
new_callable=AsyncMock,
side_effect=Exception("Zulip down"),
),
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch("reflector.hatchet.dag_zulip.logger") as mock_logger,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=MagicMock())
mock_get_client.return_value = mock_client
from reflector.hatchet.dag_zulip import create_dag_zulip_message
# Should not raise
await create_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_logger.warning.assert_called()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestUpdateDagZulipMessage:
async def test_updates_existing_message(
self, dag_settings, mock_transcript_with_zulip_id
):
mock_run_details = MagicMock()
rendered_md = "**DAG** updated"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_called_once_with(
42, "dag-stream", "dag-topic", rendered_md
)
async def test_appends_error_banner(
self, dag_settings, mock_transcript_with_zulip_id
):
mock_run_details = MagicMock()
rendered_md = "**DAG** updated"
with (
patch(
"reflector.hatchet.client.HatchetClientManager.get_client"
) as mock_get_client,
patch(
"reflector.tools.render_hatchet_run.render_run_detail",
return_value=rendered_md,
),
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_client = MagicMock()
mock_client.runs.aio_get = AsyncMock(return_value=mock_run_details)
mock_get_client.return_value = mock_client
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message(
"test-transcript-id",
"workflow-run-123",
error_message="get_recording failed: connection timeout",
)
call_args = mock_update.call_args
content = call_args[0][3]
assert rendered_md in content
assert "get_recording failed: connection timeout" in content
async def test_noop_when_no_message_id(self, dag_settings, mock_transcript):
with (
patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.fresh_db_connection"
) as mock_fresh_db,
):
mock_fresh_db.return_value.__aenter__ = AsyncMock()
mock_fresh_db.return_value.__aexit__ = AsyncMock(return_value=False)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_not_called()
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.update_zulip_message",
new_callable=AsyncMock,
) as mock_update:
from reflector.hatchet.dag_zulip import update_dag_zulip_message
await update_dag_zulip_message("test-transcript-id", "workflow-run-123")
mock_update.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
class TestDeleteDagZulipMessage:
async def test_deletes_and_clears(
self, dag_settings, mock_transcript_with_zulip_id
):
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_tc_update,
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_called_once_with(42)
mock_tc_update.assert_called_once_with(
mock_transcript_with_zulip_id, {"zulip_message_id": None}
)
async def test_noop_when_no_message_id(self, dag_settings, mock_transcript):
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete,
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript,
),
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_not_called()
async def test_handles_invalid_message_error(
self, dag_settings, mock_transcript_with_zulip_id
):
from reflector.zulip import InvalidMessageError
with (
patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
side_effect=InvalidMessageError("gone"),
),
patch(
"reflector.db.transcripts.transcripts_controller.get_by_id",
new_callable=AsyncMock,
return_value=mock_transcript_with_zulip_id,
),
patch(
"reflector.db.transcripts.transcripts_controller.update",
new_callable=AsyncMock,
) as mock_tc_update,
patch("reflector.hatchet.dag_zulip.logger"),
):
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
# Should not raise; should still clear the message_id
await delete_dag_zulip_message("test-transcript-id")
mock_tc_update.assert_called_once_with(
mock_transcript_with_zulip_id, {"zulip_message_id": None}
)
async def test_noop_when_disabled(self, dag_settings_disabled):
with patch(
"reflector.zulip.delete_zulip_message",
new_callable=AsyncMock,
) as mock_delete:
from reflector.hatchet.dag_zulip import delete_dag_zulip_message
await delete_dag_zulip_message("test-transcript-id")
mock_delete.assert_not_called()
@pytest.mark.asyncio
class TestDeleteZulipMessage:
async def test_sends_delete_request(self):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = {"result": "success"}
mock_client = AsyncMock()
mock_client.delete = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("reflector.zulip.httpx.AsyncClient", return_value=mock_client):
with patch("reflector.zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
from reflector.zulip import delete_zulip_message
result = await delete_zulip_message(123)
assert result == {"result": "success"}
mock_client.delete.assert_called_once()
call_args = mock_client.delete.call_args
assert "123" in call_args.args[0]
async def test_raises_invalid_message_on_400(self):
from reflector.zulip import InvalidMessageError
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.json.return_value = {"msg": "Invalid message(s)"}
mock_client = AsyncMock()
mock_client.delete = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("reflector.zulip.httpx.AsyncClient", return_value=mock_client):
with patch("reflector.zulip.settings") as mock_settings:
mock_settings.ZULIP_REALM = "zulip.example.com"
mock_settings.ZULIP_BOT_EMAIL = "bot@example.com"
mock_settings.ZULIP_API_KEY = "fake-key"
from reflector.zulip import delete_zulip_message
with pytest.raises(InvalidMessageError):
await delete_zulip_message(999)
@pytest.mark.asyncio
class TestWithErrorHandlingDagUpdate:
"""Test that with_error_handling calls update_dag_zulip_message."""
async def test_calls_update_on_success(self):
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def fake_task(input: PipelineInput, ctx) -> str:
return "ok"
with patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
) as mock_update:
result = await fake_task(input_data, mock_ctx)
assert result == "ok"
mock_update.assert_called_once_with("tid-1", "wfr-123")
async def test_calls_update_on_failure_with_error_message(self):
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def failing_task(input: PipelineInput, ctx) -> str:
raise ValueError("boom")
with (
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
) as mock_update,
patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.set_workflow_error_status",
new_callable=AsyncMock,
),
):
with pytest.raises(ValueError, match="boom"):
await failing_task(input_data, mock_ctx)
mock_update.assert_called_once_with(
"tid-1", "wfr-123", error_message="get_recording failed: boom"
)
async def test_dag_failure_doesnt_affect_task(self):
"""DAG update failure should not prevent task from succeeding."""
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
with_error_handling,
)
mock_ctx = MagicMock()
mock_ctx.workflow_run_id = "wfr-123"
input_data = PipelineInput(
recording_id="rec-1",
tracks=[{"s3_key": "k"}],
bucket_name="bucket",
transcript_id="tid-1",
)
@with_error_handling(TaskName.GET_RECORDING)
async def ok_task(input: PipelineInput, ctx) -> str:
return "ok"
with patch(
"reflector.hatchet.workflows.daily_multitrack_pipeline.update_dag_zulip_message",
new_callable=AsyncMock,
side_effect=Exception("zulip exploded"),
):
result = await ok_task(input_data, mock_ctx)
assert result == "ok"

View File

@@ -1,6 +1,6 @@
import asyncio
import time
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Whereby recordings should use file pipeline
# Whereby recordings should use file pipeline, not Hatchet
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
mock_multitrack_pipeline.delay.assert_not_called()
mock_hatchet.start_workflow.assert_not_called()
@pytest.mark.usefixtures("setup_database")
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
recording_trigger="automatic-2nd-participant",
is_shared=False,
)
# Force Celery backend for test
await rooms_controller.update(room, {"use_celery": True})
transcript = await transcripts_controller.add(
"",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet,
):
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
response = await client.post(f"/transcripts/{transcript.id}/process")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Daily.co multitrack recordings should use multitrack pipeline
mock_multitrack_pipeline.delay.assert_called_once_with(
transcript_id=transcript.id,
bucket_name="daily-bucket",
track_keys=track_keys,
)
# Daily.co multitrack recordings should use Hatchet workflow
mock_hatchet.start_workflow.assert_called_once()
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
assert call_kwargs["input_data"]["tracks"] == [
{"s3_key": k} for k in track_keys
]
mock_file_pipeline.delay.assert_not_called()

View File

@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
settings.DATA_DIR = DATA_DIR
@pytest.fixture(scope="session")
def celery_includes():
return ["reflector.pipelines.main_live_pipeline"]
# Using celery_includes from conftest.py which includes both pipelines
@pytest.mark.usefixtures("setup_database")

View File

@@ -56,7 +56,12 @@ def appserver_ws_user(setup_database):
if server_instance:
server_instance.should_exit = True
server_thread.join(timeout=30)
server_thread.join(timeout=2.0)
# Reset global singleton for test isolation
from reflector.ws_manager import reset_ws_manager
reset_ws_manager()
@pytest.fixture(autouse=True)
@@ -133,6 +138,8 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
# Connect and then trigger an event via HTTP create
async with aconnect_ws(base_ws, subprotocols=subprotocols) as ws:
await asyncio.sleep(0.2)
# Emit an event to the user's room via a standard HTTP action
from httpx import AsyncClient
@@ -150,6 +157,7 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
"email": "user-abc@example.com",
}
# Use in-memory client (global singleton makes it share ws_manager)
async with AsyncClient(app=app, base_url=f"http://{host}:{port}/v1") as ac:
# Create a transcript as this user so that the server publishes TRANSCRIPT_CREATED to user room
resp = await ac.post("/transcripts", json={"name": "WS Test"})