From 46ba9c81703fc6e7db7e623a607eeaab94fcd00f Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Fri, 13 Mar 2026 16:43:41 +0530 Subject: [PATCH] perf(app): use cursor session history loading (#17329) --- .../global-sync/session-prefetch.test.ts | 49 ++++++++++++++++--- .../context/global-sync/session-prefetch.ts | 15 ++++++ packages/app/src/context/sync.tsx | 45 +++++++++++++---- packages/app/src/pages/layout.tsx | 16 +++--- 4 files changed, 102 insertions(+), 23 deletions(-) diff --git a/packages/app/src/context/global-sync/session-prefetch.test.ts b/packages/app/src/context/global-sync/session-prefetch.test.ts index f039b02ca..1ce833469 100644 --- a/packages/app/src/context/global-sync/session-prefetch.test.ts +++ b/packages/app/src/context/global-sync/session-prefetch.test.ts @@ -5,6 +5,7 @@ import { getSessionPrefetch, runSessionPrefetch, setSessionPrefetch, + shouldSkipSessionPrefetch, } from "./session-prefetch" describe("session prefetch", () => { @@ -16,11 +17,12 @@ describe("session prefetch", () => { directory: "/tmp/a", sessionID: "ses_1", limit: 200, + cursor: "abc", complete: false, at: 123, }) - expect(getSessionPrefetch("/tmp/a", "ses_1")).toEqual({ limit: 200, complete: false, at: 123 }) + expect(getSessionPrefetch("/tmp/a", "ses_1")).toEqual({ limit: 200, cursor: "abc", complete: false, at: 123 }) expect(getSessionPrefetch("/tmp/b", "ses_1")).toBeUndefined() clearSessionPrefetch("/tmp/a", ["ses_1"]) @@ -38,26 +40,57 @@ describe("session prefetch", () => { sessionID: "ses_2", task: async () => { calls += 1 - return { limit: 100, complete: true, at: 456 } + return { limit: 100, cursor: "next", complete: true, at: 456 } }, }) const [a, b] = await Promise.all([run(), run()]) expect(calls).toBe(1) - expect(a).toEqual({ limit: 100, complete: true, at: 456 }) - expect(b).toEqual({ limit: 100, complete: true, at: 456 }) + expect(a).toEqual({ limit: 100, cursor: "next", complete: true, at: 456 }) + expect(b).toEqual({ limit: 100, cursor: "next", complete: true, at: 456 }) }) test("clears a whole directory", () => { - setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_1", limit: 10, complete: true, at: 1 }) - setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_2", limit: 20, complete: false, at: 2 }) - setSessionPrefetch({ directory: "/tmp/e", sessionID: "ses_1", limit: 30, complete: true, at: 3 }) + setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_1", limit: 10, cursor: "a", complete: true, at: 1 }) + setSessionPrefetch({ directory: "/tmp/d", sessionID: "ses_2", limit: 20, cursor: "b", complete: false, at: 2 }) + setSessionPrefetch({ directory: "/tmp/e", sessionID: "ses_1", limit: 30, cursor: "c", complete: true, at: 3 }) clearSessionPrefetchDirectory("/tmp/d") expect(getSessionPrefetch("/tmp/d", "ses_1")).toBeUndefined() expect(getSessionPrefetch("/tmp/d", "ses_2")).toBeUndefined() - expect(getSessionPrefetch("/tmp/e", "ses_1")).toEqual({ limit: 30, complete: true, at: 3 }) + expect(getSessionPrefetch("/tmp/e", "ses_1")).toEqual({ limit: 30, cursor: "c", complete: true, at: 3 }) + }) + + test("refreshes stale first-page prefetched history", () => { + expect( + shouldSkipSessionPrefetch({ + message: true, + info: { limit: 200, cursor: "x", complete: false, at: 1 }, + chunk: 200, + now: 1 + 15_001, + }), + ).toBe(false) + }) + + test("keeps deeper or complete history cached", () => { + expect( + shouldSkipSessionPrefetch({ + message: true, + info: { limit: 400, cursor: "x", complete: false, at: 1 }, + chunk: 200, + now: 1 + 15_001, + }), + ).toBe(true) + + expect( + shouldSkipSessionPrefetch({ + message: true, + info: { limit: 120, complete: true, at: 1 }, + chunk: 200, + now: 1 + 15_001, + }), + ).toBe(true) }) }) diff --git a/packages/app/src/context/global-sync/session-prefetch.ts b/packages/app/src/context/global-sync/session-prefetch.ts index 10877b063..608561f85 100644 --- a/packages/app/src/context/global-sync/session-prefetch.ts +++ b/packages/app/src/context/global-sync/session-prefetch.ts @@ -4,10 +4,23 @@ export const SESSION_PREFETCH_TTL = 15_000 type Meta = { limit: number + cursor?: string complete: boolean at: number } +export function shouldSkipSessionPrefetch(input: { message: boolean; info?: Meta; chunk: number; now?: number }) { + if (input.message) { + if (!input.info) return true + if (input.info.complete) return true + if (input.info.limit > input.chunk) return true + } else { + if (!input.info) return false + } + + return (input.now ?? Date.now()) - input.info.at < SESSION_PREFETCH_TTL +} + const cache = new Map() const inflight = new Map>() const rev = new Map() @@ -53,11 +66,13 @@ export function setSessionPrefetch(input: { directory: string sessionID: string limit: number + cursor?: string complete: boolean at?: number }) { cache.set(key(input.directory, input.sessionID), { limit: input.limit, + cursor: input.cursor, complete: input.complete, at: input.at ?? Date.now(), }) diff --git a/packages/app/src/context/sync.tsx b/packages/app/src/context/sync.tsx index 3e3696924..9dc6623a7 100644 --- a/packages/app/src/context/sync.tsx +++ b/packages/app/src/context/sync.tsx @@ -32,6 +32,12 @@ const keyFor = (directory: string, id: string) => `${directory}\n${id}` const cmp = (a: string, b: string) => (a < b ? -1 : a > b ? 1 : 0) +function merge(a: readonly T[], b: readonly T[]) { + const map = new Map(a.map((item) => [item.id, item] as const)) + for (const item of b) map.set(item.id, item) + return [...map.values()].sort((x, y) => cmp(x.id, y.id)) +} + type OptimisticStore = { message: Record part: Record @@ -119,6 +125,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ const seen = new Map>() const [meta, setMeta] = createStore({ limit: {} as Record, + cursor: {} as Record, complete: {} as Record, loading: {} as Record, }) @@ -157,6 +164,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ for (const sessionID of sessionIDs) { const key = keyFor(directory, sessionID) delete draft.limit[key] + delete draft.cursor[key] delete draft.complete[key] delete draft.loading[key] } @@ -187,17 +195,24 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ evict(directory, setStore, stale) } - const fetchMessages = async (input: { client: typeof sdk.client; sessionID: string; limit: number }) => { + const fetchMessages = async (input: { + client: typeof sdk.client + sessionID: string + limit: number + before?: string + }) => { const messages = await retry(() => - input.client.session.messages({ sessionID: input.sessionID, limit: input.limit }), + input.client.session.messages({ sessionID: input.sessionID, limit: input.limit, before: input.before }), ) const items = (messages.data ?? []).filter((x) => !!x?.info?.id) const session = items.map((x) => x.info).sort((a, b) => cmp(a.id, b.id)) const part = items.map((message) => ({ id: message.info.id, part: sortParts(message.parts) })) + const cursor = messages.response.headers.get("x-next-cursor") ?? undefined return { session, part, - complete: session.length < input.limit, + cursor, + complete: !cursor, } } @@ -209,6 +224,8 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ setStore: Setter sessionID: string limit: number + before?: string + mode?: "replace" | "prepend" }) => { const key = keyFor(input.directory, input.sessionID) if (meta.loading[key]) return @@ -217,17 +234,22 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ await fetchMessages(input) .then((next) => { if (!tracked(input.directory, input.sessionID)) return + const [store] = globalSync.child(input.directory, { bootstrap: false }) + const cached = input.mode === "prepend" ? (store.message[input.sessionID] ?? []) : [] + const message = input.mode === "prepend" ? merge(cached, next.session) : next.session batch(() => { - input.setStore("message", input.sessionID, reconcile(next.session, { key: "id" })) + input.setStore("message", input.sessionID, reconcile(message, { key: "id" })) for (const p of next.part) { input.setStore("part", p.id, p.part) } - setMeta("limit", key, input.limit) + setMeta("limit", key, message.length) + setMeta("cursor", key, next.cursor) setMeta("complete", key, next.complete) setSessionPrefetch({ directory: input.directory, sessionID: input.sessionID, - limit: input.limit, + limit: message.length, + cursor: next.cursor, complete: next.complete, }) }) @@ -312,6 +334,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ if (seeded && store.message[sessionID] !== undefined && meta.limit[key] === undefined) { batch(() => { setMeta("limit", key, seeded.limit) + setMeta("cursor", key, seeded.cursor) setMeta("complete", key, seeded.complete) setMeta("loading", key, false) }) @@ -325,6 +348,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ if (seeded && store.message[sessionID] !== undefined && meta.limit[key] === undefined) { batch(() => { setMeta("limit", key, seeded.limit) + setMeta("cursor", key, seeded.cursor) setMeta("complete", key, seeded.complete) setMeta("loading", key, false) }) @@ -420,7 +444,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ if (store.message[sessionID] === undefined) return false if (meta.limit[key] === undefined) return false if (meta.complete[key]) return false - return true + return !!meta.cursor[key] }, loading(sessionID: string) { const key = keyFor(sdk.directory, sessionID) @@ -435,14 +459,17 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ const step = count ?? messagePageSize if (meta.loading[key]) return if (meta.complete[key]) return + const before = meta.cursor[key] + if (!before) return - const currentLimit = meta.limit[key] ?? messagePageSize await loadMessages({ directory, client, setStore, sessionID, - limit: currentLimit + step, + limit: step, + before, + mode: "prepend", }) }, }, diff --git a/packages/app/src/pages/layout.tsx b/packages/app/src/pages/layout.tsx index ed2221846..9b964b4b5 100644 --- a/packages/app/src/pages/layout.tsx +++ b/packages/app/src/pages/layout.tsx @@ -41,8 +41,8 @@ import { getSessionPrefetch, isSessionPrefetchCurrent, runSessionPrefetch, - SESSION_PREFETCH_TTL, setSessionPrefetch, + shouldSkipSessionPrefetch, } from "@/context/global-sync/session-prefetch" import { useNotification } from "@/context/notification" import { usePermission } from "@/context/permission" @@ -770,9 +770,11 @@ export default function Layout(props: ParentProps) { const next = items.map((x) => x.info).filter((m): m is Message => !!m?.id) const sorted = mergeByID([], next) const stale = markPrefetched(directory, sessionID) + const cursor = messages.response.headers.get("x-next-cursor") ?? undefined const meta = { - limit: prefetchChunk, - complete: sorted.length < prefetchChunk, + limit: sorted.length, + cursor, + complete: !cursor, at: Date.now(), } @@ -846,10 +848,12 @@ export default function Layout(props: ParentProps) { const [store] = globalSync.child(directory, { bootstrap: false }) const cached = untrack(() => { - if (store.message[session.id] === undefined) return false const info = getSessionPrefetch(directory, session.id) - if (!info) return false - return Date.now() - info.at < SESSION_PREFETCH_TTL + return shouldSkipSessionPrefetch({ + message: store.message[session.id] !== undefined, + info, + chunk: prefetchChunk, + }) }) if (cached) return