diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index d760ff5809..d9ea06f892 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -19,7 +19,6 @@ import { BindOnceFuture, ExportResultCode, getEnv, - globalErrorHandler, suppressTracing, unrefTimer, } from '@opentelemetry/core'; @@ -34,8 +33,7 @@ import { SpanExporter } from './SpanExporter'; * the SDK then pushes them to the exporter pipeline. */ export abstract class BatchSpanProcessorBase - implements SpanProcessor -{ + implements SpanProcessor { private readonly _maxExportBatchSize: number; private readonly _maxQueueSize: number; private readonly _scheduledDelayMillis: number; @@ -45,6 +43,8 @@ export abstract class BatchSpanProcessorBase private _timer: NodeJS.Timeout | undefined; private _shutdownOnce: BindOnceFuture; private _droppedSpansCount: number = 0; + private _waiting = false; + private _processing = false; constructor(private readonly _exporter: SpanExporter, config?: T) { const env = getEnv(); @@ -94,7 +94,15 @@ export abstract class BatchSpanProcessorBase return; } - this._addToBuffer(span); + if (this._finishedSpans.length >= this._maxQueueSize) { + this._droppedSpansCount++ + return; + } + + this._finishedSpans.push(span); + + this._maybeEagerlyProcessBatch(); + this._maybeStartTimer(); } shutdown(): Promise { @@ -114,29 +122,10 @@ export abstract class BatchSpanProcessorBase }); } - /** Add a span in the buffer. */ - private _addToBuffer(span: ReadableSpan) { - if (this._finishedSpans.length >= this._maxQueueSize) { - // limit reached, drop span - - if (this._droppedSpansCount === 0) { - diag.debug('maxQueueSize reached, dropping spans'); - } - this._droppedSpansCount++; - - return; - } - - if (this._droppedSpansCount > 0) { - // some spans were dropped, log once with count of spans dropped - diag.warn( - `Dropped ${this._droppedSpansCount} spans because maxQueueSize reached` - ); - this._droppedSpansCount = 0; + private _maybeEagerlyProcessBatch() { + if (!this._processing && this._finishedSpans.length >= this._maxExportBatchSize) { + this._processOneBatch(); } - - this._finishedSpans.push(span); - this._maybeStartTimer(); } /** @@ -144,93 +133,114 @@ export abstract class BatchSpanProcessorBase * This function is used only on forceFlush or shutdown, * for all other cases _flush should be used * */ - private _flushAll(): Promise { - return new Promise((resolve, reject) => { - const promises = []; - // calculate number of batches - const count = Math.ceil( - this._finishedSpans.length / this._maxExportBatchSize - ); - for (let i = 0, j = count; i < j; i++) { - promises.push(this._flushOneBatch()); - } - Promise.all(promises) - .then(() => { - resolve(); - }) - .catch(reject); - }); + private async _flushAll(): Promise { + const promises = []; + // calculate number of batches + const count = Math.ceil( + this._finishedSpans.length / this._maxExportBatchSize + ); + for (let i = 0, j = count; i < j; i++) { + promises.push(this._processOneBatch()); + } + + await Promise.all(promises); } - private _flushOneBatch(): Promise { + private _processOneBatch() { + // because we eagerly export, we need to limit concurrency or we may have infinite concurrent exports + if (this._processing) { + this._waiting = true; + return; + }; + this._waiting = false; + this._processing = true; this._clearTimer(); - if (this._finishedSpans.length === 0) { - return Promise.resolve(); + + if (this._shutdownOnce.isCalled) { + return; + } + + // TODO decide if we want to favor spans without pending async resource attributes + // this is only important if a span processor is shared with multiple tracer providers + // this._finishedSpans.sort((a, b) => { + // if (a.resource.asyncAttributesPending == b.resource.asyncAttributesPending) return 0; + // if (a.resource.asyncAttributesPending && !b.resource.asyncAttributesPending) return 1; + // return -1; + // }); + + // drain 1 batch from queue. + const batch = this._finishedSpans.splice(0, this._maxExportBatchSize); + + console.log('drained', batch.length, 'spans from the queue'); + console.log('queue length', this._finishedSpans.length); + + + // start timer if there are still spans in the queue + this._maybeStartTimer(); + + if (this._droppedSpansCount > 0) { + diag.warn( + `Dropped ${this._droppedSpansCount} spans because maxQueueSize reached` + ); } - return new Promise((resolve, reject) => { + this._droppedSpansCount = 0; + + Promise.all( + batch + .map(span => span.resource) + .filter(resource => resource.asyncAttributesPending) + .map(res => res.waitForAsyncAttributes?.()) + ) + .then(() => this._promisifiedExportBatchWithTimeout(batch)) + .catch((err) => { + // TODO improve error message with some contextual info and err object + diag.error("Failed to export batch") + }) + .finally(() => { + this._processing = false; + if (this._waiting) { + this._processOneBatch(); + } + this._maybeEagerlyProcessBatch(); + this._maybeStartTimer(); + }); + } + + private _promisifiedExportBatchWithTimeout(batch: ReadableSpan[]): Promise { + // promisify and add to list of tracked export promises + return new Promise((resolve, reject) => { const timer = setTimeout(() => { // don't wait anymore for export, this way the next batch can start + // we can't cancel the export so we have to just forget about it reject(new Error('Timeout')); }, this._exportTimeoutMillis); - // prevent downstream exporter calls from generating spans + unrefTimer(timer); + context.with(suppressTracing(context.active()), () => { - // Reset the finished spans buffer here because the next invocations of the _flush method - // could pass the same finished spans to the exporter if the buffer is cleared - // outside the execution of this callback. - const spans = this._finishedSpans.splice(0, this._maxExportBatchSize); - - const doExport = () => - this._exporter.export(spans, result => { - clearTimeout(timer); - if (result.code === ExportResultCode.SUCCESS) { - resolve(); - } else { - reject( - result.error ?? - new Error('BatchSpanProcessor: span export failed') - ); - } - }); - const pendingResources = spans - .map(span => span.resource) - .filter(resource => resource.asyncAttributesPending); - - // Avoid scheduling a promise to make the behavior more predictable and easier to test - if (pendingResources.length === 0) { - doExport(); - } else { - Promise.all( - pendingResources.map(resource => - resource.waitForAsyncAttributes?.() - ) - ).then(doExport, err => { - globalErrorHandler(err); - reject(err); - }); - } + this._exporter.export(batch, (result) => { + clearTimeout(timer); + if (result.code === ExportResultCode.SUCCESS) { + resolve(); + } else { + reject( + result.error ?? + new Error('BatchSpanProcessor: span export failed') + ); + } + }); }); }); } private _maybeStartTimer() { - if (this._timer !== undefined) return; - this._timer = setTimeout(() => { - this._flushOneBatch() - .then(() => { - if (this._finishedSpans.length > 0) { - this._clearTimer(); - this._maybeStartTimer(); - } - }) - .catch(e => { - globalErrorHandler(e); - }); - }, this._scheduledDelayMillis); - unrefTimer(this._timer); + if (this._finishedSpans.length > 0 && this._timer == null) { + this._timer = setTimeout(this._processOneBatch.bind(this), this._scheduledDelayMillis); + unrefTimer(this._timer); + } } private _clearTimer() { - if (this._timer !== undefined) { + if (this._timer != null) { clearTimeout(this._timer); this._timer = undefined; } diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts index 069287fc59..0aaddf93bb 100644 --- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts +++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts @@ -150,6 +150,7 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 1); await processor.shutdown(); + console.log(spy.args) assert.strictEqual(spy.args.length, 2); assert.strictEqual(exporter.getFinishedSpans().length, 0);