share: speed up share loads (#16165)

This commit is contained in:
Shoubhit Dash
2026-03-06 18:19:15 +05:30
committed by GitHub
parent eeeb21ff86
commit 1d9dcd2a27
7 changed files with 167 additions and 145 deletions

View File

@@ -1,10 +1,8 @@
import { FileDiff, Message, Model, Part, Session } from "@opencode-ai/sdk/v2"
import { fn } from "@opencode-ai/util/fn"
import { iife } from "@opencode-ai/util/iife"
import { Identifier } from "@opencode-ai/util/identifier"
import z from "zod"
import { Storage } from "./storage"
import { Binary } from "@opencode-ai/util/binary"
export namespace Share {
export const Info = z.object({
@@ -38,6 +36,81 @@ export namespace Share {
])
export type Data = z.infer<typeof Data>
// On-disk shape of the precomputed share snapshot: the fully merged,
// key-sorted list of share data records, served directly on read.
type Snapshot = {
data: Data[]
}
// Legacy rolling-compaction record. `event` is the cursor of the last
// share_event already folded into `data`; events written after it are
// still pending and must be merged in on the next legacy read.
type Compaction = {
event?: string
data: Data[]
}
// Derives a stable, unique storage key for a share data item so that a
// newer version of the same logical record replaces the older one on merge.
function key(item: Data) {
// messages and parts are keyed by their identifiers
if (item.type === "message") return `message/${item.data.id}`
if (item.type === "part") return `part/${item.data.messageID}/${item.data.id}`
// session, session_diff and model are singletons: the tag itself is the key
return item.type
}
// Merges several lists of share data records into one. Records with the
// same key collapse to a single entry, with later lists winning over
// earlier ones; output is deterministically ordered by key.
function merge(...items: Data[][]) {
const byKey = new Map<string, Data>()
for (const item of items.flat()) byKey.set(key(item), item)
const sorted = [...byKey.entries()].sort((a, b) => a[0].localeCompare(b[0]))
return sorted.map((entry) => entry[1])
}
// Loads the precomputed snapshot for a share; undefined when none exists yet.
async function readSnapshot(shareID: string) {
const snapshot = await Storage.read<Snapshot>(["share_snapshot", shareID])
return snapshot?.data
}
// Persists the fully materialized data set for a share under its snapshot key.
async function writeSnapshot(shareID: string, data: Data[]) {
const snapshot: Snapshot = { data }
await Storage.write(["share_snapshot", shareID], snapshot)
}
// Rebuilds a share's data from the pre-snapshot storage layout (a rolling
// compaction plus any share_event entries written since it was updated),
// and seeds the new snapshot so subsequent reads take the fast path.
async function legacy(shareID: string) {
const fallback: Compaction = { data: [], event: undefined }
const compaction = (await Storage.read<Compaction>(["share_compaction", shareID])) ?? fallback
// Events newer than the compaction cursor, oldest first after reversal.
const pending = await Storage.list({
prefix: ["share_event", shareID],
before: compaction.event,
}).then((entries) => entries.toReversed())
if (pending.length === 0) {
// Nothing new to fold in — promote the compaction to a snapshot.
if (compaction.data.length > 0) await writeSnapshot(shareID, compaction.data)
return compaction.data
}
const chunks = await Promise.all(pending.map((entry) => Storage.read<Data[]>(entry)))
const next = merge(
compaction.data,
chunks.flatMap((chunk) => chunk ?? []),
)
// Advance the compaction cursor and seed the snapshot in parallel.
await Promise.all([
Storage.write(["share_compaction", shareID], {
event: pending.at(-1)?.at(-1),
data: next,
}),
writeSnapshot(shareID, next),
])
return next
}
export const create = fn(z.object({ sessionID: z.string() }), async (body) => {
const isTest = process.env.NODE_ENV === "test" || body.sessionID.startsWith("test_")
const info: Info = {
@@ -47,7 +120,7 @@ export namespace Share {
}
const exists = await get(info.id)
if (exists) throw new Errors.AlreadyExists(info.id)
await Storage.write(["share", info.id], info)
await Promise.all([Storage.write(["share", info.id], info), writeSnapshot(info.id, [])])
return info
})
@@ -60,8 +133,13 @@ export namespace Share {
if (!share) throw new Errors.NotFound(body.id)
if (share.secret !== body.secret) throw new Errors.InvalidSecret(body.id)
await Storage.remove(["share", body.id])
const list = await Storage.list({ prefix: ["share_data", body.id] })
for (const item of list) {
const groups = await Promise.all([
Storage.list({ prefix: ["share_snapshot", body.id] }),
Storage.list({ prefix: ["share_compaction", body.id] }),
Storage.list({ prefix: ["share_event", body.id] }),
Storage.list({ prefix: ["share_data", body.id] }),
])
for (const item of groups.flat()) {
await Storage.remove(item)
}
})
@@ -75,59 +153,13 @@ export namespace Share {
const share = await get(input.share.id)
if (!share) throw new Errors.NotFound(input.share.id)
if (share.secret !== input.share.secret) throw new Errors.InvalidSecret(input.share.id)
await Storage.write(["share_event", input.share.id, Identifier.descending()], input.data)
const data = (await readSnapshot(input.share.id)) ?? (await legacy(input.share.id))
await writeSnapshot(input.share.id, merge(data, input.data))
},
)
// Returns the full, merged data set for a share.
//
// Fast path: serve the precomputed snapshot written by sync/create.
// Fallback: rebuild from the legacy compaction + event log via legacy(),
// which also seeds the snapshot so the next read is fast.
//
// NOTE(review): the previous inline compaction loop (and its duplicate
// `Compaction` type declaration, which collided with the one declared
// above) is removed — everything after its `return` was unreachable.
export async function data(shareID: string) {
return (await readSnapshot(shareID)) ?? legacy(shareID)
}
export const syncOld = fn(