Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sdk-trace-base): eager exporting for batch span processor #3458

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c339cd5
feat(sdk-trace-base): eager exporting for batch span processor
seemk Nov 30, 2022
48fab8d
fix: use interval not timeout
seemk Nov 30, 2022
c12124b
chore: cleanup, add test for periodic exports
seemk Nov 30, 2022
1b6fe87
Merge branch 'main' into bsp-eager-export
seemk Nov 30, 2022
e101c00
chore: update changelog
seemk Nov 30, 2022
d79ba67
fix: use setTimeout for browser compatibility
seemk Dec 6, 2022
faffbd2
refactor: spacing
seemk Dec 6, 2022
307ca7c
Merge branch 'main' into bsp-eager-export
seemk Dec 6, 2022
4e01111
fix: unref timeout timer
seemk Dec 7, 2022
1879569
Merge branch 'main' into bsp-eager-export
seemk Dec 7, 2022
746457e
Merge branch 'main' into bsp-eager-export
seemk Dec 7, 2022
b27af18
Merge branch 'main' into bsp-eager-export
seemk Dec 8, 2022
525ad54
Merge branch 'main' into bsp-eager-export
seemk Dec 13, 2022
487a3fb
Merge branch 'main' into bsp-eager-export
seemk Dec 15, 2022
72f2726
Merge branch 'main' into bsp-eager-export
seemk Dec 21, 2022
b3718d1
Merge branch 'main' into bsp-eager-export
seemk Dec 22, 2022
492458d
Merge branch 'main' into bsp-eager-export
seemk Jan 3, 2023
c16e528
Merge branch 'main' into bsp-eager-export
seemk Jan 13, 2023
946e33b
Merge branch 'main' into bsp-eager-export
seemk Jan 13, 2023
5598d25
Merge branch 'main' into bsp-eager-export
seemk Jan 16, 2023
b044f6d
Merge branch 'main' into bsp-eager-export
seemk Jan 16, 2023
72fa72a
Merge branch 'main' into bsp-eager-export
seemk Jan 18, 2023
2799733
Merge branch 'main' into bsp-eager-export
dyladan Feb 3, 2023
a1bfd82
Merge branch 'main' into bsp-eager-export
seemk Feb 17, 2023
73732f6
Merge branch 'main' into bsp-eager-export
seemk Mar 22, 2023
e6d728b
Merge branch 'main' into bsp-eager-export
seemk Mar 28, 2023
94597e1
Merge branch 'main' into bsp-eager-export
dyladan Apr 24, 2023
d5a1020
Remove duplicate changelog entry
dyladan Apr 24, 2023
3c01099
Merge branch 'main' into bsp-eager-export
seemk May 9, 2023
0bea748
Merge branch 'main' into bsp-eager-export
seemk May 31, 2023
8ff333e
Merge branch 'main' into bsp-eager-export
seemk Jul 26, 2023
68247b3
Merge branch 'main' into bsp-eager-export
seemk Sep 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

### :bug: (Bug Fix)

* fix(sdk-trace-base): eager exporting for batch span processor [#3458](https://github.com/open-telemetry/opentelemetry-js/pull/3458) @seemk

### :books: (Refine Doc)

### :house: (Internal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>

private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | undefined;
private _nextExport: number = 0;
private _shutdownOnce: BindOnceFuture<void>;

constructor(private readonly _exporter: SpanExporter, config?: T) {
Expand Down Expand Up @@ -72,6 +73,8 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
);
this._maxExportBatchSize = this._maxQueueSize;
}

this._resetTimer(this._scheduledDelayMillis);
}

forceFlush(): Promise<void> {
Expand Down Expand Up @@ -100,17 +103,11 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
return this._shutdownOnce.call();
}

private _shutdown() {
return Promise.resolve()
.then(() => {
return this.onShutdown();
})
.then(() => {
return this._flushAll();
})
.then(() => {
return this._exporter.shutdown();
});
private async _shutdown() {
this.onShutdown();
this._clearTimer();
await this._flushAll();
await this._exporter.shutdown();
}

/** Add a span in the buffer. */
Expand All @@ -120,42 +117,38 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
return;
}
this._finishedSpans.push(span);
this._maybeStartTimer();
this._exportCompleteBatch();
}

/**
* Send all spans to the exporter respecting the batch size limit
* This function is used only on forceFlush or shutdown,
* for all other cases _flush should be used
* */
private _flushAll(): Promise<void> {
return new Promise((resolve, reject) => {
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._flushOneBatch());
}
Promise.all(promises)
.then(() => {
resolve();
})
.catch(reject);
});
private async _flushAll(): Promise<void> {
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._exportOneBatch());
}

await Promise.all(promises);
}

private _flushOneBatch(): Promise<void> {
this._clearTimer();
private async _exportOneBatch(): Promise<void> {
if (this._finishedSpans.length === 0) {
return Promise.resolve();
return;
}

return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
// don't wait anymore for export, this way the next batch can start
reject(new Error('Timeout'));
}, this._exportTimeoutMillis);
unrefTimer(timer);
// prevent downstream exporter calls from generating spans
context.with(suppressTracing(context.active()), () => {
// Reset the finished spans buffer here because the next invocations of the _flush method
Expand All @@ -179,20 +172,43 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
});
}

private _maybeStartTimer() {
if (this._timer !== undefined) return;
this._timer = setTimeout(() => {
this._flushOneBatch()
.then(() => {
if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
})
.catch(e => {
globalErrorHandler(e);
});
}, this._scheduledDelayMillis);
private async _tryExportOneBatch(): Promise<void> {
try {
await this._exportOneBatch();
} catch (e) {
globalErrorHandler(e);
}
}

private async _export() {
await this._tryExportOneBatch();

if (this._finishedSpans.length >= this._maxExportBatchSize) {
this._resetTimer(0);
} else {
this._resetTimer(this._scheduledDelayMillis);
}
}

private _exportCompleteBatch() {
if (this._finishedSpans.length < this._maxExportBatchSize) {
return;
}

if (this._nextExport === 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'm not a fan of this pattern of "detecting" whether we should start a timer or not -- I perfer more explicit start/reset descriptions.

It took me a while to understand that this it the "flag" you are effectively using to determine whether there is "more" data (or not) and then whether to "reset" the timer.

I also don't like that you can "reset" an already running timer, as it could be very easy for someone to come along later and cause an endless (until it got full) delay if something is getting batched at a regular interval.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_nextExport is only used to avoid needlessly resetting the timer, i.e. it means that an export is already queued next cycle. Think about appending thousands of spans consecutively in the same event loop cycle.

What would your alternative be to resetting an already running timer? Starting an export in a new promise right away when the buffer reaches the threshold can't be done as it would cause both too many concurrent exports and would nullify the concept of max queue size.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_nextExport is only used to avoid needlessly resetting the timer, i.e. it means that an export is already queued next cycle.

this._timer will (should) be undefined when no timer is already running 😄 (as long as you also set the value to undefined (or null) within the setTimeout() implementation as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is basically what the previous implementation was doing with if (this._timer !== undefined) return; in the _maybeSet function (although I'd also prefer to see a check that nothing is currently batched as well -- to avoid creating the timer in the first place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timer is now always running, basically this new implementation sets the timeout to 0 once the batch size is exceeded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if there is nothing in the batch we should not have any timer running...
ie. The timer should only be running if there is something waiting to be sent, otherwise, if an application is sitting doing nothing (because the user walked away) by having a running timer this can cause the device (client) to never go to sleep and therefore use more resource (power / battery) when not necessary

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the spec (https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor) we could probably simplify this whole processor to just be a setInterval and a list of spans. During the most recent spec iteration it was made clear that there is no need to wait for the previous export to complete before starting another one. The word "returns" is used very explicitly in the spec and refers to thread safety, not to the actual async task of the export.

Copy link
Contributor Author

@seemk seemk Feb 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't not needing to wait for previous export to complete invalidate the maxQueueSize parameter in this case? E.g. when starting an export when the batch size has been reached

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simplify this whole processor to just be a setInterval

Noooo! Intervals are worse than timeout management as intervals get left behind and cause the APP/CPU to always be busy (at regular intervals)

return;
}

this._resetTimer(0);
}

private _resetTimer(timeout: number) {
this._nextExport = timeout;
this._clearTimer();

this._timer = setTimeout(async () => {
await this._export();
}, timeout);
unrefTimer(this._timer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ describe('BatchSpanProcessorBase', () => {
assert.strictEqual(spy.args.length, 0);
});

it('should export the sampled spans with buffer size reached', done => {
it('should export the sampled spans with buffer size reached', async () => {
const clock = sinon.useFakeTimers();
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
Expand All @@ -189,14 +189,11 @@ describe('BatchSpanProcessorBase', () => {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

setTimeout(async () => {
assert.strictEqual(exporter.getFinishedSpans().length, 5);
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
done();
}, defaultBufferConfig.scheduledDelayMillis + 1000);
clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
clock.tick(0);
assert.strictEqual(exporter.getFinishedSpans().length, 5);
clock.restore();
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

it('should force flush when timeout exceeded', done => {
Expand Down