Skip to content

Commit 76bbc9f

Browse files
authored
[Feature/extensions] Integrated CreateComponent extensionPoint (#3265)
* Draft createComponent extensionPoint Signed-off-by: Owais Kazi <[email protected]> * Integrated cluster state for createComponent Signed-off-by: Owais Kazi <[email protected]> * Decoupled extension points design Signed-off-by: Owais Kazi <[email protected]> * Changed ClusterServiceRequest to generic ExtensionRequest Signed-off-by: Owais Kazi <[email protected]> * PR comments Signed-off-by: Owais Kazi <[email protected]> * Using ClusterStateResponse Signed-off-by: Owais Kazi <[email protected]> * Rebased Signed-off-by: Owais Kazi <[email protected]>
1 parent 7625fce commit 76bbc9f

File tree

6 files changed

+265
-3
lines changed

6 files changed

+265
-3
lines changed

server/src/main/java/org/opensearch/action/admin/cluster/state/ClusterStateResponse.java

+5
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ public void writeTo(StreamOutput out) throws IOException {
104104
out.writeBoolean(waitForTimedOut);
105105
}
106106

107+
@Override
108+
public String toString() {
109+
return "ClusterStateResponse{" + "clusterState=" + clusterState + '}';
110+
}
111+
107112
@Override
108113
public boolean equals(Object o) {
109114
if (this == o) return true;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster;
10+
11+
import org.opensearch.cluster.service.ClusterService;
12+
import org.opensearch.common.io.stream.StreamInput;
13+
import org.opensearch.common.io.stream.StreamOutput;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.transport.TransportResponse;
16+
17+
import java.io.IOException;
18+
import java.util.Objects;
19+
20+
/**
21+
* PluginSettings Response for Extensibility
22+
*
23+
* @opensearch.internal
24+
*/
25+
public class ClusterSettingsResponse extends TransportResponse {
26+
private final Settings clusterSettings;
27+
28+
public ClusterSettingsResponse(ClusterService clusterService) {
29+
this.clusterSettings = clusterService.getSettings();
30+
}
31+
32+
public ClusterSettingsResponse(StreamInput in) throws IOException {
33+
super(in);
34+
this.clusterSettings = Settings.readSettingsFromStream(in);
35+
}
36+
37+
@Override
38+
public void writeTo(StreamOutput out) throws IOException {
39+
Settings.writeSettingsToStream(clusterSettings, out);
40+
}
41+
42+
@Override
43+
public String toString() {
44+
return "ClusterSettingsResponse{" + "clusterSettings=" + clusterSettings + '}';
45+
}
46+
47+
@Override
48+
public boolean equals(Object o) {
49+
if (this == o) return true;
50+
if (o == null || getClass() != o.getClass()) return false;
51+
ClusterSettingsResponse that = (ClusterSettingsResponse) o;
52+
return Objects.equals(clusterSettings, that.clusterSettings);
53+
}
54+
55+
@Override
56+
public int hashCode() {
57+
return Objects.hash(clusterSettings);
58+
}
59+
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster;
10+
11+
import org.opensearch.cluster.node.DiscoveryNode;
12+
import org.opensearch.cluster.service.ClusterService;
13+
import org.opensearch.common.io.stream.StreamInput;
14+
import org.opensearch.common.io.stream.StreamOutput;
15+
import org.opensearch.transport.TransportResponse;
16+
17+
import java.io.IOException;
18+
import java.util.Objects;
19+
20+
/**
21+
* LocalNode Response for Extensibility
22+
*
23+
* @opensearch.internal
24+
*/
25+
public class LocalNodeResponse extends TransportResponse {
26+
private final DiscoveryNode localNode;
27+
28+
public LocalNodeResponse(ClusterService clusterService) {
29+
this.localNode = clusterService.localNode();
30+
}
31+
32+
public LocalNodeResponse(StreamInput in) throws IOException {
33+
super(in);
34+
this.localNode = new DiscoveryNode(in);
35+
}
36+
37+
@Override
38+
public void writeTo(StreamOutput out) throws IOException {
39+
this.localNode.writeTo(out);
40+
}
41+
42+
@Override
43+
public String toString() {
44+
return "LocalNodeResponse{" + "localNode=" + localNode + '}';
45+
}
46+
47+
@Override
48+
public boolean equals(Object o) {
49+
if (this == o) return true;
50+
if (o == null || getClass() != o.getClass()) return false;
51+
LocalNodeResponse that = (LocalNodeResponse) o;
52+
return Objects.equals(localNode, that.localNode);
53+
}
54+
55+
@Override
56+
public int hashCode() {
57+
return Objects.hash(localNode);
58+
}
59+
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.extensions;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.io.stream.StreamInput;
14+
import org.opensearch.common.io.stream.StreamOutput;
15+
import org.opensearch.transport.TransportRequest;
16+
17+
import java.io.IOException;
18+
import java.util.Objects;
19+
20+
/**
21+
* CLusterService Request for Extensibility
22+
*
23+
* @opensearch.internal
24+
*/
25+
public class ExtensionRequest extends TransportRequest {
26+
private static final Logger logger = LogManager.getLogger(ExtensionRequest.class);
27+
private ExtensionsOrchestrator.RequestType requestType;
28+
29+
public ExtensionRequest(ExtensionsOrchestrator.RequestType requestType) {
30+
this.requestType = requestType;
31+
}
32+
33+
public ExtensionRequest(StreamInput in) throws IOException {
34+
super(in);
35+
this.requestType = in.readEnum(ExtensionsOrchestrator.RequestType.class);
36+
}
37+
38+
@Override
39+
public void writeTo(StreamOutput out) throws IOException {
40+
super.writeTo(out);
41+
out.writeEnum(requestType);
42+
}
43+
44+
public ExtensionsOrchestrator.RequestType getRequestType() {
45+
return this.requestType;
46+
}
47+
48+
public String toString() {
49+
return "ExtensionRequest{" + "requestType=" + requestType + '}';
50+
}
51+
52+
@Override
53+
public boolean equals(Object o) {
54+
55+
if (this == o) return true;
56+
if (o == null || getClass() != o.getClass()) return false;
57+
ExtensionRequest that = (ExtensionRequest) o;
58+
return Objects.equals(requestType, that.requestType);
59+
}
60+
61+
@Override
62+
public int hashCode() {
63+
return Objects.hash(requestType);
64+
}
65+
66+
}

server/src/main/java/org/opensearch/extensions/ExtensionsOrchestrator.java

+72-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@
2929
import org.apache.logging.log4j.message.ParameterizedMessage;
3030
import org.opensearch.Version;
3131
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
32+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
33+
import org.opensearch.cluster.*;
3234
import org.opensearch.cluster.node.DiscoveryNode;
35+
import org.opensearch.cluster.service.ClusterService;
3336
import org.opensearch.common.io.FileSystemUtils;
3437
import org.opensearch.common.io.stream.StreamInput;
3538
import org.opensearch.common.settings.Settings;
@@ -49,6 +52,7 @@
4952
import org.opensearch.plugins.PluginsService;
5053
import org.opensearch.threadpool.ThreadPool;
5154
import org.opensearch.transport.TransportException;
55+
import org.opensearch.transport.TransportResponse;
5256
import org.opensearch.transport.TransportResponseHandler;
5357
import org.opensearch.transport.TransportService;
5458

@@ -61,20 +65,39 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule
6165
public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions";
6266
public static final String INDICES_EXTENSION_POINT_ACTION_NAME = "indices:internal/extensions";
6367
public static final String INDICES_EXTENSION_NAME_ACTION_NAME = "indices:internal/name";
68+
public static final String REQUEST_EXTENSION_CLUSTER_STATE = "internal:discovery/clusterstate";
69+
public static final String REQUEST_EXTENSION_LOCAL_NODE = "internal:discovery/localnode";
70+
public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings";
6471

6572
private static final Logger logger = LogManager.getLogger(ExtensionsOrchestrator.class);
73+
74+
/**
75+
* Enum for Extension Requests
76+
*
77+
* @opensearch.internal
78+
*/
79+
public static enum RequestType {
80+
REQUEST_EXTENSION_CLUSTER_STATE,
81+
REQUEST_EXTENSION_LOCAL_NODE,
82+
REQUEST_EXTENSION_CLUSTER_SETTINGS,
83+
CREATE_COMPONENT,
84+
ON_INDEX_MODULE,
85+
GET_SETTINGS
86+
};
87+
6688
private final Path extensionsPath;
6789
final Set<DiscoveryExtension> extensionsSet;
6890
Set<DiscoveryExtension> extensionsInitializedSet;
6991
TransportService transportService;
92+
ClusterService clusterService;
7093

7194
public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOException {
7295
logger.info("ExtensionsOrchestrator initialized");
7396
this.extensionsPath = extensionsPath;
7497
this.transportService = null;
7598
this.extensionsSet = new HashSet<DiscoveryExtension>();
7699
this.extensionsInitializedSet = new HashSet<DiscoveryExtension>();
77-
100+
this.clusterService = null;
78101
/*
79102
* Now Discover extensions
80103
*/
@@ -86,6 +109,34 @@ public void setTransportService(TransportService transportService) {
86109
this.transportService = transportService;
87110
}
88111

112+
public void setClusterService(ClusterService clusterService) {
113+
this.clusterService = clusterService;
114+
transportService.registerRequestHandler(
115+
REQUEST_EXTENSION_CLUSTER_STATE,
116+
ThreadPool.Names.GENERIC,
117+
false,
118+
false,
119+
ExtensionRequest::new,
120+
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
121+
);
122+
transportService.registerRequestHandler(
123+
REQUEST_EXTENSION_LOCAL_NODE,
124+
ThreadPool.Names.GENERIC,
125+
false,
126+
false,
127+
ExtensionRequest::new,
128+
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
129+
);
130+
transportService.registerRequestHandler(
131+
REQUEST_EXTENSION_CLUSTER_SETTINGS,
132+
ThreadPool.Names.GENERIC,
133+
false,
134+
false,
135+
ExtensionRequest::new,
136+
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
137+
);
138+
}
139+
89140
@Override
90141
public PluginsAndModules info() {
91142
return null;
@@ -187,14 +238,33 @@ public String executor() {
187238
transportService.sendRequest(
188239
extensionNode,
189240
REQUEST_EXTENSION_ACTION_NAME,
190-
new PluginRequest(extensionNode, new ArrayList<DiscoveryExtension>(extensionsSet)),
241+
new PluginRequest(transportService.getLocalNode(), new ArrayList<DiscoveryExtension>(extensionsSet)),
191242
pluginResponseHandler
192243
);
193244
} catch (Exception e) {
194245
logger.error(e.toString());
195246
}
196247
}
197248

249+
TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) {
250+
// Read enum
251+
if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_STATE) {
252+
ClusterStateResponse clusterStateResponse = new ClusterStateResponse(
253+
clusterService.getClusterName(),
254+
clusterService.state(),
255+
false
256+
);
257+
return clusterStateResponse;
258+
} else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_LOCAL_NODE) {
259+
LocalNodeResponse localNodeResponse = new LocalNodeResponse(clusterService);
260+
return localNodeResponse;
261+
} else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS) {
262+
ClusterSettingsResponse clusterSettingsResponse = new ClusterSettingsResponse(clusterService);
263+
return clusterSettingsResponse;
264+
}
265+
return null;
266+
}
267+
198268
public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
199269
for (DiscoveryNode extensionNode : extensionsSet) {
200270
onIndexModule(indexModule, extensionNode);

server/src/main/java/org/opensearch/node/Node.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,7 @@ protected Node(
758758
* This seems like a chicken and egg problem.
759759
*/
760760
this.extensionsOrchestrator.setTransportService(transportService);
761+
this.extensionsOrchestrator.setClusterService(clusterService);
761762
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
762763
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
763764
final SearchTransportService searchTransportService = new SearchTransportService(
@@ -1085,7 +1086,6 @@ public Node start() throws NodeValidationException {
10851086
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
10861087
: "transportService has a different local node than the factory provided";
10871088
injector.getInstance(PeerRecoverySourceService.class).start();
1088-
extensionsOrchestrator.extensionsInitialize();
10891089

10901090
// Load (and maybe upgrade) the metadata stored on disk
10911091
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
@@ -1130,6 +1130,7 @@ public Node start() throws NodeValidationException {
11301130
assert clusterService.localNode().equals(localNodeFactory.getNode())
11311131
: "clusterService has a different local node than the factory provided";
11321132
transportService.acceptIncomingRequests();
1133+
extensionsOrchestrator.extensionsInitialize();
11331134
discovery.startInitialJoin();
11341135
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings());
11351136
configureNodeAndClusterIdStateListener(clusterService);

0 commit comments

Comments
 (0)