diff --git a/src/main/java/org/opensearch/sdk/ClusterSettingsResponseHandler.java b/src/main/java/org/opensearch/sdk/ClusterSettingsResponseHandler.java new file mode 100644 index 00000000..79c0efdb --- /dev/null +++ b/src/main/java/org/opensearch/sdk/ClusterSettingsResponseHandler.java @@ -0,0 +1,35 @@ +package org.opensearch.sdk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterSettingsResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; + +import java.io.IOException; + +public class ClusterSettingsResponseHandler implements TransportResponseHandler { + private static final Logger logger = LogManager.getLogger(ClusterSettingsResponseHandler.class); + + @Override + public void handleResponse(ClusterSettingsResponse response) { + logger.info("received {}", response); + } + + @Override + public void handleException(TransportException exp) { + logger.info("ClusterSettingRequest failed", exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public ClusterSettingsResponse read(StreamInput in) throws IOException { + return new ClusterSettingsResponse(in); + } +} diff --git a/src/main/java/org/opensearch/sdk/ClusterStateResponseHandler.java b/src/main/java/org/opensearch/sdk/ClusterStateResponseHandler.java new file mode 100644 index 00000000..b6b5e9b3 --- /dev/null +++ b/src/main/java/org/opensearch/sdk/ClusterStateResponseHandler.java @@ -0,0 +1,35 @@ +package org.opensearch.sdk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; + +import java.io.IOException; + +public class ClusterStateResponseHandler implements TransportResponseHandler { + private static final Logger logger = LogManager.getLogger(ClusterStateResponseHandler.class); + + @Override + public void handleResponse(ClusterStateResponse response) { + logger.info("received {}", response); + } + + @Override + public void handleException(TransportException exp) { + logger.info("ExtensionClusterStateRequest failed", exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public ClusterStateResponse read(StreamInput in) throws IOException { + return new ClusterStateResponse(in); + } +} diff --git a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java index 5de96279..b939c5f3 100644 --- a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java +++ b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java @@ -25,6 +25,7 @@ import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.discovery.PluginRequest; import org.opensearch.discovery.PluginResponse; +import org.opensearch.extensions.ExtensionRequest; import org.opensearch.extensions.ExtensionsOrchestrator; import org.opensearch.index.IndicesModuleNameResponse; import org.opensearch.index.IndicesModuleRequest; @@ -38,12 +39,11 @@ import org.opensearch.transport.ConnectionManager; import org.opensearch.transport.TransportService; import org.opensearch.transport.TransportSettings; +import org.opensearch.transport.TransportInterceptor; import org.opensearch.sdk.netty4.Netty4Transport; import org.opensearch.sdk.netty4.SharedGroupFactory; -import org.opensearch.transport.TransportInterceptor; - import java.io.File; import java.io.IOException; import java.util.Collections; @@ -56,52 +56,56 @@ import static org.opensearch.common.UUIDs.randomBase64UUID; public class ExtensionsRunner { + private ExtensionSettings extensionSettings = readExtensionSettings(); + private DiscoveryNode opensearchNode; - public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions"; - - private static ExtensionSettings extensionSettings = null; - - static { - try { - extensionSettings = getExtensionSettings(); - } catch (IOException e) { - e.printStackTrace(); - } - } + public ExtensionsRunner() throws IOException {} - private static final Settings settings = Settings.builder() + private final Settings settings = Settings.builder() .put("node.name", extensionSettings.getExtensionname()) .put(TransportSettings.BIND_HOST.getKey(), extensionSettings.getHostaddress()) .put(TransportSettings.PORT.getKey(), extensionSettings.getHostport()) .build(); - private static final Logger logger = LogManager.getLogger(ExtensionsRunner.class); - public static final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() { + private final Logger logger = LogManager.getLogger(ExtensionsRunner.class); + private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() { }; - public ExtensionsRunner() throws IOException {} - - public static ExtensionSettings getExtensionSettings() throws IOException { + private ExtensionSettings readExtensionSettings() throws IOException { File file = new File(ExtensionSettings.EXTENSION_DESCRIPTOR); ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()); ExtensionSettings extensionSettings = objectMapper.readValue(file, ExtensionSettings.class); return extensionSettings; } + private void setOpensearchNode(DiscoveryNode opensearchNode) { + this.opensearchNode = opensearchNode; + } + + private DiscoveryNode getOpensearchNode() { + return opensearchNode; + } + PluginResponse handlePluginsRequest(PluginRequest pluginRequest) { - logger.info("Handling Plugins Request"); + logger.info("Registering Plugin Request received from OpenSearch"); PluginResponse pluginResponse = new PluginResponse("RealExtension"); + opensearchNode = pluginRequest.getSourceNode(); + setOpensearchNode(opensearchNode); return pluginResponse; } - IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest) { - logger.info("Indices Module Request"); + IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) { + logger.info("Registering Indices Module Request received from OpenSearch"); IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true); + + // handlePluginsRequest will set the opensearchNode while bootstraping of OpenSearch + DiscoveryNode opensearchNode = getOpensearchNode(); + transportService.connectToNode(opensearchNode); return indicesModuleResponse; } // Works as beforeIndexRemoved IndicesModuleNameResponse handleIndicesModuleNameRequest(IndicesModuleRequest indicesModuleRequest) { - logger.info("Indices Module Name Request"); + logger.info("Registering Indices Module Name Request received from OpenSearch"); IndicesModuleNameResponse indicesModuleNameResponse = new IndicesModuleNameResponse(true); return indicesModuleNameResponse; } @@ -140,7 +144,7 @@ public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPo return transport; } - public TransportService getTransportService(Settings settings) throws IOException { + public TransportService createTransportService(Settings settings) throws IOException { ThreadPool threadPool = new ThreadPool(settings); @@ -149,7 +153,7 @@ public TransportService getTransportService(Settings settings) throws IOExceptio final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport); // create transport service - final TransportService transportService = new TransportService( + return new TransportService( settings, transport, threadPool, @@ -163,8 +167,6 @@ public TransportService getTransportService(Settings settings) throws IOExceptio emptySet(), connectionManager ); - - return transportService; } // manager method for transport service @@ -173,8 +175,11 @@ public void startTransportService(TransportService transportService) { // start transport service and accept incoming requests transportService.start(); transportService.acceptIncomingRequests(); + + // Extension Request is the first request for the transport communication. + // This request will initialize the extension and will be a part of OpenSearch bootstrap transportService.registerRequestHandler( - REQUEST_EXTENSION_ACTION_NAME, + ExtensionsOrchestrator.REQUEST_EXTENSION_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, @@ -188,7 +193,7 @@ public void startTransportService(TransportService transportService) { false, false, IndicesModuleRequest::new, - ((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request))) + ((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request, transportService))) ); transportService.registerRequestHandler( @@ -202,6 +207,58 @@ public void startTransportService(TransportService transportService) { } + // Extension can use this API to get ClusterState from OpenSearch + public void sendClusterStateRequest(TransportService transportService) { + logger.info("Sending Cluster State request to OpenSearch"); + ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler(); + try { + transportService.sendRequest( + opensearchNode, + ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE, + new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE), + clusterStateResponseHandler + ); + } catch (Exception e) { + logger.info("Failed to send Cluster State request to OpenSearch", e); + } + } + + // Extension can use this API to get ClusterSettings from OpenSearch + public void sendClusterSettingRequest(TransportService transportService) { + logger.info("Sending Cluster Settings request to OpenSearch"); + ClusterSettingsResponseHandler clusterSettingResponseHandler = new ClusterSettingsResponseHandler(); + try { + transportService.sendRequest( + opensearchNode, + ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS, + new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS), + clusterSettingResponseHandler + ); + } catch (Exception e) { + logger.info("Failed to send Cluster Settings request to OpenSearch", e); + } + } + + // Extension can use this API to get LocalNode from OpenSearch + public void sendLocalNodeRequest(TransportService transportService) { + logger.info("Sending Local Node request to OpenSearch"); + LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler(); + try { + transportService.sendRequest( + opensearchNode, + ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE, + new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE), + localNodeResponseHandler + ); + } catch (Exception e) { + logger.info("Failed to send Cluster Settings request to OpenSearch", e); + } + } + + private Settings getSettings() { + return settings; + } + // manager method for action listener public void startActionListener(int timeout) { final ActionListener actionListener = new ActionListener(); @@ -213,7 +270,8 @@ public static void main(String[] args) throws IOException { ExtensionsRunner extensionsRunner = new ExtensionsRunner(); // configure and retrieve transport service with settings - TransportService transportService = extensionsRunner.getTransportService(settings); + Settings settings = extensionsRunner.getSettings(); + TransportService transportService = extensionsRunner.createTransportService(settings); // start transport service and action listener extensionsRunner.startTransportService(transportService); diff --git a/src/main/java/org/opensearch/sdk/LocalNodeResponseHandler.java b/src/main/java/org/opensearch/sdk/LocalNodeResponseHandler.java new file mode 100644 index 00000000..fad22041 --- /dev/null +++ b/src/main/java/org/opensearch/sdk/LocalNodeResponseHandler.java @@ -0,0 +1,35 @@ +package org.opensearch.sdk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.LocalNodeResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; + +import java.io.IOException; + +public class LocalNodeResponseHandler implements TransportResponseHandler { + private static final Logger logger = LogManager.getLogger(LocalNodeResponseHandler.class); + + @Override + public void handleResponse(LocalNodeResponse response) { + logger.info("received {}", response); + } + + @Override + public void handleException(TransportException exp) { + logger.info("LocalNodeRequest failed", exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public LocalNodeResponse read(StreamInput in) throws IOException { + return new LocalNodeResponse(in); + } +} diff --git a/src/test/java/org/opensearch/sdk/TestMainScript.java b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java similarity index 89% rename from src/test/java/org/opensearch/sdk/TestMainScript.java rename to src/test/java/org/opensearch/sdk/TestExtensionsRunner.java index f1036ef5..cb1b2510 100644 --- a/src/test/java/org/opensearch/sdk/TestMainScript.java +++ b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java @@ -21,7 +21,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.TransportService; -public class TestMainScript extends OpenSearchTestCase { +public class TestExtensionsRunner extends OpenSearchTestCase { private ExtensionsRunner extensionsRunner; private Settings settings; @@ -36,7 +36,7 @@ public void setUp() throws IOException { // test ExtensionsRunner getTransportService return type is transport service @Test public void testGetTransportService() throws IOException { - assert (extensionsRunner.getTransportService(settings) instanceof TransportService); + assert (extensionsRunner.createTransportService(settings) instanceof TransportService); } // test manager method invokes start on transport service @@ -44,7 +44,7 @@ public void testGetTransportService() throws IOException { public void testTransportServiceStarted() throws IOException { // retrieve and mock transport service - TransportService transportService = Mockito.spy(extensionsRunner.getTransportService(settings)); + TransportService transportService = Mockito.spy(extensionsRunner.createTransportService(settings)); // verify mocked object interaction in manager method extensionsRunner.startTransportService(transportService); @@ -56,7 +56,7 @@ public void testTransportServiceStarted() throws IOException { public void testTransportServiceAcceptedIncomingRequests() throws IOException { // retrieve and mock transport service - TransportService transportService = Mockito.spy(extensionsRunner.getTransportService(settings)); + TransportService transportService = Mockito.spy(extensionsRunner.createTransportService(settings)); // verify mocked object interaction in manager method extensionsRunner.startTransportService(transportService); diff --git a/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java b/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java index f7f16286..6dbee1c2 100644 --- a/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java +++ b/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java @@ -145,7 +145,7 @@ private void startTransportandClient(Settings settings, Thread client) throws IO // retrieve transport service ExtensionsRunner extensionsRunner = new ExtensionsRunner(); - TransportService transportService = extensionsRunner.getTransportService(settings); + TransportService transportService = extensionsRunner.createTransportService(settings); // start transport service extensionsRunner.startTransportService(transportService);