Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/bun.js/webcore/FileReader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ fn isPulling(this: *const FileReader) bool {
pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Result {
array.ensureStillAlive();
defer array.ensureStillAlive();
const drained = this.drain();
var drained = this.drain();

if (drained.len > 0) {
log("onPull({d}) = {d}", .{ buffer.len, drained.len });
Expand All @@ -470,13 +470,17 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul
this.pending_view = &.{};

if (buffer.len >= @as(usize, drained.len)) {
@memcpy(buffer[0..drained.len], drained.slice());
this.buffered.clearAndFree(bun.default_allocator);
const drained_len = drained.len;
@memcpy(buffer[0..drained_len], drained.slice());
// drain() moved ownership of the allocation into `drained` and
// left `this.buffered` / the reader buffer empty, so free
// `drained` here — freeing `this.buffered` would be a no-op.
drained.deinit(bun.default_allocator);

if (this.reader.isDone()) {
return .{ .into_array_and_done = .{ .value = array, .len = drained.len } };
return .{ .into_array_and_done = .{ .value = array, .len = drained_len } };
} else {
return .{ .into_array = .{ .value = array, .len = drained.len } };
return .{ .into_array = .{ .value = array, .len = drained_len } };
}
}

Expand Down
87 changes: 87 additions & 0 deletions test/js/bun/spawn/spawn-stdout-iterate-leak.fixture.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Driven by spawn-stdout-iterate-leak.test.ts.
//
// Exercises the FileReader.onPull drain+memcpy branch: child_process stdout
// is a NativeReadable whose `_read()` calls `ptr.pull()` directly (no JS-side
// drain guard after the first read). With the Readable's highWaterMark set
// to CHUNK, a CHUNK-byte write that fulfils a pending pull via `.into_array`
// exactly fills the Node buffer, so `_read()` is not auto-rescheduled. The
// next write therefore lands in `onReadChunk` with no pending pull and is
// appended to `FileReader.buffered`. Draining the Node buffer via `.read()`
// then re-triggers `_read()` → `ptr.pull(CHUNK)` → `onPull`, which now sees
// `drain().len == CHUNK` and takes the memcpy branch.
//
// CHUNK = 32 KiB stays comfortably under the default 64 KiB pipe buffer so
// the round-trip through `cat` never blocks.

import { spawn } from "child_process";
import { once } from "events";

// Bytes per mebibyte; used to report RSS figures in MB.
const MB = 1 << 20;
// Size of every write to the child's stdin: 32 KiB.
const CHUNK = 32 * 1024;

/**
 * Spawns one `cat` child and repeatedly pushes CHUNK-sized blocks through it,
 * spacing the writes and reads with event-loop ticks so that stdout is
 * re-pulled while data is already buffered (the FileReader.onPull memcpy
 * branch this fixture targets).
 *
 * Sequence: one priming write that is read back after the first "readable"
 * event, then `iters` rounds of write / write / drain, then stdin is closed
 * and whatever is left is consumed in flowing mode until the child closes.
 *
 * The order of writes, ticks and reads is load-bearing — do not reorder.
 *
 * @param iters Number of write/write/drain rounds performed after priming.
 * @throws Error if a stdio pipe is missing, or if the number of bytes read
 *   back is not `(iters * 2 + 1) * CHUNK`.
 */
async function run(iters: number): Promise<void> {
  const proc = spawn("cat", [], { stdio: ["pipe", "pipe", "ignore"] });

  // Narrow each pipe once instead of asserting non-null at every use.
  const stdout = proc.stdout;
  if (stdout == null) throw new Error("child stdout pipe was not created");
  // Match the Node buffer threshold to CHUNK so one push makes it "full".
  // `_readableState` is not in the public typings, hence the cast.
  (stdout as any)._readableState.highWaterMark = CHUNK;

  const stdin = proc.stdin;
  if (stdin == null) throw new Error("child stdin pipe was not created");

  const block = Buffer.alloc(CHUNK, 88);
  // Resolves on the next setImmediate turn of the event loop.
  const tick = () => new Promise<void>(resolve => setImmediate(resolve));

  let total = 0;
  // Synchronously empties the Node-side stdout buffer, counting the bytes.
  const drainStdout = (): void => {
    let c: Buffer | null;
    while ((c = stdout.read()) !== null) total += c.length;
  };

  // Prime: the first chunk is consumed by the one-time JS-side drain in
  // NativeReadable's first _read().
  stdin.write(block);
  await once(stdout, "readable");
  drainStdout();
  await tick();
  await tick();

  for (let i = 0; i < iters; i++) {
    // First write fulfils the pending pull (into_array); second write lands
    // in FileReader.buffered.
    stdin.write(block);
    await tick();
    await tick();
    stdin.write(block);
    await tick();
    await tick();
    // Draining the Node buffer re-triggers _read → onPull with
    // FileReader.buffered populated: the memcpy branch.
    drainStdout();
    await tick();
    await tick();
  }

  // Close stdin so `cat` exits, then consume the remainder in flowing mode.
  stdin.end();
  stdout.on("data", (d: Buffer) => void (total += d.length));
  stdout.resume();
  await once(proc, "close");

  // One priming block plus two blocks per round must have come back.
  const expected = (iters * 2 + 1) * CHUNK;
  if (total !== expected) {
    throw new Error(`wrong total: got ${total}, expected ${expected}`);
  }
}

// A real leak grows RSS on every run; transient allocator growth plateaus.
// After a short warmup we take several equally-sized samples and report the
// delta from the first sample to the last — a leak adds ~ITERS×CHUNK per
// step, non-leaking allocator caching contributes (at most) once.
// Rounds per measured run, and the number of measured runs to sample.
const ITERS = 1000;
const STEPS = 5;

// Unsampled warmup run, followed by a synchronous GC.
await run(200);
Bun.gc(true);

// Record RSS (in bytes) after each of STEPS identically sized runs.
const samples: number[] = [];
while (samples.length < STEPS) {
  await run(ITERS);
  Bun.gc(true);
  samples.push(process.memoryUsage.rss());
}

const firstSample = samples[0];
const lastSample = samples[samples.length - 1];

// Emit a single JSON line for the driving test to parse; values are in MB.
console.log(
  JSON.stringify({
    samples: samples.map(bytes => bytes / MB),
    delta: (lastSample - firstSample) / MB,
  }),
);
52 changes: 52 additions & 0 deletions test/js/bun/spawn/spawn-stdout-iterate-leak.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { expect, test } from "bun:test";
import { bunEnv, bunExe, isASAN, isDebug, isWindows } from "harness";
import { join } from "path";

// Regression test for FileReader.onPull: after `drain()` moves the
// internally buffered data into a local ByteList and the data is memcpy'd
// into the JS-provided pull buffer, that ByteList must be freed. The old
// code freed `this.buffered` instead — but `drain()` had already emptied
// it, so the moved allocation was orphaned on every such pull.
//
// The measurement runs in a subprocess so RSS is isolated from the test
// harness, and several equally-sized runs are sampled so steady-state
// allocator growth (which plateaus) is distinguished from a real leak
// (which keeps climbing). See spawn-stdout-iterate-leak.fixture.ts for how
// the exact code path is reached.

// The leak is in the posix poll-reader path; Windows pipes go through
// libuv with different buffering.
//
// On release builds without ASAN, mimalloc recycles the orphaned 32 KiB
// blocks into later allocations of the same size class, so RSS growth is
// sub-linear and too close to allocator noise to threshold reliably.
// Under ASAN each leaked block is quarantined behind poisoned redzones and
// cannot be reused, so the leak shows up as clean linear RSS growth
// (~106-119 MB unfixed vs. <10 MB fixed over the sampled window). Debug
// builds always enable ASAN.
// Run only on non-Windows builds that are debug or ASAN-instrumented.
const leakIsMeasurable = !isWindows && (isDebug || isASAN);

test.skipIf(!leakIsMeasurable)(
  "FileReader.onPull frees the drained buffer after memcpy",
  async () => {
    const fixturePath = join(import.meta.dir, "spawn-stdout-iterate-leak.fixture.ts");
    await using child = Bun.spawn({
      cmd: [bunExe(), fixturePath],
      env: bunEnv,
      stdout: "pipe",
      stderr: "pipe",
      stdin: "ignore",
    });

    // Read both pipes while waiting for exit so neither pipe can fill up.
    const [out, err, code] = await Promise.all([child.stdout.text(), child.stderr.text(), child.exited]);
    expect(err).toBe("");

    // The fixture prints one JSON line: RSS samples and first-to-last delta, in MB.
    const report = JSON.parse(out.trim()) as { samples: number[]; delta: number };
    const formattedSamples = report.samples.map(s => s.toFixed(1)).join(", ");
    console.log(`RSS samples=[${formattedSamples}]MB delta=${report.delta.toFixed(1)}MB`);
    expect(code).toBe(0);

    // Unfixed, every one of the 4×1000 iterations between the first and last
    // sample orphans a ~32 KiB allocation, so RSS keeps climbing by roughly
    // 65-120 MB (more under ASAN). Fixed, the samples plateau and the
    // first-to-last delta is noise (<10 MB), well under this threshold.
    expect(report.delta).toBeLessThan(32);
  },
  // Debug+ASAN event-loop ticks are slow; release finishes in ~1s.
  300_000,
);
Loading