Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/99117.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99117
summary: Do not report failure after connections are made
area: Network
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -315,19 +314,13 @@ public void onFailure(Exception e) {
}));
}
} else {
int openConnections = connectionManager.size();
if (openConnections == 0) {
assert false : "should not happen since onFailure should catch it and report with underlying cause";
finished.onFailure(getNoSeedNodeLeftException(Set.of()));
} else {
logger.debug(
"unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]",
clusterAlias,
openConnections,
maxNumConnections
);
finished.onResponse(null);
}
logger.debug(
"unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]",
clusterAlias,
connectionManager.size(),
maxNumConnections
);
finished.onResponse(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItemInArray;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -480,6 +482,55 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception
}
}

public void testConnectionsClosedAfterInitiallyEstablishedDoesNotLeadToFailure() throws InterruptedException {
try (MockTransportService remoteService = startTransport("proxy_node", VersionInformation.CURRENT, TransportVersion.current())) {
TransportAddress address = remoteService.boundAddress().publishAddress();

try (
MockTransportService localService = MockTransportService.createNewService(
Settings.EMPTY,
VersionInformation.CURRENT,
TransportVersion.current(),
threadPool
)
) {
localService.start();

final var connectionManager = new ClusterConnectionManager(profile, localService.transport, threadPool.getThreadContext());
final int numOfConnections = randomIntBetween(4, 8);
final var connectionCountDown = new CountDownLatch(numOfConnections);
connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
// Count down to ensure at least the required number of connection are indeed initially established
connectionCountDown.countDown();
// Simulate disconnection right after connection is made
connection.close();
}
});

try (
var remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
var strategy = new ProxyConnectionStrategy(
clusterAlias,
localService,
remoteConnectionManager,
Settings.EMPTY,
numOfConnections,
address.toString()
)
) {
final PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
// Should see no error and the connection size is 0
connectFuture.actionGet();
assertThat(connectionCountDown.await(30L, TimeUnit.SECONDS), is(true));
assertThat(remoteConnectionManager.size(), equalTo(0));
}
}
}
}

public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesOrServerNameChange() {
try (MockTransportService remoteTransport = startTransport("node1", VersionInformation.CURRENT, TransportVersion.current())) {
TransportAddress remoteAddress = remoteTransport.boundAddress().publishAddress();
Expand Down