feat: zulip dag monitor for failed runs (#928)

* feat: zulip dag monitor for failed runs

* fix: add collapsible tags to big information
This commit is contained in:
Juan Diego García
2026-03-25 17:26:41 -05:00
committed by GitHub
parent 7b8d190c52
commit 1f98790e7b
8 changed files with 871 additions and 12 deletions

View File

@@ -41,14 +41,14 @@ uv run celery -A reflector.worker.app beat
**Testing:**
```bash
# Run all tests with coverage
uv run pytest
# Run all tests with coverage (requires Redis on localhost)
REDIS_HOST=localhost REDIS_PORT=6379 uv run pytest
# Run specific test file
uv run pytest tests/test_transcripts.py
REDIS_HOST=localhost REDIS_PORT=6379 uv run pytest tests/test_transcripts.py
# Run tests with verbose output
uv run pytest -v
REDIS_HOST=localhost REDIS_PORT=6379 uv run pytest -v
```
**Process Audio Files:**

View File

@@ -199,6 +199,11 @@ Without `--caddy` or `--domain`, no ports are exposed. Point your own reverse pr
| `DAILY_SUBDOMAIN` | Daily.co subdomain | *(unset)* |
| `DAILYCO_STORAGE_AWS_ACCESS_KEY_ID` | AWS access key for reading Daily's recording bucket | *(unset)* |
| `DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY` | AWS secret key for reading Daily's recording bucket | *(unset)* |
| `ZULIP_REALM` | Zulip server hostname (e.g. `zulip.example.com`) | *(unset)* |
| `ZULIP_API_KEY` | Zulip bot API key | *(unset)* |
| `ZULIP_BOT_EMAIL` | Zulip bot email address | *(unset)* |
| `ZULIP_DAG_STREAM` | Zulip stream for pipeline failure alerts | *(unset)* |
| `ZULIP_DAG_TOPIC` | Zulip topic for pipeline failure alerts | *(unset)* |
| `HATCHET_CLIENT_TOKEN` | Hatchet API token (auto-generated) | *(unset)* |
| `HATCHET_CLIENT_SERVER_URL` | Hatchet server URL | Auto-set when Daily.co configured |
| `HATCHET_CLIENT_HOST_PORT` | Hatchet gRPC address | Auto-set when Daily.co configured |

View File

@@ -419,3 +419,18 @@ User-room broadcasts to `user:{user_id}`:
- `TRANSCRIPT_STATUS`
- `TRANSCRIPT_FINAL_TITLE`
- `TRANSCRIPT_DURATION`
## Failed Runs Monitor (Hatchet Cron)
A `FailedRunsMonitor` Hatchet cron workflow runs hourly (`0 * * * *`) and checks for failed pipeline runs
(DiarizationPipeline, FilePipeline, LivePostProcessingPipeline) in the last hour. For each failed run,
it renders a DAG status overview and posts it to Zulip.
**Required env vars** (all must be set to enable):
- `ZULIP_REALM` — Zulip server hostname
- `ZULIP_API_KEY` — Zulip bot API key
- `ZULIP_BOT_EMAIL` — Zulip bot email
- `ZULIP_DAG_STREAM` — Zulip stream for alerts
- `ZULIP_DAG_TOPIC` — Zulip topic for alerts
If any of these are unset, the monitor workflow is not registered with the Hatchet worker.

View File

@@ -16,6 +16,7 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
from reflector.hatchet.workflows.track_processing import track_workflow
from reflector.logger import logger
from reflector.settings import settings
SLOTS = 10
WORKER_NAME = "llm-worker-pool"
@@ -34,6 +35,38 @@ def main():
error=str(e),
)
workflows = [
daily_multitrack_pipeline,
file_pipeline,
live_post_pipeline,
topic_chunk_workflow,
subject_workflow,
track_workflow,
]
_zulip_dag_enabled = all(
[
settings.ZULIP_REALM,
settings.ZULIP_API_KEY,
settings.ZULIP_BOT_EMAIL,
settings.ZULIP_DAG_STREAM,
settings.ZULIP_DAG_TOPIC,
]
)
if _zulip_dag_enabled:
from reflector.hatchet.workflows.failed_runs_monitor import ( # noqa: PLC0415
failed_runs_monitor,
)
workflows.append(failed_runs_monitor)
logger.info(
"FailedRunsMonitor cron enabled",
stream=settings.ZULIP_DAG_STREAM,
topic=settings.ZULIP_DAG_TOPIC,
)
else:
logger.info("FailedRunsMonitor cron disabled (Zulip DAG not configured)")
logger.info(
"Starting Hatchet LLM worker pool (all tasks except mixdown)",
worker_name=WORKER_NAME,
@@ -47,14 +80,7 @@ def main():
labels={
"pool": POOL,
},
workflows=[
daily_multitrack_pipeline,
file_pipeline,
live_post_pipeline,
topic_chunk_workflow,
subject_workflow,
track_workflow,
],
workflows=workflows,
)
try:

View File

@@ -0,0 +1,109 @@
"""
Hatchet cron workflow: FailedRunsMonitor
Runs hourly, queries Hatchet for failed pipeline runs in the last hour,
and posts details to Zulip for visibility.
Only registered with the worker when Zulip DAG settings are configured.
"""
from datetime import datetime, timedelta, timezone
from hatchet_sdk import Context
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.settings import settings
from reflector.tools.render_hatchet_run import render_run_detail
from reflector.zulip import send_message_to_zulip
# Only top-level pipelines are reported. Child workflows (TrackProcessing,
# TopicChunkProcessing, SubjectProcessing, ...) fail as part of a parent run
# and would produce duplicate alerts.
MONITORED_PIPELINES = {
    "DiarizationPipeline",
    "FilePipeline",
    "LivePostProcessingPipeline",
}

# Scan window for each invocation; matches the hourly cron cadence below.
LOOKBACK_HOURS = 1

hatchet = HatchetClientManager.get_client()

# Cron workflow definition: fires at the top of every hour.
failed_runs_monitor = hatchet.workflow(
    name="FailedRunsMonitor",
    on_crons=["0 * * * *"],
)
async def _check_failed_runs() -> dict:
    """Core logic: query for failed pipeline runs and post each to Zulip.

    Extracted from the Hatchet task for testability.

    Returns a summary dict with ``checked`` (total failed runs fetched) and
    ``reported`` (messages successfully posted); adds ``error`` when the
    initial listing call fails.
    """
    now = datetime.now(tz=timezone.utc)
    since = now - timedelta(hours=LOOKBACK_HOURS)

    client = HatchetClientManager.get_client()
    try:
        # NOTE(review): limit=200 caps the scan — an hour with more than 200
        # failed runs would be only partially reported; confirm acceptable.
        result = await client.runs.aio_list(
            statuses=[V1TaskStatus.FAILED],
            since=since,
            until=now,
            limit=200,
        )
    except Exception:
        # Monitoring must never crash the worker; surface the failure in the
        # return value instead of raising.
        logger.exception("[FailedRunsMonitor] Failed to list runs from Hatchet")
        return {"checked": 0, "reported": 0, "error": "failed to list runs"}

    rows = result.rows or []
    # Filter to main pipelines only (skip child workflows like TrackProcessing, etc.)
    failed_main_runs = [run for run in rows if run.workflow_name in MONITORED_PIPELINES]

    if not failed_main_runs:
        logger.info(
            "[FailedRunsMonitor] No failed pipeline runs in the last hour",
            total_failed=len(rows),
            since=since.isoformat(),
        )
        return {"checked": len(rows), "reported": 0}

    logger.info(
        "[FailedRunsMonitor] Found failed pipeline runs",
        count=len(failed_main_runs),
        since=since.isoformat(),
    )

    reported = 0
    for run in failed_main_runs:
        try:
            # Full run details (shape, tasks, events) are needed to render
            # the DAG overview posted to Zulip.
            details = await client.runs.aio_get(run.workflow_run_external_id)
            content = render_run_detail(details)
            await send_message_to_zulip(
                settings.ZULIP_DAG_STREAM,
                settings.ZULIP_DAG_TOPIC,
                content,
            )
            reported += 1
        except Exception:
            # One unreportable run must not prevent reporting the rest.
            logger.exception(
                "[FailedRunsMonitor] Failed to report run",
                workflow_run_id=run.workflow_run_external_id,
                workflow_name=run.workflow_name,
            )

    logger.info(
        "[FailedRunsMonitor] Finished reporting",
        reported=reported,
        total_failed_main=len(failed_main_runs),
    )
    return {"checked": len(rows), "reported": reported}
@failed_runs_monitor.task(
    execution_timeout=timedelta(seconds=120),
    retries=1,
)
async def check_failed_runs(input, ctx: Context) -> dict:
    """Hatchet task entry point — delegates to _check_failed_runs.

    Both parameters are required by the Hatchet task signature but unused
    here. (NOTE: `input` shadows the builtin; renaming would change the
    task interface, so it is left as-is.)
    """
    return await _check_failed_runs()

View File

@@ -194,6 +194,8 @@ class Settings(BaseSettings):
# Zulip bot credentials (shared by Zulip integrations).
ZULIP_REALM: str | None = None
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
# Destination stream/topic for DAG pipeline-failure alerts; the failed-runs
# monitor is only registered when these (and the three above) are all set.
ZULIP_DAG_STREAM: str | None = None
ZULIP_DAG_TOPIC: str | None = None
# Email / SMTP integration (for transcript email notifications)
SMTP_HOST: str | None = None

View File

@@ -0,0 +1,412 @@
"""
Render Hatchet workflow runs as text DAG.
Usage:
# Show latest 5 runs (summary table)
uv run -m reflector.tools.render_hatchet_run
# Show specific run with full DAG + task details
uv run -m reflector.tools.render_hatchet_run <workflow_run_id>
# Drill into Nth run from the list (1-indexed)
uv run -m reflector.tools.render_hatchet_run --show 1
# Show latest N runs
uv run -m reflector.tools.render_hatchet_run --last 10
# Filter by status
uv run -m reflector.tools.render_hatchet_run --status FAILED
uv run -m reflector.tools.render_hatchet_run --status RUNNING
"""
import argparse
import asyncio
import heapq
from collections import defaultdict
from datetime import datetime, timedelta, timezone

from hatchet_sdk.clients.rest.models import (
    V1TaskEvent,
    V1TaskStatus,
    V1TaskSummary,
    V1WorkflowRunDetails,
    WorkflowRunShapeItemForWorkflowRunDetails,
)

from reflector.hatchet.client import HatchetClientManager
# Emoji marker shown next to each task status; callers fall back to "?" for
# statuses not listed here.
STATUS_ICON = {
    V1TaskStatus.COMPLETED: "\u2705",
    V1TaskStatus.RUNNING: "\u23f3",
    V1TaskStatus.FAILED: "\u274c",
    V1TaskStatus.QUEUED: "\u23f8\ufe0f",
    V1TaskStatus.CANCELLED: "\u26a0\ufe0f",
}

# Human-readable status labels; FAILED is upper-cased for visual emphasis.
STATUS_LABEL = {
    V1TaskStatus.COMPLETED: "Complete",
    V1TaskStatus.RUNNING: "Running",
    V1TaskStatus.FAILED: "FAILED",
    V1TaskStatus.QUEUED: "Queued",
    V1TaskStatus.CANCELLED: "Cancelled",
}
def _fmt_time(dt: datetime | None) -> str:
if dt is None:
return "-"
return dt.strftime("%H:%M:%S")
def _fmt_duration(ms: int | None) -> str:
if ms is None:
return "-"
secs = ms / 1000
if secs < 60:
return f"{secs:.1f}s"
mins = secs / 60
return f"{mins:.1f}m"
def _fmt_status_line(task: V1TaskSummary) -> str:
    """Format a status line like: Complete (finished 20:31:44)"""
    label = STATUS_LABEL.get(task.status, task.status.value)
    icon = STATUS_ICON.get(task.status, "?")
    prefix = f"{icon} {label}"

    if task.status == V1TaskStatus.COMPLETED and task.finished_at:
        return f"{prefix} (finished {_fmt_time(task.finished_at)})"
    if task.status == V1TaskStatus.FAILED and task.finished_at:
        return f"{prefix} (failed {_fmt_time(task.finished_at)})"
    if task.status == V1TaskStatus.RUNNING and task.started_at:
        detail = f"started {_fmt_time(task.started_at)}"
        if task.duration:
            detail += f", {_fmt_duration(task.duration)} elapsed"
        return f"{prefix} ({detail})"
    # QUEUED, CANCELLED, and any status missing its timestamp: bare label.
    return prefix
def _topo_sort(
shape: list[WorkflowRunShapeItemForWorkflowRunDetails],
) -> list[str]:
"""Topological sort of step_ids from shape DAG."""
step_ids = {s.step_id for s in shape}
children_map: dict[str, list[str]] = {}
in_degree: dict[str, int] = {sid: 0 for sid in step_ids}
for s in shape:
children = [c for c in (s.children_step_ids or []) if c in step_ids]
children_map[s.step_id] = children
for c in children:
in_degree[c] += 1
queue = sorted(sid for sid, deg in in_degree.items() if deg == 0)
result: list[str] = []
while queue:
node = queue.pop(0)
result.append(node)
for c in children_map.get(node, []):
in_degree[c] -= 1
if in_degree[c] == 0:
queue.append(c)
queue.sort()
return result
def render_run_detail(details: V1WorkflowRunDetails) -> str:
    """Render a single workflow run as markdown DAG with task details.

    Output layout: a run header (name, icon, duration, id, metadata, first
    error line), a collapsible "DAG Status Overview" table, then a
    collapsible "Node Details" section with per-task status, retries,
    fan-out counts, error summary and event log. Uses Zulip's ```spoiler
    fencing for the collapsible sections.
    """
    shape = details.shape or []
    tasks = details.tasks or []
    events = details.task_events or []
    run = details.run

    # Without shape data there is no DAG to draw; fall back to a one-liner.
    if not shape:
        return f"Run {run.metadata.id}: {run.status.value} (no shape data)"

    # Build lookups
    step_to_shape: dict[str, WorkflowRunShapeItemForWorkflowRunDetails] = {
        s.step_id: s for s in shape
    }
    step_to_name: dict[str, str] = {s.step_id: s.task_name for s in shape}

    # Reverse edges (parents)
    parents: dict[str, list[str]] = {s.step_id: [] for s in shape}
    for s in shape:
        for child_id in s.children_step_ids or []:
            if child_id in parents:
                parents[child_id].append(s.step_id)

    # Join tasks by step_id
    task_by_step: dict[str, V1TaskSummary] = {}
    for t in tasks:
        if t.step_id and t.step_id in step_to_name:
            task_by_step[t.step_id] = t

    # Events indexed by task_external_id
    events_by_task: dict[str, list[V1TaskEvent]] = defaultdict(list)
    for ev in events:
        events_by_task[ev.task_id].append(ev)

    ordered = _topo_sort(shape)
    lines: list[str] = []

    # Run header
    run_icon = STATUS_ICON.get(run.status, "?")
    run_name = run.display_name or run.workflow_id
    dur = _fmt_duration(run.duration)
    lines.append(f"**{run_name}** {run_icon} {dur}")
    lines.append(f"ID: `{run.metadata.id}`")
    if run.additional_metadata:
        meta_parts = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
        lines.append(f"Meta: {', '.join(meta_parts)}")
    if run.error_message:
        # Take first line of error only for header
        first_line = run.error_message.split("\n")[0]
        lines.append(f"Error: {first_line}")
    lines.append("")

    # DAG Status Overview table (collapsible)
    lines.append("```spoiler DAG Status Overview")
    lines.append("| Node | Status | Duration | Dependencies |")
    lines.append("|------|--------|----------|--------------|")
    for step_id in ordered:
        s = step_to_shape[step_id]
        t = task_by_step.get(step_id)
        name = step_to_name[step_id]
        icon = STATUS_ICON.get(t.status, "?") if t else "?"
        dur = _fmt_duration(t.duration) if t else "-"
        parent_names = [step_to_name[p] for p in parents[step_id]]
        child_names = [
            step_to_name[c] for c in (s.children_step_ids or []) if c in step_to_name
        ]
        deps_left = ", ".join(parent_names) if parent_names else ""
        deps_right = ", ".join(child_names) if child_names else ""
        # Dependency column reads "parents -> children" with the arrow
        # dropped on whichever side is empty.
        if deps_left and deps_right:
            deps = f"{deps_left} \u2192 {deps_right}"
        elif deps_right:
            deps = f"\u2192 {deps_right}"
        elif deps_left:
            deps = f"{deps_left} \u2192"
        else:
            deps = "-"
        lines.append(f"| {name} | {icon} | {dur} | {deps} |")
    lines.append("```")
    lines.append("")

    # Node details (collapsible)
    lines.append("```spoiler Node Details")
    for step_id in ordered:
        t = task_by_step.get(step_id)
        name = step_to_name[step_id]
        if not t:
            # Shape node with no matching task summary (e.g. never scheduled).
            lines.append(f"**\U0001f4e6 {name}**")
            lines.append("Status: no task data")
            lines.append("")
            continue
        lines.append(f"**\U0001f4e6 {name}**")
        lines.append(f"Status: {_fmt_status_line(t)}")
        if t.duration:
            lines.append(f"Duration: {_fmt_duration(t.duration)}")
        if t.retry_count and t.retry_count > 0:
            lines.append(f"Retries: {t.retry_count}")
        # Fan-out children
        if t.num_spawned_children and t.num_spawned_children > 0:
            children = t.children or []
            completed = sum(1 for c in children if c.status == V1TaskStatus.COMPLETED)
            failed = sum(1 for c in children if c.status == V1TaskStatus.FAILED)
            running = sum(1 for c in children if c.status == V1TaskStatus.RUNNING)
            lines.append(
                f"Spawned children: {completed}/{t.num_spawned_children} done"
                f"{f', {running} running' if running else ''}"
                f"{f', {failed} failed' if failed else ''}"
            )
        # Error message (first meaningful line only, full trace in events)
        if t.error_message:
            err_lines = t.error_message.strip().split("\n")
            # Find first non-empty, non-traceback line
            err_summary = err_lines[0]
            for line in err_lines:
                stripped = line.strip()
                if stripped and not stripped.startswith(
                    ("Traceback", "File ", "{", ")")
                ):
                    err_summary = stripped
                    break
            lines.append(f"Error: `{err_summary}`")
        # Events log
        task_events = sorted(
            events_by_task.get(t.task_external_id, []),
            key=lambda e: e.timestamp,
        )
        if task_events:
            lines.append("Events:")
            for ev in task_events:
                ts = ev.timestamp.strftime("%H:%M:%S")
                ev_icon = ""
                if ev.event_type.value == "FINISHED":
                    ev_icon = "\u2705 "
                elif ev.event_type.value in ("FAILED", "TIMED_OUT"):
                    ev_icon = "\u274c "
                elif ev.event_type.value == "STARTED":
                    ev_icon = "\u25b6\ufe0f "
                elif ev.event_type.value == "RETRYING":
                    ev_icon = "\U0001f504 "
                elif ev.event_type.value == "CANCELLED":
                    ev_icon = "\u26a0\ufe0f "
                msg = ev.message.strip()
                if ev.error_message:
                    # Just first line of error in event log
                    err_first = ev.error_message.strip().split("\n")[0]
                    if msg:
                        msg += f" | {err_first}"
                    else:
                        msg = err_first
                if msg:
                    lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}: {msg}")
                else:
                    lines.append(f" `{ts}` {ev_icon}{ev.event_type.value}")
        # Blank line separates node blocks (mirrors the no-task-data branch).
        # NOTE(review): source indentation was lost; confirm this blank line
        # belongs at node level rather than inside the events branch.
        lines.append("")
    lines.append("```")
    return "\n".join(lines)
def render_run_summary(idx: int, run: V1TaskSummary) -> str:
    """Two-line numbered entry for a run in the list view (summary + id)."""
    status_icon = STATUS_ICON.get(run.status, "?")
    title = run.display_name or run.workflow_name or "?"
    external_id = run.workflow_run_external_id or "?"
    duration = _fmt_duration(run.duration)
    started = _fmt_time(run.started_at)

    meta = ""
    if run.additional_metadata:
        pairs = [f"{k}=`{v}`" for k, v in run.additional_metadata.items()]
        meta = f" ({', '.join(pairs)})"

    header = f" {idx}. {status_icon} **{title}** started={started} dur={duration}{meta}"
    return f"{header}\n `{external_id}`"
async def _fetch_run_list(
    count: int = 5,
    statuses: list[V1TaskStatus] | None = None,
) -> list[V1TaskSummary]:
    """Fetch up to *count* runs from the last 7 days, optionally filtered by status."""
    window_start = datetime.now(timezone.utc) - timedelta(days=7)
    client = HatchetClientManager.get_client()
    page = await client.runs.aio_list(
        since=window_start,
        statuses=statuses,
        limit=count,
    )
    return page.rows or []
async def list_recent_runs(
    count: int = 5,
    statuses: list[V1TaskStatus] | None = None,
) -> str:
    """List recent workflow runs as text."""
    runs = await _fetch_run_list(count, statuses)
    if not runs:
        return "No runs found in the last 7 days."

    body: list[str] = [f"Recent runs ({len(runs)}):", ""]
    for idx, run in enumerate(runs, start=1):
        # Each entry is followed by a blank separator line.
        body.extend((render_run_summary(idx, run), ""))
    body.append("Use `--show N` to see full DAG for run N")
    return "\n".join(body)
async def show_run(workflow_run_id: str) -> str:
    """Fetch a single run by ID and render its full DAG."""
    manager = HatchetClientManager.get_client()
    run_details = await manager.runs.aio_get(workflow_run_id)
    return render_run_detail(run_details)
async def show_nth_run(
    n: int,
    count: int = 5,
    statuses: list[V1TaskStatus] | None = None,
) -> str:
    """Fetch the recent-runs list, then drill into its Nth entry (1-indexed)."""
    runs = await _fetch_run_list(count, statuses)
    if not runs:
        return "No runs found in the last 7 days."
    if not 1 <= n <= len(runs):
        return f"Invalid index {n}. Have {len(runs)} runs (1-{len(runs)})."
    # NOTE(review): workflow_run_external_id may be None on some rows —
    # confirm aio_get tolerates that, or guard here.
    selected = runs[n - 1]
    return await show_run(selected.workflow_run_external_id)
async def main_async(args: argparse.Namespace) -> None:
    """Dispatch to detail view, Nth-run drill-down, or list view, then print."""
    status_filter = [V1TaskStatus(args.status)] if args.status else None
    if args.run_id:
        text = await show_run(args.run_id)
    elif args.show is not None:
        text = await show_nth_run(args.show, count=args.last, statuses=status_filter)
    else:
        text = await list_recent_runs(count=args.last, statuses=status_filter)
    print(text)
def main() -> None:
    """CLI entry point: parse arguments and run the async renderer."""
    parser = argparse.ArgumentParser(
        description="Render Hatchet workflow runs as text DAG"
    )
    parser.add_argument(
        "run_id",
        nargs="?",
        default=None,
        help="Workflow run ID to show in detail. If omitted, lists recent runs.",
    )
    parser.add_argument(
        "--show",
        type=int,
        default=None,
        metavar="N",
        help="Show full DAG for the Nth run in the list (1-indexed)",
    )
    parser.add_argument(
        "--last",
        type=int,
        default=5,
        help="Number of recent runs to list (default: 5)",
    )
    parser.add_argument(
        "--status",
        choices=["QUEUED", "RUNNING", "COMPLETED", "FAILED", "CANCELLED"],
        help="Filter by status",
    )
    asyncio.run(main_async(parser.parse_args()))


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,290 @@
"""
Tests for FailedRunsMonitor Hatchet cron workflow.
Tests cover:
- No Zulip message sent when no failures found
- Messages sent for failed main pipeline runs
- Child workflow failures filtered out
- Errors in the monitor itself are caught and logged
"""
from datetime import timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from hatchet_sdk.clients.rest.models import V1TaskStatus
def _make_task_summary(
    workflow_name: str,
    workflow_run_external_id: str = "run-123",
    status: V1TaskStatus = V1TaskStatus.FAILED,
):
    """Create a mock V1TaskSummary with the attributes the monitor reads."""
    return MagicMock(
        workflow_name=workflow_name,
        workflow_run_external_id=workflow_run_external_id,
        status=status,
    )
@pytest.mark.asyncio
class TestCheckFailedRuns:
    """Unit tests for _check_failed_runs with Hatchet and Zulip mocked.

    The module under test is imported inside each patch context so its
    import-time HatchetClientManager.get_client() call hits the mock.
    """

    async def test_no_failures_sends_no_message(self):
        """An empty result set reports nothing and never touches Zulip."""
        mock_result = MagicMock()
        mock_result.rows = []
        mock_client = MagicMock()
        mock_client.runs.aio_list = AsyncMock(return_value=mock_result)

        with (
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.HatchetClientManager.get_client",
                return_value=mock_client,
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.send_message_to_zulip",
                new_callable=AsyncMock,
            ) as mock_send,
        ):
            from reflector.hatchet.workflows.failed_runs_monitor import (
                _check_failed_runs,
            )

            result = await _check_failed_runs()

            assert result["checked"] == 0
            assert result["reported"] == 0
            mock_send.assert_not_called()

    async def test_reports_failed_main_pipeline_runs(self):
        """Each failed main-pipeline run is rendered and posted to Zulip."""
        failed_runs = [
            _make_task_summary("DiarizationPipeline", "run-1"),
            _make_task_summary("FilePipeline", "run-2"),
        ]
        mock_result = MagicMock()
        mock_result.rows = failed_runs
        mock_details = MagicMock()
        mock_client = MagicMock()
        mock_client.runs.aio_list = AsyncMock(return_value=mock_result)
        mock_client.runs.aio_get = AsyncMock(return_value=mock_details)

        with (
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.HatchetClientManager.get_client",
                return_value=mock_client,
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.render_run_detail",
                return_value="**rendered DAG**",
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.send_message_to_zulip",
                new_callable=AsyncMock,
                return_value={"id": 1},
            ) as mock_send,
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.settings"
            ) as mock_settings,
        ):
            mock_settings.ZULIP_DAG_STREAM = "dag-stream"
            mock_settings.ZULIP_DAG_TOPIC = "dag-topic"

            from reflector.hatchet.workflows.failed_runs_monitor import (
                _check_failed_runs,
            )

            result = await _check_failed_runs()

            assert result["checked"] == 2
            assert result["reported"] == 2
            assert mock_send.call_count == 2
            mock_send.assert_any_call("dag-stream", "dag-topic", "**rendered DAG**")

    async def test_filters_out_child_workflows(self):
        """Child workflows in the failed list are excluded from reporting."""
        runs = [
            _make_task_summary("DiarizationPipeline", "run-1"),
            _make_task_summary("TrackProcessing", "run-2"),
            _make_task_summary("TopicChunkProcessing", "run-3"),
            _make_task_summary("SubjectProcessing", "run-4"),
        ]
        mock_result = MagicMock()
        mock_result.rows = runs
        mock_details = MagicMock()
        mock_client = MagicMock()
        mock_client.runs.aio_list = AsyncMock(return_value=mock_result)
        mock_client.runs.aio_get = AsyncMock(return_value=mock_details)

        with (
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.HatchetClientManager.get_client",
                return_value=mock_client,
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.render_run_detail",
                return_value="**rendered**",
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.send_message_to_zulip",
                new_callable=AsyncMock,
                return_value={"id": 1},
            ) as mock_send,
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.settings"
            ) as mock_settings,
        ):
            mock_settings.ZULIP_DAG_STREAM = "dag-stream"
            mock_settings.ZULIP_DAG_TOPIC = "dag-topic"

            from reflector.hatchet.workflows.failed_runs_monitor import (
                _check_failed_runs,
            )

            result = await _check_failed_runs()

            # Only DiarizationPipeline should be reported
            assert result["checked"] == 4
            assert result["reported"] == 1
            assert mock_send.call_count == 1

    async def test_all_three_pipelines_reported(self):
        """All three monitored pipelines produce one Zulip message each."""
        runs = [
            _make_task_summary("DiarizationPipeline", "run-1"),
            _make_task_summary("FilePipeline", "run-2"),
            _make_task_summary("LivePostProcessingPipeline", "run-3"),
        ]
        mock_result = MagicMock()
        mock_result.rows = runs
        mock_details = MagicMock()
        mock_client = MagicMock()
        mock_client.runs.aio_list = AsyncMock(return_value=mock_result)
        mock_client.runs.aio_get = AsyncMock(return_value=mock_details)

        with (
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.HatchetClientManager.get_client",
                return_value=mock_client,
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.render_run_detail",
                return_value="**rendered**",
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.send_message_to_zulip",
                new_callable=AsyncMock,
                return_value={"id": 1},
            ) as mock_send,
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.settings"
            ) as mock_settings,
        ):
            mock_settings.ZULIP_DAG_STREAM = "dag-stream"
            mock_settings.ZULIP_DAG_TOPIC = "dag-topic"

            from reflector.hatchet.workflows.failed_runs_monitor import (
                _check_failed_runs,
            )

            result = await _check_failed_runs()

            assert result["reported"] == 3
            assert mock_send.call_count == 3

    async def test_continues_on_individual_run_failure(self):
        """If one run fails to report, the others should still be reported."""
        runs = [
            _make_task_summary("DiarizationPipeline", "run-1"),
            _make_task_summary("FilePipeline", "run-2"),
        ]
        mock_result = MagicMock()
        mock_result.rows = runs
        mock_client = MagicMock()
        mock_client.runs.aio_list = AsyncMock(return_value=mock_result)
        # First call raises, second succeeds
        mock_client.runs.aio_get = AsyncMock(
            side_effect=[Exception("Hatchet API error"), MagicMock()]
        )

        with (
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.HatchetClientManager.get_client",
                return_value=mock_client,
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.render_run_detail",
                return_value="**rendered**",
            ),
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.send_message_to_zulip",
                new_callable=AsyncMock,
                return_value={"id": 1},
            ) as mock_send,
            patch(
                "reflector.hatchet.workflows.failed_runs_monitor.settings"
            ) as mock_settings,
        ):
            mock_settings.ZULIP_DAG_STREAM = "dag-stream"
            mock_settings.ZULIP_DAG_TOPIC = "dag-topic"

            from reflector.hatchet.workflows.failed_runs_monitor import (
                _check_failed_runs,
            )

            result = await _check_failed_runs()

            # First run failed to report, second succeeded
            assert result["reported"] == 1
            assert mock_send.call_count == 1

    async def test_handles_list_api_failure(self):
        """If aio_list fails, should return error and not crash."""
        mock_client = MagicMock()
        mock_client.runs.aio_list = AsyncMock(
            side_effect=Exception("Connection refused")
        )

        with patch(
            "reflector.hatchet.workflows.failed_runs_monitor.HatchetClientManager.get_client",
            return_value=mock_client,
        ):
            from reflector.hatchet.workflows.failed_runs_monitor import (
                _check_failed_runs,
            )

            result = await _check_failed_runs()

            assert result["checked"] == 0
            assert result["reported"] == 0
            assert "error" in result

    async def test_uses_correct_time_window(self):
        """Verify the correct since/until parameters are passed to aio_list."""
        mock_result = MagicMock()
        mock_result.rows = []
        mock_client = MagicMock()
        mock_client.runs.aio_list = AsyncMock(return_value=mock_result)

        with patch(
            "reflector.hatchet.workflows.failed_runs_monitor.HatchetClientManager.get_client",
            return_value=mock_client,
        ):
            from reflector.hatchet.workflows.failed_runs_monitor import (
                _check_failed_runs,
            )

            await _check_failed_runs()

            call_kwargs = mock_client.runs.aio_list.call_args
            assert call_kwargs.kwargs["statuses"] == [V1TaskStatus.FAILED]
            since = call_kwargs.kwargs["since"]
            until = call_kwargs.kwargs["until"]
            assert since.tzinfo == timezone.utc
            assert until.tzinfo == timezone.utc
            # Window should be ~1 hour
            delta = until - since
            assert 3590 < delta.total_seconds() < 3610