From e77f38a12a2bfca3121d887e1fc5de3e0b21bd5f Mon Sep 17 00:00:00 2001 From: Igor Loskutov Date: Wed, 17 Dec 2025 13:51:50 -0500 Subject: [PATCH] self-review round --- server/reflector/hatchet/broadcast.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/server/reflector/hatchet/broadcast.py b/server/reflector/hatchet/broadcast.py index 684b8a02..317b5dbb 100644 --- a/server/reflector/hatchet/broadcast.py +++ b/server/reflector/hatchet/broadcast.py @@ -4,7 +4,9 @@ Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_ decorator behavior. Events are broadcast to transcript rooms and user rooms. """ -from reflector.db.transcripts import TranscriptEvent +from typing import Any + +from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller from reflector.logger import logger from reflector.ws_manager import get_ws_manager @@ -17,6 +19,11 @@ async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None: Fire-and-forget: errors are logged but don't interrupt workflow execution. """ + logger.info( + "[Hatchet Broadcast] Broadcasting event", + transcript_id=transcript_id, + event_type=event.event, + ) try: ws_manager = get_ws_manager() @@ -25,12 +32,14 @@ async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None: room_id=f"ts:{transcript_id}", message=event.model_dump(mode="json"), ) + logger.info( + "[Hatchet Broadcast] Event sent to transcript room", + transcript_id=transcript_id, + event_type=event.event, + ) # Also broadcast to user room for certain events if event.event in USER_ROOM_EVENTS: - # Deferred import to avoid circular dependency - from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - transcript = await transcripts_controller.get_by_id(transcript_id) if transcript and transcript.user_id: await ws_manager.send_json( @@ -45,7 +54,7 @@ async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None: "[Hatchet Broadcast] Failed to broadcast event", error=str(e), transcript_id=transcript_id, - event=event.event, + event_type=event.event, ) @@ -54,8 +63,6 @@ async def set_status_and_broadcast(transcript_id: str, status: str) -> None: Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting. """ - from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - event = await transcripts_controller.set_status(transcript_id, status) if event: await broadcast_event(transcript_id, event) @@ -63,16 +70,14 @@ async def set_status_and_broadcast(transcript_id: str, status: str) -> None: async def append_event_and_broadcast( transcript_id: str, - transcript, # Transcript model + transcript: Transcript, event_name: str, - data, # Pydantic model + data: Any, ) -> TranscriptEvent: """Append event to transcript and broadcast to WebSocket. Wrapper around transcripts_controller.append_event that adds WebSocket broadcasting. """ - from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - event = await transcripts_controller.append_event( transcript=transcript, event=event_name,