Skip to content

Commit 4bb5e42

Browse files
Star tree kahan summation fix
Signed-off-by: bharath-techie <[email protected]>
1 parent 40dfbd3 commit 4bb5e42

File tree

15 files changed

+278
-105
lines changed

15 files changed

+278
-105
lines changed

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88
package org.opensearch.index.compositeindex.datacube.startree.aggregators;
99

10+
import org.opensearch.index.compositeindex.datacube.startree.utils.CompensatedSumFieldValueConverter;
1011
import org.opensearch.index.mapper.FieldValueConverter;
1112
import org.opensearch.index.mapper.NumberFieldMapper;
1213
import org.opensearch.search.aggregations.metrics.CompensatedSum;
@@ -21,87 +22,112 @@
2122
*
2223
* @opensearch.experimental
2324
*/
24-
class SumValueAggregator implements ValueAggregator<Double> {
25+
class SumValueAggregator implements ValueAggregator<CompensatedSum> {
2526

2627
private final FieldValueConverter fieldValueConverter;
28+
private final CompensatedSumFieldValueConverter compensatedSumConverter;
2729
private static final FieldValueConverter VALUE_AGGREGATOR_TYPE = NumberFieldMapper.NumberType.DOUBLE;
2830

29-
private CompensatedSum kahanSummation = new CompensatedSum(0, 0);
30-
3131
public SumValueAggregator(FieldValueConverter fieldValueConverter) {
3232
this.fieldValueConverter = fieldValueConverter;
33+
this.compensatedSumConverter = new CompensatedSumFieldValueConverter();
3334
}
3435

3536
@Override
3637
public FieldValueConverter getAggregatedValueType() {
37-
return VALUE_AGGREGATOR_TYPE;
38+
return compensatedSumConverter;
3839
}
3940

4041
@Override
41-
public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
42-
kahanSummation.reset(0, 0);
42+
public CompensatedSum getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
43+
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
4344
// add takes care of the sum and compensation internally
4445
if (segmentDocValue != null) {
4546
kahanSummation.add(fieldValueConverter.toDoubleValue(segmentDocValue));
4647
} else {
47-
kahanSummation.add(getIdentityMetricValue());
48+
kahanSummation.add(getIdentityMetricValue().value());
4849
}
49-
return kahanSummation.value();
50+
return kahanSummation;
5051
}
5152

5253
// we have overridden this method because the reset with sum and compensation helps us keep
5354
// track of precision and avoids a potential loss in accuracy of sums.
5455
@Override
55-
public Double mergeAggregatedValueAndSegmentValue(Double value, Long segmentDocValue) {
56-
assert value == null || kahanSummation.value() == value;
56+
public CompensatedSum mergeAggregatedValueAndSegmentValue(CompensatedSum value, Long segmentDocValue) {
57+
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
58+
if (value == null) {
59+
value = new CompensatedSum(0, 0);
60+
}
61+
kahanSummation.reset(value.value(), value.delta());
5762
// add takes care of the sum and compensation internally
5863
if (segmentDocValue != null) {
5964
kahanSummation.add(fieldValueConverter.toDoubleValue(segmentDocValue));
6065
} else {
61-
kahanSummation.add(getIdentityMetricValue());
66+
kahanSummation.add(getIdentityMetricValue().value());
6267
}
63-
return kahanSummation.value();
68+
return kahanSummation;
6469
}
70+
// @Override
71+
// public CompensatedSum mergeAggregatedValueAndSegmentValue(CompensatedSum value, Long segmentDocValue) {
72+
// // Create new CompensatedSum with the previous value and delta
73+
// CompensatedSum kahanSummation = new CompensatedSum(
74+
// value != null ? value.value() : 0.0,
75+
// value != null ? value.delta() : 0.0
76+
// );
77+
//
78+
// // Add the new value to it
79+
// if (segmentDocValue != null) {
80+
// kahanSummation.add(fieldValueConverter.toDoubleValue(segmentDocValue));
81+
// } else {
82+
// kahanSummation.add(getIdentityMetricValue().value());
83+
// }
84+
// return kahanSummation;
85+
// }
6586

6687
@Override
67-
public Double mergeAggregatedValues(Double value, Double aggregatedValue) {
68-
assert aggregatedValue == null || kahanSummation.value() == aggregatedValue;
69-
// add takes care of the sum and compensation internally
88+
public CompensatedSum mergeAggregatedValues(CompensatedSum value, CompensatedSum aggregatedValue) {
89+
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
90+
if (aggregatedValue != null) {
91+
kahanSummation.reset(aggregatedValue.value(), aggregatedValue.delta());
92+
}
7093
if (value != null) {
71-
kahanSummation.add(value);
94+
kahanSummation.add(value.value(), value.delta());
7295
} else {
73-
kahanSummation.add(getIdentityMetricValue());
96+
kahanSummation.add(getIdentityMetricValue().value());
7497
}
75-
return kahanSummation.value();
98+
return kahanSummation;
7699
}
77100

78101
@Override
79-
public Double getInitialAggregatedValue(Double value) {
80-
kahanSummation.reset(0, 0);
102+
public CompensatedSum getInitialAggregatedValue(CompensatedSum value) {
103+
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
81104
// add takes care of the sum and compensation internally
82-
if (value != null) {
83-
kahanSummation.add(value);
105+
if (value == null) {
106+
kahanSummation.add(getIdentityMetricValue().value());
84107
} else {
85-
kahanSummation.add(getIdentityMetricValue());
108+
kahanSummation.add(value.value(), value.delta());
86109
}
87-
return kahanSummation.value();
110+
return kahanSummation;
88111
}
89112

90113
@Override
91-
public Double toAggregatedValueType(Long value) {
114+
public CompensatedSum toAggregatedValueType(Long value) {
115+
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
92116
try {
93117
if (value == null) {
94-
return getIdentityMetricValue();
118+
kahanSummation.add(getIdentityMetricValue().value());
119+
return kahanSummation;
95120
}
96-
return VALUE_AGGREGATOR_TYPE.toDoubleValue(value);
121+
kahanSummation.add(VALUE_AGGREGATOR_TYPE.toDoubleValue(value));
122+
return kahanSummation;
97123
} catch (Exception e) {
98124
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
99125
}
100126
}
101127

102128
@Override
103-
public Double getIdentityMetricValue() {
129+
public CompensatedSum getIdentityMetricValue() {
104130
// in present aggregations, if the metric behind sum is missing, we treat it as 0
105-
return 0D;
131+
return new CompensatedSum(0, 0);
106132
}
107133
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument;
2222
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
2323
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
24+
import org.opensearch.index.compositeindex.datacube.startree.utils.CompensatedSumFieldValueConverter;
2425
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeDocumentBitSetUtil;
2526
import org.opensearch.index.mapper.FieldValueConverter;
27+
import org.opensearch.search.aggregations.metrics.CompensatedSum;
2628

2729
import java.io.Closeable;
2830
import java.io.IOException;
@@ -125,6 +127,15 @@ protected void writeMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer
125127
} else {
126128
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
127129
}
130+
} else if (aggregatedValueType instanceof CompensatedSumFieldValueConverter) {
131+
if (isAggregatedDoc) {
132+
long val = NumericUtils.doubleToSortableLong(
133+
starTreeDocument.metrics[i] == null ? 0.0 : ((CompensatedSum) starTreeDocument.metrics[i]).value()
134+
);
135+
buffer.putLong(val);
136+
} else {
137+
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
138+
}
128139
} else {
129140
throw new IllegalStateException("Unsupported metric type");
130141
}
@@ -232,6 +243,14 @@ private long readMetrics(RandomAccessInput input, long offset, int numMetrics, O
232243
metrics[i] = val;
233244
}
234245
offset += Long.BYTES;
246+
} else if (aggregatedValueType instanceof CompensatedSumFieldValueConverter) {
247+
long val = input.readLong(offset);
248+
if (isAggregatedDoc) {
249+
metrics[i] = new CompensatedSum(DOUBLE.toDoubleValue(val), 0);
250+
} else {
251+
metrics[i] = val;
252+
}
253+
offset += Long.BYTES;
235254
} else {
236255
throw new IllegalStateException("Unsupported metric type");
237256
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
4343
import org.opensearch.index.compositeindex.datacube.startree.node.InMemoryTreeNode;
4444
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType;
45+
import org.opensearch.index.compositeindex.datacube.startree.utils.CompensatedSumFieldValueConverter;
4546
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
4647
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
4748
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator;
@@ -50,6 +51,7 @@
5051
import org.opensearch.index.mapper.FieldValueConverter;
5152
import org.opensearch.index.mapper.Mapper;
5253
import org.opensearch.index.mapper.MapperService;
54+
import org.opensearch.search.aggregations.metrics.CompensatedSum;
5355

5456
import java.io.IOException;
5557
import java.util.ArrayList;
@@ -490,6 +492,13 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, A
490492
NumericUtils.doubleToSortableLong((Double) starTreeDocument.metrics[i])
491493
);
492494
}
495+
} else if (aggregatedValueType instanceof CompensatedSumFieldValueConverter) {
496+
if (starTreeDocument.metrics[i] != null) {
497+
((SortedNumericDocValuesWriterWrapper) (metricWriters.get(i))).addValue(
498+
docId,
499+
NumericUtils.doubleToSortableLong(((CompensatedSum) starTreeDocument.metrics[i]).value())
500+
);
501+
}
493502
} else {
494503
throw new IllegalStateException("Unknown metric doc value type");
495504
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.compositeindex.datacube.startree.utils;
10+
11+
import org.opensearch.index.mapper.FieldValueConverter;
12+
13+
import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
14+
15+
/**
16+
* Field value converter for CompensatedSum
17+
*
18+
* @opensearch.internal
19+
*/
20+
public class CompensatedSumFieldValueConverter implements FieldValueConverter {
21+
22+
public CompensatedSumFieldValueConverter() {}
23+
24+
@Override
25+
public double toDoubleValue(long value) {
26+
return DOUBLE.toDoubleValue(value);
27+
}
28+
}

server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
package org.opensearch.search.aggregations.metrics;
3434

35+
import java.util.Objects;
36+
3537
/**
3638
* Used to calculate sums using the Kahan summation algorithm.
3739
*
@@ -110,4 +112,22 @@ public CompensatedSum add(double value, double delta) {
110112
return this;
111113
}
112114

115+
@Override
116+
public boolean equals(Object o) {
117+
if (this == o) return true;
118+
if (o == null || getClass() != o.getClass()) return false;
119+
CompensatedSum that = (CompensatedSum) o;
120+
return Double.compare(that.value, value) == 0 && Double.compare(that.delta, delta) == 0;
121+
}
122+
123+
@Override
124+
public int hashCode() {
125+
return Objects.hash(value, delta);
126+
}
127+
128+
@Override
129+
public String toString() {
130+
return value + " " + delta;
131+
}
132+
113133
}

server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
2222
import org.opensearch.index.mapper.CompositeMappedFieldType;
2323
import org.opensearch.index.mapper.FieldValueConverter;
24+
import org.opensearch.search.aggregations.metrics.CompensatedSum;
2425

2526
import java.io.IOException;
2627
import java.util.ArrayDeque;
@@ -137,6 +138,8 @@ public static void assertStarTreeDocuments(StarTreeDocument[] starTreeDocuments,
137138
StarTreeDocument resultStarTreeDocument = starTreeDocuments[i];
138139
StarTreeDocument expectedStarTreeDocument = expectedStarTreeDocuments[i];
139140

141+
System.out.println("result : " + resultStarTreeDocument);
142+
System.out.println("expected : " + expectedStarTreeDocument);
140143
assertNotNull(resultStarTreeDocument.dimensions);
141144
assertNotNull(resultStarTreeDocument.metrics);
142145

@@ -150,6 +153,18 @@ public static void assertStarTreeDocuments(StarTreeDocument[] starTreeDocuments,
150153
for (int mi = 0; mi < resultStarTreeDocument.metrics.length; mi++) {
151154
if (expectedStarTreeDocument.metrics[mi] instanceof Long) {
152155
assertEquals(((Long) expectedStarTreeDocument.metrics[mi]).doubleValue(), resultStarTreeDocument.metrics[mi]);
156+
} else if (resultStarTreeDocument.metrics[mi] instanceof CompensatedSum) {
157+
if (expectedStarTreeDocument.metrics[mi] instanceof CompensatedSum) {
158+
assertEquals(expectedStarTreeDocument.metrics[mi], resultStarTreeDocument.metrics[mi]);
159+
} else {
160+
assertEquals((expectedStarTreeDocument.metrics[mi]), ((CompensatedSum) resultStarTreeDocument.metrics[mi]).value());
161+
}
162+
} else if (expectedStarTreeDocument.metrics[mi] instanceof CompensatedSum) {
163+
if (resultStarTreeDocument.metrics[mi] instanceof CompensatedSum) {
164+
assertEquals(expectedStarTreeDocument.metrics[mi], resultStarTreeDocument.metrics[mi]);
165+
} else {
166+
assertEquals(((CompensatedSum) expectedStarTreeDocument.metrics[mi]).value(), resultStarTreeDocument.metrics[mi]);
167+
}
153168
} else {
154169
assertEquals(expectedStarTreeDocument.metrics[mi], resultStarTreeDocument.metrics[mi]);
155170
}

server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/AbstractValueAggregatorTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import org.opensearch.index.mapper.FieldValueConverter;
1414
import org.opensearch.index.mapper.NumberFieldMapper;
15+
import org.opensearch.search.aggregations.metrics.CompensatedSum;
1516
import org.opensearch.test.OpenSearchTestCase;
1617
import org.junit.Before;
1718

@@ -64,6 +65,10 @@ public void testGetInitialAggregatedValueForSegmentDocValue() {
6465
long randomLong = randomLong();
6566
if (aggregator instanceof CountValueAggregator) {
6667
assertEquals(CountValueAggregator.DEFAULT_INITIAL_VALUE, aggregator.getInitialAggregatedValueForSegmentDocValue(randomLong()));
68+
} else if (aggregator instanceof SumValueAggregator) {
69+
CompensatedSum sum = new CompensatedSum(0, 0);
70+
sum.add(fieldValueConverter.toDoubleValue(randomLong));
71+
assertEquals(sum, aggregator.getInitialAggregatedValueForSegmentDocValue(randomLong));
6772
} else {
6873
assertEquals(fieldValueConverter.toDoubleValue(randomLong), aggregator.getInitialAggregatedValueForSegmentDocValue(randomLong));
6974
}

0 commit comments

Comments
 (0)