diff --git a/docs/changelog/134415.yaml b/docs/changelog/134415.yaml new file mode 100644 index 0000000000000..750f3bc2736c1 --- /dev/null +++ b/docs/changelog/134415.yaml @@ -0,0 +1,5 @@ +pr: 134415 +summary: Differentiate between initial and reconnect RCS connections +area: Network +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 45c07aadd1b1b..f020c2408aa47 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -163,7 +163,6 @@ public void onFailure(Exception e) { exceptions.put(new Tuple<>(e.getClass(), e.getMessage()), e); if (countDown.countDown()) { if (attemptNumber >= MAX_CONNECT_ATTEMPTS_PER_RUN && connectionManager.size() == 0) { - logger.warn(() -> "failed to open any proxy connections to cluster [" + clusterAlias + "]", e); if (exceptions.values().stream().allMatch(RemoteConnectionStrategy::isRetryableException)) { finished.onFailure(getNoSeedNodeLeftException(exceptions.values())); } else { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 7bf4689449b9c..70a17ee67d307 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -14,6 +14,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -76,12 +77,17 @@ public Writeable.Reader getReader() { private final AtomicBoolean closed = new AtomicBoolean(false); private final Object mutex = new Object(); private List> listeners = new ArrayList<>(); + private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false); protected final TransportService transportService; protected final RemoteConnectionManager connectionManager; + protected final ProjectId originProjectId; + protected final ProjectId linkedProjectId; protected final String clusterAlias; RemoteConnectionStrategy(LinkedProjectConfig config, TransportService transportService, RemoteConnectionManager connectionManager) { + this.originProjectId = config.originProjectId(); + this.linkedProjectId = config.linkedProjectId(); this.clusterAlias = config.linkedProjectAlias(); this.transportService = transportService; this.connectionManager = connectionManager; @@ -190,11 +196,13 @@ protected void doRun() { connectImpl(new ActionListener<>() { @Override public void onResponse(Void aVoid) { + connectionAttemptCompleted(null); ActionListener.onResponse(getAndClearListeners(), aVoid); } @Override public void onFailure(Exception e) { + connectionAttemptCompleted(e); ActionListener.onFailure(getAndClearListeners(), e); } }); @@ -203,6 +211,24 @@ public void onFailure(Exception e) { } } + private void connectionAttemptCompleted(@Nullable Exception e) { + final boolean isInitialAttempt = initialConnectionAttempted.compareAndSet(false, true); + final org.apache.logging.log4j.util.Supplier msgSupplier = () -> format( + "Origin project [%s] %s linked project [%s] with alias [%s] on %s attempt", + originProjectId, + e == null ? "successfully connected to" : "failed to connect to", + linkedProjectId, + clusterAlias, + isInitialAttempt ? "the initial connection" : "a reconnection" + ); + if (e == null) { + logger.debug(msgSupplier); + } else { + logger.warn(msgSupplier, e); + // TODO: ES-12695: Increment either the initial or retry connection failure metric. + } + } + boolean shouldRebuildConnection(LinkedProjectConfig config) { return config.connectionStrategy().equals(strategyType()) == false || connectionProfileChanged(config) diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 4b2c6443da98f..fbe6e61d5eab4 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -144,7 +144,6 @@ private void collectRemoteNodes(Iterator> seedNodesSuppl logger.debug(() -> "fetching nodes from external cluster [" + clusterAlias + "] failed moving to next seed node", e); collectRemoteNodes(seedNodesSuppliers, listener); } else { - logger.warn(() -> "fetching nodes from external cluster [" + clusterAlias + "] failed", e); listener.onFailure(e); } }; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index cd9b6407cc507..1e530c528b109 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -9,13 +9,26 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.Level; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EnumSerializationTestUtils; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import static org.elasticsearch.test.MockLog.assertThatLogger; import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE; import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS; @@ -171,21 +184,148 @@ public void testConnectionStrategySerialization() { ); } + @TestLogging( + value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG", + reason = "logging verification" + ) + public void testConnectionAttemptLogging() { + final var originProjectId = randomUniqueProjectId(); + final var linkedProjectId = randomUniqueProjectId(); + final var alias = randomAlphanumericOfLength(10); + + try ( + var threadPool = new TestThreadPool(getClass().getName()); + var transportService = startTransport(threadPool); + var connectionManager = new RemoteConnectionManager( + alias, + RemoteClusterCredentialsManager.EMPTY, + new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext) + ) + ) { + for (boolean shouldConnectFail : new boolean[] { true, false }) { + for (boolean isIntialConnectAttempt : new boolean[] { true, false }) { + final var strategy = new FakeConnectionStrategy( + originProjectId, + linkedProjectId, + alias, + transportService, + connectionManager + ); + if (isIntialConnectAttempt == false) { + waitForConnect(strategy); + } + strategy.setShouldConnectFail(shouldConnectFail); + final var expectedLogLevel = shouldConnectFail ? Level.WARN : Level.DEBUG; + final var expectedLogMessage = Strings.format( + "Origin project [%s] %s to linked project [%s] with alias [%s] on %s attempt", + originProjectId, + shouldConnectFail ? "failed to connect" : "successfully connected", + linkedProjectId, + alias, + isIntialConnectAttempt ? "the initial connection" : "a reconnection" + ); + assertThatLogger(() -> { + if (shouldConnectFail) { + assertThrows(RuntimeException.class, () -> waitForConnect(strategy)); + } else { + waitForConnect(strategy); + } + }, + strategy.getClass(), + new MockLog.SeenEventExpectation( + "connection strategy should log at " + + expectedLogLevel + + " after a " + + (shouldConnectFail ? "failed" : "successful") + + (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"), + strategy.getClass().getCanonicalName(), + expectedLogLevel, + expectedLogMessage + ) + ); + } + } + } + } + + private MockTransportService startTransport(ThreadPool threadPool) { + boolean success = false; + final Settings s = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "cluster1").put("node.name", "node1").build(); + MockTransportService newService = MockTransportService.createNewService( + s, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool + ); + try { + newService.start(); + newService.acceptIncomingRequests(); + success = true; + return newService; + } finally { + if (success == false) { + newService.close(); + } + } + } + + private static void waitForConnect(RemoteConnectionStrategy strategy) { + PlainActionFuture connectFuture = new PlainActionFuture<>(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + } + private static class FakeConnectionStrategy extends RemoteConnectionStrategy { private final ConnectionStrategy strategy; + private boolean shouldConnectFail; FakeConnectionStrategy( + ProjectId originProjectId, + ProjectId linkedProjectId, + String clusterAlias, + TransportService transportService, + RemoteConnectionManager connectionManager + ) { + this( + originProjectId, + linkedProjectId, + clusterAlias, + transportService, + connectionManager, + randomFrom(RemoteConnectionStrategy.ConnectionStrategy.values()) + ); + } + + FakeConnectionStrategy( + String clusterAlias, + TransportService transportService, + RemoteConnectionManager connectionManager, + RemoteConnectionStrategy.ConnectionStrategy strategy + ) { + this(ProjectId.DEFAULT, ProjectId.DEFAULT, clusterAlias, transportService, connectionManager, strategy); + } + + FakeConnectionStrategy( + ProjectId originProjectId, + ProjectId linkedProjectId, String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, RemoteConnectionStrategy.ConnectionStrategy strategy ) { super(switch (strategy) { - case PROXY -> new LinkedProjectConfig.ProxyLinkedProjectConfigBuilder(clusterAlias).build(); - case SNIFF -> new LinkedProjectConfig.SniffLinkedProjectConfigBuilder(clusterAlias).build(); + case PROXY -> new LinkedProjectConfig.ProxyLinkedProjectConfigBuilder(originProjectId, linkedProjectId, clusterAlias) + .build(); + case SNIFF -> new LinkedProjectConfig.SniffLinkedProjectConfigBuilder(originProjectId, linkedProjectId, clusterAlias) + .build(); }, transportService, connectionManager); this.strategy = strategy; + this.shouldConnectFail = false; + } + + void setShouldConnectFail(boolean shouldConnectFail) { + this.shouldConnectFail = shouldConnectFail; } @Override @@ -205,7 +345,11 @@ protected boolean shouldOpenMoreConnections() { @Override protected void connectImpl(ActionListener listener) { - + if (shouldConnectFail) { + listener.onFailure(new RuntimeException("simulated failure")); + } else { + listener.onResponse(null); + } } @Override