Skip to content

Commit f49d176

Browse files
committed
Handle sub-aggregations when the date histogram is the top-level aggregation
* Functional correctness still needs to be verified. Signed-off-by: Asim Mahmood <[email protected]>
1 parent a19434c commit f49d176

File tree

2 files changed

+156
-23
lines changed

2 files changed

+156
-23
lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,12 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
225225
final SortedNumericDocValues values = valuesSource.longValues(ctx);
226226
final NumericDocValues singleton = DocValues.unwrapSingleton(values);
227227

228-
// If no subaggregations and index sorted on given field, we can use skip list based collector
228+
// If the index is sorted on the given field, we can use the skip-list-based collector
229229
logger.trace("Index sort field found: {}, skipper: {}", fieldIndexSort, skipper);
230230
if (fieldIndexSort && skipper != null && singleton != null) {
231231
// TODO: add hard bounds support
232-
if (hardBounds != null || sub == null || sub == LeafBucketCollector.NO_OP_COLLECTOR) {
233-
return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, this::incrementBucketDocCount);
232+
if (hardBounds == null) {
233+
return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this);
234234
}
235235
}
236236

@@ -426,7 +426,8 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector
426426
private final DocValuesSkipper skipper;
427427
private final Rounding.Prepared preparedRounding;
428428
private final LongKeyedBucketOrds bucketOrds;
429-
private final BiConsumer<Long, Long> incrementDocCount;
429+
private final LeafBucketCollector sub;
430+
private final BucketsAggregator aggregator;
430431

431432
/**
432433
* Max doc ID (inclusive) up to which all docs values may map to the same bucket.
@@ -448,17 +449,23 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector
448449
DocValuesSkipper skipper,
449450
Rounding.Prepared preparedRounding,
450451
LongKeyedBucketOrds bucketOrds,
451-
BiConsumer<Long, Long> incrementDocCount
452+
LeafBucketCollector sub,
453+
BucketsAggregator aggregator
452454
) {
453455
this.values = values;
454456
this.skipper = skipper;
455457
this.preparedRounding = preparedRounding;
456458
this.bucketOrds = bucketOrds;
457-
this.incrementDocCount = incrementDocCount;
459+
this.sub = sub;
460+
this.aggregator = aggregator;
458461
}
459462

460463
@Override
461-
public void setScorer(Scorable scorer) throws IOException {}
464+
public void setScorer(Scorable scorer) throws IOException {
465+
if (sub != null) {
466+
sub.setScorer(scorer);
467+
}
468+
}
462469

463470
private void advanceSkipper(int doc) throws IOException {
464471
if (doc > skipper.maxDocID(0)) {
@@ -485,6 +492,7 @@ private void advanceSkipper(int doc) throws IOException {
485492
// All docs at this level have a value, and all values map to the same bucket.
486493
upToInclusive = skipper.maxDocID(level);
487494
upToSameBucket = true;
495+
// Use owningBucketOrd = 0 for top-level aggregation
488496
upToBucketIndex = bucketOrds.add(0, maxBucket);
489497
if (upToBucketIndex < 0) {
490498
upToBucketIndex = -1 - upToBucketIndex;
@@ -497,27 +505,29 @@ private void advanceSkipper(int doc) throws IOException {
497505

498506
@Override
499507
public void collect(int doc, long owningBucketOrd) throws IOException {
500-
collect(doc);
501-
}
502-
503-
@Override
504-
public void collect(int doc) throws IOException {
505508
if (doc > upToInclusive) {
506509
advanceSkipper(doc);
507510
}
508511

509512
if (upToSameBucket) {
510-
incrementDocCount.accept(upToBucketIndex, 1L);
513+
aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
511514
} else if (values.advanceExact(doc)) {
512515
final long value = values.longValue();
513-
long bucketIndex = bucketOrds.add(0, preparedRounding.round(value));
516+
long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
514517
if (bucketIndex < 0) {
515518
bucketIndex = -1 - bucketIndex;
519+
aggregator.collectExistingBucket(sub, doc, bucketIndex);
520+
} else {
521+
aggregator.collectBucket(sub, doc, bucketIndex);
516522
}
517-
incrementDocCount.accept(bucketIndex, 1L);
518523
}
519524
}
520525

526+
@Override
527+
public void collect(int doc) throws IOException {
528+
collect(doc, 0);
529+
}
530+
521531
@Override
522532
public void collect(DocIdStream stream) throws IOException {
523533
for (;;) {
@@ -526,9 +536,9 @@ public void collect(DocIdStream stream) throws IOException {
526536
upToExclusive = Integer.MAX_VALUE;
527537
}
528538

529-
if (upToSameBucket) {
539+
if (upToSameBucket && sub == LeafBucketCollector.NO_OP_COLLECTOR) {
530540
long count = stream.count(upToExclusive);
531-
incrementDocCount.accept(upToBucketIndex, count);
541+
aggregator.incrementBucketDocCount(upToBucketIndex, count);
532542
} else {
533543
stream.forEach(upToExclusive, this::collect);
534544
}

server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java

Lines changed: 129 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,15 @@
6161
import org.opensearch.index.mapper.DateFieldMapper;
6262
import org.opensearch.index.mapper.DocCountFieldMapper;
6363
import org.opensearch.index.mapper.MappedFieldType;
64+
import org.opensearch.index.mapper.NumberFieldMapper;
6465
import org.opensearch.search.MultiValueMode;
6566
import org.opensearch.search.aggregations.AggregationBuilder;
6667
import org.opensearch.search.aggregations.BucketOrder;
6768
import org.opensearch.search.aggregations.InternalAggregation;
6869
import org.opensearch.search.aggregations.MultiBucketConsumerService;
6970
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
7071
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
72+
import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder;
7173
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
7274
import org.opensearch.search.aggregations.support.AggregationInspectionHelper;
7375

@@ -257,12 +259,7 @@ public void testAsSubAgg() throws IOException {
257259

258260
public void testSkiplistWithSingleValueDates() throws IOException {
259261
// Create index settings with an index sort.
260-
Settings settings = Settings.builder()
261-
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
262-
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
263-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
264-
.putList("index.sort.field", AGGREGABLE_DATE)
265-
.build();
262+
Settings settings = getSettingsWithIndexSort();
266263

267264
IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build();
268265
IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
@@ -349,6 +346,132 @@ public void testSkiplistWithSingleValueDates() throws IOException {
349346

350347
}
351348

349+
public void testSkiplistWithSingleValueDatesAndSubAggs() throws IOException {
350+
// Create index settings with an index sort.
351+
Settings settings = getSettingsWithIndexSort();
352+
353+
IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build();
354+
IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
355+
356+
MappedFieldType dateType = new DateFieldMapper.DateFieldType(AGGREGABLE_DATE);
357+
String categoryField = "category";
358+
NumberFieldMapper.NumberFieldType categoryType = new NumberFieldMapper.NumberFieldType(categoryField, NumberFieldMapper.NumberType.LONG);
359+
360+
IndexNumericFieldData fieldData = (IndexNumericFieldData) dateType.fielddataBuilder("index", () -> {
361+
throw new UnsupportedOperationException();
362+
}).build(null, null);
363+
SortField sortField = fieldData.sortField(null, MultiValueMode.MIN, null, false);
364+
try (Directory directory = newDirectory()) {
365+
IndexWriterConfig config = newIndexWriterConfig();
366+
config.setMergePolicy(NoMergePolicy.INSTANCE);
367+
config.setIndexSort(new Sort(sortField));
368+
String filterField = "type";
369+
try (IndexWriter indexWriter = new IndexWriter(directory, config)) {
370+
371+
// First commit - 5 dates with type 1
372+
for (int i = 0; i < 5; i++) {
373+
Document doc = new Document();
374+
long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
375+
.toInstant()
376+
.toEpochMilli();
377+
doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
378+
doc.add(new LongPoint(filterField, 1));
379+
doc.add(new NumericDocValuesField(categoryField, i % 2)); // alternating categories
380+
indexWriter.addDocument(doc);
381+
}
382+
indexWriter.commit();
383+
384+
// Second commit - 3 more dates with type 2, skiplist
385+
for (int i = 5; i < 8; i++) {
386+
Document doc = new Document();
387+
long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
388+
.toInstant()
389+
.toEpochMilli();
390+
doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
391+
doc.add(new LongPoint(filterField, 2));
392+
doc.add(new NumericDocValuesField(categoryField, i % 2));
393+
indexWriter.addDocument(doc);
394+
}
395+
indexWriter.commit();
396+
397+
// Third commit - 2 more dates with type 2
398+
for (int i = 8; i < 10; i++) {
399+
Document doc = new Document();
400+
long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
401+
.toInstant()
402+
.toEpochMilli();
403+
doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
404+
doc.add(new LongPoint(filterField, 2));
405+
doc.add(new NumericDocValuesField(categoryField, i % 2));
406+
indexWriter.addDocument(doc);
407+
}
408+
indexWriter.commit();
409+
}
410+
411+
try (IndexReader indexReader = DirectoryReader.open(directory)) {
412+
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
413+
414+
// Create date histogram with a max sub-aggregation on the category field
415+
DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("test")
416+
.field(AGGREGABLE_DATE)
417+
.calendarInterval(DateHistogramInterval.YEAR)
418+
.subAggregation(new MaxAggregationBuilder(categoryField).field(categoryField));
419+
420+
Query query = LongPoint.newExactQuery(filterField, 2);
421+
422+
InternalDateHistogram histogram = searchAndReduce(
423+
indexSettings,
424+
indexSearcher,
425+
query,
426+
aggregationBuilder,
427+
1000,
428+
false,
429+
dateType,
430+
categoryType
431+
);
432+
433+
assertEquals(3, histogram.getBuckets().size()); // 2015, 2016, 2017 (only type 2 docs)
434+
435+
// Verify first bucket (2015) with sub-aggregations
436+
InternalDateHistogram.Bucket bucket2015 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(0);
437+
assertEquals("2015-01-01T00:00:00.000Z", bucket2015.getKeyAsString());
438+
assertEquals(3, bucket2015.getDocCount());
439+
440+
// Check that the sub-aggregation is present on the bucket. NOTE: this only asserts presence; the max value itself is not verified.
441+
assertNotNull("Sub-aggregation should exist for 2015 bucket", bucket2015.getAggregations());
442+
assertNotNull("Categories sub-agg should exist", bucket2015.getAggregations().get(categoryField));
443+
assertTrue("Should have sub-aggregations", bucket2015.getAggregations().asList().size() > 0);
444+
445+
// Verify second bucket (2016)
446+
InternalDateHistogram.Bucket bucket2016 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(1);
447+
assertEquals("2016-01-01T00:00:00.000Z", bucket2016.getKeyAsString());
448+
assertEquals(1, bucket2016.getDocCount());
449+
450+
assertNotNull("Sub-aggregation should exist for 2016 bucket", bucket2016.getAggregations());
451+
assertNotNull("Categories sub-agg should exist", bucket2016.getAggregations().get(categoryField));
452+
assertTrue("Should have sub-aggregations", bucket2016.getAggregations().asList().size() > 0);
453+
454+
// Verify third bucket (2017)
455+
InternalDateHistogram.Bucket bucket2017 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(2);
456+
assertEquals("2017-01-01T00:00:00.000Z", bucket2017.getKeyAsString());
457+
assertEquals(1, bucket2017.getDocCount());
458+
459+
assertNotNull("Sub-aggregation should exist for 2017 bucket", bucket2017.getAggregations());
460+
assertNotNull("Categories sub-agg should exist", bucket2017.getAggregations().get(categoryField));
461+
assertTrue("Should have sub-aggregations", bucket2017.getAggregations().asList().size() > 0);
462+
}
463+
}
464+
}
465+
466+
private static Settings getSettingsWithIndexSort() {
467+
return Settings.builder()
468+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
469+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
470+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
471+
.putList("index.sort.field", AGGREGABLE_DATE)
472+
.build();
473+
}
474+
352475
public void testNoDocsDeprecatedInterval() throws IOException {
353476
Query query = new MatchNoDocsQuery();
354477
List<String> dates = Collections.emptyList();

0 commit comments

Comments
 (0)