fix: live flow real-time updates during processing (#861)

* fix: live flow real-time updates during processing

Three gaps caused transcript pages to require manual refresh after
live recording/processing:

1. UserEventsProvider only invalidated list queries on TRANSCRIPT_STATUS,
   not individual transcript queries. Now parses data.id from the event
   and calls invalidateTranscript for the specific transcript.

2. useWebSockets had no reconnection logic — a dropped WS silently
   killed all real-time updates. Added exponential backoff reconnection
   (1s-30s, max 10 retries) with intentional close detection.

3. No polling fallback — WS was single point of failure. Added
   conditional refetchInterval to useTranscriptGet that polls every 5s
   when transcript status is processing/uploaded/recording.

* feat: type-safe WebSocket events via OpenAPI stub

Define Pydantic models with Literal discriminators for all WS events
(9 transcript-level, 5 user-level). Expose via stub GET endpoints so
pnpm openapi generates TS discriminated unions with exhaustive switch
narrowing on the frontend.

- New server/reflector/ws_events.py with TranscriptWsEvent and UserWsEvent
- Tighten backend emit signatures with TranscriptEventName literal
- Frontend uses generated types, removes Zod schema and manual casts
- Fix pre-existing bugs: waveform mapping, FINAL_LONG_SUMMARY field name
- STATUS value now typed as TranscriptStatus literal end-to-end
- TOPIC handler simplified to query invalidation only (avoids shape mismatch)

* fix: restore TOPIC WS handler with immediate state update

The setTopics call provides instant topic rendering during live
transcription. Query invalidation still follows for full data sync.

* fix: align TOPIC WS event data with GetTranscriptTopic shape

Convert TranscriptTopic → GetTranscriptTopic in pipeline before
emitting, so WS sends segments instead of words. Removes the
`as unknown as Topic` cast on the frontend.

* fix: use NonEmptyString and TranscriptStatus in user WS event models

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
2026-02-12 14:49:57 -05:00
committed by GitHub
parent b468427f1b
commit 972a52d22f
13 changed files with 704 additions and 159 deletions

View File

@@ -12,3 +12,5 @@ AccessTokenInfo = auth_module.AccessTokenInfo
authenticated = auth_module.authenticated
current_user = auth_module.current_user
current_user_optional = auth_module.current_user_optional
parse_ws_bearer_token = auth_module.parse_ws_bearer_token
current_user_ws_optional = auth_module.current_user_ws_optional

View File

@@ -1,6 +1,9 @@
from typing import Annotated, List, Optional
from typing import TYPE_CHECKING, Annotated, List, Optional
from fastapi import Depends, HTTPException
if TYPE_CHECKING:
from fastapi import WebSocket
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
@@ -124,3 +127,20 @@ async def current_user_optional(
jwtauth: JWTAuth = Depends(),
):
return await _authenticate_user(jwt_token, api_key, jwtauth)
def parse_ws_bearer_token(
websocket: "WebSocket",
) -> tuple[Optional[str], Optional[str]]:
raw = websocket.headers.get("sec-websocket-protocol") or ""
parts = [p.strip() for p in raw.split(",") if p.strip()]
if len(parts) >= 2 and parts[0].lower() == "bearer":
return parts[1], "bearer"
return None, None
async def current_user_ws_optional(websocket: "WebSocket") -> Optional[UserInfo]:
token, _ = parse_ws_bearer_token(websocket)
if not token:
return None
return await _authenticate_user(token, None, JWTAuth())

View File

@@ -19,3 +19,11 @@ def current_user():
def current_user_optional():
return None
def parse_ws_bearer_token(websocket):
return None, None
async def current_user_ws_optional(websocket):
return None

View File

@@ -5,7 +5,10 @@ import shutil
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Literal, Sequence
from typing import TYPE_CHECKING, Any, Literal, Sequence
if TYPE_CHECKING:
from reflector.ws_events import TranscriptEventName
import sqlalchemy
from fastapi import HTTPException
@@ -184,7 +187,7 @@ class TranscriptWaveform(BaseModel):
class TranscriptEvent(BaseModel):
event: str
event: str # Typed at call sites via ws_events.TranscriptEventName; str here for DB compat
data: dict
@@ -233,7 +236,9 @@ class Transcript(BaseModel):
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
def add_event(self, event: str, data: BaseModel) -> TranscriptEvent:
def add_event(
self, event: "TranscriptEventName", data: BaseModel
) -> TranscriptEvent:
ev = TranscriptEvent(event=event, data=data.model_dump())
self.events.append(ev)
return ev
@@ -688,7 +693,7 @@ class TranscriptController:
async def append_event(
self,
transcript: Transcript,
event: str,
event: "TranscriptEventName",
data: Any,
) -> TranscriptEvent:
"""

View File

@@ -12,10 +12,11 @@ import structlog
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.utils.string import NonEmptyString
from reflector.ws_events import TranscriptEventName
from reflector.ws_manager import get_ws_manager
# Events that should also be sent to user room (matches Celery behavior)
USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"}
USER_ROOM_EVENTS: set[TranscriptEventName] = {"STATUS", "FINAL_TITLE", "DURATION"}
async def broadcast_event(
@@ -81,8 +82,7 @@ async def set_status_and_broadcast(
async def append_event_and_broadcast(
transcript_id: NonEmptyString,
transcript: Transcript,
event_name: NonEmptyString,
# TODO proper dictionary event => type
event_name: TranscriptEventName,
data: Any,
logger: structlog.BoundLogger,
) -> TranscriptEvent:

View File

@@ -62,6 +62,8 @@ from reflector.processors.types import (
from reflector.processors.types import Transcript as TranscriptProcessorType
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.views.transcripts import GetTranscriptTopic
from reflector.ws_events import TranscriptEventName
from reflector.ws_manager import WebsocketManager, get_ws_manager
from reflector.zulip import (
get_zulip_message,
@@ -89,7 +91,11 @@ def broadcast_to_sockets(func):
if transcript and transcript.user_id:
# Emit only relevant events to the user room to avoid noisy updates.
# Allowed: STATUS, FINAL_TITLE, DURATION. All are prefixed with TRANSCRIPT_
allowed_user_events = {"STATUS", "FINAL_TITLE", "DURATION"}
allowed_user_events: set[TranscriptEventName] = {
"STATUS",
"FINAL_TITLE",
"DURATION",
}
if resp.event in allowed_user_events:
await self.ws_manager.send_json(
room_id=f"user:{transcript.user_id}",
@@ -244,13 +250,14 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
)
if isinstance(data, TitleSummaryWithIdProcessorType):
topic.id = data.id
get_topic = GetTranscriptTopic.from_transcript_topic(topic)
async with self.transaction():
transcript = await self.get_transcript()
await transcripts_controller.upsert_topic(transcript, topic)
return await transcripts_controller.append_event(
transcript=transcript,
event="TOPIC",
data=topic,
data=get_topic,
)
@broadcast_to_sockets

View File

@@ -4,18 +4,22 @@ Transcripts websocket API
"""
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
from reflector.ws_events import TranscriptWsEvent
from reflector.ws_manager import get_ws_manager
router = APIRouter()
@router.get("/transcripts/{transcript_id}/events")
@router.get(
"/transcripts/{transcript_id}/events",
response_model=TranscriptWsEvent,
summary="Transcript WebSocket event schema",
description="Stub exposing the discriminated union of all transcript-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path.",
)
async def transcript_get_websocket_events(transcript_id: str):
pass
@@ -24,8 +28,9 @@ async def transcript_get_websocket_events(transcript_id: str):
async def transcript_events_websocket(
transcript_id: str,
websocket: WebSocket,
user: Optional[auth.UserInfo] = Depends(auth.current_user_optional),
):
_, negotiated_subprotocol = auth.parse_ws_bearer_token(websocket)
user = await auth.current_user_ws_optional(websocket)
user_id = user["sub"] if user else None
transcript = await transcripts_controller.get_by_id_for_http(
transcript_id, user_id=user_id
@@ -37,7 +42,9 @@ async def transcript_events_websocket(
# use ts:transcript_id as room id
room_id = f"ts:{transcript_id}"
ws_manager = get_ws_manager()
await ws_manager.add_user_to_room(room_id, websocket)
await ws_manager.add_user_to_room(
room_id, websocket, subprotocol=negotiated_subprotocol
)
try:
# on first connection, send all events only to the current user

View File

@@ -4,10 +4,22 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from reflector.auth.auth_jwt import JWTAuth # type: ignore
from reflector.db.users import user_controller
from reflector.ws_events import UserWsEvent
from reflector.ws_manager import get_ws_manager
router = APIRouter()
@router.get(
"/events",
response_model=UserWsEvent,
summary="User WebSocket event schema",
description="Stub exposing the discriminated union of all user-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path.",
)
async def user_get_websocket_events():
pass
# Close code for unauthorized WebSocket connections
UNAUTHORISED = 4401

View File

@@ -0,0 +1,188 @@
"""Typed WebSocket event models.
Defines Pydantic models with Literal discriminators for all WS events.
Exposed via stub GET endpoints so ``pnpm openapi`` generates TS discriminated unions.
"""
from typing import Annotated, Literal, Union
from pydantic import BaseModel, Discriminator
from reflector.db.transcripts import (
TranscriptActionItems,
TranscriptDuration,
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
TranscriptFinalTitle,
TranscriptStatus,
TranscriptText,
TranscriptWaveform,
)
from reflector.utils.string import NonEmptyString
from reflector.views.transcripts import GetTranscriptTopic
# ---------------------------------------------------------------------------
# Transcript-level event name literal
# ---------------------------------------------------------------------------
TranscriptEventName = Literal[
"TRANSCRIPT",
"TOPIC",
"STATUS",
"FINAL_TITLE",
"FINAL_LONG_SUMMARY",
"FINAL_SHORT_SUMMARY",
"ACTION_ITEMS",
"DURATION",
"WAVEFORM",
]
# ---------------------------------------------------------------------------
# Transcript-level WS event wrappers
# ---------------------------------------------------------------------------
class TranscriptWsTranscript(BaseModel):
event: Literal["TRANSCRIPT"] = "TRANSCRIPT"
data: TranscriptText
class TranscriptWsTopic(BaseModel):
event: Literal["TOPIC"] = "TOPIC"
data: GetTranscriptTopic
class TranscriptWsStatusData(BaseModel):
value: TranscriptStatus
class TranscriptWsStatus(BaseModel):
event: Literal["STATUS"] = "STATUS"
data: TranscriptWsStatusData
class TranscriptWsFinalTitle(BaseModel):
event: Literal["FINAL_TITLE"] = "FINAL_TITLE"
data: TranscriptFinalTitle
class TranscriptWsFinalLongSummary(BaseModel):
event: Literal["FINAL_LONG_SUMMARY"] = "FINAL_LONG_SUMMARY"
data: TranscriptFinalLongSummary
class TranscriptWsFinalShortSummary(BaseModel):
event: Literal["FINAL_SHORT_SUMMARY"] = "FINAL_SHORT_SUMMARY"
data: TranscriptFinalShortSummary
class TranscriptWsActionItems(BaseModel):
event: Literal["ACTION_ITEMS"] = "ACTION_ITEMS"
data: TranscriptActionItems
class TranscriptWsDuration(BaseModel):
event: Literal["DURATION"] = "DURATION"
data: TranscriptDuration
class TranscriptWsWaveform(BaseModel):
event: Literal["WAVEFORM"] = "WAVEFORM"
data: TranscriptWaveform
TranscriptWsEvent = Annotated[
Union[
TranscriptWsTranscript,
TranscriptWsTopic,
TranscriptWsStatus,
TranscriptWsFinalTitle,
TranscriptWsFinalLongSummary,
TranscriptWsFinalShortSummary,
TranscriptWsActionItems,
TranscriptWsDuration,
TranscriptWsWaveform,
],
Discriminator("event"),
]
# ---------------------------------------------------------------------------
# User-level event name literal
# ---------------------------------------------------------------------------
UserEventName = Literal[
"TRANSCRIPT_CREATED",
"TRANSCRIPT_DELETED",
"TRANSCRIPT_STATUS",
"TRANSCRIPT_FINAL_TITLE",
"TRANSCRIPT_DURATION",
]
# ---------------------------------------------------------------------------
# User-level WS event data models
# ---------------------------------------------------------------------------
class UserTranscriptCreatedData(BaseModel):
id: NonEmptyString
class UserTranscriptDeletedData(BaseModel):
id: NonEmptyString
class UserTranscriptStatusData(BaseModel):
id: NonEmptyString
value: TranscriptStatus
class UserTranscriptFinalTitleData(BaseModel):
id: NonEmptyString
title: NonEmptyString
class UserTranscriptDurationData(BaseModel):
id: NonEmptyString
duration: float
# ---------------------------------------------------------------------------
# User-level WS event wrappers
# ---------------------------------------------------------------------------
class UserWsTranscriptCreated(BaseModel):
event: Literal["TRANSCRIPT_CREATED"] = "TRANSCRIPT_CREATED"
data: UserTranscriptCreatedData
class UserWsTranscriptDeleted(BaseModel):
event: Literal["TRANSCRIPT_DELETED"] = "TRANSCRIPT_DELETED"
data: UserTranscriptDeletedData
class UserWsTranscriptStatus(BaseModel):
event: Literal["TRANSCRIPT_STATUS"] = "TRANSCRIPT_STATUS"
data: UserTranscriptStatusData
class UserWsTranscriptFinalTitle(BaseModel):
event: Literal["TRANSCRIPT_FINAL_TITLE"] = "TRANSCRIPT_FINAL_TITLE"
data: UserTranscriptFinalTitleData
class UserWsTranscriptDuration(BaseModel):
event: Literal["TRANSCRIPT_DURATION"] = "TRANSCRIPT_DURATION"
data: UserTranscriptDurationData
UserWsEvent = Annotated[
Union[
UserWsTranscriptCreated,
UserWsTranscriptDeleted,
UserWsTranscriptStatus,
UserWsTranscriptFinalTitle,
UserWsTranscriptDuration,
],
Discriminator("event"),
]