Skip to content

Commit 279c2d9

Browse files
committed
feat(exporter-collector): implement concurrencyLimit option
This adds an option to the collector exporters `concurrencyLimit`. If this is set and the number of export operations is equal to the limit, additional export operations will fail immediately. This should be set in combination with the batch span processor be set such that the concurrency limit would not be reached under "normal" circumstances - only if there is an issue would spans start to be dropped. This helps us cap the amount of memory & sockets used by the exporter if it is not able to keep up with the data it is being provided. This could happen if the local network (e.g. in a browser) or the remote collector are too slow to handle all the activity. If we do not have this cap, and the exporter cannot keep up, resources such as memory and network sockets can be consumed without limit, causing crashes and other undesirable outcomes far worse than losing some telemetry data. This also updates the examples to use `BatchSpanProcessor` as I couldn't really think of any reason why you would want to use SimpleSpanProcessor in combination with the collector exporter.
1 parent 3118aed commit 279c2d9

File tree

4 files changed

+73
-8
lines changed

4 files changed

+73
-8
lines changed

packages/opentelemetry-exporter-collector/README.md

+21-7
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,24 @@ npm install --save @opentelemetry/exporter-collector
1919
The CollectorTraceExporter in Web expects the endpoint to end in `/v1/trace`.
2020

2121
```js
22-
import { SimpleSpanProcessor } from '@opentelemetry/tracing';
22+
import { BatchSpanProcessor } from '@opentelemetry/tracing';
2323
import { WebTracerProvider } from '@opentelemetry/web';
2424
import { CollectorTraceExporter } from '@opentelemetry/exporter-collector';
2525

2626
const collectorOptions = {
2727
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55681/v1/trace
28-
headers: {}, //an optional object containing custom headers to be sent with each request
28+
headers: {}, // an optional object containing custom headers to be sent with each request
29+
concurrencyLimit: 10, // an optional limit on pending requests
2930
};
3031

3132
const provider = new WebTracerProvider();
3233
const exporter = new CollectorTraceExporter(collectorOptions);
33-
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
34+
provider.addSpanProcessor(new BatchSpanProcessor(exporter, {
35+
// send spans as soon as we have this many
36+
bufferSize: 10,
37+
// send spans if we have buffered spans older than this
38+
bufferTimeout: 500,
39+
}));
3440

3541
provider.register();
3642

@@ -45,7 +51,8 @@ import { MetricProvider } from '@opentelemetry/metrics';
4551
import { CollectorMetricExporter } from '@opentelemetry/exporter-collector';
4652
const collectorOptions = {
4753
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55681/v1/metrics
48-
headers: {}, //an optional object containing custom headers to be sent with each request
54+
headers: {}, // an optional object containing custom headers to be sent with each request
55+
concurrencyLimit: 1, // an optional limit on pending requests
4956
};
5057
const exporter = new CollectorMetricExporter(collectorOptions);
5158

@@ -64,20 +71,26 @@ counter.add(10, { 'key': 'value' });
6471
## Traces in Node - JSON over http
6572

6673
```js
67-
const { BasicTracerProvider, SimpleSpanProcessor } = require('@opentelemetry/tracing');
74+
const { BasicTracerProvider, BatchSpanProcessor } = require('@opentelemetry/tracing');
6875
const { CollectorTraceExporter } = require('@opentelemetry/exporter-collector');
6976

7077
const collectorOptions = {
7178
serviceName: 'basic-service',
7279
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55681/v1/trace
7380
headers: {
7481
foo: 'bar'
75-
}, //an optional object containing custom headers to be sent with each request will only work with http
82+
}, // an optional object containing custom headers to be sent with each request will only work with http
83+
concurrencyLimit: 10, // an optional limit on pending requests
7684
};
7785

7886
const provider = new BasicTracerProvider();
7987
const exporter = new CollectorTraceExporter(collectorOptions);
80-
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
88+
provider.addSpanProcessor(new BatchSpanProcessor(exporter, {
89+
// send spans as soon as we have this many
90+
bufferSize: 1000,
91+
// send spans if we have buffered spans older than this
92+
bufferTimeout: 30000,
93+
}));
8194

8295
provider.register();
8396

@@ -91,6 +104,7 @@ const { CollectorMetricExporter } = require('@opentelemetry/exporter-collector'
91104
const collectorOptions = {
92105
serviceName: 'basic-service',
93106
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55681/v1/metrics
107+
concurrencyLimit: 1, // an optional limit on pending requests
94108
};
95109
const exporter = new CollectorMetricExporter(collectorOptions);
96110

packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts

+14
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export abstract class CollectorExporterBase<
3939
public readonly logger: Logger;
4040
public readonly hostname: string | undefined;
4141
public readonly attributes?: Attributes;
42+
protected _concurrencyLimit: number;
4243
protected _isShutdown: boolean = false;
4344
private _shuttingDownPromise: Promise<void> = Promise.resolve();
4445
protected _sendingPromises: Promise<unknown>[] = [];
@@ -59,6 +60,11 @@ export abstract class CollectorExporterBase<
5960

6061
this.shutdown = this.shutdown.bind(this);
6162

63+
this._concurrencyLimit =
64+
typeof config.concurrencyLimit === 'number'
65+
? config.concurrencyLimit
66+
: Infinity;
67+
6268
// platform dependent
6369
this.onInit(config);
6470
}
@@ -77,6 +83,14 @@ export abstract class CollectorExporterBase<
7783
return;
7884
}
7985

86+
if (this._sendingPromises.length >= this._concurrencyLimit) {
87+
resultCallback({
88+
code: ExportResultCode.FAILED,
89+
error: new Error('Concurrent export limit reached'),
90+
});
91+
return;
92+
}
93+
8094
this._export(items)
8195
.then(() => {
8296
resultCallback({ code: ExportResultCode.SUCCESS });

packages/opentelemetry-exporter-collector/src/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ export interface CollectorExporterConfigBase {
341341
serviceName?: string;
342342
attributes?: Attributes;
343343
url?: string;
344+
concurrencyLimit?: number;
344345
}
345346

346347
/**

packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts

+37-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,18 @@ class CollectorTraceExporter extends CollectorExporterBase<
3131
> {
3232
onInit() {}
3333
onShutdown() {}
34-
send() {}
34+
send(
35+
items: any[],
36+
onSuccess: () => void,
37+
onError: (error: collectorTypes.CollectorExporterError) => void
38+
) {
39+
const promise = Promise.resolve(null);
40+
this._sendingPromises.push(
41+
promise.then(() =>
42+
this._sendingPromises.splice(this._sendingPromises.indexOf(promise), 1)
43+
)
44+
);
45+
}
3546
getDefaultUrl(config: CollectorExporterConfig): string {
3647
return config.url || '';
3748
}
@@ -187,7 +198,32 @@ describe('CollectorTraceExporter - common', () => {
187198
});
188199
});
189200
});
201+
describe('export - concurrency limit', () => {
202+
it('should error if too many concurrent exports are queued', done => {
203+
const collectorExporterWithConcurrencyLimit = new CollectorTraceExporter({
204+
...collectorExporterConfig,
205+
concurrencyLimit: 3,
206+
});
207+
const spans: ReadableSpan[] = [{ ...mockedReadableSpan }];
208+
const callbackSpy = sinon.spy();
209+
for (let i = 0; i < 7; i++) {
210+
collectorExporterWithConcurrencyLimit.export(spans, callbackSpy);
211+
}
190212

213+
setTimeout(() => {
214+
// Expect 4 failures
215+
assert.strictEqual(callbackSpy.args.length, 4);
216+
callbackSpy.args.forEach(([result]) => {
217+
assert.strictEqual(result.code, ExportResultCode.FAILED);
218+
assert.strictEqual(
219+
result.error!.message,
220+
'Concurrent export limit reached'
221+
);
222+
});
223+
done();
224+
});
225+
});
226+
});
191227
describe('shutdown', () => {
192228
let onShutdownSpy: any;
193229
beforeEach(() => {

0 commit comments

Comments
 (0)