Skip to content

Commit 2276977

Browse files
fix(sdk-metrics): merge uncollected delta accumulations (#3667)
Co-authored-by: Marc Pichler <[email protected]>
1 parent d82a098 commit 2276977

File tree

4 files changed

+97
-0
lines changed

4 files changed

+97
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/
3737

3838
* fix(core): added falsy check to make otel core work with browser where webpack config had process as false or null [#3613](https://github.com/open-telemetry/opentelemetry-js/issues/3613) @ravindra-dyte
3939
* fix(instrumentation-http): include query params in http.target [#3646](https://github.com/open-telemetry/opentelemetry-js/pull/3646) @kobi-co
40+
* fix(sdk-metrics): merge uncollected delta accumulations [#3667](https://github.com/open-telemetry/opentelemetry-js/pull/3667) @legendecas
4041

4142
### :books: (Refine Doc)
4243

packages/sdk-metrics/src/state/DeltaMetricProcessor.ts

+11
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
5757
this._aggregator.createAccumulation(collectionTime);
5858
accumulation?.record(value);
5959
let delta = accumulation;
60+
// Diff with recorded cumulative memo.
6061
if (this._cumulativeMemoStorage.has(attributes, hashCode)) {
6162
// has() returned true, previous is present.
6263
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -66,6 +67,16 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
6667
)!;
6768
delta = this._aggregator.diff(previous, accumulation);
6869
}
70+
// Merge with uncollected active delta.
71+
if (this._activeCollectionStorage.has(attributes, hashCode)) {
72+
// has() returned true, previous is present.
73+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
74+
const active = this._activeCollectionStorage.get(
75+
attributes,
76+
hashCode
77+
)!;
78+
delta = this._aggregator.merge(active, delta);
79+
}
6980

7081
// Save the current record and the delta record.
7182
this._cumulativeMemoStorage.set(attributes, accumulation, hashCode);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import * as assert from 'assert';
18+
import { DataPointType, MeterProvider, MetricReader } from '../../src';
19+
import { TestDeltaMetricReader } from '../export/TestMetricReader';
20+
import { assertDataPoint, assertMetricData } from '../util';
21+
22+
// https://github.com/open-telemetry/opentelemetry-js/issues/3664
23+
24+
describe('two-metric-readers-async-instrument', () => {
25+
it('both metric readers should collect metrics', async () => {
26+
const meterProvider = new MeterProvider();
27+
const reader1 = new TestDeltaMetricReader();
28+
const reader2 = new TestDeltaMetricReader();
29+
30+
meterProvider.addMetricReader(reader1);
31+
meterProvider.addMetricReader(reader2);
32+
33+
const meter = meterProvider.getMeter('my-meter');
34+
35+
let counter = 1;
36+
const asyncUpDownCounter = meter.createObservableUpDownCounter(
37+
'my_async_updowncounter'
38+
);
39+
asyncUpDownCounter.addCallback(observableResult => {
40+
observableResult.observe(counter);
41+
});
42+
43+
await assertCollection(reader1, 1);
44+
await assertCollection(reader2, 1);
45+
46+
counter = 10;
47+
await assertCollection(reader1, 9);
48+
await assertCollection(reader2, 9);
49+
50+
async function assertCollection(reader: MetricReader, value: number) {
51+
const { errors, resourceMetrics } = await reader.collect();
52+
assert.strictEqual(errors.length, 0);
53+
54+
// Collected only one Metric.
55+
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
56+
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
57+
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
58+
59+
assertMetricData(metric, DataPointType.SUM, {
60+
name: 'my_async_updowncounter',
61+
});
62+
assertDataPoint(metric.dataPoints[0], {}, value);
63+
}
64+
});
65+
});

packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts

+20
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,26 @@ describe('DeltaMetricProcessor', () => {
100100
assert.strictEqual(accumulation?.toPointValue(), 11);
101101
}
102102
});
103+
104+
it('should merge with active delta of accumulations', () => {
105+
const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true));
106+
107+
{
108+
const measurements = new AttributeHashMap<number>();
109+
measurements.set({}, 10);
110+
metricProcessor.batchCumulate(measurements, [0, 0]);
111+
}
112+
113+
{
114+
const measurements = new AttributeHashMap<number>();
115+
measurements.set({}, 20);
116+
metricProcessor.batchCumulate(measurements, [1, 1]);
117+
}
118+
119+
const accumulations = metricProcessor.collect();
120+
const accumulation = accumulations.get({});
121+
assert.strictEqual(accumulation?.toPointValue(), 20);
122+
});
103123
});
104124

105125
describe('collect', () => {

0 commit comments

Comments
 (0)