diff --git a/packages/opentelemetry-exporter-collector/README.md b/packages/opentelemetry-exporter-collector/README.md index d0a6ffb90f0..473f7b62611 100644 --- a/packages/opentelemetry-exporter-collector/README.md +++ b/packages/opentelemetry-exporter-collector/README.md @@ -19,18 +19,24 @@ npm install --save @opentelemetry/exporter-collector The CollectorTraceExporter in Web expects the endpoint to end in `/v1/trace`. ```js -import { SimpleSpanProcessor } from '@opentelemetry/tracing'; +import { BatchSpanProcessor } from '@opentelemetry/tracing'; import { WebTracerProvider } from '@opentelemetry/web'; import { CollectorTraceExporter } from '@opentelemetry/exporter-collector'; const collectorOptions = { url: '', // url is optional and can be omitted - default is http://localhost:55681/v1/trace - headers: {}, //an optional object containing custom headers to be sent with each request + headers: {}, // an optional object containing custom headers to be sent with each request + concurrencyLimit: 10, // an optional limit on pending requests }; const provider = new WebTracerProvider(); const exporter = new CollectorTraceExporter(collectorOptions); -provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); +provider.addSpanProcessor(new BatchSpanProcessor(exporter, { + // send spans as soon as we have this many + bufferSize: 10, + // send spans if we have buffered spans older than this + bufferTimeout: 500, +})); provider.register(); @@ -45,7 +51,8 @@ import { MetricProvider } from '@opentelemetry/metrics'; import { CollectorMetricExporter } from '@opentelemetry/exporter-collector'; const collectorOptions = { url: '', // url is optional and can be omitted - default is http://localhost:55681/v1/metrics - headers: {}, //an optional object containing custom headers to be sent with each request + headers: {}, // an optional object containing custom headers to be sent with each request + concurrencyLimit: 1, // an optional limit on pending requests }; const exporter = new CollectorMetricExporter(collectorOptions); @@ -64,7 +71,7 @@ counter.add(10, { 'key': 'value' }); ## Traces in Node - JSON over http ```js -const { BasicTracerProvider, SimpleSpanProcessor } = require('@opentelemetry/tracing'); +const { BasicTracerProvider, BatchSpanProcessor } = require('@opentelemetry/tracing'); const { CollectorTraceExporter } = require('@opentelemetry/exporter-collector'); const collectorOptions = { @@ -72,12 +79,18 @@ const collectorOptions = { url: '', // url is optional and can be omitted - default is http://localhost:55681/v1/trace headers: { foo: 'bar' - }, //an optional object containing custom headers to be sent with each request will only work with http + }, // an optional object containing custom headers to be sent with each request will only work with http + concurrencyLimit: 10, // an optional limit on pending requests }; const provider = new BasicTracerProvider(); const exporter = new CollectorTraceExporter(collectorOptions); -provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); +provider.addSpanProcessor(new BatchSpanProcessor(exporter, { + // send spans as soon as we have this many + bufferSize: 1000, + // send spans if we have buffered spans older than this + bufferTimeout: 30000, +})); provider.register(); @@ -91,6 +104,7 @@ const { CollectorMetricExporter } = require('@opentelemetry/exporter-collector' const collectorOptions = { serviceName: 'basic-service', url: '', // url is optional and can be omitted - default is http://localhost:55681/v1/metrics + concurrencyLimit: 1, // an optional limit on pending requests }; const exporter = new CollectorMetricExporter(collectorOptions); diff --git a/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts b/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts index 4b2775abb7c..cb4d535d941 100644 --- a/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts +++ b/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts @@ -39,6 +39,7 @@ export abstract class CollectorExporterBase< public readonly logger: Logger; public readonly hostname: string | undefined; public readonly attributes?: Attributes; + protected _concurrencyLimit: number; protected _isShutdown: boolean = false; private _shuttingDownPromise: Promise = Promise.resolve(); protected _sendingPromises: Promise[] = []; @@ -59,6 +60,11 @@ export abstract class CollectorExporterBase< this.shutdown = this.shutdown.bind(this); + this._concurrencyLimit = + typeof config.concurrencyLimit === 'number' + ? config.concurrencyLimit + : Infinity; + // platform dependent this.onInit(config); } @@ -77,6 +83,14 @@ export abstract class CollectorExporterBase< return; } + if (this._sendingPromises.length >= this._concurrencyLimit) { + resultCallback({ + code: ExportResultCode.FAILED, + error: new Error('Concurrent export limit reached'), + }); + return; + } + this._export(items) .then(() => { resultCallback({ code: ExportResultCode.SUCCESS }); diff --git a/packages/opentelemetry-exporter-collector/src/types.ts b/packages/opentelemetry-exporter-collector/src/types.ts index 694c60d8d9a..ab6423b8f94 100644 --- a/packages/opentelemetry-exporter-collector/src/types.ts +++ b/packages/opentelemetry-exporter-collector/src/types.ts @@ -341,6 +341,7 @@ export interface CollectorExporterConfigBase { serviceName?: string; attributes?: Attributes; url?: string; + concurrencyLimit?: number; } /** diff --git a/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts index 03aedf7ca4d..f3cee3a8cd7 100644 --- a/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts @@ -31,7 +31,18 @@ class CollectorTraceExporter extends CollectorExporterBase< > { onInit() {} onShutdown() {} - send() {} + send( + items: any[], + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void + ) { + const promise = Promise.resolve(null); + this._sendingPromises.push( + promise.then(() => + this._sendingPromises.splice(this._sendingPromises.indexOf(promise), 1) + ) + ); + } getDefaultUrl(config: CollectorExporterConfig): string { return config.url || ''; } @@ -187,7 +198,34 @@ describe('CollectorTraceExporter - common', () => { }); }); }); + describe('export - concurrency limit', () => { + it('should error if too many concurrent exports are queued', done => { + const collectorExporterWithConcurrencyLimit = new CollectorTraceExporter({ + ...collectorExporterConfig, + concurrencyLimit: 3, + }); + const spans: ReadableSpan[] = [{ ...mockedReadableSpan }]; + const callbackSpy = sinon.spy(); + for (let i = 0; i < 7; i++) { + collectorExporterWithConcurrencyLimit.export(spans, callbackSpy); + } + setTimeout(() => { + // Expect 3 writes + // assert.strictEqual(spySend.args.length, 3); + // Expect 4 failures + assert.strictEqual(callbackSpy.args.length, 4); + callbackSpy.args.forEach(([result]) => { + assert.strictEqual(result.code, ExportResultCode.FAILED); + assert.strictEqual( + result.error!.message, + 'Concurrent export limit reached' + ); + }); + done(); + }); + }); + }); describe('shutdown', () => { let onShutdownSpy: any; beforeEach(() => {