Skip to content
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
da9b27f
initial
Jarred-Sumner Oct 1, 2025
d9a7dab
a
Jarred-Sumner Oct 1, 2025
a9c930d
finish
Jarred-Sumner Oct 2, 2025
a4c4bcd
a
Jarred-Sumner Oct 3, 2025
53001dd
Merge branch 'main' into jarred/readable-stream-strong
Jarred-Sumner Oct 4, 2025
699d847
Fix body-stream test hang and ReadableStream.Ref ownership issues
Oct 4, 2025
5f1c745
Fix ReadableStream.Ref to use strong references when owner has no JSV…
Oct 4, 2025
24c7dbf
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 4, 2025
460c31a
Fix critical GC and stream ownership issues from code review
Oct 4, 2025
ff0fb1e
Fix async generator Response body hang by upgrading Ref to strong
Oct 4, 2025
60322d7
Fix Request body stream timeout by upgrading to strong ref
Oct 4, 2025
c470ef9
Implement fallback strong ref architecture for Request bodies
Oct 5, 2025
ad733f2
Add this_jsvalue field to Request for proper GC cache support
Oct 5, 2025
2c847c1
Remove debug logging from Body and ReadableStream
Oct 5, 2025
af6fa5b
Add this_jsvalue.finalize() cleanup in Request.finalize()
Oct 5, 2025
26cbce6
Store JSValue wrapper in Request.toJS() and fix owner syntax consistency
Oct 5, 2025
53c301d
Fix Content-Type header regression in ensureFetchHeaders
Oct 5, 2025
100b7fc
Add setValue calls in doClone to properly initialize Ref with tee'd s…
Oct 5, 2025
07b3a65
Revert "Add setValue calls in doClone to properly initialize Ref with…
Oct 5, 2025
817ebbe
Revert unintended test refactoring in body-stream.test.ts
Oct 5, 2025
a6745df
Add request body abort logic in RequestContext.finalize
Oct 5, 2025
a68d917
Fix critical GC issues in ReadableStream lifecycle
Oct 5, 2025
d56fd9a
Fix missing readable ref assignment in drain conversion path
Oct 5, 2025
35338e6
Merge branch 'main' into jarred/readable-stream-strong
cirospaciari Oct 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/bun.js/api/bun/spawn/stdio.zig
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ pub const Stdio = union(enum) {
};
}

fn extractBodyValue(out_stdio: *Stdio, globalThis: *jsc.JSGlobalObject, i: i32, body: *jsc.WebCore.Body.Value, is_sync: bool) bun.JSError!void {
body.toBlobIfPossible();
fn extractBodyValue(out_stdio: *Stdio, globalThis: *jsc.JSGlobalObject, i: i32, owner: jsc.WebCore.ReadableStream.Ref.Owner, body: *jsc.WebCore.Body.Value, is_sync: bool) bun.JSError!void {
body.toBlobIfPossible(owner);

if (body.tryUseAsAnyBlob()) |blob| {
if (body.tryUseAsAnyBlob(owner)) |blob| {
return out_stdio.extractBlob(globalThis, blob, i);
}

Expand Down Expand Up @@ -313,7 +313,7 @@ pub const Stdio = union(enum) {
else => unreachable,
}

const stream_value = try body.toReadableStream(globalThis);
const stream_value = try body.toReadableStream(owner, globalThis);

const stream = (try jsc.WebCore.ReadableStream.fromJS(stream_value, globalThis)) orelse return globalThis.throwInvalidArguments("Failed to create ReadableStream", .{});

Expand Down Expand Up @@ -391,9 +391,9 @@ pub const Stdio = union(enum) {
} else if (value.as(jsc.WebCore.Blob)) |blob| {
return out_stdio.extractBlob(globalThis, .{ .Blob = blob.dupe() }, i);
} else if (value.as(jsc.WebCore.Request)) |req| {
return extractBodyValue(out_stdio, globalThis, i, req.getBodyValue(), is_sync);
return extractBodyValue(out_stdio, globalThis, i, .{ .Request = value }, req.getBodyValue(), is_sync);
} else if (value.as(jsc.WebCore.Response)) |res| {
return extractBodyValue(out_stdio, globalThis, i, res.getBodyValue(), is_sync);
return extractBodyValue(out_stdio, globalThis, i, .{ .Response = value }, res.getBodyValue(), is_sync);
}

if (try jsc.WebCore.ReadableStream.fromJS(value, globalThis)) |stream_| {
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/api/html_rewriter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub const HTMLRewriter = struct {

if (kind != .other) {
{
const body_value = try jsc.WebCore.Body.extract(global, response_value);
const body_value = try jsc.WebCore.Body.extract(global, response_value, null);
const resp = bun.new(Response, Response{
.init = .{
.status_code = 200,
Expand Down
2 changes: 2 additions & 0 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,8 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d
bun.default_allocator,
ctx,
false,
first_arg,
null,
);
} else {
const fetch_error = jsc.WebCore.Fetch.fetch_type_error_strings.get(bun.jsc.C.JSValueGetType(ctx, first_arg.asRef()));
Expand Down
3 changes: 2 additions & 1 deletion src/bun.js/api/server/FileRoute.zig
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ pub fn memoryCost(this: *const FileRoute) usize {

pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSError!?*FileRoute {
if (argument.as(jsc.WebCore.Response)) |response| {
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
if (response.body.value == .Blob and response.body.value.Blob.needsToReadFile()) {
if (response.body.value.Blob.store.?.data.file.pathlike == .fd) {
return globalThis.throwTODO("Support serving files from a file descriptor. Please pass a path instead.");
Expand Down
74 changes: 48 additions & 26 deletions src/bun.js/api/server/RequestContext.zig
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
abort.cb(abort.data);
}

const response_jsvalue = this.response_jsvalue;

this.detachResponse();
var any_js_calls = false;
var vm = this.server.?.vm;
Expand Down Expand Up @@ -676,13 +678,22 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
any_js_calls = true;
}

// Abort request body stream if still locked (e.g., pending read)
if (this.request_weakref.get()) |request| {
if (request.body.value == .Locked) {
const owner: jsc.WebCore.ReadableStream.Ref.Owner = if (request.this_jsvalue.tryGet()) |js_value|
.{ .Request = js_value }
else
.empty;
if (request.body.value.Locked.readable.abort(owner, globalThis)) {
any_js_calls = true;
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

if (this.response_ptr) |response| {
if (response.body.value == .Locked) {
var strong_readable = response.body.value.Locked.readable;
response.body.value.Locked.readable = .{};
defer strong_readable.deinit();
if (strong_readable.get(globalThis)) |readable| {
readable.abort(globalThis);
if (response.body.value.Locked.readable.abort(.{ .Response = response_jsvalue }, globalThis)) {
any_js_calls = true;
}
}
Expand Down Expand Up @@ -1216,7 +1227,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
// TODO: should this timeout?
this.response_ptr.?.body.value = .{
.Locked = .{
.readable = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis),
.readable = .{ .strong = .init(stream, globalThis) },
.global = globalThis,
},
};
Expand Down Expand Up @@ -1454,7 +1465,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
}
// not content-length or transfer-encoding so we need to respect the body
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
switch (response.body.value) {
.InternalBlob, .WTFStringImpl => {
var blob = response.body.value.useAsAnyBlobAllowNonUTF8String();
Expand Down Expand Up @@ -1559,7 +1571,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
return;
} else {
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);

switch (response.body.value) {
.Blob => |*blob| {
Expand Down Expand Up @@ -1618,7 +1631,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
return;
}
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
switch (response.body.value) {
.Blob => |*blob| {
if (blob.needsToReadFile()) {
Expand Down Expand Up @@ -1662,10 +1676,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,

if (resp.body.value == .Locked) {
const global = resp.body.value.Locked.global;
if (resp.body.value.Locked.readable.get(global)) |stream| {
stream.done(global);
if (resp.this_jsvalue.tryGet()) |value| {
resp.body.value.Locked.readable.done(.{ .Response = value }, global);
} else {
resp.body.value.Locked.readable.deinit();
}
resp.body.value.Locked.readable.deinit();
resp.body.value = .{ .Used = {} };
}
}
Expand Down Expand Up @@ -1715,10 +1730,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,

if (req.response_ptr) |resp| {
if (resp.body.value == .Locked) {
if (resp.body.value.Locked.readable.get(globalThis)) |stream| {
stream.done(globalThis);
if (resp.this_jsvalue.tryGet()) |value| {
resp.body.value.Locked.readable.done(.{ .Response = value }, globalThis);
} else {
resp.body.value.Locked.readable.deinit();
}
resp.body.value.Locked.readable.deinit();
resp.body.value = .{ .Used = {} };
}
}
Expand Down Expand Up @@ -1801,7 +1817,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,

// If a ReadableStream can trivially be converted to a Blob, do so.
// If it's a WTFStringImpl and it cannot be used as a UTF-8 string, convert it to a Blob.
value.toBlobIfPossible();
value.toBlobIfPossible(.{ .empty = {} });
const globalThis = this.server.?.globalThis;
switch (value.*) {
.Error => |*err_ref| {
Expand All @@ -1827,19 +1843,21 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
return;
}

if (lock.readable.get(globalThis)) |stream_| {
const stream: jsc.WebCore.ReadableStream = stream_;
// we hold the stream alive until we're done with it
this.readable_stream_ref = lock.readable;
value.* = .{ .Used = {} };
if (lock.readable.get(.{ .Response = this.response_jsvalue }, globalThis)) |stream| {
{
var old = this.readable_stream_ref;
this.readable_stream_ref = .init(stream, globalThis);
old.deinit();
value.* = .{ .Used = {} };
}

if (stream.isLocked(globalThis)) {
streamLog("was locked but it shouldn't be", .{});
var err = jsc.SystemError{
.code = bun.String.static(@tagName(jsc.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE)),
.message = bun.String.static("Stream already used, please create a new one"),
};
stream.value.unprotect();
this.readable_stream_ref.deinit();
this.runErrorHandler(err.toErrorInstance(globalThis));
return;
}
Expand Down Expand Up @@ -1883,7 +1901,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}
this.ref();
byte_stream.pipe = jsc.WebCore.Pipe.Wrap(@This(), onPipe).init(this);
this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis);
if (this.readable_stream_ref.has()) {
this.readable_stream_ref.deinit();
}
this.readable_stream_ref = .init(stream, globalThis);

this.byte_stream = byte_stream;
var response_buf = byte_stream.drain();
Expand All @@ -1907,7 +1928,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,

if (lock.onReceiveValue != null or lock.task != null) {
// someone else is waiting for the stream or waiting for `onStartStreaming`
const readable = value.toReadableStream(globalThis) catch return; // TODO: properly propagate exception upwards
const readable = value.toReadableStream(.{ .Response = this.response_jsvalue }, globalThis) catch return;
readable.ensureStillAlive();
this.doRenderWithBody(value);
return;
Expand Down Expand Up @@ -2162,7 +2183,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
ctx.flags.response_protected = false;
ctx.response_ptr = response;

response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);
switch (response.body.value) {
.Blob => |*blob| {
if (blob.needsToReadFile()) {
Expand Down Expand Up @@ -2492,7 +2514,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
pub fn onRequestBodyReadableStreamAvailable(ptr: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void {
var this = bun.cast(*RequestContext, ptr);
bun.debugAssert(this.request_body_readable_stream_ref.held.impl == null);
this.request_body_readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis);
this.request_body_readable_stream_ref = .init(readable, globalThis);
}

pub fn onStartBufferingCallback(this: *anyopaque) void {
Expand Down
3 changes: 2 additions & 1 deletion src/bun.js/api/server/StaticRoute.zig
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSErro

// The user may want to pass in the same Response object multiple endpoints
// Let's let them do that.
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);

const blob: AnyBlob = brk: {
switch (response.body.value) {
Expand Down
14 changes: 12 additions & 2 deletions src/bun.js/bindings/JSBunRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,15 @@ JSObject* JSBunRequest::cookies() const
return m_cookies.get();
}

extern "C" void* Request__clone(void* internalZigRequestPointer, JSGlobalObject* globalObject);
extern "C" void* Request__clone(void* internalZigRequestPointer, JSGlobalObject* globalObject, JSC::EncodedJSValue thisValue, JSC::EncodedJSValue* readableStreamTee);

JSBunRequest* JSBunRequest::clone(JSC::VM& vm, JSGlobalObject* globalObject)
{
auto throwScope = DECLARE_THROW_SCOPE(vm);

JSC::EncodedJSValue readableStreamTee[2] { encodedJSValue(), encodedJSValue() };
auto* structure = defaultGlobalObject(globalObject)->m_JSBunRequestStructure.getInitializedOnMainThread(globalObject);
auto* raw = Request__clone(this->wrapped(), globalObject);
auto* raw = Request__clone(this->wrapped(), globalObject, JSValue::encode(this), readableStreamTee);
EXCEPTION_ASSERT(!!raw == !throwScope.exception());
RETURN_IF_EXCEPTION(throwScope, nullptr);
auto* clone = this->create(vm, structure, raw, nullptr);
Expand Down Expand Up @@ -134,9 +135,18 @@ JSBunRequest* JSBunRequest::clone(JSC::VM& vm, JSGlobalObject* globalObject)
auto cookieMapClone = cookieMap->clone();
auto cookies = WebCore::toJSNewlyCreated(globalObject, jsCast<JSDOMGlobalObject*>(globalObject), WTFMove(cookieMapClone));
clone->setCookies(cookies.getObject());
RETURN_IF_EXCEPTION(throwScope, nullptr);
}
}

if (readableStreamTee[0] != encodedJSValue()) {
this->m_body.set(vm, this, JSValue::decode(readableStreamTee[0]));
}

if (readableStreamTee[1] != encodedJSValue()) {
clone->m_body.set(vm, clone, JSValue::decode(readableStreamTee[1]));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

RELEASE_AND_RETURN(throwScope, clone);
}

Expand Down
5 changes: 3 additions & 2 deletions src/bun.js/bindings/JSGlobalObject.zig
Original file line number Diff line number Diff line change
Expand Up @@ -899,10 +899,11 @@ pub const JSGlobalObject = opaque {
}

// We're done validating. From now on, deal with extracting the body.
body.toBlobIfPossible();
const owner = jsc.WebCore.ReadableStream.Ref.Owner{ .Response = response_value };
body.toBlobIfPossible(owner);

var any_blob = switch (body.*) {
.Locked => body.tryUseAsAnyBlob() orelse return body.toReadableStream(this),
.Locked => body.tryUseAsAnyBlob(owner) orelse return body.toReadableStream(owner, this),
else => body.useAsAnyBlob(),
};

Expand Down
9 changes: 5 additions & 4 deletions src/bun.js/node/types.zig
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ pub const BlobOrStringOrBuffer = union(enum) {
}
if (allow_request_response) {
if (value.as(jsc.WebCore.Request)) |request| {
request.body.value.toBlobIfPossible();
request.body.value.toBlobIfPossible(.{ .empty = {} });

if (request.body.value.tryUseAsAnyBlob()) |any_blob_| {
if (request.body.value.tryUseAsAnyBlob(.{ .empty = {} })) |any_blob_| {
var any_blob = any_blob_;
defer any_blob.detach();
return .{ .blob = any_blob.toBlob(global) };
Expand All @@ -98,9 +98,10 @@ pub const BlobOrStringOrBuffer = union(enum) {
}

if (value.as(jsc.WebCore.Response)) |response| {
response.body.value.toBlobIfPossible();
const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} };
response.body.value.toBlobIfPossible(owner);

if (response.body.value.tryUseAsAnyBlob()) |any_blob_| {
if (response.body.value.tryUseAsAnyBlob(owner)) |any_blob_| {
var any_blob = any_blob_;
defer any_blob.detach();
return .{ .blob = any_blob.toBlob(global) };
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/webcore/BakeResponse.zig
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, b
}
}

return Response.constructor(globalThis, callframe);
return Response.constructor(globalThis, callframe, .zero);
}

pub export fn BakeResponseClass__constructRedirect(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue {
Expand Down
Loading