Skip to content

Commit 47ecf74

Browse files
committed
[8.1] Make DiscoveryNodes behave like a collection (elastic#83453)
Backports elastic#83453 to 8.1 ``` Extend from `AbstractCollection`. Advantages: * Optimized version of `spliterator` based on the size implemented in `AbstractCollection` by default * Exposed `stream` method, so consumers don't need to call `StreamSupport.stream(discoveryNodes.spliterator(), false)` ```
1 parent 86d413b commit 47ecf74

File tree

15 files changed

+48
-50
lines changed

15 files changed

+48
-50
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import java.util.concurrent.CountDownLatch;
6464
import java.util.concurrent.TimeUnit;
6565
import java.util.stream.Collectors;
66-
import java.util.stream.StreamSupport;
6766

6867
import static org.hamcrest.Matchers.anyOf;
6968
import static org.hamcrest.Matchers.empty;
@@ -159,7 +158,7 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception {
159158
if (randomBoolean()) {
160159
internalCluster().startNodes(randomIntBetween(1, 3));
161160
}
162-
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
161+
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
163162
final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4));
164163
ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
165164
Set<TestRequest> pendingRequests = allowPartialRequest(rootRequest);
@@ -212,7 +211,7 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception {
212211
}
213212

214213
public void testCancelTaskMultipleTimes() throws Exception {
215-
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
214+
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
216215
TestRequest rootRequest = generateTestRequest(nodes, 0, randomIntBetween(1, 3));
217216
ActionFuture<TestResponse> mainTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
218217
TaskId taskId = getRootTaskId(rootRequest);
@@ -244,7 +243,7 @@ public void testCancelTaskMultipleTimes() throws Exception {
244243
}
245244

246245
public void testDoNotWaitForCompletion() throws Exception {
247-
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
246+
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
248247
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));
249248
ActionFuture<TestResponse> mainTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
250249
TaskId taskId = getRootTaskId(rootRequest);
@@ -270,7 +269,7 @@ public void testDoNotWaitForCompletion() throws Exception {
270269
}
271270

272271
public void testFailedToStartChildTaskAfterCancelled() throws Exception {
273-
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
272+
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
274273
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));
275274
ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
276275
TaskId taskId = getRootTaskId(rootRequest);
@@ -290,7 +289,7 @@ public void testFailedToStartChildTaskAfterCancelled() throws Exception {
290289

291290
public void testCancelOrphanedTasks() throws Exception {
292291
final String nodeWithRootTask = internalCluster().startDataOnlyNode();
293-
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
292+
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
294293
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));
295294
client(nodeWithRootTask).execute(TransportTestAction.ACTION, rootRequest);
296295
allowPartialRequest(rootRequest);
@@ -315,7 +314,7 @@ public void testCancelOrphanedTasks() throws Exception {
315314
}
316315

317316
public void testRemoveBanParentsOnDisconnect() throws Exception {
318-
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
317+
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
319318
final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4));
320319
client().execute(TransportTestAction.ACTION, rootRequest);
321320
Set<TestRequest> pendingRequests = allowPartialRequest(rootRequest);

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import java.util.concurrent.TimeUnit;
4848
import java.util.concurrent.atomic.AtomicReference;
4949
import java.util.stream.Collectors;
50-
import java.util.stream.StreamSupport;
5150

5251
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5352
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -96,7 +95,10 @@ public void testRemoteClusterClientRole() throws Exception {
9695
);
9796

9897
final String nodeWithRemoteClusterClientRole = randomFrom(
99-
StreamSupport.stream(localCluster.clusterService().state().nodes().spliterator(), false)
98+
localCluster.clusterService()
99+
.state()
100+
.nodes()
101+
.stream()
100102
.map(DiscoveryNode::getName)
101103
.filter(nodeName -> nodeWithoutRemoteClusterClientRole.equals(nodeName) == false)
102104
.filter(nodeName -> nodeName.equals(pureDataNode) == false)
@@ -163,7 +165,10 @@ public void testCancel() throws Exception {
163165
final Settings.Builder allocationFilter = Settings.builder();
164166
if (randomBoolean()) {
165167
remoteCluster.ensureAtLeastNumDataNodes(3);
166-
List<String> remoteDataNodes = StreamSupport.stream(remoteCluster.clusterService().state().nodes().spliterator(), false)
168+
List<String> remoteDataNodes = remoteCluster.clusterService()
169+
.state()
170+
.nodes()
171+
.stream()
167172
.filter(DiscoveryNode::canContainData)
168173
.map(DiscoveryNode::getName)
169174
.collect(Collectors.toList());

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchLeakIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Collection;
3030
import java.util.List;
3131
import java.util.stream.Collectors;
32-
import java.util.stream.StreamSupport;
3332

3433
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
3534
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -78,7 +77,10 @@ public void testSearch() throws Exception {
7877
final InternalTestCluster remoteCluster = cluster("cluster_a");
7978
int minRemotes = between(2, 5);
8079
remoteCluster.ensureAtLeastNumDataNodes(minRemotes);
81-
List<String> remoteDataNodes = StreamSupport.stream(remoteCluster.clusterService().state().nodes().spliterator(), false)
80+
List<String> remoteDataNodes = remoteCluster.clusterService()
81+
.state()
82+
.nodes()
83+
.stream()
8284
.filter(DiscoveryNode::canContainData)
8385
.map(DiscoveryNode::getName)
8486
.collect(Collectors.toList());

server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingConfigExclusionsRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Set;
2727
import java.util.function.Function;
2828
import java.util.stream.Collectors;
29-
import java.util.stream.StreamSupport;
3029

3130
/**
3231
* A request to add voting config exclusions for certain master-eligible nodes, and wait for these nodes to be removed from the voting
@@ -102,7 +101,7 @@ Set<VotingConfigExclusion> resolveVotingConfigExclusions(ClusterState currentSta
102101
}
103102
} else {
104103
assert nodeNames.length > 0;
105-
Map<String, DiscoveryNode> existingNodes = StreamSupport.stream(allNodes.spliterator(), false)
104+
Map<String, DiscoveryNode> existingNodes = allNodes.stream()
106105
.collect(Collectors.toMap(DiscoveryNode::getName, Function.identity()));
107106

108107
for (String nodeName : nodeNames) {

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,15 +1085,17 @@ ClusterState improveConfiguration(ClusterState clusterState) {
10851085
// ... and also automatically exclude the node IDs of master-ineligible nodes that were previously master-eligible and are still in
10861086
// the voting config. We could exclude all the master-ineligible nodes here, but there could be quite a few of them and that makes
10871087
// the logging much harder to follow.
1088-
final Stream<String> masterIneligibleNodeIdsInVotingConfig = StreamSupport.stream(clusterState.nodes().spliterator(), false)
1088+
final Stream<String> masterIneligibleNodeIdsInVotingConfig = clusterState.nodes()
1089+
.stream()
10891090
.filter(
10901091
n -> n.isMasterNode() == false
10911092
&& (clusterState.getLastAcceptedConfiguration().getNodeIds().contains(n.getId())
10921093
|| clusterState.getLastCommittedConfiguration().getNodeIds().contains(n.getId()))
10931094
)
10941095
.map(DiscoveryNode::getId);
10951096

1096-
final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
1097+
final Set<DiscoveryNode> liveNodes = clusterState.nodes()
1098+
.stream()
10971099
.filter(DiscoveryNode::isMasterNode)
10981100
.filter(coordinationState.get()::containsJoinVoteFor)
10991101
.collect(Collectors.toSet());

server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.core.Nullable;
2323

2424
import java.io.IOException;
25+
import java.util.AbstractCollection;
2526
import java.util.ArrayList;
2627
import java.util.Collection;
2728
import java.util.Collections;
@@ -34,15 +35,13 @@
3435
import java.util.Set;
3536
import java.util.function.Consumer;
3637
import java.util.function.Predicate;
37-
import java.util.stream.Collectors;
3838
import java.util.stream.Stream;
39-
import java.util.stream.StreamSupport;
4039

4140
/**
4241
* This class holds all {@link DiscoveryNode} in the cluster and provides convenience methods to
4342
* access, modify merge / diff discovery nodes.
4443
*/
45-
public class DiscoveryNodes implements SimpleDiffable<DiscoveryNodes>, Iterable<DiscoveryNode> {
44+
public class DiscoveryNodes extends AbstractCollection<DiscoveryNode> implements SimpleDiffable<DiscoveryNodes> {
4645

4746
public static final DiscoveryNodes EMPTY_NODES = builder().build();
4847

@@ -84,6 +83,11 @@ public Iterator<DiscoveryNode> iterator() {
8483
return nodes.valuesIt();
8584
}
8685

86+
@Override
87+
public int size() {
88+
return nodes.size();
89+
}
90+
8791
/**
8892
* Returns {@code true} if the local node is the elected master node.
8993
*/
@@ -167,17 +171,14 @@ public ImmutableOpenMap<String, DiscoveryNode> getCoordinatingOnlyNodes() {
167171
* @return
168172
*/
169173
public Collection<DiscoveryNode> getAllNodes() {
170-
return StreamSupport.stream(this.spliterator(), false).collect(Collectors.toUnmodifiableList());
174+
return this;
171175
}
172176

173177
/**
174178
* Returns a stream of all nodes, with master nodes at the front
175179
*/
176180
public Stream<DiscoveryNode> mastersFirstStream() {
177-
return Stream.concat(
178-
masterNodes.stream().map(Map.Entry::getValue),
179-
StreamSupport.stream(this.spliterator(), false).filter(n -> n.isMasterNode() == false)
180-
);
181+
return Stream.concat(masterNodes.stream().map(Map.Entry::getValue), stream().filter(n -> n.isMasterNode() == false));
181182
}
182183

183184
/**
@@ -339,7 +340,7 @@ public DiscoveryNode resolveNode(String node) {
339340
*/
340341
public String[] resolveNodes(String... nodes) {
341342
if (nodes == null || nodes.length == 0) {
342-
return StreamSupport.stream(this.spliterator(), false).map(DiscoveryNode::getId).toArray(String[]::new);
343+
return stream().map(DiscoveryNode::getId).toArray(String[]::new);
343344
} else {
344345
Set<String> resolvedNodesIds = new HashSet<>(nodes.length);
345346
for (String nodeId : nodes) {

server/src/test/java/org/elasticsearch/cluster/coordination/NodeRemovalClusterStateTaskExecutorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.List;
2222
import java.util.concurrent.atomic.AtomicReference;
2323
import java.util.stream.Collectors;
24-
import java.util.stream.StreamSupport;
2524

2625
import static org.hamcrest.CoreMatchers.equalTo;
2726
import static org.mockito.ArgumentMatchers.any;
@@ -45,7 +44,8 @@ public void testRemovingNonExistentNodes() throws Exception {
4544
for (int i = nodes; i < nodes + randomIntBetween(1, 16); i++) {
4645
removeBuilder.add(node(i));
4746
}
48-
final List<NodeRemovalClusterStateTaskExecutor.Task> tasks = StreamSupport.stream(removeBuilder.build().spliterator(), false)
47+
final List<NodeRemovalClusterStateTaskExecutor.Task> tasks = removeBuilder.build()
48+
.stream()
4949
.map(node -> new NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed", () -> {}))
5050
.collect(Collectors.toList());
5151

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.TreeSet;
3838
import java.util.function.Function;
3939
import java.util.stream.Collectors;
40-
import java.util.stream.StreamSupport;
4140

4241
public class AutoscalingCalculateCapacityService implements PolicyValidator {
4342
private final Map<String, AutoscalingDeciderService> deciderByName;
@@ -246,7 +245,8 @@ static class DefaultAutoscalingDeciderContext implements AutoscalingDeciderConte
246245
this.clusterInfo = clusterInfo;
247246
this.snapshotShardSizeInfo = snapshotShardSizeInfo;
248247
this.memoryInfo = memoryInfo;
249-
this.currentNodes = StreamSupport.stream(state.nodes().spliterator(), false)
248+
this.currentNodes = state.nodes()
249+
.stream()
250250
.filter(this::rolesFilter)
251251
.collect(Collectors.toCollection(() -> new TreeSet<>(AutoscalingDeciderResults.DISCOVERY_NODE_COMPARATOR)));
252252
this.currentCapacity = calculateCurrentCapacity();

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,7 @@ void onClusterChanged(ClusterChangedEvent event) {
8585

8686
Set<DiscoveryNode> relevantNodes(ClusterState state) {
8787
final Set<Set<DiscoveryNodeRole>> roleSets = calculateAutoscalingRoleSets(state);
88-
return StreamSupport.stream(state.nodes().spliterator(), false)
89-
.filter(n -> roleSets.contains(n.getRoles()))
90-
.collect(Collectors.toSet());
88+
return state.nodes().stream().filter(n -> roleSets.contains(n.getRoles())).collect(Collectors.toSet());
9189
}
9290

9391
private Set<DiscoveryNode> addMissingNodes(Set<DiscoveryNode> nodes) {

x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.TreeSet;
4040
import java.util.stream.Collectors;
4141
import java.util.stream.IntStream;
42-
import java.util.stream.StreamSupport;
4342

4443
import static org.hamcrest.Matchers.containsString;
4544
import static org.hamcrest.Matchers.equalTo;
@@ -209,7 +208,7 @@ public void testContext() {
209208
);
210209

211210
assertThat(context.nodes().size(), equalTo(1));
212-
assertThat(context.nodes(), equalTo(StreamSupport.stream(state.nodes().spliterator(), false).collect(Collectors.toSet())));
211+
assertThat(context.nodes(), equalTo(state.nodes().stream().collect(Collectors.toSet())));
213212
if (hasDataRole) {
214213
assertNull(context.currentCapacity());
215214
} else {

0 commit comments

Comments
 (0)