Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
79f271a
child_process: support piping subprocess stdio streams between spawns
robobun May 15, 2026
a9738b7
[autofix.ci] apply automated fixes
autofix-ci[bot] May 15, 2026
7125b90
rephrase 'TODO: ...' error references as plain text in comments
robobun May 15, 2026
e06994c
bake-codegen: pre-quote OVERLAY_CSS so builds work on old bun
robobun May 15, 2026
a710477
child_process: defer source-stream quiesce until after spawn succeeds
robobun May 15, 2026
34a658c
child_process: destroy handed-off stdio streams so 'close' fires
robobun May 15, 2026
73e4d41
child_process: don't panic on Windows when exposing pipe fd
robobun May 15, 2026
08be37f
[autofix.ci] apply automated fixes
autofix-ci[bot] May 15, 2026
46805c2
test/30831: drop setTimeout race from close-event assertion
robobun May 15, 2026
cdc2cdb
comments: note that bake-codegen now pre-quotes OVERLAY_CSS
robobun May 22, 2026
ea929bf
child_process: hoist fd reads in extractStreamFd for no-duplicate-con…
robobun Jun 21, 2026
a7d4305
test/30831: use test.skipIf(isWindows) instead of a misleadingly-name…
robobun Jun 21, 2026
e8b358d
child_process: fix two stdio stream hand-off edge cases
robobun Jun 21, 2026
d65cc88
[autofix.ci] apply automated fixes
autofix-ci[bot] Jun 21, 2026
fdc3229
comments: correct stale stdio-quiesce ordering and spawnSync references
robobun Jun 21, 2026
5476676
child_process: reject destroyed streams in extractStreamFd; fix test …
robobun Jun 21, 2026
4c9ea81
ci: retrigger
robobun Jun 21, 2026
cfa805a
[autofix.ci] apply automated fixes
autofix-ci[bot] Jun 21, 2026
4ddde40
child_process: distinct error for destroyed stream stdio; relocate tests
robobun Jun 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/codegen/bake-codegen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ async function run() {
side: JSON.stringify(side),
IS_ERROR_RUNTIME: String(file === "error"),
IS_BUN_DEVELOPMENT: String(!!debug),
OVERLAY_CSS: css("../runtime/bake/client/overlay.css", !!debug),
// JSON.stringify: the JSON lexer rejects `*`/`?`/`(`/`)` before
// `parse_env_json`'s auto-quote fallback runs (see #30679). Without
// pre-quoting here, a minified-CSS string starting with `*{...}`
// breaks `bun run build:debug` on older bun versions that don't
// yet have the lexer recovery.
OVERLAY_CSS: JSON.stringify(css("../runtime/bake/client/overlay.css", !!debug)),
Comment thread
claude[bot] marked this conversation as resolved.
},
minify: {
syntax: !debug,
Expand Down
9 changes: 9 additions & 0 deletions src/js/internal/fs/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,15 @@ function writableFromFileSink(fileSink: any) {
$assert(w[kWriteStreamFastPath] === true, "fast path not enabled");
w[kWriteStreamFastPath] = fileSink;
w.path = undefined;
// Expose the underlying pipe fd so this stream can be passed to
// `child_process.spawn` as stdio (e.g. `{ stdio: ['pipe', proc.stdin, ...] }`
// piping one subprocess's stdout into another subprocess's stdin) — node's
// equivalent `subprocess.stdin` is a `net.Socket` whose fd is discoverable,
// so `nodeToBun` expects `.fd` on stream stdio.
const fd = fileSink._getFd?.();
if (typeof fd === "number" && fd >= 0) {
w.fd = fd;
}
Comment thread
robobun marked this conversation as resolved.
Comment thread
claude[bot] marked this conversation as resolved.
return w;
}

Expand Down
18 changes: 18 additions & 0 deletions src/js/internal/streams/native-readable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ interface NativePtr {
pull: (view: any, closer: any) => any;
updateRef: (ref: boolean) => void;
cancel: (error: any) => void;
getFd: () => number;
}

let debugId = 0;
Expand All @@ -68,6 +69,18 @@ function constructNativeReadable(readableStream: ReadableStream, options): Nativ
stream[kHasResized] = !dynamicallyAdjustChunkSize();
stream[kCloseState] = [false];

// Expose the underlying pipe fd (when available) so this stream can be
// passed to `child_process.spawn` as stdio — for example `spawn(..., {
// stdio: [proc.stdout, 'pipe', 'inherit'] })` to pipe one subprocess's
// stdout into another's stdin. `nodeToBun` extracts `.fd` from node-ish
// stream stdio; Node's equivalent is `subprocess.stdout._handle.fd`.
if (typeof bunNativePtr?.getFd === "function") {
const fd = bunNativePtr.getFd();
if (typeof fd === "number" && fd >= 0) {
stream.fd = fd;
}
}
Comment thread
robobun marked this conversation as resolved.

const highWaterMark = options.highWaterMark;
stream[kHighWaterMark] = typeof highWaterMark === "number" ? highWaterMark : 256 * 1024;

Expand Down Expand Up @@ -232,6 +245,11 @@ function destroy(this: NativeReadable, error: any, cb: () => void) {
if (ptr) {
ptr.cancel(error);
}
// `ptr.cancel` closes the underlying pipe fd, so clear the cached `fd` that
// `constructNativeReadable` exposed for stdio hand-off — otherwise a
// destroyed stream still reports a stale (closed or kernel-reused) fd to
// `child_process.spawn`. Mirrors the WriteStream fast-path `close()`.
this.fd = null;
if (cb) {
process.nextTick(cb);
}
Expand Down
168 changes: 148 additions & 20 deletions src/js/node/child_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ var Uint8ArrayPrototypeIncludes = Uint8Array.prototype.includes;

const MAX_BUFFER = 1024 * 1024;
const kFromNode = Symbol("kFromNode");
// Marker applied to a readable stream whose pipe fd has been handed off to
// another subprocess's stdio. Used by `#handleOnExit`'s drain logic to skip
// auto-resuming such streams — mirrors Node's `kIsUsedAsStdio` (see
// lib/internal/child_process.js). Without this, our own `PipeReader` would
// race the child for the pipe's bytes and the child would usually lose.
const kIsUsedAsStdio = Symbol("kIsUsedAsStdio");

// Pass DEBUG_CHILD_PROCESS=1 to enable debug output
if ($debug) {
Expand Down Expand Up @@ -532,7 +538,10 @@ function spawnSync(file, args, options) {
options.killSignal = sanitizeKillSignal(options.killSignal);

const stdio = options.stdio || "pipe";
const bunStdio = getBunStdioFromOptions(stdio);
// Sync path: only the `bunStdio` fd mapping matters. Unlike async `spawn`,
// the source streams are not quiesced (see below at the `Bun.spawnSync`
// call), so `streamsToQuiesce` is intentionally ignored here.
const { bunStdio } = getBunStdioFromOptions(stdio);

var { input } = options;
if (input) {
Expand Down Expand Up @@ -573,6 +582,12 @@ function spawnSync(file, args, options) {
killSignal: options.killSignal,
maxBuffer: options.maxBuffer,
});
// No quiescing on the sync path. `Bun.spawnSync` blocks the JS thread
// until the child exits, so the parent's reader can't race it, and this
// line only runs once the child is already gone. Calling
// `markStreamsAsStdio` here would have no upside and its irreversible
// `setFlowing(false)` would permanently brick the source stream (e.g.
// `process.stdin`). Node likewise does not mark streams in `spawnSync`.
} catch (err) {
error = err;
stdout = null;
Expand Down Expand Up @@ -1136,13 +1151,28 @@ class ChildProcess extends EventEmitter {
if (stdout === undefined) {
this.#stdout = this.#getBunSpawnIo(1, this.#encoding, true);
} else if (stdout && this.#stdioOptions[1] === "pipe" && !stdout?.destroyed) {
stdout.resume?.();
if (stdout[kIsUsedAsStdio]) {
// The fd was handed to another child; we marked this stream paused +
// `updateRef(false)` in `markStreamsAsStdio`, which unregisters the
// native FilePoll — so the stream will never see EOF on its own and
// never emit `'close'`. Without `'close'` the `#closesNeeded` tally
// stalls one short and we'd never emit `this`'s `'close'` either.
// `destroy()` fires `'close'` synchronously-ish and releases the
// now-useless parent-side read fd.
stdout.destroy?.();
} else {
stdout.resume?.();
}
}

if (stderr === undefined) {
this.#stderr = this.#getBunSpawnIo(2, this.#encoding, true);
} else if (stderr && this.#stdioOptions[2] === "pipe" && !stderr?.destroyed) {
stderr.resume?.();
if (stderr[kIsUsedAsStdio]) {
stderr.destroy?.();
} else {
stderr.resume?.();
}
}
}

Expand Down Expand Up @@ -1350,7 +1380,7 @@ class ChildProcess extends EventEmitter {
const serialization = options.serialization || "json";

const stdio = options.stdio || ["pipe", "pipe", "pipe"];
const bunStdio = getBunStdioFromOptions(stdio);
const { bunStdio, streamsToQuiesce } = getBunStdioFromOptions(stdio);
// Extra "pipe" slots (i >= 3) are wrapped in a net.Socket by
// #getBunSpawnIo, which hands the fd to usockets (usockets closes it on
// socket close). Use Bun.spawn's "socket-fd" so the parent end is stored
Expand Down Expand Up @@ -1437,6 +1467,13 @@ class ChildProcess extends EventEmitter {

$debug("ChildProcess: spawn", this.pid, spawnargs);

// Only quiesce source streams after spawn has taken over the fd —
// see `markStreamsAsStdio`. Doing this before spawn (or running it on
// the catch path below) would leave the source stream permanently
// stuck if spawn throws, since `setFlowing(false)` has no
// user-recoverable counterpart.
if (streamsToQuiesce.length > 0) markStreamsAsStdio(streamsToQuiesce);

process.nextTick(() => {
this.emit("spawn");
});
Expand Down Expand Up @@ -1664,7 +1701,11 @@ const nodeToBunLookup = {
ipc: "ipc",
};

function nodeToBun(item: string, index: number): string | number | null | NodeJS.TypedArray | ArrayBufferView {
function nodeToBun(
item: any,
index: number,
streamsToQuiesce: any[],
): string | number | null | NodeJS.TypedArray | ArrayBufferView {
// If not defined, use the default.
// For stdin/stdout/stderr, it's pipe. For others, it's ignore.
if (item == null) {
Expand All @@ -1676,19 +1717,25 @@ function nodeToBun(item: string, index: number): string | number | null | NodeJS
return item;
}
if (isNodeStreamReadable(item)) {
const itemFd = Object.hasOwn(item, "fd") ? item.fd : undefined;
if (typeof itemFd === "number") return itemFd;
const handle = item._handle;
const handleFd = handle ? handle.fd : undefined;
if (typeof handleFd === "number") return handleFd;
const fd = extractStreamFd(item);
if (fd !== undefined) {
streamsToQuiesce.push(item);
return fd;
}
if (item.destroyed === true) {
throw new Error(`Cannot use a destroyed stream as stdio[${index}]`);
}
throw new Error(`TODO: stream.Readable stdio @ ${index}`);
}
if (isNodeStreamWritable(item)) {
const itemFd = Object.hasOwn(item, "fd") ? item.fd : undefined;
if (typeof itemFd === "number") return itemFd;
const handle = item._handle;
const handleFd = handle ? handle.fd : undefined;
if (typeof handleFd === "number") return handleFd;
const fd = extractStreamFd(item);
if (fd !== undefined) {
streamsToQuiesce.push(item);
return fd;
}
Comment thread
robobun marked this conversation as resolved.
if (item.destroyed === true) {
throw new Error(`Cannot use a destroyed stream as stdio[${index}]`);
}
throw new Error(`TODO: stream.Writable stdio @ ${index}`);
Comment thread
robobun marked this conversation as resolved.
}
const result = nodeToBunLookup[item];
Expand All @@ -1698,6 +1745,83 @@ function nodeToBun(item: string, index: number): string | number | null | NodeJS
return result;
}

/**
* Extract the underlying pipe/file fd from a Node-ish stream suitable for
* forwarding to `Bun.spawn` as `stdio`.
* - `item.fd` is already a number — our own `WriteStream` / `NativeReadable`
* set `.fd` from the FileSink / ReadableStreamSource; Node's own
* `fs.ReadStream` / `fs.WriteStream` set it after open.
* - `item._handle.fd` is a number — Node's `net.Socket`-backed subprocess
* streams expose the pipe fd here; we don't emit this but accept it.
* A destroyed stream is rejected up front: its fd has been (or is about to be)
* closed, and not every destroy path clears the cached `.fd` synchronously
* (the `WriteStream` fast-path only nulls it in `close()`, which the async
* `_destroy` branch skips when the sink still has buffered bytes). Forwarding
* such an fd would `dup2` a stale (possibly kernel-reused) descriptor into the
* child, so guarding here covers both the readable and writable sides.
* Returns `undefined` when no fd is available so the caller can raise the
Comment thread
robobun marked this conversation as resolved.
* stream-stdio-unsupported error.
*/
function extractStreamFd(item: any): number | undefined {
if (item.destroyed === true) return undefined;
if (Object.hasOwn(item, "fd")) {
const fd = item.fd;
if (typeof fd === "number") return fd;
}
const handle = item._handle;
if (handle) {
const fd = handle.fd;
if (typeof fd === "number") return fd;
}
return undefined;
}

/**
* Quiesce source streams whose fds have been inherited by a child. MUST be
* invoked AFTER `Bun.spawn` returns successfully: the
* `$bunNativePtr.setFlowing(false)` side-effect is sticky at the native
* layer with no user-recoverable counterpart, so running it when spawn
* then throws would leave the source stream permanently stuck. The sync
* path deliberately does not call this (see the comment at the
* `Bun.spawnSync` call site); Node likewise does not mark streams in
* `spawnSync`.
*
* Mirrors Node's `getValidStdio` post-spawn hook (`readStop()` + `pause()`
* + `kIsUsedAsStdio` tag in `lib/internal/child_process.js`):
* - `$bunNativePtr?.setFlowing(false)` / `.updateRef(false)` stop Bun's
* internal `FileReader` / `BufferedReader` from issuing further
* `read(2)`s on the pipe fd — the equivalent of libuv's `readStop()`.
* - `.pause()` clears the consumer-side flowing flag so `_read` won't
* call `pull` again.
* - The `kIsUsedAsStdio` tag tells `#handleOnExit` to skip the
* `stdout.resume?.()` that would otherwise resurrect reading right as
* the child starts.
*/
function markStreamsAsStdio(streams: any[]) {
for (const item of streams) {
item[kIsUsedAsStdio] = true;
const ptr = item.$bunNativePtr;
if (ptr) {
try {
ptr.setFlowing?.(false);
} catch {}
try {
ptr.updateRef?.(false);
} catch {}
}
if (typeof item.pause === "function") {
try {
item.pause();
} catch {}
}
const rs = item._readableState;
if (rs) {
rs.reading = false;
rs.flowing = false;
}
}
}

/**
* Safer version of `item instance of node:stream.Readable`.
*
Expand Down Expand Up @@ -1739,7 +1863,10 @@ function fdToStdioName(fd: number) {
}
}

function getBunStdioFromOptions(stdio) {
function getBunStdioFromOptions(stdio): {
bunStdio: (string | number | null | NodeJS.TypedArray | ArrayBufferView)[];
streamsToQuiesce: any[];
} {
const normalizedStdio = normalizeStdio(stdio);
if (normalizedStdio.filter(v => v === "ipc").length > 1) throw $ERR_IPC_ONE_PIPE();
// Node options:
Expand All @@ -1748,7 +1875,7 @@ function getBunStdioFromOptions(stdio) {
// overlapped -- same as pipe on Unix based systems
// inherit -- 'inherit': equivalent to ['inherit', 'inherit', 'inherit'] or [0, 1, 2]
// ignore -- > /dev/null, more or less same as null option for Bun.spawn stdio
// TODO: Stream -- use this stream
// Stream -- handled by `nodeToBun` via fd extraction + `streamsToQuiesce`.
// number -- used as FD
// null, undefined: Use default value. Not same as ignore, which is Bun.spawn null.
// null/undefined: For stdio fds 0, 1, and 2 (in other words, stdin, stdout, and stderr) a pipe is created. For fd 3 and up, the default is 'ignore'
Expand All @@ -1763,9 +1890,10 @@ function getBunStdioFromOptions(stdio) {
// overlapped -> pipe
// ignore -> null
// inherit -> inherit (stdin/stdout/stderr)
// Stream -> throw err for now
const bunStdio = normalizedStdio.map(nodeToBun);
return bunStdio;
// Stream -> extract its fd and record the stream for post-spawn quiesce.
const streamsToQuiesce: any[] = [];
const bunStdio = normalizedStdio.map((item, i) => nodeToBun(item, i, streamsToQuiesce));
return { bunStdio, streamsToQuiesce };
}

function normalizeStdio(stdio): string[] {
Expand Down
3 changes: 2 additions & 1 deletion src/parsers/json_lexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,8 @@ where
// `JSONLikeParser::parse_expr`'s auto-quote fallback rescue an
// unquoted value that starts with one — e.g. a `Bun.build`
// `define:` whose value is a raw minified CSS string starting
// with `*{...}` (`bake-codegen.ts`'s `OVERLAY_CSS`). Erroring
// with `*{...}` (originally motivated by `bake-codegen.ts`'s
// `OVERLAY_CSS`, now pre-`JSON.stringify`d there). Erroring
// here aborts `Lexer::init` before `parse_env_json` gets a
// chance to auto-quote.
//
Expand Down
4 changes: 4 additions & 0 deletions src/runtime/api/streams.classes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ function source(name) {
isClosed: {
getter: "getIsClosedFromJS",
},
getFd: {
fn: "getFdFromJS",
length: 0,
},
...(name !== "File"
? // Buffered versions
// not implemented in File, yet.
Expand Down
32 changes: 32 additions & 0 deletions src/runtime/webcore/FileReader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,38 @@ impl readable_stream::SourceContext for FileReader {
fn set_flowing(&mut self, flag: bool) {
Self::set_flowing(self, flag)
}
fn get_fd(&self) -> i64 {
// Windows: the pipe handle is system-kind (`Fd::from_system(pipe.fd())`
// in `bun_io::PipeReader::WindowsBufferedReader::Source::Pipe`), and
// `Fd::uv()` panics for non-stdio system HANDLEs. Until we have a
// safe handle→uv-fd round-trip (`makeLibUVOwned`-style) for inherited
// subprocess stdio on Windows, return `-1` so `constructNativeReadable`
// sees no fd and `nodeToBun`'s stream-stdio path falls back to the
// unsupported-stdio error instead of crashing the parent on any piped
// spawn.
#[cfg(windows)]
{
return -1;
}
// POSIX path: the `fd` field is only populated after `on_start`
// resolves the lazy blob/path; before that (and for pipe-backed
// streams constructed via `ReadableStream::from_pipe`) the real fd
// lives in the buffered reader's pipe handle instead. Prefer the
// pipe handle first so the subprocess stdout/stderr path yields the
// pipe fd immediately.
#[cfg(not(windows))]
{
let reader_fd = self.reader().get_fd();
if reader_fd != Fd::INVALID {
return reader_fd.uv() as i64;
}
let field_fd = self.fd.get();
if field_fd != Fd::INVALID {
return field_fd.uv() as i64;
}
-1
}
}
Comment thread
robobun marked this conversation as resolved.
// toBufferedValue: null
}

Expand Down
Loading
Loading