102 changes: 101 additions & 1 deletion packages/dd-trace/src/exporters/agent/writer.js
@@ -8,6 +8,9 @@
const BaseWriter = require('../common/writer')

const METRIC_PREFIX = 'datadog.tracer.node.exporter.agent'
const MAX_RETRY_QUEUE_SIZE = 1000
const DEFAULT_INITIAL_BACKOFF_MS = 1000 // 1 second
const DEFAULT_MAX_BACKOFF_MS = 30_000 // 30 seconds

class Writer extends BaseWriter {
constructor ({ prioritySampler, lookup, protocolVersion, headers, config = {} }) {
@@ -20,6 +23,14 @@
this._headers = headers
this._config = config
this._encoder = new AgentEncoder(this)

// Retry queue for handling 429 responses with exponential backoff
this._retryQueue = []
this._initialBackoff = config.initialBackoff || DEFAULT_INITIAL_BACKOFF_MS
this._maxBackoff = config.maxBackoff || DEFAULT_MAX_BACKOFF_MS
this._currentBackoff = this._initialBackoff
this._retryInProgress = false
this._retryTimeout = null
}

_sendPayload (data, count, done) {
@@ -41,6 +52,14 @@

startupLog({ agentError: err })

// Handle 429 Too Many Requests - queue for retry with exponential backoff
if (status === 429) {
log.debug('Received 429 from agent, queueing payload for retry')
this._queueForRetry(data, count)
done()
return
}

if (err) {
log.errorWithoutTelemetry('Error sending payload to the agent (status code: %s)', err.status, err)
done()
@@ -60,6 +79,86 @@
done()
})
}

_queueForRetry (data, count) {
// Don't infinitely queue failed payloads to prevent unbounded memory growth
if (this._retryQueue.length >= MAX_RETRY_QUEUE_SIZE) {
log.debug('Retry queue is full, dropping payload')
return
}

this._retryQueue.push({ data, count })
log.debug('Queued payload for retry. Queue size: %d', this._retryQueue.length)

// Start processing the retry queue if not already in progress
if (!this._retryInProgress) {
this._processRetryQueue()
}
}

_processRetryQueue () {
if (this._retryInProgress || this._retryQueue.length === 0) {
return
}

this._retryInProgress = true
this._retryNextPayload()
}

_retryNextPayload () {
if (this._retryQueue.length === 0) {
this._retryInProgress = false
this._retryTimeout = null
return
}

log.debug('Retrying payload after %dms backoff', this._currentBackoff)

this._retryTimeout = setTimeout(() => {
this._retryTimeout = null

// Check again after the timeout in case queue was cleared
if (this._retryQueue.length === 0) {
this._retryInProgress = false
return
}

const payload = this._retryQueue.shift()
const { data, count } = payload

const { _headers, _lookup, _protocolVersion, _url } = this
makeRequest(_protocolVersion, data, count, _url, _headers, _lookup, false, (err, res, status) => {
if (status === 429) {
// Still getting 429, requeue and increase backoff
log.debug('Retry still received 429, requeueing')
this._retryQueue.unshift(payload) // Put it back at the front
this._currentBackoff = Math.min(this._currentBackoff * 2, this._maxBackoff)
this._retryNextPayload()
} else if (err) {
// Other errors, drop the payload and continue with next
log.debug('Retry failed with error: %s', err.message)
this._retryNextPayload()
} else {
// Success! Reset backoff and continue
log.debug('Retry succeeded, resetting backoff')
this._currentBackoff = this._initialBackoff

try {
this._prioritySampler.update(JSON.parse(res).rate_by_service)
} catch (e) {
log.error('Error updating prioritySampler rates', e)
}

this._retryNextPayload()
}
})
}, this._currentBackoff)

// Unref the timeout so it doesn't keep the process alive
if (this._retryTimeout.unref) {
this._retryTimeout.unref()
}
}
}

function setHeader (headers, key, value) {
@@ -82,7 +181,8 @@
...headers,
'Content-Type': 'application/msgpack',
'Datadog-Meta-Tracer-Version': tracerVersion,
'X-Datadog-Trace-Count': String(count)
'X-Datadog-Trace-Count': String(count),
'Datadog-Send-Real-Http-Status': 'true'
},
lookup,
url
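Note on the change above: the added Datadog-Send-Real-Http-Status request header tells the agent to return its real response status (for example 429) rather than a blanket success, which is presumably what makes the new 429 branch in _sendPayload reachable. The retry path then doubles the backoff after every consecutive 429 and caps it at the configured maximum; after a successful retry it resets to the initial value. A minimal standalone sketch of that schedule using the defaults from this diff (the nextBackoff helper is illustrative only, not part of the change):

// Illustrative sketch only -- mirrors the doubling-and-capping logic in _retryNextPayload.
function nextBackoff (currentMs, maxMs) {
  return Math.min(currentMs * 2, maxMs)
}

let backoff = 1000 // DEFAULT_INITIAL_BACKOFF_MS
const delays = []
for (let attempt = 0; attempt < 6; attempt++) {
  delays.push(backoff)
  backoff = nextBackoff(backoff, 30_000) // DEFAULT_MAX_BACKOFF_MS
}
console.log(delays) // [ 1000, 2000, 4000, 8000, 16000, 30000 ] -- a successful retry resets the next delay to 1000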
153 changes: 145 additions & 8 deletions packages/dd-trace/test/exporters/agent/writer.spec.js
@@ -45,7 +45,8 @@

log = {
error: sinon.spy(),
errorWithoutTelemetry: sinon.spy()
errorWithoutTelemetry: sinon.spy(),
debug: sinon.spy()
}

const AgentEncoder = function () {
@@ -59,7 +60,13 @@
'../../../../../package.json': { version: 'tracerVersion' },
'../../log': log
})
writer = new Writer({ url, prioritySampler, protocolVersion })

// Use shorter backoff times for testing
const config = {
initialBackoff: 50, // 50ms instead of 1s
maxBackoff: 500 // 500ms instead of 30s
}
writer = new Writer({ url, prioritySampler, protocolVersion, config })

process.nextTick(done)
})
@@ -122,7 +129,8 @@
'Datadog-Meta-Lang-Version': process.version,
'Datadog-Meta-Lang-Interpreter': 'v8',
'Datadog-Meta-Tracer-Version': 'tracerVersion',
'X-Datadog-Trace-Count': '2'
'X-Datadog-Trace-Count': '2',
'Datadog-Send-Real-Http-Status': 'true'
},
lookup: undefined
})
@@ -145,22 +153,30 @@
'Datadog-Meta-Lang-Version': process.version,
'Datadog-Meta-Lang-Interpreter': 'v8',
'Datadog-Meta-Tracer-Version': 'tracerVersion',
'X-Datadog-Trace-Count': '2'
'X-Datadog-Trace-Count': '2',
'Datadog-Send-Real-Http-Status': 'true'
})
done()
})
})

it('should include Datadog-Send-Real-Http-Status header', (done) => {
encoder.count.returns(1)
encoder.makePayload.returns([Buffer.from('data')])
writer.flush(() => {
expect(request.getCall(0).args[1].headers).to.have.property('Datadog-Send-Real-Http-Status', 'true')
done()
})
})

it('should log request errors', done => {
const error = new Error('boom')
error.status = 42

request.yields(error)

encoder.count.returns(1)
writer.flush()

setTimeout(() => {
writer.flush(() => {
expect(log.errorWithoutTelemetry)
.to.have.been.calledWith('Error sending payload to the agent (status code: %s)',
error.status, error)
@@ -178,10 +194,131 @@
})
})

it('should queue payload for retry on 429 response', (done) => {
request.yieldsAsync(new Error('Too Many Requests'), null, 429)

encoder.count.returns(1)
encoder.makePayload.returns([Buffer.from('data')])

writer.flush(() => {
// First request should have been made
expect(request).to.have.been.calledOnce
// Retry process should be in progress
expect(writer._retryInProgress).to.equal(true)
// Check queue before setTimeout processes it
setTimeout(() => {
// The item should still be in queue waiting for backoff timeout
expect(writer._retryQueue).to.have.lengthOf(1)
done()
}, 100)
})
})

it('should not queue payload for retry on non-429 errors', (done) => {
const error = new Error('Server Error')
error.status = 500
request.yieldsAsync(error, null, 500)

encoder.count.returns(1)
encoder.makePayload.returns([Buffer.from('data')])

writer.flush(() => {
expect(request).to.have.been.calledOnce
// Should not queue non-429 errors
expect(writer._retryQueue).to.have.lengthOf(0)
done()
})
})

it('should not queue if retry queue is at max capacity', (done) => {
request.yieldsAsync(new Error('Too Many Requests'), null, 429)

// Fill the retry queue to max capacity
writer._retryQueue = new Array(1000).fill({ data: [Buffer.from('old')], count: 1 })

encoder.count.returns(1)
encoder.makePayload.returns([Buffer.from('data')])

writer.flush(() => {
// Queue should still be at max
expect(writer._retryQueue).to.have.lengthOf(1000)
done()
})
})

it('should retry queued payloads with exponential backoff', (done) => {
// First call returns 429, second call succeeds
request.onFirstCall().yieldsAsync(new Error('Too Many Requests'), null, 429)
request.onSecondCall().yieldsAsync(null, response, 200)

encoder.count.returns(1)
encoder.makePayload.returns([Buffer.from('data')])

writer.flush(() => {
// After first flush, should have queued the payload
// The retry process starts automatically
expect(writer._retryInProgress).to.equal(true)

// Wait for retry to complete (initial backoff is 50ms in tests)
setTimeout(() => {
expect(request).to.have.been.calledTwice
expect(writer._retryInProgress).to.equal(false)
done()
}, 150)
})
})

it('should reset backoff after successful retry', (done) => {
// First two calls return 429, third succeeds
request.onCall(0).yieldsAsync(new Error('Too Many Requests'), null, 429)
request.onCall(1).yieldsAsync(new Error('Too Many Requests'), null, 429)
request.onCall(2).yieldsAsync(null, response, 200)

encoder.count.returns(1)
encoder.makePayload.returns([Buffer.from('data')])

writer.flush(() => {
// Wait for retries to complete
// Backoffs: 50ms, 100ms
setTimeout(() => {
// After success, backoff should be reset to 50ms
expect(writer._currentBackoff).to.equal(50)
expect(request).to.have.been.calledThrice
done()
}, 250)
})
})

it('should enforce maximum backoff time', (done) => {
// Simulate multiple failures to test max backoff
// Calls 0-4 return 429, call 5 succeeds
for (let i = 0; i < 5; i++) {
request.onCall(i).yieldsAsync(new Error('Too Many Requests'), null, 429)
}
request.onCall(5).yieldsAsync(null, response, 200)

encoder.count.returns(1)
encoder.makePayload.returns([Buffer.from('data')])

writer.flush(() => {
// After multiple 429s, backoff should cap at 500ms (test max)
// Backoffs: 50ms, 100ms, 200ms, 400ms, 500ms (capped)
setTimeout(() => {
// Backoff should not exceed maximum (500ms in tests)
expect(writer._currentBackoff).to.be.at.most(500)
done()
}, 2000)
})
})

context('with the url as a unix socket', () => {
beforeEach(() => {
url = new URL('unix:/path/to/somesocket.sock')
writer = new Writer({ url, protocolVersion })
const config = {
initialBackoff: 50,
maxBackoff: 500
}
writer = new Writer({ url, protocolVersion, config })
})

it('should make a request to the socket', () => {
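Note: as the spec changes show, the backoff windows are configurable through the writer's config object. A minimal usage sketch mirroring the constructor call in the tests; the 250ms / 5000ms values are arbitrary examples rather than defaults, and url, prioritySampler and protocolVersion are assumed to come from the same scaffolding the spec sets up:

// Sketch only -- same shape as the spec's setup; values here are examples, not defaults.
const config = {
  initialBackoff: 250, // wait 250ms before the first retry of a 429'd payload
  maxBackoff: 5000 // never wait longer than 5s between retries
}
const writer = new Writer({ url, prioritySampler, protocolVersion, config })
// Payloads rejected with 429 are queued and retried after 250ms, 500ms, 1s, 2s, 4s, then 5s (capped) delays.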