From 3a6a9fe9a8d3640e749c0afb2ab1dd7f106cc60e Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Tue, 23 May 2023 08:24:02 -0400 Subject: [PATCH 1/2] feat: eager batch span processing --- .../src/export/BatchSpanProcessorBase.ts | 208 +++++++++--------- 1 file changed, 101 insertions(+), 107 deletions(-) diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index d760ff5809..1c2169ebe1 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -34,8 +34,7 @@ import { SpanExporter } from './SpanExporter'; * the SDK then pushes them to the exporter pipeline. */ export abstract class BatchSpanProcessorBase - implements SpanProcessor -{ + implements SpanProcessor { private readonly _maxExportBatchSize: number; private readonly _maxQueueSize: number; private readonly _scheduledDelayMillis: number; @@ -45,6 +44,8 @@ export abstract class BatchSpanProcessorBase private _timer: NodeJS.Timeout | undefined; private _shutdownOnce: BindOnceFuture; private _droppedSpansCount: number = 0; + private _waitingForEventLoop = false; + private _exports: Promise[] = []; constructor(private readonly _exporter: SpanExporter, config?: T) { const env = getEnv(); @@ -94,147 +95,140 @@ export abstract class BatchSpanProcessorBase return; } - this._addToBuffer(span); + // only possible if multiple spans are added _synchronously_ + if (this._finishedSpans.length >= this._maxQueueSize) { + this._droppedSpansCount++ + return; + } + + this._finishedSpans.push(span); + + // If many spans are added synchronously, we don't want to export a batch + // until the execution yields to the event loop. Otherwise we may end + // up with more than `maxQueueSize` spans effectively queued in the + // event loop which is a memory leak. + if (this._waitingForEventLoop) { + return; + } + + this._waitingForEventLoop = true; + setTimeout(() => { + this._waitingForEventLoop = false; + this._maybeEagerExportBatch(); + this._maybeStartTimer(); + }, 0) + + + // // Alternate to the above. Don't worry about the queue filling synchronously + // // In this scenario, it should be impossible for the queue to fill beyond + // // the size of a single bach because a batch is eagerly exported. + // this._maybeEagerExportBatch(); + // this._maybeStartTimer(); } shutdown(): Promise { return this._shutdownOnce.call(); } - private _shutdown() { - return Promise.resolve() - .then(() => { - return this.onShutdown(); - }) - .then(() => { - return this._flushAll(); - }) - .then(() => { - return this._exporter.shutdown(); - }); + private _maybeEagerExportBatch() { + // if the queue contains enough spans for at least one batch + // export one batch and process again + if (this._finishedSpans.length >= this._maxExportBatchSize) { + this._processOneBatch(); + this._maybeEagerExportBatch(); + } + } - /** Add a span in the buffer. */ - private _addToBuffer(span: ReadableSpan) { - if (this._finishedSpans.length >= this._maxQueueSize) { - // limit reached, drop span + /** + * Send all spans to the exporter respecting the batch size limit + * This function is used only on forceFlush or shutdown, + * for all other cases _flush should be used + * */ + private async _flushAll(): Promise { + const promises = []; + // calculate number of batches + const count = Math.ceil( + this._finishedSpans.length / this._maxExportBatchSize + ); + for (let i = 0, j = count; i < j; i++) { + promises.push(this._exportOneBatch()); + } - if (this._droppedSpansCount === 0) { - diag.debug('maxQueueSize reached, dropping spans'); - } - this._droppedSpansCount++; + await Promise.all(promises); + } - return; - } + private _processOneBatch() { + this._clearTimer(); + const batch = this._finishedSpans.splice(0, this._maxExportBatchSize); if (this._droppedSpansCount > 0) { - // some spans were dropped, log once with count of spans dropped diag.warn( `Dropped ${this._droppedSpansCount} spans because maxQueueSize reached` ); - this._droppedSpansCount = 0; } - this._finishedSpans.push(span); - this._maybeStartTimer(); - } + const exportPromise = Promise.all( + batch + .map(span => span.resource) + .filter(resource => resource.asyncAttributesPending) + .map(res => res.waitForAsyncAttributes?.()) + ) + .then(() => { + this._promisifiedExportBatchWithTimeout(batch) + }); - /** - * Send all spans to the exporter respecting the batch size limit - * This function is used only on forceFlush or shutdown, - * for all other cases _flush should be used - * */ - private _flushAll(): Promise { - return new Promise((resolve, reject) => { - const promises = []; - // calculate number of batches - const count = Math.ceil( - this._finishedSpans.length / this._maxExportBatchSize - ); - for (let i = 0, j = count; i < j; i++) { - promises.push(this._flushOneBatch()); - } - Promise.all(promises) - .then(() => { - resolve(); - }) - .catch(reject); - }); + this._exports.push(exportPromise); + + // remove from tracked promises on completion + exportPromise.finally(() => { + this._exports = this._exports.filter(p => p !== exportPromise); + }) + + // start timer if there are still spans in the queue + this._maybeStartTimer(); } - private _flushOneBatch(): Promise { - this._clearTimer(); - if (this._finishedSpans.length === 0) { - return Promise.resolve(); - } - return new Promise((resolve, reject) => { + private _promisifiedExportBatchWithTimeout(batch: ReadableSpan[]): Promise { + // promisify and add to list of tracked export promises + return new Promise((resolve, reject) => { const timer = setTimeout(() => { // don't wait anymore for export, this way the next batch can start reject(new Error('Timeout')); }, this._exportTimeoutMillis); - // prevent downstream exporter calls from generating spans + unrefTimer(timer); + context.with(suppressTracing(context.active()), () => { - // Reset the finished spans buffer here because the next invocations of the _flush method - // could pass the same finished spans to the exporter if the buffer is cleared - // outside the execution of this callback. - const spans = this._finishedSpans.splice(0, this._maxExportBatchSize); - - const doExport = () => - this._exporter.export(spans, result => { - clearTimeout(timer); - if (result.code === ExportResultCode.SUCCESS) { - resolve(); - } else { - reject( - result.error ?? - new Error('BatchSpanProcessor: span export failed') - ); - } - }); - const pendingResources = spans - .map(span => span.resource) - .filter(resource => resource.asyncAttributesPending); - - // Avoid scheduling a promise to make the behavior more predictable and easier to test - if (pendingResources.length === 0) { - doExport(); - } else { - Promise.all( - pendingResources.map(resource => - resource.waitForAsyncAttributes?.() - ) - ).then(doExport, err => { - globalErrorHandler(err); - reject(err); - }); - } + this._exporter.export(batch, (result) => { + clearTimeout(timer); + if (result.code === ExportResultCode.SUCCESS) { + resolve(); + } else { + reject( + result.error ?? + new Error('BatchSpanProcessor: span export failed') + ); + } + }); }); }); } private _maybeStartTimer() { - if (this._timer !== undefined) return; - this._timer = setTimeout(() => { - this._flushOneBatch() - .then(() => { - if (this._finishedSpans.length > 0) { - this._clearTimer(); - this._maybeStartTimer(); - } - }) - .catch(e => { - globalErrorHandler(e); - }); - }, this._scheduledDelayMillis); + if (this._finishedSpans.length > 0 && this._timer == null) { + this._startTimer(this._processOneBatch.bind(this)); + } + } + + private _startTimer(cb: () => void) { + this._timer = setTimeout(cb, this._scheduledDelayMillis); unrefTimer(this._timer); } private _clearTimer() { - if (this._timer !== undefined) { + if (this._timer != null) { clearTimeout(this._timer); this._timer = undefined; } } - - protected abstract onShutdown(): void; } From 8bd21ff1b7caa0ab5770368db7f46ae4cd153378 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Wed, 21 Jun 2023 18:03:54 -0400 Subject: [PATCH 2/2] Limit concurrency --- .../src/export/BatchSpanProcessorBase.ts | 118 ++++++++++-------- .../export/BatchSpanProcessorBase.test.ts | 1 + 2 files changed, 68 insertions(+), 51 deletions(-) diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 1c2169ebe1..d9ea06f892 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -19,7 +19,6 @@ import { BindOnceFuture, ExportResultCode, getEnv, - globalErrorHandler, suppressTracing, unrefTimer, } from '@opentelemetry/core'; @@ -44,8 +43,8 @@ export abstract class BatchSpanProcessorBase private _timer: NodeJS.Timeout | undefined; private _shutdownOnce: BindOnceFuture; private _droppedSpansCount: number = 0; - private _waitingForEventLoop = false; - private _exports: Promise[] = []; + private _waiting = false; + private _processing = false; constructor(private readonly _exporter: SpanExporter, config?: T) { const env = getEnv(); @@ -95,7 +94,6 @@ export abstract class BatchSpanProcessorBase return; } - // only possible if multiple spans are added _synchronously_ if (this._finishedSpans.length >= this._maxQueueSize) { this._droppedSpansCount++ return; @@ -103,41 +101,31 @@ export abstract class BatchSpanProcessorBase this._finishedSpans.push(span); - // If many spans are added synchronously, we don't want to export a batch - // until the execution yields to the event loop. Otherwise we may end - // up with more than `maxQueueSize` spans effectively queued in the - // event loop which is a memory leak. - if (this._waitingForEventLoop) { - return; - } - - this._waitingForEventLoop = true; - setTimeout(() => { - this._waitingForEventLoop = false; - this._maybeEagerExportBatch(); - this._maybeStartTimer(); - }, 0) - - - // // Alternate to the above. Don't worry about the queue filling synchronously - // // In this scenario, it should be impossible for the queue to fill beyond - // // the size of a single bach because a batch is eagerly exported. - // this._maybeEagerExportBatch(); - // this._maybeStartTimer(); + this._maybeEagerlyProcessBatch(); + this._maybeStartTimer(); } shutdown(): Promise { return this._shutdownOnce.call(); } - private _maybeEagerExportBatch() { - // if the queue contains enough spans for at least one batch - // export one batch and process again - if (this._finishedSpans.length >= this._maxExportBatchSize) { + private _shutdown() { + return Promise.resolve() + .then(() => { + return this.onShutdown(); + }) + .then(() => { + return this._flushAll(); + }) + .then(() => { + return this._exporter.shutdown(); + }); + } + + private _maybeEagerlyProcessBatch() { + if (!this._processing && this._finishedSpans.length >= this._maxExportBatchSize) { this._processOneBatch(); - this._maybeEagerExportBatch(); } - } /** @@ -152,41 +140,70 @@ export abstract class BatchSpanProcessorBase this._finishedSpans.length / this._maxExportBatchSize ); for (let i = 0, j = count; i < j; i++) { - promises.push(this._exportOneBatch()); + promises.push(this._processOneBatch()); } await Promise.all(promises); } private _processOneBatch() { + // because we eagerly export, we need to limit concurrency or we may have infinite concurrent exports + if (this._processing) { + this._waiting = true; + return; + }; + this._waiting = false; + this._processing = true; this._clearTimer(); + + if (this._shutdownOnce.isCalled) { + return; + } + + // TODO decide if we want to favor spans without pending async resource attributes + // this is only important if a span processor is shared with multiple tracer providers + // this._finishedSpans.sort((a, b) => { + // if (a.resource.asyncAttributesPending == b.resource.asyncAttributesPending) return 0; + // if (a.resource.asyncAttributesPending && !b.resource.asyncAttributesPending) return 1; + // return -1; + // }); + + // drain 1 batch from queue. const batch = this._finishedSpans.splice(0, this._maxExportBatchSize); + console.log('drained', batch.length, 'spans from the queue'); + console.log('queue length', this._finishedSpans.length); + + + // start timer if there are still spans in the queue + this._maybeStartTimer(); + if (this._droppedSpansCount > 0) { diag.warn( `Dropped ${this._droppedSpansCount} spans because maxQueueSize reached` ); } + this._droppedSpansCount = 0; - const exportPromise = Promise.all( + Promise.all( batch .map(span => span.resource) .filter(resource => resource.asyncAttributesPending) .map(res => res.waitForAsyncAttributes?.()) ) - .then(() => { - this._promisifiedExportBatchWithTimeout(batch) + .then(() => this._promisifiedExportBatchWithTimeout(batch)) + .catch((err) => { + // TODO improve error message with some contextual info and err object + diag.error("Failed to export batch") + }) + .finally(() => { + this._processing = false; + if (this._waiting) { + this._processOneBatch(); + } + this._maybeEagerlyProcessBatch(); + this._maybeStartTimer(); }); - - this._exports.push(exportPromise); - - // remove from tracked promises on completion - exportPromise.finally(() => { - this._exports = this._exports.filter(p => p !== exportPromise); - }) - - // start timer if there are still spans in the queue - this._maybeStartTimer(); } private _promisifiedExportBatchWithTimeout(batch: ReadableSpan[]): Promise { @@ -194,6 +211,7 @@ export abstract class BatchSpanProcessorBase return new Promise((resolve, reject) => { const timer = setTimeout(() => { // don't wait anymore for export, this way the next batch can start + // we can't cancel the export so we have to just forget about it reject(new Error('Timeout')); }, this._exportTimeoutMillis); unrefTimer(timer); @@ -216,19 +234,17 @@ export abstract class BatchSpanProcessorBase private _maybeStartTimer() { if (this._finishedSpans.length > 0 && this._timer == null) { - this._startTimer(this._processOneBatch.bind(this)); + this._timer = setTimeout(this._processOneBatch.bind(this), this._scheduledDelayMillis); + unrefTimer(this._timer); } } - private _startTimer(cb: () => void) { - this._timer = setTimeout(cb, this._scheduledDelayMillis); - unrefTimer(this._timer); - } - private _clearTimer() { if (this._timer != null) { clearTimeout(this._timer); this._timer = undefined; } } + + protected abstract onShutdown(): void; } diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 069287fc59..0aaddf93bb 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -150,6 +150,7 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 1); await processor.shutdown(); + console.log(spy.args) assert.strictEqual(spy.args.length, 2); assert.strictEqual(exporter.getFinishedSpans().length, 0);