Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se
* refactor(sdk-trace-base)!: remove `new Span` constructor in favor of `Tracer.startSpan` API [#5048](https://github.com/open-telemetry/opentelemetry-js/pull/5048) @david-luna
* refactor(sdk-trace-base)!: remove `BasicTracerProvider.addSpanProcessor` API in favor of constructor options. [#5134](https://github.com/open-telemetry/opentelemetry-js/pull/5134) @david-luna
* refactor(sdk-trace-base)!: make `resource` property private in `BasicTracerProvider` and remove `getActiveSpanProcessor` API. [#5192](https://github.com/open-telemetry/opentelemetry-js/pull/5192) @david-luna
* fix(sdk-trace-base): always wait on pending export in SimpleSpanProcessor. [#5303](https://github.com/open-telemetry/opentelemetry-js/pull/5303) @anuraaga
Comment thread
trentm marked this conversation as resolved.
Outdated
* feat(sdk-metrics)!: extract `IMetricReader` interface and use it over abstract class [#5311](https://github.com/open-telemetry/opentelemetry-js/pull/5311)
* (user-facing): `MeterProviderOptions` now provides the more general `IMetricReader` type over `MetricReader`
* If you accept `MetricReader` in your public interface, consider accepting the more general `IMetricReader` instead to avoid unintentional breaking changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
ExportResultCode,
globalErrorHandler,
BindOnceFuture,
ExportResult,
} from '@opentelemetry/core';
import { Span } from '../Span';
import { SpanProcessor } from '../SpanProcessor';
Expand All @@ -38,16 +37,16 @@ import { Resource } from '@opentelemetry/resources';
*/
export class SimpleSpanProcessor implements SpanProcessor {
private _shutdownOnce: BindOnceFuture<void>;
private _unresolvedExports: Set<Promise<void>>;
private _pendingExports: Set<Promise<unknown>>;

constructor(private readonly _exporter: SpanExporter) {
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
this._unresolvedExports = new Set<Promise<void>>();
this._pendingExports = new Set<Promise<void>>();
Comment thread
trentm marked this conversation as resolved.
}

async forceFlush(): Promise<void> {
// await unresolved resources before resolving
await Promise.all(Array.from(this._unresolvedExports));
// await pending exports
Comment thread
trentm marked this conversation as resolved.
Outdated
await Promise.all(Array.from(this._pendingExports));
if (this._exporter.forceFlush) {
await this._exporter.forceFlush();
}
Expand All @@ -64,43 +63,26 @@ export class SimpleSpanProcessor implements SpanProcessor {
return;
}

const doExport = () =>
internal
._export(this._exporter, [span])
.then((result: ExportResult) => {
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
result.error ??
new Error(
`SimpleSpanProcessor: span export failed (status ${result})`
)
);
}
})
.catch(error => {
globalErrorHandler(error);
});
const pendingExport = this.doExport(span).catch(err =>
globalErrorHandler(err)
);
// Enqueue this export to the pending list so it can be flushed by the user.
this._pendingExports.add(pendingExport);
pendingExport.finally(() => this._pendingExports.delete(pendingExport));
}

// Avoid scheduling a promise to make the behavior more predictable and easier to test

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this since it seemed to be misleading - a promise is scheduled by doExport anyways so there's no way to avoid scheduling one

private async doExport(span: ReadableSpan): Promise<void> {
Comment thread
trentm marked this conversation as resolved.
Outdated
if (span.resource.asyncAttributesPending) {
const exportPromise = (span.resource as Resource)
.waitForAsyncAttributes?.()
.then(
() => {
if (exportPromise != null) {
this._unresolvedExports.delete(exportPromise);
}
return doExport();
},
err => globalErrorHandler(err)
);
// Ensure resource is fully resolved before exporting.
await (span.resource as Resource).waitForAsyncAttributes?.();
}

// store the unresolved exports
if (exportPromise != null) {
this._unresolvedExports.add(exportPromise);
}
} else {
void doExport();
const result = await internal._export(this._exporter, [span]);
if (result.code !== ExportResultCode.SUCCESS) {
throw (
result.error ??
new Error(`SimpleSpanProcessor: span export failed (status ${result})`)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,40 @@ describe('SimpleSpanProcessor', () => {
);
});

it('should await doExport() and delete from _unresolvedExports', async () => {
it('should await doExport() and delete from _pendingExports', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const processor = new SimpleSpanProcessor(testExporterWithDelay);
const spanContext: SpanContext = {
traceId: 'a3cda95b652f4a1592b449d5929fda1b',
spanId: '5e0c63257de34c92',
traceFlags: TraceFlags.SAMPLED,
};
const tracer = provider.getTracer('default');
const span = new SpanImpl({
scope: tracer.instrumentationScope,
resource: tracer['_resource'],
context: ROOT_CONTEXT,
spanContext,
name: 'span-name',
kind: SpanKind.CLIENT,
spanLimits: tracer.getSpanLimits(),
spanProcessor: tracer['_spanProcessor'],
});
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.strictEqual(processor['_pendingExports'].size, 1);

await processor.forceFlush();

assert.strictEqual(processor['_pendingExports'].size, 0);

const exportedSpans = testExporterWithDelay.getFinishedSpans();

assert.strictEqual(exportedSpans.length, 1);
});

it('should await doExport() and delete from _pendingExports with async resource', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const processor = new SimpleSpanProcessor(testExporterWithDelay);

Expand Down Expand Up @@ -247,11 +280,11 @@ describe('SimpleSpanProcessor', () => {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.strictEqual(processor['_unresolvedExports'].size, 1);
assert.strictEqual(processor['_pendingExports'].size, 1);

await processor.forceFlush();

assert.strictEqual(processor['_unresolvedExports'].size, 0);
assert.strictEqual(processor['_pendingExports'].size, 0);

const exportedSpans = testExporterWithDelay.getFinishedSpans();

Expand Down