From 3a30773874ccb92d085eddbab66d053c5d4e6326 Mon Sep 17 00:00:00 2001 From: Dax Raad Date: Sun, 11 Jan 2026 18:53:34 -0500 Subject: [PATCH] tui: refactor event streaming to use SDK instead of manual RPC subscription --- STYLE_GUIDE.md | 67 ++++++++++++++++++- packages/opencode/src/cli/cmd/tui/thread.ts | 15 +---- packages/opencode/src/cli/cmd/tui/worker.ts | 72 ++++++++++++++++----- 3 files changed, 125 insertions(+), 29 deletions(-) diff --git a/STYLE_GUIDE.md b/STYLE_GUIDE.md index 8dd3be589..a46ce221f 100644 --- a/STYLE_GUIDE.md +++ b/STYLE_GUIDE.md @@ -4,8 +4,71 @@ - AVOID unnecessary destructuring of variables. instead of doing `const { a, b } = obj` just reference it as obj.a and obj.b. this preserves context - AVOID `try`/`catch` where possible -- AVOID `else` statements - AVOID using `any` type -- AVOID `let` statements - PREFER single word variable names where possible - Use as many bun apis as possible like Bun.file() + +# Avoid let statements + +we don't like let statements, especially combined with if/else statements. +prefer const + +This is bad: + +Good: + +```ts +const foo = condition ? 1 : 2 +``` + +Bad: + +```ts +let foo + +if (condition) foo = 1 +else foo = 2 +``` + +# Avoid else statements + +Prefer early returns or even using `iife` to avoid else statements + +Good: + +```ts +function foo() { + if (condition) return 1 + return 2 +} +``` + +Bad: + +```ts +function foo() { + if (condition) return 1 + else return 2 +} +``` + +# Prefer single word naming + +Try your best to find a single word name for your variables, functions, etc. +Only use multiple words if you cannot. + +Good: + +```ts +const foo = 1 +const bar = 2 +const baz = 3 +``` + +Bad: + +```ts +const fooBar = 1 +const barBaz = 2 +const bazFoo = 3 +``` diff --git a/packages/opencode/src/cli/cmd/tui/thread.ts b/packages/opencode/src/cli/cmd/tui/thread.ts index 5e50d38de..057142685 100644 --- a/packages/opencode/src/cli/cmd/tui/thread.ts +++ b/packages/opencode/src/cli/cmd/tui/thread.ts @@ -34,15 +34,9 @@ function createWorkerFetch(client: RpcClient): typeof fetch { return fn as typeof fetch } -function createEventSource(client: RpcClient, directory: string): EventSource { +function createEventSource(client: RpcClient): EventSource { return { - on: (handler) => - client.on("event", (event) => { - handler(event) - if (event.type === "server.instance.disposed") { - client.call("subscribe", { directory }).catch(() => {}) - } - }), + on: (handler) => client.on("event", handler), } } @@ -131,9 +125,6 @@ export const TuiThreadCommand = cmd({ networkOpts.port !== 0 || networkOpts.hostname !== "127.0.0.1" - // Subscribe to events from worker - await client.call("subscribe", { directory: cwd }) - let url: string let customFetch: typeof fetch | undefined let events: EventSource | undefined @@ -146,7 +137,7 @@ export const TuiThreadCommand = cmd({ // Use direct RPC communication (no HTTP) url = "http://opencode.internal" customFetch = createWorkerFetch(client) - events = createEventSource(client, cwd) + events = createEventSource(client) } const tuiPromise = tui({ diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index ea88e45f1..343a5a310 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -6,8 +6,8 @@ import { InstanceBootstrap } from "@/project/bootstrap" import { Rpc } from "@/util/rpc" import { upgrade } from "@/cli/upgrade" import { Config } from "@/config/config" -import { Bus } from "@/bus" import { GlobalBus } from "@/bus/global" +import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2" import type { BunWebSocketData } from "hono/bun" await Log.init({ @@ -38,6 +38,61 @@ GlobalBus.on("event", (event) => { let server: Bun.Server | undefined +const eventStream = { + abort: undefined as AbortController | undefined, +} + +const startEventStream = (directory: string) => { + if (eventStream.abort) eventStream.abort.abort() + const abort = new AbortController() + eventStream.abort = abort + const signal = abort.signal + + const fetchFn = (async (input: RequestInfo | URL, init?: RequestInit) => { + const request = new Request(input, init) + return Server.App().fetch(request) + }) as typeof globalThis.fetch + + const sdk = createOpencodeClient({ + baseUrl: "http://opencode.internal", + directory, + fetch: fetchFn, + signal, + }) + + ;(async () => { + while (!signal.aborted) { + const events = await Promise.resolve( + sdk.event.subscribe( + {}, + { + signal, + }, + ), + ).catch(() => undefined) + + if (!events) { + await Bun.sleep(250) + continue + } + + for await (const event of events.stream) { + Rpc.emit("event", event as Event) + } + + if (!signal.aborted) { + await Bun.sleep(250) + } + } + })().catch((error) => { + Log.Default.error("event stream error", { + error: error instanceof Error ? error.message : error, + }) + }) +} + +startEventStream(process.cwd()) + export const rpc = { async fetch(input: { url: string; method: string; headers: Record; body?: string }) { const request = new Request(input.url, { @@ -58,20 +113,6 @@ export const rpc = { server = Server.listen(input) return { url: server.url.toString() } }, - async subscribe(input: { directory: string }) { - return Instance.provide({ - directory: input.directory, - init: InstanceBootstrap, - fn: async () => { - Bus.subscribeAll((event) => { - Rpc.emit("event", event) - }) - // Emit connected event - Rpc.emit("event", { type: "server.connected", properties: {} }) - return { subscribed: true } - }, - }) - }, async checkUpgrade(input: { directory: string }) { await Instance.provide({ directory: input.directory, @@ -87,6 +128,7 @@ export const rpc = { }, async shutdown() { Log.Default.info("worker shutting down") + if (eventStream.abort) eventStream.abort.abort() await Instance.disposeAll() if (server) server.stop(true) },