diff --git a/docs/changelog/94998.yaml b/docs/changelog/94998.yaml new file mode 100644 index 0000000000000..99ce2dad32b89 --- /dev/null +++ b/docs/changelog/94998.yaml @@ -0,0 +1,5 @@ +pr: 94998 +summary: Retain underlying error on proxy mode connection failure +area: "Network" +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/transport/NoSeedNodeLeftException.java b/server/src/main/java/org/elasticsearch/transport/NoSeedNodeLeftException.java index ef5e014b63c36..0faac63bc21ed 100644 --- a/server/src/main/java/org/elasticsearch/transport/NoSeedNodeLeftException.java +++ b/server/src/main/java/org/elasticsearch/transport/NoSeedNodeLeftException.java @@ -22,14 +22,6 @@ public NoSeedNodeLeftException(String message) { super(message); } - NoSeedNodeLeftException(RemoteConnectionStrategy.ConnectionStrategy connectionStrategy, String clusterName) { - super( - connectionStrategy == RemoteConnectionStrategy.ConnectionStrategy.SNIFF - ? "no seed node left for cluster: [" + clusterName + "]" - : "Unable to open any proxy connections to cluster [" + clusterName + "]" - ); - } - public NoSeedNodeLeftException(StreamInput in) throws IOException { super(in); } diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 0bc1a022da977..c21c3b68b1e01 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -22,12 +22,16 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.core.Tuple; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -244,6 +248,8 @@ private void openConnections(ActionListener finished, int attemptNumber) { private final AtomicInteger successfulConnections = new AtomicInteger(0); private final CountDown countDown = new CountDown(remaining); + // Collecting exceptions during connection but deduplicate them by type and message to avoid excessive error reporting + private final Map, String>, Exception> exceptions = new ConcurrentHashMap<>(); @Override public void onResponse(Void v) { @@ -252,6 +258,7 @@ public void onResponse(Void v) { if (shouldOpenMoreConnections()) { openConnections(finished, attemptNumber + 1); } else { + assert connectionManager.size() > 0 : "must have at least one opened connection"; finished.onResponse(v); } } @@ -259,8 +266,19 @@ public void onResponse(Void v) { @Override public void onFailure(Exception e) { + exceptions.put(new Tuple<>(e.getClass(), e.getMessage()), e); if (countDown.countDown()) { - openConnections(finished, attemptNumber + 1); + if (attemptNumber >= MAX_CONNECT_ATTEMPTS_PER_RUN && connectionManager.size() == 0) { + logger.warn(() -> "failed to open any proxy connections to cluster [" + clusterAlias + "]", e); + if (exceptions.values().stream().allMatch(ProxyConnectionStrategy.this::isRetryableException)) { + finished.onFailure(getNoSeedNodeLeftException(exceptions.values())); + } else { + exceptions.values().stream().filter(e1 -> e1 != e).forEach(e::addSuppressed); + finished.onFailure(e); + } + } else { + openConnections(finished, attemptNumber + 1); + } } } }; @@ -292,7 +310,8 @@ public void onFailure(Exception e) { } else { int openConnections = connectionManager.size(); if (openConnections == 0) { - finished.onFailure(new NoSeedNodeLeftException(strategyType(), clusterAlias)); + assert false : "should not happen since onFailure should catch it and report with underlying cause"; + finished.onFailure(getNoSeedNodeLeftException(Set.of())); } else { logger.debug( "unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]", @@ -305,6 +324,14 @@ public void onFailure(Exception e) { } } + private NoSeedNodeLeftException getNoSeedNodeLeftException(Collection suppressedExceptions) { + final var e = new NoSeedNodeLeftException( + "Unable to open any proxy connections to cluster [" + clusterAlias + "] at address [" + address.get() + "]" + ); + suppressedExceptions.forEach(e::addSuppressed); + return e; + } + private static TransportAddress resolveAddress(String address) { return new TransportAddress(parseConfiguredAddress(address)); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index f0b837fa27cc5..802b703494792 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -24,6 +24,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -382,6 +383,11 @@ boolean assertNoRunningConnections() { protected abstract RemoteConnectionInfo.ModeInfo getModeInfo(); + protected boolean isRetryableException(Exception e) { + // ISE if we fail the handshake with a version incompatible node + return e instanceof ConnectTransportException || e instanceof IOException || e instanceof IllegalStateException; + } + private List> getAndClearListeners() { final List> result; synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index b8eec9c31cb88..1dd52433711e7 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -230,8 +230,7 @@ private void collectRemoteNodes(Iterator> seedNodesSuppl if (seedNodesSuppliers.hasNext()) { final Consumer onFailure = e -> { - if (e instanceof ConnectTransportException || e instanceof IOException || e instanceof IllegalStateException) { - // ISE if we fail the handshake with an version incompatible node + if (isRetryableException(e)) { if (seedNodesSuppliers.hasNext()) { logger.debug( () -> format("fetching nodes from external cluster [%s] failed moving to next seed node", clusterAlias), @@ -347,7 +346,7 @@ private void collectRemoteNodes(Iterator> seedNodesSuppl onFailure.accept(e); }); } else { - listener.onFailure(new NoSeedNodeLeftException(strategyType(), clusterAlias)); + listener.onFailure(new NoSeedNodeLeftException("no seed node left for cluster: [" + clusterAlias + "]")); } } diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index 37b86567b7097..e619e31d9feb5 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.TransportVersion; import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; @@ -39,6 +40,8 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItemInArray; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -253,10 +256,73 @@ public void testConnectFailsWithIncompatibleNodes() { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); strategy.connect(connectFuture); + final NoSeedNodeLeftException exception = expectThrows(NoSeedNodeLeftException.class, connectFuture::actionGet); assertThat( - expectThrows(NoSeedNodeLeftException.class, connectFuture::actionGet).getMessage(), - allOf(containsString("Unable to open any proxy connections"), containsString('[' + clusterAlias + ']')) + exception.getMessage(), + allOf( + containsString("Unable to open any proxy connections"), + containsString('[' + clusterAlias + ']'), + containsString("at address [" + address1 + "]") + ) ); + assertThat(exception.getSuppressed(), hasItemInArray(instanceOf(ConnectTransportException.class))); + + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertEquals(0, connectionManager.size()); + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + public void testConnectFailsWithNonRetryableException() { + try (MockTransportService transport1 = startTransport("remote", Version.CURRENT, TransportVersion.CURRENT)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + + try ( + MockTransportService localService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + TransportVersion.CURRENT, + threadPool + ) + ) { + if (randomBoolean()) { + transport1.addRequestHandlingBehavior( + TransportService.HANDSHAKE_ACTION_NAME, + (handler, request, channel, task) -> channel.sendResponse(new ElasticsearchException("non-retryable")) + ); + } else { + localService.addSendBehavior(address1, (connection, requestId, action, request, options) -> { + throw new ElasticsearchException("non-retryable"); + }); + } + + localService.start(); + localService.acceptIncomingRequests(); + + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); + int numOfConnections = randomIntBetween(4, 8); + try ( + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( + clusterAlias, + localService, + remoteConnectionManager, + Settings.EMPTY, + numOfConnections, + address1.toString() + ) + ) { + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + final ElasticsearchException exception = expectThrows(ElasticsearchException.class, connectFuture::actionGet); + assertThat(exception.getMessage(), containsString("non-retryable")); assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); assertEquals(0, connectionManager.size()); diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java index 5844b653636a1..19032bd750963 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java @@ -318,18 +318,21 @@ public void testCrossClusterSearchWithApiKey() throws Exception { // Check that authentication fails if we use a non-existent cross cluster access API key updateClusterSettings( - Settings.builder() - .put("cluster.remote.invalid_remote.mode", "proxy") - .put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0)) - .build() + randomBoolean() + ? Settings.builder() + .put("cluster.remote.invalid_remote.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0)) + .build() + : Settings.builder() + .put("cluster.remote.invalid_remote.mode", "proxy") + .put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0)) + .build() ); final ResponseException exception4 = expectThrows( ResponseException.class, () -> performRequestWithApiKey(new Request("GET", "/invalid_remote:index1/_search"), apiKeyEncoded) ); - // TODO: improve the error code and message - assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(500)); - assertThat(exception4.getMessage(), containsString("Unable to open any proxy connections to cluster [invalid_remote]")); + assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(401)); + assertThat(exception4.getMessage(), containsString("unable to authenticate user ")); } } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityLicensingAndFeatureUsageRestIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityLicensingAndFeatureUsageRestIT.java index 00d9d6033e1c1..60c25bf28ac63 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityLicensingAndFeatureUsageRestIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityLicensingAndFeatureUsageRestIT.java @@ -168,14 +168,7 @@ public void testCrossClusterAccessFeatureTrackingAndLicensing() throws Exception ); // Check that CCS fails because we cannot establish connection due to the license check. - if (useProxyMode) { - // TODO: We should improve error handling so we get actual cause instead just NoSeedNodeLeftException. - var exception = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(searchRequest)); - assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(500)); - assertThat(exception.getMessage(), containsString("Unable to open any proxy connections to cluster [my_remote_cluster]")); - } else { - assertRequestFailsDueToUnsupportedLicense(() -> performRequestWithRemoteSearchUser(searchRequest)); - } + assertRequestFailsDueToUnsupportedLicense(() -> performRequestWithRemoteSearchUser(searchRequest)); // We start the trial license which supports all features. startTrialLicense(fulfillingClusterClient); diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java index b31a5e09699b9..42af82749f574 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java @@ -287,18 +287,21 @@ public void testCrossClusterSearch() throws Exception { // Check that authentication fails if we use a non-existent API key updateClusterSettings( - Settings.builder() - .put("cluster.remote.invalid_remote.mode", "proxy") - .put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0)) - .build() + randomBoolean() + ? Settings.builder() + .put("cluster.remote.invalid_remote.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0)) + .build() + : Settings.builder() + .put("cluster.remote.invalid_remote.mode", "proxy") + .put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0)) + .build() ); final ResponseException exception4 = expectThrows( ResponseException.class, () -> performRequestWithRemoteSearchUser(new Request("GET", "/invalid_remote:index1/_search")) ); - // TODO: improve the error code and message - assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(500)); - assertThat(exception4.getMessage(), containsString("Unable to open any proxy connections to cluster [invalid_remote]")); + assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(401)); + assertThat(exception4.getMessage(), containsString("unable to authenticate user ")); } }