fix: prevent memory leaks from AbortController closures (#12024)
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import z from "zod"
|
||||
import { Tool } from "./tool"
|
||||
import DESCRIPTION from "./codesearch.txt"
|
||||
import { abortAfterAny } from "../util/abort"
|
||||
|
||||
const API_CONFIG = {
|
||||
BASE_URL: "https://mcp.exa.ai",
|
||||
@@ -73,8 +74,7 @@ export const CodeSearchTool = Tool.define("codesearch", {
|
||||
},
|
||||
}
|
||||
|
||||
const controller = new AbortController()
|
||||
const timeoutId = setTimeout(() => controller.abort(), 30000)
|
||||
const { signal, clearTimeout } = abortAfterAny(30000, ctx.abort)
|
||||
|
||||
try {
|
||||
const headers: Record<string, string> = {
|
||||
@@ -86,10 +86,10 @@ export const CodeSearchTool = Tool.define("codesearch", {
|
||||
method: "POST",
|
||||
headers,
|
||||
body: JSON.stringify(codeRequest),
|
||||
signal: AbortSignal.any([controller.signal, ctx.abort]),
|
||||
signal,
|
||||
})
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout()
|
||||
|
||||
if (!response.ok) {
|
||||
const errorText = await response.text()
|
||||
@@ -120,7 +120,7 @@ export const CodeSearchTool = Tool.define("codesearch", {
|
||||
metadata: {},
|
||||
}
|
||||
} catch (error) {
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout()
|
||||
|
||||
if (error instanceof Error && error.name === "AbortError") {
|
||||
throw new Error("Code search request timed out")
|
||||
|
||||
@@ -2,6 +2,7 @@ import z from "zod"
|
||||
import { Tool } from "./tool"
|
||||
import TurndownService from "turndown"
|
||||
import DESCRIPTION from "./webfetch.txt"
|
||||
import { abortAfterAny } from "../util/abort"
|
||||
|
||||
const MAX_RESPONSE_SIZE = 5 * 1024 * 1024 // 5MB
|
||||
const DEFAULT_TIMEOUT = 30 * 1000 // 30 seconds
|
||||
@@ -36,8 +37,7 @@ export const WebFetchTool = Tool.define("webfetch", {
|
||||
|
||||
const timeout = Math.min((params.timeout ?? DEFAULT_TIMEOUT / 1000) * 1000, MAX_TIMEOUT)
|
||||
|
||||
const controller = new AbortController()
|
||||
const timeoutId = setTimeout(() => controller.abort(), timeout)
|
||||
const { signal, clearTimeout } = abortAfterAny(timeout, ctx.abort)
|
||||
|
||||
// Build Accept header based on requested format with q parameters for fallbacks
|
||||
let acceptHeader = "*/*"
|
||||
@@ -55,8 +55,6 @@ export const WebFetchTool = Tool.define("webfetch", {
|
||||
acceptHeader =
|
||||
"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8"
|
||||
}
|
||||
|
||||
const signal = AbortSignal.any([controller.signal, ctx.abort])
|
||||
const headers = {
|
||||
"User-Agent":
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
|
||||
@@ -72,7 +70,7 @@ export const WebFetchTool = Tool.define("webfetch", {
|
||||
? await fetch(params.url, { signal, headers: { ...headers, "User-Agent": "opencode" } })
|
||||
: initial
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout()
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Request failed with status code: ${response.status}`)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import z from "zod"
|
||||
import { Tool } from "./tool"
|
||||
import DESCRIPTION from "./websearch.txt"
|
||||
import { abortAfterAny } from "../util/abort"
|
||||
|
||||
const API_CONFIG = {
|
||||
BASE_URL: "https://mcp.exa.ai",
|
||||
@@ -91,8 +92,7 @@ export const WebSearchTool = Tool.define("websearch", async () => {
|
||||
},
|
||||
}
|
||||
|
||||
const controller = new AbortController()
|
||||
const timeoutId = setTimeout(() => controller.abort(), 25000)
|
||||
const { signal, clearTimeout } = abortAfterAny(25000, ctx.abort)
|
||||
|
||||
try {
|
||||
const headers: Record<string, string> = {
|
||||
@@ -104,10 +104,10 @@ export const WebSearchTool = Tool.define("websearch", async () => {
|
||||
method: "POST",
|
||||
headers,
|
||||
body: JSON.stringify(searchRequest),
|
||||
signal: AbortSignal.any([controller.signal, ctx.abort]),
|
||||
signal,
|
||||
})
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout()
|
||||
|
||||
if (!response.ok) {
|
||||
const errorText = await response.text()
|
||||
@@ -137,7 +137,7 @@ export const WebSearchTool = Tool.define("websearch", async () => {
|
||||
metadata: {},
|
||||
}
|
||||
} catch (error) {
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout()
|
||||
|
||||
if (error instanceof Error && error.name === "AbortError") {
|
||||
throw new Error("Search request timed out")
|
||||
|
||||
35
packages/opencode/src/util/abort.ts
Normal file
35
packages/opencode/src/util/abort.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Creates an AbortController that automatically aborts after a timeout.
|
||||
*
|
||||
* Uses bind() instead of arrow functions to avoid capturing the surrounding
|
||||
* scope in closures. Arrow functions like `() => controller.abort()` capture
|
||||
* request bodies and other large objects, preventing GC for the timer lifetime.
|
||||
*
|
||||
* @param ms Timeout in milliseconds
|
||||
* @returns Object with controller, signal, and clearTimeout function
|
||||
*/
|
||||
export function abortAfter(ms: number) {
|
||||
const controller = new AbortController()
|
||||
const id = setTimeout(controller.abort.bind(controller), ms)
|
||||
return {
|
||||
controller,
|
||||
signal: controller.signal,
|
||||
clearTimeout: () => globalThis.clearTimeout(id),
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Combines multiple AbortSignals with a timeout.
|
||||
*
|
||||
* @param ms Timeout in milliseconds
|
||||
* @param signals Additional signals to combine
|
||||
* @returns Combined signal that aborts on timeout or when any input signal aborts
|
||||
*/
|
||||
export function abortAfterAny(ms: number, ...signals: AbortSignal[]) {
|
||||
const timeout = abortAfter(ms)
|
||||
const signal = AbortSignal.any([timeout.signal, ...signals])
|
||||
return {
|
||||
signal,
|
||||
clearTimeout: timeout.clearTimeout,
|
||||
}
|
||||
}
|
||||
136
packages/opencode/test/memory/abort-leak.test.ts
Normal file
136
packages/opencode/test/memory/abort-leak.test.ts
Normal file
@@ -0,0 +1,136 @@
|
||||
import { describe, test, expect } from "bun:test"
|
||||
import path from "path"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { WebFetchTool } from "../../src/tool/webfetch"
|
||||
|
||||
const projectRoot = path.join(__dirname, "../..")
|
||||
|
||||
const ctx = {
|
||||
sessionID: "test",
|
||||
messageID: "",
|
||||
callID: "",
|
||||
agent: "build",
|
||||
abort: new AbortController().signal,
|
||||
messages: [],
|
||||
metadata: () => {},
|
||||
ask: async () => {},
|
||||
}
|
||||
|
||||
const MB = 1024 * 1024
|
||||
const ITERATIONS = 50
|
||||
|
||||
const getHeapMB = () => {
|
||||
Bun.gc(true)
|
||||
return process.memoryUsage().heapUsed / MB
|
||||
}
|
||||
|
||||
describe("memory: abort controller leak", () => {
|
||||
test("webfetch does not leak memory over many invocations", async () => {
|
||||
await Instance.provide({
|
||||
directory: projectRoot,
|
||||
fn: async () => {
|
||||
const tool = await WebFetchTool.init()
|
||||
|
||||
// Warm up
|
||||
await tool.execute({ url: "https://example.com", format: "text" }, ctx).catch(() => {})
|
||||
|
||||
Bun.gc(true)
|
||||
const baseline = getHeapMB()
|
||||
|
||||
// Run many fetches
|
||||
for (let i = 0; i < ITERATIONS; i++) {
|
||||
await tool.execute({ url: "https://example.com", format: "text" }, ctx).catch(() => {})
|
||||
}
|
||||
|
||||
Bun.gc(true)
|
||||
const after = getHeapMB()
|
||||
const growth = after - baseline
|
||||
|
||||
console.log(`Baseline: ${baseline.toFixed(2)} MB`)
|
||||
console.log(`After ${ITERATIONS} fetches: ${after.toFixed(2)} MB`)
|
||||
console.log(`Growth: ${growth.toFixed(2)} MB`)
|
||||
|
||||
// Memory growth should be minimal - less than 1MB per 10 requests
|
||||
// With the old closure pattern, this would grow ~0.5MB per request
|
||||
expect(growth).toBeLessThan(ITERATIONS / 10)
|
||||
},
|
||||
})
|
||||
}, 60000)
|
||||
|
||||
test("compare closure vs bind pattern directly", async () => {
|
||||
const ITERATIONS = 500
|
||||
|
||||
// Test OLD pattern: arrow function closure
|
||||
// Store closures in a map keyed by content to force retention
|
||||
const closureMap = new Map<string, () => void>()
|
||||
const timers: Timer[] = []
|
||||
const controllers: AbortController[] = []
|
||||
|
||||
Bun.gc(true)
|
||||
Bun.sleepSync(100)
|
||||
const baseline = getHeapMB()
|
||||
|
||||
for (let i = 0; i < ITERATIONS; i++) {
|
||||
// Simulate large response body like webfetch would have
|
||||
const content = `${i}:${"x".repeat(50 * 1024)}` // 50KB unique per iteration
|
||||
const controller = new AbortController()
|
||||
controllers.push(controller)
|
||||
|
||||
// OLD pattern - closure captures `content`
|
||||
const handler = () => {
|
||||
// Actually use content so it can't be optimized away
|
||||
if (content.length > 1000000000) controller.abort()
|
||||
}
|
||||
closureMap.set(content, handler)
|
||||
const timeoutId = setTimeout(handler, 30000)
|
||||
timers.push(timeoutId)
|
||||
}
|
||||
|
||||
Bun.gc(true)
|
||||
Bun.sleepSync(100)
|
||||
const after = getHeapMB()
|
||||
const oldGrowth = after - baseline
|
||||
|
||||
console.log(`OLD pattern (closure): ${oldGrowth.toFixed(2)} MB growth (${closureMap.size} closures)`)
|
||||
|
||||
// Cleanup after measuring
|
||||
timers.forEach(clearTimeout)
|
||||
controllers.forEach((c) => c.abort())
|
||||
closureMap.clear()
|
||||
|
||||
// Test NEW pattern: bind
|
||||
Bun.gc(true)
|
||||
Bun.sleepSync(100)
|
||||
const baseline2 = getHeapMB()
|
||||
const handlers2: (() => void)[] = []
|
||||
const timers2: Timer[] = []
|
||||
const controllers2: AbortController[] = []
|
||||
|
||||
for (let i = 0; i < ITERATIONS; i++) {
|
||||
const _content = `${i}:${"x".repeat(50 * 1024)}` // 50KB - won't be captured
|
||||
const controller = new AbortController()
|
||||
controllers2.push(controller)
|
||||
|
||||
// NEW pattern - bind doesn't capture surrounding scope
|
||||
const handler = controller.abort.bind(controller)
|
||||
handlers2.push(handler)
|
||||
const timeoutId = setTimeout(handler, 30000)
|
||||
timers2.push(timeoutId)
|
||||
}
|
||||
|
||||
Bun.gc(true)
|
||||
Bun.sleepSync(100)
|
||||
const after2 = getHeapMB()
|
||||
const newGrowth = after2 - baseline2
|
||||
|
||||
// Cleanup after measuring
|
||||
timers2.forEach(clearTimeout)
|
||||
controllers2.forEach((c) => c.abort())
|
||||
handlers2.length = 0
|
||||
|
||||
console.log(`NEW pattern (bind): ${newGrowth.toFixed(2)} MB growth`)
|
||||
console.log(`Improvement: ${(oldGrowth - newGrowth).toFixed(2)} MB saved`)
|
||||
|
||||
expect(newGrowth).toBeLessThanOrEqual(oldGrowth)
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user