diff --git a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts index 7fbb4f6057..3f2e4c53bc 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts @@ -19,9 +19,16 @@ import { CollectorExporterNodeBase as CollectorExporterBaseMain, collectorTypes, CollectorExporterNodeConfigBase, + CompressionAlgorithm, } from '@opentelemetry/exporter-collector'; import { ServiceClientType } from './types'; +type SendFn = (collector: CollectorExporterNodeBase, + objects: ExportItem[], + compression: CompressionAlgorithm, + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void) => void; + /** * Collector Metric Exporter abstract base class */ @@ -29,7 +36,11 @@ export abstract class CollectorExporterNodeBase< ExportItem, ServiceRequest > extends CollectorExporterBaseMain { - private _send!: Function; + private _send!: SendFn; + + constructor(config: CollectorExporterNodeConfigBase = {}) { + super(config) + } private _sendPromise( objects: ExportItem[], @@ -51,7 +62,7 @@ export abstract class CollectorExporterNodeBase< this._sendingPromises.splice(index, 1); }; - this._send(this, objects, _onSuccess, _onError); + this._send(this, objects, this.compression, _onSuccess, _onError); }); this._sendingPromises.push(promise); diff --git a/packages/opentelemetry-exporter-collector-proto/src/util.ts b/packages/opentelemetry-exporter-collector-proto/src/util.ts index f944d91512..e9e9ea8acc 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/util.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/util.ts @@ -18,6 +18,7 @@ import { collectorTypes, sendWithHttp, CollectorExporterNodeConfigBase, + CompressionAlgorithm, } from '@opentelemetry/exporter-collector'; import * as path from 'path'; @@ -63,6 +64,7 @@ export function onInit( export function send( collector: CollectorExporterNodeBase, objects: ExportItem[], + compression: CompressionAlgorithm, onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { diff --git a/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts index d33e8563b1..3a8d2a6758 100644 --- a/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts @@ -19,11 +19,14 @@ import { ExportResultCode } from '@opentelemetry/core'; import { CollectorExporterNodeConfigBase, collectorTypes, + CompressionAlgorithm, } from '@opentelemetry/exporter-collector'; import { ReadableSpan } from '@opentelemetry/tracing'; import * as assert from 'assert'; import * as http from 'http'; import * as sinon from 'sinon'; +import { Stream } from 'stream'; +import * as zlib from 'zlib'; import { CollectorTraceExporter } from '../src'; import { getExportRequestProto } from '../src/util'; import { @@ -34,9 +37,9 @@ import { } from './helper'; const fakeRequest = { - end: function () {}, - on: function () {}, - write: function () {}, + end: function () { }, + on: function () { }, + write: function () { }, }; describe('CollectorTraceExporter - node with proto over http', () => { @@ -104,7 +107,7 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); it('should open the connection', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); sinon.stub(http, 'request').callsFake((options: any) => { assert.strictEqual(options.hostname, 'foo.bar.com'); @@ -116,7 +119,7 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); it('should set custom headers', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); sinon.stub(http, 'request').callsFake((options: any) => { assert.strictEqual(options.headers['foo'], 'bar'); @@ -126,7 +129,7 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); it('should have keep alive and keepAliveMsecs option set', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); sinon.stub(http, 'request').callsFake((options: any) => { assert.strictEqual(options.agent.keepAlive, true); @@ -137,27 +140,31 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); it('should successfully send the spans', done => { - collectorExporter.export(spans, () => {}); - - sinon.stub(http, 'request').returns({ - end: () => {}, - on: () => {}, - write: (...args: any[]) => { - const ExportTraceServiceRequestProto = getExportRequestProto(); - const data = ExportTraceServiceRequestProto?.decode(args[0]); - const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; - const span1 = - json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; - assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); - if (span1) { - ensureProtoSpanIsCorrect(span1); - } - - ensureExportTraceServiceRequestIsSet(json); - - done(); - }, - } as any); + const fakeRequest = new Stream.PassThrough(); + sinon.stub(http, 'request').returns(fakeRequest as any); + + let buff = Buffer.from(''); + fakeRequest.on('end', () => { + const ExportTraceServiceRequestProto = getExportRequestProto(); + const data = ExportTraceServiceRequestProto?.decode(buff); + const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; + const span1 = + json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; + assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); + if (span1) { + ensureProtoSpanIsCorrect(span1); + } + + ensureExportTraceServiceRequestIsSet(json); + + done(); + }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); + + collectorExporter.export(spans, () => { }); }); it('should log the successful message', done => { @@ -195,4 +202,57 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); }); }); + describe('export - with compression', () => { + beforeEach(() => { + collectorExporterConfig = { + headers: { + foo: 'bar', + }, + hostname: 'foo', + attributes: {}, + url: 'http://foo.bar.com', + keepAlive: true, + compression: CompressionAlgorithm.GZIP, + httpAgentOptions: { keepAliveMsecs: 2000 }, + }; + collectorExporter = new CollectorTraceExporter(collectorExporterConfig); + spans = []; + spans.push(Object.assign({}, mockedReadableSpan)); + }); + afterEach(() => { + sinon.restore(); + }); + + it('should successfully send the spans', done => { + const fakeRequest = new Stream.PassThrough(); + sinon.stub(http, 'request').returns(fakeRequest as any); + const spySetHeader = sinon.spy(); + (fakeRequest as any).setHeader = spySetHeader; + + let buff = Buffer.from(''); + fakeRequest.on('end', () => { + const unzippedBuff = zlib.gunzipSync(buff); + const ExportTraceServiceRequestProto = getExportRequestProto(); + const data = ExportTraceServiceRequestProto?.decode(unzippedBuff); + const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; + const span1 = + json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; + assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); + if (span1) { + ensureProtoSpanIsCorrect(span1); + } + + ensureExportTraceServiceRequestIsSet(json); + assert.ok(spySetHeader.calledWith('Content-Encoding', 'gzip')); + + done(); + }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); + + collectorExporter.export(spans, () => { }); + }); + }); }); diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts index 7b5dc98446..10a5bdca19 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts @@ -18,7 +18,7 @@ import type * as http from 'http'; import type * as https from 'https'; import { CollectorExporterBase } from '../../CollectorExporterBase'; -import { CollectorExporterNodeConfigBase } from './types'; +import { CollectorExporterNodeConfigBase, CompressionAlgorithm } from './types'; import * as collectorTypes from '../../types'; import { parseHeaders } from '../../util'; import { createHttpAgent, sendWithHttp } from './util'; @@ -39,6 +39,7 @@ export abstract class CollectorExporterNodeBase< DEFAULT_HEADERS: Record = {}; headers: Record; agent: http.Agent | https.Agent | undefined; + compression: CompressionAlgorithm; constructor(config: CollectorExporterNodeConfigBase = {}) { super(config); @@ -51,6 +52,7 @@ export abstract class CollectorExporterNodeBase< baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS) ); this.agent = createHttpAgent(config); + this.compression = config.compression || CompressionAlgorithm.NONE; } onInit(_config: CollectorExporterNodeConfigBase): void { diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/types.ts b/packages/opentelemetry-exporter-collector/src/platform/node/types.ts index cee0f477a5..4ee5167e79 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/types.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/types.ts @@ -24,5 +24,11 @@ import { CollectorExporterConfigBase } from '../../types'; export interface CollectorExporterNodeConfigBase extends CollectorExporterConfigBase { keepAlive?: boolean; + compression?: CompressionAlgorithm; httpAgentOptions?: http.AgentOptions | https.AgentOptions; } + +export enum CompressionAlgorithm { + NONE = 'none', + GZIP = 'gzip' +} diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts index fb303dd217..c7848014af 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts @@ -16,10 +16,15 @@ import * as url from 'url'; import * as http from 'http'; import * as https from 'https'; +import * as zlib from 'zlib'; +import { Readable } from 'stream'; import * as collectorTypes from '../../types'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; import { CollectorExporterNodeConfigBase } from '.'; import { diag } from '@opentelemetry/api'; +import { CompressionAlgorithm } from './types'; + +const gzip = zlib.createGzip(); /** * Sends data using http @@ -71,11 +76,35 @@ export function sendWithHttp( }); }); + req.on('error', (error: Error) => { onError(error); }); - req.write(data); - req.end(); + + switch (collector.compression) { + case CompressionAlgorithm.GZIP: { + req.setHeader('Content-Encoding', 'gzip'); + const dataStream = readableFromBuffer(data); + dataStream.on('error', onError) + .pipe(gzip).on('error', onError) + .pipe(req); + + break; + } + default: + req.write(data); + req.end(); + + break; + } +} + +function readableFromBuffer(buff: string | Buffer): Readable { + const readable = new Readable(); + readable.push(buff); + readable.push(null); + + return readable; } export function createHttpAgent( diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts index d64ef15db1..f8215d6494 100644 --- a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts @@ -20,9 +20,12 @@ import { ReadableSpan } from '@opentelemetry/tracing'; import * as http from 'http'; import * as assert from 'assert'; import * as sinon from 'sinon'; +import { PassThrough, Stream } from 'stream'; +import * as zlib from 'zlib'; import { CollectorTraceExporter, CollectorExporterNodeConfigBase, + CompressionAlgorithm, } from '../../src/platform/node'; import * as collectorTypes from '../../src/types'; import { MockedResponse } from './nodeHelpers'; @@ -33,11 +36,7 @@ import { mockedReadableSpan, } from '../helper'; -const fakeRequest = { - end: function () {}, - on: function () {}, - write: function () {}, -}; +let fakeRequest: PassThrough; const address = 'localhost:1501'; @@ -45,10 +44,11 @@ describe('CollectorTraceExporter - node with json over http', () => { let collectorExporter: CollectorTraceExporter; let collectorExporterConfig: CollectorExporterNodeConfigBase; let stubRequest: sinon.SinonStub; - let stubWrite: sinon.SinonStub; + let spySetHeader: sinon.SinonSpy; let spans: ReadableSpan[]; afterEach(() => { + fakeRequest = new Stream.PassThrough(); sinon.restore(); }); @@ -109,7 +109,6 @@ describe('CollectorTraceExporter - node with json over http', () => { describe('export', () => { beforeEach(() => { stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any); - stubWrite = sinon.stub(fakeRequest, 'write'); collectorExporterConfig = { headers: { foo: 'bar', @@ -126,7 +125,7 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should open the connection', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -140,7 +139,7 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should set custom headers', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -150,8 +149,19 @@ describe('CollectorTraceExporter - node with json over http', () => { }); }); + it('should not have Content-Encoding header', done => { + collectorExporter.export(spans, () => { }); + + setTimeout(() => { + const args = stubRequest.args[0]; + const options = args[0]; + assert.strictEqual(options.headers['Content-Encoding'], undefined); + done(); + }); + }); + it('should have keep alive and keepAliveMsecs option set', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -164,8 +174,8 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('different http export requests should use the same agent', done => { - collectorExporter.export(spans, () => {}); - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); + collectorExporter.export(spans, () => { }); setTimeout(() => { const [firstExportAgent, secondExportAgent] = stubRequest.args.map( @@ -177,12 +187,13 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should successfully send the spans', done => { - collectorExporter.export(spans, () => {}); + let buff = Buffer.from(''); + + fakeRequest.on('end', () => { + const responseBody = buff.toString(); - setTimeout(() => { - const writeArgs = stubWrite.args[0]; const json = JSON.parse( - writeArgs[0] + responseBody ) as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; const span1 = json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; @@ -195,6 +206,12 @@ describe('CollectorTraceExporter - node with json over http', () => { done(); }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); + + collectorExporter.export(spans, () => { }); }); it('should log the successful message', done => { @@ -242,6 +259,58 @@ describe('CollectorTraceExporter - node with json over http', () => { }); }); }); + + describe('export - with compression', () => { + beforeEach(() => { + stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any); + spySetHeader = sinon.spy(); + (fakeRequest as any).setHeader = spySetHeader; + collectorExporterConfig = { + headers: { + foo: 'bar', + }, + hostname: 'foo', + attributes: {}, + url: 'http://foo.bar.com', + keepAlive: true, + compression: CompressionAlgorithm.GZIP, + httpAgentOptions: { keepAliveMsecs: 2000 }, + }; + collectorExporter = new CollectorTraceExporter(collectorExporterConfig); + spans = []; + spans.push(Object.assign({}, mockedReadableSpan)); + }); + + it('should successfully send the spans', done => { + collectorExporter.export(spans, () => { }); + let buff = Buffer.from(''); + + fakeRequest.on('end', () => { + const responseBody = zlib.gunzipSync(buff).toString(); + + const json = JSON.parse( + responseBody + ) as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; + const span1 = + json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; + assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); + if (span1) { + ensureSpanIsCorrect(span1); + } + + ensureExportTraceServiceRequestIsSet(json); + assert.ok(spySetHeader.calledWith('Content-Encoding', 'gzip')); + + done(); + }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); + }); + + }); + describe('CollectorTraceExporter - node (getDefaultUrl)', () => { it('should default to localhost', done => { const collectorExporter = new CollectorTraceExporter();