fix(app): terminal replay (#12991)
This commit is contained in:
@@ -15,6 +15,17 @@ export namespace Pty {
|
||||
|
||||
const BUFFER_LIMIT = 1024 * 1024 * 2
|
||||
const BUFFER_CHUNK = 64 * 1024
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
// WebSocket control frame: 0x00 + UTF-8 JSON (currently { cursor }).
|
||||
const meta = (cursor: number) => {
|
||||
const json = JSON.stringify({ cursor })
|
||||
const bytes = encoder.encode(json)
|
||||
const out = new Uint8Array(bytes.length + 1)
|
||||
out[0] = 0
|
||||
out.set(bytes, 1)
|
||||
return out
|
||||
}
|
||||
|
||||
const pty = lazy(async () => {
|
||||
const { spawn } = await import("bun-pty")
|
||||
@@ -68,6 +79,8 @@ export namespace Pty {
|
||||
info: Info
|
||||
process: IPty
|
||||
buffer: string
|
||||
bufferCursor: number
|
||||
cursor: number
|
||||
subscribers: Set<WSContext>
|
||||
}
|
||||
|
||||
@@ -139,23 +152,27 @@ export namespace Pty {
|
||||
info,
|
||||
process: ptyProcess,
|
||||
buffer: "",
|
||||
bufferCursor: 0,
|
||||
cursor: 0,
|
||||
subscribers: new Set(),
|
||||
}
|
||||
state().set(id, session)
|
||||
ptyProcess.onData((data) => {
|
||||
let open = false
|
||||
session.cursor += data.length
|
||||
|
||||
for (const ws of session.subscribers) {
|
||||
if (ws.readyState !== 1) {
|
||||
session.subscribers.delete(ws)
|
||||
continue
|
||||
}
|
||||
open = true
|
||||
ws.send(data)
|
||||
}
|
||||
if (open) return
|
||||
|
||||
session.buffer += data
|
||||
if (session.buffer.length <= BUFFER_LIMIT) return
|
||||
session.buffer = session.buffer.slice(-BUFFER_LIMIT)
|
||||
const excess = session.buffer.length - BUFFER_LIMIT
|
||||
session.buffer = session.buffer.slice(excess)
|
||||
session.bufferCursor += excess
|
||||
})
|
||||
ptyProcess.onExit(({ exitCode }) => {
|
||||
log.info("session exited", { id, exitCode })
|
||||
@@ -215,28 +232,47 @@ export namespace Pty {
|
||||
}
|
||||
}
|
||||
|
||||
export function connect(id: string, ws: WSContext) {
|
||||
export function connect(id: string, ws: WSContext, cursor?: number) {
|
||||
const session = state().get(id)
|
||||
if (!session) {
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
log.info("client connected to session", { id })
|
||||
session.subscribers.add(ws)
|
||||
if (session.buffer) {
|
||||
const buffer = session.buffer.length <= BUFFER_LIMIT ? session.buffer : session.buffer.slice(-BUFFER_LIMIT)
|
||||
session.buffer = ""
|
||||
|
||||
const start = session.bufferCursor
|
||||
const end = session.cursor
|
||||
|
||||
const from =
|
||||
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
|
||||
|
||||
const data = (() => {
|
||||
if (!session.buffer) return ""
|
||||
if (from >= end) return ""
|
||||
const offset = Math.max(0, from - start)
|
||||
if (offset >= session.buffer.length) return ""
|
||||
return session.buffer.slice(offset)
|
||||
})()
|
||||
|
||||
if (data) {
|
||||
try {
|
||||
for (let i = 0; i < buffer.length; i += BUFFER_CHUNK) {
|
||||
ws.send(buffer.slice(i, i + BUFFER_CHUNK))
|
||||
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
|
||||
ws.send(data.slice(i, i + BUFFER_CHUNK))
|
||||
}
|
||||
} catch {
|
||||
session.subscribers.delete(ws)
|
||||
session.buffer = buffer
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(meta(end))
|
||||
} catch {
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
|
||||
session.subscribers.add(ws)
|
||||
return {
|
||||
onMessage: (message: string | ArrayBuffer) => {
|
||||
session.process.write(String(message))
|
||||
|
||||
@@ -151,11 +151,18 @@ export const PtyRoutes = lazy(() =>
|
||||
validator("param", z.object({ ptyID: z.string() })),
|
||||
upgradeWebSocket((c) => {
|
||||
const id = c.req.param("ptyID")
|
||||
const cursor = (() => {
|
||||
const value = c.req.query("cursor")
|
||||
if (!value) return
|
||||
const parsed = Number(value)
|
||||
if (!Number.isSafeInteger(parsed) || parsed < -1) return
|
||||
return parsed
|
||||
})()
|
||||
let handler: ReturnType<typeof Pty.connect>
|
||||
if (!Pty.get(id)) throw new Error("Session not found")
|
||||
return {
|
||||
onOpen(_event, ws) {
|
||||
handler = Pty.connect(id, ws)
|
||||
handler = Pty.connect(id, ws, cursor)
|
||||
},
|
||||
onMessage(event) {
|
||||
handler?.onMessage(String(event.data))
|
||||
|
||||
Reference in New Issue
Block a user