Skip to content

Commit

Permalink
Abstracted send request to APIs
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Jun 24, 2022
1 parent ebeda8a commit e74de57
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.cluster.ExtensionClusterStateResponse;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

public class ExtensionClusterStateResponseHandler implements TransportResponseHandler<ExtensionClusterStateResponse> {
private static final Logger logger = LogManager.getLogger(ExtensionClusterStateResponseHandler.class);
public class ClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
private static final Logger logger = LogManager.getLogger(ClusterStateResponseHandler.class);

@Override
public void handleResponse(ExtensionClusterStateResponse response) {
public void handleResponse(ClusterStateResponse response) {
logger.info("received {}", response);
}

Expand All @@ -29,7 +29,7 @@ public String executor() {
}

@Override
public ExtensionClusterStateResponse read(StreamInput in) throws IOException {
return new ExtensionClusterStateResponse(in);
public ClusterStateResponse read(StreamInput in) throws IOException {
return new ClusterStateResponse(in);
}
}
92 changes: 53 additions & 39 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.ExtensionRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.network.NetworkModule;
Expand All @@ -26,6 +25,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.discovery.PluginRequest;
import org.opensearch.discovery.PluginResponse;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleNameResponse;
import org.opensearch.index.IndicesModuleRequest;
Expand All @@ -35,16 +35,11 @@
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
import org.opensearch.transport.ConnectionManager;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
import org.opensearch.transport.*;

import org.opensearch.sdk.netty4.Netty4Transport;
import org.opensearch.sdk.netty4.SharedGroupFactory;

import org.opensearch.transport.TransportInterceptor;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -101,38 +96,6 @@ IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesMod
// CreateComponent
DiscoveryNode opensearchNode = getOpensearchNode();
transportService.connectToNode(opensearchNode);

try {
logger.info("Sending Cluster State request to OpenSearch after creating index");
ExtensionClusterStateResponseHandler clusterStateResponseHandler = new ExtensionClusterStateResponseHandler();
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateResponseHandler
);
logger.info("Sending Cluster Settings request to OpenSearch after creating index");
ClusterSettingsResponseHandler clusterSettingResponseHandler = new ClusterSettingsResponseHandler();
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS),
clusterSettingResponseHandler
);
logger.info("Sending Local Node request to OpenSearch after creating index");
LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler();
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE),
localNodeResponseHandler
);

logger.info("Received response from OpenSearch for ClusterState, ClusterSettings and LocalNode");
} catch (Exception e) {
logger.info("Failed to send request to OpenSearch", e);
}

return indicesModuleResponse;
}

Expand Down Expand Up @@ -208,6 +171,9 @@ public void startTransportService(TransportService transportService) {
// start transport service and accept incoming requests
transportService.start();
transportService.acceptIncomingRequests();

// Extension Request is the first request for the transport communication.
// This request will initialize the extension and will be a part of OpenSearch bootstrap
transportService.registerRequestHandler(
ExtensionsOrchestrator.REQUEST_EXTENSION_ACTION_NAME,
ThreadPool.Names.GENERIC,
Expand Down Expand Up @@ -237,6 +203,54 @@ public void startTransportService(TransportService transportService) {

}

// Extension can use this API to get ClusterState from OpenSearch
public void sendClusterStateRequest(TransportService transportService) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try{
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster State request to OpenSearch", e);
}
}

// Extension can use this API to get ClusterSettings from OpenSearch
public void sendClusterSettingRequest(TransportService transportService) {
logger.info("Sending Cluster Settings request to OpenSearch");
ClusterSettingsResponseHandler clusterSettingResponseHandler = new ClusterSettingsResponseHandler();
try{
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS),
clusterSettingResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster Settings request to OpenSearch", e);
}
}

// Extension can use this API to get LocalNode from OpenSearch
public void sendLocalNodeRequest(TransportService transportService) {
logger.info("Sending Local Node request to OpenSearch");
LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE),
localNodeResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster Settings request to OpenSearch", e);
}
}

private Settings getSettings() {
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.TransportService;

public class TestMainScript extends OpenSearchTestCase {
public class TestExtensionsRunner extends OpenSearchTestCase {

private ExtensionsRunner extensionsRunner;
private Settings settings;
Expand Down

0 comments on commit e74de57

Please sign in to comment.