Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(otlp-exporter-http): add retries #2717

Closed
Closed
16 changes: 10 additions & 6 deletions packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ export abstract class OTLPExporterBase<
* Export items.
* @param items
* @param resultCallback
* @param exportTimeoutMillis - optional
* @param onError - optional
*/

export(items: ExportItem[], resultCallback: (result: ExportResult) => void): void {

if (this._shutdownOnce.isCalled) {
resultCallback({
code: ExportResultCode.FAILED,
Expand All @@ -83,12 +87,12 @@ export abstract class OTLPExporterBase<
}

this._export(items)
.then(() => {
resultCallback({ code: ExportResultCode.SUCCESS });
})
.catch((error: ExportServiceError) => {
resultCallback({ code: ExportResultCode.FAILED, error });
});
.then(() => {
resultCallback({ code: ExportResultCode.SUCCESS });
})
.catch((error: ExportServiceError) => {
resultCallback({ code: ExportResultCode.FAILED, error });
});
}

private _export(items: ExportItem[]): Promise<unknown> {
Expand Down
109 changes: 77 additions & 32 deletions packages/exporter-trace-otlp-http/src/platform/node/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import { diag } from '@opentelemetry/api';
import { CompressionAlgorithm } from './types';

let gzip: zlib.Gzip | undefined;
const DEFAULT_MAX_ATTEMPTS = 4;
const DEFAULT_INITIAL_BACKOFF = 1000;
const DEFAULT_BACKOFF_MULTIPLIER = 1.5;

/**
* Sends data using http
Expand All @@ -43,6 +46,21 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
): void {
const parsedUrl = new url.URL(collector.url);

// temp code - this will be merged from timeout pr
const exporterTimeout = collector._timeoutMillis;
let reqIsDestroyed: boolean;

let req: http.ClientRequest;
let retryTimer: ReturnType<typeof setTimeout>;

const exporterTimer = setTimeout(() => {
clearTimeout(retryTimer);
reqIsDestroyed = true;
req.destroy();
// create error here?
// onError(new Error('Request Timeout'));
}, exporterTimeout);

const options: http.RequestOptions | https.RequestOptions = {
hostname: parsedUrl.hostname,
port: parsedUrl.port,
Expand All @@ -58,48 +76,75 @@ export function sendWithHttp<ExportItem, ServiceRequest>(

const request = parsedUrl.protocol === 'http:' ? http.request : https.request;

const req = request(options, (res: http.IncomingMessage) => {
let responseData = '';
res.on('data', chunk => (responseData += chunk));
res.on('end', () => {
if (res.statusCode && res.statusCode < 299) {
diag.debug(`statusCode: ${res.statusCode}`, responseData);
onSuccess();
} else {
const error = new otlpTypes.OTLPExporterError(
res.statusMessage,
res.statusCode,
responseData
const sendWithRetry = (retries = DEFAULT_MAX_ATTEMPTS, backoffMillis = DEFAULT_INITIAL_BACKOFF) => {
req = request(options, (res: http.IncomingMessage) => {
let responseData = '';
res.on('data', chunk => (responseData += chunk));
res.on('end', () => {
if (res.statusCode && res.statusCode < 299) {
diag.debug(`statusCode: ${res.statusCode}`, responseData);
// clear all timers since request was successful and resolve promise
clearTimeout(exporterTimer);
clearTimeout(retryTimer);
onSuccess();
} else if (res.statusCode && isRetryable(res.statusCode) && retries > 0) {
retryTimer = setTimeout(() => {
return sendWithRetry(retries - 1, backoffMillis * DEFAULT_BACKOFF_MULTIPLIER);
}, backoffMillis);
} else {
const error = new otlpTypes.OTLPExporterError(
res.statusMessage,
res.statusCode,
responseData
);
// clear all timers since request failed and there are no more retries left
// then reject promise
clearTimeout(exporterTimer);
clearTimeout(retryTimer);
onError(error);
}
});
});

// temp code - this will be merged from timeout pr
req.on('error', (error: Error | any) => {
if (reqIsDestroyed) {
const err = new otlpTypes.OTLPExporterError(
'Request Timeout', error.code
);
onError(err);
} else {
onError(error);
}
});
});


req.on('error', (error: Error) => {
onError(error);
});
switch (collector.compression) {
case CompressionAlgorithm.GZIP: {
if (!gzip) {
gzip = zlib.createGzip();
}
req.setHeader('Content-Encoding', 'gzip');
const dataStream = readableFromBuffer(data);
dataStream.on('error', onError)
.pipe(gzip).on('error', onError)
.pipe(req);

switch (collector.compression) {
case CompressionAlgorithm.GZIP: {
if (!gzip) {
gzip = zlib.createGzip();
break;
}
req.setHeader('Content-Encoding', 'gzip');
const dataStream = readableFromBuffer(data);
dataStream.on('error', onError)
.pipe(gzip).on('error', onError)
.pipe(req);
default:
req.write(data);
req.end();

break;
break;
}
default:
req.write(data);
req.end();
};
sendWithRetry();
}

break;
}
function isRetryable(statusCode: number) {
const retryCodes = [429, 502, 503, 504];

return retryCodes.includes(statusCode);
}

function readableFromBuffer(buff: string | Buffer): Readable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ export interface SpanExporter {
/**
* Called to export sampled {@link ReadableSpan}s.
* @param spans the list of sampled Spans to be exported.
* @param resultCallback
* @param exportTimeoutMillis optional - currently only used by BatchSpanProcessorBase
* @param onError optional - currently only used by BatchSpanProcessorBase
*/
export(
spans: ReadableSpan[],
resultCallback: (result: ExportResult) => void
resultCallback: (result: ExportResult) => void,
exportTimeoutMillis?: number,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't add optional parameter in the middle of the arguments because that would be a breaking change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Thanks for your feedback.

onError?: (error: object) => void,
): void;

/** Stops the exporter. */
Expand Down