mirror of
https://gitea.toothfairyai.com/ToothFairyAI/tf_code.git
synced 2026-04-26 02:24:43 +00:00
stack: effectify-file-watcher-service (#17827)
This commit is contained in:
@@ -34,6 +34,7 @@ Instructions to follow when writing Effect.
|
|||||||
- Use `Effect.gen(function* () { ... })` for composition.
|
- Use `Effect.gen(function* () { ... })` for composition.
|
||||||
- Use `Effect.fn("ServiceName.method")` for named/traced effects and `Effect.fnUntraced` for internal helpers.
|
- Use `Effect.fn("ServiceName.method")` for named/traced effects and `Effect.fnUntraced` for internal helpers.
|
||||||
- `Effect.fn` / `Effect.fnUntraced` accept pipeable operators as extra arguments, so avoid unnecessary `flow` or outer `.pipe()` wrappers.
|
- `Effect.fn` / `Effect.fnUntraced` accept pipeable operators as extra arguments, so avoid unnecessary `flow` or outer `.pipe()` wrappers.
|
||||||
|
- **`Effect.callback`** (not `Effect.async`) for callback-based APIs. The classic `Effect.async` was renamed to `Effect.callback` in effect-smol/v4.
|
||||||
|
|
||||||
## Time
|
## Time
|
||||||
|
|
||||||
@@ -42,3 +43,37 @@ Instructions to follow when writing Effect.
|
|||||||
## Errors
|
## Errors
|
||||||
|
|
||||||
- In `Effect.gen/fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
|
- In `Effect.gen/fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
|
||||||
|
|
||||||
|
## Instance-scoped Effect services
|
||||||
|
|
||||||
|
Services that need per-directory lifecycle (created/destroyed per instance) go through the `Instances` LayerMap:
|
||||||
|
|
||||||
|
1. Define a `ServiceMap.Service` with a `static readonly layer` (see `FileWatcherService`, `QuestionService`, `PermissionService`, `ProviderAuthService`).
|
||||||
|
2. Add it to `InstanceServices` union and `Layer.mergeAll(...)` in `src/effect/instances.ts`.
|
||||||
|
3. Use `InstanceContext` inside the layer to read `directory` and `project` instead of `Instance.*` globals.
|
||||||
|
4. Call from legacy code via `runPromiseInstance(MyService.use((s) => s.method()))`.
|
||||||
|
|
||||||
|
### Instance.bind — ALS context for native callbacks
|
||||||
|
|
||||||
|
`Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and returns a wrapper that restores it synchronously when called.
|
||||||
|
|
||||||
|
**Use it** when passing callbacks to native C/C++ addons (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`.
|
||||||
|
|
||||||
|
**Don't need it** for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers — Node.js ALS propagates through those automatically.
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Native addon callback — needs Instance.bind
|
||||||
|
const cb = Instance.bind((err, evts) => {
|
||||||
|
Bus.publish(MyEvent, { ... })
|
||||||
|
})
|
||||||
|
nativeAddon.subscribe(dir, cb)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Flag → Effect.Config migration
|
||||||
|
|
||||||
|
Flags in `src/flag/flag.ts` are being migrated from static `truthy(...)` reads to `Config.boolean(...).pipe(Config.withDefault(false))` as their consumers get effectified.
|
||||||
|
|
||||||
|
- Effectful flags return `Config<boolean>` and are read with `yield*` inside `Effect.gen`.
|
||||||
|
- The default `ConfigProvider` reads from `process.env`, so env vars keep working.
|
||||||
|
- Tests can override via `ConfigProvider.layer(ConfigProvider.fromUnknown({ ... }))`.
|
||||||
|
- Keep all flags in `flag.ts` as the single registry — just change the implementation from `truthy()` to `Config.boolean()` when the consumer moves to Effect.
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { registerDisposer } from "./instance-registry"
|
|||||||
import { ProviderAuthService } from "@/provider/auth-service"
|
import { ProviderAuthService } from "@/provider/auth-service"
|
||||||
import { QuestionService } from "@/question/service"
|
import { QuestionService } from "@/question/service"
|
||||||
import { PermissionService } from "@/permission/service"
|
import { PermissionService } from "@/permission/service"
|
||||||
|
import { FileWatcherService } from "@/file/watcher"
|
||||||
import { Instance } from "@/project/instance"
|
import { Instance } from "@/project/instance"
|
||||||
import type { Project } from "@/project/project"
|
import type { Project } from "@/project/project"
|
||||||
|
|
||||||
@@ -17,7 +18,7 @@ export class InstanceContext extends ServiceMap.Service<InstanceContext, Instanc
|
|||||||
"opencode/InstanceContext",
|
"opencode/InstanceContext",
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
export type InstanceServices = QuestionService | PermissionService | ProviderAuthService
|
export type InstanceServices = QuestionService | PermissionService | ProviderAuthService | FileWatcherService
|
||||||
|
|
||||||
function lookup(directory: string) {
|
function lookup(directory: string) {
|
||||||
const project = Instance.project
|
const project = Instance.project
|
||||||
@@ -26,6 +27,7 @@ function lookup(directory: string) {
|
|||||||
Layer.fresh(QuestionService.layer),
|
Layer.fresh(QuestionService.layer),
|
||||||
Layer.fresh(PermissionService.layer),
|
Layer.fresh(PermissionService.layer),
|
||||||
Layer.fresh(ProviderAuthService.layer),
|
Layer.fresh(ProviderAuthService.layer),
|
||||||
|
Layer.fresh(FileWatcherService.layer),
|
||||||
).pipe(Layer.provide(ctx))
|
).pipe(Layer.provide(ctx))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
import { BusEvent } from "@/bus/bus-event"
|
import { BusEvent } from "@/bus/bus-event"
|
||||||
import { Bus } from "@/bus"
|
import { Bus } from "@/bus"
|
||||||
|
import { InstanceContext } from "@/effect/instances"
|
||||||
|
import { Instance } from "@/project/instance"
|
||||||
import z from "zod"
|
import z from "zod"
|
||||||
import { Instance } from "../project/instance"
|
|
||||||
import { Log } from "../util/log"
|
import { Log } from "../util/log"
|
||||||
import { FileIgnore } from "./ignore"
|
import { FileIgnore } from "./ignore"
|
||||||
import { Config } from "../config/config"
|
import { Config } from "../config/config"
|
||||||
@@ -9,118 +10,139 @@ import path from "path"
|
|||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
import { createWrapper } from "@parcel/watcher/wrapper"
|
import { createWrapper } from "@parcel/watcher/wrapper"
|
||||||
import { lazy } from "@/util/lazy"
|
import { lazy } from "@/util/lazy"
|
||||||
import { withTimeout } from "@/util/timeout"
|
|
||||||
import type ParcelWatcher from "@parcel/watcher"
|
import type ParcelWatcher from "@parcel/watcher"
|
||||||
import { Flag } from "@/flag/flag"
|
|
||||||
import { readdir } from "fs/promises"
|
import { readdir } from "fs/promises"
|
||||||
import { git } from "@/util/git"
|
import { git } from "@/util/git"
|
||||||
import { Protected } from "./protected"
|
import { Protected } from "./protected"
|
||||||
|
import { Flag } from "@/flag/flag"
|
||||||
|
import { Cause, Effect, Layer, ServiceMap } from "effect"
|
||||||
|
|
||||||
const SUBSCRIBE_TIMEOUT_MS = 10_000
|
const SUBSCRIBE_TIMEOUT_MS = 10_000
|
||||||
|
|
||||||
declare const OPENCODE_LIBC: string | undefined
|
declare const OPENCODE_LIBC: string | undefined
|
||||||
|
|
||||||
export namespace FileWatcher {
|
const log = Log.create({ service: "file.watcher" })
|
||||||
const log = Log.create({ service: "file.watcher" })
|
|
||||||
|
|
||||||
export const Event = {
|
const event = {
|
||||||
Updated: BusEvent.define(
|
Updated: BusEvent.define(
|
||||||
"file.watcher.updated",
|
"file.watcher.updated",
|
||||||
z.object({
|
z.object({
|
||||||
file: z.string(),
|
file: z.string(),
|
||||||
event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
|
event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
|
||||||
|
try {
|
||||||
|
const binding = require(
|
||||||
|
`@parcel/watcher-${process.platform}-${process.arch}${process.platform === "linux" ? `-${OPENCODE_LIBC || "glibc"}` : ""}`,
|
||||||
|
)
|
||||||
|
return createWrapper(binding) as typeof import("@parcel/watcher")
|
||||||
|
} catch (error) {
|
||||||
|
log.error("failed to load watcher binding", { error })
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
|
function getBackend() {
|
||||||
try {
|
if (process.platform === "win32") return "windows"
|
||||||
const binding = require(
|
if (process.platform === "darwin") return "fs-events"
|
||||||
`@parcel/watcher-${process.platform}-${process.arch}${process.platform === "linux" ? `-${OPENCODE_LIBC || "glibc"}` : ""}`,
|
if (process.platform === "linux") return "inotify"
|
||||||
)
|
}
|
||||||
return createWrapper(binding) as typeof import("@parcel/watcher")
|
|
||||||
} catch (error) {
|
|
||||||
log.error("failed to load watcher binding", { error })
|
|
||||||
return
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
const state = Instance.state(
|
export namespace FileWatcher {
|
||||||
async () => {
|
export const Event = event
|
||||||
log.info("init")
|
/** Whether the native @parcel/watcher binding is available on this platform. */
|
||||||
const cfg = await Config.get()
|
export const hasNativeBinding = () => !!watcher()
|
||||||
const backend = (() => {
|
}
|
||||||
if (process.platform === "win32") return "windows"
|
|
||||||
if (process.platform === "darwin") return "fs-events"
|
|
||||||
if (process.platform === "linux") return "inotify"
|
|
||||||
})()
|
|
||||||
if (!backend) {
|
|
||||||
log.error("watcher backend not supported", { platform: process.platform })
|
|
||||||
return {}
|
|
||||||
}
|
|
||||||
log.info("watcher backend", { platform: process.platform, backend })
|
|
||||||
|
|
||||||
const w = watcher()
|
const init = Effect.fn("FileWatcherService.init")(function* () {})
|
||||||
if (!w) return {}
|
|
||||||
|
|
||||||
const subscribe: ParcelWatcher.SubscribeCallback = (err, evts) => {
|
export namespace FileWatcherService {
|
||||||
if (err) return
|
export interface Service {
|
||||||
for (const evt of evts) {
|
readonly init: () => Effect.Effect<void>
|
||||||
if (evt.type === "create") Bus.publish(Event.Updated, { file: evt.path, event: "add" })
|
|
||||||
if (evt.type === "update") Bus.publish(Event.Updated, { file: evt.path, event: "change" })
|
|
||||||
if (evt.type === "delete") Bus.publish(Event.Updated, { file: evt.path, event: "unlink" })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const subs: ParcelWatcher.AsyncSubscription[] = []
|
|
||||||
const cfgIgnores = cfg.watcher?.ignore ?? []
|
|
||||||
|
|
||||||
if (Flag.OPENCODE_EXPERIMENTAL_FILEWATCHER) {
|
|
||||||
const pending = w.subscribe(Instance.directory, subscribe, {
|
|
||||||
ignore: [...FileIgnore.PATTERNS, ...cfgIgnores, ...Protected.paths()],
|
|
||||||
backend,
|
|
||||||
})
|
|
||||||
const sub = await withTimeout(pending, SUBSCRIBE_TIMEOUT_MS).catch((err) => {
|
|
||||||
log.error("failed to subscribe to Instance.directory", { error: err })
|
|
||||||
pending.then((s) => s.unsubscribe()).catch(() => {})
|
|
||||||
return undefined
|
|
||||||
})
|
|
||||||
if (sub) subs.push(sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Instance.project.vcs === "git") {
|
|
||||||
const result = await git(["rev-parse", "--git-dir"], {
|
|
||||||
cwd: Instance.worktree,
|
|
||||||
})
|
|
||||||
const vcsDir = result.exitCode === 0 ? path.resolve(Instance.worktree, result.text().trim()) : undefined
|
|
||||||
if (vcsDir && !cfgIgnores.includes(".git") && !cfgIgnores.includes(vcsDir)) {
|
|
||||||
const gitDirContents = await readdir(vcsDir).catch(() => [])
|
|
||||||
const ignoreList = gitDirContents.filter((entry) => entry !== "HEAD")
|
|
||||||
const pending = w.subscribe(vcsDir, subscribe, {
|
|
||||||
ignore: ignoreList,
|
|
||||||
backend,
|
|
||||||
})
|
|
||||||
const sub = await withTimeout(pending, SUBSCRIBE_TIMEOUT_MS).catch((err) => {
|
|
||||||
log.error("failed to subscribe to vcsDir", { error: err })
|
|
||||||
pending.then((s) => s.unsubscribe()).catch(() => {})
|
|
||||||
return undefined
|
|
||||||
})
|
|
||||||
if (sub) subs.push(sub)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return { subs }
|
|
||||||
},
|
|
||||||
async (state) => {
|
|
||||||
if (!state.subs) return
|
|
||||||
await Promise.all(state.subs.map((sub) => sub?.unsubscribe()))
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
export function init() {
|
|
||||||
if (Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
state()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class FileWatcherService extends ServiceMap.Service<FileWatcherService, FileWatcherService.Service>()(
|
||||||
|
"@opencode/FileWatcher",
|
||||||
|
) {
|
||||||
|
static readonly layer = Layer.effect(
|
||||||
|
FileWatcherService,
|
||||||
|
Effect.gen(function* () {
|
||||||
|
const instance = yield* InstanceContext
|
||||||
|
if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) return FileWatcherService.of({ init })
|
||||||
|
|
||||||
|
log.info("init", { directory: instance.directory })
|
||||||
|
|
||||||
|
const backend = getBackend()
|
||||||
|
if (!backend) {
|
||||||
|
log.error("watcher backend not supported", { directory: instance.directory, platform: process.platform })
|
||||||
|
return FileWatcherService.of({ init })
|
||||||
|
}
|
||||||
|
|
||||||
|
const w = watcher()
|
||||||
|
if (!w) return FileWatcherService.of({ init })
|
||||||
|
|
||||||
|
log.info("watcher backend", { directory: instance.directory, platform: process.platform, backend })
|
||||||
|
|
||||||
|
const subs: ParcelWatcher.AsyncSubscription[] = []
|
||||||
|
yield* Effect.addFinalizer(() => Effect.promise(() => Promise.allSettled(subs.map((sub) => sub.unsubscribe()))))
|
||||||
|
|
||||||
|
const cb: ParcelWatcher.SubscribeCallback = Instance.bind((err, evts) => {
|
||||||
|
if (err) return
|
||||||
|
for (const evt of evts) {
|
||||||
|
if (evt.type === "create") Bus.publish(event.Updated, { file: evt.path, event: "add" })
|
||||||
|
if (evt.type === "update") Bus.publish(event.Updated, { file: evt.path, event: "change" })
|
||||||
|
if (evt.type === "delete") Bus.publish(event.Updated, { file: evt.path, event: "unlink" })
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const subscribe = (dir: string, ignore: string[]) => {
|
||||||
|
const pending = w.subscribe(dir, cb, { ignore, backend })
|
||||||
|
return Effect.gen(function* () {
|
||||||
|
const sub = yield* Effect.promise(() => pending)
|
||||||
|
subs.push(sub)
|
||||||
|
}).pipe(
|
||||||
|
Effect.timeout(SUBSCRIBE_TIMEOUT_MS),
|
||||||
|
Effect.catchCause((cause) => {
|
||||||
|
log.error("failed to subscribe", { dir, cause: Cause.pretty(cause) })
|
||||||
|
// Clean up a subscription that resolves after timeout
|
||||||
|
pending.then((s) => s.unsubscribe()).catch(() => {})
|
||||||
|
return Effect.void
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
const cfg = yield* Effect.promise(() => Config.get())
|
||||||
|
const cfgIgnores = cfg.watcher?.ignore ?? []
|
||||||
|
|
||||||
|
if (yield* Flag.OPENCODE_EXPERIMENTAL_FILEWATCHER) {
|
||||||
|
yield* subscribe(instance.directory, [...FileIgnore.PATTERNS, ...cfgIgnores, ...Protected.paths()])
|
||||||
|
}
|
||||||
|
|
||||||
|
if (instance.project.vcs === "git") {
|
||||||
|
const result = yield* Effect.promise(() =>
|
||||||
|
git(["rev-parse", "--git-dir"], {
|
||||||
|
cwd: instance.project.worktree,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
const vcsDir = result.exitCode === 0 ? path.resolve(instance.project.worktree, result.text().trim()) : undefined
|
||||||
|
if (vcsDir && !cfgIgnores.includes(".git") && !cfgIgnores.includes(vcsDir)) {
|
||||||
|
const ignore = (yield* Effect.promise(() => readdir(vcsDir).catch(() => []))).filter(
|
||||||
|
(entry) => entry !== "HEAD",
|
||||||
|
)
|
||||||
|
yield* subscribe(vcsDir, ignore)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return FileWatcherService.of({ init })
|
||||||
|
}).pipe(
|
||||||
|
Effect.catchCause((cause) => {
|
||||||
|
log.error("failed to init watcher service", { cause: Cause.pretty(cause) })
|
||||||
|
return Effect.succeed(FileWatcherService.of({ init }))
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import { Config } from "effect"
|
||||||
|
|
||||||
function truthy(key: string) {
|
function truthy(key: string) {
|
||||||
const value = process.env[key]?.toLowerCase()
|
const value = process.env[key]?.toLowerCase()
|
||||||
return value === "true" || value === "1"
|
return value === "true" || value === "1"
|
||||||
@@ -40,8 +42,12 @@ export namespace Flag {
|
|||||||
|
|
||||||
// Experimental
|
// Experimental
|
||||||
export const OPENCODE_EXPERIMENTAL = truthy("OPENCODE_EXPERIMENTAL")
|
export const OPENCODE_EXPERIMENTAL = truthy("OPENCODE_EXPERIMENTAL")
|
||||||
export const OPENCODE_EXPERIMENTAL_FILEWATCHER = truthy("OPENCODE_EXPERIMENTAL_FILEWATCHER")
|
export const OPENCODE_EXPERIMENTAL_FILEWATCHER = Config.boolean("OPENCODE_EXPERIMENTAL_FILEWATCHER").pipe(
|
||||||
export const OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = truthy("OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER")
|
Config.withDefault(false),
|
||||||
|
)
|
||||||
|
export const OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = Config.boolean(
|
||||||
|
"OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER",
|
||||||
|
).pipe(Config.withDefault(false))
|
||||||
export const OPENCODE_EXPERIMENTAL_ICON_DISCOVERY =
|
export const OPENCODE_EXPERIMENTAL_ICON_DISCOVERY =
|
||||||
OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_ICON_DISCOVERY")
|
OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_ICON_DISCOVERY")
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { Plugin } from "../plugin"
|
import { Plugin } from "../plugin"
|
||||||
import { Format } from "../format"
|
import { Format } from "../format"
|
||||||
import { LSP } from "../lsp"
|
import { LSP } from "../lsp"
|
||||||
import { FileWatcher } from "../file/watcher"
|
import { FileWatcherService } from "../file/watcher"
|
||||||
import { File } from "../file"
|
import { File } from "../file"
|
||||||
import { Project } from "./project"
|
import { Project } from "./project"
|
||||||
import { Bus } from "../bus"
|
import { Bus } from "../bus"
|
||||||
@@ -12,6 +12,7 @@ import { Log } from "@/util/log"
|
|||||||
import { ShareNext } from "@/share/share-next"
|
import { ShareNext } from "@/share/share-next"
|
||||||
import { Snapshot } from "../snapshot"
|
import { Snapshot } from "../snapshot"
|
||||||
import { Truncate } from "../tool/truncation"
|
import { Truncate } from "../tool/truncation"
|
||||||
|
import { runPromiseInstance } from "@/effect/runtime"
|
||||||
|
|
||||||
export async function InstanceBootstrap() {
|
export async function InstanceBootstrap() {
|
||||||
Log.Default.info("bootstrapping", { directory: Instance.directory })
|
Log.Default.info("bootstrapping", { directory: Instance.directory })
|
||||||
@@ -19,7 +20,7 @@ export async function InstanceBootstrap() {
|
|||||||
ShareNext.init()
|
ShareNext.init()
|
||||||
Format.init()
|
Format.init()
|
||||||
await LSP.init()
|
await LSP.init()
|
||||||
FileWatcher.init()
|
await runPromiseInstance(FileWatcherService.use((service) => service.init()))
|
||||||
File.init()
|
File.init()
|
||||||
Vcs.init()
|
Vcs.init()
|
||||||
Snapshot.init()
|
Snapshot.init()
|
||||||
|
|||||||
@@ -101,6 +101,15 @@ export const Instance = {
|
|||||||
if (Instance.worktree === "/") return false
|
if (Instance.worktree === "/") return false
|
||||||
return Filesystem.contains(Instance.worktree, filepath)
|
return Filesystem.contains(Instance.worktree, filepath)
|
||||||
},
|
},
|
||||||
|
/**
|
||||||
|
* Captures the current instance ALS context and returns a wrapper that
|
||||||
|
* restores it when called. Use this for callbacks that fire outside the
|
||||||
|
* instance async context (native addons, event emitters, timers, etc.).
|
||||||
|
*/
|
||||||
|
bind<F extends (...args: any[]) => any>(fn: F): F {
|
||||||
|
const ctx = context.use()
|
||||||
|
return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F
|
||||||
|
},
|
||||||
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
|
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
|
||||||
return State.create(() => Instance.directory, init, dispose)
|
return State.create(() => Instance.directory, init, dispose)
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -167,40 +167,44 @@ export namespace Pty {
|
|||||||
subscribers: new Map(),
|
subscribers: new Map(),
|
||||||
}
|
}
|
||||||
state().set(id, session)
|
state().set(id, session)
|
||||||
ptyProcess.onData((chunk) => {
|
ptyProcess.onData(
|
||||||
session.cursor += chunk.length
|
Instance.bind((chunk) => {
|
||||||
|
session.cursor += chunk.length
|
||||||
|
|
||||||
for (const [key, ws] of session.subscribers.entries()) {
|
for (const [key, ws] of session.subscribers.entries()) {
|
||||||
if (ws.readyState !== 1) {
|
if (ws.readyState !== 1) {
|
||||||
session.subscribers.delete(key)
|
session.subscribers.delete(key)
|
||||||
continue
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ws.data !== key) {
|
||||||
|
session.subscribers.delete(key)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
ws.send(chunk)
|
||||||
|
} catch {
|
||||||
|
session.subscribers.delete(key)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ws.data !== key) {
|
session.buffer += chunk
|
||||||
session.subscribers.delete(key)
|
if (session.buffer.length <= BUFFER_LIMIT) return
|
||||||
continue
|
const excess = session.buffer.length - BUFFER_LIMIT
|
||||||
}
|
session.buffer = session.buffer.slice(excess)
|
||||||
|
session.bufferCursor += excess
|
||||||
try {
|
}),
|
||||||
ws.send(chunk)
|
)
|
||||||
} catch {
|
ptyProcess.onExit(
|
||||||
session.subscribers.delete(key)
|
Instance.bind(({ exitCode }) => {
|
||||||
}
|
if (session.info.status === "exited") return
|
||||||
}
|
log.info("session exited", { id, exitCode })
|
||||||
|
session.info.status = "exited"
|
||||||
session.buffer += chunk
|
Bus.publish(Event.Exited, { id, exitCode })
|
||||||
if (session.buffer.length <= BUFFER_LIMIT) return
|
remove(id)
|
||||||
const excess = session.buffer.length - BUFFER_LIMIT
|
}),
|
||||||
session.buffer = session.buffer.slice(excess)
|
)
|
||||||
session.bufferCursor += excess
|
|
||||||
})
|
|
||||||
ptyProcess.onExit(({ exitCode }) => {
|
|
||||||
if (session.info.status === "exited") return
|
|
||||||
log.info("session exited", { id, exitCode })
|
|
||||||
session.info.status = "exited"
|
|
||||||
Bus.publish(Event.Exited, { id, exitCode })
|
|
||||||
remove(id)
|
|
||||||
})
|
|
||||||
Bus.publish(Event.Created, { info })
|
Bus.publish(Event.Created, { info })
|
||||||
return info
|
return info
|
||||||
}
|
}
|
||||||
|
|||||||
250
packages/opencode/test/file/watcher.test.ts
Normal file
250
packages/opencode/test/file/watcher.test.ts
Normal file
@@ -0,0 +1,250 @@
|
|||||||
|
import { $ } from "bun"
|
||||||
|
import { afterEach, describe, expect, test } from "bun:test"
|
||||||
|
import fs from "fs/promises"
|
||||||
|
import path from "path"
|
||||||
|
import { ConfigProvider, Deferred, Effect, Fiber, Layer, ManagedRuntime, Option } from "effect"
|
||||||
|
import { tmpdir } from "../fixture/fixture"
|
||||||
|
import { FileWatcher, FileWatcherService } from "../../src/file/watcher"
|
||||||
|
import { InstanceContext } from "../../src/effect/instances"
|
||||||
|
import { Instance } from "../../src/project/instance"
|
||||||
|
import { GlobalBus } from "../../src/bus/global"
|
||||||
|
|
||||||
|
// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
|
||||||
|
const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Helpers
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
const configLayer = ConfigProvider.layer(
|
||||||
|
ConfigProvider.fromUnknown({
|
||||||
|
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
|
||||||
|
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
|
||||||
|
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
|
||||||
|
|
||||||
|
/** Run `body` with a live FileWatcherService. Runtime is acquired/released via Effect.scoped. */
|
||||||
|
function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
|
||||||
|
return Instance.provide({
|
||||||
|
directory,
|
||||||
|
fn: () =>
|
||||||
|
Effect.gen(function* () {
|
||||||
|
const ctx = Layer.sync(InstanceContext, () =>
|
||||||
|
InstanceContext.of({ directory: Instance.directory, project: Instance.project }),
|
||||||
|
)
|
||||||
|
const layer = Layer.fresh(FileWatcherService.layer).pipe(Layer.provide(ctx), Layer.provide(configLayer))
|
||||||
|
const rt = yield* Effect.acquireRelease(
|
||||||
|
Effect.sync(() => ManagedRuntime.make(layer)),
|
||||||
|
(rt) => Effect.promise(() => rt.dispose()),
|
||||||
|
)
|
||||||
|
yield* Effect.promise(() => rt.runPromise(FileWatcherService.use((s) => s.init())))
|
||||||
|
yield* ready(directory)
|
||||||
|
yield* body
|
||||||
|
}).pipe(Effect.scoped, Effect.runPromise),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
|
||||||
|
let done = false
|
||||||
|
|
||||||
|
function on(evt: BusUpdate) {
|
||||||
|
if (done) return
|
||||||
|
if (evt.directory !== directory) return
|
||||||
|
if (evt.payload.type !== FileWatcher.Event.Updated.type) return
|
||||||
|
if (!check(evt.payload.properties)) return
|
||||||
|
hit(evt.payload.properties)
|
||||||
|
}
|
||||||
|
|
||||||
|
function cleanup() {
|
||||||
|
if (done) return
|
||||||
|
done = true
|
||||||
|
GlobalBus.off("event", on)
|
||||||
|
}
|
||||||
|
|
||||||
|
GlobalBus.on("event", on)
|
||||||
|
return cleanup
|
||||||
|
}
|
||||||
|
|
||||||
|
function wait(directory: string, check: (evt: WatcherEvent) => boolean) {
|
||||||
|
return Effect.callback<WatcherEvent>((resume) => {
|
||||||
|
const cleanup = listen(directory, check, (evt) => {
|
||||||
|
cleanup()
|
||||||
|
resume(Effect.succeed(evt))
|
||||||
|
})
|
||||||
|
return Effect.sync(cleanup)
|
||||||
|
}).pipe(Effect.timeout("5 seconds"))
|
||||||
|
}
|
||||||
|
|
||||||
|
function nextUpdate<E>(directory: string, check: (evt: WatcherEvent) => boolean, trigger: Effect.Effect<void, E>) {
|
||||||
|
return Effect.acquireUseRelease(
|
||||||
|
wait(directory, check).pipe(Effect.forkChild({ startImmediately: true })),
|
||||||
|
(fiber) =>
|
||||||
|
Effect.gen(function* () {
|
||||||
|
yield* trigger
|
||||||
|
return yield* Fiber.join(fiber)
|
||||||
|
}),
|
||||||
|
Fiber.interrupt,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Effect that asserts no matching event arrives within `ms`. */
|
||||||
|
function noUpdate<E>(
|
||||||
|
directory: string,
|
||||||
|
check: (evt: WatcherEvent) => boolean,
|
||||||
|
trigger: Effect.Effect<void, E>,
|
||||||
|
ms = 500,
|
||||||
|
) {
|
||||||
|
return Effect.gen(function* () {
|
||||||
|
const deferred = yield* Deferred.make<WatcherEvent>()
|
||||||
|
|
||||||
|
yield* Effect.acquireUseRelease(
|
||||||
|
Effect.sync(() =>
|
||||||
|
listen(directory, check, (evt) => {
|
||||||
|
Effect.runSync(Deferred.succeed(deferred, evt))
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
() =>
|
||||||
|
Effect.gen(function* () {
|
||||||
|
yield* trigger
|
||||||
|
expect(yield* Deferred.await(deferred).pipe(Effect.timeoutOption(`${ms} millis`))).toEqual(Option.none())
|
||||||
|
}),
|
||||||
|
(cleanup) => Effect.sync(cleanup),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
function ready(directory: string) {
|
||||||
|
const file = path.join(directory, `.watcher-${Math.random().toString(36).slice(2)}`)
|
||||||
|
const head = path.join(directory, ".git", "HEAD")
|
||||||
|
|
||||||
|
return Effect.gen(function* () {
|
||||||
|
yield* nextUpdate(
|
||||||
|
directory,
|
||||||
|
(evt) => evt.file === file && evt.event === "add",
|
||||||
|
Effect.promise(() => fs.writeFile(file, "ready")),
|
||||||
|
).pipe(Effect.ensuring(Effect.promise(() => fs.rm(file, { force: true }).catch(() => undefined))), Effect.asVoid)
|
||||||
|
|
||||||
|
const git = yield* Effect.promise(() =>
|
||||||
|
fs
|
||||||
|
.stat(head)
|
||||||
|
.then(() => true)
|
||||||
|
.catch(() => false),
|
||||||
|
)
|
||||||
|
if (!git) return
|
||||||
|
|
||||||
|
const branch = `watch-${Math.random().toString(36).slice(2)}`
|
||||||
|
const hash = yield* Effect.promise(() => $`git rev-parse HEAD`.cwd(directory).quiet().text())
|
||||||
|
yield* nextUpdate(
|
||||||
|
directory,
|
||||||
|
(evt) => evt.file === head && evt.event !== "unlink",
|
||||||
|
Effect.promise(async () => {
|
||||||
|
await fs.writeFile(path.join(directory, ".git", "refs", "heads", branch), hash.trim() + "\n")
|
||||||
|
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
|
||||||
|
}),
|
||||||
|
).pipe(Effect.asVoid)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Tests
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
describeWatcher("FileWatcherService", () => {
|
||||||
|
afterEach(() => Instance.disposeAll())
|
||||||
|
|
||||||
|
test("publishes root create, update, and delete events", async () => {
|
||||||
|
await using tmp = await tmpdir({ git: true })
|
||||||
|
const file = path.join(tmp.path, "watch.txt")
|
||||||
|
const dir = tmp.path
|
||||||
|
const cases = [
|
||||||
|
{ event: "add" as const, trigger: Effect.promise(() => fs.writeFile(file, "a")) },
|
||||||
|
{ event: "change" as const, trigger: Effect.promise(() => fs.writeFile(file, "b")) },
|
||||||
|
{ event: "unlink" as const, trigger: Effect.promise(() => fs.unlink(file)) },
|
||||||
|
]
|
||||||
|
|
||||||
|
await withWatcher(
|
||||||
|
dir,
|
||||||
|
Effect.forEach(cases, ({ event, trigger }) =>
|
||||||
|
nextUpdate(dir, (evt) => evt.file === file && evt.event === event, trigger).pipe(
|
||||||
|
Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event }))),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("watches non-git roots", async () => {
|
||||||
|
await using tmp = await tmpdir()
|
||||||
|
const file = path.join(tmp.path, "plain.txt")
|
||||||
|
const dir = tmp.path
|
||||||
|
|
||||||
|
await withWatcher(
|
||||||
|
dir,
|
||||||
|
nextUpdate(
|
||||||
|
dir,
|
||||||
|
(e) => e.file === file && e.event === "add",
|
||||||
|
Effect.promise(() => fs.writeFile(file, "plain")),
|
||||||
|
).pipe(Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event: "add" })))),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("cleanup stops publishing events", async () => {
|
||||||
|
await using tmp = await tmpdir({ git: true })
|
||||||
|
const file = path.join(tmp.path, "after-dispose.txt")
|
||||||
|
|
||||||
|
// Start and immediately stop the watcher (withWatcher disposes on exit)
|
||||||
|
await withWatcher(tmp.path, Effect.void)
|
||||||
|
|
||||||
|
// Now write a file — no watcher should be listening
|
||||||
|
await Effect.runPromise(
|
||||||
|
noUpdate(
|
||||||
|
tmp.path,
|
||||||
|
(e) => e.file === file,
|
||||||
|
Effect.promise(() => fs.writeFile(file, "gone")),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("ignores .git/index changes", async () => {
|
||||||
|
await using tmp = await tmpdir({ git: true })
|
||||||
|
const gitIndex = path.join(tmp.path, ".git", "index")
|
||||||
|
const edit = path.join(tmp.path, "tracked.txt")
|
||||||
|
|
||||||
|
await withWatcher(
|
||||||
|
tmp.path,
|
||||||
|
noUpdate(
|
||||||
|
tmp.path,
|
||||||
|
(e) => e.file === gitIndex,
|
||||||
|
Effect.promise(async () => {
|
||||||
|
await fs.writeFile(edit, "a")
|
||||||
|
await $`git add .`.cwd(tmp.path).quiet().nothrow()
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("publishes .git/HEAD events", async () => {
|
||||||
|
await using tmp = await tmpdir({ git: true })
|
||||||
|
const head = path.join(tmp.path, ".git", "HEAD")
|
||||||
|
const branch = `watch-${Math.random().toString(36).slice(2)}`
|
||||||
|
await $`git branch ${branch}`.cwd(tmp.path).quiet()
|
||||||
|
|
||||||
|
await withWatcher(
|
||||||
|
tmp.path,
|
||||||
|
nextUpdate(
|
||||||
|
tmp.path,
|
||||||
|
(evt) => evt.file === head && evt.event !== "unlink",
|
||||||
|
Effect.promise(() => fs.writeFile(head, `ref: refs/heads/${branch}\n`)),
|
||||||
|
).pipe(
|
||||||
|
Effect.tap((evt) =>
|
||||||
|
Effect.sync(() => {
|
||||||
|
expect(evt.file).toBe(head)
|
||||||
|
expect(["add", "change"]).toContain(evt.event)
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
@@ -977,7 +977,7 @@ test("ask - should deny even when an earlier pattern is ask", async () => {
|
|||||||
await Instance.provide({
|
await Instance.provide({
|
||||||
directory: tmp.path,
|
directory: tmp.path,
|
||||||
fn: async () => {
|
fn: async () => {
|
||||||
const ask = PermissionNext.ask({
|
const err = await PermissionNext.ask({
|
||||||
sessionID: SessionID.make("session_test"),
|
sessionID: SessionID.make("session_test"),
|
||||||
permission: "bash",
|
permission: "bash",
|
||||||
patterns: ["echo hello", "rm -rf /"],
|
patterns: ["echo hello", "rm -rf /"],
|
||||||
@@ -987,24 +987,12 @@ test("ask - should deny even when an earlier pattern is ask", async () => {
|
|||||||
{ permission: "bash", pattern: "echo *", action: "ask" },
|
{ permission: "bash", pattern: "echo *", action: "ask" },
|
||||||
{ permission: "bash", pattern: "rm *", action: "deny" },
|
{ permission: "bash", pattern: "rm *", action: "deny" },
|
||||||
],
|
],
|
||||||
})
|
}).then(
|
||||||
|
() => undefined,
|
||||||
|
(err) => err,
|
||||||
|
)
|
||||||
|
|
||||||
const out = await Promise.race([
|
expect(err).toBeInstanceOf(PermissionNext.DeniedError)
|
||||||
ask.then(
|
|
||||||
() => ({ ok: true as const, err: undefined }),
|
|
||||||
(err) => ({ ok: false as const, err }),
|
|
||||||
),
|
|
||||||
Bun.sleep(100).then(() => "timeout" as const),
|
|
||||||
])
|
|
||||||
|
|
||||||
if (out === "timeout") {
|
|
||||||
await rejectAll()
|
|
||||||
await ask.catch(() => {})
|
|
||||||
throw new Error("ask timed out instead of denying immediately")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect(out.ok).toBe(false)
|
|
||||||
expect(out.err).toBeInstanceOf(PermissionNext.DeniedError)
|
|
||||||
expect(await PermissionNext.list()).toHaveLength(0)
|
expect(await PermissionNext.list()).toHaveLength(0)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import type { PtyID } from "../../src/pty/schema"
|
|||||||
import { tmpdir } from "../fixture/fixture"
|
import { tmpdir } from "../fixture/fixture"
|
||||||
import { setTimeout as sleep } from "node:timers/promises"
|
import { setTimeout as sleep } from "node:timers/promises"
|
||||||
|
|
||||||
const wait = async (fn: () => boolean, ms = 2000) => {
|
const wait = async (fn: () => boolean, ms = 5000) => {
|
||||||
const end = Date.now() + ms
|
const end = Date.now() + ms
|
||||||
while (Date.now() < end) {
|
while (Date.now() < end) {
|
||||||
if (fn()) return
|
if (fn()) return
|
||||||
@@ -20,7 +20,7 @@ const pick = (log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }>,
|
|||||||
}
|
}
|
||||||
|
|
||||||
describe("pty", () => {
|
describe("pty", () => {
|
||||||
test("publishes created, exited, deleted in order for /bin/ls + remove", async () => {
|
test("publishes created, exited, deleted in order for a short-lived process", async () => {
|
||||||
if (process.platform === "win32") return
|
if (process.platform === "win32") return
|
||||||
|
|
||||||
await using dir = await tmpdir({ git: true })
|
await using dir = await tmpdir({ git: true })
|
||||||
@@ -37,7 +37,11 @@ describe("pty", () => {
|
|||||||
|
|
||||||
let id: PtyID | undefined
|
let id: PtyID | undefined
|
||||||
try {
|
try {
|
||||||
const info = await Pty.create({ command: "/bin/ls", title: "ls" })
|
const info = await Pty.create({
|
||||||
|
command: "/usr/bin/env",
|
||||||
|
args: ["sh", "-c", "sleep 0.1"],
|
||||||
|
title: "sleep",
|
||||||
|
})
|
||||||
id = info.id
|
id = info.id
|
||||||
|
|
||||||
await wait(() => pick(log, id!).includes("exited"))
|
await wait(() => pick(log, id!).includes("exited"))
|
||||||
|
|||||||
Reference in New Issue
Block a user