Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 152 additions & 18 deletions apps/desktop/src/main/terminal-host/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,37 @@ const EMULATOR_WRITE_QUEUE_LOW_WATERMARK_BYTES = 250_000;
*/
const SHELL_READY_TIMEOUT_MS = 15_000;

/**
* Coalesce data broadcasts to reduce per-event IPC overhead
* (JSON.stringify + socket.write) and downstream renderer xterm.write cost
* when the PTY emits high-frequency full-screen redraws (e.g. Codex TUI in
* Ratatui which rewrites the whole screen 30-60 times per second).
*
* Flushed when either the interval elapses or the buffered byte count
* exceeds the threshold, whichever comes first. Non-data events
* (exit/error) and boundary events (attach) force an immediate flush so
* event ordering and snapshot consistency are preserved.
*
* Disable by setting SUPERSET_TERMINAL_BROADCAST_COALESCE=0.
*/
const BROADCAST_COALESCE_INTERVAL_MS = 16;
const BROADCAST_COALESCE_MAX_BYTES = 131_072;
const BROADCAST_COALESCE_ENABLED =
process.env.SUPERSET_TERMINAL_BROADCAST_COALESCE !== "0";

/**
* Coalesce consecutive queued emulator chunks into a single emulator.write()
* call per drain iteration. For high-frequency producers like Codex/Ratatui
* the emulator queue accumulates many small chunks in a single tick;
* concatenating them reduces ANSI parser setup overhead and function-call
* churn. The MAX_CHUNK_CHARS cap (below) is still honored so we never grow
* a single write beyond what the emulator already tolerates.
*
* Disable by setting SUPERSET_TERMINAL_EMULATOR_COALESCE=0.
*/
const EMULATOR_WRITE_COALESCE_ENABLED =
process.env.SUPERSET_TERMINAL_EMULATOR_COALESCE !== "0";

/**
* Shell readiness lifecycle:
* - `pending` — shell is initializing; user writes are buffered, escape sequences dropped
Expand Down Expand Up @@ -172,6 +203,11 @@ export class Session {
private emulatorWriteScheduled = false;
private emulatorFlushWaiters: Array<() => void> = [];

// Broadcast data coalescing — see BROADCAST_COALESCE_* constants.
private pendingBroadcastChunks: string[] = [];
private pendingBroadcastBytes = 0;
private broadcastCoalesceTimer: ReturnType<typeof setTimeout> | null = null;

// Snapshot boundary tracking for concurrent attaches.
private emulatorWriteProcessedItems = 0;
private nextSnapshotBoundaryWaiterId = 1;
Expand Down Expand Up @@ -373,11 +409,7 @@ export class Session {
if (data.length === 0) break;

this.enqueueEmulatorWrite(data);

this.broadcastEvent("data", {
type: "data",
data,
} satisfies TerminalDataEvent);
this.queueBroadcastData(data);
break;
}

Expand All @@ -386,6 +418,7 @@ export class Session {
const signal = payload.length >= 8 ? payload.readInt32LE(4) : 0;
this.exitCode = exitCode;

this.flushPendingBroadcastData();
this.broadcastEvent("exit", {
type: "exit",
exitCode,
Expand All @@ -411,6 +444,7 @@ export class Session {
errorMessage,
);

this.flushPendingBroadcastData();
this.broadcastEvent("error", {
type: "error",
error: errorMessage,
Expand All @@ -430,6 +464,7 @@ export class Session {
if (this.exitCode === null) {
this.exitCode = exitCode;

this.flushPendingBroadcastData();
this.broadcastEvent("exit", {
type: "exit",
exitCode,
Expand Down Expand Up @@ -495,6 +530,7 @@ export class Session {
console.warn(
`[Session ${this.sessionId}] stdin queue full (${this.subprocessStdinQueuedBytes} bytes), dropping frame`,
);
this.flushPendingBroadcastData();
this.broadcastEvent("error", {
type: "error",
error: "Write queue full - input dropped",
Expand Down Expand Up @@ -619,11 +655,16 @@ export class Session {
while (this.emulatorWriteQueue.length > 0) {
if (performance.now() - start > budgetMs) break;

let chunk = this.emulatorWriteQueue[0];
if (chunk.length > MAX_CHUNK_CHARS) {
const head = this.emulatorWriteQueue[0];

// Oversized head — split at MAX_CHUNK_CHARS (respecting surrogate
// pairs) and write a single slice. No coalescing possible here
// since the remainder must stay at the queue head for the next
// iteration to preserve FIFO order.
if (head.length > MAX_CHUNK_CHARS) {
let splitAt = MAX_CHUNK_CHARS;
const prev = chunk.charCodeAt(splitAt - 1);
const next = chunk.charCodeAt(splitAt);
const prev = head.charCodeAt(splitAt - 1);
const next = head.charCodeAt(splitAt);
if (
prev >= 0xd800 &&
prev <= 0xdbff &&
Expand All @@ -632,16 +673,42 @@ export class Session {
) {
splitAt--;
}
this.emulatorWriteQueue[0] = chunk.slice(splitAt);
chunk = chunk.slice(0, splitAt);
this.emulatorWriteQueue[0] = head.slice(splitAt);
const chunk = head.slice(0, splitAt);
this.emulatorWriteQueuedBytes -= Buffer.byteLength(chunk, "utf8");
this.emulator.write(chunk);
continue;
}

// Coalesce consecutive in-bounds items into a single write while
// staying under MAX_CHUNK_CHARS. Each consumed item still counts
// as one processed item so snapshot-boundary targets match the
// pre-coalescing item accounting used by flushToSnapshotBoundary.
if (EMULATOR_WRITE_COALESCE_ENABLED) {
let merged = "";
let itemsConsumed = 0;
while (
this.emulatorWriteQueue.length > 0 &&
merged.length + this.emulatorWriteQueue[0].length <= MAX_CHUNK_CHARS
) {
const nextChunk = this.emulatorWriteQueue.shift() as string;
merged += nextChunk;
itemsConsumed++;
}

if (itemsConsumed === 0) break; // defensive — should not happen

this.emulatorWriteProcessedItems += itemsConsumed;
this.resolveReachedSnapshotBoundaryWaiters();
this.emulatorWriteQueuedBytes -= Buffer.byteLength(merged, "utf8");
this.emulator.write(merged);
} else {
this.emulatorWriteQueue.shift();
this.emulatorWriteProcessedItems++;
this.resolveReachedSnapshotBoundaryWaiters();
this.emulatorWriteQueuedBytes -= Buffer.byteLength(head, "utf8");
this.emulator.write(head);
}

this.emulatorWriteQueuedBytes -= Buffer.byteLength(chunk, "utf8");
this.emulator.write(chunk);
}

this.maybeResumeSubprocessStdoutForEmulatorBackpressure();
Expand Down Expand Up @@ -786,6 +853,12 @@ export class Session {
}
throwIfAborted(signal);

// Drain any pending coalesced data to existing clients before the new
// client joins. Otherwise the new client would receive pre-attach
// bytes that are already captured in the snapshot below, causing
// duplicated output (double-advance of cursor, etc.) on the renderer.
this.flushPendingBroadcastData();

const attachedClient: AttachedClient = {
socket,
attachedAt: Date.now(),
Expand Down Expand Up @@ -1009,6 +1082,12 @@ export class Session {
const waiters = this.emulatorFlushWaiters;
this.emulatorFlushWaiters = [];
for (const resolve of waiters) resolve();

// Flush before dropping the coalesce buffer — resolveShellReady can
// enqueue held scanner bytes during teardown (short-lived shells that
// exit before the ready marker completes), and clearing without
// flushing would silently drop them.
this.flushPendingBroadcastData();
}

/**
Expand Down Expand Up @@ -1039,10 +1118,7 @@ export class Session {
// Flush held marker bytes — they weren't part of a full marker
if (this.scanState.heldBytes.length > 0) {
this.enqueueEmulatorWrite(this.scanState.heldBytes);
this.broadcastEvent("data", {
type: "data",
data: this.scanState.heldBytes,
} satisfies TerminalDataEvent);
this.queueBroadcastData(this.scanState.heldBytes);
this.scanState.heldBytes = "";
Comment on lines 1120 to 1122
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Broadcast held shell-ready bytes immediately on exit path

When a session exits before shell readiness completes, handleSubprocessExit() calls resolveShellReady("timed_out") and then resetProcessState(). In this block, held scanner bytes are now sent via queueBroadcastData, which defers emission behind the coalescing timer; resetProcessState() then clears the pending coalesce buffer, so those final bytes are never delivered to attached clients. This is a regression from the previous immediate broadcastEvent behavior and can drop terminal output for short-lived shells/commands that terminate during startup marker scanning.

Useful? React with 👍 / 👎.

}
this.scanState.matchPos = 0;
Expand All @@ -1054,6 +1130,64 @@ export class Session {
}
}

/**
* Buffer a data event for coalesced broadcast. See BROADCAST_COALESCE_*
* constants for rationale. Falls back to immediate broadcast when
* coalescing is disabled via env var or the session is already disposed.
*/
private queueBroadcastData(data: string): void {
if (data.length === 0) return;

if (!BROADCAST_COALESCE_ENABLED || this.disposed) {
this.broadcastEvent("data", {
type: "data",
data,
} satisfies TerminalDataEvent);
return;
}

this.pendingBroadcastChunks.push(data);
this.pendingBroadcastBytes += Buffer.byteLength(data, "utf8");

if (this.pendingBroadcastBytes >= BROADCAST_COALESCE_MAX_BYTES) {
this.flushPendingBroadcastData();
return;
}

if (!this.broadcastCoalesceTimer) {
this.broadcastCoalesceTimer = setTimeout(() => {
this.broadcastCoalesceTimer = null;
this.flushPendingBroadcastData();
}, BROADCAST_COALESCE_INTERVAL_MS);
}
}

/**
* Emit any buffered data chunks as a single merged broadcast. Safe to
* call when the buffer is empty. Must be called before any non-data
* event (exit/error) and before attaching a new client so ordering and
* snapshot consistency are preserved.
*/
private flushPendingBroadcastData(): void {
if (this.broadcastCoalesceTimer) {
clearTimeout(this.broadcastCoalesceTimer);
this.broadcastCoalesceTimer = null;
}
if (this.pendingBroadcastChunks.length === 0) return;

const merged =
this.pendingBroadcastChunks.length === 1
? this.pendingBroadcastChunks[0]
: this.pendingBroadcastChunks.join("");
this.pendingBroadcastChunks = [];
this.pendingBroadcastBytes = 0;

this.broadcastEvent("data", {
type: "data",
data: merged,
} satisfies TerminalDataEvent);
}

/**
* Broadcast an event to all attached clients with backpressure awareness.
*/
Expand Down
Loading