Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: eager batch span processing #3828

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {
BindOnceFuture,
ExportResultCode,
getEnv,
globalErrorHandler,
suppressTracing,
unrefTimer,
} from '@opentelemetry/core';
Expand All @@ -34,8 +33,7 @@ import { SpanExporter } from './SpanExporter';
* the SDK then pushes them to the exporter pipeline.
*/
export abstract class BatchSpanProcessorBase<T extends BufferConfig>
implements SpanProcessor
{
implements SpanProcessor {
private readonly _maxExportBatchSize: number;
private readonly _maxQueueSize: number;
private readonly _scheduledDelayMillis: number;
Expand All @@ -45,6 +43,8 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
private _timer: NodeJS.Timeout | undefined;
private _shutdownOnce: BindOnceFuture<void>;
private _droppedSpansCount: number = 0;
private _waiting = false;
private _processing = false;

constructor(private readonly _exporter: SpanExporter, config?: T) {
const env = getEnv();
Expand Down Expand Up @@ -94,7 +94,15 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
return;
}

this._addToBuffer(span);
if (this._finishedSpans.length >= this._maxQueueSize) {
this._droppedSpansCount++
return;
}

this._finishedSpans.push(span);

this._maybeEagerlyProcessBatch();
this._maybeStartTimer();
}

shutdown(): Promise<void> {
Expand All @@ -114,123 +122,125 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
});
}

/**
 * Kick off an export immediately once a full batch has accumulated,
 * instead of waiting for the scheduled delay timer to fire.
 *
 * No-op while an export is already in flight (`_processing`), because
 * `_processOneBatch` limits itself to one concurrent export.
 */
private _maybeEagerlyProcessBatch() {
  if (
    !this._processing &&
    this._finishedSpans.length >= this._maxExportBatchSize
  ) {
    this._processOneBatch();
  }
}

/**
* Send all spans to the exporter respecting the batch size limit
* This function is used only on forceFlush or shutdown,
* for all other cases _flush should be used
* */
private _flushAll(): Promise<void> {
return new Promise((resolve, reject) => {
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._flushOneBatch());
}
Promise.all(promises)
.then(() => {
resolve();
})
.catch(reject);
});
private async _flushAll(): Promise<void> {
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._processOneBatch());
}

await Promise.all(promises);
}

private _flushOneBatch(): Promise<void> {
private _processOneBatch() {
// because we eagerly export, we need to limit concurrency or we may have infinite concurrent exports
if (this._processing) {
this._waiting = true;
return;
};
this._waiting = false;
this._processing = true;
this._clearTimer();
if (this._finishedSpans.length === 0) {
return Promise.resolve();

if (this._shutdownOnce.isCalled) {
return;
}

// TODO decide if we want to favor spans without pending async resource attributes
// this is only important if a span processor is shared with multiple tracer providers
// this._finishedSpans.sort((a, b) => {
// if (a.resource.asyncAttributesPending == b.resource.asyncAttributesPending) return 0;
// if (a.resource.asyncAttributesPending && !b.resource.asyncAttributesPending) return 1;
// return -1;
// });

// drain 1 batch from queue.
const batch = this._finishedSpans.splice(0, this._maxExportBatchSize);

console.log('drained', batch.length, 'spans from the queue');
console.log('queue length', this._finishedSpans.length);


// start timer if there are still spans in the queue
this._maybeStartTimer();

if (this._droppedSpansCount > 0) {
diag.warn(
`Dropped ${this._droppedSpansCount} spans because maxQueueSize reached`
);
}
return new Promise((resolve, reject) => {
this._droppedSpansCount = 0;

Promise.all(
batch
.map(span => span.resource)
.filter(resource => resource.asyncAttributesPending)
.map(res => res.waitForAsyncAttributes?.())
)
.then(() => this._promisifiedExportBatchWithTimeout(batch))
.catch((err) => {
// TODO improve error message with some contextual info and err object
diag.error("Failed to export batch")
})
.finally(() => {
this._processing = false;
if (this._waiting) {
this._processOneBatch();
}
this._maybeEagerlyProcessBatch();
this._maybeStartTimer();
});
}

/**
 * Export one already-drained batch, rejecting if the exporter has not
 * reported a result within `_exportTimeoutMillis`.
 *
 * The export itself cannot be cancelled, so on timeout we simply stop
 * tracking it and let the next batch proceed.
 */
private _promisifiedExportBatchWithTimeout(batch: ReadableSpan[]): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const timer = setTimeout(() => {
      // Don't wait any longer for this export; we can't cancel it, so
      // we have to just forget about it.
      reject(new Error('Timeout'));
    }, this._exportTimeoutMillis);
    // Unref so the pending timeout doesn't keep the process alive.
    unrefTimer(timer);

    // Suppress tracing so downstream exporter calls don't generate spans.
    context.with(suppressTracing(context.active()), () => {
      this._exporter.export(batch, result => {
        clearTimeout(timer);
        if (result.code === ExportResultCode.SUCCESS) {
          resolve();
        } else {
          reject(
            result.error ?? new Error('BatchSpanProcessor: span export failed')
          );
        }
      });
    });
  });
}

/**
 * Arm the scheduled-delay timer when spans are queued and no timer is
 * already pending. The timer is unref'd so it does not keep the
 * process alive on its own.
 */
private _maybeStartTimer() {
  if (this._finishedSpans.length > 0 && this._timer == null) {
    this._timer = setTimeout(
      this._processOneBatch.bind(this),
      this._scheduledDelayMillis
    );
    unrefTimer(this._timer);
  }
}

/** Cancel the pending scheduled-export timer, if any. */
private _clearTimer() {
  // `!= null` deliberately matches both null and undefined.
  if (this._timer != null) {
    clearTimeout(this._timer);
    this._timer = undefined;
  }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ describe('BatchSpanProcessorBase', () => {

// One export before shutdown, a second flush during shutdown, queue empty after.
assert.strictEqual(spy.args.length, 1);
await processor.shutdown();
assert.strictEqual(spy.args.length, 2);
assert.strictEqual(exporter.getFinishedSpans().length, 0);

Expand Down