Skip to content

Commit

Permalink
New Design with createComponent extension point (#8)
Browse files Browse the repository at this point in the history
* Draft createComponent extension point

Signed-off-by: Owais Kazi <[email protected]>

* Resolved issue of transport stopped

Signed-off-by: Owais Kazi <[email protected]>

* Decoupled response handler to separate classes

Signed-off-by: Owais Kazi <[email protected]>

* Addressed PR Comments

Signed-off-by: Owais Kazi <[email protected]>

* Removed CountDown Latch

Signed-off-by: Owais Kazi <[email protected]>

* Refactoring and PR comments

Signed-off-by: Owais Kazi <[email protected]>

* Addressed PR Comments

Signed-off-by: Owais Kazi <[email protected]>

* Abstracted send request to APIs

Signed-off-by: Owais Kazi <[email protected]>

* Added comment for OpenSearchNode

Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 authored Jun 29, 2022
1 parent e38ac94 commit c825b83
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.opensearch.sdk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

public class ClusterSettingsResponseHandler implements TransportResponseHandler<ClusterSettingsResponse> {
private static final Logger logger = LogManager.getLogger(ClusterSettingsResponseHandler.class);

@Override
public void handleResponse(ClusterSettingsResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("ClusterSettingRequest failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public ClusterSettingsResponse read(StreamInput in) throws IOException {
return new ClusterSettingsResponse(in);
}
}
35 changes: 35 additions & 0 deletions src/main/java/org/opensearch/sdk/ClusterStateResponseHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.opensearch.sdk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

public class ClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
private static final Logger logger = LogManager.getLogger(ClusterStateResponseHandler.class);

@Override
public void handleResponse(ClusterStateResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("ExtensionClusterStateRequest failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public ClusterStateResponse read(StreamInput in) throws IOException {
return new ClusterStateResponse(in);
}
}
118 changes: 88 additions & 30 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.discovery.PluginRequest;
import org.opensearch.discovery.PluginResponse;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleNameResponse;
import org.opensearch.index.IndicesModuleRequest;
Expand All @@ -38,12 +39,11 @@
import org.opensearch.transport.ConnectionManager;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
import org.opensearch.transport.TransportInterceptor;

import org.opensearch.sdk.netty4.Netty4Transport;
import org.opensearch.sdk.netty4.SharedGroupFactory;

import org.opensearch.transport.TransportInterceptor;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
Expand All @@ -56,52 +56,56 @@
import static org.opensearch.common.UUIDs.randomBase64UUID;

public class ExtensionsRunner {
private ExtensionSettings extensionSettings = readExtensionSettings();
private DiscoveryNode opensearchNode;

public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions";

private static ExtensionSettings extensionSettings = null;

static {
try {
extensionSettings = getExtensionSettings();
} catch (IOException e) {
e.printStackTrace();
}
}
public ExtensionsRunner() throws IOException {}

private static final Settings settings = Settings.builder()
private final Settings settings = Settings.builder()
.put("node.name", extensionSettings.getExtensionname())
.put(TransportSettings.BIND_HOST.getKey(), extensionSettings.getHostaddress())
.put(TransportSettings.PORT.getKey(), extensionSettings.getHostport())
.build();
private static final Logger logger = LogManager.getLogger(ExtensionsRunner.class);
public static final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
private final Logger logger = LogManager.getLogger(ExtensionsRunner.class);
private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
};

public ExtensionsRunner() throws IOException {}

public static ExtensionSettings getExtensionSettings() throws IOException {
private ExtensionSettings readExtensionSettings() throws IOException {
File file = new File(ExtensionSettings.EXTENSION_DESCRIPTOR);
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
ExtensionSettings extensionSettings = objectMapper.readValue(file, ExtensionSettings.class);
return extensionSettings;
}

private void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

private DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

PluginResponse handlePluginsRequest(PluginRequest pluginRequest) {
logger.info("Handling Plugins Request");
logger.info("Registering Plugin Request received from OpenSearch");
PluginResponse pluginResponse = new PluginResponse("RealExtension");
opensearchNode = pluginRequest.getSourceNode();
setOpensearchNode(opensearchNode);
return pluginResponse;
}

IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest) {
logger.info("Indices Module Request");
IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) {
logger.info("Registering Indices Module Request received from OpenSearch");
IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true);

// handlePluginsRequest will set the opensearchNode while bootstraping of OpenSearch
DiscoveryNode opensearchNode = getOpensearchNode();
transportService.connectToNode(opensearchNode);
return indicesModuleResponse;
}

// Works as beforeIndexRemoved
IndicesModuleNameResponse handleIndicesModuleNameRequest(IndicesModuleRequest indicesModuleRequest) {
logger.info("Indices Module Name Request");
logger.info("Registering Indices Module Name Request received from OpenSearch");
IndicesModuleNameResponse indicesModuleNameResponse = new IndicesModuleNameResponse(true);
return indicesModuleNameResponse;
}
Expand Down Expand Up @@ -140,7 +144,7 @@ public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPo
return transport;
}

public TransportService getTransportService(Settings settings) throws IOException {
public TransportService createTransportService(Settings settings) throws IOException {

ThreadPool threadPool = new ThreadPool(settings);

Expand All @@ -149,7 +153,7 @@ public TransportService getTransportService(Settings settings) throws IOExceptio
final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport);

// create transport service
final TransportService transportService = new TransportService(
return new TransportService(
settings,
transport,
threadPool,
Expand All @@ -163,8 +167,6 @@ public TransportService getTransportService(Settings settings) throws IOExceptio
emptySet(),
connectionManager
);

return transportService;
}

// manager method for transport service
Expand All @@ -173,8 +175,11 @@ public void startTransportService(TransportService transportService) {
// start transport service and accept incoming requests
transportService.start();
transportService.acceptIncomingRequests();

// Extension Request is the first request for the transport communication.
// This request will initialize the extension and will be a part of OpenSearch bootstrap
transportService.registerRequestHandler(
REQUEST_EXTENSION_ACTION_NAME,
ExtensionsOrchestrator.REQUEST_EXTENSION_ACTION_NAME,
ThreadPool.Names.GENERIC,
false,
false,
Expand All @@ -188,7 +193,7 @@ public void startTransportService(TransportService transportService) {
false,
false,
IndicesModuleRequest::new,
((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request)))
((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request, transportService)))

);
transportService.registerRequestHandler(
Expand All @@ -202,6 +207,58 @@ public void startTransportService(TransportService transportService) {

}

// Extension can use this API to get ClusterState from OpenSearch
public void sendClusterStateRequest(TransportService transportService) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster State request to OpenSearch", e);
}
}

// Extension can use this API to get ClusterSettings from OpenSearch
public void sendClusterSettingRequest(TransportService transportService) {
logger.info("Sending Cluster Settings request to OpenSearch");
ClusterSettingsResponseHandler clusterSettingResponseHandler = new ClusterSettingsResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS),
clusterSettingResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster Settings request to OpenSearch", e);
}
}

// Extension can use this API to get LocalNode from OpenSearch
public void sendLocalNodeRequest(TransportService transportService) {
logger.info("Sending Local Node request to OpenSearch");
LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE),
localNodeResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster Settings request to OpenSearch", e);
}
}

private Settings getSettings() {
return settings;
}

// manager method for action listener
public void startActionListener(int timeout) {
final ActionListener actionListener = new ActionListener();
Expand All @@ -213,7 +270,8 @@ public static void main(String[] args) throws IOException {
ExtensionsRunner extensionsRunner = new ExtensionsRunner();

// configure and retrieve transport service with settings
TransportService transportService = extensionsRunner.getTransportService(settings);
Settings settings = extensionsRunner.getSettings();
TransportService transportService = extensionsRunner.createTransportService(settings);

// start transport service and action listener
extensionsRunner.startTransportService(transportService);
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/org/opensearch/sdk/LocalNodeResponseHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.opensearch.sdk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.LocalNodeResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

public class LocalNodeResponseHandler implements TransportResponseHandler<LocalNodeResponse> {
private static final Logger logger = LogManager.getLogger(LocalNodeResponseHandler.class);

@Override
public void handleResponse(LocalNodeResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("LocalNodeRequest failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public LocalNodeResponse read(StreamInput in) throws IOException {
return new LocalNodeResponse(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.TransportService;

public class TestMainScript extends OpenSearchTestCase {
public class TestExtensionsRunner extends OpenSearchTestCase {

private ExtensionsRunner extensionsRunner;
private Settings settings;
Expand All @@ -36,15 +36,15 @@ public void setUp() throws IOException {
// test ExtensionsRunner getTransportService return type is transport service
@Test
public void testGetTransportService() throws IOException {
assert (extensionsRunner.getTransportService(settings) instanceof TransportService);
assert (extensionsRunner.createTransportService(settings) instanceof TransportService);
}

// test manager method invokes start on transport service
@Test
public void testTransportServiceStarted() throws IOException {

// retrieve and mock transport service
TransportService transportService = Mockito.spy(extensionsRunner.getTransportService(settings));
TransportService transportService = Mockito.spy(extensionsRunner.createTransportService(settings));

// verify mocked object interaction in manager method
extensionsRunner.startTransportService(transportService);
Expand All @@ -56,7 +56,7 @@ public void testTransportServiceStarted() throws IOException {
public void testTransportServiceAcceptedIncomingRequests() throws IOException {

// retrieve and mock transport service
TransportService transportService = Mockito.spy(extensionsRunner.getTransportService(settings));
TransportService transportService = Mockito.spy(extensionsRunner.createTransportService(settings));

// verify mocked object interaction in manager method
extensionsRunner.startTransportService(transportService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void startTransportandClient(Settings settings, Thread client) throws IO

// retrieve transport service
ExtensionsRunner extensionsRunner = new ExtensionsRunner();
TransportService transportService = extensionsRunner.getTransportService(settings);
TransportService transportService = extensionsRunner.createTransportService(settings);

// start transport service
extensionsRunner.startTransportService(transportService);
Expand Down

0 comments on commit c825b83

Please sign in to comment.