org/apache/hadoop/hdds/scm/container/states/ContainerEntry.java (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.container.states;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;

/**
* The entry ({@link ContainerInfo} and {@link ContainerReplica}s)
* for a container in {@link ContainerStateMap}.
*/
public class ContainerEntry {
private final ContainerInfo info;
private final Map<DatanodeID, ContainerReplica> replicas = new HashMap<>();

ContainerEntry(ContainerInfo info) {
this.info = info;
}

public ContainerInfo getInfo() {
return info;
}

public Set<ContainerReplica> getReplicas() {
return new HashSet<>(replicas.values());
}

public ContainerReplica put(ContainerReplica r) {
return replicas.put(r.getDatanodeDetails().getID(), r);
}

public ContainerReplica removeReplica(DatanodeID datanodeID) {
return replicas.remove(datanodeID);
}
}
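
A minimal usage sketch of the extracted class (not part of the change): it assumes a ContainerInfo info and a ContainerReplica replica obtained elsewhere, and since the constructor is package-private it would have to live in org.apache.hadoop.hdds.scm.container.states.

// Sketch only: exercises the replica bookkeeping defined above.
ContainerEntry entry = new ContainerEntry(info);

// put() keys replicas by DatanodeID: the first put() for a datanode
// returns null, and a later put() for the same datanode replaces the
// old replica and returns it.
ContainerReplica previous = entry.put(replica);

// getReplicas() returns a defensive HashSet copy, so later put() or
// removeReplica() calls on the entry do not mutate this snapshot.
Set<ContainerReplica> snapshot = entry.getReplicas();

// removeReplica() returns the removed replica, or null if that
// datanode held no replica of this container.
ContainerReplica removed = entry.removeReplica(replica.getDatanodeDetails().getID());
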
org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -17,10 +17,7 @@

package org.apache.hadoop.hdds.scm.container.states;

- import java.util.HashMap;
- import java.util.HashSet;
import java.util.List;
- import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
@@ -78,58 +75,33 @@ public class ContainerStateMap {

/**
* Two levels map.
- * Outer container map: {@link ContainerID} -> {@link Entry} (info and replicas)
+ * Outer container map: {@link ContainerID} -> {@link ContainerEntry} (info and replicas)
* Inner replica map: {@link DatanodeID} -> {@link ContainerReplica}
*/
private static class ContainerMap {
- private static class Entry {
- private final ContainerInfo info;
- private final Map<DatanodeID, ContainerReplica> replicas = new HashMap<>();
-
- Entry(ContainerInfo info) {
- this.info = info;
- }
-
- ContainerInfo getInfo() {
- return info;
- }
-
- Set<ContainerReplica> getReplicas() {
- return new HashSet<>(replicas.values());
- }
-
- ContainerReplica put(ContainerReplica r) {
- return replicas.put(r.getDatanodeDetails().getID(), r);
- }
-
- ContainerReplica removeReplica(DatanodeID datanodeID) {
- return replicas.remove(datanodeID);
- }
- }
-
- private final NavigableMap<ContainerID, Entry> map = new TreeMap<>();
+ private final NavigableMap<ContainerID, ContainerEntry> map = new TreeMap<>();

boolean contains(ContainerID id) {
return map.containsKey(id);
}

ContainerInfo getInfo(ContainerID id) {
- final Entry entry = map.get(id);
+ final ContainerEntry entry = map.get(id);
return entry == null ? null : entry.getInfo();
}

List<ContainerInfo> getInfos(ContainerID start, int count) {
Objects.requireNonNull(start, "start == null");
Preconditions.assertTrue(count >= 0, "count < 0");
return map.tailMap(start).values().stream()
- .map(Entry::getInfo)
+ .map(ContainerEntry::getInfo)
.limit(count)
.collect(Collectors.toList());
}

Set<ContainerReplica> getReplicas(ContainerID id) {
Objects.requireNonNull(id, "id == null");
- final Entry entry = map.get(id);
+ final ContainerEntry entry = map.get(id);
return entry == null ? null : entry.getReplicas();
}

@@ -144,27 +116,27 @@ boolean addIfAbsent(ContainerInfo info) {
if (map.containsKey(id)) {
return false; // already exists
}
- final Entry previous = map.put(id, new Entry(info));
+ final ContainerEntry previous = map.put(id, new ContainerEntry(info));
Preconditions.assertNull(previous, "previous");
return true;
}

ContainerReplica put(ContainerReplica replica) {
Objects.requireNonNull(replica, "replica == null");
- final Entry entry = map.get(replica.getContainerID());
+ final ContainerEntry entry = map.get(replica.getContainerID());
return entry == null ? null : entry.put(replica);
}

ContainerInfo remove(ContainerID id) {
Objects.requireNonNull(id, "id == null");
- final Entry removed = map.remove(id);
+ final ContainerEntry removed = map.remove(id);
return removed == null ? null : removed.getInfo();
}

ContainerReplica removeReplica(ContainerID containerID, DatanodeID datanodeID) {
Objects.requireNonNull(containerID, "containerID == null");
Objects.requireNonNull(datanodeID, "datanodeID == null");
- final Entry entry = map.get(containerID);
+ final ContainerEntry entry = map.get(containerID);
return entry == null ? null : entry.removeReplica(datanodeID);
}
}
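To spell out the two-level contract described in the javadoc above, here is a hedged sketch of a replica removal inside the private ContainerMap; the local variable names are illustrative, but the null propagation mirrors the methods shown:

// Level 1: ContainerID -> ContainerEntry. The TreeMap keeps ids sorted,
// which is what lets getInfos() stream a tailMap() from a start id.
ContainerEntry entry = map.get(containerID);

// Level 2: DatanodeID -> ContainerReplica, encapsulated by ContainerEntry.
// An unknown container id short-circuits to null instead of throwing.
ContainerReplica removed = entry == null ? null : entry.removeReplica(datanodeID);

Note that put(replica) likewise returns null when the container was never registered via addIfAbsent(), so callers cannot tell "no previous replica" from "unknown container" by the return value alone.
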
org/apache/hadoop/hdds/scm/node/states/DatanodeEntry.java (new file)
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.node.states;

import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;

/**
* The entry ({@link DatanodeInfo} and {@link ContainerID}s)
* for a datanode in {@link NodeStateMap}.
*/
public class DatanodeEntry {
private final DatanodeInfo info;
private final Set<ContainerID> containers = new TreeSet<>();

DatanodeEntry(DatanodeInfo info) {
this.info = info;
}

public DatanodeInfo getInfo() {
return info;
}

public int getContainerCount() {
return containers.size();
}

public Set<ContainerID> copyContainers() {
return new TreeSet<>(containers);
}

public void add(ContainerID containerId) {
containers.add(containerId);
}

public void remove(ContainerID containerID) {
containers.remove(containerID);
}

public void setContainersForTesting(Set<ContainerID> newContainers) {
containers.clear();
containers.addAll(newContainers);
}
}
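
As with ContainerEntry, a small sketch of the intended bookkeeping, assuming a DatanodeInfo datanodeInfo and a ContainerID containerId obtained elsewhere (the constructor is again package-private):

DatanodeEntry entry = new DatanodeEntry(datanodeInfo);
entry.add(containerId);

// copyContainers() returns a fresh TreeSet: iteration follows the natural
// ContainerID order, and the snapshot is isolated from later mutations.
Set<ContainerID> snapshot = entry.copyContainers();

entry.remove(containerId);
// Only the entry changed; the earlier snapshot still holds the id.
assert snapshot.contains(containerId);
assert entry.getContainerCount() == 0;
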
org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
@@ -21,7 +21,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
- import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
@@ -34,51 +33,13 @@
import org.apache.hadoop.hdds.scm.node.NodeStatus;

/**
- * Maintains the state of datanodes in SCM. This class should only be used by
- * NodeStateManager to maintain the state. If anyone wants to change the
- * state of a node they should call NodeStateManager, do not directly use
- * this class.
+ * Map: {@link DatanodeID} to {@link DatanodeEntry}.
* <p>
- * Concurrency consideration:
- * - thread-safe
+ * This class is thread-safe.
*/
public class NodeStateMap {
- private static class Entry {
- private final DatanodeInfo info;
- private final Set<ContainerID> containers = new TreeSet<>();
-
- Entry(DatanodeInfo info) {
- this.info = info;
- }
-
- DatanodeInfo getInfo() {
- return info;
- }
-
- int getContainerCount() {
- return containers.size();
- }
-
- Set<ContainerID> copyContainers() {
- return new TreeSet<>(containers);
- }
-
- void add(ContainerID containerId) {
- containers.add(containerId);
- }
-
- void remove(ContainerID containerID) {
- containers.remove(containerID);
- }
-
- void setContainersForTesting(Set<ContainerID> newContainers) {
- containers.clear();
- containers.addAll(newContainers);
- }
- }

- /** Map: {@link DatanodeID} -> ({@link DatanodeInfo}, {@link ContainerID}s). */
- private final Map<DatanodeID, Entry> nodeMap = new HashMap<>();
+ private final Map<DatanodeID, DatanodeEntry> nodeMap = new HashMap<>();

private final ReadWriteLock lock = new ReentrantReadWriteLock();

@@ -99,7 +60,7 @@ public void addNode(DatanodeInfo datanode) throws NodeAlreadyExistsException {
if (nodeMap.containsKey(id)) {
throw new NodeAlreadyExistsException(id);
}
- nodeMap.put(id, new Entry(datanode));
+ nodeMap.put(id, new DatanodeEntry(datanode));
} finally {
lock.writeLock().unlock();
}
@@ -131,7 +92,7 @@ public DatanodeInfo updateNode(DatanodeInfo datanode) throws NodeNotFoundException {
if (oldInfo == null) {
throw new NodeNotFoundException(id);
}
- nodeMap.put(id, new Entry(datanode));
+ nodeMap.put(id, new DatanodeEntry(datanode));
} finally {
lock.writeLock().unlock();
}
@@ -212,7 +173,7 @@ public List<DatanodeInfo> getAllDatanodeInfos() {
lock.readLock().lock();
try {
return nodeMap.values().stream()
- .map(Entry::getInfo)
+ .map(DatanodeEntry::getInfo)
.collect(Collectors.toList());
} finally {
lock.readLock().unlock();
@@ -383,8 +344,8 @@ public String toString() {
* @return the entry mapping to the given id.
* @throws NodeNotFoundException If the node is missing.
*/
- private Entry getExisting(DatanodeID id) throws NodeNotFoundException {
- final Entry entry = nodeMap.get(id);
+ private DatanodeEntry getExisting(DatanodeID id) throws NodeNotFoundException {
+ final DatanodeEntry entry = nodeMap.get(id);
if (entry == null) {
throw new NodeNotFoundException(id);
}
@@ -396,7 +357,7 @@ private int countNodes(Predicate<DatanodeInfo> filter) {
lock.readLock().lock();
try {
count = nodeMap.values().stream()
- .map(Entry::getInfo)
+ .map(DatanodeEntry::getInfo)
.filter(filter)
.count();
} finally {
@@ -412,7 +373,7 @@ private List<DatanodeInfo> filterNodes(Predicate<DatanodeInfo> filter) {
lock.readLock().lock();
try {
return nodeMap.values().stream()
- .map(Entry::getInfo)
+ .map(DatanodeEntry::getInfo)
.filter(filter)
.collect(Collectors.toList());
} finally {
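
The read paths above (getAllDatanodeInfos, countNodes, filterNodes) share one detail that is easy to miss in the diff: the stream's terminal operation runs before the read lock is released. A sketch restating that discipline, with the pitfall named in the comments (the method name is hypothetical; it is essentially filterNodes):

private List<DatanodeInfo> snapshotInfos(Predicate<DatanodeInfo> filter) {
  lock.readLock().lock();
  try {
    // collect() must run inside try/finally: returning the lazy Stream
    // itself would let callers iterate nodeMap after the read lock is
    // released, racing concurrent addNode()/updateNode() calls.
    return nodeMap.values().stream()
        .map(DatanodeEntry::getInfo)     // expose only the DatanodeInfo
        .filter(filter)
        .collect(Collectors.toList());   // terminal op while still locked
  } finally {
    lock.readLock().unlock();
  }
}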