Files
reflector/www/app/lib/UserEventsProvider.tsx
Igor Loskutov 0c06cdd117 fix: consolidate DagTask types, fix REST fallback shape, fix lint noqa
- Extract shared DagTask/DagTaskStatus types into www/app/lib/dagTypes.ts
- Re-export from useWebSockets.ts and UserEventsProvider.tsx
- Fix browse page REST fallback: dag_status is list[dict] directly, not {tasks: [...]}
- Add missing # noqa: PLC0415 for fork-safe deferred imports
2026-02-09 13:25:40 -05:00

223 lines
6.4 KiB
TypeScript

"use client";
import React, { useEffect, useRef, useState } from "react";
import { useQueryClient } from "@tanstack/react-query";
import { WEBSOCKET_URL } from "./apiClient";
import { useAuth } from "./AuthProvider";
import { z } from "zod";
import { invalidateTranscriptLists, TRANSCRIPT_SEARCH_URL } from "./apiHooks";
import type { DagTask } from "./dagTypes";
export type { DagTask, DagTaskStatus } from "./dagTypes";
const DagStatusContext = React.createContext<Map<string, DagTask[]>>(new Map());
export function useDagStatusMap() {
return React.useContext(DagStatusContext);
}
const UserEvent = z.object({
event: z.string(),
});
type UserEvent = z.TypeOf<typeof UserEvent>;
class UserEventsStore {
private socket: WebSocket | null = null;
private listeners: Set<(event: MessageEvent) => void> = new Set();
private closeTimeoutId: number | null = null;
private isConnecting = false;
ensureConnection(url: string, subprotocols?: string[]) {
if (typeof window === "undefined") return;
if (this.closeTimeoutId !== null) {
clearTimeout(this.closeTimeoutId);
this.closeTimeoutId = null;
}
if (this.isConnecting) return;
if (
this.socket &&
(this.socket.readyState === WebSocket.OPEN ||
this.socket.readyState === WebSocket.CONNECTING)
) {
return;
}
this.isConnecting = true;
const ws = new WebSocket(url, subprotocols || []);
this.socket = ws;
ws.onmessage = (event: MessageEvent) => {
this.listeners.forEach((listener) => {
try {
listener(event);
} catch (err) {
console.error("UserEvents listener error", err);
}
});
};
ws.onopen = () => {
if (this.socket === ws) this.isConnecting = false;
};
ws.onclose = () => {
if (this.socket === ws) {
this.socket = null;
this.isConnecting = false;
}
};
ws.onerror = () => {
if (this.socket === ws) this.isConnecting = false;
};
}
subscribe(listener: (event: MessageEvent) => void): () => void {
this.listeners.add(listener);
if (this.closeTimeoutId !== null) {
clearTimeout(this.closeTimeoutId);
this.closeTimeoutId = null;
}
return () => {
this.listeners.delete(listener);
if (this.listeners.size === 0) {
this.closeTimeoutId = window.setTimeout(() => {
if (this.socket) {
try {
this.socket.close();
} catch (err) {
console.warn("Error closing user events socket", err);
}
}
this.socket = null;
this.closeTimeoutId = null;
}, 1000);
}
};
}
}
const sharedStore = new UserEventsStore();
export function UserEventsProvider({
children,
}: {
children: React.ReactNode;
}) {
const auth = useAuth();
const queryClient = useQueryClient();
const tokenRef = useRef<string | null>(null);
const detachRef = useRef<(() => void) | null>(null);
const [dagStatusMap, setDagStatusMap] = useState<Map<string, DagTask[]>>(
new Map(),
);
useEffect(() => {
// Only tear down when the user is truly unauthenticated
if (auth.status === "unauthenticated") {
if (detachRef.current) {
try {
detachRef.current();
} catch (err) {
console.warn("Error detaching UserEvents listener", err);
}
detachRef.current = null;
}
tokenRef.current = null;
return;
}
// During loading/refreshing, keep the existing connection intact
if (auth.status !== "authenticated") {
return;
}
// Authenticated: pin the initial token for the lifetime of this WS connection
if (!tokenRef.current && auth.accessToken) {
tokenRef.current = auth.accessToken;
}
const pinnedToken = tokenRef.current;
const url = `${WEBSOCKET_URL}/v1/events`;
// Ensure a single shared connection
sharedStore.ensureConnection(
url,
pinnedToken ? ["bearer", pinnedToken] : undefined,
);
// Subscribe once; avoid re-subscribing during transient status changes
if (!detachRef.current) {
const onMessage = (event: MessageEvent) => {
try {
const fullMsg = JSON.parse(event.data);
const msg = UserEvent.parse(fullMsg);
const eventName = msg.event;
const invalidateList = () => invalidateTranscriptLists(queryClient);
switch (eventName) {
case "TRANSCRIPT_CREATED":
case "TRANSCRIPT_DELETED":
case "TRANSCRIPT_FINAL_TITLE":
case "TRANSCRIPT_DURATION":
invalidateList().then(() => {});
break;
case "TRANSCRIPT_STATUS": {
invalidateList().then(() => {});
const transcriptId = fullMsg.data?.id as string | undefined;
const status = fullMsg.data?.value as string | undefined;
if (transcriptId && status && status !== "processing") {
setDagStatusMap((prev) => {
const next = new Map(prev);
next.delete(transcriptId);
return next;
});
}
break;
}
case "TRANSCRIPT_DAG_STATUS": {
const transcriptId = fullMsg.data?.id as string | undefined;
const tasks = fullMsg.data?.tasks as DagTask[] | undefined;
if (transcriptId && tasks) {
setDagStatusMap((prev) => {
const next = new Map(prev);
next.set(transcriptId, tasks);
return next;
});
}
break;
}
default:
// Ignore other content events for list updates
break;
}
} catch (err) {
console.warn("Invalid user event message", event.data);
}
};
const unsubscribe = sharedStore.subscribe(onMessage);
detachRef.current = unsubscribe;
}
}, [auth.status, queryClient]);
// On unmount, detach the listener and clear the pinned token
useEffect(() => {
return () => {
if (detachRef.current) {
try {
detachRef.current();
} catch (err) {
console.warn("Error detaching UserEvents listener on unmount", err);
}
detachRef.current = null;
}
tokenRef.current = null;
};
}, []);
return (
<DagStatusContext.Provider value={dagStatusMap}>
{children}
</DagStatusContext.Provider>
);
}