feat(otlp-exporter-http): add retries #2717

Closed
42 changes: 39 additions & 3 deletions packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts
@@ -36,6 +36,10 @@ export abstract class OTLPExporterBase<
protected _concurrencyLimit: number;
protected _sendingPromises: Promise<unknown>[] = [];
protected _shutdownOnce: BindOnceFuture<void>;
private DEFAULT_MAX_ATTEMPTS = 4;
private DEFAULT_INITIAL_BACKOFF = 1000;
private DEFAULT_BACKOFF_MULTIPLIER = 1.5;
private retryCodes = [429, 502, 503, 504];

/**
* @param config
@@ -64,8 +68,24 @@ export abstract class OTLPExporterBase<
* Export items.
* @param items
* @param resultCallback
* @param exportTimeoutMillis - optional
* @param onError - optional
*/
- export(items: ExportItem[], resultCallback: (result: ExportResult) => void): void {

export(items: ExportItem[], resultCallback: (result: ExportResult) => void, exportTimeoutMillis?: number, onError?: (error: object) => void): void {

let retryTimer: ReturnType<typeof setTimeout>;
let exportTimer: ReturnType<typeof setTimeout>;

if (exportTimeoutMillis && onError) {
exportTimer = setTimeout(() => {
clearTimeout(retryTimer);
if (onError !== undefined) {
onError(new Error('Timeout'));
}
}, exportTimeoutMillis);
}

if (this._shutdownOnce.isCalled) {
resultCallback({
code: ExportResultCode.FAILED,
@@ -82,13 +102,25 @@ export abstract class OTLPExporterBase<
return;
}

- this._export(items)
const exportWithRetry = (retries = this.DEFAULT_MAX_ATTEMPTS, backoffMillis = this.DEFAULT_INITIAL_BACKOFF) => {
this._export(items)
.then(() => {
clearTimeout(exportTimer);
resultCallback({ code: ExportResultCode.SUCCESS });
})
.catch((error: ExportServiceError) => {
- resultCallback({ code: ExportResultCode.FAILED, error });
if (this._isRetryable(error.code) && retries > 0) {
retryTimer = setTimeout(() => {
return exportWithRetry(retries - 1, backoffMillis * this.DEFAULT_BACKOFF_MULTIPLIER);
}, backoffMillis);
} else {
clearTimeout(exportTimer);
resultCallback({ code: ExportResultCode.FAILED, error });
}
});
};

exportWithRetry();
}

private _export(items: ExportItem[]): Promise<unknown> {
@@ -102,6 +134,10 @@ export abstract class OTLPExporterBase<
});
}

private _isRetryable(statusCode: number): boolean {
return this.retryCodes.includes(statusCode);
}

/**
* Shutdown the exporter.
*/
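
The retry loop added in this file can be sketched in isolation as follows. This is a minimal promise-based sketch under stated assumptions, not the PR's code: `backoffSchedule`, `retryWithBackoff`, `HttpLikeError`, and the injectable `sleep` parameter are hypothetical names introduced here. The defaults mirror the constants in the hunk above (4 retries, 1000 ms initial backoff, multiplier 1.5, retryable status codes 429, 502, 503, 504).

```typescript
// Hypothetical standalone sketch; none of these names are part of the PR.
const RETRYABLE_CODES = [429, 502, 503, 504];

interface HttpLikeError {
  code?: number;
}

/** Delays (in ms) to wait before each retry: initial, initial*m, initial*m^2, ... */
function backoffSchedule(retries: number, initialMs: number, multiplier: number): number[] {
  const delays: number[] = [];
  let delay = initialMs;
  for (let i = 0; i < retries; i++) {
    delays.push(delay);
    delay *= multiplier;
  }
  return delays;
}

/** Runs `attempt`, retrying on retryable status codes with exponential backoff. */
async function retryWithBackoff<T>(
  attempt: () => Promise<T>,
  retries = 4,
  initialMs = 1000,
  multiplier = 1.5,
  // Injectable so callers (or tests) can replace the real timer.
  sleep: (ms: number) => Promise<void> = ms =>
    new Promise<void>(resolve => setTimeout(resolve, ms)),
): Promise<T> {
  const delays = backoffSchedule(retries, initialMs, multiplier);
  for (let i = 0; ; i++) {
    try {
      return await attempt();
    } catch (err) {
      const code = (err as HttpLikeError | undefined)?.code;
      const retryable = code !== undefined && RETRYABLE_CODES.includes(code);
      // Give up on non-retryable errors or once every delay has been used.
      if (!retryable || i >= delays.length) {
        throw err;
      }
      await sleep(delays[i]);
    }
  }
}
```

With these defaults, `backoffSchedule(4, 1000, 1.5)` yields waits of 1000, 1500, 2250, and 3375 ms. As written, the hunk starts `retries` at `DEFAULT_MAX_ATTEMPTS` and retries while `retries > 0`, so it would make up to five attempts in total (one initial attempt plus four retries).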
@@ -143,10 +143,6 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig> implements
return Promise.resolve();
}
return new Promise((resolve, reject) => {
- const timer = setTimeout(() => {
Member

It looks like, per the spec, the BSP and the OTLP exporter have different timeouts (30s for the BSP and 10s for OTLP). I think we are meant to keep a global timeout at the BSP level and add another one in the OTLP exporter. WDYT?

Contributor Author

Yes, I agree. I just learned about that. Solving issue #2706 first (adding and using the timeout in the OTLP exporter) will help me with this retry logic, and I can avoid touching the BSP-level timeout.
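
The two-level arrangement discussed in this thread can be sketched as below. This is only an illustration under stated assumptions: `withTimeout` and `exportBatch` are hypothetical helpers, not OpenTelemetry APIs, and the 10s and 30s values are the ones quoted in the comment above. How retries should count against either timeout is left open here.

```typescript
// Hypothetical helper; not an OpenTelemetry API.
// Rejects if `work` has not settled within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number, label: string): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`${label} timeout after ${ms}ms`)),
      ms,
    );
    work.then(
      value => {
        clearTimeout(timer);
        resolve(value);
      },
      error => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}

// Values quoted in the review comment; treated as assumptions here.
const EXPORTER_TIMEOUT_MS = 10_000;
const BSP_TIMEOUT_MS = 30_000;

// The exporter bounds its own send; the processor keeps a wider, global bound.
function exportBatch(send: () => Promise<void>): Promise<void> {
  const exporterBounded = withTimeout(send(), EXPORTER_TIMEOUT_MS, 'exporter');
  return withTimeout(exporterBounded, BSP_TIMEOUT_MS, 'processor');
}
```

Keeping the inner timeout inside the exporter means the processor-level timer, and the `export` signature, would not need to change.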

- // don't wait anymore for export, this way the next batch can start
- reject(new Error('Timeout'));
- }, this._exportTimeoutMillis);
// prevent downstream exporter calls from generating spans
context.with(suppressTracing(context.active()), () => {
// Reset the finished spans buffer here because the next invocations of the _flush method
@@ -155,7 +151,6 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig> implements
this._exporter.export(
this._finishedSpans.splice(0, this._maxExportBatchSize),
result => {
- clearTimeout(timer);
if (result.code === ExportResultCode.SUCCESS) {
resolve();
} else {
@@ -164,7 +159,9 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig> implements
new Error('BatchSpanProcessor: span export failed')
);
}
- }
},
this._exportTimeoutMillis,
reject
);
});
});
@@ -28,10 +28,15 @@ export interface SpanExporter {
/**
* Called to export sampled {@link ReadableSpan}s.
* @param spans the list of sampled Spans to be exported.
* @param resultCallback
* @param exportTimeoutMillis optional - currently only used by BatchSpanProcessorBase
* @param onError optional - currently only used by BatchSpanProcessorBase
*/
export(
spans: ReadableSpan[],
- resultCallback: (result: ExportResult) => void
resultCallback: (result: ExportResult) => void,
exportTimeoutMillis?: number,
Member

We can't add an optional parameter in the middle of the arguments because that would be a breaking change.

Contributor Author

That makes sense. Thanks for your feedback.

onError?: (error: object) => void,
): void;

/** Stops the exporter. */