add scheduler, cleanup module (#9346)
This commit is contained in:
@@ -11,6 +11,8 @@ import { Instance } from "./instance"
|
|||||||
import { Vcs } from "./vcs"
|
import { Vcs } from "./vcs"
|
||||||
import { Log } from "@/util/log"
|
import { Log } from "@/util/log"
|
||||||
import { ShareNext } from "@/share/share-next"
|
import { ShareNext } from "@/share/share-next"
|
||||||
|
import { Snapshot } from "../snapshot"
|
||||||
|
import { Truncate } from "../tool/truncation"
|
||||||
|
|
||||||
export async function InstanceBootstrap() {
|
export async function InstanceBootstrap() {
|
||||||
Log.Default.info("bootstrapping", { directory: Instance.directory })
|
Log.Default.info("bootstrapping", { directory: Instance.directory })
|
||||||
@@ -22,6 +24,8 @@ export async function InstanceBootstrap() {
|
|||||||
FileWatcher.init()
|
FileWatcher.init()
|
||||||
File.init()
|
File.init()
|
||||||
Vcs.init()
|
Vcs.init()
|
||||||
|
Snapshot.init()
|
||||||
|
Truncate.init()
|
||||||
|
|
||||||
Bus.subscribe(Command.Event.Executed, async (payload) => {
|
Bus.subscribe(Command.Event.Executed, async (payload) => {
|
||||||
if (payload.properties.name === Command.Default.INIT) {
|
if (payload.properties.name === Command.Default.INIT) {
|
||||||
|
|||||||
61
packages/opencode/src/scheduler/index.ts
Normal file
61
packages/opencode/src/scheduler/index.ts
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import { Instance } from "../project/instance"
|
||||||
|
import { Log } from "../util/log"
|
||||||
|
|
||||||
|
export namespace Scheduler {
|
||||||
|
const log = Log.create({ service: "scheduler" })
|
||||||
|
|
||||||
|
export type Task = {
|
||||||
|
id: string
|
||||||
|
interval: number
|
||||||
|
run: () => Promise<void>
|
||||||
|
scope?: "instance" | "global"
|
||||||
|
}
|
||||||
|
|
||||||
|
type Timer = ReturnType<typeof setInterval>
|
||||||
|
type Entry = {
|
||||||
|
tasks: Map<string, Task>
|
||||||
|
timers: Map<string, Timer>
|
||||||
|
}
|
||||||
|
|
||||||
|
const create = (): Entry => {
|
||||||
|
const tasks = new Map<string, Task>()
|
||||||
|
const timers = new Map<string, Timer>()
|
||||||
|
return { tasks, timers }
|
||||||
|
}
|
||||||
|
|
||||||
|
const shared = create()
|
||||||
|
|
||||||
|
const state = Instance.state(
|
||||||
|
() => create(),
|
||||||
|
async (entry) => {
|
||||||
|
for (const timer of entry.timers.values()) {
|
||||||
|
clearInterval(timer)
|
||||||
|
}
|
||||||
|
entry.tasks.clear()
|
||||||
|
entry.timers.clear()
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
export function register(task: Task) {
|
||||||
|
const scope = task.scope ?? "instance"
|
||||||
|
const entry = scope === "global" ? shared : state()
|
||||||
|
const current = entry.timers.get(task.id)
|
||||||
|
if (current && scope === "global") return
|
||||||
|
if (current) clearInterval(current)
|
||||||
|
|
||||||
|
entry.tasks.set(task.id, task)
|
||||||
|
void run(task)
|
||||||
|
const timer = setInterval(() => {
|
||||||
|
void run(task)
|
||||||
|
}, task.interval)
|
||||||
|
timer.unref()
|
||||||
|
entry.timers.set(task.id, timer)
|
||||||
|
}
|
||||||
|
|
||||||
|
async function run(task: Task) {
|
||||||
|
log.info("run", { id: task.id })
|
||||||
|
await task.run().catch((error) => {
|
||||||
|
log.error("run failed", { id: task.id, error })
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,9 +6,46 @@ import { Global } from "../global"
|
|||||||
import z from "zod"
|
import z from "zod"
|
||||||
import { Config } from "../config/config"
|
import { Config } from "../config/config"
|
||||||
import { Instance } from "../project/instance"
|
import { Instance } from "../project/instance"
|
||||||
|
import { Scheduler } from "../scheduler"
|
||||||
|
|
||||||
export namespace Snapshot {
|
export namespace Snapshot {
|
||||||
const log = Log.create({ service: "snapshot" })
|
const log = Log.create({ service: "snapshot" })
|
||||||
|
const hour = 60 * 60 * 1000
|
||||||
|
const prune = "7.days"
|
||||||
|
|
||||||
|
export function init() {
|
||||||
|
Scheduler.register({
|
||||||
|
id: "snapshot.cleanup",
|
||||||
|
interval: hour,
|
||||||
|
run: cleanup,
|
||||||
|
scope: "instance",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function cleanup() {
|
||||||
|
if (Instance.project.vcs !== "git") return
|
||||||
|
const cfg = await Config.get()
|
||||||
|
if (cfg.snapshot === false) return
|
||||||
|
const git = gitdir()
|
||||||
|
const exists = await fs
|
||||||
|
.stat(git)
|
||||||
|
.then(() => true)
|
||||||
|
.catch(() => false)
|
||||||
|
if (!exists) return
|
||||||
|
const result = await $`git --git-dir ${git} --work-tree ${Instance.worktree} gc --prune=${prune}`
|
||||||
|
.quiet()
|
||||||
|
.cwd(Instance.directory)
|
||||||
|
.nothrow()
|
||||||
|
if (result.exitCode !== 0) {
|
||||||
|
log.warn("cleanup failed", {
|
||||||
|
exitCode: result.exitCode,
|
||||||
|
stderr: result.stderr.toString(),
|
||||||
|
stdout: result.stdout.toString(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.info("cleanup", { prune })
|
||||||
|
}
|
||||||
|
|
||||||
export async function track() {
|
export async function track() {
|
||||||
if (Instance.project.vcs !== "git") return
|
if (Instance.project.vcs !== "git") return
|
||||||
|
|||||||
@@ -2,9 +2,9 @@ import fs from "fs/promises"
|
|||||||
import path from "path"
|
import path from "path"
|
||||||
import { Global } from "../global"
|
import { Global } from "../global"
|
||||||
import { Identifier } from "../id/id"
|
import { Identifier } from "../id/id"
|
||||||
import { lazy } from "../util/lazy"
|
|
||||||
import { PermissionNext } from "../permission/next"
|
import { PermissionNext } from "../permission/next"
|
||||||
import type { Agent } from "../agent/agent"
|
import type { Agent } from "../agent/agent"
|
||||||
|
import { Scheduler } from "../scheduler"
|
||||||
|
|
||||||
export namespace Truncate {
|
export namespace Truncate {
|
||||||
export const MAX_LINES = 2000
|
export const MAX_LINES = 2000
|
||||||
@@ -12,6 +12,7 @@ export namespace Truncate {
|
|||||||
export const DIR = path.join(Global.Path.data, "tool-output")
|
export const DIR = path.join(Global.Path.data, "tool-output")
|
||||||
export const GLOB = path.join(DIR, "*")
|
export const GLOB = path.join(DIR, "*")
|
||||||
const RETENTION_MS = 7 * 24 * 60 * 60 * 1000 // 7 days
|
const RETENTION_MS = 7 * 24 * 60 * 60 * 1000 // 7 days
|
||||||
|
const HOUR_MS = 60 * 60 * 1000
|
||||||
|
|
||||||
export type Result = { content: string; truncated: false } | { content: string; truncated: true; outputPath: string }
|
export type Result = { content: string; truncated: false } | { content: string; truncated: true; outputPath: string }
|
||||||
|
|
||||||
@@ -21,6 +22,15 @@ export namespace Truncate {
|
|||||||
direction?: "head" | "tail"
|
direction?: "head" | "tail"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function init() {
|
||||||
|
Scheduler.register({
|
||||||
|
id: "tool.truncation.cleanup",
|
||||||
|
interval: HOUR_MS,
|
||||||
|
run: cleanup,
|
||||||
|
scope: "global",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
export async function cleanup() {
|
export async function cleanup() {
|
||||||
const cutoff = Identifier.timestamp(Identifier.create("tool", false, Date.now() - RETENTION_MS))
|
const cutoff = Identifier.timestamp(Identifier.create("tool", false, Date.now() - RETENTION_MS))
|
||||||
const glob = new Bun.Glob("tool_*")
|
const glob = new Bun.Glob("tool_*")
|
||||||
@@ -31,8 +41,6 @@ export namespace Truncate {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const init = lazy(cleanup)
|
|
||||||
|
|
||||||
function hasTaskTool(agent?: Agent.Info): boolean {
|
function hasTaskTool(agent?: Agent.Info): boolean {
|
||||||
if (!agent?.permission) return false
|
if (!agent?.permission) return false
|
||||||
const rule = PermissionNext.evaluate("task", "*", agent.permission)
|
const rule = PermissionNext.evaluate("task", "*", agent.permission)
|
||||||
@@ -81,7 +89,6 @@ export namespace Truncate {
|
|||||||
const unit = hitBytes ? "bytes" : "lines"
|
const unit = hitBytes ? "bytes" : "lines"
|
||||||
const preview = out.join("\n")
|
const preview = out.join("\n")
|
||||||
|
|
||||||
await init()
|
|
||||||
const id = Identifier.ascending("tool")
|
const id = Identifier.ascending("tool")
|
||||||
const filepath = path.join(DIR, id)
|
const filepath = path.join(DIR, id)
|
||||||
await Bun.write(Bun.file(filepath), text)
|
await Bun.write(Bun.file(filepath), text)
|
||||||
|
|||||||
73
packages/opencode/test/scheduler.test.ts
Normal file
73
packages/opencode/test/scheduler.test.ts
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
import { describe, expect, test } from "bun:test"
|
||||||
|
import { Scheduler } from "../src/scheduler"
|
||||||
|
import { Instance } from "../src/project/instance"
|
||||||
|
import { tmpdir } from "./fixture/fixture"
|
||||||
|
|
||||||
|
describe("Scheduler.register", () => {
|
||||||
|
const hour = 60 * 60 * 1000
|
||||||
|
|
||||||
|
test("defaults to instance scope per directory", async () => {
|
||||||
|
await using one = await tmpdir({ git: true })
|
||||||
|
await using two = await tmpdir({ git: true })
|
||||||
|
const runs = { count: 0 }
|
||||||
|
const id = "scheduler.instance." + Math.random().toString(36).slice(2)
|
||||||
|
const task = {
|
||||||
|
id,
|
||||||
|
interval: hour,
|
||||||
|
run: async () => {
|
||||||
|
runs.count += 1
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
await Instance.provide({
|
||||||
|
directory: one.path,
|
||||||
|
fn: async () => {
|
||||||
|
Scheduler.register(task)
|
||||||
|
await Instance.dispose()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
expect(runs.count).toBe(1)
|
||||||
|
|
||||||
|
await Instance.provide({
|
||||||
|
directory: two.path,
|
||||||
|
fn: async () => {
|
||||||
|
Scheduler.register(task)
|
||||||
|
await Instance.dispose()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
expect(runs.count).toBe(2)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("global scope runs once across instances", async () => {
|
||||||
|
await using one = await tmpdir({ git: true })
|
||||||
|
await using two = await tmpdir({ git: true })
|
||||||
|
const runs = { count: 0 }
|
||||||
|
const id = "scheduler.global." + Math.random().toString(36).slice(2)
|
||||||
|
const task = {
|
||||||
|
id,
|
||||||
|
interval: hour,
|
||||||
|
run: async () => {
|
||||||
|
runs.count += 1
|
||||||
|
},
|
||||||
|
scope: "global" as const,
|
||||||
|
}
|
||||||
|
|
||||||
|
await Instance.provide({
|
||||||
|
directory: one.path,
|
||||||
|
fn: async () => {
|
||||||
|
Scheduler.register(task)
|
||||||
|
await Instance.dispose()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
expect(runs.count).toBe(1)
|
||||||
|
|
||||||
|
await Instance.provide({
|
||||||
|
directory: two.path,
|
||||||
|
fn: async () => {
|
||||||
|
Scheduler.register(task)
|
||||||
|
await Instance.dispose()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
expect(runs.count).toBe(1)
|
||||||
|
})
|
||||||
|
})
|
||||||
Reference in New Issue
Block a user