19
19
import org .opensearch .cluster .ClusterModule ;
20
20
import org .opensearch .cluster .ExtensionRequest ;
21
21
import org .opensearch .cluster .node .DiscoveryNode ;
22
+ import org .opensearch .common .concurrent .CompletableContext ;
22
23
import org .opensearch .common .io .stream .NamedWriteableRegistry ;
23
24
import org .opensearch .common .network .NetworkModule ;
24
25
import org .opensearch .common .network .NetworkService ;
49
50
import java .io .IOException ;
50
51
import java .util .Collections ;
51
52
import java .util .List ;
53
+ import java .util .concurrent .CompletableFuture ;
52
54
import java .util .concurrent .CountDownLatch ;
53
55
import java .util .concurrent .TimeUnit ;
54
56
import java .util .function .Function ;
59
61
import static org .opensearch .common .UUIDs .randomBase64UUID ;
60
62
61
63
public class ExtensionsRunner {
62
-
63
- public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions" ;
64
-
65
64
private static ExtensionSettings extensionSettings = null ;
66
-
67
65
private DiscoveryNode opensearchNode = null ;
66
+ TransportService transportService = null ;
68
67
69
68
static {
70
69
try {
@@ -93,35 +92,37 @@ public static ExtensionSettings getExtensionSettings() throws IOException {
93
92
}
94
93
95
94
PluginResponse handlePluginsRequest (PluginRequest pluginRequest ) {
96
- logger .info ("Handling Plugins Request" );
95
+ logger .info ("Registering Plugin Request received from OpenSearch " );
97
96
PluginResponse pluginResponse = new PluginResponse ("RealExtension" );
98
97
opensearchNode = pluginRequest .getSourceNode ();
99
98
return pluginResponse ;
100
99
}
101
100
102
- IndicesModuleResponse handleIndicesModuleRequest (IndicesModuleRequest indicesModuleRequest , TransportService transportService ) {
103
- logger .info ("Indices Module Request" );
101
+ IndicesModuleResponse handleIndicesModuleRequest (IndicesModuleRequest indicesModuleRequest ) {
102
+ logger .info ("Registering Indices Module Request received from OpenSearch " );
104
103
IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse (true , true , true );
105
104
106
105
// CreateComponent
107
106
transportService .connectToNode (opensearchNode );
108
107
final CountDownLatch inProgressLatch = new CountDownLatch (1 );
109
108
try {
110
- logger .info ("Sending request to opensearch " );
111
- ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler ();
109
+ logger .info ("Sending Cluster State request to OpenSearch after creating index " );
110
+ ExtensionClusterStateResponseHandler clusterStateResponseHandler = new ExtensionClusterStateResponseHandler ();
112
111
transportService .sendRequest (
113
112
opensearchNode ,
114
113
ExtensionsOrchestrator .REQUEST_EXTENSION_CLUSTER_STATE ,
115
114
new ExtensionRequest (ExtensionsOrchestrator .RequestType .REQUEST_EXTENSION_CLUSTER_STATE ),
116
115
clusterStateResponseHandler
117
116
);
117
+ logger .info ("Sending Cluster Settings request to OpenSearch after creating index" );
118
118
ClusterSettingResponseHandler clusterSettingResponseHandler = new ClusterSettingResponseHandler ();
119
119
transportService .sendRequest (
120
120
opensearchNode ,
121
121
ExtensionsOrchestrator .REQUEST_EXTENSION_CLUSTER_SETTINGS ,
122
122
new ExtensionRequest (ExtensionsOrchestrator .RequestType .REQUEST_EXTENSION_CLUSTER_SETTINGS ),
123
123
clusterSettingResponseHandler
124
124
);
125
+ logger .info ("Sending Local Node request to OpenSearch after creating index" );
125
126
LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler ();
126
127
transportService .sendRequest (
127
128
opensearchNode ,
@@ -130,7 +131,7 @@ IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesMod
130
131
localNodeResponseHandler
131
132
);
132
133
inProgressLatch .await (1 , TimeUnit .SECONDS );
133
- logger .info ("Received response from OpenSearch" );
134
+ logger .info ("Received response from OpenSearch for ClusterState, ClusterSettings and LocalNode " );
134
135
} catch (Exception e ) {
135
136
e .printStackTrace ();
136
137
logger .error (e .toString ());
@@ -141,7 +142,7 @@ IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesMod
141
142
142
143
// Works as beforeIndexRemoved
143
144
IndicesModuleNameResponse handleIndicesModuleNameRequest (IndicesModuleRequest indicesModuleRequest ) {
144
- logger .info ("Indices Module Name Request" );
145
+ logger .info ("Registering Indices Module Name Request received from OpenSearch " );
145
146
IndicesModuleNameResponse indicesModuleNameResponse = new IndicesModuleNameResponse (true );
146
147
return indicesModuleNameResponse ;
147
148
}
@@ -180,7 +181,7 @@ public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPo
180
181
return transport ;
181
182
}
182
183
183
- public TransportService getTransportService (Settings settings ) throws IOException {
184
+ public TransportService createTransportService (Settings settings ) throws IOException {
184
185
185
186
ThreadPool threadPool = new ThreadPool (settings );
186
187
@@ -214,7 +215,7 @@ public void startTransportService(TransportService transportService) {
214
215
transportService .start ();
215
216
transportService .acceptIncomingRequests ();
216
217
transportService .registerRequestHandler (
217
- REQUEST_EXTENSION_ACTION_NAME ,
218
+ ExtensionsOrchestrator . REQUEST_EXTENSION_ACTION_NAME ,
218
219
ThreadPool .Names .GENERIC ,
219
220
false ,
220
221
false ,
@@ -228,7 +229,7 @@ public void startTransportService(TransportService transportService) {
228
229
false ,
229
230
false ,
230
231
IndicesModuleRequest ::new ,
231
- ((request , channel , task ) -> channel .sendResponse (handleIndicesModuleRequest (request , transportService )))
232
+ ((request , channel , task ) -> channel .sendResponse (handleIndicesModuleRequest (request )))
232
233
233
234
);
234
235
transportService .registerRequestHandler (
@@ -242,6 +243,10 @@ public void startTransportService(TransportService transportService) {
242
243
243
244
}
244
245
246
+ public void setTransportService (TransportService transportService ) {
247
+ this .transportService = transportService ;
248
+ }
249
+
245
250
// manager method for action listener
246
251
public void startActionListener (int timeout ) {
247
252
final ActionListener actionListener = new ActionListener ();
@@ -253,7 +258,8 @@ public static void main(String[] args) throws IOException {
253
258
ExtensionsRunner extensionsRunner = new ExtensionsRunner ();
254
259
255
260
// configure and retrieve transport service with settings
256
- TransportService transportService = extensionsRunner .getTransportService (settings );
261
+ TransportService transportService = extensionsRunner .createTransportService (settings );
262
+ extensionsRunner .setTransportService (transportService );
257
263
258
264
// start transport service and action listener
259
265
extensionsRunner .startTransportService (transportService );
0 commit comments