Skip to content

Commit

Permalink
feat(exporter-collector): implement concurrencyLimit option
Browse files Browse the repository at this point in the history
This adds an option to the collector exporters `concurrencyLimit`.  If this is set and
the number of export operations is equal to the limit, additional export operations
will fail immediately.

This should be set in combination with the batch span processor be set such that the
concurrency limit would not be reached under "normal" circumstances - only if there
is an issue would spans start to be dropped.

This helps us cap the amount of memory & sockets used by the exporter if it is not
able to keep up with the data it is being provided.

This could happen if the local network (e.g. in a browser) or the remote collector
are too slow to handle all the activity.

 If we do not have this cap, and the exporter cannot keep up, resources such as
 memory and network sockets can be consumed without limit, causing crashes and
 other undesirable outcomes far worse than losing some telemetry data.

 This also updates the examples to use `BatchSpanProcessor` as I couldn't really
 think of any reason why you would want to use SimpleSpanProcessor in combination
 with the collector exporter.
  • Loading branch information
dobesv committed Dec 3, 2020
1 parent 3118aed commit 742a228
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 8 deletions.
28 changes: 21 additions & 7 deletions packages/opentelemetry-exporter-collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@ npm install --save @opentelemetry/exporter-collector
The CollectorTraceExporter in Web expects the endpoint to end in `/v1/trace`.

```js
import { SimpleSpanProcessor } from '@opentelemetry/tracing';
import { BatchSpanProcessor } from '@opentelemetry/tracing';
import { WebTracerProvider } from '@opentelemetry/web';
import { CollectorTraceExporter } from '@opentelemetry/exporter-collector';

const collectorOptions = {
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55681/v1/trace
headers: {}, //an optional object containing custom headers to be sent with each request
headers: {}, // an optional object containing custom headers to be sent with each request
concurrencyLimit: 10, // an optional limit on pending requests
};

const provider = new WebTracerProvider();
const exporter = new CollectorTraceExporter(collectorOptions);
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
provider.addSpanProcessor(new BatchSpanProcessor(exporter, {
// send spans as soon as we have this many
bufferSize: 10,
// send spans if we have buffered spans older than this
bufferTimeout: 500,
}));

provider.register();

Expand All @@ -45,7 +51,8 @@ import { MetricProvider } from '@opentelemetry/metrics';
import { CollectorMetricExporter } from '@opentelemetry/exporter-collector';
const collectorOptions = {
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55681/v1/metrics
headers: {}, //an optional object containing custom headers to be sent with each request
headers: {}, // an optional object containing custom headers to be sent with each request
concurrencyLimit: 1, // an optional limit on pending requests
};
const exporter = new CollectorMetricExporter(collectorOptions);

Expand All @@ -64,20 +71,26 @@ counter.add(10, { 'key': 'value' });
## Traces in Node - JSON over http

```js
const { BasicTracerProvider, SimpleSpanProcessor } = require('@opentelemetry/tracing');
const { BasicTracerProvider, BatchSpanProcessor } = require('@opentelemetry/tracing');
const { CollectorTraceExporter } = require('@opentelemetry/exporter-collector');

const collectorOptions = {
serviceName: 'basic-service',
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55681/v1/trace
headers: {
foo: 'bar'
}, //an optional object containing custom headers to be sent with each request will only work with http
}, // an optional object containing custom headers to be sent with each request will only work with http
concurrencyLimit: 10, // an optional limit on pending requests
};

const provider = new BasicTracerProvider();
const exporter = new CollectorTraceExporter(collectorOptions);
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
provider.addSpanProcessor(new BatchSpanProcessor(exporter, {
// send spans as soon as we have this many
bufferSize: 1000,
// send spans if we have buffered spans older than this
bufferTimeout: 30000,
}));

provider.register();

Expand All @@ -91,6 +104,7 @@ const { CollectorMetricExporter } = require('@opentelemetry/exporter-collector'
const collectorOptions = {
serviceName: 'basic-service',
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55681/v1/metrics
concurrencyLimit: 1, // an optional limit on pending requests
};
const exporter = new CollectorMetricExporter(collectorOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export abstract class CollectorExporterBase<
public readonly logger: Logger;
public readonly hostname: string | undefined;
public readonly attributes?: Attributes;
protected _concurrencyLimit: number;
protected _isShutdown: boolean = false;
private _shuttingDownPromise: Promise<void> = Promise.resolve();
protected _sendingPromises: Promise<unknown>[] = [];
Expand All @@ -59,6 +60,11 @@ export abstract class CollectorExporterBase<

this.shutdown = this.shutdown.bind(this);

this._concurrencyLimit =
typeof config.concurrencyLimit === 'number'
? config.concurrencyLimit
: Infinity;

// platform dependent
this.onInit(config);
}
Expand All @@ -77,6 +83,14 @@ export abstract class CollectorExporterBase<
return;
}

if (this._sendingPromises.length >= this._concurrencyLimit) {
resultCallback({
code: ExportResultCode.FAILED,
error: new Error('Concurrent export limit reached'),
});
return;
}

this._export(items)
.then(() => {
resultCallback({ code: ExportResultCode.SUCCESS });
Expand Down
1 change: 1 addition & 0 deletions packages/opentelemetry-exporter-collector/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ export interface CollectorExporterConfigBase {
serviceName?: string;
attributes?: Attributes;
url?: string;
concurrencyLimit?: number;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,18 @@ class CollectorTraceExporter extends CollectorExporterBase<
> {
onInit() {}
onShutdown() {}
send() {}
send(
items: any[],
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
) {
const promise = Promise.resolve(null);
this._sendingPromises.push(
promise.then(() =>
this._sendingPromises.splice(this._sendingPromises.indexOf(promise), 1)
)
);
}
getDefaultUrl(config: CollectorExporterConfig): string {
return config.url || '';
}
Expand Down Expand Up @@ -187,7 +198,34 @@ describe('CollectorTraceExporter - common', () => {
});
});
});
describe('export - concurrency limit', () => {
it('should error if too many concurrent exports are queued', done => {
const collectorExporterWithConcurrencyLimit = new CollectorTraceExporter({
...collectorExporterConfig,
concurrencyLimit: 3,
});
const spans: ReadableSpan[] = [{ ...mockedReadableSpan }];
const callbackSpy = sinon.spy();
for (let i = 0; i < 7; i++) {
collectorExporterWithConcurrencyLimit.export(spans, callbackSpy);
}

setTimeout(() => {
// Expect 3 writes
// assert.strictEqual(spySend.args.length, 3);
// Expect 4 failures
assert.strictEqual(callbackSpy.args.length, 4);
callbackSpy.args.forEach(([result]) => {
assert.strictEqual(result.code, ExportResultCode.FAILED);
assert.strictEqual(
result.error!.message,
'Concurrent export limit reached'
);
});
done();
});
});
});
describe('shutdown', () => {
let onShutdownSpy: any;
beforeEach(() => {
Expand Down

0 comments on commit 742a228

Please sign in to comment.