Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/94998.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 94998
summary: Retain underlying error on proxy mode connection failure
area: "Network"
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@ public NoSeedNodeLeftException(String message) {
super(message);
}

/**
 * Creates the exception with a message that depends on the connection strategy in use.
 *
 * @param connectionStrategy the strategy that failed to connect; {@code SNIFF} selects the
 *                           seed-node wording, anything else selects the proxy wording
 * @param clusterName        the remote cluster name embedded in the message
 */
NoSeedNodeLeftException(RemoteConnectionStrategy.ConnectionStrategy connectionStrategy, String clusterName) {
    super(describeFailure(connectionStrategy, clusterName));
}

/** Picks the failure message matching the given connection strategy. */
private static String describeFailure(RemoteConnectionStrategy.ConnectionStrategy connectionStrategy, String clusterName) {
    if (connectionStrategy == RemoteConnectionStrategy.ConnectionStrategy.SNIFF) {
        return "no seed node left for cluster: [" + clusterName + "]";
    }
    return "Unable to open any proxy connections to cluster [" + clusterName + "]";
}

/**
 * Stream constructor: hands the given input straight to the superclass constructor.
 *
 * @param in the stream passed to the superclass
 * @throws IOException declared by this constructor; propagated from the superclass call
 */
public NoSeedNodeLeftException(StreamInput in) throws IOException {
super(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -244,6 +248,8 @@ private void openConnections(ActionListener<Void> finished, int attemptNumber) {

private final AtomicInteger successfulConnections = new AtomicInteger(0);
private final CountDown countDown = new CountDown(remaining);
// Collect exceptions seen while connecting, deduplicated by type and message to avoid excessive error reporting
private final Map<Tuple<Class<?>, String>, Exception> exceptions = new ConcurrentHashMap<>();

@Override
public void onResponse(Void v) {
Expand All @@ -252,15 +258,27 @@ public void onResponse(Void v) {
if (shouldOpenMoreConnections()) {
openConnections(finished, attemptNumber + 1);
} else {
assert connectionManager.size() > 0 : "must have at least one opened connection";
finished.onResponse(v);
}
}
}

@Override
public void onFailure(Exception e) {
exceptions.put(new Tuple<>(e.getClass(), e.getMessage()), e);
if (countDown.countDown()) {
openConnections(finished, attemptNumber + 1);
if (attemptNumber >= MAX_CONNECT_ATTEMPTS_PER_RUN && connectionManager.size() == 0) {
logger.warn(() -> "failed to open any proxy connections to cluster [" + clusterAlias + "]", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sanity checking that warn is the right log level: this could be caused by longer-lasting but still temporary network glitches, so I'm wondering if we want info or even debug here instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warn is good. info shouldn't really be used to report exceptions, and debug is hidden by default so will make troubleshooting too difficult. See these docs for more info.

Copy link
Contributor

@n1v0lg n1v0lg Apr 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cheers for the pointer, all good to keep this at warn here!

if (exceptions.values().stream().allMatch(ProxyConnectionStrategy.this::isRetryableException)) {
finished.onFailure(getNoSeedNodeLeftException(exceptions.values()));
} else {
exceptions.values().stream().filter(e1 -> e1 != e).forEach(e::addSuppressed);
finished.onFailure(e);
}
} else {
openConnections(finished, attemptNumber + 1);
}
}
}
};
Expand Down Expand Up @@ -292,7 +310,8 @@ public void onFailure(Exception e) {
} else {
int openConnections = connectionManager.size();
if (openConnections == 0) {
finished.onFailure(new NoSeedNodeLeftException(strategyType(), clusterAlias));
assert false : "should not happen since onFailure should catch it and report with underlying cause";
finished.onFailure(getNoSeedNodeLeftException(Set.of()));
} else {
logger.debug(
"unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]",
Expand All @@ -305,6 +324,14 @@ public void onFailure(Exception e) {
}
}

/**
 * Builds the exception reported when no proxy connection could be opened.
 * The message names the cluster alias and the configured address, and every
 * supplied exception is attached as a suppressed exception.
 *
 * @param suppressedExceptions the underlying failures to attach; may be empty
 * @return the newly created exception carrying the given failures as suppressed
 */
private NoSeedNodeLeftException getNoSeedNodeLeftException(Collection<Exception> suppressedExceptions) {
    final String message = "Unable to open any proxy connections to cluster ["
        + clusterAlias
        + "] at address ["
        + address.get()
        + "]";
    final NoSeedNodeLeftException failure = new NoSeedNodeLeftException(message);
    for (Exception suppressed : suppressedExceptions) {
        failure.addSuppressed(suppressed);
    }
    return failure;
}

/**
 * Parses the configured address string and wraps the result in a {@link TransportAddress}.
 *
 * @param address the configured address string
 * @return a transport address built from the parsed value
 */
private static TransportAddress resolveAddress(String address) {
    final var parsedAddress = parseConfiguredAddress(address);
    return new TransportAddress(parsedAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -382,6 +383,11 @@ boolean assertNoRunningConnections() {

protected abstract RemoteConnectionInfo.ModeInfo getModeInfo();

/**
 * Tells whether a connection failure is of a type treated as retryable.
 *
 * @param e the failure to classify
 * @return {@code true} for {@code ConnectTransportException}, {@code IOException} and
 *         {@code IllegalStateException}; {@code false} otherwise
 */
protected boolean isRetryableException(Exception e) {
    if (e instanceof ConnectTransportException) {
        return true;
    }
    if (e instanceof IOException) {
        return true;
    }
    // ISE is what we get when the handshake fails against a version incompatible node
    return e instanceof IllegalStateException;
}

private List<ActionListener<Void>> getAndClearListeners() {
final List<ActionListener<Void>> result;
synchronized (mutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodesSuppl

if (seedNodesSuppliers.hasNext()) {
final Consumer<Exception> onFailure = e -> {
if (e instanceof ConnectTransportException || e instanceof IOException || e instanceof IllegalStateException) {
// ISE if we fail the handshake with a version-incompatible node
if (isRetryableException(e)) {
if (seedNodesSuppliers.hasNext()) {
logger.debug(
() -> format("fetching nodes from external cluster [%s] failed moving to next seed node", clusterAlias),
Expand Down Expand Up @@ -347,7 +346,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodesSuppl
onFailure.accept(e);
});
} else {
listener.onFailure(new NoSeedNodeLeftException(strategyType(), clusterAlias));
listener.onFailure(new NoSeedNodeLeftException("no seed node left for cluster: [" + clusterAlias + "]"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.transport;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
Expand Down Expand Up @@ -39,6 +40,8 @@

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItemInArray;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -253,10 +256,73 @@ public void testConnectFailsWithIncompatibleNodes() {

PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
final NoSeedNodeLeftException exception = expectThrows(NoSeedNodeLeftException.class, connectFuture::actionGet);
assertThat(
expectThrows(NoSeedNodeLeftException.class, connectFuture::actionGet).getMessage(),
allOf(containsString("Unable to open any proxy connections"), containsString('[' + clusterAlias + ']'))
exception.getMessage(),
allOf(
containsString("Unable to open any proxy connections"),
containsString('[' + clusterAlias + ']'),
containsString("at address [" + address1 + "]")
)
);
assertThat(exception.getSuppressed(), hasItemInArray(instanceOf(ConnectTransportException.class)));

assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertEquals(0, connectionManager.size());
assertTrue(strategy.assertNoRunningConnections());
}
}
}
}

public void testConnectFailsWithNonRetryableException() {
try (MockTransportService transport1 = startTransport("remote", Version.CURRENT, TransportVersion.CURRENT)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();

try (
MockTransportService localService = MockTransportService.createNewService(
Settings.EMPTY,
Version.CURRENT,
TransportVersion.CURRENT,
threadPool
)
) {
if (randomBoolean()) {
transport1.addRequestHandlingBehavior(
TransportService.HANDSHAKE_ACTION_NAME,
(handler, request, channel, task) -> channel.sendResponse(new ElasticsearchException("non-retryable"))
);
} else {
localService.addSendBehavior(address1, (connection, requestId, action, request, options) -> {
throw new ElasticsearchException("non-retryable");
});
}

localService.start();
localService.acceptIncomingRequests();

final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
profile,
localService.transport,
threadPool.getThreadContext()
);
int numOfConnections = randomIntBetween(4, 8);
try (
RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
clusterAlias,
localService,
remoteConnectionManager,
Settings.EMPTY,
numOfConnections,
address1.toString()
)
) {

PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
final ElasticsearchException exception = expectThrows(ElasticsearchException.class, connectFuture::actionGet);
assertThat(exception.getMessage(), containsString("non-retryable"));

assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertEquals(0, connectionManager.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,18 +318,21 @@ public void testCrossClusterSearchWithApiKey() throws Exception {

// Check that authentication fails if we use a non-existent cross cluster access API key
updateClusterSettings(
Settings.builder()
.put("cluster.remote.invalid_remote.mode", "proxy")
.put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
randomBoolean()
? Settings.builder()
.put("cluster.remote.invalid_remote.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
: Settings.builder()
.put("cluster.remote.invalid_remote.mode", "proxy")
.put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
);
final ResponseException exception4 = expectThrows(
ResponseException.class,
() -> performRequestWithApiKey(new Request("GET", "/invalid_remote:index1/_search"), apiKeyEncoded)
);
// TODO: improve the error code and message
assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(500));
assertThat(exception4.getMessage(), containsString("Unable to open any proxy connections to cluster [invalid_remote]"));
assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(401));
assertThat(exception4.getMessage(), containsString("unable to authenticate user "));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,7 @@ public void testCrossClusterAccessFeatureTrackingAndLicensing() throws Exception
);

// Check that CCS fails because we cannot establish connection due to the license check.
if (useProxyMode) {
// TODO: We should improve error handling so we get the actual cause instead of just NoSeedNodeLeftException.
var exception = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(searchRequest));
assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(500));
assertThat(exception.getMessage(), containsString("Unable to open any proxy connections to cluster [my_remote_cluster]"));
} else {
assertRequestFailsDueToUnsupportedLicense(() -> performRequestWithRemoteSearchUser(searchRequest));
}
assertRequestFailsDueToUnsupportedLicense(() -> performRequestWithRemoteSearchUser(searchRequest));

// We start the trial license which supports all features.
startTrialLicense(fulfillingClusterClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,18 +287,21 @@ public void testCrossClusterSearch() throws Exception {

// Check that authentication fails if we use a non-existent API key
updateClusterSettings(
Settings.builder()
.put("cluster.remote.invalid_remote.mode", "proxy")
.put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
randomBoolean()
? Settings.builder()
.put("cluster.remote.invalid_remote.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
: Settings.builder()
.put("cluster.remote.invalid_remote.mode", "proxy")
.put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
);
final ResponseException exception4 = expectThrows(
ResponseException.class,
() -> performRequestWithRemoteSearchUser(new Request("GET", "/invalid_remote:index1/_search"))
);
// TODO: improve the error code and message
assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(500));
assertThat(exception4.getMessage(), containsString("Unable to open any proxy connections to cluster [invalid_remote]"));
assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(401));
assertThat(exception4.getMessage(), containsString("unable to authenticate user "));
}
}

Expand Down