Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use http keep-alive in collector exporter #1661

Merged
merged 8 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import {
CollectorExporterNodeBase as CollectorExporterBaseMain,
collectorTypes,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import { ServiceClientType } from './types';

Expand Down Expand Up @@ -55,7 +56,7 @@ export abstract class CollectorExporterNodeBase<
this._sendingPromises.push(promise);
}

onInit(config: collectorTypes.CollectorExporterConfigBase): void {
onInit(config: CollectorExporterNodeConfigBase): void {
this._isShutdown = false;
// defer to next tick and lazy load to avoid loading protobufjs too early
// and making this impossible to be instrumented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import {
collectorTypes,
toCollectorExportMetricServiceRequest,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import { MetricRecord, MetricExporter } from '@opentelemetry/metrics';
import { ServiceClientType } from './types';
Expand Down Expand Up @@ -47,16 +48,14 @@ export class CollectorMetricExporter
);
}

getDefaultUrl(config: collectorTypes.CollectorExporterConfigBase): string {
getDefaultUrl(config: CollectorExporterNodeConfigBase): string {
if (!config.url) {
return DEFAULT_COLLECTOR_URL;
}
return config.url;
}

getDefaultServiceName(
config: collectorTypes.CollectorExporterConfigBase
): string {
getDefaultServiceName(config: CollectorExporterNodeConfigBase): string {
return config.serviceName || DEFAULT_SERVICE_NAME;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import {
collectorTypes,
toCollectorExportTraceServiceRequest,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import { ServiceClientType } from './types';

Expand All @@ -40,16 +41,14 @@ export class CollectorTraceExporter
return toCollectorExportTraceServiceRequest(spans, this);
}

getDefaultUrl(config: collectorTypes.CollectorExporterConfigBase): string {
getDefaultUrl(config: CollectorExporterNodeConfigBase): string {
if (!config.url) {
return DEFAULT_COLLECTOR_URL;
}
return config.url;
}

getDefaultServiceName(
config: collectorTypes.CollectorExporterConfigBase
): string {
getDefaultServiceName(config: CollectorExporterNodeConfigBase): string {
return config.serviceName || DEFAULT_SERVICE_NAME;
}

Expand Down
3 changes: 2 additions & 1 deletion packages/opentelemetry-exporter-collector-proto/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import {
collectorTypes,
sendWithHttp,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import * as path from 'path';

Expand All @@ -33,7 +34,7 @@ export function getExportRequestProto(): Type | undefined {

export function onInit<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
_config: collectorTypes.CollectorExporterConfigBase
_config: CollectorExporterNodeConfigBase
): void {
const dir = path.resolve(__dirname, '..', 'protos');
const root = new protobufjs.Root();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
* limitations under the License.
*/

import {
collectorTypes,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/metrics';
import { collectorTypes } from '@opentelemetry/exporter-collector';
import * as core from '@opentelemetry/core';
import * as http from 'http';
import * as assert from 'assert';
Expand Down Expand Up @@ -48,7 +51,7 @@ const waitTimeMS = 20;

describe('CollectorMetricExporter - node with proto over http', () => {
let collectorExporter: CollectorMetricExporter;
let collectorExporterConfig: collectorTypes.CollectorExporterConfigBase;
let collectorExporterConfig: CollectorExporterNodeConfigBase;
let spyRequest: sinon.SinonSpy;
let spyWrite: sinon.SinonSpy;
let metrics: metrics.MetricRecord[];
Expand All @@ -65,6 +68,8 @@ describe('CollectorMetricExporter - node with proto over http', () => {
serviceName: 'bar',
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorMetricExporter(collectorExporterConfig);
// Overwrites the start time to make tests consistent
Expand Down Expand Up @@ -120,6 +125,19 @@ describe('CollectorMetricExporter - node with proto over http', () => {
}, waitTimeMS);
});

it('should have keep alive and keepAliveMsecs option set', done => {
collectorExporter.export(metrics, () => {});

setTimeout(() => {
const args = spyRequest.args[0];
const options = args[0];
const agent = options.agent;
assert.strictEqual(agent.keepAlive, true);
assert.strictEqual(agent.options.keepAliveMsecs, 2000);
done();
});
});

it('should successfully send metrics', done => {
collectorExporter.export(metrics, () => {});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* limitations under the License.
*/

import { collectorTypes } from '@opentelemetry/exporter-collector';
import {
collectorTypes,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';

import * as core from '@opentelemetry/core';
import { ReadableSpan } from '@opentelemetry/tracing';
Expand Down Expand Up @@ -43,7 +46,7 @@ const waitTimeMS = 20;

describe('CollectorTraceExporter - node with proto over http', () => {
let collectorExporter: CollectorTraceExporter;
let collectorExporterConfig: collectorTypes.CollectorExporterConfigBase;
let collectorExporterConfig: CollectorExporterNodeConfigBase;
let spyRequest: sinon.SinonSpy;
let spyWrite: sinon.SinonSpy;
let spans: ReadableSpan[];
Expand All @@ -60,6 +63,8 @@ describe('CollectorTraceExporter - node with proto over http', () => {
serviceName: 'bar',
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
spans = [];
Expand Down Expand Up @@ -95,6 +100,19 @@ describe('CollectorTraceExporter - node with proto over http', () => {
}, waitTimeMS);
});

it('should have keep alive and keepAliveMsecs option set', done => {
collectorExporter.export(spans, () => {});

setTimeout(() => {
const args = spyRequest.args[0];
const options = args[0];
const agent = options.agent;
assert.strictEqual(agent.keepAlive, true);
assert.strictEqual(agent.options.keepAliveMsecs, 2000);
done();
});
});

it('should successfully send the spans', done => {
collectorExporter.export(spans, () => {});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
* limitations under the License.
*/

import type * as http from 'http';
import type * as https from 'https';

import { CollectorExporterBase } from '../../CollectorExporterBase';
import { CollectorExporterConfigBase } from '../../types';
import { CollectorExporterNodeConfigBase } from './types';
import * as collectorTypes from '../../types';
import { parseHeaders } from '../../util';
import { sendWithHttp } from './util';
Expand All @@ -27,22 +30,35 @@ export abstract class CollectorExporterNodeBase<
ExportItem,
ServiceRequest
> extends CollectorExporterBase<
CollectorExporterConfigBase,
CollectorExporterNodeConfigBase,
ExportItem,
ServiceRequest
> {
DEFAULT_HEADERS: Record<string, string> = {};
headers: Record<string, string>;
constructor(config: CollectorExporterConfigBase = {}) {
keepAlive: boolean = true;
httpAgentOptions: http.AgentOptions | https.AgentOptions = {};
constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config);
if ((config as any).metadata) {
this.logger.warn('Metadata cannot be set when using http');
}
this.headers =
parseHeaders(config.headers, this.logger) || this.DEFAULT_HEADERS;
if (typeof config.keepAlive === 'boolean') {
this.keepAlive = config.keepAlive;
}
if (config.httpAgentOptions) {
if (!this.keepAlive) {
this.logger.warn(
'httpAgentOptions is used only when keepAlive is true'
);
}
this.httpAgentOptions = Object.assign({}, config.httpAgentOptions);
}
}

onInit(_config: CollectorExporterConfigBase): void {
onInit(_config: CollectorExporterNodeConfigBase): void {
this._isShutdown = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

import { MetricRecord, MetricExporter } from '@opentelemetry/metrics';
import { CollectorExporterConfigBase } from '../../types';
import * as collectorTypes from '../../types';
import { CollectorExporterNodeConfigBase } from './types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { toCollectorExportMetricServiceRequest } from '../../transformMetrics';

Expand Down Expand Up @@ -45,14 +45,14 @@ export class CollectorMetricExporter
);
}

getDefaultUrl(config: CollectorExporterConfigBase): string {
getDefaultUrl(config: CollectorExporterNodeConfigBase): string {
if (!config.url) {
return DEFAULT_COLLECTOR_URL;
}
return config.url;
}

getDefaultServiceName(config: CollectorExporterConfigBase): string {
getDefaultServiceName(config: CollectorExporterNodeConfigBase): string {
return config.serviceName || DEFAULT_SERVICE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

import { ReadableSpan, SpanExporter } from '@opentelemetry/tracing';
import { CollectorExporterConfigBase } from '../../types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { CollectorExporterNodeConfigBase } from './types';
import * as collectorTypes from '../../types';
import { toCollectorExportTraceServiceRequest } from '../../transform';

Expand All @@ -38,14 +38,14 @@ export class CollectorTraceExporter
return toCollectorExportTraceServiceRequest(spans, this, true);
}

getDefaultUrl(config: CollectorExporterConfigBase): string {
getDefaultUrl(config: CollectorExporterNodeConfigBase): string {
if (!config.url) {
return DEFAULT_COLLECTOR_URL;
}
return config.url;
}

getDefaultServiceName(config: CollectorExporterConfigBase): string {
getDefaultServiceName(config: CollectorExporterNodeConfigBase): string {
return config.serviceName || DEFAULT_SERVICE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ export * from './CollectorTraceExporter';
export * from './CollectorMetricExporter';
export * from './CollectorExporterNodeBase';
export * from './util';
export * from './types';
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type * as http from 'http';
import type * as https from 'https';

import { CollectorExporterConfigBase } from '../../types';

/**
* Collector Exporter node base config
*/
export interface CollectorExporterNodeConfigBase
extends CollectorExporterConfigBase {
keepAlive?: boolean;
httpAgentOptions?: http.AgentOptions | https.AgentOptions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
): void {
const parsedUrl = new url.URL(collector.url);

const options = {
const options: http.RequestOptions | https.RequestOptions = {
hostname: parsedUrl.hostname,
port: parsedUrl.port,
path: parsedUrl.pathname,
Expand All @@ -49,6 +49,14 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
};

const request = parsedUrl.protocol === 'http:' ? http.request : https.request;
const Agent = parsedUrl.protocol === 'http:' ? http.Agent : https.Agent;
if (collector.keepAlive) {
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
options.agent = new Agent({
...collector.httpAgentOptions,
keepAlive: true,
});
}

const req = request(options, (res: http.IncomingMessage) => {
let data = '';
res.on('data', chunk => (data += chunk));
Expand Down
Loading