feat(core): basic implementation of remote workspace support (#15120)

This commit is contained in:
James Long
2026-02-27 15:36:39 -05:00
committed by GitHub
parent a94f564ff0
commit c12ce2ffff
26 changed files with 2153 additions and 65 deletions

View File

@@ -2,6 +2,9 @@ import { Server } from "../../server/server"
import { cmd } from "./cmd"
import { withNetworkOptions, resolveNetworkOptions } from "../network"
import { Flag } from "../../flag/flag"
import { Workspace } from "../../control-plane/workspace"
import { Project } from "../../project/project"
import { Installation } from "../../installation"
export const ServeCommand = cmd({
command: "serve",
@@ -14,7 +17,15 @@ export const ServeCommand = cmd({
const opts = await resolveNetworkOptions(args)
const server = Server.listen(opts)
console.log(`opencode server listening on http://${server.hostname}:${server.port}`)
let workspaceSync: Array<ReturnType<typeof Workspace.startSyncing>> = []
// Only available in development right now
if (Installation.isLocal()) {
workspaceSync = Project.list().map((project) => Workspace.startSyncing(project))
}
await new Promise(() => {})
await server.stop()
await Promise.all(workspaceSync.map((item) => item.stop()))
},
})

View File

@@ -1,59 +1,16 @@
import { cmd } from "./cmd"
import { withNetworkOptions, resolveNetworkOptions } from "../network"
import { Installation } from "../../installation"
import { WorkspaceServer } from "../../control-plane/workspace-server/server"
export const WorkspaceServeCommand = cmd({
command: "workspace-serve",
builder: (yargs) => withNetworkOptions(yargs),
describe: "starts a remote workspace websocket server",
describe: "starts a remote workspace event server",
handler: async (args) => {
const opts = await resolveNetworkOptions(args)
const server = Bun.serve<{ id: string }>({
hostname: opts.hostname,
port: opts.port,
fetch(req, server) {
const url = new URL(req.url)
if (url.pathname === "/ws") {
const id = Bun.randomUUIDv7()
if (server.upgrade(req, { data: { id } })) return
return new Response("Upgrade failed", { status: 400 })
}
if (url.pathname === "/health") {
return new Response("ok", {
status: 200,
headers: {
"content-type": "text/plain; charset=utf-8",
},
})
}
return new Response(
JSON.stringify({
service: "workspace-server",
ws: `ws://${server.hostname}:${server.port}/ws`,
}),
{
status: 200,
headers: {
"content-type": "application/json; charset=utf-8",
},
},
)
},
websocket: {
open(ws) {
ws.send(JSON.stringify({ type: "ready", id: ws.data.id }))
},
message(ws, msg) {
const text = typeof msg === "string" ? msg : msg.toString()
ws.send(JSON.stringify({ type: "message", id: ws.data.id, text }))
},
close() {},
},
})
console.log(`workspace websocket server listening on ws://${server.hostname}:${server.port}/ws`)
const server = WorkspaceServer.Listen(opts)
console.log(`workspace event server listening on http://${server.hostname}:${server.port}/event`)
await new Promise(() => {})
await server.stop()
},
})

View File

@@ -0,0 +1,10 @@
import { WorktreeAdaptor } from "./worktree"
import type { Config } from "../config"
import type { Adaptor } from "./types"
export function getAdaptor(config: Config): Adaptor {
switch (config.type) {
case "worktree":
return WorktreeAdaptor
}
}

View File

@@ -0,0 +1,7 @@
import type { Config } from "../config"
export type Adaptor<T extends Config = Config> = {
create(from: T, branch?: string | null): Promise<{ config: T; init: () => Promise<void> }>
remove(from: T): Promise<void>
request(from: T, method: string, url: string, data?: BodyInit, signal?: AbortSignal): Promise<Response | undefined>
}

View File

@@ -0,0 +1,26 @@
import { Worktree } from "@/worktree"
import type { Config } from "../config"
import type { Adaptor } from "./types"
type WorktreeConfig = Extract<Config, { type: "worktree" }>
export const WorktreeAdaptor: Adaptor<WorktreeConfig> = {
async create(_from: WorktreeConfig, _branch: string) {
const next = await Worktree.create(undefined)
return {
config: {
type: "worktree",
directory: next.directory,
},
// Hack for now: `Worktree.create` puts all its async code in a
// `setTimeout` so it doesn't use this, but we should change that
init: async () => {},
}
},
async remove(config: WorktreeConfig) {
await Worktree.remove({ directory: config.directory })
},
async request(_from: WorktreeConfig, _method: string, _url: string, _data?: BodyInit, _signal?: AbortSignal) {
throw new Error("worktree does not support request")
},
}

View File

@@ -0,0 +1,10 @@
import z from "zod"
export const Config = z.discriminatedUnion("type", [
z.object({
directory: z.string(),
type: z.literal("worktree"),
}),
])
export type Config = z.infer<typeof Config>

View File

@@ -0,0 +1,46 @@
import { Instance } from "@/project/instance"
import type { MiddlewareHandler } from "hono"
import { Installation } from "../installation"
import { getAdaptor } from "./adaptors"
import { Workspace } from "./workspace"
// This middleware forwards all non-GET requests if the workspace is a
// remote. The remote workspace needs to handle session mutations
async function proxySessionRequest(req: Request) {
if (req.method === "GET") return
if (!Instance.directory.startsWith("wrk_")) return
const workspace = await Workspace.get(Instance.directory)
if (!workspace) {
return new Response(`Workspace not found: ${Instance.directory}`, {
status: 500,
headers: {
"content-type": "text/plain; charset=utf-8",
},
})
}
if (workspace.config.type === "worktree") return
const url = new URL(req.url)
const body = req.method === "HEAD" ? undefined : await req.arrayBuffer()
return getAdaptor(workspace.config).request(
workspace.config,
req.method,
`${url.pathname}${url.search}`,
body,
req.signal,
)
}
export const SessionProxyMiddleware: MiddlewareHandler = async (c, next) => {
// Only available in development for now
if (!Installation.isLocal()) {
return next()
}
const response = await proxySessionRequest(c.req.raw)
if (response) {
return response
}
return next()
}

View File

@@ -0,0 +1,66 @@
export async function parseSSE(
body: ReadableStream<Uint8Array>,
signal: AbortSignal,
onEvent: (event: unknown) => void,
) {
const reader = body.getReader()
const decoder = new TextDecoder()
let buf = ""
let last = ""
let retry = 1000
const abort = () => {
void reader.cancel().catch(() => undefined)
}
signal.addEventListener("abort", abort)
try {
while (!signal.aborted) {
const chunk = await reader.read().catch(() => ({ done: true, value: undefined as Uint8Array | undefined }))
if (chunk.done) break
buf += decoder.decode(chunk.value, { stream: true })
buf = buf.replace(/\r\n/g, "\n").replace(/\r/g, "\n")
const chunks = buf.split("\n\n")
buf = chunks.pop() ?? ""
chunks.forEach((chunk) => {
const data: string[] = []
chunk.split("\n").forEach((line) => {
if (line.startsWith("data:")) {
data.push(line.replace(/^data:\s*/, ""))
return
}
if (line.startsWith("id:")) {
last = line.replace(/^id:\s*/, "")
return
}
if (line.startsWith("retry:")) {
const parsed = Number.parseInt(line.replace(/^retry:\s*/, ""), 10)
if (!Number.isNaN(parsed)) retry = parsed
}
})
if (!data.length) return
const raw = data.join("\n")
try {
onEvent(JSON.parse(raw))
} catch {
onEvent({
type: "sse.message",
properties: {
data: raw,
id: last || undefined,
retry,
},
})
}
})
}
} finally {
signal.removeEventListener("abort", abort)
reader.releaseLock()
}
}

View File

@@ -0,0 +1,33 @@
import { GlobalBus } from "../../bus/global"
import { Hono } from "hono"
import { streamSSE } from "hono/streaming"
export function WorkspaceServerRoutes() {
return new Hono().get("/event", async (c) => {
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
const send = async (event: unknown) => {
await stream.writeSSE({
data: JSON.stringify(event),
})
}
const handler = async (event: { directory?: string; payload: unknown }) => {
await send(event.payload)
}
GlobalBus.on("event", handler)
await send({ type: "server.connected", properties: {} })
const heartbeat = setInterval(() => {
void send({ type: "server.heartbeat", properties: {} })
}, 10_000)
await new Promise<void>((resolve) => {
stream.onAbort(() => {
clearInterval(heartbeat)
GlobalBus.off("event", handler)
resolve()
})
})
})
})
}

View File

@@ -0,0 +1,24 @@
import { Hono } from "hono"
import { SessionRoutes } from "../../server/routes/session"
import { WorkspaceServerRoutes } from "./routes"
export namespace WorkspaceServer {
export function App() {
const session = new Hono()
.use("*", async (c, next) => {
if (c.req.method === "GET") return c.notFound()
await next()
})
.route("/", SessionRoutes())
return new Hono().route("/session", session).route("/", WorkspaceServerRoutes())
}
export function Listen(opts: { hostname: string; port: number }) {
return Bun.serve({
hostname: opts.hostname,
port: opts.port,
fetch: App().fetch,
})
}
}

View File

@@ -0,0 +1,12 @@
import { sqliteTable, text } from "drizzle-orm/sqlite-core"
import { ProjectTable } from "@/project/project.sql"
import type { Config } from "./config"
export const WorkspaceTable = sqliteTable("workspace", {
id: text().primaryKey(),
branch: text(),
project_id: text()
.notNull()
.references(() => ProjectTable.id, { onDelete: "cascade" }),
config: text({ mode: "json" }).notNull().$type<Config>(),
})

View File

@@ -0,0 +1,160 @@
import z from "zod"
import { Identifier } from "@/id/id"
import { fn } from "@/util/fn"
import { Database, eq } from "@/storage/db"
import { Project } from "@/project/project"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { Log } from "@/util/log"
import { WorkspaceTable } from "./workspace.sql"
import { Config } from "./config"
import { getAdaptor } from "./adaptors"
import { parseSSE } from "./sse"
export namespace Workspace {
export const Event = {
Ready: BusEvent.define(
"workspace.ready",
z.object({
name: z.string(),
}),
),
Failed: BusEvent.define(
"workspace.failed",
z.object({
message: z.string(),
}),
),
}
export const Info = z
.object({
id: Identifier.schema("workspace"),
branch: z.string().nullable(),
projectID: z.string(),
config: Config,
})
.meta({
ref: "Workspace",
})
export type Info = z.infer<typeof Info>
function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
return {
id: row.id,
branch: row.branch,
projectID: row.project_id,
config: row.config,
}
}
export const create = fn(
z.object({
id: Identifier.schema("workspace").optional(),
projectID: Info.shape.projectID,
branch: Info.shape.branch,
config: Info.shape.config,
}),
async (input) => {
const id = Identifier.ascending("workspace", input.id)
const { config, init } = await getAdaptor(input.config).create(input.config, input.branch)
const info: Info = {
id,
projectID: input.projectID,
branch: input.branch,
config,
}
setTimeout(async () => {
await init()
Database.use((db) => {
db.insert(WorkspaceTable)
.values({
id: info.id,
branch: info.branch,
project_id: info.projectID,
config: info.config,
})
.run()
})
GlobalBus.emit("event", {
directory: id,
payload: {
type: Event.Ready.type,
properties: {},
},
})
}, 0)
return info
},
)
export function list(project: Project.Info) {
const rows = Database.use((db) =>
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
)
return rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
}
export const get = fn(Identifier.schema("workspace"), async (id) => {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
return fromRow(row)
})
export const remove = fn(Identifier.schema("workspace"), async (id) => {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (row) {
const info = fromRow(row)
await getAdaptor(info.config).remove(info.config)
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
return info
}
})
const log = Log.create({ service: "workspace-sync" })
async function workspaceEventLoop(space: Info, stop: AbortSignal) {
while (!stop.aborted) {
const res = await getAdaptor(space.config)
.request(space.config, "GET", "/event", undefined, stop)
.catch(() => undefined)
if (!res || !res.ok || !res.body) {
await Bun.sleep(1000)
continue
}
await parseSSE(res.body, stop, (event) => {
GlobalBus.emit("event", {
directory: space.id,
payload: event,
})
})
// Wait 250ms and retry if SSE connection fails
await Bun.sleep(250)
}
}
export function startSyncing(project: Project.Info) {
const stop = new AbortController()
const spaces = list(project).filter((space) => space.config.type !== "worktree")
spaces.forEach((space) => {
void workspaceEventLoop(space, stop.signal).catch((error) => {
log.warn("workspace sync listener failed", {
workspaceID: space.id,
error,
})
})
})
return {
async stop() {
stop.abort()
},
}
}
}

View File

@@ -11,6 +11,7 @@ export namespace Identifier {
part: "prt",
pty: "pty",
tool: "tool",
workspace: "wrk",
} as const
export function schema(prefix: keyof typeof prefixes) {

View File

@@ -10,6 +10,7 @@ import { Session } from "../../session"
import { zodToJsonSchema } from "zod-to-json-schema"
import { errors } from "../error"
import { lazy } from "../../util/lazy"
import { WorkspaceRoutes } from "./workspace"
export const ExperimentalRoutes = lazy(() =>
new Hono()
@@ -112,6 +113,7 @@ export const ExperimentalRoutes = lazy(() =>
return c.json(worktree)
},
)
.route("/workspace", WorkspaceRoutes())
.get(
"/worktree",
describeRoute({

View File

@@ -16,11 +16,13 @@ import { Log } from "../../util/log"
import { PermissionNext } from "@/permission/next"
import { errors } from "../error"
import { lazy } from "../../util/lazy"
import { SessionProxyMiddleware } from "../../control-plane/session-proxy-middleware"
const log = Log.create({ service: "server" })
export const SessionRoutes = lazy(() =>
new Hono()
.use(SessionProxyMiddleware)
.get(
"/",
describeRoute({

View File

@@ -0,0 +1,104 @@
import { Hono } from "hono"
import { describeRoute, resolver, validator } from "hono-openapi"
import z from "zod"
import { Workspace } from "../../control-plane/workspace"
import { Instance } from "../../project/instance"
import { errors } from "../error"
import { lazy } from "../../util/lazy"
export const WorkspaceRoutes = lazy(() =>
new Hono()
.post(
"/:id",
describeRoute({
summary: "Create workspace",
description: "Create a workspace for the current project.",
operationId: "experimental.workspace.create",
responses: {
200: {
description: "Workspace created",
content: {
"application/json": {
schema: resolver(Workspace.Info),
},
},
},
...errors(400),
},
}),
validator(
"param",
z.object({
id: Workspace.Info.shape.id,
}),
),
validator(
"json",
z.object({
branch: Workspace.Info.shape.branch,
config: Workspace.Info.shape.config,
}),
),
async (c) => {
const { id } = c.req.valid("param")
const body = c.req.valid("json")
const workspace = await Workspace.create({
id,
projectID: Instance.project.id,
branch: body.branch,
config: body.config,
})
return c.json(workspace)
},
)
.get(
"/",
describeRoute({
summary: "List workspaces",
description: "List all workspaces.",
operationId: "experimental.workspace.list",
responses: {
200: {
description: "Workspaces",
content: {
"application/json": {
schema: resolver(z.array(Workspace.Info)),
},
},
},
},
}),
async (c) => {
return c.json(Workspace.list(Instance.project))
},
)
.delete(
"/:id",
describeRoute({
summary: "Remove workspace",
description: "Remove an existing workspace.",
operationId: "experimental.workspace.remove",
responses: {
200: {
description: "Workspace removed",
content: {
"application/json": {
schema: resolver(Workspace.Info.optional()),
},
},
},
...errors(400),
},
}),
validator(
"param",
z.object({
id: Workspace.Info.shape.id,
}),
),
async (c) => {
const { id } = c.req.valid("param")
return c.json(await Workspace.remove(id))
},
),
)

View File

@@ -2,3 +2,4 @@ export { ControlAccountTable } from "../control/control.sql"
export { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../session/session.sql"
export { SessionShareTable } from "../share/share.sql"
export { ProjectTable } from "../project/project.sql"
export { WorkspaceTable } from "../control-plane/workspace.sql"