diff --git a/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts b/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts
index 38818980f95..53b567e4de0 100644
--- a/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts
+++ b/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts
@@ -1883,4 +1883,36 @@ describe('Node SDK', () => {
       await sdk.shutdown();
     });
   });
+
+  describe('shutdown', function () {
+    it('should shutdown within reasonable time when collector is not reachable', async function () {
+      // arrange
+      // force all exporters on
+      process.env.OTEL_METRICS_EXPORTER = 'otlp';
+      process.env.OTEL_TRACES_EXPORTER = 'otlp';
+      process.env.OTEL_LOGS_EXPORTER = 'otlp';
+
+      // set an invalid endpoint to avoid hitting local collector endpoints
+      process.env.OTEL_EXPORTER_OTLP_ENDPOINT = 'http://example.invalid/';
+
+      const sdk = new NodeSDK();
+      sdk.start();
+
+      // simulate exporting some data so that shutdown has something to flush
+      metrics.getMeter('my-meter').createCounter('my-counter').add(1);
+      trace.getTracer('my-tracer').startSpan('my-span').end();
+      logs.getLogger('my-logger').emit({ body: 'my-log' });
+
+      // act
+      const shutdownStarted = Date.now();
+      await sdk.shutdown();
+      const shutdownDuration = Date.now() - shutdownStarted;
+
+      // assert
+      assert.ok(
+        shutdownDuration < 1000,
+        `shutdown took too long: ${shutdownDuration}ms`
+      );
+    });
+  });
 });
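The test above relies on the retry cancellation introduced below to keep shutdown fast. Applications that cannot assume a well-behaved exporter may want a hard upper bound of their own; a minimal sketch, where `shutdownWithTimeout` is a hypothetical helper and not part of this PR:

```ts
import { NodeSDK } from '@opentelemetry/sdk-node';

// Hypothetical helper (not part of this PR): cap shutdown with a hard deadline
// so a misbehaving exporter can never block process exit indefinitely.
async function shutdownWithTimeout(
  sdk: NodeSDK,
  timeoutMillis: number
): Promise<void> {
  let timer: NodeJS.Timeout | undefined;
  const deadline = new Promise<void>(resolve => {
    timer = setTimeout(resolve, timeoutMillis);
    timer.unref(); // don't keep the process alive just for the guard
  });
  try {
    await Promise.race([sdk.shutdown(), deadline]);
  } finally {
    clearTimeout(timer);
  }
}
```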
diff --git a/experimental/packages/otlp-exporter-base/src/exporter-transport.ts b/experimental/packages/otlp-exporter-base/src/exporter-transport.ts
index 0123b26db99..ab9998bd20d 100644
--- a/experimental/packages/otlp-exporter-base/src/exporter-transport.ts
+++ b/experimental/packages/otlp-exporter-base/src/exporter-transport.ts
@@ -18,5 +18,10 @@ import { ExportResponse } from './export-response';
 
 export interface IExporterTransport {
   send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse>;
+
+  /**
+   * Finish pending requests as soon as possible, foregoing retries if possible.
+   */
+  forceFlush?(): void;
   shutdown(): void;
 }
diff --git a/experimental/packages/otlp-exporter-base/src/otlp-export-delegate.ts b/experimental/packages/otlp-exporter-base/src/otlp-export-delegate.ts
index 06ac035281b..1f97c78c91f 100644
--- a/experimental/packages/otlp-exporter-base/src/otlp-export-delegate.ts
+++ b/experimental/packages/otlp-exporter-base/src/otlp-export-delegate.ts
@@ -141,6 +141,8 @@ class OTLPExportDelegate
   }
 
   forceFlush(): Promise<void> {
+    // note: it is the responsibility of the caller to ensure no new exports are scheduled after this call.
+    this._transport.forceFlush?.();
    return this._promiseQueue.awaitAll();
   }
 
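`forceFlush()` is optional on the transport interface, so third-party transports keep compiling. A hypothetical transport honoring the contract might look like the following sketch; `SingleShotFetchTransport` does not exist in the repo, and it assumes `IExporterTransport` and `ExportResponse` are part of the package's public API (the tests in this PR import both):

```ts
import type {
  IExporterTransport,
  ExportResponse,
} from '@opentelemetry/otlp-exporter-base';

// Hypothetical transport: pending work should settle quickly once
// forceFlush() is called, and forceFlush() must be safe to call at any time.
class SingleShotFetchTransport implements IExporterTransport {
  private _inFlight = new Set<AbortController>();

  constructor(private _url: string) {}

  async send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
    const controller = new AbortController();
    this._inFlight.add(controller);
    const timer = setTimeout(() => controller.abort(), timeoutMillis);
    try {
      const res = await fetch(this._url, {
        method: 'POST',
        body: data,
        signal: controller.signal,
      });
      return res.ok
        ? { status: 'success' }
        : { status: 'failure', error: new Error(`HTTP ${res.status}`) };
    } catch (e) {
      return { status: 'failure', error: e as Error };
    } finally {
      clearTimeout(timer);
      this._inFlight.delete(controller);
    }
  }

  forceFlush(): void {
    // nothing is retried here; abort whatever is still in flight so callers
    // awaiting the promise queue settle promptly.
    for (const controller of this._inFlight) controller.abort();
  }

  shutdown(): void {
    this.forceFlush();
  }
}
```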
diff --git a/experimental/packages/otlp-exporter-base/src/retrying-transport.ts b/experimental/packages/otlp-exporter-base/src/retrying-transport.ts
index 58fb3b56f44..0a09b2637ff 100644
--- a/experimental/packages/otlp-exporter-base/src/retrying-transport.ts
+++ b/experimental/packages/otlp-exporter-base/src/retrying-transport.ts
@@ -31,11 +31,17 @@ function getJitter() {
   return Math.random() * (2 * JITTER) - JITTER;
 }
 
+interface CancellableOperation {
+  cancelRetry(): void;
+}
+
 class RetryingTransport implements IExporterTransport {
-  private _transport: IExporterTransport;
+  private readonly _transport: IExporterTransport;
+  private readonly _cancellableOperations: Set<CancellableOperation>;
 
   constructor(transport: IExporterTransport) {
     this._transport = transport;
+    this._cancellableOperations = new Set<CancellableOperation>();
   }
 
   private retry(
@@ -44,9 +50,22 @@ class RetryingTransport implements IExporterTransport {
     inMillis: number
   ): Promise<ExportResponse> {
     return new Promise((resolve, reject) => {
-      setTimeout(() => {
+      const timeoutHandle = setTimeout(() => {
+        // Remove from cancellable operations once executing
+        this._cancellableOperations.delete(operation);
         this._transport.send(data, timeoutMillis).then(resolve, reject);
       }, inMillis);
+
+      const operation: CancellableOperation = {
+        cancelRetry: () => {
+          clearTimeout(timeoutHandle);
+          resolve({
+            status: 'retryable',
+            error: new Error('Retry cancelled due to forceFlush()'),
+          });
+        },
+      };
+      this._cancellableOperations.add(operation);
     });
   }
 
@@ -54,48 +73,81 @@ class RetryingTransport implements IExporterTransport {
     let attempts = MAX_ATTEMPTS;
     let nextBackoff = INITIAL_BACKOFF;
 
-    const deadline = Date.now() + timeoutMillis;
-    let result = await this._transport.send(data, timeoutMillis);
-
-    while (result.status === 'retryable' && attempts > 0) {
-      attempts--;
-
-      // use maximum of computed backoff and 0 to avoid negative timeouts
-      const backoff = Math.max(
-        Math.min(nextBackoff * (1 + getJitter()), MAX_BACKOFF),
-        0
-      );
-      nextBackoff = nextBackoff * BACKOFF_MULTIPLIER;
-      const retryInMillis = result.retryInMillis ?? backoff;
-
-      // return when expected retry time is after the export deadline.
-      const remainingTimeoutMillis = deadline - Date.now();
-      if (retryInMillis > remainingTimeoutMillis) {
-        diag.info(
-          `Export retry time ${Math.round(retryInMillis)}ms exceeds remaining timeout ${Math.round(
-            remainingTimeoutMillis
-          )}ms, not retrying further.`
-        );
-        return result;
-      }
-
-      diag.verbose(`Scheduling export retry in ${Math.round(retryInMillis)}ms`);
-      result = await this.retry(data, remainingTimeoutMillis, retryInMillis);
-    }
-
-    if (result.status === 'success') {
-      diag.verbose(
-        `Export succeeded after ${MAX_ATTEMPTS - attempts} retry attempts.`
-      );
-    } else if (result.status === 'retryable') {
-      diag.info(
-        `Export failed after maximum retry attempts (${MAX_ATTEMPTS}).`
-      );
-    } else {
-      diag.info(`Export failed with non-retryable error: ${result.error}`);
-    }
-
-    return result;
+    // Create an operation to track this request and allow cancellation of retries
+    let shouldRetry = true;
+    const operation: CancellableOperation = {
+      cancelRetry: () => {
+        shouldRetry = false;
+      },
+    };
+    this._cancellableOperations.add(operation);
+
+    try {
+      const deadline = Date.now() + timeoutMillis;
+      let result = await this._transport.send(data, timeoutMillis);
+
+      while (result.status === 'retryable' && attempts > 0) {
+        attempts--;
+
+        // Don't retry if forceFlush has been called for this request
+        if (!shouldRetry) {
+          diag.info('Foregoing retry as operation was forceFlushed');
+          return result;
+        }
+
+        // use maximum of computed backoff and 0 to avoid negative timeouts
+        const backoff = Math.max(
+          Math.min(nextBackoff * (1 + getJitter()), MAX_BACKOFF),
+          0
+        );
+        nextBackoff = nextBackoff * BACKOFF_MULTIPLIER;
+        const retryInMillis = result.retryInMillis ?? backoff;
+
+        // return when the expected retry time is after the export deadline.
+        const remainingTimeoutMillis = deadline - Date.now();
+        if (retryInMillis > remainingTimeoutMillis) {
+          diag.info(
+            `Export retry time ${Math.round(retryInMillis)}ms exceeds remaining timeout ${Math.round(
+              remainingTimeoutMillis
+            )}ms, not retrying further.`
+          );
+          return result;
+        }
+
+        diag.verbose(
+          `Scheduling export retry in ${Math.round(retryInMillis)}ms`
+        );
+        result = await this.retry(data, remainingTimeoutMillis, retryInMillis);
+      }
+
+      if (result.status === 'success') {
+        diag.verbose(
+          `Export succeeded after ${MAX_ATTEMPTS - attempts} retry attempts.`
+        );
+      } else if (result.status === 'retryable') {
+        diag.info(
+          `Export failed after maximum retry attempts (${MAX_ATTEMPTS}).`
+        );
+      } else {
+        diag.info(`Export failed with non-retryable error: ${result.error}`);
+      }
+
+      return result;
+    } finally {
+      // Always remove the operation from the set when done to avoid memory leaks
+      this._cancellableOperations.delete(operation);
+    }
+  }
+
+  forceFlush() {
+    this._transport.forceFlush?.();
+
+    diag.debug('cancelling pending retries');
+    // Cancel all pending retries and mark active requests to not retry
+    for (const operation of this._cancellableOperations) {
+      operation.cancelRetry();
+    }
+    this._cancellableOperations.clear();
   }
 
   shutdown() {
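For reference, the backoff schedule the loop above produces, as a standalone sketch. The constants are not shown in this hunk; the values below are the exporter defaults as I understand them (1s initial, 1.5x multiplier, 5s cap, ±20% jitter, 5 attempts) and should be treated as assumptions:

```ts
// Standalone sketch of the retry backoff schedule (constants assumed).
const INITIAL_BACKOFF = 1000;
const MAX_BACKOFF = 5000;
const BACKOFF_MULTIPLIER = 1.5;
const JITTER = 0.2;

function getJitter(): number {
  return Math.random() * (2 * JITTER) - JITTER;
}

let nextBackoff = INITIAL_BACKOFF;
for (let attempt = 1; attempt <= 5; attempt++) {
  const backoff = Math.max(
    Math.min(nextBackoff * (1 + getJitter()), MAX_BACKOFF),
    0
  );
  nextBackoff = nextBackoff * BACKOFF_MULTIPLIER;
  console.log(`attempt ${attempt}: wait ~${Math.round(backoff)}ms`);
}
// typical output: ~1000ms, ~1500ms, ~2250ms, ~3375ms, ~5000ms (capped)
```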
diff --git a/experimental/packages/otlp-exporter-base/test/common/otlp-export-delegate.test.ts b/experimental/packages/otlp-exporter-base/test/common/otlp-export-delegate.test.ts
index de27eaaf05e..b9a6c3a407c 100644
--- a/experimental/packages/otlp-exporter-base/test/common/otlp-export-delegate.test.ts
+++ b/experimental/packages/otlp-exporter-base/test/common/otlp-export-delegate.test.ts
@@ -104,7 +104,6 @@ describe('OTLPExportDelegate', function () {
     };
     const mockSerializer = serializerStubs;
 
-    // promise queue has not reached capacity yet
     const promiseQueueStubs = {
       pushPromise: sinon.stub(),
       hasReachedLimit: sinon.stub(),
diff --git a/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts b/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts
index fe3c2a662e4..6449ece07bc 100644
--- a/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts
+++ b/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts
@@ -19,10 +19,15 @@ import * as assert from 'assert';
 import { IExporterTransport } from '../../src';
 import { createRetryingTransport } from '../../src/retrying-transport';
 import { ExportResponse } from '../../src';
+import { diag } from '@opentelemetry/api';
 
 const timeoutMillis = 1000000;
 
 describe('RetryingTransport', function () {
+  afterEach(function () {
+    sinon.restore();
+  });
+
   describe('send', function () {
     it('does not retry when underlying transport succeeds', async function () {
       // arrange
@@ -243,4 +248,233 @@ describe('RetryingTransport', function () {
       assert.strictEqual(result, retryResponse);
     });
   });
+
+  describe('forceFlush', function () {
+    it('cancels pending retries and returns retryable', async function () {
+      // arrange
+      const timer = sinon.useFakeTimers();
+      const retryResponse: ExportResponse = {
+        status: 'retryable',
+        retryInMillis: 100,
+      };
+      const mockData = Uint8Array.from([1, 2, 3]);
+
+      const transportStubs = {
+        send: sinon.stub().resolves(retryResponse),
+        shutdown: sinon.stub(),
+      };
+      const mockTransport = transportStubs;
+      const transport = createRetryingTransport({ transport: mockTransport });
+
+      // Start a send that will retry
+      const sendPromise = transport.send(mockData, timeoutMillis);
+
+      // Break the event loop to allow the initial send to complete
+      await timer.tickAsync(1);
+
+      // act - forceFlush while a retry is pending (this should cancel the pending timeout)
+      transport.forceFlush?.();
+
+      // assert - the send promise should resolve with retryable status immediately
+      await timer.runAllAsync();
+      const result = await sendPromise;
+      assert.strictEqual(result.status, 'retryable');
+      assert.strictEqual(
+        result.error?.message,
+        'Retry cancelled due to forceFlush()'
+      );
+      sinon.assert.calledOnce(transportStubs.send); // Only the initial attempt; the retry was cancelled
+    });
+
+    it('allows new retries after forceFlush', async function () {
+      // arrange
+      const timer = sinon.useFakeTimers();
+      const retryResponse: ExportResponse = {
+        status: 'retryable',
+      };
+      const successResponse: ExportResponse = {
+        status: 'success',
+      };
+      const mockData = Uint8Array.from([1, 2, 3]);
+
+      const transportStubs = {
+        send: sinon
+          .stub()
+          .onFirstCall()
+          .resolves(retryResponse)
+          .onSecondCall()
+          .resolves(successResponse),
+        shutdown: sinon.stub(),
+      };
+      const mockTransport = transportStubs;
+      const transport = createRetryingTransport({ transport: mockTransport });
+
+      // act - forceFlush first
+      transport.forceFlush?.();
+
+      // Start a send after forceFlush()
+      const sendPromise = transport.send(mockData, timeoutMillis);
+
+      // Wait for the first attempt and the retry to complete
+      await timer.runAllAsync();
+
+      // assert
+      const result = await sendPromise;
+      assert.strictEqual(result.status, 'success');
+      sinon.assert.calledTwice(transportStubs.send); // Initial attempt + one retry
+    });
+
+    it('cancels multiple pending retries', async function () {
+      // arrange
+      const timer = sinon.useFakeTimers();
+      const retryResponse: ExportResponse = {
+        status: 'retryable',
+        retryInMillis: 100, // Short retry delay
+      };
+      const mockData1 = Uint8Array.from([1, 2, 3]);
+      const mockData2 = Uint8Array.from([4, 5, 6]);
+
+      const transportStubs = {
+        send: sinon.stub().resolves(retryResponse),
+        shutdown: sinon.stub(),
+      };
+      const mockTransport = transportStubs;
+      const transport = createRetryingTransport({ transport: mockTransport });
+
+      // Start multiple sends that will retry
+      const sendPromise1 = transport.send(mockData1, timeoutMillis);
+      const sendPromise2 = transport.send(mockData2, timeoutMillis);
+
+      // Break the event loop to allow the initial sends to complete
+      await timer.tickAsync(1);
+
+      // act - forceFlush while retries are pending (cancels the pending retries)
+      transport.forceFlush?.();
+
+      // assert - the send promises should resolve with retryable status immediately
+      const result1 = await sendPromise1;
+      const result2 = await sendPromise2;
+
+      assert.strictEqual(result1.status, 'retryable');
+      assert.strictEqual(
+        result1.error?.message,
+        'Retry cancelled due to forceFlush()'
+      );
+      assert.strictEqual(result2.status, 'retryable');
+      assert.strictEqual(
+        result2.error?.message,
+        'Retry cancelled due to forceFlush()'
+      );
+
+      // Only 2 initial attempts (one for each send); all retries were cancelled
+      sinon.assert.calledTwice(transportStubs.send);
+    });
+
+    it('cancels retries that have not been scheduled yet', async function () {
+      // arrange
+      const timer = sinon.useFakeTimers();
+      const infoSpy = sinon.spy(diag, 'info');
+      const originalError = new Error();
+      const retryResponse: ExportResponse = {
+        status: 'retryable',
+        retryInMillis: 100, // Short retry delay
+        error: originalError, // include an error to verify it's retained in the response
+      };
+      const mockData = Uint8Array.from([1, 2, 3]);
+
+      const transportStubs = {
+        send: sinon.stub().resolves(retryResponse),
+        shutdown: sinon.stub(),
+      };
+      const mockTransport = transportStubs;
+      const transport = createRetryingTransport({ transport: mockTransport });
+
+      // send, but do not break the event loop yet, so the retry is not yet scheduled
+      const sendPromise = transport.send(mockData, timeoutMillis);
+
+      // act - forceFlush; no retries are pending but the send is still in progress
+      transport.forceFlush?.();
+
+      // assert
+      // let everything play out
+      await timer.runAllAsync();
+      const result = await sendPromise;
+      assert.strictEqual(result.status, 'retryable');
+      assert.strictEqual(result.error, originalError);
+      sinon.assert.calledOnceWithExactly(
+        infoSpy,
+        'Foregoing retry as operation was forceFlushed'
+      );
+
+      // Only 1 attempt (the initial send, no retry)
+      sinon.assert.calledOnce(transportStubs.send);
+    });
+
+    it('calls forceFlush on underlying transport', function () {
+      // arrange
+      const transportStubs = {
+        send: sinon.stub(),
+        shutdown: sinon.stub(),
+        forceFlush: sinon.stub().resolves(),
+      };
+      const mockTransport = transportStubs;
+      const transport = createRetryingTransport({ transport: mockTransport });
+
+      // act
+      transport.forceFlush?.();
+
+      // assert
+      sinon.assert.calledOnce(transportStubs.forceFlush);
+    });
+  });
+
+  describe('shutdown', function () {
+    it('calls shutdown on underlying transport', function () {
+      // arrange
+      const transportStubs = {
+        send: sinon.stub(),
+        shutdown: sinon.stub(),
+      };
+      const mockTransport = transportStubs;
+      const transport = createRetryingTransport({ transport: mockTransport });
+
+      // act
+      transport.shutdown();
+
+      // assert
+      sinon.assert.calledOnce(transportStubs.shutdown);
+    });
+
+    it('allows initial send attempt to complete even during shutdown', async function () {
+      // arrange
+      const successResponse: ExportResponse = {
+        status: 'success',
+      };
+      const mockData = Uint8Array.from([1, 2, 3]);
+
+      let resolveTransportSend: (value: ExportResponse) => void;
+      const transportSendPromise = new Promise<ExportResponse>(resolve => {
+        resolveTransportSend = resolve;
+      });
+
+      const transportStubs = {
+        send: sinon.stub().returns(transportSendPromise),
+        shutdown: sinon.stub(),
+      };
+      const mockTransport = transportStubs;
+      const transport = createRetryingTransport({ transport: mockTransport });
+
+      // Start a send
+      const sendPromise = transport.send(mockData, timeoutMillis);
+
+      // Shutdown while the initial send is in progress
+      transport.shutdown();
+
+      // Complete the initial send
+      resolveTransportSend!(successResponse);
+
+      // assert - the initial send should complete successfully
+      const result = await sendPromise;
+      assert.strictEqual(result.status, 'success');
+    });
+  });
 });
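These tests lean on sinon's async fake timers. `tickAsync()` advances the fake clock and also drains pending promise jobs, which is why `await`s inside the transport get a chance to run; a minimal sketch of the idiom:

```ts
import * as sinon from 'sinon';

// tickAsync() advances the fake clock AND lets promise callbacks settle.
async function demo() {
  const timer = sinon.useFakeTimers();
  const pending = new Promise<string>(resolve =>
    setTimeout(() => resolve('done'), 1000)
  );
  await timer.tickAsync(1000); // fires the timeout and drains microtasks
  console.log(await pending); // 'done'
  sinon.restore(); // also restores the clock; it lives in sinon's default sandbox
}
void demo();
```

This is also why the `afterEach(sinon.restore)` added above is enough cleanup for every test in the file.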
diff --git a/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts b/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts
index 413ea13df04..cff07cee19a 100644
--- a/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts
+++ b/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts
@@ -14,15 +14,13 @@
  * limitations under the License.
  */
 
-import { diag } from '@opentelemetry/api';
+import { context, diag } from '@opentelemetry/api';
 import {
-  ExportResult,
   ExportResultCode,
   getNumberFromEnv,
   globalErrorHandler,
   BindOnceFuture,
-  internal,
-  callWithTimeout,
+  suppressTracing,
 } from '@opentelemetry/core';
 
 import type { BufferConfig } from '../types';
@@ -30,6 +28,113 @@ import type { SdkLogRecord } from './SdkLogRecord';
 import type { LogRecordExporter } from './LogRecordExporter';
 import type { LogRecordProcessor } from '../LogRecordProcessor';
 
+/**
+ * Waits for all pending async resources of the log records to be resolved.
+ */
+async function waitForResources(logRecords: SdkLogRecord[]): Promise<void> {
+  const pendingResources: Array<Promise<void>> = [];
+  for (let i = 0, len = logRecords.length; i < len; i++) {
+    const logRecord = logRecords[i];
+    if (
+      logRecord.resource.asyncAttributesPending &&
+      logRecord.resource.waitForAsyncAttributes
+    ) {
+      pendingResources.push(logRecord.resource.waitForAsyncAttributes());
+    }
+  }
+
+  if (pendingResources.length > 0) {
+    await Promise.all(pendingResources);
+  }
+}
+
+/**
+ * Represents an export operation that handles the entire export workflow.
+ */
+class ExportOperation {
+  private _exportCompleted: Promise<void>;
+  private _exportScheduledPromise: Promise<void>;
+  private _exportScheduledResolve!: () => void;
+
+  constructor(
+    exporter: LogRecordExporter,
+    logRecords: SdkLogRecord[],
+    exportTimeoutMillis: number
+  ) {
+    this._exportScheduledPromise = new Promise<void>(resolve => {
+      this._exportScheduledResolve = resolve;
+    });
+    this._exportCompleted = this._executeExport(
+      exporter,
+      logRecords,
+      exportTimeoutMillis
+    );
+  }
+
+  /** Get the promise that resolves when the export completes */
+  get exportCompleted(): Promise<void> {
+    return this._exportCompleted;
+  }
+
+  /** Get the promise that resolves when exporter.export() has been called */
+  get exportScheduled(): Promise<void> {
+    return this._exportScheduledPromise;
+  }
+
+  private async _executeExport(
+    exporter: LogRecordExporter,
+    logRecords: SdkLogRecord[],
+    exportTimeoutMillis: number
+  ): Promise<void> {
+    try {
+      // Wait for all pending resources before exporting
+      await waitForResources(logRecords);
+
+      // Export with timeout, wrapped in a suppressTracing context
+      await context.with(suppressTracing(context.active()), async () => {
+        await this._exportWithTimeout(
+          exporter,
+          logRecords,
+          exportTimeoutMillis
+        );
+      });
+    } catch (e) {
+      // ensure we never reject here, as callers may await this promise after it has already resolved.
+      globalErrorHandler(e);
+      // resolve, as the error may have occurred before the export was scheduled
+      this._exportScheduledResolve();
+    }
+  }
+
+  private async _exportWithTimeout(
+    exporter: LogRecordExporter,
+    logRecords: SdkLogRecord[],
+    exportTimeoutMillis: number
+  ): Promise<void> {
+    return new Promise<void>((resolve, reject) => {
+      const timer = setTimeout(() => {
+        reject(new Error('Timeout'));
+      }, exportTimeoutMillis);
+
+      exporter.export(logRecords, result => {
+        clearTimeout(timer);
+        if (result.code === ExportResultCode.SUCCESS) {
+          resolve();
+        } else {
+          reject(
+            result.error ??
+              new Error('BatchLogRecordProcessor: log record export failed')
+          );
+        }
+      });
+
+      // Resolve exportScheduled immediately after calling exporter.export()
+      this._exportScheduledResolve();
+    });
+  }
+}
+
 export abstract class BatchLogRecordProcessorBase<T extends BufferConfig>
   implements LogRecordProcessor
 {
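The two promises exposed by `ExportOperation` split the export lifecycle in half: `exportScheduled` settles once `exporter.export()` has been invoked (the earliest point at which a `forceFlush()` on the exporter can help), and `exportCompleted` settles when the exporter calls back, errors, or times out. Illustrative only, using the types defined in this file, this is the shape the processor code below consumes them in:

```ts
// Sketch of the consumption pattern (see _flushAll below for the real code).
async function flushOne(
  exporter: LogRecordExporter,
  op: ExportOperation
): Promise<void> {
  await op.exportScheduled; // export() has been called
  await exporter.forceFlush?.(); // ask the exporter to skip delays and retries
  await op.exportCompleted; // never rejects; failures go to globalErrorHandler
}
```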
@@ -39,10 +144,11 @@ export abstract class BatchLogRecordProcessorBase
   private readonly _exportTimeoutMillis: number;
   private readonly _exporter: LogRecordExporter;
 
-  private _isExporting = false;
+  private _currentExport: ExportOperation | null = null;
   private _finishedLogRecords: SdkLogRecord[] = [];
   private _timer: NodeJS.Timeout | number | undefined;
   private _shutdownOnce: BindOnceFuture<void>;
+  private _droppedLogRecordsCount: number = 0;
 
   constructor(exporter: LogRecordExporter, config?: T) {
     this._exporter = exporter;
@@ -77,7 +183,25 @@ export abstract class BatchLogRecordProcessorBase
     if (this._shutdownOnce.isCalled) {
       return;
     }
-    this._addToBuffer(logRecord);
+
+    if (this._finishedLogRecords.length >= this._maxQueueSize) {
+      if (this._droppedLogRecordsCount === 0) {
+        diag.debug('maxQueueSize reached, dropping log records');
+      }
+      this._droppedLogRecordsCount++;
+      return;
+    }
+
+    if (this._droppedLogRecordsCount > 0) {
+      // some log records were dropped, log once with count of log records dropped
+      diag.warn(
+        `Dropped ${this._droppedLogRecordsCount} log records because maxQueueSize reached`
+      );
+      this._droppedLogRecordsCount = 0;
+    }
+
+    this._finishedLogRecords.push(logRecord);
+    this._maybeStartTimer();
   }
 
   public forceFlush(): Promise<void> {
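The drop accounting above logs once at debug level when dropping starts, then emits a single warning with the total once the queue has room again, so a saturated pipeline cannot flood the logs. A runnable mock of the same pattern (assumed behavior; `diag` replaced with `console` for the sketch):

```ts
// Minimal mock of the "log once, then summarize" pattern used in onEmit.
let dropped = 0;
const queue: string[] = [];
const MAX_QUEUE_SIZE = 2;

function emit(record: string): void {
  if (queue.length >= MAX_QUEUE_SIZE) {
    if (dropped === 0) console.debug('maxQueueSize reached, dropping log records');
    dropped++;
    return;
  }
  if (dropped > 0) {
    console.warn(`Dropped ${dropped} log records because maxQueueSize reached`);
    dropped = 0;
  }
  queue.push(record);
}

['a', 'b', 'c', 'd'].forEach(emit); // 'c' triggers the debug line; 'd' is only counted
queue.length = 0; // pretend an export drained the queue
emit('e'); // warns: Dropped 2 log records because maxQueueSize reached
```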
@@ -97,74 +221,128 @@ export abstract class BatchLogRecordProcessorBase
     await this._exporter.shutdown();
   }
 
-  /** Add a LogRecord in the buffer. */
-  private _addToBuffer(logRecord: SdkLogRecord) {
-    if (this._finishedLogRecords.length >= this._maxQueueSize) {
-      return;
-    }
-    this._finishedLogRecords.push(logRecord);
-    this._maybeStartTimer();
-  }
-
   /**
    * Send all LogRecords to the exporter respecting the batch size limit
    * This function is used only on forceFlush or shutdown,
    * for all other cases _flush should be used
    * */
-  private _flushAll(): Promise<void> {
-    return new Promise((resolve, reject) => {
-      const promises = [];
-      const batchCount = Math.ceil(
-        this._finishedLogRecords.length / this._maxExportBatchSize
-      );
-      for (let i = 0; i < batchCount; i++) {
-        promises.push(this._flushOneBatch());
-      }
-      Promise.all(promises)
-        .then(() => {
-          resolve();
-        })
-        .catch(reject);
-    });
-  }
+  private async _flushAll(): Promise<void> {
+    // Clear the timer to prevent concurrent exports
+    this._clearTimer();
+
+    // Wait for any in-progress export to complete
+    if (this._currentExport !== null) {
+      // speed up execution of the current export
+      await this._exporter.forceFlush?.();
+      await this._currentExport.exportCompleted;
+      this._currentExport = null;
+    }
+
+    // Now flush all batches sequentially to avoid race conditions
+    while (this._finishedLogRecords.length > 0) {
+      const logRecords = this._extractBatch();
+      if (logRecords === null) {
+        break;
+      }
+
+      const exportOp = new ExportOperation(
+        this._exporter,
+        logRecords,
+        this._exportTimeoutMillis
+      );
+      this._currentExport = exportOp;
+
+      // await export scheduled, then force flush the exporter to speed up the export
+      try {
+        // TODO: figure out which ones of these throw, what the impact is on not doing the others.
+        await exportOp.exportScheduled;
+        await this._exporter.forceFlush?.();
+        await exportOp.exportCompleted;
+      } catch (e) {
+        globalErrorHandler(e);
+      } finally {
+        this._currentExport = null;
+      }
+    }
+  }
 
-  private _flushOneBatch(): Promise<void> {
-    this._clearTimer();
-    if (this._finishedLogRecords.length === 0) {
-      return Promise.resolve();
-    }
-    return callWithTimeout(
-      this._export(
-        this._finishedLogRecords.splice(0, this._maxExportBatchSize)
-      ),
-      this._exportTimeoutMillis
-    );
-  }
+  /**
+   * Extracts one batch from the buffer.
+   * Returns null if the buffer is empty.
+   */
+  private _extractBatch(): SdkLogRecord[] | null {
+    if (this._finishedLogRecords.length === 0) {
+      return null;
+    }
+
+    if (this._finishedLogRecords.length <= this._maxExportBatchSize) {
+      const batch = this._finishedLogRecords;
+      this._finishedLogRecords = [];
+      return batch;
+    } else {
+      return this._finishedLogRecords.splice(0, this._maxExportBatchSize);
+    }
+  }
+
+  private _exportOneBatch(): void {
+    this._clearTimer();
+
+    const logRecords = this._extractBatch();
+    if (logRecords === null) {
+      return;
+    }
+
+    const exportOp = new ExportOperation(
+      this._exporter,
+      logRecords,
+      this._exportTimeoutMillis
+    );
+    this._currentExport = exportOp;
+
+    // Handle completion asynchronously
+    exportOp.exportCompleted
+      .then(() => {
+        this._currentExport = null;
+        this._maybeStartTimer();
+      })
+      .catch(error => {
+        this._currentExport = null;
+        globalErrorHandler(error);
+        this._maybeStartTimer();
+      });
+  }
 
   private _maybeStartTimer() {
-    if (this._isExporting) return;
-    const flush = () => {
-      this._isExporting = true;
-      this._flushOneBatch()
-        .then(() => {
-          this._isExporting = false;
-          if (this._finishedLogRecords.length > 0) {
-            this._clearTimer();
-            this._maybeStartTimer();
-          }
-        })
-        .catch(e => {
-          this._isExporting = false;
-          globalErrorHandler(e);
-        });
-    };
-    // we only wait if the queue doesn't have enough elements yet
+    if (this._shutdownOnce.isCalled) {
+      return;
+    }
+
+    if (this._finishedLogRecords.length === 0) {
+      return;
+    }
+
+    if (this._currentExport !== null) {
+      return;
+    }
+
+    // If the batch is full, export immediately
     if (this._finishedLogRecords.length >= this._maxExportBatchSize) {
-      return flush();
+      this._exportOneBatch();
+      return;
    }
-    if (this._timer !== undefined) return;
-    this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
-    // depending on runtime, this may be a 'number' or NodeJS.Timeout
+
+    // If a timer is already set, don't set another one
+    if (this._timer !== undefined) {
+      return;
+    }
+
+    // Set a timer for the scheduled export
+    this._timer = setTimeout(() => {
+      this._timer = undefined;
+      this._exportOneBatch();
+    }, this._scheduledDelayMillis);
+
+    // Unref the timer so it doesn't keep the process alive
     if (typeof this._timer !== 'number') {
       this._timer.unref();
     }
@@ -177,35 +355,5 @@ export abstract class BatchLogRecordProcessorBase
     }
   }
 
-  private _export(logRecords: SdkLogRecord[]): Promise<void> {
-    const doExport = () =>
-      internal
-        ._export(this._exporter, logRecords)
-        .then((result: ExportResult) => {
-          if (result.code !== ExportResultCode.SUCCESS) {
-            globalErrorHandler(
-              result.error ??
-                new Error(
-                  `BatchLogRecordProcessor: log record export failed (status ${result})`
-                )
-            );
-          }
-        })
-        .catch(globalErrorHandler);
-
-    const pendingResources = logRecords
-      .map(logRecord => logRecord.resource)
-      .filter(resource => resource.asyncAttributesPending);
-
-    // Avoid scheduling a promise to make the behavior more predictable and easier to test
-    if (pendingResources.length === 0) {
-      return doExport();
-    } else {
-      return Promise.all(
-        pendingResources.map(resource => resource.waitForAsyncAttributes?.())
-      ).then(doExport, globalErrorHandler);
-    }
-  }
-
   protected abstract onShutdown(): void;
 }
diff --git a/experimental/packages/sdk-logs/src/export/LogRecordExporter.ts b/experimental/packages/sdk-logs/src/export/LogRecordExporter.ts
index 4fecb2a8c90..a5701bd7de1 100644
--- a/experimental/packages/sdk-logs/src/export/LogRecordExporter.ts
+++ b/experimental/packages/sdk-logs/src/export/LogRecordExporter.ts
@@ -30,4 +30,6 @@ export interface LogRecordExporter {
 
   /** Stops the exporter. */
   shutdown(): Promise<void>;
+
+  /** Finish pending exports as soon as possible, foregoing retries if possible. */
+  forceFlush?(): Promise<void>;
 }
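Since `forceFlush` is optional on the interface, call sites in the processor use optional chaining; awaiting the result is safe either way, because `await undefined` resolves immediately:

```ts
// The call-site pattern used throughout this PR.
async function speedUp(exporter: { forceFlush?: () => Promise<void> }) {
  await exporter.forceFlush?.(); // no-op when the exporter doesn't implement it
}

void speedUp({}); // resolves without calling anything
void speedUp({ forceFlush: async () => console.log('flushing') }); // logs 'flushing'
```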
diff --git a/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts b/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts
index 36a396d4c75..ca175f27954 100644
--- a/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts
+++ b/experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts
@@ -41,7 +41,6 @@ import {
 import { LogRecordImpl } from '../../../src/LogRecordImpl';
 
 class BatchLogRecordProcessor extends BatchLogRecordProcessorBase<BufferConfig> {
-  onInit() {}
   onShutdown() {}
 }
 
@@ -292,12 +291,15 @@ describe('BatchLogRecordProcessorBase', () => {
       await processor.shutdown();
     });
 
-    it('should force flush when timeout exceeded for partial batches', done => {
+    it('should force flush when timeout exceeded for partial batches', async function () {
+      // arrange
       const clock = sinon.useFakeTimers();
       const processor = new BatchLogRecordProcessor(
         exporter,
         defaultBufferConfig
       );
+
+      // act
       // Add only a partial batch (less than maxExportBatchSize)
       const partialBatchSize = Math.floor(
         defaultBufferConfig.maxExportBatchSize / 2
       );
       for (let i = 0; i < partialBatchSize; i++) {
         const logRecord = createLogRecord();
         processor.onEmit(logRecord);
         assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
       }
-      setTimeout(() => {
-        // Should export the partial batch after timeout
-        assert.strictEqual(
-          exporter.getFinishedLogRecords().length,
-          partialBatchSize
-        );
-        done();
-      }, defaultBufferConfig.scheduledDelayMillis + 1000);
-      clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
+      await clock.tickAsync(defaultBufferConfig.scheduledDelayMillis + 1000);
+
+      // assert
+      // Should export the partial batch after timeout
+      assert.strictEqual(
+        exporter.getFinishedLogRecords().length,
+        partialBatchSize
+      );
 
       clock.restore();
     });
@@ -365,7 +366,8 @@ describe('BatchLogRecordProcessorBase', () => {
     });
   });
 
-  it('should call globalErrorHandler when exporting fails', done => {
+  it('should call globalErrorHandler when exporting fails', async function () {
+    // arrange
     const clock = sinon.useFakeTimers();
     const expectedError = new Error('Exporter failed');
     sinon.stub(exporter, 'export').callsFake((_, callback) => {
@@ -379,23 +381,21 @@ describe('BatchLogRecordProcessorBase', () => {
       exporter,
       defaultBufferConfig
     );
+
+    // act
     for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
      const logRecord = createLogRecord();
       processor.onEmit(logRecord);
     }
-    clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
-    clock.restore();
-    setTimeout(() => {
-      assert.strictEqual(errorHandlerSpy.callCount, 1);
-      const [[error]] = errorHandlerSpy.args;
-      assert.deepStrictEqual(error, expectedError);
-      // reset global error handler
-      setGlobalErrorHandler(loggingErrorHandler());
-      done();
-    });
+    await clock.tickAsync(defaultBufferConfig.scheduledDelayMillis + 1000);
+    clock.restore();
+
+    // assert
+    sinon.assert.calledOnceWithExactly(errorHandlerSpy, expectedError);
+    // reset global error handler
+    setGlobalErrorHandler(loggingErrorHandler());
   });
 
-  it('should drop logRecords when there are more logRecords than "maxQueueSize"', () => {
+  it('should drop logRecords when there are more logRecords than "maxQueueSize"', function () {
     // Use a large batch size to prevent automatic exports during this test
     const maxQueueSize = 6;
     const maxExportBatchSize = 20; // Will be clamped to maxQueueSize (6) by constructor
@@ -505,6 +505,7 @@ describe('BatchLogRecordProcessorBase', () => {
 
   describe('Concurrency', () => {
     it('should only send a single batch at a time', async () => {
+      // arrange
       const callbacks: ((result: ExportResult) => void)[] = [];
       const logRecords: SdkLogRecord[] = [];
       const exporter: LogRecordExporter = {
@@ -521,11 +522,17 @@ describe('BatchLogRecordProcessorBase', () => {
         maxExportBatchSize: 5,
         maxQueueSize: 6,
       });
+
+      // act
       const totalLogRecords = 50;
      for (let i = 0; i < totalLogRecords; i++) {
         const logRecord = createLogRecord();
         processor.onEmit(logRecord);
       }
+
+      // yield to allow an export to start
+      await new Promise(resolve => setTimeout(resolve, 0));
+
+      // assert
       assert.equal(callbacks.length, 1);
       assert.equal(logRecords.length, 5);
       callbacks[0]({ code: ExportResultCode.SUCCESS });
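Two strengths of "yield" appear in the rewritten tests, and they are not interchangeable; a small sketch of the difference:

```ts
// Both yields are used in these tests:
async function yields() {
  await Promise.resolve(); // drains microtasks only; enough for bare .then() chains
  await new Promise(resolve => setTimeout(resolve, 0)); // also lets zero-delay timers fire
}
void yields();
```

The concurrency test needs the macrotask form because the export path now schedules work through `setTimeout`.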
diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts
index 254cf126648..46a109f52da 100644
--- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts
+++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts
@@ -28,6 +28,109 @@ import { BufferConfig } from '../types';
 import { ReadableSpan } from './ReadableSpan';
 import { SpanExporter } from './SpanExporter';
 
+/**
+ * Waits for all pending async resources of the spans to be resolved.
+ */
+async function waitForResources(spans: ReadableSpan[]): Promise<void> {
+  const pendingResources: Array<Promise<void>> = [];
+  for (let i = 0, len = spans.length; i < len; i++) {
+    const span = spans[i];
+    if (
+      span.resource.asyncAttributesPending &&
+      span.resource.waitForAsyncAttributes
+    ) {
+      pendingResources.push(span.resource.waitForAsyncAttributes());
+    }
+  }
+
+  if (pendingResources.length > 0) {
+    await Promise.all(pendingResources);
+  }
+}
+
+/**
+ * Represents an export operation that handles the entire export workflow.
+ */
+class ExportOperation {
+  private _exportCompleted: Promise<void>;
+  private _exportScheduledPromise: Promise<void>;
+  private _exportScheduledResolve!: () => void;
+
+  constructor(
+    exporter: SpanExporter,
+    spans: ReadableSpan[],
+    exportTimeoutMillis: number
+  ) {
+    this._exportScheduledPromise = new Promise<void>(resolve => {
+      this._exportScheduledResolve = resolve;
+    });
+    this._exportCompleted = this._executeExport(
+      exporter,
+      spans,
+      exportTimeoutMillis
+    );
+  }
+
+  /** Get the promise that resolves when the export completes */
+  get exportCompleted(): Promise<void> {
+    return this._exportCompleted;
+  }
+
+  /** Get the promise that resolves when exporter.export() has been called */
+  get exportScheduled(): Promise<void> {
+    return this._exportScheduledPromise;
+  }
+
+  private async _executeExport(
+    exporter: SpanExporter,
+    spans: ReadableSpan[],
+    exportTimeoutMillis: number
+  ): Promise<void> {
+    try {
+      // Wait for all pending resources before exporting
+      await waitForResources(spans);
+
+      // Export with timeout, wrapped in a suppressTracing context
+      await context.with(suppressTracing(context.active()), async () => {
+        await this._exportWithTimeout(exporter, spans, exportTimeoutMillis);
+      });
+    } catch (e) {
+      // ensure we never reject here, as callers may await this promise after it has already resolved.
+      globalErrorHandler(e);
+      // resolve, as the error may have occurred before the export was scheduled
+      this._exportScheduledResolve();
+    }
+  }
+
+  private async _exportWithTimeout(
+    exporter: SpanExporter,
+    spans: ReadableSpan[],
+    exportTimeoutMillis: number
+  ): Promise<void> {
+    return new Promise<void>((resolve, reject) => {
+      const timer = setTimeout(() => {
+        reject(new Error('Timeout'));
+      }, exportTimeoutMillis);
+
+      exporter.export(spans, result => {
+        clearTimeout(timer);
+        if (result.code === ExportResultCode.SUCCESS) {
+          resolve();
+        } else {
+          reject(
+            result.error ?? new Error('BatchSpanProcessor: span export failed')
+          );
+        }
+      });
+
+      // Resolve exportScheduled immediately after calling exporter.export()
+      this._exportScheduledResolve();
+    });
+  }
+}
+
 /**
  * Implementation of the {@link SpanProcessor} that batches spans exported by
  * the SDK then pushes them to the exporter pipeline.
@@ -41,7 +144,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
   private readonly _exportTimeoutMillis: number;
   private readonly _exporter: SpanExporter;
 
-  private _isExporting = false;
+  private _currentExport: ExportOperation | null = null;
   private _finishedSpans: ReadableSpan[] = [];
   private _timer: NodeJS.Timeout | number | undefined;
   private _shutdownOnce: BindOnceFuture<void>;
@@ -102,17 +205,10 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
     return this._shutdownOnce.call();
   }
 
-  private _shutdown() {
-    return Promise.resolve()
-      .then(() => {
-        return this.onShutdown();
-      })
-      .then(() => {
-        return this._flushAll();
-      })
-      .then(() => {
-        return this._exporter.shutdown();
-      });
+  private async _shutdown(): Promise<void> {
+    this.onShutdown();
+    await this._flushAll();
+    await this._exporter.shutdown();
   }
 
   /** Add a span in the buffer. */
@@ -141,114 +237,127 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
   }
 
   /**
    * Send all spans to the exporter respecting the batch size limit
    * This function is used only on forceFlush or shutdown,
    * for all other cases _flush should be used
    * */
-  private _flushAll(): Promise<void> {
-    return new Promise((resolve, reject) => {
-      const promises = [];
-      // calculate number of batches
-      const count = Math.ceil(
-        this._finishedSpans.length / this._maxExportBatchSize
-      );
-      for (let i = 0, j = count; i < j; i++) {
-        promises.push(this._flushOneBatch());
-      }
-      Promise.all(promises)
-        .then(() => {
-          resolve();
-        })
-        .catch(reject);
-    });
-  }
+  private async _flushAll(): Promise<void> {
+    // Clear the timer to prevent concurrent exports
+    this._clearTimer();
+
+    // Wait for any in-progress export to complete
+    if (this._currentExport !== null) {
+      // speed up execution of the current export
+      await this._exporter.forceFlush?.();
+      await this._currentExport.exportCompleted;
+      this._currentExport = null;
+    }
+
+    // Now flush all batches sequentially to avoid race conditions
+    while (this._finishedSpans.length > 0) {
+      const spans = this._extractBatch();
+      if (spans === null) {
+        break;
+      }
+
+      const exportOp = new ExportOperation(
+        this._exporter,
+        spans,
+        this._exportTimeoutMillis
+      );
+      this._currentExport = exportOp;
+
+      // await export scheduled, then force flush the exporter to speed up the export
+      try {
+        // TODO: figure out which ones of these throw, what the impact is on not doing the others.
+        await exportOp.exportScheduled;
+        await this._exporter.forceFlush?.();
+        await exportOp.exportCompleted;
+      } catch (e) {
+        globalErrorHandler(e);
+      } finally {
+        this._currentExport = null;
+      }
+    }
+  }
 
-  private _flushOneBatch(): Promise<void> {
-    this._clearTimer();
-    if (this._finishedSpans.length === 0) {
-      return Promise.resolve();
-    }
-    return new Promise((resolve, reject) => {
-      const timer = setTimeout(() => {
-        // don't wait anymore for export, this way the next batch can start
-        reject(new Error('Timeout'));
-      }, this._exportTimeoutMillis);
-      // prevent downstream exporter calls from generating spans
-      context.with(suppressTracing(context.active()), () => {
-        // Reset the finished spans buffer here because the next invocations of the _flush method
-        // could pass the same finished spans to the exporter if the buffer is cleared
-        // outside the execution of this callback.
-        let spans: ReadableSpan[];
-        if (this._finishedSpans.length <= this._maxExportBatchSize) {
-          spans = this._finishedSpans;
-          this._finishedSpans = [];
-        } else {
-          spans = this._finishedSpans.splice(0, this._maxExportBatchSize);
-        }
-        const doExport = () =>
-          this._exporter.export(spans, result => {
-            clearTimeout(timer);
-            if (result.code === ExportResultCode.SUCCESS) {
-              resolve();
-            } else {
-              reject(
-                result.error ??
-                  new Error('BatchSpanProcessor: span export failed')
-              );
-            }
-          });
-
-        let pendingResources: Array<Promise<void>> | null = null;
-        for (let i = 0, len = spans.length; i < len; i++) {
-          const span = spans[i];
-          if (
-            span.resource.asyncAttributesPending &&
-            span.resource.waitForAsyncAttributes
-          ) {
-            pendingResources ??= [];
-            pendingResources.push(span.resource.waitForAsyncAttributes());
-          }
-        }
-
-        // Avoid scheduling a promise to make the behavior more predictable and easier to test
-        if (pendingResources === null) {
-          doExport();
-        } else {
-          Promise.all(pendingResources).then(doExport, err => {
-            globalErrorHandler(err);
-            reject(err);
-          });
-        }
-      });
-    });
-  }
+  /**
+   * Extracts one batch from the buffer.
+   * Returns null if the buffer is empty.
+   */
+  private _extractBatch(): ReadableSpan[] | null {
+    if (this._finishedSpans.length === 0) {
+      return null;
+    }
+
+    if (this._finishedSpans.length <= this._maxExportBatchSize) {
+      const batch = this._finishedSpans;
+      this._finishedSpans = [];
+      return batch;
+    } else {
+      return this._finishedSpans.splice(0, this._maxExportBatchSize);
+    }
+  }
+
+  private _exportOneBatch(): void {
+    this._clearTimer();
+
+    const spans = this._extractBatch();
+    if (spans === null) {
+      return;
+    }
+
+    const exportOp = new ExportOperation(
+      this._exporter,
+      spans,
+      this._exportTimeoutMillis
+    );
+    this._currentExport = exportOp;
+
+    // Handle completion asynchronously
+    exportOp.exportCompleted
+      .then(() => {
+        this._currentExport = null;
+        this._maybeStartTimer();
+      })
+      .catch(error => {
+        this._currentExport = null;
+        globalErrorHandler(error);
+        this._maybeStartTimer();
+      });
+  }
 
   private _maybeStartTimer() {
-    if (this._isExporting) return;
-    const flush = () => {
-      this._isExporting = true;
-      this._flushOneBatch()
-        .finally(() => {
-          this._isExporting = false;
-          if (this._finishedSpans.length > 0) {
-            this._clearTimer();
-            this._maybeStartTimer();
-          }
-        })
-        .catch(e => {
-          this._isExporting = false;
-          globalErrorHandler(e);
-        });
-    };
-    // we only wait if the queue doesn't have enough elements yet
+    if (this._shutdownOnce.isCalled) {
+      return;
+    }
+
+    if (this._finishedSpans.length === 0) {
+      return;
+    }
+
+    if (this._currentExport !== null) {
+      return;
+    }
+
+    // If the batch is full, export immediately
     if (this._finishedSpans.length >= this._maxExportBatchSize) {
-      return flush();
+      this._exportOneBatch();
+      return;
    }
-    if (this._timer !== undefined) return;
-    this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
-    // depending on runtime, this may be a 'number' or NodeJS.Timeout
+
+    // If a timer is already set, don't set another one
+    if (this._timer !== undefined) {
+      return;
+    }
+
+    // Set a timer for the scheduled export
+    this._timer = setTimeout(() => {
+      this._timer = undefined;
+      this._exportOneBatch();
+    }, this._scheduledDelayMillis);
+
+    // Unref the timer so it doesn't keep the process alive
     if (typeof this._timer !== 'number') {
       this._timer.unref();
     }
diff --git a/packages/opentelemetry-sdk-trace-base/src/export/SpanExporter.ts b/packages/opentelemetry-sdk-trace-base/src/export/SpanExporter.ts
index c9ca9c0c897..d37e77c65a8 100644
--- a/packages/opentelemetry-sdk-trace-base/src/export/SpanExporter.ts
+++ b/packages/opentelemetry-sdk-trace-base/src/export/SpanExporter.ts
@@ -21,7 +21,7 @@ import { ReadableSpan } from './ReadableSpan';
 * An interface that allows different tracing services to export recorded data
 * for sampled spans in their own format.
 *
- * To export data this MUST be register to the Tracer SDK using a optional
+ * To export data this MUST be registered to the Tracer SDK using an optional
 * config.
 */
 export interface SpanExporter {
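`waitForResources` and `ExportOperation` above duplicate the sdk-logs versions nearly line for line. If that duplication ever becomes a maintenance burden, a shared generic could cover both; this is a speculative sketch only, no such helper exists in the repo, and the callback/result types are simplified:

```ts
// Hypothetical shared variant of the duplicated ExportOperation.
type Done = (result: { code: number; error?: Error }) => void;

class GenericExportOperation<Item> {
  readonly exportCompleted: Promise<void>;
  readonly exportScheduled: Promise<void>;
  private _scheduled!: () => void;

  constructor(
    exportFn: (items: Item[], done: Done) => void,
    items: Item[],
    timeoutMillis: number,
    onError: (e: unknown) => void
  ) {
    this.exportScheduled = new Promise<void>(r => (this._scheduled = r));
    this.exportCompleted = new Promise<void>((resolve, reject) => {
      const timer = setTimeout(() => reject(new Error('Timeout')), timeoutMillis);
      exportFn(items, result => {
        clearTimeout(timer);
        // 0 === ExportResultCode.SUCCESS
        result.code === 0
          ? resolve()
          : reject(result.error ?? new Error('export failed'));
      });
      this._scheduled();
    }).catch(e => {
      onError(e); // route errors out-of-band so exportCompleted never rejects
      this._scheduled();
    });
  }
}
```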
diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts
index 4d381482530..8b9b27d7c2f 100644
--- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts
+++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts
@@ -58,7 +58,6 @@ function createUnsampledSpan(spanName: string): Span {
 }
 
 class BatchSpanProcessor extends BatchSpanProcessorBase<BufferConfig> {
-  onInit() {}
   onShutdown() {}
 }
 
@@ -164,32 +163,31 @@ describe('BatchSpanProcessorBase', () => {
     }
     processor.onStart(span, ROOT_CONTEXT);
     processor.onEnd(span);
+    await Promise.resolve(); // yield to allow the export to be scheduled
     assert.strictEqual(exporter.getFinishedSpans().length, 5);
 
     await processor.shutdown();
     assert.strictEqual(exporter.getFinishedSpans().length, 0);
   });
 
-  it('should force flush when timeout exceeded', done => {
+  it('should export when scheduledDelayMillis is exceeded', async function () {
+    // arrange
     const clock = sinon.useFakeTimers();
     const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
     const span = createSampledSpan(name);
+
+    // act
     for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) {
       processor.onStart(span, ROOT_CONTEXT);
       processor.onEnd(span);
       assert.strictEqual(exporter.getFinishedSpans().length, 0);
     }
+    await clock.tickAsync(defaultBufferConfig.scheduledDelayMillis + 1000);
 
-    setTimeout(() => {
-      assert.strictEqual(exporter.getFinishedSpans().length, 4);
-      done();
-    }, defaultBufferConfig.scheduledDelayMillis + 1000);
-
-    clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
-
-    clock.restore();
+    // assert
+    assert.strictEqual(exporter.getFinishedSpans().length, 4);
+    clock.restore();
   });
 
-  it('should force flush on demand', () => {
+  it('should force flush on demand', async function () {
     const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
     const span = createSampledSpan(name);
     for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) {
@@ -197,7 +195,7 @@ describe('BatchSpanProcessorBase', () => {
       processor.onEnd(span);
     }
     assert.strictEqual(exporter.getFinishedSpans().length, 0);
-    processor.forceFlush();
+    await processor.forceFlush();
     assert.strictEqual(exporter.getFinishedSpans().length, 4);
   });
 
@@ -231,48 +229,28 @@ describe('BatchSpanProcessorBase', () => {
     clock.restore();
   });
 
-  it(
-    'should export each sampled span exactly once with buffer size' +
-      ' reached multiple times',
-    done => {
-      const originalTimeout = setTimeout;
-      const clock = sinon.useFakeTimers();
-      const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
-      const totalSpans = defaultBufferConfig.maxExportBatchSize * 2;
-      for (let i = 0; i < totalSpans; i++) {
-        const span = createSampledSpan(`${name}_${i}`);
-        processor.onStart(span, ROOT_CONTEXT);
-        processor.onEnd(span);
-      }
-      const span = createSampledSpan(`${name}_last`);
-      processor.onStart(span, ROOT_CONTEXT);
-      processor.onEnd(span);
-      clock.tick(defaultBufferConfig.scheduledDelayMillis + 10);
-
-      // because there is an async promise that will be trigger original
-      // timeout is needed to simulate a real tick to the next
-      originalTimeout(() => {
-        clock.tick(defaultBufferConfig.scheduledDelayMillis + 10);
-        originalTimeout(async () => {
-          clock.tick(defaultBufferConfig.scheduledDelayMillis + 10);
-          clock.restore();
-
-          diag.info(
-            'finished spans count',
-            exporter.getFinishedSpans().length
-          );
-          assert.strictEqual(
-            exporter.getFinishedSpans().length,
-            totalSpans + 1
-          );
-
-          await processor.shutdown();
-          assert.strictEqual(exporter.getFinishedSpans().length, 0);
-          done();
-        });
-      });
-    }
-  );
+  it('should export each sampled span exactly once with buffer size reached multiple times', async function () {
+    const clock = sinon.useFakeTimers();
+    const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
+    const totalSpans = defaultBufferConfig.maxExportBatchSize * 2;
+    for (let i = 0; i < totalSpans; i++) {
+      const span = createSampledSpan(`${name}_${i}`);
+      processor.onStart(span, ROOT_CONTEXT);
+      processor.onEnd(span);
+    }
+    const span = createSampledSpan(`${name}_last`);
+    processor.onStart(span, ROOT_CONTEXT);
+    processor.onEnd(span);
+    await clock.tickAsync(defaultBufferConfig.scheduledDelayMillis + 10);
+    await clock.tickAsync(defaultBufferConfig.scheduledDelayMillis + 10);
+    await clock.tickAsync(defaultBufferConfig.scheduledDelayMillis + 10);
+    clock.restore();
+
+    diag.info('finished spans count', exporter.getFinishedSpans().length);
+    assert.strictEqual(exporter.getFinishedSpans().length, totalSpans + 1);
+
+    await processor.shutdown();
+    assert.strictEqual(exporter.getFinishedSpans().length, 0);
+  });
 });
 
@@ -327,7 +305,8 @@ describe('force flush', () => {
     });
   });
 
-    it('should call globalErrorHandler when exporting fails', done => {
+    it('should call globalErrorHandler when exporting fails', async function () {
+      // arrange
       const clock = sinon.useFakeTimers();
       const expectedError = new Error('Exporter failed');
       sinon.stub(exporter, 'export').callsFake((_, callback) => {
@@ -335,30 +314,22 @@ describe('force flush', () => {
          callback({ code: ExportResultCode.FAILED, error: expectedError });
         }, 0);
       });
-
       const errorHandlerSpy = sinon.spy();
       setGlobalErrorHandler(errorHandlerSpy);
 
+      // act
       for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
         const span = createSampledSpan('test');
         processor.onStart(span, ROOT_CONTEXT);
         processor.onEnd(span);
       }
+      await clock.tickAsync(defaultBufferConfig.scheduledDelayMillis + 1000);
+      clock.restore();
 
-      clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
-      clock.restore();
-      setTimeout(async () => {
-        assert.strictEqual(errorHandlerSpy.callCount, 1);
-
-        const [[error]] = errorHandlerSpy.args;
-
-        assert.deepStrictEqual(error, expectedError);
-
-        //reset global error handler
-        setGlobalErrorHandler(loggingErrorHandler());
-        done();
-      });
+      // assert
+      sinon.assert.calledOnceWithExactly(errorHandlerSpy, expectedError);
+      // reset global error handler
+      setGlobalErrorHandler(loggingErrorHandler());
     });
 
     it('should still export when previously failed', async () => {
@@ -554,6 +525,7 @@ describe('BatchSpanProcessorBase', () => {
       processor.onStart(span, ROOT_CONTEXT);
       processor.onEnd(span);
     }
+    await new Promise(resolve => setTimeout(resolve, 0));
     assert.equal(callbacks.length, 1);
     assert.equal(spans.length, 5);
     callbacks[0]({ code: ExportResultCode.SUCCESS });
diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts
index 93746e9a515..6def3bb81a6 100644
--- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts
+++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts
@@ -116,7 +116,7 @@ export class PeriodicExportingMetricReader extends MetricReader {
     }
   }
 
-  private async _doRun(): Promise<void> {
+  private async _collectAndPrepareMetrics() {
     const { resourceMetrics, errors } = await this.collect({
       timeoutMillis: this._exportTimeout,
     });
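The reworked shutdown below deliberately starts the final export without awaiting it, then nudges the exporter with `forceFlush()` so the in-flight request settles without sitting through retry backoff. The general shape, as a sketch with illustrative types (`FlushableExporter` is not a real interface in this SDK):

```ts
// Sketch of the "start, nudge, then shut down" pattern used in shutdown().
interface FlushableExporter {
  export(data: unknown): Promise<void>;
  forceFlush(): Promise<void>;
  shutdown(): Promise<void>;
}

async function shutdownQuickly(exporter: FlushableExporter, finalBatch: unknown) {
  // 1. start the last export, but don't await it: it may sit in retry backoff
  const pending = exporter.export(finalBatch).catch(err => console.error(err));
  // 2. tell the exporter to finish pending requests without further retries
  await exporter.forceFlush();
  // 3. shut down; the in-flight promise settles without blocking on retries
  await exporter.shutdown();
  await pending; // already settled by now in the happy path
}
```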
@@ -137,6 +137,12 @@ export class PeriodicExportingMetricReader extends MetricReader {
       }
     }
 
+    return resourceMetrics;
+  }
+
+  private async _doRun(): Promise<void> {
+    const resourceMetrics = await this._collectAndPrepareMetrics();
+
     if (resourceMetrics.scopeMetrics.length === 0) {
       return;
     }
@@ -171,7 +177,31 @@ export class PeriodicExportingMetricReader extends MetricReader {
     if (this._interval) {
       clearInterval(this._interval);
     }
-    await this.onForceFlush();
+
+    // Include the "effects" of forceFlush on shutdown, but do not call this.forceFlush directly, as it would wait
+    // for the export to finish before flushing, which may include long retries.
+    const resourceMetrics = await this._collectAndPrepareMetrics();
+
+    if (resourceMetrics.scopeMetrics.length > 0) {
+      // Schedule the export but don't wait for it
+      internal._export(this._exporter, resourceMetrics).then(
+        result => {
+          if (result.code !== ExportResultCode.SUCCESS) {
+            globalErrorHandler(
+              new Error(
+                `PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
+              )
+            );
+          }
+        },
+        err => {
+          globalErrorHandler(err);
+        }
+      );
+    }
+
+    // tell the exporter to hurry up, we're shutting down.
+    await this._exporter.forceFlush();
     await this._exporter.shutdown();
   }
 }
diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts
index 28136f1c006..eb2c07c7736 100644
--- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts
+++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts
@@ -653,14 +653,16 @@ describe('PeriodicExportingMetricReader', () => {
 
     it('should throw on non-initialized instance.', async () => {
       const exporter = new TestMetricExporter();
-      exporter.throwFlush = true;
       const reader = new PeriodicExportingMetricReader({
         exporter: exporter,
         exportIntervalMillis: MAX_32_BIT_INT,
         exportTimeoutMillis: 80,
       });
 
-      await assert.rejects(() => reader.shutdown(), /Error during forceFlush/);
+      await assert.rejects(
+        () => reader.shutdown(),
+        /Error: MetricReader is not bound to a MetricProducer/
+      );
     });
   });
 });