From 49aae5e457a243ec560b2a267ec1bebe13d88fff Mon Sep 17 00:00:00 2001 From: penghuo Date: Wed, 17 Aug 2022 15:33:20 -0700 Subject: [PATCH] Deprecated ClusterService and Using NodeClient to fetch meta data Signed-off-by: penghuo --- .../plugin/OpenSearchSQLPluginConfig.java | 5 +- .../sql/legacy/plugin/RestSqlAction.java | 35 +++-- .../client/OpenSearchNodeClient.java | 72 ++------- .../sql/opensearch/executor/Scheduler.java | 33 ++++ .../client/OpenSearchNodeClientTest.java | 146 ++++++------------ .../opensearch/executor/SchedulerTest.java | 41 +++++ .../plugin/rest/OpenSearchPluginConfig.java | 6 +- .../sql/plugin/rest/RestPPLQueryAction.java | 12 +- 8 files changed, 170 insertions(+), 180 deletions(-) create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/executor/Scheduler.java create mode 100644 opensearch/src/test/java/org/opensearch/sql/opensearch/executor/SchedulerTest.java diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/OpenSearchSQLPluginConfig.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/OpenSearchSQLPluginConfig.java index 91b3a589256..b396d896b01 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/OpenSearchSQLPluginConfig.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/OpenSearchSQLPluginConfig.java @@ -7,7 +7,6 @@ package org.opensearch.sql.legacy.plugin; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.expression.config.ExpressionConfig; @@ -34,8 +33,6 @@ @Configuration @Import({ExpressionConfig.class}) public class OpenSearchSQLPluginConfig { - @Autowired - private ClusterService clusterService; @Autowired private NodeClient nodeClient; @@ -48,7 +45,7 @@ public class OpenSearchSQLPluginConfig { @Bean public OpenSearchClient client() { - return new OpenSearchNodeClient(clusterService, nodeClient); + return new OpenSearchNodeClient(nodeClient); } @Bean diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index 10d9dab0fa0..22041990c08 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -9,6 +9,7 @@ import static org.opensearch.rest.RestStatus.BAD_REQUEST; import static org.opensearch.rest.RestStatus.OK; import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE; +import static org.opensearch.sql.opensearch.executor.Scheduler.schedule; import com.alibaba.druid.sql.parser.ParserException; import com.google.common.collect.ImmutableList; @@ -147,19 +148,27 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli Format format = SqlRequestParam.getFormat(request.params()); - // Route request to new query engine if it's supported already - SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(), - sqlRequest.getSql(), request.path(), request.params()); - RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client); - if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) { - LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId()); - return result; - } - LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine", - LogUtils.getRequestId(), newSqlRequest); - - final QueryAction queryAction = explainRequest(client, sqlRequest, format); - return channel -> executeSqlRequest(request, queryAction, client, channel); + return channel -> schedule(client, () -> { + try { + // Route request to new query engine if it's supported already + SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(), + sqlRequest.getSql(), request.path(), request.params()); + RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client); + if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) { + LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId()); + result.accept(channel); + } else { + LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine", + LogUtils.getRequestId(), newSqlRequest); + + QueryAction queryAction = explainRequest(client, sqlRequest, format); + executeSqlRequest(request, queryAction, client, channel); + } + } catch (Exception e) { + logAndPublishMetrics(e); + reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE); + } + }); } catch (Exception e) { logAndPublishMetrics(e); return channel -> reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index fe262808121..514a6c55e28 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -6,10 +6,9 @@ package org.opensearch.sql.opensearch.client; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import java.io.IOException; +import com.google.common.collect.Streams; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -18,21 +17,14 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.logging.log4j.ThreadContext; import org.opensearch.action.admin.indices.get.GetIndexResponse; -import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.collect.ImmutableOpenMap; -import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.opensearch.mapping.IndexMapping; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; -import org.opensearch.threadpool.ThreadPool; /** OpenSearch connection by node client. */ public class OpenSearchNodeClient implements OpenSearchClient { @@ -40,23 +32,16 @@ public class OpenSearchNodeClient implements OpenSearchClient { public static final Function> ALL_FIELDS = (anyIndex -> (anyField -> true)); - /** Current cluster state on local node. */ - private final ClusterService clusterService; - /** Node client provided by OpenSearch container. */ private final NodeClient client; /** Index name expression resolver to get concrete index name. */ private final IndexNameExpressionResolver resolver; - private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - /** * Constructor of ElasticsearchNodeClient. */ - public OpenSearchNodeClient(ClusterService clusterService, - NodeClient client) { - this.clusterService = clusterService; + public OpenSearchNodeClient(NodeClient client) { this.client = client; this.resolver = new IndexNameExpressionResolver(client.threadPool().getThreadContext()); } @@ -75,12 +60,14 @@ public OpenSearchNodeClient(ClusterService clusterService, @Override public Map getIndexMappings(String... indexExpression) { try { - ClusterState state = clusterService.state(); - String[] concreteIndices = resolveIndexExpression(state, indexExpression); - - return populateIndexMappings( - state.metadata().findMappings(concreteIndices, ALL_FIELDS)); - } catch (IOException e) { + GetMappingsResponse mappingsResponse = client.admin().indices() + .prepareGetMappings(indexExpression) + .setLocal(true) + .get(); + return Streams.stream(mappingsResponse.mappings().iterator()) + .collect(Collectors.toMap(cursor -> cursor.key, + cursor -> new IndexMapping(cursor.value))); + } catch (Exception e) { throw new IllegalStateException( "Failed to read mapping in cluster state for index pattern [" + indexExpression + "]", e); } @@ -123,9 +110,8 @@ public List indices() { */ @Override public Map meta() { - final ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); - builder.put(META_CLUSTER_NAME, clusterService.getClusterName().value()); - return builder.build(); + return ImmutableMap.of(META_CLUSTER_NAME, + client.settings().get("cluster.name", "opensearch")); } @Override @@ -135,40 +121,12 @@ public void cleanup(OpenSearchRequest request) { @Override public void schedule(Runnable task) { - ThreadPool threadPool = client.threadPool(); - threadPool.schedule( - withCurrentContext(task), - new TimeValue(0), - SQL_WORKER_THREAD_POOL_NAME - ); + // at that time, task already running the sql-worker ThreadPool. + task.run(); } @Override public NodeClient getNodeClient() { return client; } - - private String[] resolveIndexExpression(ClusterState state, String[] indices) { - return resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices); - } - - private Map populateIndexMappings( - ImmutableOpenMap indexMappings) { - - ImmutableMap.Builder result = ImmutableMap.builder(); - for (ObjectObjectCursor cursor: - indexMappings) { - result.put(cursor.key, new IndexMapping(cursor.value)); - } - return result.build(); - } - - /** Copy from LogUtils. */ - private static Runnable withCurrentContext(final Runnable task) { - final Map currentContext = ThreadContext.getImmutableContext(); - return () -> { - ThreadContext.putAll(currentContext); - task.run(); - }; - } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/Scheduler.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/Scheduler.java new file mode 100644 index 00000000000..5567d1f9b20 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/Scheduler.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.executor; + +import java.util.Map; +import lombok.experimental.UtilityClass; +import org.apache.logging.log4j.ThreadContext; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.threadpool.ThreadPool; + +/** The scheduler which schedule the task run in sql-worker thread pool. */ +@UtilityClass +public class Scheduler { + + public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; + + public static void schedule(NodeClient client, Runnable task) { + ThreadPool threadPool = client.threadPool(); + threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); + } + + private static Runnable withCurrentContext(final Runnable task) { + final Map currentContext = ThreadContext.getImmutableContext(); + return () -> { + ThreadContext.putAll(currentContext); + task.run(); + }; + } +} diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java index 50410e07ccd..bb883e3837f 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java @@ -12,8 +12,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -22,7 +23,6 @@ import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSortedMap; import com.google.common.io.Resources; import java.io.IOException; import java.net.URL; @@ -39,24 +39,20 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.opensearch.action.search.ClearScrollRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.AliasMetadata; -import org.opensearch.cluster.metadata.IndexAbstraction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.DeprecationHandler; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.index.IndexNotFoundException; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; import org.opensearch.sql.data.model.ExprIntegerValue; @@ -66,7 +62,6 @@ import org.opensearch.sql.opensearch.mapping.IndexMapping; import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; -import org.opensearch.threadpool.ThreadPool; @ExtendWith(MockitoExtension.class) class OpenSearchNodeClientTest { @@ -137,8 +132,8 @@ public void getIndexMappingsWithEmptyMapping() { @Test public void getIndexMappingsWithIOException() { String indexName = "test"; - ClusterService clusterService = mockClusterService(indexName, new IOException()); - OpenSearchNodeClient client = new OpenSearchNodeClient(clusterService, nodeClient); + when(nodeClient.admin().indices()).thenThrow(RuntimeException.class); + OpenSearchNodeClient client = new OpenSearchNodeClient(nodeClient); assertThrows(IllegalStateException.class, () -> client.getIndexMappings(indexName)); } @@ -146,9 +141,8 @@ public void getIndexMappingsWithIOException() { @Test public void getIndexMappingsWithNonExistIndex() { OpenSearchNodeClient client = - new OpenSearchNodeClient(mockClusterService("test"), nodeClient); - - assertThrows(IndexNotFoundException.class, () -> client.getIndexMappings("non_exist_index")); + new OpenSearchNodeClient(mockNodeClient("test")); + assertTrue(client.getIndexMappings("non_exist_index").isEmpty()); } /** Jacoco enforce this constant lambda be tested. */ @@ -160,7 +154,7 @@ public void testAllFieldsPredicate() { @Test public void search() { OpenSearchNodeClient client = - new OpenSearchNodeClient(mock(ClusterService.class), nodeClient); + new OpenSearchNodeClient(nodeClient); // Mock first scroll request SearchResponse searchResponse = mock(SearchResponse.class); @@ -198,23 +192,12 @@ public void search() { @Test void schedule() { - ThreadPool threadPool = mock(ThreadPool.class); - when(nodeClient.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(threadContext); - - doAnswer( - invocation -> { - Runnable task = invocation.getArgument(0); - task.run(); - return null; - }) - .when(threadPool) - .schedule(any(), any(), any()); - - OpenSearchNodeClient client = - new OpenSearchNodeClient(mock(ClusterService.class), nodeClient); + OpenSearchNodeClient client = new OpenSearchNodeClient(nodeClient); AtomicBoolean isRun = new AtomicBoolean(false); - client.schedule(() -> isRun.set(true)); + client.schedule( + () -> { + isRun.set(true); + }); assertTrue(isRun.get()); } @@ -225,8 +208,7 @@ void cleanup() { when(requestBuilder.addScrollId(any())).thenReturn(requestBuilder); when(requestBuilder.get()).thenReturn(null); - OpenSearchNodeClient client = - new OpenSearchNodeClient(mock(ClusterService.class), nodeClient); + OpenSearchNodeClient client = new OpenSearchNodeClient(nodeClient); OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); request.setScrollId("scroll123"); client.cleanup(request); @@ -240,8 +222,7 @@ void cleanup() { @Test void cleanupWithoutScrollId() { - OpenSearchNodeClient client = - new OpenSearchNodeClient(mock(ClusterService.class), nodeClient); + OpenSearchNodeClient client = new OpenSearchNodeClient(nodeClient); OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); client.cleanup(request); @@ -262,95 +243,66 @@ void getIndices() { when(indexResponse.getIndices()).thenReturn(new String[] {"index"}); when(indexResponse.aliases()).thenReturn(openMap); - OpenSearchNodeClient client = - new OpenSearchNodeClient(mock(ClusterService.class), nodeClient); + OpenSearchNodeClient client = new OpenSearchNodeClient(nodeClient); final List indices = client.indices(); assertEquals(2, indices.size()); } @Test void meta() { - ClusterName clusterName = mock(ClusterName.class); - ClusterService mockService = mock(ClusterService.class); - when(clusterName.value()).thenReturn("cluster-name"); - when(mockService.getClusterName()).thenReturn(clusterName); + Settings settings = mock(Settings.class); + when(nodeClient.settings()).thenReturn(settings); + when(settings.get(anyString(), anyString())).thenReturn("cluster-name"); - OpenSearchNodeClient client = - new OpenSearchNodeClient(mockService, nodeClient); + OpenSearchNodeClient client = new OpenSearchNodeClient(nodeClient); final Map meta = client.meta(); assertEquals("cluster-name", meta.get(META_CLUSTER_NAME)); } @Test void ml() { - OpenSearchNodeClient client = new OpenSearchNodeClient(mock(ClusterService.class), nodeClient); + OpenSearchNodeClient client = new OpenSearchNodeClient(nodeClient); assertNotNull(client.getNodeClient()); } private OpenSearchNodeClient mockClient(String indexName, String mappings) { - ClusterService clusterService = mockClusterService(indexName, mappings); - return new OpenSearchNodeClient(clusterService, nodeClient); + mockNodeClientIndicesMappings(indexName, mappings); + return new OpenSearchNodeClient(nodeClient); } - /** Mock getAliasAndIndexLookup() only for index name resolve test. */ - public ClusterService mockClusterService(String indexName) { - ClusterService mockService = mock(ClusterService.class); - ClusterState mockState = mock(ClusterState.class); - Metadata mockMetaData = mock(Metadata.class); - - when(mockService.state()).thenReturn(mockState); - when(mockState.metadata()).thenReturn(mockMetaData); - when(mockMetaData.getIndicesLookup()) - .thenReturn(ImmutableSortedMap.of(indexName, mock(IndexAbstraction.class))); - return mockService; - } - - public ClusterService mockClusterService(String indexName, String mappings) { - ClusterService mockService = mock(ClusterService.class); - ClusterState mockState = mock(ClusterState.class); - Metadata mockMetaData = mock(Metadata.class); - - when(mockService.state()).thenReturn(mockState); - when(mockState.metadata()).thenReturn(mockMetaData); + public void mockNodeClientIndicesMappings(String indexName, String mappings) { + GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); + MappingMetadata emptyMapping = mock(MappingMetadata.class); + when(nodeClient.admin().indices() + .prepareGetMappings(any()) + .setLocal(anyBoolean()) + .get()).thenReturn(mockResponse); try { - ImmutableOpenMap.Builder builder = - ImmutableOpenMap.builder(); - MappingMetadata metadata; + ImmutableOpenMap metadata; if (mappings.isEmpty()) { - metadata = MappingMetadata.EMPTY_MAPPINGS; + when(emptyMapping.getSourceAsMap()).thenReturn(ImmutableMap.of()); + metadata = + new ImmutableOpenMap.Builder() + .fPut(indexName, emptyMapping) + .build(); } else { - metadata = IndexMetadata.fromXContent(createParser(mappings)).mapping(); + metadata = new ImmutableOpenMap.Builder().fPut(indexName, + IndexMetadata.fromXContent(createParser(mappings)).mapping()).build(); } - - - builder.put(indexName, metadata); - when(mockMetaData.findMappings(any(), any())).thenReturn(builder.build()); - - // IndexNameExpressionResolver use this method to check if index exists. If not, - // IndexNotFoundException is thrown. - when(mockMetaData.getIndicesLookup()) - .thenReturn(ImmutableSortedMap.of(indexName, mock(IndexAbstraction.class))); + when(mockResponse.mappings()).thenReturn(metadata); } catch (IOException e) { - throw new IllegalStateException("Failed to mock cluster service", e); + throw new IllegalStateException("Failed to mock node client", e); } - return mockService; } - public ClusterService mockClusterService(String indexName, Throwable t) { - ClusterService mockService = mock(ClusterService.class); - ClusterState mockState = mock(ClusterState.class); - Metadata mockMetaData = mock(Metadata.class); - - when(mockService.state()).thenReturn(mockState); - when(mockState.metadata()).thenReturn(mockMetaData); - try { - when(mockMetaData.findMappings(any(), any())).thenThrow(t); - when(mockMetaData.getIndicesLookup()) - .thenReturn(ImmutableSortedMap.of(indexName, mock(IndexAbstraction.class))); - } catch (IOException e) { - throw new IllegalStateException("Failed to mock cluster service", e); - } - return mockService; + public NodeClient mockNodeClient(String indexName) { + GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); + when(nodeClient.admin().indices() + .prepareGetMappings(any()) + .setLocal(anyBoolean()) + .get()).thenReturn(mockResponse); + when(mockResponse.mappings()).thenReturn(ImmutableOpenMap.of()); + return nodeClient; } private XContentParser createParser(String mappings) throws IOException { diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/SchedulerTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/SchedulerTest.java new file mode 100644 index 00000000000..f14bda7a95c --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/SchedulerTest.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.executor; + +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.node.NodeClient; +import org.opensearch.threadpool.ThreadPool; + +@ExtendWith(MockitoExtension.class) +class SchedulerTest { + @Test + public void schedule() { + NodeClient nodeClient = mock(NodeClient.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(nodeClient.threadPool()).thenReturn(threadPool); + + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); + return null; + }) + .when(threadPool) + .schedule(any(), any(), any()); + AtomicBoolean isRun = new AtomicBoolean(false); + Scheduler.schedule(nodeClient, () -> isRun.set(true)); + assertTrue(isRun.get()); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/OpenSearchPluginConfig.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/OpenSearchPluginConfig.java index c1b860877b5..6d8dbf50bc9 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/OpenSearchPluginConfig.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/OpenSearchPluginConfig.java @@ -7,7 +7,6 @@ package org.opensearch.sql.plugin.rest; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.monitor.ResourceMonitor; @@ -31,9 +30,6 @@ @Configuration public class OpenSearchPluginConfig { - @Autowired - private ClusterService clusterService; - @Autowired private NodeClient nodeClient; @@ -42,7 +38,7 @@ public class OpenSearchPluginConfig { @Bean public OpenSearchClient client() { - return new OpenSearchNodeClient(clusterService, nodeClient); + return new OpenSearchNodeClient(nodeClient); } @Bean diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index d1219804fda..f97baafa002 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -10,6 +10,7 @@ import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.rest.RestStatus.OK; import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE; +import static org.opensearch.sql.opensearch.executor.Scheduler.schedule; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; import com.google.common.collect.ImmutableList; @@ -136,10 +137,13 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod PPLService pplService = createPPLService(nodeClient); PPLQueryRequest pplRequest = PPLQueryRequestFactory.getPPLRequest(request); - if (pplRequest.isExplainRequest()) { - return channel -> pplService.explain(pplRequest, createExplainResponseListener(channel)); - } - return channel -> pplService.execute(pplRequest, createListener(channel, pplRequest)); + return channel -> schedule(nodeClient, () -> { + if (pplRequest.isExplainRequest()) { + pplService.explain(pplRequest, createExplainResponseListener(channel)); + } else { + pplService.execute(pplRequest, createListener(channel, pplRequest)); + } + }); } /**