Skip to content

Commit 896573f

Browse files
authored
Add support for aggregation profiler with concurrent aggregation (opensearch-project#8801) (opensearch-project#9015)
* Add support for aggregation profiler with concurrent aggregation (opensearch-project#8801) * Address review comments for support for aggregation profiler with concurrent aggregation (opensearch-project#8801) * Refactor ProfileResult class and add more tests * Fix flaky QueryProfilePhaseTests.testCollapseQuerySearchResults test --------- Signed-off-by: Ticheng Lin <[email protected]>
1 parent 6e47095 commit 896573f

File tree

18 files changed

+800
-68
lines changed

18 files changed

+800
-68
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4040
- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))
4141
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
4242
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))
43+
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
4344

4445
### Deprecated
4546

server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java

Lines changed: 166 additions & 21 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1271,7 +1271,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
12711271
context.minimumScore(source.minScore());
12721272
}
12731273
if (source.profile()) {
1274-
context.setProfilers(new Profilers(context.searcher()));
1274+
context.setProfilers(new Profilers(context.searcher(), context.isConcurrentSegmentSearchEnabled()));
12751275
}
12761276
if (source.timeout() != null) {
12771277
context.timeout(source.timeout());

server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ public abstract class AbstractProfileBreakdown<T extends Enum<T>> {
5050
/**
5151
* The accumulated timings for this query node
5252
*/
53-
private final Timer[] timings;
54-
private final T[] timingTypes;
53+
protected final Timer[] timings;
54+
protected final T[] timingTypes;
55+
public static final String TIMING_TYPE_COUNT_SUFFIX = "_count";
56+
public static final String TIMING_TYPE_START_TIME_SUFFIX = "_start_time";
5557

5658
/** Sole constructor. */
5759
public AbstractProfileBreakdown(Class<T> clazz) {
@@ -74,17 +76,10 @@ public void setTimer(T timing, Timer timer) {
7476
* Build a timing count breakdown for current instance
7577
*/
7678
public Map<String, Long> toBreakdownMap() {
77-
return buildBreakdownMap(this);
78-
}
79-
80-
/**
81-
* Build a timing count breakdown for arbitrary instance
82-
*/
83-
protected final Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<T> breakdown) {
84-
Map<String, Long> map = new HashMap<>(breakdown.timings.length * 2);
85-
for (T timingType : breakdown.timingTypes) {
86-
map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming());
87-
map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount());
79+
Map<String, Long> map = new HashMap<>(this.timings.length * 3);
80+
for (T timingType : this.timingTypes) {
81+
map.put(timingType.toString(), this.timings[timingType.ordinal()].getApproximateTiming());
82+
map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, this.timings[timingType.ordinal()].getCount());
8883
}
8984
return Collections.unmodifiableMap(map);
9085
}

server/src/main/java/org/opensearch/search/profile/ProfileResult.java

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.search.profile;
3434

3535
import org.opensearch.LegacyESVersion;
36+
import org.opensearch.Version;
3637
import org.opensearch.core.ParseField;
3738
import org.opensearch.core.common.io.stream.StreamInput;
3839
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -45,8 +46,10 @@
4546

4647
import java.io.IOException;
4748
import java.util.Collections;
49+
import java.util.Iterator;
4850
import java.util.List;
4951
import java.util.Map;
52+
import java.util.LinkedHashMap;
5053
import java.util.Objects;
5154
import java.util.concurrent.TimeUnit;
5255

@@ -70,14 +73,23 @@ public final class ProfileResult implements Writeable, ToXContentObject {
7073
static final ParseField BREAKDOWN = new ParseField("breakdown");
7174
static final ParseField DEBUG = new ParseField("debug");
7275
static final ParseField NODE_TIME = new ParseField("time");
76+
static final ParseField MAX_SLICE_NODE_TIME = new ParseField("max_slice_time");
77+
static final ParseField MIN_SLICE_NODE_TIME = new ParseField("min_slice_time");
78+
static final ParseField AVG_SLICE_NODE_TIME = new ParseField("avg_slice_time");
7379
static final ParseField NODE_TIME_RAW = new ParseField("time_in_nanos");
80+
static final ParseField MAX_SLICE_NODE_TIME_RAW = new ParseField("max_slice_time_in_nanos");
81+
static final ParseField MIN_SLICE_NODE_TIME_RAW = new ParseField("min_slice_time_in_nanos");
82+
static final ParseField AVG_SLICE_NODE_TIME_RAW = new ParseField("avg_slice_time_in_nanos");
7483
static final ParseField CHILDREN = new ParseField("children");
7584

7685
private final String type;
7786
private final String description;
7887
private final Map<String, Long> breakdown;
7988
private final Map<String, Object> debug;
8089
private final long nodeTime;
90+
private Long maxSliceNodeTime;
91+
private Long minSliceNodeTime;
92+
private Long avgSliceNodeTime;
8193
private final List<ProfileResult> children;
8294

8395
public ProfileResult(
@@ -87,13 +99,30 @@ public ProfileResult(
8799
Map<String, Object> debug,
88100
long nodeTime,
89101
List<ProfileResult> children
102+
) {
103+
this(type, description, breakdown, debug, nodeTime, children, null, null, null);
104+
}
105+
106+
public ProfileResult(
107+
String type,
108+
String description,
109+
Map<String, Long> breakdown,
110+
Map<String, Object> debug,
111+
long nodeTime,
112+
List<ProfileResult> children,
113+
Long maxSliceNodeTime,
114+
Long minSliceNodeTime,
115+
Long avgSliceNodeTime
90116
) {
91117
this.type = type;
92118
this.description = description;
93119
this.breakdown = Objects.requireNonNull(breakdown, "required breakdown argument missing");
94120
this.debug = debug == null ? Map.of() : debug;
95121
this.children = children == null ? List.of() : children;
96122
this.nodeTime = nodeTime;
123+
this.maxSliceNodeTime = maxSliceNodeTime;
124+
this.minSliceNodeTime = minSliceNodeTime;
125+
this.avgSliceNodeTime = avgSliceNodeTime;
97126
}
98127

99128
/**
@@ -110,6 +139,15 @@ public ProfileResult(StreamInput in) throws IOException {
110139
debug = Map.of();
111140
}
112141
children = in.readList(ProfileResult::new);
142+
if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
143+
this.maxSliceNodeTime = in.readOptionalLong();
144+
this.minSliceNodeTime = in.readOptionalLong();
145+
this.avgSliceNodeTime = in.readOptionalLong();
146+
} else {
147+
this.maxSliceNodeTime = null;
148+
this.minSliceNodeTime = null;
149+
this.avgSliceNodeTime = null;
150+
}
113151
}
114152

115153
@Override
@@ -122,6 +160,11 @@ public void writeTo(StreamOutput out) throws IOException {
122160
out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue);
123161
}
124162
out.writeList(children);
163+
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
164+
out.writeOptionalLong(maxSliceNodeTime);
165+
out.writeOptionalLong(minSliceNodeTime);
166+
out.writeOptionalLong(avgSliceNodeTime);
167+
}
125168
}
126169

127170
/**
@@ -161,6 +204,18 @@ public long getTime() {
161204
return nodeTime;
162205
}
163206

207+
public Long getMaxSliceTime() {
208+
return maxSliceNodeTime;
209+
}
210+
211+
public Long getMinSliceTime() {
212+
return minSliceNodeTime;
213+
}
214+
215+
public Long getAvgSliceTime() {
216+
return avgSliceNodeTime;
217+
}
218+
164219
/**
165220
* Returns a list of all profiled children queries
166221
*/
@@ -175,9 +230,27 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
175230
builder.field(DESCRIPTION.getPreferredName(), description);
176231
if (builder.humanReadable()) {
177232
builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString());
233+
if (getMaxSliceTime() != null) {
234+
builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString());
235+
}
236+
if (getMinSliceTime() != null) {
237+
builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString());
238+
}
239+
if (getAvgSliceTime() != null) {
240+
builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString());
241+
}
178242
}
179243
builder.field(NODE_TIME_RAW.getPreferredName(), getTime());
180-
builder.field(BREAKDOWN.getPreferredName(), breakdown);
244+
if (getMaxSliceTime() != null) {
245+
builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime());
246+
}
247+
if (getMinSliceTime() != null) {
248+
builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime());
249+
}
250+
if (getAvgSliceTime() != null) {
251+
builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime());
252+
}
253+
createBreakdownView(builder);
181254
if (false == debug.isEmpty()) {
182255
builder.field(DEBUG.getPreferredName(), debug);
183256
}
@@ -193,6 +266,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
193266
return builder.endObject();
194267
}
195268

269+
private void createBreakdownView(XContentBuilder builder) throws IOException {
270+
Map<String, Long> modifiedBreakdown = new LinkedHashMap<>(breakdown);
271+
removeStartTimeFields(modifiedBreakdown);
272+
builder.field(BREAKDOWN.getPreferredName(), modifiedBreakdown);
273+
}
274+
275+
static void removeStartTimeFields(Map<String, Long> modifiedBreakdown) {
276+
Iterator<Map.Entry<String, Long>> iterator = modifiedBreakdown.entrySet().iterator();
277+
while (iterator.hasNext()) {
278+
Map.Entry<String, Long> entry = iterator.next();
279+
if (entry.getKey().endsWith(AbstractProfileBreakdown.TIMING_TYPE_START_TIME_SUFFIX)) {
280+
iterator.remove();
281+
}
282+
}
283+
}
284+
196285
private static final InstantiatingObjectParser<ProfileResult, Void> PARSER;
197286
static {
198287
InstantiatingObjectParser.Builder<ProfileResult, Void> parser = InstantiatingObjectParser.builder(
@@ -206,6 +295,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
206295
parser.declareObject(optionalConstructorArg(), (p, c) -> p.map(), DEBUG);
207296
parser.declareLong(constructorArg(), NODE_TIME_RAW);
208297
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> fromXContent(p), CHILDREN);
298+
parser.declareLong(optionalConstructorArg(), MAX_SLICE_NODE_TIME_RAW);
299+
parser.declareLong(optionalConstructorArg(), MIN_SLICE_NODE_TIME_RAW);
300+
parser.declareLong(optionalConstructorArg(), AVG_SLICE_NODE_TIME_RAW);
209301
PARSER = parser.build();
210302
}
211303

server/src/main/java/org/opensearch/search/profile/Profilers.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.opensearch.search.internal.ContextIndexSearcher;
3636
import org.opensearch.search.profile.aggregation.AggregationProfiler;
37+
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
3738
import org.opensearch.search.profile.query.QueryProfiler;
3839

3940
import java.util.ArrayList;
@@ -50,18 +51,20 @@ public final class Profilers {
5051
private final ContextIndexSearcher searcher;
5152
private final List<QueryProfiler> queryProfilers;
5253
private final AggregationProfiler aggProfiler;
54+
private final boolean isConcurrentSegmentSearchEnabled;
5355

5456
/** Sole constructor. This {@link Profilers} instance will initially wrap one {@link QueryProfiler}. */
55-
public Profilers(ContextIndexSearcher searcher) {
57+
public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearchEnabled) {
5658
this.searcher = searcher;
59+
this.isConcurrentSegmentSearchEnabled = isConcurrentSegmentSearchEnabled;
5760
this.queryProfilers = new ArrayList<>();
58-
this.aggProfiler = new AggregationProfiler();
61+
this.aggProfiler = isConcurrentSegmentSearchEnabled ? new ConcurrentAggregationProfiler() : new AggregationProfiler();
5962
addQueryProfiler();
6063
}
6164

6265
/** Switch to a new profile. */
6366
public QueryProfiler addQueryProfiler() {
64-
QueryProfiler profiler = new QueryProfiler(searcher.getExecutor() != null);
67+
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
6568
searcher.setProfiler(profiler);
6669
queryProfilers.add(profiler);
6770
return profiler;

server/src/main/java/org/opensearch/search/profile/Timer.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
public class Timer {
5252

5353
private boolean doTiming;
54-
private long timing, count, lastCount, start;
54+
private long timing, count, lastCount, start, earliestTimerStartTime;
5555

5656
/** pkg-private for testing */
5757
long nanoTime() {
@@ -71,6 +71,9 @@ public final void start() {
7171
doTiming = (count - lastCount) >= Math.min(lastCount >>> 8, 1024);
7272
if (doTiming) {
7373
start = nanoTime();
74+
if (count == 0) {
75+
earliestTimerStartTime = start;
76+
}
7477
}
7578
count++;
7679
}
@@ -92,6 +95,14 @@ public final long getCount() {
9295
return count;
9396
}
9497

98+
/** Return the timer start time in nanoseconds.*/
99+
public final long getEarliestTimerStartTime() {
100+
if (start != 0) {
101+
throw new IllegalStateException("#start call misses a matching #stop call");
102+
}
103+
return earliestTimerStartTime;
104+
}
105+
95106
/** Return an approximation of the total time spent between consecutive calls of #start and #stop. */
96107
public final long getApproximateTiming() {
97108
if (start != 0) {

server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.opensearch.search.profile.AbstractProfileBreakdown;
3636

37+
import java.util.Collections;
3738
import java.util.HashMap;
3839
import java.util.Map;
3940

@@ -62,4 +63,18 @@ public void addDebugInfo(String key, Object value) {
6263
protected Map<String, Object> toDebugMap() {
6364
return unmodifiableMap(extra);
6465
}
66+
67+
/**
68+
* Build a timing count startTime breakdown for aggregation timing types
69+
*/
70+
@Override
71+
public Map<String, Long> toBreakdownMap() {
72+
Map<String, Long> map = new HashMap<>(timings.length * 3);
73+
for (AggregationTimingType timingType : timingTypes) {
74+
map.put(timingType.toString(), timings[timingType.ordinal()].getApproximateTiming());
75+
map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, timings[timingType.ordinal()].getCount());
76+
map.put(timingType + TIMING_TYPE_START_TIME_SUFFIX, timings[timingType.ordinal()].getEarliestTimerStartTime());
77+
}
78+
return Collections.unmodifiableMap(map);
79+
}
6580
}

server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import org.opensearch.search.profile.AbstractProfiler;
3737

3838
import java.util.HashMap;
39-
import java.util.LinkedList;
40-
import java.util.List;
4139
import java.util.Map;
4240

4341
/**
@@ -47,29 +45,25 @@
4745
*/
4846
public class AggregationProfiler extends AbstractProfiler<AggregationProfileBreakdown, Aggregator> {
4947

50-
private final Map<List<String>, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>();
48+
private final Map<Aggregator, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>();
5149

5250
public AggregationProfiler() {
5351
super(new InternalAggregationProfileTree());
5452
}
5553

54+
/**
55+
* This method does not need to be thread safe for concurrent search use case as well.
56+
* The {@link AggregationProfileBreakdown} for each Aggregation operator is created in sync path when
57+
* {@link org.opensearch.search.aggregations.BucketCollector#preCollection()} is called
58+
* on the Aggregation collector instances during construction.
59+
*/
5660
@Override
5761
public AggregationProfileBreakdown getQueryBreakdown(Aggregator agg) {
58-
List<String> path = getAggregatorPath(agg);
59-
AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(path);
62+
AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(agg);
6063
if (aggregationProfileBreakdown == null) {
6164
aggregationProfileBreakdown = super.getQueryBreakdown(agg);
62-
profileBreakdownLookup.put(path, aggregationProfileBreakdown);
65+
profileBreakdownLookup.put(agg, aggregationProfileBreakdown);
6366
}
6467
return aggregationProfileBreakdown;
6568
}
66-
67-
public static List<String> getAggregatorPath(Aggregator agg) {
68-
LinkedList<String> path = new LinkedList<>();
69-
while (agg != null) {
70-
path.addFirst(agg.name());
71-
agg = agg.parent();
72-
}
73-
return path;
74-
}
7569
}

0 commit comments

Comments
 (0)