Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion apps/desktop/src/main/terminal-host/session.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { beforeEach, describe, expect, it } from "bun:test";
import { afterEach, beforeEach, describe, expect, it, mock } from "bun:test";
import type { ChildProcess } from "node:child_process";
import { EventEmitter } from "node:events";
import path from "node:path";
Expand Down Expand Up @@ -219,3 +219,138 @@ describe("Terminal Host Session shell args", () => {
expect(writes.some((message) => message.includes('"hello"'))).toBe(true);
});
});

describe("Backpressure warning rate limiting (#$ISSUE_NUMBER)", () => {
let warnSpy: ReturnType<typeof mock>;
const originalWarn = console.warn;

beforeEach(() => {
warnSpy = mock((..._args: unknown[]) => {});
console.warn = warnSpy as typeof console.warn;
});

afterEach(() => {
console.warn = originalWarn;
});

function createSessionWithBackpressuredSocket() {
const session = new Session({
sessionId: "session-backpressure",
workspaceId: "workspace-1",
paneId: "pane-1",
tabId: "tab-1",
cols: 80,
rows: 24,
cwd: "/tmp",
shell: "/bin/bash",
});

// Create a fake socket whose write() always returns false (buffer full)
const socket = {
write(_message: string) {
return false;
},
once(_event: string, _cb: () => void) {
// never drain — keeps backpressure active
},
} as unknown as import("node:net").Socket;

// Directly add the socket as an attached client
const attachedClients = (
session as unknown as {
attachedClients: Map<
import("node:net").Socket,
{ socket: import("node:net").Socket }
>;
}
).attachedClients;
attachedClients.set(socket, { socket });

return { session, socket };
}

it("emits a warning on the first backpressure event", () => {
const { session } = createSessionWithBackpressuredSocket();

(
session as unknown as {
broadcastEvent: (
eventType: string,
payload: { type: "data"; data: string },
) => void;
}
).broadcastEvent("data", { type: "data", data: "x" });

const backpressureWarns = (warnSpy.mock.calls as unknown[][]).filter(
(call) =>
typeof call[0] === "string" &&
call[0].includes("Client socket buffer full"),
);
expect(backpressureWarns.length).toBe(1);
});

it("suppresses repeated warnings within the rate-limit window", () => {
const { session } = createSessionWithBackpressuredSocket();
const broadcast = (
session as unknown as {
broadcastEvent: (
eventType: string,
payload: { type: "data"; data: string },
) => void;
}
).broadcastEvent.bind(session);

// Fire many backpressure events in rapid succession
for (let i = 0; i < 1000; i++) {
broadcast("data", { type: "data", data: `chunk-${i}` });
}

const backpressureWarns = (warnSpy.mock.calls as unknown[][]).filter(
(call) =>
typeof call[0] === "string" &&
call[0].includes("Client socket buffer full"),
);

// Should emit exactly 1 warning (the initial one), not 1000
expect(backpressureWarns.length).toBe(1);
});

it("includes suppressed count when warning resumes after interval", () => {
const { session } = createSessionWithBackpressuredSocket();
const broadcast = (
session as unknown as {
broadcastEvent: (
eventType: string,
payload: { type: "data"; data: string },
) => void;
}
).broadcastEvent.bind(session);

// First call emits immediately
broadcast("data", { type: "data", data: "first" });

// Suppress a batch
for (let i = 0; i < 50; i++) {
broadcast("data", { type: "data", data: `suppressed-${i}` });
}

// Advance the internal timestamp past the rate-limit window
(
session as unknown as { backpressureWarnLastAt: number }
).backpressureWarnLastAt = Date.now() - 10_000;

// Next call should emit with suppressed count
broadcast("data", { type: "data", data: "after-interval" });

const backpressureWarns = (warnSpy.mock.calls as unknown[][]).filter(
(call) =>
typeof call[0] === "string" &&
call[0].includes("Client socket buffer full"),
);

expect(backpressureWarns.length).toBe(2);
const lastWarn = backpressureWarns[1]?.[0] as string;
expect(lastWarn).toContain("suppressed");
expect(lastWarn).toContain("50");
});
});
41 changes: 36 additions & 5 deletions apps/desktop/src/main/terminal-host/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ const ATTACH_FLUSH_TIMEOUT_MS = 500;
*/
const MAX_SUBPROCESS_STDIN_QUEUE_BYTES = 2_000_000;

/**
* Minimum interval (ms) between backpressure warnings per session.
* Prevents log flooding when a high-output pane keeps the socket buffer full.
*/
const BACKPRESSURE_WARN_INTERVAL_MS = 5_000;

/**
* How long to wait for the shell-ready marker before unblocking writes.
* 15s covers heavy setups like Nix-based devenv via direnv. On timeout,
Expand Down Expand Up @@ -127,6 +133,8 @@ export class Session {
private attachedClients: Map<Socket, AttachedClient> = new Map();
private clientSocketsWaitingForDrain: Set<Socket> = new Set();
private subprocessStdoutPaused = false;
private backpressureWarnLastAt = 0;
private backpressureWarnSuppressed = 0;
private lastAttachedAt: Date;
private exitCode: number | null = null;
private disposed = false;
Expand Down Expand Up @@ -1067,11 +1075,7 @@ export class Session {
try {
const canWrite = socket.write(message);
if (!canWrite) {
// Socket buffer full - data will be queued but may cause memory pressure
// In production, could track this and pause PTY output temporarily
console.warn(
`[Session ${this.sessionId}] Client socket buffer full, output may be delayed`,
);
this.warnBackpressure();
this.handleClientBackpressure(socket);
}
} catch {
Expand Down Expand Up @@ -1108,6 +1112,33 @@ export class Session {
this.subprocess.stdout.resume();
}

/**
* Rate-limited backpressure warning to prevent log flooding.
* Emits at most one warning per BACKPRESSURE_WARN_INTERVAL_MS,
* reporting how many warnings were suppressed since the last emission.
*/
private warnBackpressure(): void {
const now = Date.now();
if (now - this.backpressureWarnLastAt < BACKPRESSURE_WARN_INTERVAL_MS) {
this.backpressureWarnSuppressed++;
return;
}

const suppressed = this.backpressureWarnSuppressed;
this.backpressureWarnLastAt = now;
this.backpressureWarnSuppressed = 0;

if (suppressed > 0) {
console.warn(
`[Session ${this.sessionId}] Client socket buffer full, output may be delayed (${suppressed} similar warnings suppressed)`,
);
} else {
console.warn(
`[Session ${this.sessionId}] Client socket buffer full, output may be delayed`,
);
}
}

/**
* Get default shell for the platform
*/
Expand Down