Compare commits

..

9 Commits

Author SHA1 Message Date
Igor Loskutov
528154ae96 fix: update standalone docs to match self-contained compose usage 2026-02-13 09:56:09 -05:00
Igor Loskutov
c42b8439dd fix: make standalone compose self-contained (drop !reset dependency)
docker-compose.standalone.yml used !reset YAML tags to clear
network_mode and volumes from the base compose. !reset requires
Compose v2.24+ and breaks on Colima + brew-installed compose.

Rewrite as a fully self-contained file with all services defined
directly (server, worker, beat, redis, postgres, web, garage, cpu,
gpu-nvidia, ollama, ollama-cpu). No longer overlays docker-compose.yml.

Update setup-standalone.sh compose_cmd() to use only the standalone
file instead of both files.
2026-02-13 09:52:07 -05:00
Igor Loskutov
14a8b5808e fix: check for Docker BuildKit (buildx) before building images
Dockerfiles use RUN --mount for caching which requires BuildKit.
Colima and bare Docker Engine installs don't bundle docker-buildx.
2026-02-12 18:57:32 -05:00
Igor Loskutov
e57c6186f9 fix: check compose version output, not just exit code
Without the plugin, `docker compose version` can still exit 0
by falling through to `docker version`. Grep for "Compose" in
the output to reliably detect the plugin.
2026-02-12 18:32:16 -05:00
Igor Loskutov
36a8daee61 fix: check for Docker Compose plugin before running standalone setup
Without the compose plugin, `docker compose -f ...` produces a
misleading "unknown shorthand flag: 'f'" error instead of telling
the user compose is missing.
2026-02-12 18:24:24 -05:00
Igor Loskutov
3d13e5d42f fix: auto-rebuild standalone images and blank Hatchet vars
- Add rebuild_images() to setup-standalone.sh that runs `compose build`
  before `up -d`, with image hash comparison to log whether each service
  was rebuilt or unchanged
- Blank HATCHET_CLIENT_SERVER_URL/HOST_PORT in standalone compose since
  Hatchet is not started (localhost URLs break after network_mode:host removal)
- Fix grep -qx -> -qxF for ollama model matching (dots in model names)
2026-02-12 18:21:09 -05:00
Igor Loskutov
695f3c4928 fix: standalone server networking and setup diagnostics
Replace network_mode:host with standard compose networking for macOS
Docker Desktop compatibility. Add dump_diagnostics() for automatic
failure debugging and docker-exec-based server health checks.
2026-02-12 17:46:00 -05:00
5bca92510a feat: standalone frontend uses production build instead of dev server (#862)
* feat: standalone frontend uses production build instead of dev server

Override web service in docker-compose.standalone.yml to build from
www/Dockerfile (multi-stage: deps → build → standalone runner) instead
of running pnpm dev with bind-mounted source.

* chore: move standalone compose TODO to Huly issue RFFR-46

* fix: add required env vars for standalone production frontend

The standalone web service (node server.js) has no bind-mounted .env
files and the base env_file (.env.local) has API_URL commented out.
Next.js standalone server can't auto-load .env files without them on
disk, so all required vars must be explicit in the compose override.

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-12 15:36:52 -05:00
972a52d22f fix: live flow real-time updates during processing (#861)
* fix: live flow real-time updates during processing

Three gaps caused transcript pages to require manual refresh after
live recording/processing:

1. UserEventsProvider only invalidated list queries on TRANSCRIPT_STATUS,
   not individual transcript queries. Now parses data.id from the event
   and calls invalidateTranscript for the specific transcript.

2. useWebSockets had no reconnection logic — a dropped WS silently
   killed all real-time updates. Added exponential backoff reconnection
   (1s-30s, max 10 retries) with intentional close detection.

3. No polling fallback — WS was single point of failure. Added
   conditional refetchInterval to useTranscriptGet that polls every 5s
   when transcript status is processing/uploaded/recording.

* feat: type-safe WebSocket events via OpenAPI stub

Define Pydantic models with Literal discriminators for all WS events
(9 transcript-level, 5 user-level). Expose via stub GET endpoints so
pnpm openapi generates TS discriminated unions with exhaustive switch
narrowing on the frontend.

- New server/reflector/ws_events.py with TranscriptWsEvent and UserWsEvent
- Tighten backend emit signatures with TranscriptEventName literal
- Frontend uses generated types, removes Zod schema and manual casts
- Fix pre-existing bugs: waveform mapping, FINAL_LONG_SUMMARY field name
- STATUS value now typed as TranscriptStatus literal end-to-end
- TOPIC handler simplified to query invalidation only (avoids shape mismatch)

* fix: restore TOPIC WS handler with immediate state update

The setTopics call provides instant topic rendering during live
transcription. Query invalidation still follows for full data sync.

* fix: align TOPIC WS event data with GetTranscriptTopic shape

Convert TranscriptTopic → GetTranscriptTopic in pipeline before
emitting, so WS sends segments instead of words. Removes the
`as unknown as Topic` cast on the frontend.

* fix: use NonEmptyString and TranscriptStatus in user WS event models

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-12 14:49:57 -05:00
20 changed files with 1004 additions and 424 deletions

View File

@@ -1,11 +1,128 @@
# Standalone services for fully local deployment (no external dependencies).
# Usage: docker compose -f docker-compose.yml -f docker-compose.standalone.yml up -d
# Self-contained standalone compose for fully local deployment (no external dependencies).
# Usage: docker compose -f docker-compose.standalone.yml up -d
#
# On Linux with NVIDIA GPU, also pass: --profile ollama-gpu
# On Linux without GPU: --profile ollama-cpu
# On Mac: Ollama runs natively (Metal GPU) — no profile needed, services here unused.
services:
server:
build:
context: server
ports:
- "1250:1250"
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- ./server/:/app/
- /app/.venv
env_file:
- ./server/.env
environment:
ENTRYPOINT: server
# Docker DNS names instead of localhost
DATABASE_URL: postgresql+asyncpg://reflector:reflector@postgres:5432/reflector
REDIS_HOST: redis
CELERY_BROKER_URL: redis://redis:6379/1
CELERY_RESULT_BACKEND: redis://redis:6379/1
# Standalone doesn't run Hatchet
HATCHET_CLIENT_SERVER_URL: ""
HATCHET_CLIENT_HOST_PORT: ""
# Self-hosted transcription/diarization via CPU service
TRANSCRIPT_BACKEND: modal
TRANSCRIPT_URL: http://cpu:8000
TRANSCRIPT_MODAL_API_KEY: local
DIARIZATION_BACKEND: modal
DIARIZATION_URL: http://cpu:8000
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_started
worker:
build:
context: server
volumes:
- ./server/:/app/
- /app/.venv
env_file:
- ./server/.env
environment:
ENTRYPOINT: worker
HATCHET_CLIENT_SERVER_URL: ""
HATCHET_CLIENT_HOST_PORT: ""
TRANSCRIPT_BACKEND: modal
TRANSCRIPT_URL: http://cpu:8000
TRANSCRIPT_MODAL_API_KEY: local
DIARIZATION_BACKEND: modal
DIARIZATION_URL: http://cpu:8000
depends_on:
redis:
condition: service_started
beat:
build:
context: server
volumes:
- ./server/:/app/
- /app/.venv
env_file:
- ./server/.env
environment:
ENTRYPOINT: beat
depends_on:
redis:
condition: service_started
redis:
image: redis:7.2
ports:
- 6379:6379
postgres:
image: postgres:17
command: postgres -c 'max_connections=200'
ports:
- 5432:5432
environment:
POSTGRES_USER: reflector
POSTGRES_PASSWORD: reflector
POSTGRES_DB: reflector
volumes:
- ./data/postgres:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -d reflector -U reflector"]
interval: 5s
timeout: 5s
retries: 10
start_period: 15s
web:
image: reflector-frontend-standalone
build:
context: ./www
ports:
- "3000:3000"
command: ["node", "server.js"]
environment:
NODE_ENV: production
# Browser-facing URLs (host-accessible ports)
API_URL: http://localhost:1250
WEBSOCKET_URL: ws://localhost:1250
SITE_URL: http://localhost:3000
# Server-side URLs (docker-network internal)
SERVER_API_URL: http://server:1250
KV_URL: redis://redis:6379
KV_USE_TLS: "false"
# Standalone: no external auth provider
FEATURE_REQUIRE_LOGIN: "false"
NEXTAUTH_URL: http://localhost:3000
NEXTAUTH_SECRET: standalone-local-secret
# Nullify partial auth vars inherited from base env_file
AUTHENTIK_ISSUER: ""
AUTHENTIK_REFRESH_TOKEN_URL: ""
garage:
image: dxflrs/garage:v1.1.0
ports:
@@ -23,59 +140,6 @@ services:
retries: 5
start_period: 5s
ollama:
image: ollama/ollama:latest
profiles: ["ollama-gpu"]
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:11434/api/tags"]
interval: 10s
timeout: 5s
retries: 5
ollama-cpu:
image: ollama/ollama:latest
profiles: ["ollama-cpu"]
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:11434/api/tags"]
interval: 10s
timeout: 5s
retries: 5
# Override server/worker/beat to use self-hosted GPU service for transcription+diarization.
# compose `environment:` overrides values from `env_file:` — no need to edit server/.env.
server:
environment:
TRANSCRIPT_BACKEND: modal
TRANSCRIPT_URL: http://localhost:8100
TRANSCRIPT_MODAL_API_KEY: local
DIARIZATION_BACKEND: modal
DIARIZATION_URL: http://localhost:8100
worker:
environment:
TRANSCRIPT_BACKEND: modal
TRANSCRIPT_URL: http://cpu:8000
TRANSCRIPT_MODAL_API_KEY: local
DIARIZATION_BACKEND: modal
DIARIZATION_URL: http://cpu:8000
cpu:
build:
context: ./gpu/self_hosted
@@ -113,6 +177,41 @@ services:
retries: 10
start_period: 120s
ollama:
image: ollama/ollama:latest
profiles: ["ollama-gpu"]
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:11434/api/tags"]
interval: 10s
timeout: 5s
retries: 5
ollama-cpu:
image: ollama/ollama:latest
profiles: ["ollama-cpu"]
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:11434/api/tags"]
interval: 10s
timeout: 5s
retries: 5
volumes:
garage_data:
garage_meta:

View File

@@ -191,7 +191,7 @@ Standalone runs without authentication (`FEATURE_REQUIRE_LOGIN=false`, `AUTH_BAC
1. In `www/.env.local`: set `FEATURE_REQUIRE_LOGIN=true`, uncomment `AUTHENTIK_ISSUER` and `AUTHENTIK_REFRESH_TOKEN_URL`
2. In `server/.env`: set `AUTH_BACKEND=authentik` (or your backend), configure `AUTH_JWT_AUDIENCE`
3. Restart: `docker compose -f docker-compose.yml -f docker-compose.standalone.yml up -d --force-recreate web server`
3. Restart: `docker compose -f docker-compose.standalone.yml up -d --force-recreate web server`
## What's NOT covered

View File

@@ -35,6 +35,75 @@ err() { echo -e "${RED} ✗${NC} $*" >&2; }
# --- Helpers ---
dump_diagnostics() {
local failed_svc="${1:-}"
echo ""
err "========== DIAGNOSTICS =========="
err "Container status:"
compose_cmd ps -a --format "table {{.Name}}\t{{.Status}}" 2>/dev/null || true
echo ""
# Show logs for any container that exited
local stopped
stopped=$(compose_cmd ps -a --format '{{.Name}}\t{{.Status}}' 2>/dev/null \
| grep -iv 'up\|running' | awk -F'\t' '{print $1}' || true)
for c in $stopped; do
err "--- Logs for $c (exited/unhealthy) ---"
docker logs --tail 30 "$c" 2>&1 || true
echo ""
done
# If a specific service failed, always show its logs
if [[ -n "$failed_svc" ]]; then
err "--- Logs for $failed_svc (last 40) ---"
compose_cmd logs "$failed_svc" --tail 40 2>&1 || true
echo ""
# Try health check from inside the container as extra signal
err "--- Internal health check ($failed_svc) ---"
compose_cmd exec -T "$failed_svc" \
curl -sf http://localhost:1250/health 2>&1 || echo "(not reachable internally either)"
fi
err "================================="
}
trap 'dump_diagnostics' ERR
# Get the image ID for a compose service (works even when containers are not running).
svc_image_id() {
local svc="$1"
# Extract image name from compose config YAML, fall back to <project>-<service>
local img_name
img_name=$(compose_cmd config 2>/dev/null \
| sed -n "/^ ${svc}:/,/^ [a-z]/p" | grep '^\s*image:' | awk '{print $2}')
img_name="${img_name:-reflector-$svc}"
docker images -q "$img_name" 2>/dev/null | head -1
}
# Ensure images with build contexts are up-to-date.
# Docker layer cache makes this fast (~seconds) when source hasn't changed.
rebuild_images() {
local svc
for svc in web cpu; do
local old_id
old_id=$(svc_image_id "$svc")
old_id="${old_id:-<none>}"
info "Building $svc..."
compose_cmd build "$svc"
local new_id
new_id=$(svc_image_id "$svc")
if [[ "$old_id" == "$new_id" ]]; then
ok "$svc unchanged (${new_id:0:12})"
else
ok "$svc rebuilt (${old_id:0:12} -> ${new_id:0:12})"
fi
done
}
wait_for_url() {
local url="$1" label="$2" retries="${3:-30}" interval="${4:-2}"
for i in $(seq 1 "$retries"); do
@@ -79,7 +148,7 @@ resolve_symlink() {
}
compose_cmd() {
local compose_files="-f $ROOT_DIR/docker-compose.yml -f $ROOT_DIR/docker-compose.standalone.yml"
local compose_files="-f $ROOT_DIR/docker-compose.standalone.yml"
if [[ "$OS" == "Linux" ]] && [[ -n "${OLLAMA_PROFILE:-}" ]]; then
docker compose $compose_files --profile "$OLLAMA_PROFILE" "$@"
else
@@ -113,7 +182,7 @@ step_llm() {
echo ""
# Pull model if not already present
if ollama list 2>/dev/null | awk '{print $1}' | grep -qx "$MODEL"; then
if ollama list 2>/dev/null | awk '{print $1}' | grep -qxF "$MODEL"; then
ok "Model $MODEL already pulled"
else
info "Pulling model $MODEL (this may take a while)..."
@@ -143,7 +212,7 @@ step_llm() {
echo ""
# Pull model inside container
if compose_cmd exec "$OLLAMA_SVC" ollama list 2>/dev/null | awk '{print $1}' | grep -qx "$MODEL"; then
if compose_cmd exec "$OLLAMA_SVC" ollama list 2>/dev/null | awk '{print $1}' | grep -qxF "$MODEL"; then
ok "Model $MODEL already pulled"
else
info "Pulling model $MODEL inside container (this may take a while)..."
@@ -313,9 +382,24 @@ step_services() {
warn "Continuing anyway (services will start but may be shadowed)"
fi
# Rebuild images if source has changed (Docker layer cache makes this fast when unchanged)
rebuild_images
# server runs alembic migrations on startup automatically (see runserver.sh)
compose_cmd up -d postgres redis garage cpu server worker beat web
ok "Containers started"
# Quick sanity check — catch containers that exit immediately (bad image, missing file, etc.)
sleep 3
local exited
exited=$(compose_cmd ps -a --format '{{.Name}} {{.Status}}' 2>/dev/null \
| grep -i 'exit' || true)
if [[ -n "$exited" ]]; then
warn "Some containers exited immediately:"
echo "$exited" | while read -r line; do warn " $line"; done
dump_diagnostics
fi
info "Server is running migrations (alembic upgrade head)..."
}
@@ -345,9 +429,36 @@ step_health() {
warn "Check with: docker compose logs cpu"
fi
wait_for_url "http://localhost:1250/health" "Server API" 60 3
# Server may take a long time on first run — alembic migrations run before uvicorn starts.
# Use docker exec so this works regardless of network_mode or port mapping.
info "Waiting for Server API (first run includes database migrations)..."
local server_ok=false
for i in $(seq 1 90); do
# Check if container is still running
local svc_status
svc_status=$(compose_cmd ps server --format '{{.Status}}' 2>/dev/null || true)
if [[ -z "$svc_status" ]] || echo "$svc_status" | grep -qi 'exit'; then
echo ""
err "Server container exited unexpectedly"
dump_diagnostics server
exit 1
fi
# Health check from inside container (avoids host networking issues)
if compose_cmd exec -T server curl -sf http://localhost:1250/health > /dev/null 2>&1; then
server_ok=true
break
fi
echo -ne "\r Waiting for Server API... ($i/90)"
sleep 5
done
echo ""
ok "Server API healthy"
if [[ "$server_ok" == "true" ]]; then
ok "Server API healthy"
else
err "Server API not ready after ~7 minutes"
dump_diagnostics server
exit 1
fi
wait_for_url "http://localhost:3000" "Frontend" 90 3
echo ""
@@ -380,6 +491,22 @@ main() {
exit 1
fi
# Ensure Docker Compose V2 plugin is available.
# Check output for "Compose" — without the plugin, `docker compose version`
# may still exit 0 (falling through to `docker version`).
if ! docker compose version 2>/dev/null | grep -qi compose; then
err "Docker Compose plugin not found."
err "Install Docker Desktop, OrbStack, or: brew install docker-compose"
exit 1
fi
# Dockerfiles use RUN --mount which requires BuildKit.
# Docker Desktop/OrbStack bundle it; Colima/bare engine need docker-buildx.
if ! docker buildx version &>/dev/null; then
err "Docker BuildKit (buildx) not found."
err "Install Docker Desktop, OrbStack, or: brew install docker-buildx"
exit 1
fi
# LLM_URL_VALUE is set by step_llm, used by later steps
LLM_URL_VALUE=""

View File

@@ -12,3 +12,5 @@ AccessTokenInfo = auth_module.AccessTokenInfo
authenticated = auth_module.authenticated
current_user = auth_module.current_user
current_user_optional = auth_module.current_user_optional
parse_ws_bearer_token = auth_module.parse_ws_bearer_token
current_user_ws_optional = auth_module.current_user_ws_optional

View File

@@ -1,6 +1,9 @@
from typing import Annotated, List, Optional
from typing import TYPE_CHECKING, Annotated, List, Optional
from fastapi import Depends, HTTPException
if TYPE_CHECKING:
from fastapi import WebSocket
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
@@ -124,3 +127,20 @@ async def current_user_optional(
jwtauth: JWTAuth = Depends(),
):
return await _authenticate_user(jwt_token, api_key, jwtauth)
def parse_ws_bearer_token(
websocket: "WebSocket",
) -> tuple[Optional[str], Optional[str]]:
raw = websocket.headers.get("sec-websocket-protocol") or ""
parts = [p.strip() for p in raw.split(",") if p.strip()]
if len(parts) >= 2 and parts[0].lower() == "bearer":
return parts[1], "bearer"
return None, None
async def current_user_ws_optional(websocket: "WebSocket") -> Optional[UserInfo]:
token, _ = parse_ws_bearer_token(websocket)
if not token:
return None
return await _authenticate_user(token, None, JWTAuth())

View File

@@ -19,3 +19,11 @@ def current_user():
def current_user_optional():
return None
def parse_ws_bearer_token(websocket):
return None, None
async def current_user_ws_optional(websocket):
return None

View File

@@ -5,7 +5,10 @@ import shutil
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Literal, Sequence
from typing import TYPE_CHECKING, Any, Literal, Sequence
if TYPE_CHECKING:
from reflector.ws_events import TranscriptEventName
import sqlalchemy
from fastapi import HTTPException
@@ -184,7 +187,7 @@ class TranscriptWaveform(BaseModel):
class TranscriptEvent(BaseModel):
event: str
event: str # Typed at call sites via ws_events.TranscriptEventName; str here for DB compat
data: dict
@@ -233,7 +236,9 @@ class Transcript(BaseModel):
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
def add_event(self, event: str, data: BaseModel) -> TranscriptEvent:
def add_event(
self, event: "TranscriptEventName", data: BaseModel
) -> TranscriptEvent:
ev = TranscriptEvent(event=event, data=data.model_dump())
self.events.append(ev)
return ev
@@ -688,7 +693,7 @@ class TranscriptController:
async def append_event(
self,
transcript: Transcript,
event: str,
event: "TranscriptEventName",
data: Any,
) -> TranscriptEvent:
"""

View File

@@ -12,10 +12,11 @@ import structlog
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.utils.string import NonEmptyString
from reflector.ws_events import TranscriptEventName
from reflector.ws_manager import get_ws_manager
# Events that should also be sent to user room (matches Celery behavior)
USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"}
USER_ROOM_EVENTS: set[TranscriptEventName] = {"STATUS", "FINAL_TITLE", "DURATION"}
async def broadcast_event(
@@ -81,8 +82,7 @@ async def set_status_and_broadcast(
async def append_event_and_broadcast(
transcript_id: NonEmptyString,
transcript: Transcript,
event_name: NonEmptyString,
# TODO proper dictionary event => type
event_name: TranscriptEventName,
data: Any,
logger: structlog.BoundLogger,
) -> TranscriptEvent:

View File

@@ -720,6 +720,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
chunk_text=chunk["text"],
timestamp=chunk["timestamp"],
duration=chunk["duration"],
words=chunk["words"],
)
)
for chunk in chunks
@@ -731,41 +732,31 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
TopicChunkResult(**result[TaskName.DETECT_CHUNK_TOPIC]) for result in results
]
# Build index-to-words map from local chunks (words not in child workflow results)
chunks_by_index = {chunk["index"]: chunk["words"] for chunk in chunks}
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
# Clear topics for idempotency on retry (each topic gets a fresh UUID,
# so upsert_topic would append duplicates without this)
await transcripts_controller.update(transcript, {"topics": []})
for chunk in topic_chunks:
chunk_words = chunks_by_index[chunk.chunk_index]
topic = TranscriptTopic(
title=chunk.title,
summary=chunk.summary,
timestamp=chunk.timestamp,
transcript=" ".join(w.text for w in chunk_words),
words=chunk_words,
transcript=" ".join(w.text for w in chunk.words),
words=chunk.words,
)
await transcripts_controller.upsert_topic(transcript, topic)
await append_event_and_broadcast(
input.transcript_id, transcript, "TOPIC", topic, logger=logger
)
# Words omitted from TopicsResult — already persisted to DB above.
# Downstream tasks that need words refetch from DB.
topics_list = [
TitleSummary(
title=chunk.title,
summary=chunk.summary,
timestamp=chunk.timestamp,
duration=chunk.duration,
transcript=TranscriptType(words=[]),
transcript=TranscriptType(words=chunk.words),
)
for chunk in topic_chunks
]
@@ -851,8 +842,9 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
if not topics_result.topics:
if not topics:
ctx.log("extract_subjects: no topics, returning empty subjects")
return SubjectsResult(
subjects=[],
@@ -865,13 +857,11 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
# sharing DB connections and LLM HTTP pools across forks
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.types import words_to_segments # noqa: PLC0415
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
# Build transcript text from DB topics (words omitted from task output
# to reduce Hatchet payload size — refetch from DB where they were persisted)
# Build transcript text from topics (same logic as TranscriptFinalSummaryProcessor)
speakermap = {}
if transcript and transcript.participants:
speakermap = {
@@ -881,8 +871,8 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
}
text_lines = []
for db_topic in transcript.topics:
for segment in words_to_segments(db_topic.words):
for topic in topics:
for segment in topic.transcript.as_segments():
name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
text_lines.append(f"{name}: {segment.text}")

View File

@@ -95,6 +95,7 @@ class TopicChunkResult(BaseModel):
summary: str
timestamp: float
duration: float
words: list[Word]
class TopicsResult(BaseModel):

View File

@@ -20,6 +20,7 @@ from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.logger import logger
from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import Word
class TopicChunkInput(BaseModel):
@@ -29,6 +30,7 @@ class TopicChunkInput(BaseModel):
chunk_text: str
timestamp: float
duration: float
words: list[Word]
hatchet = HatchetClientManager.get_client()
@@ -97,4 +99,5 @@ async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunk
summary=response.summary,
timestamp=input.timestamp,
duration=input.duration,
words=input.words,
)

View File

@@ -62,6 +62,8 @@ from reflector.processors.types import (
from reflector.processors.types import Transcript as TranscriptProcessorType
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.views.transcripts import GetTranscriptTopic
from reflector.ws_events import TranscriptEventName
from reflector.ws_manager import WebsocketManager, get_ws_manager
from reflector.zulip import (
get_zulip_message,
@@ -89,7 +91,11 @@ def broadcast_to_sockets(func):
if transcript and transcript.user_id:
# Emit only relevant events to the user room to avoid noisy updates.
# Allowed: STATUS, FINAL_TITLE, DURATION. All are prefixed with TRANSCRIPT_
allowed_user_events = {"STATUS", "FINAL_TITLE", "DURATION"}
allowed_user_events: set[TranscriptEventName] = {
"STATUS",
"FINAL_TITLE",
"DURATION",
}
if resp.event in allowed_user_events:
await self.ws_manager.send_json(
room_id=f"user:{transcript.user_id}",
@@ -244,13 +250,14 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
)
if isinstance(data, TitleSummaryWithIdProcessorType):
topic.id = data.id
get_topic = GetTranscriptTopic.from_transcript_topic(topic)
async with self.transaction():
transcript = await self.get_transcript()
await transcripts_controller.upsert_topic(transcript, topic)
return await transcripts_controller.append_event(
transcript=transcript,
event="TOPIC",
data=topic,
data=get_topic,
)
@broadcast_to_sockets

View File

@@ -4,18 +4,22 @@ Transcripts websocket API
"""
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
from reflector.ws_events import TranscriptWsEvent
from reflector.ws_manager import get_ws_manager
router = APIRouter()
@router.get("/transcripts/{transcript_id}/events")
@router.get(
"/transcripts/{transcript_id}/events",
response_model=TranscriptWsEvent,
summary="Transcript WebSocket event schema",
description="Stub exposing the discriminated union of all transcript-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path.",
)
async def transcript_get_websocket_events(transcript_id: str):
pass
@@ -24,8 +28,9 @@ async def transcript_get_websocket_events(transcript_id: str):
async def transcript_events_websocket(
transcript_id: str,
websocket: WebSocket,
user: Optional[auth.UserInfo] = Depends(auth.current_user_optional),
):
_, negotiated_subprotocol = auth.parse_ws_bearer_token(websocket)
user = await auth.current_user_ws_optional(websocket)
user_id = user["sub"] if user else None
transcript = await transcripts_controller.get_by_id_for_http(
transcript_id, user_id=user_id
@@ -37,7 +42,9 @@ async def transcript_events_websocket(
# use ts:transcript_id as room id
room_id = f"ts:{transcript_id}"
ws_manager = get_ws_manager()
await ws_manager.add_user_to_room(room_id, websocket)
await ws_manager.add_user_to_room(
room_id, websocket, subprotocol=negotiated_subprotocol
)
try:
# on first connection, send all events only to the current user

View File

@@ -4,10 +4,22 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from reflector.auth.auth_jwt import JWTAuth # type: ignore
from reflector.db.users import user_controller
from reflector.ws_events import UserWsEvent
from reflector.ws_manager import get_ws_manager
router = APIRouter()
@router.get(
"/events",
response_model=UserWsEvent,
summary="User WebSocket event schema",
description="Stub exposing the discriminated union of all user-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path.",
)
async def user_get_websocket_events():
pass
# Close code for unauthorized WebSocket connections
UNAUTHORISED = 4401

View File

@@ -0,0 +1,188 @@
"""Typed WebSocket event models.
Defines Pydantic models with Literal discriminators for all WS events.
Exposed via stub GET endpoints so ``pnpm openapi`` generates TS discriminated unions.
"""
from typing import Annotated, Literal, Union
from pydantic import BaseModel, Discriminator
from reflector.db.transcripts import (
TranscriptActionItems,
TranscriptDuration,
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
TranscriptFinalTitle,
TranscriptStatus,
TranscriptText,
TranscriptWaveform,
)
from reflector.utils.string import NonEmptyString
from reflector.views.transcripts import GetTranscriptTopic
# ---------------------------------------------------------------------------
# Transcript-level event name literal
# ---------------------------------------------------------------------------
TranscriptEventName = Literal[
"TRANSCRIPT",
"TOPIC",
"STATUS",
"FINAL_TITLE",
"FINAL_LONG_SUMMARY",
"FINAL_SHORT_SUMMARY",
"ACTION_ITEMS",
"DURATION",
"WAVEFORM",
]
# ---------------------------------------------------------------------------
# Transcript-level WS event wrappers
# ---------------------------------------------------------------------------
class TranscriptWsTranscript(BaseModel):
event: Literal["TRANSCRIPT"] = "TRANSCRIPT"
data: TranscriptText
class TranscriptWsTopic(BaseModel):
event: Literal["TOPIC"] = "TOPIC"
data: GetTranscriptTopic
class TranscriptWsStatusData(BaseModel):
value: TranscriptStatus
class TranscriptWsStatus(BaseModel):
event: Literal["STATUS"] = "STATUS"
data: TranscriptWsStatusData
class TranscriptWsFinalTitle(BaseModel):
event: Literal["FINAL_TITLE"] = "FINAL_TITLE"
data: TranscriptFinalTitle
class TranscriptWsFinalLongSummary(BaseModel):
event: Literal["FINAL_LONG_SUMMARY"] = "FINAL_LONG_SUMMARY"
data: TranscriptFinalLongSummary
class TranscriptWsFinalShortSummary(BaseModel):
event: Literal["FINAL_SHORT_SUMMARY"] = "FINAL_SHORT_SUMMARY"
data: TranscriptFinalShortSummary
class TranscriptWsActionItems(BaseModel):
event: Literal["ACTION_ITEMS"] = "ACTION_ITEMS"
data: TranscriptActionItems
class TranscriptWsDuration(BaseModel):
event: Literal["DURATION"] = "DURATION"
data: TranscriptDuration
class TranscriptWsWaveform(BaseModel):
event: Literal["WAVEFORM"] = "WAVEFORM"
data: TranscriptWaveform
TranscriptWsEvent = Annotated[
Union[
TranscriptWsTranscript,
TranscriptWsTopic,
TranscriptWsStatus,
TranscriptWsFinalTitle,
TranscriptWsFinalLongSummary,
TranscriptWsFinalShortSummary,
TranscriptWsActionItems,
TranscriptWsDuration,
TranscriptWsWaveform,
],
Discriminator("event"),
]
# ---------------------------------------------------------------------------
# User-level event name literal
# ---------------------------------------------------------------------------
UserEventName = Literal[
"TRANSCRIPT_CREATED",
"TRANSCRIPT_DELETED",
"TRANSCRIPT_STATUS",
"TRANSCRIPT_FINAL_TITLE",
"TRANSCRIPT_DURATION",
]
# ---------------------------------------------------------------------------
# User-level WS event data models
# ---------------------------------------------------------------------------
class UserTranscriptCreatedData(BaseModel):
id: NonEmptyString
class UserTranscriptDeletedData(BaseModel):
id: NonEmptyString
class UserTranscriptStatusData(BaseModel):
id: NonEmptyString
value: TranscriptStatus
class UserTranscriptFinalTitleData(BaseModel):
id: NonEmptyString
title: NonEmptyString
class UserTranscriptDurationData(BaseModel):
id: NonEmptyString
duration: float
# ---------------------------------------------------------------------------
# User-level WS event wrappers
# ---------------------------------------------------------------------------
class UserWsTranscriptCreated(BaseModel):
event: Literal["TRANSCRIPT_CREATED"] = "TRANSCRIPT_CREATED"
data: UserTranscriptCreatedData
class UserWsTranscriptDeleted(BaseModel):
event: Literal["TRANSCRIPT_DELETED"] = "TRANSCRIPT_DELETED"
data: UserTranscriptDeletedData
class UserWsTranscriptStatus(BaseModel):
event: Literal["TRANSCRIPT_STATUS"] = "TRANSCRIPT_STATUS"
data: UserTranscriptStatusData
class UserWsTranscriptFinalTitle(BaseModel):
event: Literal["TRANSCRIPT_FINAL_TITLE"] = "TRANSCRIPT_FINAL_TITLE"
data: UserTranscriptFinalTitleData
class UserWsTranscriptDuration(BaseModel):
event: Literal["TRANSCRIPT_DURATION"] = "TRANSCRIPT_DURATION"
data: UserTranscriptDurationData
UserWsEvent = Annotated[
Union[
UserWsTranscriptCreated,
UserWsTranscriptDeleted,
UserWsTranscriptStatus,
UserWsTranscriptFinalTitle,
UserWsTranscriptDuration,
],
Discriminator("event"),
]

View File

@@ -1,185 +0,0 @@
"""
Tests for Hatchet payload thinning optimizations.
Verifies that:
1. TopicChunkInput no longer carries words
2. TopicChunkResult no longer carries words
3. words_to_segments() matches Transcript.as_segments(is_multitrack=False) — behavioral equivalence
for the extract_subjects refactoring
4. TopicsResult can be constructed with empty transcript words
"""
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.hatchet.workflows.topic_chunk_processing import TopicChunkInput
from reflector.processors.types import Word
def _make_words(speaker: int = 0, start: float = 0.0) -> list[Word]:
return [
Word(text="Hello", start=start, end=start + 0.5, speaker=speaker),
Word(text=" world.", start=start + 0.5, end=start + 1.0, speaker=speaker),
]
class TestTopicChunkInputNoWords:
"""TopicChunkInput must not have a words field."""
def test_no_words_field(self):
assert "words" not in TopicChunkInput.model_fields
def test_construction_without_words(self):
inp = TopicChunkInput(
chunk_index=0, chunk_text="Hello world.", timestamp=0.0, duration=1.0
)
assert inp.chunk_index == 0
assert inp.chunk_text == "Hello world."
def test_rejects_words_kwarg(self):
"""Passing words= should raise a validation error (field doesn't exist)."""
import pydantic
try:
TopicChunkInput(
chunk_index=0,
chunk_text="text",
timestamp=0.0,
duration=1.0,
words=_make_words(),
)
# If pydantic is configured to ignore extra, this won't raise.
# Verify the field is still absent from the model.
assert "words" not in TopicChunkInput.model_fields
except pydantic.ValidationError:
pass # Expected
class TestTopicChunkResultNoWords:
"""TopicChunkResult must not have a words field."""
def test_no_words_field(self):
assert "words" not in TopicChunkResult.model_fields
def test_construction_without_words(self):
result = TopicChunkResult(
chunk_index=0,
title="Test",
summary="Summary",
timestamp=0.0,
duration=1.0,
)
assert result.title == "Test"
assert result.chunk_index == 0
def test_serialization_roundtrip(self):
"""Serialized TopicChunkResult has no words key."""
result = TopicChunkResult(
chunk_index=0,
title="Test",
summary="Summary",
timestamp=0.0,
duration=1.0,
)
data = result.model_dump()
assert "words" not in data
reconstructed = TopicChunkResult(**data)
assert reconstructed == result
class TestWordsToSegmentsBehavioralEquivalence:
"""words_to_segments() must produce same output as Transcript.as_segments(is_multitrack=False).
This ensures the extract_subjects refactoring (from task output topic.transcript.as_segments()
to words_to_segments(db_topic.words)) preserves identical behavior.
"""
def test_single_speaker(self):
from reflector.processors.types import Transcript as TranscriptType
from reflector.processors.types import words_to_segments
words = _make_words(speaker=0)
direct = words_to_segments(words)
via_transcript = TranscriptType(words=words).as_segments(is_multitrack=False)
assert len(direct) == len(via_transcript)
for d, v in zip(direct, via_transcript):
assert d.text == v.text
assert d.speaker == v.speaker
assert d.start == v.start
assert d.end == v.end
def test_multiple_speakers(self):
from reflector.processors.types import Transcript as TranscriptType
from reflector.processors.types import words_to_segments
words = [
Word(text="Hello", start=0.0, end=0.5, speaker=0),
Word(text=" world.", start=0.5, end=1.0, speaker=0),
Word(text=" How", start=1.0, end=1.5, speaker=1),
Word(text=" are", start=1.5, end=2.0, speaker=1),
Word(text=" you?", start=2.0, end=2.5, speaker=1),
]
direct = words_to_segments(words)
via_transcript = TranscriptType(words=words).as_segments(is_multitrack=False)
assert len(direct) == len(via_transcript)
for d, v in zip(direct, via_transcript):
assert d.text == v.text
assert d.speaker == v.speaker
def test_empty_words(self):
from reflector.processors.types import Transcript as TranscriptType
from reflector.processors.types import words_to_segments
assert words_to_segments([]) == []
assert TranscriptType(words=[]).as_segments(is_multitrack=False) == []
class TestTopicsResultEmptyWords:
"""TopicsResult can carry topics with empty transcript words."""
def test_construction_with_empty_words(self):
from reflector.hatchet.workflows.models import TopicsResult
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
topics = [
TitleSummary(
title="Topic A",
summary="Summary A",
timestamp=0.0,
duration=5.0,
transcript=TranscriptType(words=[]),
),
TitleSummary(
title="Topic B",
summary="Summary B",
timestamp=5.0,
duration=5.0,
transcript=TranscriptType(words=[]),
),
]
result = TopicsResult(topics=topics)
assert len(result.topics) == 2
for t in result.topics:
assert t.transcript.words == []
def test_serialization_roundtrip(self):
from reflector.hatchet.workflows.models import TopicsResult
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
topics = [
TitleSummary(
title="Topic",
summary="Summary",
timestamp=0.0,
duration=1.0,
transcript=TranscriptType(words=[]),
)
]
result = TopicsResult(topics=topics)
data = result.model_dump()
reconstructed = TopicsResult(**data)
assert len(reconstructed.topics) == 1
assert reconstructed.topics[0].transcript.words == []

View File

@@ -1,18 +1,22 @@
import { useEffect, useState } from "react";
import { Topic, FinalSummary, Status } from "./webSocketTypes";
import { useError } from "../../(errors)/errorContext";
import type { components } from "../../reflector-api";
import type { components, operations } from "../../reflector-api";
type AudioWaveform = components["schemas"]["AudioWaveform"];
type GetTranscriptSegmentTopic =
components["schemas"]["GetTranscriptSegmentTopic"];
import { useQueryClient } from "@tanstack/react-query";
import { $api, WEBSOCKET_URL } from "../../lib/apiClient";
import { WEBSOCKET_URL } from "../../lib/apiClient";
import {
invalidateTranscript,
invalidateTranscriptTopics,
invalidateTranscriptWaveform,
} from "../../lib/apiHooks";
import { NonEmptyString } from "../../lib/utils";
import { useAuth } from "../../lib/AuthProvider";
import { parseNonEmptyString } from "../../lib/utils";
type TranscriptWsEvent =
operations["v1_transcript_get_websocket_events"]["responses"][200]["content"]["application/json"];
export type UseWebSockets = {
transcriptTextLive: string;
@@ -27,6 +31,7 @@ export type UseWebSockets = {
};
export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
const auth = useAuth();
const [transcriptTextLive, setTranscriptTextLive] = useState<string>("");
const [translateText, setTranslateText] = useState<string>("");
const [title, setTitle] = useState<string>("");
@@ -331,156 +336,168 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
};
if (!transcriptId) return;
const tsId = parseNonEmptyString(transcriptId);
const MAX_RETRIES = 10;
const url = `${WEBSOCKET_URL}/v1/transcripts/${transcriptId}/events`;
let ws = new WebSocket(url);
let ws: WebSocket | null = null;
let retryCount = 0;
let retryTimeout: ReturnType<typeof setTimeout> | null = null;
let intentionalClose = false;
ws.onopen = () => {
console.debug("WebSocket connection opened");
};
const connect = () => {
const subprotocols = auth.accessToken
? ["bearer", auth.accessToken]
: undefined;
ws = new WebSocket(url, subprotocols);
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
ws.onopen = () => {
console.debug("WebSocket connection opened");
retryCount = 0;
};
try {
switch (message.event) {
case "TRANSCRIPT":
const newText = (message.data.text ?? "").trim();
const newTranslation = (message.data.translation ?? "").trim();
ws.onmessage = (event) => {
const message: TranscriptWsEvent = JSON.parse(event.data);
if (!newText) break;
try {
switch (message.event) {
case "TRANSCRIPT": {
const newText = (message.data.text ?? "").trim();
const newTranslation = (message.data.translation ?? "").trim();
console.debug("TRANSCRIPT event:", newText);
setTextQueue((prevQueue) => [...prevQueue, newText]);
setTranslationQueue((prevQueue) => [...prevQueue, newTranslation]);
if (!newText) break;
setAccumulatedText((prevText) => prevText + " " + newText);
break;
console.debug("TRANSCRIPT event:", newText);
setTextQueue((prevQueue) => [...prevQueue, newText]);
setTranslationQueue((prevQueue) => [
...prevQueue,
newTranslation,
]);
case "TOPIC":
setTopics((prevTopics) => {
const topic = message.data as Topic;
const index = prevTopics.findIndex(
(prevTopic) => prevTopic.id === topic.id,
);
if (index >= 0) {
prevTopics[index] = topic;
return prevTopics;
}
setAccumulatedText((prevText) =>
prevText.slice(topic.transcript.length),
);
return [...prevTopics, topic];
});
console.debug("TOPIC event:", message.data);
// Invalidate topics query to sync with WebSocket data
invalidateTranscriptTopics(
queryClient,
transcriptId as NonEmptyString,
);
break;
case "FINAL_SHORT_SUMMARY":
console.debug("FINAL_SHORT_SUMMARY event:", message.data);
break;
case "FINAL_LONG_SUMMARY":
if (message.data) {
setFinalSummary(message.data);
// Invalidate transcript query to sync summary
invalidateTranscript(queryClient, transcriptId as NonEmptyString);
setAccumulatedText((prevText) => prevText + " " + newText);
break;
}
break;
case "FINAL_TITLE":
console.debug("FINAL_TITLE event:", message.data);
if (message.data) {
case "TOPIC":
setTopics((prevTopics) => {
const topic = message.data;
const index = prevTopics.findIndex(
(prevTopic) => prevTopic.id === topic.id,
);
if (index >= 0) {
prevTopics[index] = topic;
return prevTopics;
}
setAccumulatedText((prevText) =>
prevText.slice(topic.transcript?.length ?? 0),
);
return [...prevTopics, topic];
});
console.debug("TOPIC event:", message.data);
invalidateTranscriptTopics(queryClient, tsId);
break;
case "FINAL_SHORT_SUMMARY":
console.debug("FINAL_SHORT_SUMMARY event:", message.data);
break;
case "FINAL_LONG_SUMMARY":
setFinalSummary({ summary: message.data.long_summary });
invalidateTranscript(queryClient, tsId);
break;
case "FINAL_TITLE":
console.debug("FINAL_TITLE event:", message.data);
setTitle(message.data.title);
// Invalidate transcript query to sync title
invalidateTranscript(queryClient, transcriptId as NonEmptyString);
}
break;
invalidateTranscript(queryClient, tsId);
break;
case "WAVEFORM":
console.debug(
"WAVEFORM event length:",
message.data.waveform.length,
);
if (message.data) {
setWaveForm(message.data.waveform);
invalidateTranscriptWaveform(
queryClient,
transcriptId as NonEmptyString,
case "WAVEFORM":
console.debug(
"WAVEFORM event length:",
message.data.waveform.length,
);
}
break;
case "DURATION":
console.debug("DURATION event:", message.data);
if (message.data) {
setWaveForm({ data: message.data.waveform });
invalidateTranscriptWaveform(queryClient, tsId);
break;
case "DURATION":
console.debug("DURATION event:", message.data);
setDuration(message.data.duration);
}
break;
break;
case "STATUS":
console.log("STATUS event:", message.data);
if (message.data.value === "error") {
setError(
Error("Websocket error status"),
"There was an error processing this meeting.",
case "STATUS":
console.log("STATUS event:", message.data);
if (message.data.value === "error") {
setError(
Error("Websocket error status"),
"There was an error processing this meeting.",
);
}
setStatus(message.data);
invalidateTranscript(queryClient, tsId);
if (message.data.value === "ended") {
intentionalClose = true;
ws?.close();
}
break;
case "ACTION_ITEMS":
console.debug("ACTION_ITEMS event:", message.data);
invalidateTranscript(queryClient, tsId);
break;
default: {
const _exhaustive: never = message;
console.warn(
`Received unknown WebSocket event: ${(_exhaustive as TranscriptWsEvent).event}`,
);
}
setStatus(message.data);
invalidateTranscript(queryClient, transcriptId as NonEmptyString);
if (message.data.value === "ended") {
ws.close();
}
break;
default:
setError(
new Error(`Received unknown WebSocket event: ${message.event}`),
);
}
} catch (error) {
setError(error);
}
} catch (error) {
setError(error);
}
};
};
ws.onerror = (error) => {
console.error("WebSocket error:", error);
setError(new Error("A WebSocket error occurred."));
};
ws.onerror = (error) => {
console.error("WebSocket error:", error);
};
ws.onclose = (event) => {
console.debug("WebSocket connection closed");
switch (event.code) {
case 1000: // Normal Closure:
break;
case 1005: // Closure by client FF
break;
case 1001: // Navigate away
break;
case 1006: // Closed by client Chrome
console.warn(
"WebSocket closed by client, likely duplicated connection in react dev mode",
ws.onclose = (event) => {
console.debug("WebSocket connection closed, code:", event.code);
if (intentionalClose) return;
const normalCodes = [1000, 1001, 1005];
if (normalCodes.includes(event.code)) return;
if (retryCount < MAX_RETRIES) {
const delay = Math.min(1000 * Math.pow(2, retryCount), 30000);
console.log(
`WebSocket reconnecting in ${delay}ms (attempt ${retryCount + 1}/${MAX_RETRIES})`,
);
break;
default:
if (retryCount === 0) {
setError(
new Error("WebSocket connection lost"),
"Connection lost. Reconnecting...",
);
}
retryCount++;
retryTimeout = setTimeout(connect, delay);
} else {
setError(
new Error(`WebSocket closed unexpectedly with code: ${event.code}`),
"Disconnected from the server. Please refresh the page.",
);
console.log(
"Socket is closed. Reconnect will be attempted in 1 second.",
event.reason,
);
// todo handle reconnect with socket.io
}
}
};
};
connect();
return () => {
ws.close();
intentionalClose = true;
if (retryTimeout) clearTimeout(retryTimeout);
ws?.close();
};
}, [transcriptId]);

View File

@@ -4,14 +4,12 @@ import React, { useEffect, useRef } from "react";
import { useQueryClient } from "@tanstack/react-query";
import { WEBSOCKET_URL } from "./apiClient";
import { useAuth } from "./AuthProvider";
import { z } from "zod";
import { invalidateTranscriptLists, TRANSCRIPT_SEARCH_URL } from "./apiHooks";
import { invalidateTranscript, invalidateTranscriptLists } from "./apiHooks";
import { parseNonEmptyString } from "./utils";
import type { operations } from "../reflector-api";
const UserEvent = z.object({
event: z.string(),
});
type UserEvent = z.TypeOf<typeof UserEvent>;
type UserWsEvent =
operations["v1_user_get_websocket_events"]["responses"][200]["content"]["application/json"];
class UserEventsStore {
private socket: WebSocket | null = null;
@@ -133,23 +131,26 @@ export function UserEventsProvider({
if (!detachRef.current) {
const onMessage = (event: MessageEvent) => {
try {
const msg = UserEvent.parse(JSON.parse(event.data));
const eventName = msg.event;
const msg: UserWsEvent = JSON.parse(event.data);
const invalidateList = () => invalidateTranscriptLists(queryClient);
switch (eventName) {
switch (msg.event) {
case "TRANSCRIPT_CREATED":
case "TRANSCRIPT_DELETED":
case "TRANSCRIPT_STATUS":
case "TRANSCRIPT_FINAL_TITLE":
case "TRANSCRIPT_DURATION":
invalidateList().then(() => {});
break;
default:
// Ignore other content events for list updates
invalidateTranscriptLists(queryClient).then(() => {});
invalidateTranscript(
queryClient,
parseNonEmptyString(msg.data.id),
).then(() => {});
break;
default: {
const _exhaustive: never = msg;
console.warn(
`Unknown user event: ${(_exhaustive as UserWsEvent).event}`,
);
}
}
} catch (err) {
console.warn("Invalid user event message", event.data);

View File

@@ -7,6 +7,7 @@ import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
import type { TranscriptStatus } from "./transcript";
/*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -104,6 +105,12 @@ export function useTranscriptProcess() {
});
}
const ACTIVE_TRANSCRIPT_STATUSES = new Set<TranscriptStatus>([
"processing",
"uploaded",
"recording",
]);
export function useTranscriptGet(transcriptId: NonEmptyString | null) {
return $api.useQuery(
"get",
@@ -117,6 +124,10 @@ export function useTranscriptGet(transcriptId: NonEmptyString | null) {
},
{
enabled: !!transcriptId,
refetchInterval: (query) => {
const status = query.state.data?.status;
return status && ACTIVE_TRANSCRIPT_STATUSES.has(status) ? 5000 : false;
},
},
);
}

View File

@@ -568,7 +568,10 @@ export interface paths {
path?: never;
cookie?: never;
};
/** Transcript Get Websocket Events */
/**
* Transcript WebSocket event schema
* @description Stub exposing the discriminated union of all transcript-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path.
*/
get: operations["v1_transcript_get_websocket_events"];
put?: never;
post?: never;
@@ -664,6 +667,26 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/events": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/**
* User WebSocket event schema
* @description Stub exposing the discriminated union of all user-level WS events for OpenAPI type generation. Real events are delivered over the WebSocket at the same path.
*/
get: operations["v1_user_get_websocket_events"];
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/v1/zulip/streams": {
parameters: {
query?: never;
@@ -1877,6 +1900,33 @@ export interface components {
/** Name */
name: string;
};
/** TranscriptActionItems */
TranscriptActionItems: {
/** Action Items */
action_items: {
[key: string]: unknown;
};
};
/** TranscriptDuration */
TranscriptDuration: {
/** Duration */
duration: number;
};
/** TranscriptFinalLongSummary */
TranscriptFinalLongSummary: {
/** Long Summary */
long_summary: string;
};
/** TranscriptFinalShortSummary */
TranscriptFinalShortSummary: {
/** Short Summary */
short_summary: string;
};
/** TranscriptFinalTitle */
TranscriptFinalTitle: {
/** Title */
title: string;
};
/** TranscriptParticipant */
TranscriptParticipant: {
/** Id */
@@ -1917,6 +1967,113 @@ export interface components {
/** End */
end: number;
};
/** TranscriptText */
TranscriptText: {
/** Text */
text: string;
/** Translation */
translation: string | null;
};
/** TranscriptWaveform */
TranscriptWaveform: {
/** Waveform */
waveform: number[];
};
/** TranscriptWsActionItems */
TranscriptWsActionItems: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "ACTION_ITEMS";
data: components["schemas"]["TranscriptActionItems"];
};
/** TranscriptWsDuration */
TranscriptWsDuration: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "DURATION";
data: components["schemas"]["TranscriptDuration"];
};
/** TranscriptWsFinalLongSummary */
TranscriptWsFinalLongSummary: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "FINAL_LONG_SUMMARY";
data: components["schemas"]["TranscriptFinalLongSummary"];
};
/** TranscriptWsFinalShortSummary */
TranscriptWsFinalShortSummary: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "FINAL_SHORT_SUMMARY";
data: components["schemas"]["TranscriptFinalShortSummary"];
};
/** TranscriptWsFinalTitle */
TranscriptWsFinalTitle: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "FINAL_TITLE";
data: components["schemas"]["TranscriptFinalTitle"];
};
/** TranscriptWsStatus */
TranscriptWsStatus: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "STATUS";
data: components["schemas"]["TranscriptWsStatusData"];
};
/** TranscriptWsStatusData */
TranscriptWsStatusData: {
/**
* Value
* @enum {string}
*/
value:
| "idle"
| "uploaded"
| "recording"
| "processing"
| "error"
| "ended";
};
/** TranscriptWsTopic */
TranscriptWsTopic: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "TOPIC";
data: components["schemas"]["GetTranscriptTopic"];
};
/** TranscriptWsTranscript */
TranscriptWsTranscript: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "TRANSCRIPT";
data: components["schemas"]["TranscriptText"];
};
/** TranscriptWsWaveform */
TranscriptWsWaveform: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "WAVEFORM";
data: components["schemas"]["TranscriptWaveform"];
};
/** UpdateParticipant */
UpdateParticipant: {
/** Speaker */
@@ -1987,6 +2144,82 @@ export interface components {
/** Email */
email: string | null;
};
/** UserTranscriptCreatedData */
UserTranscriptCreatedData: {
/** Id */
id: string;
};
/** UserTranscriptDeletedData */
UserTranscriptDeletedData: {
/** Id */
id: string;
};
/** UserTranscriptDurationData */
UserTranscriptDurationData: {
/** Id */
id: string;
/** Duration */
duration: number;
};
/** UserTranscriptFinalTitleData */
UserTranscriptFinalTitleData: {
/** Id */
id: string;
/** Title */
title: string;
};
/** UserTranscriptStatusData */
UserTranscriptStatusData: {
/** Id */
id: string;
/** Value */
value: string;
};
/** UserWsTranscriptCreated */
UserWsTranscriptCreated: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "TRANSCRIPT_CREATED";
data: components["schemas"]["UserTranscriptCreatedData"];
};
/** UserWsTranscriptDeleted */
UserWsTranscriptDeleted: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "TRANSCRIPT_DELETED";
data: components["schemas"]["UserTranscriptDeletedData"];
};
/** UserWsTranscriptDuration */
UserWsTranscriptDuration: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "TRANSCRIPT_DURATION";
data: components["schemas"]["UserTranscriptDurationData"];
};
/** UserWsTranscriptFinalTitle */
UserWsTranscriptFinalTitle: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "TRANSCRIPT_FINAL_TITLE";
data: components["schemas"]["UserTranscriptFinalTitleData"];
};
/** UserWsTranscriptStatus */
UserWsTranscriptStatus: {
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
event: "TRANSCRIPT_STATUS";
data: components["schemas"]["UserTranscriptStatusData"];
};
/** ValidationError */
ValidationError: {
/** Location */
@@ -3423,7 +3656,16 @@ export interface operations {
[name: string]: unknown;
};
content: {
"application/json": unknown;
"application/json":
| components["schemas"]["TranscriptWsTranscript"]
| components["schemas"]["TranscriptWsTopic"]
| components["schemas"]["TranscriptWsStatus"]
| components["schemas"]["TranscriptWsFinalTitle"]
| components["schemas"]["TranscriptWsFinalLongSummary"]
| components["schemas"]["TranscriptWsFinalShortSummary"]
| components["schemas"]["TranscriptWsActionItems"]
| components["schemas"]["TranscriptWsDuration"]
| components["schemas"]["TranscriptWsWaveform"];
};
};
/** @description Validation Error */
@@ -3607,6 +3849,31 @@ export interface operations {
};
};
};
v1_user_get_websocket_events: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json":
| components["schemas"]["UserWsTranscriptCreated"]
| components["schemas"]["UserWsTranscriptDeleted"]
| components["schemas"]["UserWsTranscriptStatus"]
| components["schemas"]["UserWsTranscriptFinalTitle"]
| components["schemas"]["UserWsTranscriptDuration"];
};
};
};
};
v1_zulip_get_streams: {
parameters: {
query?: never;