Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class StreamStringTermsAggregator extends AbstractStringTermsAggregator {
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
protected final ResultStrategy<?, ?, ?> resultStrategy;
private boolean madeLeafOnce = false;

public StreamStringTermsAggregator(
String name,
Expand All @@ -72,6 +73,7 @@ public void doReset() {
super.doReset();
valueCount = 0;
sortedDocValuesPerBatch = null;
this.madeLeafOnce = false;
}

@Override
Expand All @@ -91,6 +93,11 @@ public InternalAggregation buildEmptyAggregation() {

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (this.madeLeafOnce) {
throw new IllegalStateException("pardon you might already collected " + docCounts);
} else {
this.madeLeafOnce = true;
}
this.sortedDocValuesPerBatch = valuesSource.ordinalsValues(ctx);
this.valueCount = sortedDocValuesPerBatch.getValueCount(); // for streaming case, the value count is reset to per batch
// cardinality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,9 @@ public void testBuildAggregationsBatchReset() throws Exception {
Document document = new Document();
document.add(new SortedSetDocValuesField("field", new BytesRef("test")));
indexWriter.addDocument(document);
document = new Document();
document.add(new SortedSetDocValuesField("field", new BytesRef("best")));
indexWriter.addDocument(document);

try (IndexReader indexReader = maybeWrapReaderEs(indexWriter.getReader())) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
Expand All @@ -369,7 +372,7 @@ public void testBuildAggregationsBatchReset() throws Exception {
aggregator.postCollection();

StringTerms firstResult = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0];
assertThat(firstResult.getBuckets().size(), equalTo(1));
assertThat(firstResult.getBuckets().size(), equalTo(2));

aggregator.doReset();

Expand All @@ -379,7 +382,7 @@ public void testBuildAggregationsBatchReset() throws Exception {
aggregator.postCollection();

StringTerms secondResult = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0];
assertThat(secondResult.getBuckets().size(), equalTo(1));
assertThat(secondResult.getBuckets().size(), equalTo(2));
assertThat(secondResult.getBuckets().get(0).getDocCount(), equalTo(1L));
}
}
Expand Down
Loading