From c87a3048984325e96889689bc566c0042ff2f316 Mon Sep 17 00:00:00 2001 From: Chengzhong Wu Date: Sat, 14 Jan 2023 19:37:19 +0800 Subject: [PATCH] fix(sdk-metrics): collect metrics when periodic exporting metric reader flushes (#3517) Co-authored-by: Daniel Dyla --- CHANGELOG.md | 1 + .../export/PeriodicExportingMetricReader.ts | 38 +++++++++++-------- .../PeriodicExportingMetricReader.test.ts | 36 +++++++++++------- 3 files changed, 47 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f4e1382e4..4a09363008 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ * `telemetry.sdk.version` * fix(sdk-trace): make spans resilient to clock drift [#3434](https://github.com/open-telemetry/opentelemetry-js/pull/3434) @dyladan * fix(selenium-tests): updated webpack version for selenium test issue [#3456](https://github.com/open-telemetry/opentelemetry-js/issues/3456) @SaumyaBhushan +* fix(sdk-metrics): collect metrics when periodic exporting metric reader flushes [#3517](https://github.com/open-telemetry/opentelemetry-js/pull/3517) @legendecas * fix(sdk-metrics): fix duplicated registration of metrics for collectors [#3488](https://github.com/open-telemetry/opentelemetry-js/pull/3488) @legendecas * fix(core): fix precision loss in numberToHrtime [#3480](https://github.com/open-telemetry/opentelemetry-js/pull/3480) @legendecas diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index e9da187a45..c156e883ad 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -90,7 +90,25 @@ export class PeriodicExportingMetricReader extends MetricReader { } private async _runOnce(): Promise { - const { resourceMetrics, errors } = await this.collect({}); + try { + await callWithTimeout(this._doRun(), this._exportTimeout); + } catch (err) { + if (err instanceof TimeoutError) { + api.diag.error( + 'Export took longer than %s milliseconds and timed out.', + this._exportTimeout + ); + return; + } + + globalErrorHandler(err); + } + } + + private async _doRun(): Promise { + const { resourceMetrics, errors } = await this.collect({ + timeoutMillis: this._exportTimeout, + }); if (errors.length > 0) { api.diag.error( @@ -109,25 +127,15 @@ export class PeriodicExportingMetricReader extends MetricReader { protected override onInitialized(): void { // start running the interval as soon as this reader is initialized and keep handle for shutdown. - this._interval = setInterval(async () => { - try { - await callWithTimeout(this._runOnce(), this._exportTimeout); - } catch (err) { - if (err instanceof TimeoutError) { - api.diag.error( - 'Export took longer than %s milliseconds and timed out.', - this._exportTimeout - ); - return; - } - - globalErrorHandler(err); - } + this._interval = setInterval(() => { + // this._runOnce never rejects. Using void operator to suppress @typescript-eslint/no-floating-promises. + void this._runOnce(); }, this._exportInterval); unrefTimer(this._interval); } protected async onForceFlush(): Promise { + await this._runOnce(); await this._exporter.forceFlush(); } diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index d6fa69df9d..d5ab553126 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -38,8 +38,9 @@ const MAX_32_BIT_INT = 2 ** 31 - 1; class TestMetricExporter implements PushMetricExporter { public exportTime = 0; public forceFlushTime = 0; - public throwException = false; - public failureResult = false; + public throwExport = false; + public throwFlush = false; + public rejectExport = false; private _batches: ResourceMetrics[] = []; private _shutdown: boolean = false; @@ -49,11 +50,11 @@ class TestMetricExporter implements PushMetricExporter { ): void { this._batches.push(metrics); - if (this.throwException) { + if (this.throwExport) { throw new Error('Error during export'); } setTimeout(() => { - if (this.failureResult) { + if (this.rejectExport) { resultCallback({ code: ExportResultCode.FAILED, error: new Error('some error'), @@ -72,7 +73,7 @@ class TestMetricExporter implements PushMetricExporter { } async forceFlush(): Promise { - if (this.throwException) { + if (this.throwFlush) { throw new Error('Error during forceFlush'); } @@ -91,6 +92,10 @@ class TestMetricExporter implements PushMetricExporter { } return this._batches.slice(0, numberOfExports); } + + getExports(): ResourceMetrics[] { + return this._batches.slice(0); + } } class TestDeltaMetricExporter extends TestMetricExporter { @@ -203,7 +208,7 @@ describe('PeriodicExportingMetricReader', () => { describe('periodic export', () => { it('should keep running on export errors', async () => { const exporter = new TestMetricExporter(); - exporter.throwException = true; + exporter.throwExport = true; const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: 30, @@ -218,13 +223,13 @@ describe('PeriodicExportingMetricReader', () => { emptyResourceMetrics, ]); - exporter.throwException = false; + exporter.throwExport = false; await reader.shutdown(); }); it('should keep running on export failure', async () => { const exporter = new TestMetricExporter(); - exporter.failureResult = true; + exporter.rejectExport = true; const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: 30, @@ -239,7 +244,7 @@ describe('PeriodicExportingMetricReader', () => { emptyResourceMetrics, ]); - exporter.failureResult = false; + exporter.rejectExport = false; await reader.shutdown(); }); @@ -261,7 +266,7 @@ describe('PeriodicExportingMetricReader', () => { emptyResourceMetrics, ]); - exporter.throwException = false; + exporter.throwExport = false; await reader.shutdown(); }); }); @@ -271,7 +276,7 @@ describe('PeriodicExportingMetricReader', () => { sinon.restore(); }); - it('should forceFlush exporter', async () => { + it('should collect and forceFlush exporter', async () => { const exporter = new TestMetricExporter(); const exporterMock = sinon.mock(exporter); exporterMock.expects('forceFlush').calledOnceWithExactly(); @@ -284,6 +289,10 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await reader.forceFlush(); exporterMock.verify(); + + const exports = exporter.getExports(); + assert.strictEqual(exports.length, 1); + await reader.shutdown(); }); @@ -307,12 +316,13 @@ describe('PeriodicExportingMetricReader', () => { it('should throw when exporter throws', async () => { const exporter = new TestMetricExporter(); - exporter.throwException = true; + exporter.throwFlush = true; const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: MAX_32_BIT_INT, exportTimeoutMillis: 80, }); + reader.setMetricProducer(new TestMetricProducer()); await assertRejects(() => reader.forceFlush(), /Error during forceFlush/); }); @@ -454,7 +464,7 @@ describe('PeriodicExportingMetricReader', () => { it('should throw on non-initialized instance.', async () => { const exporter = new TestMetricExporter(); - exporter.throwException = true; + exporter.throwFlush = true; const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: MAX_32_BIT_INT,