@@ -51,9 +51,14 @@ public static void startThreadPool() {
5151 threadPool = new TestThreadPool (TransportServiceHandshakeTests .class .getSimpleName ());
5252 }
5353
54- private List <TransportService > transportServices = new ArrayList <>();
54+ private final List <TransportService > transportServices = new ArrayList <>();
5555
56- private NetworkHandle startServices (String nodeNameAndId , Settings settings , Version version ) {
56+ private TransportService startServices (
57+ String nodeNameAndId ,
58+ Settings settings ,
59+ Version version ,
60+ TransportInterceptor transportInterceptor
61+ ) {
5762 TcpTransport transport = new Netty4Transport (
5863 settings ,
5964 Version .CURRENT ,
@@ -64,7 +69,6 @@ private NetworkHandle startServices(String nodeNameAndId, Settings settings, Ver
6469 new NoneCircuitBreakerService (),
6570 new SharedGroupFactory (settings )
6671 );
67- final DisruptingTransportInterceptor transportInterceptor = new DisruptingTransportInterceptor ();
6872 TransportService transportService = new MockTransportService (
6973 settings ,
7074 transport ,
@@ -84,7 +88,7 @@ private NetworkHandle startServices(String nodeNameAndId, Settings settings, Ver
8488 transportService .start ();
8589 transportService .acceptIncomingRequests ();
8690 transportServices .add (transportService );
87- return new NetworkHandle ( transportService , transportService . getLocalNode (), transportInterceptor ) ;
91+ return transportService ;
8892 }
8993
9094 @ After
@@ -102,91 +106,103 @@ public static void terminateThreadPool() {
102106 threadPool = null ;
103107 }
104108
105- public void testConnectToNodeLight () throws IOException {
109+ public void testConnectToNodeLight () {
106110 Settings settings = Settings .builder ().put ("cluster.name" , "test" ).build ();
107111
108- NetworkHandle handleA = startServices ("TS_A" , settings , Version .CURRENT );
109- NetworkHandle handleB = startServices (
112+ TransportService transportServiceA = startServices ("TS_A" , settings , Version .CURRENT , TransportService . NOOP_TRANSPORT_INTERCEPTOR );
113+ TransportService transportServiceB = startServices (
110114 "TS_B" ,
111115 settings ,
112- VersionUtils .randomVersionBetween (random (), Version .CURRENT .minimumCompatibilityVersion (), Version .CURRENT )
116+ VersionUtils .randomVersionBetween (random (), Version .CURRENT .minimumCompatibilityVersion (), Version .CURRENT ),
117+ TransportService .NOOP_TRANSPORT_INTERCEPTOR
113118 );
114119 DiscoveryNode discoveryNode = new DiscoveryNode (
115120 "" ,
116- handleB . discoveryNode .getAddress (),
121+ transportServiceB . getLocalNode () .getAddress (),
117122 emptyMap (),
118123 emptySet (),
119124 Version .CURRENT .minimumCompatibilityVersion ()
120125 );
121126 try (
122127 Transport .Connection connection = AbstractSimpleTransportTestCase .openConnection (
123- handleA . transportService ,
128+ transportServiceA ,
124129 discoveryNode ,
125130 TestProfiles .LIGHT_PROFILE
126131 )
127132 ) {
128- DiscoveryNode connectedNode = PlainActionFuture .get (fut -> handleA . transportService .handshake (connection , timeout , fut ));
133+ DiscoveryNode connectedNode = PlainActionFuture .get (fut -> transportServiceA .handshake (connection , timeout , fut ));
129134 assertNotNull (connectedNode );
130135 // the name and version should be updated
131136 assertEquals (connectedNode .getName (), "TS_B" );
132- assertEquals (connectedNode .getVersion (), handleB . discoveryNode .getVersion ());
133- assertFalse (handleA . transportService .nodeConnected (discoveryNode ));
137+ assertEquals (connectedNode .getVersion (), transportServiceB . getLocalNode () .getVersion ());
138+ assertFalse (transportServiceA .nodeConnected (discoveryNode ));
134139 }
135140 }
136141
137142 public void testMismatchedClusterName () {
138143
139- NetworkHandle handleA = startServices ("TS_A" , Settings .builder ().put ("cluster.name" , "a" ).build (), Version .CURRENT );
140- NetworkHandle handleB = startServices ("TS_B" , Settings .builder ().put ("cluster.name" , "b" ).build (), Version .CURRENT );
144+ TransportService transportServiceA = startServices (
145+ "TS_A" ,
146+ Settings .builder ().put ("cluster.name" , "a" ).build (),
147+ Version .CURRENT ,
148+ TransportService .NOOP_TRANSPORT_INTERCEPTOR
149+ );
150+ TransportService transportServiceB = startServices (
151+ "TS_B" ,
152+ Settings .builder ().put ("cluster.name" , "b" ).build (),
153+ Version .CURRENT ,
154+ TransportService .NOOP_TRANSPORT_INTERCEPTOR
155+ );
141156 DiscoveryNode discoveryNode = new DiscoveryNode (
142157 "" ,
143- handleB . discoveryNode .getAddress (),
158+ transportServiceB . getLocalNode () .getAddress (),
144159 emptyMap (),
145160 emptySet (),
146161 Version .CURRENT .minimumCompatibilityVersion ()
147162 );
148163 IllegalStateException ex = expectThrows (IllegalStateException .class , () -> {
149164 try (
150165 Transport .Connection connection = AbstractSimpleTransportTestCase .openConnection (
151- handleA . transportService ,
166+ transportServiceA ,
152167 discoveryNode ,
153168 TestProfiles .LIGHT_PROFILE
154169 )
155170 ) {
156- PlainActionFuture .get (fut -> handleA . transportService .handshake (connection , timeout , fut .map (x -> null )));
171+ PlainActionFuture .get (fut -> transportServiceA .handshake (connection , timeout , fut .map (x -> null )));
157172 }
158173 });
159174 assertThat (
160175 ex .getMessage (),
161176 containsString ("handshake with [" + discoveryNode + "] failed: remote cluster name [b] does not match local cluster name [a]" )
162177 );
163- assertFalse (handleA . transportService .nodeConnected (discoveryNode ));
178+ assertFalse (transportServiceA .nodeConnected (discoveryNode ));
164179 }
165180
166181 public void testIncompatibleVersions () {
167182 Settings settings = Settings .builder ().put ("cluster.name" , "test" ).build ();
168- NetworkHandle handleA = startServices ("TS_A" , settings , Version .CURRENT );
169- NetworkHandle handleB = startServices (
183+ TransportService transportServiceA = startServices ("TS_A" , settings , Version .CURRENT , TransportService . NOOP_TRANSPORT_INTERCEPTOR );
184+ TransportService transportServiceB = startServices (
170185 "TS_B" ,
171186 settings ,
172- VersionUtils .getPreviousVersion (Version .CURRENT .minimumCompatibilityVersion ())
187+ VersionUtils .getPreviousVersion (Version .CURRENT .minimumCompatibilityVersion ()),
188+ TransportService .NOOP_TRANSPORT_INTERCEPTOR
173189 );
174190 DiscoveryNode discoveryNode = new DiscoveryNode (
175191 "" ,
176- handleB . discoveryNode .getAddress (),
192+ transportServiceB . getLocalNode () .getAddress (),
177193 emptyMap (),
178194 emptySet (),
179195 Version .CURRENT .minimumCompatibilityVersion ()
180196 );
181197 IllegalStateException ex = expectThrows (IllegalStateException .class , () -> {
182198 try (
183199 Transport .Connection connection = AbstractSimpleTransportTestCase .openConnection (
184- handleA . transportService ,
200+ transportServiceA ,
185201 discoveryNode ,
186202 TestProfiles .LIGHT_PROFILE
187203 )
188204 ) {
189- PlainActionFuture .get (fut -> handleA . transportService .handshake (connection , timeout , fut .map (x -> null )));
205+ PlainActionFuture .get (fut -> transportServiceA .handshake (connection , timeout , fut .map (x -> null )));
190206 }
191207 });
192208 assertThat (
@@ -195,91 +211,89 @@ public void testIncompatibleVersions() {
195211 "handshake with ["
196212 + discoveryNode
197213 + "] failed: remote node version ["
198- + handleB . discoveryNode .getVersion ()
214+ + transportServiceB . getLocalNode () .getVersion ()
199215 + "] is incompatible with local node version ["
200216 + Version .CURRENT
201217 + "]"
202218 )
203219 );
204- assertFalse (handleA . transportService .nodeConnected (discoveryNode ));
220+ assertFalse (transportServiceA .nodeConnected (discoveryNode ));
205221 }
206222
207223 public void testNodeConnectWithDifferentNodeId () {
208224 Settings settings = Settings .builder ().put ("cluster.name" , "test" ).build ();
209- NetworkHandle handleA = startServices ("TS_A" , settings , Version .CURRENT );
210- NetworkHandle handleB = startServices ("TS_B" , settings , Version .CURRENT );
225+ TransportService transportServiceA = startServices ("TS_A" , settings , Version .CURRENT , TransportService . NOOP_TRANSPORT_INTERCEPTOR );
226+ TransportService transportServiceB = startServices ("TS_B" , settings , Version .CURRENT , TransportService . NOOP_TRANSPORT_INTERCEPTOR );
211227 DiscoveryNode discoveryNode = new DiscoveryNode (
212228 randomAlphaOfLength (10 ),
213- handleB . discoveryNode .getAddress (),
229+ transportServiceB . getLocalNode () .getAddress (),
214230 emptyMap (),
215231 emptySet (),
216- handleB . discoveryNode .getVersion ()
232+ transportServiceB . getLocalNode () .getVersion ()
217233 );
218234 ConnectTransportException ex = expectThrows (
219235 ConnectTransportException .class ,
220- () -> AbstractSimpleTransportTestCase .connectToNode (handleA . transportService , discoveryNode , TestProfiles .LIGHT_PROFILE )
236+ () -> AbstractSimpleTransportTestCase .connectToNode (transportServiceA , discoveryNode , TestProfiles .LIGHT_PROFILE )
221237 );
222238 assertThat (ex .getMessage (), containsString ("unexpected remote node" ));
223- assertFalse (handleA . transportService .nodeConnected (discoveryNode ));
239+ assertFalse (transportServiceA .nodeConnected (discoveryNode ));
224240 }
225241
226242 public void testRejectsMismatchedBuildHash () {
243+ final DisruptingTransportInterceptor transportInterceptorA = new DisruptingTransportInterceptor ();
244+ final DisruptingTransportInterceptor transportInterceptorB = new DisruptingTransportInterceptor ();
245+ transportInterceptorA .setModifyBuildHash (true );
246+ transportInterceptorB .setModifyBuildHash (true );
227247 final Settings settings = Settings .builder ()
228248 .put ("cluster.name" , "a" )
229249 .put (IGNORE_DESERIALIZATION_ERRORS_SETTING .getKey (), true ) // suppress assertions to test production error-handling
230250 .build ();
231- final NetworkHandle handleA = startServices ("TS_A" , settings , Version .CURRENT );
232- final NetworkHandle handleB = startServices ("TS_B" , settings , Version .CURRENT );
251+ final TransportService transportServiceA = startServices ("TS_A" , settings , Version .CURRENT , transportInterceptorA );
252+ final TransportService transportServiceB = startServices ("TS_B" , settings , Version .CURRENT , transportInterceptorB );
233253 final DiscoveryNode discoveryNode = new DiscoveryNode (
234254 "" ,
235- handleB . discoveryNode .getAddress (),
255+ transportServiceB . getLocalNode () .getAddress (),
236256 emptyMap (),
237257 emptySet (),
238258 Version .CURRENT .minimumCompatibilityVersion ()
239259 );
240- handleA .transportInterceptor .setModifyBuildHash (true );
241- handleB .transportInterceptor .setModifyBuildHash (true );
242260 TransportSerializationException ex = expectThrows (TransportSerializationException .class , () -> {
243261 try (
244262 Transport .Connection connection = AbstractSimpleTransportTestCase .openConnection (
245- handleA . transportService ,
263+ transportServiceA ,
246264 discoveryNode ,
247265 TestProfiles .LIGHT_PROFILE
248266 )
249267 ) {
250- PlainActionFuture .get (fut -> handleA . transportService .handshake (connection , timeout , fut .map (x -> null )));
268+ PlainActionFuture .get (fut -> transportServiceA .handshake (connection , timeout , fut .map (x -> null )));
251269 }
252270 });
253271 assertThat (
254272 ExceptionsHelper .unwrap (ex , IllegalArgumentException .class ).getMessage (),
255273 containsString ("which has an incompatible wire format" )
256274 );
257- assertFalse (handleA . transportService .nodeConnected (discoveryNode ));
275+ assertFalse (transportServiceA .nodeConnected (discoveryNode ));
258276 }
259277
260278 public void testAcceptsMismatchedBuildHashFromDifferentVersion () {
261- final NetworkHandle handleA = startServices ("TS_A" , Settings .builder ().put ("cluster.name" , "a" ).build (), Version .CURRENT );
262- final NetworkHandle handleB = startServices (
279+ final DisruptingTransportInterceptor transportInterceptorA = new DisruptingTransportInterceptor ();
280+ final DisruptingTransportInterceptor transportInterceptorB = new DisruptingTransportInterceptor ();
281+ transportInterceptorA .setModifyBuildHash (true );
282+ transportInterceptorB .setModifyBuildHash (true );
283+ final TransportService transportServiceA = startServices (
284+ "TS_A" ,
285+ Settings .builder ().put ("cluster.name" , "a" ).build (),
286+ Version .CURRENT ,
287+ transportInterceptorA
288+ );
289+ final TransportService transportServiceB = startServices (
263290 "TS_B" ,
264291 Settings .builder ().put ("cluster.name" , "a" ).build (),
265- Version .CURRENT .minimumCompatibilityVersion ()
292+ Version .CURRENT .minimumCompatibilityVersion (),
293+ transportInterceptorB
266294 );
267- handleA .transportInterceptor .setModifyBuildHash (true );
268- handleB .transportInterceptor .setModifyBuildHash (true );
269- AbstractSimpleTransportTestCase .connectToNode (handleA .transportService , handleB .discoveryNode , TestProfiles .LIGHT_PROFILE );
270- assertTrue (handleA .transportService .nodeConnected (handleB .discoveryNode ));
271- }
272-
273- private static class NetworkHandle {
274- final TransportService transportService ;
275- final DiscoveryNode discoveryNode ;
276- final DisruptingTransportInterceptor transportInterceptor ;
277-
278- NetworkHandle (TransportService transportService , DiscoveryNode discoveryNode , DisruptingTransportInterceptor transportInterceptor ) {
279- this .transportService = transportService ;
280- this .discoveryNode = discoveryNode ;
281- this .transportInterceptor = transportInterceptor ;
282- }
295+ AbstractSimpleTransportTestCase .connectToNode (transportServiceA , transportServiceB .getLocalNode (), TestProfiles .LIGHT_PROFILE );
296+ assertTrue (transportServiceA .nodeConnected (transportServiceB .getLocalNode ()));
283297 }
284298
285299 private static class DisruptingTransportInterceptor implements TransportInterceptor {
0 commit comments