diff --git a/CHANGELOG.md b/CHANGELOG.md
index ace34a4e4dc30..04bea0aa4c802 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Added
 - Change priority for scheduling reroute during timeout([#16445](https://github.com/opensearch-project/OpenSearch/pull/16445))
 - Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
+- [Workload Management] Add Workload Management Stats IT ([#16341](https://github.com/opensearch-project/OpenSearch/pull/16341))
 - Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
 - Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
 
diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java
index 6b68a83da94e2..498468cb7ae1c 100644
--- a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java
@@ -69,7 +69,7 @@ public class WorkloadManagementIT extends ParameterizedStaticSettingsOpenSearchI
     final static String CPU = "CPU";
     final static String ENABLED = "enabled";
     final static String DELETE = "DELETE";
-    private static final TimeValue TIMEOUT = new TimeValue(1, TimeUnit.SECONDS);
+    public static final TimeValue TIMEOUT = new TimeValue(3, TimeUnit.SECONDS);
 
     public WorkloadManagementIT(Settings nodeSettings) {
         super(nodeSettings);
@@ -182,7 +182,7 @@ public void testNoCancellation() throws InterruptedException {
         updateQueryGroupInClusterState(DELETE, queryGroup);
     }
 
-    public Exception executeQueryGroupTask(String resourceType, String queryGroupId) throws InterruptedException {
+    public static Exception executeQueryGroupTask(String resourceType, String queryGroupId) throws InterruptedException {
         ExceptionCatchingListener listener = new ExceptionCatchingListener();
         client().execute(
             TestQueryGroupTaskTransportAction.ACTION,
diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java
new file mode 100644
index 0000000000000..5ce6e7015fbcf
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java
@@ -0,0 +1,276 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.wlm;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.opensearch.action.admin.cluster.wlm.WlmStatsAction;
+import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest;
+import org.opensearch.action.admin.cluster.wlm.WlmStatsResponse;
+import org.opensearch.cluster.metadata.QueryGroup;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.search.backpressure.SearchBackpressureIT.ExceptionCatchingListener;
+import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
+import org.opensearch.wlm.WorkloadManagementIT.TestClusterUpdatePlugin;
+import org.opensearch.wlm.WorkloadManagementIT.TestClusterUpdateRequest;
+import org.opensearch.wlm.WorkloadManagementIT.TestClusterUpdateTransportAction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
+import static org.opensearch.wlm.QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER;
+import static org.opensearch.wlm.WorkloadManagementIT.CPU;
+import static org.opensearch.wlm.WorkloadManagementIT.DELETE;
+import static org.opensearch.wlm.WorkloadManagementIT.MEMORY;
+import static org.opensearch.wlm.WorkloadManagementIT.PUT;
+import static org.opensearch.wlm.WorkloadManagementIT.TIMEOUT;
+import static org.opensearch.wlm.WorkloadManagementIT.executeQueryGroupTask;
+
+/**
+ * Integration tests for the workload management stats action: they check which query group ids and node ids show up
+ * in a {@link WlmStatsResponse} when the request is filtered by node id, query group id and breach flag.
+ */
+public class WorkloadManagementStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
+    final static String DEFAULT_QUERY_GROUP = DEFAULT_QUERY_GROUP_ID_SUPPLIER.get();
+    final static String _ALL = "_all";
+    final static String NAME1 = "name1";
+    final static String NAME2 = "name2";
+    final static String INVALID_ID = "invalid_id";
+
+    public WorkloadManagementStatsIT(Settings nodeSettings) {
+        super(nodeSettings);
+    }
+
+    /** Supplies two node settings so every test runs with concurrent segment search disabled and enabled. */
+    @ParametersFactory
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(
+            new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
+            new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
+        );
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(TestClusterUpdatePlugin.class);
+        return plugins;
+    }
+
+    public void testDefaultQueryGroup() throws ExecutionException, InterruptedException {
+        WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null);
+        validateResponse(response, new String[] { DEFAULT_QUERY_GROUP }, null);
+    }
+
+    public void testBasicWlmStats() throws Exception {
+        QueryGroup queryGroup = new QueryGroup(
+            NAME1,
+            new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5))
+        );
+        String id = queryGroup.get_id();
+        updateQueryGroupInClusterState(PUT, queryGroup);
+        WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null);
+        validateResponse(response, new String[] { DEFAULT_QUERY_GROUP, id }, null);
+
+        updateQueryGroupInClusterState(DELETE, queryGroup);
+        WlmStatsResponse updatedResponse = getWlmStatsResponse(null, new String[] { _ALL }, null);
+        validateResponse(updatedResponse, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id });
+    }
+
+    public void testWlmStatsWithQueryGroupId() throws Exception {
+        QueryGroup enforcedQueryGroup = new QueryGroup(
+            NAME1,
+            new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5))
+        );
+        String enforcedQueryGroupId = enforcedQueryGroup.get_id();
+        updateQueryGroupInClusterState(PUT, enforcedQueryGroup);
+        WlmStatsResponse enforcedCpuStats = getWlmStatsResponse(null, new String[] { enforcedQueryGroupId }, null);
+        validateResponse(enforcedCpuStats, new String[] { enforcedQueryGroupId }, new String[] { DEFAULT_QUERY_GROUP });
+
+        WlmStatsResponse defaultGroupStats = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP }, null);
+        validateResponse(defaultGroupStats, new String[] { DEFAULT_QUERY_GROUP }, new String[] { enforcedQueryGroupId });
+
+        QueryGroup monitoredQueryGroup = new QueryGroup(
+            NAME2,
+            new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.2))
+        );
+        String monitoredQueryGroupId = monitoredQueryGroup.get_id();
+        updateQueryGroupInClusterState(PUT, monitoredQueryGroup);
+        WlmStatsResponse updatedStats = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, monitoredQueryGroupId }, null);
+        validateResponse(updatedStats, new String[] { DEFAULT_QUERY_GROUP, monitoredQueryGroupId }, new String[] { enforcedQueryGroupId });
+
+        WlmStatsResponse invalidStatsResponse = getWlmStatsResponse(null, new String[] { INVALID_ID }, null);
+        validateResponse(
+            invalidStatsResponse,
+            null,
+            new String[] { DEFAULT_QUERY_GROUP, enforcedQueryGroupId, monitoredQueryGroupId, INVALID_ID }
+        );
+
+        updateQueryGroupInClusterState(DELETE, enforcedQueryGroup);
+        updateQueryGroupInClusterState(DELETE, monitoredQueryGroup);
+    }
+
+    public void testWlmStatsWithBreach() throws Exception {
+        QueryGroup queryGroup = new QueryGroup(
+            NAME1,
+            new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.0000001))
+        );
+        String id = queryGroup.get_id();
+        updateQueryGroupInClusterState(PUT, queryGroup);
+        WlmStatsResponse breachedGroupsResponse = getWlmStatsResponse(null, new String[] { _ALL }, true);
+        validateResponse(breachedGroupsResponse, null, new String[] { DEFAULT_QUERY_GROUP, id });
+
+        WlmStatsResponse nonBreachedGroupsResponse = getWlmStatsResponse(null, new String[] { _ALL }, false);
+        validateResponse(nonBreachedGroupsResponse, new String[] { DEFAULT_QUERY_GROUP, id }, null);
+
+        WlmStatsResponse allGroupsResponse = getWlmStatsResponse(null, new String[] { _ALL }, null);
+        validateResponse(allGroupsResponse, new String[] { DEFAULT_QUERY_GROUP, id }, null);
+
+        executeQueryGroupTask(MEMORY, id);
+        WlmStatsResponse updatedBreachedGroupsResponse = getWlmStatsResponse(null, new String[] { _ALL }, true);
+        validateResponse(updatedBreachedGroupsResponse, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP });
+
+        updateQueryGroupInClusterState(DELETE, queryGroup);
+    }
+
+    public void testWlmStatsWithNodesId() throws Exception {
+        QueryGroup queryGroup = new QueryGroup(
+            NAME1,
+            new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5))
+        );
+        String queryGroupId = queryGroup.get_id();
+        String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId();
+        updateQueryGroupInClusterState(PUT, queryGroup);
+        WlmStatsResponse breachedGroupsResponse = getWlmStatsResponse(new String[] { nodeId }, new String[] { _ALL }, true);
+        validateResponse(breachedGroupsResponse, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId });
+
+        WlmStatsResponse nonBreachedGroupsResponse = getWlmStatsResponse(new String[] { nodeId, INVALID_ID }, new String[] { _ALL }, false);
+        validateResponse(
+            nonBreachedGroupsResponse,
+            new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId },
+            new String[] { INVALID_ID }
+        );
+
+        WlmStatsResponse invalidGroupsResponse = getWlmStatsResponse(new String[] { INVALID_ID }, new String[] { _ALL }, false);
+        validateResponse(invalidGroupsResponse, null, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId });
+
+        updateQueryGroupInClusterState(DELETE, queryGroup);
+    }
+
+    public void testWlmStatsWithIdAndBreach() throws Exception {
+        QueryGroup enforcedQueryGroup = new QueryGroup(
+            NAME1,
+            new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5))
+        );
+        String enforcedQueryGroupId = enforcedQueryGroup.get_id();
+        String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId();
+        updateQueryGroupInClusterState(PUT, enforcedQueryGroup);
+        QueryGroup softQueryGroup = new QueryGroup(
+            NAME2,
+            new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.SOFT, Map.of(ResourceType.CPU, 0.000000001))
+        );
+        updateQueryGroupInClusterState(PUT, softQueryGroup);
+
+        WlmStatsResponse nodeStatsResponse = getWlmStatsResponse(new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP }, true);
+        validateResponse(nodeStatsResponse, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, enforcedQueryGroupId });
+        WlmStatsResponse breachedGroupsResponse = getWlmStatsResponse(
+            null,
+            new String[] { DEFAULT_QUERY_GROUP, enforcedQueryGroupId },
+            true
+        );
+        validateResponse(breachedGroupsResponse, null, new String[] { DEFAULT_QUERY_GROUP, enforcedQueryGroupId });
+        WlmStatsResponse nonBreachedGroupsResponse = getWlmStatsResponse(
+            null,
+            new String[] { DEFAULT_QUERY_GROUP, enforcedQueryGroupId },
+            false
+        );
+        validateResponse(nonBreachedGroupsResponse, new String[] { DEFAULT_QUERY_GROUP, enforcedQueryGroupId }, null);
+
+        updateQueryGroupInClusterState(DELETE, enforcedQueryGroup);
+        updateQueryGroupInClusterState(DELETE, softQueryGroup);
+    }
+
+    public void testWlmStatsWithBreachForSoftQueryGroup() throws Exception {
+        QueryGroup softQueryGroup = new QueryGroup(
+            NAME2,
+            new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.SOFT, Map.of(ResourceType.CPU, 0.000000001))
+        );
+        String softQueryGroupId = softQueryGroup.get_id();
+        updateQueryGroupInClusterState(PUT, softQueryGroup);
+        executeQueryGroupTask(CPU, softQueryGroupId);
+        WlmStatsResponse response = getWlmStatsResponse(null, new String[] { softQueryGroupId }, true);
+        validateResponse(response, new String[] { softQueryGroupId }, new String[] { DEFAULT_QUERY_GROUP });
+        updateQueryGroupInClusterState(DELETE, softQueryGroup);
+    }
+
+    /**
+     * Sends a query group change through the test-only cluster update action and waits up to {@code TIMEOUT} for the
+     * listener to be notified. The tests use {@code PUT} before expecting a group in the stats and {@code DELETE} after.
+     *
+     * @param method     the update method handed to {@link TestClusterUpdateRequest}, e.g. {@code PUT} or {@code DELETE}
+     * @param queryGroup the query group the update applies to
+     * @throws InterruptedException if the thread is interrupted while waiting for the update to complete
+     */
+    public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) throws InterruptedException {
+        ExceptionCatchingListener listener = new ExceptionCatchingListener();
+        client().execute(TestClusterUpdateTransportAction.ACTION, new TestClusterUpdateRequest(queryGroup, method), listener);
+        assertTrue(
+            "Timed out waiting for the " + method + " cluster state update of query group " + queryGroup.get_id(),
+            listener.getLatch().await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)
+        );
+    }
+
+    /**
+     * Executes a {@link WlmStatsRequest} built from the given filters and blocks until its response is available.
+     *
+     * @param nodesId       node ids passed to the request; the tests pass {@code null} when not filtering by node
+     * @param queryGroupIds query group ids passed to the request (the tests use {@code _all} for every group); not null
+     * @param breach        breach flag passed to the request; the tests pass {@code null} when not filtering by breach
+     * @return the stats response returned by the cluster
+     * @throws ExecutionException   if the stats action fails
+     * @throws InterruptedException if the thread is interrupted while waiting for the response
+     */
+    public WlmStatsResponse getWlmStatsResponse(String[] nodesId, String[] queryGroupIds, Boolean breach) throws ExecutionException,
+        InterruptedException {
+        WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(nodesId, new HashSet<>(Arrays.asList(queryGroupIds)), breach);
+        return client().execute(WlmStatsAction.INSTANCE, wlmStatsRequest).get();
+    }
+
+    /**
+     * Asserts that the string form of the response contains every id in {@code validIds} and none of the ids in
+     * {@code invalidIds}. The tests pass both query group ids and node ids through these arrays.
+     *
+     * @param response   the stats response to inspect; must not be null
+     * @param validIds   ids that must appear in the response, or {@code null} to skip this check
+     * @param invalidIds ids that must not appear in the response, or {@code null} to skip this check
+     */
+    public void validateResponse(WlmStatsResponse response, String[] validIds, String[] invalidIds) {
+        // Check the response itself first: calling toString() on a null response would throw before any assertion ran.
+        assertNotNull("WLM stats response must not be null", response);
+        String res = response.toString();
+        if (validIds != null) {
+            for (String validId : validIds) {
+                assertTrue("Expected [" + validId + "] in WLM stats response: " + res, res.contains(validId));
+            }
+        }
+        if (invalidIds != null) {
+            for (String invalidId : invalidIds) {
+                assertFalse("Did not expect [" + invalidId + "] in WLM stats response: " + res, res.contains(invalidId));
+            }
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java
index 4451b3e7b62f4..1a94b6d609962 100644
--- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java
+++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java
@@ -232,6 +232,10 @@ public QueryGroupStats nodeStats(Set queryGroupIds, Boolean requestedBre
      * @return if the QueryGroup breaches any resource limit based on the LastRecordedUsage
      */
     public boolean resourceLimitBreached(String id, QueryGroupState currentState) {
+        // Short-circuit for the default query group so it is never looked up in (and rejected by) the metadata below.
+        if (QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get().equals(id)) {
+            return false;
+        }
         QueryGroup queryGroup = clusterService.state().metadata().queryGroups().get(id);
         if (queryGroup == null) {
             throw new ResourceNotFoundException("QueryGroup with id " + id + " does not exist");
@@ -326,7 +330,7 @@ public Set getDeletedQueryGroups() {
     /**
      * This method determines whether the task should be accounted by SBP if both features co-exist
      * @param t QueryGroupTask
-     * @return whether or not SBP handle it
+     * @return whether SBP handles it
      */
     public boolean shouldSBPHandle(Task t) {
         QueryGroupTask task = (QueryGroupTask) t;