diff --git a/docs/docs/api/Dispatcher.md b/docs/docs/api/Dispatcher.md
index cfeee35cb17..f9eb5aee975 100644
--- a/docs/docs/api/Dispatcher.md
+++ b/docs/docs/api/Dispatcher.md
@@ -1094,6 +1094,65 @@ await client.request({
 });
 ```
 
+##### `decompress`
+
+⚠️ The decompress interceptor is experimental and subject to change.
+
+The `decompress` interceptor automatically decompresses response bodies compressed with gzip, deflate, brotli, or zstd. It removes the `content-encoding` and `content-length` headers from decompressed responses and supports multiple content codings (e.g. `gzip, deflate`) as described in RFC 9110.
+
+**Options**
+
+- `skipErrorResponses` - Whether to skip decompression for error responses (status codes >= 400). Default: `true`.
+- `skipStatusCodes` - Array of status codes to skip decompression for. Default: `[204, 304]`.
+
+**Example - Basic Decompress Interceptor**
+
+```js
+const { Client, interceptors } = require("undici");
+const { decompress } = interceptors;
+
+const client = new Client("http://example.com").compose(
+  decompress()
+);
+
+// Automatically decompresses gzip/deflate/brotli/zstd responses
+const response = await client.request({
+  method: "GET",
+  path: "/"
+});
+```
+
+**Example - Custom Options**
+
+```js
+const { Client, interceptors } = require("undici");
+const { decompress } = interceptors;
+
+const client = new Client("http://example.com").compose(
+  decompress({
+    skipErrorResponses: false, // Decompress 5xx responses
+    skipStatusCodes: [204, 304, 201] // Skip these status codes
+  })
+);
+```
+
+**Supported Encodings**
+
+- `gzip` / `x-gzip` - GZIP compression
+- `deflate` / `compress` / `x-compress` - DEFLATE compression
+- `br` - Brotli compression
+- `zstd` - Zstandard compression
+- Multiple encodings (e.g., `gzip, deflate`) are supported per RFC 9110 (see the example below)
+
+**Behavior**
+
+- Skips decompression for informational responses (status codes < 200) and, by default, for error responses (status codes >= 400, configurable via `skipErrorResponses`)
+- Skips decompression for 204 No Content and 304 Not Modified by default (configurable via `skipStatusCodes`)
+- Removes the `content-encoding` and `content-length` headers when decompressing
+- Passes through unsupported encodings unchanged
+- Handles encoding names case-insensitively
+- Supports streaming decompression without buffering
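+
+**Example - Multiple Encodings**
+
+The sketch below is illustrative rather than part of the interceptor itself: it assumes a throwaway local `node:http` server (the port, payload, and handler are made up for the example) that applies two codings, `Content-Encoding: gzip, deflate`, so the client can demonstrate transparent decoding of the whole chain.
+
+```js
+const { createServer } = require("node:http");
+const { createGzip, createDeflate } = require("node:zlib");
+const { Client, interceptors } = require("undici");
+const { decompress } = interceptors;
+
+// Toy server: data -> gzip -> deflate, advertised as "gzip, deflate"
+const server = createServer((req, res) => {
+  const gzip = createGzip();
+  const deflate = createDeflate();
+  res.writeHead(200, { "Content-Encoding": "gzip, deflate" });
+  gzip.pipe(deflate).pipe(res);
+  gzip.end("hello");
+});
+
+server.listen(0, async () => {
+  const client = new Client(`http://localhost:${server.address().port}`).compose(
+    decompress()
+  );
+
+  const { headers, body } = await client.request({ method: "GET", path: "/" });
+  console.log(headers["content-encoding"]); // undefined - header removed
+  console.log(await body.text()); // "hello" - both codings decoded
+
+  await client.close();
+  server.close();
+});
+```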
+
 ##### `Cache Interceptor`
 
 The `cache` interceptor implements client-side response caching as described in
diff --git a/index.js b/index.js
index 4ebdabd441d..537958fadf8 100644
--- a/index.js
+++ b/index.js
@@ -46,7 +46,8 @@ module.exports.interceptors = {
   retry: require('./lib/interceptor/retry'),
   dump: require('./lib/interceptor/dump'),
   dns: require('./lib/interceptor/dns'),
-  cache: require('./lib/interceptor/cache')
+  cache: require('./lib/interceptor/cache'),
+  decompress: require('./lib/interceptor/decompress')
 }
 
 module.exports.cacheStores = {
diff --git a/lib/interceptor/decompress.js b/lib/interceptor/decompress.js
new file mode 100644
index 00000000000..847aefdbf62
--- /dev/null
+++ b/lib/interceptor/decompress.js
@@ -0,0 +1,253 @@
+'use strict'
+
+const { createInflate, createGunzip, createBrotliDecompress, createZstdDecompress } = require('node:zlib')
+const { pipeline } = require('node:stream')
+const DecoratorHandler = require('../handler/decorator-handler')
+
+/** @typedef {import('node:stream').Transform} Transform */
+/** @typedef {import('node:stream').Transform} Controller */
+/** @typedef {Transform&import('node:zlib').Zlib} DecompressorStream */
+
+/** @type {Record<string, () => DecompressorStream>} */
+const supportedEncodings = {
+  gzip: createGunzip,
+  'x-gzip': createGunzip,
+  br: createBrotliDecompress,
+  deflate: createInflate,
+  compress: createInflate,
+  'x-compress': createInflate,
+  ...(createZstdDecompress ? { zstd: createZstdDecompress } : {})
+}
+
+const defaultSkipStatusCodes = /** @type {const} */ ([204, 304])
+
+let warningEmitted = /** @type {boolean} */ (false)
+
+/**
+ * @typedef {Object} DecompressHandlerOptions
+ * @property {number[]|Readonly<number[]>} [skipStatusCodes=[204, 304]] - List of status codes to skip decompression for
+ * @property {boolean} [skipErrorResponses] - Whether to skip decompression for error responses (status codes >= 400)
+ */
+
+class DecompressHandler extends DecoratorHandler {
+  /** @type {Transform[]} */
+  #decompressors = []
+  /** @type {NodeJS.WritableStream&NodeJS.ReadableStream|null} */
+  #pipelineStream
+  /** @type {Readonly<number[]>} */
+  #skipStatusCodes
+  /** @type {boolean} */
+  #skipErrorResponses
+
+  constructor (handler, { skipStatusCodes = defaultSkipStatusCodes, skipErrorResponses = true } = {}) {
+    super(handler)
+    this.#skipStatusCodes = skipStatusCodes
+    this.#skipErrorResponses = skipErrorResponses
+  }
+
+  /**
+   * Determines if decompression should be skipped based on encoding and status code
+   * @param {string} contentEncoding - Content-Encoding header value
+   * @param {number} statusCode - HTTP status code of the response
+   * @returns {boolean} - True if decompression should be skipped
+   */
+  #shouldSkipDecompression (contentEncoding, statusCode) {
+    if (!contentEncoding || statusCode < 200) return true
+    if (this.#skipStatusCodes.includes(statusCode)) return true
+    if (this.#skipErrorResponses && statusCode >= 400) return true
+    return false
+  }
+
+  /**
+   * Creates a chain of decompressors for multiple content encodings
+   *
+   * @param {string} encodings - Comma-separated list of content encodings
+   * @returns {DecompressorStream[]} - Array of decompressor streams
+   */
+  #createDecompressionChain (encodings) {
+    const parts = encodings.split(',')
+
+    /** @type {DecompressorStream[]} */
+    const decompressors = []
+
+    for (let i = parts.length - 1; i >= 0; i--) {
+      const encoding = parts[i].trim()
+      if (!encoding) continue
+
+      if (!supportedEncodings[encoding]) {
+        decompressors.length = 0 // Clear if unsupported encoding
+        return decompressors // Unsupported encoding
+      }
+
+      decompressors.push(supportedEncodings[encoding]())
+    }
+
+    return decompressors
+  }
+
+  /**
+   * Sets up event handlers for a decompressor stream using readable events
+   * @param {DecompressorStream} decompressor - The decompressor stream
+   * @param {Controller} controller - The controller to coordinate with
+   * @returns {void}
+   */
+  #setupDecompressorEvents (decompressor, controller) {
+    decompressor.on('readable', () => {
+      let chunk
+      while ((chunk = decompressor.read()) !== null) {
+        const result = super.onResponseData(controller, chunk)
+        if (result === false) {
+          break
+        }
+      }
+    })

+    decompressor.on('error', (error) => {
+      super.onResponseError(controller, error)
+    })
+  }
+
+  /**
+   * Sets up event handling for a single decompressor
+   * @param {Controller} controller - The controller to handle events
+   * @returns {void}
+   */
+  #setupSingleDecompressor (controller) {
+    const decompressor = this.#decompressors[0]
+    this.#setupDecompressorEvents(decompressor, controller)
+
+    decompressor.on('end', () => {
+      super.onResponseEnd(controller, {})
+    })
+  }
+
+  /**
+   * Sets up event handling for multiple chained decompressors using pipeline
+   * @param {Controller} controller - The controller to handle events
+   * @returns {void}
+   */
+  #setupMultipleDecompressors (controller) {
+    const lastDecompressor = this.#decompressors[this.#decompressors.length - 1]
+    this.#setupDecompressorEvents(lastDecompressor, controller)
+
+    this.#pipelineStream = pipeline(this.#decompressors, (err) => {
+      if (err) {
+        super.onResponseError(controller, err)
+        return
+      }
+      super.onResponseEnd(controller, {})
+    })
+  }
+
+  /**
+   * Cleans up decompressor references to prevent memory leaks
+   * @returns {void}
+   */
+  #cleanupDecompressors () {
+    this.#decompressors.length = 0
+    this.#pipelineStream = null
+  }
+
+  /**
+   * @param {Controller} controller
+   * @param {number} statusCode
+   * @param {Record<string, string>} headers
+   * @param {string} statusMessage
+   * @returns {void}
+   */
+  onResponseStart (controller, statusCode, headers, statusMessage) {
+    const contentEncoding = headers['content-encoding']
+
+    // If content encoding is not supported or status code is in skip list
+    if (this.#shouldSkipDecompression(contentEncoding, statusCode)) {
+      return super.onResponseStart(controller, statusCode, headers, statusMessage)
+    }
+
+    const decompressors = this.#createDecompressionChain(contentEncoding.toLowerCase())
+
+    if (decompressors.length === 0) {
+      this.#cleanupDecompressors()
+      return super.onResponseStart(controller, statusCode, headers, statusMessage)
+    }
+
+    this.#decompressors = decompressors
+
+    // Remove compression headers since we're decompressing
+    const { 'content-encoding': _, 'content-length': __, ...newHeaders } = headers
+
+    if (this.#decompressors.length === 1) {
+      this.#setupSingleDecompressor(controller)
+    } else {
+      this.#setupMultipleDecompressors(controller)
+    }
+
+    super.onResponseStart(controller, statusCode, newHeaders, statusMessage)
+  }
+
+  /**
+   * @param {Controller} controller
+   * @param {Buffer} chunk
+   * @returns {void}
+   */
+  onResponseData (controller, chunk) {
+    if (this.#decompressors.length > 0) {
+      this.#decompressors[0].write(chunk)
+      return
+    }
+    super.onResponseData(controller, chunk)
+  }
+
+  /**
+   * @param {Controller} controller
+   * @param {Record<string, string> | undefined} trailers
+   * @returns {void}
+   */
+  onResponseEnd (controller, trailers) {
+    if (this.#decompressors.length > 0) {
+      this.#decompressors[0].end()
+      this.#cleanupDecompressors()
+      return
+    }
+    super.onResponseEnd(controller, trailers)
+  }
+
+  /**
+   * @param {Controller} controller
+   * @param {Error} err
+   * @returns {void}
+   */
+  onResponseError (controller, err) {
+    if (this.#decompressors.length > 0) {
+      for (const decompressor of this.#decompressors) {
+        decompressor.destroy(err)
+      }
+      this.#cleanupDecompressors()
+    }
+    super.onResponseError(controller, err)
+  }
+}
+
+/**
+ * Creates a decompression interceptor for HTTP responses
+ * @param {DecompressHandlerOptions} [options] - Options for the interceptor
+ * @returns {Function} - Interceptor function
+ */
+function createDecompressInterceptor (options = {}) {
+  // Emit experimental warning only once
+  if (!warningEmitted) {
+    process.emitWarning(
+      'DecompressInterceptor is experimental and subject to change',
+      'ExperimentalWarning'
+    )
+    warningEmitted = true
+  }
+
+  return (dispatch) => {
+    return (opts, handler) => {
+      const decompressHandler = new DecompressHandler(handler, options)
+      return dispatch(opts, decompressHandler)
+    }
+  }
+}
+
+module.exports = createDecompressInterceptor
diff --git a/test/interceptors/decompress.js b/test/interceptors/decompress.js
new file mode 100644
index 00000000000..e0737e400c3
--- /dev/null
+++ b/test/interceptors/decompress.js
@@ -0,0 +1,1003 @@
+'use strict'
+
+const { test, after } = require('node:test')
+const { createServer } = require('node:http')
+const { once } = require('node:events')
+const { createGzip, createDeflate, createBrotliCompress, createZstdCompress } = require('node:zlib')
+const { tspl } = require('@matteo.collina/tspl')
+
+const { Client, getGlobalDispatcher, setGlobalDispatcher, request } = require('../..')
+const createDecompressInterceptor = require('../../lib/interceptor/decompress')
+
+test('should decompress gzip response', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const data = 'This is a test message for gzip compression validation.'
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip'
+    })
+
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined)
+  t.equal(body, data)
+
+  await t.completed
+})
+
+test('should decompress deflate response', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const deflate = createDeflate()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'deflate'
+    })
+
+    const data = 'This message is compressed with deflate algorithm!'
+    deflate.pipe(res)
+    deflate.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined)
+  t.equal(body, 'This message is compressed with deflate algorithm!')
+
+  await t.completed
+})
+
+test('should decompress brotli response', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const brotli = createBrotliCompress()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'br'
+    })
+
+    const data = 'This message is compressed with brotli compression!'
+    brotli.pipe(res)
+    brotli.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined)
+  t.equal(body, 'This message is compressed with brotli compression!')
+
+  await t.completed
+})
+
+test('should decompress zstd response', { skip: typeof createZstdCompress !== 'function' }, async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const zstd = createZstdCompress()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'zstd'
+    })
+
+    const data = 'This message is compressed with zstd compression!'
+    zstd.pipe(res)
+    zstd.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined)
+  t.equal(body, 'This message is compressed with zstd compression!')
+
+  await t.completed
+})
+
+test('should pass through uncompressed response', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
+    res.writeHead(200, {
+      'Content-Type': 'text/plain'
+    })
+    res.end('This is uncompressed data')
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-type'], 'text/plain')
+  t.equal(body, 'This is uncompressed data')
+
+  await t.completed
+})
+
+test('should pass through unsupported encoding', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'unsupported'
+    })
+    res.end('This has unsupported encoding')
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], 'unsupported')
+  t.equal(body, 'This has unsupported encoding')
+
+  await t.completed
+})
+
+test('should pass through error responses (4xx, 5xx)', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(404, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip'
+    })
+
+    const data = 'Not found error message'
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 404)
+  t.equal(response.headers['content-encoding'], 'gzip')
+  t.notEqual(body, 'Not found error message')
+
+  await t.completed
+})
+
+test('should pass through 204 No Content responses', async t => {
+  t = tspl(t, { plan: 2 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
+    res.writeHead(204, {
+      'Content-Encoding': 'gzip'
+    })
+    res.end()
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  t.equal(response.statusCode, 204)
+  t.equal(response.headers['content-encoding'], 'gzip')
+
+  await t.completed
+})
+
+test('should pass through 304 Not Modified responses', async t => {
+  t = tspl(t, { plan: 2 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
+    res.writeHead(304, {
+      'Content-Encoding': 'gzip'
+    })
+    res.end()
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  t.equal(response.statusCode, 304)
+  t.equal(response.headers['content-encoding'], 'gzip')
+
+  await t.completed
+})
+
+test('should handle large compressed responses', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip'
+    })
+
+    const largeData = 'A'.repeat(10000) + 'B'.repeat(10000) + 'C'.repeat(10000)
+    gzip.pipe(res)
+    gzip.end(largeData)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined)
+  t.equal(body.length, 30000)
+
+  await t.completed
+})
+
+test('should handle case-insensitive content-encoding', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'GZIP' // Uppercase
+    })
+
+    const data = 'Case insensitive test'
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined)
+  t.equal(body, 'Case insensitive test')
+
+  await t.completed
+})
+
+test('should remove content-length header when decompressing', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip'
+    })
+
+    const data = 'Test data'
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-length'], undefined)
+  t.equal(body, 'Test data')
+
+  await t.completed
+})
+
+test('should allow decompressing 5xx responses when skipErrorResponses is false', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(500, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip'
+    })
+
+    const data = 'Internal server error message'
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor({ skipErrorResponses: false }))
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 500)
+  t.equal(response.headers['content-encoding'], undefined) // Should be removed when decompressing
+  t.equal(body, 'Internal server error message') // Should be decompressed
+
+  await t.completed
+})
+
+test('should allow custom skipStatusCodes', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(201, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip'
+    })
+
+    const data = 'Created response'
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  // Skip decompression for 201 status codes
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor({ skipStatusCodes: [201, 204, 304] }))
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 201)
+  t.equal(response.headers['content-encoding'], 'gzip') // Should be preserved when skipping
+  t.notEqual(body, 'Created response') // Should still be compressed
+
+  await t.completed
+})
+
+test('should decompress multiple encodings in correct order', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    // First compress with gzip, then with deflate (gzip, deflate)
+    const gzip = createGzip()
+    const deflate = createDeflate()
+
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip, deflate' // Applied in this order
+    })
+
+    const data = 'Multiple encoding test message'
+
+    // Pipe: data → gzip → deflate → response
+    gzip.pipe(deflate)
+    deflate.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined) // Should be removed
+  t.equal(body, 'Multiple encoding test message') // Should be fully decompressed
+
+  await t.completed
+})
+
+test('should handle legacy encoding names (x-gzip)', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'x-gzip' // Legacy name
+    })
+
+    const data = 'Legacy encoding test'
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined) // Should be removed
+  t.equal(body, 'Legacy encoding test')
+
+  await t.completed
+})
+
+test('should pass through responses with unsupported encoding in chain', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip, unsupported, deflate' // Contains unsupported encoding
+    })
+    res.end('This should pass through unchanged')
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], 'gzip, unsupported, deflate') // Should be preserved
+  t.equal(body, 'This should pass through unchanged')
+
+  await t.completed
+})
+
+test('should handle empty encoding values', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip, ' // Contains empty value at end
+    })
+
+    const data = 'Empty encoding value test'
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  const response = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+
+  const body = await response.body.text()
+
+  t.equal(response.statusCode, 200)
+  t.equal(response.headers['content-encoding'], undefined)
+  t.equal(body, 'Empty encoding value test')
+
+  await t.completed
+})
+
+test('should handle multiple pause/resume cycles during decompression', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const data = 'Large data chunk for testing multiple pause/resume cycles. '.repeat(1000)
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip'
+    })
+
+    gzip.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  let controller
+  let callCount = 0
+  let responseData = ''
+
+  const handler = {
+    onRequestStart (ctrl) {
+      controller = ctrl
+    },
+
+    onResponseStart (ctrl, statusCode, headers, statusMessage) {
+      t.equal(statusCode, 200)
+
+      for (let i = 0; i < 3; i++) {
+        callCount++
+        controller.pause()
+        controller.resume()
+      }
+    },
+
+    onResponseData (ctrl, chunk) {
+      responseData += chunk.toString()
+    },
+
+    onResponseEnd (ctrl, trailers) {
+      t.equal(callCount, 3, 'Should have called pause/resume 3 times')
+      t.equal(responseData, data, 'All data should be received')
+    },
+
+    onResponseError (ctrl, err) {
+      t.fail(err)
+    }
+  }
+
+  await client.dispatch({
+    method: 'GET',
+    path: '/'
+  }, handler)
+
+  await t.completed
+})
+
+test('should handle controller pause with chained decompression', async t => {
+  t = tspl(t, { plan: 3 })
+
+  const data = 'Test data for chained decompression pause/resume functionality'
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    const deflate = createDeflate()
+
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip, deflate'
+    })
+
+    gzip.pipe(deflate)
+    deflate.pipe(res)
+    gzip.end(data)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const client = new Client(
+    `http://localhost:${server.address().port}`
+  ).compose(createDecompressInterceptor())
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  let controller
+  let pauseResumeWorked = false
+  let responseData = ''
+
+  const handler = {
+    onRequestStart (ctrl) {
+      controller = ctrl
+    },
+
+    onResponseStart (ctrl, statusCode, headers, statusMessage) {
+      t.equal(statusCode, 200)
+
+      try {
+        controller.pause()
+        controller.resume()
+        pauseResumeWorked = true
+      } catch (err) {
+        t.fail('Pause/resume should not throw error')
+      }
+    },
+
+    onResponseData (ctrl, chunk) {
+      responseData += chunk.toString()
+    },
+
+    onResponseEnd (ctrl, trailers) {
+      t.ok(pauseResumeWorked, 'Pause/resume should work with chained decompression')
+      t.equal(responseData, data, 'Data should be correctly decompressed from chained encodings')
+    },
+
+    onResponseError (ctrl, err) {
+      t.fail(err)
+    }
+  }
+
+  await client.dispatch({
+    method: 'GET',
+    path: '/'
+  }, handler)
+
+  await t.completed
+})
+
+test('should behave like fetch() for compressed responses', async t => {
+  t = tspl(t, { plan: 10 })
+
+  const testData = 'Test data that will be compressed and should be automatically decompressed by both fetch and request with decompress interceptor'
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    res.writeHead(200, {
+      'Content-Type': 'text/plain',
+      'Content-Encoding': 'gzip'
+    })
+    gzip.pipe(res)
+    gzip.end(testData)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const baseUrl = `http://localhost:${server.address().port}`
+
+  const { fetch } = require('../..')
+  const fetchResponse = await fetch(baseUrl)
+  const fetchBody = await fetchResponse.text()
+
+  const client = new Client(baseUrl)
+  const requestResponseWithoutDecompression = await client.request({
+    method: 'GET',
+    path: '/'
+  })
+  const requestBodyWithoutDecompression = await requestResponseWithoutDecompression.body.text()
+
+  const clientWithDecompression = client.compose(createDecompressInterceptor())
+  const requestResponseWithDecompression = await clientWithDecompression.request({
+    method: 'GET',
+    path: '/'
+  })
+  const requestBodyWithDecompression = await requestResponseWithDecompression.body.text()
+
+  after(async () => {
+    await client.close()
+    server.close()
+    await once(server, 'close')
+  })
+
+  t.equal(fetchResponse.status, 200)
+  t.equal(fetchBody, testData, 'fetch should automatically decompress')
+  t.equal(requestBodyWithDecompression, fetchBody, 'request with decompression interceptor should match fetch behavior')
+  t.notEqual(requestBodyWithoutDecompression, fetchBody, 'request without decompression interceptor should return compressed data')
+  t.equal(fetchResponse.headers.get('content-type'), 'text/plain', 'content-type header should be preserved with fetch')
+  t.equal(fetchResponse.headers.get('content-encoding'), 'gzip', 'content-encoding header should be preserved with fetch')
+  t.equal(requestResponseWithoutDecompression.headers['content-type'], 'text/plain', 'content-type header should be preserved without decompression')
+  t.equal(requestResponseWithoutDecompression.headers['content-encoding'], 'gzip', 'content-encoding header should be preserved without decompression')
+  t.equal(requestResponseWithDecompression.headers['content-type'], 'text/plain', 'content-type header should be preserved with decompression')
+  t.equal(requestResponseWithDecompression.headers['content-encoding'], undefined, 'content-encoding header should be removed with decompression')
+
+  await t.completed
+})
+
+test('should work with global dispatcher for both fetch() and request()', async t => {
+  t = tspl(t, { plan: 8 })
+
+  const testData = 'Global dispatcher test data for decompression interceptor'
+
+  const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
+    const gzip = createGzip()
+    const chunks = []
+
+    gzip.on('data', chunk => chunks.push(chunk))
+    gzip.on('end', () => {
+      const compressedData = Buffer.concat(chunks)
+      res.writeHead(200, {
+        'Content-Type': 'text/plain',
+        'Content-Encoding': 'gzip',
+        'Content-Length': compressedData.length
+      })
+      res.end(compressedData)
+    })
+
+    gzip.end(testData)
+  })
+
+  server.listen(0)
+  await once(server, 'listening')
+
+  const baseUrl = `http://localhost:${server.address().port}`
+
+  const originalDispatcher = getGlobalDispatcher()
+
+  setGlobalDispatcher(getGlobalDispatcher().compose(createDecompressInterceptor()))
+
+  after(async () => {
+    setGlobalDispatcher(originalDispatcher)
+    server.close()
+    await once(server, 'close')
+  })
+
+  const { fetch } = require('../..')
+  const fetchResponse = await fetch(baseUrl)
+  const fetchBody = await fetchResponse.text()
+
+  const requestResponse = await request(baseUrl, {
+    method: 'GET'
+  })
+  const requestBody = await requestResponse.body.text()
+
+  t.equal(fetchResponse.status, 200)
+  t.equal(fetchBody, testData, 'fetch should automatically decompress with global interceptor')
+  t.equal(requestResponse.statusCode, 200)
+  t.equal(requestBody, testData, 'request should automatically decompress with global interceptor')
+  t.equal(requestResponse.headers['content-encoding'], undefined, 'request content-encoding header should be removed with global interceptor')
+  t.equal(requestResponse.headers['content-length'], undefined, 'request content-length header should be removed with global interceptor')
+  t.equal(fetchResponse.headers.get('content-length'), undefined, 'content-length header should be removed with fetch due to global interceptor')
+  t.equal(fetchResponse.headers.get('content-encoding'), undefined, 'content-encoding header should be removed with fetch due to global interceptor')
+
+  await t.completed
+})
diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts
index 11c2e3109ce..05a8b1ca1a3 100644
--- a/test/types/index.test-d.ts
+++ b/test/types/index.test-d.ts
@@ -28,6 +28,7 @@ expectAssignable(Undici.FormData)
 expectAssignable(Undici.interceptors.dump())
 expectAssignable(Undici.interceptors.redirect())
 expectAssignable(Undici.interceptors.retry())
+expectAssignable(Undici.interceptors.decompress())
 expectAssignable(Undici.interceptors.cache())
 
 const dispatcher = new Dispatcher()
diff --git a/types/interceptors.d.ts b/types/interceptors.d.ts
index 5a6fcb28ba7..74389db2758 100644
--- a/types/interceptors.d.ts
+++ b/types/interceptors.d.ts
@@ -9,6 +9,10 @@ declare namespace Interceptors {
   export type DumpInterceptorOpts = { maxSize?: number }
   export type RetryInterceptorOpts = RetryHandler.RetryOptions
   export type RedirectInterceptorOpts = { maxRedirections?: number }
+  export type DecompressInterceptorOpts = {
+    skipErrorResponses?: boolean
+    skipStatusCodes?: number[]
+  }
   export type ResponseErrorInterceptorOpts = { throwOnError: boolean }
   export type CacheInterceptorOpts = CacheHandler.CacheOptions
 
@@ -28,6 +32,7 @@ declare namespace Interceptors {
   export function dump (opts?: DumpInterceptorOpts): Dispatcher.DispatcherComposeInterceptor
   export function retry (opts?: RetryInterceptorOpts): Dispatcher.DispatcherComposeInterceptor
   export function redirect (opts?: RedirectInterceptorOpts): Dispatcher.DispatcherComposeInterceptor
+  export function decompress (opts?: DecompressInterceptorOpts): Dispatcher.DispatcherComposeInterceptor
   export function responseError (opts?: ResponseErrorInterceptorOpts): Dispatcher.DispatcherComposeInterceptor
   export function dns (opts?: DNSInterceptorOpts): Dispatcher.DispatcherComposeInterceptor
   export function cache (opts?: CacheInterceptorOpts): Dispatcher.DispatcherComposeInterceptor