self-review round

This commit is contained in:
Igor Loskutov
2025-12-17 13:51:50 -05:00
parent 6ae621eadd
commit e77f38a12a

View File

@@ -4,7 +4,9 @@ Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_
decorator behavior. Events are broadcast to transcript rooms and user rooms. decorator behavior. Events are broadcast to transcript rooms and user rooms.
""" """
from reflector.db.transcripts import TranscriptEvent from typing import Any
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.logger import logger from reflector.logger import logger
from reflector.ws_manager import get_ws_manager from reflector.ws_manager import get_ws_manager
@@ -17,6 +19,11 @@ async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None:
Fire-and-forget: errors are logged but don't interrupt workflow execution. Fire-and-forget: errors are logged but don't interrupt workflow execution.
""" """
logger.info(
"[Hatchet Broadcast] Broadcasting event",
transcript_id=transcript_id,
event_type=event.event,
)
try: try:
ws_manager = get_ws_manager() ws_manager = get_ws_manager()
@@ -25,12 +32,14 @@ async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None:
room_id=f"ts:{transcript_id}", room_id=f"ts:{transcript_id}",
message=event.model_dump(mode="json"), message=event.model_dump(mode="json"),
) )
logger.info(
"[Hatchet Broadcast] Event sent to transcript room",
transcript_id=transcript_id,
event_type=event.event,
)
# Also broadcast to user room for certain events # Also broadcast to user room for certain events
if event.event in USER_ROOM_EVENTS: if event.event in USER_ROOM_EVENTS:
# Deferred import to avoid circular dependency
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(transcript_id) transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript and transcript.user_id: if transcript and transcript.user_id:
await ws_manager.send_json( await ws_manager.send_json(
@@ -45,7 +54,7 @@ async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None:
"[Hatchet Broadcast] Failed to broadcast event", "[Hatchet Broadcast] Failed to broadcast event",
error=str(e), error=str(e),
transcript_id=transcript_id, transcript_id=transcript_id,
event=event.event, event_type=event.event,
) )
@@ -54,8 +63,6 @@ async def set_status_and_broadcast(transcript_id: str, status: str) -> None:
Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting. Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting.
""" """
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
event = await transcripts_controller.set_status(transcript_id, status) event = await transcripts_controller.set_status(transcript_id, status)
if event: if event:
await broadcast_event(transcript_id, event) await broadcast_event(transcript_id, event)
@@ -63,16 +70,14 @@ async def set_status_and_broadcast(transcript_id: str, status: str) -> None:
async def append_event_and_broadcast( async def append_event_and_broadcast(
transcript_id: str, transcript_id: str,
transcript, # Transcript model transcript: Transcript,
event_name: str, event_name: str,
data, # Pydantic model data: Any,
) -> TranscriptEvent: ) -> TranscriptEvent:
"""Append event to transcript and broadcast to WebSocket. """Append event to transcript and broadcast to WebSocket.
Wrapper around transcripts_controller.append_event that adds WebSocket broadcasting. Wrapper around transcripts_controller.append_event that adds WebSocket broadcasting.
""" """
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
event = await transcripts_controller.append_event( event = await transcripts_controller.append_event(
transcript=transcript, transcript=transcript,
event=event_name, event=event_name,