diff --git a/src/bun.js.zig b/src/bun.js.zig index 033fd06cfa2..8298c549f50 100644 --- a/src/bun.js.zig +++ b/src/bun.js.zig @@ -106,6 +106,7 @@ pub const Run = struct { jsc.VirtualMachine.is_main_thread_vm = true; bun.http.experimental_http2_client_from_cli = ctx.runtime_options.experimental_http2_fetch; + bun.http.experimental_http3_client_from_cli = ctx.runtime_options.experimental_http3_fetch; doPreconnect(ctx.runtime_options.preconnect); const callback = OpaqueWrap(Run, Run.start); @@ -292,6 +293,7 @@ pub const Run = struct { vm.transpiler.env.loadTracy(); bun.http.experimental_http2_client_from_cli = ctx.runtime_options.experimental_http2_fetch; + bun.http.experimental_http3_client_from_cli = ctx.runtime_options.experimental_http3_fetch; doPreconnect(ctx.runtime_options.preconnect); vm.main_is_html_entrypoint = (loader orelse vm.transpiler.options.loader(std.fs.path.extension(entry_path))) == .html; diff --git a/src/cli.zig b/src/cli.zig index 00c2fbb57f0..75cda52f5a5 100644 --- a/src/cli.zig +++ b/src/cli.zig @@ -409,6 +409,7 @@ pub const Command = struct { } = .{}, preconnect: []const []const u8 = &[_][]const u8{}, experimental_http2_fetch: bool = false, + experimental_http3_fetch: bool = false, dns_result_order: []const u8 = "verbatim", /// `--expose-gc` makes `globalThis.gc()` available. Added for Node /// compatibility. diff --git a/src/cli/Arguments.zig b/src/cli/Arguments.zig index d3fef673e30..caa05eec2b8 100644 --- a/src/cli/Arguments.zig +++ b/src/cli/Arguments.zig @@ -109,6 +109,7 @@ pub const runtime_params_ = [_]ParamType{ clap.parseParam("--conditions ... Pass custom conditions to resolve") catch unreachable, clap.parseParam("--fetch-preconnect ... Preconnect to a URL while code is loading") catch unreachable, clap.parseParam("--experimental-http2-fetch Offer h2 in fetch() TLS ALPN. Same as BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP2_CLIENT=1") catch unreachable, + clap.parseParam("--experimental-http3-fetch Honor Alt-Svc: h3 in fetch() and upgrade to HTTP/3. Same as BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT=1") catch unreachable, clap.parseParam("--max-http-header-size Set the maximum size of HTTP headers in bytes. Default is 16KiB") catch unreachable, clap.parseParam("--dns-result-order Set the default order of DNS lookup results. Valid orders: verbatim (default), ipv4first, ipv6first") catch unreachable, clap.parseParam("--expose-gc Expose gc() on the global object. Has no effect on Bun.gc().") catch unreachable, @@ -880,6 +881,7 @@ pub fn parse(allocator: std.mem.Allocator, ctx: Command.Context, comptime cmd: C ctx.runtime_options.smol = args.flag("--smol"); ctx.runtime_options.preconnect = args.options("--fetch-preconnect"); ctx.runtime_options.experimental_http2_fetch = args.flag("--experimental-http2-fetch"); + ctx.runtime_options.experimental_http3_fetch = args.flag("--experimental-http3-fetch"); ctx.runtime_options.expose_gc = args.flag("--expose-gc"); if (args.option("--console-depth")) |depth_str| { diff --git a/src/cli/test/parallel/runner.zig b/src/cli/test/parallel/runner.zig index 7cf1f7976c0..b2f53039ca5 100644 --- a/src/cli/test/parallel/runner.zig +++ b/src/cli/test/parallel/runner.zig @@ -229,6 +229,8 @@ fn buildWorkerArgv(arena: std.mem.Allocator, ctx: Command.Context) ![:null]?[*:0 try argv.append(arena, "--smol"); if (ctx.runtime_options.experimental_http2_fetch) try argv.append(arena, "--experimental-http2-fetch"); + if (ctx.runtime_options.experimental_http3_fetch) + try argv.append(arena, "--experimental-http3-fetch"); if (ctx.args.allow_addons == false) try argv.append(arena, "--no-addons"); if (ctx.debug.macros == .disable) diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig index 6dbb2ef9fe2..38ab7c07c96 100644 --- a/src/cli/test_command.zig +++ b/src/cli/test_command.zig @@ -1496,6 +1496,7 @@ pub const TestCommand = struct { vm.preload = ctx.preloads; vm.transpiler.options.rewrite_jest_for_tests = true; bun.http.experimental_http2_client_from_cli = ctx.runtime_options.experimental_http2_fetch; + bun.http.experimental_http3_client_from_cli = ctx.runtime_options.experimental_http3_fetch; vm.transpiler.options.env.behavior = .load_all_without_inlining; const node_env_entry = try env_loader.map.getOrPutWithoutValue("NODE_ENV"); diff --git a/src/env_var.zig b/src/env_var.zig index 9c16c030a2a..03302f4dc37 100644 --- a/src/env_var.zig +++ b/src/env_var.zig @@ -193,6 +193,11 @@ pub const feature_flag = struct { /// server selects it. Off by default while the client implementation /// matures. `--experimental-http2-fetch` is the CLI equivalent. pub const BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP2_CLIENT = newFeatureFlag("BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP2_CLIENT", .{}); + /// Honor `Alt-Svc: h3` from fetch() responses: subsequent requests to the + /// same origin go over QUIC/HTTP-3 instead of TCP. Off by default while + /// the client implementation matures. `--experimental-http3-fetch` is the + /// CLI equivalent. + pub const BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT = newFeatureFlag("BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT", .{}); pub const BUN_FEATURE_FLAG_FORCE_IO_POOL = newFeatureFlag("BUN_FEATURE_FLAG_FORCE_IO_POOL", .{}); pub const BUN_FEATURE_FLAG_FORCE_WINDOWS_JUNCTIONS = newFeatureFlag("BUN_FEATURE_FLAG_FORCE_WINDOWS_JUNCTIONS", .{}); pub const BUN_INSTRUMENTS = newFeatureFlag("BUN_INSTRUMENTS", .{}); diff --git a/src/http.zig b/src/http.zig index 2a6dc1e654c..4ef7e7c6271 100644 --- a/src/http.zig +++ b/src/http.zig @@ -12,6 +12,9 @@ pub var async_http_id_monotonic: std.atomic.Value(u32) = std.atomic.Value(u32).i /// Set once at startup from `--experimental-http2-fetch` (before the HTTP /// thread spawns) and then only read on that thread, so no atomics needed. pub var experimental_http2_client_from_cli: bool = false; +/// Set once at startup from `--experimental-http3-fetch`. Same threading +/// rules as the http2 flag. +pub var experimental_http3_client_from_cli: bool = false; const MAX_REDIRECT_URL_LENGTH = 128 * 1024; @@ -228,6 +231,28 @@ pub fn alpnOffer(client: *const HTTPClient) BoringSSL.SSL.AlpnOffer { return if (client.flags.force_http2) .h2_only else .h1_or_h2; } +/// Whether the experimental Alt-Svc-driven HTTP/3 upgrade is enabled at all +/// (CLI flag or env var). Used on its own to gate `H3.AltSvc.record` — a +/// response that arrived over a request shape h3 can't serve (proxy, sendfile, +/// `force_http1`) still carries an authoritative Alt-Svc for the origin. +pub fn h3AltSvcEnabled() bool { + return experimental_http3_client_from_cli or + bun.feature_flag.BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT.get(); +} + +/// Whether this request shape is eligible to *use* a cached Alt-Svc h3 +/// alternative (HTTPS, no proxy/unix-socket, no sendfile, not pinned to a +/// specific protocol). When true, `start_()` consults `H3.AltSvc.lookup` +/// before opening TCP. +pub fn canTryH3AltSvc(client: *const HTTPClient) bool { + if (client.flags.force_http1 or client.flags.force_http2) return false; + if (client.http_proxy != null) return false; + if (client.flags.is_preconnect_only) return false; + if (client.unix_socket_path.length() > 0) return false; + if (client.state.original_request_body == .sendfile) return false; + return h3AltSvcEnabled(); +} + pub fn firstCall( client: *HTTPClient, comptime is_ssl: bool, @@ -1180,6 +1205,28 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void { } } + if (comptime is_ssl) { + // Opportunistic Alt-Svc upgrade: a previous response from this origin + // advertised `h3`, and the experimental flag is on. Don't touch + // `flags.force_http3` — that's the user's explicit `protocol:"http3"` + // choice and persists across redirects, whereas an Alt-Svc upgrade is + // per-origin and a cross-origin redirect must re-evaluate from h1. + // `doRedirectMultiplexed` resets `flags.protocol`, so the redirected + // request lands back here with `force_http3` still false and consults + // the cache for the new origin. + if (!this.flags.force_http3 and this.canTryH3AltSvc()) { + if (H3.AltSvc.lookup(this.url.hostname, this.url.getPortAuto())) |alt_port| { + if (H3.ClientContext.getOrCreate(http_thread.loop.loop)) |ctx| { + if (!ctx.connect(this, this.url.hostname, alt_port)) { + this.fail(error.ConnectionRefused); + } + return; + } + // engine init failed: fall through to TCP + } + } + } + if (this.flags.force_http3) { if (comptime !is_ssl) { this.fail(error.HTTP3Unsupported); @@ -2803,6 +2850,14 @@ pub fn handleResponseMetadata( hashHeaderConst("Last-Modified") => { pretend_304 = this.flags.force_last_modified and response.status_code > 199 and response.status_code < 300 and this.if_modified_since.len > 0 and strings.eql(this.if_modified_since, header.value); }, + hashHeaderConst("Alt-Svc") => { + // Record regardless of *this* request's shape — a future + // request to the same origin may be h3-eligible even if this + // one was pinned/proxied/sendfile. + if (this.isHTTPS() and this.unix_socket_path.length() == 0 and h3AltSvcEnabled()) { + H3.AltSvc.record(this.url.hostname, this.url.getPortAuto(), header.value); + } + }, else => {}, } diff --git a/src/http/H3Client.zig b/src/http/H3Client.zig index 92847cdec18..7fc8ebf5001 100644 --- a/src/http/H3Client.zig +++ b/src/http/H3Client.zig @@ -6,642 +6,32 @@ //! is the same one H2 uses (`handleResponseMetadata` / `handleResponseBody` / //! `progressUpdateH3`), so redirect, decompression, and FetchTasklet plumbing //! are shared with HTTP/1.1. - -/// One in-flight request. Created when the request is enqueued; the lsquic -/// stream is bound later from `onStreamOpen` (lsquic creates streams -/// asynchronously once MAX_STREAMS credit is available). -pub const Stream = struct { - pub const new = bun.TrivialNew(@This()); - - session: *ClientSession, - client: ?*HTTPClient, - qstream: ?*quic.Stream = null, - - /// Slices into the lsquic-owned hset buffer; valid only for the duration - /// of the `onStreamHeaders` callback that populated it. `cloneMetadata` - /// deep-copies synchronously inside that callback, so nothing reads these - /// after they go stale. - decoded_headers: std.ArrayListUnmanaged(picohttp.Header) = .{}, - body_buffer: std.ArrayListUnmanaged(u8) = .{}, - status_code: u16 = 0, - - pending_body: []const u8 = "", - request_body_done: bool = false, - is_streaming_body: bool = false, - headers_delivered: bool = false, - - pub fn deinit(this: *Stream) void { - this.decoded_headers.deinit(bun.default_allocator); - this.body_buffer.deinit(bun.default_allocator); - _ = live_streams.fetchSub(1, .monotonic); - bun.destroy(this); - } - - pub fn abort(this: *Stream) void { - if (this.qstream) |qs| qs.close(); - } -}; - -/// One QUIC connection. Owns its UDP endpoint via quic.c; the `qsocket` -/// pointer becomes dangling after `onConnClose`, so every accessor checks -/// `closed` first. -pub const ClientSession = struct { - ref_count: RefCount = .init(), - /// Null while DNS is in flight; set once `us_quic_connect_addr` returns. - qsocket: ?*quic.Socket, - hostname: []const u8, - port: u16, - reject_unauthorized: bool, - handshake_done: bool = false, - closed: bool = false, - registry_index: u32 = std.math.maxInt(u32), - - /// Requests waiting for `onStreamOpen` to hand them a stream. Order is - /// FIFO; `lsquic_conn_make_stream` was already called once per entry. - pending: std.ArrayListUnmanaged(*Stream) = .{}, - - pub const new = bun.TrivialNew(@This()); - const RefCount = bun.ptr.RefCount(@This(), "ref_count", deinit, .{}); - pub const ref = RefCount.ref; - pub const deref = RefCount.deref; - - pub fn matches(this: *const ClientSession, hostname: []const u8, port: u16, reject_unauthorized: bool) bool { - return !this.closed and this.port == port and - this.reject_unauthorized == reject_unauthorized and - strings.eqlLong(this.hostname, hostname, true); - } - - pub fn hasHeadroom(this: *const ClientSession) bool { - if (this.closed) return false; - const qs = this.qsocket orelse return this.pending.items.len < 64; - // After handshake every pending entry has had make_stream called, so - // lsquic's n_avail_streams already accounts for them — comparing - // against pending.len would double-subtract. Before handshake nothing - // is counted yet, so cap optimistically at the default MAX_STREAMS. - if (!this.handshake_done) return this.pending.items.len < 64; - return qs.streamsAvail() > 0; - } - - /// Queue `client` for a stream on this connection. The lsquic stream is - /// created asynchronously, so the request goes into `pending` until - /// `onStreamOpen` pops it. - pub fn enqueue(this: *ClientSession, client: *HTTPClient) void { - bun.debugAssert(!this.closed); - client.h3 = null; - client.flags.protocol = .http3; - client.allow_retry = false; - - const stream = Stream.new(.{ .session = this, .client = client }); - _ = live_streams.fetchAdd(1, .monotonic); - client.h3 = stream; - bun.handleOom(this.pending.append(bun.default_allocator, stream)); - this.ref(); - - if (this.handshake_done) { - this.qsocket.?.makeStream(); - } - } - - pub fn streamBodyByHttpId(this: *ClientSession, async_http_id: u32, ended: bool) void { - for (this.pending.items) |stream| { - const client = stream.client orelse continue; - if (client.async_http_id != async_http_id) continue; - if (client.state.original_request_body != .stream) return; - client.state.original_request_body.stream.ended = ended; - if (stream.qstream) |qs| drainSendBody(stream, qs); - return; - } - } - - pub fn detach(this: *ClientSession, stream: *Stream) void { - if (stream.client) |cl| cl.h3 = null; - stream.client = null; - if (stream.qstream) |qs| { - qs.ext(Stream).* = null; - // The success path can reach here while the request body is still - // being written (server responded early). FIN would be a - // content-length violation; RESET_STREAM(H3_REQUEST_CANCELLED) - // is the correct "I'm abandoning this send half" so lsquic reaps - // the stream instead of leaking it on the pooled session. - if (!stream.request_body_done) qs.reset(); - } - stream.qstream = null; - if (std.mem.indexOfScalar(*Stream, this.pending.items, stream)) |i| { - _ = this.pending.orderedRemove(i); - } - stream.deinit(); - this.deref(); - } - - fn fail(this: *ClientSession, stream: *Stream, err: anyerror) void { - const client = stream.client; - stream.abort(); - this.detach(stream); - if (client) |cl| cl.failFromH2(err); - } - - /// A stream closed before any response headers arrived. If the request - /// hasn't been retried yet and the body wasn't a JS stream (which may - /// already be consumed), re-enqueue it on a fresh session — this is the - /// standard h2/h3 client behavior for the GOAWAY / stateless-reset / - /// port-reuse race where a pooled session goes stale between the - /// `matches()` check and the first stream open. - fn retryOrFail(this: *ClientSession, stream: *Stream, err: anyerror) void { - const client = stream.client orelse return this.fail(stream, err); - if (client.flags.h3_retried or stream.is_streaming_body) { - return this.fail(stream, err); - } - const ctx = ClientContext.get() orelse return this.fail(stream, err); - client.flags.h3_retried = true; - // The old session is dead from our perspective; make sure connect() - // can't pick it again. - this.closed = true; - const port = this.port; - const host = bun.handleOom(bun.default_allocator.dupe(u8, this.hostname)); - defer bun.default_allocator.free(host); - log("retry {s}:{d} after {s}", .{ host, port, @errorName(err) }); - stream.abort(); - this.detach(stream); - if (!ctx.connect(client, host, port)) { - client.failFromH2(err); - } - } - - pub fn abortByHttpId(this: *ClientSession, async_http_id: u32) bool { - for (this.pending.items) |stream| { - const cl = stream.client orelse continue; - if (cl.async_http_id == async_http_id) { - this.fail(stream, error.Aborted); - return true; - } - } - return false; - } - - fn writeRequest(this: *ClientSession, stream: *Stream, qs: *quic.Stream) !void { - const client = stream.client orelse return error.Aborted; - const request = client.buildRequest(client.state.original_request_body.len()); - if (client.verbose != .none) { - HTTPClient.printRequest(.http3, request, client.url.href, !client.flags.reject_unauthorized, client.state.request_body, client.verbose == .curl); - } - - var sfa = std.heap.stackFallback(2048, bun.default_allocator); - const alloc = sfa.get(); - var headers: std.ArrayListUnmanaged(quic.Header) = .{}; - defer headers.deinit(alloc); - try headers.ensureTotalCapacityPrecise(alloc, request.headers.len + 4); - - // Names not in the QPACK static table get lowercased into one - // pre-sized buffer so the pointers stay stable across the batch. - var name_bytes: usize = 0; - for (request.headers) |h| name_bytes += h.name.len; - const lower = try alloc.alloc(u8, name_bytes); - defer alloc.free(lower); - var lower_len: usize = 0; - - var authority: []const u8 = client.url.host; - headers.items.len = 4; - for (request.headers) |h| { - if (quic.Qpack.classify(h.name)) |class| switch (class) { - .forbidden => {}, - .host => authority = h.value, - .indexed => |i| headers.appendAssumeCapacity(.init(i.name, h.value, i.index)), - } else { - const dst = lower[lower_len..][0..h.name.len]; - _ = strings.copyLowercase(h.name, dst); - lower_len += h.name.len; - headers.appendAssumeCapacity(.init(dst, h.value, null)); - } - } - if (authority.len == 0) authority = this.hostname; - headers.items[0] = .init(":method", request.method, .method_get); - headers.items[1] = .init(":scheme", "https", .scheme_https); - headers.items[2] = .init(":authority", authority, .authority); - headers.items[3] = .init(":path", if (request.path.len > 0) request.path else "/", .path); - - const body = client.state.request_body; - const has_inline_body = client.state.original_request_body == .bytes and body.len > 0; - const is_streaming = client.state.original_request_body == .stream; - - const end_stream = !has_inline_body and !is_streaming; - if (qs.sendHeaders(headers.items, end_stream) != 0) { - return error.HTTP3HeaderEncodingError; - } - - if (has_inline_body) { - stream.pending_body = body; - drainSendBody(stream, qs); - } else if (is_streaming) { - stream.is_streaming_body = true; - drainSendBody(stream, qs); - } else { - stream.request_body_done = true; - } - - client.state.request_stage = if (stream.request_body_done) .done else .body; - client.state.response_stage = .headers; - - // For streaming bodies the JS sink waits for can_stream to start - // pumping; report progress now so it begins. - if (is_streaming) client.progressUpdateH3(); - } - - fn drainSendBody(stream: *Stream, qs: *quic.Stream) void { - if (stream.request_body_done) return; - const client = stream.client orelse return; - - if (stream.is_streaming_body) { - const body = &client.state.original_request_body.stream; - const sb = body.buffer orelse return; - const buffer = sb.acquire(); - const data = buffer.slice(); - var written: usize = 0; - while (written < data.len) { - const w = qs.write(data[written..]); - if (w <= 0) break; - written += @intCast(w); - } - buffer.cursor += written; - const drained = buffer.isEmpty(); - if (drained) buffer.reset(); - if (drained and body.ended) { - stream.request_body_done = true; - qs.shutdown(); - client.state.request_stage = .done; - } else if (!drained) { - qs.wantWrite(true); - } else if (data.len > 0) { - sb.reportDrain(); - } - sb.release(); - if (stream.request_body_done) body.detach(); - return; - } - - while (stream.pending_body.len > 0) { - const w = qs.write(stream.pending_body); - if (w <= 0) break; - stream.pending_body = stream.pending_body[@intCast(w)..]; - } - if (stream.pending_body.len == 0) { - stream.request_body_done = true; - qs.shutdown(); - client.state.request_stage = .done; - } else { - qs.wantWrite(true); - } - } - - fn applyHeaders(_: *ClientSession, stream: *Stream, client: *HTTPClient) !HeaderResult { - var response = picohttp.Response{ - .minor_version = 0, - .status_code = stream.status_code, - .status = "", - .headers = .{ .list = stream.decoded_headers.items }, - .bytes_read = 0, - }; - client.state.pending_response = response; - const should_continue = try client.handleResponseMetadata(&response); - client.state.pending_response = response; - client.state.transfer_encoding = .identity; - if (client.state.response_stage == .body_chunk) client.state.response_stage = .body; - client.state.flags.allow_keepalive = true; - return if (should_continue == .finished) .finished else .has_body; - } - - /// Runs from inside lsquic's process_conns via on_stream_{headers,data,close}. - /// `done` = the lsquic stream is gone; deliver whatever is buffered then - /// detach. Mirrors H2Client.deliverStream so the HTTPClient state machine - /// sees the same call sequence regardless of transport. - fn deliver(this: *ClientSession, stream: *Stream, done: bool) void { - const client = stream.client orelse { - if (done) this.detach(stream); - return; - }; - - if (client.signals.get(.aborted)) { - return this.fail(stream, error.Aborted); - } - - if (stream.status_code != 0 and !stream.headers_delivered) { - stream.headers_delivered = true; - const result = this.applyHeaders(stream, client) catch |err| { - return this.fail(stream, err); - }; - if (result == .finished or (done and stream.body_buffer.items.len == 0)) { - if (client.state.flags.is_redirect_pending) { - this.detach(stream); - return client.doRedirectH3(); - } - client.cloneMetadata(); - client.state.flags.received_last_chunk = true; - if (result == .finished) client.state.content_length = 0; - this.detach(stream); - return finish(client); - } - client.cloneMetadata(); - if (client.signals.get(.header_progress)) client.progressUpdateH3(); - } - - if (client.state.response_stage != .body) { - if (done) { - // Stream closed before headers — handshake/reset failure. - return this.retryOrFail(stream, if (stream.status_code == 0) - error.HTTP3StreamReset - else - error.ConnectionClosed); - } - return; - } - - if (stream.body_buffer.items.len > 0) { - if (done) { - client.state.flags.received_last_chunk = true; - } - const report = client.handleResponseBody(stream.body_buffer.items, false) catch |err| { - stream.body_buffer.clearRetainingCapacity(); - return this.fail(stream, err); - }; - stream.body_buffer.clearRetainingCapacity(); - if (done) { - this.detach(stream); - return finish(client); - } - if (report) { - if (client.state.isDone()) { - this.detach(stream); - return client.progressUpdateH3(); - } - client.progressUpdateH3(); - } - return; - } - - if (done) { - this.detach(stream); - client.state.flags.received_last_chunk = true; - return finish(client); - } - } - - fn finish(client: *HTTPClient) void { - if (client.state.content_length) |cl| { - if (client.state.total_body_received != cl) { - return client.failFromH2(error.HTTP3ContentLengthMismatch); - } - } - client.progressUpdateH3(); - } - - fn deinit(this: *ClientSession) void { - bun.debugAssert(this.pending.items.len == 0); - this.pending.deinit(bun.default_allocator); - bun.default_allocator.free(this.hostname); - bun.destroy(this); - } -}; - -const HeaderResult = enum { has_body, finished }; - -/// Process-global lazily-initialised on the HTTP thread. Owns the lsquic -/// client engine and the live-session registry. Never freed — the engine -/// lives for the process, same as the HTTP thread itself. -pub const ClientContext = struct { - qctx: *quic.Context, - sessions: std.ArrayListUnmanaged(*ClientSession) = .{}, - - /// One instance per HTTP-thread loop. Stored as a process global only - /// because `bun.http.http_thread` is itself a process singleton — the - /// underlying lsquic engine is bound to the `loop` passed to - /// `quic.Context.createClient` (it lives on `loop->data.quic_head` and is - /// driven by that loop's pre/post hooks), so a second loop would get its - /// own engine; this var would just need to become per-loop storage. - var instance: ?*ClientContext = null; - var lsquic_init_once = bun.once(quic.globalInit); - - pub fn get() ?*ClientContext { - return instance; - } - - pub fn getOrCreate(loop: *uws.Loop) ?*ClientContext { - if (instance) |i| return i; - lsquic_init_once.call(.{}); - const qctx = quic.Context.createClient( - loop, - 0, - @sizeOf(*ClientSession), - @sizeOf(*Stream), - ) orelse return null; - qctx.onHskDone(onHskDone); - qctx.onGoaway(onGoaway); - qctx.onClose(onConnClose); - qctx.onStreamOpen(onStreamOpen); - qctx.onStreamHeaders(onStreamHeaders); - qctx.onStreamData(onStreamData); - qctx.onStreamWritable(onStreamWritable); - qctx.onStreamClose(onStreamClose); - - const self = bun.handleOom(bun.default_allocator.create(ClientContext)); - self.* = .{ .qctx = qctx }; - instance = self; - return self; - } - - /// Find or open a connection to `hostname:port` and queue `client` on it. - pub fn connect(this: *ClientContext, client: *HTTPClient, hostname: []const u8, port: u16) bool { - const reject = client.flags.reject_unauthorized; - for (this.sessions.items) |s| { - if (s.matches(hostname, port, reject) and s.hasHeadroom()) { - log("reuse session {s}:{d}", .{ hostname, port }); - s.enqueue(client); - return true; - } - } - - const host_z = bun.handleOom(bun.default_allocator.dupeZ(u8, hostname)); - const session = ClientSession.new(.{ - .qsocket = null, - .hostname = host_z, - .port = port, - .reject_unauthorized = reject, - }); - _ = live_sessions.fetchAdd(1, .monotonic); - session.registry_index = @intCast(this.sessions.items.len); - bun.handleOom(this.sessions.append(bun.default_allocator, session)); - session.enqueue(client); - - switch (this.qctx.connect(host_z.ptr, port, host_z.ptr, reject, session)) { - .socket => |qs| { - session.qsocket = qs; - qs.ext(ClientSession).* = session; - log("connect {s}:{d} (sync)", .{ hostname, port }); - }, - .pending => |pending| { - log("connect {s}:{d} (dns pending)", .{ hostname, port }); - PendingConnect.register(session, pending, this.qctx.loop()); - }, - .err => { - log("connect {s}:{d} failed", .{ hostname, port }); - this.unregister(session); - PendingConnect.failSession(session, error.ConnectionRefused); - return false; - }, - } - return true; - } - - pub fn unregister(this: *ClientContext, session: *ClientSession) void { - const i = session.registry_index; - if (i >= this.sessions.items.len or this.sessions.items[i] != session) return; - _ = this.sessions.swapRemove(i); - if (i < this.sessions.items.len) this.sessions.items[i].registry_index = i; - session.registry_index = std.math.maxInt(u32); - } - - pub fn abortByHttpId(async_http_id: u32) bool { - const this = instance orelse return false; - for (this.sessions.items) |s| { - if (s.abortByHttpId(async_http_id)) return true; - } - return false; - } - - pub fn streamBodyByHttpId(async_http_id: u32, ended: bool) void { - const this = instance orelse return; - for (this.sessions.items) |s| s.streamBodyByHttpId(async_http_id, ended); - } -}; - -pub const PendingConnect = @import("./h3/PendingConnect.zig"); - -// ───── lsquic → Zig callbacks ───── - -fn onHskDone(qs: *quic.Socket, ok: c_int) callconv(.c) void { - const session = qs.ext(ClientSession).* orelse return; - log("hsk_done ok={d} pending={d}", .{ ok, session.pending.items.len }); - if (ok == 0) { - session.closed = true; - return; - } - session.handshake_done = true; - for (session.pending.items) |_| qs.makeStream(); -} - -/// Peer sent GOAWAY: this connection won't accept new streams (RFC 9114 -/// §5.2). Mark the session unusable now so the next `connect()` opens a fresh -/// one instead of waiting for `onConnClose`, which only fires after lsquic's -/// draining period. Stay in the registry so abort/body-chunk lookups still -/// reach in-flight streams; `onConnClose` does the actual unregister/deref. -fn onGoaway(qs: *quic.Socket) callconv(.c) void { - const session = qs.ext(ClientSession).* orelse return; - log("goaway {s}:{d}", .{ session.hostname, session.port }); - session.closed = true; -} - -fn onConnClose(qs: *quic.Socket) callconv(.c) void { - const session = qs.ext(ClientSession).* orelse return; - session.closed = true; - session.qsocket = null; - var buf: [256]u8 = [_]u8{0} ** 256; - const st = qs.status(&buf); - log("conn_close status={d} '{s}'", .{ st, std.mem.sliceTo(&buf, 0) }); - if (ClientContext.instance) |ctx| ctx.unregister(session); - while (session.pending.items.len > 0) { - // lsquic fires on_stream_close for every bound stream before - // on_conn_closed, so anything still here never got a qstream. - const stream = session.pending.items[0]; - bun.debugAssert(stream.qstream == null); - session.retryOrFail(stream, if (session.handshake_done) - error.ConnectionClosed - else - error.HTTP3HandshakeFailed); - } - _ = live_sessions.fetchSub(1, .monotonic); - session.deref(); -} - -fn onStreamOpen(s: *quic.Stream, is_client: c_int) callconv(.c) void { - s.ext(Stream).* = null; - if (is_client == 0) return; - const qs = s.socket() orelse return; - const session = qs.ext(ClientSession).* orelse { - s.close(); - return; - }; - // Bind the next pending request to this stream. - const stream: *Stream = for (session.pending.items) |st| { - if (st.qstream == null) break st; - } else { - s.close(); - return; - }; - stream.qstream = s; - s.ext(Stream).* = stream; - log("stream_open", .{}); - session.writeRequest(stream, s) catch |err| { - session.fail(stream, err); - }; -} - -fn onStreamHeaders(s: *quic.Stream) callconv(.c) void { - const stream = s.ext(Stream).* orelse return; - const n = s.headerCount(); - - stream.decoded_headers.clearRetainingCapacity(); - bun.handleOom(stream.decoded_headers.ensureTotalCapacity(bun.default_allocator, n)); - var status: u16 = 0; - var i: c_uint = 0; - while (i < n) : (i += 1) { - const h = s.header(i) orelse continue; - const name = h.name[0..h.name_len]; - const value = h.value[0..h.value_len]; - if (strings.hasPrefixComptime(name, ":")) { - if (strings.eqlComptime(name, ":status")) { - status = std.fmt.parseInt(u16, value, 10) catch 0; - } - continue; - } - stream.decoded_headers.appendAssumeCapacity(.{ .name = name, .value = value }); - } - if (status == 0) { - // A second HEADERS block after the final response is trailers - // (RFC 9114 §4.1) and carries no :status; ignore it rather than - // treating the stream as malformed. - if (stream.status_code != 0) return; - stream.session.fail(stream, error.HTTP3ProtocolError); - return; - } - if (status >= 100 and status < 200) return; - stream.status_code = status; - stream.session.deliver(stream, false); -} - -fn onStreamData(s: *quic.Stream, data: [*]const u8, len: c_uint, fin: c_int) callconv(.c) void { - const stream = s.ext(Stream).* orelse return; - if (len > 0) { - bun.handleOom(stream.body_buffer.appendSlice(bun.default_allocator, data[0..len])); - } - stream.session.deliver(stream, fin != 0); -} - -fn onStreamWritable(s: *quic.Stream) callconv(.c) void { - const stream = s.ext(Stream).* orelse return; - ClientSession.drainSendBody(stream, s); -} - -fn onStreamClose(s: *quic.Stream) callconv(.c) void { - const stream = s.ext(Stream).* orelse return; - s.ext(Stream).* = null; - stream.qstream = null; - log("stream_close status={d} delivered={}", .{ stream.status_code, stream.headers_delivered }); - stream.session.deliver(stream, true); -} - +//! +//! Layout mirrors `h2_client/`: +//! - `Stream` — one in-flight request +//! - `ClientSession` — one QUIC connection (pooled per origin) +//! - `ClientContext` — process-global lsquic engine + session registry +//! - `encode` — request header/body framing onto a quic.Stream +//! - `callbacks` — lsquic → Zig glue (on_hsk_done / on_stream_* / …) +//! - `PendingConnect` — DNS-pending connect resolution + +pub const Stream = @import("./h3_client/Stream.zig"); +pub const ClientSession = @import("./h3_client/ClientSession.zig"); +pub const ClientContext = @import("./h3_client/ClientContext.zig"); +pub const PendingConnect = @import("./h3_client/PendingConnect.zig"); +pub const AltSvc = @import("./h3_client/AltSvc.zig"); + +/// Live-object counters for the leak test in fetch-http3-client.test.ts. +/// Incremented at allocation, decremented in deinit. Read from the JS thread +/// via TestingAPIs.quicLiveCounts so they must be atomic. pub var live_sessions = std.atomic.Value(u32).init(0); pub var live_streams = std.atomic.Value(u32).init(0); pub const TestingAPIs = struct { - pub fn liveCounts(globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { + /// Named distinctly from H2's `liveCounts` because generate-js2native.ts + /// mangles `[^A-Za-z]` to `_`, so `H2Client.zig` and `H3Client.zig` produce + /// the same path prefix and the function name has to differ. + pub fn quicLiveCounts(globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { const obj = jsc.JSValue.createEmptyObject(globalThis, 2); obj.put(globalThis, jsc.ZigString.static("sessions"), .jsNumber(live_sessions.load(.monotonic))); obj.put(globalThis, jsc.ZigString.static("streams"), .jsNumber(live_streams.load(.monotonic))); @@ -649,15 +39,7 @@ pub const TestingAPIs = struct { } }; -const log = bun.Output.scoped(.h3_client, .hidden); - const std = @import("std"); const bun = @import("bun"); -const HTTPClient = bun.http; const jsc = bun.jsc; -const picohttp = bun.picohttp; -const strings = bun.strings; - -const uws = bun.uws; -const quic = uws.quic; diff --git a/src/http/h3_client/AltSvc.zig b/src/http/h3_client/AltSvc.zig new file mode 100644 index 00000000000..c1bdf0a507c --- /dev/null +++ b/src/http/h3_client/AltSvc.zig @@ -0,0 +1,160 @@ +//! Alt-Svc (RFC 7838) header handling for the HTTP/3 client. +//! +//! When `--experimental-http3-fetch` / `BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT` +//! is on, `handleResponseMetadata` calls `record()` for every `Alt-Svc` header +//! and `start_()` calls `lookup()` before opening a TCP socket: if the origin +//! previously advertised `h3`, the request is routed onto the QUIC engine +//! instead. The cache is keyed on the *origin* authority (the host:port the +//! request was sent to) and lives only on the HTTP thread, so it needs no +//! locking. +//! +//! Only same-host alternatives (`h3=":port"` with an empty uri-host) are +//! honored; cross-host alternatives need extra certificate-authority checks +//! (RFC 7838 §2.1) that are out of scope here. + +/// One advertised `h3` alternative from an `Alt-Svc` field-value. `port` is +/// the alt-authority port (where QUIC should connect); `ma` is the freshness +/// lifetime in seconds (default 24 h per §3.1). +pub const Entry = struct { + port: u16, + ma: u32 = 86400, +}; + +/// Parse the first usable `h3` alternative out of an `Alt-Svc` field-value, or +/// `null` if none / `clear`. Tolerant of extra whitespace and unknown params. +/// +/// Alt-Svc = clear / 1#alt-value +/// alt-value = protocol-id "=" alt-authority *( OWS ";" OWS parameter ) +/// alt-authority = quoted-string containing [uri-host] ":" port +/// +/// Returns `error.Clear` for the literal `clear` so the caller can drop the +/// cache entry. +pub fn parse(field_value: []const u8) error{Clear}!?Entry { + const value = strings.trim(field_value, " \t"); + if (value.len == 0) return null; + if (std.ascii.eqlIgnoreCase(value, "clear")) return error.Clear; + + var entries = std.mem.splitScalar(u8, value, ','); + while (entries.next()) |raw_entry| { + const entry = strings.trim(raw_entry, " \t"); + if (entry.len == 0) continue; + + var params = std.mem.splitScalar(u8, entry, ';'); + const alternative = strings.trim(params.first(), " \t"); + + const eq = strings.indexOfChar(alternative, '=') orelse continue; + const proto = alternative[0..eq]; + // Only the final IETF "h3" ALPN token; draft `h3-NN` versions are + // ignored since lsquic is built for the final spec. + if (!std.ascii.eqlIgnoreCase(proto, "h3")) continue; + + // alt-authority is a quoted-string: `":443"` or `"host:443"`. + var auth = strings.trim(alternative[eq + 1 ..], " \t"); + if (auth.len >= 2 and auth[0] == '"' and auth[auth.len - 1] == '"') { + auth = auth[1 .. auth.len - 1]; + } + const colon = std.mem.lastIndexOfScalar(u8, auth, ':') orelse continue; + // Same-host alternatives only (empty uri-host). + if (colon != 0) continue; + const port = std.fmt.parseInt(u16, auth[colon + 1 ..], 10) catch continue; + if (port == 0) continue; + + var result: Entry = .{ .port = port }; + while (params.next()) |raw_param| { + const param = strings.trim(raw_param, " \t"); + const peq = strings.indexOfChar(param, '=') orelse continue; + if (std.ascii.eqlIgnoreCase(param[0..peq], "ma")) { + result.ma = std.fmt.parseInt(u32, param[peq + 1 ..], 10) catch result.ma; + } + // `persist` and unknown parameters are ignored (§3.1). + } + return result; + } + return null; +} + +/// HTTP-thread-only Alt-Svc cache. Key is `"hostname:port"` of the origin the +/// header was received from; value is the advertised h3 port + expiry. +const Record = struct { + h3_port: u16, + expires_at: i64, +}; + +var cache: bun.StringHashMapUnmanaged(Record) = .{}; + +/// Hard cap on cached origins. When reached, `record()` first sweeps expired +/// entries and then refuses the new insert if still full — bounded memory for +/// long-lived processes that hit many distinct origins. +const max_entries = 256; + +fn key(buf: []u8, hostname: []const u8, port: u16) []const u8 { + // Callers guard `hostname.len > 256` against a `256+8` buffer, and a u16 + // port is at most 5 digits + ':' — bufPrint cannot overflow. + return std.fmt.bufPrint(buf, "{s}:{d}", .{ hostname, port }) catch unreachable; +} + +fn sweepExpired(now: i64) void { + var it = cache.iterator(); + while (it.next()) |kv| { + if (now >= kv.value_ptr.expires_at) { + const owned = kv.key_ptr.*; + cache.removeByPtr(kv.key_ptr); + bun.default_allocator.free(owned); + // Unmanaged hash-map iteration is not removal-safe; restart. + it = cache.iterator(); + } + } +} + +/// Remember (or refresh / clear) the h3 alternative for `origin_host:origin_port` +/// from a received `Alt-Svc` field-value. Runs on the HTTP thread inside +/// `handleResponseMetadata`. +pub fn record(origin_host: []const u8, origin_port: u16, field_value: []const u8) void { + var buf: [256 + 8]u8 = undefined; + if (origin_host.len > 256) return; + const k = key(&buf, origin_host, origin_port); + + const entry = parse(field_value) catch { + // `clear` + if (cache.fetchRemove(k)) |kv| bun.default_allocator.free(kv.key); + log("alt-svc clear {s}", .{k}); + return; + } orelse return; + + const now = std.time.timestamp(); + if (cache.count() >= max_entries and !cache.contains(k)) { + sweepExpired(now); + if (cache.count() >= max_entries) return; + } + const gop = bun.handleOom(cache.getOrPut(bun.default_allocator, k)); + if (!gop.found_existing) { + gop.key_ptr.* = bun.handleOom(bun.default_allocator.dupe(u8, k)); + } + gop.value_ptr.* = .{ + .h3_port = entry.port, + .expires_at = now + @as(i64, entry.ma), + }; + log("alt-svc h3 {s} -> :{d} ma={d}", .{ k, entry.port, entry.ma }); +} + +/// Look up a previously-advertised h3 alternative for `origin_host:origin_port`. +/// Expired entries are dropped on access. Runs on the HTTP thread inside +/// `start_()`. +pub fn lookup(origin_host: []const u8, origin_port: u16) ?u16 { + var buf: [256 + 8]u8 = undefined; + if (origin_host.len > 256) return null; + const k = key(&buf, origin_host, origin_port); + const rec = cache.get(k) orelse return null; + if (std.time.timestamp() >= rec.expires_at) { + if (cache.fetchRemove(k)) |kv| bun.default_allocator.free(kv.key); + return null; + } + return rec.h3_port; +} + +const log = bun.Output.scoped(.h3_client, .hidden); + +const std = @import("std"); + +const bun = @import("bun"); +const strings = bun.strings; diff --git a/src/http/h3_client/ClientContext.zig b/src/http/h3_client/ClientContext.zig new file mode 100644 index 00000000000..9de27cb3977 --- /dev/null +++ b/src/http/h3_client/ClientContext.zig @@ -0,0 +1,117 @@ +//! Process-global lazily-initialised on the HTTP thread. Owns the lsquic +//! client engine and the live-session registry. Never freed — the engine +//! lives for the process, same as the HTTP thread itself. + +const ClientContext = @This(); + +qctx: *quic.Context, +sessions: std.ArrayListUnmanaged(*ClientSession) = .{}, + +/// One instance per HTTP-thread loop. Stored as a process global only +/// because `bun.http.http_thread` is itself a process singleton — the +/// underlying lsquic engine is bound to the `loop` passed to +/// `quic.Context.createClient` (it lives on `loop->data.quic_head` and is +/// driven by that loop's pre/post hooks), so a second loop would get its +/// own engine; this var would just need to become per-loop storage. +var instance: ?*ClientContext = null; +var lsquic_init_once = bun.once(quic.globalInit); + +pub fn get() ?*ClientContext { + return instance; +} + +pub fn getOrCreate(loop: *uws.Loop) ?*ClientContext { + if (instance) |i| return i; + lsquic_init_once.call(.{}); + const qctx = quic.Context.createClient( + loop, + 0, + @sizeOf(*ClientSession), + @sizeOf(*Stream), + ) orelse return null; + callbacks.register(qctx); + + const self = bun.handleOom(bun.default_allocator.create(ClientContext)); + self.* = .{ .qctx = qctx }; + instance = self; + return self; +} + +/// Find or open a connection to `hostname:port` and queue `client` on it. +pub fn connect(this: *ClientContext, client: *HTTPClient, hostname: []const u8, port: u16) bool { + const reject = client.flags.reject_unauthorized; + for (this.sessions.items) |s| { + if (s.matches(hostname, port, reject) and s.hasHeadroom()) { + log("reuse session {s}:{d}", .{ hostname, port }); + s.enqueue(client); + return true; + } + } + + const host_z = bun.handleOom(bun.default_allocator.dupeZ(u8, hostname)); + const session = ClientSession.new(.{ + .qsocket = null, + .hostname = host_z, + .port = port, + .reject_unauthorized = reject, + }); + _ = H3.live_sessions.fetchAdd(1, .monotonic); + session.registry_index = @intCast(this.sessions.items.len); + bun.handleOom(this.sessions.append(bun.default_allocator, session)); + session.enqueue(client); + + switch (this.qctx.connect(host_z.ptr, port, host_z.ptr, reject, session)) { + .socket => |qs| { + session.qsocket = qs; + qs.ext(ClientSession).* = session; + log("connect {s}:{d} (sync)", .{ hostname, port }); + }, + .pending => |pending| { + log("connect {s}:{d} (dns pending)", .{ hostname, port }); + PendingConnect.register(session, pending, this.qctx.loop()); + }, + .err => { + log("connect {s}:{d} failed", .{ hostname, port }); + this.unregister(session); + PendingConnect.failSession(session, error.ConnectionRefused); + return false; + }, + } + return true; +} + +pub fn unregister(this: *ClientContext, session: *ClientSession) void { + const i = session.registry_index; + if (i >= this.sessions.items.len or this.sessions.items[i] != session) return; + _ = this.sessions.swapRemove(i); + if (i < this.sessions.items.len) this.sessions.items[i].registry_index = i; + session.registry_index = std.math.maxInt(u32); +} + +pub fn abortByHttpId(async_http_id: u32) bool { + const this = instance orelse return false; + for (this.sessions.items) |s| { + if (s.abortByHttpId(async_http_id)) return true; + } + return false; +} + +pub fn streamBodyByHttpId(async_http_id: u32, ended: bool) void { + const this = instance orelse return; + for (this.sessions.items) |s| s.streamBodyByHttpId(async_http_id, ended); +} + +const log = bun.Output.scoped(.h3_client, .hidden); + +const ClientSession = @import("./ClientSession.zig"); +const H3 = @import("../H3Client.zig"); +const PendingConnect = @import("./PendingConnect.zig"); +const Stream = @import("./Stream.zig"); +const callbacks = @import("./callbacks.zig"); +const std = @import("std"); + +const bun = @import("bun"); +const HTTPClient = bun.http; + +const uws = bun.uws; +const quic = uws.quic; diff --git a/src/http/h3_client/ClientSession.zig b/src/http/h3_client/ClientSession.zig new file mode 100644 index 00000000000..2dd79a8c404 --- /dev/null +++ b/src/http/h3_client/ClientSession.zig @@ -0,0 +1,268 @@ +//! One QUIC connection to an origin. Owns its UDP endpoint via quic.c and +//! multiplexes `Stream`s, each bound 1:1 to an `HTTPClient`. The `qsocket` +//! pointer becomes dangling after `callbacks.onConnClose`, so every accessor +//! checks `closed` first. See `src/http/H3Client.zig` for the module-level +//! overview. + +const ClientSession = @This(); + +pub const new = bun.TrivialNew(@This()); + +/// Ref holders: the `ClientContext.sessions` registry while listed (1), the +/// `quic.Socket` ext slot while connected (1, transferred from the registry +/// add via `connect`), and one per entry in `pending`. `PendingConnect` holds +/// an extra ref while DNS is in flight. +const RefCount = bun.ptr.RefCount(@This(), "ref_count", deinit, .{}); +pub const ref = RefCount.ref; +pub const deref = RefCount.deref; + +ref_count: RefCount = .init(), +/// Null while DNS is in flight; set once `us_quic_connect_addr` returns. +qsocket: ?*quic.Socket, +hostname: []const u8, +port: u16, +reject_unauthorized: bool, +handshake_done: bool = false, +closed: bool = false, +registry_index: u32 = std.math.maxInt(u32), + +/// Requests waiting for `onStreamOpen` to hand them a stream. Order is +/// FIFO; `lsquic_conn_make_stream` was already called once per entry. +pending: std.ArrayListUnmanaged(*Stream) = .{}, + +pub fn matches(this: *const ClientSession, hostname: []const u8, port: u16, reject_unauthorized: bool) bool { + return !this.closed and this.port == port and + this.reject_unauthorized == reject_unauthorized and + strings.eqlLong(this.hostname, hostname, true); +} + +pub fn hasHeadroom(this: *const ClientSession) bool { + if (this.closed) return false; + const qs = this.qsocket orelse return this.pending.items.len < 64; + // After handshake every pending entry has had make_stream called, so + // lsquic's n_avail_streams already accounts for them — comparing + // against pending.len would double-subtract. Before handshake nothing + // is counted yet, so cap optimistically at the default MAX_STREAMS. + if (!this.handshake_done) return this.pending.items.len < 64; + return qs.streamsAvail() > 0; +} + +/// Queue `client` for a stream on this connection. The lsquic stream is +/// created asynchronously, so the request goes into `pending` until +/// `onStreamOpen` pops it. +pub fn enqueue(this: *ClientSession, client: *HTTPClient) void { + bun.debugAssert(!this.closed); + client.h3 = null; + client.flags.protocol = .http3; + client.allow_retry = false; + + const stream = Stream.new(.{ .session = this, .client = client }); + _ = H3.live_streams.fetchAdd(1, .monotonic); + client.h3 = stream; + bun.handleOom(this.pending.append(bun.default_allocator, stream)); + this.ref(); + + if (this.handshake_done) { + this.qsocket.?.makeStream(); + } +} + +pub fn streamBodyByHttpId(this: *ClientSession, async_http_id: u32, ended: bool) void { + for (this.pending.items) |stream| { + const client = stream.client orelse continue; + if (client.async_http_id != async_http_id) continue; + if (client.state.original_request_body != .stream) return; + client.state.original_request_body.stream.ended = ended; + if (stream.qstream) |qs| encode.drainSendBody(stream, qs); + return; + } +} + +pub fn detach(this: *ClientSession, stream: *Stream) void { + if (stream.client) |cl| cl.h3 = null; + stream.client = null; + if (stream.qstream) |qs| { + qs.ext(Stream).* = null; + // The success path can reach here while the request body is still + // being written (server responded early). FIN would be a + // content-length violation; RESET_STREAM(H3_REQUEST_CANCELLED) + // is the correct "I'm abandoning this send half" so lsquic reaps + // the stream instead of leaking it on the pooled session. + if (!stream.request_body_done) qs.reset(); + } + stream.qstream = null; + if (std.mem.indexOfScalar(*Stream, this.pending.items, stream)) |i| { + _ = this.pending.orderedRemove(i); + } + stream.deinit(); + this.deref(); +} + +pub fn fail(this: *ClientSession, stream: *Stream, err: anyerror) void { + const client = stream.client; + stream.abort(); + this.detach(stream); + if (client) |cl| cl.failFromH2(err); +} + +/// A stream closed before any response headers arrived. If the request +/// hasn't been retried yet and the body wasn't a JS stream (which may +/// already be consumed), re-enqueue it on a fresh session — this is the +/// standard h2/h3 client behavior for the GOAWAY / stateless-reset / +/// port-reuse race where a pooled session goes stale between the +/// `matches()` check and the first stream open. +pub fn retryOrFail(this: *ClientSession, stream: *Stream, err: anyerror) void { + const client = stream.client orelse return this.fail(stream, err); + if (client.flags.h3_retried or stream.is_streaming_body) { + return this.fail(stream, err); + } + const ctx = ClientContext.get() orelse return this.fail(stream, err); + client.flags.h3_retried = true; + // The old session is dead from our perspective; make sure connect() + // can't pick it again. + this.closed = true; + const port = this.port; + const host = bun.handleOom(bun.default_allocator.dupe(u8, this.hostname)); + defer bun.default_allocator.free(host); + log("retry {s}:{d} after {s}", .{ host, port, @errorName(err) }); + stream.abort(); + this.detach(stream); + if (!ctx.connect(client, host, port)) { + client.failFromH2(err); + } +} + +pub fn abortByHttpId(this: *ClientSession, async_http_id: u32) bool { + for (this.pending.items) |stream| { + const cl = stream.client orelse continue; + if (cl.async_http_id == async_http_id) { + this.fail(stream, error.Aborted); + return true; + } + } + return false; +} + +/// Runs from inside lsquic's process_conns via on_stream_{headers,data,close}. +/// `done` = the lsquic stream is gone; deliver whatever is buffered then +/// detach. Mirrors H2's `ClientSession.deliverStream` so the HTTPClient state +/// machine sees the same call sequence regardless of transport. +pub fn deliver(this: *ClientSession, stream: *Stream, done: bool) void { + const client = stream.client orelse { + if (done) this.detach(stream); + return; + }; + + if (client.signals.get(.aborted)) { + return this.fail(stream, error.Aborted); + } + + if (stream.status_code != 0 and !stream.headers_delivered) { + stream.headers_delivered = true; + const result = this.applyHeaders(stream, client) catch |err| { + return this.fail(stream, err); + }; + if (result == .finished or (done and stream.body_buffer.items.len == 0)) { + if (client.state.flags.is_redirect_pending) { + this.detach(stream); + return client.doRedirectH3(); + } + client.cloneMetadata(); + client.state.flags.received_last_chunk = true; + if (result == .finished) client.state.content_length = 0; + this.detach(stream); + return finish(client); + } + client.cloneMetadata(); + if (client.signals.get(.header_progress)) client.progressUpdateH3(); + } + + if (client.state.response_stage != .body) { + if (done) { + // Stream closed before headers — handshake/reset failure. + return this.retryOrFail(stream, if (stream.status_code == 0) + error.HTTP3StreamReset + else + error.ConnectionClosed); + } + return; + } + + if (stream.body_buffer.items.len > 0) { + if (done) { + client.state.flags.received_last_chunk = true; + } + const report = client.handleResponseBody(stream.body_buffer.items, false) catch |err| { + stream.body_buffer.clearRetainingCapacity(); + return this.fail(stream, err); + }; + stream.body_buffer.clearRetainingCapacity(); + if (done) { + this.detach(stream); + return finish(client); + } + if (report) { + if (client.state.isDone()) { + this.detach(stream); + return client.progressUpdateH3(); + } + client.progressUpdateH3(); + } + return; + } + + if (done) { + this.detach(stream); + client.state.flags.received_last_chunk = true; + return finish(client); + } +} + +fn applyHeaders(_: *ClientSession, stream: *Stream, client: *HTTPClient) !HeaderResult { + var response = picohttp.Response{ + .minor_version = 0, + .status_code = stream.status_code, + .status = "", + .headers = .{ .list = stream.decoded_headers.items }, + .bytes_read = 0, + }; + client.state.pending_response = response; + const should_continue = try client.handleResponseMetadata(&response); + client.state.pending_response = response; + client.state.transfer_encoding = .identity; + if (client.state.response_stage == .body_chunk) client.state.response_stage = .body; + client.state.flags.allow_keepalive = true; + return if (should_continue == .finished) .finished else .has_body; +} + +fn finish(client: *HTTPClient) void { + if (client.state.content_length) |cl| { + if (client.state.total_body_received != cl) { + return client.failFromH2(error.HTTP3ContentLengthMismatch); + } + } + client.progressUpdateH3(); +} + +fn deinit(this: *ClientSession) void { + bun.debugAssert(this.pending.items.len == 0); + this.pending.deinit(bun.default_allocator); + bun.default_allocator.free(this.hostname); + bun.destroy(this); +} + +const HeaderResult = enum { has_body, finished }; + +const log = bun.Output.scoped(.h3_client, .hidden); + +const ClientContext = @import("./ClientContext.zig"); +const H3 = @import("../H3Client.zig"); +const Stream = @import("./Stream.zig"); +const encode = @import("./encode.zig"); +const std = @import("std"); + +const bun = @import("bun"); +const HTTPClient = bun.http; +const picohttp = bun.picohttp; +const strings = bun.strings; +const quic = bun.uws.quic; diff --git a/src/http/h3/PendingConnect.zig b/src/http/h3_client/PendingConnect.zig similarity index 100% rename from src/http/h3/PendingConnect.zig rename to src/http/h3_client/PendingConnect.zig diff --git a/src/http/h3_client/Stream.zig b/src/http/h3_client/Stream.zig new file mode 100644 index 00000000000..7179f551af4 --- /dev/null +++ b/src/http/h3_client/Stream.zig @@ -0,0 +1,46 @@ +//! One in-flight HTTP/3 request. Created when the request is enqueued on a +//! `ClientSession`; the lsquic stream is bound later from +//! `callbacks.onStreamOpen` (lsquic creates streams asynchronously once +//! MAX_STREAMS credit is available). Owned by the session's `pending` list +//! until `ClientSession.detach`. + +const Stream = @This(); + +pub const new = bun.TrivialNew(@This()); + +session: *ClientSession, +client: ?*HTTPClient, +qstream: ?*quic.Stream = null, + +/// Slices into the lsquic-owned hset buffer; valid only for the duration +/// of the `onStreamHeaders` callback that populated it. `cloneMetadata` +/// deep-copies synchronously inside that callback, so nothing reads these +/// after they go stale. +decoded_headers: std.ArrayListUnmanaged(picohttp.Header) = .{}, +body_buffer: std.ArrayListUnmanaged(u8) = .{}, +status_code: u16 = 0, + +pending_body: []const u8 = "", +request_body_done: bool = false, +is_streaming_body: bool = false, +headers_delivered: bool = false, + +pub fn deinit(this: *Stream) void { + this.decoded_headers.deinit(bun.default_allocator); + this.body_buffer.deinit(bun.default_allocator); + _ = H3.live_streams.fetchSub(1, .monotonic); + bun.destroy(this); +} + +pub fn abort(this: *Stream) void { + if (this.qstream) |qs| qs.close(); +} + +const ClientSession = @import("./ClientSession.zig"); +const H3 = @import("../H3Client.zig"); +const std = @import("std"); + +const bun = @import("bun"); +const HTTPClient = bun.http; +const picohttp = bun.picohttp; +const quic = bun.uws.quic; diff --git a/src/http/h3_client/callbacks.zig b/src/http/h3_client/callbacks.zig new file mode 100644 index 00000000000..4e81d561ae1 --- /dev/null +++ b/src/http/h3_client/callbacks.zig @@ -0,0 +1,151 @@ +//! lsquic → Zig callbacks for the HTTP/3 client. Registered on the +//! `quic.Context` from `ClientContext.getOrCreate`; lsquic invokes these from +//! inside `process_conns` on the HTTP thread. Each one resolves the +//! `ClientSession` / `Stream` from the ext slot and forwards into the +//! corresponding session/stream method so the protocol logic stays in +//! `ClientSession.zig` / `encode.zig`. + +pub fn register(qctx: *quic.Context) void { + qctx.onHskDone(onHskDone); + qctx.onGoaway(onGoaway); + qctx.onClose(onConnClose); + qctx.onStreamOpen(onStreamOpen); + qctx.onStreamHeaders(onStreamHeaders); + qctx.onStreamData(onStreamData); + qctx.onStreamWritable(onStreamWritable); + qctx.onStreamClose(onStreamClose); +} + +fn onHskDone(qs: *quic.Socket, ok: c_int) callconv(.c) void { + const session = qs.ext(ClientSession).* orelse return; + log("hsk_done ok={d} pending={d}", .{ ok, session.pending.items.len }); + if (ok == 0) { + session.closed = true; + return; + } + session.handshake_done = true; + for (session.pending.items) |_| qs.makeStream(); +} + +/// Peer sent GOAWAY: this connection won't accept new streams (RFC 9114 +/// §5.2). Mark the session unusable now so the next `connect()` opens a fresh +/// one instead of waiting for `onConnClose`, which only fires after lsquic's +/// draining period. Stay in the registry so abort/body-chunk lookups still +/// reach in-flight streams; `onConnClose` does the actual unregister/deref. +fn onGoaway(qs: *quic.Socket) callconv(.c) void { + const session = qs.ext(ClientSession).* orelse return; + log("goaway {s}:{d}", .{ session.hostname, session.port }); + session.closed = true; +} + +fn onConnClose(qs: *quic.Socket) callconv(.c) void { + const session = qs.ext(ClientSession).* orelse return; + session.closed = true; + session.qsocket = null; + var buf: [256]u8 = [_]u8{0} ** 256; + const st = qs.status(&buf); + log("conn_close status={d} '{s}'", .{ st, std.mem.sliceTo(&buf, 0) }); + if (ClientContext.get()) |ctx| ctx.unregister(session); + while (session.pending.items.len > 0) { + // lsquic fires on_stream_close for every bound stream before + // on_conn_closed, so anything still here never got a qstream. + const stream = session.pending.items[0]; + bun.debugAssert(stream.qstream == null); + session.retryOrFail(stream, if (session.handshake_done) + error.ConnectionClosed + else + error.HTTP3HandshakeFailed); + } + _ = H3.live_sessions.fetchSub(1, .monotonic); + session.deref(); +} + +fn onStreamOpen(s: *quic.Stream, is_client: c_int) callconv(.c) void { + s.ext(Stream).* = null; + if (is_client == 0) return; + const qs = s.socket() orelse return; + const session = qs.ext(ClientSession).* orelse { + s.close(); + return; + }; + // Bind the next pending request to this stream. + const stream: *Stream = for (session.pending.items) |st| { + if (st.qstream == null) break st; + } else { + s.close(); + return; + }; + stream.qstream = s; + s.ext(Stream).* = stream; + log("stream_open", .{}); + encode.writeRequest(session, stream, s) catch |err| { + session.fail(stream, err); + }; +} + +fn onStreamHeaders(s: *quic.Stream) callconv(.c) void { + const stream = s.ext(Stream).* orelse return; + const n = s.headerCount(); + + stream.decoded_headers.clearRetainingCapacity(); + bun.handleOom(stream.decoded_headers.ensureTotalCapacity(bun.default_allocator, n)); + var status: u16 = 0; + var i: c_uint = 0; + while (i < n) : (i += 1) { + const h = s.header(i) orelse continue; + const name = h.name[0..h.name_len]; + const value = h.value[0..h.value_len]; + if (strings.hasPrefixComptime(name, ":")) { + if (strings.eqlComptime(name, ":status")) { + status = std.fmt.parseInt(u16, value, 10) catch 0; + } + continue; + } + stream.decoded_headers.appendAssumeCapacity(.{ .name = name, .value = value }); + } + if (status == 0) { + // A second HEADERS block after the final response is trailers + // (RFC 9114 §4.1) and carries no :status; ignore it rather than + // treating the stream as malformed. + if (stream.status_code != 0) return; + stream.session.fail(stream, error.HTTP3ProtocolError); + return; + } + if (status >= 100 and status < 200) return; + stream.status_code = status; + stream.session.deliver(stream, false); +} + +fn onStreamData(s: *quic.Stream, data: [*]const u8, len: c_uint, fin: c_int) callconv(.c) void { + const stream = s.ext(Stream).* orelse return; + if (len > 0) { + bun.handleOom(stream.body_buffer.appendSlice(bun.default_allocator, data[0..len])); + } + stream.session.deliver(stream, fin != 0); +} + +fn onStreamWritable(s: *quic.Stream) callconv(.c) void { + const stream = s.ext(Stream).* orelse return; + encode.drainSendBody(stream, s); +} + +fn onStreamClose(s: *quic.Stream) callconv(.c) void { + const stream = s.ext(Stream).* orelse return; + s.ext(Stream).* = null; + stream.qstream = null; + log("stream_close status={d} delivered={}", .{ stream.status_code, stream.headers_delivered }); + stream.session.deliver(stream, true); +} + +const log = bun.Output.scoped(.h3_client, .hidden); + +const ClientContext = @import("./ClientContext.zig"); +const ClientSession = @import("./ClientSession.zig"); +const H3 = @import("../H3Client.zig"); +const Stream = @import("./Stream.zig"); +const encode = @import("./encode.zig"); +const std = @import("std"); + +const bun = @import("bun"); +const strings = bun.strings; +const quic = bun.uws.quic; diff --git a/src/http/h3_client/encode.zig b/src/http/h3_client/encode.zig new file mode 100644 index 00000000000..29e9846bfb9 --- /dev/null +++ b/src/http/h3_client/encode.zig @@ -0,0 +1,132 @@ +//! Request-side framing for the HTTP/3 client: build the QPACK header list +//! from `HTTPClient.buildRequest` and drain the request body (inline bytes or +//! a JS streaming sink) onto the lsquic stream. Mirrors `h2_client/encode.zig`. + +/// Build pseudo-headers + user headers and send them on `qs`, then kick off +/// body transmission. Called from `callbacks.onStreamOpen` once lsquic hands +/// us a stream for a pending request. +pub fn writeRequest(session: *ClientSession, stream: *Stream, qs: *quic.Stream) !void { + const client = stream.client orelse return error.Aborted; + const request = client.buildRequest(client.state.original_request_body.len()); + if (client.verbose != .none) { + HTTPClient.printRequest(.http3, request, client.url.href, !client.flags.reject_unauthorized, client.state.request_body, client.verbose == .curl); + } + + var sfa = std.heap.stackFallback(2048, bun.default_allocator); + const alloc = sfa.get(); + var headers: std.ArrayListUnmanaged(quic.Header) = .{}; + defer headers.deinit(alloc); + try headers.ensureTotalCapacityPrecise(alloc, request.headers.len + 4); + + // Names not in the QPACK static table get lowercased into one + // pre-sized buffer so the pointers stay stable across the batch. + var name_bytes: usize = 0; + for (request.headers) |h| name_bytes += h.name.len; + const lower = try alloc.alloc(u8, name_bytes); + defer alloc.free(lower); + var lower_len: usize = 0; + + var authority: []const u8 = client.url.host; + headers.items.len = 4; + for (request.headers) |h| { + if (quic.Qpack.classify(h.name)) |class| switch (class) { + .forbidden => {}, + .host => authority = h.value, + .indexed => |i| headers.appendAssumeCapacity(.init(i.name, h.value, i.index)), + } else { + const dst = lower[lower_len..][0..h.name.len]; + _ = strings.copyLowercase(h.name, dst); + lower_len += h.name.len; + headers.appendAssumeCapacity(.init(dst, h.value, null)); + } + } + if (authority.len == 0) authority = session.hostname; + headers.items[0] = .init(":method", request.method, .method_get); + headers.items[1] = .init(":scheme", "https", .scheme_https); + headers.items[2] = .init(":authority", authority, .authority); + headers.items[3] = .init(":path", if (request.path.len > 0) request.path else "/", .path); + + const body = client.state.request_body; + const has_inline_body = client.state.original_request_body == .bytes and body.len > 0; + const is_streaming = client.state.original_request_body == .stream; + + const end_stream = !has_inline_body and !is_streaming; + if (qs.sendHeaders(headers.items, end_stream) != 0) { + return error.HTTP3HeaderEncodingError; + } + + if (has_inline_body) { + stream.pending_body = body; + drainSendBody(stream, qs); + } else if (is_streaming) { + stream.is_streaming_body = true; + drainSendBody(stream, qs); + } else { + stream.request_body_done = true; + } + + client.state.request_stage = if (stream.request_body_done) .done else .body; + client.state.response_stage = .headers; + + // For streaming bodies the JS sink waits for can_stream to start + // pumping; report progress now so it begins. + if (is_streaming) client.progressUpdateH3(); +} + +/// Push as much of the request body onto `qs` as flow control allows. Called +/// from `writeRequest`, `callbacks.onStreamWritable`, and +/// `ClientSession.streamBodyByHttpId` (when the JS sink delivers more bytes). +pub fn drainSendBody(stream: *Stream, qs: *quic.Stream) void { + if (stream.request_body_done) return; + const client = stream.client orelse return; + + if (stream.is_streaming_body) { + const body = &client.state.original_request_body.stream; + const sb = body.buffer orelse return; + const buffer = sb.acquire(); + const data = buffer.slice(); + var written: usize = 0; + while (written < data.len) { + const w = qs.write(data[written..]); + if (w <= 0) break; + written += @intCast(w); + } + buffer.cursor += written; + const drained = buffer.isEmpty(); + if (drained) buffer.reset(); + if (drained and body.ended) { + stream.request_body_done = true; + qs.shutdown(); + client.state.request_stage = .done; + } else if (!drained) { + qs.wantWrite(true); + } else if (data.len > 0) { + sb.reportDrain(); + } + sb.release(); + if (stream.request_body_done) body.detach(); + return; + } + + while (stream.pending_body.len > 0) { + const w = qs.write(stream.pending_body); + if (w <= 0) break; + stream.pending_body = stream.pending_body[@intCast(w)..]; + } + if (stream.pending_body.len == 0) { + stream.request_body_done = true; + qs.shutdown(); + client.state.request_stage = .done; + } else { + qs.wantWrite(true); + } +} + +const ClientSession = @import("./ClientSession.zig"); +const Stream = @import("./Stream.zig"); +const std = @import("std"); + +const bun = @import("bun"); +const HTTPClient = bun.http; +const strings = bun.strings; +const quic = bun.uws.quic; diff --git a/src/js/internal-for-testing.ts b/src/js/internal-for-testing.ts index 429019ee055..15bba5d839b 100644 --- a/src/js/internal-for-testing.ts +++ b/src/js/internal-for-testing.ts @@ -259,3 +259,10 @@ export const fetchH2Internals = { streams: number; }, }; + +export const fetchH3Internals = { + liveCounts: $newZigFunction("http/H3Client.zig", "TestingAPIs.quicLiveCounts", 0) as () => { + sessions: number; + streams: number; + }, +}; diff --git a/test/js/web/fetch/fetch-http3-client.test.ts b/test/js/web/fetch/fetch-http3-client.test.ts index 4f0e493404d..9c719e9454a 100644 --- a/test/js/web/fetch/fetch-http3-client.test.ts +++ b/test/js/web/fetch/fetch-http3-client.test.ts @@ -1,6 +1,6 @@ import { gzipSync, type Server } from "bun"; import { afterAll, beforeAll, describe, expect, test } from "bun:test"; -import { tls } from "harness"; +import { bunEnv, bunExe, tempDir, tls } from "harness"; // In-process server with `h1: false` so the build under test binds UDP only. // A fetch that silently fell back to HTTP/1.1 would get ECONNREFUSED, which @@ -577,3 +577,74 @@ test("retries on a fresh session when a pooled session is stale (port reuse)", a void b.stop(true); } }); + +// Subprocess so the experimental flag is process-scoped and the in-process +// server above (h1: false) doesn't interfere — this server keeps h1 on so the +// first fetch goes over TCP and reads Alt-Svc. +describe("Alt-Svc upgrade (--experimental-http3-fetch)", () => { + // The fixture starts a server with both h1+h3 listening on the same port, + // does two fetches with no `protocol:` hint, and prints the alt-svc header + // plus the live h3 session count after each. With the flag on, fetch #1 + // goes over h1 (sessions=0) and records Alt-Svc; fetch #2 goes over QUIC + // (sessions=1). + const fixture = ` + import { fetchH3Internals } from "bun:internal-for-testing"; + const { liveCounts } = fetchH3Internals; + using server = Bun.serve({ + port: 0, + tls: ${JSON.stringify(tls)}, + h3: true, + fetch: () => new Response("ok"), + }); + const url = "https://127.0.0.1:" + server.port + "/"; + const opts = { tls: { rejectUnauthorized: false } }; + { + const r = await fetch(url, opts); + await r.text(); + console.log("first alt-svc=%s sessions=%d", r.headers.get("alt-svc") ?? "", liveCounts().sessions); + } + { + const r = await fetch(url, opts); + await r.text(); + console.log("second status=%d sessions=%d", r.status, liveCounts().sessions); + } + `; + + async function run(extra: { env?: Record; args?: string[] }) { + using dir = tempDir("h3-altsvc", { "fixture.ts": fixture }); + await using proc = Bun.spawn({ + cmd: [bunExe(), ...(extra.args ?? []), "fixture.ts"], + env: { ...bunEnv, ...extra.env }, + cwd: String(dir), + stderr: "pipe", + }); + const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]); + return { stdout, stderr, exitCode }; + } + + test("env var: BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT=1", async () => { + const { stdout, stderr, exitCode } = await run({ env: { BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT: "1" } }); + expect(stderr).toBe(""); + expect(stdout).toMatch(/^first alt-svc=h3=":\d+"; ma=\d+ sessions=0\n/); + expect(stdout).toMatch(/second status=200 sessions=1\n$/); + expect(exitCode).toBe(0); + }); + + test("CLI flag: --experimental-http3-fetch", async () => { + const { stdout, stderr, exitCode } = await run({ args: ["--experimental-http3-fetch"] }); + expect(stderr).toBe(""); + expect(stdout).toMatch(/^first alt-svc=h3=":\d+"; ma=\d+ sessions=0\n/); + expect(stdout).toMatch(/second status=200 sessions=1\n$/); + expect(exitCode).toBe(0); + }); + + test("off by default: both fetches stay on h1", async () => { + const { stdout, stderr, exitCode } = await run({}); + expect(stderr).toBe(""); + // Alt-Svc header is still emitted by the server, but the client never + // records it and never opens a QUIC session. + expect(stdout).toMatch(/^first alt-svc=h3=":\d+"; ma=\d+ sessions=0\n/); + expect(stdout).toMatch(/second status=200 sessions=0\n$/); + expect(exitCode).toBe(0); + }); +});