4747import java .util .function .BiConsumer ;
4848import java .util .function .Function ;
4949import java .util .function .Supplier ;
50+ import java .util .function .LongConsumer ;
5051
5152/**
5253 * An aggregator of string values that hashes the strings on the fly rather
5354 * than up front like the {@link GlobalOrdinalsStringTermsAggregator}.
5455 */
5556public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
57+ private final CollectorSource collectorSource ;
5658 private final ResultStrategy <?, ?> resultStrategy ;
57- private final ValuesSource valuesSource ;
5859 private final BytesKeyedBucketOrds bucketOrds ;
5960 private final IncludeExclude .StringFilter includeExclude ;
6061
6162 public MapStringTermsAggregator (
6263 String name ,
6364 AggregatorFactories factories ,
65+ CollectorSource collectorSource ,
6466 Function <MapStringTermsAggregator , ResultStrategy <?, ?>> resultStrategy ,
65- ValuesSource valuesSource ,
6667 BucketOrder order ,
6768 DocValueFormat format ,
6869 BucketCountThresholds bucketCountThresholds ,
@@ -75,56 +76,39 @@ public MapStringTermsAggregator(
7576 Map <String , Object > metadata
7677 ) throws IOException {
7778 super (name , factories , context , parent , order , format , bucketCountThresholds , collectionMode , showTermDocCountError , metadata );
79+ this .collectorSource = collectorSource ;
7880 this .resultStrategy = resultStrategy .apply (this ); // ResultStrategy needs a reference to the Aggregator to do its job.
79- this .valuesSource = valuesSource ;
8081 this .includeExclude = includeExclude ;
8182 bucketOrds = BytesKeyedBucketOrds .build (context .bigArrays (), collectsFromSingleBucket );
8283 }
8384
8485 @ Override
8586 public ScoreMode scoreMode () {
86- if (valuesSource != null && valuesSource .needsScores ()) {
87+ if (collectorSource .needsScores ()) {
8788 return ScoreMode .COMPLETE ;
8889 }
8990 return super .scoreMode ();
9091 }
9192
9293 @ Override
93- public LeafBucketCollector getLeafCollector (LeafReaderContext ctx ,
94- final LeafBucketCollector sub ) throws IOException {
95- SortedBinaryDocValues values = valuesSource .bytesValues (ctx );
96- return resultStrategy .wrapCollector (new LeafBucketCollectorBase (sub , values ) {
97- final BytesRefBuilder previous = new BytesRefBuilder ();
98-
99- @ Override
100- public void collect (int doc , long owningBucketOrd ) throws IOException {
101- if (false == values .advanceExact (doc )) {
102- return ;
103- }
104- int valuesCount = values .docValueCount ();
105-
106- // SortedBinaryDocValues don't guarantee uniqueness so we
107- // need to take care of dups
108- previous .clear ();
109- for (int i = 0 ; i < valuesCount ; ++i ) {
110- final BytesRef bytes = values .nextValue ();
111- if (includeExclude != null && false == includeExclude .accept (bytes )) {
112- continue ;
113- }
114- if (i > 0 && previous .get ().equals (bytes )) {
115- continue ;
116- }
94+ public LeafBucketCollector getLeafCollector (LeafReaderContext ctx , LeafBucketCollector sub ) throws IOException {
95+ return resultStrategy .wrapCollector (
96+ collectorSource .getLeafCollector (
97+ includeExclude ,
98+ ctx ,
99+ sub ,
100+ this ::addRequestCircuitBreakerBytes ,
101+ (s , doc , owningBucketOrd , bytes ) -> {
117102 long bucketOrdinal = bucketOrds .add (owningBucketOrd , bytes );
118103 if (bucketOrdinal < 0 ) { // already seen
119104 bucketOrdinal = -1 - bucketOrdinal ;
120- collectExistingBucket (sub , doc , bucketOrdinal );
105+ collectExistingBucket (s , doc , bucketOrdinal );
121106 } else {
122- collectBucket (sub , doc , bucketOrdinal );
107+ collectBucket (s , doc , bucketOrdinal );
123108 }
124- previous .copyBytes (bytes );
125109 }
126- }
127- } );
110+ )
111+ );
128112 }
129113
130114 @ Override
@@ -146,7 +130,82 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
146130
147131 @ Override
148132 public void doClose () {
149- Releasables .close (bucketOrds , resultStrategy );
133+ Releasables .close (collectorSource , resultStrategy , bucketOrds );
134+ }
135+
136+ /**
137+ * Abstaction on top of building collectors to fetch values.
138+ */
139+ public interface CollectorSource extends Releasable {
140+ boolean needsScores ();
141+
142+ LeafBucketCollector getLeafCollector (
143+ IncludeExclude .StringFilter includeExclude ,
144+ LeafReaderContext ctx ,
145+ LeafBucketCollector sub ,
146+ LongConsumer addRequestCircuitBreakerBytes ,
147+ CollectConsumer consumer
148+ ) throws IOException ;
149+ }
150+ @ FunctionalInterface
151+ public interface CollectConsumer {
152+ void accept (LeafBucketCollector sub , int doc , long owningBucketOrd , BytesRef bytes ) throws IOException ;
153+ }
154+
155+ /**
156+ * Fetch values from a {@link ValuesSource}.
157+ */
158+ public static class ValuesSourceCollectorSource implements CollectorSource {
159+ private final ValuesSource valuesSource ;
160+
161+ public ValuesSourceCollectorSource (ValuesSource valuesSource ) {
162+ this .valuesSource = valuesSource ;
163+ }
164+
165+ @ Override
166+ public boolean needsScores () {
167+ return valuesSource .needsScores ();
168+ }
169+
170+ @ Override
171+ public LeafBucketCollector getLeafCollector (
172+ IncludeExclude .StringFilter includeExclude ,
173+ LeafReaderContext ctx ,
174+ LeafBucketCollector sub ,
175+ LongConsumer addRequestCircuitBreakerBytes ,
176+ CollectConsumer consumer
177+ ) throws IOException {
178+ SortedBinaryDocValues values = valuesSource .bytesValues (ctx );
179+ return new LeafBucketCollectorBase (sub , values ) {
180+ final BytesRefBuilder previous = new BytesRefBuilder ();
181+
182+ @ Override
183+ public void collect (int doc , long owningBucketOrd ) throws IOException {
184+ if (false == values .advanceExact (doc )) {
185+ return ;
186+ }
187+ int valuesCount = values .docValueCount ();
188+
189+ // SortedBinaryDocValues don't guarantee uniqueness so we
190+ // need to take care of dups
191+ previous .clear ();
192+ for (int i = 0 ; i < valuesCount ; ++i ) {
193+ BytesRef bytes = values .nextValue ();
194+ if (includeExclude != null && false == includeExclude .accept (bytes )) {
195+ continue ;
196+ }
197+ if (i > 0 && previous .get ().equals (bytes )) {
198+ continue ;
199+ }
200+ previous .copyBytes (bytes );
201+ consumer .accept (sub , doc , owningBucketOrd , bytes );
202+ }
203+ }
204+ };
205+ }
206+
207+ @ Override
208+ public void close () {}
150209 }
151210
152211 /**
@@ -270,6 +329,12 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
270329 * Builds results for the standard {@code terms} aggregation.
271330 */
272331 class StandardTermsResults extends ResultStrategy <StringTerms , StringTerms .Bucket > {
332+ private final ValuesSource valuesSource ;
333+
334+ StandardTermsResults (ValuesSource valuesSource ) {
335+ this .valuesSource = valuesSource ;
336+ }
337+
273338 @ Override
274339 String describe () {
275340 return "terms" ;
0 commit comments