From 13bac9c91a908f560f74f19a49c7c865e4bfd5ec Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sat, 21 Mar 2026 21:17:13 -0400 Subject: [PATCH] effectify Pty service (#18572) --- packages/opencode/src/pty/index.ts | 502 ++++++++++++--------- packages/opencode/src/server/routes/pty.ts | 24 +- 2 files changed, 306 insertions(+), 220 deletions(-) diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index 7436abec9..f866b18ad 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -1,13 +1,16 @@ import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" +import { InstanceState } from "@/effect/instance-state" +import { makeRunPromise } from "@/effect/run-service" +import { Instance } from "@/project/instance" import { type IPty } from "bun-pty" import z from "zod" import { Log } from "../util/log" -import { Instance } from "../project/instance" import { lazy } from "@opencode-ai/util/lazy" import { Shell } from "@/shell/shell" import { Plugin } from "@/plugin" import { PtyID } from "./schema" +import { Effect, Layer, ServiceMap } from "effect" export namespace Pty { const log = Log.create({ service: "pty" }) @@ -23,6 +26,20 @@ export namespace Pty { close: (code?: number, reason?: string) => void } + type Active = { + info: Info + process: IPty + buffer: string + bufferCursor: number + cursor: number + subscribers: Map + } + + type State = { + dir: string + sessions: Map + } + // WebSocket control frame: 0x00 + UTF-8 JSON. const meta = (cursor: number) => { const json = JSON.stringify({ cursor }) @@ -81,241 +98,300 @@ export namespace Pty { Deleted: BusEvent.define("pty.deleted", z.object({ id: PtyID.zod })), } - interface ActiveSession { - info: Info - process: IPty - buffer: string - bufferCursor: number - cursor: number - subscribers: Map + export interface Interface { + readonly list: () => Effect.Effect + readonly get: (id: PtyID) => Effect.Effect + readonly create: (input: CreateInput) => Effect.Effect + readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect + readonly remove: (id: PtyID) => Effect.Effect + readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect + readonly write: (id: PtyID, data: string) => Effect.Effect + readonly connect: ( + id: PtyID, + ws: Socket, + cursor?: number, + ) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined> } - const state = Instance.state( - () => new Map(), - async (sessions) => { - for (const session of sessions.values()) { + export class Service extends ServiceMap.Service()("@opencode/Pty") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + function teardown(session: Active) { try { session.process.kill() } catch {} for (const [key, ws] of session.subscribers.entries()) { try { if (ws.data === key) ws.close() + } catch {} + } + session.subscribers.clear() + } + + const cache = yield* InstanceState.make( + Effect.fn("Pty.state")(function* (ctx) { + const state = { + dir: ctx.directory, + sessions: new Map(), + } + + yield* Effect.addFinalizer(() => + Effect.sync(() => { + for (const session of state.sessions.values()) { + teardown(session) + } + state.sessions.clear() + }), + ) + + return state + }), + ) + + const remove = Effect.fn("Pty.remove")(function* (id: PtyID) { + const state = yield* InstanceState.get(cache) + const session = state.sessions.get(id) + if (!session) return + state.sessions.delete(id) + log.info("removing session", { id }) + teardown(session) + void Bus.publish(Event.Deleted, { id: session.info.id }) + }) + + const list = Effect.fn("Pty.list")(function* () { + const state = yield* InstanceState.get(cache) + return Array.from(state.sessions.values()).map((session) => session.info) + }) + + const get = Effect.fn("Pty.get")(function* (id: PtyID) { + const state = yield* InstanceState.get(cache) + return state.sessions.get(id)?.info + }) + + const create = Effect.fn("Pty.create")(function* (input: CreateInput) { + const state = yield* InstanceState.get(cache) + return yield* Effect.promise(async () => { + const id = PtyID.ascending() + const command = input.command || Shell.preferred() + const args = input.args || [] + if (command.endsWith("sh")) { + args.push("-l") + } + + const cwd = input.cwd || state.dir + const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} }) + const env = { + ...process.env, + ...input.env, + ...shellEnv.env, + TERM: "xterm-256color", + OPENCODE_TERMINAL: "1", + } as Record + + if (process.platform === "win32") { + env.LC_ALL = "C.UTF-8" + env.LC_CTYPE = "C.UTF-8" + env.LANG = "C.UTF-8" + } + log.info("creating session", { id, cmd: command, args, cwd }) + + const spawn = await pty() + const proc = spawn(command, args, { + name: "xterm-256color", + cwd, + env, + }) + + const info = { + id, + title: input.title || `Terminal ${id.slice(-4)}`, + command, + args, + cwd, + status: "running", + pid: proc.pid, + } as const + const session: Active = { + info, + process: proc, + buffer: "", + bufferCursor: 0, + cursor: 0, + subscribers: new Map(), + } + state.sessions.set(id, session) + proc.onData( + Instance.bind((chunk) => { + session.cursor += chunk.length + + for (const [key, ws] of session.subscribers.entries()) { + if (ws.readyState !== 1) { + session.subscribers.delete(key) + continue + } + if (ws.data !== key) { + session.subscribers.delete(key) + continue + } + try { + ws.send(chunk) + } catch { + session.subscribers.delete(key) + } + } + + session.buffer += chunk + if (session.buffer.length <= BUFFER_LIMIT) return + const excess = session.buffer.length - BUFFER_LIMIT + session.buffer = session.buffer.slice(excess) + session.bufferCursor += excess + }), + ) + proc.onExit( + Instance.bind(({ exitCode }) => { + if (session.info.status === "exited") return + log.info("session exited", { id, exitCode }) + session.info.status = "exited" + void Bus.publish(Event.Exited, { id, exitCode }) + Effect.runFork(remove(id)) + }), + ) + await Bus.publish(Event.Created, { info }) + return info + }) + }) + + const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) { + const state = yield* InstanceState.get(cache) + const session = state.sessions.get(id) + if (!session) return + if (input.title) { + session.info.title = input.title + } + if (input.size) { + session.process.resize(input.size.cols, input.size.rows) + } + yield* Effect.promise(() => Bus.publish(Event.Updated, { info: session.info })) + return session.info + }) + + const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) { + const state = yield* InstanceState.get(cache) + const session = state.sessions.get(id) + if (session && session.info.status === "running") { + session.process.resize(cols, rows) + } + }) + + const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) { + const state = yield* InstanceState.get(cache) + const session = state.sessions.get(id) + if (session && session.info.status === "running") { + session.process.write(data) + } + }) + + const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) { + const state = yield* InstanceState.get(cache) + const session = state.sessions.get(id) + if (!session) { + ws.close() + return + } + log.info("client connected to session", { id }) + + // Use ws.data as the unique key for this connection lifecycle. + // If ws.data is undefined, fallback to ws object. + const key = ws.data && typeof ws.data === "object" ? ws.data : ws + // Optionally cleanup if the key somehow exists + session.subscribers.delete(key) + session.subscribers.set(key, ws) + + const cleanup = () => { + session.subscribers.delete(key) + } + + const start = session.bufferCursor + const end = session.cursor + const from = + cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0 + + const data = (() => { + if (!session.buffer) return "" + if (from >= end) return "" + const offset = Math.max(0, from - start) + if (offset >= session.buffer.length) return "" + return session.buffer.slice(offset) + })() + + if (data) { + try { + for (let i = 0; i < data.length; i += BUFFER_CHUNK) { + ws.send(data.slice(i, i + BUFFER_CHUNK)) + } } catch { - // ignore + cleanup() + ws.close() + return } } - } - sessions.clear() - }, + + try { + ws.send(meta(end)) + } catch { + cleanup() + ws.close() + return + } + + return { + onMessage: (message: string | ArrayBuffer) => { + session.process.write(String(message)) + }, + onClose: () => { + log.info("client disconnected from session", { id }) + cleanup() + }, + } + }) + + return Service.of({ list, get, create, update, remove, resize, write, connect }) + }), ) - export function list() { - return Array.from(state().values()).map((s) => s.info) + const runPromise = makeRunPromise(Service, layer) + + export async function list() { + return runPromise((svc) => svc.list()) } - export function get(id: PtyID) { - return state().get(id)?.info + export async function get(id: PtyID) { + return runPromise((svc) => svc.get(id)) + } + + export async function resize(id: PtyID, cols: number, rows: number) { + return runPromise((svc) => svc.resize(id, cols, rows)) + } + + export async function write(id: PtyID, data: string) { + return runPromise((svc) => svc.write(id, data)) + } + + export async function connect(id: PtyID, ws: Socket, cursor?: number) { + return runPromise((svc) => svc.connect(id, ws, cursor)) } export async function create(input: CreateInput) { - const id = PtyID.ascending() - const command = input.command || Shell.preferred() - const args = input.args || [] - if (command.endsWith("sh")) { - args.push("-l") - } - - const cwd = input.cwd || Instance.directory - const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} }) - const env = { - ...process.env, - ...input.env, - ...shellEnv.env, - TERM: "xterm-256color", - OPENCODE_TERMINAL: "1", - } as Record - - if (process.platform === "win32") { - env.LC_ALL = "C.UTF-8" - env.LC_CTYPE = "C.UTF-8" - env.LANG = "C.UTF-8" - } - log.info("creating session", { id, cmd: command, args, cwd }) - - const spawn = await pty() - const ptyProcess = spawn(command, args, { - name: "xterm-256color", - cwd, - env, - }) - - const info = { - id, - title: input.title || `Terminal ${id.slice(-4)}`, - command, - args, - cwd, - status: "running", - pid: ptyProcess.pid, - } as const - const session: ActiveSession = { - info, - process: ptyProcess, - buffer: "", - bufferCursor: 0, - cursor: 0, - subscribers: new Map(), - } - state().set(id, session) - ptyProcess.onData( - Instance.bind((chunk) => { - session.cursor += chunk.length - - for (const [key, ws] of session.subscribers.entries()) { - if (ws.readyState !== 1) { - session.subscribers.delete(key) - continue - } - - if (ws.data !== key) { - session.subscribers.delete(key) - continue - } - - try { - ws.send(chunk) - } catch { - session.subscribers.delete(key) - } - } - - session.buffer += chunk - if (session.buffer.length <= BUFFER_LIMIT) return - const excess = session.buffer.length - BUFFER_LIMIT - session.buffer = session.buffer.slice(excess) - session.bufferCursor += excess - }), - ) - ptyProcess.onExit( - Instance.bind(({ exitCode }) => { - if (session.info.status === "exited") return - log.info("session exited", { id, exitCode }) - session.info.status = "exited" - Bus.publish(Event.Exited, { id, exitCode }) - remove(id) - }), - ) - Bus.publish(Event.Created, { info }) - return info + return runPromise((svc) => svc.create(input)) } export async function update(id: PtyID, input: UpdateInput) { - const session = state().get(id) - if (!session) return - if (input.title) { - session.info.title = input.title - } - if (input.size) { - session.process.resize(input.size.cols, input.size.rows) - } - Bus.publish(Event.Updated, { info: session.info }) - return session.info + return runPromise((svc) => svc.update(id, input)) } export async function remove(id: PtyID) { - const session = state().get(id) - if (!session) return - state().delete(id) - log.info("removing session", { id }) - try { - session.process.kill() - } catch {} - for (const [key, ws] of session.subscribers.entries()) { - try { - if (ws.data === key) ws.close() - } catch { - // ignore - } - } - session.subscribers.clear() - Bus.publish(Event.Deleted, { id: session.info.id }) - } - - export function resize(id: PtyID, cols: number, rows: number) { - const session = state().get(id) - if (session && session.info.status === "running") { - session.process.resize(cols, rows) - } - } - - export function write(id: PtyID, data: string) { - const session = state().get(id) - if (session && session.info.status === "running") { - session.process.write(data) - } - } - - export function connect(id: PtyID, ws: Socket, cursor?: number) { - const session = state().get(id) - if (!session) { - ws.close() - return - } - log.info("client connected to session", { id }) - - // Use ws.data as the unique key for this connection lifecycle. - // If ws.data is undefined, fallback to ws object. - const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws - - // Optionally cleanup if the key somehow exists - session.subscribers.delete(connectionKey) - session.subscribers.set(connectionKey, ws) - - const cleanup = () => { - session.subscribers.delete(connectionKey) - } - - const start = session.bufferCursor - const end = session.cursor - - const from = - cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0 - - const data = (() => { - if (!session.buffer) return "" - if (from >= end) return "" - const offset = Math.max(0, from - start) - if (offset >= session.buffer.length) return "" - return session.buffer.slice(offset) - })() - - if (data) { - try { - for (let i = 0; i < data.length; i += BUFFER_CHUNK) { - ws.send(data.slice(i, i + BUFFER_CHUNK)) - } - } catch { - cleanup() - ws.close() - return - } - } - - try { - ws.send(meta(end)) - } catch { - cleanup() - ws.close() - return - } - return { - onMessage: (message: string | ArrayBuffer) => { - session.process.write(String(message)) - }, - onClose: () => { - log.info("client disconnected from session", { id }) - cleanup() - }, - } + return runPromise((svc) => svc.remove(id)) } } diff --git a/packages/opencode/src/server/routes/pty.ts b/packages/opencode/src/server/routes/pty.ts index 0f9151ba4..de79801e2 100644 --- a/packages/opencode/src/server/routes/pty.ts +++ b/packages/opencode/src/server/routes/pty.ts @@ -28,7 +28,7 @@ export const PtyRoutes = lazy(() => }, }), async (c) => { - return c.json(Pty.list()) + return c.json(await Pty.list()) }, ) .post( @@ -75,7 +75,7 @@ export const PtyRoutes = lazy(() => }), validator("param", z.object({ ptyID: PtyID.zod })), async (c) => { - const info = Pty.get(c.req.valid("param").ptyID) + const info = await Pty.get(c.req.valid("param").ptyID) if (!info) { throw new NotFoundError({ message: "Session not found" }) } @@ -150,7 +150,7 @@ export const PtyRoutes = lazy(() => }, }), validator("param", z.object({ ptyID: PtyID.zod })), - upgradeWebSocket((c) => { + upgradeWebSocket(async (c) => { const id = PtyID.zod.parse(c.req.param("ptyID")) const cursor = (() => { const value = c.req.query("cursor") @@ -159,8 +159,8 @@ export const PtyRoutes = lazy(() => if (!Number.isSafeInteger(parsed) || parsed < -1) return return parsed })() - let handler: ReturnType - if (!Pty.get(id)) throw new Error("Session not found") + let handler: Awaited> + if (!(await Pty.get(id))) throw new Error("Session not found") type Socket = { readyState: number @@ -176,17 +176,27 @@ export const PtyRoutes = lazy(() => return typeof (value as { readyState?: unknown }).readyState === "number" } + const pending: string[] = [] + let ready = false + return { - onOpen(_event, ws) { + async onOpen(_event, ws) { const socket = ws.raw if (!isSocket(socket)) { ws.close() return } - handler = Pty.connect(id, socket, cursor) + handler = await Pty.connect(id, socket, cursor) + ready = true + for (const msg of pending) handler?.onMessage(msg) + pending.length = 0 }, onMessage(event) { if (typeof event.data !== "string") return + if (!ready) { + pending.push(event.data) + return + } handler?.onMessage(event.data) }, onClose() {