mirror of
https://gitea.toothfairyai.com/ToothFairyAI/tf_code.git
synced 2026-04-02 23:23:45 +00:00
sqlite again (#10597)
Co-authored-by: Github Action <action@github.com> Co-authored-by: opencode-agent[bot] <opencode-agent[bot]@users.noreply.github.com> Co-authored-by: Brendan Allan <git@brendonovich.dev>
This commit is contained in:
@@ -10,7 +10,9 @@ import { Flag } from "../flag/flag"
|
||||
import { Identifier } from "../id/id"
|
||||
import { Installation } from "../installation"
|
||||
|
||||
import { Storage } from "../storage/storage"
|
||||
import { Database, NotFoundError, eq, and, or, like } from "../storage/db"
|
||||
import { SessionTable, MessageTable, PartTable } from "./session.sql"
|
||||
import { Storage } from "@/storage/storage"
|
||||
import { Log } from "../util/log"
|
||||
import { MessageV2 } from "./message-v2"
|
||||
import { Instance } from "../project/instance"
|
||||
@@ -41,6 +43,64 @@ export namespace Session {
|
||||
).test(title)
|
||||
}
|
||||
|
||||
type SessionRow = typeof SessionTable.$inferSelect
|
||||
|
||||
export function fromRow(row: SessionRow): Info {
|
||||
const summary =
|
||||
row.summary_additions !== null || row.summary_deletions !== null || row.summary_files !== null
|
||||
? {
|
||||
additions: row.summary_additions ?? 0,
|
||||
deletions: row.summary_deletions ?? 0,
|
||||
files: row.summary_files ?? 0,
|
||||
diffs: row.summary_diffs ?? undefined,
|
||||
}
|
||||
: undefined
|
||||
const share = row.share_url ? { url: row.share_url } : undefined
|
||||
const revert = row.revert ?? undefined
|
||||
return {
|
||||
id: row.id,
|
||||
slug: row.slug,
|
||||
projectID: row.project_id,
|
||||
directory: row.directory,
|
||||
parentID: row.parent_id ?? undefined,
|
||||
title: row.title,
|
||||
version: row.version,
|
||||
summary,
|
||||
share,
|
||||
revert,
|
||||
permission: row.permission ?? undefined,
|
||||
time: {
|
||||
created: row.time_created,
|
||||
updated: row.time_updated,
|
||||
compacting: row.time_compacting ?? undefined,
|
||||
archived: row.time_archived ?? undefined,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
export function toRow(info: Info) {
|
||||
return {
|
||||
id: info.id,
|
||||
project_id: info.projectID,
|
||||
parent_id: info.parentID,
|
||||
slug: info.slug,
|
||||
directory: info.directory,
|
||||
title: info.title,
|
||||
version: info.version,
|
||||
share_url: info.share?.url,
|
||||
summary_additions: info.summary?.additions,
|
||||
summary_deletions: info.summary?.deletions,
|
||||
summary_files: info.summary?.files,
|
||||
summary_diffs: info.summary?.diffs,
|
||||
revert: info.revert ?? null,
|
||||
permission: info.permission,
|
||||
time_created: info.time.created,
|
||||
time_updated: info.time.updated,
|
||||
time_compacting: info.time.compacting,
|
||||
time_archived: info.time.archived,
|
||||
}
|
||||
}
|
||||
|
||||
function getForkedTitle(title: string): string {
|
||||
const match = title.match(/^(.+) \(fork #(\d+)\)$/)
|
||||
if (match) {
|
||||
@@ -94,16 +154,6 @@ export namespace Session {
|
||||
})
|
||||
export type Info = z.output<typeof Info>
|
||||
|
||||
export const ShareInfo = z
|
||||
.object({
|
||||
secret: z.string(),
|
||||
url: z.string(),
|
||||
})
|
||||
.meta({
|
||||
ref: "SessionShare",
|
||||
})
|
||||
export type ShareInfo = z.output<typeof ShareInfo>
|
||||
|
||||
export const Event = {
|
||||
Created: BusEvent.define(
|
||||
"session.created",
|
||||
@@ -200,8 +250,17 @@ export namespace Session {
|
||||
)
|
||||
|
||||
export const touch = fn(Identifier.schema("session"), async (sessionID) => {
|
||||
await update(sessionID, (draft) => {
|
||||
draft.time.updated = Date.now()
|
||||
const now = Date.now()
|
||||
Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ time_updated: now })
|
||||
.where(eq(SessionTable.id, sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
})
|
||||
})
|
||||
|
||||
@@ -227,21 +286,19 @@ export namespace Session {
|
||||
},
|
||||
}
|
||||
log.info("created", result)
|
||||
await Storage.write(["session", Instance.project.id, result.id], result)
|
||||
Bus.publish(Event.Created, {
|
||||
info: result,
|
||||
Database.use((db) => {
|
||||
db.insert(SessionTable).values(toRow(result)).run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(Event.Created, {
|
||||
info: result,
|
||||
}),
|
||||
)
|
||||
})
|
||||
const cfg = await Config.get()
|
||||
if (!result.parentID && (Flag.OPENCODE_AUTO_SHARE || cfg.share === "auto"))
|
||||
share(result.id)
|
||||
.then((share) => {
|
||||
update(result.id, (draft) => {
|
||||
draft.share = share
|
||||
})
|
||||
})
|
||||
.catch(() => {
|
||||
// Silently ignore sharing errors during session creation
|
||||
})
|
||||
share(result.id).catch(() => {
|
||||
// Silently ignore sharing errors during session creation
|
||||
})
|
||||
Bus.publish(Event.Updated, {
|
||||
info: result,
|
||||
})
|
||||
@@ -256,12 +313,9 @@ export namespace Session {
|
||||
}
|
||||
|
||||
export const get = fn(Identifier.schema("session"), async (id) => {
|
||||
const read = await Storage.read<Info>(["session", Instance.project.id, id])
|
||||
return read as Info
|
||||
})
|
||||
|
||||
export const getShare = fn(Identifier.schema("session"), async (id) => {
|
||||
return Storage.read<ShareInfo>(["share", id])
|
||||
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${id}` })
|
||||
return fromRow(row)
|
||||
})
|
||||
|
||||
export const share = fn(Identifier.schema("session"), async (id) => {
|
||||
@@ -271,15 +325,12 @@ export namespace Session {
|
||||
}
|
||||
const { ShareNext } = await import("@/share/share-next")
|
||||
const share = await ShareNext.create(id)
|
||||
await update(
|
||||
id,
|
||||
(draft) => {
|
||||
draft.share = {
|
||||
url: share.url,
|
||||
}
|
||||
},
|
||||
{ touch: false },
|
||||
)
|
||||
Database.use((db) => {
|
||||
const row = db.update(SessionTable).set({ share_url: share.url }).where(eq(SessionTable.id, id)).returning().get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${id}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
})
|
||||
return share
|
||||
})
|
||||
|
||||
@@ -287,32 +338,155 @@ export namespace Session {
|
||||
// Use ShareNext to remove the share (same as share function uses ShareNext to create)
|
||||
const { ShareNext } = await import("@/share/share-next")
|
||||
await ShareNext.remove(id)
|
||||
await update(
|
||||
id,
|
||||
(draft) => {
|
||||
draft.share = undefined
|
||||
},
|
||||
{ touch: false },
|
||||
)
|
||||
Database.use((db) => {
|
||||
const row = db.update(SessionTable).set({ share_url: null }).where(eq(SessionTable.id, id)).returning().get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${id}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
})
|
||||
})
|
||||
|
||||
export async function update(id: string, editor: (session: Info) => void, options?: { touch?: boolean }) {
|
||||
const project = Instance.project
|
||||
const result = await Storage.update<Info>(["session", project.id, id], (draft) => {
|
||||
editor(draft)
|
||||
if (options?.touch !== false) {
|
||||
draft.time.updated = Date.now()
|
||||
}
|
||||
export const setTitle = fn(
|
||||
z.object({
|
||||
sessionID: Identifier.schema("session"),
|
||||
title: z.string(),
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ title: input.title })
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const setArchived = fn(
|
||||
z.object({
|
||||
sessionID: Identifier.schema("session"),
|
||||
time: z.number().optional(),
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ time_archived: input.time })
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const setPermission = fn(
|
||||
z.object({
|
||||
sessionID: Identifier.schema("session"),
|
||||
permission: PermissionNext.Ruleset,
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({ permission: input.permission, time_updated: Date.now() })
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const setRevert = fn(
|
||||
z.object({
|
||||
sessionID: Identifier.schema("session"),
|
||||
revert: Info.shape.revert,
|
||||
summary: Info.shape.summary,
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({
|
||||
revert: input.revert ?? null,
|
||||
summary_additions: input.summary?.additions,
|
||||
summary_deletions: input.summary?.deletions,
|
||||
summary_files: input.summary?.files,
|
||||
time_updated: Date.now(),
|
||||
})
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const clearRevert = fn(Identifier.schema("session"), async (sessionID) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({
|
||||
revert: null,
|
||||
time_updated: Date.now(),
|
||||
})
|
||||
.where(eq(SessionTable.id, sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
})
|
||||
Bus.publish(Event.Updated, {
|
||||
info: result,
|
||||
})
|
||||
return result
|
||||
}
|
||||
})
|
||||
|
||||
export const setSummary = fn(
|
||||
z.object({
|
||||
sessionID: Identifier.schema("session"),
|
||||
summary: Info.shape.summary,
|
||||
}),
|
||||
async (input) => {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.update(SessionTable)
|
||||
.set({
|
||||
summary_additions: input.summary?.additions,
|
||||
summary_deletions: input.summary?.deletions,
|
||||
summary_files: input.summary?.files,
|
||||
time_updated: Date.now(),
|
||||
})
|
||||
.where(eq(SessionTable.id, input.sessionID))
|
||||
.returning()
|
||||
.get()
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
|
||||
const info = fromRow(row)
|
||||
Database.effect(() => Bus.publish(Event.Updated, { info }))
|
||||
return info
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const diff = fn(Identifier.schema("session"), async (sessionID) => {
|
||||
const diffs = await Storage.read<Snapshot.FileDiff[]>(["session_diff", sessionID])
|
||||
return diffs ?? []
|
||||
try {
|
||||
return await Storage.read<Snapshot.FileDiff[]>(["session_diff", sessionID])
|
||||
} catch {
|
||||
return []
|
||||
}
|
||||
})
|
||||
|
||||
export const messages = fn(
|
||||
@@ -331,25 +505,37 @@ export namespace Session {
|
||||
},
|
||||
)
|
||||
|
||||
export async function* list() {
|
||||
export function* list() {
|
||||
const project = Instance.project
|
||||
for (const item of await Storage.list(["session", project.id])) {
|
||||
const session = await Storage.read<Info>(item).catch(() => undefined)
|
||||
if (!session) continue
|
||||
yield session
|
||||
const rel = path.relative(Instance.worktree, Instance.directory)
|
||||
const suffix = path.sep + rel
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(SessionTable)
|
||||
.where(
|
||||
and(
|
||||
eq(SessionTable.project_id, project.id),
|
||||
or(eq(SessionTable.directory, Instance.directory), like(SessionTable.directory, `%${suffix}`)),
|
||||
),
|
||||
)
|
||||
.all(),
|
||||
)
|
||||
for (const row of rows) {
|
||||
yield fromRow(row)
|
||||
}
|
||||
}
|
||||
|
||||
export const children = fn(Identifier.schema("session"), async (parentID) => {
|
||||
const project = Instance.project
|
||||
const result = [] as Session.Info[]
|
||||
for (const item of await Storage.list(["session", project.id])) {
|
||||
const session = await Storage.read<Info>(item).catch(() => undefined)
|
||||
if (!session) continue
|
||||
if (session.parentID !== parentID) continue
|
||||
result.push(session)
|
||||
}
|
||||
return result
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(SessionTable)
|
||||
.where(and(eq(SessionTable.project_id, project.id), eq(SessionTable.parent_id, parentID)))
|
||||
.all(),
|
||||
)
|
||||
return rows.map(fromRow)
|
||||
})
|
||||
|
||||
export const remove = fn(Identifier.schema("session"), async (sessionID) => {
|
||||
@@ -360,15 +546,14 @@ export namespace Session {
|
||||
await remove(child.id)
|
||||
}
|
||||
await unshare(sessionID).catch(() => {})
|
||||
for (const msg of await Storage.list(["message", sessionID])) {
|
||||
for (const part of await Storage.list(["part", msg.at(-1)!])) {
|
||||
await Storage.remove(part)
|
||||
}
|
||||
await Storage.remove(msg)
|
||||
}
|
||||
await Storage.remove(["session", project.id, sessionID])
|
||||
Bus.publish(Event.Deleted, {
|
||||
info: session,
|
||||
// CASCADE delete handles messages and parts automatically
|
||||
Database.use((db) => {
|
||||
db.delete(SessionTable).where(eq(SessionTable.id, sessionID)).run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(Event.Deleted, {
|
||||
info: session,
|
||||
}),
|
||||
)
|
||||
})
|
||||
} catch (e) {
|
||||
log.error(e)
|
||||
@@ -376,9 +561,23 @@ export namespace Session {
|
||||
})
|
||||
|
||||
export const updateMessage = fn(MessageV2.Info, async (msg) => {
|
||||
await Storage.write(["message", msg.sessionID, msg.id], msg)
|
||||
Bus.publish(MessageV2.Event.Updated, {
|
||||
info: msg,
|
||||
const time_created = msg.role === "user" ? msg.time.created : msg.time.created
|
||||
const { id, sessionID, ...data } = msg
|
||||
Database.use((db) => {
|
||||
db.insert(MessageTable)
|
||||
.values({
|
||||
id,
|
||||
session_id: sessionID,
|
||||
time_created,
|
||||
data,
|
||||
})
|
||||
.onConflictDoUpdate({ target: MessageTable.id, set: { data } })
|
||||
.run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(MessageV2.Event.Updated, {
|
||||
info: msg,
|
||||
}),
|
||||
)
|
||||
})
|
||||
return msg
|
||||
})
|
||||
@@ -389,10 +588,15 @@ export namespace Session {
|
||||
messageID: Identifier.schema("message"),
|
||||
}),
|
||||
async (input) => {
|
||||
await Storage.remove(["message", input.sessionID, input.messageID])
|
||||
Bus.publish(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
// CASCADE delete handles parts automatically
|
||||
Database.use((db) => {
|
||||
db.delete(MessageTable).where(eq(MessageTable.id, input.messageID)).run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
}),
|
||||
)
|
||||
})
|
||||
return input.messageID
|
||||
},
|
||||
@@ -405,39 +609,58 @@ export namespace Session {
|
||||
partID: Identifier.schema("part"),
|
||||
}),
|
||||
async (input) => {
|
||||
await Storage.remove(["part", input.messageID, input.partID])
|
||||
Bus.publish(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
Database.use((db) => {
|
||||
db.delete(PartTable).where(eq(PartTable.id, input.partID)).run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
}),
|
||||
)
|
||||
})
|
||||
return input.partID
|
||||
},
|
||||
)
|
||||
|
||||
const UpdatePartInput = z.union([
|
||||
MessageV2.Part,
|
||||
z.object({
|
||||
part: MessageV2.TextPart,
|
||||
delta: z.string(),
|
||||
}),
|
||||
z.object({
|
||||
part: MessageV2.ReasoningPart,
|
||||
delta: z.string(),
|
||||
}),
|
||||
])
|
||||
const UpdatePartInput = MessageV2.Part
|
||||
|
||||
export const updatePart = fn(UpdatePartInput, async (input) => {
|
||||
const part = "delta" in input ? input.part : input
|
||||
const delta = "delta" in input ? input.delta : undefined
|
||||
await Storage.write(["part", part.messageID, part.id], part)
|
||||
Bus.publish(MessageV2.Event.PartUpdated, {
|
||||
part,
|
||||
delta,
|
||||
export const updatePart = fn(UpdatePartInput, async (part) => {
|
||||
const { id, messageID, sessionID, ...data } = part
|
||||
const time = Date.now()
|
||||
Database.use((db) => {
|
||||
db.insert(PartTable)
|
||||
.values({
|
||||
id,
|
||||
message_id: messageID,
|
||||
session_id: sessionID,
|
||||
time_created: time,
|
||||
data,
|
||||
})
|
||||
.onConflictDoUpdate({ target: PartTable.id, set: { data } })
|
||||
.run()
|
||||
Database.effect(() =>
|
||||
Bus.publish(MessageV2.Event.PartUpdated, {
|
||||
part,
|
||||
}),
|
||||
)
|
||||
})
|
||||
return part
|
||||
})
|
||||
|
||||
export const updatePartDelta = fn(
|
||||
z.object({
|
||||
sessionID: z.string(),
|
||||
messageID: z.string(),
|
||||
partID: z.string(),
|
||||
field: z.string(),
|
||||
delta: z.string(),
|
||||
}),
|
||||
async (input) => {
|
||||
Bus.publish(MessageV2.Event.PartDelta, input)
|
||||
},
|
||||
)
|
||||
|
||||
export const getUsage = fn(
|
||||
z.object({
|
||||
model: z.custom<Provider.Model>(),
|
||||
|
||||
@@ -6,6 +6,10 @@ import { Identifier } from "../id/id"
|
||||
import { LSP } from "../lsp"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
import { fn } from "@/util/fn"
|
||||
import { Database, eq, desc, inArray } from "@/storage/db"
|
||||
import { MessageTable, PartTable } from "./session.sql"
|
||||
import { ProviderTransform } from "@/provider/transform"
|
||||
import { STATUS_CODES } from "http"
|
||||
import { Storage } from "@/storage/storage"
|
||||
import { ProviderError } from "@/provider/error"
|
||||
import { iife } from "@/util/iife"
|
||||
@@ -456,7 +460,16 @@ export namespace MessageV2 {
|
||||
"message.part.updated",
|
||||
z.object({
|
||||
part: Part,
|
||||
delta: z.string().optional(),
|
||||
}),
|
||||
),
|
||||
PartDelta: BusEvent.define(
|
||||
"message.part.delta",
|
||||
z.object({
|
||||
sessionID: z.string(),
|
||||
messageID: z.string(),
|
||||
partID: z.string(),
|
||||
field: z.string(),
|
||||
delta: z.string(),
|
||||
}),
|
||||
),
|
||||
PartRemoved: BusEvent.define(
|
||||
@@ -701,23 +714,65 @@ export namespace MessageV2 {
|
||||
}
|
||||
|
||||
export const stream = fn(Identifier.schema("session"), async function* (sessionID) {
|
||||
const list = await Array.fromAsync(await Storage.list(["message", sessionID]))
|
||||
for (let i = list.length - 1; i >= 0; i--) {
|
||||
yield await get({
|
||||
sessionID,
|
||||
messageID: list[i][2],
|
||||
})
|
||||
const size = 50
|
||||
let offset = 0
|
||||
while (true) {
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(MessageTable)
|
||||
.where(eq(MessageTable.session_id, sessionID))
|
||||
.orderBy(desc(MessageTable.time_created))
|
||||
.limit(size)
|
||||
.offset(offset)
|
||||
.all(),
|
||||
)
|
||||
if (rows.length === 0) break
|
||||
|
||||
const ids = rows.map((row) => row.id)
|
||||
const partsByMessage = new Map<string, MessageV2.Part[]>()
|
||||
if (ids.length > 0) {
|
||||
const partRows = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(PartTable)
|
||||
.where(inArray(PartTable.message_id, ids))
|
||||
.orderBy(PartTable.message_id, PartTable.id)
|
||||
.all(),
|
||||
)
|
||||
for (const row of partRows) {
|
||||
const part = {
|
||||
...row.data,
|
||||
id: row.id,
|
||||
sessionID: row.session_id,
|
||||
messageID: row.message_id,
|
||||
} as MessageV2.Part
|
||||
const list = partsByMessage.get(row.message_id)
|
||||
if (list) list.push(part)
|
||||
else partsByMessage.set(row.message_id, [part])
|
||||
}
|
||||
}
|
||||
|
||||
for (const row of rows) {
|
||||
const info = { ...row.data, id: row.id, sessionID: row.session_id } as MessageV2.Info
|
||||
yield {
|
||||
info,
|
||||
parts: partsByMessage.get(row.id) ?? [],
|
||||
}
|
||||
}
|
||||
|
||||
offset += rows.length
|
||||
if (rows.length < size) break
|
||||
}
|
||||
})
|
||||
|
||||
export const parts = fn(Identifier.schema("message"), async (messageID) => {
|
||||
const result = [] as MessageV2.Part[]
|
||||
for (const item of await Storage.list(["part", messageID])) {
|
||||
const read = await Storage.read<MessageV2.Part>(item)
|
||||
result.push(read)
|
||||
}
|
||||
result.sort((a, b) => (a.id > b.id ? 1 : -1))
|
||||
return result
|
||||
export const parts = fn(Identifier.schema("message"), async (message_id) => {
|
||||
const rows = Database.use((db) =>
|
||||
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
|
||||
)
|
||||
return rows.map(
|
||||
(row) => ({ ...row.data, id: row.id, sessionID: row.session_id, messageID: row.message_id }) as MessageV2.Part,
|
||||
)
|
||||
})
|
||||
|
||||
export const get = fn(
|
||||
@@ -726,8 +781,11 @@ export namespace MessageV2 {
|
||||
messageID: Identifier.schema("message"),
|
||||
}),
|
||||
async (input): Promise<WithParts> => {
|
||||
const row = Database.use((db) => db.select().from(MessageTable).where(eq(MessageTable.id, input.messageID)).get())
|
||||
if (!row) throw new Error(`Message not found: ${input.messageID}`)
|
||||
const info = { ...row.data, id: row.id, sessionID: row.session_id } as MessageV2.Info
|
||||
return {
|
||||
info: await Storage.read<MessageV2.Info>(["message", input.sessionID, input.messageID]),
|
||||
info,
|
||||
parts: await parts(input.messageID),
|
||||
}
|
||||
},
|
||||
|
||||
@@ -63,17 +63,19 @@ export namespace SessionProcessor {
|
||||
if (value.id in reasoningMap) {
|
||||
continue
|
||||
}
|
||||
reasoningMap[value.id] = {
|
||||
const reasoningPart = {
|
||||
id: Identifier.ascending("part"),
|
||||
messageID: input.assistantMessage.id,
|
||||
sessionID: input.assistantMessage.sessionID,
|
||||
type: "reasoning",
|
||||
type: "reasoning" as const,
|
||||
text: "",
|
||||
time: {
|
||||
start: Date.now(),
|
||||
},
|
||||
metadata: value.providerMetadata,
|
||||
}
|
||||
reasoningMap[value.id] = reasoningPart
|
||||
await Session.updatePart(reasoningPart)
|
||||
break
|
||||
|
||||
case "reasoning-delta":
|
||||
@@ -81,7 +83,13 @@ export namespace SessionProcessor {
|
||||
const part = reasoningMap[value.id]
|
||||
part.text += value.text
|
||||
if (value.providerMetadata) part.metadata = value.providerMetadata
|
||||
if (part.text) await Session.updatePart({ part, delta: value.text })
|
||||
await Session.updatePartDelta({
|
||||
sessionID: part.sessionID,
|
||||
messageID: part.messageID,
|
||||
partID: part.id,
|
||||
field: "text",
|
||||
delta: value.text,
|
||||
})
|
||||
}
|
||||
break
|
||||
|
||||
@@ -288,17 +296,20 @@ export namespace SessionProcessor {
|
||||
},
|
||||
metadata: value.providerMetadata,
|
||||
}
|
||||
await Session.updatePart(currentText)
|
||||
break
|
||||
|
||||
case "text-delta":
|
||||
if (currentText) {
|
||||
currentText.text += value.text
|
||||
if (value.providerMetadata) currentText.metadata = value.providerMetadata
|
||||
if (currentText.text)
|
||||
await Session.updatePart({
|
||||
part: currentText,
|
||||
delta: value.text,
|
||||
})
|
||||
await Session.updatePartDelta({
|
||||
sessionID: currentText.sessionID,
|
||||
messageID: currentText.messageID,
|
||||
partID: currentText.id,
|
||||
field: "text",
|
||||
delta: value.text,
|
||||
})
|
||||
}
|
||||
break
|
||||
|
||||
|
||||
@@ -174,9 +174,7 @@ export namespace SessionPrompt {
|
||||
}
|
||||
if (permissions.length > 0) {
|
||||
session.permission = permissions
|
||||
await Session.update(session.id, (draft) => {
|
||||
draft.permission = permissions
|
||||
})
|
||||
await Session.setPermission({ sessionID: session.id, permission: permissions })
|
||||
}
|
||||
|
||||
if (input.noReply === true) {
|
||||
@@ -1946,21 +1944,16 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
],
|
||||
})
|
||||
const text = await result.text.catch((err) => log.error("failed to generate title", { error: err }))
|
||||
if (text)
|
||||
return Session.update(
|
||||
input.session.id,
|
||||
(draft) => {
|
||||
const cleaned = text
|
||||
.replace(/<think>[\s\S]*?<\/think>\s*/g, "")
|
||||
.split("\n")
|
||||
.map((line) => line.trim())
|
||||
.find((line) => line.length > 0)
|
||||
if (!cleaned) return
|
||||
if (text) {
|
||||
const cleaned = text
|
||||
.replace(/<think>[\s\S]*?<\/think>\s*/g, "")
|
||||
.split("\n")
|
||||
.map((line) => line.trim())
|
||||
.find((line) => line.length > 0)
|
||||
if (!cleaned) return
|
||||
|
||||
const title = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned
|
||||
draft.title = title
|
||||
},
|
||||
{ touch: false },
|
||||
)
|
||||
const title = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned
|
||||
return Session.setTitle({ sessionID: input.session.id, title })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,8 +4,9 @@ import { Snapshot } from "../snapshot"
|
||||
import { MessageV2 } from "./message-v2"
|
||||
import { Session } from "."
|
||||
import { Log } from "../util/log"
|
||||
import { splitWhen } from "remeda"
|
||||
import { Storage } from "../storage/storage"
|
||||
import { Database, eq } from "../storage/db"
|
||||
import { MessageTable, PartTable } from "./session.sql"
|
||||
import { Storage } from "@/storage/storage"
|
||||
import { Bus } from "../bus"
|
||||
import { SessionPrompt } from "./prompt"
|
||||
import { SessionSummary } from "./summary"
|
||||
@@ -65,13 +66,14 @@ export namespace SessionRevert {
|
||||
sessionID: input.sessionID,
|
||||
diff: diffs,
|
||||
})
|
||||
return Session.update(input.sessionID, (draft) => {
|
||||
draft.revert = revert
|
||||
draft.summary = {
|
||||
return Session.setRevert({
|
||||
sessionID: input.sessionID,
|
||||
revert,
|
||||
summary: {
|
||||
additions: diffs.reduce((sum, x) => sum + x.additions, 0),
|
||||
deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
|
||||
files: diffs.length,
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
return session
|
||||
@@ -83,39 +85,54 @@ export namespace SessionRevert {
|
||||
const session = await Session.get(input.sessionID)
|
||||
if (!session.revert) return session
|
||||
if (session.revert.snapshot) await Snapshot.restore(session.revert.snapshot)
|
||||
const next = await Session.update(input.sessionID, (draft) => {
|
||||
draft.revert = undefined
|
||||
})
|
||||
return next
|
||||
return Session.clearRevert(input.sessionID)
|
||||
}
|
||||
|
||||
export async function cleanup(session: Session.Info) {
|
||||
if (!session.revert) return
|
||||
const sessionID = session.id
|
||||
let msgs = await Session.messages({ sessionID })
|
||||
const msgs = await Session.messages({ sessionID })
|
||||
const messageID = session.revert.messageID
|
||||
const [preserve, remove] = splitWhen(msgs, (x) => x.info.id === messageID)
|
||||
msgs = preserve
|
||||
const preserve = [] as MessageV2.WithParts[]
|
||||
const remove = [] as MessageV2.WithParts[]
|
||||
let target: MessageV2.WithParts | undefined
|
||||
for (const msg of msgs) {
|
||||
if (msg.info.id < messageID) {
|
||||
preserve.push(msg)
|
||||
continue
|
||||
}
|
||||
if (msg.info.id > messageID) {
|
||||
remove.push(msg)
|
||||
continue
|
||||
}
|
||||
if (session.revert.partID) {
|
||||
preserve.push(msg)
|
||||
target = msg
|
||||
continue
|
||||
}
|
||||
remove.push(msg)
|
||||
}
|
||||
for (const msg of remove) {
|
||||
await Storage.remove(["message", sessionID, msg.info.id])
|
||||
Database.use((db) => db.delete(MessageTable).where(eq(MessageTable.id, msg.info.id)).run())
|
||||
await Bus.publish(MessageV2.Event.Removed, { sessionID: sessionID, messageID: msg.info.id })
|
||||
}
|
||||
const last = preserve.at(-1)
|
||||
if (session.revert.partID && last) {
|
||||
if (session.revert.partID && target) {
|
||||
const partID = session.revert.partID
|
||||
const [preserveParts, removeParts] = splitWhen(last.parts, (x) => x.id === partID)
|
||||
last.parts = preserveParts
|
||||
for (const part of removeParts) {
|
||||
await Storage.remove(["part", last.info.id, part.id])
|
||||
await Bus.publish(MessageV2.Event.PartRemoved, {
|
||||
sessionID: sessionID,
|
||||
messageID: last.info.id,
|
||||
partID: part.id,
|
||||
})
|
||||
const removeStart = target.parts.findIndex((part) => part.id === partID)
|
||||
if (removeStart >= 0) {
|
||||
const preserveParts = target.parts.slice(0, removeStart)
|
||||
const removeParts = target.parts.slice(removeStart)
|
||||
target.parts = preserveParts
|
||||
for (const part of removeParts) {
|
||||
Database.use((db) => db.delete(PartTable).where(eq(PartTable.id, part.id)).run())
|
||||
await Bus.publish(MessageV2.Event.PartRemoved, {
|
||||
sessionID: sessionID,
|
||||
messageID: target.info.id,
|
||||
partID: part.id,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
await Session.update(sessionID, (draft) => {
|
||||
draft.revert = undefined
|
||||
})
|
||||
await Session.clearRevert(sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
88
packages/opencode/src/session/session.sql.ts
Normal file
88
packages/opencode/src/session/session.sql.ts
Normal file
@@ -0,0 +1,88 @@
|
||||
import { sqliteTable, text, integer, index, primaryKey } from "drizzle-orm/sqlite-core"
|
||||
import { ProjectTable } from "../project/project.sql"
|
||||
import type { MessageV2 } from "./message-v2"
|
||||
import type { Snapshot } from "@/snapshot"
|
||||
import type { PermissionNext } from "@/permission/next"
|
||||
import { Timestamps } from "@/storage/schema.sql"
|
||||
|
||||
type PartData = Omit<MessageV2.Part, "id" | "sessionID" | "messageID">
|
||||
type InfoData = Omit<MessageV2.Info, "id" | "sessionID">
|
||||
|
||||
export const SessionTable = sqliteTable(
|
||||
"session",
|
||||
{
|
||||
id: text().primaryKey(),
|
||||
project_id: text()
|
||||
.notNull()
|
||||
.references(() => ProjectTable.id, { onDelete: "cascade" }),
|
||||
parent_id: text(),
|
||||
slug: text().notNull(),
|
||||
directory: text().notNull(),
|
||||
title: text().notNull(),
|
||||
version: text().notNull(),
|
||||
share_url: text(),
|
||||
summary_additions: integer(),
|
||||
summary_deletions: integer(),
|
||||
summary_files: integer(),
|
||||
summary_diffs: text({ mode: "json" }).$type<Snapshot.FileDiff[]>(),
|
||||
revert: text({ mode: "json" }).$type<{ messageID: string; partID?: string; snapshot?: string; diff?: string }>(),
|
||||
permission: text({ mode: "json" }).$type<PermissionNext.Ruleset>(),
|
||||
...Timestamps,
|
||||
time_compacting: integer(),
|
||||
time_archived: integer(),
|
||||
},
|
||||
(table) => [index("session_project_idx").on(table.project_id), index("session_parent_idx").on(table.parent_id)],
|
||||
)
|
||||
|
||||
export const MessageTable = sqliteTable(
|
||||
"message",
|
||||
{
|
||||
id: text().primaryKey(),
|
||||
session_id: text()
|
||||
.notNull()
|
||||
.references(() => SessionTable.id, { onDelete: "cascade" }),
|
||||
...Timestamps,
|
||||
data: text({ mode: "json" }).notNull().$type<InfoData>(),
|
||||
},
|
||||
(table) => [index("message_session_idx").on(table.session_id)],
|
||||
)
|
||||
|
||||
export const PartTable = sqliteTable(
|
||||
"part",
|
||||
{
|
||||
id: text().primaryKey(),
|
||||
message_id: text()
|
||||
.notNull()
|
||||
.references(() => MessageTable.id, { onDelete: "cascade" }),
|
||||
session_id: text().notNull(),
|
||||
...Timestamps,
|
||||
data: text({ mode: "json" }).notNull().$type<PartData>(),
|
||||
},
|
||||
(table) => [index("part_message_idx").on(table.message_id), index("part_session_idx").on(table.session_id)],
|
||||
)
|
||||
|
||||
export const TodoTable = sqliteTable(
|
||||
"todo",
|
||||
{
|
||||
session_id: text()
|
||||
.notNull()
|
||||
.references(() => SessionTable.id, { onDelete: "cascade" }),
|
||||
content: text().notNull(),
|
||||
status: text().notNull(),
|
||||
priority: text().notNull(),
|
||||
position: integer().notNull(),
|
||||
...Timestamps,
|
||||
},
|
||||
(table) => [
|
||||
primaryKey({ columns: [table.session_id, table.position] }),
|
||||
index("todo_session_idx").on(table.session_id),
|
||||
],
|
||||
)
|
||||
|
||||
export const PermissionTable = sqliteTable("permission", {
|
||||
project_id: text()
|
||||
.primaryKey()
|
||||
.references(() => ProjectTable.id, { onDelete: "cascade" }),
|
||||
...Timestamps,
|
||||
data: text({ mode: "json" }).notNull().$type<PermissionNext.Ruleset>(),
|
||||
})
|
||||
@@ -90,12 +90,13 @@ export namespace SessionSummary {
|
||||
|
||||
async function summarizeSession(input: { sessionID: string; messages: MessageV2.WithParts[] }) {
|
||||
const diffs = await computeDiff({ messages: input.messages })
|
||||
await Session.update(input.sessionID, (draft) => {
|
||||
draft.summary = {
|
||||
await Session.setSummary({
|
||||
sessionID: input.sessionID,
|
||||
summary: {
|
||||
additions: diffs.reduce((sum, x) => sum + x.additions, 0),
|
||||
deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
|
||||
files: diffs.length,
|
||||
}
|
||||
},
|
||||
})
|
||||
await Storage.write(["session_diff", input.sessionID], diffs)
|
||||
Bus.publish(Session.Event.Diff, {
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Bus } from "@/bus"
|
||||
import z from "zod"
|
||||
import { Storage } from "../storage/storage"
|
||||
import { Database, eq, asc } from "../storage/db"
|
||||
import { TodoTable } from "./session.sql"
|
||||
|
||||
export namespace Todo {
|
||||
export const Info = z
|
||||
@@ -9,7 +10,6 @@ export namespace Todo {
|
||||
content: z.string().describe("Brief description of the task"),
|
||||
status: z.string().describe("Current status of the task: pending, in_progress, completed, cancelled"),
|
||||
priority: z.string().describe("Priority level of the task: high, medium, low"),
|
||||
id: z.string().describe("Unique identifier for the todo item"),
|
||||
})
|
||||
.meta({ ref: "Todo" })
|
||||
export type Info = z.infer<typeof Info>
|
||||
@@ -24,14 +24,33 @@ export namespace Todo {
|
||||
),
|
||||
}
|
||||
|
||||
export async function update(input: { sessionID: string; todos: Info[] }) {
|
||||
await Storage.write(["todo", input.sessionID], input.todos)
|
||||
export function update(input: { sessionID: string; todos: Info[] }) {
|
||||
Database.transaction((db) => {
|
||||
db.delete(TodoTable).where(eq(TodoTable.session_id, input.sessionID)).run()
|
||||
if (input.todos.length === 0) return
|
||||
db.insert(TodoTable)
|
||||
.values(
|
||||
input.todos.map((todo, position) => ({
|
||||
session_id: input.sessionID,
|
||||
content: todo.content,
|
||||
status: todo.status,
|
||||
priority: todo.priority,
|
||||
position,
|
||||
})),
|
||||
)
|
||||
.run()
|
||||
})
|
||||
Bus.publish(Event.Updated, input)
|
||||
}
|
||||
|
||||
export async function get(sessionID: string) {
|
||||
return Storage.read<Info[]>(["todo", sessionID])
|
||||
.then((x) => x || [])
|
||||
.catch(() => [])
|
||||
export function get(sessionID: string) {
|
||||
const rows = Database.use((db) =>
|
||||
db.select().from(TodoTable).where(eq(TodoTable.session_id, sessionID)).orderBy(asc(TodoTable.position)).all(),
|
||||
)
|
||||
return rows.map((row) => ({
|
||||
content: row.content,
|
||||
status: row.status,
|
||||
priority: row.priority,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user