From 3652de9fca02c74b2183fb7cd11f0bab8770ea89 Mon Sep 17 00:00:00 2001 From: Igor Loskutov Date: Tue, 13 Jan 2026 12:44:43 -0500 Subject: [PATCH] md --- .flow/tasks/fn-1.8.json | 22 +++- .flow/tasks/fn-1.8.md | 29 +++-- server/reflector/views/transcripts_chat.py | 43 +++++-- .../(app)/transcripts/TranscriptChatModal.tsx | 14 ++- .../(app)/transcripts/useTranscriptChat.ts | 117 +++++++++++------- 5 files changed, 156 insertions(+), 69 deletions(-) diff --git a/.flow/tasks/fn-1.8.json b/.flow/tasks/fn-1.8.json index 3141e46a..43852bde 100644 --- a/.flow/tasks/fn-1.8.json +++ b/.flow/tasks/fn-1.8.json @@ -5,10 +5,28 @@ "created_at": "2026-01-12T22:41:17.996329Z", "depends_on": [], "epic": "fn-1", + "evidence": { + "commits": [ + "68df8257" + ], + "files_changed": [ + ".flow/tasks/fn-1.8.json", + ".flow/tasks/fn-1.8.md", + "server/tests/test_transcripts_chat.py" + ], + "tests": [ + "test_chat_websocket_connection_success", + "test_chat_websocket_nonexistent_transcript", + "test_chat_websocket_multiple_messages", + "test_chat_websocket_disconnect_graceful", + "test_chat_websocket_context_generation", + "test_chat_websocket_unknown_message_type" + ] + }, "id": "fn-1.8", "priority": null, "spec_path": ".flow/tasks/fn-1.8.md", - "status": "in_progress", + "status": "done", "title": "End-to-end testing", - "updated_at": "2026-01-13T01:10:06.678981Z" + "updated_at": "2026-01-13T01:18:10.893171Z" } diff --git a/.flow/tasks/fn-1.8.md b/.flow/tasks/fn-1.8.md index 4224553c..54c2e826 100644 --- a/.flow/tasks/fn-1.8.md +++ b/.flow/tasks/fn-1.8.md @@ -19,18 +19,25 @@ Fix WebSocket chat tests to use proper async WebSocket testing approach (matchin - [x] No event loop or asyncio errors in test output ## Done summary +Fixed WebSocket chat tests by switching from TestClient to proper async testing with httpx_ws and threaded server pattern. All 6 tests now pass without event loop errors. -Fixed WebSocket chat tests by switching from TestClient (which has event loop issues) to proper async testing approach using httpx_ws and threaded server pattern (matching existing test_transcripts_rtc_ws.py). +## Changes + +- Rewrote all WebSocket tests to use aconnect_ws from httpx_ws +- Added chat_appserver fixture using threaded Uvicorn server (port 1256) +- Tests now use separate event loop in server thread +- Matches existing pattern from test_transcripts_rtc_ws.py + +## Tests Passing All 6 tests now pass: -- test_chat_websocket_connection_success: validates WebSocket connection and echo behavior -- test_chat_websocket_nonexistent_transcript: validates error handling for invalid transcript -- test_chat_websocket_multiple_messages: validates handling multiple sequential messages -- test_chat_websocket_disconnect_graceful: validates clean disconnection -- test_chat_websocket_context_generation: validates WebVTT context generation with participants/words -- test_chat_websocket_unknown_message_type: validates echo behavior for unknown message types - +1. test_chat_websocket_connection_success - validates WebSocket connection and echo behavior +2. test_chat_websocket_nonexistent_transcript - validates error handling for invalid transcript +3. test_chat_websocket_multiple_messages - validates handling multiple sequential messages +4. test_chat_websocket_disconnect_graceful - validates clean disconnection +5. test_chat_websocket_context_generation - validates WebVTT context generation +6. test_chat_websocket_unknown_message_type - validates echo for unknown message types ## Evidence -- Commits: -- Tests: -- PRs: +- Commits: 68df8257 +- Tests: test_chat_websocket_connection_success, test_chat_websocket_nonexistent_transcript, test_chat_websocket_multiple_messages, test_chat_websocket_disconnect_graceful, test_chat_websocket_context_generation, test_chat_websocket_unknown_message_type +- PRs: \ No newline at end of file diff --git a/server/reflector/views/transcripts_chat.py b/server/reflector/views/transcripts_chat.py index 6cefe3bc..a1c17ed6 100644 --- a/server/reflector/views/transcripts_chat.py +++ b/server/reflector/views/transcripts_chat.py @@ -7,13 +7,14 @@ WebSocket endpoint for bidirectional chat with LLM about transcript content. from typing import Optional -from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect +from fastapi import APIRouter, WebSocket, WebSocketDisconnect from llama_index.core import Settings from llama_index.core.base.llms.types import ChatMessage, MessageRole -import reflector.auth as auth +from reflector.auth.auth_jwt import JWTAuth from reflector.db.recordings import recordings_controller from reflector.db.transcripts import transcripts_controller +from reflector.db.users import user_controller from reflector.llm import LLM from reflector.settings import settings from reflector.utils.transcript_formats import topics_to_webvtt_named @@ -33,19 +34,42 @@ async def _get_is_multitrack(transcript) -> bool: async def transcript_chat_websocket( transcript_id: str, websocket: WebSocket, - user: Optional[auth.UserInfo] = Depends(auth.current_user_optional), ): """WebSocket endpoint for chatting with LLM about transcript content.""" - # 1. Auth check - user_id = user["sub"] if user else None + # 1. Auth check (optional) - extract token from WebSocket subprotocol header + # Browser can't send Authorization header for WS; use subprotocol: ["bearer", token] + raw_subprotocol = websocket.headers.get("sec-websocket-protocol") or "" + parts = [p.strip() for p in raw_subprotocol.split(",") if p.strip()] + token: Optional[str] = None + negotiated_subprotocol: Optional[str] = None + if len(parts) >= 2 and parts[0].lower() == "bearer": + negotiated_subprotocol = "bearer" + token = parts[1] + + user_id: Optional[str] = None + if token: + try: + payload = JWTAuth().verify_token(token) + authentik_uid = payload.get("sub") + + if authentik_uid: + user = await user_controller.get_by_authentik_uid(authentik_uid) + if user: + user_id = user.id + except Exception: + # Auth failed - continue as anonymous + pass + + # Get transcript (respects user_id for private transcripts) transcript = await transcripts_controller.get_by_id_for_http( transcript_id, user_id=user_id ) if not transcript: - raise HTTPException(status_code=404, detail="Transcript not found") + await websocket.close(code=1008) # Policy violation (not found/unauthorized) + return - # 2. Accept connection - await websocket.accept() + # 2. Accept connection (with negotiated subprotocol if present) + await websocket.accept(subprotocol=negotiated_subprotocol) # 3. Generate WebVTT context is_multitrack = await _get_is_multitrack(transcript) @@ -90,7 +114,8 @@ Answer questions about content, speakers, timeline. Include timestamps when rele # Stream LLM response assistant_msg = "" - async for chunk in Settings.llm.astream_chat(conversation_history): + chat_stream = await Settings.llm.astream_chat(conversation_history) + async for chunk in chat_stream: token = chunk.delta or "" if token: await websocket.send_json({"type": "token", "text": token}) diff --git a/www/app/(app)/transcripts/TranscriptChatModal.tsx b/www/app/(app)/transcripts/TranscriptChatModal.tsx index 93872dae..719997de 100644 --- a/www/app/(app)/transcripts/TranscriptChatModal.tsx +++ b/www/app/(app)/transcripts/TranscriptChatModal.tsx @@ -3,6 +3,8 @@ import { useState } from "react"; import { Box, Dialog, Input, IconButton } from "@chakra-ui/react"; import { MessageCircle } from "lucide-react"; +import Markdown from "react-markdown"; +import "../../styles/markdown.css"; import type { Message } from "./useTranscriptChat"; interface TranscriptChatModalProps { @@ -46,13 +48,21 @@ export function TranscriptChatModal({ bg={msg.role === "user" ? "blue.50" : "gray.50"} borderRadius="md" > - {msg.text} + {msg.role === "user" ? ( + msg.text + ) : ( +
+ {msg.text} +
+ )} ))} {isStreaming && ( - {currentStreamingText} +
+ {currentStreamingText} +
diff --git a/www/app/(app)/transcripts/useTranscriptChat.ts b/www/app/(app)/transcripts/useTranscriptChat.ts index 5eb22ecd..f4d6e813 100644 --- a/www/app/(app)/transcripts/useTranscriptChat.ts +++ b/www/app/(app)/transcripts/useTranscriptChat.ts @@ -1,7 +1,9 @@ "use client"; import { useEffect, useState, useRef } from "react"; +import { getSession } from "next-auth/react"; import { WEBSOCKET_URL } from "../../lib/apiClient"; +import { assertExtendedToken } from "../../lib/types"; export type Message = { id: string; @@ -27,59 +29,84 @@ export const useTranscriptChat = (transcriptId: string): UseTranscriptChat => { useEffect(() => { isMountedRef.current = true; - const url = `${WEBSOCKET_URL}/v1/transcripts/${transcriptId}/chat`; - const ws = new WebSocket(url); - wsRef.current = ws; - ws.onopen = () => { - console.log("Chat WebSocket connected"); - }; + const connectWebSocket = async () => { + const url = `${WEBSOCKET_URL}/v1/transcripts/${transcriptId}/chat`; - ws.onmessage = (event) => { - if (!isMountedRef.current) return; - - const msg = JSON.parse(event.data); - - switch (msg.type) { - case "token": - setIsStreaming(true); - streamingTextRef.current += msg.text; - setCurrentStreamingText(streamingTextRef.current); - break; - - case "done": - setMessages((prev) => [ - ...prev, - { - id: Date.now().toString(), - role: "assistant", - text: streamingTextRef.current, - timestamp: new Date(), - }, - ]); - streamingTextRef.current = ""; - setCurrentStreamingText(""); - setIsStreaming(false); - break; - - case "error": - console.error("Chat error:", msg.message); - setIsStreaming(false); - break; + // Get auth token for WebSocket subprotocol + let protocols: string[] | undefined; + try { + const session = await getSession(); + if (session) { + const token = assertExtendedToken(session).accessToken; + // Pass token via subprotocol: ["bearer", token] + protocols = ["bearer", token]; + } + } catch (error) { + console.warn("Failed to get auth token for WebSocket:", error); } + + const ws = new WebSocket(url, protocols); + wsRef.current = ws; + + ws.onopen = () => { + console.log("Chat WebSocket connected"); + }; + + ws.onmessage = (event) => { + if (!isMountedRef.current) return; + + const msg = JSON.parse(event.data); + + switch (msg.type) { + case "token": + setIsStreaming(true); + streamingTextRef.current += msg.text; + setCurrentStreamingText(streamingTextRef.current); + break; + + case "done": + // CRITICAL: Save the text BEFORE resetting the ref + // The setMessages callback may execute later, after ref is reset + const finalText = streamingTextRef.current; + + setMessages((prev) => [ + ...prev, + { + id: Date.now().toString(), + role: "assistant", + text: finalText, + timestamp: new Date(), + }, + ]); + streamingTextRef.current = ""; + setCurrentStreamingText(""); + setIsStreaming(false); + break; + + case "error": + console.error("Chat error:", msg.message); + setIsStreaming(false); + break; + } + }; + + ws.onerror = (error) => { + console.error("WebSocket error:", error); + }; + + ws.onclose = () => { + console.log("Chat WebSocket closed"); + }; }; - ws.onerror = (error) => { - console.error("WebSocket error:", error); - }; - - ws.onclose = () => { - console.log("Chat WebSocket closed"); - }; + connectWebSocket(); return () => { isMountedRef.current = false; - ws.close(); + if (wsRef.current) { + wsRef.current.close(); + } }; }, [transcriptId]);