diff --git a/CHANGELOG.md b/CHANGELOG.md index e14e20ce212e4..64d098be0d8f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) +- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d664785a8eb0c..8575849a0b8ec 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -263,7 +263,9 @@ import org.opensearch.transport.TransportService; import org.opensearch.usage.UsageService; import org.opensearch.watcher.ResourceWatcherService; +import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.WorkloadManagementTransportInterceptor; +import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener; import javax.net.ssl.SNIHostName; @@ -996,11 +998,22 @@ protected Node( List identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class); identityService.initializeIdentityAwarePlugins(identityAwarePlugins); + final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener = + new QueryGroupRequestRejectionOperationListener( + new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService + threadPool + ); + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( Stream.concat( - Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener), + Stream.of( + searchRequestStats, + searchRequestSlowLog, + searchTaskRequestOperationsListener, + queryGroupRequestRejectionListener + ), pluginComponents.stream() .filter(p -> p instanceof SearchRequestOperationsListener) .map(p -> (SearchRequestOperationsListener) p) @@ -1050,7 +1063,8 @@ protected Node( ); WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor( - threadPool + threadPool, + new QueryGroupService() // We will need to replace this with actual implementation ); final Collection secureSettingsFactories = pluginsService.filterPlugins(Plugin.class) diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java new file mode 100644 index 0000000000000..97c4e5169b4ed --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; + +/** + * This is stub at this point in time and will be replace by an acutal one in couple of days + */ +public class QueryGroupService { + /** + * + * @param queryGroupId query group identifier + */ + public void rejectIfNeeded(String queryGroupId) { + if (queryGroupId == null) return; + boolean reject = false; + final StringBuilder reason = new StringBuilder(); + // TODO: At this point this is dummy and we need to decide whether to cancel the request based on last + // reported resource usage for the queryGroup. We also need to increment the rejection count here for the + // query group + if (reject) { + throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.toString()); + } + } +} diff --git a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java index 848df8712549a..d382b4c729a38 100644 --- a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java @@ -20,9 +20,11 @@ */ public class WorkloadManagementTransportInterceptor implements TransportInterceptor { private final ThreadPool threadPool; + private final QueryGroupService queryGroupService; - public WorkloadManagementTransportInterceptor(ThreadPool threadPool) { + public WorkloadManagementTransportInterceptor(final ThreadPool threadPool, final QueryGroupService queryGroupService) { this.threadPool = threadPool; + this.queryGroupService = queryGroupService; } @Override @@ -32,7 +34,7 @@ public TransportRequestHandler interceptHandler( boolean forceExecution, TransportRequestHandler actualHandler ) { - return new RequestHandler(threadPool, actualHandler); + return new RequestHandler(threadPool, actualHandler, queryGroupService); } /** @@ -43,16 +45,20 @@ public static class RequestHandler implements Transp private final ThreadPool threadPool; TransportRequestHandler actualHandler; + private final QueryGroupService queryGroupService; - public RequestHandler(ThreadPool threadPool, TransportRequestHandler actualHandler) { + public RequestHandler(ThreadPool threadPool, TransportRequestHandler actualHandler, QueryGroupService queryGroupService) { this.threadPool = threadPool; this.actualHandler = actualHandler; + this.queryGroupService = queryGroupService; } @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { if (isSearchWorkloadRequest(task)) { ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); + final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId(); + queryGroupService.rejectIfNeeded(queryGroupId); } actualHandler.messageReceived(request, channel, task); } diff --git a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java new file mode 100644 index 0000000000000..89f6fe709667f --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.listeners; + +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupService; +import org.opensearch.wlm.QueryGroupTask; + +/** + * This listener is used to perform the rejections for incoming requests into a queryGroup + */ +public class QueryGroupRequestRejectionOperationListener extends SearchRequestOperationsListener { + + private final QueryGroupService queryGroupService; + private final ThreadPool threadPool; + + public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) { + this.queryGroupService = queryGroupService; + this.threadPool = threadPool; + } + + /** + * This method assumes that the queryGroupId is already populated in the thread context + * @param searchRequestContext SearchRequestContext instance + */ + @Override + protected void onRequestStart(SearchRequestContext searchRequestContext) { + final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER); + queryGroupService.rejectIfNeeded(queryGroupId); + } +} diff --git a/server/src/main/java/org/opensearch/wlm/listeners/package-info.java b/server/src/main/java/org/opensearch/wlm/listeners/package-info.java new file mode 100644 index 0000000000000..e900acf657085 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/listeners/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * WLM related listener constructs + */ +package org.opensearch.wlm.listeners; diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java index db4e5e45d49ed..4668b845150a9 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java @@ -25,7 +25,7 @@ public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestC public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getTestName()); - sut = new WorkloadManagementTransportInterceptor(threadPool); + sut = new WorkloadManagementTransportInterceptor(threadPool, new QueryGroupService()); } public void tearDown() throws Exception { diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java index 789c02345e774..59818ad3dbbd2 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java @@ -9,6 +9,7 @@ package org.opensearch.wlm; import org.opensearch.action.index.IndexRequest; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; @@ -20,12 +21,16 @@ import java.util.Collections; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; public class WorkloadManagementTransportRequestHandlerTests extends OpenSearchTestCase { private WorkloadManagementTransportInterceptor.RequestHandler sut; private ThreadPool threadPool; + private QueryGroupService queryGroupService; private TestTransportRequestHandler actualHandler; @@ -33,8 +38,9 @@ public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getTestName()); actualHandler = new TestTransportRequestHandler<>(); + queryGroupService = mock(QueryGroupService.class); - sut = new WorkloadManagementTransportInterceptor.RequestHandler<>(threadPool, actualHandler); + sut = new WorkloadManagementTransportInterceptor.RequestHandler<>(threadPool, actualHandler, queryGroupService); } public void tearDown() throws Exception { @@ -42,14 +48,22 @@ public void tearDown() throws Exception { threadPool.shutdown(); } - public void testMessageReceivedForSearchWorkload() throws Exception { + public void testMessageReceivedForSearchWorkload_nonRejectionCase() throws Exception { ShardSearchRequest request = mock(ShardSearchRequest.class); QueryGroupTask spyTask = getSpyTask(); - + doNothing().when(queryGroupService).rejectIfNeeded(anyString()); sut.messageReceived(request, mock(TransportChannel.class), spyTask); assertTrue(sut.isSearchWorkloadRequest(spyTask)); } + public void testMessageReceivedForSearchWorkload_RejectionCase() throws Exception { + ShardSearchRequest request = mock(ShardSearchRequest.class); + QueryGroupTask spyTask = getSpyTask(); + doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(anyString()); + + assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.messageReceived(request, mock(TransportChannel.class), spyTask)); + } + public void testMessageReceivedForNonSearchWorkload() throws Exception { IndexRequest indexRequest = mock(IndexRequest.class); Task task = mock(Task.class); diff --git a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java new file mode 100644 index 0000000000000..19e82aca26153 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.listeners; + +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupService; +import org.opensearch.wlm.QueryGroupTask; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class QueryGroupRequestRejectionOperationListenerTests extends OpenSearchTestCase { + ThreadPool testThreadPool; + QueryGroupService queryGroupService; + QueryGroupRequestRejectionOperationListener sut; + + public void setUp() throws Exception { + super.setUp(); + testThreadPool = new TestThreadPool("RejectionTestThreadPool"); + queryGroupService = mock(QueryGroupService.class); + sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool); + } + + public void tearDown() throws Exception { + super.tearDown(); + testThreadPool.shutdown(); + } + + public void testRejectionCase() { + final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; + testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); + doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(testQueryGroupId); + assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.onRequestStart(null)); + } + + public void testNonRejectionCase() { + final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; + testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); + doNothing().when(queryGroupService).rejectIfNeeded(testQueryGroupId); + + sut.onRequestStart(null); + } +}