From c79dd860f983d0f40c14bfe48c78b694c7c2e24e Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Tue, 9 Sep 2025 21:27:37 -0400 Subject: [PATCH 1/8] Differentiate between initial and reconnect RCS connections Adds a connection attempt counter to RemoteConnectionStrategy, with info logging on connection success and warning logging on connection failure, and 30 secs between repeat failure attempt logging. This change will be used in a follow up PR where we will increment either an initial connection failure metric or a reconnection attempt failure metric. Resolves: ES-12694 --- .../transport/RemoteConnectionStrategy.java | 36 ++++ .../RemoteConnectionStrategyTests.java | 187 +++++++++++++++++- 2 files changed, 220 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 7bf4689449b9c..90e46b4cb5163 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -14,11 +14,13 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -69,6 +71,8 @@ public Writeable.Reader getReader() { } } + public static final TimeValue CONNECTION_FAILURE_WARN_INTERVAL = TimeValue.timeValueSeconds(30); + private final int maxPendingConnectionListeners; protected final Logger logger = LogManager.getLogger(getClass()); @@ -76,12 +80,18 @@ public Writeable.Reader getReader() { private final AtomicBoolean closed = new AtomicBoolean(false); private final Object mutex = new Object(); private List> listeners = new ArrayList<>(); + private long connectionAttempts = 0L; + private long lastFailedConnectionAttemptWarningTimeMillis = -1L; protected final TransportService transportService; protected final RemoteConnectionManager connectionManager; + protected final ProjectId originProjectId; + protected final ProjectId linkedProjectId; protected final String clusterAlias; RemoteConnectionStrategy(LinkedProjectConfig config, TransportService transportService, RemoteConnectionManager connectionManager) { + this.originProjectId = config.originProjectId(); + this.linkedProjectId = config.linkedProjectId(); this.clusterAlias = config.linkedProjectAlias(); this.transportService = transportService; this.connectionManager = connectionManager; @@ -190,11 +200,13 @@ protected void doRun() { connectImpl(new ActionListener<>() { @Override public void onResponse(Void aVoid) { + connectionAttemptCompleted(null); ActionListener.onResponse(getAndClearListeners(), aVoid); } @Override public void onFailure(Exception e) { + connectionAttemptCompleted(e); ActionListener.onFailure(getAndClearListeners(), e); } }); @@ -203,6 +215,30 @@ public void onFailure(Exception e) { } } + private void connectionAttemptCompleted(Exception e) { + connectionAttempts++; + final var nowMillis = transportService.threadPool.relativeTimeInMillis(); + final org.apache.logging.log4j.util.Supplier msgSupplier = () -> format( + "Origin project [%s] %s linked project [%s] alias [%s] on attempt [%d]", + originProjectId, + e == null ? "successfully connected to" : "failed to connect to", + linkedProjectId, + clusterAlias, + connectionAttempts + ); + if (e == null) { + logger.info(msgSupplier); + lastFailedConnectionAttemptWarningTimeMillis = -1L; + } else { + if (lastFailedConnectionAttemptWarningTimeMillis == -1L + || nowMillis - lastFailedConnectionAttemptWarningTimeMillis >= CONNECTION_FAILURE_WARN_INTERVAL.getMillis()) { + logger.warn(msgSupplier, e); + lastFailedConnectionAttemptWarningTimeMillis = nowMillis; + } + // TODO: ES-12695: Increment either the initial (connectionAttempts == 1) or retry connection failure metric. + } + } + boolean shouldRebuildConnection(LinkedProjectConfig config) { return config.connectionStrategy().equals(strategyType()) == false || connectionProfileChanged(config) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index cd9b6407cc507..b2a78742b304c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -9,13 +9,31 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.Level; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EnumSerializationTestUtils; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders; +import org.elasticsearch.threadpool.ThreadPool; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +import static org.elasticsearch.test.MockLog.assertThatLogger; import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE; import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS; @@ -171,21 +189,152 @@ public void testConnectionStrategySerialization() { ); } + public void testConnectionAttemptLogging() { + final var originProjectId = randomUniqueProjectId(); + final var linkedProjectId = randomUniqueProjectId(); + final var alias = randomAlphanumericOfLength(10); + final BiFunction getExpectedLogMessage = (connectedOrFailed, connectionAttempts) -> Strings.format( + "Origin project [%s] %s to linked project [%s] alias [%s] on attempt [%d]", + originProjectId, + connectedOrFailed, + linkedProjectId, + alias, + connectionAttempts + ); + + try ( + var threadPool = new ControlledRelativeTimeThreadPool(getClass().getName(), System.currentTimeMillis()); + var transportService = startTransport(threadPool) + ) { + final var connectionManager = new RemoteConnectionManager( + alias, + RemoteClusterCredentialsManager.EMPTY, + new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext) + ); + final var strategy = new FakeConnectionStrategy(originProjectId, linkedProjectId, alias, transportService, connectionManager); + final var strategyClassName = strategy.getClass().getCanonicalName(); + + // Initial successful connection attempt. + assertThatLogger( + () -> waitForConnect(strategy), + strategy.getClass(), + new MockLog.SeenEventExpectation( + "connection strategy should log after successful connection", + strategyClassName, + Level.INFO, + getExpectedLogMessage.apply("successfully connected", 1) + ) + ); + + // Now test a series of failed connection attempts, verifying the warn logging interval check. + strategy.setShouldConnectFail(true); + assertThatLogger(() -> { + assertThrows(RuntimeException.class, () -> waitForConnect(strategy)); + assertThrows(RuntimeException.class, () -> waitForConnect(strategy)); + threadPool.advance(RemoteConnectionStrategy.CONNECTION_FAILURE_WARN_INTERVAL); + assertThrows(RuntimeException.class, () -> waitForConnect(strategy)); + }, + strategy.getClass(), + new MockLog.SeenEventExpectation( + "connection strategy should log after the first failed connection attempt", + strategyClassName, + Level.WARN, + getExpectedLogMessage.apply("failed to connect", 2) + ), + new MockLog.UnseenEventExpectation( + "connection strategy should not log until warning interval has passed", + strategyClassName, + Level.WARN, + getExpectedLogMessage.apply("failed to connect", 3) + ), + new MockLog.SeenEventExpectation( + "connection strategy should log after a failed connection attempt and the warning interval has passed", + strategyClassName, + Level.WARN, + getExpectedLogMessage.apply("failed to connect", 4) + ) + ); + } + } + + private MockTransportService startTransport(ThreadPool threadPool) { + boolean success = false; + final Settings s = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "cluster1").put("node.name", "node1").build(); + MockTransportService newService = MockTransportService.createNewService( + s, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool + ); + try { + newService.start(); + newService.acceptIncomingRequests(); + success = true; + return newService; + } finally { + if (success == false) { + newService.close(); + } + } + } + + private static void waitForConnect(RemoteConnectionStrategy strategy) { + PlainActionFuture connectFuture = new PlainActionFuture<>(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + } + private static class FakeConnectionStrategy extends RemoteConnectionStrategy { private final ConnectionStrategy strategy; + private boolean shouldConnectFail; + + FakeConnectionStrategy( + ProjectId originProjectId, + ProjectId linkedProjectId, + String clusterAlias, + TransportService transportService, + RemoteConnectionManager connectionManager + ) { + this( + originProjectId, + linkedProjectId, + clusterAlias, + transportService, + connectionManager, + randomFrom(RemoteConnectionStrategy.ConnectionStrategy.values()) + ); + } FakeConnectionStrategy( String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, RemoteConnectionStrategy.ConnectionStrategy strategy + ) { + this(ProjectId.DEFAULT, ProjectId.DEFAULT, clusterAlias, transportService, connectionManager, strategy); + } + + FakeConnectionStrategy( + ProjectId originProjectId, + ProjectId linkedProjectId, + String clusterAlias, + TransportService transportService, + RemoteConnectionManager connectionManager, + RemoteConnectionStrategy.ConnectionStrategy strategy ) { super(switch (strategy) { - case PROXY -> new LinkedProjectConfig.ProxyLinkedProjectConfigBuilder(clusterAlias).build(); - case SNIFF -> new LinkedProjectConfig.SniffLinkedProjectConfigBuilder(clusterAlias).build(); + case PROXY -> new LinkedProjectConfig.ProxyLinkedProjectConfigBuilder(originProjectId, linkedProjectId, clusterAlias) + .build(); + case SNIFF -> new LinkedProjectConfig.SniffLinkedProjectConfigBuilder(originProjectId, linkedProjectId, clusterAlias) + .build(); }, transportService, connectionManager); this.strategy = strategy; + this.shouldConnectFail = false; + } + + void setShouldConnectFail(boolean shouldConnectFail) { + this.shouldConnectFail = shouldConnectFail; } @Override @@ -205,7 +354,11 @@ protected boolean shouldOpenMoreConnections() { @Override protected void connectImpl(ActionListener listener) { - + if (shouldConnectFail) { + listener.onFailure(new RuntimeException("simulated failure")); + } else { + listener.onResponse(null); + } } @Override @@ -213,4 +366,32 @@ protected RemoteConnectionInfo.ModeInfo getModeInfo() { return null; } } + + private static class ControlledRelativeTimeThreadPool extends ThreadPool implements Releasable { + private long currentTimeInMillis; + + ControlledRelativeTimeThreadPool(String name, long startTimeMillis) { + super( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build(), + MeterRegistry.NOOP, + new DefaultBuiltInExecutorBuilders() + ); + this.currentTimeInMillis = startTimeMillis; + stopCachedTimeThread(); + } + + @Override + public long relativeTimeInMillis() { + return currentTimeInMillis; + } + + void advance(TimeValue timeValue) { + this.currentTimeInMillis += timeValue.millis(); + } + + @Override + public void close() { + ThreadPool.terminate(this, 10, TimeUnit.SECONDS); + } + } } From aae60f9eb07645c96e1d1152c4ebc30db4fd982d Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Tue, 9 Sep 2025 21:35:29 -0400 Subject: [PATCH 2/8] Update docs/changelog/134415.yaml --- docs/changelog/134415.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/134415.yaml diff --git a/docs/changelog/134415.yaml b/docs/changelog/134415.yaml new file mode 100644 index 0000000000000..750f3bc2736c1 --- /dev/null +++ b/docs/changelog/134415.yaml @@ -0,0 +1,5 @@ +pr: 134415 +summary: Differentiate between initial and reconnect RCS connections +area: Network +type: enhancement +issues: [] From e57e23869d8e6d1a6ddcb9017e21580e77209ce2 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Wed, 10 Sep 2025 16:06:11 -0400 Subject: [PATCH 3/8] Use boolean instead of attempts counter, remove throttling --- .../transport/RemoteConnectionStrategy.java | 21 +-- .../RemoteConnectionStrategyTests.java | 140 ++++++------------ 2 files changed, 55 insertions(+), 106 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 90e46b4cb5163..adb598bc4d8a8 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -71,8 +71,6 @@ public Writeable.Reader getReader() { } } - public static final TimeValue CONNECTION_FAILURE_WARN_INTERVAL = TimeValue.timeValueSeconds(30); - private final int maxPendingConnectionListeners; protected final Logger logger = LogManager.getLogger(getClass()); @@ -80,8 +78,7 @@ public Writeable.Reader getReader() { private final AtomicBoolean closed = new AtomicBoolean(false); private final Object mutex = new Object(); private List> listeners = new ArrayList<>(); - private long connectionAttempts = 0L; - private long lastFailedConnectionAttemptWarningTimeMillis = -1L; + private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false); protected final TransportService transportService; protected final RemoteConnectionManager connectionManager; @@ -216,26 +213,20 @@ public void onFailure(Exception e) { } private void connectionAttemptCompleted(Exception e) { - connectionAttempts++; - final var nowMillis = transportService.threadPool.relativeTimeInMillis(); + final boolean isInitialAttempt = initialConnectionAttempted.compareAndSet(false, true); final org.apache.logging.log4j.util.Supplier msgSupplier = () -> format( - "Origin project [%s] %s linked project [%s] alias [%s] on attempt [%d]", + "Origin project [%s] %s linked project [%s] alias [%s] on %s attempt", originProjectId, e == null ? "successfully connected to" : "failed to connect to", linkedProjectId, clusterAlias, - connectionAttempts + isInitialAttempt ? "the initial connection" : "a reconnection" ); if (e == null) { logger.info(msgSupplier); - lastFailedConnectionAttemptWarningTimeMillis = -1L; } else { - if (lastFailedConnectionAttemptWarningTimeMillis == -1L - || nowMillis - lastFailedConnectionAttemptWarningTimeMillis >= CONNECTION_FAILURE_WARN_INTERVAL.getMillis()) { - logger.warn(msgSupplier, e); - lastFailedConnectionAttemptWarningTimeMillis = nowMillis; - } - // TODO: ES-12695: Increment either the initial (connectionAttempts == 1) or retry connection failure metric. + logger.warn(msgSupplier, e); + // TODO: ES-12695: Increment either the initial or retry connection failure metric. } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index b2a78742b304c..b820f76f7a9f8 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -18,21 +18,15 @@ import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.node.Node; -import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EnumSerializationTestUtils; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; - import static org.elasticsearch.test.MockLog.assertThatLogger; import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE; @@ -193,67 +187,59 @@ public void testConnectionAttemptLogging() { final var originProjectId = randomUniqueProjectId(); final var linkedProjectId = randomUniqueProjectId(); final var alias = randomAlphanumericOfLength(10); - final BiFunction getExpectedLogMessage = (connectedOrFailed, connectionAttempts) -> Strings.format( - "Origin project [%s] %s to linked project [%s] alias [%s] on attempt [%d]", - originProjectId, - connectedOrFailed, - linkedProjectId, - alias, - connectionAttempts - ); try ( - var threadPool = new ControlledRelativeTimeThreadPool(getClass().getName(), System.currentTimeMillis()); - var transportService = startTransport(threadPool) - ) { - final var connectionManager = new RemoteConnectionManager( + var threadPool = new TestThreadPool(getClass().getName()); + var transportService = startTransport(threadPool); + var connectionManager = new RemoteConnectionManager( alias, RemoteClusterCredentialsManager.EMPTY, new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext) - ); - final var strategy = new FakeConnectionStrategy(originProjectId, linkedProjectId, alias, transportService, connectionManager); - final var strategyClassName = strategy.getClass().getCanonicalName(); - - // Initial successful connection attempt. - assertThatLogger( - () -> waitForConnect(strategy), - strategy.getClass(), - new MockLog.SeenEventExpectation( - "connection strategy should log after successful connection", - strategyClassName, - Level.INFO, - getExpectedLogMessage.apply("successfully connected", 1) - ) - ); - - // Now test a series of failed connection attempts, verifying the warn logging interval check. - strategy.setShouldConnectFail(true); - assertThatLogger(() -> { - assertThrows(RuntimeException.class, () -> waitForConnect(strategy)); - assertThrows(RuntimeException.class, () -> waitForConnect(strategy)); - threadPool.advance(RemoteConnectionStrategy.CONNECTION_FAILURE_WARN_INTERVAL); - assertThrows(RuntimeException.class, () -> waitForConnect(strategy)); - }, - strategy.getClass(), - new MockLog.SeenEventExpectation( - "connection strategy should log after the first failed connection attempt", - strategyClassName, - Level.WARN, - getExpectedLogMessage.apply("failed to connect", 2) - ), - new MockLog.UnseenEventExpectation( - "connection strategy should not log until warning interval has passed", - strategyClassName, - Level.WARN, - getExpectedLogMessage.apply("failed to connect", 3) - ), - new MockLog.SeenEventExpectation( - "connection strategy should log after a failed connection attempt and the warning interval has passed", - strategyClassName, - Level.WARN, - getExpectedLogMessage.apply("failed to connect", 4) - ) - ); + ) + ) { + for (boolean shouldConnectFail : new boolean[] { true, false }) { + for (boolean isIntialConnectAttempt : new boolean[] { true, false }) { + final var strategy = new FakeConnectionStrategy( + originProjectId, + linkedProjectId, + alias, + transportService, + connectionManager + ); + if (isIntialConnectAttempt == false) { + waitForConnect(strategy); + } + strategy.setShouldConnectFail(shouldConnectFail); + final var expectedLogLevel = shouldConnectFail ? Level.WARN : Level.INFO; + final var expectedLogMessage = Strings.format( + "Origin project [%s] %s to linked project [%s] alias [%s] on %s attempt", + originProjectId, + shouldConnectFail ? "failed to connect" : "successfully connected", + linkedProjectId, + alias, + isIntialConnectAttempt ? "the initial connection" : "a reconnection" + ); + assertThatLogger(() -> { + if (shouldConnectFail) { + assertThrows(RuntimeException.class, () -> waitForConnect(strategy)); + } else { + waitForConnect(strategy); + } + }, + strategy.getClass(), + new MockLog.SeenEventExpectation( + "connection strategy should log at " + + expectedLogLevel + + " after a " + + (shouldConnectFail ? "failed" : "successful") + + (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"), + strategy.getClass().getCanonicalName(), + expectedLogLevel, + expectedLogMessage + ) + ); + } + } } } @@ -366,32 +352,4 @@ protected RemoteConnectionInfo.ModeInfo getModeInfo() { return null; } } - - private static class ControlledRelativeTimeThreadPool extends ThreadPool implements Releasable { - private long currentTimeInMillis; - - ControlledRelativeTimeThreadPool(String name, long startTimeMillis) { - super( - Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build(), - MeterRegistry.NOOP, - new DefaultBuiltInExecutorBuilders() - ); - this.currentTimeInMillis = startTimeMillis; - stopCachedTimeThread(); - } - - @Override - public long relativeTimeInMillis() { - return currentTimeInMillis; - } - - void advance(TimeValue timeValue) { - this.currentTimeInMillis += timeValue.millis(); - } - - @Override - public void close() { - ThreadPool.terminate(this, 10, TimeUnit.SECONDS); - } - } } From d940014c8b1b64dee9c3da478ce23c105ced8405 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Wed, 10 Sep 2025 16:06:20 -0400 Subject: [PATCH 4/8] Remove strategy impl warn msg in favor of more detailed base class warning --- .../org/elasticsearch/transport/ProxyConnectionStrategy.java | 1 - .../org/elasticsearch/transport/SniffConnectionStrategy.java | 1 - 2 files changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 45c07aadd1b1b..f020c2408aa47 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -163,7 +163,6 @@ public void onFailure(Exception e) { exceptions.put(new Tuple<>(e.getClass(), e.getMessage()), e); if (countDown.countDown()) { if (attemptNumber >= MAX_CONNECT_ATTEMPTS_PER_RUN && connectionManager.size() == 0) { - logger.warn(() -> "failed to open any proxy connections to cluster [" + clusterAlias + "]", e); if (exceptions.values().stream().allMatch(RemoteConnectionStrategy::isRetryableException)) { finished.onFailure(getNoSeedNodeLeftException(exceptions.values())); } else { diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 4b2c6443da98f..fbe6e61d5eab4 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -144,7 +144,6 @@ private void collectRemoteNodes(Iterator> seedNodesSuppl logger.debug(() -> "fetching nodes from external cluster [" + clusterAlias + "] failed moving to next seed node", e); collectRemoteNodes(seedNodesSuppliers, listener); } else { - logger.warn(() -> "fetching nodes from external cluster [" + clusterAlias + "] failed", e); listener.onFailure(e); } }; From 02d7cfd811f8e0e440d55650ac66176d1531e479 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 10 Sep 2025 20:13:26 +0000 Subject: [PATCH 5/8] [CI] Auto commit changes from spotless --- .../org/elasticsearch/transport/RemoteConnectionStrategy.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index adb598bc4d8a8..a789a8ab46ad5 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; From af918bfa4b88a902bffa1f39ddf31b4c60b1678c Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Thu, 11 Sep 2025 09:01:14 -0400 Subject: [PATCH 6/8] add @Nullable to exception param in connectionAttemptCompleted() --- .../org/elasticsearch/transport/RemoteConnectionStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index a789a8ab46ad5..3c8ecd5d947bd 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -211,7 +211,7 @@ public void onFailure(Exception e) { } } - private void connectionAttemptCompleted(Exception e) { + private void connectionAttemptCompleted(@Nullable Exception e) { final boolean isInitialAttempt = initialConnectionAttempted.compareAndSet(false, true); final org.apache.logging.log4j.util.Supplier msgSupplier = () -> format( "Origin project [%s] %s linked project [%s] alias [%s] on %s attempt", From 52ed56bc0c49fae49e37006bb97765564188874b Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Thu, 11 Sep 2025 09:03:50 -0400 Subject: [PATCH 7/8] ammend the log msg in connectionAttemptCompleted() --- .../org/elasticsearch/transport/RemoteConnectionStrategy.java | 2 +- .../elasticsearch/transport/RemoteConnectionStrategyTests.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 3c8ecd5d947bd..d274603720bbc 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -214,7 +214,7 @@ public void onFailure(Exception e) { private void connectionAttemptCompleted(@Nullable Exception e) { final boolean isInitialAttempt = initialConnectionAttempted.compareAndSet(false, true); final org.apache.logging.log4j.util.Supplier msgSupplier = () -> format( - "Origin project [%s] %s linked project [%s] alias [%s] on %s attempt", + "Origin project [%s] %s linked project [%s] with alias [%s] on %s attempt", originProjectId, e == null ? "successfully connected to" : "failed to connect to", linkedProjectId, diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index b820f76f7a9f8..0f1c05386d271 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -212,7 +212,7 @@ public void testConnectionAttemptLogging() { strategy.setShouldConnectFail(shouldConnectFail); final var expectedLogLevel = shouldConnectFail ? Level.WARN : Level.INFO; final var expectedLogMessage = Strings.format( - "Origin project [%s] %s to linked project [%s] alias [%s] on %s attempt", + "Origin project [%s] %s to linked project [%s] with alias [%s] on %s attempt", originProjectId, shouldConnectFail ? "failed to connect" : "successfully connected", linkedProjectId, From 4a5683b85dc792f742bc5dee0908acea0e6f0ed1 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Thu, 11 Sep 2025 09:20:07 -0400 Subject: [PATCH 8/8] change success msg log level to debug in connectionAttemptCompleted() --- .../elasticsearch/transport/RemoteConnectionStrategy.java | 2 +- .../transport/RemoteConnectionStrategyTests.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index d274603720bbc..70a17ee67d307 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -222,7 +222,7 @@ private void connectionAttemptCompleted(@Nullable Exception e) { isInitialAttempt ? "the initial connection" : "a reconnection" ); if (e == null) { - logger.info(msgSupplier); + logger.debug(msgSupplier); } else { logger.warn(msgSupplier, e); // TODO: ES-12695: Increment either the initial or retry connection failure metric. diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 0f1c05386d271..1e530c528b109 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EnumSerializationTestUtils; import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -183,6 +184,10 @@ public void testConnectionStrategySerialization() { ); } + @TestLogging( + value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG", + reason = "logging verification" + ) public void testConnectionAttemptLogging() { final var originProjectId = randomUniqueProjectId(); final var linkedProjectId = randomUniqueProjectId(); @@ -210,7 +215,7 @@ public void testConnectionAttemptLogging() { waitForConnect(strategy); } strategy.setShouldConnectFail(shouldConnectFail); - final var expectedLogLevel = shouldConnectFail ? Level.WARN : Level.INFO; + final var expectedLogLevel = shouldConnectFail ? Level.WARN : Level.DEBUG; final var expectedLogMessage = Strings.format( "Origin project [%s] %s to linked project [%s] with alias [%s] on %s attempt", originProjectId,