1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new extensible method to DocRequest to specify type ([#19313](https://github.com/opensearch-project/OpenSearch/pull/19313))
- [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
- Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
- Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
- Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))

### Changed
@@ -119,7 +119,7 @@ public void collect(int doc, long bucket) throws IOException {
public abstract void collect(int doc, long owningBucketOrd) throws IOException;

@Override
public final void collect(int doc) throws IOException {
public void collect(int doc) throws IOException {
collect(doc, 0);
}

@@ -31,10 +31,15 @@

package org.opensearch.search.aggregations.bucket.histogram;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesSkipper;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Nullable;
@@ -88,6 +93,8 @@
* @opensearch.internal
*/
class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator, StarTreePreComputeCollector {
private static final Logger logger = LogManager.getLogger(DateHistogramAggregator.class);

private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;
private final Rounding rounding;
@@ -105,7 +112,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
private boolean starTreeDateRoundingRequired = true;

private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
public final String fieldName;
private final String fieldName;
private final boolean fieldIndexSort;

DateHistogramAggregator(
String name,
@@ -173,6 +181,7 @@ protected Function<Long, Long> bucketOrdProducer() {
this.fieldName = (valuesSource instanceof ValuesSource.Numeric.FieldData)
? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName()
: null;
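// Whether the index is sorted on the aggregated field; the skip list collection path in getLeafCollector requires it.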
this.fieldIndexSort = this.fieldName == null ? false : context.getQueryShardContext().indexSortedOnField(fieldName);
this.starTreeDateDimension = (context.getQueryShardContext().getStarTreeQueryContext() != null)
? fetchStarTreeCalendarUnit()
: null;
@@ -209,9 +218,22 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
return LeafBucketCollector.NO_OP_COLLECTOR;
}

DocValuesSkipper skipper = null;
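// The reader exposes a DocValuesSkipper only when the field was indexed with a doc-values skip index (e.g. the skip_list mapping param).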
if (this.fieldName != null) {
skipper = ctx.reader().getDocValuesSkipper(this.fieldName);
}
final SortedNumericDocValues values = valuesSource.longValues(ctx);
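// unwrapSingleton returns a non-null single-valued view only when every document has at most one value for the field.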
final NumericDocValues singleton = DocValues.unwrapSingleton(values);

// If there are no sub-aggregations and the index is sorted on the given field, we can use the skip list based collector
logger.trace("Index sort field found: {}, skipper: {}", fieldIndexSort, skipper);
if (fieldIndexSort && skipper != null && singleton != null) {
// TODO: add hard bounds support
if (hardBounds == null && (sub == null || sub == LeafBucketCollector.NO_OP_COLLECTOR)) {
return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, this::incrementBucketDocCount);
}
}

if (singleton != null) {
// Optimized path for single-valued fields
return new LeafBucketCollectorBase(sub, values) {
@@ -397,4 +419,126 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
return 1.0;
}
}

private static class HistogramSkiplistLeafCollector extends LeafBucketCollector {
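// Counts ranges of docs in bulk whenever the doc-values skip index shows that all values in the range
// round to the same bucket; falls back to per-document doc-value lookups otherwise.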

private final NumericDocValues values;
private final DocValuesSkipper skipper;
private final Rounding.Prepared preparedRounding;
private final LongKeyedBucketOrds bucketOrds;
private final BiConsumer<Long, Long> incrementDocCount;

/**
* Max doc ID (inclusive) up to which all docs' values may map to the same bucket.
*/
private int upToInclusive = -1;

/**
* Whether the values of all docs up to {@link #upToInclusive} map to the same bucket.
*/
private boolean upToSameBucket;

/**
* Index in bucketOrds for docs up to {@link #upToInclusive}.
*/
private long upToBucketIndex;

HistogramSkiplistLeafCollector(
NumericDocValues values,
DocValuesSkipper skipper,
Rounding.Prepared preparedRounding,
LongKeyedBucketOrds bucketOrds,
BiConsumer<Long, Long> incrementDocCount
) {
this.values = values;
this.skipper = skipper;
this.preparedRounding = preparedRounding;
this.bucketOrds = bucketOrds;
this.incrementDocCount = incrementDocCount;
}

@Override
public void setScorer(Scorable scorer) throws IOException {}

private void advanceSkipper(int doc) throws IOException {
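// Position the skipper on the block containing `doc`, then scan skip-index levels bottom-up: if every doc at a
// level has a value and the level's min and max values round to the same bucket, all docs up to that level's
// max doc ID share one bucket and can be counted without reading per-doc values.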
if (doc > skipper.maxDocID(0)) {
skipper.advance(doc);
}
upToSameBucket = false;

if (skipper.minDocID(0) > doc) {
// Corner case which happens if `doc` doesn't have a value and is between two intervals of
// the doc-value skip index.
upToInclusive = skipper.minDocID(0) - 1;
return;
}

upToInclusive = skipper.maxDocID(0);

// Now find the highest level where all docs map to the same bucket.
for (int level = 0; level < skipper.numLevels(); ++level) {
int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
long minBucket = preparedRounding.round(skipper.minValue(level));
long maxBucket = preparedRounding.round(skipper.maxValue(level));

if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
// All docs at this level have a value, and all values map to the same bucket.
upToInclusive = skipper.maxDocID(level);
upToSameBucket = true;
upToBucketIndex = bucketOrds.add(0, maxBucket);
if (upToBucketIndex < 0) {
upToBucketIndex = -1 - upToBucketIndex;
}
} else {
break;
}
}
}

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
collect(doc);
}

@Override
public void collect(int doc) throws IOException {
if (doc > upToInclusive) {
advanceSkipper(doc);
}

if (upToSameBucket) {
incrementDocCount.accept(upToBucketIndex, 1L);
} else if (values.advanceExact(doc)) {
final long value = values.longValue();
long bucketIndex = bucketOrds.add(0, preparedRounding.round(value));
if (bucketIndex < 0) {
bucketIndex = -1 - bucketIndex;
}
incrementDocCount.accept(bucketIndex, 1L);
}
}

@Override
public void collect(DocIdStream stream) throws IOException {
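// Consume the stream in chunks bounded by the skipper: chunks known to map to a single bucket are counted
// in bulk via stream.count(), other chunks fall back to per-doc collection.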
for (;;) {
int upToExclusive = upToInclusive + 1;
if (upToExclusive < 0) { // overflow
upToExclusive = Integer.MAX_VALUE;
}

if (upToSameBucket) {
long count = stream.count(upToExclusive);
incrementDocCount.accept(upToBucketIndex, count);
} else {
stream.forEach(upToExclusive, this::collect);
}

if (stream.mayHaveRemaining()) {
advanceSkipper(upToExclusive);
} else {
break;
}
}
}
}
}
@@ -40,19 +40,28 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.util.TestUtil;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatters;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.fielddata.IndexNumericFieldData;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.MultiValueMode;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.BucketOrder;
import org.opensearch.search.aggregations.InternalAggregation;
@@ -246,6 +255,100 @@ public void testAsSubAgg() throws IOException {
});
}

public void testSkiplistWithSingleValueDates() throws IOException {
// Create index settings with an index sort.
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.putList("index.sort.field", AGGREGABLE_DATE)
.build();

IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build();
IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);

MappedFieldType fieldType = new DateFieldMapper.DateFieldType(AGGREGABLE_DATE);
IndexNumericFieldData fieldData = (IndexNumericFieldData) fieldType.fielddataBuilder("index", () -> {
throw new UnsupportedOperationException();
}).build(null, null);
SortField sortField = fieldData.sortField(null, MultiValueMode.MIN, null, false);
try (Directory directory = newDirectory()) {
IndexWriterConfig config = newIndexWriterConfig();
config.setMergePolicy(NoMergePolicy.INSTANCE);
config.setIndexSort(new Sort(sortField));
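// NoMergePolicy keeps the segments from the commits below from being merged, so the skip list collector is exercised across multiple sorted leaves.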
String filterField = "type";
try (IndexWriter indexWriter = new IndexWriter(directory, config)) {

// First commit - 5 dates with type 1
for (int i = 0; i < 5; i++) {
Document doc = new Document();
long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
.toInstant()
.toEpochMilli();
doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
doc.add(new LongPoint(filterField, 1));
indexWriter.addDocument(doc);
}
indexWriter.commit();

// Second commit - 3 more dates with type 2
for (int i = 5; i < 8; i++) {
Document doc = new Document();
long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
.toInstant()
.toEpochMilli();
doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
doc.add(new LongPoint(filterField, 2));
indexWriter.addDocument(doc);
}
indexWriter.commit();

// Third commit - 2 more dates with type 2
for (int i = 8; i < 10; i++) {
Document doc = new Document();
long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
.toInstant()
.toEpochMilli();
doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
doc.add(new LongPoint(filterField, 2));
indexWriter.addDocument(doc);
}
indexWriter.commit();
}

try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);

DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("test").field(AGGREGABLE_DATE)
.calendarInterval(DateHistogramInterval.YEAR);

Query query = LongPoint.newExactQuery(filterField, 2);

InternalDateHistogram histogram = searchAndReduce(
indexSettings,
indexSearcher,
query,
aggregationBuilder,
1000,
false,
fieldType
);

assertEquals(3, histogram.getBuckets().size()); // 2015, 2016, 2017 (only type 2 docs)

assertEquals("2015-01-01T00:00:00.000Z", histogram.getBuckets().get(0).getKeyAsString());
assertEquals(3, histogram.getBuckets().get(0).getDocCount());

assertEquals("2016-01-01T00:00:00.000Z", histogram.getBuckets().get(1).getKeyAsString());
assertEquals(1, histogram.getBuckets().get(1).getDocCount());

assertEquals("2017-01-01T00:00:00.000Z", histogram.getBuckets().get(2).getKeyAsString());
assertEquals(1, histogram.getBuckets().get(2).getDocCount());
}
}

}

public void testNoDocsDeprecatedInterval() throws IOException {
Query query = new MatchNoDocsQuery();
List<String> dates = Collections.emptyList();
@@ -702,7 +702,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
maxBucket,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);
C root = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes);

if (shardFanOut && searcher.getIndexReader().leaves().size() > 0) {
assertThat(ctx, instanceOf(CompositeReaderContext.class));