Skip to content

Commit 41260ef

Browse files
committed
Merge branch 'master' into mapping-version
* master: Fix a mappings update test (elastic#33146) Reload Secure Settings REST specs & docs (elastic#32990) Refactor CachingUsernamePassword realm (elastic#32646) Add proxy support to RemoteClusterConnection (elastic#33062)
2 parents e7cdfd2 + f8b07a0 commit 41260ef

File tree

16 files changed

+542
-179
lines changed

16 files changed

+542
-179
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,7 @@ public void testApiNamingConventions() throws Exception {
685685
"nodes.stats",
686686
"nodes.hot_threads",
687687
"nodes.usage",
688+
"nodes.reload_secure_settings",
688689
"search_shards",
689690
};
690691
Set<String> deprecatedMethods = new HashSet<>();
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
[[cluster-nodes-reload-secure-settings]]
2+
== Nodes Reload Secure Settings
3+
4+
The cluster nodes reload secure settings API is used to re-read the
5+
local node's encrypted keystore. Specifically, it will prompt the keystore
6+
decryption and reading accross the cluster. The keystore's plain content is
7+
used to reinitialize all compatible plugins. A compatible plugin can be
8+
reinitilized without restarting the node. The operation is
9+
complete when all compatible plugins have finished reinitilizing. Subsequently,
10+
the keystore is closed and any changes to it will not be reflected on the node.
11+
12+
[source,js]
13+
--------------------------------------------------
14+
POST _nodes/reload_secure_settings
15+
POST _nodes/nodeId1,nodeId2/reload_secure_settings
16+
--------------------------------------------------
17+
// CONSOLE
18+
// TEST[setup:node]
19+
// TEST[s/nodeId1,nodeId2/*/]
20+
21+
The first command reloads the keystore on each node. The seconds allows
22+
to selectively target `nodeId1` and `nodeId2`. The node selection options are
23+
detailed <<cluster-nodes,here>>.
24+
25+
Note: It is an error if secure settings are inconsistent across the cluster
26+
nodes, yet this consistency is not enforced whatsoever. Hence, reloading specific
27+
nodes is not standard. It is only justifiable when retrying failed reload operations.
28+
29+
[float]
30+
[[rest-reload-secure-settings]]
31+
==== REST Reload Secure Settings Response
32+
33+
The response contains the `nodes` object, which is a map, keyed by the
34+
node id. Each value has the node `name` and an optional `reload_exception`
35+
field. The `reload_exception` field is a serialization of the exception
36+
that was thrown during the reload process, if any.
37+
38+
[source,js]
39+
--------------------------------------------------
40+
{
41+
"_nodes": {
42+
"total": 1,
43+
"successful": 1,
44+
"failed": 0
45+
},
46+
"cluster_name": "my_cluster",
47+
"nodes": {
48+
"pQHNt5rXTTWNvUgOrdynKg": {
49+
"name": "node-0"
50+
}
51+
}
52+
}
53+
--------------------------------------------------
54+
// TESTRESPONSE[s/"my_cluster"/$body.cluster_name/]
55+
// TESTRESPONSE[s/"pQHNt5rXTTWNvUgOrdynKg"/\$node_name/]
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"nodes.reload_secure_settings": {
3+
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/cluster-nodes-reload-secure-settings.html",
4+
"methods": ["POST"],
5+
"url": {
6+
"path": "/_nodes/reload_secure_settings",
7+
"paths": ["/_nodes/reload_secure_settings", "/_nodes/{node_id}/reload_secure_settings"],
8+
"parts": {
9+
"node_id": {
10+
"type": "list",
11+
"description": "A comma-separated list of node IDs to span the reload/reinit call. Should stay empty because reloading usually involves all cluster nodes."
12+
}
13+
},
14+
"params": {
15+
"timeout": {
16+
"type" : "time",
17+
"description" : "Explicit operation timeout"
18+
}
19+
}
20+
},
21+
"body": null
22+
}
23+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"node_reload_secure_settings test":
3+
4+
- do:
5+
nodes.reload_secure_settings: {}
6+
7+
- is_true: nodes
8+
- is_true: cluster_name

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ public void apply(Settings value, Settings current, Settings previous) {
272272
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
273273
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
274274
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
275+
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
275276
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
276277
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
277278
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,

server/src/main/java/org/elasticsearch/common/settings/Setting.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,10 @@ public static Setting<String> simpleString(String key, Property... properties) {
10091009
return new Setting<>(key, s -> "", Function.identity(), properties);
10101010
}
10111011

1012+
public static Setting<String> simpleString(String key, Function<String, String> parser, Property... properties) {
1013+
return new Setting<>(key, s -> "", parser, properties);
1014+
}
1015+
10121016
public static Setting<String> simpleString(String key, Setting<String> fallback, Property... properties) {
10131017
return new Setting<>(key, fallback, Function.identity(), properties);
10141018
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import java.util.EnumSet;
2122
import java.util.function.Supplier;
2223
import org.elasticsearch.Version;
2324
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
26+
import org.elasticsearch.common.Strings;
27+
import org.elasticsearch.common.UUIDs;
28+
import org.elasticsearch.common.collect.Tuple;
2529
import org.elasticsearch.common.component.AbstractComponent;
2630
import org.elasticsearch.common.settings.ClusterSettings;
2731
import org.elasticsearch.common.settings.Setting;
@@ -66,6 +70,22 @@ public abstract class RemoteClusterAware extends AbstractComponent {
6670
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
6771
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
6872

73+
/**
74+
* A proxy address for the remote cluster.
75+
* NOTE: this settings is undocumented until we have at last one transport that supports passing
76+
* on the hostname via a mechanism like SNI.
77+
*/
78+
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
79+
"search.remote.",
80+
"proxy",
81+
key -> Setting.simpleString(key, s -> {
82+
if (Strings.hasLength(s)) {
83+
parsePort(s);
84+
}
85+
return s;
86+
}, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
87+
88+
6989
protected final ClusterNameExpressionResolver clusterNameResolver;
7090

7191
/**
@@ -77,25 +97,42 @@ protected RemoteClusterAware(Settings settings) {
7797
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
7898
}
7999

80-
protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) {
100+
/**
101+
* Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple
102+
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
103+
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
104+
*/
105+
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
81106
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
82107
return allConcreteSettings.collect(
83108
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
84109
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
85110
List<String> addresses = concreteSetting.get(settings);
111+
final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings);
86112
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
87113
for (String address : addresses) {
88-
nodes.add(() -> {
89-
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
90-
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
91-
transportAddress,
92-
Version.CURRENT.minimumCompatibilityVersion());
93-
});
114+
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
94115
}
95-
return nodes;
116+
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
96117
}));
97118
}
98119

120+
static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) {
121+
if (proxyMode) {
122+
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
123+
String hostName = address.substring(0, indexOfPortSeparator(address));
124+
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
125+
transportAddress, Collections
126+
.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
127+
Version.CURRENT.minimumCompatibilityVersion());
128+
} else {
129+
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
130+
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
131+
transportAddress,
132+
Version.CURRENT.minimumCompatibilityVersion());
133+
}
134+
}
135+
99136
/**
100137
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
101138
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
@@ -138,20 +175,24 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
138175

139176
protected abstract Set<String> getRemoteClusterNames();
140177

178+
141179
/**
142180
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
143181
* empty the cluster alias is unregistered and should be removed.
144182
*/
145-
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses);
183+
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy);
146184

147185
/**
148186
* Registers this instance to listen to updates on the cluster settings.
149187
*/
150188
public void listenForUpdates(ClusterSettings clusterSettings) {
151-
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, this::updateRemoteCluster,
189+
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
190+
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
191+
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
152192
(namespace, value) -> {});
153193
}
154194

195+
155196
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
156197
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
157198
InetAddress hostAddress;

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import java.net.InetSocketAddress;
2122
import java.util.function.Supplier;
2223
import org.apache.logging.log4j.message.ParameterizedMessage;
2324
import org.apache.lucene.store.AlreadyClosedException;
@@ -88,6 +89,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
8889
private final int maxNumRemoteConnections;
8990
private final Predicate<DiscoveryNode> nodePredicate;
9091
private final ThreadPool threadPool;
92+
private volatile String proxyAddress;
9193
private volatile List<Supplier<DiscoveryNode>> seedNodes;
9294
private volatile boolean skipUnavailable;
9395
private final ConnectHandler connectHandler;
@@ -106,6 +108,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
106108
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
107109
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
108110
Predicate<DiscoveryNode> nodePredicate) {
111+
this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null);
112+
}
113+
114+
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
115+
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate<DiscoveryNode>
116+
nodePredicate,
117+
String proxyAddress) {
109118
super(settings);
110119
this.transportService = transportService;
111120
this.maxNumRemoteConnections = maxNumRemoteConnections;
@@ -130,13 +139,26 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
130139
connectionManager.addListener(this);
131140
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
132141
connectionManager.addListener(transportService);
142+
this.proxyAddress = proxyAddress;
143+
}
144+
145+
private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
146+
if (proxyAddress == null || proxyAddress.isEmpty()) {
147+
return node;
148+
} else {
149+
// resovle proxy address lazy here
150+
InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
151+
return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
152+
.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
153+
}
133154
}
134155

135156
/**
136157
* Updates the list of seed nodes for this cluster connection
137158
*/
138-
synchronized void updateSeedNodes(List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
159+
synchronized void updateSeedNodes(String proxyAddress, List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
139160
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
161+
this.proxyAddress = proxyAddress;
140162
connectHandler.connect(connectListener);
141163
}
142164

@@ -281,6 +303,7 @@ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
281303
return new ProxyConnection(connection, remoteClusterNode);
282304
}
283305

306+
284307
static final class ProxyConnection implements Transport.Connection {
285308
private final Transport.Connection proxyConnection;
286309
private final DiscoveryNode targetNode;
@@ -461,7 +484,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
461484
try {
462485
if (seedNodes.hasNext()) {
463486
cancellableThreads.executeIO(() -> {
464-
final DiscoveryNode seedNode = seedNodes.next().get();
487+
final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get());
465488
final TransportService.HandshakeResponse handshakeResponse;
466489
Transport.Connection connection = manager.openConnection(seedNode,
467490
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
@@ -476,7 +499,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
476499
throw ex;
477500
}
478501

479-
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
502+
final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
480503
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
481504
manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode));
482505
if (remoteClusterName.get() == null) {
@@ -583,7 +606,8 @@ public void handleResponse(ClusterStateResponse response) {
583606
cancellableThreads.executeIO(() -> {
584607
DiscoveryNodes nodes = response.getState().nodes();
585608
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
586-
for (DiscoveryNode node : nodesIter) {
609+
for (DiscoveryNode n : nodesIter) {
610+
DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n);
587611
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
588612
try {
589613
connectionManager.connectToNode(node, remoteProfile,
@@ -646,7 +670,8 @@ void addConnectedNode(DiscoveryNode node) {
646670
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
647671
*/
648672
public RemoteConnectionInfo getConnectionInfo() {
649-
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList());
673+
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect
674+
(Collectors.toList());
650675
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
651676
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
652677
initialConnectionTimeout, skipUnavailable);

0 commit comments

Comments
 (0)