Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 72 additions & 30 deletions packages/opencode/src/cli/cmd/tui/context/sync.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { createSimpleContext } from "./helper"
import type { Snapshot } from "@/snapshot"
import { useExit } from "./exit"
import { useArgs } from "./args"
import { batch, onMount } from "solid-js"
import { batch, onCleanup, onMount } from "solid-js"
import { Log } from "@/util/log"
import type { Path } from "@opencode-ai/sdk"

Expand Down Expand Up @@ -104,6 +104,48 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({

const sdk = useSDK()

// Delta coalescing: accumulate deltas in a plain record (zero reactive overhead),
// flush to store via queueMicrotask after sdk.tsx's batch() completes.
// Collapses N deltas per part per flush into 1 setStore() call.
const pending: Record<string, Record<string, Record<string, string>>> = {}
let scheduled = false
let disposed = false

function flushDeltas() {
if (disposed) return
for (const messageID in pending) {
const parts = store.part[messageID]
if (!parts) {
delete pending[messageID]
continue
}
for (const partID in pending[messageID]) {
const result = Binary.search(parts, partID, (p) => p.id)
if (!result.found) {
delete pending[messageID][partID]
continue
}
for (const field in pending[messageID][partID]) {
const delta = pending[messageID][partID][field]
setStore(
"part",
messageID,
result.index,
field as keyof Part,
(prev: unknown) => (typeof prev === "string" ? prev : "") + delta,
)
}
delete pending[messageID][partID]
}
delete pending[messageID]
}
}

onCleanup(() => {
disposed = true
for (const key in pending) delete pending[key]
})

sdk.event.listen((e) => {
const event = e.details
switch (event.type) {
Expand Down Expand Up @@ -134,7 +176,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
}
const match = Binary.search(requests, request.id, (r) => r.id)
if (match.found) {
setStore("permission", request.sessionID, match.index, reconcile(request))
setStore("permission", request.sessionID, match.index, request)
break
}
setStore(
Expand Down Expand Up @@ -172,7 +214,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
}
const match = Binary.search(requests, request.id, (r) => r.id)
if (match.found) {
setStore("question", request.sessionID, match.index, reconcile(request))
setStore("question", request.sessionID, match.index, request)
break
}
setStore(
Expand Down Expand Up @@ -208,7 +250,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
case "session.updated": {
const result = Binary.search(store.session, event.properties.info.id, (s) => s.id)
if (result.found) {
setStore("session", result.index, reconcile(event.properties.info))
setStore("session", result.index, event.properties.info)
break
}
setStore(
Expand All @@ -233,7 +275,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
}
const result = Binary.search(messages, event.properties.info.id, (m) => m.id)
if (result.found) {
setStore("message", event.properties.info.sessionID, result.index, reconcile(event.properties.info))
setStore("message", event.properties.info.sessionID, result.index, event.properties.info)
break
}
setStore(
Expand Down Expand Up @@ -279,14 +321,16 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
break
}
case "message.part.updated": {
// Clear pending deltas — full update supersedes them
delete pending[event.properties.part.messageID]?.[event.properties.part.id]
const parts = store.part[event.properties.part.messageID]
if (!parts) {
setStore("part", event.properties.part.messageID, [event.properties.part])
break
}
const result = Binary.search(parts, event.properties.part.id, (p) => p.id)
if (result.found) {
setStore("part", event.properties.part.messageID, result.index, reconcile(event.properties.part))
setStore("part", event.properties.part.messageID, result.index, event.properties.part)
break
}
setStore(
Expand All @@ -300,24 +344,22 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
}

case "message.part.delta": {
const parts = store.part[event.properties.messageID]
if (!parts) break
const result = Binary.search(parts, event.properties.partID, (p) => p.id)
if (!result.found) break
setStore(
"part",
event.properties.messageID,
produce((draft) => {
const part = draft[result.index]
const field = event.properties.field as keyof typeof part
const existing = part[field] as string | undefined
;(part[field] as string) = (existing ?? "") + event.properties.delta
}),
)
const p = event.properties
const msg = (pending[p.messageID] ??= {})
const part = (msg[p.partID] ??= {})
part[p.field] = (part[p.field] ?? "") + p.delta
if (!scheduled) {
scheduled = true
queueMicrotask(() => {
scheduled = false
batch(() => flushDeltas())
})
}
break
}

case "message.part.removed": {
delete pending[event.properties.messageID]?.[event.properties.partID]
const parts = store.part[event.properties.messageID]
const result = Binary.search(parts, event.properties.partID, (p) => p.id)
if (result.found)
Expand Down Expand Up @@ -388,31 +430,31 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
const sessions = responses[4]

batch(() => {
setStore("provider", reconcile(providers.providers))
setStore("provider", providers.providers)
setStore("provider_default", reconcile(providers.default))
setStore("provider_next", reconcile(providerList))
setStore("agent", reconcile(agents))
setStore("provider_next", providerList)
setStore("agent", agents)
setStore("config", reconcile(config))
if (sessions !== undefined) setStore("session", reconcile(sessions))
if (sessions !== undefined) setStore("session", sessions)
})
})
})
.then(() => {
if (store.status !== "complete") setStore("status", "partial")
// non-blocking
Promise.all([
...(args.continue ? [] : [sessionListPromise.then((sessions) => setStore("session", reconcile(sessions)))]),
sdk.client.command.list().then((x) => setStore("command", reconcile(x.data ?? []))),
sdk.client.lsp.status().then((x) => setStore("lsp", reconcile(x.data!))),
...(args.continue ? [] : [sessionListPromise.then((sessions) => setStore("session", sessions))]),
sdk.client.command.list().then((x) => setStore("command", x.data ?? [])),
sdk.client.lsp.status().then((x) => setStore("lsp", x.data!)),
sdk.client.mcp.status().then((x) => setStore("mcp", reconcile(x.data!))),
sdk.client.experimental.resource.list().then((x) => setStore("mcp_resource", reconcile(x.data ?? {}))),
sdk.client.formatter.status().then((x) => setStore("formatter", reconcile(x.data!))),
sdk.client.formatter.status().then((x) => setStore("formatter", x.data!)),
sdk.client.session.status().then((x) => {
setStore("session_status", reconcile(x.data!))
}),
sdk.client.provider.auth().then((x) => setStore("provider_auth", reconcile(x.data ?? {}))),
sdk.client.vcs.get().then((x) => setStore("vcs", reconcile(x.data))),
sdk.client.path.get().then((x) => setStore("path", reconcile(x.data!))),
sdk.client.vcs.get().then((x) => setStore("vcs", x.data)),
sdk.client.path.get().then((x) => setStore("path", x.data!)),
]).then(() => {
setStore("status", "complete")
})
Expand Down
Loading