reflector/server/reflector/hatchet/broadcast.py
Igor Monadical 972a52d22f fix: live flow real-time updates during processing (#861)
* fix: live flow real-time updates during processing

Three gaps caused transcript pages to require manual refresh after
live recording/processing:

1. UserEventsProvider only invalidated list queries on TRANSCRIPT_STATUS,
   not individual transcript queries. Now parses data.id from the event
   and calls invalidateTranscript for the specific transcript.

2. useWebSockets had no reconnection logic — a dropped WS silently
   killed all real-time updates. Added exponential backoff reconnection
   (1s-30s, max 10 retries) with intentional close detection.

3. No polling fallback: the WS was a single point of failure. Added
   conditional refetchInterval to useTranscriptGet that polls every 5s
   when transcript status is processing/uploaded/recording.
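
A minimal sketch of the reconnection schedule from item 2, written in Python for
illustration only (the actual change lives in the frontend useWebSockets hook).
The doubling factor and the name reconnect_delays are assumptions; the commit
states only the 1s-30s range and the 10-retry limit.

```python
def reconnect_delays(
    base_s: float = 1.0,
    cap_s: float = 30.0,
    max_retries: int = 10,
    factor: float = 2.0,  # assumed growth factor; not stated in the commit
) -> list[float]:
    """Return the wait before each reconnection attempt, capped at cap_s."""
    return [min(cap_s, base_s * factor**attempt) for attempt in range(max_retries)]


if __name__ == "__main__":
    # One delay per retry; after the last one the client would give up.
    for attempt, delay in enumerate(reconnect_delays(), start=1):
        print(f"retry {attempt}: wait {delay:.0f}s")
```

An intentional close (for example, the user navigating away) would skip this
schedule entirely, so the client does not reconnect to a socket it closed itself.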

* feat: type-safe WebSocket events via OpenAPI stub

Define Pydantic models with Literal discriminators for all WS events
(9 transcript-level, 5 user-level). Expose via stub GET endpoints so
pnpm openapi generates TS discriminated unions with exhaustive switch
narrowing on the frontend.

- New server/reflector/ws_events.py with TranscriptWsEvent and UserWsEvent
- Tighten backend emit signatures with TranscriptEventName literal
- Frontend uses generated types, removes Zod schema and manual casts
- Fix pre-existing bugs: waveform mapping, FINAL_LONG_SUMMARY field name
- STATUS value now typed as TranscriptStatus literal end-to-end
- TOPIC handler simplified to query invalidation only (avoids shape mismatch)
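
The discriminator idea can be sketched as follows. This is not the contents of
ws_events.py: it swaps Pydantic for stdlib dataclasses so it runs without
dependencies, and the class names and the value/duration fields are
hypothetical. Only the event names STATUS and DURATION come from the source.

```python
from dataclasses import dataclass
from typing import Any, Literal, Union


@dataclass(frozen=True)
class StatusEvent:
    value: str  # hypothetical payload field
    event: Literal["STATUS"] = "STATUS"  # discriminator tag


@dataclass(frozen=True)
class DurationEvent:
    duration: float  # hypothetical payload field
    event: Literal["DURATION"] = "DURATION"  # discriminator tag


WsEvent = Union[StatusEvent, DurationEvent]


def parse_event(raw: dict[str, Any]) -> WsEvent:
    """Pick the concrete event class from the literal 'event' tag."""
    tag = raw.get("event")
    data = raw.get("data", {})
    if tag == "STATUS":
        return StatusEvent(value=data["value"])
    if tag == "DURATION":
        return DurationEvent(duration=float(data["duration"]))
    raise ValueError(f"unknown event tag: {tag!r}")


def describe(event: WsEvent) -> str:
    """Handle every member of the union; each branch sees a narrowed type."""
    if isinstance(event, StatusEvent):
        return f"status changed to {event.value}"
    return f"duration is {event.duration:.1f}s"
```

On the frontend, the generated TS union plays the same role: a switch on the
event field narrows the payload type in each branch.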

* fix: restore TOPIC WS handler with immediate state update

The setTopics call provides instant topic rendering during live
transcription. Query invalidation still follows for full data sync.

* fix: align TOPIC WS event data with GetTranscriptTopic shape

Convert TranscriptTopic → GetTranscriptTopic in pipeline before
emitting, so WS sends segments instead of words. Removes the
`as unknown as Topic` cast on the frontend.

* fix: use NonEmptyString and TranscriptStatus in user WS event models

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-12 14:49:57 -05:00


"""WebSocket broadcasting helpers for Hatchet workflows.
DUPLICATION NOTE: To be kept when Celery is deprecated. Currently dupes Celery logic.
Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets
decorator behavior. Events are broadcast to transcript rooms and user rooms.
"""

from typing import Any

import structlog

from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.utils.string import NonEmptyString
from reflector.ws_events import TranscriptEventName
from reflector.ws_manager import get_ws_manager

# Events that should also be sent to user room (matches Celery behavior)
USER_ROOM_EVENTS: set[TranscriptEventName] = {"STATUS", "FINAL_TITLE", "DURATION"}


async def broadcast_event(
    transcript_id: NonEmptyString,
    event: TranscriptEvent,
    logger: structlog.BoundLogger,
) -> None:
    """Broadcast a TranscriptEvent to WebSocket subscribers.

    Fire-and-forget: errors are logged but don't interrupt workflow execution.
    """
    logger.info(
        "Broadcasting event",
        transcript_id=transcript_id,
        event_type=event.event,
    )
    try:
        ws_manager = get_ws_manager()

        # Every event goes to the transcript's own room.
        await ws_manager.send_json(
            room_id=f"ts:{transcript_id}",
            message=event.model_dump(mode="json"),
        )
        logger.info(
            "Event sent to transcript room",
            transcript_id=transcript_id,
            event_type=event.event,
        )

        # Selected events are mirrored to the owner's user room.
        if event.event in USER_ROOM_EVENTS:
            transcript = await transcripts_controller.get_by_id(transcript_id)
            if transcript and transcript.user_id:
                await ws_manager.send_json(
                    room_id=f"user:{transcript.user_id}",
                    message={
                        "event": f"TRANSCRIPT_{event.event}",
                        "data": {"id": transcript_id, **event.data},
                    },
                )
    except Exception as e:
        logger.warning(
            "Failed to broadcast event",
            error=str(e),
            transcript_id=transcript_id,
            event_type=event.event,
        )


async def set_status_and_broadcast(
    transcript_id: NonEmptyString,
    status: str,
    logger: structlog.BoundLogger,
) -> None:
    """Set transcript status and broadcast to WebSocket.

    Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting.
    """
    event = await transcripts_controller.set_status(transcript_id, status)
    if event:
        await broadcast_event(transcript_id, event, logger=logger)


async def append_event_and_broadcast(
    transcript_id: NonEmptyString,
    transcript: Transcript,
    event_name: TranscriptEventName,
    data: Any,
    logger: structlog.BoundLogger,
) -> TranscriptEvent:
    """Append event to transcript and broadcast to WebSocket.

    Wrapper around transcripts_controller.append_event that adds WebSocket broadcasting.
    """
    event = await transcripts_controller.append_event(
        transcript=transcript,
        event=event_name,
        data=data,
    )
    await broadcast_event(transcript_id, event, logger=logger)
    return event