diff --git a/packages/dd-trace/src/exporters/agent/writer.js b/packages/dd-trace/src/exporters/agent/writer.js
index 7a17299c73e..c1ac7b1dd26 100644
--- a/packages/dd-trace/src/exporters/agent/writer.js
+++ b/packages/dd-trace/src/exporters/agent/writer.js
@@ -8,6 +8,9 @@ const tracerVersion = require('../../../../../package.json').version
 const BaseWriter = require('../common/writer')
 
 const METRIC_PREFIX = 'datadog.tracer.node.exporter.agent'
+const MAX_RETRY_QUEUE_SIZE = 1000
+const DEFAULT_INITIAL_BACKOFF_MS = 1000 // 1 second
+const DEFAULT_MAX_BACKOFF_MS = 30000 // 30 seconds
 
 class Writer extends BaseWriter {
   constructor ({ prioritySampler, lookup, protocolVersion, headers, config = {} }) {
@@ -20,6 +23,14 @@ class Writer extends BaseWriter {
     this._headers = headers
     this._config = config
     this._encoder = new AgentEncoder(this)
+
+    // Retry queue for handling 429 responses with exponential backoff
+    this._retryQueue = []
+    this._initialBackoff = config.initialBackoff || DEFAULT_INITIAL_BACKOFF_MS
+    this._maxBackoff = config.maxBackoff || DEFAULT_MAX_BACKOFF_MS
+    this._currentBackoff = this._initialBackoff
+    this._retryInProgress = false
+    this._retryTimeout = null
   }
 
   _sendPayload (data, count, done) {
@@ -41,6 +52,14 @@ class Writer extends BaseWriter {
 
       startupLog({ agentError: err })
 
+      // Handle 429 Too Many Requests - queue for retry with exponential backoff
+      if (status === 429) {
+        log.debug('Received 429 from agent, queueing payload for retry')
+        this._queueForRetry(data, count)
+        done()
+        return
+      }
+
       if (err) {
         log.errorWithoutTelemetry('Error sending payload to the agent (status code: %s)', err.status, err)
         done()
@@ -60,6 +79,86 @@ class Writer extends BaseWriter {
       done()
     })
   }
+
+  _queueForRetry (data, count) {
+    // Don't infinitely queue failed payloads to prevent unbounded memory growth
+    if (this._retryQueue.length >= MAX_RETRY_QUEUE_SIZE) {
+      log.debug('Retry queue is full, dropping payload')
+      return
+    }
+
+    this._retryQueue.push({ data, count })
+    log.debug('Queued payload for retry. Queue size: %d', this._retryQueue.length)
+
+    // Start processing the retry queue if not already in progress
+    if (!this._retryInProgress) {
+      this._processRetryQueue()
+    }
+  }
+
+  _processRetryQueue () {
+    if (this._retryInProgress || this._retryQueue.length === 0) {
+      return
+    }
+
+    this._retryInProgress = true
+    this._retryNextPayload()
+  }
+
+  _retryNextPayload () {
+    if (this._retryQueue.length === 0) {
+      this._retryInProgress = false
+      this._retryTimeout = null
+      return
+    }
+
+    log.debug('Retrying payload after %dms backoff', this._currentBackoff)
+
+    this._retryTimeout = setTimeout(() => {
+      this._retryTimeout = null
+
+      // Check again after the timeout in case queue was cleared
+      if (this._retryQueue.length === 0) {
+        this._retryInProgress = false
+        return
+      }
+
+      const payload = this._retryQueue.shift()
+      const { data, count } = payload
+
+      const { _headers, _lookup, _protocolVersion, _url } = this
+      makeRequest(_protocolVersion, data, count, _url, _headers, _lookup, false, (err, res, status) => {
+        if (status === 429) {
+          // Still getting 429, requeue and increase backoff
+          log.debug('Retry still received 429, requeueing')
+          this._retryQueue.unshift(payload) // Put it back at the front
+          this._currentBackoff = Math.min(this._currentBackoff * 2, this._maxBackoff)
+          this._retryNextPayload()
+        } else if (err) {
+          // Other errors, drop the payload and continue with next
+          log.debug('Retry failed with error: %s', err.message)
+          this._retryNextPayload()
+        } else {
+          // Success! Reset backoff and continue
+          log.debug('Retry succeeded, resetting backoff')
+          this._currentBackoff = this._initialBackoff
+
+          try {
+            this._prioritySampler.update(JSON.parse(res).rate_by_service)
+          } catch (e) {
+            log.error('Error updating prioritySampler rates', e)
+          }
+
+          this._retryNextPayload()
+        }
+      })
+    }, this._currentBackoff)
+
+    // Unref the timeout so it doesn't keep the process alive
+    if (this._retryTimeout.unref) {
+      this._retryTimeout.unref()
+    }
+  }
 }
 
 function setHeader (headers, key, value) {
@@ -82,7 +181,8 @@ function makeRequest (version, data, count, url, headers, lookup, needsStartupLo
      ...headers,
      'Content-Type': 'application/msgpack',
      'Datadog-Meta-Tracer-Version': tracerVersion,
-      'X-Datadog-Trace-Count': String(count)
+      'X-Datadog-Trace-Count': String(count),
+      'Datadog-Send-Real-Http-Status': 'true'
    },
    lookup,
    url
diff --git a/packages/dd-trace/test/exporters/agent/writer.spec.js b/packages/dd-trace/test/exporters/agent/writer.spec.js
index bb5278eb054..efb2919df10 100644
--- a/packages/dd-trace/test/exporters/agent/writer.spec.js
+++ b/packages/dd-trace/test/exporters/agent/writer.spec.js
@@ -45,7 +45,8 @@ function describeWriter (protocolVersion) {
 
     log = {
       error: sinon.spy(),
-      errorWithoutTelemetry: sinon.spy()
+      errorWithoutTelemetry: sinon.spy(),
+      debug: sinon.spy()
     }
 
     const AgentEncoder = function () {
@@ -59,7 +60,13 @@ function describeWriter (protocolVersion) {
       '../../../../../package.json': { version: 'tracerVersion' },
       '../../log': log
     })
-    writer = new Writer({ url, prioritySampler, protocolVersion })
+
+    // Use shorter backoff times for testing
+    const config = {
+      initialBackoff: 50, // 50ms instead of 1s
+      maxBackoff: 500 // 500ms instead of 30s
+    }
+    writer = new Writer({ url, prioritySampler, protocolVersion, config })
 
     process.nextTick(done)
   })
@@ -122,7 +129,8 @@ function describeWriter (protocolVersion) {
            'Datadog-Meta-Lang-Version': process.version,
            'Datadog-Meta-Lang-Interpreter': 'v8',
            'Datadog-Meta-Tracer-Version': 'tracerVersion',
-            'X-Datadog-Trace-Count': '2'
+            'X-Datadog-Trace-Count': '2',
+            'Datadog-Send-Real-Http-Status': 'true'
          },
          lookup: undefined
        })
@@ -145,12 +153,22 @@ function describeWriter (protocolVersion) {
          'Datadog-Meta-Lang-Version': process.version,
          'Datadog-Meta-Lang-Interpreter': 'v8',
          'Datadog-Meta-Tracer-Version': 'tracerVersion',
-          'X-Datadog-Trace-Count': '2'
+          'X-Datadog-Trace-Count': '2',
+          'Datadog-Send-Real-Http-Status': 'true'
        })
        done()
      })
    })
 
+    it('should include Datadog-Send-Real-Http-Status header', (done) => {
+      encoder.count.returns(1)
+      encoder.makePayload.returns([Buffer.from('data')])
+      writer.flush(() => {
+        expect(request.getCall(0).args[1].headers).to.have.property('Datadog-Send-Real-Http-Status', 'true')
+        done()
+      })
+    })
+
    it('should log request errors', done => {
      const error = new Error('boom')
      error.status = 42
@@ -158,9 +176,7 @@
      request.yields(error)
 
      encoder.count.returns(1)
-      writer.flush()
-
-      setTimeout(() => {
+      writer.flush(() => {
        expect(log.errorWithoutTelemetry)
          .to.have.been.calledWith('Error sending payload to the agent (status code: %s)', error.status, error)
 
@@ -178,10 +194,131 @@
      })
    })
 
+    it('should queue payload for retry on 429 response', (done) => {
+      request.yieldsAsync(new Error('Too Many Requests'), null, 429)
+
+      encoder.count.returns(1)
+      encoder.makePayload.returns([Buffer.from('data')])
+
+      writer.flush(() => {
+        // First request should have been made
+        expect(request).to.have.been.calledOnce
+        // Retry process should be in progress
+        expect(writer._retryInProgress).to.equal(true)
+        // Check queue before setTimeout processes it
+        setTimeout(() => {
+          // The item should still be in queue waiting for backoff timeout
+          expect(writer._retryQueue).to.have.lengthOf(1)
+          done()
+        }, 100)
+      })
+    })
+
+    it('should not queue payload for retry on non-429 errors', (done) => {
+      const error = new Error('Server Error')
+      error.status = 500
+      request.yieldsAsync(error, null, 500)
+
+      encoder.count.returns(1)
+      encoder.makePayload.returns([Buffer.from('data')])
+
+      writer.flush(() => {
+        expect(request).to.have.been.calledOnce
+        // Should not queue non-429 errors
+        expect(writer._retryQueue).to.have.lengthOf(0)
+        done()
+      })
+    })
+
+    it('should not queue if retry queue is at max capacity', (done) => {
+      request.yieldsAsync(new Error('Too Many Requests'), null, 429)
+
+      // Fill the retry queue to max capacity
+      writer._retryQueue = new Array(1000).fill({ data: [Buffer.from('old')], count: 1 })
+
+      encoder.count.returns(1)
+      encoder.makePayload.returns([Buffer.from('data')])
+
+      writer.flush(() => {
+        // Queue should still be at max
+        expect(writer._retryQueue).to.have.lengthOf(1000)
+        done()
+      })
+    })
+
+    it('should retry queued payloads with exponential backoff', (done) => {
+      // First call returns 429, second call succeeds
+      request.onFirstCall().yieldsAsync(new Error('Too Many Requests'), null, 429)
+      request.onSecondCall().yieldsAsync(null, response, 200)
+
+      encoder.count.returns(1)
+      encoder.makePayload.returns([Buffer.from('data')])
+
+      writer.flush(() => {
+        // After first flush, should have queued the payload
+        // The retry process starts automatically
+        expect(writer._retryInProgress).to.equal(true)
+
+        // Wait for retry to complete (initial backoff is 50ms in tests)
+        setTimeout(() => {
+          expect(request).to.have.been.calledTwice
+          expect(writer._retryInProgress).to.equal(false)
+          done()
+        }, 150)
+      })
+    })
+
+    it('should reset backoff after successful retry', (done) => {
+      // First two calls return 429, third succeeds
+      request.onCall(0).yieldsAsync(new Error('Too Many Requests'), null, 429)
+      request.onCall(1).yieldsAsync(new Error('Too Many Requests'), null, 429)
+      request.onCall(2).yieldsAsync(null, response, 200)
+
+      encoder.count.returns(1)
+      encoder.makePayload.returns([Buffer.from('data')])
+
+      writer.flush(() => {
+        // Wait for retries to complete
+        // Backoffs: 50ms, 100ms
+        setTimeout(() => {
+          // After success, backoff should be reset to 50ms
+          expect(writer._currentBackoff).to.equal(50)
+          expect(request).to.have.been.calledThrice
+          done()
+        }, 250)
+      })
+    })
+
+    it('should enforce maximum backoff time', (done) => {
+      // Simulate multiple failures to test max backoff
+      // Calls 0-4 return 429, call 5 succeeds
+      for (let i = 0; i < 5; i++) {
+        request.onCall(i).yieldsAsync(new Error('Too Many Requests'), null, 429)
+      }
+      request.onCall(5).yieldsAsync(null, response, 200)
+
+      encoder.count.returns(1)
+      encoder.makePayload.returns([Buffer.from('data')])
+
+      writer.flush(() => {
+        // After multiple 429s, backoff should cap at 500ms (test max)
+        // Backoffs: 50ms, 100ms, 200ms, 400ms, 500ms (capped)
+        setTimeout(() => {
+          // Backoff should not exceed maximum (500ms in tests)
+          expect(writer._currentBackoff).to.be.at.most(500)
+          done()
+        }, 2000)
+      })
+    })
+
    context('with the url as a unix socket', () => {
      beforeEach(() => {
        url = new URL('unix:/path/to/somesocket.sock')
-        writer = new Writer({ url, protocolVersion })
+        const config = {
+          initialBackoff: 50,
+          maxBackoff: 500
+        }
+        writer = new Writer({ url, protocolVersion, config })
      })
 
      it('should make a request to the socket', () => {