Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/reference/modules/remote-clusters.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ PUT _cluster/settings
by default, but they can selectively be made optional by setting this setting
to `true`.

`cluster.remote.${cluster_alias}.transport.ping_schedule`::

Sets the time interval between regular application-level ping messages that
are sent to ensure that transport connections to nodes belonging to remote
clusters are kept alive. If set to `-1`, application-level ping messages to
this remote cluster are not sent. If unset, application-level ping messages
are sent according to the global `transport.ping_schedule` setting, which
defaults to ``-1` meaning that pings are not sent.

[float]
[[retrieve-remote-clusters-info]]
=== Retrieving remote clusters info
Expand Down
6 changes: 3 additions & 3 deletions docs/reference/modules/transport.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ between all nodes. Defaults to `false`.

|`transport.ping_schedule` | Schedule a regular application-level ping message
to ensure that transport connections between nodes are kept alive. Defaults to
`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to
correctly configure TCP keep-alives instead of using this feature, because TCP
keep-alives apply to all kinds of long-lived connection and not just to
`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable
to correctly configure TCP keep-alives instead of using this feature, because
TCP keep-alives apply to all kinds of long-lived connections and not just to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debates rage to this day on the internet about the use of singular or plural after "kinds of". I suspect that British English prefers the singular and US English the plural, and both are ok 😄 🇬🇧 (I'm ok with this change, just thought you'd like to know)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting :)

transport connections.

|=======================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE,
RemoteClusterService.ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
TransportService.TRACE_LOG_EXCLUDE_SETTING,
TransportService.TRACE_LOG_INCLUDE_SETTING,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.elasticsearch.transport;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -67,16 +67,15 @@ public class ConnectionManager implements Closeable {
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings));
this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings));
}

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) {
this.transport = transport;
this.threadPool = threadPool;
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
this.defaultProfile = defaultProfile;
this.pingSchedule = pingSchedule;
this.defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(settings);
this.lifecycle.moveToStarted();

if (pingSchedule.millis() > 0) {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing());
}
Expand Down Expand Up @@ -252,6 +251,10 @@ private void ensureOpen() {
}
}

TimeValue getPingSchedule() {
return pingSchedule;
}

private class ScheduledPing extends AbstractLifecycleRunnable {

private ScheduledPing() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.elasticsearch.transport;

import java.net.InetSocketAddress;
import java.util.function.Supplier;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
Expand Down Expand Up @@ -48,6 +46,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -64,6 +63,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -104,17 +104,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param connectionManager the connection manager to use for this remote connection
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
* @param proxyAddress the proxy address
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate) {
this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null);
}

RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate<DiscoveryNode>
nodePredicate,
String proxyAddress) {
Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
super(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.settings.Setting.timeSetting;

/**
* Basic service for accessing remote clusters via gateway nodes
Expand Down Expand Up @@ -166,6 +167,12 @@ public String getKey(final String key) {
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);

public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);

private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());

Expand Down Expand Up @@ -211,10 +218,13 @@ private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Su
}

if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService,
new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
getNodePredicate(settings), proxyAddress);
remoteClusters.put(entry.getKey(), remote);
String clusterAlias = entry.getKey();
TimeValue pingSchedule = REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings);
ConnectionManager connectionManager = new ConnectionManager(settings, transportService.transport,
transportService.threadPool, pingSchedule);
remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, connectionManager,
numRemoteConnections, getNodePredicate(settings), proxyAddress);
remoteClusters.put(clusterAlias, remote);
}

// now update the seed nodes no matter if it's new or already existing
Expand Down Expand Up @@ -340,31 +350,27 @@ public void onFailure(Exception e) {
* @throws IllegalArgumentException if the remote cluster is unknown
*/
public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);
}
return connection.getConnection(node);
return getRemoteClusterConnection(cluster).getConnection(node);
}

/**
* Ensures that the given cluster alias is connected. If the cluster is connected this operation
* will invoke the listener immediately.
*/
public void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias);
if (remoteClusterConnection == null) {
throw new IllegalArgumentException("no such remote cluster: " + clusterAlias);
}
remoteClusterConnection.ensureConnected(listener);
void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
getRemoteClusterConnection(clusterAlias).ensureConnected(listener);
}

public Transport.Connection getConnection(String cluster) {
return getRemoteClusterConnection(cluster).getConnection();
}

RemoteClusterConnection getRemoteClusterConnection(String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);
}
return connection.getConnection();
return connection;
}

@Override
Expand All @@ -386,7 +392,6 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail
}
}


@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {}));
Expand Down
Loading