fix(app): terminal cross-talk (#14184)
This commit is contained in:
@@ -18,18 +18,24 @@ export namespace Pty {
|
|||||||
|
|
||||||
type Socket = {
|
type Socket = {
|
||||||
readyState: number
|
readyState: number
|
||||||
|
data: object
|
||||||
send: (data: string | Uint8Array<ArrayBuffer> | ArrayBuffer) => void
|
send: (data: string | Uint8Array<ArrayBuffer> | ArrayBuffer) => void
|
||||||
close: (code?: number, reason?: string) => void
|
close: (code?: number, reason?: string) => void
|
||||||
}
|
}
|
||||||
|
|
||||||
const sockets = new WeakMap<object, number>()
|
// Bun's ServerWebSocket has a per-connection `.data` object (set during
|
||||||
let socketCounter = 0
|
// `server.upgrade`) that changes when the underlying connection is recycled.
|
||||||
|
// We keep a reference to a stable part of it so output can't leak even when
|
||||||
|
// websocket objects are reused.
|
||||||
|
const token = (ws: Socket) => {
|
||||||
|
const data = ws.data
|
||||||
|
const events = (data as { events?: unknown }).events
|
||||||
|
if (events && typeof events === "object") return events
|
||||||
|
|
||||||
const tagSocket = (ws: Socket) => {
|
const url = (data as { url?: unknown }).url
|
||||||
if (!ws || typeof ws !== "object") return
|
if (url && typeof url === "object") return url
|
||||||
const next = (socketCounter = (socketCounter + 1) % Number.MAX_SAFE_INTEGER)
|
|
||||||
sockets.set(ws, next)
|
return data
|
||||||
return next
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WebSocket control frame: 0x00 + UTF-8 JSON (currently { cursor }).
|
// WebSocket control frame: 0x00 + UTF-8 JSON (currently { cursor }).
|
||||||
@@ -96,7 +102,7 @@ export namespace Pty {
|
|||||||
buffer: string
|
buffer: string
|
||||||
bufferCursor: number
|
bufferCursor: number
|
||||||
cursor: number
|
cursor: number
|
||||||
subscribers: Map<Socket, number>
|
subscribers: Map<Socket, object>
|
||||||
}
|
}
|
||||||
|
|
||||||
const state = Instance.state(
|
const state = Instance.state(
|
||||||
@@ -176,26 +182,27 @@ export namespace Pty {
|
|||||||
subscribers: new Map(),
|
subscribers: new Map(),
|
||||||
}
|
}
|
||||||
state().set(id, session)
|
state().set(id, session)
|
||||||
ptyProcess.onData((data) => {
|
ptyProcess.onData((chunk) => {
|
||||||
session.cursor += data.length
|
session.cursor += chunk.length
|
||||||
|
|
||||||
for (const [ws, id] of session.subscribers) {
|
for (const [ws, data] of session.subscribers) {
|
||||||
if (ws.readyState !== 1) {
|
if (ws.readyState !== 1) {
|
||||||
session.subscribers.delete(ws)
|
session.subscribers.delete(ws)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if (typeof ws === "object" && sockets.get(ws) !== id) {
|
|
||||||
|
if (token(ws) !== data) {
|
||||||
session.subscribers.delete(ws)
|
session.subscribers.delete(ws)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
ws.send(data)
|
ws.send(chunk)
|
||||||
} catch {
|
} catch {
|
||||||
session.subscribers.delete(ws)
|
session.subscribers.delete(ws)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
session.buffer += data
|
session.buffer += chunk
|
||||||
if (session.buffer.length <= BUFFER_LIMIT) return
|
if (session.buffer.length <= BUFFER_LIMIT) return
|
||||||
const excess = session.buffer.length - BUFFER_LIMIT
|
const excess = session.buffer.length - BUFFER_LIMIT
|
||||||
session.buffer = session.buffer.slice(excess)
|
session.buffer = session.buffer.slice(excess)
|
||||||
@@ -305,8 +312,12 @@ export namespace Pty {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
const socketId = tagSocket(ws)
|
if (!ws.data || typeof ws.data !== "object") {
|
||||||
if (typeof socketId === "number") session.subscribers.set(ws, socketId)
|
ws.close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
session.subscribers.set(ws, token(ws))
|
||||||
return {
|
return {
|
||||||
onMessage: (message: string | ArrayBuffer) => {
|
onMessage: (message: string | ArrayBuffer) => {
|
||||||
session.process.write(String(message))
|
session.process.write(String(message))
|
||||||
|
|||||||
@@ -163,6 +163,7 @@ export const PtyRoutes = lazy(() =>
|
|||||||
|
|
||||||
type Socket = {
|
type Socket = {
|
||||||
readyState: number
|
readyState: number
|
||||||
|
data: object
|
||||||
send: (data: string | Uint8Array<ArrayBuffer> | ArrayBuffer) => void
|
send: (data: string | Uint8Array<ArrayBuffer> | ArrayBuffer) => void
|
||||||
close: (code?: number, reason?: string) => void
|
close: (code?: number, reason?: string) => void
|
||||||
}
|
}
|
||||||
@@ -170,6 +171,10 @@ export const PtyRoutes = lazy(() =>
|
|||||||
const isSocket = (value: unknown): value is Socket => {
|
const isSocket = (value: unknown): value is Socket => {
|
||||||
if (!value || typeof value !== "object") return false
|
if (!value || typeof value !== "object") return false
|
||||||
if (!("readyState" in value)) return false
|
if (!("readyState" in value)) return false
|
||||||
|
if (!("data" in value)) return false
|
||||||
|
if (!((value as { data?: unknown }).data && typeof (value as { data?: unknown }).data === "object")) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
if (!("send" in value) || typeof (value as { send?: unknown }).send !== "function") return false
|
if (!("send" in value) || typeof (value as { send?: unknown }).send !== "function") return false
|
||||||
if (!("close" in value) || typeof (value as { close?: unknown }).close !== "function") return false
|
if (!("close" in value) || typeof (value as { close?: unknown }).close !== "function") return false
|
||||||
return typeof (value as { readyState?: unknown }).readyState === "number"
|
return typeof (value as { readyState?: unknown }).readyState === "number"
|
||||||
@@ -177,11 +182,16 @@ export const PtyRoutes = lazy(() =>
|
|||||||
|
|
||||||
return {
|
return {
|
||||||
onOpen(_event, ws) {
|
onOpen(_event, ws) {
|
||||||
const socket = isSocket(ws.raw) ? ws.raw : ws
|
const raw = ws.raw
|
||||||
handler = Pty.connect(id, socket, cursor)
|
if (!isSocket(raw)) {
|
||||||
|
ws.close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
handler = Pty.connect(id, raw, cursor)
|
||||||
},
|
},
|
||||||
onMessage(event) {
|
onMessage(event) {
|
||||||
handler?.onMessage(String(event.data))
|
if (typeof event.data !== "string") return
|
||||||
|
handler?.onMessage(event.data)
|
||||||
},
|
},
|
||||||
onClose() {
|
onClose() {
|
||||||
handler?.onClose()
|
handler?.onClose()
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ describe("pty", () => {
|
|||||||
|
|
||||||
const ws = {
|
const ws = {
|
||||||
readyState: 1,
|
readyState: 1,
|
||||||
|
data: { events: { connection: "a" } },
|
||||||
send: (data: unknown) => {
|
send: (data: unknown) => {
|
||||||
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
||||||
},
|
},
|
||||||
@@ -30,6 +31,7 @@ describe("pty", () => {
|
|||||||
Pty.connect(a.id, ws as any)
|
Pty.connect(a.id, ws as any)
|
||||||
|
|
||||||
// Now "reuse" the same ws object for another connection.
|
// Now "reuse" the same ws object for another connection.
|
||||||
|
ws.data = { events: { connection: "b" } }
|
||||||
ws.send = (data: unknown) => {
|
ws.send = (data: unknown) => {
|
||||||
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
||||||
}
|
}
|
||||||
@@ -51,4 +53,48 @@ describe("pty", () => {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test("does not leak output when Bun recycles websocket objects before re-connect", async () => {
|
||||||
|
await using dir = await tmpdir({ git: true })
|
||||||
|
|
||||||
|
await Instance.provide({
|
||||||
|
directory: dir.path,
|
||||||
|
fn: async () => {
|
||||||
|
const a = await Pty.create({ command: "cat", title: "a" })
|
||||||
|
try {
|
||||||
|
const outA: string[] = []
|
||||||
|
const outB: string[] = []
|
||||||
|
|
||||||
|
const ws = {
|
||||||
|
readyState: 1,
|
||||||
|
data: { events: { connection: "a" } },
|
||||||
|
send: (data: unknown) => {
|
||||||
|
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
||||||
|
},
|
||||||
|
close: () => {
|
||||||
|
// no-op (simulate abrupt drop)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect "a" first.
|
||||||
|
Pty.connect(a.id, ws as any)
|
||||||
|
outA.length = 0
|
||||||
|
|
||||||
|
// Simulate Bun reusing the same websocket object for another connection
|
||||||
|
// before the new onOpen handler has a chance to tag it.
|
||||||
|
ws.data = { events: { connection: "b" } }
|
||||||
|
ws.send = (data: unknown) => {
|
||||||
|
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
||||||
|
}
|
||||||
|
|
||||||
|
Pty.write(a.id, "AAA\n")
|
||||||
|
await Bun.sleep(100)
|
||||||
|
|
||||||
|
expect(outB.join("")).not.toContain("AAA")
|
||||||
|
} finally {
|
||||||
|
await Pty.remove(a.id)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user