From 2cbdf04ec9970c297acdfdebb4511329fc429639 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 16 Mar 2026 14:23:13 -0400 Subject: [PATCH] refactor(file-time): effectify FileTimeService with Semaphore locks (#17835) --- packages/opencode/src/effect/instances.ts | 5 +- packages/opencode/src/file/time.ts | 164 ++++++++++++++-------- packages/opencode/src/file/watcher.ts | 3 +- packages/opencode/src/flag/flag.ts | 4 +- packages/opencode/src/session/prompt.ts | 2 +- packages/opencode/src/tool/edit.ts | 4 +- packages/opencode/src/tool/read.ts | 2 +- packages/opencode/src/tool/write.ts | 2 +- packages/opencode/test/file/time.test.ts | 113 ++++----------- 9 files changed, 148 insertions(+), 151 deletions(-) diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index 2e6fbe167..78b340e77 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -6,6 +6,7 @@ import { QuestionService } from "@/question/service" import { PermissionService } from "@/permission/service" import { FileWatcherService } from "@/file/watcher" import { VcsService } from "@/project/vcs" +import { FileTimeService } from "@/file/time" import { Instance } from "@/project/instance" export { InstanceContext } from "./instance-context" @@ -16,6 +17,7 @@ export type InstanceServices = | ProviderAuthService | FileWatcherService | VcsService + | FileTimeService function lookup(directory: string) { const project = Instance.project @@ -24,8 +26,9 @@ function lookup(directory: string) { Layer.fresh(QuestionService.layer), Layer.fresh(PermissionService.layer), Layer.fresh(ProviderAuthService.layer), - Layer.fresh(FileWatcherService.layer), + Layer.fresh(FileWatcherService.layer).pipe(Layer.orDie), Layer.fresh(VcsService.layer), + Layer.fresh(FileTimeService.layer).pipe(Layer.orDie), ).pipe(Layer.provide(ctx)) } diff --git a/packages/opencode/src/file/time.ts b/packages/opencode/src/file/time.ts index efb1c4376..c956cdfdb 100644 --- a/packages/opencode/src/file/time.ts +++ b/packages/opencode/src/file/time.ts @@ -1,71 +1,115 @@ -import { Instance } from "../project/instance" import { Log } from "../util/log" -import { Flag } from "../flag/flag" +import { Flag } from "@/flag/flag" import { Filesystem } from "../util/filesystem" +import { Effect, Layer, ServiceMap, Semaphore } from "effect" +import { runPromiseInstance } from "@/effect/runtime" +import type { SessionID } from "@/session/schema" + +const log = Log.create({ service: "file.time" }) + +export namespace FileTimeService { + export interface Service { + readonly read: (sessionID: SessionID, file: string) => Effect.Effect + readonly get: (sessionID: SessionID, file: string) => Effect.Effect + readonly assert: (sessionID: SessionID, filepath: string) => Effect.Effect + readonly withLock: (filepath: string, fn: () => Promise) => Effect.Effect + } +} + +type Stamp = { + readonly read: Date + readonly mtime: number | undefined + readonly ctime: number | undefined + readonly size: number | undefined +} + +function stamp(file: string): Stamp { + const stat = Filesystem.stat(file) + const size = typeof stat?.size === "bigint" ? Number(stat.size) : stat?.size + return { + read: new Date(), + mtime: stat?.mtime?.getTime(), + ctime: stat?.ctime?.getTime(), + size, + } +} + +function session(reads: Map>, sessionID: SessionID) { + let value = reads.get(sessionID) + if (!value) { + value = new Map() + reads.set(sessionID, value) + } + return value +} + +export class FileTimeService extends ServiceMap.Service()( + "@opencode/FileTime", +) { + static readonly layer = Layer.effect( + FileTimeService, + Effect.gen(function* () { + const disableCheck = yield* Flag.OPENCODE_DISABLE_FILETIME_CHECK + const reads = new Map>() + const locks = new Map() + + function getLock(filepath: string) { + let lock = locks.get(filepath) + if (!lock) { + lock = Semaphore.makeUnsafe(1) + locks.set(filepath, lock) + } + return lock + } + + return FileTimeService.of({ + read: Effect.fn("FileTimeService.read")(function* (sessionID: SessionID, file: string) { + log.info("read", { sessionID, file }) + session(reads, sessionID).set(file, stamp(file)) + }), + + get: Effect.fn("FileTimeService.get")(function* (sessionID: SessionID, file: string) { + return reads.get(sessionID)?.get(file)?.read + }), + + assert: Effect.fn("FileTimeService.assert")(function* (sessionID: SessionID, filepath: string) { + if (disableCheck) return + + const time = reads.get(sessionID)?.get(filepath) + if (!time) throw new Error(`You must read file ${filepath} before overwriting it. Use the Read tool first`) + const next = stamp(filepath) + const changed = next.mtime !== time.mtime || next.ctime !== time.ctime || next.size !== time.size + + if (changed) { + throw new Error( + `File ${filepath} has been modified since it was last read.\nLast modification: ${new Date(next.mtime ?? next.read.getTime()).toISOString()}\nLast read: ${time.read.toISOString()}\n\nPlease read the file again before modifying it.`, + ) + } + }), + + withLock: Effect.fn("FileTimeService.withLock")(function* (filepath: string, fn: () => Promise) { + const lock = getLock(filepath) + return yield* Effect.promise(fn).pipe(lock.withPermits(1)) + }), + }) + }), + ) +} export namespace FileTime { - const log = Log.create({ service: "file.time" }) - // Per-session read times plus per-file write locks. - // All tools that overwrite existing files should run their - // assert/read/write/update sequence inside withLock(filepath, ...) - // so concurrent writes to the same file are serialized. - export const state = Instance.state(() => { - const read: { - [sessionID: string]: { - [path: string]: Date | undefined - } - } = {} - const locks = new Map>() - return { - read, - locks, - } - }) - - export function read(sessionID: string, file: string) { - log.info("read", { sessionID, file }) - const { read } = state() - read[sessionID] = read[sessionID] || {} - read[sessionID][file] = new Date() + export function read(sessionID: SessionID, file: string) { + return runPromiseInstance(FileTimeService.use((s) => s.read(sessionID, file))) } - export function get(sessionID: string, file: string) { - return state().read[sessionID]?.[file] + export function get(sessionID: SessionID, file: string) { + return runPromiseInstance(FileTimeService.use((s) => s.get(sessionID, file))) + } + + export async function assert(sessionID: SessionID, filepath: string) { + return runPromiseInstance(FileTimeService.use((s) => s.assert(sessionID, filepath))) } export async function withLock(filepath: string, fn: () => Promise): Promise { - const current = state() - const currentLock = current.locks.get(filepath) ?? Promise.resolve() - let release: () => void = () => {} - const nextLock = new Promise((resolve) => { - release = resolve - }) - const chained = currentLock.then(() => nextLock) - current.locks.set(filepath, chained) - await currentLock - try { - return await fn() - } finally { - release() - if (current.locks.get(filepath) === chained) { - current.locks.delete(filepath) - } - } - } - - export async function assert(sessionID: string, filepath: string) { - if (Flag.OPENCODE_DISABLE_FILETIME_CHECK === true) { - return - } - - const time = get(sessionID, filepath) - if (!time) throw new Error(`You must read file ${filepath} before overwriting it. Use the Read tool first`) - const mtime = Filesystem.stat(filepath)?.mtime - // Allow a 50ms tolerance for Windows NTFS timestamp fuzziness / async flushing - if (mtime && mtime.getTime() > time.getTime() + 50) { - throw new Error( - `File ${filepath} has been modified since it was last read.\nLast modification: ${mtime.toISOString()}\nLast read: ${time.toISOString()}\n\nPlease read the file again before modifying it.`, - ) - } + return runPromiseInstance(FileTimeService.use((s) => s.withLock(filepath, fn))) } } diff --git a/packages/opencode/src/file/watcher.ts b/packages/opencode/src/file/watcher.ts index 16ee8f27c..1a3a4f742 100644 --- a/packages/opencode/src/file/watcher.ts +++ b/packages/opencode/src/file/watcher.ts @@ -72,7 +72,8 @@ export class FileWatcherService extends ServiceMap.Service 0) { output += `\n\n\n${instructions.map((i) => i.content).join("\n\n")}\n` diff --git a/packages/opencode/src/tool/write.ts b/packages/opencode/src/tool/write.ts index 8c1e53cca..83474a543 100644 --- a/packages/opencode/src/tool/write.ts +++ b/packages/opencode/src/tool/write.ts @@ -49,7 +49,7 @@ export const WriteTool = Tool.define("write", { file: filepath, event: exists ? "change" : "add", }) - FileTime.read(ctx.sessionID, filepath) + await FileTime.read(ctx.sessionID, filepath) let output = "Wrote file successfully." await LSP.touchFile(filepath, true) diff --git a/packages/opencode/test/file/time.test.ts b/packages/opencode/test/file/time.test.ts index e46d5229b..9eedffd76 100644 --- a/packages/opencode/test/file/time.test.ts +++ b/packages/opencode/test/file/time.test.ts @@ -1,13 +1,16 @@ -import { describe, test, expect, beforeEach } from "bun:test" +import { describe, test, expect, afterEach } from "bun:test" import path from "path" import fs from "fs/promises" import { FileTime } from "../../src/file/time" import { Instance } from "../../src/project/instance" +import { SessionID } from "../../src/session/schema" import { Filesystem } from "../../src/util/filesystem" import { tmpdir } from "../fixture/fixture" +afterEach(() => Instance.disposeAll()) + describe("file/time", () => { - const sessionID = "test-session-123" + const sessionID = SessionID.make("ses_00000000000000000000000001") describe("read() and get()", () => { test("stores read timestamp", async () => { @@ -18,12 +21,13 @@ describe("file/time", () => { await Instance.provide({ directory: tmp.path, fn: async () => { - const before = FileTime.get(sessionID, filepath) + const before = await FileTime.get(sessionID, filepath) expect(before).toBeUndefined() - FileTime.read(sessionID, filepath) + await FileTime.read(sessionID, filepath) + await Bun.sleep(10) - const after = FileTime.get(sessionID, filepath) + const after = await FileTime.get(sessionID, filepath) expect(after).toBeInstanceOf(Date) expect(after!.getTime()).toBeGreaterThan(0) }, @@ -38,11 +42,12 @@ describe("file/time", () => { await Instance.provide({ directory: tmp.path, fn: async () => { - FileTime.read("session1", filepath) - FileTime.read("session2", filepath) + await FileTime.read(SessionID.make("ses_00000000000000000000000002"), filepath) + await FileTime.read(SessionID.make("ses_00000000000000000000000003"), filepath) + await Bun.sleep(10) - const time1 = FileTime.get("session1", filepath) - const time2 = FileTime.get("session2", filepath) + const time1 = await FileTime.get(SessionID.make("ses_00000000000000000000000002"), filepath) + const time2 = await FileTime.get(SessionID.make("ses_00000000000000000000000003"), filepath) expect(time1).toBeDefined() expect(time2).toBeDefined() @@ -59,14 +64,16 @@ describe("file/time", () => { directory: tmp.path, fn: async () => { FileTime.read(sessionID, filepath) - const first = FileTime.get(sessionID, filepath)! + await Bun.sleep(10) + const first = await FileTime.get(sessionID, filepath) - await new Promise((resolve) => setTimeout(resolve, 10)) + await Bun.sleep(10) FileTime.read(sessionID, filepath) - const second = FileTime.get(sessionID, filepath)! + await Bun.sleep(10) + const second = await FileTime.get(sessionID, filepath) - expect(second.getTime()).toBeGreaterThanOrEqual(first.getTime()) + expect(second!.getTime()).toBeGreaterThanOrEqual(first!.getTime()) }, }) }) @@ -82,8 +89,7 @@ describe("file/time", () => { directory: tmp.path, fn: async () => { FileTime.read(sessionID, filepath) - - // Should not throw + await Bun.sleep(10) await FileTime.assert(sessionID, filepath) }, }) @@ -111,13 +117,8 @@ describe("file/time", () => { directory: tmp.path, fn: async () => { FileTime.read(sessionID, filepath) - - // Wait to ensure different timestamps - await new Promise((resolve) => setTimeout(resolve, 100)) - - // Modify file after reading + await Bun.sleep(100) await fs.writeFile(filepath, "modified content", "utf-8") - await expect(FileTime.assert(sessionID, filepath)).rejects.toThrow("modified since it was last read") }, }) @@ -132,7 +133,7 @@ describe("file/time", () => { directory: tmp.path, fn: async () => { FileTime.read(sessionID, filepath) - await new Promise((resolve) => setTimeout(resolve, 100)) + await Bun.sleep(100) await fs.writeFile(filepath, "modified", "utf-8") let error: Error | undefined @@ -147,28 +148,6 @@ describe("file/time", () => { }, }) }) - - test("skips check when OPENCODE_DISABLE_FILETIME_CHECK is true", async () => { - await using tmp = await tmpdir() - const filepath = path.join(tmp.path, "file.txt") - await fs.writeFile(filepath, "content", "utf-8") - - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const { Flag } = await import("../../src/flag/flag") - const original = Flag.OPENCODE_DISABLE_FILETIME_CHECK - ;(Flag as { OPENCODE_DISABLE_FILETIME_CHECK: boolean }).OPENCODE_DISABLE_FILETIME_CHECK = true - - try { - // Should not throw even though file wasn't read - await FileTime.assert(sessionID, filepath) - } finally { - ;(Flag as { OPENCODE_DISABLE_FILETIME_CHECK: boolean }).OPENCODE_DISABLE_FILETIME_CHECK = original - } - }, - }) - }) }) describe("withLock()", () => { @@ -215,7 +194,7 @@ describe("file/time", () => { const op1 = FileTime.withLock(filepath, async () => { order.push(1) - await new Promise((resolve) => setTimeout(resolve, 10)) + await Bun.sleep(50) order.push(2) }) @@ -225,12 +204,7 @@ describe("file/time", () => { }) await Promise.all([op1, op2]) - - // Operations should be serialized - expect(order).toContain(1) - expect(order).toContain(2) - expect(order).toContain(3) - expect(order).toContain(4) + expect(order).toEqual([1, 2, 3, 4]) }, }) }) @@ -248,8 +222,8 @@ describe("file/time", () => { const op1 = FileTime.withLock(filepath1, async () => { started1 = true - await new Promise((resolve) => setTimeout(resolve, 50)) - expect(started2).toBe(true) // op2 should have started while op1 is running + await Bun.sleep(50) + expect(started2).toBe(true) }) const op2 = FileTime.withLock(filepath2, async () => { @@ -257,7 +231,6 @@ describe("file/time", () => { }) await Promise.all([op1, op2]) - expect(started1).toBe(true) expect(started2).toBe(true) }, @@ -277,7 +250,6 @@ describe("file/time", () => { }), ).rejects.toThrow("Test error") - // Lock should be released, subsequent operations should work let executed = false await FileTime.withLock(filepath, async () => { executed = true @@ -286,31 +258,6 @@ describe("file/time", () => { }, }) }) - - test("deadlocks on nested locks (expected behavior)", async () => { - await using tmp = await tmpdir() - const filepath = path.join(tmp.path, "file.txt") - - await Instance.provide({ - directory: tmp.path, - fn: async () => { - // Nested locks on same file cause deadlock - this is expected - // The outer lock waits for inner to complete, but inner waits for outer to release - const timeout = new Promise((_, reject) => - setTimeout(() => reject(new Error("Deadlock detected")), 100), - ) - - const nestedLock = FileTime.withLock(filepath, async () => { - return FileTime.withLock(filepath, async () => { - return "inner" - }) - }) - - // Should timeout due to deadlock - await expect(Promise.race([nestedLock, timeout])).rejects.toThrow("Deadlock detected") - }, - }) - }) }) describe("stat() Filesystem.stat pattern", () => { @@ -323,12 +270,12 @@ describe("file/time", () => { directory: tmp.path, fn: async () => { FileTime.read(sessionID, filepath) + await Bun.sleep(10) const stats = Filesystem.stat(filepath) expect(stats?.mtime).toBeInstanceOf(Date) expect(stats!.mtime.getTime()).toBeGreaterThan(0) - // FileTime.assert uses this stat internally await FileTime.assert(sessionID, filepath) }, }) @@ -343,11 +290,11 @@ describe("file/time", () => { directory: tmp.path, fn: async () => { FileTime.read(sessionID, filepath) + await Bun.sleep(10) const originalStat = Filesystem.stat(filepath) - // Wait and modify - await new Promise((resolve) => setTimeout(resolve, 100)) + await Bun.sleep(100) await fs.writeFile(filepath, "modified", "utf-8") const newStat = Filesystem.stat(filepath)