diff --git a/docs/changelog/99117.yaml b/docs/changelog/99117.yaml new file mode 100644 index 0000000000000..491692f232081 --- /dev/null +++ b/docs/changelog/99117.yaml @@ -0,0 +1,5 @@ +pr: 99117 +summary: Do not report failure after connections are made +area: Network +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 83a0860ba6324..35655f6260461 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -32,7 +32,6 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -315,19 +314,13 @@ public void onFailure(Exception e) { })); } } else { - int openConnections = connectionManager.size(); - if (openConnections == 0) { - assert false : "should not happen since onFailure should catch it and report with underlying cause"; - finished.onFailure(getNoSeedNodeLeftException(Set.of())); - } else { - logger.debug( - "unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]", - clusterAlias, - openConnections, - maxNumConnections - ); - finished.onResponse(null); - } + logger.debug( + "unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]", + clusterAlias, + connectionManager.size(), + maxNumConnections + ); + finished.onResponse(null); } } diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index d2941bab3f91a..965288a989870 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -43,8 +43,10 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItemInArray; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -480,6 +482,55 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception } } + public void testConnectionsClosedAfterInitiallyEstablishedDoesNotLeadToFailure() throws InterruptedException { + try (MockTransportService remoteService = startTransport("proxy_node", VersionInformation.CURRENT, TransportVersion.current())) { + TransportAddress address = remoteService.boundAddress().publishAddress(); + + try ( + MockTransportService localService = MockTransportService.createNewService( + Settings.EMPTY, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool + ) + ) { + localService.start(); + + final var connectionManager = new ClusterConnectionManager(profile, localService.transport, threadPool.getThreadContext()); + final int numOfConnections = randomIntBetween(4, 8); + final var connectionCountDown = new CountDownLatch(numOfConnections); + connectionManager.addListener(new TransportConnectionListener() { + @Override + public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) { + // Count down to ensure at least the required number of connection are indeed initially established + connectionCountDown.countDown(); + // Simulate disconnection right after connection is made + connection.close(); + } + }); + + try ( + var remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + var strategy = new ProxyConnectionStrategy( + clusterAlias, + localService, + remoteConnectionManager, + Settings.EMPTY, + numOfConnections, + address.toString() + ) + ) { + final PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + // Should see no error and the connection size is 0 + connectFuture.actionGet(); + assertThat(connectionCountDown.await(30L, TimeUnit.SECONDS), is(true)); + assertThat(remoteConnectionManager.size(), equalTo(0)); + } + } + } + } + public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesOrServerNameChange() { try (MockTransportService remoteTransport = startTransport("node1", VersionInformation.CURRENT, TransportVersion.current())) { TransportAddress remoteAddress = remoteTransport.boundAddress().publishAddress();