From 54a5cb6f9b95a462abdc1aec0ff5d8940b1c6228 Mon Sep 17 00:00:00 2001 From: "Node.js GitHub Bot" Date: Sun, 3 Apr 2022 00:23:31 +0000 Subject: [PATCH] deps: update undici to 5.0.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-URL: https://github.com/nodejs/node/pull/42583 Reviewed-By: Michaël Zasso Reviewed-By: Matteo Collina Reviewed-By: Robert Nagy Reviewed-By: Mohammed Keyvanzadeh Reviewed-By: Mestery Reviewed-By: James M Snell Reviewed-By: Darshan Sen --- deps/undici/src/docs/api/MockAgent.md | 54 +- deps/undici/src/docs/api/MockClient.md | 5 +- deps/undici/src/docs/api/MockPool.md | 6 +- deps/undici/src/lib/agent.js | 128 +- deps/undici/src/lib/client.js | 218 +--- deps/undici/src/lib/core/symbols.js | 3 + deps/undici/src/lib/dispatcher-base.js | 159 +++ deps/undici/src/lib/fetch/constants.js | 3 + deps/undici/src/lib/fetch/headers.js | 7 +- deps/undici/src/lib/fetch/index.js | 1024 ++++++++------- deps/undici/src/lib/fetch/request.js | 26 +- deps/undici/src/lib/fetch/response.js | 109 +- deps/undici/src/lib/fetch/util.js | 28 +- deps/undici/src/lib/mock/mock-interceptor.js | 2 +- deps/undici/src/lib/mock/mock-utils.js | 8 + deps/undici/src/lib/pool-base.js | 120 +- deps/undici/src/lib/proxy-agent.js | 12 +- deps/undici/src/package.json | 6 +- deps/undici/src/types/mock-interceptor.d.ts | 4 +- deps/undici/undici.js | 1192 +++++++++--------- 20 files changed, 1576 insertions(+), 1538 deletions(-) create mode 100644 deps/undici/src/lib/dispatcher-base.js diff --git a/deps/undici/src/docs/api/MockAgent.md b/deps/undici/src/docs/api/MockAgent.md index 4500ef13c6bda6..f94ae339f963d0 100644 --- a/deps/undici/src/docs/api/MockAgent.md +++ b/deps/undici/src/docs/api/MockAgent.md @@ -72,11 +72,7 @@ const mockAgent = new MockAgent() setGlobalDispatcher(mockAgent) const mockPool = mockAgent.get('http://localhost:3000') - -mockPool.intercept({ - path: '/foo', - method: 'GET' -}).reply(200, 'foo') +mockPool.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, body } = await request('http://localhost:3000/foo') @@ -95,11 +91,7 @@ import { MockAgent, request } from 'undici' const mockAgent = new MockAgent() const mockPool = mockAgent.get('http://localhost:3000') - -mockPool.intercept({ - path: '/foo', - method: 'GET' -}).reply(200, 'foo') +mockPool.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, @@ -121,11 +113,7 @@ import { MockAgent, request } from 'undici' const mockAgent = new MockAgent() const mockPool = mockAgent.get('http://localhost:3000') - -mockPool.intercept({ - path: '/foo', - method: 'GET' -}).reply(200, 'foo') +mockPool.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, @@ -147,11 +135,7 @@ import { MockAgent, request } from 'undici' const mockAgent = new MockAgent({ connections: 1 }) const mockClient = mockAgent.get('http://localhost:3000') - -mockClient.intercept({ - path: '/foo', - method: 'GET' -}).reply(200, 'foo') +mockClient.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, @@ -174,16 +158,8 @@ const mockAgent = new MockAgent() setGlobalDispatcher(mockAgent) const mockPool = mockAgent.get('http://localhost:3000') - -mockPool.intercept({ - path: '/foo', - method: 'GET' -}).reply(200, 'foo') - -mockPool.intercept({ - path: '/hello', - method: 'GET' -}).reply(200, 'hello') +mockPool.intercept({ path: '/foo' }).reply(200, 'foo') +mockPool.intercept({ path: '/hello'}).reply(200, 'hello') const result1 = await request('http://localhost:3000/foo') @@ -250,11 +226,7 @@ const mockAgent = new MockAgent() setGlobalDispatcher(mockAgent) const mockPool = mockAgent.get(new RegExp('http://localhost:3000')) - -mockPool.intercept({ - path: '/foo', - method: 'GET', -}).reply(200, 'foo') +mockPool.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, @@ -277,11 +249,7 @@ const mockAgent = new MockAgent() setGlobalDispatcher(mockAgent) const mockPool = mockAgent.get((origin) => origin === 'http://localhost:3000') - -mockPool.intercept({ - path: '/foo', - method: 'GET' -}).reply(200, 'foo') +mockPool.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, @@ -328,11 +296,7 @@ import { MockAgent } from 'undici' const mockAgent = new MockAgent() const mockPool = mockAgent.get('http://localhost:3000') - -mockPool.intercept({ - path: '/foo', - method: 'GET' -}).reply(200, 'foo') +mockPool.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, diff --git a/deps/undici/src/docs/api/MockClient.md b/deps/undici/src/docs/api/MockClient.md index de73b24702e0ee..ac546913d237d5 100644 --- a/deps/undici/src/docs/api/MockClient.md +++ b/deps/undici/src/docs/api/MockClient.md @@ -58,10 +58,7 @@ import { MockAgent } from 'undici' const mockAgent = new MockAgent({ connections: 1 }) const mockClient = mockAgent.get('http://localhost:3000') -mockClient.intercept({ - path: '/foo', - method: 'GET', -}).reply(200, 'foo') +mockClient.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, diff --git a/deps/undici/src/docs/api/MockPool.md b/deps/undici/src/docs/api/MockPool.md index df861255941868..29667f3177b51e 100644 --- a/deps/undici/src/docs/api/MockPool.md +++ b/deps/undici/src/docs/api/MockPool.md @@ -95,11 +95,7 @@ setGlobalDispatcher(mockAgent) // MockPool const mockPool = mockAgent.get('http://localhost:3000') - -mockPool.intercept({ - path: '/foo', - method: 'GET', -}).reply(200, 'foo') +mockPool.intercept({ path: '/foo' }).reply(200, 'foo') const { statusCode, diff --git a/deps/undici/src/lib/agent.js b/deps/undici/src/lib/agent.js index 30ac4ee1181e9b..47aa2365e61a34 100644 --- a/deps/undici/src/lib/agent.js +++ b/deps/undici/src/lib/agent.js @@ -1,20 +1,14 @@ 'use strict' -const { - ClientClosedError, - InvalidArgumentError, - ClientDestroyedError -} = require('./core/errors') -const { kClients, kRunning } = require('./core/symbols') -const Dispatcher = require('./dispatcher') +const { InvalidArgumentError } = require('./core/errors') +const { kClients, kRunning, kClose, kDestroy, kDispatch } = require('./core/symbols') +const DispatcherBase = require('./dispatcher-base') const Pool = require('./pool') const Client = require('./client') const util = require('./core/util') const RedirectHandler = require('./handler/redirect') const { WeakRef, FinalizationRegistry } = require('./compat/dispatcher-weakref')() -const kDestroyed = Symbol('destroyed') -const kClosed = Symbol('closed') const kOnConnect = Symbol('onConnect') const kOnDisconnect = Symbol('onDisconnect') const kOnConnectionError = Symbol('onConnectionError') @@ -30,7 +24,7 @@ function defaultFactory (origin, opts) { : new Pool(origin, opts) } -class Agent extends Dispatcher { +class Agent extends DispatcherBase { constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) { super() @@ -60,8 +54,6 @@ class Agent extends Dispatcher { this[kClients].delete(key) } }) - this[kClosed] = false - this[kDestroyed] = false const agent = this @@ -94,76 +86,38 @@ class Agent extends Dispatcher { return ret } - dispatch (opts, handler) { - if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler must be an object.') + [kDispatch] (opts, handler) { + let key + if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) { + key = String(opts.origin) + } else { + throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.') } - try { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('opts must be an object.') - } - - let key - if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) { - key = String(opts.origin) - } else { - throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.') - } - - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (this[kClosed]) { - throw new ClientClosedError() - } + const ref = this[kClients].get(key) - const ref = this[kClients].get(key) - - let dispatcher = ref ? ref.deref() : null - if (!dispatcher) { - dispatcher = this[kFactory](opts.origin, this[kOptions]) - .on('drain', this[kOnDrain]) - .on('connect', this[kOnConnect]) - .on('disconnect', this[kOnDisconnect]) - .on('connectionError', this[kOnConnectionError]) - - this[kClients].set(key, new WeakRef(dispatcher)) - this[kFinalizer].register(dispatcher, key) - } - - const { maxRedirections = this[kMaxRedirections] } = opts - if (maxRedirections != null && maxRedirections !== 0) { - opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting. - handler = new RedirectHandler(this, maxRedirections, opts, handler) - } - - return dispatcher.dispatch(opts, handler) - } catch (err) { - if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') - } + let dispatcher = ref ? ref.deref() : null + if (!dispatcher) { + dispatcher = this[kFactory](opts.origin, this[kOptions]) + .on('drain', this[kOnDrain]) + .on('connect', this[kOnConnect]) + .on('disconnect', this[kOnDisconnect]) + .on('connectionError', this[kOnConnectionError]) - handler.onError(err) + this[kClients].set(key, new WeakRef(dispatcher)) + this[kFinalizer].register(dispatcher, key) } - } - - get closed () { - return this[kClosed] - } - - get destroyed () { - return this[kDestroyed] - } - close (callback) { - if (callback != null && typeof callback !== 'function') { - throw new InvalidArgumentError('callback must be a function') + const { maxRedirections = this[kMaxRedirections] } = opts + if (maxRedirections != null && maxRedirections !== 0) { + opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting. + handler = new RedirectHandler(this, maxRedirections, opts, handler) } - this[kClosed] = true + return dispatcher.dispatch(opts, handler) + } + async [kClose] () { const closePromises = [] for (const ref of this[kClients].values()) { const client = ref.deref() @@ -173,27 +127,10 @@ class Agent extends Dispatcher { } } - if (!callback) { - return Promise.all(closePromises) - } - - // Should never error. - Promise.all(closePromises).then(() => process.nextTick(callback)) + await Promise.all(closePromises) } - destroy (err, callback) { - if (typeof err === 'function') { - callback = err - err = null - } - - if (callback != null && typeof callback !== 'function') { - throw new InvalidArgumentError('callback must be a function') - } - - this[kClosed] = true - this[kDestroyed] = true - + async [kDestroy] (err) { const destroyPromises = [] for (const ref of this[kClients].values()) { const client = ref.deref() @@ -203,12 +140,7 @@ class Agent extends Dispatcher { } } - if (!callback) { - return Promise.all(destroyPromises) - } - - // Should never error. - Promise.all(destroyPromises).then(() => process.nextTick(callback)) + await Promise.all(destroyPromises) } } diff --git a/deps/undici/src/lib/client.js b/deps/undici/src/lib/client.js index 55d9afabf956bd..d3d4cfc705d635 100644 --- a/deps/undici/src/lib/client.js +++ b/deps/undici/src/lib/client.js @@ -6,7 +6,7 @@ const assert = require('assert') const net = require('net') const util = require('./core/util') const Request = require('./core/request') -const Dispatcher = require('./dispatcher') +const DispatcherBase = require('./dispatcher-base') const RedirectHandler = require('./handler/redirect') const { RequestContentLengthMismatchError, @@ -16,8 +16,6 @@ const { RequestAbortedError, HeadersTimeoutError, HeadersOverflowError, - ClientDestroyedError, - ClientClosedError, SocketError, InformationalError, BodyTimeoutError, @@ -45,12 +43,9 @@ const { kNoRef, kKeepAliveDefaultTimeout, kHostHeader, - kClosed, - kDestroyed, kPendingIdx, kRunningIdx, kError, - kOnDestroyed, kPipelining, kSocket, kKeepAliveTimeoutValue, @@ -63,9 +58,14 @@ const { kConnector, kMaxRedirections, kMaxRequests, - kCounter + kCounter, + kClose, + kDestroy, + kDispatch } = require('./core/symbols') +const kClosedResolve = Symbol('kClosedResolve') + const channels = {} try { @@ -81,7 +81,7 @@ try { channels.connected = { hasSubscribers: false } } -class Client extends Dispatcher { +class Client extends DispatcherBase { constructor (url, { maxHeaderSize, headersTimeout, @@ -189,10 +189,7 @@ class Client extends Dispatcher { this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout] - this[kClosed] = false - this[kDestroyed] = false this[kServerName] = null - this[kOnDestroyed] = [] this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n` @@ -201,6 +198,7 @@ class Client extends Dispatcher { this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength this[kMaxRedirections] = maxRedirections this[kMaxRequests] = maxRequestsPerClient + this[kClosedResolve] = null // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. @@ -216,25 +214,15 @@ class Client extends Dispatcher { this[kPendingIdx] = 0 } - // TODO: Make private? get pipelining () { return this[kPipelining] } - // TODO: Make private? set pipelining (value) { this[kPipelining] = value resume(this, true) } - get destroyed () { - return this[kDestroyed] - } - - get closed () { - return this[kClosed] - } - get [kPending] () { return this[kQueue].length - this[kPendingIdx] } @@ -266,141 +254,68 @@ class Client extends Dispatcher { this.once('connect', cb) } - dispatch (opts, handler) { - if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler must be an object') - } - - try { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('opts must be an object.') - } - - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (this[kClosed]) { - throw new ClientClosedError() - } - - const { maxRedirections = this[kMaxRedirections] } = opts - if (maxRedirections) { - handler = new RedirectHandler(this, maxRedirections, opts, handler) - } - - const origin = opts.origin || this[kUrl].origin - - const request = new Request(origin, opts, handler) - - this[kQueue].push(request) - if (this[kResuming]) { - // Do nothing. - } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { - // Wait a tick in case stream/iterator is ended in the same tick. - this[kResuming] = 1 - process.nextTick(resume, this) - } else { - resume(this, true) - } - - if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { - this[kNeedDrain] = 2 - } - } catch (err) { - if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') - } - - handler.onError(err) - } - - return this[kNeedDrain] < 2 - } - - close (callback) { - if (callback === undefined) { - return new Promise((resolve, reject) => { - this.close((err, data) => { - return err ? reject(err) : resolve(data) - }) - }) - } - - if (typeof callback !== 'function') { - throw new InvalidArgumentError('invalid callback') + [kDispatch] (opts, handler) { + const { maxRedirections = this[kMaxRedirections] } = opts + if (maxRedirections) { + handler = new RedirectHandler(this, maxRedirections, opts, handler) } - if (this[kDestroyed]) { - queueMicrotask(() => callback(new ClientDestroyedError(), null)) - return - } + const origin = opts.origin || this[kUrl].origin - this[kClosed] = true + const request = new Request(origin, opts, handler) - if (!this[kSize]) { - this.destroy(callback) + this[kQueue].push(request) + if (this[kResuming]) { + // Do nothing. + } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { + // Wait a tick in case stream/iterator is ended in the same tick. + this[kResuming] = 1 + process.nextTick(resume, this) } else { - this[kOnDestroyed].push(callback) + resume(this, true) } - } - destroy (err, callback) { - if (typeof err === 'function') { - callback = err - err = null + if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { + this[kNeedDrain] = 2 } - if (callback === undefined) { - return new Promise((resolve, reject) => { - this.destroy(err, (err, data) => { - return err ? /* istanbul ignore next: should never error */ reject(err) : resolve(data) - }) - }) - } - - if (typeof callback !== 'function') { - throw new InvalidArgumentError('invalid callback') - } + return this[kNeedDrain] < 2 + } - if (this[kDestroyed]) { - if (this[kOnDestroyed]) { - this[kOnDestroyed].push(callback) + async [kClose] () { + return new Promise((resolve) => { + if (!this[kSize]) { + this.destroy(resolve) } else { - queueMicrotask(() => callback(null, null)) + this[kClosedResolve] = resolve } - return - } - - if (!err) { - err = new ClientDestroyedError() - } - - const requests = this[kQueue].splice(this[kPendingIdx]) - for (let i = 0; i < requests.length; i++) { - const request = requests[i] - errorRequest(this, request, err) - } + }) + } - this[kClosed] = true - this[kDestroyed] = true - this[kOnDestroyed].push(callback) + async [kDestroy] (err) { + return new Promise((resolve) => { + const requests = this[kQueue].splice(this[kPendingIdx]) + for (let i = 0; i < requests.length; i++) { + const request = requests[i] + errorRequest(this, request, err) + } - const onDestroyed = () => { - const callbacks = this[kOnDestroyed] - this[kOnDestroyed] = null - for (let i = 0; i < callbacks.length; i++) { - callbacks[i](null, null) + const callback = () => { + if (this[kClosedResolve]) { + this[kClosedResolve]() + this[kClosedResolve] = null + } + resolve() } - } - if (!this[kSocket]) { - queueMicrotask(onDestroyed) - } else { - util.destroy(this[kSocket].on('close', onDestroyed), err) - } + if (!this[kSocket]) { + queueMicrotask(callback) + } else { + util.destroy(this[kSocket].on('close', callback), err) + } - resume(this) + resume(this) + }) } } @@ -476,7 +391,6 @@ async function lazyllhttp () { let llhttpInstance = null let llhttpPromise = lazyllhttp() .catch(() => { - // TODO: Emit warning? }) let currentParser = null @@ -586,7 +500,6 @@ class Parser { currentBufferPtr = llhttp.malloc(currentBufferSize) } - // TODO (perf): Can we avoid this copy somehow? new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data) // Call `execute` on the wasm parser. @@ -635,12 +548,10 @@ class Parser { try { try { currentParser = this - this.llhttp.llhttp_finish(this.ptr) // TODO (fix): Check ret? } finally { currentParser = null } } catch (err) { - // TODO (fix): What if socket is already destroyed? Error will be swallowed. /* istanbul ignore next: difficult to make a test case for */ util.destroy(this.socket, err) } @@ -782,13 +693,9 @@ class Parser { return -1 } - // TODO: Check for content-length mismatch from server? - assert(!this.upgrade) assert(this.statusCode < 200) - // TODO: More statusCode validation? - if (statusCode === 100) { util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket))) return -1 @@ -979,7 +886,6 @@ class Parser { util.destroy(socket, new InformationalError('reset')) return constants.ERROR.PAUSED } else if (!shouldKeepAlive) { - // TODO: What if running > 0? util.destroy(socket, new InformationalError('reset')) return constants.ERROR.PAUSED } else if (socket[kReset] && client[kRunning] === 0) { @@ -1079,7 +985,7 @@ function onSocketClose () { client[kSocket] = null - if (client[kDestroyed]) { + if (client.destroyed) { assert(client[kPending] === 0) // Fail entire queue. @@ -1251,14 +1157,14 @@ function resume (client, sync) { function _resume (client, sync) { while (true) { - if (client[kDestroyed]) { + if (client.destroyed) { assert(client[kPending] === 0) return } - if (client[kClosed] && !client[kSize]) { - client.destroy(util.nop) - continue + if (client.closed && !client[kSize]) { + client.destroy() + return } const socket = client[kSocket] @@ -1479,8 +1385,6 @@ function write (client, request) { socket[kBlocking] = true } - // TODO: Expect: 100-continue - let header = `${method} ${path} HTTP/1.1\r\n` if (typeof host === 'string') { @@ -1600,7 +1504,6 @@ function writeStream ({ body, client, request, socket, contentLength, header, ex writer.destroy(err) - // TODO (fix): Avoid using err.message for logic. if (err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')) { util.destroy(body, err) } else { @@ -1679,7 +1582,6 @@ async function writeIterable ({ body, client, request, socket, contentLength, he const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) try { - // TODO (fix): What if socket errors while waiting for body? // It's up to the user to somehow abort the async iterable. for await (const chunk of body) { if (socket[kError]) { @@ -1730,7 +1632,6 @@ class AsyncWriter { return true } - // TODO: What if not ended and bytesWritten === contentLength? // We should defer writing chunks. if (contentLength !== null && bytesWritten + len > contentLength) { if (client[kStrictContentLength]) { @@ -1800,7 +1701,6 @@ class AsyncWriter { } } - // TODO (fix): Add comment clarifying what this does? if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) { // istanbul ignore else: only for jest if (socket[kParser].timeout.refresh) { diff --git a/deps/undici/src/lib/core/symbols.js b/deps/undici/src/lib/core/symbols.js index 1d28bc15e0c497..30108827a846d0 100644 --- a/deps/undici/src/lib/core/symbols.js +++ b/deps/undici/src/lib/core/symbols.js @@ -1,4 +1,7 @@ module.exports = { + kClose: Symbol('close'), + kDestroy: Symbol('destroy'), + kDispatch: Symbol('dispatch'), kUrl: Symbol('url'), kWriting: Symbol('writing'), kResuming: Symbol('resuming'), diff --git a/deps/undici/src/lib/dispatcher-base.js b/deps/undici/src/lib/dispatcher-base.js new file mode 100644 index 00000000000000..2c12ba80f351cf --- /dev/null +++ b/deps/undici/src/lib/dispatcher-base.js @@ -0,0 +1,159 @@ +'use strict' + +const Dispatcher = require('./dispatcher') +const { + ClientDestroyedError, + ClientClosedError, + InvalidArgumentError +} = require('./core/errors') +const { kDestroy, kClose, kDispatch } = require('./core/symbols') + +const kDestroyed = Symbol('destroyed') +const kClosed = Symbol('closed') +const kOnDestroyed = Symbol('onDestroyed') +const kOnClosed = Symbol('onClosed') + +class DispatcherBase extends Dispatcher { + constructor () { + super() + + this[kDestroyed] = false + this[kOnDestroyed] = [] + this[kClosed] = false + this[kOnClosed] = [] + } + + get destroyed () { + return this[kDestroyed] + } + + get closed () { + return this[kClosed] + } + + close (callback) { + if (callback === undefined) { + return new Promise((resolve, reject) => { + this.close((err, data) => { + return err ? reject(err) : resolve(data) + }) + }) + } + + if (typeof callback !== 'function') { + throw new InvalidArgumentError('invalid callback') + } + + if (this[kDestroyed]) { + queueMicrotask(() => callback(new ClientDestroyedError(), null)) + return + } + + if (this[kClosed]) { + if (this[kOnClosed]) { + this[kOnClosed].push(callback) + } else { + queueMicrotask(() => callback(null, null)) + } + return + } + + this[kClosed] = true + this[kOnClosed].push(callback) + + const onClosed = () => { + const callbacks = this[kOnClosed] + this[kOnClosed] = null + for (let i = 0; i < callbacks.length; i++) { + callbacks[i](null, null) + } + } + + // Should not error. + this[kClose]() + .then(() => this.destroy()) + .then(() => { + queueMicrotask(onClosed) + }) + } + + destroy (err, callback) { + if (typeof err === 'function') { + callback = err + err = null + } + + if (callback === undefined) { + return new Promise((resolve, reject) => { + this.destroy(err, (err, data) => { + return err ? /* istanbul ignore next: should never error */ reject(err) : resolve(data) + }) + }) + } + + if (typeof callback !== 'function') { + throw new InvalidArgumentError('invalid callback') + } + + if (this[kDestroyed]) { + if (this[kOnDestroyed]) { + this[kOnDestroyed].push(callback) + } else { + queueMicrotask(() => callback(null, null)) + } + return + } + + if (!err) { + err = new ClientDestroyedError() + } + + this[kDestroyed] = true + this[kOnDestroyed].push(callback) + + const onDestroyed = () => { + const callbacks = this[kOnDestroyed] + this[kOnDestroyed] = null + for (let i = 0; i < callbacks.length; i++) { + callbacks[i](null, null) + } + } + + // Should not error. + this[kDestroy](err).then(() => { + queueMicrotask(onDestroyed) + }) + } + + dispatch (opts, handler) { + if (!handler || typeof handler !== 'object') { + throw new InvalidArgumentError('handler must be an object') + } + + try { + if (!opts || typeof opts !== 'object') { + throw new InvalidArgumentError('opts must be an object.') + } + + if (this[kDestroyed]) { + throw new ClientDestroyedError() + } + + if (this[kClosed]) { + throw new ClientClosedError() + } + + return this[kDispatch](opts, handler) + } catch (err) { + if (typeof handler.onError !== 'function') { + throw new InvalidArgumentError('invalid onError method') + } + + handler.onError(err) + + return false + } + } +} + +module.exports = DispatcherBase diff --git a/deps/undici/src/lib/fetch/constants.js b/deps/undici/src/lib/fetch/constants.js index 2eff7596968023..75c9265e8c1557 100644 --- a/deps/undici/src/lib/fetch/constants.js +++ b/deps/undici/src/lib/fetch/constants.js @@ -86,9 +86,12 @@ const subresource = [ '' ] +const corsSafeListedResponseHeaderNames = [] // TODO + module.exports = { subresource, forbiddenResponseHeaderNames, + corsSafeListedResponseHeaderNames, forbiddenMethods, requestBodyHeader, referrerPolicy, diff --git a/deps/undici/src/lib/fetch/headers.js b/deps/undici/src/lib/fetch/headers.js index 7a59f15371e786..a5dd5b7a413373 100644 --- a/deps/undici/src/lib/fetch/headers.js +++ b/deps/undici/src/lib/fetch/headers.js @@ -346,11 +346,12 @@ class Headers { const callback = args[0] const thisArg = args[1] - for (let index = 0; index < this[kHeadersList].length; index += 2) { + const clone = this[kHeadersList].slice() + for (let index = 0; index < clone.length; index += 2) { callback.call( thisArg, - this[kHeadersList][index + 1], - this[kHeadersList][index], + clone[index + 1], + clone[index], this ) } diff --git a/deps/undici/src/lib/fetch/index.js b/deps/undici/src/lib/fetch/index.js index d5ca150b151960..dfc7b824082660 100644 --- a/deps/undici/src/lib/fetch/index.js +++ b/deps/undici/src/lib/fetch/index.js @@ -5,6 +5,7 @@ const { Response, makeNetworkError, + makeAppropriateNetworkError, filterResponse, makeResponse } = require('./response') @@ -12,8 +13,6 @@ const { Headers } = require('./headers') const { Request, makeRequest } = require('./request') const zlib = require('zlib') const { - ServiceWorkerGlobalScope, - Window, matchRequestIntegrity, makePolicyContainer, clonePolicyContainer, @@ -33,7 +32,9 @@ const { createDeferredPromise, isBlobLike, CORBCheck, - sameOrigin + sameOrigin, + isCancelled, + isAborted } = require('./util') const { kState, kHeaders, kGuard, kRealm } = require('./symbols') const { AbortError } = require('../core/errors') @@ -48,10 +49,11 @@ const { } = require('./constants') const { kHeadersList } = require('../core/symbols') const EE = require('events') -const { PassThrough, pipeline } = require('stream') +const { Readable, pipeline } = require('stream') const { isErrored, isReadable } = require('../core/util') -const { kIsMockActive } = require('../mock/mock-symbols') const { dataURLProcessor } = require('./dataURL') +const { kIsMockActive } = require('../mock/mock-symbols') +const { TransformStream } = require('stream/web') /** @type {import('buffer').resolveObjectURL} */ let resolveObjectURL @@ -62,19 +64,30 @@ class Fetch extends EE { super() this.dispatcher = dispatcher - this.terminated = null this.connection = null this.dump = false + this.state = 'ongoing' } - terminate ({ reason, aborted } = {}) { - if (this.terminated) { + terminate (reason) { + if (this.state !== 'ongoing') { return } - this.terminated = { aborted, reason } + this.state = 'terminated' this.connection?.destroy(reason) + this.emit('terminated', reason) + } + + abort () { + if (this.state !== 'ongoing') { + return + } + const reason = new AbortError() + + this.state = 'aborted' + this.connection?.destroy(reason) this.emit('terminated', reason) } } @@ -99,8 +112,6 @@ async function fetch (...args) { const resource = args[0] const init = args.length >= 1 ? args[1] ?? {} : {} - const context = new Fetch(this) - // 1. Let p be a new promise. const p = createDeferredPromise() @@ -115,19 +126,18 @@ async function fetch (...args) { // 4. If requestObject’s signal’s aborted flag is set, then: if (requestObject.signal.aborted) { // 1. Abort fetch with p, request, and null. - abortFetch.call(context, p, request, null) + abortFetch(p, request, null) // 2. Return p. return p.promise } // 5. Let globalObject be request’s client’s global object. - // TODO: What if request.client is null? - const globalObject = request.client?.globalObject + const globalObject = request.client.globalObject // 6. If globalObject is a ServiceWorkerGlobalScope object, then set // request’s service-workers mode to "none". - if (globalObject instanceof ServiceWorkerGlobalScope) { + if (globalObject?.constructor?.name === 'ServiceWorkerGlobalScope') { request.serviceWorkers = 'none' } @@ -140,7 +150,10 @@ async function fetch (...args) { // 9. Let locallyAborted be false. let locallyAborted = false - // 10. Add the following abort steps to requestObject’s signal: + // 10. Let controller be null. + let controller = null + + // 11. Add the following abort steps to requestObject’s signal: requestObject.signal.addEventListener( 'abort', () => { @@ -148,21 +161,25 @@ async function fetch (...args) { locallyAborted = true // 2. Abort fetch with p, request, and responseObject. - abortFetch.call(context, p, request, responseObject) + abortFetch(p, request, responseObject) - // 3. Terminate the ongoing fetch with the aborted flag set. - context.terminate({ aborted: true }) + // 3. If controller is not null, then abort controller. + if (controller != null) { + controller.abort() + } }, { once: true } ) - // 11. Let handleFetchDone given response response be to finalize and + // 12. Let handleFetchDone given response response be to finalize and // report timing with response, globalObject, and "fetch". const handleFetchDone = (response) => finalizeAndReportTiming(response, 'fetch') - // 12. Fetch request with processResponseEndOfBody set to handleFetchDone, - // and processResponse given response being these substeps: + // 13. Set controller to the result of calling fetch given request, + // with processResponseEndOfBody set to handleFetchDone, and processResponse + // given response being these substeps: + const processResponse = (response) => { // 1. If locallyAborted is true, terminate these substeps. if (locallyAborted) { @@ -172,7 +189,7 @@ async function fetch (...args) { // 2. If response’s aborted flag is set, then abort fetch with p, // request, and responseObject, and terminate these substeps. if (response.aborted) { - abortFetch.call(context, p, request, responseObject) + abortFetch(p, request, responseObject) return } @@ -198,17 +215,14 @@ async function fetch (...args) { p.resolve(responseObject) } - fetching - .call(context, { - request, - processResponseEndOfBody: handleFetchDone, - processResponse - }) - .catch((err) => { - p.reject(err) - }) + controller = fetching({ + request, + processResponseEndOfBody: handleFetchDone, + processResponse, + dispatcher: this // undici + }) - // 13. Return p. + // 14. Return p. return p.promise } @@ -329,7 +343,8 @@ function fetching ({ processResponse, processResponseEndOfBody, processResponseConsumeBody, - useParallelQueue = false + useParallelQueue = false, + dispatcher // undici }) { // 1. Let taskDestination be null. let taskDestination = null @@ -371,6 +386,7 @@ function fetching ({ // task destination is taskDestination, // and cross-origin isolated capability is crossOriginIsolatedCapability. const fetchParams = { + controller: new Fetch(dispatcher), request, timingInfo, processRequestBodyChunkLength, @@ -394,7 +410,7 @@ function fetching ({ if (request.window === 'client') { // TODO: What if request.client is null? request.window = - request.client?.globalObject instanceof Window + request.client?.globalObject?.constructor?.name === 'Window' ? request.client : 'no-window' } @@ -406,7 +422,10 @@ function fetching ({ request.origin = request.client?.origin } - // 10. If request’s policy container is "client", then: + // 10. If all of the following conditions are true: + // TODO + + // 11. If request’s policy container is "client", then: if (request.policyContainer === 'client') { // 1. If request’s client is non-null, then set request’s policy // container to a clone of request’s client’s policy container. [HTML] @@ -421,7 +440,7 @@ function fetching ({ } } - // 11. If request’s header list does not contain `Accept`, then: + // 12. If request’s header list does not contain `Accept`, then: if (!request.headersList.has('accept')) { // 1. Let value be `*/*`. const value = '*/*' @@ -442,38 +461,37 @@ function fetching ({ request.headersList.append('accept', value) } - // 12. If request’s header list does not contain `Accept-Language`, then + // 13. If request’s header list does not contain `Accept-Language`, then // user agents should append `Accept-Language`/an appropriate value to // request’s header list. if (!request.headersList.has('accept-language')) { request.headersList.append('accept-language', '*') } - // 13. If request’s priority is null, then use request’s initiator and + // 14. If request’s priority is null, then use request’s initiator and // destination appropriately in setting request’s priority to a // user-agent-defined object. if (request.priority === null) { // TODO } - // 14. If request is a subresource request, then: + // 15. If request is a subresource request, then: if (subresource.includes(request.destination)) { - // 1. Let record be a new fetch record consisting of request and this - // instance of the fetch algorithm. - // TODO - // 2. Append record to request’s client’s fetch group list of fetch - // records. // TODO } - // 15. Run main fetch given fetchParams. - return mainFetch.call(this, fetchParams) + // 16. Run main fetch given fetchParams. + mainFetch(fetchParams) + .catch(err => { + fetchParams.controller.terminate(err) + }) + + // 17. Return fetchParam's controller + return fetchParams.controller } // https://fetch.spec.whatwg.org/#concept-main-fetch async function mainFetch (fetchParams, recursive = false) { - const context = this - // 1. Let request be fetchParams’s request. const request = fetchParams.request @@ -548,8 +566,7 @@ async function mainFetch (fetchParams, recursive = false) { request.responseTainting = 'basic' // 2. Return the result of running scheme fetch given fetchParams. - return await schemeFetch - .call(this, fetchParams) + return await schemeFetch(fetchParams) } // request’s mode is "same-origin" @@ -573,8 +590,7 @@ async function mainFetch (fetchParams, recursive = false) { // 3. Let noCorsResponse be the result of running scheme fetch given // fetchParams. - const noCorsResponse = await schemeFetch - .call(this, fetchParams) + const noCorsResponse = await schemeFetch(fetchParams) // 4. If noCorsResponse is a filtered response or the CORB check with // request and noCorsResponse returns allowed, then return noCorsResponse. @@ -609,9 +625,7 @@ async function mainFetch (fetchParams, recursive = false) { request.responseTainting = 'cors' // 2. Return the result of running HTTP fetch given fetchParams. - return await httpFetch - .call(this, fetchParams) - .catch((err) => makeNetworkError(err)) + return await httpFetch(fetchParams) })() } @@ -699,7 +713,7 @@ async function mainFetch (fetchParams, recursive = false) { nullBodyStatus.includes(internalResponse.status)) ) { internalResponse.body = null - context.dump = true + fetchParams.controller.dump = true } // 20. If request’s integrity metadata is not the empty string, then: @@ -707,7 +721,7 @@ async function mainFetch (fetchParams, recursive = false) { // 1. Let processBodyError be this step: run fetch finale given fetchParams // and a network error. const processBodyError = (reason) => - fetchFinale.call(context, fetchParams, makeNetworkError(reason)) + fetchFinale(fetchParams, makeNetworkError(reason)) // 2. If request’s response tainting is "opaque", or response’s body is null, // then run processBodyError and abort these steps. @@ -730,7 +744,7 @@ async function mainFetch (fetchParams, recursive = false) { response.body = safelyExtractBody(bytes)[0] // 3. Run fetch finale given fetchParams and response. - fetchFinale.call(context, fetchParams, response) + fetchFinale(fetchParams, response) } // 4. Fully read response’s body given processBody and processBodyError. @@ -741,15 +755,13 @@ async function mainFetch (fetchParams, recursive = false) { } } else { // 21. Otherwise, run fetch finale given fetchParams and response. - fetchFinale.call(context, fetchParams, response) + fetchFinale(fetchParams, response) } } // https://fetch.spec.whatwg.org/#concept-scheme-fetch // given a fetch params fetchParams async function schemeFetch (fetchParams) { - const context = this - // let request be fetchParams’s request const { request } = fetchParams @@ -782,12 +794,10 @@ async function schemeFetch (fetchParams) { case 'blob:': { resolveObjectURL ??= require('buffer').resolveObjectURL - context.on('terminated', onRequestAborted) - // 1. Run these steps, but abort when the ongoing fetch is terminated: - // 1a. Let blob be request’s current URL’s blob URL entry’s object. - // https://w3c.github.io/FileAPI/#blob-url-entry - // P.S. Thank God this method is available in node. + // 1. Let blob be request’s current URL’s blob URL entry’s object. + // https://w3c.github.io/FileAPI/#blob-url-entry + // P.S. Thank God this method is available in node. const currentURL = requestCurrentURL(request) // https://github.com/web-platform-tests/wpt/blob/7b0ebaccc62b566a1965396e5be7bb2bc06f841f/FileAPI/url/resources/fetch-tests.js#L52-L56 @@ -798,42 +808,29 @@ async function schemeFetch (fetchParams) { const blob = resolveObjectURL(currentURL.toString()) - // 2a. If request’s method is not `GET` or blob is not a Blob object, then return a network error. [FILEAPI] + // 2. If request’s method is not `GET` or blob is not a Blob object, then return a network error. [FILEAPI] if (request.method !== 'GET' || !isBlobLike(blob)) { return makeNetworkError('invalid method') } - // 3a. Let response be a new response whose status message is `OK`. + // 3. Let response be a new response whose status message is `OK`. const response = makeResponse({ statusText: 'OK', urlList: [currentURL] }) - // 4a. Append (`Content-Length`, blob’s size attribute value) to response’s header list. + // 4. Append (`Content-Length`, blob’s size attribute value) to response’s header list. response.headersList.set('content-length', `${blob.size}`) - // 5a. Append (`Content-Type`, blob’s type attribute value) to response’s header list. + // 5. Append (`Content-Type`, blob’s type attribute value) to response’s header list. response.headersList.set('content-type', blob.type) - // 6a. Set response’s body to the result of performing the read operation on blob. + // 6. Set response’s body to the result of performing the read operation on blob. + // TODO (fix): This needs to read? response.body = extractBody(blob)[0] - // since the request has not been aborted, we can safely remove the listener. - context.off('terminated', onRequestAborted) - - // 7a. Return response. + // 7. Return response. return response - // 2. If aborted, then: - function onRequestAborted () { - // 1. Let aborted be the termination’s aborted flag. - const aborted = context.terminated.aborted - - // 2. If aborted is set, then return an aborted network error. - if (aborted) { - return makeNetworkError(new AbortError()) - } - - // 3. Return a network error. - return makeNetworkError(context.terminated.reason) - } + // 2. If aborted, then return the appropriate network error for fetchParams. + // TODO } case 'data:': { // 1. Let dataURLStruct be the result of running the @@ -876,7 +873,7 @@ async function schemeFetch (fetchParams) { headersList: [ 'content-type', contentType ], - body: dataURLStruct.body + body: extractBody(dataURLStruct.body)[0] }) } case 'file:': { @@ -888,8 +885,7 @@ async function schemeFetch (fetchParams) { case 'https:': { // Return the result of running HTTP fetch given fetchParams. - return await httpFetch - .call(this, fetchParams) + return await httpFetch(fetchParams) .catch((err) => makeNetworkError(err)) } default: { @@ -907,14 +903,12 @@ function finalizeResponse (fetchParams, response) { // task to run fetchParams’s process response done given response, with // fetchParams’s task destination. if (fetchParams.processResponseDone != null) { - fetchParams.processResponseDone(response) + queueMicrotask(() => fetchParams.processResponseDone(response)) } } // https://fetch.spec.whatwg.org/#fetch-finale -function fetchFinale (fetchParams, response) { - const context = this - +async function fetchFinale (fetchParams, response) { // 1. If response is a network error, then: if (response.type === 'error') { // 1. Set response’s URL list to « fetchParams’s request’s URL list[0] ». @@ -928,40 +922,79 @@ function fetchFinale (fetchParams, response) { } // 2. Let processResponseEndOfBody be the following steps: - // TODO + const processResponseEndOfBody = () => { + // 1. Set fetchParams’s request’s done flag. + fetchParams.request.done = true + + // If fetchParams’s process response end-of-body is not null, + // then queue a fetch task to run fetchParams’s process response + // end-of-body given response with fetchParams’s task destination. + if (fetchParams.processResponseEndOfBody != null) { + queueMicrotask(() => fetchParams.processResponseEndOfBody(response)) + } + } // 3. If fetchParams’s process response is non-null, then queue a fetch task // to run fetchParams’s process response given response, with fetchParams’s // task destination. if (fetchParams.processResponse != null) { - fetchParams.processResponse(response) + queueMicrotask(() => fetchParams.processResponse(response)) } - // 4. If fetchParams’s process response is non-null, then queue a fetch task - // to run fetchParams’s process response given response, with fetchParams’s - // task destination. - // TODO + // 4. If response’s body is null, then run processResponseEndOfBody. + if (response.body == null) { + processResponseEndOfBody() + } else { + // 5. Otherwise: - // 5. If response’s body is null, then run processResponseEndOfBody. - // TODO + // 1. Let transformStream be a new a TransformStream. - // 6. Otherwise: - // TODO + // 2. Let identityTransformAlgorithm be an algorithm which, given chunk, + // enqueues chunk in transformStream. + const identityTransformAlgorithm = (chunk, controller) => { + controller.enqueue(chunk) + } - // 7. If fetchParams’s process response consume body is non-null, then: - // TODO + // 3. Set up transformStream with transformAlgorithm set to identityTransformAlgorithm + // and flushAlgorithm set to processResponseEndOfBody. + const transformStream = new TransformStream({ + start () {}, + transform: identityTransformAlgorithm, + flush: processResponseEndOfBody + }) - // TODO: This is a workaround. Until the above has been implemented, i.e. - // we need to either fully consume the body or terminate the fetch. - if (response.type === 'error') { - context.terminate({ reason: response.error }) + // 4. Set response’s body to the result of piping response’s body through transformStream. + response.body = { stream: response.body.stream.pipeThrough(transformStream) } + } + + // 6. If fetchParams’s process response consume body is non-null, then: + if (fetchParams.processResponseConsumeBody != null) { + // 1. Let processBody given nullOrBytes be this step: run fetchParams’s + // process response consume body given response and nullOrBytes. + const processBody = (nullOrBytes) => fetchParams.processResponseConsumeBody(response, nullOrBytes) + + // 2. Let processBodyError be this step: run fetchParams’s process + // response consume body given response and failure. + const processBodyError = (failure) => fetchParams.processResponseConsumeBody(response, failure) + + // 3. If response’s body is null, then queue a fetch task to run processBody + // given null, with fetchParams’s task destination. + if (response.body == null) { + queueMicrotask(() => processBody(null)) + } else { + // 4. Otherwise, fully read response’s body given processBody, processBodyError, + // and fetchParams’s task destination. + try { + processBody(await response.body.stream.arrayBuffer()) + } catch (err) { + processBodyError(err) + } + } } } // https://fetch.spec.whatwg.org/#http-fetch async function httpFetch (fetchParams) { - const context = this - // 1. Let request be fetchParams’s request. const request = fetchParams.request @@ -992,10 +1025,7 @@ async function httpFetch (fetchParams) { // 3. Set response and actualResponse to the result of running // HTTP-network-or-cache fetch given fetchParams. - actualResponse = response = await httpNetworkOrCacheFetch.call( - this, - fetchParams - ) + actualResponse = response = await httpNetworkOrCacheFetch(fetchParams) // 4. If request’s response tainting is "cors" and a CORS check // for request and response returns failure, then return a network error. @@ -1035,7 +1065,7 @@ async function httpFetch (fetchParams) { // and the connection uses HTTP/2, then user agents may, and are even // encouraged to, transmit an RST_STREAM frame. // See, https://github.com/whatwg/fetch/issues/1288 - context.connection.destroy() + fetchParams.controller.connection.destroy() // 2. Switch on request’s redirect mode: if (request.redirect === 'error') { @@ -1051,7 +1081,7 @@ async function httpFetch (fetchParams) { } else if (request.redirect === 'follow') { // Set response to the result of running HTTP-redirect fetch given // fetchParams and response. - response = await httpRedirectFetch.call(this, fetchParams, response) + response = await httpRedirectFetch(fetchParams, response) } else { assert(false) } @@ -1190,7 +1220,7 @@ async function httpRedirectFetch (fetchParams, response) { setRequestReferrerPolicyOnRedirect(request, actualResponse) // 19. Return the result of running main fetch given fetchParams and true. - return mainFetch.call(this, fetchParams, true) + return mainFetch(fetchParams, true) } // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch @@ -1199,8 +1229,6 @@ async function httpNetworkOrCacheFetch ( isAuthenticationFetch = false, isNewConnectionFetch = false ) { - const context = this - // 1. Let request be fetchParams’s request. const request = fetchParams.request @@ -1279,13 +1307,16 @@ async function httpNetworkOrCacheFetch ( httpRequest.headersList.append('content-length', contentLengthHeaderValue) } - // 9. If contentLength is non-null and httpRequest’s keepalive is true, + // 9. If contentLengthHeaderValue is non-null, then append (`Content-Length`, + // contentLengthHeaderValue) to httpRequest’s header list. + + // 10. If contentLength is non-null and httpRequest’s keepalive is true, // then: if (contentLength != null && httpRequest.keepalive) { // NOTE: keepalive is a noop outside of browser context. } - // 10 .If httpRequest’s referrer is a URL, then append + // 11. If httpRequest’s referrer is a URL, then append // `Referer`/httpRequest’s referrer, serialized and isomorphic encoded, // to httpRequest’s header list. if (httpRequest.referrer instanceof URL) { @@ -1293,20 +1324,20 @@ async function httpNetworkOrCacheFetch ( httpRequest.headersList.append('referer', httpRequest.referrer.href) } - // 11. Append a request `Origin` header for httpRequest. + // 12. Append a request `Origin` header for httpRequest. appendRequestOriginHeader(httpRequest) - // 12. Append the Fetch metadata headers for httpRequest. [FETCH-METADATA] + // 13. Append the Fetch metadata headers for httpRequest. [FETCH-METADATA] appendFetchMetadata(httpRequest) - // 13. If httpRequest’s header list does not contain `User-Agent`, then + // 14. If httpRequest’s header list does not contain `User-Agent`, then // user agents should append `User-Agent`/default `User-Agent` value to // httpRequest’s header list. if (!httpRequest.headersList.has('user-agent')) { httpRequest.headersList.append('user-agent', 'undici') } - // 14. If httpRequest’s cache mode is "default" and httpRequest’s header + // 15. If httpRequest’s cache mode is "default" and httpRequest’s header // list contains `If-Modified-Since`, `If-None-Match`, // `If-Unmodified-Since`, `If-Match`, or `If-Range`, then set // httpRequest’s cache mode to "no-store". @@ -1321,7 +1352,7 @@ async function httpNetworkOrCacheFetch ( httpRequest.cache = 'no-store' } - // 15. If httpRequest’s cache mode is "no-cache", httpRequest’s prevent + // 16. If httpRequest’s cache mode is "no-cache", httpRequest’s prevent // no-cache cache-control header modification flag is unset, and // httpRequest’s header list does not contain `Cache-Control`, then append // `Cache-Control`/`max-age=0` to httpRequest’s header list. @@ -1333,7 +1364,7 @@ async function httpNetworkOrCacheFetch ( httpRequest.headersList.append('cache-control', 'max-age=0') } - // 16. If httpRequest’s cache mode is "no-store" or "reload", then: + // 17. If httpRequest’s cache mode is "no-store" or "reload", then: if (httpRequest.cache === 'no-store' || httpRequest.cache === 'reload') { // 1. If httpRequest’s header list does not contain `Pragma`, then append // `Pragma`/`no-cache` to httpRequest’s header list. @@ -1348,13 +1379,13 @@ async function httpNetworkOrCacheFetch ( } } - // 17. If httpRequest’s header list contains `Range`, then append + // 18. If httpRequest’s header list contains `Range`, then append // `Accept-Encoding`/`identity` to httpRequest’s header list. if (httpRequest.headersList.has('range')) { httpRequest.headersList.append('accept-encoding', 'identity') } - // 18. Modify httpRequest’s header list per HTTP. Do not append a given + // 19. Modify httpRequest’s header list per HTTP. Do not append a given // header if httpRequest’s header list contains that header’s name. // TODO: https://github.com/whatwg/fetch/issues/1285#issuecomment-896560129 if (!httpRequest.headersList.has('accept-encoding')) { @@ -1365,7 +1396,7 @@ async function httpNetworkOrCacheFetch ( } } - // 19. If includeCredentials is true, then: + // 20. If includeCredentials is true, then: if (includeCredentials) { // 1. If the user agent is not configured to block cookies for httpRequest // (see section 7 of [COOKIES]), then: @@ -1374,26 +1405,26 @@ async function httpNetworkOrCacheFetch ( // TODO: credentials } - // 20. If there’s a proxy-authentication entry, use it as appropriate. + // 21. If there’s a proxy-authentication entry, use it as appropriate. // TODO: proxy-authentication - // 21. Set httpCache to the result of determining the HTTP cache + // 22. Set httpCache to the result of determining the HTTP cache // partition, given httpRequest. // TODO: cache - // 22. If httpCache is null, then set httpRequest’s cache mode to + // 23. If httpCache is null, then set httpRequest’s cache mode to // "no-store". if (httpCache == null) { httpRequest.cache = 'no-store' } - // 23. If httpRequest’s cache mode is neither "no-store" nor "reload", + // 24. If httpRequest’s cache mode is neither "no-store" nor "reload", // then: if (httpRequest.mode !== 'no-store' && httpRequest.mode !== 'reload') { // TODO: cache } - // 9. If aborted, then: + // 9. If aborted, then return the appropriate network error for fetchParams. // TODO // 10. If response is null, then: @@ -1406,8 +1437,7 @@ async function httpNetworkOrCacheFetch ( // 2. Let forwardResponse be the result of running HTTP-network fetch // given httpFetchParams, includeCredentials, and isNewConnectionFetch. - const forwardResponse = await httpNetworkFetch.call( - this, + const forwardResponse = await httpNetworkFetch( httpFetchParams, includeCredentials, isNewConnectionFetch @@ -1451,12 +1481,15 @@ async function httpNetworkOrCacheFetch ( response.rangeRequested = true } - // 13. If response’s status is 401, httpRequest’s response tainting is not + // 13. Set response’s request-includes-credentials to includeCredentials. + response.requestIncludesCredentials = includeCredentials + + // 14. If response’s status is 401, httpRequest’s response tainting is not // "cors", includeCredentials is true, and request’s window is an environment // settings object, then: // TODO - // 14. If response’s status is 407, then: + // 15. If response’s status is 407, then: if (response.status === 407) { // 1. If request’s window is "no-window", then return a network error. if (request.window === 'no-window') { @@ -1465,18 +1498,9 @@ async function httpNetworkOrCacheFetch ( // 2. ??? - // 3. If the ongoing fetch is terminated, then: - if (context.terminated) { - // 1. Let aborted be the termination’s aborted flag. - const aborted = context.terminated.aborted - - // 2. If aborted is set, then return an aborted network error. - if (aborted) { - return makeNetworkError(new AbortError()) - } - - // 3. Return a network error. - return makeNetworkError(context.terminated.reason) + // 3. If fetchParams is canceled, then return the appropriate network error for fetchParams. + if (isCancelled(fetchParams)) { + return makeAppropriateNetworkError(fetchParams) } // 4. Prompt the end user as appropriate in request’s window and store @@ -1489,7 +1513,7 @@ async function httpNetworkOrCacheFetch ( return makeNetworkError('proxy authentication required') } - // 15. If all of the following are true + // 16. If all of the following are true if ( // response’s status is 421 response.status === 421 && @@ -1500,18 +1524,9 @@ async function httpNetworkOrCacheFetch ( ) { // then: - // 1. If the ongoing fetch is terminated, then: - if (context.terminated) { - // 1. Let aborted be the termination’s aborted flag. - const aborted = context.terminated.aborted - - // 2. If aborted is set, then return an aborted network error. - if (aborted) { - return makeNetworkError(new AbortError()) - } - - // 3. Return a network error. - return makeNetworkError(context.terminated.reason) + // 1. If fetchParams is canceled, then return the appropriate network error for fetchParams. + if (isCancelled(fetchParams)) { + return makeAppropriateNetworkError(fetchParams) } // 2. Set response to the result of running HTTP-network-or-cache @@ -1520,313 +1535,405 @@ async function httpNetworkOrCacheFetch ( // TODO (spec): The spec doesn't specify this but we need to cancel // the active response before we can start a new one. // https://github.com/whatwg/fetch/issues/1293 - context.connection.destroy() + fetchParams.controller.connection.destroy() - response = await httpNetworkOrCacheFetch.call( - this, + response = await httpNetworkOrCacheFetch( fetchParams, isAuthenticationFetch, true ) } - // 16. If isAuthenticationFetch is true, then create an authentication entry + // 17. If isAuthenticationFetch is true, then create an authentication entry if (isAuthenticationFetch) { // TODO } - // 17. Return response. + // 18. Return response. return response } // https://fetch.spec.whatwg.org/#http-network-fetch -function httpNetworkFetch ( +async function httpNetworkFetch ( fetchParams, includeCredentials = false, forceNewConnection = false ) { - const context = this - - return new Promise((resolve) => { - assert(!context.connection || context.connection.destroyed) - - context.connection = { - abort: null, - destroyed: false, - destroy (err) { - if (!this.destroyed) { - this.destroyed = true - this.abort?.(err ?? new AbortError()) - } + assert(!fetchParams.controller.connection || fetchParams.controller.connection.destroyed) + + fetchParams.controller.connection = { + abort: null, + destroyed: false, + destroy (err) { + if (!this.destroyed) { + this.destroyed = true + this.abort?.(err ?? new AbortError()) } } + } - // 1. Let request be fetchParams’s request. - const request = fetchParams.request + // 1. Let request be fetchParams’s request. + const request = fetchParams.request - // 2. Let response be null. - let response = null + // 2. Let response be null. + let response = null - // 3. Let timingInfo be fetchParams’s timing info. - const timingInfo = fetchParams.timingInfo + // 3. Let timingInfo be fetchParams’s timing info. + const timingInfo = fetchParams.timingInfo - // 4. Let httpCache be the result of determining the HTTP cache partition, - // given request. - // TODO: cache - const httpCache = null + // 4. Let httpCache be the result of determining the HTTP cache partition, + // given request. + // TODO: cache + const httpCache = null - // 5. If httpCache is null, then set request’s cache mode to "no-store". - if (httpCache == null) { - request.cache = 'no-store' - } + // 5. If httpCache is null, then set request’s cache mode to "no-store". + if (httpCache == null) { + request.cache = 'no-store' + } + + // 6. Let networkPartitionKey be the result of determining the network + // partition key given request. + // TODO - // 6. Let networkPartitionKey be the result of determining the network - // partition key given request. + // 7. Let newConnection be "yes" if forceNewConnection is true; otherwise + // "no". + const newConnection = forceNewConnection ? 'yes' : 'no' // eslint-disable-line no-unused-vars + + // 8. Switch on request’s mode: + if (request.mode === 'websocket') { + // Let connection be the result of obtaining a WebSocket connection, + // given request’s current URL. + // TODO + } else { + // Let connection be the result of obtaining a connection, given + // networkPartitionKey, request’s current URL’s origin, + // includeCredentials, and forceNewConnection. // TODO + } - // 7. Switch on request’s mode: - if (request.mode === 'websocket') { - // Let connection be the result of obtaining a WebSocket connection, - // given request’s current URL. - // TODO - } else { - // Let connection be the result of obtaining a connection, given - // networkPartitionKey, request’s current URL’s origin, - // includeCredentials, and forceNewConnection. - // TODO - } + // 9. Run these steps, but abort when the ongoing fetch is terminated: - // 8. Run these steps, but abort when the ongoing fetch is terminated: - // TODO: When do we cleanup this listener? - context.on('terminated', onRequestAborted) + // 1. If connection is failure, then return a network error. - // 5. Set response to the result of making an HTTP request over connection - // using request with the following caveats: + // 2. Set timingInfo’s final connection timing info to the result of + // calling clamp and coarsen connection timing info with connection’s + // timing info, timingInfo’s post-redirect start time, and fetchParams’s + // cross-origin isolated capability. - // Follow the relevant requirements from HTTP. [HTTP] [HTTP-SEMANTICS] - // [HTTP-COND] [HTTP-CACHING] [HTTP-AUTH] + // 3. If connection is not an HTTP/2 connection, request’s body is non-null, + // and request’s body’s source is null, then append (`Transfer-Encoding`, + // `chunked`) to request’s header list. - // If request’s body is non-null, and request’s body’s source is null, - // then the user agent may have a buffer of up to 64 kibibytes and store - // a part of request’s body in that buffer. If the user agent reads from - // request’s body beyond that buffer’s size and the user agent needs to - // resend request, then instead return a network error. - // TODO + // 4. Set timingInfo’s final network-request start time to the coarsened + // shared current time given fetchParams’s cross-origin isolated + // capability. - // Set timingInfo’s final network-response start time to the coarsened - // shared current time given fetchParams’s cross-origin isolated capability, - // immediately after the user agent’s HTTP parser receives the first byte - // of the response (e.g., frame header bytes for HTTP/2 or response status - // line for HTTP/1.x). - // TODO + // 5. Set response to the result of making an HTTP request over connection + // using request with the following caveats: - // Wait until all the headers are transmitted. + // - Follow the relevant requirements from HTTP. [HTTP] [HTTP-SEMANTICS] + // [HTTP-COND] [HTTP-CACHING] [HTTP-AUTH] - // Any responses whose status is in the range 100 to 199, inclusive, - // and is not 101, are to be ignored, except for the purposes of setting - // timingInfo’s final network-response start time above. + // - If request’s body is non-null, and request’s body’s source is null, + // then the user agent may have a buffer of up to 64 kibibytes and store + // a part of request’s body in that buffer. If the user agent reads from + // request’s body beyond that buffer’s size and the user agent needs to + // resend request, then instead return a network error. - // If request’s header list contains `Transfer-Encoding`/`chunked` and - // response is transferred via HTTP/1.0 or older, then return a network - // error. + // - Set timingInfo’s final network-response start time to the coarsened + // shared current time given fetchParams’s cross-origin isolated capability, + // immediately after the user agent’s HTTP parser receives the first byte + // of the response (e.g., frame header bytes for HTTP/2 or response status + // line for HTTP/1.x). - // If the HTTP request results in a TLS client certificate dialog, then: + // - Wait until all the headers are transmitted. - // 1. If request’s window is an environment settings object, make the - // dialog available in request’s window. + // - Any responses whose status is in the range 100 to 199, inclusive, + // and is not 101, are to be ignored, except for the purposes of setting + // timingInfo’s final network-response start time above. - // 2. Otherwise, return a network error. + // - If request’s header list contains `Transfer-Encoding`/`chunked` and + // response is transferred via HTTP/1.0 or older, then return a network + // error. - // To transmit request’s body body, run these steps: - const body = (async function * () { - try { - // 1. If body is null and fetchParams’s process request end-of-body is - // non-null, then queue a fetch task given fetchParams’s process request - // end-of-body and fetchParams’s task destination. - if (request.body === null) { - fetchParams.processEndOfBody?.() - return - } + // - If the HTTP request results in a TLS client certificate dialog, then: - // 2. Otherwise, if body is non-null: + // 1. If request’s window is an environment settings object, make the + // dialog available in request’s window. - // 1. Let processBodyChunk given bytes be these steps: - for await (const bytes of request.body.stream) { - // 1. If the ongoing fetch is terminated, then abort these steps. - if (context.terminated) { - return - } + // 2. Otherwise, return a network error. - // 2. Run this step in parallel: transmit bytes. - yield bytes + // To transmit request’s body body, run these steps: + let requestBody = null + // 1. If body is null and fetchParams’s process request end-of-body is + // non-null, then queue a fetch task given fetchParams’s process request + // end-of-body and fetchParams’s task destination. + if (request.body == null && fetchParams.processRequestEndOfBody) { + queueMicrotask(() => fetchParams.processRequestEndOfBody()) + } else if (request.body != null) { + // 2. Otherwise, if body is non-null: - // 3. If fetchParams’s process request body is non-null, then run - // fetchParams’s process request body given bytes’s length. - fetchParams.processRequestBody?.(bytes.byteLength) - } + // 1. Let processBodyChunk given bytes be these steps: + const processBodyChunk = async function * (bytes) { + // 1. If the ongoing fetch is terminated, then abort these steps. + if (isCancelled(fetchParams)) { + return + } - // 2. Let processEndOfBody be these steps: + // 2. Run this step in parallel: transmit bytes. + yield bytes - // 1. If the ongoing fetch is terminated, then abort these steps. - if (context.terminated) { - return - } + // 3. If fetchParams’s process request body is non-null, then run + // fetchParams’s process request body given bytes’s length. + fetchParams.processRequestBodyChunkLength?.(bytes.byteLength) + } - // 2. If fetchParams’s process request end-of-body is non-null, - // then run fetchParams’s process request end-of-body. - fetchParams.processRequestEndOfBody?.() - } catch (e) { - // 3. Let processBodyError given e be these steps: + // 2. Let processEndOfBody be these steps: + const processEndOfBody = () => { + // 1. If fetchParams is canceled, then abort these steps. + if (isCancelled(fetchParams)) { + return + } - // 1. If the ongoing fetch is terminated, then abort these steps. - if (context.terminated) { - return - } + // 2. If fetchParams’s process request end-of-body is non-null, + // then run fetchParams’s process request end-of-body. + if (fetchParams.processRequestEndOfBody) { + fetchParams.processRequestEndOfBody() + } + } - // 2. If e is an "AbortError" DOMException, then terminate the ongoing fetch with the aborted flag set. - // 3. Otherwise, terminate the ongoing fetch. - context.terminate({ - aborted: e.name === 'AbortError', - reason: e - }) + // 3. Let processBodyError given e be these steps: + const processBodyError = (e) => { + // 1. If fetchParams is canceled, then abort these steps. + if (isCancelled(fetchParams)) { + return + } + + // 2. If e is an "AbortError" DOMException, then abort fetchParams’s controller. + if (e.name === 'AbortError') { + fetchParams.controller.abort() + } else { + fetchParams.controller.terminate(e) + } + } + + // 4. Incrementally read request’s body given processBodyChunk, processEndOfBody, + // processBodyError, and fetchParams’s task destination. + requestBody = (async function * () { + try { + for await (const bytes of request.body.stream) { + yield * processBodyChunk(bytes) + } + processEndOfBody() + } catch (err) { + processBodyError(err) } })() + } - // 9. If aborted, then: - function onRequestAborted () { - // 1. Let aborted be the termination’s aborted flag. - const aborted = this.terminated.aborted + try { + const { body, status, statusText, headersList } = await dispatch({ body: requestBody }) - // 2. If connection uses HTTP/2, then transmit an RST_STREAM frame. - this.connection.destroy() + const iterator = body[Symbol.asyncIterator]() + fetchParams.controller.next = () => iterator.next() - // 3. If aborted is set, then return an aborted network error. - if (aborted) { - return resolve(makeNetworkError(new AbortError())) - } + response = makeResponse({ status, statusText, headersList }) + } catch (err) { + // 10. If aborted, then: + if (err.name === 'AbortError') { + // 1. If connection uses HTTP/2, then transmit an RST_STREAM frame. + fetchParams.controller.connection.destroy() - // 4. Return a network error. - return resolve(makeNetworkError(this.terminated.reason)) + // 2. Return the appropriate network error for fetchParams. + return makeAppropriateNetworkError(fetchParams) } - // 10. Let pullAlgorithm be an action that resumes the ongoing fetch - // if it is suspended. - let pullAlgorithm + return makeNetworkError(err) + } - // 11. Let cancelAlgorithm be an action that terminates the ongoing - // fetch with the aborted flag set. - const cancelAlgorithm = () => { - context.terminate({ aborted: true }) - } + // 11. Let pullAlgorithm be an action that resumes the ongoing fetch + // if it is suspended. + const pullAlgorithm = () => { + fetchParams.controller.resume() + } - // 12. Let highWaterMark be a non-negative, non-NaN number, chosen by - // the user agent. - const highWaterMark = 64 * 1024 // Same as nodejs fs streams. + // 12. Let cancelAlgorithm be an algorithm that aborts fetchParams’s + // controller. + const cancelAlgorithm = () => { + fetchParams.controller.abort() + } - // 13. Let sizeAlgorithm be an algorithm that accepts a chunk object - // and returns a non-negative, non-NaN, non-infinite number, chosen by the user agent. - // TODO + // 13. Let highWaterMark be a non-negative, non-NaN number, chosen by + // the user agent. + // TODO - // 14. Let stream be a new ReadableStream. - // 15. Set up stream with pullAlgorithm set to pullAlgorithm, - // cancelAlgorithm set to cancelAlgorithm, highWaterMark set to - // highWaterMark, and sizeAlgorithm set to sizeAlgorithm. - if (!ReadableStream) { - ReadableStream = require('stream/web').ReadableStream - } + // 14. Let sizeAlgorithm be an algorithm that accepts a chunk object + // and returns a non-negative, non-NaN, non-infinite number, chosen by the user agent. + // TODO - let pullResolve + // 15. Let stream be a new ReadableStream. + // 16. Set up stream with pullAlgorithm set to pullAlgorithm, + // cancelAlgorithm set to cancelAlgorithm, highWaterMark set to + // highWaterMark, and sizeAlgorithm set to sizeAlgorithm. + if (!ReadableStream) { + ReadableStream = require('stream/web').ReadableStream + } - const stream = new ReadableStream( - { - async start (controller) { - context.controller = controller - }, - async pull (controller) { - if (!pullAlgorithm) { - await new Promise((resolve) => { - pullResolve = resolve - }) - } - await pullAlgorithm(controller) - }, - async cancel (reason) { - await cancelAlgorithm(reason) - } + const stream = new ReadableStream( + { + async start (controller) { + fetchParams.controller.controller = controller }, - { highWaterMark } - ) + async pull (controller) { + await pullAlgorithm(controller) + }, + async cancel (reason) { + await cancelAlgorithm(reason) + } + }, + { highWaterMark: 0 } + ) - // 16. Run these steps, but abort when the ongoing fetch is terminated: - // TODO + // 17. Run these steps, but abort when the ongoing fetch is terminated: + + // 1. Set response’s body to a new body whose stream is stream. + response.body = { stream } + + // 2. If response is not a network error and request’s cache mode is + // not "no-store", then update response in httpCache for request. + // TODO - // 17. If aborted, then: - // TODO: How can this happen? The steps above are not async? - - // 18. Run these steps in parallel: - // 1. Run these steps, but abort when the ongoing fetch is terminated: - // 1. While true: - // 1. If one or more bytes have been transmitted from response’s - // message body, then: - // NOTE: See onHeaders - // 2. Otherwise, if the bytes transmission for response’s message - // body is done normally and stream is readable, then close stream, - // finalize response for fetchParams and response, and abort these - // in-parallel steps. - // NOTE: See onHeaders - - // 2. If aborted, then: - function onResponseAborted () { - // 1. Let aborted be the termination’s aborted flag. - const aborted = this.terminated.aborted - - // 2. If aborted is set, then: - if (aborted) { - // 1. Set response’s aborted flag. - response.aborted = true - - // 2. If stream is readable, error stream with an "AbortError" DOMException. - if (isReadable(stream)) { - this.controller.error(new AbortError()) + // 3. If includeCredentials is true and the user agent is not configured + // to block cookies for request (see section 7 of [COOKIES]), then run the + // "set-cookie-string" parsing algorithm (see section 5.2 of [COOKIES]) on + // the value of each header whose name is a byte-case-insensitive match for + // `Set-Cookie` in response’s header list, if any, and request’s current URL. + // TODO + + // 18. If aborted, then: + // TODO + + // 19. Run these steps in parallel: + + // 1. Run these steps, but abort when fetchParams is canceled: + fetchParams.controller.on('terminated', onAborted) + fetchParams.controller.resume = async () => { + // 1. While true + while (true) { + // 1-3. See onData... + + // 4. Set bytes to the result of handling content codings given + // codings and bytes. + let bytes + try { + const { done, value } = await fetchParams.controller.next() + bytes = done ? undefined : value + } catch (err) { + if (fetchParams.controller.ended && !timingInfo.encodedBodySize) { + // zlib doesn't like empty streams. + bytes = undefined + } else { + bytes = err } - } else { - // 3. Otherwise, if stream is readable, error stream with a TypeError. - if (isReadable(stream)) { - this.controller.error(new TypeError('terminated')) + } + + if (bytes === undefined) { + // 2. Otherwise, if the bytes transmission for response’s message + // body is done normally and stream is readable, then close + // stream, finalize response for fetchParams and response, and + // abort these in-parallel steps. + try { + fetchParams.controller.controller.close() + } catch (err) { + // TODO (fix): How/Why can this happen? Do we have a bug? + if (!/Controller is already closed/.test(err)) { + throw err + } } + + finalizeResponse(fetchParams, response) + + return } - // 4. If connection uses HTTP/2, then transmit an RST_STREAM frame. - // 5. Otherwise, the user agent should close connection unless it would be bad for performance to do so. - this.connection.destroy() + // 5. Increase timingInfo’s decoded body size by bytes’s length. + timingInfo.decodedBodySize += bytes?.byteLength ?? 0 + + // 6. If bytes is failure, then terminate fetchParams’s controller. + if (bytes instanceof Error) { + fetchParams.controller.terminate(bytes) + return + } + + // 7. Enqueue a Uint8Array wrapping an ArrayBuffer containing bytes + // into stream. + fetchParams.controller.controller.enqueue(new Uint8Array(bytes)) + + // 8. If stream is errored, then terminate the ongoing fetch. + if (isErrored(stream)) { + fetchParams.controller.terminate() + return + } + + // 9. If stream doesn’t need more data ask the user agent to suspend + // the ongoing fetch. + if (!fetchParams.controller.controller.desiredSize) { + return + } + } + } + + // 2. If aborted, then: + function onAborted (reason) { + // 2. If fetchParams is aborted, then: + if (isAborted(fetchParams)) { + // 1. Set response’s aborted flag. + response.aborted = true + + // 2. If stream is readable, error stream with an "AbortError" DOMException. + if (isReadable(stream)) { + fetchParams.controller.controller.error(new AbortError()) + } + } else { + // 3. Otherwise, if stream is readable, error stream with a TypeError. + if (isReadable(stream)) { + fetchParams.controller.controller.error(new TypeError('terminated', { + cause: reason instanceof Error ? reason : undefined + })) + } } - // 19. Return response. - // NOTE: See onHeaders + // 4. If connection uses HTTP/2, then transmit an RST_STREAM frame. + // 5. Otherwise, the user agent should close connection unless it would be bad for performance to do so. + fetchParams.controller.connection.destroy() + } + + // 20. Return response. + return response - // Implementation + async function dispatch ({ body }) { const url = requestCurrentURL(request) - context.dispatcher.dispatch( + return new Promise((resolve, reject) => fetchParams.controller.dispatcher.dispatch( { path: url.pathname + url.search, origin: url.origin, method: request.method, - body: context.dispatcher[kIsMockActive] ? request.body && request.body.source : body, + body: fetchParams.controller.dispatcher[kIsMockActive] ? request.body && request.body.source : body, headers: request.headersList, maxRedirections: 0 }, { - decoder: null, + body: null, abort: null, - context, onConnect (abort) { // TODO (fix): Do we need connection here? - const { connection } = this.context + const { connection } = fetchParams.controller if (connection.destroyed) { abort(new AbortError()) } else { + fetchParams.controller.on('terminated', abort) this.abort = connection.abort = abort } }, @@ -1836,29 +1943,21 @@ function httpNetworkFetch ( return } + let codings = [] + const headers = new Headers() for (let n = 0; n < headersList.length; n += 2) { - headers.append( - headersList[n + 0].toString(), - headersList[n + 1].toString() - ) - } + const key = headersList[n + 0].toString() + const val = headersList[n + 1].toString() - response = makeResponse({ - status, - statusText, - headersList: headers[kHeadersList], - body: { stream } - }) + if (key.toLowerCase() === 'content-encoding') { + codings = val.split(',').map((x) => x.trim()) + } - this.context.on('terminated', onResponseAborted) + headers.append(key, val) + } - const codings = - headers - .get('content-encoding') - ?.toLowerCase() - .split(',') - .map((x) => x.trim()) ?? [] + this.body = new Readable({ read: resume }) const decoders = [] @@ -1876,118 +1975,65 @@ function httpNetworkFetch ( } } - if (decoders.length > 1) { - pipeline(...decoders, () => {}) - } else if (decoders.length === 0) { - // TODO (perf): Avoid intermediate. - decoders.push(new PassThrough()) - } - - this.decoder = decoders[0].on('drain', resume) - - const iterator = decoders[decoders.length - 1][Symbol.asyncIterator]() - - pullAlgorithm = async (controller) => { - // 4. Set bytes to the result of handling content codings given - // codings and bytes. - let bytes - try { - const { done, value } = await iterator.next() - bytes = done ? undefined : value - } catch (err) { - if (this.decoder.writableEnded && !timingInfo.encodedBodySize) { - // zlib doesn't like empty streams. - bytes = undefined - } else { - bytes = err - } - } - - if (bytes === undefined) { - // 2. Otherwise, if the bytes transmission for response’s message - // body is done normally and stream is readable, then close - // stream, finalize response for fetchParams and response, and - // abort these in-parallel steps. - finalizeResponse(fetchParams, response) - - controller.close() - - return - } - - // 5. Increase timingInfo’s decoded body size by bytes’s length. - timingInfo.decodedBodySize += bytes?.byteLength ?? 0 - - // 6. If bytes is failure, then terminate the ongoing fetch. - if (bytes instanceof Error) { - this.context.terminate({ reason: bytes }) - return - } - - // 7. Enqueue a Uint8Array wrapping an ArrayBuffer containing bytes - // into stream. - controller.enqueue(new Uint8Array(bytes)) - - // 8. If stream is errored, then terminate the ongoing fetch. - if (isErrored(stream)) { - this.context.terminate() - return - } - - // 9. If stream doesn’t need more data ask the user agent to suspend - // the ongoing fetch. - return controller.desiredSize > 0 - } - - if (pullResolve) { - pullResolve() - pullResolve = null - } - - resolve(response) + resolve({ + status, + statusText, + headersList: headers[kHeadersList], + body: decoders.length + ? pipeline(this.body, ...decoders, () => {}) + : this.body.on('error', () => {}) + }) return true }, onData (chunk) { - if (this.context.dump) { + if (fetchParams.controller.dump) { return } - // 1. If one or more bytes have been transmitted from response’s - // message body, then: + // 1. If one or more bytes have been transmitted from response’s + // message body, then: - // 1. Let bytes be the transmitted bytes. + // 1. Let bytes be the transmitted bytes. const bytes = chunk - // 2. Let codings be the result of extracting header list values - // given `Content-Encoding` and response’s header list. - // See pullAlgorithm. + // 2. Let codings be the result of extracting header list values + // given `Content-Encoding` and response’s header list. + // See pullAlgorithm. - // 3. Increase timingInfo’s encoded body size by bytes’s length. + // 3. Increase timingInfo’s encoded body size by bytes’s length. timingInfo.encodedBodySize += bytes.byteLength - // 4. See pullAlgorithm... + // 4. See pullAlgorithm... - return this.decoder.write(bytes) + return this.body.push(bytes) }, onComplete () { - this.decoder.end() + if (this.abort) { + fetchParams.controller.off('terminated', this.abort) + } + + fetchParams.controller.ended = true + + this.body.push(null) }, onError (error) { - this.decoder?.destroy(error) + if (this.abort) { + fetchParams.controller.off('terminated', this.abort) + } - this.context.terminate({ reason: error }) + this.body?.destroy(error) - if (!response) { - resolve(makeNetworkError(error)) - } + fetchParams.controller.terminate(error) + + reject(makeNetworkError(error)) } } - ) - }) + )) + } } module.exports = fetch diff --git a/deps/undici/src/lib/fetch/request.js b/deps/undici/src/lib/fetch/request.js index adb5639229a95b..151dc8e44111f1 100644 --- a/deps/undici/src/lib/fetch/request.js +++ b/deps/undici/src/lib/fetch/request.js @@ -7,9 +7,9 @@ const { Headers, fill: fillHeaders, HeadersList } = require('./headers') const util = require('../core/util') const { isValidHTTPToken, - EnvironmentSettingsObject, sameOrigin, - toUSVString + toUSVString, + normalizeMethod } = require('./util') const { forbiddenMethods, @@ -81,9 +81,7 @@ class Request { try { parsedURL = new URL(input, baseUrl) } catch (err) { - const error = new TypeError('Failed to parse URL from ' + input) - error.cause = err - throw error + throw new TypeError('Failed to parse URL from ' + input, { cause: err }) } // 3. If parsedURL includes credentials, then throw a TypeError. @@ -121,7 +119,7 @@ class Request { // 9. If request’s window is an environment settings object and its origin // is same origin with origin, then set window to request’s window. if ( - request.window instanceof EnvironmentSettingsObject && + request.window?.constructor?.name === 'EnvironmentSettingsObject' && sameOrigin(request.window, origin) ) { window = request.window @@ -149,7 +147,7 @@ class Request { // unsafe-request flag Set. unsafeRequest: request.unsafeRequest, // client This’s relevant settings object. - client: request.client, + client: this[kRealm].settingsObject, // window window. window, // priority request’s priority. @@ -179,8 +177,7 @@ class Request { // history-navigation flag request’s history-navigation flag. historyNavigation: request.historyNavigation, // URL list A clone of request’s URL list. - // undici implementation note: urlList is cloned in makeRequest - urlList: request.urlList + urlList: [...request.urlList] }) // 13. If init is not empty, then: @@ -228,11 +225,7 @@ class Request { try { parsedReferrer = new URL(referrer, baseUrl) } catch (err) { - const error = new TypeError( - `Referrer "${referrer}" is not a valid URL.` - ) - error.cause = err - throw error + throw new TypeError(`Referrer "${referrer}" is not a valid URL.`, { cause: err }) } // 3. If one of the following is true @@ -346,8 +339,7 @@ class Request { } // 3. Normalize method. - // https://fetch.spec.whatwg.org/#concept-method-normalize - method = init.method.toUpperCase() + method = normalizeMethod(init.method) // 4. Set request’s method to method. request.method = method @@ -722,7 +714,7 @@ class Request { } // 1. If this is unusable, then throw a TypeError. - if (this.bodyUsed || (this.body && this.body.locked)) { + if (this.bodyUsed || this.body?.locked) { throw new TypeError('unusable') } diff --git a/deps/undici/src/lib/fetch/response.js b/deps/undici/src/lib/fetch/response.js index 4449d364005cee..64fc3170dac413 100644 --- a/deps/undici/src/lib/fetch/response.js +++ b/deps/undici/src/lib/fetch/response.js @@ -1,14 +1,16 @@ 'use strict' const { Headers, HeadersList, fill } = require('./headers') +const { AbortError } = require('../core/errors') const { extractBody, cloneBody, mixinBody } = require('./body') const util = require('../core/util') const { kEnumerableProperty } = util -const { responseURL, isValidReasonPhrase, toUSVString } = require('./util') +const { responseURL, isValidReasonPhrase, toUSVString, isCancelled, isAborted } = require('./util') const { redirectStatus, nullBodyStatus, - forbiddenResponseHeaderNames + forbiddenResponseHeaderNames, + corsSafeListedResponseHeaderNames } = require('./constants') const { kState, kHeaders, kGuard, kRealm } = require('./symbols') const { kHeadersList } = require('../core/symbols') @@ -337,10 +339,10 @@ function cloneResponse (response) { function makeResponse (init) { return { - internalResponse: null, aborted: false, rangeRequested: false, timingAllowPassed: false, + requestIncludesCredentials: false, type: 'default', status: 200, timingInfo: null, @@ -361,11 +363,55 @@ function makeNetworkError (reason) { error: reason instanceof Error ? reason - : new Error(reason ? String(reason) : reason), + : new Error(reason ? String(reason) : reason, { + cause: reason instanceof Error ? reason : undefined + }), aborted: reason && reason.name === 'AbortError' }) } +function makeFilteredResponse (response, state) { + state = { + internalResponse: response, + ...state + } + + return new Proxy(response, { + get (target, p) { + return p in state ? state[p] : target[p] + }, + set (target, p, value) { + assert(!(p in state)) + target[p] = value + return true + } + }) +} + +function makeFilteredHeadersList (headersList, filter) { + return new Proxy(headersList, { + get (target, prop) { + // Override methods used by Headers class. + if (prop === 'get' || prop === 'has') { + return (name) => filter(name) ? target[prop](name) : undefined + } else if (prop === 'slice') { + return (...args) => { + assert(args.length === 0) + const arr = [] + for (let index = 0; index < target.length; index += 2) { + if (filter(target[index])) { + arr.push(target[index], target[index + 1]) + } + } + return arr + } + } else { + return target[prop] + } + } + }) +} + // https://fetch.spec.whatwg.org/#concept-filtered-response function filterResponse (response, type) { // Set response to the following filtered response with response as its @@ -375,18 +421,9 @@ function filterResponse (response, type) { // and header list excludes any headers in internal response’s header list // whose name is a forbidden response-header name. - const headers = [] - for (let n = 0; n < response.headersList.length; n += 2) { - if (!forbiddenResponseHeaderNames.includes(response.headersList[n])) { - headers.push(response.headersList[n + 0], response.headersList[n + 1]) - } - } - - return makeResponse({ - ...response, - internalResponse: response, - headersList: new HeadersList(...headers), - type: 'basic' + return makeFilteredResponse(response, { + type: 'basic', + headersList: makeFilteredHeadersList(response.headersList, (name) => !forbiddenResponseHeaderNames.includes(name)) }) } else if (type === 'cors') { // A CORS filtered response is a filtered response whose type is "cors" @@ -394,22 +431,18 @@ function filterResponse (response, type) { // list whose name is not a CORS-safelisted response-header name, given // internal response’s CORS-exposed header-name list. - // TODO: This is not correct... - return makeResponse({ - ...response, - internalResponse: response, - type: 'cors' + return makeFilteredResponse(response, { + type: 'cors', + headersList: makeFilteredHeadersList(response.headersList, (name) => !corsSafeListedResponseHeaderNames.includes(name)) }) } else if (type === 'opaque') { // An opaque filtered response is a filtered response whose type is // "opaque", URL list is the empty list, status is 0, status message // is the empty byte sequence, header list is empty, and body is null. - return makeResponse({ - ...response, - internalResponse: response, + return makeFilteredResponse(response, { type: 'opaque', - urlList: [], + urlList: Object.freeze([]), status: 0, statusText: '', body: null @@ -419,13 +452,11 @@ function filterResponse (response, type) { // is "opaqueredirect", status is 0, status message is the empty byte // sequence, header list is empty, and body is null. - return makeResponse({ - ...response, - internalResponse: response, + return makeFilteredResponse(response, { type: 'opaqueredirect', status: 0, statusText: '', - headersList: new HeadersList(), + headersList: makeFilteredHeadersList(response.headersList, () => false), body: null }) } else { @@ -433,4 +464,22 @@ function filterResponse (response, type) { } } -module.exports = { makeNetworkError, makeResponse, filterResponse, Response } +// https://fetch.spec.whatwg.org/#appropriate-network-error +function makeAppropriateNetworkError (fetchParams) { + // 1. Assert: fetchParams is canceled. + assert(isCancelled(fetchParams)) + + // 2. Return an aborted network error if fetchParams is aborted; + // otherwise return a network error. + return isAborted(fetchParams) + ? makeNetworkError(new AbortError()) + : makeNetworkError(fetchParams.controller.terminated.reason) +} + +module.exports = { + makeNetworkError, + makeResponse, + makeAppropriateNetworkError, + filterResponse, + Response +} diff --git a/deps/undici/src/lib/fetch/util.js b/deps/undici/src/lib/fetch/util.js index 4e6e79838f36c2..eea3905586d90b 100644 --- a/deps/undici/src/lib/fetch/util.js +++ b/deps/undici/src/lib/fetch/util.js @@ -195,7 +195,7 @@ function appendFetchMetadata (httpRequest) { header = httpRequest.mode // 4. Set a structured field value `Sec-Fetch-Mode`/header in r’s header list. - httpRequest.headersList.append('sec-fetch-mode', header) + httpRequest.headersList.set('sec-fetch-mode', header) // https://w3c.github.io/webappsec-fetch-metadata/#sec-fetch-site-header // TODO @@ -333,14 +333,25 @@ function createDeferredPromise () { return { promise, resolve: res, reject: rej } } -class ServiceWorkerGlobalScope {} // dummy -class Window {} // dummy -class EnvironmentSettingsObject {} // dummy +function isAborted (fetchParams) { + return fetchParams.controller.state === 'aborted' +} + +function isCancelled (fetchParams) { + return fetchParams.controller.state === 'aborted' || + fetchParams.controller.state === 'terminated' +} + +// https://fetch.spec.whatwg.org/#concept-method-normalize +function normalizeMethod (method) { + return /^(DELETE|GET|HEAD|OPTIONS|POST|PUT)$/i.test(method) + ? method.toUpperCase() + : method +} module.exports = { - ServiceWorkerGlobalScope, - Window, - EnvironmentSettingsObject, + isAborted, + isCancelled, createDeferredPromise, ReadableStreamFrom, toUSVString, @@ -366,5 +377,6 @@ module.exports = { isFileLike, isValidReasonPhrase, sameOrigin, - CORBCheck + CORBCheck, + normalizeMethod } diff --git a/deps/undici/src/lib/mock/mock-interceptor.js b/deps/undici/src/lib/mock/mock-interceptor.js index a10c71debb5f49..699bec41287e41 100644 --- a/deps/undici/src/lib/mock/mock-interceptor.js +++ b/deps/undici/src/lib/mock/mock-interceptor.js @@ -64,7 +64,7 @@ class MockInterceptor { throw new InvalidArgumentError('opts.path must be defined') } if (typeof opts.method === 'undefined') { - throw new InvalidArgumentError('opts.method must be defined') + opts.method = 'GET' } // See https://github.com/nodejs/undici/issues/1245 // As per RFC 3986, clients are not supposed to send URI diff --git a/deps/undici/src/lib/mock/mock-utils.js b/deps/undici/src/lib/mock/mock-utils.js index fc47bcd23a53dd..8bd4df51a09436 100644 --- a/deps/undici/src/lib/mock/mock-utils.js +++ b/deps/undici/src/lib/mock/mock-utils.js @@ -33,6 +33,14 @@ function lowerCaseEntries (headers) { function matchHeaders (mockDispatch, headers) { if (typeof mockDispatch.headers === 'function') { + if (Array.isArray(headers)) { // fetch HeadersList + const clone = headers.slice() + const entries = [] + for (let index = 0; index < clone.length; index += 2) { + entries.push([clone[index], clone[index + 1]]) + } + headers = Object.fromEntries(entries) + } return mockDispatch.headers(headers ? lowerCaseEntries(headers) : {}) } if (typeof mockDispatch.headers === 'undefined') { diff --git a/deps/undici/src/lib/pool-base.js b/deps/undici/src/lib/pool-base.js index 274280f835d241..2a909eee08312e 100644 --- a/deps/undici/src/lib/pool-base.js +++ b/deps/undici/src/lib/pool-base.js @@ -1,20 +1,13 @@ 'use strict' -const Dispatcher = require('./dispatcher') -const { - ClientDestroyedError, - ClientClosedError, - InvalidArgumentError -} = require('./core/errors') +const DispatcherBase = require('./dispatcher-base') const FixedQueue = require('./node/fixed-queue') -const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl } = require('./core/symbols') +const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols') const PoolStats = require('./pool-stats') const kClients = Symbol('clients') const kNeedDrain = Symbol('needDrain') const kQueue = Symbol('queue') -const kDestroyed = Symbol('destroyed') -const kClosedPromise = Symbol('closed promise') const kClosedResolve = Symbol('closed resolve') const kOnDrain = Symbol('onDrain') const kOnConnect = Symbol('onConnect') @@ -25,16 +18,12 @@ const kAddClient = Symbol('add client') const kRemoveClient = Symbol('remove client') const kStats = Symbol('stats') -class PoolBase extends Dispatcher { +class PoolBase extends DispatcherBase { constructor () { super() this[kQueue] = new FixedQueue() - this[kClosedPromise] = null - this[kClosedResolve] = null - this[kDestroyed] = false this[kClients] = [] - this[kNeedDrain] = false this[kQueued] = 0 const pool = this @@ -122,59 +111,17 @@ class PoolBase extends Dispatcher { return this[kStats] } - get destroyed () { - return this[kDestroyed] - } - - get closed () { - return this[kClosedPromise] != null - } - - close (cb) { - try { - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (!this[kClosedPromise]) { - if (this[kQueue].isEmpty()) { - this[kClosedPromise] = Promise.all(this[kClients].map(c => c.close())) - } else { - this[kClosedPromise] = new Promise((resolve) => { - this[kClosedResolve] = resolve - }) - } - this[kClosedPromise] = this[kClosedPromise].then(() => { - this[kDestroyed] = true - }) - } - - if (cb) { - this[kClosedPromise].then(() => cb(null, null)) - } else { - return this[kClosedPromise] - } - } catch (err) { - if (cb) { - cb(err) - } else { - return Promise.reject(err) - } + async [kClose] () { + if (this[kQueue].isEmpty()) { + return Promise.all(this[kClients].map(c => c.close())) + } else { + return new Promise((resolve) => { + this[kClosedResolve] = resolve + }) } } - destroy (err, cb) { - this[kDestroyed] = true - - if (typeof err === 'function') { - cb = err - err = null - } - - if (!err) { - err = new ClientDestroyedError() - } - + async [kDestroy] (err) { while (true) { const item = this[kQueue].shift() if (!item) { @@ -183,44 +130,19 @@ class PoolBase extends Dispatcher { item.handler.onError(err) } - const promise = Promise.all(this[kClients].map(c => c.destroy(err))) - if (cb) { - promise.then(() => cb(null, null)) - } else { - return promise - } + return Promise.all(this[kClients].map(c => c.destroy(err))) } - dispatch (opts, handler) { - if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler must be an object') - } - - try { - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (this[kClosedPromise]) { - throw new ClientClosedError() - } - - const dispatcher = this[kGetDispatcher]() - - if (!dispatcher) { - this[kNeedDrain] = true - this[kQueue].push({ opts, handler }) - this[kQueued]++ - } else if (!dispatcher.dispatch(opts, handler)) { - dispatcher[kNeedDrain] = true - this[kNeedDrain] = !this[kGetDispatcher]() - } - } catch (err) { - if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') - } + [kDispatch] (opts, handler) { + const dispatcher = this[kGetDispatcher]() - handler.onError(err) + if (!dispatcher) { + this[kNeedDrain] = true + this[kQueue].push({ opts, handler }) + this[kQueued]++ + } else if (!dispatcher.dispatch(opts, handler)) { + dispatcher[kNeedDrain] = true + this[kNeedDrain] = !this[kGetDispatcher]() } return !this[kNeedDrain] diff --git a/deps/undici/src/lib/proxy-agent.js b/deps/undici/src/lib/proxy-agent.js index b0dec86ac00657..ee674df646fe65 100644 --- a/deps/undici/src/lib/proxy-agent.js +++ b/deps/undici/src/lib/proxy-agent.js @@ -1,14 +1,14 @@ 'use strict' -const { kProxy } = require('./core/symbols') +const { kProxy, kClose, kDestroy } = require('./core/symbols') const { URL } = require('url') const Agent = require('./agent') -const Dispatcher = require('./dispatcher') +const DispatcherBase = require('./dispatcher-base') const { InvalidArgumentError } = require('./core/errors') const kAgent = Symbol('proxy agent') -class ProxyAgent extends Dispatcher { +class ProxyAgent extends DispatcherBase { constructor (opts) { super(opts) this[kProxy] = buildProxyOptions(opts) @@ -31,9 +31,13 @@ class ProxyAgent extends Dispatcher { ) } - async close () { + async [kClose] () { await this[kAgent].close() } + + async [kDestroy] () { + await this[kAgent].destroy() + } } function buildProxyOptions (opts) { diff --git a/deps/undici/src/package.json b/deps/undici/src/package.json index f0fd80ce7ca139..aee29e463873fd 100644 --- a/deps/undici/src/package.json +++ b/deps/undici/src/package.json @@ -1,6 +1,6 @@ { "name": "undici", - "version": "4.16.0", + "version": "5.0.0", "description": "An HTTP/1.1 client, written from scratch for Node.js", "homepage": "https://undici.nodejs.org", "bugs": { @@ -54,10 +54,10 @@ "test:typescript": "tsd", "coverage": "nyc --reporter=text --reporter=html npm run test", "coverage:ci": "nyc --reporter=lcov npm run test", - "bench": "concurrently -k -s first npm:bench:server npm:bench:run", + "bench": "PORT=3042 concurrently -k -s first npm:bench:server npm:bench:run", "bench:server": "node benchmarks/server.js", "prebench:run": "node benchmarks/wait.js", - "bench:run": "CONNECTIONS=1 node --experimental-wasm-simd benchmarks/benchmark.js && CONNECTIONS=50 node --experimental-wasm-simd benchmarks/benchmark.js", + "bench:run": "CONNECTIONS=1 node --experimental-wasm-simd benchmarks/benchmark.js; CONNECTIONS=50 node --experimental-wasm-simd benchmarks/benchmark.js", "serve:website": "docsify serve .", "prepare": "husky install", "fuzz": "jsfuzz test/fuzzing/fuzz.js corpus" diff --git a/deps/undici/src/types/mock-interceptor.d.ts b/deps/undici/src/types/mock-interceptor.d.ts index 0166b1f1db3a1a..2e4272176adb2d 100644 --- a/deps/undici/src/types/mock-interceptor.d.ts +++ b/deps/undici/src/types/mock-interceptor.d.ts @@ -44,8 +44,8 @@ declare namespace MockInterceptor { export interface Options { /** Path to intercept on. */ path: string | RegExp | ((path: string) => boolean); - /** Method to intercept on. */ - method: string | RegExp | ((method: string) => boolean); + /** Method to intercept on. Defaults to GET. */ + method?: string | RegExp | ((method: string) => boolean); /** Body to intercept on. */ body?: string | RegExp | ((body: string) => boolean); /** Headers to intercept on. */ diff --git a/deps/undici/undici.js b/deps/undici/undici.js index 4c52acb9d76987..f955b9dc320038 100644 --- a/deps/undici/undici.js +++ b/deps/undici/undici.js @@ -8,6 +8,9 @@ var __commonJS = (cb, mod) => function __require() { var require_symbols = __commonJS({ "lib/core/symbols.js"(exports2, module2) { module2.exports = { + kClose: Symbol("close"), + kDestroy: Symbol("destroy"), + kDispatch: Symbol("dispatch"), kUrl: Symbol("url"), kWriting: Symbol("writing"), kResuming: Symbol("resuming"), @@ -753,6 +756,138 @@ var require_dispatcher = __commonJS({ } }); +// lib/dispatcher-base.js +var require_dispatcher_base = __commonJS({ + "lib/dispatcher-base.js"(exports2, module2) { + "use strict"; + var Dispatcher2 = require_dispatcher(); + var { + ClientDestroyedError, + ClientClosedError, + InvalidArgumentError: InvalidArgumentError2 + } = require_errors(); + var { kDestroy, kClose, kDispatch } = require_symbols(); + var kDestroyed = Symbol("destroyed"); + var kClosed = Symbol("closed"); + var kOnDestroyed = Symbol("onDestroyed"); + var kOnClosed = Symbol("onClosed"); + var DispatcherBase = class extends Dispatcher2 { + constructor() { + super(); + this[kDestroyed] = false; + this[kOnDestroyed] = []; + this[kClosed] = false; + this[kOnClosed] = []; + } + get destroyed() { + return this[kDestroyed]; + } + get closed() { + return this[kClosed]; + } + close(callback) { + if (callback === void 0) { + return new Promise((resolve, reject) => { + this.close((err, data) => { + return err ? reject(err) : resolve(data); + }); + }); + } + if (typeof callback !== "function") { + throw new InvalidArgumentError2("invalid callback"); + } + if (this[kDestroyed]) { + queueMicrotask(() => callback(new ClientDestroyedError(), null)); + return; + } + if (this[kClosed]) { + if (this[kOnClosed]) { + this[kOnClosed].push(callback); + } else { + queueMicrotask(() => callback(null, null)); + } + return; + } + this[kClosed] = true; + this[kOnClosed].push(callback); + const onClosed = () => { + const callbacks = this[kOnClosed]; + this[kOnClosed] = null; + for (let i = 0; i < callbacks.length; i++) { + callbacks[i](null, null); + } + }; + this[kClose]().then(() => this.destroy()).then(() => { + queueMicrotask(onClosed); + }); + } + destroy(err, callback) { + if (typeof err === "function") { + callback = err; + err = null; + } + if (callback === void 0) { + return new Promise((resolve, reject) => { + this.destroy(err, (err2, data) => { + return err2 ? reject(err2) : resolve(data); + }); + }); + } + if (typeof callback !== "function") { + throw new InvalidArgumentError2("invalid callback"); + } + if (this[kDestroyed]) { + if (this[kOnDestroyed]) { + this[kOnDestroyed].push(callback); + } else { + queueMicrotask(() => callback(null, null)); + } + return; + } + if (!err) { + err = new ClientDestroyedError(); + } + this[kDestroyed] = true; + this[kOnDestroyed].push(callback); + const onDestroyed = () => { + const callbacks = this[kOnDestroyed]; + this[kOnDestroyed] = null; + for (let i = 0; i < callbacks.length; i++) { + callbacks[i](null, null); + } + }; + this[kDestroy](err).then(() => { + queueMicrotask(onDestroyed); + }); + } + dispatch(opts, handler) { + if (!handler || typeof handler !== "object") { + throw new InvalidArgumentError2("handler must be an object"); + } + try { + if (!opts || typeof opts !== "object") { + throw new InvalidArgumentError2("opts must be an object."); + } + if (this[kDestroyed]) { + throw new ClientDestroyedError(); + } + if (this[kClosed]) { + throw new ClientClosedError(); + } + return this[kDispatch](opts, handler); + } catch (err) { + if (typeof handler.onError !== "function") { + throw new InvalidArgumentError2("invalid onError method"); + } + handler.onError(err); + return false; + } + } + }; + module2.exports = DispatcherBase; + } +}); + // lib/handler/redirect.js var require_redirect = __commonJS({ "lib/handler/redirect.js"(exports2, module2) { @@ -1336,7 +1471,7 @@ var require_client = __commonJS({ var net = require("net"); var util2 = require_util(); var Request = require_request(); - var Dispatcher2 = require_dispatcher(); + var DispatcherBase = require_dispatcher_base(); var RedirectHandler = require_redirect(); var { RequestContentLengthMismatchError, @@ -1346,8 +1481,6 @@ var require_client = __commonJS({ RequestAbortedError, HeadersTimeoutError, HeadersOverflowError, - ClientDestroyedError, - ClientClosedError, SocketError, InformationalError, BodyTimeoutError, @@ -1375,12 +1508,9 @@ var require_client = __commonJS({ kNoRef, kKeepAliveDefaultTimeout, kHostHeader, - kClosed, - kDestroyed, kPendingIdx, kRunningIdx, kError, - kOnDestroyed, kPipelining, kSocket, kKeepAliveTimeoutValue, @@ -1393,8 +1523,12 @@ var require_client = __commonJS({ kConnector, kMaxRedirections, kMaxRequests, - kCounter + kCounter, + kClose, + kDestroy, + kDispatch } = require_symbols(); + var kClosedResolve = Symbol("kClosedResolve"); var channels = {}; try { const diagnosticsChannel = require("diagnostics_channel"); @@ -1408,7 +1542,7 @@ var require_client = __commonJS({ channels.connectError = { hasSubscribers: false }; channels.connected = { hasSubscribers: false }; } - var Client2 = class extends Dispatcher2 { + var Client2 = class extends DispatcherBase { constructor(url, { maxHeaderSize, headersTimeout, @@ -1498,10 +1632,7 @@ var require_client = __commonJS({ this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 6e5 : keepAliveMaxTimeout; this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold; this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]; - this[kClosed] = false; - this[kDestroyed] = false; this[kServerName] = null; - this[kOnDestroyed] = []; this[kResuming] = 0; this[kNeedDrain] = 0; this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ""}\r @@ -1511,6 +1642,7 @@ var require_client = __commonJS({ this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength; this[kMaxRedirections] = maxRedirections; this[kMaxRequests] = maxRequestsPerClient; + this[kClosedResolve] = null; this[kQueue] = []; this[kRunningIdx] = 0; this[kPendingIdx] = 0; @@ -1522,12 +1654,6 @@ var require_client = __commonJS({ this[kPipelining] = value; resume(this, true); } - get destroyed() { - return this[kDestroyed]; - } - get closed() { - return this[kClosed]; - } get [kPending]() { return this[kQueue].length - this[kPendingIdx]; } @@ -1548,114 +1674,56 @@ var require_client = __commonJS({ connect(this); this.once("connect", cb); } - dispatch(opts, handler) { - if (!handler || typeof handler !== "object") { - throw new InvalidArgumentError2("handler must be an object"); + [kDispatch](opts, handler) { + const { maxRedirections = this[kMaxRedirections] } = opts; + if (maxRedirections) { + handler = new RedirectHandler(this, maxRedirections, opts, handler); + } + const origin = opts.origin || this[kUrl].origin; + const request = new Request(origin, opts, handler); + this[kQueue].push(request); + if (this[kResuming]) { + } else if (util2.bodyLength(request.body) == null && util2.isIterable(request.body)) { + this[kResuming] = 1; + process.nextTick(resume, this); + } else { + resume(this, true); } - try { - if (!opts || typeof opts !== "object") { - throw new InvalidArgumentError2("opts must be an object."); - } - if (this[kDestroyed]) { - throw new ClientDestroyedError(); - } - if (this[kClosed]) { - throw new ClientClosedError(); - } - const { maxRedirections = this[kMaxRedirections] } = opts; - if (maxRedirections) { - handler = new RedirectHandler(this, maxRedirections, opts, handler); - } - const origin = opts.origin || this[kUrl].origin; - const request = new Request(origin, opts, handler); - this[kQueue].push(request); - if (this[kResuming]) { - } else if (util2.bodyLength(request.body) == null && util2.isIterable(request.body)) { - this[kResuming] = 1; - process.nextTick(resume, this); - } else { - resume(this, true); - } - if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { - this[kNeedDrain] = 2; - } - } catch (err) { - if (typeof handler.onError !== "function") { - throw new InvalidArgumentError2("invalid onError method"); - } - handler.onError(err); + if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { + this[kNeedDrain] = 2; } return this[kNeedDrain] < 2; } - close(callback) { - if (callback === void 0) { - return new Promise((resolve, reject) => { - this.close((err, data) => { - return err ? reject(err) : resolve(data); - }); - }); - } - if (typeof callback !== "function") { - throw new InvalidArgumentError2("invalid callback"); - } - if (this[kDestroyed]) { - queueMicrotask(() => callback(new ClientDestroyedError(), null)); - return; - } - this[kClosed] = true; - if (!this[kSize]) { - this.destroy(callback); - } else { - this[kOnDestroyed].push(callback); - } - } - destroy(err, callback) { - if (typeof err === "function") { - callback = err; - err = null; - } - if (callback === void 0) { - return new Promise((resolve, reject) => { - this.destroy(err, (err2, data) => { - return err2 ? reject(err2) : resolve(data); - }); - }); - } - if (typeof callback !== "function") { - throw new InvalidArgumentError2("invalid callback"); - } - if (this[kDestroyed]) { - if (this[kOnDestroyed]) { - this[kOnDestroyed].push(callback); + async [kClose]() { + return new Promise((resolve) => { + if (!this[kSize]) { + this.destroy(resolve); } else { - queueMicrotask(() => callback(null, null)); + this[kClosedResolve] = resolve; } - return; - } - if (!err) { - err = new ClientDestroyedError(); - } - const requests = this[kQueue].splice(this[kPendingIdx]); - for (let i = 0; i < requests.length; i++) { - const request = requests[i]; - errorRequest(this, request, err); - } - this[kClosed] = true; - this[kDestroyed] = true; - this[kOnDestroyed].push(callback); - const onDestroyed = () => { - const callbacks = this[kOnDestroyed]; - this[kOnDestroyed] = null; - for (let i = 0; i < callbacks.length; i++) { - callbacks[i](null, null); + }); + } + async [kDestroy](err) { + return new Promise((resolve) => { + const requests = this[kQueue].splice(this[kPendingIdx]); + for (let i = 0; i < requests.length; i++) { + const request = requests[i]; + errorRequest(this, request, err); + } + const callback = () => { + if (this[kClosedResolve]) { + this[kClosedResolve](); + this[kClosedResolve] = null; + } + resolve(); + }; + if (!this[kSocket]) { + queueMicrotask(callback); + } else { + util2.destroy(this[kSocket].on("close", callback), err); } - }; - if (!this[kSocket]) { - queueMicrotask(onDestroyed); - } else { - util2.destroy(this[kSocket].on("close", onDestroyed), err); - } - resume(this); + resume(this); + }); } }; var constants = require_constants(); @@ -1839,7 +1907,6 @@ var require_client = __commonJS({ try { try { currentParser = this; - this.llhttp.llhttp_finish(this.ptr); } finally { currentParser = null; } @@ -2156,7 +2223,7 @@ var require_client = __commonJS({ this[kParser] = null; const err = this[kError] || new SocketError("closed", util2.getSocketInfo(this)); client[kSocket] = null; - if (client[kDestroyed]) { + if (client.destroyed) { assert(client[kPending] === 0); const requests = client[kQueue].splice(client[kRunningIdx]); for (let i = 0; i < requests.length; i++) { @@ -2291,13 +2358,13 @@ var require_client = __commonJS({ } function _resume(client, sync) { while (true) { - if (client[kDestroyed]) { + if (client.destroyed) { assert(client[kPending] === 0); return; } - if (client[kClosed] && !client[kSize]) { - client.destroy(util2.nop); - continue; + if (client.closed && !client[kSize]) { + client.destroy(); + return; } const socket = client[kSocket]; if (socket) { @@ -2807,20 +2874,13 @@ var require_pool_stats = __commonJS({ var require_pool_base = __commonJS({ "lib/pool-base.js"(exports2, module2) { "use strict"; - var Dispatcher2 = require_dispatcher(); - var { - ClientDestroyedError, - ClientClosedError, - InvalidArgumentError: InvalidArgumentError2 - } = require_errors(); + var DispatcherBase = require_dispatcher_base(); var FixedQueue = require_fixed_queue(); - var { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl } = require_symbols(); + var { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require_symbols(); var PoolStats = require_pool_stats(); var kClients = Symbol("clients"); var kNeedDrain = Symbol("needDrain"); var kQueue = Symbol("queue"); - var kDestroyed = Symbol("destroyed"); - var kClosedPromise = Symbol("closed promise"); var kClosedResolve = Symbol("closed resolve"); var kOnDrain = Symbol("onDrain"); var kOnConnect = Symbol("onConnect"); @@ -2830,15 +2890,11 @@ var require_pool_base = __commonJS({ var kAddClient = Symbol("add client"); var kRemoveClient = Symbol("remove client"); var kStats = Symbol("stats"); - var PoolBase = class extends Dispatcher2 { + var PoolBase = class extends DispatcherBase { constructor() { super(); this[kQueue] = new FixedQueue(); - this[kClosedPromise] = null; - this[kClosedResolve] = null; - this[kDestroyed] = false; this[kClients] = []; - this[kNeedDrain] = false; this[kQueued] = 0; const pool = this; this[kOnDrain] = function onDrain(origin, targets) { @@ -2905,51 +2961,16 @@ var require_pool_base = __commonJS({ get stats() { return this[kStats]; } - get destroyed() { - return this[kDestroyed]; - } - get closed() { - return this[kClosedPromise] != null; - } - close(cb) { - try { - if (this[kDestroyed]) { - throw new ClientDestroyedError(); - } - if (!this[kClosedPromise]) { - if (this[kQueue].isEmpty()) { - this[kClosedPromise] = Promise.all(this[kClients].map((c) => c.close())); - } else { - this[kClosedPromise] = new Promise((resolve) => { - this[kClosedResolve] = resolve; - }); - } - this[kClosedPromise] = this[kClosedPromise].then(() => { - this[kDestroyed] = true; - }); - } - if (cb) { - this[kClosedPromise].then(() => cb(null, null)); - } else { - return this[kClosedPromise]; - } - } catch (err) { - if (cb) { - cb(err); - } else { - return Promise.reject(err); - } + async [kClose]() { + if (this[kQueue].isEmpty()) { + return Promise.all(this[kClients].map((c) => c.close())); + } else { + return new Promise((resolve) => { + this[kClosedResolve] = resolve; + }); } } - destroy(err, cb) { - this[kDestroyed] = true; - if (typeof err === "function") { - cb = err; - err = null; - } - if (!err) { - err = new ClientDestroyedError(); - } + async [kDestroy](err) { while (true) { const item = this[kQueue].shift(); if (!item) { @@ -2957,38 +2978,17 @@ var require_pool_base = __commonJS({ } item.handler.onError(err); } - const promise = Promise.all(this[kClients].map((c) => c.destroy(err))); - if (cb) { - promise.then(() => cb(null, null)); - } else { - return promise; - } + return Promise.all(this[kClients].map((c) => c.destroy(err))); } - dispatch(opts, handler) { - if (!handler || typeof handler !== "object") { - throw new InvalidArgumentError2("handler must be an object"); - } - try { - if (this[kDestroyed]) { - throw new ClientDestroyedError(); - } - if (this[kClosedPromise]) { - throw new ClientClosedError(); - } - const dispatcher = this[kGetDispatcher](); - if (!dispatcher) { - this[kNeedDrain] = true; - this[kQueue].push({ opts, handler }); - this[kQueued]++; - } else if (!dispatcher.dispatch(opts, handler)) { - dispatcher[kNeedDrain] = true; - this[kNeedDrain] = !this[kGetDispatcher](); - } - } catch (err) { - if (typeof handler.onError !== "function") { - throw new InvalidArgumentError2("invalid onError method"); - } - handler.onError(err); + [kDispatch](opts, handler) { + const dispatcher = this[kGetDispatcher](); + if (!dispatcher) { + this[kNeedDrain] = true; + this[kQueue].push({ opts, handler }); + this[kQueued]++; + } else if (!dispatcher.dispatch(opts, handler)) { + dispatcher[kNeedDrain] = true; + this[kNeedDrain] = !this[kGetDispatcher](); } return !this[kNeedDrain]; } @@ -3213,20 +3213,14 @@ var require_dispatcher_weakref = __commonJS({ var require_agent = __commonJS({ "lib/agent.js"(exports2, module2) { "use strict"; - var { - ClientClosedError, - InvalidArgumentError: InvalidArgumentError2, - ClientDestroyedError - } = require_errors(); - var { kClients, kRunning } = require_symbols(); - var Dispatcher2 = require_dispatcher(); + var { InvalidArgumentError: InvalidArgumentError2 } = require_errors(); + var { kClients, kRunning, kClose, kDestroy, kDispatch } = require_symbols(); + var DispatcherBase = require_dispatcher_base(); var Pool2 = require_pool(); var Client2 = require_client(); var util2 = require_util(); var RedirectHandler = require_redirect(); var { WeakRef, FinalizationRegistry: FinalizationRegistry2 } = require_dispatcher_weakref()(); - var kDestroyed = Symbol("destroyed"); - var kClosed = Symbol("closed"); var kOnConnect = Symbol("onConnect"); var kOnDisconnect = Symbol("onDisconnect"); var kOnConnectionError = Symbol("onConnectionError"); @@ -3238,7 +3232,7 @@ var require_agent = __commonJS({ function defaultFactory(origin, opts) { return opts && opts.connections === 1 ? new Client2(origin, opts) : new Pool2(origin, opts); } - var Agent2 = class extends Dispatcher2 { + var Agent2 = class extends DispatcherBase { constructor({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) { super(); if (typeof factory !== "function") { @@ -3263,8 +3257,6 @@ var require_agent = __commonJS({ this[kClients].delete(key); } }); - this[kClosed] = false; - this[kDestroyed] = false; const agent = this; this[kOnDrain] = (origin, targets) => { agent.emit("drain", origin, [agent, ...targets]); @@ -3289,57 +3281,28 @@ var require_agent = __commonJS({ } return ret; } - dispatch(opts, handler) { - if (!handler || typeof handler !== "object") { - throw new InvalidArgumentError2("handler must be an object."); + [kDispatch](opts, handler) { + let key; + if (opts.origin && (typeof opts.origin === "string" || opts.origin instanceof URL)) { + key = String(opts.origin); + } else { + throw new InvalidArgumentError2("opts.origin must be a non-empty string or URL."); } - try { - if (!opts || typeof opts !== "object") { - throw new InvalidArgumentError2("opts must be an object."); - } - let key; - if (opts.origin && (typeof opts.origin === "string" || opts.origin instanceof URL)) { - key = String(opts.origin); - } else { - throw new InvalidArgumentError2("opts.origin must be a non-empty string or URL."); - } - if (this[kDestroyed]) { - throw new ClientDestroyedError(); - } - if (this[kClosed]) { - throw new ClientClosedError(); - } - const ref = this[kClients].get(key); - let dispatcher = ref ? ref.deref() : null; - if (!dispatcher) { - dispatcher = this[kFactory](opts.origin, this[kOptions]).on("drain", this[kOnDrain]).on("connect", this[kOnConnect]).on("disconnect", this[kOnDisconnect]).on("connectionError", this[kOnConnectionError]); - this[kClients].set(key, new WeakRef(dispatcher)); - this[kFinalizer].register(dispatcher, key); - } - const { maxRedirections = this[kMaxRedirections] } = opts; - if (maxRedirections != null && maxRedirections !== 0) { - opts = { ...opts, maxRedirections: 0 }; - handler = new RedirectHandler(this, maxRedirections, opts, handler); - } - return dispatcher.dispatch(opts, handler); - } catch (err) { - if (typeof handler.onError !== "function") { - throw new InvalidArgumentError2("invalid onError method"); - } - handler.onError(err); + const ref = this[kClients].get(key); + let dispatcher = ref ? ref.deref() : null; + if (!dispatcher) { + dispatcher = this[kFactory](opts.origin, this[kOptions]).on("drain", this[kOnDrain]).on("connect", this[kOnConnect]).on("disconnect", this[kOnDisconnect]).on("connectionError", this[kOnConnectionError]); + this[kClients].set(key, new WeakRef(dispatcher)); + this[kFinalizer].register(dispatcher, key); } - } - get closed() { - return this[kClosed]; - } - get destroyed() { - return this[kDestroyed]; - } - close(callback) { - if (callback != null && typeof callback !== "function") { - throw new InvalidArgumentError2("callback must be a function"); + const { maxRedirections = this[kMaxRedirections] } = opts; + if (maxRedirections != null && maxRedirections !== 0) { + opts = { ...opts, maxRedirections: 0 }; + handler = new RedirectHandler(this, maxRedirections, opts, handler); } - this[kClosed] = true; + return dispatcher.dispatch(opts, handler); + } + async [kClose]() { const closePromises = []; for (const ref of this[kClients].values()) { const client = ref.deref(); @@ -3347,21 +3310,9 @@ var require_agent = __commonJS({ closePromises.push(client.close()); } } - if (!callback) { - return Promise.all(closePromises); - } - Promise.all(closePromises).then(() => process.nextTick(callback)); + await Promise.all(closePromises); } - destroy(err, callback) { - if (typeof err === "function") { - callback = err; - err = null; - } - if (callback != null && typeof callback !== "function") { - throw new InvalidArgumentError2("callback must be a function"); - } - this[kClosed] = true; - this[kDestroyed] = true; + async [kDestroy](err) { const destroyPromises = []; for (const ref of this[kClients].values()) { const client = ref.deref(); @@ -3369,10 +3320,7 @@ var require_agent = __commonJS({ destroyPromises.push(client.destroy(err)); } } - if (!callback) { - return Promise.all(destroyPromises); - } - Promise.all(destroyPromises).then(() => process.nextTick(callback)); + await Promise.all(destroyPromises); } }; module2.exports = Agent2; @@ -4400,6 +4348,14 @@ var require_mock_utils = __commonJS({ } function matchHeaders(mockDispatch2, headers) { if (typeof mockDispatch2.headers === "function") { + if (Array.isArray(headers)) { + const clone = headers.slice(); + const entries = []; + for (let index = 0; index < clone.length; index += 2) { + entries.push([clone[index], clone[index + 1]]); + } + headers = Object.fromEntries(entries); + } return mockDispatch2.headers(headers ? lowerCaseEntries(headers) : {}); } if (typeof mockDispatch2.headers === "undefined") { @@ -4633,7 +4589,7 @@ var require_mock_interceptor = __commonJS({ throw new InvalidArgumentError2("opts.path must be defined"); } if (typeof opts.method === "undefined") { - throw new InvalidArgumentError2("opts.method must be defined"); + opts.method = "GET"; } if (typeof opts.path === "string") { const parsedURL = new URL(opts.path, "data://"); @@ -4936,13 +4892,13 @@ var require_mock_agent = __commonJS({ var require_proxy_agent = __commonJS({ "lib/proxy-agent.js"(exports2, module2) { "use strict"; - var { kProxy } = require_symbols(); + var { kProxy, kClose, kDestroy } = require_symbols(); var { URL: URL2 } = require("url"); var Agent2 = require_agent(); - var Dispatcher2 = require_dispatcher(); + var DispatcherBase = require_dispatcher_base(); var { InvalidArgumentError: InvalidArgumentError2 } = require_errors(); var kAgent = Symbol("proxy agent"); - var ProxyAgent2 = class extends Dispatcher2 { + var ProxyAgent2 = class extends DispatcherBase { constructor(opts) { super(opts); this[kProxy] = buildProxyOptions(opts); @@ -4960,9 +4916,12 @@ var require_proxy_agent = __commonJS({ } }, handler); } - async close() { + async [kClose]() { await this[kAgent].close(); } + async [kDestroy]() { + await this[kAgent].destroy(); + } }; function buildProxyOptions(opts) { if (typeof opts === "string") { @@ -5069,9 +5028,11 @@ var require_constants2 = __commonJS({ "xslt", "" ]; + var corsSafeListedResponseHeaderNames = []; module2.exports = { subresource, forbiddenResponseHeaderNames, + corsSafeListedResponseHeaderNames, forbiddenMethods, requestBodyHeader, referrerPolicy, @@ -5320,8 +5281,9 @@ var require_headers = __commonJS({ } const callback = args[0]; const thisArg = args[1]; - for (let index = 0; index < this[kHeadersList].length; index += 2) { - callback.call(thisArg, this[kHeadersList][index + 1], this[kHeadersList][index], this); + const clone = this[kHeadersList].slice(); + for (let index = 0; index < clone.length; index += 2) { + callback.call(thisArg, clone[index + 1], clone[index], this); } } [Symbol.for("nodejs.util.inspect.custom")]() { @@ -5625,7 +5587,7 @@ var require_util2 = __commonJS({ function appendFetchMetadata(httpRequest) { let header = null; header = httpRequest.mode; - httpRequest.headersList.append("sec-fetch-mode", header); + httpRequest.headersList.set("sec-fetch-mode", header); } function appendRequestOriginHeader(request) { let serializedOrigin = request.origin; @@ -5707,16 +5669,18 @@ var require_util2 = __commonJS({ }); return { promise, resolve: res, reject: rej }; } - var ServiceWorkerGlobalScope = class { - }; - var Window = class { - }; - var EnvironmentSettingsObject = class { - }; + function isAborted(fetchParams) { + return fetchParams.controller.state === "aborted"; + } + function isCancelled(fetchParams) { + return fetchParams.controller.state === "aborted" || fetchParams.controller.state === "terminated"; + } + function normalizeMethod(method) { + return /^(DELETE|GET|HEAD|OPTIONS|POST|PUT)$/i.test(method) ? method.toUpperCase() : method; + } module2.exports = { - ServiceWorkerGlobalScope, - Window, - EnvironmentSettingsObject, + isAborted, + isCancelled, createDeferredPromise, ReadableStreamFrom, toUSVString, @@ -5742,7 +5706,8 @@ var require_util2 = __commonJS({ isFileLike, isValidReasonPhrase, sameOrigin, - CORBCheck + CORBCheck, + normalizeMethod }; } }); @@ -6138,14 +6103,16 @@ var require_response = __commonJS({ "lib/fetch/response.js"(exports2, module2) { "use strict"; var { Headers, HeadersList, fill } = require_headers(); + var { AbortError } = require_errors(); var { extractBody, cloneBody, mixinBody } = require_body(); var util2 = require_util(); var { kEnumerableProperty } = util2; - var { responseURL, isValidReasonPhrase, toUSVString } = require_util2(); + var { responseURL, isValidReasonPhrase, toUSVString, isCancelled, isAborted } = require_util2(); var { redirectStatus, nullBodyStatus, - forbiddenResponseHeaderNames + forbiddenResponseHeaderNames, + corsSafeListedResponseHeaderNames } = require_constants2(); var { kState, kHeaders, kGuard, kRealm } = require_symbols2(); var { kHeadersList } = require_symbols(); @@ -6329,10 +6296,10 @@ var require_response = __commonJS({ } function makeResponse(init) { return { - internalResponse: null, aborted: false, rangeRequested: false, timingAllowPassed: false, + requestIncludesCredentials: false, type: "default", status: 200, timingInfo: null, @@ -6347,55 +6314,92 @@ var require_response = __commonJS({ return makeResponse({ type: "error", status: 0, - error: reason instanceof Error ? reason : new Error(reason ? String(reason) : reason), + error: reason instanceof Error ? reason : new Error(reason ? String(reason) : reason, { + cause: reason instanceof Error ? reason : void 0 + }), aborted: reason && reason.name === "AbortError" }); } - function filterResponse(response, type) { - if (type === "basic") { - const headers = []; - for (let n = 0; n < response.headersList.length; n += 2) { - if (!forbiddenResponseHeaderNames.includes(response.headersList[n])) { - headers.push(response.headersList[n + 0], response.headersList[n + 1]); + function makeFilteredResponse(response, state) { + state = { + internalResponse: response, + ...state + }; + return new Proxy(response, { + get(target, p) { + return p in state ? state[p] : target[p]; + }, + set(target, p, value) { + assert(!(p in state)); + target[p] = value; + return true; + } + }); + } + function makeFilteredHeadersList(headersList, filter) { + return new Proxy(headersList, { + get(target, prop) { + if (prop === "get" || prop === "has") { + return (name) => filter(name) ? target[prop](name) : void 0; + } else if (prop === "slice") { + return (...args) => { + assert(args.length === 0); + const arr = []; + for (let index = 0; index < target.length; index += 2) { + if (filter(target[index])) { + arr.push(target[index], target[index + 1]); + } + } + return arr; + }; + } else { + return target[prop]; } } - return makeResponse({ - ...response, - internalResponse: response, - headersList: new HeadersList(...headers), - type: "basic" + }); + } + function filterResponse(response, type) { + if (type === "basic") { + return makeFilteredResponse(response, { + type: "basic", + headersList: makeFilteredHeadersList(response.headersList, (name) => !forbiddenResponseHeaderNames.includes(name)) }); } else if (type === "cors") { - return makeResponse({ - ...response, - internalResponse: response, - type: "cors" + return makeFilteredResponse(response, { + type: "cors", + headersList: makeFilteredHeadersList(response.headersList, (name) => !corsSafeListedResponseHeaderNames.includes(name)) }); } else if (type === "opaque") { - return makeResponse({ - ...response, - internalResponse: response, + return makeFilteredResponse(response, { type: "opaque", - urlList: [], + urlList: Object.freeze([]), status: 0, statusText: "", body: null }); } else if (type === "opaqueredirect") { - return makeResponse({ - ...response, - internalResponse: response, + return makeFilteredResponse(response, { type: "opaqueredirect", status: 0, statusText: "", - headersList: new HeadersList(), + headersList: makeFilteredHeadersList(response.headersList, () => false), body: null }); } else { assert(false); } } - module2.exports = { makeNetworkError, makeResponse, filterResponse, Response }; + function makeAppropriateNetworkError(fetchParams) { + assert(isCancelled(fetchParams)); + return isAborted(fetchParams) ? makeNetworkError(new AbortError()) : makeNetworkError(fetchParams.controller.terminated.reason); + } + module2.exports = { + makeNetworkError, + makeResponse, + makeAppropriateNetworkError, + filterResponse, + Response + }; } }); @@ -6408,9 +6412,9 @@ var require_request2 = __commonJS({ var util2 = require_util(); var { isValidHTTPToken, - EnvironmentSettingsObject, sameOrigin, - toUSVString + toUSVString, + normalizeMethod } = require_util2(); var { forbiddenMethods, @@ -6453,9 +6457,7 @@ var require_request2 = __commonJS({ try { parsedURL = new URL(input, baseUrl); } catch (err) { - const error = new TypeError("Failed to parse URL from " + input); - error.cause = err; - throw error; + throw new TypeError("Failed to parse URL from " + input, { cause: err }); } if (parsedURL.username || parsedURL.password) { throw new TypeError("Request cannot be constructed from a URL that includes credentials: " + input); @@ -6469,7 +6471,7 @@ var require_request2 = __commonJS({ } const origin = this[kRealm].settingsObject.origin; let window = "client"; - if (request.window instanceof EnvironmentSettingsObject && sameOrigin(request.window, origin)) { + if (request.window?.constructor?.name === "EnvironmentSettingsObject" && sameOrigin(request.window, origin)) { window = request.window; } if (init.window !== void 0 && init.window != null) { @@ -6482,7 +6484,7 @@ var require_request2 = __commonJS({ method: request.method, headersList: request.headersList, unsafeRequest: request.unsafeRequest, - client: request.client, + client: this[kRealm].settingsObject, window, priority: request.priority, origin: request.origin, @@ -6496,7 +6498,7 @@ var require_request2 = __commonJS({ keepalive: request.keepalive, reloadNavigation: request.reloadNavigation, historyNavigation: request.historyNavigation, - urlList: request.urlList + urlList: [...request.urlList] }); if (Object.keys(init).length > 0) { if (request.mode === "navigate") { @@ -6519,9 +6521,7 @@ var require_request2 = __commonJS({ try { parsedReferrer = new URL(referrer, baseUrl); } catch (err) { - const error = new TypeError(`Referrer "${referrer}" is not a valid URL.`); - error.cause = err; - throw error; + throw new TypeError(`Referrer "${referrer}" is not a valid URL.`, { cause: err }); } request.referrer = parsedReferrer; } @@ -6582,7 +6582,7 @@ var require_request2 = __commonJS({ if (forbiddenMethods.indexOf(method.toUpperCase()) !== -1) { throw TypeError(`'${init.method}' HTTP method is unsupported.`); } - method = init.method.toUpperCase(); + method = normalizeMethod(init.method); request.method = method; } if (init.signal !== void 0) { @@ -6767,7 +6767,7 @@ var require_request2 = __commonJS({ if (!(this instanceof Request)) { throw new TypeError("Illegal invocation"); } - if (this.bodyUsed || this.body && this.body.locked) { + if (this.bodyUsed || this.body?.locked) { throw new TypeError("unusable"); } const clonedRequest = cloneRequest(this[kState]); @@ -7067,6 +7067,7 @@ var require_fetch = __commonJS({ var { Response, makeNetworkError, + makeAppropriateNetworkError, filterResponse, makeResponse } = require_response(); @@ -7074,8 +7075,6 @@ var require_fetch = __commonJS({ var { Request, makeRequest } = require_request2(); var zlib = require("zlib"); var { - ServiceWorkerGlobalScope, - Window, matchRequestIntegrity, makePolicyContainer, clonePolicyContainer, @@ -7095,7 +7094,9 @@ var require_fetch = __commonJS({ createDeferredPromise, isBlobLike, CORBCheck, - sameOrigin + sameOrigin, + isCancelled, + isAborted } = require_util2(); var { kState, kHeaders, kGuard, kRealm } = require_symbols2(); var { AbortError } = require_errors(); @@ -7110,25 +7111,35 @@ var require_fetch = __commonJS({ } = require_constants2(); var { kHeadersList } = require_symbols(); var EE = require("events"); - var { PassThrough, pipeline } = require("stream"); + var { Readable, pipeline } = require("stream"); var { isErrored, isReadable } = require_util(); - var { kIsMockActive } = require_mock_symbols(); var { dataURLProcessor } = require_dataURL(); + var { kIsMockActive } = require_mock_symbols(); + var { TransformStream } = require("stream/web"); var resolveObjectURL; var ReadableStream; var Fetch = class extends EE { constructor(dispatcher) { super(); this.dispatcher = dispatcher; - this.terminated = null; this.connection = null; this.dump = false; + this.state = "ongoing"; + } + terminate(reason) { + if (this.state !== "ongoing") { + return; + } + this.state = "terminated"; + this.connection?.destroy(reason); + this.emit("terminated", reason); } - terminate({ reason, aborted } = {}) { - if (this.terminated) { + abort() { + if (this.state !== "ongoing") { return; } - this.terminated = { aborted, reason }; + const reason = new AbortError(); + this.state = "aborted"; this.connection?.destroy(reason); this.emit("terminated", reason); } @@ -7142,25 +7153,27 @@ var require_fetch = __commonJS({ } const resource = args[0]; const init = args.length >= 1 ? args[1] ?? {} : {}; - const context = new Fetch(this); const p = createDeferredPromise(); const requestObject = new Request(resource, init); const request = requestObject[kState]; if (requestObject.signal.aborted) { - abortFetch.call(context, p, request, null); + abortFetch(p, request, null); return p.promise; } - const globalObject = request.client?.globalObject; - if (globalObject instanceof ServiceWorkerGlobalScope) { + const globalObject = request.client.globalObject; + if (globalObject?.constructor?.name === "ServiceWorkerGlobalScope") { request.serviceWorkers = "none"; } let responseObject = null; const relevantRealm = null; let locallyAborted = false; + let controller = null; requestObject.signal.addEventListener("abort", () => { locallyAborted = true; - abortFetch.call(context, p, request, responseObject); - context.terminate({ aborted: true }); + abortFetch(p, request, responseObject); + if (controller != null) { + controller.abort(); + } }, { once: true }); const handleFetchDone = (response) => finalizeAndReportTiming(response, "fetch"); const processResponse = (response) => { @@ -7168,7 +7181,7 @@ var require_fetch = __commonJS({ return; } if (response.aborted) { - abortFetch.call(context, p, request, responseObject); + abortFetch(p, request, responseObject); return; } if (response.type === "error") { @@ -7183,12 +7196,11 @@ var require_fetch = __commonJS({ responseObject[kHeaders][kRealm] = relevantRealm; p.resolve(responseObject); }; - fetching.call(context, { + controller = fetching({ request, processResponseEndOfBody: handleFetchDone, - processResponse - }).catch((err) => { - p.reject(err); + processResponse, + dispatcher: this }); return p.promise; } @@ -7251,7 +7263,8 @@ var require_fetch = __commonJS({ processResponse, processResponseEndOfBody, processResponseConsumeBody, - useParallelQueue = false + useParallelQueue = false, + dispatcher }) { let taskDestination = null; let crossOriginIsolatedCapability = false; @@ -7264,6 +7277,7 @@ var require_fetch = __commonJS({ startTime: currenTime }); const fetchParams = { + controller: new Fetch(dispatcher), request, timingInfo, processRequestBodyChunkLength, @@ -7276,7 +7290,7 @@ var require_fetch = __commonJS({ }; assert(!request.body || request.body.stream); if (request.window === "client") { - request.window = request.client?.globalObject instanceof Window ? request.client : "no-window"; + request.window = request.client?.globalObject?.constructor?.name === "Window" ? request.client : "no-window"; } if (request.origin === "client") { request.origin = request.client?.origin; @@ -7299,10 +7313,12 @@ var require_fetch = __commonJS({ } if (subresource.includes(request.destination)) { } - return mainFetch.call(this, fetchParams); + mainFetch(fetchParams).catch((err) => { + fetchParams.controller.terminate(err); + }); + return fetchParams.controller; } async function mainFetch(fetchParams, recursive = false) { - const context = this; const request = fetchParams.request; let response = null; if (request.localURLsOnly && !/^(about|blob|data):/.test(requestCurrentURL(request).protocol)) { @@ -7323,7 +7339,7 @@ var require_fetch = __commonJS({ const currentURL = requestCurrentURL(request); if (sameOrigin(currentURL, request.url) && request.responseTainting === "basic" || currentURL.protocol === "data:" || (request.mode === "navigate" || request.mode === "websocket")) { request.responseTainting = "basic"; - return await schemeFetch.call(this, fetchParams); + return await schemeFetch(fetchParams); } if (request.mode === "same-origin") { return makeNetworkError('request mode cannot be "same-origin"'); @@ -7333,7 +7349,7 @@ var require_fetch = __commonJS({ return makeNetworkError('redirect mode cannot be "follow" for "no-cors" request'); } request.responseTainting = "opaque"; - const noCorsResponse = await schemeFetch.call(this, fetchParams); + const noCorsResponse = await schemeFetch(fetchParams); if (noCorsResponse.status === 0 || CORBCheck(request, noCorsResponse) === "allowed") { return noCorsResponse; } @@ -7343,7 +7359,7 @@ var require_fetch = __commonJS({ return makeNetworkError("URL scheme must be a HTTP(S) scheme"); } request.responseTainting = "cors"; - return await httpFetch.call(this, fetchParams).catch((err) => makeNetworkError(err)); + return await httpFetch(fetchParams); })(); } if (recursive) { @@ -7374,10 +7390,10 @@ var require_fetch = __commonJS({ } if (response.status !== 0 && (request.method === "HEAD" || request.method === "CONNECT" || nullBodyStatus.includes(internalResponse.status))) { internalResponse.body = null; - context.dump = true; + fetchParams.controller.dump = true; } if (request.integrity) { - const processBodyError = (reason) => fetchFinale.call(context, fetchParams, makeNetworkError(reason)); + const processBodyError = (reason) => fetchFinale(fetchParams, makeNetworkError(reason)); if (request.responseTainting === "opaque" || response.body == null) { processBodyError(response.error); return; @@ -7388,7 +7404,7 @@ var require_fetch = __commonJS({ return; } response.body = safelyExtractBody(bytes)[0]; - fetchFinale.call(context, fetchParams, response); + fetchFinale(fetchParams, response); }; try { processBody(await response.arrayBuffer()); @@ -7396,11 +7412,10 @@ var require_fetch = __commonJS({ processBodyError(err); } } else { - fetchFinale.call(context, fetchParams, response); + fetchFinale(fetchParams, response); } } async function schemeFetch(fetchParams) { - const context = this; const { request } = fetchParams; const { protocol: scheme, @@ -7422,15 +7437,7 @@ var require_fetch = __commonJS({ return makeNetworkError("invalid path called"); } case "blob:": { - let onRequestAborted = function() { - const aborted = context.terminated.aborted; - if (aborted) { - return makeNetworkError(new AbortError()); - } - return makeNetworkError(context.terminated.reason); - }; resolveObjectURL ??= require("buffer").resolveObjectURL; - context.on("terminated", onRequestAborted); const currentURL = requestCurrentURL(request); if (currentURL.search.length !== 0) { return makeNetworkError("NetworkError when attempting to fetch resource."); @@ -7443,7 +7450,6 @@ var require_fetch = __commonJS({ response.headersList.set("content-length", `${blob.size}`); response.headersList.set("content-type", blob.type); response.body = extractBody(blob)[0]; - context.off("terminated", onRequestAborted); return response; } case "data:": { @@ -7472,7 +7478,7 @@ var require_fetch = __commonJS({ "content-type", contentType ], - body: dataURLStruct.body + body: extractBody(dataURLStruct.body)[0] }); } case "file:": { @@ -7480,7 +7486,7 @@ var require_fetch = __commonJS({ } case "http:": case "https:": { - return await httpFetch.call(this, fetchParams).catch((err) => makeNetworkError(err)); + return await httpFetch(fetchParams).catch((err) => makeNetworkError(err)); } default: { return makeNetworkError("unknown scheme"); @@ -7490,26 +7496,54 @@ var require_fetch = __commonJS({ function finalizeResponse(fetchParams, response) { fetchParams.request.done = true; if (fetchParams.processResponseDone != null) { - fetchParams.processResponseDone(response); + queueMicrotask(() => fetchParams.processResponseDone(response)); } } - function fetchFinale(fetchParams, response) { - const context = this; + async function fetchFinale(fetchParams, response) { if (response.type === "error") { response.urlList = [fetchParams.request.urlList[0]]; response.timingInfo = createOpaqueTimingInfo({ startTime: fetchParams.timingInfo.startTime }); } + const processResponseEndOfBody = () => { + fetchParams.request.done = true; + if (fetchParams.processResponseEndOfBody != null) { + queueMicrotask(() => fetchParams.processResponseEndOfBody(response)); + } + }; if (fetchParams.processResponse != null) { - fetchParams.processResponse(response); + queueMicrotask(() => fetchParams.processResponse(response)); } - if (response.type === "error") { - context.terminate({ reason: response.error }); + if (response.body == null) { + processResponseEndOfBody(); + } else { + const identityTransformAlgorithm = (chunk, controller) => { + controller.enqueue(chunk); + }; + const transformStream = new TransformStream({ + start() { + }, + transform: identityTransformAlgorithm, + flush: processResponseEndOfBody + }); + response.body = { stream: response.body.stream.pipeThrough(transformStream) }; + } + if (fetchParams.processResponseConsumeBody != null) { + const processBody = (nullOrBytes) => fetchParams.processResponseConsumeBody(response, nullOrBytes); + const processBodyError = (failure) => fetchParams.processResponseConsumeBody(response, failure); + if (response.body == null) { + queueMicrotask(() => processBody(null)); + } else { + try { + processBody(await response.body.stream.arrayBuffer()); + } catch (err) { + processBodyError(err); + } + } } } async function httpFetch(fetchParams) { - const context = this; const request = fetchParams.request; let response = null; let actualResponse = null; @@ -7520,7 +7554,7 @@ var require_fetch = __commonJS({ if (request.redirect === "follow") { request.serviceWorkers = "none"; } - actualResponse = response = await httpNetworkOrCacheFetch.call(this, fetchParams); + actualResponse = response = await httpNetworkOrCacheFetch(fetchParams); if (request.responseTainting === "cors" && corsCheck(request, response) === "failure") { return makeNetworkError("cors failure"); } @@ -7532,13 +7566,13 @@ var require_fetch = __commonJS({ return makeNetworkError("blocked"); } if (redirectStatus.includes(actualResponse.status)) { - context.connection.destroy(); + fetchParams.controller.connection.destroy(); if (request.redirect === "error") { response = makeNetworkError(); } else if (request.redirect === "manual") { response = actualResponse; } else if (request.redirect === "follow") { - response = await httpRedirectFetch.call(this, fetchParams, response); + response = await httpRedirectFetch(fetchParams, response); } else { assert(false); } @@ -7592,10 +7626,9 @@ var require_fetch = __commonJS({ } request.urlList.push(locationURL); setRequestReferrerPolicyOnRedirect(request, actualResponse); - return mainFetch.call(this, fetchParams, true); + return mainFetch(fetchParams, true); } async function httpNetworkOrCacheFetch(fetchParams, isAuthenticationFetch = false, isNewConnectionFetch = false) { - const context = this; const request = fetchParams.request; let httpFetchParams = null; let httpRequest = null; @@ -7667,7 +7700,7 @@ var require_fetch = __commonJS({ if (httpRequest.mode === "only-if-cached") { return makeNetworkError("only if cached"); } - const forwardResponse = await httpNetworkFetch.call(this, httpFetchParams, includeCredentials, isNewConnectionFetch); + const forwardResponse = await httpNetworkFetch(httpFetchParams, includeCredentials, isNewConnectionFetch); if (!safeMethods.includes(httpRequest.method) && forwardResponse.status >= 200 && forwardResponse.status <= 399) { } if (revalidatingFlag && forwardResponse.status === 304) { @@ -7680,150 +7713,197 @@ var require_fetch = __commonJS({ if (httpRequest.headersList.has("range")) { response.rangeRequested = true; } + response.requestIncludesCredentials = includeCredentials; if (response.status === 407) { if (request.window === "no-window") { return makeNetworkError(); } - if (context.terminated) { - const aborted = context.terminated.aborted; - if (aborted) { - return makeNetworkError(new AbortError()); - } - return makeNetworkError(context.terminated.reason); + if (isCancelled(fetchParams)) { + return makeAppropriateNetworkError(fetchParams); } return makeNetworkError("proxy authentication required"); } if (response.status === 421 && !isNewConnectionFetch && (request.body == null || request.body.source != null)) { - if (context.terminated) { - const aborted = context.terminated.aborted; - if (aborted) { - return makeNetworkError(new AbortError()); - } - return makeNetworkError(context.terminated.reason); + if (isCancelled(fetchParams)) { + return makeAppropriateNetworkError(fetchParams); } - context.connection.destroy(); - response = await httpNetworkOrCacheFetch.call(this, fetchParams, isAuthenticationFetch, true); + fetchParams.controller.connection.destroy(); + response = await httpNetworkOrCacheFetch(fetchParams, isAuthenticationFetch, true); } if (isAuthenticationFetch) { } return response; } - function httpNetworkFetch(fetchParams, includeCredentials = false, forceNewConnection = false) { - const context = this; - return new Promise((resolve) => { - assert(!context.connection || context.connection.destroyed); - context.connection = { - abort: null, - destroyed: false, - destroy(err) { - if (!this.destroyed) { - this.destroyed = true; - this.abort?.(err ?? new AbortError()); - } + async function httpNetworkFetch(fetchParams, includeCredentials = false, forceNewConnection = false) { + assert(!fetchParams.controller.connection || fetchParams.controller.connection.destroyed); + fetchParams.controller.connection = { + abort: null, + destroyed: false, + destroy(err) { + if (!this.destroyed) { + this.destroyed = true; + this.abort?.(err ?? new AbortError()); } - }; - const request = fetchParams.request; - let response = null; - const timingInfo = fetchParams.timingInfo; - const httpCache = null; - if (httpCache == null) { - request.cache = "no-store"; - } - if (request.mode === "websocket") { - } else { } - context.on("terminated", onRequestAborted); - const body = async function* () { + }; + const request = fetchParams.request; + let response = null; + const timingInfo = fetchParams.timingInfo; + const httpCache = null; + if (httpCache == null) { + request.cache = "no-store"; + } + const newConnection = forceNewConnection ? "yes" : "no"; + if (request.mode === "websocket") { + } else { + } + let requestBody = null; + if (request.body == null && fetchParams.processRequestEndOfBody) { + queueMicrotask(() => fetchParams.processRequestEndOfBody()); + } else if (request.body != null) { + const processBodyChunk = async function* (bytes) { + if (isCancelled(fetchParams)) { + return; + } + yield bytes; + fetchParams.processRequestBodyChunkLength?.(bytes.byteLength); + }; + const processEndOfBody = () => { + if (isCancelled(fetchParams)) { + return; + } + if (fetchParams.processRequestEndOfBody) { + fetchParams.processRequestEndOfBody(); + } + }; + const processBodyError = (e) => { + if (isCancelled(fetchParams)) { + return; + } + if (e.name === "AbortError") { + fetchParams.controller.abort(); + } else { + fetchParams.controller.terminate(e); + } + }; + requestBody = async function* () { try { - if (request.body === null) { - fetchParams.processEndOfBody?.(); - return; - } for await (const bytes of request.body.stream) { - if (context.terminated) { - return; - } - yield bytes; - fetchParams.processRequestBody?.(bytes.byteLength); - } - if (context.terminated) { - return; + yield* processBodyChunk(bytes); } - fetchParams.processRequestEndOfBody?.(); - } catch (e) { - if (context.terminated) { - return; - } - context.terminate({ - aborted: e.name === "AbortError", - reason: e - }); + processEndOfBody(); + } catch (err) { + processBodyError(err); } }(); - function onRequestAborted() { - const aborted = this.terminated.aborted; - this.connection.destroy(); - if (aborted) { - return resolve(makeNetworkError(new AbortError())); - } - return resolve(makeNetworkError(this.terminated.reason)); + } + try { + const { body, status, statusText, headersList } = await dispatch({ body: requestBody }); + const iterator = body[Symbol.asyncIterator](); + fetchParams.controller.next = () => iterator.next(); + response = makeResponse({ status, statusText, headersList }); + } catch (err) { + if (err.name === "AbortError") { + fetchParams.controller.connection.destroy(); + return makeAppropriateNetworkError(fetchParams); } - let pullAlgorithm; - const cancelAlgorithm = () => { - context.terminate({ aborted: true }); - }; - const highWaterMark = 64 * 1024; - if (!ReadableStream) { - ReadableStream = require("stream/web").ReadableStream; - } - let pullResolve; - const stream = new ReadableStream({ - async start(controller) { - context.controller = controller; - }, - async pull(controller) { - if (!pullAlgorithm) { - await new Promise((resolve2) => { - pullResolve = resolve2; - }); - } - await pullAlgorithm(controller); - }, - async cancel(reason) { - await cancelAlgorithm(reason); - } - }, { highWaterMark }); - function onResponseAborted() { - const aborted = this.terminated.aborted; - if (aborted) { - response.aborted = true; - if (isReadable(stream)) { - this.controller.error(new AbortError()); + return makeNetworkError(err); + } + const pullAlgorithm = () => { + fetchParams.controller.resume(); + }; + const cancelAlgorithm = () => { + fetchParams.controller.abort(); + }; + if (!ReadableStream) { + ReadableStream = require("stream/web").ReadableStream; + } + const stream = new ReadableStream({ + async start(controller) { + fetchParams.controller.controller = controller; + }, + async pull(controller) { + await pullAlgorithm(controller); + }, + async cancel(reason) { + await cancelAlgorithm(reason); + } + }, { highWaterMark: 0 }); + response.body = { stream }; + fetchParams.controller.on("terminated", onAborted); + fetchParams.controller.resume = async () => { + while (true) { + let bytes; + try { + const { done, value } = await fetchParams.controller.next(); + bytes = done ? void 0 : value; + } catch (err) { + if (fetchParams.controller.ended && !timingInfo.encodedBodySize) { + bytes = void 0; + } else { + bytes = err; } - } else { - if (isReadable(stream)) { - this.controller.error(new TypeError("terminated")); + } + if (bytes === void 0) { + try { + fetchParams.controller.controller.close(); + } catch (err) { + if (!/Controller is already closed/.test(err)) { + throw err; + } } + finalizeResponse(fetchParams, response); + return; + } + timingInfo.decodedBodySize += bytes?.byteLength ?? 0; + if (bytes instanceof Error) { + fetchParams.controller.terminate(bytes); + return; + } + fetchParams.controller.controller.enqueue(new Uint8Array(bytes)); + if (isErrored(stream)) { + fetchParams.controller.terminate(); + return; + } + if (!fetchParams.controller.controller.desiredSize) { + return; + } + } + }; + function onAborted(reason) { + if (isAborted(fetchParams)) { + response.aborted = true; + if (isReadable(stream)) { + fetchParams.controller.controller.error(new AbortError()); + } + } else { + if (isReadable(stream)) { + fetchParams.controller.controller.error(new TypeError("terminated", { + cause: reason instanceof Error ? reason : void 0 + })); } - this.connection.destroy(); } + fetchParams.controller.connection.destroy(); + } + return response; + async function dispatch({ body }) { const url = requestCurrentURL(request); - context.dispatcher.dispatch({ + return new Promise((resolve, reject) => fetchParams.controller.dispatcher.dispatch({ path: url.pathname + url.search, origin: url.origin, method: request.method, - body: context.dispatcher[kIsMockActive] ? request.body && request.body.source : body, + body: fetchParams.controller.dispatcher[kIsMockActive] ? request.body && request.body.source : body, headers: request.headersList, maxRedirections: 0 }, { - decoder: null, + body: null, abort: null, - context, onConnect(abort) { - const { connection } = this.context; + const { connection } = fetchParams.controller; if (connection.destroyed) { abort(new AbortError()); } else { + fetchParams.controller.on("terminated", abort); this.abort = connection.abort = abort; } }, @@ -7831,18 +7911,17 @@ var require_fetch = __commonJS({ if (status < 200) { return; } + let codings = []; const headers = new Headers(); for (let n = 0; n < headersList.length; n += 2) { - headers.append(headersList[n + 0].toString(), headersList[n + 1].toString()); + const key = headersList[n + 0].toString(); + const val = headersList[n + 1].toString(); + if (key.toLowerCase() === "content-encoding") { + codings = val.split(",").map((x) => x.trim()); + } + headers.append(key, val); } - response = makeResponse({ - status, - statusText, - headersList: headers[kHeadersList], - body: { stream } - }); - this.context.on("terminated", onResponseAborted); - const codings = headers.get("content-encoding")?.toLowerCase().split(",").map((x) => x.trim()) ?? []; + this.body = new Readable({ read: resume }); const decoders = []; for (const coding of codings) { if (/(x-)?gzip/.test(coding)) { @@ -7856,70 +7935,41 @@ var require_fetch = __commonJS({ break; } } - if (decoders.length > 1) { - pipeline(...decoders, () => { - }); - } else if (decoders.length === 0) { - decoders.push(new PassThrough()); - } - this.decoder = decoders[0].on("drain", resume); - const iterator = decoders[decoders.length - 1][Symbol.asyncIterator](); - pullAlgorithm = async (controller) => { - let bytes; - try { - const { done, value } = await iterator.next(); - bytes = done ? void 0 : value; - } catch (err) { - if (this.decoder.writableEnded && !timingInfo.encodedBodySize) { - bytes = void 0; - } else { - bytes = err; - } - } - if (bytes === void 0) { - finalizeResponse(fetchParams, response); - controller.close(); - return; - } - timingInfo.decodedBodySize += bytes?.byteLength ?? 0; - if (bytes instanceof Error) { - this.context.terminate({ reason: bytes }); - return; - } - controller.enqueue(new Uint8Array(bytes)); - if (isErrored(stream)) { - this.context.terminate(); - return; - } - return controller.desiredSize > 0; - }; - if (pullResolve) { - pullResolve(); - pullResolve = null; - } - resolve(response); + resolve({ + status, + statusText, + headersList: headers[kHeadersList], + body: decoders.length ? pipeline(this.body, ...decoders, () => { + }) : this.body.on("error", () => { + }) + }); return true; }, onData(chunk) { - if (this.context.dump) { + if (fetchParams.controller.dump) { return; } const bytes = chunk; timingInfo.encodedBodySize += bytes.byteLength; - return this.decoder.write(bytes); + return this.body.push(bytes); }, onComplete() { - this.decoder.end(); + if (this.abort) { + fetchParams.controller.off("terminated", this.abort); + } + fetchParams.controller.ended = true; + this.body.push(null); }, onError(error) { - this.decoder?.destroy(error); - this.context.terminate({ reason: error }); - if (!response) { - resolve(makeNetworkError(error)); + if (this.abort) { + fetchParams.controller.off("terminated", this.abort); } + this.body?.destroy(error); + fetchParams.controller.terminate(error); + reject(makeNetworkError(error)); } - }); - }); + })); + } } module2.exports = fetch; }