Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/bun.js/api/html_rewriter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,12 @@ pub const HTMLRewriter = struct {
result.setUrl(original.getUrl().clone());

const value = original.getBodyValue();
const owned_readable_stream = original.getBodyReadableStream(sink.global);
sink.ref();
sink.bodyValueBufferer = jsc.WebCore.Body.ValueBufferer.init(sink, @ptrCast(&onFinishedBuffering), sink.global, bun.default_allocator);
response_js_value.ensureStillAlive();

sink.bodyValueBufferer.?.run(value) catch |buffering_error| {
sink.bodyValueBufferer.?.run(value, owned_readable_stream) catch |buffering_error| {
defer sink.deref();
return switch (buffering_error) {
error.StreamAlreadyUsed => {
Expand Down
30 changes: 15 additions & 15 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d
}
}

existing_request = Request.init(
existing_request = Request.init2(
bun.String.cloneUTF8(url.href),
headers,
bun.handleOom(this.vm.initRequestBodyValue(body)),
Expand Down Expand Up @@ -2213,13 +2213,13 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d
ctx.signal = signal;
signal.pendingActivityRef();

const request_object = Request.new(.{
.method = ctx.method,
.request_context = AnyRequestContext.init(ctx),
.https = ssl_enabled,
.signal = signal.ref(),
.body = body.ref(),
});
const request_object = Request.new(Request.init(
ctx.method,
AnyRequestContext.init(ctx),
ssl_enabled,
signal.ref(),
body.ref(),
));
ctx.request_weakref = .initRef(request_object);

if (comptime debug_mode) {
Expand Down Expand Up @@ -2314,13 +2314,13 @@ pub fn NewServer(protocol_enum: enum { http, https }, development_kind: enum { d
var signal = jsc.WebCore.AbortSignal.new(this.globalThis);
ctx.signal = signal;

var request_object = Request.new(.{
.method = ctx.method,
.request_context = AnyRequestContext.init(ctx),
.https = ssl_enabled,
.signal = signal.ref(),
.body = body.ref(),
});
var request_object = Request.new(Request.init(
ctx.method,
AnyRequestContext.init(ctx),
ssl_enabled,
signal.ref(),
body.ref(),
));
ctx.upgrade_context = upgrade_ctx;
ctx.request_weakref = .initRef(request_object);
// We keep the Request object alive for the duration of the request so that we can remove the pointer to the UWS request object.
Expand Down
62 changes: 33 additions & 29 deletions src/bun.js/api/server/RequestContext.zig
Original file line number Diff line number Diff line change
Expand Up @@ -677,15 +677,11 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}

if (this.response_ptr) |response| {
const bodyValue = response.getBodyValue();
if (bodyValue.* == .Locked) {
var strong_readable = bodyValue.Locked.readable;
bodyValue.Locked.readable = .{};
defer strong_readable.deinit();
if (strong_readable.get(globalThis)) |readable| {
readable.abort(globalThis);
any_js_calls = true;
}
if (response.getBodyReadableStream(globalThis)) |stream| {
stream.value.ensureStillAlive();
response.detachReadableStream(globalThis);
stream.abort(globalThis);
any_js_calls = true;
}
}
}
Expand Down Expand Up @@ -1091,7 +1087,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
}

pub fn doRenderWithBodyLocked(this: *anyopaque, value: *jsc.WebCore.Body.Value) void {
doRenderWithBody(bun.cast(*RequestContext, this), value);
doRenderWithBody(bun.cast(*RequestContext, this), value, null);
}

fn renderWithBlobFromBodyValue(this: *RequestContext) void {
Expand Down Expand Up @@ -1664,13 +1660,13 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,

if (req.response_ptr) |resp| {
assert(req.server != null);
const globalThis = req.server.?.globalThis;
const bodyValue = resp.getBodyValue();
if (bodyValue.* == .Locked) {
const global = bodyValue.Locked.global;
if (bodyValue.Locked.readable.get(global)) |stream| {
stream.done(global);
}
bodyValue.Locked.readable.deinit();
if (resp.getBodyReadableStream(globalThis)) |stream| {
stream.value.ensureStillAlive();
resp.detachReadableStream(globalThis);

stream.done(globalThis);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This subtly changes the semantics. now we only mark it as used if there is a ReadableStream attached. Previously, we unconditionally marked it as used. That is a bug.

bodyValue.* = .{ .Used = {} };
}
}
Expand Down Expand Up @@ -1720,11 +1716,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,

if (req.response_ptr) |resp| {
const bodyValue = resp.getBodyValue();
if (bodyValue.* == .Locked) {
if (bodyValue.Locked.readable.get(globalThis)) |stream| {
stream.done(globalThis);
}
bodyValue.Locked.readable.deinit();
if (resp.getBodyReadableStream(globalThis)) |stream| {
stream.value.ensureStillAlive();
resp.detachReadableStream(globalThis);
stream.done(globalThis);
bodyValue.* = .{ .Used = {} };
}
}
Expand Down Expand Up @@ -1802,7 +1797,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
req.endStream(req.shouldCloseConnection());
}

pub fn doRenderWithBody(this: *RequestContext, value: *jsc.WebCore.Body.Value) void {
pub fn doRenderWithBody(this: *RequestContext, value: *jsc.WebCore.Body.Value, owned_readable: ?jsc.WebCore.ReadableStream) void {
this.drainMicrotasks();

// If a ReadableStream can trivially be converted to a Blob, do so.
Expand Down Expand Up @@ -1832,11 +1827,20 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
if (this.isAbortedOrEnded()) {
return;
}

if (lock.readable.get(globalThis)) |stream_| {
const stream: jsc.WebCore.ReadableStream = stream_;
// we hold the stream alive until we're done with it
this.readable_stream_ref = lock.readable;
const readable_stream = brk: {
if (lock.readable.get(globalThis)) |stream| {
// we hold the stream alive until we're done with it
this.readable_stream_ref = lock.readable;
break :brk stream;
}
if (owned_readable) |stream| {
// response owns the stream, so we hold a strong reference to it
this.readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(stream, globalThis);
break :brk stream;
}
break :brk null;
};
if (readable_stream) |stream| {
value.* = .{ .Used = {} };

if (stream.isLocked(globalThis)) {
Expand Down Expand Up @@ -1915,7 +1919,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
// someone else is waiting for the stream or waiting for `onStartStreaming`
const readable = value.toReadableStream(globalThis) catch return; // TODO: properly propagate exception upwards
readable.ensureStillAlive();
this.doRenderWithBody(value);
this.doRenderWithBody(value, null);
return;
}

Expand Down Expand Up @@ -1996,7 +2000,7 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
return;
}
var response = this.response_ptr.?;
this.doRenderWithBody(response.getBodyValue());
this.doRenderWithBody(response.getBodyValue(), response.getBodyReadableStream(this.server.?.globalThis));
}

pub fn renderProductionError(this: *RequestContext, status: u16) void {
Expand Down
6 changes: 3 additions & 3 deletions src/bun.js/bindings/JSBakeResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static JSC_DECLARE_CUSTOM_GETTER(jsBakeResponsePrototypeGetDebugInfo);
static JSC_DECLARE_CUSTOM_GETTER(jsBakeResponsePrototypeGetDebugStack);
static JSC_DECLARE_CUSTOM_GETTER(jsBakeResponsePrototypeGetDebugTask);

extern JSC_CALLCONV void* JSC_HOST_CALL_ATTRIBUTES BakeResponseClass__constructForSSR(JSC::JSGlobalObject*, JSC::CallFrame*, int*);
extern JSC_CALLCONV void* JSC_HOST_CALL_ATTRIBUTES BakeResponseClass__constructForSSR(JSC::JSGlobalObject*, JSC::CallFrame*, int*, JSC::EncodedJSValue);
extern "C" SYSV_ABI JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES ResponseClass__constructError(JSC::JSGlobalObject*, JSC::CallFrame*);
extern "C" SYSV_ABI JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES ResponseClass__constructJSON(JSC::JSGlobalObject*, JSC::CallFrame*);
extern "C" SYSV_ABI JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES BakeResponseClass__constructRender(JSC::JSGlobalObject*, JSC::CallFrame*);
Expand Down Expand Up @@ -213,7 +213,7 @@ class JSBakeResponseConstructor final : public JSC::InternalFunction {
JSBakeResponse* instance = JSBakeResponse::create(vm, globalObject, structure, nullptr);

int arg_was_jsx = 0;
void* ptr = BakeResponseClass__constructForSSR(globalObject, callFrame, &arg_was_jsx);
void* ptr = BakeResponseClass__constructForSSR(globalObject, callFrame, &arg_was_jsx, JSValue::encode(instance));
if (scope.exception()) [[unlikely]] {
ASSERT_WITH_MESSAGE(!ptr, "Memory leak detected: new Response() allocated memory without checking for exceptions.");
return JSValue::encode(JSC::jsUndefined());
Expand Down Expand Up @@ -243,7 +243,7 @@ class JSBakeResponseConstructor final : public JSC::InternalFunction {
Structure* structure = globalObject->bakeAdditions().JSBakeResponseStructure(globalObject);
JSBakeResponse* instance = JSBakeResponse::create(vm, globalObject, structure, nullptr);

void* ptr = BakeResponseClass__constructForSSR(globalObject, callFrame, nullptr);
void* ptr = BakeResponseClass__constructForSSR(globalObject, callFrame, nullptr, JSValue::encode(instance));
if (scope.exception()) [[unlikely]] {
ASSERT_WITH_MESSAGE(!ptr, "Memory leak detected: new Response() allocated memory without checking for exceptions.");
return JSValue::encode(JSC::jsUndefined());
Expand Down
6 changes: 6 additions & 0 deletions src/bun.js/bindings/JSGlobalObject.zig
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,12 @@ pub const JSGlobalObject = opaque {
// We're done validating. From now on, deal with extracting the body.
body.toBlobIfPossible();

if (body.* == .Locked) {
if (response.getBodyReadableStream(this)) |stream| {
return stream.value;
}
}

var any_blob = switch (body.*) {
.Locked => body.tryUseAsAnyBlob() orelse return body.toReadableStream(this),
else => body.useAsAnyBlob(),
Expand Down
5 changes: 3 additions & 2 deletions src/bun.js/node/types.zig
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ pub const BlobOrStringOrBuffer = union(enum) {
}
if (allow_request_response) {
if (value.as(jsc.WebCore.Request)) |request| {
request.body.value.toBlobIfPossible();
const bodyValue = request.getBodyValue();
bodyValue.toBlobIfPossible();

if (request.body.value.tryUseAsAnyBlob()) |any_blob_| {
if (bodyValue.tryUseAsAnyBlob()) |any_blob_| {
var any_blob = any_blob_;
defer any_blob.detach();
return .{ .blob = any_blob.toBlob(global) };
Expand Down
8 changes: 4 additions & 4 deletions src/bun.js/webcore/BakeResponse.zig
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ pub fn toJSForSSR(this: *Response, globalObject: *JSGlobalObject, kind: SSRKind)
return BakeResponse__createForSSR(globalObject, this, @intFromEnum(kind));
}

pub export fn BakeResponseClass__constructForSSR(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame, bake_ssr_has_jsx: *c_int) callconv(jsc.conv) ?*anyopaque {
return @as(*Response, constructor(globalObject, callFrame, bake_ssr_has_jsx) catch |err| switch (err) {
pub export fn BakeResponseClass__constructForSSR(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame, bake_ssr_has_jsx: *c_int, js_this: jsc.JSValue) callconv(jsc.conv) ?*anyopaque {
return @as(*Response, constructor(globalObject, callFrame, bake_ssr_has_jsx, js_this) catch |err| switch (err) {
error.JSError => return null,
error.OutOfMemory => {
globalObject.throwOutOfMemory() catch {};
Expand All @@ -28,7 +28,7 @@ pub export fn BakeResponseClass__constructForSSR(globalObject: *jsc.JSGlobalObje
});
}

pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, bake_ssr_has_jsx: *c_int) bun.JSError!*Response {
pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, bake_ssr_has_jsx: *c_int, js_this: jsc.JSValue) bun.JSError!*Response {
var arguments = callframe.argumentsAsArray(2);

// Allow `return new Response(<jsx> ... </jsx>, { ... }`
Expand All @@ -44,7 +44,7 @@ pub fn constructor(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame, b
}
}

return Response.constructor(globalThis, callframe);
return Response.constructor(globalThis, callframe, js_this);
}

pub export fn BakeResponseClass__constructRedirect(globalObject: *jsc.JSGlobalObject, callFrame: *jsc.CallFrame) callconv(jsc.conv) jsc.JSValue {
Expand Down
20 changes: 11 additions & 9 deletions src/bun.js/webcore/Blob.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1356,13 +1356,14 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
_ = bodyValue.use();
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis));
},
.Locked => |*locked| {
.Locked => {
if (destination_blob.isS3()) {
const s3 = &destination_blob.store.?.data.s3;
var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis);
defer aws_options.deinit();
_ = try bodyValue.toReadableStream(globalThis);
if (locked.readable.get(globalThis)) |readable| {

if (response.getBodyReadableStream(globalThis) orelse bodyValue.Locked.readable.get(globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
Expand Down Expand Up @@ -1401,28 +1402,29 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
}

if (data.as(Request)) |request| {
switch (request.body.value) {
const bodyValue = request.getBodyValue();
switch (bodyValue.*) {
.WTFStringImpl,
.InternalBlob,
.Used,
.Empty,
.Blob,
.Null,
=> {
break :brk request.body.value.use();
break :brk bodyValue.use();
},
.Error => |*err_ref| {
destination_blob.detach();
_ = request.body.value.use();
_ = bodyValue.use();
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis));
},
.Locked => |locked| {
if (destination_blob.isS3()) {
const s3 = &destination_blob.store.?.data.s3;
var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis);
defer aws_options.deinit();
_ = try request.body.value.toReadableStream(globalThis);
if (locked.readable.get(globalThis)) |readable| {
_ = try bodyValue.toReadableStream(globalThis);
if (request.getBodyReadableStream(globalThis) orelse locked.readable.get(globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
Expand Down Expand Up @@ -1453,8 +1455,8 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
.mkdirp_if_not_exists = options.mkdirp_if_not_exists orelse true,
});

request.body.value.Locked.task = task;
request.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
bodyValue.Locked.task = task;
bodyValue.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;

return task.promise.value();
},
Expand Down
Loading