fix(app): terminal rework (#14217)

This commit is contained in:
Adam
2026-02-19 06:35:14 -06:00
committed by GitHub
parent c7b35342dd
commit d07f09925f
6 changed files with 124 additions and 160 deletions

View File

@@ -18,27 +18,26 @@ export namespace Pty {
type Socket = {
readyState: number
data: object
send: (data: string | Uint8Array<ArrayBuffer> | ArrayBuffer) => void
send: (data: string | Uint8Array | ArrayBuffer) => void
close: (code?: number, reason?: string) => void
}
// Bun's ServerWebSocket has a per-connection `.data` object (set during
// `server.upgrade`) that changes when the underlying connection is recycled.
// We keep a reference to a stable part of it so output can't leak even when
// websocket objects are reused.
const token = (ws: Socket) => {
const data = ws.data
const events = (data as { events?: unknown }).events
if (events && typeof events === "object") return events
const url = (data as { url?: unknown }).url
if (url && typeof url === "object") return url
return data
type Subscriber = {
id: number
}
// WebSocket control frame: 0x00 + UTF-8 JSON (currently { cursor }).
const sockets = new WeakMap<object, number>()
const owners = new WeakMap<object, string>()
let socketCounter = 0
const tagSocket = (ws: Socket) => {
if (!ws || typeof ws !== "object") return
const next = (socketCounter = (socketCounter + 1) % Number.MAX_SAFE_INTEGER)
sockets.set(ws, next)
return next
}
// WebSocket control frame: 0x00 + UTF-8 JSON.
const meta = (cursor: number) => {
const json = JSON.stringify({ cursor })
const bytes = encoder.encode(json)
@@ -102,7 +101,7 @@ export namespace Pty {
buffer: string
bufferCursor: number
cursor: number
subscribers: Map<Socket, object>
subscribers: Map<Socket, Subscriber>
}
const state = Instance.state(
@@ -185,13 +184,13 @@ export namespace Pty {
ptyProcess.onData((chunk) => {
session.cursor += chunk.length
for (const [ws, data] of session.subscribers) {
for (const [ws, sub] of session.subscribers) {
if (ws.readyState !== 1) {
session.subscribers.delete(ws)
continue
}
if (token(ws) !== data) {
if (typeof ws === "object" && sockets.get(ws) !== sub.id) {
session.subscribers.delete(ws)
continue
}
@@ -280,6 +279,25 @@ export namespace Pty {
}
log.info("client connected to session", { id })
const socketId = tagSocket(ws)
if (socketId === undefined) {
ws.close()
return
}
const previous = owners.get(ws)
if (previous && previous !== id) {
state().get(previous)?.subscribers.delete(ws)
}
owners.set(ws, id)
session.subscribers.set(ws, { id: socketId })
const cleanup = () => {
session.subscribers.delete(ws)
if (owners.get(ws) === id) owners.delete(ws)
}
const start = session.bufferCursor
const end = session.cursor
@@ -300,6 +318,7 @@ export namespace Pty {
ws.send(data.slice(i, i + BUFFER_CHUNK))
}
} catch {
cleanup()
ws.close()
return
}
@@ -308,23 +327,17 @@ export namespace Pty {
try {
ws.send(meta(end))
} catch {
cleanup()
ws.close()
return
}
if (!ws.data || typeof ws.data !== "object") {
ws.close()
return
}
session.subscribers.set(ws, token(ws))
return {
onMessage: (message: string | ArrayBuffer) => {
session.process.write(String(message))
},
onClose: () => {
log.info("client disconnected from session", { id })
session.subscribers.delete(ws)
cleanup()
},
}
}

View File

@@ -163,18 +163,13 @@ export const PtyRoutes = lazy(() =>
type Socket = {
readyState: number
data: object
send: (data: string | Uint8Array<ArrayBuffer> | ArrayBuffer) => void
send: (data: string | Uint8Array | ArrayBuffer) => void
close: (code?: number, reason?: string) => void
}
const isSocket = (value: unknown): value is Socket => {
if (!value || typeof value !== "object") return false
if (!("readyState" in value)) return false
if (!("data" in value)) return false
if (!((value as { data?: unknown }).data && typeof (value as { data?: unknown }).data === "object")) {
return false
}
if (!("send" in value) || typeof (value as { send?: unknown }).send !== "function") return false
if (!("close" in value) || typeof (value as { close?: unknown }).close !== "function") return false
return typeof (value as { readyState?: unknown }).readyState === "number"
@@ -182,12 +177,12 @@ export const PtyRoutes = lazy(() =>
return {
onOpen(_event, ws) {
const raw = ws.raw
if (!isSocket(raw)) {
const socket = ws.raw
if (!isSocket(socket)) {
ws.close()
return
}
handler = Pty.connect(id, raw, cursor)
handler = Pty.connect(id, socket, cursor)
},
onMessage(event) {
if (typeof event.data !== "string") return

View File

@@ -18,7 +18,6 @@ describe("pty", () => {
const ws = {
readyState: 1,
data: { events: { connection: "a" } },
send: (data: unknown) => {
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
},
@@ -31,7 +30,6 @@ describe("pty", () => {
Pty.connect(a.id, ws as any)
// Now "reuse" the same ws object for another connection.
ws.data = { events: { connection: "b" } }
ws.send = (data: unknown) => {
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
}
@@ -53,48 +51,4 @@ describe("pty", () => {
},
})
})
test("does not leak output when Bun recycles websocket objects before re-connect", async () => {
await using dir = await tmpdir({ git: true })
await Instance.provide({
directory: dir.path,
fn: async () => {
const a = await Pty.create({ command: "cat", title: "a" })
try {
const outA: string[] = []
const outB: string[] = []
const ws = {
readyState: 1,
data: { events: { connection: "a" } },
send: (data: unknown) => {
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
},
close: () => {
// no-op (simulate abrupt drop)
},
}
// Connect "a" first.
Pty.connect(a.id, ws as any)
outA.length = 0
// Simulate Bun reusing the same websocket object for another connection
// before the new onOpen handler has a chance to tag it.
ws.data = { events: { connection: "b" } }
ws.send = (data: unknown) => {
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
}
Pty.write(a.id, "AAA\n")
await Bun.sleep(100)
expect(outB.join("")).not.toContain("AAA")
} finally {
await Pty.remove(a.id)
}
},
})
})
})