effectify SessionStatus service (#18565)

This commit is contained in:
Kit Langton
2026-03-21 19:55:07 -04:00
committed by GitHub
parent 832b8e252e
commit 10a3d6c54e
5 changed files with 61 additions and 38 deletions

View File

@@ -123,6 +123,7 @@ Fully migrated (single namespace, InstanceState where needed, flattened facade):
- [x] `Truncate``tool/truncate.ts` - [x] `Truncate``tool/truncate.ts`
- [x] `Vcs``project/vcs.ts` - [x] `Vcs``project/vcs.ts`
- [x] `Discovery``skill/discovery.ts` - [x] `Discovery``skill/discovery.ts`
- [x] `SessionStatus`
Still open and likely worth migrating: Still open and likely worth migrating:

View File

@@ -88,8 +88,8 @@ export const SessionRoutes = lazy(() =>
}, },
}), }),
async (c) => { async (c) => {
const result = SessionStatus.list() const result = await SessionStatus.list()
return c.json(result) return c.json(Object.fromEntries(result))
}, },
) )
.get( .get(

View File

@@ -57,7 +57,7 @@ export namespace SessionProcessor {
input.abort.throwIfAborted() input.abort.throwIfAborted()
switch (value.type) { switch (value.type) {
case "start": case "start":
SessionStatus.set(input.sessionID, { type: "busy" }) await SessionStatus.set(input.sessionID, { type: "busy" })
break break
case "reasoning-start": case "reasoning-start":
@@ -368,7 +368,7 @@ export namespace SessionProcessor {
if (retry !== undefined) { if (retry !== undefined) {
attempt++ attempt++
const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined)
SessionStatus.set(input.sessionID, { await SessionStatus.set(input.sessionID, {
type: "retry", type: "retry",
attempt, attempt,
message: retry, message: retry,
@@ -382,7 +382,7 @@ export namespace SessionProcessor {
sessionID: input.assistantMessage.sessionID, sessionID: input.assistantMessage.sessionID,
error: input.assistantMessage.error, error: input.assistantMessage.error,
}) })
SessionStatus.set(input.sessionID, { type: "idle" }) await SessionStatus.set(input.sessionID, { type: "idle" })
} }
} }
if (snapshot) { if (snapshot) {

View File

@@ -257,17 +257,17 @@ export namespace SessionPrompt {
return s[sessionID].abort.signal return s[sessionID].abort.signal
} }
export function cancel(sessionID: SessionID) { export async function cancel(sessionID: SessionID) {
log.info("cancel", { sessionID }) log.info("cancel", { sessionID })
const s = state() const s = state()
const match = s[sessionID] const match = s[sessionID]
if (!match) { if (!match) {
SessionStatus.set(sessionID, { type: "idle" }) await SessionStatus.set(sessionID, { type: "idle" })
return return
} }
match.abort.abort() match.abort.abort()
delete s[sessionID] delete s[sessionID]
SessionStatus.set(sessionID, { type: "idle" }) await SessionStatus.set(sessionID, { type: "idle" })
return return
} }
@@ -286,7 +286,7 @@ export namespace SessionPrompt {
}) })
} }
using _ = defer(() => cancel(sessionID)) await using _ = defer(() => cancel(sessionID))
// Structured output state // Structured output state
// Note: On session resumption, state is reset but outputFormat is preserved // Note: On session resumption, state is reset but outputFormat is preserved
@@ -296,7 +296,7 @@ export namespace SessionPrompt {
let step = 0 let step = 0
const session = await Session.get(sessionID) const session = await Session.get(sessionID)
while (true) { while (true) {
SessionStatus.set(sessionID, { type: "busy" }) await SessionStatus.set(sessionID, { type: "busy" })
log.info("loop", { step, sessionID }) log.info("loop", { step, sessionID })
if (abort.aborted) break if (abort.aborted) break
let msgs = await MessageV2.filterCompacted(MessageV2.stream(sessionID)) let msgs = await MessageV2.filterCompacted(MessageV2.stream(sessionID))

View File

@@ -1,7 +1,9 @@
import { BusEvent } from "@/bus/bus-event" import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus" import { Bus } from "@/bus"
import { Instance } from "@/project/instance" import { InstanceState } from "@/effect/instance-state"
import { makeRunPromise } from "@/effect/run-service"
import { SessionID } from "./schema" import { SessionID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod" import z from "zod"
export namespace SessionStatus { export namespace SessionStatus {
@@ -42,36 +44,56 @@ export namespace SessionStatus {
), ),
} }
const state = Instance.state(() => { export interface Interface {
const data: Record<string, Info> = {} readonly get: (sessionID: SessionID) => Effect.Effect<Info>
return data readonly list: () => Effect.Effect<Map<SessionID, Info>>
}) readonly set: (sessionID: SessionID, status: Info) => Effect.Effect<void>
export function get(sessionID: SessionID) {
return (
state()[sessionID] ?? {
type: "idle",
}
)
} }
export function list() { export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionStatus") {}
return state()
}
export function set(sessionID: SessionID, status: Info) { export const layer = Layer.effect(
Bus.publish(Event.Status, { Service,
sessionID, Effect.gen(function* () {
status, const state = yield* InstanceState.make(
}) Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map<SessionID, Info>())),
if (status.type === "idle") { )
// deprecated
Bus.publish(Event.Idle, { const get = Effect.fn("SessionStatus.get")(function* (sessionID: SessionID) {
sessionID, const data = yield* InstanceState.get(state)
return data.get(sessionID) ?? { type: "idle" as const }
}) })
delete state()[sessionID]
return const list = Effect.fn("SessionStatus.list")(function* () {
} return new Map(yield* InstanceState.get(state))
state()[sessionID] = status })
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
const data = yield* InstanceState.get(state)
yield* Effect.promise(() => Bus.publish(Event.Status, { sessionID, status }))
if (status.type === "idle") {
yield* Effect.promise(() => Bus.publish(Event.Idle, { sessionID }))
data.delete(sessionID)
return
}
data.set(sessionID, status)
})
return Service.of({ get, list, set })
}),
)
const runPromise = makeRunPromise(Service, layer)
export async function get(sessionID: SessionID) {
return runPromise((svc) => svc.get(sessionID))
}
export async function list() {
return runPromise((svc) => svc.list())
}
export async function set(sessionID: SessionID, status: Info) {
return runPromise((svc) => svc.set(sessionID, status))
} }
} }