diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java index ad50fa21f114a..3907112b8759f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java @@ -63,7 +63,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; @@ -159,7 +158,7 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception { if (randomBoolean()) { internalCluster().startNodes(randomIntBetween(1, 3)); } - Set nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet()); + Set nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4)); ActionFuture rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest); Set pendingRequests = allowPartialRequest(rootRequest); @@ -212,7 +211,7 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception { } public void testCancelTaskMultipleTimes() throws Exception { - Set nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet()); + Set nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); TestRequest rootRequest = generateTestRequest(nodes, 0, randomIntBetween(1, 3)); ActionFuture mainTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest); TaskId taskId = getRootTaskId(rootRequest); @@ -244,7 +243,7 @@ public void testCancelTaskMultipleTimes() throws Exception { } public void testDoNotWaitForCompletion() throws Exception { - Set nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet()); + Set nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3)); ActionFuture mainTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest); TaskId taskId = getRootTaskId(rootRequest); @@ -270,7 +269,7 @@ public void testDoNotWaitForCompletion() throws Exception { } public void testFailedToStartChildTaskAfterCancelled() throws Exception { - Set nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet()); + Set nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3)); ActionFuture rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest); TaskId taskId = getRootTaskId(rootRequest); @@ -290,7 +289,7 @@ public void testFailedToStartChildTaskAfterCancelled() throws Exception { public void testCancelOrphanedTasks() throws Exception { final String nodeWithRootTask = internalCluster().startDataOnlyNode(); - Set nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet()); + Set nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3)); client(nodeWithRootTask).execute(TransportTestAction.ACTION, rootRequest); allowPartialRequest(rootRequest); @@ -315,7 +314,7 @@ public void testCancelOrphanedTasks() throws Exception { } public void testRemoveBanParentsOnDisconnect() throws Exception { - Set nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet()); + Set nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4)); client().execute(TransportTestAction.ACTION, rootRequest); Set pendingRequests = allowPartialRequest(rootRequest); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java index 543c71ec234e5..85befbd159729 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java @@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -96,7 +95,10 @@ public void testRemoteClusterClientRole() throws Exception { ); final String nodeWithRemoteClusterClientRole = randomFrom( - StreamSupport.stream(localCluster.clusterService().state().nodes().spliterator(), false) + localCluster.clusterService() + .state() + .nodes() + .stream() .map(DiscoveryNode::getName) .filter(nodeName -> nodeWithoutRemoteClusterClientRole.equals(nodeName) == false) .filter(nodeName -> nodeName.equals(pureDataNode) == false) @@ -163,7 +165,10 @@ public void testCancel() throws Exception { final Settings.Builder allocationFilter = Settings.builder(); if (randomBoolean()) { remoteCluster.ensureAtLeastNumDataNodes(3); - List remoteDataNodes = StreamSupport.stream(remoteCluster.clusterService().state().nodes().spliterator(), false) + List remoteDataNodes = remoteCluster.clusterService() + .state() + .nodes() + .stream() .filter(DiscoveryNode::canContainData) .map(DiscoveryNode::getName) .collect(Collectors.toList()); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchLeakIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchLeakIT.java index 8868b87d7420e..f119bb161524a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchLeakIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchLeakIT.java @@ -29,7 +29,6 @@ import java.util.Collection; import java.util.List; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -78,7 +77,10 @@ public void testSearch() throws Exception { final InternalTestCluster remoteCluster = cluster("cluster_a"); int minRemotes = between(2, 5); remoteCluster.ensureAtLeastNumDataNodes(minRemotes); - List remoteDataNodes = StreamSupport.stream(remoteCluster.clusterService().state().nodes().spliterator(), false) + List remoteDataNodes = remoteCluster.clusterService() + .state() + .nodes() + .stream() .filter(DiscoveryNode::canContainData) .map(DiscoveryNode::getName) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingConfigExclusionsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingConfigExclusionsRequest.java index 9d518f9fc6fe6..e9473228ba1f4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingConfigExclusionsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingConfigExclusionsRequest.java @@ -26,7 +26,6 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; /** * A request to add voting config exclusions for certain master-eligible nodes, and wait for these nodes to be removed from the voting @@ -102,7 +101,7 @@ Set resolveVotingConfigExclusions(ClusterState currentSta } } else { assert nodeNames.length > 0; - Map existingNodes = StreamSupport.stream(allNodes.spliterator(), false) + Map existingNodes = allNodes.stream() .collect(Collectors.toMap(DiscoveryNode::getName, Function.identity())); for (String nodeName : nodeNames) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3c6ec286de6db..ac172ad64620b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -1085,7 +1085,8 @@ ClusterState improveConfiguration(ClusterState clusterState) { // ... and also automatically exclude the node IDs of master-ineligible nodes that were previously master-eligible and are still in // the voting config. We could exclude all the master-ineligible nodes here, but there could be quite a few of them and that makes // the logging much harder to follow. - final Stream masterIneligibleNodeIdsInVotingConfig = StreamSupport.stream(clusterState.nodes().spliterator(), false) + final Stream masterIneligibleNodeIdsInVotingConfig = clusterState.nodes() + .stream() .filter( n -> n.isMasterNode() == false && (clusterState.getLastAcceptedConfiguration().getNodeIds().contains(n.getId()) @@ -1093,7 +1094,8 @@ ClusterState improveConfiguration(ClusterState clusterState) { ) .map(DiscoveryNode::getId); - final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) + final Set liveNodes = clusterState.nodes() + .stream() .filter(DiscoveryNode::isMasterNode) .filter(coordinationState.get()::containsJoinVoteFor) .collect(Collectors.toSet()); diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index e29b08088aaa4..64734f936fe68 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -22,6 +22,7 @@ import org.elasticsearch.core.Nullable; import java.io.IOException; +import java.util.AbstractCollection; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -34,15 +35,13 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; -import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; /** * This class holds all {@link DiscoveryNode} in the cluster and provides convenience methods to * access, modify merge / diff discovery nodes. */ -public class DiscoveryNodes implements SimpleDiffable, Iterable { +public class DiscoveryNodes extends AbstractCollection implements SimpleDiffable { public static final DiscoveryNodes EMPTY_NODES = builder().build(); @@ -84,6 +83,11 @@ public Iterator iterator() { return nodes.valuesIt(); } + @Override + public int size() { + return nodes.size(); + } + /** * Returns {@code true} if the local node is the elected master node. */ @@ -167,17 +171,14 @@ public ImmutableOpenMap getCoordinatingOnlyNodes() { * @return */ public Collection getAllNodes() { - return StreamSupport.stream(this.spliterator(), false).collect(Collectors.toUnmodifiableList()); + return this; } /** * Returns a stream of all nodes, with master nodes at the front */ public Stream mastersFirstStream() { - return Stream.concat( - masterNodes.stream().map(Map.Entry::getValue), - StreamSupport.stream(this.spliterator(), false).filter(n -> n.isMasterNode() == false) - ); + return Stream.concat(masterNodes.stream().map(Map.Entry::getValue), stream().filter(n -> n.isMasterNode() == false)); } /** @@ -339,7 +340,7 @@ public DiscoveryNode resolveNode(String node) { */ public String[] resolveNodes(String... nodes) { if (nodes == null || nodes.length == 0) { - return StreamSupport.stream(this.spliterator(), false).map(DiscoveryNode::getId).toArray(String[]::new); + return stream().map(DiscoveryNode::getId).toArray(String[]::new); } else { Set resolvedNodesIds = new HashSet<>(nodes.length); for (String nodeId : nodes) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeRemovalClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeRemovalClusterStateTaskExecutorTests.java index c4d0b55fa9d8c..91c69fa7b44fe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeRemovalClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeRemovalClusterStateTaskExecutorTests.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.ArgumentMatchers.any; @@ -45,7 +44,8 @@ public void testRemovingNonExistentNodes() throws Exception { for (int i = nodes; i < nodes + randomIntBetween(1, 16); i++) { removeBuilder.add(node(i)); } - final List tasks = StreamSupport.stream(removeBuilder.build().spliterator(), false) + final List tasks = removeBuilder.build() + .stream() .map(node -> new NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed", () -> {})) .collect(Collectors.toList()); diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java index d3806d69f3bd5..27ba96a4dfeed 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java @@ -37,7 +37,6 @@ import java.util.TreeSet; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public class AutoscalingCalculateCapacityService implements PolicyValidator { private final Map deciderByName; @@ -246,7 +245,8 @@ static class DefaultAutoscalingDeciderContext implements AutoscalingDeciderConte this.clusterInfo = clusterInfo; this.snapshotShardSizeInfo = snapshotShardSizeInfo; this.memoryInfo = memoryInfo; - this.currentNodes = StreamSupport.stream(state.nodes().spliterator(), false) + this.currentNodes = state.nodes() + .stream() .filter(this::rolesFilter) .collect(Collectors.toCollection(() -> new TreeSet<>(AutoscalingDeciderResults.DISCOVERY_NODE_COMPARATOR))); this.currentCapacity = calculateCurrentCapacity(); diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoService.java index 4e3db7f8bf31e..5550df57c391a 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoService.java @@ -85,9 +85,7 @@ void onClusterChanged(ClusterChangedEvent event) { Set relevantNodes(ClusterState state) { final Set> roleSets = calculateAutoscalingRoleSets(state); - return StreamSupport.stream(state.nodes().spliterator(), false) - .filter(n -> roleSets.contains(n.getRoles())) - .collect(Collectors.toSet()); + return state.nodes().stream().filter(n -> roleSets.contains(n.getRoles())).collect(Collectors.toSet()); } private Set addMissingNodes(Set nodes) { diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java index 6f863a5bf0fe7..5bae8269e4eeb 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java @@ -39,7 +39,6 @@ import java.util.TreeSet; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -209,7 +208,7 @@ public void testContext() { ); assertThat(context.nodes().size(), equalTo(1)); - assertThat(context.nodes(), equalTo(StreamSupport.stream(state.nodes().spliterator(), false).collect(Collectors.toSet()))); + assertThat(context.nodes(), equalTo(state.nodes().stream().collect(Collectors.toSet()))); if (hasDataRole) { assertNull(context.currentCapacity()); } else { diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderDecisionTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderDecisionTests.java index bad3417478577..95bca4a437f28 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderDecisionTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderDecisionTests.java @@ -521,9 +521,7 @@ private static class TestAutoscalingDeciderContext implements AutoscalingDecider private TestAutoscalingDeciderContext(ClusterState state, Set roles, AutoscalingCapacity currentCapacity) { this.state = state; this.currentCapacity = currentCapacity; - this.nodes = StreamSupport.stream(state.nodes().spliterator(), false) - .filter(n -> roles.stream().anyMatch(n.getRoles()::contains)) - .collect(Collectors.toSet()); + this.nodes = state.nodes().stream().filter(n -> roles.stream().anyMatch(n.getRoles()::contains)).collect(Collectors.toSet()); this.roles = roles; this.info = createClusterInfo(state); } @@ -567,7 +565,8 @@ public void ensureNotCancelled() { private static ClusterInfo createClusterInfo(ClusterState state) { // we make a simple setup to detect the right decisions are made. The unmovable calculation is tested in more detail elsewhere. // the diskusage is set such that the disk threshold decider never rejects an allocation. - Map diskUsages = StreamSupport.stream(state.nodes().spliterator(), false) + Map diskUsages = state.nodes() + .stream() .collect(Collectors.toMap(DiscoveryNode::getId, node -> new DiskUsage(node.getId(), null, "the_path", 1000, 1000))); ImmutableOpenMap immutableDiskUsages = ImmutableOpenMap.builder().putAll(diskUsages).build(); diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java index c644cbd656708..5039d7fb0da43 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java @@ -60,7 +60,6 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -241,7 +240,7 @@ public void testSizeOf() { ); ShardRouting primaryShard = subjectRoutings.primaryShard(); ShardRouting replicaShard = subjectRoutings.replicaShards().get(0); - DiscoveryNode[] nodes = StreamSupport.stream(initialClusterState.nodes().spliterator(), false).toArray(DiscoveryNode[]::new); + DiscoveryNode[] nodes = initialClusterState.nodes().toArray(DiscoveryNode[]::new); boolean useReplica = randomBoolean(); if (useReplica || randomBoolean()) { startShard(allocation, primaryShard, nodes[0].getId()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 260353e82697d..f22cb6bf8e7b2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -105,7 +105,6 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; @SuppressWarnings("HiddenField") public class XPackPlugin extends XPackClientPlugin @@ -262,7 +261,7 @@ public static boolean isReadyForXPackCustomMetadata(ClusterState clusterState) { */ public static List nodesNotReadyForXPackCustomMetadata(ClusterState clusterState) { // check that all nodes would be capable of deserializing newly added x-pack metadata - final List notReadyNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false).filter(node -> { + final List notReadyNodes = clusterState.nodes().stream().filter(node -> { final String xpackInstalledAttr = node.getAttributes().getOrDefault(XPACK_INSTALLED_NODE_ATTR, "false"); return Booleans.parseBoolean(xpackInstalledAttr) == false; }).collect(Collectors.toList()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicenseServiceTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicenseServiceTestCase.java index eaddb3435ab84..35a817131ac2a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicenseServiceTestCase.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicenseServiceTestCase.java @@ -27,7 +27,7 @@ import org.junit.Before; import java.nio.file.Path; -import java.util.Arrays; +import java.util.stream.Stream; import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; @@ -78,7 +78,7 @@ protected void setInitialState(License license, XPackLicenseState licenseState, when(state.metadata()).thenReturn(metadata); final DiscoveryNode mockNode = getLocalNode(); when(discoveryNodes.getMasterNode()).thenReturn(mockNode); - when(discoveryNodes.spliterator()).thenReturn(Arrays.asList(mockNode).spliterator()); + when(discoveryNodes.stream()).thenAnswer(invocation -> Stream.of(mockNode)); when(discoveryNodes.isLocalNodeElectedMaster()).thenReturn(false); when(discoveryNodes.getMinNodeVersion()).thenReturn(mockNode.getVersion()); when(state.nodes()).thenReturn(discoveryNodes); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodes.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodes.java index 3e0e3fd6456ab..cb56f7f304c51 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodes.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodes.java @@ -36,7 +36,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public final class TransformNodes { @@ -124,8 +123,7 @@ public static Assignment getAssignment(String transformId, ClusterState clusterS * @return number of transform nodes */ public static boolean hasAnyTransformNode(DiscoveryNodes nodes) { - return StreamSupport.stream(nodes.spliterator(), false) - .anyMatch(node -> node.getRoles().contains(DiscoveryNodeRole.TRANSFORM_ROLE)); + return nodes.stream().anyMatch(node -> node.getRoles().contains(DiscoveryNodeRole.TRANSFORM_ROLE)); } /** @@ -211,9 +209,7 @@ public static selectAnyNodeThatCanRunThisTransform(DiscoveryNodes nodes, boolean requiresRemote) { - return StreamSupport.stream(nodes.spliterator(), false) - .filter(node -> nodeCanRunThisTransform(node, Version.V_7_13_0, requiresRemote, null)) - .findAny(); + return nodes.stream().filter(node -> nodeCanRunThisTransform(node, Version.V_7_13_0, requiresRemote, null)).findAny(); } public static boolean nodeCanRunThisTransform(