Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Star tree] Performance optimizations during flush flow #16037

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@

package org.opensearch.common.util;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RandomAccessInput;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A bitset backed by a byte array. This will initialize and set bits in the byte array based on the index.
Expand All @@ -39,18 +38,6 @@ public ByteArrayBackedBitset(RandomAccessInput in, long offset, int length) thro
}
}

/**
* Constructor which set the Lucene's IndexInput to read the bitset into a read-only buffer.
*/
public ByteArrayBackedBitset(IndexInput in, int length) throws IOException {
byteArray = new byte[length];
int i = 0;
while (i < length) {
byteArray[i] = in.readByte();
i++;
}
}

/**
* Sets the bit at the given index to 1.
* Each byte can indicate 8 bits, so the index is divided by 8 to get the byte array index.
Expand All @@ -61,10 +48,10 @@ public void set(int index) {
byteArray[byteArrIndex] |= (byte) (1 << (index & 7));
}

public int write(IndexOutput output) throws IOException {
public int write(ByteBuffer output) throws IOException {
int numBytes = 0;
for (Byte bitSet : byteArray) {
output.writeByte(bitSet);
output.put(bitSet);
numBytes += Byte.BYTES;
}
return numBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
Expand All @@ -24,6 +26,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;

import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
Expand Down Expand Up @@ -67,54 +71,84 @@ private void setDocSizeInBytes(int numBytes) {
}

/**
* Write the star tree document to file associated with dimensions and metrics
* Write the star tree document to a byte buffer
*/
protected int writeStarTreeDocument(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = writeDimensions(starTreeDocument, output);
numBytes += writeMetrics(starTreeDocument, output, isAggregatedDoc);
int numBytes = calculateDocumentSize(starTreeDocument, isAggregatedDoc);
byte[] bytes = new byte[numBytes];
ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
writeDimensions(starTreeDocument, buffer);
if (isAggregatedDoc == false) {
writeFlushMetrics(starTreeDocument, buffer);
} else {
writeMetrics(starTreeDocument, buffer, isAggregatedDoc);
}
output.writeBytes(bytes, bytes.length);
setDocSizeInBytes(numBytes);
return numBytes;
return bytes.length;
}

/**
* Write dimensions to file
* Write dimensions to the byte buffer
*/
protected int writeDimensions(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException {
int numBytes = 0;
for (int i = 0; i < starTreeDocument.dimensions.length; i++) {
output.writeLong(starTreeDocument.dimensions[i] == null ? 0L : starTreeDocument.dimensions[i]);
numBytes += Long.BYTES;
protected void writeDimensions(StarTreeDocument starTreeDocument, ByteBuffer buffer) throws IOException {
for (Long dimension : starTreeDocument.dimensions) {
buffer.putLong(dimension == null ? 0L : dimension);
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, output);
return numBytes;
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, buffer);
}

/**
* Write star tree document metrics to file
*/
protected int writeMetrics(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = 0;
protected void writeFlushMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer) throws IOException {
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
}
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, buffer);
}

/**
* Write star tree document metrics to the byte buffer
*/
protected void writeMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer, boolean isAggregatedDoc) throws IOException {
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
FieldValueConverter aggregatedValueType = metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType();
if (aggregatedValueType.equals(LONG)) {
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
numBytes += Long.BYTES;
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
} else if (aggregatedValueType.equals(DOUBLE)) {
if (isAggregatedDoc) {
long val = NumericUtils.doubleToSortableLong(
starTreeDocument.metrics[i] == null ? 0.0 : (Double) starTreeDocument.metrics[i]
);
output.writeLong(val);
buffer.putLong(val);
} else {
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
}
numBytes += Long.BYTES;
} else {
throw new IllegalStateException("Unsupported metric type");
}
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, output);
return numBytes;
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, buffer);
}

/**
* Calculate the size of the serialized StarTreeDocument
*/
private int calculateDocumentSize(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) {
int size = starTreeDocument.dimensions.length * Long.BYTES;
size += getLength(starTreeDocument.dimensions);

for (int i = 0; i < starTreeDocument.metrics.length; i++) {
size += Long.BYTES;
}
size += getLength(starTreeDocument.metrics);

return size;
}

private static int getLength(Object[] array) {
return (array.length / 8) + (array.length % 8 == 0 ? 0 : 1);
}

/**
Expand All @@ -132,7 +166,11 @@ protected StarTreeDocument readStarTreeDocument(RandomAccessInput input, long of
offset = readDimensions(dimensions, input, offset);

Object[] metrics = new Object[numMetrics];
offset = readMetrics(input, offset, numMetrics, metrics, isAggregatedDoc);
if (isAggregatedDoc == false) {
offset = readMetrics(input, offset, metrics);
} else {
offset = readMetrics(input, offset, numMetrics, metrics, isAggregatedDoc);
}
assert (offset - initialOffset) == docSizeInBytes;
return new StarTreeDocument(dimensions, metrics);
}
Expand All @@ -154,10 +192,32 @@ protected long readDimensions(Long[] dimensions, RandomAccessInput input, long o
return offset;
}

/**
* Read metrics based on metric field values. Then we reuse the metric field values to each of the metric stats.
*/
private long readMetrics(RandomAccessInput input, long offset, Object[] metrics) throws IOException {
Object[] fieldMetrics = new Object[starTreeField.getMetrics().size()];
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
fieldMetrics[i] = input.readLong(offset);
offset += Long.BYTES;
}
offset += StarTreeDocumentBitSetUtil.readBitSet(input, offset, fieldMetrics, index -> null);
int fieldIndex = 0;
int numMetrics = 0;
for (Metric metric : starTreeField.getMetrics()) {
for (MetricStat stat : metric.getBaseMetrics()) {
metrics[numMetrics] = fieldMetrics[fieldIndex];
numMetrics++;
}
fieldIndex++;
}
return offset;
}

/**
* Read star tree metrics from file
*/
protected long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean isAggregatedDoc)
private long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean isAggregatedDoc)
throws IOException {
for (int i = 0; i < numMetrics; i++) {
FieldValueConverter aggregatedValueType = metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@

/**
* Generates the configuration required to perform aggregation for all the metrics on a field
* Each metric field is associated with a metric reader
*
* @return list of MetricAggregatorInfo
*/
Expand All @@ -191,24 +192,20 @@

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricStat.equals(MetricStat.DOC_COUNT)) {
// _doc_count is numeric field , so we convert to sortedNumericDocValues and get iterator
metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME);
} else {
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReader = new SequentialDocValuesIterator(
new SortedNumericStarTreeValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
)
);
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metric.getField().equals(DocCountFieldMapper.NAME)) {
metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME);
} else {
if (metric.getBaseMetrics().isEmpty()) continue;
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReaders.add(metricReader);
metricReader = new SequentialDocValuesIterator(
new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo))
);
}
metricReaders.add(metricReader);
}
return metricReaders;
}
Expand Down Expand Up @@ -572,21 +569,29 @@
*/
private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricsReaders) throws IOException {
Object[] metrics = new Object[numMetrics];
for (int i = 0; i < numMetrics; i++) {
SequentialDocValuesIterator metricStatReader = metricsReaders.get(i);
if (metricStatReader != null) {
int metricIndex = 0;
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
Metric metric = starTreeField.getMetrics().get(i);
if (metric.getBaseMetrics().isEmpty()) continue;
SequentialDocValuesIterator metricReader = metricsReaders.get(i);
if (metricReader != null) {
try {
metricStatReader.nextEntry(currentDocId);
metricReader.nextEntry(currentDocId);
Object metricValue = metricReader.value(currentDocId);

for (MetricStat metricStat : metric.getBaseMetrics()) {
metrics[metricIndex] = metricValue;
metricIndex++;
}
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
} catch (Exception e) {
logger.error("unable to read the metric values from the segment", e);
throw new IllegalStateException("unable to read the metric values from the segment", e);
}
metrics[i] = metricStatReader.value(currentDocId);
} else {
throw new IllegalStateException("metric readers are empty");
throw new IllegalStateException("metric reader is empty for metric: " + metric.getField());

Check warning on line 594 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java#L594

Added line #L594 was not covered by tests
}
}
return metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
}
try {
for (int i = 0; i < totalSegmentDocs; i++) {
StarTreeDocument document = getSegmentStarTreeDocument(i, dimensionReaders, metricReaders);
StarTreeDocument document = getSegmentStarTreeDocumentWithMetricFieldValues(i, dimensionReaders, metricReaders);
segmentDocumentFileManager.writeStarTreeDocument(document, false);
}
} catch (IOException ex) {
Expand All @@ -128,6 +128,45 @@
return sortAndReduceDocuments(sortedDocIds, totalSegmentDocs, false);
}

/**
* Returns the star-tree document from the segment based on the current doc id
*/
StarTreeDocument getSegmentStarTreeDocumentWithMetricFieldValues(
int currentDocId,
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders
) throws IOException {
Long[] dimensions = getStarTreeDimensionsFromSegment(currentDocId, dimensionReaders);
Object[] metricValues = getStarTreeMetricFieldValuesFromSegment(currentDocId, metricReaders);
return new StarTreeDocument(dimensions, metricValues);
}

/**
* Returns the metric field values for the star-tree document from the segment based on the current doc id
*/
private Object[] getStarTreeMetricFieldValuesFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricReaders) {
Object[] metricValues = new Object[starTreeField.getMetrics().size()];
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
if (starTreeField.getMetrics().get(i).getBaseMetrics().isEmpty()) continue;
SequentialDocValuesIterator metricReader = metricReaders.get(i);
if (metricReader != null) {
try {
metricReader.nextEntry(currentDocId);
metricValues[i] = metricReader.value(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
} catch (Exception e) {
logger.error("unable to read the metric values from the segment", e);
throw new IllegalStateException("unable to read the metric values from the segment", e);

Check warning on line 161 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java#L156-L161

Added lines #L156 - L161 were not covered by tests
}
} else {
throw new IllegalStateException("metric reader is empty");

Check warning on line 164 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java#L164

Added line #L164 was not covered by tests
}
}
return metricValues;
}

/**
* Sorts and aggregates the star-tree documents from multiple segments and builds star tree based on the newly
* aggregated star-tree documents
Expand Down
Loading
Loading