@@ -370,6 +370,9 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
370370 ClusterStateRequest request = new ClusterStateRequest ();
371371 request .clear ();
372372 request .nodes (true );
373+ // here we pass on the connection since we can only close it once the sendRequest returns; otherwise,
374+ // due to the async nature (it will return before it's actually sent), this can cause the request to fail
375+ // due to an already closed connection.
373376 transportService .sendRequest (connection ,
374377 ClusterStateAction .NAME , request , TransportRequestOptions .EMPTY ,
375378 new SniffClusterStateResponseHandler (transportService , connection , listener , seedNodes ,
@@ -443,24 +446,30 @@ public ClusterStateResponse newInstance() {
443446 @ Override
444447 public void handleResponse (ClusterStateResponse response ) {
445448 try {
446- cancellableThreads .executeIO (() -> {
447- DiscoveryNodes nodes = response .getState ().nodes ();
448- Iterable <DiscoveryNode > nodesIter = nodes .getNodes ()::valuesIt ;
449- for (DiscoveryNode node : nodesIter ) {
450- if (nodePredicate .test (node ) && connectedNodes .size () < maxNumRemoteConnections ) {
451- try {
452- transportService .connectToNode (node , remoteProfile ); // noop if node is connected
453- connectedNodes .add (node );
454- } catch (ConnectTransportException | IllegalStateException ex ) {
455- // ISE if we fail the handshake with an version incompatible node
456- // fair enough we can't connect just move on
457- logger .debug ((Supplier <?>)
458- () -> new ParameterizedMessage ("failed to connect to node {}" , node ), ex );
449+ try (Closeable theConnection = connection ) { // the connection is unused - see comment in #collectRemoteNodes
450+ // we have to close this connection before we notify listeners - this is mainly needed for test correctness
451+ // since if we do it afterwards we might fail assertions that check if all high level connections are closed.
452+ // from a code correctness perspective we could also close it afterwards. This try-with-resources block will
453+ // preserve any exceptions thrown from within the try block and suppress the ones that are possibly thrown
454+ // by closing the connection
455+ cancellableThreads .executeIO (() -> {
456+ DiscoveryNodes nodes = response .getState ().nodes ();
457+ Iterable <DiscoveryNode > nodesIter = nodes .getNodes ()::valuesIt ;
458+ for (DiscoveryNode node : nodesIter ) {
459+ if (nodePredicate .test (node ) && connectedNodes .size () < maxNumRemoteConnections ) {
460+ try {
461+ transportService .connectToNode (node , remoteProfile ); // noop if node is connected
462+ connectedNodes .add (node );
463+ } catch (ConnectTransportException | IllegalStateException ex ) {
464+ // ISE if we fail the handshake with an version incompatible node
465+ // fair enough we can't connect just move on
466+ logger .debug ((Supplier <?>)
467+ () -> new ParameterizedMessage ("failed to connect to node {}" , node ), ex );
468+ }
459469 }
460470 }
461- }
462- });
463- connection .close ();
471+ });
472+ }
464473 listener .onResponse (null );
465474 } catch (CancellableThreads .ExecutionCancelledException ex ) {
466475 listener .onFailure (ex ); // we got canceled - fail the listener and step out
@@ -469,9 +478,6 @@ public void handleResponse(ClusterStateResponse response) {
469478 () -> new ParameterizedMessage ("fetching nodes from external cluster {} failed" ,
470479 clusterAlias ), ex );
471480 collectRemoteNodes (seedNodes , transportService , listener );
472- } finally {
473- // just to make sure we don't leak anything we close the connection here again even if we managed to do so before
474- IOUtils .closeWhileHandlingException (connection );
475481 }
476482 }
477483
0 commit comments