Skip to content

Commit

Permalink
feat(exporter-collector-proto): support gzip compression
Browse files Browse the repository at this point in the history
  • Loading branch information
alisabzevari committed Jul 8, 2021
1 parent bc475d1 commit 4c91cfb
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,24 @@ import {
} from '@opentelemetry/exporter-collector';
import { ServiceClientType } from './types';

type SendFn = <ExportItem, ServiceRequest>(collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
objects: ExportItem[],
compress: boolean,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void) => void;

/**
* Collector Metric Exporter abstract base class
*/
export abstract class CollectorExporterNodeBase<
ExportItem,
ServiceRequest
> extends CollectorExporterBaseMain<ExportItem, ServiceRequest> {
private _send!: Function;
private _send!: SendFn;

constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config)
}

private _sendPromise(
objects: ExportItem[],
Expand All @@ -51,7 +61,7 @@ export abstract class CollectorExporterNodeBase<
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, _onSuccess, _onError);
this._send(this, objects, this.compress, _onSuccess, _onError);
});

this._sendingPromises.push(promise);
Expand Down
2 changes: 2 additions & 0 deletions packages/opentelemetry-exporter-collector-proto/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export function onInit<ExportItem, ServiceRequest>(
export function send<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
objects: ExportItem[],
compress: boolean,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
Expand All @@ -76,6 +77,7 @@ export function send<ExportItem, ServiceRequest>(
collector,
Buffer.from(body),
'application/x-protobuf',
compress,
onSuccess,
onError
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { ReadableSpan } from '@opentelemetry/tracing';
import * as assert from 'assert';
import * as http from 'http';
import * as sinon from 'sinon';
import { Stream } from 'stream';
import * as zlib from 'zlib';
import { CollectorTraceExporter } from '../src';
import { getExportRequestProto } from '../src/util';
import {
Expand All @@ -34,9 +36,9 @@ import {
} from './helper';

const fakeRequest = {
end: function () {},
on: function () {},
write: function () {},
end: function () { },
on: function () { },
write: function () { },
};

describe('CollectorTraceExporter - node with proto over http', () => {
Expand Down Expand Up @@ -104,7 +106,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should open the connection', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.hostname, 'foo.bar.com');
Expand All @@ -116,7 +118,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should set custom headers', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.headers['foo'], 'bar');
Expand All @@ -126,7 +128,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should have keep alive and keepAliveMsecs option set', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.agent.keepAlive, true);
Expand All @@ -137,27 +139,31 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should successfully send the spans', done => {
collectorExporter.export(spans, () => {});

sinon.stub(http, 'request').returns({
end: () => {},
on: () => {},
write: (...args: any[]) => {
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(args[0]);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
},
} as any);
const fakeRequest = new Stream.PassThrough();
sinon.stub(http, 'request').returns(fakeRequest as any);

let buff = Buffer.from('');
fakeRequest.on('end', () => {
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(buff);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
});

fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});

collectorExporter.export(spans, () => { });
});

it('should log the successful message', done => {
Expand Down Expand Up @@ -195,4 +201,54 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});
});
});
describe('export - with compression', () => {
beforeEach(() => {
collectorExporterConfig = {
headers: {
foo: 'bar',
},
hostname: 'foo',
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
compress: true,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
spans = [];
spans.push(Object.assign({}, mockedReadableSpan));
});
afterEach(() => {
sinon.restore();
});

it('should successfully send the spans', done => {
const fakeRequest = new Stream.PassThrough();
sinon.stub(http, 'request').returns(fakeRequest as any);

let buff = Buffer.from('');
fakeRequest.on('end', () => {
const unzippedBuff = zlib.gunzipSync(buff);
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(unzippedBuff);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
});

fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});

collectorExporter.export(spans, () => { });
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ describe('CollectorTraceExporter - node with json over http', () => {
});

it('should successfully send the spans', done => {
collectorExporter.export(spans, () => { });
let buff = Buffer.from('');

fakeRequest.on('end', () => {
Expand All @@ -198,6 +197,8 @@ describe('CollectorTraceExporter - node with json over http', () => {
fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});

collectorExporter.export(spans, () => { });
});

it('should log the successful message', done => {
Expand Down

0 comments on commit 4c91cfb

Please sign in to comment.