Skip to content

Commit

Permalink
Jaeger no flush interval (#609)
Browse files Browse the repository at this point in the history
* fix: remove flush timer from jaeger exporter

* chore: remove deprecated flushInterval

* feat: force flush on each export

* chore: make export noop on empty span array

* chore: do not flush empty batch span processor
  • Loading branch information
dyladan authored and mayurkale22 committed Dec 13, 2019
1 parent 9a91434 commit b4ab8a5
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 53 deletions.
2 changes: 0 additions & 2 deletions examples/basic-tracer-node/multi_exporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ const tracer = new BasicTracer();
const zipkinExporter = new ZipkinExporter({serviceName: 'basic-service'});
const jaegerExporter = new JaegerExporter({
serviceName: 'basic-service',
// The default flush interval is 5 seconds.
flushInterval: 2000
});
const collectorExporter = new CollectorExporter({serviceName: 'basic-service'});

Expand Down
2 changes: 0 additions & 2 deletions examples/dns/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ function setupTracerAndExporters(service) {
} else {
exporter = new JaegerExporter({
serviceName: service,
// The default flush interval is 5 seconds.
flushInterval: 2000
});
}

Expand Down
2 changes: 0 additions & 2 deletions examples/grpc/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ function setupTracerAndExporters(service) {
} else {
exporter = new JaegerExporter({
serviceName: service,
// The default flush interval is 5 seconds.
flushInterval: 2000
});
}

Expand Down
2 changes: 0 additions & 2 deletions examples/grpc_dynamic_codegen/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ function setupTracerAndExporters(service) {
} else {
exporter = new JaegerExporter({
serviceName: service,
// The default flush interval is 5 seconds.
flushInterval: 2000
});
}

Expand Down
2 changes: 0 additions & 2 deletions examples/http/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ function setupTracerAndExporters(service) {
} else {
exporter = new JaegerExporter({
serviceName: service,
// The default flush interval is 5 seconds.
flushInterval: 2000
});
}

Expand Down
2 changes: 0 additions & 2 deletions examples/https/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ function setupTracerAndExporters(service) {
} else {
exporter = new JaegerExporter({
serviceName: service,
// The default flush interval is 5 seconds.
flushInterval: 2000
});
}

Expand Down
2 changes: 0 additions & 2 deletions examples/mysql/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ function setupTracerAndExporters(service) {
})));
tracer.addSpanProcessor(new SimpleSpanProcessor(new JaegerExporter({
serviceName: service,
// The default flush interval is 5 seconds.
flushInterval: 2000
})));

// Initialize the OpenTelemetry APIs to use the BasicTracer bindings
Expand Down
2 changes: 0 additions & 2 deletions examples/redis/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ function setupTracerAndExporters(service) {
} else {
exporter = new JaegerExporter({
serviceName: service,
// The default flush interval is 5 seconds.
flushInterval: 2000
});
}

Expand Down
76 changes: 46 additions & 30 deletions packages/opentelemetry-exporter-jaeger/src/jaeger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import * as jaegerTypes from './types';
import { NoopLogger } from '@opentelemetry/core';
import * as types from '@opentelemetry/types';
import { spanToThrift } from './transform';
import { unrefTimer } from '@opentelemetry/core';

/**
* Format and sends span information to Jaeger Exporter.
Expand All @@ -29,77 +28,94 @@ export class JaegerExporter implements SpanExporter {
private readonly _logger: types.Logger;
private readonly _process: jaegerTypes.ThriftProcess;
private readonly _sender: typeof jaegerTypes.UDPSender;
private readonly _forceFlush: boolean = true;
private readonly _flushTimeout: number;
private _timer: NodeJS.Timeout;
private readonly _forceFlushOnShutdown: boolean = true;
private readonly _onShutdownFlushTimeout: number;

constructor(config: jaegerTypes.ExporterConfig) {
this._logger = config.logger || new NoopLogger();
const tags: jaegerTypes.Tag[] = config.tags || [];
if (config.forceFlush !== undefined) {
this._forceFlush = config.forceFlush;
}
this._flushTimeout = config.flushTimeout || 2000;
this._forceFlushOnShutdown =
typeof config.forceFlush === 'boolean' ? config.forceFlush : true;
this._onShutdownFlushTimeout =
typeof config.flushTimeout === 'number' ? config.flushTimeout : 2000;

this._sender = new jaegerTypes.UDPSender(config);
this._process = {
serviceName: config.serviceName,
tags: jaegerTypes.ThriftUtils.getThriftTags(tags),
};
this._sender.setProcess(this._process);

const flushInterval = config.flushInterval || 5000;
this._timer = setInterval(this._flush.bind(this), flushInterval);
unrefTimer(this._timer);
}

/** Exports a list of spans to Jaeger. */
export(
spans: ReadableSpan[],
resultCallback: (result: ExportResult) => void
): void {
if (spans.length === 0) {
return resultCallback(ExportResult.SUCCESS);
}
this._logger.debug('Jaeger exporter export');
return this._sendSpans(spans, resultCallback);
this._sendSpans(spans, resultCallback).catch(err => {
this._logger.error(`JaegerExporter failed to export: ${err}`);
});
}

/** Shutdown exporter. */
shutdown(): void {
if (!this._forceFlush) return;
if (!this._forceFlushOnShutdown) return;
// Make an optimistic flush.
this._flush();
// Sleeping x seconds before closing the sender's connection to ensure
// all spans are flushed.
setTimeout(() => {
this._sender.close();
}, this._flushTimeout);
}, this._onShutdownFlushTimeout);
}

/** Transform spans and sends to Jaeger service. */
private _sendSpans(
private async _sendSpans(
spans: ReadableSpan[],
done?: (result: ExportResult) => void
) {
const thriftSpan = spans.map(span => spanToThrift(span));
for (const span of thriftSpan) {
this._sender.append(span, (numSpans: number, err?: string) => {
if (err) {
// @todo: decide whether to break out the loop on first error.
this._logger.error(`failed to append span: ${err}`);
if (done) return done(ExportResult.FAILED_NOT_RETRYABLE);
}
});
try {
await this._append(span);
} catch (err) {
this._logger.error(`failed to append span: ${err}`);
// TODO right now we break out on first error, is that desirable?
if (done) return done(ExportResult.FAILED_NOT_RETRYABLE);
}
}
// @todo: We should wait for all the callbacks of the append calls to
// complete before it calls done with success.
this._logger.debug('successful append for : %s', thriftSpan.length);

// Flush all spans on each export. No-op if span buffer is empty
await this._flush();

if (done) return done(ExportResult.SUCCESS);
}

private _flush(): void {
this._sender.flush((numSpans: number, err?: string) => {
if (err) {
this._logger.error(`failed to flush ${numSpans} spans: ${err}`);
}
private async _append(span: jaegerTypes.ThriftSpan): Promise<number> {
return new Promise((resolve, reject) => {
this._sender.append(span, (count: number, err?: string) => {
if (err) {
return reject(new Error(err));
}
resolve(count);
});
});
}

private async _flush(): Promise<void> {
await new Promise((resolve, reject) => {
this._sender.flush((_count: number, err?: string) => {
if (err) {
return reject(new Error(err));
}
this._logger.debug('successful flush for %s spans', _count);
resolve();
});
});
}
}
3 changes: 2 additions & 1 deletion packages/opentelemetry-exporter-jaeger/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ export interface ExporterConfig {
host?: string; // default: 'localhost'
port?: number; // default: 6832
maxPacketSize?: number; // default: 65000
/** Force a flush on shutdown */
forceFlush?: boolean; // default: true
/** Time to wait for an onShutdown flush to finish before closing the sender */
flushTimeout?: number; // default: 2000
flushInterval?: number; // default(ms): 5000
}

// Below require is needed as jaeger-client types does not expose the thrift,
Expand Down
10 changes: 5 additions & 5 deletions packages/opentelemetry-exporter-jaeger/test/jaeger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ describe('JaegerExporter', () => {
assert.ok(typeof exporter.export === 'function');
assert.ok(typeof exporter.shutdown === 'function');

assert.ok(exporter['_forceFlush']);
assert.strictEqual(exporter['_flushTimeout'], 5000);
assert.ok(exporter['_forceFlushOnShutdown']);
assert.strictEqual(exporter['_onShutdownFlushTimeout'], 5000);
});

it('should construct an exporter without forceFlush and flushTimeout', () => {
Expand All @@ -72,8 +72,8 @@ describe('JaegerExporter', () => {
assert.ok(typeof exporter.export === 'function');
assert.ok(typeof exporter.shutdown === 'function');

assert.ok(exporter['_forceFlush']);
assert.strictEqual(exporter['_flushTimeout'], 2000);
assert.ok(exporter['_forceFlushOnShutdown']);
assert.strictEqual(exporter['_onShutdownFlushTimeout'], 2000);
});

it('should construct an exporter with forceFlush = false', () => {
Expand All @@ -84,7 +84,7 @@ describe('JaegerExporter', () => {
assert.ok(typeof exporter.export === 'function');
assert.ok(typeof exporter.shutdown === 'function');

assert.ok(!exporter['_forceFlush']);
assert.ok(!exporter['_forceFlushOnShutdown']);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export class BatchSpanProcessor implements SpanProcessor {
: DEFAULT_BUFFER_TIMEOUT_MS;

this._timer = setInterval(() => {
if (Date.now() - this._lastSpanFlush >= this._bufferTimeout) {
if (this._shouldFlush()) {
this._flush();
}
}, this._bufferTimeout);
Expand Down Expand Up @@ -73,6 +73,13 @@ export class BatchSpanProcessor implements SpanProcessor {
}
}

private _shouldFlush(): boolean {
return (
this._finishedSpans.length >= 0 &&
Date.now() - this._lastSpanFlush >= this._bufferTimeout
);
}

/** Send the span data list to exporter */
private _flush() {
this._exporter.export(this._finishedSpans, () => {});
Expand Down

0 comments on commit b4ab8a5

Please sign in to comment.