refactor(snapshot): effectify SnapshotService (#17878)

This commit is contained in:
Kit Langton
2026-03-17 21:04:16 -04:00
committed by GitHub
parent fee3c196c5
commit 9e7c136de7
7 changed files with 834 additions and 675 deletions

View File

@@ -11,46 +11,52 @@ const seed = async () => {
const { Instance } = await import("../src/project/instance") const { Instance } = await import("../src/project/instance")
const { InstanceBootstrap } = await import("../src/project/bootstrap") const { InstanceBootstrap } = await import("../src/project/bootstrap")
const { Config } = await import("../src/config/config") const { Config } = await import("../src/config/config")
const { disposeRuntime } = await import("../src/effect/runtime")
const { Session } = await import("../src/session") const { Session } = await import("../src/session")
const { MessageID, PartID } = await import("../src/session/schema") const { MessageID, PartID } = await import("../src/session/schema")
const { Project } = await import("../src/project/project") const { Project } = await import("../src/project/project")
const { ModelID, ProviderID } = await import("../src/provider/schema") const { ModelID, ProviderID } = await import("../src/provider/schema")
const { ToolRegistry } = await import("../src/tool/registry") const { ToolRegistry } = await import("../src/tool/registry")
await Instance.provide({ try {
directory: dir, await Instance.provide({
init: InstanceBootstrap, directory: dir,
fn: async () => { init: InstanceBootstrap,
await Config.waitForDependencies() fn: async () => {
await ToolRegistry.ids() await Config.waitForDependencies()
await ToolRegistry.ids()
const session = await Session.create({ title }) const session = await Session.create({ title })
const messageID = MessageID.ascending() const messageID = MessageID.ascending()
const partID = PartID.ascending() const partID = PartID.ascending()
const message = { const message = {
id: messageID, id: messageID,
sessionID: session.id, sessionID: session.id,
role: "user" as const, role: "user" as const,
time: { created: now }, time: { created: now },
agent: "build", agent: "build",
model: { model: {
providerID: ProviderID.make(providerID), providerID: ProviderID.make(providerID),
modelID: ModelID.make(modelID), modelID: ModelID.make(modelID),
}, },
} }
const part = { const part = {
id: partID, id: partID,
sessionID: session.id, sessionID: session.id,
messageID, messageID,
type: "text" as const, type: "text" as const,
text, text,
time: { start: now }, time: { start: now },
} }
await Session.updateMessage(message) await Session.updateMessage(message)
await Session.updatePart(part) await Session.updatePart(part)
await Project.update({ projectID: Instance.project.id, name: "E2E Project" }) await Project.update({ projectID: Instance.project.id, name: "E2E Project" })
}, },
}) })
} finally {
await Instance.disposeAll().catch(() => {})
await disposeRuntime().catch(() => {})
}
} }
await seed() await seed()

View File

@@ -1,13 +1,15 @@
import { ServiceMap } from "effect" import { ServiceMap } from "effect";
import type { Project } from "@/project/project" import type { Project } from "@/project/project";
export declare namespace InstanceContext { export declare namespace InstanceContext {
export interface Shape { export interface Shape {
readonly directory: string readonly directory: string;
readonly project: Project.Info readonly worktree: string;
} readonly project: Project.Info;
}
} }
export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()( export class InstanceContext extends ServiceMap.Service<
"opencode/InstanceContext", InstanceContext,
) {} InstanceContext.Shape
>()("opencode/InstanceContext") {}

View File

@@ -1,64 +1,83 @@
import { Effect, Layer, LayerMap, ServiceMap } from "effect" import { Effect, Layer, LayerMap, ServiceMap } from "effect";
import { registerDisposer } from "./instance-registry" import { FileService } from "@/file";
import { InstanceContext } from "./instance-context" import { FileTimeService } from "@/file/time";
import { ProviderAuthService } from "@/provider/auth-service" import { FileWatcherService } from "@/file/watcher";
import { QuestionService } from "@/question/service" import { FormatService } from "@/format";
import { PermissionService } from "@/permission/service" import { PermissionService } from "@/permission/service";
import { FileWatcherService } from "@/file/watcher" import { Instance } from "@/project/instance";
import { VcsService } from "@/project/vcs" import { VcsService } from "@/project/vcs";
import { FileTimeService } from "@/file/time" import { ProviderAuthService } from "@/provider/auth-service";
import { FormatService } from "@/format" import { QuestionService } from "@/question/service";
import { FileService } from "@/file" import { SkillService } from "@/skill/skill";
import { SkillService } from "@/skill/skill" import { SnapshotService } from "@/snapshot";
import { Instance } from "@/project/instance" import { InstanceContext } from "./instance-context";
import { registerDisposer } from "./instance-registry";
export { InstanceContext } from "./instance-context" export { InstanceContext } from "./instance-context";
export type InstanceServices = export type InstanceServices =
| QuestionService | QuestionService
| PermissionService | PermissionService
| ProviderAuthService | ProviderAuthService
| FileWatcherService | FileWatcherService
| VcsService | VcsService
| FileTimeService | FileTimeService
| FormatService | FormatService
| FileService | FileService
| SkillService | SkillService
| SnapshotService;
function lookup(directory: string) { // NOTE: LayerMap only passes the key (directory string) to lookup, but we need
const project = Instance.project // the full instance context (directory, worktree, project). We read from the
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of({ directory, project })) // legacy Instance ALS here, which is safe because lookup is only triggered via
return Layer.mergeAll( // runPromiseInstance -> Instances.get, which always runs inside Instance.provide.
Layer.fresh(QuestionService.layer), // This should go away once the old Instance type is removed and lookup can load
Layer.fresh(PermissionService.layer), // the full context directly.
Layer.fresh(ProviderAuthService.layer), function lookup(_key: string) {
Layer.fresh(FileWatcherService.layer).pipe(Layer.orDie), const ctx = Layer.sync(InstanceContext, () =>
Layer.fresh(VcsService.layer), InstanceContext.of(Instance.current),
Layer.fresh(FileTimeService.layer).pipe(Layer.orDie), );
Layer.fresh(FormatService.layer), return Layer.mergeAll(
Layer.fresh(FileService.layer), Layer.fresh(QuestionService.layer),
Layer.fresh(SkillService.layer), Layer.fresh(PermissionService.layer),
).pipe(Layer.provide(ctx)) Layer.fresh(ProviderAuthService.layer),
Layer.fresh(FileWatcherService.layer).pipe(Layer.orDie),
Layer.fresh(VcsService.layer),
Layer.fresh(FileTimeService.layer).pipe(Layer.orDie),
Layer.fresh(FormatService.layer),
Layer.fresh(FileService.layer),
Layer.fresh(SkillService.layer),
Layer.fresh(SnapshotService.layer),
).pipe(Layer.provide(ctx));
} }
export class Instances extends ServiceMap.Service<Instances, LayerMap.LayerMap<string, InstanceServices>>()( export class Instances extends ServiceMap.Service<
"opencode/Instances", Instances,
) { LayerMap.LayerMap<string, InstanceServices>
static readonly layer = Layer.effect( >()("opencode/Instances") {
Instances, static readonly layer = Layer.effect(
Effect.gen(function* () { Instances,
const layerMap = yield* LayerMap.make(lookup, { idleTimeToLive: Infinity }) Effect.gen(function* () {
const unregister = registerDisposer((directory) => Effect.runPromise(layerMap.invalidate(directory))) const layerMap = yield* LayerMap.make(lookup, {
yield* Effect.addFinalizer(() => Effect.sync(unregister)) idleTimeToLive: Infinity,
return Instances.of(layerMap) });
}), const unregister = registerDisposer((directory) =>
) Effect.runPromise(layerMap.invalidate(directory)),
);
yield* Effect.addFinalizer(() => Effect.sync(unregister));
return Instances.of(layerMap);
}),
);
static get(directory: string): Layer.Layer<InstanceServices, never, Instances> { static get(
return Layer.unwrap(Instances.use((map) => Effect.succeed(map.get(directory)))) directory: string,
} ): Layer.Layer<InstanceServices, never, Instances> {
return Layer.unwrap(
Instances.use((map) => Effect.succeed(map.get(directory))),
);
}
static invalidate(directory: string): Effect.Effect<void, never, Instances> { static invalidate(directory: string): Effect.Effect<void, never, Instances> {
return Instances.use((map) => map.invalidate(directory)) return Instances.use((map) => map.invalidate(directory));
} }
} }

View File

@@ -12,3 +12,7 @@ export const runtime = ManagedRuntime.make(
export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) { export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) {
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
} }
export function disposeRuntime() {
return runtime.dispose()
}

View File

@@ -1,163 +1,185 @@
import { Log } from "@/util/log" import { GlobalBus } from "@/bus/global";
import { Context } from "../util/context" import { disposeInstance } from "@/effect/instance-registry";
import { Project } from "./project" import { Filesystem } from "@/util/filesystem";
import { State } from "./state" import { iife } from "@/util/iife";
import { iife } from "@/util/iife" import { Log } from "@/util/log";
import { GlobalBus } from "@/bus/global" import { Context } from "../util/context";
import { Filesystem } from "@/util/filesystem" import { Project } from "./project";
import { disposeInstance } from "@/effect/instance-registry" import { State } from "./state";
interface Context { interface Context {
directory: string directory: string;
worktree: string worktree: string;
project: Project.Info project: Project.Info;
} }
const context = Context.create<Context>("instance") const context = Context.create<Context>("instance");
const cache = new Map<string, Promise<Context>>() const cache = new Map<string, Promise<Context>>();
const disposal = { const disposal = {
all: undefined as Promise<void> | undefined, all: undefined as Promise<void> | undefined,
} };
function emit(directory: string) { function emit(directory: string) {
GlobalBus.emit("event", { GlobalBus.emit("event", {
directory, directory,
payload: { payload: {
type: "server.instance.disposed", type: "server.instance.disposed",
properties: { properties: {
directory, directory,
}, },
}, },
}) });
} }
function boot(input: { directory: string; init?: () => Promise<any>; project?: Project.Info; worktree?: string }) { function boot(input: {
return iife(async () => { directory: string;
const ctx = init?: () => Promise<any>;
input.project && input.worktree project?: Project.Info;
? { worktree?: string;
directory: input.directory, }) {
worktree: input.worktree, return iife(async () => {
project: input.project, const ctx =
} input.project && input.worktree
: await Project.fromDirectory(input.directory).then(({ project, sandbox }) => ({ ? {
directory: input.directory, directory: input.directory,
worktree: sandbox, worktree: input.worktree,
project, project: input.project,
})) }
await context.provide(ctx, async () => { : await Project.fromDirectory(input.directory).then(
await input.init?.() ({ project, sandbox }) => ({
}) directory: input.directory,
return ctx worktree: sandbox,
}) project,
}),
);
await context.provide(ctx, async () => {
await input.init?.();
});
return ctx;
});
} }
function track(directory: string, next: Promise<Context>) { function track(directory: string, next: Promise<Context>) {
const task = next.catch((error) => { const task = next.catch((error) => {
if (cache.get(directory) === task) cache.delete(directory) if (cache.get(directory) === task) cache.delete(directory);
throw error throw error;
}) });
cache.set(directory, task) cache.set(directory, task);
return task return task;
} }
export const Instance = { export const Instance = {
async provide<R>(input: { directory: string; init?: () => Promise<any>; fn: () => R }): Promise<R> { async provide<R>(input: {
const directory = Filesystem.resolve(input.directory) directory: string;
let existing = cache.get(directory) init?: () => Promise<any>;
if (!existing) { fn: () => R;
Log.Default.info("creating instance", { directory }) }): Promise<R> {
existing = track( const directory = Filesystem.resolve(input.directory);
directory, let existing = cache.get(directory);
boot({ if (!existing) {
directory, Log.Default.info("creating instance", { directory });
init: input.init, existing = track(
}), directory,
) boot({
} directory,
const ctx = await existing init: input.init,
return context.provide(ctx, async () => { }),
return input.fn() );
}) }
}, const ctx = await existing;
get directory() { return context.provide(ctx, async () => {
return context.use().directory return input.fn();
}, });
get worktree() { },
return context.use().worktree get current() {
}, return context.use();
get project() { },
return context.use().project get directory() {
}, return context.use().directory;
/** },
* Check if a path is within the project boundary. get worktree() {
* Returns true if path is inside Instance.directory OR Instance.worktree. return context.use().worktree;
* Paths within the worktree but outside the working directory should not trigger external_directory permission. },
*/ get project() {
containsPath(filepath: string) { return context.use().project;
if (Filesystem.contains(Instance.directory, filepath)) return true },
// Non-git projects set worktree to "/" which would match ANY absolute path. /**
// Skip worktree check in this case to preserve external_directory permissions. * Check if a path is within the project boundary.
if (Instance.worktree === "/") return false * Returns true if path is inside Instance.directory OR Instance.worktree.
return Filesystem.contains(Instance.worktree, filepath) * Paths within the worktree but outside the working directory should not trigger external_directory permission.
}, */
/** containsPath(filepath: string) {
* Captures the current instance ALS context and returns a wrapper that if (Filesystem.contains(Instance.directory, filepath)) return true;
* restores it when called. Use this for callbacks that fire outside the // Non-git projects set worktree to "/" which would match ANY absolute path.
* instance async context (native addons, event emitters, timers, etc.). // Skip worktree check in this case to preserve external_directory permissions.
*/ if (Instance.worktree === "/") return false;
bind<F extends (...args: any[]) => any>(fn: F): F { return Filesystem.contains(Instance.worktree, filepath);
const ctx = context.use() },
return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F /**
}, * Captures the current instance ALS context and returns a wrapper that
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S { * restores it when called. Use this for callbacks that fire outside the
return State.create(() => Instance.directory, init, dispose) * instance async context (native addons, event emitters, timers, etc.).
}, */
async reload(input: { directory: string; init?: () => Promise<any>; project?: Project.Info; worktree?: string }) { bind<F extends (...args: any[]) => any>(fn: F): F {
const directory = Filesystem.resolve(input.directory) const ctx = context.use();
Log.Default.info("reloading instance", { directory }) return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F;
await Promise.all([State.dispose(directory), disposeInstance(directory)]) },
cache.delete(directory) state<S>(
const next = track(directory, boot({ ...input, directory })) init: () => S,
emit(directory) dispose?: (state: Awaited<S>) => Promise<void>,
return await next ): () => S {
}, return State.create(() => Instance.directory, init, dispose);
async dispose() { },
const directory = Instance.directory async reload(input: {
Log.Default.info("disposing instance", { directory }) directory: string;
await Promise.all([State.dispose(directory), disposeInstance(directory)]) init?: () => Promise<any>;
cache.delete(directory) project?: Project.Info;
emit(directory) worktree?: string;
}, }) {
async disposeAll() { const directory = Filesystem.resolve(input.directory);
if (disposal.all) return disposal.all Log.Default.info("reloading instance", { directory });
await Promise.all([State.dispose(directory), disposeInstance(directory)]);
cache.delete(directory);
const next = track(directory, boot({ ...input, directory }));
emit(directory);
return await next;
},
async dispose() {
const directory = Instance.directory;
Log.Default.info("disposing instance", { directory });
await Promise.all([State.dispose(directory), disposeInstance(directory)]);
cache.delete(directory);
emit(directory);
},
async disposeAll() {
if (disposal.all) return disposal.all;
disposal.all = iife(async () => { disposal.all = iife(async () => {
Log.Default.info("disposing all instances") Log.Default.info("disposing all instances");
const entries = [...cache.entries()] const entries = [...cache.entries()];
for (const [key, value] of entries) { for (const [key, value] of entries) {
if (cache.get(key) !== value) continue if (cache.get(key) !== value) continue;
const ctx = await value.catch((error) => { const ctx = await value.catch((error) => {
Log.Default.warn("instance dispose failed", { key, error }) Log.Default.warn("instance dispose failed", { key, error });
return undefined return undefined;
}) });
if (!ctx) { if (!ctx) {
if (cache.get(key) === value) cache.delete(key) if (cache.get(key) === value) cache.delete(key);
continue continue;
} }
if (cache.get(key) !== value) continue if (cache.get(key) !== value) continue;
await context.provide(ctx, async () => { await context.provide(ctx, async () => {
await Instance.dispose() await Instance.dispose();
}) });
} }
}).finally(() => { }).finally(() => {
disposal.all = undefined disposal.all = undefined;
}) });
return disposal.all return disposal.all;
}, },
} };

View File

@@ -1,416 +1,516 @@
import path from "path" import {
import fs from "fs/promises" NodeChildProcessSpawner,
import { Filesystem } from "../util/filesystem" NodeFileSystem,
import { Log } from "../util/log" NodePath,
import { Flag } from "../flag/flag" } from "@effect/platform-node";
import { Global } from "../global" import {
import z from "zod" Cause,
import { Config } from "../config/config" Duration,
import { Instance } from "../project/instance" Effect,
import { Scheduler } from "../scheduler" FileSystem,
import { Process } from "@/util/process" Layer,
Schedule,
ServiceMap,
Stream,
} from "effect";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
import path from "path";
import z from "zod";
import { InstanceContext } from "@/effect/instance-context";
import { runPromiseInstance } from "@/effect/runtime";
import { Config } from "../config/config";
import { Global } from "../global";
import { Log } from "../util/log";
const log = Log.create({ service: "snapshot" });
const PRUNE = "7.days";
// Common git config flags shared across snapshot operations
const GIT_CORE = ["-c", "core.longpaths=true", "-c", "core.symlinks=true"];
const GIT_CFG = ["-c", "core.autocrlf=false", ...GIT_CORE];
const GIT_CFG_QUOTE = [...GIT_CFG, "-c", "core.quotepath=false"];
interface GitResult {
readonly code: ChildProcessSpawner.ExitCode;
readonly text: string;
readonly stderr: string;
}
export namespace Snapshot { export namespace Snapshot {
const log = Log.create({ service: "snapshot" }) export const Patch = z.object({
const hour = 60 * 60 * 1000 hash: z.string(),
const prune = "7.days" files: z.string().array(),
});
export type Patch = z.infer<typeof Patch>;
function args(git: string, cmd: string[]) { export const FileDiff = z
return ["--git-dir", git, "--work-tree", Instance.worktree, ...cmd] .object({
} file: z.string(),
before: z.string(),
after: z.string(),
additions: z.number(),
deletions: z.number(),
status: z.enum(["added", "deleted", "modified"]).optional(),
})
.meta({
ref: "FileDiff",
});
export type FileDiff = z.infer<typeof FileDiff>;
export function init() { // Promise facade — existing callers use these
Scheduler.register({ export function init() {
id: "snapshot.cleanup", void runPromiseInstance(SnapshotService.use((s) => s.init()));
interval: hour, }
run: cleanup,
scope: "instance",
})
}
export async function cleanup() { export async function cleanup() {
if (Instance.project.vcs !== "git") return return runPromiseInstance(SnapshotService.use((s) => s.cleanup()));
const cfg = await Config.get() }
if (cfg.snapshot === false) return
const git = gitdir()
const exists = await fs
.stat(git)
.then(() => true)
.catch(() => false)
if (!exists) return
const result = await Process.run(["git", ...args(git, ["gc", `--prune=${prune}`])], {
cwd: Instance.directory,
nothrow: true,
})
if (result.code !== 0) {
log.warn("cleanup failed", {
exitCode: result.code,
stderr: result.stderr.toString(),
stdout: result.stdout.toString(),
})
return
}
log.info("cleanup", { prune })
}
export async function track() { export async function track() {
if (Instance.project.vcs !== "git") return return runPromiseInstance(SnapshotService.use((s) => s.track()));
const cfg = await Config.get() }
if (cfg.snapshot === false) return
const git = gitdir()
if (await fs.mkdir(git, { recursive: true })) {
await Process.run(["git", "init"], {
env: {
...process.env,
GIT_DIR: git,
GIT_WORK_TREE: Instance.worktree,
},
nothrow: true,
})
// Configure git to not convert line endings on Windows export async function patch(hash: string) {
await Process.run(["git", "--git-dir", git, "config", "core.autocrlf", "false"], { nothrow: true }) return runPromiseInstance(SnapshotService.use((s) => s.patch(hash)));
await Process.run(["git", "--git-dir", git, "config", "core.longpaths", "true"], { nothrow: true }) }
await Process.run(["git", "--git-dir", git, "config", "core.symlinks", "true"], { nothrow: true })
await Process.run(["git", "--git-dir", git, "config", "core.fsmonitor", "false"], { nothrow: true })
log.info("initialized")
}
await add(git)
const hash = await Process.text(["git", ...args(git, ["write-tree"])], {
cwd: Instance.directory,
nothrow: true,
}).then((x) => x.text)
log.info("tracking", { hash, cwd: Instance.directory, git })
return hash.trim()
}
export const Patch = z.object({ export async function restore(snapshot: string) {
hash: z.string(), return runPromiseInstance(SnapshotService.use((s) => s.restore(snapshot)));
files: z.string().array(), }
})
export type Patch = z.infer<typeof Patch>
export async function patch(hash: string): Promise<Patch> { export async function revert(patches: Patch[]) {
const git = gitdir() return runPromiseInstance(SnapshotService.use((s) => s.revert(patches)));
await add(git) }
const result = await Process.text(
[
"git",
"-c",
"core.autocrlf=false",
"-c",
"core.longpaths=true",
"-c",
"core.symlinks=true",
"-c",
"core.quotepath=false",
...args(git, ["diff", "--no-ext-diff", "--name-only", hash, "--", "."]),
],
{
cwd: Instance.directory,
nothrow: true,
},
)
// If git diff fails, return empty patch export async function diff(hash: string) {
if (result.code !== 0) { return runPromiseInstance(SnapshotService.use((s) => s.diff(hash)));
log.warn("failed to get diff", { hash, exitCode: result.code }) }
return { hash, files: [] }
}
const files = result.text export async function diffFull(from: string, to: string) {
return { return runPromiseInstance(SnapshotService.use((s) => s.diffFull(from, to)));
hash, }
files: files }
.trim()
.split("\n") export namespace SnapshotService {
.map((x) => x.trim()) export interface Service {
.filter(Boolean) readonly init: () => Effect.Effect<void>;
.map((x) => path.join(Instance.worktree, x).replaceAll("\\", "/")), readonly cleanup: () => Effect.Effect<void>;
} readonly track: () => Effect.Effect<string | undefined>;
} readonly patch: (hash: string) => Effect.Effect<Snapshot.Patch>;
readonly restore: (snapshot: string) => Effect.Effect<void>;
export async function restore(snapshot: string) { readonly revert: (patches: Snapshot.Patch[]) => Effect.Effect<void>;
log.info("restore", { commit: snapshot }) readonly diff: (hash: string) => Effect.Effect<string>;
const git = gitdir() readonly diffFull: (
const result = await Process.run( from: string,
["git", "-c", "core.longpaths=true", "-c", "core.symlinks=true", ...args(git, ["read-tree", snapshot])], to: string,
{ ) => Effect.Effect<Snapshot.FileDiff[]>;
cwd: Instance.worktree, }
nothrow: true, }
},
) export class SnapshotService extends ServiceMap.Service<
if (result.code === 0) { SnapshotService,
const checkout = await Process.run( SnapshotService.Service
["git", "-c", "core.longpaths=true", "-c", "core.symlinks=true", ...args(git, ["checkout-index", "-a", "-f"])], >()("@opencode/Snapshot") {
{ static readonly layer = Layer.effect(
cwd: Instance.worktree, SnapshotService,
nothrow: true, Effect.gen(function* () {
}, const ctx = yield* InstanceContext;
) const fileSystem = yield* FileSystem.FileSystem;
if (checkout.code === 0) return const spawner = yield* ChildProcessSpawner.ChildProcessSpawner;
log.error("failed to restore snapshot", { const { directory, worktree, project } = ctx;
snapshot, const isGit = project.vcs === "git";
exitCode: checkout.code, const snapshotGit = path.join(Global.Path.data, "snapshot", project.id);
stderr: checkout.stderr.toString(),
stdout: checkout.stdout.toString(), const gitArgs = (cmd: string[]) => [
}) "--git-dir",
return snapshotGit,
} "--work-tree",
worktree,
log.error("failed to restore snapshot", { ...cmd,
snapshot, ];
exitCode: result.code,
stderr: result.stderr.toString(), // Run git with nothrow semantics — always returns a result, never fails
stdout: result.stdout.toString(), const git = (
}) args: string[],
} opts?: { cwd?: string; env?: Record<string, string> },
): Effect.Effect<GitResult> =>
export async function revert(patches: Patch[]) { Effect.gen(function* () {
const files = new Set<string>() const command = ChildProcess.make("git", args, {
const git = gitdir() cwd: opts?.cwd,
for (const item of patches) { env: opts?.env,
for (const file of item.files) { extendEnv: true,
if (files.has(file)) continue });
log.info("reverting", { file, hash: item.hash }) const handle = yield* spawner.spawn(command);
const result = await Process.run( const [text, stderr] = yield* Effect.all(
[ [
"git", Stream.mkString(Stream.decodeText(handle.stdout)),
"-c", Stream.mkString(Stream.decodeText(handle.stderr)),
"core.longpaths=true", ],
"-c", { concurrency: 2 },
"core.symlinks=true", );
...args(git, ["checkout", item.hash, "--", file]), const code = yield* handle.exitCode;
], return { code, text, stderr };
{ }).pipe(
cwd: Instance.worktree, Effect.scoped,
nothrow: true, Effect.catch((err) =>
}, Effect.succeed({
) code: ChildProcessSpawner.ExitCode(1),
if (result.code !== 0) { text: "",
const relativePath = path.relative(Instance.worktree, file) stderr: String(err),
const checkTree = await Process.text( }),
[ ),
"git", );
"-c",
"core.longpaths=true", // FileSystem helpers — orDie converts PlatformError to defects
"-c", const exists = (p: string) => fileSystem.exists(p).pipe(Effect.orDie);
"core.symlinks=true", const mkdir = (p: string) =>
...args(git, ["ls-tree", item.hash, "--", relativePath]), fileSystem.makeDirectory(p, { recursive: true }).pipe(Effect.orDie);
], const writeFile = (p: string, content: string) =>
{ fileSystem.writeFileString(p, content).pipe(Effect.orDie);
cwd: Instance.worktree, const readFile = (p: string) =>
nothrow: true, fileSystem
}, .readFileString(p)
) .pipe(Effect.catch(() => Effect.succeed("")));
if (checkTree.code === 0 && checkTree.text.trim()) { const removeFile = (p: string) =>
log.info("file existed in snapshot but checkout failed, keeping", { fileSystem.remove(p).pipe(Effect.catch(() => Effect.void));
file,
}) // --- internal Effect helpers ---
} else {
log.info("file did not exist in snapshot, deleting", { file }) const isEnabled = Effect.gen(function* () {
await fs.unlink(file).catch(() => {}) if (!isGit) return false;
} const cfg = yield* Effect.promise(() => Config.get());
} return cfg.snapshot !== false;
files.add(file) });
}
} const excludesPath = Effect.gen(function* () {
} const result = yield* git(
["rev-parse", "--path-format=absolute", "--git-path", "info/exclude"],
export async function diff(hash: string) { {
const git = gitdir() cwd: worktree,
await add(git) },
const result = await Process.text( );
[ const file = result.text.trim();
"git", if (!file) return undefined;
"-c", if (!(yield* exists(file))) return undefined;
"core.autocrlf=false", return file;
"-c", });
"core.longpaths=true",
"-c", const syncExclude = Effect.gen(function* () {
"core.symlinks=true", const file = yield* excludesPath;
"-c", const target = path.join(snapshotGit, "info", "exclude");
"core.quotepath=false", yield* mkdir(path.join(snapshotGit, "info"));
...args(git, ["diff", "--no-ext-diff", hash, "--", "."]), if (!file) {
], yield* writeFile(target, "");
{ return;
cwd: Instance.worktree, }
nothrow: true, const text = yield* readFile(file);
}, yield* writeFile(target, text);
) });
if (result.code !== 0) { const add = Effect.gen(function* () {
log.warn("failed to get diff", { yield* syncExclude;
hash, yield* git([...GIT_CFG, ...gitArgs(["add", "."])], { cwd: directory });
exitCode: result.code, });
stderr: result.stderr.toString(),
stdout: result.stdout.toString(), // --- service methods ---
})
return "" const cleanup = Effect.fn("SnapshotService.cleanup")(function* () {
} if (!(yield* isEnabled)) return;
if (!(yield* exists(snapshotGit))) return;
return result.text.trim() const result = yield* git(gitArgs(["gc", `--prune=${PRUNE}`]), {
} cwd: directory,
});
export const FileDiff = z if (result.code !== 0) {
.object({ log.warn("cleanup failed", {
file: z.string(), exitCode: result.code,
before: z.string(), stderr: result.stderr,
after: z.string(), });
additions: z.number(), return;
deletions: z.number(), }
status: z.enum(["added", "deleted", "modified"]).optional(), log.info("cleanup", { prune: PRUNE });
}) });
.meta({
ref: "FileDiff", const track = Effect.fn("SnapshotService.track")(function* () {
}) if (!(yield* isEnabled)) return undefined;
export type FileDiff = z.infer<typeof FileDiff> const existed = yield* exists(snapshotGit);
export async function diffFull(from: string, to: string): Promise<FileDiff[]> { yield* mkdir(snapshotGit);
const git = gitdir() if (!existed) {
const result: FileDiff[] = [] yield* git(["init"], {
const status = new Map<string, "added" | "deleted" | "modified">() env: { GIT_DIR: snapshotGit, GIT_WORK_TREE: worktree },
});
const statuses = await Process.text( yield* git([
[ "--git-dir",
"git", snapshotGit,
"-c", "config",
"core.autocrlf=false", "core.autocrlf",
"-c", "false",
"core.longpaths=true", ]);
"-c", yield* git([
"core.symlinks=true", "--git-dir",
"-c", snapshotGit,
"core.quotepath=false", "config",
...args(git, ["diff", "--no-ext-diff", "--name-status", "--no-renames", from, to, "--", "."]), "core.longpaths",
], "true",
{ ]);
cwd: Instance.directory, yield* git([
nothrow: true, "--git-dir",
}, snapshotGit,
).then((x) => x.text) "config",
"core.symlinks",
for (const line of statuses.trim().split("\n")) { "true",
if (!line) continue ]);
const [code, file] = line.split("\t") yield* git([
if (!code || !file) continue "--git-dir",
const kind = code.startsWith("A") ? "added" : code.startsWith("D") ? "deleted" : "modified" snapshotGit,
status.set(file, kind) "config",
} "core.fsmonitor",
"false",
for (const line of await Process.lines( ]);
[ log.info("initialized");
"git", }
"-c", yield* add;
"core.autocrlf=false", const result = yield* git(gitArgs(["write-tree"]), { cwd: directory });
"-c", const hash = result.text.trim();
"core.longpaths=true", log.info("tracking", { hash, cwd: directory, git: snapshotGit });
"-c", return hash;
"core.symlinks=true", });
"-c",
"core.quotepath=false", const patch = Effect.fn("SnapshotService.patch")(function* (
...args(git, ["diff", "--no-ext-diff", "--no-renames", "--numstat", from, to, "--", "."]), hash: string,
], ) {
{ yield* add;
cwd: Instance.directory, const result = yield* git(
nothrow: true, [
}, ...GIT_CFG_QUOTE,
)) { ...gitArgs([
if (!line) continue "diff",
const [additions, deletions, file] = line.split("\t") "--no-ext-diff",
const isBinaryFile = additions === "-" && deletions === "-" "--name-only",
const before = isBinaryFile hash,
? "" "--",
: await Process.text( ".",
[ ]),
"git", ],
"-c", { cwd: directory },
"core.autocrlf=false", );
"-c",
"core.longpaths=true", if (result.code !== 0) {
"-c", log.warn("failed to get diff", { hash, exitCode: result.code });
"core.symlinks=true", return { hash, files: [] } as Snapshot.Patch;
...args(git, ["show", `${from}:${file}`]), }
],
{ nothrow: true }, return {
).then((x) => x.text) hash,
const after = isBinaryFile files: result.text
? "" .trim()
: await Process.text( .split("\n")
[ .map((x: string) => x.trim())
"git", .filter(Boolean)
"-c", .map((x: string) => path.join(worktree, x).replaceAll("\\", "/")),
"core.autocrlf=false", } as Snapshot.Patch;
"-c", });
"core.longpaths=true",
"-c", const restore = Effect.fn("SnapshotService.restore")(function* (
"core.symlinks=true", snapshot: string,
...args(git, ["show", `${to}:${file}`]), ) {
], log.info("restore", { commit: snapshot });
{ nothrow: true }, const result = yield* git(
).then((x) => x.text) [...GIT_CORE, ...gitArgs(["read-tree", snapshot])],
const added = isBinaryFile ? 0 : parseInt(additions) { cwd: worktree },
const deleted = isBinaryFile ? 0 : parseInt(deletions) );
result.push({ if (result.code === 0) {
file, const checkout = yield* git(
before, [...GIT_CORE, ...gitArgs(["checkout-index", "-a", "-f"])],
after, { cwd: worktree },
additions: Number.isFinite(added) ? added : 0, );
deletions: Number.isFinite(deleted) ? deleted : 0, if (checkout.code === 0) return;
status: status.get(file) ?? "modified", log.error("failed to restore snapshot", {
}) snapshot,
} exitCode: checkout.code,
return result stderr: checkout.stderr,
} });
return;
function gitdir() { }
const project = Instance.project log.error("failed to restore snapshot", {
return path.join(Global.Path.data, "snapshot", project.id) snapshot,
} exitCode: result.code,
stderr: result.stderr,
async function add(git: string) { });
await syncExclude(git) });
await Process.run(
[ const revert = Effect.fn("SnapshotService.revert")(function* (
"git", patches: Snapshot.Patch[],
"-c", ) {
"core.autocrlf=false", const seen = new Set<string>();
"-c", for (const item of patches) {
"core.longpaths=true", for (const file of item.files) {
"-c", if (seen.has(file)) continue;
"core.symlinks=true", log.info("reverting", { file, hash: item.hash });
...args(git, ["add", "."]), const result = yield* git(
], [...GIT_CORE, ...gitArgs(["checkout", item.hash, "--", file])],
{ {
cwd: Instance.directory, cwd: worktree,
nothrow: true, },
}, );
) if (result.code !== 0) {
} const relativePath = path.relative(worktree, file);
const checkTree = yield* git(
async function syncExclude(git: string) { [
const file = await excludes() ...GIT_CORE,
const target = path.join(git, "info", "exclude") ...gitArgs(["ls-tree", item.hash, "--", relativePath]),
await fs.mkdir(path.join(git, "info"), { recursive: true }) ],
if (!file) { {
await Filesystem.write(target, "") cwd: worktree,
return },
} );
const text = await Filesystem.readText(file).catch(() => "") if (checkTree.code === 0 && checkTree.text.trim()) {
log.info(
await Filesystem.write(target, text) "file existed in snapshot but checkout failed, keeping",
} { file },
);
async function excludes() { } else {
const file = await Process.text(["git", "rev-parse", "--path-format=absolute", "--git-path", "info/exclude"], { log.info("file did not exist in snapshot, deleting", { file });
cwd: Instance.worktree, yield* removeFile(file);
nothrow: true, }
}).then((x) => x.text) }
if (!file.trim()) return seen.add(file);
const exists = await fs }
.stat(file.trim()) }
.then(() => true) });
.catch(() => false)
if (!exists) return const diff = Effect.fn("SnapshotService.diff")(function* (hash: string) {
return file.trim() yield* add;
} const result = yield* git(
[
...GIT_CFG_QUOTE,
...gitArgs(["diff", "--no-ext-diff", hash, "--", "."]),
],
{
cwd: worktree,
},
);
if (result.code !== 0) {
log.warn("failed to get diff", {
hash,
exitCode: result.code,
stderr: result.stderr,
});
return "";
}
return result.text.trim();
});
const diffFull = Effect.fn("SnapshotService.diffFull")(function* (
from: string,
to: string,
) {
const result: Snapshot.FileDiff[] = [];
const status = new Map<string, "added" | "deleted" | "modified">();
const statuses = yield* git(
[
...GIT_CFG_QUOTE,
...gitArgs([
"diff",
"--no-ext-diff",
"--name-status",
"--no-renames",
from,
to,
"--",
".",
]),
],
{ cwd: directory },
);
for (const line of statuses.text.trim().split("\n")) {
if (!line) continue;
const [code, file] = line.split("\t");
if (!code || !file) continue;
const kind = code.startsWith("A")
? "added"
: code.startsWith("D")
? "deleted"
: "modified";
status.set(file, kind);
}
const numstat = yield* git(
[
...GIT_CFG_QUOTE,
...gitArgs([
"diff",
"--no-ext-diff",
"--no-renames",
"--numstat",
from,
to,
"--",
".",
]),
],
{ cwd: directory },
);
for (const line of numstat.text.trim().split("\n")) {
if (!line) continue;
const [additions, deletions, file] = line.split("\t");
const isBinaryFile = additions === "-" && deletions === "-";
const [before, after] = isBinaryFile
? ["", ""]
: yield* Effect.all(
[
git([
...GIT_CFG,
...gitArgs(["show", `${from}:${file}`]),
]).pipe(Effect.map((r) => r.text)),
git([...GIT_CFG, ...gitArgs(["show", `${to}:${file}`])]).pipe(
Effect.map((r) => r.text),
),
],
{ concurrency: 2 },
);
const added = isBinaryFile ? 0 : parseInt(additions!);
const deleted = isBinaryFile ? 0 : parseInt(deletions!);
result.push({
file: file!,
before,
after,
additions: Number.isFinite(added) ? added : 0,
deletions: Number.isFinite(deleted) ? deleted : 0,
status: status.get(file!) ?? "modified",
});
}
return result;
});
// Start hourly cleanup fiber — scoped to instance lifetime
yield* cleanup().pipe(
Effect.catchCause((cause) => {
log.error("cleanup loop failed", { cause: Cause.pretty(cause) });
return Effect.void;
}),
Effect.repeat(Schedule.spaced(Duration.hours(1))),
Effect.forkScoped,
);
return SnapshotService.of({
init: Effect.fn("SnapshotService.init")(function* () {}),
cleanup,
track,
patch,
restore,
revert,
diff,
diffFull,
});
}),
).pipe(
Layer.provide(NodeChildProcessSpawner.layer),
Layer.provide(NodeFileSystem.layer),
Layer.provide(NodePath.layer),
);
} }

View File

@@ -1,14 +1,14 @@
import { ConfigProvider, Layer, ManagedRuntime } from "effect" import { ConfigProvider, Layer, ManagedRuntime } from "effect";
import { InstanceContext } from "../../src/effect/instance-context" import { InstanceContext } from "../../src/effect/instance-context";
import { Instance } from "../../src/project/instance" import { Instance } from "../../src/project/instance";
/** ConfigProvider that enables the experimental file watcher. */ /** ConfigProvider that enables the experimental file watcher. */
export const watcherConfigLayer = ConfigProvider.layer( export const watcherConfigLayer = ConfigProvider.layer(
ConfigProvider.fromUnknown({ ConfigProvider.fromUnknown({
OPENCODE_EXPERIMENTAL_FILEWATCHER: "true", OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false", OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
}), }),
) );
/** /**
* Boot an Instance with the given service layers and run `body` with * Boot an Instance with the given service layers and run `body` with
@@ -19,29 +19,35 @@ export const watcherConfigLayer = ConfigProvider.layer(
* Pass extra layers via `options.provide` (e.g. ConfigProvider.layer). * Pass extra layers via `options.provide` (e.g. ConfigProvider.layer).
*/ */
export function withServices<S>( export function withServices<S>(
directory: string, directory: string,
layer: Layer.Layer<S, any, InstanceContext>, layer: Layer.Layer<S, any, InstanceContext>,
body: (rt: ManagedRuntime.ManagedRuntime<S, never>) => Promise<void>, body: (rt: ManagedRuntime.ManagedRuntime<S, never>) => Promise<void>,
options?: { provide?: Layer.Layer<never>[] }, options?: { provide?: Layer.Layer<never>[] },
) { ) {
return Instance.provide({ return Instance.provide({
directory, directory,
fn: async () => { fn: async () => {
const ctx = Layer.sync(InstanceContext, () => const ctx = Layer.sync(InstanceContext, () =>
InstanceContext.of({ directory: Instance.directory, project: Instance.project }), InstanceContext.of({
) directory: Instance.directory,
let resolved: Layer.Layer<S> = Layer.fresh(layer).pipe(Layer.provide(ctx)) as any worktree: Instance.worktree,
if (options?.provide) { project: Instance.project,
for (const l of options.provide) { }),
resolved = resolved.pipe(Layer.provide(l)) as any );
} let resolved: Layer.Layer<S> = Layer.fresh(layer).pipe(
} Layer.provide(ctx),
const rt = ManagedRuntime.make(resolved) ) as any;
try { if (options?.provide) {
await body(rt) for (const l of options.provide) {
} finally { resolved = resolved.pipe(Layer.provide(l)) as any;
await rt.dispose() }
} }
}, const rt = ManagedRuntime.make(resolved);
}) try {
await body(rt);
} finally {
await rt.dispose();
}
},
});
} }