Skip to content

Commit

Permalink
fix: Use an AbortSignal for per-request timeouts (#96)
Browse files Browse the repository at this point in the history
A potential fix for
#63, largely
inspired by a community member's PR that was never merged:
#55

According to an Undici core committer in this comment
elastic/elasticsearch-js#1716 (comment)
the issue that triggers the MaxListenersExceededWarning, and possibly a
memory leak in some cases, is caused by attaching an EventEmitter to
each request by default when a per-request timeout is set, rather than
attaching an AbortSignal.

My assumption is that an EventEmitter was used because AbortSignal and
AbortController were not added to Node.js until v14.17.0, so we couldn't
guarantee v14 users would have it. I'm not certain why using
EventEmitters makes a difference memory-wise, but it does get rid of the
MaxListenersExceededWarning.
  • Loading branch information
JoshMock authored May 21, 2024
1 parent 588f88e commit 063a4bb
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 32 deletions.
14 changes: 3 additions & 11 deletions src/connection/UndiciConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

/* eslint-disable @typescript-eslint/restrict-template-expressions */

import { EventEmitter } from 'events'
import Debug from 'debug'
import buffer from 'buffer'
import { TLSSocket } from 'tls'
Expand All @@ -41,7 +40,7 @@ import {
TimeoutError
} from '../errors'
import { UndiciAgentOptions } from '../types'
import { kCaFingerprint, kEmitter } from '../symbols'
import { kCaFingerprint } from '../symbols'

const debug = Debug('elasticsearch')
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
Expand All @@ -50,7 +49,6 @@ const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH

export default class Connection extends BaseConnection {
pool: Pool
[kEmitter]: EventEmitter

constructor (opts: ConnectionOptions) {
super(opts)
Expand All @@ -67,8 +65,6 @@ export default class Connection extends BaseConnection {
throw new ConfigurationError('Bad agent configuration for Undici agent')
}

this[kEmitter] = new EventEmitter()
this[kEmitter].setMaxListeners(this.maxEventListeners)
const undiciOptions: Pool.Options = {
keepAliveTimeout: 600e3,
keepAliveMaxTimeout: 600e3,
Expand Down Expand Up @@ -124,7 +120,7 @@ export default class Connection extends BaseConnection {
path: params.path + (params.querystring == null || params.querystring === '' ? '' : `?${params.querystring}`),
headers: Object.assign({}, this.headers, params.headers),
body: params.body,
signal: options.signal ?? this[kEmitter]
signal: options.signal ?? new AbortController().signal
}

if (requestParams.path[0] !== '/') {
Expand All @@ -141,11 +137,7 @@ export default class Connection extends BaseConnection {
if (options.timeout != null && options.timeout !== this.timeout) {
timeoutId = setTimeout(() => {
timedout = true
if (options.signal != null) {
options.signal.dispatchEvent(new Event('abort'))
} else {
this[kEmitter].emit('abort')
}
requestParams.signal.dispatchEvent(new Event('abort'))
}, options.timeout)
}

Expand Down
1 change: 0 additions & 1 deletion src/symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export const kNodeFilter = Symbol('node filter')
export const kNodeSelector = Symbol('node selector')
export const kJsonOptions = Symbol('secure json parse options')
export const kStatus = Symbol('status')
export const kEmitter = Symbol('event emitter')
export const kProductCheck = Symbol('product check')
export const kCaFingerprint = Symbol('ca fingerprint')
export const kMaxResponseSize = Symbol('max response size')
Expand Down
20 changes: 0 additions & 20 deletions test/unit/undici-connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1064,26 +1064,6 @@ test('Path without intial slash', async t => {
server.stop()
})

test('Should increase number of max event listeners', async t => {
t.plan(1)

function handler (req: http.IncomingMessage, res: http.ServerResponse) {
res.end('ok')
}

const [{ port }, server] = await buildServer(handler, { secure: true })
const connection = new UndiciConnection({
url: new URL(`https://localhost:${port}`),
maxEventListeners: 100,
})
const res = await connection.request({
path: '/hello',
method: 'GET'
}, options)
t.equal(res.body, 'ok')
server.stop()
})

test('as stream', async t => {
t.plan(2)

Expand Down

0 comments on commit 063a4bb

Please sign in to comment.