Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(@opentelemetry/exporter-collector): remove fulfilled promises cor… #1775

Merged
merged 19 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,16 @@ export abstract class CollectorExporterNodeBase<
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, _onSuccess, _onError);
});
const promise = new Promise<void>((resolve, reject) => {
this._send(this, objects, resolve, reject);
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
promise.finally(() => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
});
}

onInit(config: CollectorExporterConfigNode): void {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { collectorTypes } from '@opentelemetry/exporter-collector';
import { ReadableSpan } from '@opentelemetry/tracing';

import * as assert from 'assert';
import { CollectorExporterNodeBase } from '../src/CollectorExporterNodeBase';
import { CollectorExporterConfigNode, ServiceClientType } from '../src/types';
import { mockedReadableSpan } from './helper';

class MockCollectorExporter extends CollectorExporterNodeBase<
ReadableSpan,
ReadableSpan[]
> {
/**
* Callbacks passed to _send()
*/
sendCallbacks: {
onSuccess: () => void;
onError: (error: collectorTypes.CollectorExporterError) => void;
}[] = [];

getDefaultUrl(config: CollectorExporterConfigNode): string {
return '';
}

getDefaultServiceName(config: CollectorExporterConfigNode): string {
return '';
}

convert(spans: ReadableSpan[]): ReadableSpan[] {
return spans;
}

getServiceClientType() {
return ServiceClientType.SPANS;
}

getServiceProtoPath(): string {
return 'opentelemetry/proto/collector/trace/v1/trace_service.proto';
}
}

// Mocked _send which just saves the callbacks for later
MockCollectorExporter.prototype['_send'] = function _sendMock(
dyladan marked this conversation as resolved.
Show resolved Hide resolved
self: MockCollectorExporter,
objects: ReadableSpan[],
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
self.sendCallbacks.push({ onSuccess, onError });
};

describe('CollectorExporterNodeBase', () => {
let exporter: MockCollectorExporter;
const concurrencyLimit = 5;

beforeEach(done => {
exporter = new MockCollectorExporter({ concurrencyLimit });
done();
});

describe('export', () => {
it('should export requests concurrently', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];
const numToExport = concurrencyLimit;

for (let i = 0; i < numToExport; ++i) {
exporter.export(spans, () => {});
}

assert.strictEqual(exporter['_sendingPromises'].length, numToExport);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests finish sending
exporter.sendCallbacks.forEach(({ onSuccess }) => onSuccess());

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should drop new export requests when already sending at concurrencyLimit', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];
const numToExport = concurrencyLimit + 5;

for (let i = 0; i < numToExport; ++i) {
exporter.export(spans, () => {});
}

assert.strictEqual(exporter['_sendingPromises'].length, concurrencyLimit);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests finish sending
exporter.sendCallbacks.forEach(({ onSuccess }) => onSuccess());

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should pop export request promises even if they failed', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];

exporter.export(spans, () => {});
assert.strictEqual(exporter['_sendingPromises'].length, 1);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests fail sending
exporter.sendCallbacks.forEach(({ onError }) =>
onError(new Error('Failed to send!!'))
);

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should pop export request promises even if success callback throws error', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];

exporter['_sendPromise'](
spans,
() => {
throw new Error('Oops');
},
() => {}
);

assert.strictEqual(exporter['_sendingPromises'].length, 1);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that the request finishes sending
exporter.sendCallbacks.forEach(({ onSuccess }) => {
onSuccess();
});

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,16 @@ export abstract class CollectorExporterNodeBase<
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, _onSuccess, _onError);
});
const promise = new Promise<void>((resolve, reject) => {
this._send(this, objects, resolve, reject);
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
promise.finally(() => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
});
}

override onInit(config: CollectorExporterNodeConfigBase): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,19 @@ export abstract class CollectorExporterBrowserBase<
const serviceRequest = this.convert(items);
const body = JSON.stringify(serviceRequest);

const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

const promise = new Promise<void>((resolve, reject) => {
if (this._useXHR) {
sendWithXhr(body, this.url, this._headers, _onSuccess, _onError);
sendWithXhr(body, this.url, this._headers, resolve, reject);
} else {
sendWithBeacon(body, this.url, _onSuccess, _onError);
sendWithBeacon(body, this.url, resolve, reject);
}
});
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
promise.finally(() => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,22 @@ export abstract class CollectorExporterNodeBase<
}
const serviceRequest = this.convert(objects);

const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};
const promise = new Promise<void>((resolve, reject) => {
sendWithHttp(
this,
JSON.stringify(serviceRequest),
'application/json',
_onSuccess,
_onError
resolve,
reject
);
});
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
promise.finally(() => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
});
}

onShutdown(): void {}
Expand Down
7 changes: 5 additions & 2 deletions packages/opentelemetry-exporter-zipkin/src/zipkin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,14 @@ export class ZipkinExporter implements SpanExporter {
this._sendSpans(spans, serviceName, result => {
resolve();
resultCallback(result);
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
});
});

this._sendingPromises.push(promise);
promise.finally(() => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
});
}

/**
Expand Down