mirror of
https://gitea.toothfairyai.com/ToothFairyAI/tf_code.git
synced 2026-04-22 00:24:46 +00:00
effectify Plugin service (#18570)
This commit is contained in:
@@ -75,6 +75,31 @@ export const ZodInfo = zod(Info) // derives z.ZodType from Schema.Union
|
|||||||
|
|
||||||
See `Auth.ZodInfo` for the canonical example.
|
See `Auth.ZodInfo` for the canonical example.
|
||||||
|
|
||||||
|
## InstanceState init patterns
|
||||||
|
|
||||||
|
The `InstanceState.make` init callback receives a `Scope`, so you can use `Effect.acquireRelease`, `Effect.addFinalizer`, and `Effect.forkScoped` inside it. Resources acquired this way are automatically cleaned up when the instance is disposed or invalidated by `ScopedCache`. This makes it the right place for:
|
||||||
|
|
||||||
|
- **Subscriptions**: Use `Effect.acquireRelease` to subscribe and auto-unsubscribe:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
const cache = yield* InstanceState.make<State>(
|
||||||
|
Effect.fn("Foo.state")(function* (ctx) {
|
||||||
|
// ... load state ...
|
||||||
|
|
||||||
|
yield* Effect.acquireRelease(
|
||||||
|
Effect.sync(() => Bus.subscribeAll((event) => { /* handle */ })),
|
||||||
|
(unsub) => Effect.sync(unsub),
|
||||||
|
)
|
||||||
|
|
||||||
|
return { /* state */ }
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
- **Background fibers**: Use `Effect.forkScoped` — the fiber is interrupted on disposal.
|
||||||
|
- **Side effects at init**: Config notification, event wiring, etc. all belong in the init closure. Callers just do `InstanceState.get(cache)` to trigger everything, and `ScopedCache` deduplicates automatically.
|
||||||
|
|
||||||
|
The key insight: don't split init into a separate method with a `started` flag. Put everything in the `InstanceState.make` closure and let `ScopedCache` handle the run-once semantics.
|
||||||
## Scheduled Tasks
|
## Scheduled Tasks
|
||||||
|
|
||||||
For loops or periodic work, use `Effect.repeat` or `Effect.schedule` with `Effect.forkScoped` in the layer definition.
|
For loops or periodic work, use `Effect.repeat` or `Effect.schedule` with `Effect.forkScoped` in the layer definition.
|
||||||
@@ -127,7 +152,7 @@ Fully migrated (single namespace, InstanceState where needed, flattened facade):
|
|||||||
|
|
||||||
Still open and likely worth migrating:
|
Still open and likely worth migrating:
|
||||||
|
|
||||||
- [ ] `Plugin`
|
- [x] `Plugin`
|
||||||
- [ ] `ToolRegistry`
|
- [ ] `ToolRegistry`
|
||||||
- [ ] `Pty`
|
- [ ] `Pty`
|
||||||
- [ ] `Worktree`
|
- [ ] `Worktree`
|
||||||
|
|||||||
@@ -5,140 +5,204 @@ import { Log } from "../util/log"
|
|||||||
import { createOpencodeClient } from "@opencode-ai/sdk"
|
import { createOpencodeClient } from "@opencode-ai/sdk"
|
||||||
import { Server } from "../server/server"
|
import { Server } from "../server/server"
|
||||||
import { BunProc } from "../bun"
|
import { BunProc } from "../bun"
|
||||||
import { Instance } from "../project/instance"
|
|
||||||
import { Flag } from "../flag/flag"
|
import { Flag } from "../flag/flag"
|
||||||
import { CodexAuthPlugin } from "./codex"
|
import { CodexAuthPlugin } from "./codex"
|
||||||
import { Session } from "../session"
|
import { Session } from "../session"
|
||||||
import { NamedError } from "@opencode-ai/util/error"
|
import { NamedError } from "@opencode-ai/util/error"
|
||||||
import { CopilotAuthPlugin } from "./copilot"
|
import { CopilotAuthPlugin } from "./copilot"
|
||||||
import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
|
import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
|
||||||
|
import { Effect, Layer, ServiceMap } from "effect"
|
||||||
|
import { InstanceState } from "@/effect/instance-state"
|
||||||
|
import { makeRunPromise } from "@/effect/run-service"
|
||||||
|
|
||||||
export namespace Plugin {
|
export namespace Plugin {
|
||||||
const log = Log.create({ service: "plugin" })
|
const log = Log.create({ service: "plugin" })
|
||||||
|
|
||||||
|
type State = {
|
||||||
|
hooks: Hooks[]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hook names that follow the (input, output) => Promise<void> trigger pattern
|
||||||
|
type TriggerName = {
|
||||||
|
[K in keyof Hooks]-?: NonNullable<Hooks[K]> extends (input: any, output: any) => Promise<void>
|
||||||
|
? K
|
||||||
|
: never
|
||||||
|
}[keyof Hooks]
|
||||||
|
|
||||||
|
export interface Interface {
|
||||||
|
readonly trigger: <
|
||||||
|
Name extends TriggerName,
|
||||||
|
Input = Parameters<Required<Hooks>[Name]>[0],
|
||||||
|
Output = Parameters<Required<Hooks>[Name]>[1],
|
||||||
|
>(
|
||||||
|
name: Name,
|
||||||
|
input: Input,
|
||||||
|
output: Output,
|
||||||
|
) => Effect.Effect<Output>
|
||||||
|
readonly list: () => Effect.Effect<Hooks[]>
|
||||||
|
readonly init: () => Effect.Effect<void>
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Plugin") {}
|
||||||
|
|
||||||
// Built-in plugins that are directly imported (not installed from npm)
|
// Built-in plugins that are directly imported (not installed from npm)
|
||||||
const INTERNAL_PLUGINS: PluginInstance[] = [CodexAuthPlugin, CopilotAuthPlugin, GitlabAuthPlugin]
|
const INTERNAL_PLUGINS: PluginInstance[] = [CodexAuthPlugin, CopilotAuthPlugin, GitlabAuthPlugin]
|
||||||
|
|
||||||
const state = Instance.state(async () => {
|
// Old npm package names for plugins that are now built-in — skip if users still have them in config
|
||||||
const client = createOpencodeClient({
|
const DEPRECATED_PLUGIN_PACKAGES = ["opencode-openai-codex-auth", "opencode-copilot-auth"]
|
||||||
baseUrl: "http://localhost:4096",
|
|
||||||
directory: Instance.directory,
|
|
||||||
headers: Flag.OPENCODE_SERVER_PASSWORD
|
|
||||||
? {
|
|
||||||
Authorization: `Basic ${Buffer.from(`${Flag.OPENCODE_SERVER_USERNAME ?? "opencode"}:${Flag.OPENCODE_SERVER_PASSWORD}`).toString("base64")}`,
|
|
||||||
}
|
|
||||||
: undefined,
|
|
||||||
fetch: async (...args) => Server.Default().fetch(...args),
|
|
||||||
})
|
|
||||||
const config = await Config.get()
|
|
||||||
const hooks: Hooks[] = []
|
|
||||||
const input: PluginInput = {
|
|
||||||
client,
|
|
||||||
project: Instance.project,
|
|
||||||
worktree: Instance.worktree,
|
|
||||||
directory: Instance.directory,
|
|
||||||
get serverUrl(): URL {
|
|
||||||
return Server.url ?? new URL("http://localhost:4096")
|
|
||||||
},
|
|
||||||
$: Bun.$,
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const plugin of INTERNAL_PLUGINS) {
|
export const layer = Layer.effect(
|
||||||
log.info("loading internal plugin", { name: plugin.name })
|
Service,
|
||||||
const init = await plugin(input).catch((err) => {
|
Effect.gen(function* () {
|
||||||
log.error("failed to load internal plugin", { name: plugin.name, error: err })
|
const cache = yield* InstanceState.make<State>(
|
||||||
|
Effect.fn("Plugin.state")(function* (ctx) {
|
||||||
|
const hooks: Hooks[] = []
|
||||||
|
|
||||||
|
yield* Effect.promise(async () => {
|
||||||
|
const client = createOpencodeClient({
|
||||||
|
baseUrl: "http://localhost:4096",
|
||||||
|
directory: ctx.directory,
|
||||||
|
headers: Flag.OPENCODE_SERVER_PASSWORD
|
||||||
|
? {
|
||||||
|
Authorization: `Basic ${Buffer.from(`${Flag.OPENCODE_SERVER_USERNAME ?? "opencode"}:${Flag.OPENCODE_SERVER_PASSWORD}`).toString("base64")}`,
|
||||||
|
}
|
||||||
|
: undefined,
|
||||||
|
fetch: async (...args) => Server.Default().fetch(...args),
|
||||||
|
})
|
||||||
|
const cfg = await Config.get()
|
||||||
|
const input: PluginInput = {
|
||||||
|
client,
|
||||||
|
project: ctx.project,
|
||||||
|
worktree: ctx.worktree,
|
||||||
|
directory: ctx.directory,
|
||||||
|
get serverUrl(): URL {
|
||||||
|
return Server.url ?? new URL("http://localhost:4096")
|
||||||
|
},
|
||||||
|
$: Bun.$,
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const plugin of INTERNAL_PLUGINS) {
|
||||||
|
log.info("loading internal plugin", { name: plugin.name })
|
||||||
|
const init = await plugin(input).catch((err) => {
|
||||||
|
log.error("failed to load internal plugin", { name: plugin.name, error: err })
|
||||||
|
})
|
||||||
|
if (init) hooks.push(init)
|
||||||
|
}
|
||||||
|
|
||||||
|
let plugins = cfg.plugin ?? []
|
||||||
|
if (plugins.length) await Config.waitForDependencies()
|
||||||
|
|
||||||
|
for (let plugin of plugins) {
|
||||||
|
if (DEPRECATED_PLUGIN_PACKAGES.some((pkg) => plugin.includes(pkg))) continue
|
||||||
|
log.info("loading plugin", { path: plugin })
|
||||||
|
if (!plugin.startsWith("file://")) {
|
||||||
|
const idx = plugin.lastIndexOf("@")
|
||||||
|
const pkg = idx > 0 ? plugin.substring(0, idx) : plugin
|
||||||
|
const version = idx > 0 ? plugin.substring(idx + 1) : "latest"
|
||||||
|
plugin = await BunProc.install(pkg, version).catch((err) => {
|
||||||
|
const cause = err instanceof Error ? err.cause : err
|
||||||
|
const detail = cause instanceof Error ? cause.message : String(cause ?? err)
|
||||||
|
log.error("failed to install plugin", { pkg, version, error: detail })
|
||||||
|
Bus.publish(Session.Event.Error, {
|
||||||
|
error: new NamedError.Unknown({
|
||||||
|
message: `Failed to install plugin ${pkg}@${version}: ${detail}`,
|
||||||
|
}).toObject(),
|
||||||
|
})
|
||||||
|
return ""
|
||||||
|
})
|
||||||
|
if (!plugin) continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prevent duplicate initialization when plugins export the same function
|
||||||
|
// as both a named export and default export (e.g., `export const X` and `export default X`).
|
||||||
|
// Object.entries(mod) would return both entries pointing to the same function reference.
|
||||||
|
await import(plugin)
|
||||||
|
.then(async (mod) => {
|
||||||
|
const seen = new Set<PluginInstance>()
|
||||||
|
for (const [_name, fn] of Object.entries<PluginInstance>(mod)) {
|
||||||
|
if (seen.has(fn)) continue
|
||||||
|
seen.add(fn)
|
||||||
|
hooks.push(await fn(input))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch((err) => {
|
||||||
|
const message = err instanceof Error ? err.message : String(err)
|
||||||
|
log.error("failed to load plugin", { path: plugin, error: message })
|
||||||
|
Bus.publish(Session.Event.Error, {
|
||||||
|
error: new NamedError.Unknown({
|
||||||
|
message: `Failed to load plugin ${plugin}: ${message}`,
|
||||||
|
}).toObject(),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify plugins of current config
|
||||||
|
for (const hook of hooks) {
|
||||||
|
await (hook as any).config?.(cfg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Subscribe to bus events, clean up when scope is closed
|
||||||
|
yield* Effect.acquireRelease(
|
||||||
|
Effect.sync(() =>
|
||||||
|
Bus.subscribeAll(async (input) => {
|
||||||
|
for (const hook of hooks) {
|
||||||
|
hook["event"]?.({ event: input })
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
(unsub) => Effect.sync(unsub),
|
||||||
|
)
|
||||||
|
|
||||||
|
return { hooks }
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
const trigger = Effect.fn("Plugin.trigger")(function* <
|
||||||
|
Name extends TriggerName,
|
||||||
|
Input = Parameters<Required<Hooks>[Name]>[0],
|
||||||
|
Output = Parameters<Required<Hooks>[Name]>[1],
|
||||||
|
>(name: Name, input: Input, output: Output) {
|
||||||
|
if (!name) return output
|
||||||
|
const state = yield* InstanceState.get(cache)
|
||||||
|
yield* Effect.promise(async () => {
|
||||||
|
for (const hook of state.hooks) {
|
||||||
|
const fn = hook[name] as any
|
||||||
|
if (!fn) continue
|
||||||
|
await fn(input, output)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return output
|
||||||
})
|
})
|
||||||
if (init) hooks.push(init)
|
|
||||||
}
|
|
||||||
|
|
||||||
let plugins = config.plugin ?? []
|
const list = Effect.fn("Plugin.list")(function* () {
|
||||||
if (plugins.length) await Config.waitForDependencies()
|
const state = yield* InstanceState.get(cache)
|
||||||
|
return state.hooks
|
||||||
|
})
|
||||||
|
|
||||||
for (let plugin of plugins) {
|
const init = Effect.fn("Plugin.init")(function* () {
|
||||||
// ignore old codex plugin since it is supported first party now
|
yield* InstanceState.get(cache)
|
||||||
if (plugin.includes("opencode-openai-codex-auth") || plugin.includes("opencode-copilot-auth")) continue
|
})
|
||||||
log.info("loading plugin", { path: plugin })
|
|
||||||
if (!plugin.startsWith("file://")) {
|
|
||||||
const lastAtIndex = plugin.lastIndexOf("@")
|
|
||||||
const pkg = lastAtIndex > 0 ? plugin.substring(0, lastAtIndex) : plugin
|
|
||||||
const version = lastAtIndex > 0 ? plugin.substring(lastAtIndex + 1) : "latest"
|
|
||||||
plugin = await BunProc.install(pkg, version).catch((err) => {
|
|
||||||
const cause = err instanceof Error ? err.cause : err
|
|
||||||
const detail = cause instanceof Error ? cause.message : String(cause ?? err)
|
|
||||||
log.error("failed to install plugin", { pkg, version, error: detail })
|
|
||||||
Bus.publish(Session.Event.Error, {
|
|
||||||
error: new NamedError.Unknown({
|
|
||||||
message: `Failed to install plugin ${pkg}@${version}: ${detail}`,
|
|
||||||
}).toObject(),
|
|
||||||
})
|
|
||||||
return ""
|
|
||||||
})
|
|
||||||
if (!plugin) continue
|
|
||||||
}
|
|
||||||
// Prevent duplicate initialization when plugins export the same function
|
|
||||||
// as both a named export and default export (e.g., `export const X` and `export default X`).
|
|
||||||
// Object.entries(mod) would return both entries pointing to the same function reference.
|
|
||||||
await import(plugin)
|
|
||||||
.then(async (mod) => {
|
|
||||||
const seen = new Set<PluginInstance>()
|
|
||||||
for (const [_name, fn] of Object.entries<PluginInstance>(mod)) {
|
|
||||||
if (seen.has(fn)) continue
|
|
||||||
seen.add(fn)
|
|
||||||
hooks.push(await fn(input))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.catch((err) => {
|
|
||||||
const message = err instanceof Error ? err.message : String(err)
|
|
||||||
log.error("failed to load plugin", { path: plugin, error: message })
|
|
||||||
Bus.publish(Session.Event.Error, {
|
|
||||||
error: new NamedError.Unknown({
|
|
||||||
message: `Failed to load plugin ${plugin}: ${message}`,
|
|
||||||
}).toObject(),
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
return Service.of({ trigger, list, init })
|
||||||
hooks,
|
}),
|
||||||
input,
|
)
|
||||||
}
|
|
||||||
})
|
const runPromise = makeRunPromise(Service, layer)
|
||||||
|
|
||||||
export async function trigger<
|
export async function trigger<
|
||||||
Name extends Exclude<keyof Required<Hooks>, "auth" | "event" | "tool">,
|
Name extends TriggerName,
|
||||||
Input = Parameters<Required<Hooks>[Name]>[0],
|
Input = Parameters<Required<Hooks>[Name]>[0],
|
||||||
Output = Parameters<Required<Hooks>[Name]>[1],
|
Output = Parameters<Required<Hooks>[Name]>[1],
|
||||||
>(name: Name, input: Input, output: Output): Promise<Output> {
|
>(name: Name, input: Input, output: Output): Promise<Output> {
|
||||||
if (!name) return output
|
return runPromise((svc) => svc.trigger(name, input, output))
|
||||||
for (const hook of await state().then((x) => x.hooks)) {
|
|
||||||
const fn = hook[name]
|
|
||||||
if (!fn) continue
|
|
||||||
// @ts-expect-error if you feel adventurous, please fix the typing, make sure to bump the try-counter if you
|
|
||||||
// give up.
|
|
||||||
// try-counter: 2
|
|
||||||
await fn(input, output)
|
|
||||||
}
|
|
||||||
return output
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function list() {
|
export async function list(): Promise<Hooks[]> {
|
||||||
return state().then((x) => x.hooks)
|
return runPromise((svc) => svc.list())
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function init() {
|
export async function init() {
|
||||||
const hooks = await state().then((x) => x.hooks)
|
return runPromise((svc) => svc.init())
|
||||||
const config = await Config.get()
|
|
||||||
for (const hook of hooks) {
|
|
||||||
// @ts-expect-error this is because we haven't moved plugin to sdk v2
|
|
||||||
await hook.config?.(config)
|
|
||||||
}
|
|
||||||
Bus.subscribeAll(async (input) => {
|
|
||||||
const hooks = await state().then((x) => x.hooks)
|
|
||||||
for (const hook of hooks) {
|
|
||||||
hook["event"]?.({
|
|
||||||
event: input,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user