Skip to content

Commit 208db10

Browse files
committed
Ensure we don't use a remote profile if cluster name matches (#31331)
If we are running into a race condition between a node being configured to be a remote node for cross cluster search etc. and that node joining the cluster we might connect to that node with a remote profile. If that node now joins the cluster it connected to it as a CCS remote node we use the wrong profile and can't use bulk connections etc. anymore. This change uses the remote profile only if we connect to a node that has a different cluster name than the local cluster. This is not a perfect fix for this situation but is the safe option while potentially only loose a small optimization of using less connections per node which is small anyways since we only connect to a small set of nodes. Closes #29321
1 parent 8e19fe6 commit 208db10

File tree

4 files changed

+153
-11
lines changed

4 files changed

+153
-11
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
9292
private volatile boolean skipUnavailable;
9393
private final ConnectHandler connectHandler;
9494
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
95+
private final ClusterName localClusterName;
9596

9697
/**
9798
* Creates a new {@link RemoteClusterConnection}
@@ -105,6 +106,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
105106
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
106107
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
107108
super(settings);
109+
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
108110
this.transportService = transportService;
109111
this.maxNumRemoteConnections = maxNumRemoteConnections;
110112
this.nodePredicate = nodePredicate;
@@ -303,6 +305,21 @@ public boolean isClosed() {
303305
return connectHandler.isClosed();
304306
}
305307

308+
private ConnectionProfile getRemoteProfile(ClusterName name) {
309+
// we can only compare the cluster name to make a decision if we should use a remote profile
310+
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
311+
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
312+
// rather smallish optimization on the connection layer under certain situations where remote clusters
313+
// have the same name as the local one is minor here.
314+
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
315+
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
316+
if (this.localClusterName.equals(name)) {
317+
return null;
318+
} else {
319+
return remoteProfile;
320+
}
321+
}
322+
306323
/**
307324
* The connect handler manages node discovery and the actual connect to the remote cluster.
308325
* There is at most one connect job running at any time. If such a connect job is triggered
@@ -412,7 +429,6 @@ protected void doRun() throws Exception {
412429
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
413430
}
414431
});
415-
416432
}
417433

418434
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
@@ -424,21 +440,27 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
424440
if (seedNodes.hasNext()) {
425441
cancellableThreads.executeIO(() -> {
426442
final DiscoveryNode seedNode = seedNodes.next();
427-
final DiscoveryNode handshakeNode;
443+
final TransportService.HandshakeResponse handshakeResponse;
428444
Transport.Connection connection = transportService.openConnection(seedNode,
429445
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
430446
boolean success = false;
431447
try {
432448
try {
433-
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
449+
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
434450
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
435451
} catch (IllegalStateException ex) {
436452
logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
437453
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
438454
throw ex;
439455
}
456+
457+
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
440458
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
441-
transportService.connectToNode(handshakeNode, remoteProfile);
459+
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
460+
if (remoteClusterName.get() == null) {
461+
assert handshakeResponse.getClusterName().value() != null;
462+
remoteClusterName.set(handshakeResponse.getClusterName());
463+
}
442464
connectedNodes.add(handshakeNode);
443465
}
444466
ClusterStateRequest request = new ClusterStateRequest();
@@ -545,7 +567,8 @@ public void handleResponse(ClusterStateResponse response) {
545567
for (DiscoveryNode node : nodesIter) {
546568
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
547569
try {
548-
transportService.connectToNode(node, remoteProfile); // noop if node is connected
570+
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
571+
// connected
549572
connectedNodes.add(node);
550573
} catch (ConnectTransportException | IllegalStateException ex) {
551574
// ISE if we fail the handshake with an version incompatible node

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
330330
}
331331
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
332332
// We don't validate cluster names to allow for tribe node connections.
333-
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
333+
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
334334
if (node.equals(remote) == false) {
335335
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
336336
}
@@ -366,7 +366,7 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP
366366
public DiscoveryNode handshake(
367367
final Transport.Connection connection,
368368
final long handshakeTimeout) throws ConnectTransportException {
369-
return handshake(connection, handshakeTimeout, clusterName::equals);
369+
return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode;
370370
}
371371

372372
/**
@@ -378,11 +378,11 @@ public DiscoveryNode handshake(
378378
* @param connection the connection to a specific node
379379
* @param handshakeTimeout handshake timeout
380380
* @param clusterNamePredicate cluster name validation predicate
381-
* @return the connected node
381+
* @return the handshake response
382382
* @throws ConnectTransportException if the connection failed
383383
* @throws IllegalStateException if the handshake failed
384384
*/
385-
public DiscoveryNode handshake(
385+
public HandshakeResponse handshake(
386386
final Transport.Connection connection,
387387
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
388388
final HandshakeResponse response;
@@ -408,7 +408,7 @@ public HandshakeResponse newInstance() {
408408
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
409409
}
410410

411-
return response.discoveryNode;
411+
return response;
412412
}
413413

414414
static class HandshakeRequest extends TransportRequest {
@@ -449,6 +449,14 @@ public void writeTo(StreamOutput out) throws IOException {
449449
clusterName.writeTo(out);
450450
Version.writeVersion(version, out);
451451
}
452+
453+
public DiscoveryNode getDiscoveryNode() {
454+
return discoveryNode;
455+
}
456+
457+
public ClusterName getClusterName() {
458+
return clusterName;
459+
}
452460
}
453461

454462
public void disconnectFromNode(DiscoveryNode node) {

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,102 @@ public static MockTransportService startTransport(
148148
}
149149
}
150150

151+
public void testLocalProfileIsUsedForLocalCluster() throws Exception {
152+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
153+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
154+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
155+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
156+
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
157+
knownNodes.add(seedTransport.getLocalDiscoNode());
158+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
159+
Collections.shuffle(knownNodes, random());
160+
161+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
162+
service.start();
163+
service.acceptIncomingRequests();
164+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
165+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
166+
updateSeedNodes(connection, Arrays.asList(seedNode));
167+
assertTrue(service.nodeConnected(seedNode));
168+
assertTrue(service.nodeConnected(discoverableNode));
169+
assertTrue(connection.assertNoRunningConnections());
170+
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
171+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
172+
@Override
173+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
174+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
175+
inst.readFrom(in);
176+
return inst;
177+
}
178+
});
179+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
180+
.build();
181+
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
182+
options, futureHandler);
183+
futureHandler.txGet();
184+
}
185+
}
186+
}
187+
}
188+
189+
public void testRemoteProfileIsUsedForRemoteCluster() throws Exception {
190+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
191+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool,
192+
Settings.builder().put("cluster.name", "foobar").build());
193+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT,
194+
threadPool, Settings.builder().put("cluster.name", "foobar").build())) {
195+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
196+
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
197+
knownNodes.add(seedTransport.getLocalDiscoNode());
198+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
199+
Collections.shuffle(knownNodes, random());
200+
201+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
202+
service.start();
203+
service.acceptIncomingRequests();
204+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
205+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
206+
updateSeedNodes(connection, Arrays.asList(seedNode));
207+
assertTrue(service.nodeConnected(seedNode));
208+
assertTrue(service.nodeConnected(discoverableNode));
209+
assertTrue(connection.assertNoRunningConnections());
210+
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
211+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
212+
@Override
213+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
214+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
215+
inst.readFrom(in);
216+
return inst;
217+
}
218+
});
219+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
220+
.build();
221+
IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
222+
service.sendRequest(discoverableNode,
223+
ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler);
224+
futureHandler.txGet();
225+
}).getCause();
226+
assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");
227+
228+
PlainTransportFuture<ClusterSearchShardsResponse> handler = new PlainTransportFuture<>(
229+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
230+
@Override
231+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
232+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
233+
inst.readFrom(in);
234+
return inst;
235+
}
236+
});
237+
TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)
238+
.build();
239+
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
240+
ops, handler);
241+
handler.txGet();
242+
}
243+
}
244+
}
245+
}
246+
151247
public void testDiscoverSingleNode() throws Exception {
152248
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
153249
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,22 @@ protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeo
198198
@Override
199199
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
200200
ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
201-
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(LIGHT_PROFILE);
201+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
202+
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
203+
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
204+
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) {
205+
Set<TransportRequestOptions.Type> types = handle.getTypes();
206+
if (handle.length > 0) {
207+
allTypesWithConnection.addAll(types);
208+
} else {
209+
allTypesWithoutConnection.addAll(types);
210+
}
211+
}
212+
// make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
213+
builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
214+
if (allTypesWithoutConnection.isEmpty() == false) {
215+
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
216+
}
202217
builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout());
203218
builder.setConnectTimeout(connectionProfile1.getConnectTimeout());
204219
return builder.build();

0 commit comments

Comments
 (0)