Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -159,7 +158,7 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception {
if (randomBoolean()) {
internalCluster().startNodes(randomIntBetween(1, 3));
}
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4));
ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
Set<TestRequest> pendingRequests = allowPartialRequest(rootRequest);
Expand Down Expand Up @@ -212,7 +211,7 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception {
}

public void testCancelTaskMultipleTimes() throws Exception {
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
TestRequest rootRequest = generateTestRequest(nodes, 0, randomIntBetween(1, 3));
ActionFuture<TestResponse> mainTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
TaskId taskId = getRootTaskId(rootRequest);
Expand Down Expand Up @@ -244,7 +243,7 @@ public void testCancelTaskMultipleTimes() throws Exception {
}

public void testDoNotWaitForCompletion() throws Exception {
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));
ActionFuture<TestResponse> mainTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
TaskId taskId = getRootTaskId(rootRequest);
Expand All @@ -270,7 +269,7 @@ public void testDoNotWaitForCompletion() throws Exception {
}

public void testFailedToStartChildTaskAfterCancelled() throws Exception {
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));
ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
TaskId taskId = getRootTaskId(rootRequest);
Expand All @@ -290,7 +289,7 @@ public void testFailedToStartChildTaskAfterCancelled() throws Exception {

public void testCancelOrphanedTasks() throws Exception {
final String nodeWithRootTask = internalCluster().startDataOnlyNode();
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));
client(nodeWithRootTask).execute(TransportTestAction.ACTION, rootRequest);
allowPartialRequest(rootRequest);
Expand All @@ -315,7 +314,7 @@ public void testCancelOrphanedTasks() throws Exception {
}

public void testRemoveBanParentsOnDisconnect() throws Exception {
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4));
client().execute(TransportTestAction.ACTION, rootRequest);
Set<TestRequest> pendingRequests = allowPartialRequest(rootRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand Down Expand Up @@ -96,7 +95,10 @@ public void testRemoteClusterClientRole() throws Exception {
);

final String nodeWithRemoteClusterClientRole = randomFrom(
StreamSupport.stream(localCluster.clusterService().state().nodes().spliterator(), false)
localCluster.clusterService()
.state()
.nodes()
.stream()
.map(DiscoveryNode::getName)
.filter(nodeName -> nodeWithoutRemoteClusterClientRole.equals(nodeName) == false)
.filter(nodeName -> nodeName.equals(pureDataNode) == false)
Expand Down Expand Up @@ -163,7 +165,10 @@ public void testCancel() throws Exception {
final Settings.Builder allocationFilter = Settings.builder();
if (randomBoolean()) {
remoteCluster.ensureAtLeastNumDataNodes(3);
List<String> remoteDataNodes = StreamSupport.stream(remoteCluster.clusterService().state().nodes().spliterator(), false)
List<String> remoteDataNodes = remoteCluster.clusterService()
.state()
.nodes()
.stream()
.filter(DiscoveryNode::canContainData)
.map(DiscoveryNode::getName)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -78,7 +77,10 @@ public void testSearch() throws Exception {
final InternalTestCluster remoteCluster = cluster("cluster_a");
int minRemotes = between(2, 5);
remoteCluster.ensureAtLeastNumDataNodes(minRemotes);
List<String> remoteDataNodes = StreamSupport.stream(remoteCluster.clusterService().state().nodes().spliterator(), false)
List<String> remoteDataNodes = remoteCluster.clusterService()
.state()
.nodes()
.stream()
.filter(DiscoveryNode::canContainData)
.map(DiscoveryNode::getName)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* A request to add voting config exclusions for certain master-eligible nodes, and wait for these nodes to be removed from the voting
Expand Down Expand Up @@ -102,7 +101,7 @@ Set<VotingConfigExclusion> resolveVotingConfigExclusions(ClusterState currentSta
}
} else {
assert nodeNames.length > 0;
Map<String, DiscoveryNode> existingNodes = StreamSupport.stream(allNodes.spliterator(), false)
Map<String, DiscoveryNode> existingNodes = allNodes.stream()
.collect(Collectors.toMap(DiscoveryNode::getName, Function.identity()));

for (String nodeName : nodeNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1085,15 +1085,17 @@ ClusterState improveConfiguration(ClusterState clusterState) {
// ... and also automatically exclude the node IDs of master-ineligible nodes that were previously master-eligible and are still in
// the voting config. We could exclude all the master-ineligible nodes here, but there could be quite a few of them and that makes
// the logging much harder to follow.
final Stream<String> masterIneligibleNodeIdsInVotingConfig = StreamSupport.stream(clusterState.nodes().spliterator(), false)
final Stream<String> masterIneligibleNodeIdsInVotingConfig = clusterState.nodes()
.stream()
.filter(
n -> n.isMasterNode() == false
&& (clusterState.getLastAcceptedConfiguration().getNodeIds().contains(n.getId())
|| clusterState.getLastCommittedConfiguration().getNodeIds().contains(n.getId()))
)
.map(DiscoveryNode::getId);

final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
final Set<DiscoveryNode> liveNodes = clusterState.nodes()
.stream()
.filter(DiscoveryNode::isMasterNode)
.filter(coordinationState.get()::containsJoinVoteFor)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -32,17 +33,16 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* This class holds all {@link DiscoveryNode} in the cluster and provides convenience methods to
* access, modify merge / diff discovery nodes.
*/
public class DiscoveryNodes implements SimpleDiffable<DiscoveryNodes>, Iterable<DiscoveryNode> {
public class DiscoveryNodes extends AbstractCollection<DiscoveryNode> implements SimpleDiffable<DiscoveryNodes> {

public static final DiscoveryNodes EMPTY_NODES = builder().build();

Expand Down Expand Up @@ -84,6 +84,16 @@ public Iterator<DiscoveryNode> iterator() {
return nodes.valuesIt();
}

@Override
public int size() {
return nodes.size();
}

@Override
public Spliterator<DiscoveryNode> spliterator() {
return super.spliterator();
}

/**
* Returns {@code true} if the local node is the elected master node.
*/
Expand Down Expand Up @@ -167,17 +177,14 @@ public ImmutableOpenMap<String, DiscoveryNode> getCoordinatingOnlyNodes() {
* @return
*/
public Collection<DiscoveryNode> getAllNodes() {
return StreamSupport.stream(this.spliterator(), false).collect(Collectors.toUnmodifiableList());
return this;
}

/**
* Returns a stream of all nodes, with master nodes at the front
*/
public Stream<DiscoveryNode> mastersFirstStream() {
return Stream.concat(
masterNodes.stream().map(Map.Entry::getValue),
StreamSupport.stream(this.spliterator(), false).filter(n -> n.isMasterNode() == false)
);
return Stream.concat(masterNodes.stream().map(Map.Entry::getValue), stream().filter(n -> n.isMasterNode() == false));
}

/**
Expand Down Expand Up @@ -339,7 +346,7 @@ public DiscoveryNode resolveNode(String node) {
*/
public String[] resolveNodes(String... nodes) {
if (nodes == null || nodes.length == 0) {
return StreamSupport.stream(this.spliterator(), false).map(DiscoveryNode::getId).toArray(String[]::new);
return stream().map(DiscoveryNode::getId).toArray(String[]::new);
} else {
Set<String> resolvedNodesIds = new HashSet<>(nodes.length);
for (String nodeId : nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -45,7 +44,8 @@ public void testRemovingNonExistentNodes() throws Exception {
for (int i = nodes; i < nodes + randomIntBetween(1, 16); i++) {
removeBuilder.add(node(i));
}
final List<NodeRemovalClusterStateTaskExecutor.Task> tasks = StreamSupport.stream(removeBuilder.build().spliterator(), false)
final List<NodeRemovalClusterStateTaskExecutor.Task> tasks = removeBuilder.build()
.stream()
.map(node -> new NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed", () -> {}))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class AutoscalingCalculateCapacityService implements PolicyValidator {
private final Map<String, AutoscalingDeciderService> deciderByName;
Expand Down Expand Up @@ -246,7 +245,8 @@ static class DefaultAutoscalingDeciderContext implements AutoscalingDeciderConte
this.clusterInfo = clusterInfo;
this.snapshotShardSizeInfo = snapshotShardSizeInfo;
this.memoryInfo = memoryInfo;
this.currentNodes = StreamSupport.stream(state.nodes().spliterator(), false)
this.currentNodes = state.nodes()
.stream()
.filter(this::rolesFilter)
.collect(Collectors.toCollection(() -> new TreeSet<>(AutoscalingDeciderResults.DISCOVERY_NODE_COMPARATOR)));
this.currentCapacity = calculateCurrentCapacity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ void onClusterChanged(ClusterChangedEvent event) {

Set<DiscoveryNode> relevantNodes(ClusterState state) {
final Set<Set<DiscoveryNodeRole>> roleSets = calculateAutoscalingRoleSets(state);
return StreamSupport.stream(state.nodes().spliterator(), false)
.filter(n -> roleSets.contains(n.getRoles()))
.collect(Collectors.toSet());
return state.nodes().stream().filter(n -> roleSets.contains(n.getRoles())).collect(Collectors.toSet());
}

private Set<DiscoveryNode> addMissingNodes(Set<DiscoveryNode> nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -209,7 +208,7 @@ public void testContext() {
);

assertThat(context.nodes().size(), equalTo(1));
assertThat(context.nodes(), equalTo(StreamSupport.stream(state.nodes().spliterator(), false).collect(Collectors.toSet())));
assertThat(context.nodes(), equalTo(state.nodes().stream().collect(Collectors.toSet())));
if (hasDataRole) {
assertNull(context.currentCapacity());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,7 @@ private static class TestAutoscalingDeciderContext implements AutoscalingDecider
private TestAutoscalingDeciderContext(ClusterState state, Set<DiscoveryNodeRole> roles, AutoscalingCapacity currentCapacity) {
this.state = state;
this.currentCapacity = currentCapacity;
this.nodes = StreamSupport.stream(state.nodes().spliterator(), false)
.filter(n -> roles.stream().anyMatch(n.getRoles()::contains))
.collect(Collectors.toSet());
this.nodes = state.nodes().stream().filter(n -> roles.stream().anyMatch(n.getRoles()::contains)).collect(Collectors.toSet());
this.roles = roles;
this.info = createClusterInfo(state);
}
Expand Down Expand Up @@ -567,7 +565,8 @@ public void ensureNotCancelled() {
private static ClusterInfo createClusterInfo(ClusterState state) {
// we make a simple setup to detect the right decisions are made. The unmovable calculation is tested in more detail elsewhere.
// the diskusage is set such that the disk threshold decider never rejects an allocation.
Map<String, DiskUsage> diskUsages = StreamSupport.stream(state.nodes().spliterator(), false)
Map<String, DiskUsage> diskUsages = state.nodes()
.stream()
.collect(Collectors.toMap(DiscoveryNode::getId, node -> new DiskUsage(node.getId(), null, "the_path", 1000, 1000)));
ImmutableOpenMap<String, DiskUsage> immutableDiskUsages = ImmutableOpenMap.<String, DiskUsage>builder().putAll(diskUsages).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -241,7 +240,7 @@ public void testSizeOf() {
);
ShardRouting primaryShard = subjectRoutings.primaryShard();
ShardRouting replicaShard = subjectRoutings.replicaShards().get(0);
DiscoveryNode[] nodes = StreamSupport.stream(initialClusterState.nodes().spliterator(), false).toArray(DiscoveryNode[]::new);
DiscoveryNode[] nodes = initialClusterState.nodes().toArray(DiscoveryNode[]::new);
boolean useReplica = randomBoolean();
if (useReplica || randomBoolean()) {
startShard(allocation, primaryShard, nodes[0].getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@SuppressWarnings("HiddenField")
public class XPackPlugin extends XPackClientPlugin
Expand Down Expand Up @@ -262,7 +261,7 @@ public static boolean isReadyForXPackCustomMetadata(ClusterState clusterState) {
*/
public static List<DiscoveryNode> nodesNotReadyForXPackCustomMetadata(ClusterState clusterState) {
// check that all nodes would be capable of deserializing newly added x-pack metadata
final List<DiscoveryNode> notReadyNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false).filter(node -> {
final List<DiscoveryNode> notReadyNodes = clusterState.nodes().stream().filter(node -> {
final String xpackInstalledAttr = node.getAttributes().getOrDefault(XPACK_INSTALLED_NODE_ATTR, "false");
return Booleans.parseBoolean(xpackInstalledAttr) == false;
}).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.junit.Before;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;

import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -78,7 +78,7 @@ protected void setInitialState(License license, XPackLicenseState licenseState,
when(state.metadata()).thenReturn(metadata);
final DiscoveryNode mockNode = getLocalNode();
when(discoveryNodes.getMasterNode()).thenReturn(mockNode);
when(discoveryNodes.spliterator()).thenReturn(Arrays.asList(mockNode).spliterator());
when(discoveryNodes.stream()).thenAnswer(invocation -> Stream.of(mockNode));
when(discoveryNodes.isLocalNodeElectedMaster()).thenReturn(false);
when(discoveryNodes.getMinNodeVersion()).thenReturn(mockNode.getVersion());
when(state.nodes()).thenReturn(discoveryNodes);
Expand Down
Loading