From 5f11247b34510a3dc821da3c10d3cea0f39a7b13 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Wed, 29 May 2024 12:42:31 +0200 Subject: [PATCH] fix: retry on body support (#3294) * test: add testing * refactor: enhance body wrapping * fix: do not mutate original opts * docs: extend documentation --- docs/docs/api/RetryHandler.md | 3 + lib/core/symbols.js | 1 + lib/core/util.js | 60 +++++++++- lib/handler/retry-handler.js | 15 ++- test/retry-handler.js | 211 +++++++++++++++++++++++++++++++++- 5 files changed, 281 insertions(+), 9 deletions(-) diff --git a/docs/docs/api/RetryHandler.md b/docs/docs/api/RetryHandler.md index 6dbc5077d02..8988ee53010 100644 --- a/docs/docs/api/RetryHandler.md +++ b/docs/docs/api/RetryHandler.md @@ -46,6 +46,9 @@ It represents the retry state for a given request. - **dispatch** `(options: Dispatch.DispatchOptions, handlers: Dispatch.DispatchHandlers) => Promise` (required) - Dispatch function to be called after every retry. - **handler** Extends [`Dispatch.DispatchHandlers`](Dispatcher.md#dispatcherdispatchoptions-handler) (required) - Handler function to be called after the request is successful or the retries are exhausted. +>__Note__: The `RetryHandler` does not retry over stateful bodies (e.g. streams, AsyncIterable) as those, once consumed, are left in an state that cannot be reutilized. For these situations the `RetryHandler` will identify +>the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`. + Examples: ```js diff --git a/lib/core/symbols.js b/lib/core/symbols.js index b58fc90a69f..c8ba5dd8ec5 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -20,6 +20,7 @@ module.exports = { kHost: Symbol('host'), kNoRef: Symbol('no ref'), kBodyUsed: Symbol('used'), + kBody: Symbol('abstracted request body'), kRunning: Symbol('running'), kBlocking: Symbol('blocking'), kPending: Symbol('pending'), diff --git a/lib/core/util.js b/lib/core/util.js index 2bad24df2f6..ddb72d226ce 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -1,19 +1,72 @@ 'use strict' const assert = require('node:assert') -const { kDestroyed, kBodyUsed, kListeners } = require('./symbols') +const { kDestroyed, kBodyUsed, kListeners, kBody } = require('./symbols') const { IncomingMessage } = require('node:http') const stream = require('node:stream') const net = require('node:net') -const { InvalidArgumentError } = require('./errors') const { Blob } = require('node:buffer') const nodeUtil = require('node:util') const { stringify } = require('node:querystring') +const { EventEmitter: EE } = require('node:events') +const { InvalidArgumentError } = require('./errors') const { headerNameLowerCasedRecord } = require('./constants') const { tree } = require('./tree') const [nodeMajor, nodeMinor] = process.versions.node.split('.').map(v => Number(v)) +class BodyAsyncIterable { + constructor (body) { + this[kBody] = body + this[kBodyUsed] = false + } + + async * [Symbol.asyncIterator] () { + assert(!this[kBodyUsed], 'disturbed') + this[kBodyUsed] = true + yield * this[kBody] + } +} + +function wrapRequestBody (body) { + if (isStream(body)) { + // TODO (fix): Provide some way for the user to cache the file to e.g. /tmp + // so that it can be dispatched again? + // TODO (fix): Do we need 100-expect support to provide a way to do this properly? + if (bodyLength(body) === 0) { + body + .on('data', function () { + assert(false) + }) + } + + if (typeof body.readableDidRead !== 'boolean') { + body[kBodyUsed] = false + EE.prototype.on.call(body, 'data', function () { + this[kBodyUsed] = true + }) + } + + return body + } else if (body && typeof body.pipeTo === 'function') { + // TODO (fix): We can't access ReadableStream internal state + // to determine whether or not it has been disturbed. This is just + // a workaround. + return new BodyAsyncIterable(body) + } else if ( + body && + typeof body !== 'string' && + !ArrayBuffer.isView(body) && + isIterable(body) + ) { + // TODO: Should we allow re-using iterable if !this.opts.idempotent + // or through some other flag? + return new BodyAsyncIterable(body) + } else { + return body + } +} + function nop () {} function isStream (obj) { @@ -634,5 +687,6 @@ module.exports = { isHttpOrHttpsPrefixed, nodeMajor, nodeMinor, - safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE'] + safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE'], + wrapRequestBody } diff --git a/lib/handler/retry-handler.js b/lib/handler/retry-handler.js index 56ea4be79be..fcd9f0df513 100644 --- a/lib/handler/retry-handler.js +++ b/lib/handler/retry-handler.js @@ -3,7 +3,12 @@ const assert = require('node:assert') const { kRetryHandlerDefaultRetry } = require('../core/symbols') const { RequestRetryError } = require('../core/errors') -const { isDisturbed, parseHeaders, parseRangeHeader } = require('../core/util') +const { + isDisturbed, + parseHeaders, + parseRangeHeader, + wrapRequestBody +} = require('../core/util') function calculateRetryAfterHeader (retryAfter) { const current = Date.now() @@ -29,7 +34,7 @@ class RetryHandler { this.dispatch = handlers.dispatch this.handler = handlers.handler - this.opts = dispatchOpts + this.opts = { ...dispatchOpts, body: wrapRequestBody(opts.body) } this.abort = null this.aborted = false this.retryOpts = { @@ -174,7 +179,9 @@ class RetryHandler { this.abort( new RequestRetryError('Request failed', statusCode, { headers, - count: this.retryCount + data: { + count: this.retryCount + } }) ) return false @@ -278,7 +285,7 @@ class RetryHandler { const err = new RequestRetryError('Request failed', statusCode, { headers, - count: this.retryCount + data: { count: this.retryCount } }) this.abort(err) diff --git a/test/retry-handler.js b/test/retry-handler.js index 8894ea86668..83e222861ae 100644 --- a/test/retry-handler.js +++ b/test/retry-handler.js @@ -4,6 +4,7 @@ const { tspl } = require('@matteo.collina/tspl') const { test, after } = require('node:test') const { createServer } = require('node:http') const { once } = require('node:events') +const { Readable } = require('node:stream') const { RetryHandler, Client } = require('..') const { RequestHandler } = require('../lib/api/api-request') @@ -204,6 +205,74 @@ test('Should account for network and response errors', async t => { await t.completed }) +test('Issue #3288 - request with body (asynciterable)', async t => { + t = tspl(t, { plan: 6 }) + const server = createServer() + const dispatchOptions = { + method: 'POST', + path: '/', + headers: { + 'content-type': 'application/json' + }, + body: (function * () { + yield 'hello' + yield 'world' + })() + } + + server.on('request', (req, res) => { + res.writeHead(500, { + 'content-type': 'application/json' + }) + + res.end('{"message": "failed"}') + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: { + onConnect () { + t.ok(true, 'pass') + }, + onBodySent () { + t.ok(true, 'pass') + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.strictEqual(status, 500) + return true + }, + onData (chunk) { + return true + }, + onComplete () { + t.fail() + }, + onError (err) { + t.equal(err.message, 'Request failed') + t.equal(err.statusCode, 500) + t.equal(err.data.count, 1) + } + } + }) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + client.dispatch( + dispatchOptions, + handler + ) + }) + + await t.completed +}) + test('Should use retry-after header for retries', async t => { t = tspl(t, { plan: 4 }) @@ -734,6 +803,145 @@ test('retrying a request with a body', async t => { await t.completed }) +test('retrying a request with a body (stream)', async t => { + let counter = 0 + const server = createServer() + const dispatchOptions = { + retryOptions: { + retry: (err, { state, opts }, done) => { + counter++ + + if ( + err.statusCode === 500 || + err.message.includes('other side closed') + ) { + setTimeout(done, 500) + return + } + + return done(err) + } + }, + method: 'POST', + path: '/', + headers: { + 'content-type': 'application/json' + }, + body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' }))) + } + + t = tspl(t, { plan: 3 }) + + server.on('request', (req, res) => { + switch (counter) { + case 0: + res.writeHead(500) + res.end('failed') + return + default: + t.fail() + } + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: new RequestHandler(dispatchOptions, (err, data) => { + t.equal(err.statusCode, 500) + t.equal(err.data.count, 1) + t.equal(err.code, 'UND_ERR_REQ_RETRY') + }) + }) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + client.dispatch( + dispatchOptions, + handler + ) + }) + + await t.completed +}) + +test('retrying a request with a body (buffer)', async t => { + let counter = 0 + const server = createServer() + const dispatchOptions = { + retryOptions: { + retry: (err, { state, opts }, done) => { + counter++ + + if ( + err.statusCode === 500 || + err.message.includes('other side closed') + ) { + setTimeout(done, 500) + return + } + + return done(err) + } + }, + method: 'POST', + path: '/', + headers: { + 'content-type': 'application/json' + }, + body: Buffer.from(JSON.stringify({ hello: 'world' })) + } + + t = tspl(t, { plan: 1 }) + + server.on('request', (req, res) => { + switch (counter) { + case 0: + req.destroy() + return + case 1: + res.writeHead(500) + res.end('failed') + return + case 2: + res.writeHead(200) + res.end('hello world!') + return + default: + t.fail() + } + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: new RequestHandler(dispatchOptions, (err, data) => { + t.ifError(err) + }) + }) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + client.dispatch( + dispatchOptions, + handler + ) + }) + + await t.completed +}) + test('should not error if request is not meant to be retried', async t => { t = tspl(t, { plan: 3 }) @@ -777,8 +985,7 @@ test('should not error if request is not meant to be retried', async t => { t.strictEqual(Buffer.concat(chunks).toString('utf-8'), 'Bad request') }, onError (err) { - console.log({ err }) - t.fail() + t.fail(err) } } })