fix(app): recover state after sse reconnect and harden sse streams (#13973)
This commit is contained in:
@@ -2,9 +2,14 @@ import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2/client"
|
||||
import { createSimpleContext } from "@opencode-ai/ui/context"
|
||||
import { createGlobalEmitter } from "@solid-primitives/event-bus"
|
||||
import { batch, onCleanup } from "solid-js"
|
||||
import z from "zod"
|
||||
import { usePlatform } from "./platform"
|
||||
import { useServer } from "./server"
|
||||
|
||||
const abortError = z.object({
|
||||
name: z.literal("AbortError"),
|
||||
})
|
||||
|
||||
export const { use: useGlobalSDK, provider: GlobalSDKProvider } = createSimpleContext({
|
||||
name: "GlobalSDK",
|
||||
init: () => {
|
||||
@@ -93,12 +98,35 @@ export const { use: useGlobalSDK, provider: GlobalSDKProvider } = createSimpleCo
|
||||
|
||||
let streamErrorLogged = false
|
||||
const wait = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))
|
||||
const aborted = (error: unknown) => abortError.safeParse(error).success
|
||||
|
||||
let attempt: AbortController | undefined
|
||||
const HEARTBEAT_TIMEOUT_MS = 15_000
|
||||
let heartbeat: ReturnType<typeof setTimeout> | undefined
|
||||
const resetHeartbeat = () => {
|
||||
if (heartbeat) clearTimeout(heartbeat)
|
||||
heartbeat = setTimeout(() => {
|
||||
attempt?.abort()
|
||||
}, HEARTBEAT_TIMEOUT_MS)
|
||||
}
|
||||
const clearHeartbeat = () => {
|
||||
if (!heartbeat) return
|
||||
clearTimeout(heartbeat)
|
||||
heartbeat = undefined
|
||||
}
|
||||
|
||||
void (async () => {
|
||||
while (!abort.signal.aborted) {
|
||||
attempt = new AbortController()
|
||||
const onAbort = () => {
|
||||
attempt?.abort()
|
||||
}
|
||||
abort.signal.addEventListener("abort", onAbort)
|
||||
try {
|
||||
const events = await eventSdk.global.event({
|
||||
signal: attempt.signal,
|
||||
onSseError: (error) => {
|
||||
if (aborted(error)) return
|
||||
if (streamErrorLogged) return
|
||||
streamErrorLogged = true
|
||||
console.error("[global-sdk] event stream error", {
|
||||
@@ -109,7 +137,9 @@ export const { use: useGlobalSDK, provider: GlobalSDKProvider } = createSimpleCo
|
||||
},
|
||||
})
|
||||
let yielded = Date.now()
|
||||
resetHeartbeat()
|
||||
for await (const event of events.stream) {
|
||||
resetHeartbeat()
|
||||
streamErrorLogged = false
|
||||
const directory = event.directory ?? "global"
|
||||
const payload = event.payload
|
||||
@@ -130,7 +160,7 @@ export const { use: useGlobalSDK, provider: GlobalSDKProvider } = createSimpleCo
|
||||
await wait(0)
|
||||
}
|
||||
} catch (error) {
|
||||
if (!streamErrorLogged) {
|
||||
if (!aborted(error) && !streamErrorLogged) {
|
||||
streamErrorLogged = true
|
||||
console.error("[global-sdk] event stream failed", {
|
||||
url: server.url,
|
||||
@@ -138,6 +168,10 @@ export const { use: useGlobalSDK, provider: GlobalSDKProvider } = createSimpleCo
|
||||
error,
|
||||
})
|
||||
}
|
||||
} finally {
|
||||
abort.signal.removeEventListener("abort", onAbort)
|
||||
attempt = undefined
|
||||
clearHeartbeat()
|
||||
}
|
||||
|
||||
if (abort.signal.aborted) return
|
||||
@@ -145,7 +179,19 @@ export const { use: useGlobalSDK, provider: GlobalSDKProvider } = createSimpleCo
|
||||
}
|
||||
})().finally(flush)
|
||||
|
||||
const onVisibility = () => {
|
||||
if (typeof document === "undefined") return
|
||||
if (document.visibilityState !== "visible") return
|
||||
attempt?.abort()
|
||||
}
|
||||
if (typeof document !== "undefined") {
|
||||
document.addEventListener("visibilitychange", onVisibility)
|
||||
}
|
||||
|
||||
onCleanup(() => {
|
||||
if (typeof document !== "undefined") {
|
||||
document.removeEventListener("visibilitychange", onVisibility)
|
||||
}
|
||||
abort.abort()
|
||||
flush()
|
||||
})
|
||||
|
||||
@@ -270,6 +270,11 @@ function createGlobalSync() {
|
||||
setGlobalStore("project", next)
|
||||
},
|
||||
})
|
||||
if (event.type === "server.connected" || event.type === "global.disposed") {
|
||||
for (const directory of Object.keys(children.children)) {
|
||||
queue.push(directory)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -116,6 +116,20 @@ describe("applyGlobalEvent", () => {
|
||||
|
||||
expect(refreshCount).toBe(1)
|
||||
})
|
||||
|
||||
test("handles server.connected by triggering refresh", () => {
|
||||
let refreshCount = 0
|
||||
applyGlobalEvent({
|
||||
event: { type: "server.connected" },
|
||||
project: [],
|
||||
refresh: () => {
|
||||
refreshCount += 1
|
||||
},
|
||||
setGlobalProject() {},
|
||||
})
|
||||
|
||||
expect(refreshCount).toBe(1)
|
||||
})
|
||||
})
|
||||
|
||||
describe("applyDirectoryEvent", () => {
|
||||
|
||||
@@ -20,7 +20,7 @@ export function applyGlobalEvent(input: {
|
||||
setGlobalProject: (next: Project[] | ((draft: Project[]) => void)) => void
|
||||
refresh: () => void
|
||||
}) {
|
||||
if (input.event.type === "global.disposed") {
|
||||
if (input.event.type === "global.disposed" || input.event.type === "server.connected") {
|
||||
input.refresh()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -66,6 +66,8 @@ export const GlobalRoutes = lazy(() =>
|
||||
}),
|
||||
async (c) => {
|
||||
log.info("global event connected")
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
return streamSSE(c, async (stream) => {
|
||||
stream.writeSSE({
|
||||
data: JSON.stringify({
|
||||
@@ -82,7 +84,7 @@ export const GlobalRoutes = lazy(() =>
|
||||
}
|
||||
GlobalBus.on("event", handler)
|
||||
|
||||
// Send heartbeat every 30s to prevent WKWebView timeout (60s default)
|
||||
// Send heartbeat every 10s to prevent stalled proxy streams.
|
||||
const heartbeat = setInterval(() => {
|
||||
stream.writeSSE({
|
||||
data: JSON.stringify({
|
||||
@@ -92,7 +94,7 @@ export const GlobalRoutes = lazy(() =>
|
||||
},
|
||||
}),
|
||||
})
|
||||
}, 30000)
|
||||
}, 10_000)
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
stream.onAbort(() => {
|
||||
|
||||
@@ -501,6 +501,8 @@ export namespace Server {
|
||||
}),
|
||||
async (c) => {
|
||||
log.info("event connected")
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
return streamSSE(c, async (stream) => {
|
||||
stream.writeSSE({
|
||||
data: JSON.stringify({
|
||||
@@ -517,7 +519,7 @@ export namespace Server {
|
||||
}
|
||||
})
|
||||
|
||||
// Send heartbeat every 30s to prevent WKWebView timeout (60s default)
|
||||
// Send heartbeat every 10s to prevent stalled proxy streams.
|
||||
const heartbeat = setInterval(() => {
|
||||
stream.writeSSE({
|
||||
data: JSON.stringify({
|
||||
@@ -525,7 +527,7 @@ export namespace Server {
|
||||
properties: {},
|
||||
}),
|
||||
})
|
||||
}, 30000)
|
||||
}, 10_000)
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
stream.onAbort(() => {
|
||||
|
||||
Reference in New Issue
Block a user