diff --git a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java index 55eec028..afbb2dac 100644 --- a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java +++ b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java @@ -57,9 +57,8 @@ import static org.opensearch.common.UUIDs.randomBase64UUID; public class ExtensionsRunner { - private ExtensionSettings extensionSettings = getExtensionSettings(); + private ExtensionSettings extensionSettings = readExtensionSettings(); private DiscoveryNode opensearchNode; - private TransportService transportService; public ExtensionsRunner() throws IOException {} @@ -72,25 +71,35 @@ public ExtensionsRunner() throws IOException {} private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() { }; - private ExtensionSettings getExtensionSettings() throws IOException { + private ExtensionSettings readExtensionSettings() throws IOException { File file = new File(ExtensionSettings.EXTENSION_DESCRIPTOR); ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()); ExtensionSettings extensionSettings = objectMapper.readValue(file, ExtensionSettings.class); return extensionSettings; } + private void setOpensearchNode(DiscoveryNode opensearchNode) { + this.opensearchNode = opensearchNode; + } + + private DiscoveryNode getOpensearchNode() { + return opensearchNode; + } + PluginResponse handlePluginsRequest(PluginRequest pluginRequest) { logger.info("Registering Plugin Request received from OpenSearch"); PluginResponse pluginResponse = new PluginResponse("RealExtension"); opensearchNode = pluginRequest.getSourceNode(); + setOpensearchNode(opensearchNode); return pluginResponse; } - IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest) { + IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) { logger.info("Registering Indices Module Request received from OpenSearch"); IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true); // CreateComponent + DiscoveryNode opensearchNode = getOpensearchNode(); transportService.connectToNode(opensearchNode); try { @@ -177,7 +186,7 @@ public TransportService createTransportService(Settings settings) throws IOExcep final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport); // create transport service - transportService = new TransportService( + return new TransportService( settings, transport, threadPool, @@ -191,8 +200,6 @@ public TransportService createTransportService(Settings settings) throws IOExcep emptySet(), connectionManager ); - - return transportService; } // manager method for transport service @@ -216,7 +223,7 @@ public void startTransportService(TransportService transportService) { false, false, IndicesModuleRequest::new, - ((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request))) + ((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request, transportService))) ); transportService.registerRequestHandler( @@ -230,10 +237,6 @@ public void startTransportService(TransportService transportService) { } - private TransportService getTransportService() { - return transportService; - } - private Settings getSettings() { return settings; } @@ -250,9 +253,7 @@ public static void main(String[] args) throws IOException { // configure and retrieve transport service with settings Settings settings = extensionsRunner.getSettings(); - extensionsRunner.createTransportService(settings); - // extensionsRunner.setTransportService(); - TransportService transportService = extensionsRunner.getTransportService(); + TransportService transportService = extensionsRunner.createTransportService(settings); // start transport service and action listener extensionsRunner.startTransportService(transportService);