diff --git a/src/bun.js/api/bun/spawn/stdio.zig b/src/bun.js/api/bun/spawn/stdio.zig index 8ff2915176d..c45b1ff9494 100644 --- a/src/bun.js/api/bun/spawn/stdio.zig +++ b/src/bun.js/api/bun/spawn/stdio.zig @@ -277,10 +277,10 @@ pub const Stdio = union(enum) { }; } - fn extractBodyValue(out_stdio: *Stdio, globalThis: *jsc.JSGlobalObject, i: i32, body: *jsc.WebCore.Body.Value, is_sync: bool) bun.JSError!void { - body.toBlobIfPossible(); + fn extractBodyValue(out_stdio: *Stdio, globalThis: *jsc.JSGlobalObject, i: i32, owner: jsc.WebCore.ReadableStream.Ref.Owner, body: *jsc.WebCore.Body.Value, is_sync: bool) bun.JSError!void { + body.toBlobIfPossible(owner); - if (body.tryUseAsAnyBlob()) |blob| { + if (body.tryUseAsAnyBlob(owner)) |blob| { return out_stdio.extractBlob(globalThis, blob, i); } @@ -313,7 +313,7 @@ pub const Stdio = union(enum) { else => unreachable, } - const stream_value = try body.toReadableStream(globalThis); + const stream_value = try body.toReadableStream(owner, globalThis); const stream = (try jsc.WebCore.ReadableStream.fromJS(stream_value, globalThis)) orelse return globalThis.throwInvalidArguments("Failed to create ReadableStream", .{}); @@ -391,9 +391,9 @@ pub const Stdio = union(enum) { } else if (value.as(jsc.WebCore.Blob)) |blob| { return out_stdio.extractBlob(globalThis, .{ .Blob = blob.dupe() }, i); } else if (value.as(jsc.WebCore.Request)) |req| { - return extractBodyValue(out_stdio, globalThis, i, req.getBodyValue(), is_sync); + return extractBodyValue(out_stdio, globalThis, i, .{ .Request = value }, req.getBodyValue(), is_sync); } else if (value.as(jsc.WebCore.Response)) |res| { - return extractBodyValue(out_stdio, globalThis, i, res.getBodyValue(), is_sync); + return extractBodyValue(out_stdio, globalThis, i, .{ .Response = value }, res.getBodyValue(), is_sync); } if (try jsc.WebCore.ReadableStream.fromJS(value, globalThis)) |stream_| { diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig index b928bb7e8da..f29c3508aa9 100644 --- a/src/bun.js/api/html_rewriter.zig +++ b/src/bun.js/api/html_rewriter.zig @@ -197,7 +197,7 @@ pub const HTMLRewriter = struct { if (kind != .other) { { - const body_value = try jsc.WebCore.Body.extract(global, response_value); + const body_value = try jsc.WebCore.Body.extract(global, response_value, null); const resp = bun.new(Response, Response{ .init = .{ .status_code = 200, diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 1448dc2e523..ec5dca4c56a 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1198,6 +1198,8 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d bun.default_allocator, ctx, false, + first_arg, + null, ); } else { const fetch_error = jsc.WebCore.Fetch.fetch_type_error_strings.get(bun.jsc.C.JSValueGetType(ctx, first_arg.asRef())); diff --git a/src/bun.js/api/server/FileRoute.zig b/src/bun.js/api/server/FileRoute.zig index ab4b57bfa8b..a9b66465bf7 100644 --- a/src/bun.js/api/server/FileRoute.zig +++ b/src/bun.js/api/server/FileRoute.zig @@ -59,7 +59,8 @@ pub fn memoryCost(this: *const FileRoute) usize { pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSError!?*FileRoute { if (argument.as(jsc.WebCore.Response)) |response| { - response.body.value.toBlobIfPossible(); + const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; + response.body.value.toBlobIfPossible(owner); if (response.body.value == .Blob and response.body.value.Blob.needsToReadFile()) { if (response.body.value.Blob.store.?.data.file.pathlike == .fd) { return globalThis.throwTODO("Support serving files from a file descriptor. Please pass a path instead."); diff --git a/src/bun.js/api/server/RequestContext.zig b/src/bun.js/api/server/RequestContext.zig index 84b83f0a1d0..0fb5829855f 100644 --- a/src/bun.js/api/server/RequestContext.zig +++ b/src/bun.js/api/server/RequestContext.zig @@ -627,6 +627,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, abort.cb(abort.data); } + const response_jsvalue = this.response_jsvalue; + this.detachResponse(); var any_js_calls = false; var vm = this.server.?.vm; @@ -647,6 +649,19 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } // we can already clean this strong refs request.internal_event_callback.deinit(); + + // Abort request body stream if still locked (e.g., pending read) + // Must do this before deref() so the request is still reachable + if (request.body.value == .Locked) { + const owner: jsc.WebCore.ReadableStream.Ref.Owner = if (request.this_jsvalue.tryGet()) |js_value| + .{ .Request = js_value } + else + .empty; + if (request.body.value.Locked.readable.abort(owner, globalThis)) { + any_js_calls = true; + } + } + this.request_weakref.deref(); } // if signal is not aborted, abort the signal @@ -678,11 +693,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (this.response_ptr) |response| { if (response.body.value == .Locked) { - var strong_readable = response.body.value.Locked.readable; - response.body.value.Locked.readable = .{}; - defer strong_readable.deinit(); - if (strong_readable.get(globalThis)) |readable| { - readable.abort(globalThis); + if (response.body.value.Locked.readable.abort(.{ .Response = response_jsvalue }, globalThis)) { any_js_calls = true; } } @@ -1216,7 +1227,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, // TODO: should this timeout? this.response_ptr.?.body.value = .{ .Locked = .{ - .readable = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis), + .readable = .{ .strong = .init(stream, globalThis) }, .global = globalThis, }, }; @@ -1454,7 +1465,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } } // not content-length or transfer-encoding so we need to respect the body - response.body.value.toBlobIfPossible(); + const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; + response.body.value.toBlobIfPossible(owner); switch (response.body.value) { .InternalBlob, .WTFStringImpl => { var blob = response.body.value.useAsAnyBlobAllowNonUTF8String(); @@ -1559,7 +1571,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } return; } else { - response.body.value.toBlobIfPossible(); + const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; + response.body.value.toBlobIfPossible(owner); switch (response.body.value) { .Blob => |*blob| { @@ -1618,7 +1631,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } return; } - response.body.value.toBlobIfPossible(); + const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; + response.body.value.toBlobIfPossible(owner); switch (response.body.value) { .Blob => |*blob| { if (blob.needsToReadFile()) { @@ -1662,10 +1676,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (resp.body.value == .Locked) { const global = resp.body.value.Locked.global; - if (resp.body.value.Locked.readable.get(global)) |stream| { - stream.done(global); + if (resp.this_jsvalue.tryGet()) |value| { + resp.body.value.Locked.readable.done(.{ .Response = value }, global); + } else { + resp.body.value.Locked.readable.deinit(); } - resp.body.value.Locked.readable.deinit(); resp.body.value = .{ .Used = {} }; } } @@ -1715,10 +1730,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (req.response_ptr) |resp| { if (resp.body.value == .Locked) { - if (resp.body.value.Locked.readable.get(globalThis)) |stream| { - stream.done(globalThis); + if (resp.this_jsvalue.tryGet()) |value| { + resp.body.value.Locked.readable.done(.{ .Response = value }, globalThis); + } else { + resp.body.value.Locked.readable.deinit(); } - resp.body.value.Locked.readable.deinit(); resp.body.value = .{ .Used = {} }; } } @@ -1801,7 +1817,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, // If a ReadableStream can trivially be converted to a Blob, do so. // If it's a WTFStringImpl and it cannot be used as a UTF-8 string, convert it to a Blob. - value.toBlobIfPossible(); + value.toBlobIfPossible(.{ .empty = {} }); const globalThis = this.server.?.globalThis; switch (value.*) { .Error => |*err_ref| { @@ -1827,11 +1843,13 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, return; } - if (lock.readable.get(globalThis)) |stream_| { - const stream: jsc.WebCore.ReadableStream = stream_; - // we hold the stream alive until we're done with it - this.readable_stream_ref = lock.readable; - value.* = .{ .Used = {} }; + if (lock.readable.get(.{ .Response = this.response_jsvalue }, globalThis)) |stream| { + { + var old = this.readable_stream_ref; + this.readable_stream_ref = .init(stream, globalThis); + old.deinit(); + value.* = .{ .Used = {} }; + } if (stream.isLocked(globalThis)) { streamLog("was locked but it shouldn't be", .{}); @@ -1839,7 +1857,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, .code = bun.String.static(@tagName(jsc.Node.ErrorCode.ERR_STREAM_CANNOT_PIPE)), .message = bun.String.static("Stream already used, please create a new one"), }; - stream.value.unprotect(); + this.readable_stream_ref.deinit(); this.runErrorHandler(err.toErrorInstance(globalThis)); return; } @@ -1883,7 +1901,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, } this.ref(); byte_stream.pipe = jsc.WebCore.Pipe.Wrap(@This(), onPipe).init(this); - this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis); + if (this.readable_stream_ref.has()) { + this.readable_stream_ref.deinit(); + } + this.readable_stream_ref = .init(stream, globalThis); this.byte_stream = byte_stream; var response_buf = byte_stream.drain(); @@ -1907,7 +1928,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, if (lock.onReceiveValue != null or lock.task != null) { // someone else is waiting for the stream or waiting for `onStartStreaming` - const readable = value.toReadableStream(globalThis) catch return; // TODO: properly propagate exception upwards + const readable = value.toReadableStream(.{ .Response = this.response_jsvalue }, globalThis) catch return; readable.ensureStillAlive(); this.doRenderWithBody(value); return; @@ -2162,7 +2183,8 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, ctx.flags.response_protected = false; ctx.response_ptr = response; - response.body.value.toBlobIfPossible(); + const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; + response.body.value.toBlobIfPossible(owner); switch (response.body.value) { .Blob => |*blob| { if (blob.needsToReadFile()) { @@ -2492,7 +2514,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, pub fn onRequestBodyReadableStreamAvailable(ptr: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void { var this = bun.cast(*RequestContext, ptr); bun.debugAssert(this.request_body_readable_stream_ref.held.impl == null); - this.request_body_readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis); + this.request_body_readable_stream_ref = .init(readable, globalThis); } pub fn onStartBufferingCallback(this: *anyopaque) void { diff --git a/src/bun.js/api/server/StaticRoute.zig b/src/bun.js/api/server/StaticRoute.zig index 9e931826a85..8c65a015057 100644 --- a/src/bun.js/api/server/StaticRoute.zig +++ b/src/bun.js/api/server/StaticRoute.zig @@ -91,7 +91,8 @@ pub fn fromJS(globalThis: *jsc.JSGlobalObject, argument: jsc.JSValue) bun.JSErro // The user may want to pass in the same Response object multiple endpoints // Let's let them do that. - response.body.value.toBlobIfPossible(); + const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; + response.body.value.toBlobIfPossible(owner); const blob: AnyBlob = brk: { switch (response.body.value) { diff --git a/src/bun.js/bindings/JSBunRequest.cpp b/src/bun.js/bindings/JSBunRequest.cpp index 1dcdf1e47ae..6f169b41b0b 100644 --- a/src/bun.js/bindings/JSBunRequest.cpp +++ b/src/bun.js/bindings/JSBunRequest.cpp @@ -96,14 +96,15 @@ JSObject* JSBunRequest::cookies() const return m_cookies.get(); } -extern "C" void* Request__clone(void* internalZigRequestPointer, JSGlobalObject* globalObject); +extern "C" void* Request__clone(void* internalZigRequestPointer, JSGlobalObject* globalObject, JSC::EncodedJSValue thisValue, JSC::EncodedJSValue* readableStreamTee); JSBunRequest* JSBunRequest::clone(JSC::VM& vm, JSGlobalObject* globalObject) { auto throwScope = DECLARE_THROW_SCOPE(vm); + JSC::EncodedJSValue readableStreamTee[2] { encodedJSValue(), encodedJSValue() }; auto* structure = defaultGlobalObject(globalObject)->m_JSBunRequestStructure.getInitializedOnMainThread(globalObject); - auto* raw = Request__clone(this->wrapped(), globalObject); + auto* raw = Request__clone(this->wrapped(), globalObject, JSValue::encode(this), readableStreamTee); EXCEPTION_ASSERT(!!raw == !throwScope.exception()); RETURN_IF_EXCEPTION(throwScope, nullptr); auto* clone = this->create(vm, structure, raw, nullptr); @@ -134,9 +135,18 @@ JSBunRequest* JSBunRequest::clone(JSC::VM& vm, JSGlobalObject* globalObject) auto cookieMapClone = cookieMap->clone(); auto cookies = WebCore::toJSNewlyCreated(globalObject, jsCast(globalObject), WTFMove(cookieMapClone)); clone->setCookies(cookies.getObject()); + RETURN_IF_EXCEPTION(throwScope, nullptr); } } + if (readableStreamTee[0] != encodedJSValue()) { + this->m_body.set(vm, this, JSValue::decode(readableStreamTee[0])); + } + + if (readableStreamTee[1] != encodedJSValue()) { + clone->m_body.set(vm, clone, JSValue::decode(readableStreamTee[1])); + } + RELEASE_AND_RETURN(throwScope, clone); } diff --git a/src/bun.js/bindings/JSGlobalObject.zig b/src/bun.js/bindings/JSGlobalObject.zig index 67dda877766..88c889db620 100644 --- a/src/bun.js/bindings/JSGlobalObject.zig +++ b/src/bun.js/bindings/JSGlobalObject.zig @@ -899,10 +899,11 @@ pub const JSGlobalObject = opaque { } // We're done validating. From now on, deal with extracting the body. - body.toBlobIfPossible(); + const owner = jsc.WebCore.ReadableStream.Ref.Owner{ .Response = response_value }; + body.toBlobIfPossible(owner); var any_blob = switch (body.*) { - .Locked => body.tryUseAsAnyBlob() orelse return body.toReadableStream(this), + .Locked => body.tryUseAsAnyBlob(owner) orelse return body.toReadableStream(owner, this), else => body.useAsAnyBlob(), }; diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index d2162a9372b..8c48c5901e7 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -86,9 +86,9 @@ pub const BlobOrStringOrBuffer = union(enum) { } if (allow_request_response) { if (value.as(jsc.WebCore.Request)) |request| { - request.body.value.toBlobIfPossible(); + request.body.value.toBlobIfPossible(.{ .empty = {} }); - if (request.body.value.tryUseAsAnyBlob()) |any_blob_| { + if (request.body.value.tryUseAsAnyBlob(.{ .empty = {} })) |any_blob_| { var any_blob = any_blob_; defer any_blob.detach(); return .{ .blob = any_blob.toBlob(global) }; @@ -98,9 +98,10 @@ pub const BlobOrStringOrBuffer = union(enum) { } if (value.as(jsc.WebCore.Response)) |response| { - response.body.value.toBlobIfPossible(); + const owner = if (response.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; + response.body.value.toBlobIfPossible(owner); - if (response.body.value.tryUseAsAnyBlob()) |any_blob_| { + if (response.body.value.tryUseAsAnyBlob(owner)) |any_blob_| { var any_blob = any_blob_; defer any_blob.detach(); return .{ .blob = any_blob.toBlob(global) }; diff --git a/src/bun.js/webcore/BakeResponse.zig b/src/bun.js/webcore/BakeResponse.zig index 4c03c755c01..bc88feee79a 100644 --- a/src/bun.js/webcore/BakeResponse.zig +++ b/src/bun.js/webcore/BakeResponse.zig @@ -44,7 +44,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, b } } - return Response.constructor(globalThis, callframe); + return Response.constructor(globalThis, callframe, .zero); } pub export fn BakeResponseClass__constructRedirect(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue { diff --git a/src/bun.js/webcore/Blob.zig b/src/bun.js/webcore/Blob.zig index 37e6ff6f254..c1ba5d9d03f 100644 --- a/src/bun.js/webcore/Blob.zig +++ b/src/bun.js/webcore/Blob.zig @@ -1355,13 +1355,13 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr _ = response.body.value.use(); return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis)); }, - .Locked => |*locked| { + .Locked => { if (destination_blob.isS3()) { const s3 = &destination_blob.store.?.data.s3; var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis); defer aws_options.deinit(); - _ = try response.body.value.toReadableStream(globalThis); - if (locked.readable.get(globalThis)) |readable| { + const stream_value = try response.body.value.toReadableStream(.{ .Response = data }, globalThis); + if (try jsc.WebCore.ReadableStream.fromJS(stream_value, globalThis)) |readable| { if (readable.isDisturbed(globalThis)) { destination_blob.detach(); return globalThis.throwInvalidArguments("ReadableStream has already been used", .{}); @@ -1416,13 +1416,13 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr _ = request.body.value.use(); return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis)); }, - .Locked => |locked| { + .Locked => { if (destination_blob.isS3()) { const s3 = &destination_blob.store.?.data.s3; var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis); defer aws_options.deinit(); - _ = try request.body.value.toReadableStream(globalThis); - if (locked.readable.get(globalThis)) |readable| { + const stream_value = try request.body.value.toReadableStream(.{ .Request = data }, globalThis); + if (try jsc.WebCore.ReadableStream.fromJS(stream_value, globalThis)) |readable| { if (readable.isDisturbed(globalThis)) { destination_blob.detach(); return globalThis.throwInvalidArguments("ReadableStream has already been used", .{}); diff --git a/src/bun.js/webcore/Body.zig b/src/bun.js/webcore/Body.zig index 04dbf20f0f7..c726069bf78 100644 --- a/src/bun.js/webcore/Body.zig +++ b/src/bun.js/webcore/Body.zig @@ -5,7 +5,7 @@ const Body = @This(); value: Value, // = Value.empty, pub fn len(this: *Body) Blob.SizeType { - return this.value.size(); + return this.value.size(.{ .empty = {} }); } pub fn slice(this: *const Body) []const u8 { @@ -16,9 +16,9 @@ pub fn use(this: *Body) Blob { return this.value.use(); } -pub fn clone(this: *Body, globalThis: *JSGlobalObject) bun.JSError!Body { +pub fn clone(this: *Body, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *JSGlobalObject, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!Body { return Body{ - .value = try this.value.clone(globalThis), + .value = try this.value.clone(owner, globalThis, readable_stream_tee), }; } @@ -38,9 +38,9 @@ pub fn writeFormat(this: *Body, comptime Formatter: type, formatter: *Formatter, try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); - try Blob.writeFormatForSize(false, this.value.size(), writer, enable_ansi_colors); + try Blob.writeFormatForSize(false, this.value.size(.{ .empty = {} }), writer, enable_ansi_colors); } else if (this.value == .Locked) { - if (this.value.Locked.readable.get(this.value.Locked.global)) |stream| { + if (this.value.Locked.readable.get(.{ .empty = {} }, this.value.Locked.global)) |stream| { try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); @@ -55,7 +55,7 @@ pub fn deinit(this: *Body, _: std.mem.Allocator) void { pub const PendingValue = struct { promise: ?JSValue = null, - readable: jsc.WebCore.ReadableStream.Strong = .{}, + readable: jsc.WebCore.ReadableStream.Ref = .empty, // writable: jsc.WebCore.Sink global: *JSGlobalObject, @@ -78,8 +78,8 @@ pub const PendingValue = struct { /// when Content-Length is provided this represents the whole size of the request /// If chunked encoded this will represent the total received size (ignoring the chunk headers) /// If the size is unknown will be 0 - fn sizeHint(this: *const PendingValue) Blob.SizeType { - if (this.readable.get(this.global)) |readable| { + fn sizeHint(this: *const PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner) Blob.SizeType { + if (this.readable.get(owner, this.global)) |readable| { if (readable.ptr == .Bytes) { return readable.ptr.Bytes.size_hint; } @@ -87,46 +87,27 @@ pub const PendingValue = struct { return this.size_hint; } - pub fn toAnyBlob(this: *PendingValue) ?AnyBlob { + pub fn toAnyBlob(this: *PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner) ?AnyBlob { if (this.promise != null) return null; - return this.toAnyBlobAllowPromise(); + return this.toAnyBlobAllowPromise(owner); } - pub fn isDisturbed(this: *const PendingValue, comptime T: type, globalObject: *jsc.JSGlobalObject, this_value: jsc.JSValue) bool { + pub fn isDisturbed(this: *const PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalObject: *jsc.JSGlobalObject) bool { if (this.promise != null) { return true; } - if (T.js.bodyGetCached(this_value)) |body_value| { - if (jsc.WebCore.ReadableStream.isDisturbedValue(body_value, globalObject)) { - return true; - } - - return false; - } - - if (this.readable.get(globalObject)) |readable| { - return readable.isDisturbed(globalObject); - } - - return false; + return this.readable.isDisturbed(owner, globalObject); } - pub fn isDisturbed2(this: *const PendingValue, globalObject: *jsc.JSGlobalObject) bool { - if (this.promise != null) { - return true; - } - - if (this.readable.get(globalObject)) |readable| { - return readable.isDisturbed(globalObject); - } - - return false; + pub fn abort(this: *PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalObject: *jsc.JSGlobalObject) bool { + return this.readable.abort(owner, globalObject); } + pub fn isStreamingOrBuffering(this: *PendingValue) bool { - return this.readable.held.has() or (this.promise != null and !this.promise.?.isEmptyOrUndefinedOrNull()); + return this.readable != .empty or (this.promise != null and !this.promise.?.isEmptyOrUndefinedOrNull()); } pub fn hasPendingPromise(this: *PendingValue) bool { @@ -146,20 +127,13 @@ pub const PendingValue = struct { return false; } - pub fn toAnyBlobAllowPromise(this: *PendingValue) ?AnyBlob { - var stream = if (this.readable.get(this.global)) |readable| readable else return null; - - if (stream.toAnyBlob(this.global)) |blob| { - this.readable.deinit(); - return blob; - } - - return null; + pub fn toAnyBlobAllowPromise(this: *PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner) ?AnyBlob { + return this.readable.toAnyBlob(owner, this.global); } - pub fn setPromise(value: *PendingValue, globalThis: *jsc.JSGlobalObject, action: Action) JSValue { + pub fn setPromise(value: *PendingValue, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *jsc.JSGlobalObject, action: Action) JSValue { value.action = action; - if (value.readable.get(globalThis)) |readable| { + if (value.readable.get(owner, globalThis)) |readable| { switch (action) { .getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer, .getBytes => { const promise = switch (action) { @@ -340,7 +314,7 @@ pub const Value = union(Tag) { this.* = .{ .JSValue = .empty }; } }; - pub fn toBlobIfPossible(this: *Value) void { + pub fn toBlobIfPossible(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner) void { if (this.* == .WTFStringImpl) { if (this.WTFStringImpl.toUTF8IfNeeded(bun.default_allocator)) |bytes| { var str = this.WTFStringImpl; @@ -357,7 +331,7 @@ pub const Value = union(Tag) { if (this.* != .Locked) return; - if (this.Locked.toAnyBlob()) |blob| { + if (this.Locked.toAnyBlob(owner)) |blob| { this.* = switch (blob) { .Blob => .{ .Blob = blob.Blob }, .InternalBlob => .{ .InternalBlob = blob.InternalBlob }, @@ -367,22 +341,22 @@ pub const Value = union(Tag) { } } - pub fn size(this: *Value) Blob.SizeType { + pub fn size(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner) Blob.SizeType { return switch (this.*) { .Blob => @truncate(this.Blob.getSizeForBindings()), .InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)), .WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.utf8ByteLength())), - .Locked => this.Locked.sizeHint(), + .Locked => this.Locked.sizeHint(owner), // .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len), else => 0, }; } - pub fn fastSize(this: *const Value) Blob.SizeType { + pub fn fastSize(this: *const Value, owner: jsc.WebCore.ReadableStream.Ref.Owner) Blob.SizeType { return switch (this.*) { .InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)), .WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.byteSlice().len)), - .Locked => this.Locked.sizeHint(), + .Locked => this.Locked.sizeHint(owner), // .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len), else => 0, }; @@ -392,7 +366,7 @@ pub const Value = union(Tag) { return switch (this.*) { .InternalBlob => this.InternalBlob.bytes.items.len, .WTFStringImpl => this.WTFStringImpl.memoryCost(), - .Locked => this.Locked.sizeHint(), + .Locked => this.Locked.sizeHint(.{ .empty = {} }), // .InlineBlob => this.InlineBlob.sliceConst().len, else => 0, }; @@ -402,7 +376,7 @@ pub const Value = union(Tag) { return switch (this.*) { .InternalBlob => this.InternalBlob.sliceConst().len, .WTFStringImpl => this.WTFStringImpl.byteSlice().len, - .Locked => this.Locked.sizeHint(), + .Locked => this.Locked.sizeHint(.{ .empty = {} }), // .InlineBlob => this.InlineBlob.sliceConst().len, else => 0, }; @@ -444,7 +418,7 @@ pub const Value = union(Tag) { // pub const empty = Value{ .Empty = {} }; - pub fn toReadableStream(this: *Value, globalThis: *JSGlobalObject) bun.JSError!JSValue { + pub fn toReadableStream(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *JSGlobalObject) bun.JSError!JSValue { jsc.markBinding(@src()); switch (this.*) { @@ -463,17 +437,28 @@ pub const Value = union(Tag) { blob.resolveSize(); const value = try jsc.WebCore.ReadableStream.fromBlobCopyRef(globalThis, &blob, blob.size); + const stream = (try jsc.WebCore.ReadableStream.fromJS(value, globalThis)).?; this.* = .{ .Locked = .{ - .readable = jsc.WebCore.ReadableStream.Strong.init((try jsc.WebCore.ReadableStream.fromJS(value, globalThis)).?, globalThis), + .readable = switch (owner) { + .Request => |jsval| if (jsval != .zero) .Request else .{ .strong = .init(stream, globalThis) }, + .Response => |jsval| if (jsval != .zero) .Response else .{ .strong = .init(stream, globalThis) }, + .strong, .empty => .{ .strong = .init(stream, globalThis) }, + }, .global = globalThis, }, }; + // Only set in GC cache if we have a valid JSValue owner + switch (owner) { + .Request => |jsval| if (jsval != .zero) this.Locked.readable.set(owner, stream, globalThis), + .Response => |jsval| if (jsval != .zero) this.Locked.readable.set(owner, stream, globalThis), + .strong, .empty => {}, + } return value; }, .Locked => { var locked = &this.Locked; - if (locked.readable.get(globalThis)) |readable| { + if (locked.readable.get(owner, globalThis)) |readable| { return readable.value; } if (locked.promise != null or locked.action != .none) { @@ -508,16 +493,30 @@ pub const Value = union(Tag) { reader.context.size_hint = @as(Blob.SizeType, @truncate(drain_result.owned.size_hint)); } - locked.readable = jsc.WebCore.ReadableStream.Strong.init(.{ + const stream_value = try reader.toReadableStream(globalThis); + const stream = jsc.WebCore.ReadableStream{ .ptr = .{ .Bytes = &reader.context }, - .value = try reader.toReadableStream(globalThis), - }, globalThis); + .value = stream_value, + }; + + // Use strong ref if owner doesn't have a valid JSValue + locked.readable = switch (owner) { + .Request => |jsval| if (jsval != .zero) .Request else .{ .strong = .init(stream, globalThis) }, + .Response => |jsval| if (jsval != .zero) .Response else .{ .strong = .init(stream, globalThis) }, + .strong, .empty => .{ .strong = .init(stream, globalThis) }, + }; + // Only populate GC cache when owner has a valid JSValue + switch (owner) { + .Request => |jsval| if (jsval != .zero) locked.readable.set(owner, stream, globalThis), + .Response => |jsval| if (jsval != .zero) locked.readable.set(owner, stream, globalThis), + .strong, .empty => {}, + } if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| { - onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get(globalThis).?); + onReadableStreamAvailable(locked.task.?, globalThis, stream); } - return locked.readable.get(globalThis).?.value; + return stream.value; }, .Error => { // TODO: handle error properly @@ -527,6 +526,10 @@ pub const Value = union(Tag) { } pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!Value { + return fromJSWithReadableStreamValue(globalThis, value, null); + } + + pub fn fromJSWithReadableStreamValue(globalThis: *JSGlobalObject, value: JSValue, readable_stream_value: ?*JSValue) bun.JSError!Value { value.ensureStillAlive(); if (value.isEmptyOrUndefinedOrNull()) { @@ -621,6 +624,14 @@ pub const Value = union(Tag) { else => {}, } + if (readable_stream_value) |readable_stream_ptr| { + readable_stream_ptr.* = readable.value; + return .{ .Locked = .{ + .global = globalThis, + .readable = .empty, + } }; + } + return Body.Value.fromReadableStreamWithoutLockCheck(readable, globalThis); } @@ -642,7 +653,7 @@ pub const Value = union(Tag) { pub fn fromReadableStreamWithoutLockCheck(readable: jsc.WebCore.ReadableStream, globalThis: *JSGlobalObject) Value { return .{ .Locked = .{ - .readable = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis), + .readable = .{ .strong = .init(readable, globalThis) }, .global = globalThis, }, }; @@ -658,7 +669,7 @@ pub const Value = union(Tag) { if (to_resolve.* == .Locked) { var locked = &to_resolve.Locked; - if (locked.readable.get(global)) |readable| { + if (locked.readable.get(.{ .empty = {} }, global)) |readable| { readable.done(global); locked.readable.deinit(); } @@ -755,7 +766,7 @@ pub const Value = union(Tag) { } pub fn use(this: *Value) Blob { - this.toBlobIfPossible(); + this.toBlobIfPossible(.{ .empty = {} }); switch (this.*) { .Blob => { @@ -817,7 +828,7 @@ pub const Value = union(Tag) { } } - pub fn tryUseAsAnyBlob(this: *Value) ?AnyBlob { + pub fn tryUseAsAnyBlob(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner) ?AnyBlob { if (this.* == .WTFStringImpl) { if (this.WTFStringImpl.canUseAsUTF8()) { return AnyBlob{ .WTFStringImpl = this.WTFStringImpl }; @@ -828,7 +839,7 @@ pub const Value = union(Tag) { .Blob => AnyBlob{ .Blob = this.Blob }, .InternalBlob => AnyBlob{ .InternalBlob = this.InternalBlob }, // .InlineBlob => AnyBlob{ .InlineBlob = this.InlineBlob }, - .Locked => this.Locked.toAnyBlobAllowPromise() orelse return null, + .Locked => this.Locked.toAnyBlobAllowPromise(owner) orelse return null, else => return null, }; @@ -856,7 +867,7 @@ pub const Value = union(Tag) { } }, // .InlineBlob => .{ .InlineBlob = this.InlineBlob }, - .Locked => this.Locked.toAnyBlobAllowPromise() orelse AnyBlob{ .Blob = .{} }, + .Locked => this.Locked.toAnyBlobAllowPromise(.strong) orelse AnyBlob{ .Blob = .{} }, else => .{ .Blob = Blob.initEmpty(undefined) }, }; @@ -873,7 +884,7 @@ pub const Value = union(Tag) { .InternalBlob => .{ .InternalBlob = this.InternalBlob }, .WTFStringImpl => .{ .WTFStringImpl = this.WTFStringImpl }, // .InlineBlob => .{ .InlineBlob = this.InlineBlob }, - .Locked => this.Locked.toAnyBlobAllowPromise() orelse AnyBlob{ .Blob = .{} }, + .Locked => this.Locked.toAnyBlobAllowPromise(.strong) orelse AnyBlob{ .Blob = .{} }, else => .{ .Blob = Blob.initEmpty(undefined) }, }; @@ -890,7 +901,7 @@ pub const Value = union(Tag) { this.* = .{ .Error = err }; var strong_readable = locked.readable; - locked.readable = .{}; + locked.readable = .{ .empty = {} }; defer strong_readable.deinit(); if (locked.hasPendingPromise()) { @@ -905,7 +916,7 @@ pub const Value = union(Tag) { // The Promise version goes before the ReadableStream version incase the Promise version is used too. // Avoid creating unnecessary duplicate JSValue. - if (strong_readable.get(global)) |readable| { + if (strong_readable.get(.strong, global)) |readable| { if (readable.ptr == .Bytes) { readable.ptr.Bytes.onData( .{ .err = this.Error.toStreamError(global) }, @@ -938,7 +949,7 @@ pub const Value = union(Tag) { if (!this.Locked.deinit) { this.Locked.deinit = true; this.Locked.readable.deinit(); - this.Locked.readable = .{}; + this.Locked.readable = .empty; } return; @@ -964,23 +975,39 @@ pub const Value = union(Tag) { } } - pub fn tee(this: *Value, globalThis: *jsc.JSGlobalObject) bun.JSError!Value { + pub fn tee(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *jsc.JSGlobalObject, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!Value { var locked = &this.Locked; - if (locked.readable.isDisturbed(globalThis)) { - return Value{ .Used = {} }; + if (locked.readable.isDisturbed(owner, globalThis)) { + return .Used; } - if (try locked.readable.tee(globalThis)) |readable| { - return Value{ + if (try locked.readable.tee(owner, globalThis, readable_stream_tee)) |result| { + if (readable_stream_tee != null) { + return .{ + .Locked = .{ + .readable = switch (owner) { + .Response => .Response, + .Request => .Request, + // For owner-less clones, keep a strong ref to the tee'd stream + // until the Request/Response gains a real owner + else => .{ .strong = .init(result.@"1", globalThis) }, + }, + .global = globalThis, + }, + }; + } + + return .{ .Locked = .{ - .readable = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis), + .readable = .{ .strong = .init(result.@"1", globalThis) }, .global = globalThis, }, }; } - if (locked.promise != null or locked.action != .none or locked.readable.has()) { - return Value{ .Used = {} }; + + if (locked.promise != null or locked.action != .none or locked.readable.has(owner, globalThis)) { + return .Used; } var drain_result: jsc.WebCore.DrainResult = .{ @@ -993,8 +1020,8 @@ pub const Value = union(Tag) { } if (drain_result == .empty or drain_result == .aborted) { - this.* = .{ .Null = {} }; - return Value{ .Null = {} }; + this.* = .Null; + return .Null; } var reader = jsc.WebCore.ByteStream.Source.new(.{ @@ -1012,30 +1039,45 @@ pub const Value = union(Tag) { reader.context.size_hint = @as(Blob.SizeType, @truncate(drain_result.owned.size_hint)); } - locked.readable = jsc.WebCore.ReadableStream.Strong.init(.{ + const stream_value = try reader.toReadableStream(globalThis); + const stream = jsc.WebCore.ReadableStream{ .ptr = .{ .Bytes = &reader.context }, - .value = try reader.toReadableStream(globalThis), - }, globalThis); + .value = stream_value, + }; + locked.readable.set(owner, stream, globalThis); if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| { - onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get(globalThis).?); + onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get(owner, globalThis).?); } - const teed = (try locked.readable.tee(globalThis)) orelse return Value{ .Used = {} }; + const tee_result = (try locked.readable.tee(owner, globalThis, readable_stream_tee)) orelse return Value{ .Used = {} }; + + if (readable_stream_tee != null) { + return .{ + .Locked = .{ + .readable = switch (owner) { + .Response => .Response, + .Request => .Request, + else => .empty, + }, + .global = globalThis, + }, + }; + } return Value{ .Locked = .{ - .readable = jsc.WebCore.ReadableStream.Strong.init(teed, globalThis), + .readable = .{ .strong = .init(tee_result.@"1", globalThis) }, .global = globalThis, }, }; } - pub fn clone(this: *Value, globalThis: *jsc.JSGlobalObject) bun.JSError!Value { - this.toBlobIfPossible(); + pub fn clone(this: *Value, owner: jsc.WebCore.ReadableStream.Ref.Owner, globalThis: *jsc.JSGlobalObject, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!Value { + this.toBlobIfPossible(owner); if (this.* == .Locked) { - return this.tee(globalThis); + return try this.tee(owner, globalThis, readable_stream_tee); } if (this.* == .InternalBlob) { @@ -1074,10 +1116,11 @@ pub const Value = union(Tag) { pub fn extract( globalThis: *JSGlobalObject, value: JSValue, + readable_stream_value: ?*JSValue, ) bun.JSError!Body { var body = Body{ .value = Value{ .Null = {} } }; - body.value = try Value.fromJS(globalThis, value); + body.value = try Value.fromJSWithReadableStreamValue(globalThis, value, readable_stream_value); if (body.value == .Blob) { assert(!body.value.Blob.isHeapAllocated()); // owned by Body } @@ -1086,18 +1129,29 @@ pub fn extract( pub fn Mixin(comptime Type: type) type { return struct { + inline fn getOwner(this_value: JSValue) jsc.WebCore.ReadableStream.Ref.Owner { + if (Type == jsc.WebCore.Request) { + return .{ .Request = this_value }; + } else if (Type == jsc.WebCore.Response) { + return .{ .Response = this_value }; + } else { + @compileError("Mixin only supports Request or Response types"); + } + } + pub fn getText(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const owner = getOwner(callframe.this()); var value: *Body.Value = this.getBodyValue(); if (value.* == .Used) { return handleBodyAlreadyUsed(globalObject); } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) { return handleBodyAlreadyUsed(globalObject); } - return value.Locked.setPromise(globalObject, .{ .getText = {} }); + return value.Locked.setPromise(owner, globalObject, .{ .getText = {} }); } var blob = value.useAsAnyBlobAllowNonUTF8String(); @@ -1105,16 +1159,20 @@ pub fn Mixin(comptime Type: type) type { } pub fn getBody(this: *Type, globalThis: *jsc.JSGlobalObject) bun.JSError!JSValue { + const this_value = this.this_jsvalue.tryGet() orelse JSValue.zero; + const owner = if (this_value != .zero) getOwner(this_value) else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; var body: *Body.Value = this.getBodyValue(); if (body.* == .Used) { return jsc.WebCore.ReadableStream.used(globalThis); } - return body.toReadableStream(globalThis); + return body.toReadableStream(owner, globalThis); } pub fn getBodyUsed(this: *Type, globalObject: *jsc.JSGlobalObject) JSValue { + const this_value = this.this_jsvalue.tryGet() orelse JSValue.zero; + const owner = if (this_value != .zero) getOwner(this_value) else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; return JSValue.jsBoolean( switch (this.getBodyValue().*) { .Used => true, @@ -1123,7 +1181,7 @@ pub fn Mixin(comptime Type: type) type { break :brk true; } - if (pending.readable.get(globalObject)) |*stream| { + if (pending.readable.get(owner, globalObject)) |*stream| { break :brk stream.isDisturbed(globalObject); } @@ -1143,19 +1201,20 @@ pub fn Mixin(comptime Type: type) type { } pub fn getJSON(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const owner = getOwner(callframe.this()); var value: *Body.Value = this.getBodyValue(); if (value.* == .Used) { return handleBodyAlreadyUsed(globalObject); } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) { return handleBodyAlreadyUsed(globalObject); } - value.toBlobIfPossible(); + value.toBlobIfPossible(owner); if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getJSON = {} }); + return value.Locked.setPromise(owner, globalObject, .{ .getJSON = {} }); } } @@ -1169,6 +1228,7 @@ pub fn Mixin(comptime Type: type) type { } pub fn getArrayBuffer(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const owner = getOwner(callframe.this()); var value: *Body.Value = this.getBodyValue(); if (value.* == .Used) { @@ -1176,13 +1236,13 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) { return handleBodyAlreadyUsed(globalObject); } - value.toBlobIfPossible(); + value.toBlobIfPossible(owner); if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getArrayBuffer = {} }); + return value.Locked.setPromise(owner, globalObject, .{ .getArrayBuffer = {} }); } } @@ -1193,6 +1253,7 @@ pub fn Mixin(comptime Type: type) type { } pub fn getBytes(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const owner = getOwner(callframe.this()); var value: *Body.Value = this.getBodyValue(); if (value.* == .Used) { @@ -1200,12 +1261,12 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) { return handleBodyAlreadyUsed(globalObject); } - value.toBlobIfPossible(); + value.toBlobIfPossible(owner); if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getBytes = {} }); + return value.Locked.setPromise(owner, globalObject, .{ .getBytes = {} }); } } @@ -1215,6 +1276,7 @@ pub fn Mixin(comptime Type: type) type { } pub fn getFormData(this: *Type, globalObject: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const owner = getOwner(callframe.this()); var value: *Body.Value = this.getBodyValue(); if (value.* == .Used) { @@ -1222,10 +1284,10 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or value.Locked.isDisturbed(Type, globalObject, callframe.this())) { + if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) { return handleBodyAlreadyUsed(globalObject); } - value.toBlobIfPossible(); + value.toBlobIfPossible(owner); } var encoder = (try this.getFormDataEncoding()) orelse { @@ -1234,7 +1296,7 @@ pub fn Mixin(comptime Type: type) type { }; if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getFormData = encoder }); + return value.Locked.setPromise(owner, globalObject, .{ .getFormData = encoder }); } var blob: AnyBlob = value.useAsAnyBlob(); @@ -1266,6 +1328,7 @@ pub fn Mixin(comptime Type: type) type { } pub fn getBlobWithThisValue(this: *Type, globalObject: *jsc.JSGlobalObject, this_value: JSValue) bun.JSError!jsc.JSValue { + const owner = if (this_value != .zero) getOwner(this_value) else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; var value: *Body.Value = this.getBodyValue(); if (value.* == .Used) { @@ -1273,17 +1336,14 @@ pub fn Mixin(comptime Type: type) type { } if (value.* == .Locked) { - if (value.Locked.action != .none or - ((this_value != .zero and value.Locked.isDisturbed(Type, globalObject, this_value)) or - (this_value == .zero and value.Locked.readable.isDisturbed(globalObject)))) - { + if (value.Locked.action != .none or value.Locked.isDisturbed(owner, globalObject)) { return handleBodyAlreadyUsed(globalObject); } - value.toBlobIfPossible(); + value.toBlobIfPossible(owner); if (value.* == .Locked) { - return value.Locked.setPromise(globalObject, .{ .getBlob = {} }); + return value.Locked.setPromise(owner, globalObject, .{ .getBlob = {} }); } } @@ -1373,7 +1433,7 @@ pub const ValueBufferer = struct { } pub fn run(sink: *@This(), value: *jsc.WebCore.Body.Value) !void { - value.toBlobIfPossible(); + value.toBlobIfPossible(.{ .empty = {} }); switch (value.*) { .Used => { @@ -1566,15 +1626,16 @@ pub const ValueBufferer = struct { fn bufferLockedBodyValue(sink: *@This(), value: *jsc.WebCore.Body.Value) !void { assert(value.* == .Locked); const locked = &value.Locked; - if (locked.readable.get(sink.global)) |stream| { - // keep the stream alive until we're done with it - sink.readable_stream_ref = locked.readable; - value.* = .{ .Used = {} }; - + if (locked.readable.get(.{ .empty = {} }, sink.global)) |stream| { if (stream.isLocked(sink.global)) { return error.StreamAlreadyUsed; } + // keep the stream alive until we're done with it + sink.readable_stream_ref = .init(stream, sink.global); + value.deinit(); + value.* = .{ .Used = {} }; + switch (stream.ptr) { .Invalid => { return error.InvalidStream; @@ -1613,7 +1674,7 @@ pub const ValueBufferer = struct { if (locked.onReceiveValue != null or locked.task != null) { // someone else is waiting for the stream or waiting for `onStartStreaming` - const readable = try value.toReadableStream(sink.global); + const readable = try value.toReadableStream(.empty, sink.global); readable.ensureStillAlive(); readable.protect(); return try sink.bufferLockedBodyValue(value); @@ -1632,7 +1693,7 @@ pub const ValueBufferer = struct { return; }, else => { - value.toBlobIfPossible(); + value.toBlobIfPossible(.{ .empty = {} }); var input = value.useAsAnyBlobAllowNonUTF8String(); const bytes = input.slice(); log("onReceiveValue {}", .{bytes.len}); diff --git a/src/bun.js/webcore/ReadableStream.zig b/src/bun.js/webcore/ReadableStream.zig index a8e3f9b9aeb..bb8ec3b4e12 100644 --- a/src/bun.js/webcore/ReadableStream.zig +++ b/src/bun.js/webcore/ReadableStream.zig @@ -3,6 +3,174 @@ const ReadableStream = @This(); value: JSValue, ptr: Source, +pub const Ref = union(Type) { + empty: void, + strong: Strong, + Response: void, + Request: void, + + pub const Owner = union(Type) { + empty: void, + strong: void, + Response: JSValue, + Request: JSValue, + }; + + pub const Type = enum { + empty, + strong, + Response, + Request, + }; + + pub fn get(this: *const Ref, owner: Owner, global: *jsc.JSGlobalObject) ?ReadableStream { + switch (this.*) { + .strong => |*strong| return strong.get(global), + .Response => { + if (owner == .Response) { + if (owner.Response.as(jsc.WebCore.Response)) |_| { + if (Response.js.gc.body.get(owner.Response)) |body_value| { + return ReadableStream.fromJS(body_value, global) catch null; + } + } + } + }, + .Request => { + if (owner == .Request) { + if (owner.Request.as(jsc.WebCore.Request)) |_| { + if (Request.js.gc.body.get(owner.Request)) |body_value| { + return ReadableStream.fromJS(body_value, global) catch null; + } + } + } + }, + .empty => {}, + } + + return null; + } + + pub fn isDisturbed(this: *const Ref, owner: Owner, global: *jsc.JSGlobalObject) bool { + const stream = get(this, owner, global) orelse return false; + return stream.isDisturbed(global); + } + + pub fn setValue(this: *Ref, owner: Owner, stream_jsvalue: jsc.JSValue, global: *jsc.JSGlobalObject) void { + switch (owner) { + .Response => |jsvalue| { + if (jsvalue != .zero) { + Response.js.gc.body.set(jsvalue, global, stream_jsvalue); + this.deinit(); + this.* = .Response; + } else { + this.deinit(); + this.* = .empty; + } + }, + .Request => |jsvalue| { + if (jsvalue != .zero) { + Request.js.gc.body.set(jsvalue, global, stream_jsvalue); + this.deinit(); + this.* = .Request; + } else { + this.deinit(); + this.* = .empty; + } + }, + .strong => { + this.deinit(); + this.* = .{ .strong = .{ .held = .create(stream_jsvalue, global) } }; + }, + .empty => {}, + } + } + + pub fn set(this: *Ref, owner: Owner, stream: ReadableStream, global: *jsc.JSGlobalObject) void { + this.setValue(owner, stream.value, global); + } + + pub fn tee(this: *Ref, owner: Owner, global: *jsc.JSGlobalObject, readable_stream_value: ?*[2]jsc.JSValue) bun.JSError!?struct { ReadableStream, ReadableStream } { + const stream = get(this, owner, global) orelse return null; + + const result = try stream.tee(global) orelse return null; + if (readable_stream_value) |value| { + value.* = .{ result.@"0".value, result.@"1".value }; + } + // Always update the original Ref to point to the first tee'd stream + this.set(owner, result.@"0", global); + return result; + } + + pub fn has(this: *const Ref, owner: Owner, global: *jsc.JSGlobalObject) bool { + _ = get(this, owner, global) orelse return false; + return true; + } + + pub fn upgrade(this: *Ref, current: *const ReadableStream, global: *jsc.JSGlobalObject) void { + if (this.* == .strong) { + this.strong.held.set(global, current.value); + return; + } + + this.* = .{ .strong = .init(current.*, global) }; + } + + pub fn init(owner: Owner, global: *jsc.JSGlobalObject) Ref { + _ = global; + switch (owner) { + .Response => { + return .Response; + }, + .Request => { + return .Request; + }, + .strong, .empty => { + // Strong and empty don't have an owner, so we return empty + // The caller should use .set() to populate with a stream + return .empty; + }, + } + } + + pub fn deinit(this: *Ref) void { + switch (this.*) { + .strong => |*strong| { + strong.deinit(); + }, + .Response => {}, + .Request => {}, + .empty => {}, + } + this.* = .empty; + } + + pub fn abort(this: *Ref, owner: Owner, global: *jsc.JSGlobalObject) bool { + if (this.get(owner, global)) |value| { + value.abort(global); + this.deinit(); + return true; + } + return false; + } + + pub fn toAnyBlob(this: *Ref, owner: Owner, global: *jsc.JSGlobalObject) ?Blob.Any { + var value = get(this, owner, global) orelse return null; + + if (value.toAnyBlob(global)) |blob| { + this.deinit(); + return blob; + } + + return null; + } + + pub fn done(this: *Ref, owner: Owner, globalObject: *jsc.JSGlobalObject) void { + const stream = this.get(owner, globalObject) orelse return; + stream.done(globalObject); + this.deinit(); + } +}; + pub const Strong = struct { held: jsc.Strong.Optional = .empty, @@ -842,6 +1010,9 @@ const jsc = bun.jsc; const JSGlobalObject = jsc.JSGlobalObject; const JSValue = jsc.JSValue; +const Request = jsc.WebCore.Request; +const Response = jsc.WebCore.Response; + const webcore = bun.webcore; const Blob = webcore.Blob; const streams = webcore.streams; diff --git a/src/bun.js/webcore/Request.zig b/src/bun.js/webcore/Request.zig index 559f9cb3243..5b628581804 100644 --- a/src/bun.js/webcore/Request.zig +++ b/src/bun.js/webcore/Request.zig @@ -15,6 +15,7 @@ weak_ptr_data: WeakRef.Data = .empty, // We must report a consistent value for this reported_estimated_size: usize = 0, internal_event_callback: InternalJSEventCallback = .{}, +this_jsvalue: jsc.JSRef = .empty(), pub const js = jsc.Codegen.JSRequest; // NOTE: toJS is overridden @@ -68,8 +69,8 @@ pub export fn Request__setTimeout(this: *Request, seconds: jsc.JSValue, globalTh this.setTimeout(seconds.to(c_uint)); } -pub export fn Request__clone(this: *Request, globalThis: *jsc.JSGlobalObject) ?*Request { - return this.clone(bun.default_allocator, globalThis) catch null; +pub export fn Request__clone(this: *Request, globalThis: *jsc.JSGlobalObject, this_value: jsc.JSValue, tee: ?*[2]jsc.JSValue) ?*Request { + return this.clone(bun.default_allocator, globalThis, this_value, tee) catch null; } comptime { @@ -175,7 +176,12 @@ pub export fn Bun__JSRequest__calculateEstimatedByteSize(this: *Request) void { pub fn toJS(this: *Request, globalObject: *JSGlobalObject) JSValue { this.calculateEstimatedByteSize(); - return js.toJSUnchecked(globalObject, this); + const value = js.toJSUnchecked(globalObject, this); + if (value != .zero) { + this.this_jsvalue.finalize(); + this.this_jsvalue = .initWeak(value); + } + return value; } extern "C" fn Bun__JSRequest__createForBake(globalObject: *jsc.JSGlobalObject, requestPtr: *Request) callconv(jsc.conv) jsc.JSValue; @@ -199,7 +205,7 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type .zero => "Request", else => "BunRequest", }; - try writer.print("{s} ({}) {{\n", .{ class_label, bun.fmt.size(this.body.value.size(), .{}) }); + try writer.print("{s} ({}) {{\n", .{ class_label, bun.fmt.size(this.body.value.size(.empty), .{}) }); { formatter.indent += 1; defer formatter.indent -|= 1; @@ -238,7 +244,7 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type } else if (this.body.value == .InternalBlob or this.body.value == .WTFStringImpl) { try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); - const size = this.body.value.size(); + const size = this.body.value.size(.empty); if (size == 0) { var empty = Blob.initEmpty(undefined); try empty.writeFormat(Formatter, formatter, writer, enable_ansi_colors); @@ -246,7 +252,7 @@ pub fn writeFormat(this: *Request, this_value: JSValue, comptime Formatter: type try Blob.writeFormatForSize(false, size, writer, enable_ansi_colors); } } else if (this.body.value == .Locked) { - if (this.body.value.Locked.readable.get(this.body.value.Locked.global)) |stream| { + if (this.body.value.Locked.readable.get(.strong, this.body.value.Locked.global)) |stream| { try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); try formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors); @@ -352,6 +358,7 @@ pub fn finalizeWithoutDeinit(this: *Request) void { } pub fn finalize(this: *Request) void { + this.this_jsvalue.finalize(); this.finalizeWithoutDeinit(); _ = this.body.unref(); if (this.weak_ptr_data.onFinalize()) { @@ -522,7 +529,7 @@ const Fields = enum { url, }; -pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSValue) bun.JSError!Request { +pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSValue, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!Request { var success = false; const vm = globalThis.bunVM(); const body = try vm.initRequestBodyValue(.{ .Null = {} }); @@ -582,7 +589,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV if (value_type == .DOMWrapper) { if (value.asDirect(Request)) |request| { if (values_to_try.len == 1) { - try request.cloneInto(&req, bun.default_allocator, globalThis, fields.contains(.url)); + try request.cloneInto(&req, bun.default_allocator, globalThis, fields.contains(.url), value, readable_stream_tee); success = true; return req; } @@ -610,7 +617,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV switch (request.body.value) { .Null, .Empty, .Used => {}, else => { - req.body.value = try request.body.value.clone(globalThis); + req.body.value = try request.body.value.clone(.{ .Request = value }, globalThis, readable_stream_tee); fields.insert(.body); }, } @@ -641,7 +648,11 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV switch (response.body.value) { .Null, .Empty, .Used => {}, else => { - req.body.value = try response.body.value.clone(globalThis); + req.body.value = try response.body.value.clone(.empty, globalThis, readable_stream_tee); + if (readable_stream_tee) |tee_value| { + Response.js.gc.body.set(value, globalThis, tee_value[0]); + } + fields.insert(.body); }, } @@ -654,7 +665,7 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV if (!fields.contains(.body)) { if (try value.fastGet(globalThis, .body)) |body_| { fields.insert(.body); - req.body.value = try Body.Value.fromJS(globalThis, body_); + req.body.value = try Body.Value.fromJSWithReadableStreamValue(globalThis, body_, if (readable_stream_tee) |tee| &tee[1] else null); } if (globalThis.hasException()) return error.JSError; @@ -775,12 +786,28 @@ pub fn constructInto(globalThis: *jsc.JSGlobalObject, arguments: []const jsc.JSV return req; } -pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!*Request { +pub fn constructor( + globalThis: *jsc.JSGlobalObject, + callframe: *jsc.CallFrame, + thisValue: JSValue, +) bun.JSError!*Request { const arguments_ = callframe.arguments_old(2); const arguments = arguments_.ptr[0..arguments_.len]; + var readable_stream_tee: [2]JSValue = .{ .zero, .zero }; + + const request = try constructInto(globalThis, arguments, &readable_stream_tee); + const result = Request.new(request); + + // Initialize this_jsvalue for GC cache support + if (thisValue != .zero) { + result.this_jsvalue = .initWeak(thisValue); + // Store tee'd stream in GC cache and update Ref if body is a stream + if (readable_stream_tee[1] != .zero and result.body.value == .Locked) { + result.body.value.Locked.readable.setValue(.{ .Request = thisValue }, readable_stream_tee[1], globalThis); + } + } - const request = try constructInto(globalThis, arguments); - return Request.new(request); + return result; } pub fn getBodyValue( @@ -795,21 +822,16 @@ pub fn doClone( callframe: *jsc.CallFrame, ) bun.JSError!jsc.JSValue { const this_value = callframe.this(); - const cloned = try this.clone(bun.default_allocator, globalThis); - + var readable_stream_tee: [2]JSValue = .{ .zero, .zero }; + const cloned = try this.clone(bun.default_allocator, globalThis, this_value, &readable_stream_tee); const js_wrapper = cloned.toJS(globalThis); if (js_wrapper != .zero) { - if (cloned.body.value == .Locked) { - if (cloned.body.value.Locked.readable.get(globalThis)) |readable| { - // If we are teed, then we need to update the cached .body - // value to point to the new readable stream - // We must do this on both the original and cloned request - // but especially the original request since it will have a stale .body value now. - js.bodySetCached(js_wrapper, globalThis, readable.value); - if (this.body.value.Locked.readable.get(globalThis)) |other_readable| { - js.bodySetCached(this_value, globalThis, other_readable.value); - } - } + if (this.body.value == .Locked and readable_stream_tee[0] != .zero) { + js.gc.body.set(this_value, globalThis, readable_stream_tee[0]); + } + + if (cloned.body.value == .Locked and readable_stream_tee[1] != .zero) { + js.gc.body.set(js_wrapper, globalThis, readable_stream_tee[1]); } } @@ -852,9 +874,13 @@ pub fn ensureFetchHeaders( } else { // we don't have a request context, so we need to create an empty headers object this._headers = FetchHeaders.createEmpty(); + const owner: jsc.WebCore.ReadableStream.Ref.Owner = if (this.this_jsvalue.tryGet()) |js_value| + .{ .Request = js_value } + else + .empty; const content_type = switch (this.body.value) { .Blob => |blob| blob.content_type, - .Locked => |locked| if (locked.readable.get(globalThis)) |*readable| switch (readable.ptr) { + .Locked => |locked| if (locked.readable.get(owner, globalThis)) |*readable| switch (readable.ptr) { .Blob => |blob| blob.content_type, else => null, } else null, @@ -927,11 +953,13 @@ pub fn cloneInto( allocator: std.mem.Allocator, globalThis: *JSGlobalObject, preserve_url: bool, + this_value: jsc.JSValue, + readable_stream_tee: ?*[2]jsc.JSValue, ) bun.JSError!void { _ = allocator; this.ensureURL() catch {}; const vm = globalThis.bunVM(); - var body_ = try this.body.value.clone(globalThis); + var body_ = try this.body.value.clone(.{ .Request = this_value }, globalThis, readable_stream_tee); errdefer body_.deinit(); const body = try vm.initRequestBodyValue(body_); const url = if (preserve_url) req.url else this.url.dupeRef(); @@ -952,10 +980,10 @@ pub fn cloneInto( } } -pub fn clone(this: *Request, allocator: std.mem.Allocator, globalThis: *JSGlobalObject) bun.JSError!*Request { +pub fn clone(this: *Request, allocator: std.mem.Allocator, globalThis: *JSGlobalObject, this_value: jsc.JSValue, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!*Request { const req = Request.new(undefined); errdefer bun.destroy(req); - try this.cloneInto(req, allocator, globalThis, false); + try this.cloneInto(req, allocator, globalThis, false, this_value, readable_stream_tee); return req; } diff --git a/src/bun.js/webcore/Response.zig b/src/bun.js/webcore/Response.zig index fa0a6b3c067..3cfb6800f61 100644 --- a/src/bun.js/webcore/Response.zig +++ b/src/bun.js/webcore/Response.zig @@ -21,6 +21,8 @@ ref_count: u32 = 1, // We must report a consistent value for this reported_estimated_size: usize = 0, +this_jsvalue: jsc.JSRef = .empty(), + pub const getText = ResponseMixin.getText; pub const getBody = ResponseMixin.getBody; pub const getBytes = ResponseMixin.getBytes; @@ -51,7 +53,9 @@ pub fn calculateEstimatedByteSize(this: *Response) void { pub fn toJS(this: *Response, globalObject: *JSGlobalObject) JSValue { this.calculateEstimatedByteSize(); - return js.toJSUnchecked(globalObject, this); + const value = js.toJSUnchecked(globalObject, this); + this.this_jsvalue = .initWeak(value); + return value; } pub fn getBodyValue( @@ -264,22 +268,23 @@ pub fn doClone( callframe: *jsc.CallFrame, ) bun.JSError!JSValue { const this_value = callframe.this(); - const cloned = try this.clone(globalThis); + var readable_stream_tee: [2]jsc.JSValue = .{ .zero, .zero }; + const cloned = try this.clone(globalThis, this_value, &readable_stream_tee); const js_wrapper = Response.makeMaybePooled(globalThis, cloned); if (js_wrapper != .zero) { - if (cloned.body.value == .Locked) { - if (cloned.body.value.Locked.readable.get(globalThis)) |readable| { - // If we are teed, then we need to update the cached .body - // value to point to the new readable stream - // We must do this on both the original and cloned response - // but especially the original response since it will have a stale .body value now. - js.bodySetCached(js_wrapper, globalThis, readable.value); - if (this.body.value.Locked.readable.get(globalThis)) |other_readable| { - js.bodySetCached(this_value, globalThis, other_readable.value); - } - } + + // If we are teed, then we need to update the cached .body + // value to point to the new readable stream + // We must do this on both the original and cloned response + // but especially the original response since it will have a stale .body value now. + if (readable_stream_tee[0] != .zero) { + js.gc.body.set(this_value, globalThis, readable_stream_tee[0]); + } + + if (readable_stream_tee[1] != .zero) { + js.gc.body.set(js_wrapper, globalThis, readable_stream_tee[1]); } } @@ -293,8 +298,10 @@ pub fn makeMaybePooled(globalObject: *jsc.JSGlobalObject, ptr: *Response) JSValu pub fn cloneValue( this: *Response, globalThis: *JSGlobalObject, + this_value: jsc.JSValue, + readable_stream_tee: ?*[2]jsc.JSValue, ) bun.JSError!Response { - var body = try this.body.clone(globalThis); + var body = try this.body.clone(.{ .Response = this_value }, globalThis, readable_stream_tee); errdefer body.deinit(bun.default_allocator); var init = try this.init.clone(globalThis); errdefer init.deinit(bun.default_allocator); @@ -306,8 +313,8 @@ pub fn cloneValue( }; } -pub fn clone(this: *Response, globalThis: *JSGlobalObject) bun.JSError!*Response { - return bun.new(Response, try this.cloneValue(globalThis)); +pub fn clone(this: *Response, globalThis: *JSGlobalObject, this_value: jsc.JSValue, readable_stream_tee: ?*[2]jsc.JSValue) bun.JSError!*Response { + return bun.new(Response, try this.cloneValue(globalThis, this_value, readable_stream_tee)); } pub fn getStatus( @@ -322,7 +329,7 @@ fn destroy(this: *Response) void { this.init.deinit(bun.default_allocator); this.body.deinit(bun.default_allocator); this.url.deref(); - + this.this_jsvalue.deinit(); bun.destroy(this); } @@ -342,6 +349,7 @@ pub fn unref(this: *Response) void { pub fn finalize( this: *Response, ) callconv(.C) void { + this.this_jsvalue.finalize(); this.unref(); } @@ -532,7 +540,7 @@ pub fn constructError( return response.toJS(globalThis); } -pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!*Response { +pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, thisValue: JSValue) bun.JSError!*Response { var arguments = callframe.argumentsAsArray(2); if (!arguments[0].isUndefinedOrNull() and arguments[0].isObject()) { @@ -560,11 +568,16 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b return s3.throwSignError(sign_err, globalThis); }; defer result.deinit(); - response.init.headers = try response.getOrCreateHeaders(globalThis); + const headers = try response.getOrCreateHeaders(globalThis); + errdefer headers.deref(); + try headers.put(.Location, result.url, globalThis); response.redirected = true; - var headers_ref = response.init.headers.?; - try headers_ref.put(.Location, result.url, globalThis); - return bun.new(Response, response); + response.init.headers = headers; + var allocated_response = bun.new(Response, response); + if (thisValue != .zero) { + allocated_response.this_jsvalue = .initWeak(thisValue); + } + return allocated_response; } } } @@ -589,13 +602,15 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b return error.JSError; } + var readable_stream_value: JSValue = .zero; + var body: Body = brk: { if (arguments[0].isUndefinedOrNull()) { break :brk Body{ .value = Body.Value{ .Null = {} }, }; } - break :brk try Body.extract(globalThis, arguments[0]); + break :brk try Body.extract(globalThis, arguments[0], &readable_stream_value); }; errdefer body.deinit(bun.default_allocator); @@ -603,21 +618,26 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) b return error.JSError; } + if (body.value == .Blob and + init.headers != null and + body.value.Blob.content_type.len > 0 and + !init.headers.?.fastHas(.ContentType)) + { + try init.headers.?.put(.ContentType, body.value.Blob.content_type, globalThis); + } + var response = bun.new(Response, Response{ .body = body, .init = init, }); - if (response.body.value == .Blob and - response.init.headers != null and - response.body.value.Blob.content_type.len > 0 and - !response.init.headers.?.fastHas(.ContentType)) - { - try response.init.headers.?.put(.ContentType, response.body.value.Blob.content_type, globalThis); - } - response.calculateEstimatedByteSize(); - + if (thisValue != .zero) { + response.this_jsvalue = .initWeak(thisValue); + if (readable_stream_value != .zero) { + response.body.value.Locked.readable.setValue(.{ .Response = thisValue }, readable_stream_value, response.body.value.Locked.global); + } + } return response; } diff --git a/src/bun.js/webcore/fetch.zig b/src/bun.js/webcore/fetch.zig index 0b041a974cf..6cecabc0e13 100644 --- a/src/bun.js/webcore/fetch.zig +++ b/src/bun.js/webcore/fetch.zig @@ -184,17 +184,23 @@ pub const FetchTasklet = struct { pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!HTTPRequestBody { var body_value = try Body.Value.fromJS(globalThis, value); - if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed2(globalThis)))) { + defer body_value.deinit(); + if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed(.empty, globalThis)))) { return globalThis.ERR(.BODY_ALREADY_USED, "body already used", .{}).throw(); } if (body_value == .Locked) { - if (body_value.Locked.readable.has()) { + if (body_value.Locked.readable == .strong) { // just grab the ref - return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable }; + + defer body_value.Locked.readable = .empty; + return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable.strong }; } - const readable = try body_value.toReadableStream(globalThis); - if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) { - return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable }; + + const readable = try body_value.toReadableStream(.empty, globalThis); + if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has(.empty, globalThis)) { + if (body_value.Locked.readable.get(.empty, globalThis)) |stream| { + return FetchTasklet.HTTPRequestBody{ .ReadableStream = .init(stream, globalThis) }; + } } } return FetchTasklet.HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() }; @@ -447,7 +453,8 @@ pub const FetchTasklet = struct { if (this.getCurrentResponse()) |response| { var body = &response.body; if (body.value == .Locked) { - if (body.value.Locked.readable.get(globalThis)) |readable| { + const response_js = response.this_jsvalue.tryGet() orelse .zero; + if (body.value.Locked.readable.get(if (response_js != .zero) .{ .Response = response_js } else .{ .empty = {} }, globalThis)) |readable| { if (readable.ptr == .Bytes) { readable.ptr.Bytes.size_hint = this.getSizeHint(); @@ -464,7 +471,7 @@ pub const FetchTasklet = struct { ); } else { var prev = body.value.Locked.readable; - body.value.Locked.readable = .{}; + body.value.Locked.readable = .{ .empty = {} }; readable.value.ensureStillAlive(); prev.deinit(); readable.value.ensureStillAlive(); @@ -2103,17 +2110,17 @@ pub fn Bun__fetch_( } if (request) |req| { - if (req.body.value == .Used or (req.body.value == .Locked and (req.body.value.Locked.action != .none or req.body.value.Locked.isDisturbed(Request, globalThis, first_arg)))) { + if (req.body.value == .Used or (req.body.value == .Locked and (req.body.value.Locked.action != .none or req.body.value.Locked.isDisturbed(.{ .Request = first_arg }, globalThis)))) { return globalThis.ERR(.BODY_ALREADY_USED, "Request body already used", .{}).throw(); } if (req.body.value == .Locked) { - if (req.body.value.Locked.readable.has()) { - break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(globalThis).?, globalThis) }; + if (req.body.value.Locked.readable.has(.{ .Request = first_arg }, globalThis)) { + break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(.{ .Request = first_arg }, globalThis).?, globalThis) }; } - const readable = try req.body.value.toReadableStream(globalThis); - if (!readable.isEmptyOrUndefinedOrNull() and req.body.value == .Locked and req.body.value.Locked.readable.has()) { - break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(globalThis).?, globalThis) }; + const readable = try req.body.value.toReadableStream(.{ .Request = first_arg }, globalThis); + if (!readable.isEmptyOrUndefinedOrNull() and req.body.value == .Locked and req.body.value.Locked.readable.has(.{ .Request = first_arg }, globalThis)) { + break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = jsc.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get(.{ .Request = first_arg }, globalThis).?, globalThis) }; } } @@ -2123,7 +2130,7 @@ pub fn Bun__fetch_( if (request_init_object) |req| { if (try req.fastGet(globalThis, .body)) |body__| { if (!body__.isUndefined()) { - break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(ctx, body__); + break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(globalThis, body__); } } } diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts index e1900cd059c..31fb3c4d186 100644 --- a/src/bun.js/webcore/response.classes.ts +++ b/src/bun.js/webcore/response.classes.ts @@ -4,6 +4,7 @@ export default [ define({ name: "Request", construct: true, + constructNeedsThis: true, finalize: true, final: false, klass: {}, @@ -68,6 +69,7 @@ export default [ define({ name: "Response", construct: true, + constructNeedsThis: true, finalize: true, final: false, JSType: "0b11101110", diff --git a/src/codegen/generate-classes.ts b/src/codegen/generate-classes.ts index 1215f77d541..45a889ba7e7 100644 --- a/src/codegen/generate-classes.ts +++ b/src/codegen/generate-classes.ts @@ -1710,7 +1710,7 @@ size_t ${name}::memoryCost(void* ptr) { size_t ${name}::estimatedSize(JSC::JSCell* cell, JSC::VM& vm) { auto* thisObject = jsCast<${name}*>(cell); auto* wrapped = thisObject->wrapped(); - return Base::estimatedSize(cell, vm) + ${name}::memoryCost(wrapped); + return Base::estimatedSize(cell, vm) + (wrapped ? ${name}::memoryCost(wrapped) : 0); } void ${name}::destroy(JSCell* cell) diff --git a/src/codegen/generate-jssink.ts b/src/codegen/generate-jssink.ts index 13d71b6d18d..f235591c7e6 100644 --- a/src/codegen/generate-jssink.ts +++ b/src/codegen/generate-jssink.ts @@ -424,7 +424,8 @@ size_t ${controller}::memoryCost(void* sinkPtr) { } size_t ${controller}::estimatedSize(JSCell* cell, JSC::VM& vm) { - return Base::estimatedSize(cell, vm) + ${controller}::memoryCost(jsCast<${controller}*>(cell)->wrapped()); + auto* wrapped = jsCast<${controller}*>(cell)->wrapped(); + return Base::estimatedSize(cell, vm) + (wrapped ? ${controller}::memoryCost(wrapped) : 0); } JSC_DECLARE_HOST_FUNCTION(${controller}__close); diff --git a/src/shell/states/Cmd.zig b/src/shell/states/Cmd.zig index 8a732f27d67..5e3de34bc0a 100644 --- a/src/shell/states/Cmd.zig +++ b/src/shell/states/Cmd.zig @@ -576,7 +576,8 @@ fn initRedirections(this: *Cmd, spawn_args: *Subprocess.SpawnArgs) bun.JSError!? _ = rstream; @panic("TODO SHELL READABLE STREAM"); } else if (this.base.interpreter.jsobjs[val.idx].as(jsc.WebCore.Response)) |req| { - req.getBodyValue().toBlobIfPossible(); + const owner = if (req.this_jsvalue.tryGet()) |jsval| jsc.WebCore.ReadableStream.Ref.Owner{ .Response = jsval } else jsc.WebCore.ReadableStream.Ref.Owner{ .empty = {} }; + req.getBodyValue().toBlobIfPossible(owner); if (this.node.redirect.stdin) { try spawn_args.stdio[stdin_no].extractBlob(global, req.getBodyValue().useAsAnyBlob(), stdin_no); }