mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-04 09:56:47 +00:00
md
This commit is contained in:
@@ -5,10 +5,28 @@
|
|||||||
"created_at": "2026-01-12T22:41:17.996329Z",
|
"created_at": "2026-01-12T22:41:17.996329Z",
|
||||||
"depends_on": [],
|
"depends_on": [],
|
||||||
"epic": "fn-1",
|
"epic": "fn-1",
|
||||||
|
"evidence": {
|
||||||
|
"commits": [
|
||||||
|
"68df8257"
|
||||||
|
],
|
||||||
|
"files_changed": [
|
||||||
|
".flow/tasks/fn-1.8.json",
|
||||||
|
".flow/tasks/fn-1.8.md",
|
||||||
|
"server/tests/test_transcripts_chat.py"
|
||||||
|
],
|
||||||
|
"tests": [
|
||||||
|
"test_chat_websocket_connection_success",
|
||||||
|
"test_chat_websocket_nonexistent_transcript",
|
||||||
|
"test_chat_websocket_multiple_messages",
|
||||||
|
"test_chat_websocket_disconnect_graceful",
|
||||||
|
"test_chat_websocket_context_generation",
|
||||||
|
"test_chat_websocket_unknown_message_type"
|
||||||
|
]
|
||||||
|
},
|
||||||
"id": "fn-1.8",
|
"id": "fn-1.8",
|
||||||
"priority": null,
|
"priority": null,
|
||||||
"spec_path": ".flow/tasks/fn-1.8.md",
|
"spec_path": ".flow/tasks/fn-1.8.md",
|
||||||
"status": "in_progress",
|
"status": "done",
|
||||||
"title": "End-to-end testing",
|
"title": "End-to-end testing",
|
||||||
"updated_at": "2026-01-13T01:10:06.678981Z"
|
"updated_at": "2026-01-13T01:18:10.893171Z"
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,18 +19,25 @@ Fix WebSocket chat tests to use proper async WebSocket testing approach (matchin
|
|||||||
- [x] No event loop or asyncio errors in test output
|
- [x] No event loop or asyncio errors in test output
|
||||||
|
|
||||||
## Done summary
|
## Done summary
|
||||||
|
Fixed WebSocket chat tests by switching from TestClient to proper async testing with httpx_ws and threaded server pattern. All 6 tests now pass without event loop errors.
|
||||||
|
|
||||||
Fixed WebSocket chat tests by switching from TestClient (which has event loop issues) to proper async testing approach using httpx_ws and threaded server pattern (matching existing test_transcripts_rtc_ws.py).
|
## Changes
|
||||||
|
|
||||||
|
- Rewrote all WebSocket tests to use aconnect_ws from httpx_ws
|
||||||
|
- Added chat_appserver fixture using threaded Uvicorn server (port 1256)
|
||||||
|
- Tests now use separate event loop in server thread
|
||||||
|
- Matches existing pattern from test_transcripts_rtc_ws.py
|
||||||
|
|
||||||
|
## Tests Passing
|
||||||
|
|
||||||
All 6 tests now pass:
|
All 6 tests now pass:
|
||||||
- test_chat_websocket_connection_success: validates WebSocket connection and echo behavior
|
1. test_chat_websocket_connection_success - validates WebSocket connection and echo behavior
|
||||||
- test_chat_websocket_nonexistent_transcript: validates error handling for invalid transcript
|
2. test_chat_websocket_nonexistent_transcript - validates error handling for invalid transcript
|
||||||
- test_chat_websocket_multiple_messages: validates handling multiple sequential messages
|
3. test_chat_websocket_multiple_messages - validates handling multiple sequential messages
|
||||||
- test_chat_websocket_disconnect_graceful: validates clean disconnection
|
4. test_chat_websocket_disconnect_graceful - validates clean disconnection
|
||||||
- test_chat_websocket_context_generation: validates WebVTT context generation with participants/words
|
5. test_chat_websocket_context_generation - validates WebVTT context generation
|
||||||
- test_chat_websocket_unknown_message_type: validates echo behavior for unknown message types
|
6. test_chat_websocket_unknown_message_type - validates echo for unknown message types
|
||||||
|
|
||||||
## Evidence
|
## Evidence
|
||||||
- Commits:
|
- Commits: 68df8257
|
||||||
- Tests:
|
- Tests: test_chat_websocket_connection_success, test_chat_websocket_nonexistent_transcript, test_chat_websocket_multiple_messages, test_chat_websocket_disconnect_graceful, test_chat_websocket_context_generation, test_chat_websocket_unknown_message_type
|
||||||
- PRs:
|
- PRs:
|
||||||
@@ -7,13 +7,14 @@ WebSocket endpoint for bidirectional chat with LLM about transcript content.
|
|||||||
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
|
||||||
from llama_index.core import Settings
|
from llama_index.core import Settings
|
||||||
from llama_index.core.base.llms.types import ChatMessage, MessageRole
|
from llama_index.core.base.llms.types import ChatMessage, MessageRole
|
||||||
|
|
||||||
import reflector.auth as auth
|
from reflector.auth.auth_jwt import JWTAuth
|
||||||
from reflector.db.recordings import recordings_controller
|
from reflector.db.recordings import recordings_controller
|
||||||
from reflector.db.transcripts import transcripts_controller
|
from reflector.db.transcripts import transcripts_controller
|
||||||
|
from reflector.db.users import user_controller
|
||||||
from reflector.llm import LLM
|
from reflector.llm import LLM
|
||||||
from reflector.settings import settings
|
from reflector.settings import settings
|
||||||
from reflector.utils.transcript_formats import topics_to_webvtt_named
|
from reflector.utils.transcript_formats import topics_to_webvtt_named
|
||||||
@@ -33,19 +34,42 @@ async def _get_is_multitrack(transcript) -> bool:
|
|||||||
async def transcript_chat_websocket(
|
async def transcript_chat_websocket(
|
||||||
transcript_id: str,
|
transcript_id: str,
|
||||||
websocket: WebSocket,
|
websocket: WebSocket,
|
||||||
user: Optional[auth.UserInfo] = Depends(auth.current_user_optional),
|
|
||||||
):
|
):
|
||||||
"""WebSocket endpoint for chatting with LLM about transcript content."""
|
"""WebSocket endpoint for chatting with LLM about transcript content."""
|
||||||
# 1. Auth check
|
# 1. Auth check (optional) - extract token from WebSocket subprotocol header
|
||||||
user_id = user["sub"] if user else None
|
# Browser can't send Authorization header for WS; use subprotocol: ["bearer", token]
|
||||||
|
raw_subprotocol = websocket.headers.get("sec-websocket-protocol") or ""
|
||||||
|
parts = [p.strip() for p in raw_subprotocol.split(",") if p.strip()]
|
||||||
|
token: Optional[str] = None
|
||||||
|
negotiated_subprotocol: Optional[str] = None
|
||||||
|
if len(parts) >= 2 and parts[0].lower() == "bearer":
|
||||||
|
negotiated_subprotocol = "bearer"
|
||||||
|
token = parts[1]
|
||||||
|
|
||||||
|
user_id: Optional[str] = None
|
||||||
|
if token:
|
||||||
|
try:
|
||||||
|
payload = JWTAuth().verify_token(token)
|
||||||
|
authentik_uid = payload.get("sub")
|
||||||
|
|
||||||
|
if authentik_uid:
|
||||||
|
user = await user_controller.get_by_authentik_uid(authentik_uid)
|
||||||
|
if user:
|
||||||
|
user_id = user.id
|
||||||
|
except Exception:
|
||||||
|
# Auth failed - continue as anonymous
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Get transcript (respects user_id for private transcripts)
|
||||||
transcript = await transcripts_controller.get_by_id_for_http(
|
transcript = await transcripts_controller.get_by_id_for_http(
|
||||||
transcript_id, user_id=user_id
|
transcript_id, user_id=user_id
|
||||||
)
|
)
|
||||||
if not transcript:
|
if not transcript:
|
||||||
raise HTTPException(status_code=404, detail="Transcript not found")
|
await websocket.close(code=1008) # Policy violation (not found/unauthorized)
|
||||||
|
return
|
||||||
|
|
||||||
# 2. Accept connection
|
# 2. Accept connection (with negotiated subprotocol if present)
|
||||||
await websocket.accept()
|
await websocket.accept(subprotocol=negotiated_subprotocol)
|
||||||
|
|
||||||
# 3. Generate WebVTT context
|
# 3. Generate WebVTT context
|
||||||
is_multitrack = await _get_is_multitrack(transcript)
|
is_multitrack = await _get_is_multitrack(transcript)
|
||||||
@@ -90,7 +114,8 @@ Answer questions about content, speakers, timeline. Include timestamps when rele
|
|||||||
|
|
||||||
# Stream LLM response
|
# Stream LLM response
|
||||||
assistant_msg = ""
|
assistant_msg = ""
|
||||||
async for chunk in Settings.llm.astream_chat(conversation_history):
|
chat_stream = await Settings.llm.astream_chat(conversation_history)
|
||||||
|
async for chunk in chat_stream:
|
||||||
token = chunk.delta or ""
|
token = chunk.delta or ""
|
||||||
if token:
|
if token:
|
||||||
await websocket.send_json({"type": "token", "text": token})
|
await websocket.send_json({"type": "token", "text": token})
|
||||||
|
|||||||
@@ -3,6 +3,8 @@
|
|||||||
import { useState } from "react";
|
import { useState } from "react";
|
||||||
import { Box, Dialog, Input, IconButton } from "@chakra-ui/react";
|
import { Box, Dialog, Input, IconButton } from "@chakra-ui/react";
|
||||||
import { MessageCircle } from "lucide-react";
|
import { MessageCircle } from "lucide-react";
|
||||||
|
import Markdown from "react-markdown";
|
||||||
|
import "../../styles/markdown.css";
|
||||||
import type { Message } from "./useTranscriptChat";
|
import type { Message } from "./useTranscriptChat";
|
||||||
|
|
||||||
interface TranscriptChatModalProps {
|
interface TranscriptChatModalProps {
|
||||||
@@ -46,13 +48,21 @@ export function TranscriptChatModal({
|
|||||||
bg={msg.role === "user" ? "blue.50" : "gray.50"}
|
bg={msg.role === "user" ? "blue.50" : "gray.50"}
|
||||||
borderRadius="md"
|
borderRadius="md"
|
||||||
>
|
>
|
||||||
{msg.text}
|
{msg.role === "user" ? (
|
||||||
|
msg.text
|
||||||
|
) : (
|
||||||
|
<div className="markdown">
|
||||||
|
<Markdown>{msg.text}</Markdown>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
</Box>
|
</Box>
|
||||||
))}
|
))}
|
||||||
|
|
||||||
{isStreaming && (
|
{isStreaming && (
|
||||||
<Box p={3} bg="gray.50" borderRadius="md">
|
<Box p={3} bg="gray.50" borderRadius="md">
|
||||||
{currentStreamingText}
|
<div className="markdown">
|
||||||
|
<Markdown>{currentStreamingText}</Markdown>
|
||||||
|
</div>
|
||||||
<Box as="span" className="animate-pulse">
|
<Box as="span" className="animate-pulse">
|
||||||
▊
|
▊
|
||||||
</Box>
|
</Box>
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
"use client";
|
"use client";
|
||||||
|
|
||||||
import { useEffect, useState, useRef } from "react";
|
import { useEffect, useState, useRef } from "react";
|
||||||
|
import { getSession } from "next-auth/react";
|
||||||
import { WEBSOCKET_URL } from "../../lib/apiClient";
|
import { WEBSOCKET_URL } from "../../lib/apiClient";
|
||||||
|
import { assertExtendedToken } from "../../lib/types";
|
||||||
|
|
||||||
export type Message = {
|
export type Message = {
|
||||||
id: string;
|
id: string;
|
||||||
@@ -27,59 +29,84 @@ export const useTranscriptChat = (transcriptId: string): UseTranscriptChat => {
|
|||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
isMountedRef.current = true;
|
isMountedRef.current = true;
|
||||||
const url = `${WEBSOCKET_URL}/v1/transcripts/${transcriptId}/chat`;
|
|
||||||
const ws = new WebSocket(url);
|
|
||||||
wsRef.current = ws;
|
|
||||||
|
|
||||||
ws.onopen = () => {
|
const connectWebSocket = async () => {
|
||||||
console.log("Chat WebSocket connected");
|
const url = `${WEBSOCKET_URL}/v1/transcripts/${transcriptId}/chat`;
|
||||||
};
|
|
||||||
|
|
||||||
ws.onmessage = (event) => {
|
// Get auth token for WebSocket subprotocol
|
||||||
if (!isMountedRef.current) return;
|
let protocols: string[] | undefined;
|
||||||
|
try {
|
||||||
const msg = JSON.parse(event.data);
|
const session = await getSession();
|
||||||
|
if (session) {
|
||||||
switch (msg.type) {
|
const token = assertExtendedToken(session).accessToken;
|
||||||
case "token":
|
// Pass token via subprotocol: ["bearer", token]
|
||||||
setIsStreaming(true);
|
protocols = ["bearer", token];
|
||||||
streamingTextRef.current += msg.text;
|
}
|
||||||
setCurrentStreamingText(streamingTextRef.current);
|
} catch (error) {
|
||||||
break;
|
console.warn("Failed to get auth token for WebSocket:", error);
|
||||||
|
|
||||||
case "done":
|
|
||||||
setMessages((prev) => [
|
|
||||||
...prev,
|
|
||||||
{
|
|
||||||
id: Date.now().toString(),
|
|
||||||
role: "assistant",
|
|
||||||
text: streamingTextRef.current,
|
|
||||||
timestamp: new Date(),
|
|
||||||
},
|
|
||||||
]);
|
|
||||||
streamingTextRef.current = "";
|
|
||||||
setCurrentStreamingText("");
|
|
||||||
setIsStreaming(false);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case "error":
|
|
||||||
console.error("Chat error:", msg.message);
|
|
||||||
setIsStreaming(false);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ws = new WebSocket(url, protocols);
|
||||||
|
wsRef.current = ws;
|
||||||
|
|
||||||
|
ws.onopen = () => {
|
||||||
|
console.log("Chat WebSocket connected");
|
||||||
|
};
|
||||||
|
|
||||||
|
ws.onmessage = (event) => {
|
||||||
|
if (!isMountedRef.current) return;
|
||||||
|
|
||||||
|
const msg = JSON.parse(event.data);
|
||||||
|
|
||||||
|
switch (msg.type) {
|
||||||
|
case "token":
|
||||||
|
setIsStreaming(true);
|
||||||
|
streamingTextRef.current += msg.text;
|
||||||
|
setCurrentStreamingText(streamingTextRef.current);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "done":
|
||||||
|
// CRITICAL: Save the text BEFORE resetting the ref
|
||||||
|
// The setMessages callback may execute later, after ref is reset
|
||||||
|
const finalText = streamingTextRef.current;
|
||||||
|
|
||||||
|
setMessages((prev) => [
|
||||||
|
...prev,
|
||||||
|
{
|
||||||
|
id: Date.now().toString(),
|
||||||
|
role: "assistant",
|
||||||
|
text: finalText,
|
||||||
|
timestamp: new Date(),
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
streamingTextRef.current = "";
|
||||||
|
setCurrentStreamingText("");
|
||||||
|
setIsStreaming(false);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case "error":
|
||||||
|
console.error("Chat error:", msg.message);
|
||||||
|
setIsStreaming(false);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ws.onerror = (error) => {
|
||||||
|
console.error("WebSocket error:", error);
|
||||||
|
};
|
||||||
|
|
||||||
|
ws.onclose = () => {
|
||||||
|
console.log("Chat WebSocket closed");
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
ws.onerror = (error) => {
|
connectWebSocket();
|
||||||
console.error("WebSocket error:", error);
|
|
||||||
};
|
|
||||||
|
|
||||||
ws.onclose = () => {
|
|
||||||
console.log("Chat WebSocket closed");
|
|
||||||
};
|
|
||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
isMountedRef.current = false;
|
isMountedRef.current = false;
|
||||||
ws.close();
|
if (wsRef.current) {
|
||||||
|
wsRef.current.close();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}, [transcriptId]);
|
}, [transcriptId]);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user