diff --git a/apps/desktop/src/main/terminal-host/session.ts b/apps/desktop/src/main/terminal-host/session.ts index 24287b59393..08419125316 100644 --- a/apps/desktop/src/main/terminal-host/session.ts +++ b/apps/desktop/src/main/terminal-host/session.ts @@ -81,6 +81,37 @@ const EMULATOR_WRITE_QUEUE_LOW_WATERMARK_BYTES = 250_000; */ const SHELL_READY_TIMEOUT_MS = 15_000; +/** + * Coalesce data broadcasts to reduce per-event IPC overhead + * (JSON.stringify + socket.write) and downstream renderer xterm.write cost + * when the PTY emits high-frequency full-screen redraws (e.g. Codex TUI in + * Ratatui which rewrites the whole screen 30-60 times per second). + * + * Flushed when either the interval elapses or the buffered byte count + * exceeds the threshold, whichever comes first. Non-data events + * (exit/error) and boundary events (attach) force an immediate flush so + * event ordering and snapshot consistency are preserved. + * + * Disable by setting SUPERSET_TERMINAL_BROADCAST_COALESCE=0. + */ +const BROADCAST_COALESCE_INTERVAL_MS = 16; +const BROADCAST_COALESCE_MAX_BYTES = 131_072; +const BROADCAST_COALESCE_ENABLED = + process.env.SUPERSET_TERMINAL_BROADCAST_COALESCE !== "0"; + +/** + * Coalesce consecutive queued emulator chunks into a single emulator.write() + * call per drain iteration. For high-frequency producers like Codex/Ratatui + * the emulator queue accumulates many small chunks in a single tick; + * concatenating them reduces ANSI parser setup overhead and function-call + * churn. The MAX_CHUNK_CHARS cap (below) is still honored so we never grow + * a single write beyond what the emulator already tolerates. + * + * Disable by setting SUPERSET_TERMINAL_EMULATOR_COALESCE=0. + */ +const EMULATOR_WRITE_COALESCE_ENABLED = + process.env.SUPERSET_TERMINAL_EMULATOR_COALESCE !== "0"; + /** * Shell readiness lifecycle: * - `pending` — shell is initializing; user writes are buffered, escape sequences dropped @@ -172,6 +203,11 @@ export class Session { private emulatorWriteScheduled = false; private emulatorFlushWaiters: Array<() => void> = []; + // Broadcast data coalescing — see BROADCAST_COALESCE_* constants. + private pendingBroadcastChunks: string[] = []; + private pendingBroadcastBytes = 0; + private broadcastCoalesceTimer: ReturnType | null = null; + // Snapshot boundary tracking for concurrent attaches. private emulatorWriteProcessedItems = 0; private nextSnapshotBoundaryWaiterId = 1; @@ -373,11 +409,7 @@ export class Session { if (data.length === 0) break; this.enqueueEmulatorWrite(data); - - this.broadcastEvent("data", { - type: "data", - data, - } satisfies TerminalDataEvent); + this.queueBroadcastData(data); break; } @@ -386,6 +418,7 @@ export class Session { const signal = payload.length >= 8 ? payload.readInt32LE(4) : 0; this.exitCode = exitCode; + this.flushPendingBroadcastData(); this.broadcastEvent("exit", { type: "exit", exitCode, @@ -411,6 +444,7 @@ export class Session { errorMessage, ); + this.flushPendingBroadcastData(); this.broadcastEvent("error", { type: "error", error: errorMessage, @@ -430,6 +464,7 @@ export class Session { if (this.exitCode === null) { this.exitCode = exitCode; + this.flushPendingBroadcastData(); this.broadcastEvent("exit", { type: "exit", exitCode, @@ -495,6 +530,7 @@ export class Session { console.warn( `[Session ${this.sessionId}] stdin queue full (${this.subprocessStdinQueuedBytes} bytes), dropping frame`, ); + this.flushPendingBroadcastData(); this.broadcastEvent("error", { type: "error", error: "Write queue full - input dropped", @@ -619,11 +655,16 @@ export class Session { while (this.emulatorWriteQueue.length > 0) { if (performance.now() - start > budgetMs) break; - let chunk = this.emulatorWriteQueue[0]; - if (chunk.length > MAX_CHUNK_CHARS) { + const head = this.emulatorWriteQueue[0]; + + // Oversized head — split at MAX_CHUNK_CHARS (respecting surrogate + // pairs) and write a single slice. No coalescing possible here + // since the remainder must stay at the queue head for the next + // iteration to preserve FIFO order. + if (head.length > MAX_CHUNK_CHARS) { let splitAt = MAX_CHUNK_CHARS; - const prev = chunk.charCodeAt(splitAt - 1); - const next = chunk.charCodeAt(splitAt); + const prev = head.charCodeAt(splitAt - 1); + const next = head.charCodeAt(splitAt); if ( prev >= 0xd800 && prev <= 0xdbff && @@ -632,16 +673,42 @@ export class Session { ) { splitAt--; } - this.emulatorWriteQueue[0] = chunk.slice(splitAt); - chunk = chunk.slice(0, splitAt); + this.emulatorWriteQueue[0] = head.slice(splitAt); + const chunk = head.slice(0, splitAt); + this.emulatorWriteQueuedBytes -= Buffer.byteLength(chunk, "utf8"); + this.emulator.write(chunk); + continue; + } + + // Coalesce consecutive in-bounds items into a single write while + // staying under MAX_CHUNK_CHARS. Each consumed item still counts + // as one processed item so snapshot-boundary targets match the + // pre-coalescing item accounting used by flushToSnapshotBoundary. + if (EMULATOR_WRITE_COALESCE_ENABLED) { + let merged = ""; + let itemsConsumed = 0; + while ( + this.emulatorWriteQueue.length > 0 && + merged.length + this.emulatorWriteQueue[0].length <= MAX_CHUNK_CHARS + ) { + const nextChunk = this.emulatorWriteQueue.shift() as string; + merged += nextChunk; + itemsConsumed++; + } + + if (itemsConsumed === 0) break; // defensive — should not happen + + this.emulatorWriteProcessedItems += itemsConsumed; + this.resolveReachedSnapshotBoundaryWaiters(); + this.emulatorWriteQueuedBytes -= Buffer.byteLength(merged, "utf8"); + this.emulator.write(merged); } else { this.emulatorWriteQueue.shift(); this.emulatorWriteProcessedItems++; this.resolveReachedSnapshotBoundaryWaiters(); + this.emulatorWriteQueuedBytes -= Buffer.byteLength(head, "utf8"); + this.emulator.write(head); } - - this.emulatorWriteQueuedBytes -= Buffer.byteLength(chunk, "utf8"); - this.emulator.write(chunk); } this.maybeResumeSubprocessStdoutForEmulatorBackpressure(); @@ -786,6 +853,12 @@ export class Session { } throwIfAborted(signal); + // Drain any pending coalesced data to existing clients before the new + // client joins. Otherwise the new client would receive pre-attach + // bytes that are already captured in the snapshot below, causing + // duplicated output (double-advance of cursor, etc.) on the renderer. + this.flushPendingBroadcastData(); + const attachedClient: AttachedClient = { socket, attachedAt: Date.now(), @@ -1009,6 +1082,12 @@ export class Session { const waiters = this.emulatorFlushWaiters; this.emulatorFlushWaiters = []; for (const resolve of waiters) resolve(); + + // Flush before dropping the coalesce buffer — resolveShellReady can + // enqueue held scanner bytes during teardown (short-lived shells that + // exit before the ready marker completes), and clearing without + // flushing would silently drop them. + this.flushPendingBroadcastData(); } /** @@ -1039,10 +1118,7 @@ export class Session { // Flush held marker bytes — they weren't part of a full marker if (this.scanState.heldBytes.length > 0) { this.enqueueEmulatorWrite(this.scanState.heldBytes); - this.broadcastEvent("data", { - type: "data", - data: this.scanState.heldBytes, - } satisfies TerminalDataEvent); + this.queueBroadcastData(this.scanState.heldBytes); this.scanState.heldBytes = ""; } this.scanState.matchPos = 0; @@ -1054,6 +1130,64 @@ export class Session { } } + /** + * Buffer a data event for coalesced broadcast. See BROADCAST_COALESCE_* + * constants for rationale. Falls back to immediate broadcast when + * coalescing is disabled via env var or the session is already disposed. + */ + private queueBroadcastData(data: string): void { + if (data.length === 0) return; + + if (!BROADCAST_COALESCE_ENABLED || this.disposed) { + this.broadcastEvent("data", { + type: "data", + data, + } satisfies TerminalDataEvent); + return; + } + + this.pendingBroadcastChunks.push(data); + this.pendingBroadcastBytes += Buffer.byteLength(data, "utf8"); + + if (this.pendingBroadcastBytes >= BROADCAST_COALESCE_MAX_BYTES) { + this.flushPendingBroadcastData(); + return; + } + + if (!this.broadcastCoalesceTimer) { + this.broadcastCoalesceTimer = setTimeout(() => { + this.broadcastCoalesceTimer = null; + this.flushPendingBroadcastData(); + }, BROADCAST_COALESCE_INTERVAL_MS); + } + } + + /** + * Emit any buffered data chunks as a single merged broadcast. Safe to + * call when the buffer is empty. Must be called before any non-data + * event (exit/error) and before attaching a new client so ordering and + * snapshot consistency are preserved. + */ + private flushPendingBroadcastData(): void { + if (this.broadcastCoalesceTimer) { + clearTimeout(this.broadcastCoalesceTimer); + this.broadcastCoalesceTimer = null; + } + if (this.pendingBroadcastChunks.length === 0) return; + + const merged = + this.pendingBroadcastChunks.length === 1 + ? this.pendingBroadcastChunks[0] + : this.pendingBroadcastChunks.join(""); + this.pendingBroadcastChunks = []; + this.pendingBroadcastBytes = 0; + + this.broadcastEvent("data", { + type: "data", + data: merged, + } satisfies TerminalDataEvent); + } + /** * Broadcast an event to all attached clients with backpressure awareness. */