Skip to content

Commit

Permalink
consume query level cpu and memory usage in query insights
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed May 17, 2024
1 parent 9e62ccf commit 80c4615
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ public List<Setting<?>> getSettings() {
// Settings for top N queries
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.action.search.SearchTask;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.model.Attribute;
Expand All @@ -24,13 +26,20 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE;

/**
* The listener for query insights services.
Expand All @@ -45,6 +54,7 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
private static final Logger log = LogManager.getLogger(QueryInsightsListener.class);

private final QueryInsightsService queryInsightsService;
private final ClusterService clusterService;

/**
* Constructor for QueryInsightsListener
Expand All @@ -54,6 +64,7 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
*/
@Inject
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
this.clusterService = clusterService;
this.queryInsightsService = queryInsightsService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v));
Expand All @@ -69,11 +80,49 @@ public QueryInsightsListener(final ClusterService clusterService, final QueryIns
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_CPU_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.CPU, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_CPU_QUERIES_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.CPU).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.CPU).validateTopNSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_CPU_QUERIES_WINDOW_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.CPU).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.CPU).validateWindowSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_MEMORY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.MEMORY, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_MEMORY_QUERIES_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.MEMORY).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.MEMORY).validateTopNSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.MEMORY).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.MEMORY).validateWindowSize(v)
);
this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
this.setEnableTopQueries(MetricType.CPU, clusterService.getClusterSettings().get(TOP_N_CPU_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.CPU)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_CPU_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.CPU)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_CPU_QUERIES_WINDOW_SIZE));
this.setEnableTopQueries(MetricType.MEMORY, clusterService.getClusterSettings().get(TOP_N_MEMORY_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.MEMORY)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_MEMORY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.MEMORY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_MEMORY_QUERIES_WINDOW_SIZE));
}

/**
Expand Down Expand Up @@ -123,6 +172,18 @@ public void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
SearchTask searchTask = context.getTask();
List<TaskResourceInfo> tasksResourceUsages = searchRequestContext.getPhaseResourceUsage();
tasksResourceUsages.add(
new TaskResourceInfo(
searchTask.getAction(),
searchTask.getId(),
searchTask.getParentTaskId().getId(),
clusterService.localNode().getId(),
searchTask.getTotalResourceStats()
)
);

final SearchRequest request = context.getRequest();
try {
Map<MetricType, Number> measurements = new HashMap<>();
Expand All @@ -132,12 +193,25 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
);
}
if (queryInsightsService.isCollectionEnabled(MetricType.CPU)) {
measurements.put(
MetricType.CPU,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
);
}
if (queryInsightsService.isCollectionEnabled(MetricType.MEMORY)) {
measurements.put(
MetricType.MEMORY,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
);
}
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS));
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.TASKS_RESOURCE_USAGES, tasksResourceUsages);
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
queryInsightsService.addRecord(record);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public enum Attribute {
/**
* The node id for this request
*/
NODE_ID;
NODE_ID,
/**
* Tasks level resource usages in this request
*/
TASKS_RESOURCE_USAGES;

/**
* Read an Attribute from a StreamInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public enum MetricType implements Comparator<Number> {
/**
* JVM heap usage metric type
*/
JVM;
MEMORY;

/**
* Read a MetricType from a StreamInput
Expand Down Expand Up @@ -93,10 +93,9 @@ public static Set<MetricType> allMetricTypes() {
public int compare(final Number a, final Number b) {
switch (this) {
case LATENCY:
return Long.compare(a.longValue(), b.longValue());
case JVM:
case CPU:
return Double.compare(a.doubleValue(), b.doubleValue());
case MEMORY:
return Long.compare(a.longValue(), b.longValue());
}
return -1;
}
Expand All @@ -110,10 +109,9 @@ public int compare(final Number a, final Number b) {
Number parseValue(final Object o) {
switch (this) {
case LATENCY:
return (Long) o;
case JVM:
case CPU:
return (Double) o;
case MEMORY:
return (Long) o;
default:
return (Number) o;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.plugin.insights.rules.transport.top_queries;

import org.opensearch.OpenSearchException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.TransportNodesAction;
Expand All @@ -21,15 +20,13 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

/**
* Transport action for cluster/node level top queries information.
Expand Down Expand Up @@ -81,17 +78,18 @@ protected TopQueriesResponse newResponse(
final List<TopQueries> responses,
final List<FailedNodeException> failures
) {
if (topQueriesRequest.getMetricType() == MetricType.LATENCY) {
return new TopQueriesResponse(
clusterService.getClusterName(),
responses,
failures,
clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE),
MetricType.LATENCY
);
} else {
throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", topQueriesRequest.getMetricType()));
int size;
switch (topQueriesRequest.getMetricType()) {
case CPU:
size = clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE);
break;
case MEMORY:
size = clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE);
break;
default:
size = clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
}
return new TopQueriesResponse(clusterService.getClusterName(), responses, failures, size, topQueriesRequest.getMetricType());
}

@Override
Expand All @@ -107,15 +105,10 @@ protected TopQueries newNodeResponse(final StreamInput in) throws IOException {
@Override
protected TopQueries nodeOperation(final NodeRequest nodeRequest) {
final TopQueriesRequest request = nodeRequest.request;
if (request.getMetricType() == MetricType.LATENCY) {
return new TopQueries(
clusterService.localNode(),
queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(true)
);
} else {
throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", request.getMetricType()));
}

return new TopQueries(
clusterService.localNode(),
queryInsightsService.getTopQueriesService(request.getMetricType()).getTopQueriesRecords(true)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public class QueryInsightsSettings {
public static final String TOP_N_QUERIES_SETTING_PREFIX = "search.insights.top_queries";
/**
* Setting-key prefix for the top N queries by latency feature.
* <p>
* Built as {@code TOP_N_QUERIES_SETTING_PREFIX + ".latency"}, i.e.
* {@code "search.insights.top_queries.latency"}.
*/
public static final String TOP_N_LATENCY_QUERIES_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".latency";
/**
* Setting-key prefix for the top N queries by cpu feature.
* <p>
* Built as {@code TOP_N_QUERIES_SETTING_PREFIX + ".cpu"}, i.e.
* {@code "search.insights.top_queries.cpu"}.
*/
public static final String TOP_N_CPU_QUERIES_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".cpu";
/**
* Setting-key prefix for the top N queries by memory feature.
* <p>
* Built as {@code TOP_N_QUERIES_SETTING_PREFIX + ".memory"}, i.e.
* {@code "search.insights.top_queries.memory"}.
*/
public static final String TOP_N_MEMORY_QUERIES_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".memory";
/**
* Boolean setting for enabling top queries by latency.
*/
Expand Down Expand Up @@ -109,6 +113,66 @@ public class QueryInsightsSettings {
Setting.Property.Dynamic
);

/**
* Boolean setting for enabling top queries by cpu.
* <p>
* The key is {@code TOP_N_CPU_QUERIES_PREFIX + ".enabled"} and the default value is
* {@code false}, so the feature stays off until the setting is turned on explicitly.
* Declared with the {@code Dynamic} and {@code NodeScope} properties.
*/
public static final Setting<Boolean> TOP_N_CPU_QUERIES_ENABLED = Setting.boolSetting(
TOP_N_CPU_QUERIES_PREFIX + ".enabled",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Int setting to define the top n size for top queries by cpu.
* <p>
* The key is {@code TOP_N_CPU_QUERIES_PREFIX + ".top_n_size"} and the default value is
* {@code DEFAULT_TOP_N_SIZE}. Declared with the {@code Dynamic} and {@code NodeScope}
* properties.
*/
public static final Setting<Integer> TOP_N_CPU_QUERIES_SIZE = Setting.intSetting(
TOP_N_CPU_QUERIES_PREFIX + ".top_n_size",
DEFAULT_TOP_N_SIZE,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Time setting to define the window size for top queries by cpu.
* <p>
* The key is {@code TOP_N_CPU_QUERIES_PREFIX + ".window_size"} and the default value is
* {@code DEFAULT_WINDOW_SIZE}. The value is a {@code TimeValue} created through
* {@code Setting.positiveTimeSetting}. Declared with the {@code NodeScope} and
* {@code Dynamic} properties.
* <p>
* NOTE(review): this setting has been described as being "in seconds", but nothing in this
* declaration fixes the unit - confirm whether a particular unit is actually required.
*/
public static final Setting<TimeValue> TOP_N_CPU_QUERIES_WINDOW_SIZE = Setting.positiveTimeSetting(
TOP_N_CPU_QUERIES_PREFIX + ".window_size",
DEFAULT_WINDOW_SIZE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Boolean setting for enabling top queries by memory.
* <p>
* The key is {@code TOP_N_MEMORY_QUERIES_PREFIX + ".enabled"} and the default value is
* {@code false}, so the feature stays off until the setting is turned on explicitly.
* Declared with the {@code Dynamic} and {@code NodeScope} properties.
*/
public static final Setting<Boolean> TOP_N_MEMORY_QUERIES_ENABLED = Setting.boolSetting(
TOP_N_MEMORY_QUERIES_PREFIX + ".enabled",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Int setting to define the top n size for top queries by memory.
* <p>
* The key is {@code TOP_N_MEMORY_QUERIES_PREFIX + ".top_n_size"} and the default value is
* {@code DEFAULT_TOP_N_SIZE}. Declared with the {@code Dynamic} and {@code NodeScope}
* properties.
*/
public static final Setting<Integer> TOP_N_MEMORY_QUERIES_SIZE = Setting.intSetting(
TOP_N_MEMORY_QUERIES_PREFIX + ".top_n_size",
DEFAULT_TOP_N_SIZE,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Time setting to define the window size for top queries by memory.
* <p>
* The key is {@code TOP_N_MEMORY_QUERIES_PREFIX + ".window_size"} and the default value is
* {@code DEFAULT_WINDOW_SIZE}. The value is a {@code TimeValue} created through
* {@code Setting.positiveTimeSetting}. Declared with the {@code NodeScope} and
* {@code Dynamic} properties.
* <p>
* NOTE(review): this setting has been described as being "in seconds", but nothing in this
* declaration fixes the unit - confirm whether a particular unit is actually required.
*/
public static final Setting<TimeValue> TOP_N_MEMORY_QUERIES_WINDOW_SIZE = Setting.positiveTimeSetting(
TOP_N_MEMORY_QUERIES_PREFIX + ".window_size",
DEFAULT_WINDOW_SIZE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Default constructor
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(int lower, int
randomLongBetween(1000, 10000),
MetricType.CPU,
randomDouble(),
MetricType.JVM,
MetricType.MEMORY,
randomDouble()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testSetEnabled() {

when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(false);
when(queryInsightsService.isCollectionEnabled(MetricType.CPU)).thenReturn(false);
when(queryInsightsService.isCollectionEnabled(MetricType.JVM)).thenReturn(false);
when(queryInsightsService.isCollectionEnabled(MetricType.MEMORY)).thenReturn(false);
queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false);
assertFalse(queryInsightsListener.isEnabled());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void setup() {
queryInsightsService = new QueryInsightsService(threadPool);
queryInsightsService.enableCollection(MetricType.LATENCY, true);
queryInsightsService.enableCollection(MetricType.CPU, true);
queryInsightsService.enableCollection(MetricType.JVM, true);
queryInsightsService.enableCollection(MetricType.MEMORY, true);
}

public void testAddRecordToLimitAndDrain() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testSerializationAndEquals() throws Exception {

public void testAllMetricTypes() {
Set<MetricType> allMetrics = MetricType.allMetricTypes();
Set<MetricType> expected = new HashSet<>(Arrays.asList(MetricType.LATENCY, MetricType.CPU, MetricType.JVM));
Set<MetricType> expected = new HashSet<>(Arrays.asList(MetricType.LATENCY, MetricType.CPU, MetricType.MEMORY));
assertEquals(expected, allMetrics);
}

Expand Down

0 comments on commit 80c4615

Please sign in to comment.