Skip to content

Commit

Permalink
Merge branch 'main' into body-wrap
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag authored Feb 14, 2024
2 parents da031f1 + 64b133c commit 67c6a5d
Show file tree
Hide file tree
Showing 98 changed files with 5,879 additions and 4,831 deletions.
10 changes: 10 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
*
!lib/**/*
!index.js
!index-fetch.js

# The wasm files are stored as base64 strings in the corresponding .js files
lib/llhttp/llhttp_simd.wasm
lib/llhttp/llhttp.wasm

!types/**/*
!index.d.ts
!docs/**/*
7 changes: 0 additions & 7 deletions .taprc

This file was deleted.

19 changes: 19 additions & 0 deletions benchmarks/benchmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { Pool, Client, fetch, Agent, setGlobalDispatcher } = require('..')

let nodeFetch
const axios = require('axios')
let superagent
let got

const util = require('node:util')
Expand Down Expand Up @@ -85,6 +86,11 @@ const requestAgent = new http.Agent({
maxSockets: connections
})

const superagentAgent = new http.Agent({
keepAlive: true,
maxSockets: connections
})

const undiciOptions = {
path: '/',
method: 'GET',
Expand Down Expand Up @@ -318,6 +324,16 @@ if (process.env.PORT) {
}).catch(console.log)
})
}

experiments.superagent = () => {
return makeParallelRequests(resolve => {
superagent.get(dest.url).pipe(new Writable({
write (chunk, encoding, callback) {
callback()
}
})).on('finish', resolve)
})
}
}

async function main () {
Expand All @@ -326,6 +342,9 @@ async function main () {
nodeFetch = _nodeFetch.default
const _got = await import('got')
got = _got.default
const _superagent = await import('superagent')
// https://github.com/ladjs/superagent/issues/1540#issue-561464561
superagent = _superagent.agent().use((req) => req.agent(superagentAgent))

cronometro(
experiments,
Expand Down
2 changes: 2 additions & 0 deletions build/wasm.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ if (EXTERNAL_PATH) {
writeFileSync(join(ROOT, 'loader.js'), `
'use strict'
globalThis.__UNDICI_IS_NODE__ = true
module.exports = require('node:module').createRequire('${EXTERNAL_PATH}/loader.js')('./index-fetch.js')
delete globalThis.__UNDICI_IS_NODE__
`)
}
68 changes: 65 additions & 3 deletions examples/proxy/proxy.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict'

const net = require('node:net')
const { pipeline } = require('node:stream')
const createError = require('http-errors')
const { STATUS_CODES } = require('node:http')

module.exports = async function proxy (ctx, client) {
const { req, socket, proxyName } = ctx
Expand Down Expand Up @@ -214,13 +216,13 @@ function getHeaders ({
].join(';'))
} else if (forwarded) {
// The forwarded header should not be included in response.
throw new createError.BadGateway()
throw new BadGateway()
}

if (proxyName) {
if (via) {
if (via.split(',').some(name => name.endsWith(proxyName))) {
throw new createError.LoopDetected()
throw new LoopDetected()
}
via += ', '
}
Expand Down Expand Up @@ -254,3 +256,63 @@ function printIp (address, port) {
}
return str
}

class BadGateway extends Error {
constructor (message = STATUS_CODES[502]) {
super(message)
}

toString () {
return `BadGatewayError: ${this.message}`
}

get name () {
return 'BadGatewayError'
}

get status () {
return 502
}

get statusCode () {
return 502
}

get expose () {
return false
}

get headers () {
return undefined
}
}

class LoopDetected extends Error {
constructor (message = STATUS_CODES[508]) {
super(message)
}

toString () {
return `LoopDetectedError: ${this.message}`
}

get name () {
return 'LoopDetectedError'
}

get status () {
return 508
}

get statusCode () {
return 508
}

get expose () {
return false
}

get headers () {
return undefined
}
}
17 changes: 13 additions & 4 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,19 @@ class Request {
processHeader(this, headers[i], headers[i + 1])
}
} else if (headers && typeof headers === 'object') {
const keys = Object.keys(headers)
for (let i = 0; i < keys.length; i++) {
const key = keys[i]
processHeader(this, key, headers[key])
if (headers[Symbol.iterator]) {
for (const header of headers) {
if (!Array.isArray(header) || header.length !== 2) {
throw new InvalidArgumentError('headers must be in key-value pair format')
}
const [key, value] = header
processHeader(this, key, value)
}
} else {
const keys = Object.keys(headers)
for (const key of keys) {
processHeader(this, key, headers[key])
}
}
} else if (headers != null) {
throw new InvalidArgumentError('headers must be an object or an array')
Expand Down
8 changes: 1 addition & 7 deletions lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,7 @@ const hasToWellFormed = !!String.prototype.toWellFormed
* @param {string} val
*/
function toUSVString (val) {
if (hasToWellFormed) {
return `${val}`.toWellFormed()
} else if (nodeUtil.toUSVString) {
return nodeUtil.toUSVString(val)
}

return `${val}`
return hasToWellFormed ? `${val}`.toWellFormed() : nodeUtil.toUSVString(val)
}

/**
Expand Down
63 changes: 19 additions & 44 deletions lib/fetch/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ const { FormData } = require('./formdata')
const { kState } = require('./symbols')
const { webidl } = require('./webidl')
const { Blob, File: NativeFile } = require('node:buffer')
const { kBodyUsed } = require('../core/symbols')
const assert = require('node:assert')
const { isErrored } = require('../core/util')
const { isUint8Array, isArrayBuffer } = require('util/types')
const { isArrayBuffer } = require('util/types')
const { File: UndiciFile } = require('./file')
const { serializeAMimeType } = require('./dataURL')
const { Readable } = require('node:stream')

/** @type {globalThis['File']} */
const File = NativeFile ?? UndiciFile
Expand Down Expand Up @@ -291,29 +291,6 @@ function cloneBody (body) {
}
}

async function * consumeBody (body) {
if (body) {
if (isUint8Array(body)) {
yield body
} else {
const stream = body.stream

if (util.isDisturbed(stream)) {
throw new TypeError('The body has already been consumed.')
}

if (stream.locked) {
throw new TypeError('The stream is locked.')
}

// Compat.
stream[kBodyUsed] = true

yield * stream
}
}
}

function throwIfAborted (state) {
if (state.aborted) {
throw new DOMException('The operation was aborted.', 'AbortError')
Expand All @@ -328,7 +305,7 @@ function bodyMixinMethods (instance) {
// given a byte sequence bytes: return a Blob whose
// contents are bytes and whose type attribute is this’s
// MIME type.
return specConsumeBody(this, (bytes) => {
return consumeBody(this, (bytes) => {
let mimeType = bodyMimeType(this)

if (mimeType === null) {
Expand All @@ -348,21 +325,21 @@ function bodyMixinMethods (instance) {
// of running consume body with this and the following step
// given a byte sequence bytes: return a new ArrayBuffer
// whose contents are bytes.
return specConsumeBody(this, (bytes) => {
return consumeBody(this, (bytes) => {
return new Uint8Array(bytes).buffer
}, instance)
},

text () {
// The text() method steps are to return the result of running
// consume body with this and UTF-8 decode.
return specConsumeBody(this, utf8DecodeBytes, instance)
return consumeBody(this, utf8DecodeBytes, instance)
},

json () {
// The json() method steps are to return the result of running
// consume body with this and parse JSON from bytes.
return specConsumeBody(this, parseJSONFromBytes, instance)
return consumeBody(this, parseJSONFromBytes, instance)
},

async formData () {
Expand All @@ -375,16 +352,15 @@ function bodyMixinMethods (instance) {

// If mimeType’s essence is "multipart/form-data", then:
if (mimeType !== null && mimeType.essence === 'multipart/form-data') {
const headers = {}
for (const [key, value] of this.headers) headers[key] = value

const responseFormData = new FormData()

let busboy

try {
busboy = new Busboy({
headers,
headers: {
'content-type': serializeAMimeType(mimeType)
},
preservePath: true
})
} catch (err) {
Expand Down Expand Up @@ -427,8 +403,10 @@ function bodyMixinMethods (instance) {
busboy.on('error', (err) => reject(new TypeError(err)))
})

if (this.body !== null) for await (const chunk of consumeBody(this[kState].body)) busboy.write(chunk)
busboy.end()
if (this.body !== null) {
Readable.from(this[kState].body.stream).pipe(busboy)
}

await busboyResolve

return responseFormData
Expand All @@ -442,20 +420,17 @@ function bodyMixinMethods (instance) {
// application/x-www-form-urlencoded parser will keep the BOM.
// https://url.spec.whatwg.org/#concept-urlencoded-parser
// Note that streaming decoder is stateful and cannot be reused
const streamingDecoder = new TextDecoder('utf-8', { ignoreBOM: true })
const stream = this[kState].body.stream.pipeThrough(new TextDecoderStream('utf-8', { ignoreBOM: true }))

for await (const chunk of consumeBody(this[kState].body)) {
if (!isUint8Array(chunk)) {
throw new TypeError('Expected Uint8Array chunk')
}
text += streamingDecoder.decode(chunk, { stream: true })
for await (const chunk of stream) {
text += chunk
}
text += streamingDecoder.decode()

entries = new URLSearchParams(text)
} catch (err) {
// istanbul ignore next: Unclear when new URLSearchParams can fail on a string.
// 2. If entries is failure, then throw a TypeError.
throw new TypeError(undefined, { cause: err })
throw new TypeError(err)
}

// 3. Return a new FormData object whose entries are entries.
Expand Down Expand Up @@ -493,7 +468,7 @@ function mixinBody (prototype) {
* @param {(value: unknown) => unknown} convertBytesToJSValue
* @param {Response|Request} instance
*/
async function specConsumeBody (object, convertBytesToJSValue, instance) {
async function consumeBody (object, convertBytesToJSValue, instance) {
webidl.brandCheck(object, instance)

throwIfAborted(object[kState])
Expand Down
Loading

0 comments on commit 67c6a5d

Please sign in to comment.