Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/134415.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 134415
summary: Differentiate between initial and reconnect RCS connections
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ public void onFailure(Exception e) {
exceptions.put(new Tuple<>(e.getClass(), e.getMessage()), e);
if (countDown.countDown()) {
if (attemptNumber >= MAX_CONNECT_ATTEMPTS_PER_RUN && connectionManager.size() == 0) {
logger.warn(() -> "failed to open any proxy connections to cluster [" + clusterAlias + "]", e);
if (exceptions.values().stream().allMatch(RemoteConnectionStrategy::isRetryableException)) {
finished.onFailure(getNoSeedNodeLeftException(exceptions.values()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -76,12 +77,17 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Object mutex = new Object();
private List<ActionListener<Void>> listeners = new ArrayList<>();
private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false);

protected final TransportService transportService;
protected final RemoteConnectionManager connectionManager;
protected final ProjectId originProjectId;
protected final ProjectId linkedProjectId;
protected final String clusterAlias;

RemoteConnectionStrategy(LinkedProjectConfig config, TransportService transportService, RemoteConnectionManager connectionManager) {
this.originProjectId = config.originProjectId();
this.linkedProjectId = config.linkedProjectId();
this.clusterAlias = config.linkedProjectAlias();
this.transportService = transportService;
this.connectionManager = connectionManager;
Expand Down Expand Up @@ -190,11 +196,13 @@ protected void doRun() {
connectImpl(new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
connectionAttemptCompleted(null);
ActionListener.onResponse(getAndClearListeners(), aVoid);
}

@Override
public void onFailure(Exception e) {
connectionAttemptCompleted(e);
ActionListener.onFailure(getAndClearListeners(), e);
}
});
Expand All @@ -203,6 +211,24 @@ public void onFailure(Exception e) {
}
}

private void connectionAttemptCompleted(@Nullable Exception e) {
final boolean isInitialAttempt = initialConnectionAttempted.compareAndSet(false, true);
final org.apache.logging.log4j.util.Supplier<String> msgSupplier = () -> format(
"Origin project [%s] %s linked project [%s] with alias [%s] on %s attempt",
originProjectId,
e == null ? "successfully connected to" : "failed to connect to",
linkedProjectId,
clusterAlias,
isInitialAttempt ? "the initial connection" : "a reconnection"
);
if (e == null) {
logger.debug(msgSupplier);
} else {
logger.warn(msgSupplier, e);
// TODO: ES-12695: Increment either the initial or retry connection failure metric.
}
}

boolean shouldRebuildConnection(LinkedProjectConfig config) {
return config.connectionStrategy().equals(strategyType()) == false
|| connectionProfileChanged(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodesSuppl
logger.debug(() -> "fetching nodes from external cluster [" + clusterAlias + "] failed moving to next seed node", e);
collectRemoteNodes(seedNodesSuppliers, listener);
} else {
logger.warn(() -> "fetching nodes from external cluster [" + clusterAlias + "] failed", e);
listener.onFailure(e);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,26 @@

package org.elasticsearch.transport;

import org.apache.logging.log4j.Level;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.VersionInformation;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EnumSerializationTestUtils;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.test.MockLog.assertThatLogger;
import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS;
import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE;
import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS;
Expand Down Expand Up @@ -171,21 +184,148 @@ public void testConnectionStrategySerialization() {
);
}

@TestLogging(
value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG",
reason = "logging verification"
)
public void testConnectionAttemptLogging() {
final var originProjectId = randomUniqueProjectId();
final var linkedProjectId = randomUniqueProjectId();
final var alias = randomAlphanumericOfLength(10);

try (
var threadPool = new TestThreadPool(getClass().getName());
var transportService = startTransport(threadPool);
var connectionManager = new RemoteConnectionManager(
alias,
RemoteClusterCredentialsManager.EMPTY,
new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext)
)
) {
for (boolean shouldConnectFail : new boolean[] { true, false }) {
for (boolean isIntialConnectAttempt : new boolean[] { true, false }) {
final var strategy = new FakeConnectionStrategy(
originProjectId,
linkedProjectId,
alias,
transportService,
connectionManager
);
if (isIntialConnectAttempt == false) {
waitForConnect(strategy);
}
strategy.setShouldConnectFail(shouldConnectFail);
final var expectedLogLevel = shouldConnectFail ? Level.WARN : Level.DEBUG;
final var expectedLogMessage = Strings.format(
"Origin project [%s] %s to linked project [%s] with alias [%s] on %s attempt",
originProjectId,
shouldConnectFail ? "failed to connect" : "successfully connected",
linkedProjectId,
alias,
isIntialConnectAttempt ? "the initial connection" : "a reconnection"
);
assertThatLogger(() -> {
if (shouldConnectFail) {
assertThrows(RuntimeException.class, () -> waitForConnect(strategy));
} else {
waitForConnect(strategy);
}
},
strategy.getClass(),
new MockLog.SeenEventExpectation(
"connection strategy should log at "
+ expectedLogLevel
+ " after a "
+ (shouldConnectFail ? "failed" : "successful")
+ (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"),
strategy.getClass().getCanonicalName(),
expectedLogLevel,
expectedLogMessage
)
);
}
}
}
}

private MockTransportService startTransport(ThreadPool threadPool) {
boolean success = false;
final Settings s = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "cluster1").put("node.name", "node1").build();
MockTransportService newService = MockTransportService.createNewService(
s,
VersionInformation.CURRENT,
TransportVersion.current(),
threadPool
);
try {
newService.start();
newService.acceptIncomingRequests();
success = true;
return newService;
} finally {
if (success == false) {
newService.close();
}
}
}

private static void waitForConnect(RemoteConnectionStrategy strategy) {
PlainActionFuture<Void> connectFuture = new PlainActionFuture<>();
strategy.connect(connectFuture);
connectFuture.actionGet();
}

private static class FakeConnectionStrategy extends RemoteConnectionStrategy {

private final ConnectionStrategy strategy;
private boolean shouldConnectFail;

FakeConnectionStrategy(
ProjectId originProjectId,
ProjectId linkedProjectId,
String clusterAlias,
TransportService transportService,
RemoteConnectionManager connectionManager
) {
this(
originProjectId,
linkedProjectId,
clusterAlias,
transportService,
connectionManager,
randomFrom(RemoteConnectionStrategy.ConnectionStrategy.values())
);
}

FakeConnectionStrategy(
String clusterAlias,
TransportService transportService,
RemoteConnectionManager connectionManager,
RemoteConnectionStrategy.ConnectionStrategy strategy
) {
this(ProjectId.DEFAULT, ProjectId.DEFAULT, clusterAlias, transportService, connectionManager, strategy);
}

FakeConnectionStrategy(
ProjectId originProjectId,
ProjectId linkedProjectId,
String clusterAlias,
TransportService transportService,
RemoteConnectionManager connectionManager,
RemoteConnectionStrategy.ConnectionStrategy strategy
) {
super(switch (strategy) {
case PROXY -> new LinkedProjectConfig.ProxyLinkedProjectConfigBuilder(clusterAlias).build();
case SNIFF -> new LinkedProjectConfig.SniffLinkedProjectConfigBuilder(clusterAlias).build();
case PROXY -> new LinkedProjectConfig.ProxyLinkedProjectConfigBuilder(originProjectId, linkedProjectId, clusterAlias)
.build();
case SNIFF -> new LinkedProjectConfig.SniffLinkedProjectConfigBuilder(originProjectId, linkedProjectId, clusterAlias)
.build();
}, transportService, connectionManager);
this.strategy = strategy;
this.shouldConnectFail = false;
}

void setShouldConnectFail(boolean shouldConnectFail) {
this.shouldConnectFail = shouldConnectFail;
}

@Override
Expand All @@ -205,7 +345,11 @@ protected boolean shouldOpenMoreConnections() {

@Override
protected void connectImpl(ActionListener<Void> listener) {

if (shouldConnectFail) {
listener.onFailure(new RuntimeException("simulated failure"));
} else {
listener.onResponse(null);
}
}

@Override
Expand Down