diff --git a/server/reflector/auth/__init__.py b/server/reflector/auth/__init__.py index 6e1e010e..d9673d1e 100644 --- a/server/reflector/auth/__init__.py +++ b/server/reflector/auth/__init__.py @@ -12,3 +12,5 @@ AccessTokenInfo = auth_module.AccessTokenInfo authenticated = auth_module.authenticated current_user = auth_module.current_user current_user_optional = auth_module.current_user_optional +parse_ws_bearer_token = auth_module.parse_ws_bearer_token +current_user_ws_optional = auth_module.current_user_ws_optional diff --git a/server/reflector/auth/auth_jwt.py b/server/reflector/auth/auth_jwt.py index 625a371b..0a912d79 100644 --- a/server/reflector/auth/auth_jwt.py +++ b/server/reflector/auth/auth_jwt.py @@ -1,6 +1,9 @@ -from typing import Annotated, List, Optional +from typing import TYPE_CHECKING, Annotated, List, Optional from fastapi import Depends, HTTPException + +if TYPE_CHECKING: + from fastapi import WebSocket from fastapi.security import APIKeyHeader, OAuth2PasswordBearer from jose import JWTError, jwt from pydantic import BaseModel @@ -124,3 +127,20 @@ async def current_user_optional( jwtauth: JWTAuth = Depends(), ): return await _authenticate_user(jwt_token, api_key, jwtauth) + + +def parse_ws_bearer_token( + websocket: "WebSocket", +) -> tuple[Optional[str], Optional[str]]: + raw = websocket.headers.get("sec-websocket-protocol") or "" + parts = [p.strip() for p in raw.split(",") if p.strip()] + if len(parts) >= 2 and parts[0].lower() == "bearer": + return parts[1], "bearer" + return None, None + + +async def current_user_ws_optional(websocket: "WebSocket") -> Optional[UserInfo]: + token, _ = parse_ws_bearer_token(websocket) + if not token: + return None + return await _authenticate_user(token, None, JWTAuth()) diff --git a/server/reflector/auth/auth_none.py b/server/reflector/auth/auth_none.py index 4806a560..33c3b81b 100644 --- a/server/reflector/auth/auth_none.py +++ b/server/reflector/auth/auth_none.py @@ -19,3 +19,11 @@ def current_user(): def current_user_optional(): return None + + +def parse_ws_bearer_token(websocket): + return None, None + + +async def current_user_ws_optional(websocket): + return None diff --git a/server/reflector/db/transcripts.py b/server/reflector/db/transcripts.py index f3f9a410..0e4520b6 100644 --- a/server/reflector/db/transcripts.py +++ b/server/reflector/db/transcripts.py @@ -5,7 +5,10 @@ import shutil from contextlib import asynccontextmanager from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Any, Literal, Sequence +from typing import TYPE_CHECKING, Any, Literal, Sequence + +if TYPE_CHECKING: + from reflector.ws_events import TranscriptEventName import sqlalchemy from fastapi import HTTPException @@ -184,7 +187,7 @@ class TranscriptWaveform(BaseModel): class TranscriptEvent(BaseModel): - event: str + event: str # Typed at call sites via ws_events.TranscriptEventName; str here for DB compat data: dict @@ -233,7 +236,9 @@ class Transcript(BaseModel): dt = dt.replace(tzinfo=timezone.utc) return dt.isoformat() - def add_event(self, event: str, data: BaseModel) -> TranscriptEvent: + def add_event( + self, event: "TranscriptEventName", data: BaseModel + ) -> TranscriptEvent: ev = TranscriptEvent(event=event, data=data.model_dump()) self.events.append(ev) return ev @@ -688,7 +693,7 @@ class TranscriptController: async def append_event( self, transcript: Transcript, - event: str, + event: "TranscriptEventName", data: Any, ) -> TranscriptEvent: """ diff --git a/server/reflector/hatchet/broadcast.py b/server/reflector/hatchet/broadcast.py index 6b42ddbd..d735ba15 100644 --- a/server/reflector/hatchet/broadcast.py +++ b/server/reflector/hatchet/broadcast.py @@ -12,10 +12,11 @@ import structlog from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller from reflector.utils.string import NonEmptyString +from reflector.ws_events import TranscriptEventName from reflector.ws_manager import get_ws_manager # Events that should also be sent to user room (matches Celery behavior) -USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"} +USER_ROOM_EVENTS: set[TranscriptEventName] = {"STATUS", "FINAL_TITLE", "DURATION"} async def broadcast_event( @@ -81,8 +82,7 @@ async def set_status_and_broadcast( async def append_event_and_broadcast( transcript_id: NonEmptyString, transcript: Transcript, - event_name: NonEmptyString, - # TODO proper dictionary event => type + event_name: TranscriptEventName, data: Any, logger: structlog.BoundLogger, ) -> TranscriptEvent: diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index fbe83737..30fe14c9 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -62,6 +62,8 @@ from reflector.processors.types import ( from reflector.processors.types import Transcript as TranscriptProcessorType from reflector.settings import settings from reflector.storage import get_transcripts_storage +from reflector.views.transcripts import GetTranscriptTopic +from reflector.ws_events import TranscriptEventName from reflector.ws_manager import WebsocketManager, get_ws_manager from reflector.zulip import ( get_zulip_message, @@ -89,7 +91,11 @@ def broadcast_to_sockets(func): if transcript and transcript.user_id: # Emit only relevant events to the user room to avoid noisy updates. # Allowed: STATUS, FINAL_TITLE, DURATION. All are prefixed with TRANSCRIPT_ - allowed_user_events = {"STATUS", "FINAL_TITLE", "DURATION"} + allowed_user_events: set[TranscriptEventName] = { + "STATUS", + "FINAL_TITLE", + "DURATION", + } if resp.event in allowed_user_events: await self.ws_manager.send_json( room_id=f"user:{transcript.user_id}", @@ -244,13 +250,14 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage] ) if isinstance(data, TitleSummaryWithIdProcessorType): topic.id = data.id + get_topic = GetTranscriptTopic.from_transcript_topic(topic) async with self.transaction(): transcript = await self.get_transcript() await transcripts_controller.upsert_topic(transcript, topic) return await transcripts_controller.append_event( transcript=transcript, event="TOPIC", - data=topic, + data=get_topic, ) @broadcast_to_sockets diff --git a/server/reflector/views/transcripts_websocket.py b/server/reflector/views/transcripts_websocket.py index ccb7d7ff..53d7ba47 100644 --- a/server/reflector/views/transcripts_websocket.py +++ b/server/reflector/views/transcripts_websocket.py @@ -4,18 +4,22 @@ Transcripts websocket API """ -from typing import Optional - -from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect import reflector.auth as auth from reflector.db.transcripts import transcripts_controller +from reflector.ws_events import TranscriptWsEvent from reflector.ws_manager import get_ws_manager router = APIRouter() -@router.get("/transcripts/{transcript_id}/events") +@router.get( + "/transcripts/{transcript_id}/events", + response_model=TranscriptWsEvent, + summary="Transcript WebSocket event schema", + description="Stub exposing the discriminated union of all transcript-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path.", +) async def transcript_get_websocket_events(transcript_id: str): pass @@ -24,8 +28,9 @@ async def transcript_get_websocket_events(transcript_id: str): async def transcript_events_websocket( transcript_id: str, websocket: WebSocket, - user: Optional[auth.UserInfo] = Depends(auth.current_user_optional), ): + _, negotiated_subprotocol = auth.parse_ws_bearer_token(websocket) + user = await auth.current_user_ws_optional(websocket) user_id = user["sub"] if user else None transcript = await transcripts_controller.get_by_id_for_http( transcript_id, user_id=user_id @@ -37,7 +42,9 @@ async def transcript_events_websocket( # use ts:transcript_id as room id room_id = f"ts:{transcript_id}" ws_manager = get_ws_manager() - await ws_manager.add_user_to_room(room_id, websocket) + await ws_manager.add_user_to_room( + room_id, websocket, subprotocol=negotiated_subprotocol + ) try: # on first connection, send all events only to the current user diff --git a/server/reflector/views/user_websocket.py b/server/reflector/views/user_websocket.py index 4d27b14e..d9a147ee 100644 --- a/server/reflector/views/user_websocket.py +++ b/server/reflector/views/user_websocket.py @@ -4,10 +4,22 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect from reflector.auth.auth_jwt import JWTAuth # type: ignore from reflector.db.users import user_controller +from reflector.ws_events import UserWsEvent from reflector.ws_manager import get_ws_manager router = APIRouter() + +@router.get( + "/events", + response_model=UserWsEvent, + summary="User WebSocket event schema", + description="Stub exposing the discriminated union of all user-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path.", +) +async def user_get_websocket_events(): + pass + + # Close code for unauthorized WebSocket connections UNAUTHORISED = 4401 diff --git a/server/reflector/ws_events.py b/server/reflector/ws_events.py new file mode 100644 index 00000000..3ac89b44 --- /dev/null +++ b/server/reflector/ws_events.py @@ -0,0 +1,188 @@ +"""Typed WebSocket event models. + +Defines Pydantic models with Literal discriminators for all WS events. +Exposed via stub GET endpoints so ``pnpm openapi`` generates TS discriminated unions. +""" + +from typing import Annotated, Literal, Union + +from pydantic import BaseModel, Discriminator + +from reflector.db.transcripts import ( + TranscriptActionItems, + TranscriptDuration, + TranscriptFinalLongSummary, + TranscriptFinalShortSummary, + TranscriptFinalTitle, + TranscriptStatus, + TranscriptText, + TranscriptWaveform, +) +from reflector.utils.string import NonEmptyString +from reflector.views.transcripts import GetTranscriptTopic + +# --------------------------------------------------------------------------- +# Transcript-level event name literal +# --------------------------------------------------------------------------- + +TranscriptEventName = Literal[ + "TRANSCRIPT", + "TOPIC", + "STATUS", + "FINAL_TITLE", + "FINAL_LONG_SUMMARY", + "FINAL_SHORT_SUMMARY", + "ACTION_ITEMS", + "DURATION", + "WAVEFORM", +] + +# --------------------------------------------------------------------------- +# Transcript-level WS event wrappers +# --------------------------------------------------------------------------- + + +class TranscriptWsTranscript(BaseModel): + event: Literal["TRANSCRIPT"] = "TRANSCRIPT" + data: TranscriptText + + +class TranscriptWsTopic(BaseModel): + event: Literal["TOPIC"] = "TOPIC" + data: GetTranscriptTopic + + +class TranscriptWsStatusData(BaseModel): + value: TranscriptStatus + + +class TranscriptWsStatus(BaseModel): + event: Literal["STATUS"] = "STATUS" + data: TranscriptWsStatusData + + +class TranscriptWsFinalTitle(BaseModel): + event: Literal["FINAL_TITLE"] = "FINAL_TITLE" + data: TranscriptFinalTitle + + +class TranscriptWsFinalLongSummary(BaseModel): + event: Literal["FINAL_LONG_SUMMARY"] = "FINAL_LONG_SUMMARY" + data: TranscriptFinalLongSummary + + +class TranscriptWsFinalShortSummary(BaseModel): + event: Literal["FINAL_SHORT_SUMMARY"] = "FINAL_SHORT_SUMMARY" + data: TranscriptFinalShortSummary + + +class TranscriptWsActionItems(BaseModel): + event: Literal["ACTION_ITEMS"] = "ACTION_ITEMS" + data: TranscriptActionItems + + +class TranscriptWsDuration(BaseModel): + event: Literal["DURATION"] = "DURATION" + data: TranscriptDuration + + +class TranscriptWsWaveform(BaseModel): + event: Literal["WAVEFORM"] = "WAVEFORM" + data: TranscriptWaveform + + +TranscriptWsEvent = Annotated[ + Union[ + TranscriptWsTranscript, + TranscriptWsTopic, + TranscriptWsStatus, + TranscriptWsFinalTitle, + TranscriptWsFinalLongSummary, + TranscriptWsFinalShortSummary, + TranscriptWsActionItems, + TranscriptWsDuration, + TranscriptWsWaveform, + ], + Discriminator("event"), +] + +# --------------------------------------------------------------------------- +# User-level event name literal +# --------------------------------------------------------------------------- + +UserEventName = Literal[ + "TRANSCRIPT_CREATED", + "TRANSCRIPT_DELETED", + "TRANSCRIPT_STATUS", + "TRANSCRIPT_FINAL_TITLE", + "TRANSCRIPT_DURATION", +] + +# --------------------------------------------------------------------------- +# User-level WS event data models +# --------------------------------------------------------------------------- + + +class UserTranscriptCreatedData(BaseModel): + id: NonEmptyString + + +class UserTranscriptDeletedData(BaseModel): + id: NonEmptyString + + +class UserTranscriptStatusData(BaseModel): + id: NonEmptyString + value: TranscriptStatus + + +class UserTranscriptFinalTitleData(BaseModel): + id: NonEmptyString + title: NonEmptyString + + +class UserTranscriptDurationData(BaseModel): + id: NonEmptyString + duration: float + + +# --------------------------------------------------------------------------- +# User-level WS event wrappers +# --------------------------------------------------------------------------- + + +class UserWsTranscriptCreated(BaseModel): + event: Literal["TRANSCRIPT_CREATED"] = "TRANSCRIPT_CREATED" + data: UserTranscriptCreatedData + + +class UserWsTranscriptDeleted(BaseModel): + event: Literal["TRANSCRIPT_DELETED"] = "TRANSCRIPT_DELETED" + data: UserTranscriptDeletedData + + +class UserWsTranscriptStatus(BaseModel): + event: Literal["TRANSCRIPT_STATUS"] = "TRANSCRIPT_STATUS" + data: UserTranscriptStatusData + + +class UserWsTranscriptFinalTitle(BaseModel): + event: Literal["TRANSCRIPT_FINAL_TITLE"] = "TRANSCRIPT_FINAL_TITLE" + data: UserTranscriptFinalTitleData + + +class UserWsTranscriptDuration(BaseModel): + event: Literal["TRANSCRIPT_DURATION"] = "TRANSCRIPT_DURATION" + data: UserTranscriptDurationData + + +UserWsEvent = Annotated[ + Union[ + UserWsTranscriptCreated, + UserWsTranscriptDeleted, + UserWsTranscriptStatus, + UserWsTranscriptFinalTitle, + UserWsTranscriptDuration, + ], + Discriminator("event"), +] diff --git a/www/app/(app)/transcripts/useWebSockets.ts b/www/app/(app)/transcripts/useWebSockets.ts index 7c27ed59..23122a47 100644 --- a/www/app/(app)/transcripts/useWebSockets.ts +++ b/www/app/(app)/transcripts/useWebSockets.ts @@ -1,18 +1,22 @@ import { useEffect, useState } from "react"; import { Topic, FinalSummary, Status } from "./webSocketTypes"; import { useError } from "../../(errors)/errorContext"; -import type { components } from "../../reflector-api"; +import type { components, operations } from "../../reflector-api"; type AudioWaveform = components["schemas"]["AudioWaveform"]; type GetTranscriptSegmentTopic = components["schemas"]["GetTranscriptSegmentTopic"]; import { useQueryClient } from "@tanstack/react-query"; -import { $api, WEBSOCKET_URL } from "../../lib/apiClient"; +import { WEBSOCKET_URL } from "../../lib/apiClient"; import { invalidateTranscript, invalidateTranscriptTopics, invalidateTranscriptWaveform, } from "../../lib/apiHooks"; -import { NonEmptyString } from "../../lib/utils"; +import { useAuth } from "../../lib/AuthProvider"; +import { parseNonEmptyString } from "../../lib/utils"; + +type TranscriptWsEvent = + operations["v1_transcript_get_websocket_events"]["responses"][200]["content"]["application/json"]; export type UseWebSockets = { transcriptTextLive: string; @@ -27,6 +31,7 @@ export type UseWebSockets = { }; export const useWebSockets = (transcriptId: string | null): UseWebSockets => { + const auth = useAuth(); const [transcriptTextLive, setTranscriptTextLive] = useState(""); const [translateText, setTranslateText] = useState(""); const [title, setTitle] = useState(""); @@ -331,156 +336,168 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { }; if (!transcriptId) return; + const tsId = parseNonEmptyString(transcriptId); + const MAX_RETRIES = 10; const url = `${WEBSOCKET_URL}/v1/transcripts/${transcriptId}/events`; - let ws = new WebSocket(url); + let ws: WebSocket | null = null; + let retryCount = 0; + let retryTimeout: ReturnType | null = null; + let intentionalClose = false; - ws.onopen = () => { - console.debug("WebSocket connection opened"); - }; + const connect = () => { + const subprotocols = auth.accessToken + ? ["bearer", auth.accessToken] + : undefined; + ws = new WebSocket(url, subprotocols); - ws.onmessage = (event) => { - const message = JSON.parse(event.data); + ws.onopen = () => { + console.debug("WebSocket connection opened"); + retryCount = 0; + }; - try { - switch (message.event) { - case "TRANSCRIPT": - const newText = (message.data.text ?? "").trim(); - const newTranslation = (message.data.translation ?? "").trim(); + ws.onmessage = (event) => { + const message: TranscriptWsEvent = JSON.parse(event.data); - if (!newText) break; + try { + switch (message.event) { + case "TRANSCRIPT": { + const newText = (message.data.text ?? "").trim(); + const newTranslation = (message.data.translation ?? "").trim(); - console.debug("TRANSCRIPT event:", newText); - setTextQueue((prevQueue) => [...prevQueue, newText]); - setTranslationQueue((prevQueue) => [...prevQueue, newTranslation]); + if (!newText) break; - setAccumulatedText((prevText) => prevText + " " + newText); - break; + console.debug("TRANSCRIPT event:", newText); + setTextQueue((prevQueue) => [...prevQueue, newText]); + setTranslationQueue((prevQueue) => [ + ...prevQueue, + newTranslation, + ]); - case "TOPIC": - setTopics((prevTopics) => { - const topic = message.data as Topic; - const index = prevTopics.findIndex( - (prevTopic) => prevTopic.id === topic.id, - ); - if (index >= 0) { - prevTopics[index] = topic; - return prevTopics; - } - setAccumulatedText((prevText) => - prevText.slice(topic.transcript.length), - ); - - return [...prevTopics, topic]; - }); - console.debug("TOPIC event:", message.data); - // Invalidate topics query to sync with WebSocket data - invalidateTranscriptTopics( - queryClient, - transcriptId as NonEmptyString, - ); - break; - - case "FINAL_SHORT_SUMMARY": - console.debug("FINAL_SHORT_SUMMARY event:", message.data); - break; - - case "FINAL_LONG_SUMMARY": - if (message.data) { - setFinalSummary(message.data); - // Invalidate transcript query to sync summary - invalidateTranscript(queryClient, transcriptId as NonEmptyString); + setAccumulatedText((prevText) => prevText + " " + newText); + break; } - break; - case "FINAL_TITLE": - console.debug("FINAL_TITLE event:", message.data); - if (message.data) { + case "TOPIC": + setTopics((prevTopics) => { + const topic = message.data; + const index = prevTopics.findIndex( + (prevTopic) => prevTopic.id === topic.id, + ); + if (index >= 0) { + prevTopics[index] = topic; + return prevTopics; + } + setAccumulatedText((prevText) => + prevText.slice(topic.transcript?.length ?? 0), + ); + return [...prevTopics, topic]; + }); + console.debug("TOPIC event:", message.data); + invalidateTranscriptTopics(queryClient, tsId); + break; + + case "FINAL_SHORT_SUMMARY": + console.debug("FINAL_SHORT_SUMMARY event:", message.data); + break; + + case "FINAL_LONG_SUMMARY": + setFinalSummary({ summary: message.data.long_summary }); + invalidateTranscript(queryClient, tsId); + break; + + case "FINAL_TITLE": + console.debug("FINAL_TITLE event:", message.data); setTitle(message.data.title); - // Invalidate transcript query to sync title - invalidateTranscript(queryClient, transcriptId as NonEmptyString); - } - break; + invalidateTranscript(queryClient, tsId); + break; - case "WAVEFORM": - console.debug( - "WAVEFORM event length:", - message.data.waveform.length, - ); - if (message.data) { - setWaveForm(message.data.waveform); - invalidateTranscriptWaveform( - queryClient, - transcriptId as NonEmptyString, + case "WAVEFORM": + console.debug( + "WAVEFORM event length:", + message.data.waveform.length, ); - } - break; - case "DURATION": - console.debug("DURATION event:", message.data); - if (message.data) { + setWaveForm({ data: message.data.waveform }); + invalidateTranscriptWaveform(queryClient, tsId); + break; + + case "DURATION": + console.debug("DURATION event:", message.data); setDuration(message.data.duration); - } - break; + break; - case "STATUS": - console.log("STATUS event:", message.data); - if (message.data.value === "error") { - setError( - Error("Websocket error status"), - "There was an error processing this meeting.", + case "STATUS": + console.log("STATUS event:", message.data); + if (message.data.value === "error") { + setError( + Error("Websocket error status"), + "There was an error processing this meeting.", + ); + } + setStatus(message.data); + invalidateTranscript(queryClient, tsId); + if (message.data.value === "ended") { + intentionalClose = true; + ws?.close(); + } + break; + + case "ACTION_ITEMS": + console.debug("ACTION_ITEMS event:", message.data); + invalidateTranscript(queryClient, tsId); + break; + + default: { + const _exhaustive: never = message; + console.warn( + `Received unknown WebSocket event: ${(_exhaustive as TranscriptWsEvent).event}`, ); } - setStatus(message.data); - invalidateTranscript(queryClient, transcriptId as NonEmptyString); - if (message.data.value === "ended") { - ws.close(); - } - break; - - default: - setError( - new Error(`Received unknown WebSocket event: ${message.event}`), - ); + } + } catch (error) { + setError(error); } - } catch (error) { - setError(error); - } - }; + }; - ws.onerror = (error) => { - console.error("WebSocket error:", error); - setError(new Error("A WebSocket error occurred.")); - }; + ws.onerror = (error) => { + console.error("WebSocket error:", error); + }; - ws.onclose = (event) => { - console.debug("WebSocket connection closed"); - switch (event.code) { - case 1000: // Normal Closure: - break; - case 1005: // Closure by client FF - break; - case 1001: // Navigate away - break; - case 1006: // Closed by client Chrome - console.warn( - "WebSocket closed by client, likely duplicated connection in react dev mode", + ws.onclose = (event) => { + console.debug("WebSocket connection closed, code:", event.code); + if (intentionalClose) return; + + const normalCodes = [1000, 1001, 1005]; + if (normalCodes.includes(event.code)) return; + + if (retryCount < MAX_RETRIES) { + const delay = Math.min(1000 * Math.pow(2, retryCount), 30000); + console.log( + `WebSocket reconnecting in ${delay}ms (attempt ${retryCount + 1}/${MAX_RETRIES})`, ); - break; - default: + if (retryCount === 0) { + setError( + new Error("WebSocket connection lost"), + "Connection lost. Reconnecting...", + ); + } + retryCount++; + retryTimeout = setTimeout(connect, delay); + } else { setError( new Error(`WebSocket closed unexpectedly with code: ${event.code}`), "Disconnected from the server. Please refresh the page.", ); - console.log( - "Socket is closed. Reconnect will be attempted in 1 second.", - event.reason, - ); - // todo handle reconnect with socket.io - } + } + }; }; + connect(); + return () => { - ws.close(); + intentionalClose = true; + if (retryTimeout) clearTimeout(retryTimeout); + ws?.close(); }; }, [transcriptId]); diff --git a/www/app/lib/UserEventsProvider.tsx b/www/app/lib/UserEventsProvider.tsx index 89ec5a11..454429ce 100644 --- a/www/app/lib/UserEventsProvider.tsx +++ b/www/app/lib/UserEventsProvider.tsx @@ -4,14 +4,12 @@ import React, { useEffect, useRef } from "react"; import { useQueryClient } from "@tanstack/react-query"; import { WEBSOCKET_URL } from "./apiClient"; import { useAuth } from "./AuthProvider"; -import { z } from "zod"; -import { invalidateTranscriptLists, TRANSCRIPT_SEARCH_URL } from "./apiHooks"; +import { invalidateTranscript, invalidateTranscriptLists } from "./apiHooks"; +import { parseNonEmptyString } from "./utils"; +import type { operations } from "../reflector-api"; -const UserEvent = z.object({ - event: z.string(), -}); - -type UserEvent = z.TypeOf; +type UserWsEvent = + operations["v1_user_get_websocket_events"]["responses"][200]["content"]["application/json"]; class UserEventsStore { private socket: WebSocket | null = null; @@ -133,23 +131,26 @@ export function UserEventsProvider({ if (!detachRef.current) { const onMessage = (event: MessageEvent) => { try { - const msg = UserEvent.parse(JSON.parse(event.data)); - const eventName = msg.event; + const msg: UserWsEvent = JSON.parse(event.data); - const invalidateList = () => invalidateTranscriptLists(queryClient); - - switch (eventName) { + switch (msg.event) { case "TRANSCRIPT_CREATED": case "TRANSCRIPT_DELETED": case "TRANSCRIPT_STATUS": case "TRANSCRIPT_FINAL_TITLE": case "TRANSCRIPT_DURATION": - invalidateList().then(() => {}); - break; - - default: - // Ignore other content events for list updates + invalidateTranscriptLists(queryClient).then(() => {}); + invalidateTranscript( + queryClient, + parseNonEmptyString(msg.data.id), + ).then(() => {}); break; + default: { + const _exhaustive: never = msg; + console.warn( + `Unknown user event: ${(_exhaustive as UserWsEvent).event}`, + ); + } } } catch (err) { console.warn("Invalid user event message", event.data); diff --git a/www/app/lib/apiHooks.ts b/www/app/lib/apiHooks.ts index 788dfac6..96c2b053 100644 --- a/www/app/lib/apiHooks.ts +++ b/www/app/lib/apiHooks.ts @@ -7,6 +7,7 @@ import type { components } from "../reflector-api"; import { useAuth } from "./AuthProvider"; import { MeetingId } from "./types"; import { NonEmptyString } from "./utils"; +import type { TranscriptStatus } from "./transcript"; /* * XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other @@ -104,6 +105,12 @@ export function useTranscriptProcess() { }); } +const ACTIVE_TRANSCRIPT_STATUSES = new Set([ + "processing", + "uploaded", + "recording", +]); + export function useTranscriptGet(transcriptId: NonEmptyString | null) { return $api.useQuery( "get", @@ -117,6 +124,10 @@ export function useTranscriptGet(transcriptId: NonEmptyString | null) { }, { enabled: !!transcriptId, + refetchInterval: (query) => { + const status = query.state.data?.status; + return status && ACTIVE_TRANSCRIPT_STATUSES.has(status) ? 5000 : false; + }, }, ); } diff --git a/www/app/reflector-api.d.ts b/www/app/reflector-api.d.ts index 12a7085c..af5ec7fa 100644 --- a/www/app/reflector-api.d.ts +++ b/www/app/reflector-api.d.ts @@ -568,7 +568,10 @@ export interface paths { path?: never; cookie?: never; }; - /** Transcript Get Websocket Events */ + /** + * Transcript WebSocket event schema + * @description Stub exposing the discriminated union of all transcript-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path. + */ get: operations["v1_transcript_get_websocket_events"]; put?: never; post?: never; @@ -664,6 +667,26 @@ export interface paths { patch?: never; trace?: never; }; + "/v1/events": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** + * User WebSocket event schema + * @description Stub exposing the discriminated union of all user-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path. + */ + get: operations["v1_user_get_websocket_events"]; + put?: never; + post?: never; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/v1/zulip/streams": { parameters: { query?: never; @@ -1877,6 +1900,33 @@ export interface components { /** Name */ name: string; }; + /** TranscriptActionItems */ + TranscriptActionItems: { + /** Action Items */ + action_items: { + [key: string]: unknown; + }; + }; + /** TranscriptDuration */ + TranscriptDuration: { + /** Duration */ + duration: number; + }; + /** TranscriptFinalLongSummary */ + TranscriptFinalLongSummary: { + /** Long Summary */ + long_summary: string; + }; + /** TranscriptFinalShortSummary */ + TranscriptFinalShortSummary: { + /** Short Summary */ + short_summary: string; + }; + /** TranscriptFinalTitle */ + TranscriptFinalTitle: { + /** Title */ + title: string; + }; /** TranscriptParticipant */ TranscriptParticipant: { /** Id */ @@ -1917,6 +1967,113 @@ export interface components { /** End */ end: number; }; + /** TranscriptText */ + TranscriptText: { + /** Text */ + text: string; + /** Translation */ + translation: string | null; + }; + /** TranscriptWaveform */ + TranscriptWaveform: { + /** Waveform */ + waveform: number[]; + }; + /** TranscriptWsActionItems */ + TranscriptWsActionItems: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "ACTION_ITEMS"; + data: components["schemas"]["TranscriptActionItems"]; + }; + /** TranscriptWsDuration */ + TranscriptWsDuration: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "DURATION"; + data: components["schemas"]["TranscriptDuration"]; + }; + /** TranscriptWsFinalLongSummary */ + TranscriptWsFinalLongSummary: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "FINAL_LONG_SUMMARY"; + data: components["schemas"]["TranscriptFinalLongSummary"]; + }; + /** TranscriptWsFinalShortSummary */ + TranscriptWsFinalShortSummary: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "FINAL_SHORT_SUMMARY"; + data: components["schemas"]["TranscriptFinalShortSummary"]; + }; + /** TranscriptWsFinalTitle */ + TranscriptWsFinalTitle: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "FINAL_TITLE"; + data: components["schemas"]["TranscriptFinalTitle"]; + }; + /** TranscriptWsStatus */ + TranscriptWsStatus: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "STATUS"; + data: components["schemas"]["TranscriptWsStatusData"]; + }; + /** TranscriptWsStatusData */ + TranscriptWsStatusData: { + /** + * Value + * @enum {string} + */ + value: + | "idle" + | "uploaded" + | "recording" + | "processing" + | "error" + | "ended"; + }; + /** TranscriptWsTopic */ + TranscriptWsTopic: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "TOPIC"; + data: components["schemas"]["GetTranscriptTopic"]; + }; + /** TranscriptWsTranscript */ + TranscriptWsTranscript: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "TRANSCRIPT"; + data: components["schemas"]["TranscriptText"]; + }; + /** TranscriptWsWaveform */ + TranscriptWsWaveform: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "WAVEFORM"; + data: components["schemas"]["TranscriptWaveform"]; + }; /** UpdateParticipant */ UpdateParticipant: { /** Speaker */ @@ -1987,6 +2144,82 @@ export interface components { /** Email */ email: string | null; }; + /** UserTranscriptCreatedData */ + UserTranscriptCreatedData: { + /** Id */ + id: string; + }; + /** UserTranscriptDeletedData */ + UserTranscriptDeletedData: { + /** Id */ + id: string; + }; + /** UserTranscriptDurationData */ + UserTranscriptDurationData: { + /** Id */ + id: string; + /** Duration */ + duration: number; + }; + /** UserTranscriptFinalTitleData */ + UserTranscriptFinalTitleData: { + /** Id */ + id: string; + /** Title */ + title: string; + }; + /** UserTranscriptStatusData */ + UserTranscriptStatusData: { + /** Id */ + id: string; + /** Value */ + value: string; + }; + /** UserWsTranscriptCreated */ + UserWsTranscriptCreated: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "TRANSCRIPT_CREATED"; + data: components["schemas"]["UserTranscriptCreatedData"]; + }; + /** UserWsTranscriptDeleted */ + UserWsTranscriptDeleted: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "TRANSCRIPT_DELETED"; + data: components["schemas"]["UserTranscriptDeletedData"]; + }; + /** UserWsTranscriptDuration */ + UserWsTranscriptDuration: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "TRANSCRIPT_DURATION"; + data: components["schemas"]["UserTranscriptDurationData"]; + }; + /** UserWsTranscriptFinalTitle */ + UserWsTranscriptFinalTitle: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "TRANSCRIPT_FINAL_TITLE"; + data: components["schemas"]["UserTranscriptFinalTitleData"]; + }; + /** UserWsTranscriptStatus */ + UserWsTranscriptStatus: { + /** + * @description discriminator enum property added by openapi-typescript + * @enum {string} + */ + event: "TRANSCRIPT_STATUS"; + data: components["schemas"]["UserTranscriptStatusData"]; + }; /** ValidationError */ ValidationError: { /** Location */ @@ -3423,7 +3656,16 @@ export interface operations { [name: string]: unknown; }; content: { - "application/json": unknown; + "application/json": + | components["schemas"]["TranscriptWsTranscript"] + | components["schemas"]["TranscriptWsTopic"] + | components["schemas"]["TranscriptWsStatus"] + | components["schemas"]["TranscriptWsFinalTitle"] + | components["schemas"]["TranscriptWsFinalLongSummary"] + | components["schemas"]["TranscriptWsFinalShortSummary"] + | components["schemas"]["TranscriptWsActionItems"] + | components["schemas"]["TranscriptWsDuration"] + | components["schemas"]["TranscriptWsWaveform"]; }; }; /** @description Validation Error */ @@ -3607,6 +3849,31 @@ export interface operations { }; }; }; + v1_user_get_websocket_events: { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Successful Response */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": + | components["schemas"]["UserWsTranscriptCreated"] + | components["schemas"]["UserWsTranscriptDeleted"] + | components["schemas"]["UserWsTranscriptStatus"] + | components["schemas"]["UserWsTranscriptFinalTitle"] + | components["schemas"]["UserWsTranscriptDuration"]; + }; + }; + }; + }; v1_zulip_get_streams: { parameters: { query?: never;