Skip to content

Commit

Permalink
feat: adding json over http for collector exporter (#1247)
Browse files Browse the repository at this point in the history
* feat: adding json over http for collector exporter

* feat: updating readme and adding headers options in config for json over http

* chore: reviews and few small cleanups

* chore: aligning type for headers

* chore: fixing doc

* chore: unifying types for headers

* chore: reviews

* chore: adding validation for headers, and making the types correct this time

* chore: linting

* chore: linting

* chore: fixes after merge

* chore: reviews

* chore: merge branch 'master' into collector_json
  • Loading branch information
obecny authored Jul 6, 2020
1 parent 413edb5 commit f0a3dba
Show file tree
Hide file tree
Showing 19 changed files with 622 additions and 93 deletions.
1 change: 1 addition & 0 deletions examples/collector-exporter-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
},
"dependencies": {
"@opentelemetry/api": "^0.9.0",
"@opentelemetry/core": "^0.9.0",
"@opentelemetry/exporter-collector": "^0.9.0",
"@opentelemetry/tracing": "^0.9.0"
},
Expand Down
8 changes: 5 additions & 3 deletions examples/collector-exporter-node/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

const opentelemetry = require('@opentelemetry/api');
const { BasicTracerProvider, ConsoleSpanExporter, SimpleSpanProcessor } = require('@opentelemetry/tracing');
const { CollectorTraceExporter } = require('@opentelemetry/exporter-collector');
const { CollectorTraceExporter, CollectorProtocolNode } = require('@opentelemetry/exporter-collector');

const address = '127.0.0.1:55680';
const exporter = new CollectorTraceExporter({
serviceName: 'basic-service',
url: address,
// headers: {
// foo: 'bar'
// },
protocolNode: CollectorProtocolNode.HTTP_JSON,
});

const provider = new BasicTracerProvider();
Expand Down
25 changes: 24 additions & 1 deletion packages/opentelemetry-exporter-collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ provider.register();

```

## Usage in Node
## Usage in Node - GRPC

The CollectorTraceExporter in Node expects the URL to only be the hostname. It will not work with `/v1/trace`.

Expand Down Expand Up @@ -109,6 +109,29 @@ provider.register();

Note, that this will only work if TLS is also configured on the server.

## Usage in Node - JSON over http

```js
const { BasicTracerProvider, SimpleSpanProcessor } = require('@opentelemetry/tracing');
const { CollectorExporter, CollectorTransportNode } = require('@opentelemetry/exporter-collector');

const collectorOptions = {
protocolNode: CollectorTransportNode.HTTP_JSON,
serviceName: 'basic-service',
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55680/v1/trace
headers: {
foo: 'bar'
}, //an optional object containing custom headers to be sent with each request will only work with json over http
};

const provider = new BasicTracerProvider();
const exporter = new CollectorExporter(collectorOptions);
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));

provider.register();

```

## Running opentelemetry-collector locally to see the traces

1. Go to examples/basic-tracer-node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export abstract class CollectorTraceExporterBase<
*/
constructor(config: T = {} as T) {
this.serviceName = config.serviceName || DEFAULT_SERVICE_NAME;
this.url = this.getDefaultUrl(config.url);
this.url = this.getDefaultUrl(config);
if (typeof config.hostname === 'string') {
this.hostname = config.hostname;
}
Expand Down Expand Up @@ -123,5 +123,5 @@ export abstract class CollectorTraceExporterBase<
onSuccess: () => void,
onError: (error: CollectorExporterError) => void
): void;
abstract getDefaultUrl(url: string | undefined): string;
abstract getDefaultUrl(config: T): string;
}
24 changes: 24 additions & 0 deletions packages/opentelemetry-exporter-collector/src/enums.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Collector transport protocol node options
* Default is GRPC
*/
export enum CollectorProtocolNode {
GRPC,
HTTP_JSON,
}
1 change: 1 addition & 0 deletions packages/opentelemetry-exporter-collector/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
*/

export * from './platform';
export * from './enums';
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { ReadableSpan } from '@opentelemetry/tracing';
import { toCollectorExportTraceServiceRequest } from '../../transform';
import { CollectorExporterConfigBrowser } from './types';
import * as collectorTypes from '../../types';
import { parseHeaders } from '../../util';

const DEFAULT_COLLECTOR_URL = 'http://localhost:55680/v1/trace';

Expand All @@ -28,18 +29,19 @@ const DEFAULT_COLLECTOR_URL = 'http://localhost:55680/v1/trace';
export class CollectorTraceExporter extends CollectorTraceExporterBase<
CollectorExporterConfigBrowser
> {
DEFAULT_HEADERS: { [key: string]: string } = {
DEFAULT_HEADERS: Record<string, string> = {
[collectorTypes.OT_REQUEST_HEADER]: '1',
};
private _headers: { [key: string]: string };
private _headers: Record<string, string>;
private _useXHR: boolean = false;

/**
* @param config
*/
constructor(config: CollectorExporterConfigBrowser = {}) {
super(config);
this._headers = config.headers || this.DEFAULT_HEADERS;
this._headers =
parseHeaders(config.headers, this.logger) || this.DEFAULT_HEADERS;
this._useXHR =
!!config.headers || typeof navigator.sendBeacon !== 'function';
}
Expand All @@ -52,8 +54,8 @@ export class CollectorTraceExporter extends CollectorTraceExporterBase<
window.removeEventListener('unload', this.shutdown);
}

getDefaultUrl(url: string | undefined) {
return url || DEFAULT_COLLECTOR_URL;
getDefaultUrl(config: CollectorExporterConfigBrowser) {
return config.url || DEFAULT_COLLECTOR_URL;
}

sendSpans(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,69 @@
* limitations under the License.
*/

import * as protoLoader from '@grpc/proto-loader';
import { ReadableSpan } from '@opentelemetry/tracing';
import * as grpc from 'grpc';
import * as path from 'path';
import { CollectorTraceExporterBase } from '../../CollectorTraceExporterBase';
import * as collectorTypes from '../../types';

import { ReadableSpan } from '@opentelemetry/tracing';
import { CollectorTraceExporterBase } from '../../CollectorTraceExporterBase';
import { CollectorExporterError } from '../../types';
import { toCollectorExportTraceServiceRequest } from '../../transform';
import { CollectorProtocolNode } from '../../enums';
import { parseHeaders } from '../../util';
import {
GRPCSpanQueueItem,
ServiceClient,
CollectorExporterConfigNode,
} from './types';
import { removeProtocol } from './util';

const DEFAULT_COLLECTOR_URL = 'localhost:55680';
import {
DEFAULT_COLLECTOR_URL_GRPC,
onInitWithGrpc,
sendSpansUsingGrpc,
} from './utilWithGrpc';
import {
DEFAULT_COLLECTOR_URL_JSON,
onInitWithJson,
sendSpansUsingJson,
} from './utilWithJson';

/**
* Collector Trace Exporter for Node
*/
export class CollectorTraceExporter extends CollectorTraceExporterBase<
CollectorExporterConfigNode
> {
DEFAULT_HEADERS: Record<string, string> = {
[collectorTypes.OT_REQUEST_HEADER]: '1',
};
isShutDown: boolean = false;
traceServiceClient?: ServiceClient = undefined;
grpcSpansQueue: GRPCSpanQueueItem[] = [];
metadata?: grpc.Metadata;
headers: Record<string, string>;
private readonly _protocol: CollectorProtocolNode;

/**
* @param config
*/
constructor(config: CollectorExporterConfigNode = {}) {
super(config);
this._protocol =
typeof config.protocolNode !== 'undefined'
? config.protocolNode
: CollectorProtocolNode.GRPC;
if (this._protocol === CollectorProtocolNode.HTTP_JSON) {
this.logger.debug('CollectorExporter - using json over http');
if (config.metadata) {
this.logger.warn('Metadata cannot be set when using json');
}
} else {
this.logger.debug('CollectorExporter - using grpc');
if (config.headers) {
this.logger.warn('Headers cannot be set when using grpc');
}
}
this.metadata = config.metadata;
this.headers =
parseHeaders(config.headers, this.logger) || this.DEFAULT_HEADERS;
}

onShutdown(): void {
Expand All @@ -60,81 +88,36 @@ export class CollectorTraceExporter extends CollectorTraceExporterBase<

onInit(config: CollectorExporterConfigNode): void {
this.isShutDown = false;
this.grpcSpansQueue = [];
const serverAddress = removeProtocol(this.url);
const credentials: grpc.ChannelCredentials =
config.credentials || grpc.credentials.createInsecure();

const traceServiceProtoPath =
'opentelemetry/proto/collector/trace/v1/trace_service.proto';
const includeDirs = [path.resolve(__dirname, 'protos')];

protoLoader
.load(traceServiceProtoPath, {
keepCase: false,
longs: String,
enums: String,
defaults: true,
oneofs: true,
includeDirs,
})
.then(packageDefinition => {
const packageObject: any = grpc.loadPackageDefinition(
packageDefinition
);
this.traceServiceClient = new packageObject.opentelemetry.proto.collector.trace.v1.TraceService(
serverAddress,
credentials
);
if (this.grpcSpansQueue.length > 0) {
const queue = this.grpcSpansQueue.splice(0);
queue.forEach((item: GRPCSpanQueueItem) => {
this.sendSpans(item.spans, item.onSuccess, item.onError);
});
}
});
if (config.protocolNode === CollectorProtocolNode.HTTP_JSON) {
onInitWithJson(this, config);
} else {
onInitWithGrpc(this, config);
}
}

sendSpans(
spans: ReadableSpan[],
onSuccess: () => void,
onError: (error: CollectorExporterError) => void
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
if (this.isShutDown) {
this.logger.debug('Shutdown already started. Cannot send spans');
return;
}
if (this.traceServiceClient) {
const exportTraceServiceRequest = toCollectorExportTraceServiceRequest(
spans,
this
);

this.traceServiceClient.export(
exportTraceServiceRequest,
this.metadata,
(err: collectorTypes.ExportServiceError) => {
if (err) {
this.logger.error(
'exportTraceServiceRequest',
exportTraceServiceRequest
);
onError(err);
} else {
onSuccess();
}
}
);
if (this._protocol === CollectorProtocolNode.HTTP_JSON) {
sendSpansUsingJson(this, spans, onSuccess, onError);
} else {
this.grpcSpansQueue.push({
spans,
onSuccess,
onError,
});
sendSpansUsingGrpc(this, spans, onSuccess, onError);
}
}

getDefaultUrl(url: string | undefined): string {
return url || DEFAULT_COLLECTOR_URL;
getDefaultUrl(config: CollectorExporterConfigNode): string {
if (!config.url) {
return config.protocolNode === CollectorProtocolNode.HTTP_JSON
? DEFAULT_COLLECTOR_URL_JSON
: DEFAULT_COLLECTOR_URL_GRPC;
}
return config.url;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import * as grpc from 'grpc';
import { ReadableSpan } from '@opentelemetry/tracing';
import { CollectorProtocolNode } from '../../enums';
import {
CollectorExporterError,
CollectorExporterConfigBase,
Expand Down Expand Up @@ -49,4 +50,5 @@ export interface CollectorExporterConfigNode
extends CollectorExporterConfigBase {
credentials?: grpc.ChannelCredentials;
metadata?: grpc.Metadata;
protocolNode?: CollectorProtocolNode;
}
Loading

0 comments on commit f0a3dba

Please sign in to comment.