From f44be5a387c6486b30b35aaa3ed764f83dbce4d3 Mon Sep 17 00:00:00 2001 From: "dan.castillo" Date: Tue, 7 Mar 2023 20:36:39 -0600 Subject: [PATCH 1/3] fix: undici stream throwOnError --- lib/api/api-request.js | 32 ++------------------------------ lib/api/api-stream.js | 27 +++++++++++++++++++++++---- lib/core/util.js | 35 ++++++++++++++++++++++++++++++++++- test/async_hooks.js | 2 +- test/client-stream.js | 34 ++++++++++++++++++++++++++++++++++ 5 files changed, 94 insertions(+), 36 deletions(-) diff --git a/lib/api/api-request.js b/lib/api/api-request.js index b4674878d2e..3dc75cad1ca 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -3,8 +3,7 @@ const Readable = require('./readable') const { InvalidArgumentError, - RequestAbortedError, - ResponseStatusCodeError + RequestAbortedError } = require('../core/errors') const util = require('../core/util') const { AsyncResource } = require('async_hooks') @@ -93,7 +92,7 @@ class RequestHandler extends AsyncResource { if (callback !== null) { if (this.throwOnError && statusCode >= 400) { - this.runInAsyncScope(getResolveErrorBodyCallback, null, + this.runInAsyncScope(util.getResolveErrorBodyCallback, null, { callback, body, contentType, statusCode, statusMessage, headers } ) return @@ -153,33 +152,6 @@ class RequestHandler extends AsyncResource { } } -async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) { - if (statusCode === 204 || !contentType) { - body.dump() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) - return - } - - try { - if (contentType.startsWith('application/json')) { - const payload = await body.json() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) - return - } - - if (contentType.startsWith('text/')) { - const payload = await body.text() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) - return - } - } catch (err) { - // Process in a fallback if error - } - - body.dump() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) -} - function request (opts, callback) { if (callback === undefined) { return new Promise((resolve, reject) => { diff --git a/lib/api/api-stream.js b/lib/api/api-stream.js index f33f459f9d4..bea55228323 100644 --- a/lib/api/api-stream.js +++ b/lib/api/api-stream.js @@ -1,6 +1,6 @@ 'use strict' -const { finished } = require('stream') +const { finished, PassThrough } = require('stream') const { InvalidArgumentError, InvalidReturnValueError, @@ -16,7 +16,7 @@ class StreamHandler extends AsyncResource { throw new InvalidArgumentError('invalid opts') } - const { signal, method, opaque, body, onInfo, responseHeaders } = opts + const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts try { if (typeof callback !== 'function') { @@ -57,6 +57,7 @@ class StreamHandler extends AsyncResource { this.trailers = null this.body = body this.onInfo = onInfo || null + this.throwOnError = throwOnError || false if (util.isStream(body)) { body.on('error', (err) => { @@ -76,8 +77,8 @@ class StreamHandler extends AsyncResource { this.context = context } - onHeaders (statusCode, rawHeaders, resume) { - const { factory, opaque, context } = this + onHeaders (statusCode, rawHeaders, resume, statusMessage) { + const { factory, opaque, context, callback } = this if (statusCode < 200) { if (this.onInfo) { @@ -96,6 +97,24 @@ class StreamHandler extends AsyncResource { context }) + if (this.throwOnError && statusCode >= 400) { + const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) + const chunks = [] + const pass = new PassThrough() + pass + .on('data', (chunk) => chunks.push(chunk)) + .on('end', () => { + const payload = Buffer.concat(chunks).toString('utf8') + this.runInAsyncScope( + util.getResolveErrorBodyCallback, + null, + { callback, statusCode, statusMessage, headers, payload } + ) + }) + this.res = pass + return + } + if ( !res || typeof res.write !== 'function' || diff --git a/lib/core/util.js b/lib/core/util.js index e203919cc35..c048108b734 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -5,7 +5,7 @@ const { kDestroyed, kBodyUsed } = require('./symbols') const { IncomingMessage } = require('http') const stream = require('stream') const net = require('net') -const { InvalidArgumentError } = require('./errors') +const { InvalidArgumentError, ResponseStatusCodeError } = require('./errors') const { Blob } = require('buffer') const nodeUtil = require('util') const { stringify } = require('querystring') @@ -342,6 +342,38 @@ function getSocketInfo (socket) { } } +async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers, payload }) { + if (typeof payload === 'string' && statusCode >= 400) { + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) + return + } + + if (statusCode === 204 || !contentType) { + body.dump() + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) + return + } + + try { + if (contentType.startsWith('application/json')) { + const payload = await body.json() + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) + return + } + + if (contentType.startsWith('text/')) { + const payload = await body.text() + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) + return + } + } catch (err) { + // Process in a fallback if error + } + + body.dump() + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) +} + let ReadableStream function ReadableStreamFrom (iterable) { if (!ReadableStream) { @@ -426,6 +458,7 @@ module.exports = { getSocketInfo, isFormDataLike, buildURL, + getResolveErrorBodyCallback, nodeMajor, nodeMinor, nodeHasAutoSelectFamily: nodeMajor > 18 || (nodeMajor === 18 && nodeMinor >= 13) diff --git a/test/async_hooks.js b/test/async_hooks.js index 8a77af35d72..2e8533d2d9b 100644 --- a/test/async_hooks.js +++ b/test/async_hooks.js @@ -157,7 +157,7 @@ test('async hooks client is destroyed', (t) => { const client = new Client(`http://localhost:${server.address().port}`) t.teardown(client.destroy.bind(client)) - client.request({ path: '/', method: 'GET' }, (err, { body }) => { + client.request({ path: '/', method: 'GET', throwOnError: true }, (err, { body }) => { t.error(err) body.resume() body.on('error', (err) => { diff --git a/test/client-stream.js b/test/client-stream.js index 2ff5fa53563..0968ca03006 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -785,4 +785,38 @@ test('stream legacy needDrain', (t) => { t.pass() }) }) + + test('steam throwOnError', (t) => { + t.plan(2) + + const errStatusCode = 500 + const errMessage = 'Internal Server Error' + + const server = createServer((req, res) => { + res.writeHead(errStatusCode, { 'Content-Type': 'text/plain' }) + res.end(errMessage) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + client.stream({ + path: '/', + method: 'GET', + throwOnError: true, + opaque: new PassThrough() + }, ({ opaque: pt }) => { + pt.on('data', () => { + t.not.pass() + }) + return pt + }, (e) => { + t.equal(e.status, errStatusCode) + t.equal(e.body, errMessage) + t.end() + }) + }) + }) }) From dfa5f418026ec267f8811687d75c7099d03ebdcc Mon Sep 17 00:00:00 2001 From: "dan.castillo" Date: Wed, 8 Mar 2023 20:19:27 -0600 Subject: [PATCH 2/3] fix: revert move getResolveErrorBodyCallback to api-request and pr comments --- lib/api/api-request.js | 32 ++++++++++++++++++++++++++++++-- lib/api/api-stream.js | 18 ++++++++++++------ lib/core/util.js | 35 +---------------------------------- test/client-stream.js | 2 +- 4 files changed, 44 insertions(+), 43 deletions(-) diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 3dc75cad1ca..b4674878d2e 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -3,7 +3,8 @@ const Readable = require('./readable') const { InvalidArgumentError, - RequestAbortedError + RequestAbortedError, + ResponseStatusCodeError } = require('../core/errors') const util = require('../core/util') const { AsyncResource } = require('async_hooks') @@ -92,7 +93,7 @@ class RequestHandler extends AsyncResource { if (callback !== null) { if (this.throwOnError && statusCode >= 400) { - this.runInAsyncScope(util.getResolveErrorBodyCallback, null, + this.runInAsyncScope(getResolveErrorBodyCallback, null, { callback, body, contentType, statusCode, statusMessage, headers } ) return @@ -152,6 +153,33 @@ class RequestHandler extends AsyncResource { } } +async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) { + if (statusCode === 204 || !contentType) { + body.dump() + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) + return + } + + try { + if (contentType.startsWith('application/json')) { + const payload = await body.json() + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) + return + } + + if (contentType.startsWith('text/')) { + const payload = await body.text() + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) + return + } + } catch (err) { + // Process in a fallback if error + } + + body.dump() + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) +} + function request (opts, callback) { if (callback === undefined) { return new Promise((resolve, reject) => { diff --git a/lib/api/api-stream.js b/lib/api/api-stream.js index bea55228323..843ac74678b 100644 --- a/lib/api/api-stream.js +++ b/lib/api/api-stream.js @@ -4,7 +4,8 @@ const { finished, PassThrough } = require('stream') const { InvalidArgumentError, InvalidReturnValueError, - RequestAbortedError + RequestAbortedError, + ResponseStatusCodeError } = require('../core/errors') const util = require('../core/util') const { AsyncResource } = require('async_hooks') @@ -100,18 +101,23 @@ class StreamHandler extends AsyncResource { if (this.throwOnError && statusCode >= 400) { const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) const chunks = [] - const pass = new PassThrough() - pass + const pt = new PassThrough() + pt .on('data', (chunk) => chunks.push(chunk)) .on('end', () => { const payload = Buffer.concat(chunks).toString('utf8') this.runInAsyncScope( - util.getResolveErrorBodyCallback, + callback, null, - { callback, statusCode, statusMessage, headers, payload } + new ResponseStatusCodeError( + `Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, + statusCode, + headers, + payload + ) ) }) - this.res = pass + this.res = pt return } diff --git a/lib/core/util.js b/lib/core/util.js index c048108b734..e203919cc35 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -5,7 +5,7 @@ const { kDestroyed, kBodyUsed } = require('./symbols') const { IncomingMessage } = require('http') const stream = require('stream') const net = require('net') -const { InvalidArgumentError, ResponseStatusCodeError } = require('./errors') +const { InvalidArgumentError } = require('./errors') const { Blob } = require('buffer') const nodeUtil = require('util') const { stringify } = require('querystring') @@ -342,38 +342,6 @@ function getSocketInfo (socket) { } } -async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers, payload }) { - if (typeof payload === 'string' && statusCode >= 400) { - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) - return - } - - if (statusCode === 204 || !contentType) { - body.dump() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) - return - } - - try { - if (contentType.startsWith('application/json')) { - const payload = await body.json() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) - return - } - - if (contentType.startsWith('text/')) { - const payload = await body.text() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) - return - } - } catch (err) { - // Process in a fallback if error - } - - body.dump() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) -} - let ReadableStream function ReadableStreamFrom (iterable) { if (!ReadableStream) { @@ -458,7 +426,6 @@ module.exports = { getSocketInfo, isFormDataLike, buildURL, - getResolveErrorBodyCallback, nodeMajor, nodeMinor, nodeHasAutoSelectFamily: nodeMajor > 18 || (nodeMajor === 18 && nodeMinor >= 13) diff --git a/test/client-stream.js b/test/client-stream.js index 0968ca03006..af8d010bf10 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -809,7 +809,7 @@ test('stream legacy needDrain', (t) => { opaque: new PassThrough() }, ({ opaque: pt }) => { pt.on('data', () => { - t.not.pass() + t.fail() }) return pt }, (e) => { From c66e979301d06cbb72dee998e186d45f1e6d9f96 Mon Sep 17 00:00:00 2001 From: "dan.castillo" Date: Thu, 9 Mar 2023 21:09:35 -0600 Subject: [PATCH 3/3] fix: handle error event --- lib/api/api-stream.js | 3 +++ test/client-stream.js | 25 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/lib/api/api-stream.js b/lib/api/api-stream.js index 843ac74678b..7560a2e6505 100644 --- a/lib/api/api-stream.js +++ b/lib/api/api-stream.js @@ -117,6 +117,9 @@ class StreamHandler extends AsyncResource { ) ) }) + .on('error', (err) => { + this.onError(err) + }) this.res = pt return } diff --git a/test/client-stream.js b/test/client-stream.js index af8d010bf10..e67727b74c7 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -819,4 +819,29 @@ test('stream legacy needDrain', (t) => { }) }) }) + + test('steam throwOnError=true, error on stream', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + client.stream({ + path: '/', + method: 'GET', + throwOnError: true, + opaque: new PassThrough() + }, () => { + throw new Error('asd') + }, (e) => { + t.equal(e.message, 'asd') + }) + }) + }) })