Skip to content

Commit e7cc244

Browse files
authored
Save memory when string terms are not on top (#57758) (#57876)
This reworks string flavored implementations of the `terms` aggregation to save memory when it is under another bucket by dropping the usage of `asMultiBucketAggregator`.
1 parent b5d3565 commit e7cc244

File tree

14 files changed

+884
-263
lines changed

14 files changed

+884
-263
lines changed

server/src/internalClusterTest/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
4646
import static org.hamcrest.Matchers.equalTo;
4747
import static org.hamcrest.Matchers.greaterThan;
48+
import static org.hamcrest.Matchers.hasEntry;
4849
import static org.hamcrest.Matchers.notNullValue;
4950

5051
@ESIntegTestCase.SuiteScopeTestCase
@@ -59,7 +60,12 @@ public class AggregationProfilerIT extends ESIntegTestCase {
5960

6061
private static final String TOTAL_BUCKETS = "total_buckets";
6162
private static final String WRAPPED = "wrapped_in_multi_bucket_aggregator";
62-
private static final Object DEFERRED = "deferred_aggregators";
63+
private static final String DEFERRED = "deferred_aggregators";
64+
private static final String COLLECTION_STRAT = "collection_strategy";
65+
private static final String RESULT_STRAT = "result_strategy";
66+
private static final String HAS_FILTER = "has_filter";
67+
private static final String SEGMENTS_WITH_SINGLE = "segments_with_single_valued_ords";
68+
private static final String SEGMENTS_WITH_MULTI = "segments_with_multi_valued_ords";
6369

6470
private static final String NUMBER_FIELD = "number";
6571
private static final String TAG_FIELD = "tag";
@@ -73,6 +79,7 @@ protected int numberOfShards() {
7379
@Override
7480
protected void setupSuiteScopeCluster() throws Exception {
7581
assertAcked(client().admin().indices().prepareCreate("idx")
82+
.setSettings(org.elasticsearch.common.collect.Map.of("number_of_shards", 1, "number_of_replicas", 0))
7683
.addMapping("type", STRING_FIELD, "type=keyword", NUMBER_FIELD, "type=integer", TAG_FIELD, "type=keyword").get());
7784
List<IndexRequestBuilder> builders = new ArrayList<>();
7885

@@ -90,7 +97,7 @@ protected void setupSuiteScopeCluster() throws Exception {
9097
.endObject()));
9198
}
9299

93-
indexRandom(true, builders);
100+
indexRandom(true, false, builders);
94101
createIndex("idx_unmapped");
95102
}
96103

@@ -184,7 +191,7 @@ public void testMultiLevelProfile() {
184191
assertThat(termsBreakdown.get(COLLECT), greaterThan(0L));
185192
assertThat(termsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
186193
assertThat(termsBreakdown.get(REDUCE), equalTo(0L));
187-
assertThat(termsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
194+
assertRemapTermsDebugInfo(termsAggResult);
188195
assertThat(termsAggResult.getProfiledChildren().size(), equalTo(1));
189196

190197
ProfileResult avgAggResult = termsAggResult.getProfiledChildren().get(0);
@@ -204,6 +211,18 @@ public void testMultiLevelProfile() {
204211
}
205212
}
206213

214+
private void assertRemapTermsDebugInfo(ProfileResult termsAggResult) {
215+
assertThat(termsAggResult.getDebugInfo(), hasEntry(COLLECTION_STRAT, "remap"));
216+
assertThat(termsAggResult.getDebugInfo(), hasEntry(RESULT_STRAT, "terms"));
217+
assertThat(termsAggResult.getDebugInfo(), hasEntry(HAS_FILTER, false));
218+
// TODO we only index single valued docs but the ordinals ends up with multi valued sometimes
219+
assertThat(
220+
termsAggResult.getDebugInfo().toString(),
221+
(int) termsAggResult.getDebugInfo().get(SEGMENTS_WITH_SINGLE) + (int) termsAggResult.getDebugInfo().get(SEGMENTS_WITH_MULTI),
222+
greaterThan(0)
223+
);
224+
}
225+
207226
public void testMultiLevelProfileBreadthFirst() {
208227
SearchResponse response = client().prepareSearch("idx").setProfile(true)
209228
.addAggregation(histogram("histo").field(NUMBER_FIELD).interval(1L).subAggregation(terms("terms")
@@ -251,7 +270,7 @@ public void testMultiLevelProfileBreadthFirst() {
251270
assertThat(termsBreakdown.get(COLLECT), greaterThan(0L));
252271
assertThat(termsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
253272
assertThat(termsBreakdown.get(REDUCE), equalTo(0L));
254-
assertThat(termsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
273+
assertRemapTermsDebugInfo(termsAggResult);
255274
assertThat(termsAggResult.getProfiledChildren().size(), equalTo(1));
256275

257276
ProfileResult avgAggResult = termsAggResult.getProfiledChildren().get(0);
@@ -378,7 +397,7 @@ public void testComplexProfile() {
378397
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
379398
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
380399
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
381-
assertThat(tagsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
400+
assertRemapTermsDebugInfo(tagsAggResult);
382401
assertThat(tagsAggResult.getProfiledChildren().size(), equalTo(2));
383402

384403
Map<String, ProfileResult> tagsAggResultSubAggregations = tagsAggResult.getProfiledChildren().stream()
@@ -423,7 +442,7 @@ public void testComplexProfile() {
423442
assertThat(stringsBreakdown.get(COLLECT), greaterThan(0L));
424443
assertThat(stringsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
425444
assertThat(stringsBreakdown.get(REDUCE), equalTo(0L));
426-
assertThat(stringsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
445+
assertRemapTermsDebugInfo(stringsAggResult);
427446
assertThat(stringsAggResult.getProfiledChildren().size(), equalTo(3));
428447

429448
Map<String, ProfileResult> stringsAggResultSubAggregations = stringsAggResult.getProfiledChildren().stream()
@@ -469,7 +488,7 @@ public void testComplexProfile() {
469488
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
470489
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
471490
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
472-
assertThat(tagsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
491+
assertRemapTermsDebugInfo(tagsAggResult);
473492
assertThat(tagsAggResult.getProfiledChildren().size(), equalTo(2));
474493

475494
tagsAggResultSubAggregations = tagsAggResult.getProfiledChildren().stream()

server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public final InternalAggregation buildTopLevel() throws IOException {
180180
public abstract InternalAggregation buildEmptyAggregation();
181181

182182
/**
183-
* Collect debug information to add to the profiling results.. This will
183+
* Collect debug information to add to the profiling results. This will
184184
* only be called if the aggregation is being profiled.
185185
* <p>
186186
* Well behaved implementations will always call the superclass
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations.bucket.terms;
21+
22+
import org.apache.lucene.util.BytesRef;
23+
import org.elasticsearch.common.lease.Releasable;
24+
import org.elasticsearch.common.lease.Releasables;
25+
import org.elasticsearch.common.util.BigArrays;
26+
import org.elasticsearch.common.util.BytesRefHash;
27+
28+
/**
29+
* Maps {@link BytesRef} bucket keys to bucket ordinals.
30+
*/
31+
public abstract class BytesKeyedBucketOrds implements Releasable {
32+
/**
33+
* Build a {@link LongKeyedBucketOrds}.
34+
*/
35+
public static BytesKeyedBucketOrds build(BigArrays bigArrays, boolean collectsFromSingleBucket) {
36+
return collectsFromSingleBucket ? new FromSingle(bigArrays) : new FromMany(bigArrays);
37+
}
38+
39+
private BytesKeyedBucketOrds() {}
40+
41+
/**
42+
* Add the {@code owningBucketOrd, value} pair. Return the ord for
43+
* their bucket if they have yet to be added, or {@code -1-ord}
44+
* if they were already present.
45+
*/
46+
public abstract long add(long owningBucketOrd, BytesRef value);
47+
48+
/**
49+
* Count the buckets in {@code owningBucketOrd}.
50+
*/
51+
public abstract long bucketsInOrd(long owningBucketOrd);
52+
53+
/**
54+
* The number of collected buckets.
55+
*/
56+
public abstract long size();
57+
58+
/**
59+
* Build an iterator for buckets inside {@code owningBucketOrd} in order
60+
* of increasing ord.
61+
* <p>
62+
* When this is first returns it is "unpositioned" and you must call
63+
* {@link BucketOrdsEnum#next()} to move it to the first value.
64+
*/
65+
public abstract BucketOrdsEnum ordsEnum(long owningBucketOrd);
66+
67+
/**
68+
* An iterator for buckets inside a particular {@code owningBucketOrd}.
69+
*/
70+
public interface BucketOrdsEnum {
71+
/**
72+
* Advance to the next value.
73+
* @return {@code true} if there *is* a next value,
74+
* {@code false} if there isn't
75+
*/
76+
boolean next();
77+
78+
/**
79+
* The ordinal of the current value.
80+
*/
81+
long ord();
82+
83+
/**
84+
* Read the current value.
85+
*/
86+
void readValue(BytesRef dest);
87+
88+
/**
89+
* An {@linkplain BucketOrdsEnum} that is empty.
90+
*/
91+
BucketOrdsEnum EMPTY = new BucketOrdsEnum() {
92+
@Override
93+
public boolean next() {
94+
return false;
95+
}
96+
97+
@Override
98+
public long ord() {
99+
return 0;
100+
}
101+
102+
@Override
103+
public void readValue(BytesRef dest) {}
104+
};
105+
}
106+
107+
/**
108+
* Implementation that only works if it is collecting from a single bucket.
109+
*/
110+
private static class FromSingle extends BytesKeyedBucketOrds {
111+
private final BytesRefHash ords;
112+
113+
private FromSingle(BigArrays bigArrays) {
114+
ords = new BytesRefHash(1, bigArrays);
115+
}
116+
117+
@Override
118+
public long add(long owningBucketOrd, BytesRef value) {
119+
assert owningBucketOrd == 0;
120+
return ords.add(value);
121+
}
122+
123+
@Override
124+
public long bucketsInOrd(long owningBucketOrd) {
125+
return ords.size();
126+
}
127+
128+
@Override
129+
public long size() {
130+
return ords.size();
131+
}
132+
133+
@Override
134+
public BucketOrdsEnum ordsEnum(long owningBucketOrd) {
135+
return new BucketOrdsEnum() {
136+
private int ord = -1;
137+
138+
@Override
139+
public boolean next() {
140+
ord++;
141+
return ord < ords.size();
142+
}
143+
144+
@Override
145+
public long ord() {
146+
return ord;
147+
}
148+
149+
@Override
150+
public void readValue(BytesRef dest) {
151+
ords.get(ord, dest);
152+
}
153+
};
154+
}
155+
156+
@Override
157+
public void close() {
158+
ords.close();
159+
}
160+
}
161+
162+
/**
163+
* Implementation that works properly when collecting from many buckets.
164+
*/
165+
private static class FromMany extends BytesKeyedBucketOrds {
166+
// TODO we can almost certainly do better here by building something fit for purpose rather than trying to lego together stuff
167+
private final BytesRefHash bytesToLong;
168+
private final LongKeyedBucketOrds longToBucketOrds;
169+
170+
private FromMany(BigArrays bigArrays) {
171+
bytesToLong = new BytesRefHash(1, bigArrays);
172+
longToBucketOrds = LongKeyedBucketOrds.build(bigArrays, false);
173+
}
174+
175+
@Override
176+
public long add(long owningBucketOrd, BytesRef value) {
177+
long l = bytesToLong.add(value);
178+
if (l < 0) {
179+
l = -1 - l;
180+
}
181+
return longToBucketOrds.add(owningBucketOrd, l);
182+
}
183+
184+
@Override
185+
public long bucketsInOrd(long owningBucketOrd) {
186+
return longToBucketOrds.bucketsInOrd(owningBucketOrd);
187+
}
188+
189+
@Override
190+
public long size() {
191+
return longToBucketOrds.size();
192+
}
193+
194+
@Override
195+
public BucketOrdsEnum ordsEnum(long owningBucketOrd) {
196+
LongKeyedBucketOrds.BucketOrdsEnum delegate = longToBucketOrds.ordsEnum(owningBucketOrd);
197+
return new BucketOrdsEnum() {
198+
@Override
199+
public boolean next() {
200+
return delegate.next();
201+
}
202+
203+
@Override
204+
public long ord() {
205+
return delegate.ord();
206+
}
207+
208+
@Override
209+
public void readValue(BytesRef dest) {
210+
bytesToLong.get(delegate.value(), dest);
211+
}
212+
};
213+
}
214+
215+
@Override
216+
public void close() {
217+
Releasables.close(bytesToLong, longToBucketOrds);
218+
}
219+
}
220+
}

0 commit comments

Comments
 (0)