diff --git a/src/main/java/org/opensearch/sdk/ExtensionClusterStateResponseHandler.java b/src/main/java/org/opensearch/sdk/ClusterStateResponseHandler.java similarity index 57% rename from src/main/java/org/opensearch/sdk/ExtensionClusterStateResponseHandler.java rename to src/main/java/org/opensearch/sdk/ClusterStateResponseHandler.java index a2d8ff32a..b6b5e9b30 100644 --- a/src/main/java/org/opensearch/sdk/ExtensionClusterStateResponseHandler.java +++ b/src/main/java/org/opensearch/sdk/ClusterStateResponseHandler.java @@ -2,19 +2,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.cluster.ExtensionClusterStateResponse; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportResponseHandler; import java.io.IOException; -public class ExtensionClusterStateResponseHandler implements TransportResponseHandler { - private static final Logger logger = LogManager.getLogger(ExtensionClusterStateResponseHandler.class); +public class ClusterStateResponseHandler implements TransportResponseHandler { + private static final Logger logger = LogManager.getLogger(ClusterStateResponseHandler.class); @Override - public void handleResponse(ExtensionClusterStateResponse response) { + public void handleResponse(ClusterStateResponse response) { logger.info("received {}", response); } @@ -29,7 +29,7 @@ public String executor() { } @Override - public ExtensionClusterStateResponse read(StreamInput in) throws IOException { - return new ExtensionClusterStateResponse(in); + public ClusterStateResponse read(StreamInput in) throws IOException { + return new ClusterStateResponse(in); } } diff --git a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java index afbb2dac6..bed2509b8 100644 --- a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java +++ b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java @@ -17,7 +17,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; -import org.opensearch.cluster.ExtensionRequest; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.network.NetworkModule; @@ -26,6 +25,7 @@ import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.discovery.PluginRequest; import org.opensearch.discovery.PluginResponse; +import org.opensearch.extensions.ExtensionRequest; import org.opensearch.extensions.ExtensionsOrchestrator; import org.opensearch.index.IndicesModuleNameResponse; import org.opensearch.index.IndicesModuleRequest; @@ -35,16 +35,11 @@ import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.search.SearchModule; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.ClusterConnectionManager; -import org.opensearch.transport.ConnectionManager; -import org.opensearch.transport.TransportService; -import org.opensearch.transport.TransportSettings; +import org.opensearch.transport.*; import org.opensearch.sdk.netty4.Netty4Transport; import org.opensearch.sdk.netty4.SharedGroupFactory; -import org.opensearch.transport.TransportInterceptor; - import java.io.File; import java.io.IOException; import java.util.Collections; @@ -101,38 +96,6 @@ IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesMod // CreateComponent DiscoveryNode opensearchNode = getOpensearchNode(); transportService.connectToNode(opensearchNode); - - try { - logger.info("Sending Cluster State request to OpenSearch after creating index"); - ExtensionClusterStateResponseHandler clusterStateResponseHandler = new ExtensionClusterStateResponseHandler(); - transportService.sendRequest( - opensearchNode, - ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE, - new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE), - clusterStateResponseHandler - ); - logger.info("Sending Cluster Settings request to OpenSearch after creating index"); - ClusterSettingsResponseHandler clusterSettingResponseHandler = new ClusterSettingsResponseHandler(); - transportService.sendRequest( - opensearchNode, - ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS, - new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS), - clusterSettingResponseHandler - ); - logger.info("Sending Local Node request to OpenSearch after creating index"); - LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler(); - transportService.sendRequest( - opensearchNode, - ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE, - new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE), - localNodeResponseHandler - ); - - logger.info("Received response from OpenSearch for ClusterState, ClusterSettings and LocalNode"); - } catch (Exception e) { - logger.info("Failed to send request to OpenSearch", e); - } - return indicesModuleResponse; } @@ -208,6 +171,9 @@ public void startTransportService(TransportService transportService) { // start transport service and accept incoming requests transportService.start(); transportService.acceptIncomingRequests(); + + // Extension Request is the first request for the transport communication. + // This request will initialize the extension and will be a part of OpenSearch bootstrap transportService.registerRequestHandler( ExtensionsOrchestrator.REQUEST_EXTENSION_ACTION_NAME, ThreadPool.Names.GENERIC, @@ -237,6 +203,54 @@ public void startTransportService(TransportService transportService) { } + // Extension can use this API to get ClusterState from OpenSearch + public void sendClusterStateRequest(TransportService transportService) { + logger.info("Sending Cluster State request to OpenSearch"); + ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler(); + try{ + transportService.sendRequest( + opensearchNode, + ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE, + new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE), + clusterStateResponseHandler + ); + } catch (Exception e) { + logger.info("Failed to send Cluster State request to OpenSearch", e); + } + } + + // Extension can use this API to get ClusterSettings from OpenSearch + public void sendClusterSettingRequest(TransportService transportService) { + logger.info("Sending Cluster Settings request to OpenSearch"); + ClusterSettingsResponseHandler clusterSettingResponseHandler = new ClusterSettingsResponseHandler(); + try{ + transportService.sendRequest( + opensearchNode, + ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS, + new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS), + clusterSettingResponseHandler + ); + } catch (Exception e) { + logger.info("Failed to send Cluster Settings request to OpenSearch", e); + } + } + + // Extension can use this API to get LocalNode from OpenSearch + public void sendLocalNodeRequest(TransportService transportService) { + logger.info("Sending Local Node request to OpenSearch"); + LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler(); + try { + transportService.sendRequest( + opensearchNode, + ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE, + new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE), + localNodeResponseHandler + ); + } catch (Exception e) { + logger.info("Failed to send Cluster Settings request to OpenSearch", e); + } + } + private Settings getSettings() { return settings; } diff --git a/src/test/java/org/opensearch/sdk/TestMainScript.java b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java similarity index 97% rename from src/test/java/org/opensearch/sdk/TestMainScript.java rename to src/test/java/org/opensearch/sdk/TestExtensionsRunner.java index f52e3b92b..cb1b2510b 100644 --- a/src/test/java/org/opensearch/sdk/TestMainScript.java +++ b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java @@ -21,7 +21,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.TransportService; -public class TestMainScript extends OpenSearchTestCase { +public class TestExtensionsRunner extends OpenSearchTestCase { private ExtensionsRunner extensionsRunner; private Settings settings;