Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions src/runtime/webcore/Blob.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,21 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
}

// If the body is backed by a ReadableStream, pipe it to the file.
// Without this, we'd only install an `onReceiveValue` callback and
// never actually read from the stream, so the promise would never
// settle (e.g. `Bun.write(path, new Response(req.body))`).
// https://github.com/oven-sh/bun/issues/13237
if (response.getBodyReadableStream(globalThis) orelse bodyValue.Locked.readable.get(globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
}
defer destination_blob.detach();
return destination_blob.pipeReadableStreamToBlob(globalThis, readable, options.extra_options);
}

var task = bun.new(WriteFileWaitFromLockedValueTask, .{
.globalThis = globalThis,
.file_blob = destination_blob,
Expand Down Expand Up @@ -1640,6 +1655,21 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
}

// If the body is backed by a ReadableStream, pipe it to the file.
// Without this, we'd only install an `onReceiveValue` callback and
// never actually read from the stream, so the promise would never
// settle (e.g. `Bun.write(path, req)` after accessing `req.body`).
// https://github.com/oven-sh/bun/issues/13237
if (request.getBodyReadableStream(globalThis) orelse locked.readable.get(globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
}
defer destination_blob.detach();
return destination_blob.pipeReadableStreamToBlob(globalThis, readable, options.extra_options);
}

var task = bun.new(WriteFileWaitFromLockedValueTask, .{
.globalThis = globalThis,
.file_blob = destination_blob,
Expand Down Expand Up @@ -2551,20 +2581,21 @@ pub fn onFileStreamResolveRequestStream(globalThis: *jsc.JSGlobalObject, callfra
var args = callframe.arguments_old(2);
var this = args.ptr[args.len - 1].asPromisePtr(FileStreamWrapper);
defer this.deinit();
const written = this.sink.written;
var strong = this.readable_stream_ref;
defer strong.deinit();
this.readable_stream_ref = .{};
if (strong.get(globalThis)) |stream| {
stream.done(globalThis);
}
try this.promise.resolve(globalThis, jsc.JSValue.jsNumber(0));
try this.promise.resolve(globalThis, jsc.JSValue.jsNumber(written));
return .js_undefined;
}

pub fn onFileStreamRejectRequestStream(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const args = callframe.arguments_old(2);
var this = args.ptr[args.len - 1].asPromisePtr(FileStreamWrapper);
defer this.sink.deref();
defer this.deinit();
const err = args.ptr[0];

var strong = this.readable_stream_ref;
Expand Down Expand Up @@ -2631,11 +2662,14 @@ pub fn pipeReadableStreamToBlob(this: *Blob, globalThis: *jsc.JSGlobalObject, re
const path = pathlike.path.sliceZ(&file_path);
switch (bun.sys.open(
path,
bun.O.WRONLY | bun.O.CREAT | bun.O.NONBLOCK,
bun.O.WRONLY | bun.O.CREAT | bun.O.TRUNC | bun.O.NONBLOCK,
write_permissions,
)) {
.result => |result| {
break :brk result;
.result => |result| switch (result.makeLibUVOwnedForSyscall(.open, .close_on_fail)) {
.result => |uv_fd| break :brk uv_fd,
.err => |err| {
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, try err.withPath(path).toJS(globalThis));
},
},
.err => |err| {
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, try err.withPath(path).toJS(globalThis));
Expand Down Expand Up @@ -2737,8 +2771,10 @@ pub fn pipeReadableStreamToBlob(this: *Blob, globalThis: *jsc.JSGlobalObject, re

assignment_result.ensureStillAlive();

// assert that it was updated
bun.assert(!signal.isDead());
// Note: signal may still be dead if readStreamIntoSink completed
// synchronously (readMany() returned done:true without awaiting).
// In that case $startDirectStream is never called, which is fine
// because the stream has already been fully consumed via sink.end().

if (assignment_result.toError()) |err| {
file_sink.deref();
Expand Down Expand Up @@ -2769,9 +2805,10 @@ pub fn pipeReadableStreamToBlob(this: *Blob, globalThis: *jsc.JSGlobalObject, re
return promise_value;
},
.fulfilled => {
const written = file_sink.written;
file_sink.deref();
readable_stream.done(globalThis);
return jsc.JSPromise.resolvedPromiseValue(globalThis, jsc.JSValue.jsNumber(0));
return jsc.JSPromise.resolvedPromiseValue(globalThis, jsc.JSValue.jsNumber(written));
},
.rejected => {
file_sink.deref();
Expand All @@ -2789,9 +2826,10 @@ pub fn pipeReadableStreamToBlob(this: *Blob, globalThis: *jsc.JSGlobalObject, re
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, assignment_result);
}
}
const written = file_sink.written;
file_sink.deref();

return jsc.JSPromise.resolvedPromiseValue(globalThis, jsc.JSValue.jsNumber(0));
return jsc.JSPromise.resolvedPromiseValue(globalThis, jsc.JSValue.jsNumber(written));
}

pub fn getWriter(
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/webcore/FileSink.zig
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@
close: bool = false,
mode: bun.Mode = 0o664,

pub fn flags(this: *const Options) i32 {
_ = this;

return bun.O.NONBLOCK | bun.O.CLOEXEC | bun.O.CREAT | bun.O.WRONLY;
var f: i32 = bun.O.NONBLOCK | bun.O.CLOEXEC | bun.O.CREAT | bun.O.WRONLY;
if (this.truncate) f |= bun.O.TRUNC;
return f;

Check failure on line 71 in src/runtime/webcore/FileSink.zig

View check run for this annotation

Claude / Claude Code Review

FileSink.Options.flags() O_TRUNC change affects Bun.file().writer(), creates POSIX/Windows divergence

Honoring `truncate` here is correct for `pipeReadableStreamToBlob`, but `flags()` is also reached from `Bun.file(path).writer()` on POSIX (Blob.zig:3008-3019 → `sink.start()` → `setup()` → `openForWriting(..., options.flags(), ...)`), so that API now opens with `O_TRUNC` where it previously did not. The Windows `getWriter` branch (Blob.zig:~2935) still opens with `WRONLY | CREAT | NONBLOCK` and bypasses `flags()`, so this introduces a POSIX-vs-Windows divergence for `Bun.file().writer()` — worth
Comment on lines 68 to +71

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Honoring truncate here is correct for pipeReadableStreamToBlob, but flags() is also reached from Bun.file(path).writer() on POSIX (Blob.zig:3008-3019 → sink.start()setup()openForWriting(..., options.flags(), ...)), so that API now opens with O_TRUNC where it previously did not. The Windows getWriter branch (Blob.zig:~2935) still opens with WRONLY | CREAT | NONBLOCK and bypasses flags(), so this introduces a POSIX-vs-Windows divergence for Bun.file().writer() — worth either adding O_TRUNC there too or calling this out as an intentional behavior change.

Extended reasoning...

What changed and why it has wider reach

This PR changes FileSink.Options.flags() from dead code (_ = this; return NONBLOCK|CLOEXEC|CREAT|WRONLY) to actually honoring the truncate: bool = true field by adding O_TRUNC when set. The intent is to fix pipeReadableStreamToBlob so that overwriting a file with a shorter ReadableStream-backed Response doesn't leave stale trailing bytes.

However, flags() is not only used by pipeReadableStreamToBlob. It is also reached from FileSink.setup() (line 312: bun.io.openForWriting(..., options.flags(), ...)), and setup() is called from FileSink.start() for any streams.Start{.FileSink = ...}. The public API Bun.file(path).writer() on POSIX takes exactly this path.

The affected call chain

On non-Windows platforms, Blob.getWriter (Blob.zig:3008-3019) constructs:

var stream_start: streams.Start = .{ .FileSink = .{ .input_path = input_path } };

This leaves truncate at its struct default of true. It then calls sink.start(stream_start)FileSink.start()FileSink.setup()bun.io.openForWriting(bun.FD.cwd(), options.input_path, options.flags(), ...). openForWritingImpl (src/io/openForWriting.zig:60) passes input_flags directly to bun.sys.openatA(dir, path, input_flags, mode) for the .path case, so the new O_TRUNC reaches the actual open() syscall.

The same applies to streams.Start.fromJSWithTag for the .FileSink case (streams.zig:117-173): it reads highWaterMark/path/fd from JS but never reads a truncate option, so it stays at the default true. There is no JS-side way to opt out.

The cross-platform divergence this introduces

On Windows, Blob.getWriter takes a completely separate branch (Blob.zig:~2928-2989). For path-backed blobs it opens directly via:

bun.sys.open(path, bun.O.WRONLY | bun.O.CREAT | bun.O.NONBLOCK, write_permissions)

This bypasses FileSink.Options.flags() entirely and does not include O_TRUNC. So before this PR, neither platform truncated; after this PR, POSIX truncates but Windows does not. The PR added O_TRUNC to the Windows open in pipeReadableStreamToBlob (Blob.zig:2665) but not to the Windows open in getWriter.

Step-by-step proof

  1. On Linux: write 10 bytes to /tmp/x.txt (e.g. Bun.write('/tmp/x.txt', '0123456789')).
  2. Call const w = Bun.file('/tmp/x.txt').writer(); w.write('abc'); await w.end();
  3. getWriter (POSIX branch) builds streams.Start{.FileSink = .{.input_path = .{.path = '/tmp/x.txt'}}} with truncate defaulting to true.
  4. sink.start()setup()openForWriting(..., NONBLOCK|CLOEXEC|CREAT|WRONLY|TRUNC, ...) → file is truncated to 0 bytes on open.
  5. After end(), the file contains abc (3 bytes). Before this PR, the open used NONBLOCK|CLOEXEC|CREAT|WRONLY (no TRUNC), so the file would contain abc3456789 (10 bytes).
  6. On Windows, repeat the same steps: getWriter opens with WRONLY|CREAT|NONBLOCK (no TRUNC, line ~2935), so the file still contains abc3456789.

Result: POSIX and Windows now disagree on what Bun.file(path).writer() does to existing file contents.

Why existing code doesn't prevent it

The truncate: bool = true field has always existed with that default — _ = this; was clearly incomplete dead code, so honoring it is arguably the long-intended behavior. CI is green because all existing FileSink/writer tests write to fresh temp files and never overwrite. But this is still an undocumented behavior change to a public API surface (Bun.file().writer()) that is unrelated to the Bun.write-hangs-on-locked-body bug this PR fixes, and the new cross-platform inconsistency is a concrete defect introduced here.

How to fix

The cleanest minimal fix is to add bun.O.TRUNC to the Windows getWriter open at Blob.zig:~2935 so both platforms agree (and match the field's stated default). Alternatively, if you want to avoid touching Bun.file().writer() semantics in this PR, you could revert flags() and instead pass an explicit truncating flag set only from pipeReadableStreamToBlob's POSIX path — but since truncate-by-default is almost certainly what users expect from a file writer, aligning Windows is probably the right call.

}
};

Expand Down
168 changes: 168 additions & 0 deletions test/regression/issue/13237.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// https://github.com/oven-sh/bun/issues/13237
//
// Bun.write with a Response/Request whose body is a ReadableStream used to
// hang forever: the `.Locked` body path only installed an `onReceiveValue`
// callback and never actually read from the stream, so the returned promise
// never settled.
//
// These tests spawn subprocesses so a hang turns into a clean failure instead
// of blocking the test runner.

import { describe, expect, it } from "bun:test";
import { bunEnv, bunExe, tempDir } from "harness";
import { join } from "path";

async function run(dir: string, script: string) {
await using proc = Bun.spawn({
cmd: [bunExe(), "-e", script],
env: bunEnv,
cwd: dir,
stdout: "pipe",
stderr: "pipe",
timeout: 10_000,
});
const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
return { stdout, stderr, exitCode };
}

describe("Bun.write with a ReadableStream-backed body", () => {
it("Bun.write(path, new Response(ReadableStream))", async () => {
using dir = tempDir("issue-13237-response-stream", {});
const out = join(String(dir), "out.txt");
const { stdout, stderr, exitCode } = await run(
String(dir),
`
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("hello "));
controller.enqueue(new TextEncoder().encode("world"));
controller.close();
},
});
const written = await Bun.write(${JSON.stringify(out)}, new Response(stream));
console.log(JSON.stringify({ written, text: await Bun.file(${JSON.stringify(out)}).text() }));
`,
);
expect(stderr).toBe("");
expect(JSON.parse(stdout)).toEqual({ written: 11, text: "hello world" });
expect(exitCode).toBe(0);
});

it("Bun.write(path, new Request(url, { body: ReadableStream }))", async () => {
using dir = tempDir("issue-13237-request-stream", {});
const out = join(String(dir), "out.txt");
const { stdout, stderr, exitCode } = await run(
String(dir),
`
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("from request"));
controller.close();
},
});
const req = new Request("http://example.com", { method: "POST", body: stream });
const written = await Bun.write(${JSON.stringify(out)}, req);
console.log(JSON.stringify({ written, text: await Bun.file(${JSON.stringify(out)}).text() }));
`,
);
expect(stderr).toBe("");
expect(JSON.parse(stdout)).toEqual({ written: 12, text: "from request" });
expect(exitCode).toBe(0);
});

it("Bun.file(path).write(new Response(ReadableStream))", async () => {
using dir = tempDir("issue-13237-file-write", {});
const out = join(String(dir), "out.txt");
const { stdout, stderr, exitCode } = await run(
String(dir),
`
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("instance method"));
controller.close();
},
});
const written = await Bun.file(${JSON.stringify(out)}).write(new Response(stream));
console.log(JSON.stringify({ written, text: await Bun.file(${JSON.stringify(out)}).text() }));
`,
);
expect(stderr).toBe("");
expect(JSON.parse(stdout)).toEqual({ written: 15, text: "instance method" });
expect(exitCode).toBe(0);
});

it("Bun.write(path, new Response(req.body)) inside a server handler", async () => {
using dir = tempDir("issue-13237-server-response-body", {});
const out = join(String(dir), "out.txt");
const { stdout, stderr, exitCode } = await run(
String(dir),
`
const server = Bun.serve({
port: 0,
async fetch(req) {
const written = await Bun.write(${JSON.stringify(out)}, new Response(req.body));
return new Response(String(written));
},
});
const res = await fetch(server.url, { method: "POST", body: "hello from server" });
const written = await res.text();
server.stop();
console.log(JSON.stringify({ written, text: await Bun.file(${JSON.stringify(out)}).text() }));
`,
);
expect(stderr).toBe("");
expect(JSON.parse(stdout)).toEqual({ written: "17", text: "hello from server" });
expect(exitCode).toBe(0);
});

it("Bun.write(path, new Response(ReadableStream)) truncates the destination file", async () => {
using dir = tempDir("issue-13237-truncate", {});
const out = join(String(dir), "out.txt");
const { stdout, stderr, exitCode } = await run(
String(dir),
`
const body = (n, c) =>
new Response(
new ReadableStream({
start(controller) {
controller.enqueue(Buffer.alloc(n, c));
controller.close();
},
}),
);
await Bun.write(${JSON.stringify(out)}, body(1000, "A"));
await Bun.write(${JSON.stringify(out)}, body(100, "B"));
const text = await Bun.file(${JSON.stringify(out)}).text();
console.log(JSON.stringify({ length: text.length, ok: text === Buffer.alloc(100, "B").toString() }));
`,
);
expect(stderr).toBe("");
expect(JSON.parse(stdout)).toEqual({ length: 100, ok: true });
expect(exitCode).toBe(0);
});

it("Bun.write(path, req) after accessing req.body inside a server handler", async () => {
using dir = tempDir("issue-13237-server-body-access", {});
const out = join(String(dir), "out.txt");
const { stdout, stderr, exitCode } = await run(
String(dir),
`
const server = Bun.serve({
port: 0,
async fetch(req) {
if (!req.body) return new Response("no body", { status: 400 });
const written = await Bun.write(${JSON.stringify(out)}, req);
return new Response(String(written));
},
});
const res = await fetch(server.url, { method: "POST", body: "body was touched" });
const written = await res.text();
server.stop();
console.log(JSON.stringify({ written, text: await Bun.file(${JSON.stringify(out)}).text() }));
`,
);
expect(stderr).toBe("");
expect(JSON.parse(stdout)).toEqual({ written: "16", text: "body was touched" });
expect(exitCode).toBe(0);
});
});
Loading