Merged
Changes from all commits

----------------------------------------
server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java

@@ -7,6 +7,7 @@
  */
 package org.opensearch.index.compositeindex.datacube.startree.aggregators;
 
+import org.opensearch.index.compositeindex.datacube.startree.utils.CompensatedSumType;
 import org.opensearch.index.mapper.FieldValueConverter;
 import org.opensearch.index.mapper.NumberFieldMapper;
 import org.opensearch.search.aggregations.metrics.CompensatedSum;
@@ -21,87 +22,107 @@
  *
  * @opensearch.experimental
  */
-class SumValueAggregator implements ValueAggregator<Double> {
+class SumValueAggregator implements ValueAggregator<CompensatedSum> {
 
     private final FieldValueConverter fieldValueConverter;
+    private final CompensatedSumType compensatedSumConverter;
     private static final FieldValueConverter VALUE_AGGREGATOR_TYPE = NumberFieldMapper.NumberType.DOUBLE;
 
-    private CompensatedSum kahanSummation = new CompensatedSum(0, 0);
-
     public SumValueAggregator(FieldValueConverter fieldValueConverter) {
         this.fieldValueConverter = fieldValueConverter;
+        this.compensatedSumConverter = new CompensatedSumType();
     }
 
     @Override
     public FieldValueConverter getAggregatedValueType() {
-        return VALUE_AGGREGATOR_TYPE;
+        return compensatedSumConverter;
     }
 
     @Override
-    public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
-        kahanSummation.reset(0, 0);
+    public CompensatedSum getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
+        CompensatedSum kahanSummation = new CompensatedSum(0, 0);
         // add takes care of the sum and compensation internally
         if (segmentDocValue != null) {
-            kahanSummation.add(fieldValueConverter.toDoubleValue(segmentDocValue));
+            kahanSummation.reset(fieldValueConverter.toDoubleValue(segmentDocValue), 0);
         } else {
-            kahanSummation.add(getIdentityMetricValue());
+            kahanSummation.reset(getIdentityMetricDoubleValue(), 0);
         }
-        return kahanSummation.value();
+        return kahanSummation;
     }
 
     // we override this method because resetting with both the sum and the compensation
     // preserves precision and avoids a potential loss of accuracy in the running sum.
     @Override
-    public Double mergeAggregatedValueAndSegmentValue(Double value, Long segmentDocValue) {
-        assert value == null || kahanSummation.value() == value;
+    public CompensatedSum mergeAggregatedValueAndSegmentValue(CompensatedSum value, Long segmentDocValue) {
+        CompensatedSum kahanSummation = new CompensatedSum(0, 0);
+        if (value != null) {
+            kahanSummation.reset(value.value(), value.delta());
+        }
         // add takes care of the sum and compensation internally
         if (segmentDocValue != null) {
             kahanSummation.add(fieldValueConverter.toDoubleValue(segmentDocValue));
         } else {
-            kahanSummation.add(getIdentityMetricValue());
+            kahanSummation.add(getIdentityMetricDoubleValue());
        }
-        return kahanSummation.value();
+        return kahanSummation;
     }
 
     @Override
-    public Double mergeAggregatedValues(Double value, Double aggregatedValue) {
-        assert aggregatedValue == null || kahanSummation.value() == aggregatedValue;
-        // add takes care of the sum and compensation internally
+    public CompensatedSum mergeAggregatedValues(CompensatedSum value, CompensatedSum aggregatedValue) {
+        CompensatedSum kahanSummation = new CompensatedSum(0, 0);
+        if (aggregatedValue != null) {
+            kahanSummation.reset(aggregatedValue.value(), aggregatedValue.delta());
+        }
         if (value != null) {
-            kahanSummation.add(value);
+            kahanSummation.add(value.value(), value.delta());
         } else {
-            kahanSummation.add(getIdentityMetricValue());
+            kahanSummation.add(getIdentityMetricDoubleValue());
         }
-        return kahanSummation.value();
+        return kahanSummation;
     }
 
     @Override
-    public Double getInitialAggregatedValue(Double value) {
-        kahanSummation.reset(0, 0);
+    public CompensatedSum getInitialAggregatedValue(CompensatedSum value) {
+        CompensatedSum kahanSummation = new CompensatedSum(0, 0);
         // add takes care of the sum and compensation internally
-        if (value != null) {
-            kahanSummation.add(value);
+        if (value == null) {
+            kahanSummation.reset(getIdentityMetricDoubleValue(), 0);
         } else {
-            kahanSummation.add(getIdentityMetricValue());
+            kahanSummation.reset(value.value(), value.delta());
         }
-        return kahanSummation.value();
+        return kahanSummation;
     }
 
     @Override
-    public Double toAggregatedValueType(Long value) {
+    public CompensatedSum toAggregatedValueType(Long value) {
+        CompensatedSum kahanSummation = new CompensatedSum(0, 0);
         try {
             if (value == null) {
-                return getIdentityMetricValue();
+                kahanSummation.reset(getIdentityMetricDoubleValue(), 0);
+                return kahanSummation;
             }
-            return VALUE_AGGREGATOR_TYPE.toDoubleValue(value);
+            kahanSummation.reset(VALUE_AGGREGATOR_TYPE.toDoubleValue(value), 0);
+            return kahanSummation;
         } catch (Exception e) {
             throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
         }
     }
 
+    /**
+     * getIdentityMetricValue() allocates a new object on every call and is invoked for every
+     * null document, so this class calls getIdentityMetricDoubleValue() instead to avoid the
+     * allocation.
+     */
+    private double getIdentityMetricDoubleValue() {
+        return 0.0;
+    }
+
+    /**
+     * getIdentityMetricValue() allocates a new object on every call and is invoked for every
+     * null document, so within this class getIdentityMetricDoubleValue() is used instead to
+     * avoid the allocation.
+     */
     @Override
-    public Double getIdentityMetricValue() {
+    public CompensatedSum getIdentityMetricValue() {
         // in present aggregations, if the metric behind sum is missing, we treat it as 0
-        return 0D;
+        return new CompensatedSum(0, 0);
     }
 }
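
The rewrite above drops the aggregator's shared mutable kahanSummation field: every call now rebuilds a local CompensatedSum from the incoming (value, delta) pair, so the compensation term travels with the aggregated value instead of living in instance state. A minimal, JDK-only sketch of the pattern and of why the carried delta matters (demo code, not part of this PR):

    import java.util.Arrays;

    public final class KahanDemo {
        public static void main(String[] args) {
            // 1.0 followed by a thousand tiny values that naive addition swallows
            double[] values = new double[1001];
            values[0] = 1.0;
            Arrays.fill(values, 1, values.length, 1e-16);

            double naive = 0.0;
            for (double v : values) {
                naive += v;               // 1.0 + 1e-16 rounds back to 1.0 every time
            }

            double sum = 0.0;
            double delta = 0.0;           // carried low-order error, the "compensation"
            for (double v : values) {
                double y = v - delta;     // re-apply the error lost in the last step
                double t = sum + y;
                delta = (t - sum) - y;    // error introduced by this addition
                sum = t;
            }

            System.out.println("naive sum = " + naive); // 1.0
            System.out.println("kahan sum = " + sum);   // ~1.0000000000001 (correct)
        }
    }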

----------------------------------------
server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java

@@ -21,8 +21,10 @@
 import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument;
 import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
 import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
+import org.opensearch.index.compositeindex.datacube.startree.utils.CompensatedSumType;
 import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeDocumentBitSetUtil;
 import org.opensearch.index.mapper.FieldValueConverter;
+import org.opensearch.search.aggregations.metrics.CompensatedSum;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -125,6 +127,15 @@
             } else {
                 buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
             }
+        } else if (aggregatedValueType instanceof CompensatedSumType) {
+            if (isAggregatedDoc) {
+                long val = NumericUtils.doubleToSortableLong(
+                    starTreeDocument.metrics[i] == null ? 0.0 : ((CompensatedSum) starTreeDocument.metrics[i]).value()
+                );
+                buffer.putLong(val);
+            } else {
+                buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
+            }
         } else {
             throw new IllegalStateException("Unsupported metric type");
         }
@@ -232,6 +243,14 @@
                 metrics[i] = val;
             }
             offset += Long.BYTES;
+        } else if (aggregatedValueType instanceof CompensatedSumType) {
+            long val = input.readLong(offset);
+            if (isAggregatedDoc) {
+                metrics[i] = new CompensatedSum(aggregatedValueType.toDoubleValue(val), 0);
+            } else {
+                metrics[i] = val;
    [Codecov: added line #L251 not covered by tests]
+            }
+            offset += Long.BYTES;
         } else {
             throw new IllegalStateException("Unsupported metric type");
         }
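
Note that on this serialization path only the running sum survives a flush: the write path persists value() as a sortable long, and the read path re-seeds the compensation to zero via new CompensatedSum(toDoubleValue(val), 0). A small sketch of that round trip, assuming the standard Lucene sortable-long encoding used above (hypothetical demo, not PR code):

    import org.apache.lucene.util.NumericUtils;

    public final class SortableLongRoundTrip {
        public static void main(String[] args) {
            double runningSum = 0.1 + 0.2;  // 0.30000000000000004, some CompensatedSum.value()
            long onDisk = NumericUtils.doubleToSortableLong(runningSum);   // what buffer.putLong stores
            double restored = NumericUtils.sortableLongToDouble(onDisk);   // what toDoubleValue(val) yields
            System.out.println(restored == runningSum);  // true: the sum itself round-trips losslessly
            // the accumulated delta, however, is never written, so precision tracking restarts at 0
        }
    }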

----------------------------------------

@@ -42,6 +42,7 @@
 import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
 import org.opensearch.index.compositeindex.datacube.startree.node.InMemoryTreeNode;
 import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType;
+import org.opensearch.index.compositeindex.datacube.startree.utils.CompensatedSumType;
 import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
 import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
 import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator;
@@ -50,6 +51,7 @@
 import org.opensearch.index.mapper.FieldValueConverter;
 import org.opensearch.index.mapper.Mapper;
 import org.opensearch.index.mapper.MapperService;
+import org.opensearch.search.aggregations.metrics.CompensatedSum;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -490,6 +492,13 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, A
                     NumericUtils.doubleToSortableLong((Double) starTreeDocument.metrics[i])
                 );
             }
+        } else if (aggregatedValueType instanceof CompensatedSumType) {
+            if (starTreeDocument.metrics[i] != null) {
+                ((SortedNumericDocValuesWriterWrapper) (metricWriters.get(i))).addValue(
+                    docId,
+                    NumericUtils.doubleToSortableLong(((CompensatedSum) starTreeDocument.metrics[i]).value())
+                );
+            }
         } else {
             throw new IllegalStateException("Unknown metric doc value type");
         }

----------------------------------------
server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/CompensatedSumType.java (new file)

@@ -0,0 +1,28 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.compositeindex.datacube.startree.utils;
+
+import org.opensearch.index.mapper.FieldValueConverter;
+
+import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
+
+/**
+ * Field value converter for CompensatedSum - it's just a wrapper over Double
+ *
+ * @opensearch.internal
+ */
+public class CompensatedSumType implements FieldValueConverter {
+
+    public CompensatedSumType() {}
+
+    @Override
+    public double toDoubleValue(long value) {
+        return DOUBLE.toDoubleValue(value);
+    }
+}
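
CompensatedSumType deliberately adds no conversion logic of its own: decoding delegates to DOUBLE, so stored values decode exactly as before, while the distinct runtime type gives the file manager and builder code an instanceof hook to branch on. A quick sketch of that equivalence (hypothetical usage, not from the PR):

    import org.apache.lucene.util.NumericUtils;
    import org.opensearch.index.compositeindex.datacube.startree.utils.CompensatedSumType;
    import org.opensearch.index.mapper.NumberFieldMapper;

    public final class ConverterEquivalenceDemo {
        public static void main(String[] args) {
            long bits = NumericUtils.doubleToSortableLong(42.5);
            CompensatedSumType converter = new CompensatedSumType();
            // decodes exactly like the DOUBLE converter it wraps
            System.out.println(converter.toDoubleValue(bits));                           // 42.5
            System.out.println(NumberFieldMapper.NumberType.DOUBLE.toDoubleValue(bits)); // 42.5
        }
    }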

----------------------------------------
server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java

@@ -32,6 +32,8 @@
 
 package org.opensearch.search.aggregations.metrics;
 
+import java.util.Objects;
+
 /**
  * Used to calculate sums using the Kahan summation algorithm.
  *
@@ -110,4 +112,22 @@
         return this;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CompensatedSum that = (CompensatedSum) o;
+        return Double.compare(that.value, value) == 0 && Double.compare(that.delta, delta) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(value, delta);
    [Codecov: added line #L125 not covered by tests]
+    }
+
+    @Override
+    public String toString() {
+        return value + " " + delta;
    [Codecov: added line #L130 not covered by tests]
+    }
+
 }
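
One behavioral detail of the new equals/hashCode: comparison is exact on both fields, so two sums with the same visible value() but different carried deltas are not equal, which matters when tests compare CompensatedSum instances directly, as the test changes below do. For example (sketch):

    import org.opensearch.search.aggregations.metrics.CompensatedSum;

    public final class CompensatedSumEqualityDemo {
        public static void main(String[] args) {
            CompensatedSum a = new CompensatedSum(1.0, 0.0);
            CompensatedSum b = new CompensatedSum(1.0, 1e-17);
            System.out.println(a.value() == b.value());  // true: same visible sum
            System.out.println(a.equals(b));             // false: delta participates in equality
        }
    }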

----------------------------------------

@@ -21,6 +21,7 @@
 import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
 import org.opensearch.index.mapper.CompositeMappedFieldType;
 import org.opensearch.index.mapper.FieldValueConverter;
+import org.opensearch.search.aggregations.metrics.CompensatedSum;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -150,6 +151,18 @@ public static void assertStarTreeDocuments(StarTreeDocument[] starTreeDocuments,
         for (int mi = 0; mi < resultStarTreeDocument.metrics.length; mi++) {
             if (expectedStarTreeDocument.metrics[mi] instanceof Long) {
                 assertEquals(((Long) expectedStarTreeDocument.metrics[mi]).doubleValue(), resultStarTreeDocument.metrics[mi]);
+            } else if (resultStarTreeDocument.metrics[mi] instanceof CompensatedSum) {
+                if (expectedStarTreeDocument.metrics[mi] instanceof CompensatedSum) {
+                    assertEquals(expectedStarTreeDocument.metrics[mi], resultStarTreeDocument.metrics[mi]);
+                } else {
+                    assertEquals((expectedStarTreeDocument.metrics[mi]), ((CompensatedSum) resultStarTreeDocument.metrics[mi]).value());
+                }
+            } else if (expectedStarTreeDocument.metrics[mi] instanceof CompensatedSum) {
+                if (resultStarTreeDocument.metrics[mi] instanceof CompensatedSum) {
+                    assertEquals(expectedStarTreeDocument.metrics[mi], resultStarTreeDocument.metrics[mi]);
+                } else {
+                    assertEquals(((CompensatedSum) expectedStarTreeDocument.metrics[mi]).value(), resultStarTreeDocument.metrics[mi]);
+                }
             } else {
                 assertEquals(expectedStarTreeDocument.metrics[mi], resultStarTreeDocument.metrics[mi]);
             }

----------------------------------------

@@ -12,6 +12,7 @@
 
 import org.opensearch.index.mapper.FieldValueConverter;
 import org.opensearch.index.mapper.NumberFieldMapper;
+import org.opensearch.search.aggregations.metrics.CompensatedSum;
 import org.opensearch.test.OpenSearchTestCase;
 import org.junit.Before;
 
@@ -64,6 +65,10 @@ public void testGetInitialAggregatedValueForSegmentDocValue() {
         long randomLong = randomLong();
         if (aggregator instanceof CountValueAggregator) {
             assertEquals(CountValueAggregator.DEFAULT_INITIAL_VALUE, aggregator.getInitialAggregatedValueForSegmentDocValue(randomLong()));
+        } else if (aggregator instanceof SumValueAggregator) {
+            CompensatedSum sum = new CompensatedSum(0, 0);
+            sum.add(fieldValueConverter.toDoubleValue(randomLong));
+            assertEquals(sum, aggregator.getInitialAggregatedValueForSegmentDocValue(randomLong));
         } else {
             assertEquals(fieldValueConverter.toDoubleValue(randomLong), aggregator.getInitialAggregatedValueForSegmentDocValue(randomLong));
         }

----------------------------------------

@@ -21,28 +21,28 @@ public void testKahanSummation() {
     double expected = 1;
 
     // initializing our sum aggregator to derive the exact sum using Kahan summation
-    double aggregatedValue = getAggregatedValue(numbers);
-    assertEquals(expected, aggregatedValue, 0);
+    CompensatedSum aggregatedSum = getAggregatedValue(numbers);
+    assertEquals(expected, aggregatedSum.value(), 0);
 
     // assert plain Kahan summation logic against our aggregated value
     double actual = kahanSum(numbers);
-    assertEquals(actual, aggregatedValue, 0);
+    assertEquals(actual, aggregatedSum.value(), 0);
 
     // assert that a normal sum fails for this case
     double normalSum = normalSum(numbers);
     assertNotEquals(expected, normalSum, 0);
     assertNotEquals(actual, normalSum, 0);
-    assertNotEquals(aggregatedValue, normalSum, 0);
-
+    assertNotEquals(aggregatedSum.value(), normalSum, 0);
 }
 
-private static double getAggregatedValue(double[] numbers) {
-    // explicitly took double to test for the most precision
-    // hard to run similar tests for different data types dynamically, as inputs and precision vary
+private static CompensatedSum getAggregatedValue(double[] numbers) {
     SumValueAggregator aggregator = new SumValueAggregator(NumberFieldMapper.NumberType.DOUBLE);
-    double aggregatedValue = aggregator.getInitialAggregatedValueForSegmentDocValue(NumericUtils.doubleToSortableLong(numbers[0]));
-    aggregatedValue = aggregator.mergeAggregatedValueAndSegmentValue(aggregatedValue, NumericUtils.doubleToSortableLong(numbers[1]));
-    aggregatedValue = aggregator.mergeAggregatedValueAndSegmentValue(aggregatedValue, NumericUtils.doubleToSortableLong(numbers[2]));
+    long sortableLong1 = NumericUtils.doubleToSortableLong(numbers[0]);
+    CompensatedSum aggregatedValue = aggregator.getInitialAggregatedValueForSegmentDocValue(sortableLong1);
+    long sortableLong2 = NumericUtils.doubleToSortableLong(numbers[1]);
+    aggregatedValue = aggregator.mergeAggregatedValueAndSegmentValue(aggregatedValue, sortableLong2);
+    long sortableLong3 = NumericUtils.doubleToSortableLong(numbers[2]);
+    aggregatedValue = aggregator.mergeAggregatedValueAndSegmentValue(aggregatedValue, sortableLong3);
     return aggregatedValue;
 }
 
@@ -129,5 +129,4 @@ public void testMinAggregatorExtremeValues_Infinity() {
     }
     assertEquals(expected, aggregatedValue, 0);
 }
-
 }
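
The kahanSum and normalSum helpers the test calls sit outside this diff; a plausible reconstruction (names taken from the test, bodies assumed) looks like:

    public final class SumHelpers {
        static double kahanSum(double[] numbers) {
            double sum = 0.0;
            double c = 0.0;            // running compensation for lost low-order bits
            for (double v : numbers) {
                double y = v - c;
                double t = sum + y;
                c = (t - sum) - y;     // error introduced by this addition
                sum = t;
            }
            return sum;
        }

        static double normalSum(double[] numbers) {
            double sum = 0.0;
            for (double v : numbers) {
                sum += v;              // plain accumulation; rounding error builds up
            }
            return sum;
        }

        public static void main(String[] args) {
            double[] numbers = { 0.1, 0.2, 0.3 };       // sample inputs, not the test's
            System.out.println(kahanSum(numbers));      // 0.6
            System.out.println(normalSum(numbers));     // 0.6000000000000001
        }
    }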