mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-04-25 14:45:19 +00:00
feat: post Hatchet DAG status to Zulip with live updates
Post a rendered DAG status message to a configurable Zulip stream/topic when a Hatchet workflow is dispatched. Update it after each task completes (success or failure), and delete it when post_zulip runs so the final transcript notification replaces it. On pipeline failure, appends an error banner (:cross_mark:) with the failing step name and error message. New settings: ZULIP_DAG_STREAM, ZULIP_DAG_TOPIC, ZULIP_HOST_HEADER. New module: reflector/hatchet/dag_zulip.py (create/update/delete).
This commit is contained in:
144
server/reflector/hatchet/dag_zulip.py
Normal file
144
server/reflector/hatchet/dag_zulip.py
Normal file
@@ -0,0 +1,144 @@
|
||||
"""
|
||||
Hatchet DAG Status -> Zulip Live Updates.
|
||||
|
||||
Posts/updates/deletes a Zulip message showing the Hatchet workflow DAG status.
|
||||
All functions are fire-and-forget (catch + warning log on failure).
|
||||
|
||||
Note: Uses deferred imports throughout for fork-safety,
|
||||
consistent with the pipeline pattern in daily_multitrack_pipeline.py.
|
||||
"""
|
||||
|
||||
from reflector.logger import logger
|
||||
from reflector.settings import settings
|
||||
|
||||
|
||||
def _dag_zulip_enabled() -> bool:
|
||||
return bool(
|
||||
settings.ZULIP_REALM and settings.ZULIP_DAG_STREAM and settings.ZULIP_DAG_TOPIC
|
||||
)
|
||||
|
||||
|
||||
async def create_dag_zulip_message(transcript_id: str, workflow_run_id: str) -> None:
|
||||
"""Post initial DAG status to Zulip. Called at dispatch time (normal DB context)."""
|
||||
if not _dag_zulip_enabled():
|
||||
return
|
||||
|
||||
try:
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
|
||||
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
|
||||
render_run_detail,
|
||||
)
|
||||
from reflector.zulip import send_message_to_zulip # noqa: PLC0415
|
||||
|
||||
client = HatchetClientManager.get_client()
|
||||
details = await client.runs.aio_get(workflow_run_id)
|
||||
content = render_run_detail(details)
|
||||
|
||||
response = await send_message_to_zulip(
|
||||
settings.ZULIP_DAG_STREAM, settings.ZULIP_DAG_TOPIC, content
|
||||
)
|
||||
message_id = response.get("id")
|
||||
|
||||
if message_id:
|
||||
transcript = await transcripts_controller.get_by_id(transcript_id)
|
||||
if transcript:
|
||||
await transcripts_controller.update(
|
||||
transcript, {"zulip_message_id": message_id}
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"[DAG Zulip] Failed to create DAG message",
|
||||
transcript_id=transcript_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
|
||||
async def update_dag_zulip_message(
|
||||
transcript_id: str,
|
||||
workflow_run_id: str,
|
||||
error_message: str | None = None,
|
||||
) -> None:
|
||||
"""Update existing DAG status in Zulip. Called from Hatchet worker (forked).
|
||||
|
||||
Args:
|
||||
error_message: If set, appended as an error banner to the rendered DAG.
|
||||
"""
|
||||
if not _dag_zulip_enabled():
|
||||
return
|
||||
|
||||
try:
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
|
||||
from reflector.hatchet.workflows.daily_multitrack_pipeline import ( # noqa: PLC0415
|
||||
fresh_db_connection,
|
||||
)
|
||||
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
|
||||
render_run_detail,
|
||||
)
|
||||
from reflector.zulip import update_zulip_message # noqa: PLC0415
|
||||
|
||||
async with fresh_db_connection():
|
||||
transcript = await transcripts_controller.get_by_id(transcript_id)
|
||||
if not transcript or not transcript.zulip_message_id:
|
||||
return
|
||||
|
||||
client = HatchetClientManager.get_client()
|
||||
details = await client.runs.aio_get(workflow_run_id)
|
||||
content = render_run_detail(details)
|
||||
|
||||
if error_message:
|
||||
content += f"\n\n:cross_mark: **{error_message}**"
|
||||
|
||||
await update_zulip_message(
|
||||
transcript.zulip_message_id,
|
||||
settings.ZULIP_DAG_STREAM,
|
||||
settings.ZULIP_DAG_TOPIC,
|
||||
content,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"[DAG Zulip] Failed to update DAG message",
|
||||
transcript_id=transcript_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
|
||||
async def delete_dag_zulip_message(transcript_id: str) -> None:
|
||||
"""Delete DAG Zulip message and clear zulip_message_id.
|
||||
|
||||
Called from post_zulip task (already inside fresh_db_connection).
|
||||
Swallows InvalidMessageError (message already deleted).
|
||||
"""
|
||||
if not _dag_zulip_enabled():
|
||||
return
|
||||
|
||||
try:
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.zulip import ( # noqa: PLC0415
|
||||
InvalidMessageError,
|
||||
delete_zulip_message,
|
||||
)
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(transcript_id)
|
||||
if not transcript or not transcript.zulip_message_id:
|
||||
return
|
||||
|
||||
try:
|
||||
await delete_zulip_message(transcript.zulip_message_id)
|
||||
except InvalidMessageError:
|
||||
logger.warning(
|
||||
"[DAG Zulip] Message already deleted",
|
||||
transcript_id=transcript_id,
|
||||
zulip_message_id=transcript.zulip_message_id,
|
||||
)
|
||||
|
||||
await transcripts_controller.update(transcript, {"zulip_message_id": None})
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"[DAG Zulip] Failed to delete DAG message",
|
||||
transcript_id=transcript_id,
|
||||
exc_info=True,
|
||||
)
|
||||
@@ -45,6 +45,7 @@ from reflector.hatchet.constants import (
|
||||
TIMEOUT_SHORT,
|
||||
TaskName,
|
||||
)
|
||||
from reflector.hatchet.dag_zulip import update_dag_zulip_message
|
||||
from reflector.hatchet.workflows.models import (
|
||||
ActionItemsResult,
|
||||
ConsentResult,
|
||||
@@ -238,7 +239,14 @@ def with_error_handling(
|
||||
@functools.wraps(func)
|
||||
async def wrapper(input: PipelineInput, ctx: Context) -> R:
|
||||
try:
|
||||
return await func(input, ctx)
|
||||
result = await func(input, ctx)
|
||||
try:
|
||||
await update_dag_zulip_message(
|
||||
input.transcript_id, ctx.workflow_run_id
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[Hatchet] {step_name} failed",
|
||||
@@ -246,6 +254,14 @@ def with_error_handling(
|
||||
error=str(e),
|
||||
exc_info=True,
|
||||
)
|
||||
try:
|
||||
await update_dag_zulip_message(
|
||||
input.transcript_id,
|
||||
ctx.workflow_run_id,
|
||||
error_message=f"{step_name} failed: {e}",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
if set_error_status:
|
||||
await set_workflow_error_status(input.transcript_id)
|
||||
raise
|
||||
@@ -1294,6 +1310,11 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.hatchet.dag_zulip import ( # noqa: PLC0415
|
||||
delete_dag_zulip_message,
|
||||
)
|
||||
|
||||
await delete_dag_zulip_message(input.transcript_id)
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
if transcript:
|
||||
|
||||
Reference in New Issue
Block a user