mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-24 06:09:07 +00:00
* durable (no-mistakes) * hatchet no-mistake * hatchet no-mistake * hatchet no-mistake, better logging * remove conductor and add hatchet tests (no-mistakes) * self-review (no-mistakes) * hatched logs * remove shadow mode for hatchet * and add hatchet processor setting to room * . * cleanup * hatchet init db * self-review (no-mistakes) * self-review (no-mistakes) * hatchet: restore zullip report * self-review round * self-review round * self-review round * dry hatchet with celery * dry hatched with celery - 2 * self-review round * more NES instead of str * self-review wip * self-review round * self-review round * self-review round * can_replay cancelled * add forgotten file * pr autoreviewer fixes * better log webhook events * durable_started return * migration sync * latest changes feature parity * migration merge * pr review --------- Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
99 lines
3.1 KiB
Python
99 lines
3.1 KiB
Python
"""WebSocket broadcasting helpers for Hatchet workflows.
|
|
|
|
DUPLICATION NOTE: This module currently duplicates the Celery broadcasting logic; it is the copy to keep once Celery is deprecated.
|
|
|
|
Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets
|
|
decorator behavior. Events are broadcast to transcript rooms and user rooms.
|
|
"""
|
|
|
|
from typing import Any
|
|
|
|
import structlog
|
|
|
|
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
|
|
from reflector.utils.string import NonEmptyString
|
|
from reflector.ws_manager import get_ws_manager
|
|
|
|
# Events that should also be sent to user room (matches Celery behavior)
|
|
# broadcast_event re-emits these event types to the owner's "user:<id>" room
# under the name "TRANSCRIPT_<event>".
USER_ROOM_EVENTS = {
    "DURATION",
    "FINAL_TITLE",
    "STATUS",
}
|
|
|
|
|
|
async def broadcast_event(
    transcript_id: NonEmptyString,
    event: TranscriptEvent,
    logger: structlog.BoundLogger,
) -> None:
    """Broadcast a TranscriptEvent to WebSocket subscribers.

    The event is always sent to the transcript room (``ts:<transcript_id>``).
    When its type is listed in ``USER_ROOM_EVENTS`` and the transcript has a
    ``user_id``, it is additionally sent to ``user:<user_id>`` under the name
    ``TRANSCRIPT_<event>`` with the transcript id merged into the payload.

    Fire-and-forget: errors are logged but don't interrupt workflow execution.

    Args:
        transcript_id: Id of the transcript the event belongs to.
        event: Event to broadcast; ``event.event`` is its type and
            ``event.data`` its payload.
        logger: Bound logger used for progress and failure messages.
    """
    logger.info(
        "Broadcasting event",
        transcript_id=transcript_id,
        event_type=event.event,
    )
    try:
        ws_manager = get_ws_manager()

        await ws_manager.send_json(
            room_id=f"ts:{transcript_id}",
            message=event.model_dump(mode="json"),
        )
        logger.info(
            "Event sent to transcript room",
            transcript_id=transcript_id,
            event_type=event.event,
        )

        if event.event in USER_ROOM_EVENTS:
            # The owner is looked up only for the event types that need it.
            transcript = await transcripts_controller.get_by_id(transcript_id)
            if transcript and transcript.user_id:
                await ws_manager.send_json(
                    room_id=f"user:{transcript.user_id}",
                    message={
                        "event": f"TRANSCRIPT_{event.event}",
                        # event.data is unpacked after "id" (assumes it is a
                        # mapping), so an "id" key in it takes precedence.
                        "data": {"id": transcript_id, **event.data},
                    },
                )
    except Exception as e:
        # Deliberately broad: a failed broadcast must never fail the workflow.
        # str(e) can be empty (e.g. a bare TimeoutError), so the exception type
        # and traceback are recorded as well to keep the failure diagnosable.
        logger.warning(
            "Failed to broadcast event",
            error=str(e),
            error_type=type(e).__name__,
            transcript_id=transcript_id,
            event_type=event.event,
            exc_info=True,
        )
|
|
|
|
|
|
async def set_status_and_broadcast(
    transcript_id: NonEmptyString,
    status: str,
    logger: structlog.BoundLogger,
) -> None:
    """Set the transcript status, then broadcast the resulting event.

    Calls ``transcripts_controller.set_status`` and, when it returns a truthy
    event, forwards that event to ``broadcast_event``. A falsy result means
    nothing is broadcast.

    Args:
        transcript_id: Id of the transcript whose status is being set.
        status: New status value, passed through to the controller.
        logger: Bound logger handed on to ``broadcast_event``.
    """
    status_event = await transcripts_controller.set_status(transcript_id, status)
    if not status_event:
        # The controller produced no event, so there is nothing to announce.
        return
    await broadcast_event(transcript_id, status_event, logger=logger)
|
|
|
|
|
|
async def append_event_and_broadcast(
    transcript_id: NonEmptyString,
    transcript: Transcript,
    event_name: str,
    data: Any,
    logger: structlog.BoundLogger,
) -> TranscriptEvent:
    """Append an event to a transcript, broadcast it, and return it.

    Calls ``transcripts_controller.append_event`` and passes the event it
    returns to ``broadcast_event``.

    Args:
        transcript_id: Id passed to ``broadcast_event`` for the broadcast.
        transcript: Transcript the event is appended to.
        event_name: Event type passed to the controller as ``event``.
        data: Event payload passed to the controller as ``data``.
        logger: Bound logger handed on to ``broadcast_event``.

    Returns:
        The event returned by ``transcripts_controller.append_event``.
    """
    appended = await transcripts_controller.append_event(
        transcript=transcript, event=event_name, data=data
    )
    await broadcast_event(transcript_id, appended, logger=logger)
    return appended
|