Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@ ContainerPlacementStatus validateContainerPlacement(
*/
Set<Replica> replicasToCopyToFixMisreplication(
Map<Replica, Boolean> replicas);

/**
 * Given a set of replicas of a container which are overreplicated,
 * return a set of replicas to delete to fix overreplication.
 *
 * @param replicas Set of existing replicas of the container
 * @param expectedCountPerUniqueReplica Expected number of copies of each
 *        unique replica index
 * @return Set of replicas to remove to fix the overreplication
 */
Set<Replica> replicasToRemoveToFixOverreplication(
    Set<Replica> replicas, int expectedCountPerUniqueReplica);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand All @@ -38,9 +39,13 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -503,4 +508,89 @@ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
protected Node getPlacementGroup(DatanodeDetails dn) {
return nodeManager.getClusterNetworkTopologyMap().getAncestor(dn, 1);
}

/**
 * Given a set of replicas of a container which are overreplicated,
 * computes the set of replicas to delete to fix the overreplication.
 *
 * The algorithm first builds three lookup structures: a replicaIdMap
 * grouping the replicas by replica index, a placementGroupReplicaIdMap
 * grouping the replicas by rack and, within each rack, by replica index,
 * and a placementGroupCntMap tracking the total number of replicas per
 * rack. The overreplicated replica indexes are then processed in
 * descending order of their current replica count. For each index,
 * replicas are removed one at a time from the rack that currently holds
 * the most replicas (candidate racks are kept in a priority queue
 * ordered by their replica count), updating all three structures after
 * every removal, until the index is no longer overreplicated.
 *
 * @param replicas Set of existing replicas of the container
 * @param expectedCountPerUniqueReplica Expected number of copies of each
 *        unique replica index
 * @return Set of replicas whose removal fixes the overreplication
 */
@Override
public Set<ContainerReplica> replicasToRemoveToFixOverreplication(
    Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
  // Replica index -> replicas carrying that index.
  Map<Integer, Set<ContainerReplica>> replicaIdMap = new HashMap<>();
  // Rack -> (replica index -> replicas of that index on the rack).
  Map<Node, Map<Integer, Set<ContainerReplica>>> placementGroupReplicaIdMap
      = new HashMap<>();
  // Rack -> total number of replicas on that rack.
  Map<Node, Integer> placementGroupCntMap = new HashMap<>();
  for (ContainerReplica replica : replicas) {
    Integer replicaId = replica.getReplicaIndex();
    Node placementGroup = getPlacementGroup(replica.getDatanodeDetails());
    replicaIdMap.computeIfAbsent(replicaId, (rid) -> Sets.newHashSet())
        .add(replica);
    placementGroupCntMap.compute(placementGroup,
        (group, cnt) -> (cnt == null ? 0 : cnt) + 1);
    placementGroupReplicaIdMap.computeIfAbsent(placementGroup,
        (pg) -> Maps.newHashMap()).computeIfAbsent(replicaId, (rid) ->
        Sets.newHashSet()).add(replica);
  }

  Set<ContainerReplica> replicasToRemove = new HashSet<>();
  // Overreplicated indexes only, most-replicated first.
  List<Integer> sortedRIDList = replicaIdMap.keySet().stream()
      .filter(rid -> replicaIdMap.get(rid).size() >
          expectedCountPerUniqueReplica)
      .sorted((o1, o2) -> Integer.compare(replicaIdMap.get(o2).size(),
          replicaIdMap.get(o1).size()))
      .collect(Collectors.toList());
  for (Integer rid : sortedRIDList) {
    if (replicaIdMap.get(rid).size() <= expectedCountPerUniqueReplica) {
      break;
    }
    // Racks holding this index, ordered by their current total replica
    // count (largest first). The comparator reads placementGroupCntMap
    // live; this stays consistent because only the polled rack's count
    // is modified, and it is re-inserted afterwards if still relevant.
    Queue<Node> pq = new PriorityQueue<>((o1, o2) ->
        Integer.compare(placementGroupCntMap.get(o2),
            placementGroupCntMap.get(o1)));
    pq.addAll(placementGroupReplicaIdMap.entrySet()
        .stream()
        .filter(nodeMapEntry -> nodeMapEntry.getValue().containsKey(rid))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList()));

    while (replicaIdMap.get(rid).size() > expectedCountPerUniqueReplica) {
      // Take one replica away from the currently most-loaded rack.
      Node rack = pq.poll();
      Set<ContainerReplica> replicaSet =
          placementGroupReplicaIdMap.get(rack).get(rid);
      if (replicaSet.size() > 0) {
        ContainerReplica r = replicaSet.stream().findFirst().get();
        replicasToRemove.add(r);
        replicaSet.remove(r);
        replicaIdMap.get(rid).remove(r);
        placementGroupCntMap.compute(rack,
            (group, cnt) -> (cnt == null ? 0 : cnt) - 1);
        if (replicaSet.size() == 0) {
          // Rack has no more replicas of this index; drop the entry so
          // the rack is not considered for this index again.
          placementGroupReplicaIdMap.get(rack).remove(rid);
        } else {
          // Rack may still hold the most replicas; requeue it with its
          // decremented count.
          pq.add(rack);
        }
      }
    }
  }
  return replicasToRemove;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,20 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.mockito.Mockito;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import java.util.function.Function;
import java.util.stream.Stream;

/**
* Test functions of SCMCommonPlacementPolicy.
Expand Down Expand Up @@ -286,13 +289,162 @@ public void testReplicasWithoutMisreplication() {
Assertions.assertEquals(0, replicasToCopy.size());
}

@Test
public void testReplicasToRemoveWithOneOverreplication() {
  DummyPlacementPolicy dummyPlacementPolicy =
      new DummyPlacementPolicy(nodeManager, conf, 5);
  List<DatanodeDetails> list = nodeManager.getAllNodes();
  // Replicas with indexes 1..5 on dn1..dn5, plus an extra copy of
  // index 1 on dn7 -> index 1 is overreplicated by one.
  Set<ContainerReplica> replicas = Sets.newHashSet(
      HddsTestUtils.getReplicasWithReplicaIndex(
          new ContainerID(1), CLOSED, 0, 0, 0, list.subList(1, 6)));
  ContainerReplica replica = ContainerReplica.newBuilder()
      .setContainerID(new ContainerID(1))
      .setContainerState(CLOSED)
      .setReplicaIndex(1)
      .setDatanodeDetails(list.get(7)).build();
  replicas.add(replica);

  Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
      .replicasToRemoveToFixOverreplication(replicas, 1);
  // JUnit 5 assertEquals takes (expected, actual) in that order.
  Assertions.assertEquals(1, replicasToRemove.size());
  Assertions.assertEquals(replica, replicasToRemove.iterator().next());
}

@Test
public void testReplicasToRemoveWithTwoOverreplication() {
  DummyPlacementPolicy dummyPlacementPolicy =
      new DummyPlacementPolicy(nodeManager, conf, 5);
  List<DatanodeDetails> list = nodeManager.getAllNodes();

  // Base set: indexes 1..5 on dn1..dn5.
  Set<ContainerReplica> replicas = Sets.newHashSet(
      HddsTestUtils.getReplicasWithReplicaIndex(
          new ContainerID(1), CLOSED, 0, 0, 0, list.subList(1, 6)));

  // Two extra copies (indexes 1 and 2) on dn7..dn8 that should be the
  // ones selected for removal.
  Set<ContainerReplica> replicasToBeRemoved = Sets.newHashSet(
      HddsTestUtils.getReplicasWithReplicaIndex(
          new ContainerID(1), CLOSED, 0, 0, 0, list.subList(7, 9)));
  replicas.addAll(replicasToBeRemoved);

  Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
      .replicasToRemoveToFixOverreplication(replicas, 1);
  // JUnit 5 assertEquals takes (expected, actual) in that order.
  Assertions.assertEquals(2, replicasToRemove.size());
  Assertions.assertEquals(replicasToBeRemoved, replicasToRemove);
}

@Test
public void testReplicasToRemoveWith2CountPerUniqueReplica() {
  DummyPlacementPolicy dummyPlacementPolicy =
      new DummyPlacementPolicy(nodeManager, conf, 3);
  List<DatanodeDetails> list = nodeManager.getAllNodes();

  // Two healthy copies of each index 1..3 (expected count is 2).
  Set<ContainerReplica> replicas = Sets.newHashSet(
      HddsTestUtils.getReplicasWithReplicaIndex(
          new ContainerID(1), CLOSED, 0, 0, 0, list.subList(0, 3)));
  replicas.addAll(HddsTestUtils.getReplicasWithReplicaIndex(
      new ContainerID(1), CLOSED, 0, 0, 0, list.subList(3, 6)));
  // Two additional copies of index 1 on dn7/dn8 -> index 1 has 4 copies.
  Set<ContainerReplica> replicasToBeRemoved = Sets.newHashSet(
      HddsTestUtils.getReplicaBuilder(new ContainerID(1), CLOSED, 0, 0, 0,
          list.get(7).getUuid(), list.get(7))
          .setReplicaIndex(1).build(),
      HddsTestUtils.getReplicaBuilder(new ContainerID(1), CLOSED, 0, 0, 0,
          list.get(8).getUuid(), list.get(8)).setReplicaIndex(1)
          .build());
  replicas.addAll(replicasToBeRemoved);

  Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
      .replicasToRemoveToFixOverreplication(replicas, 2);
  // JUnit 5 assertEquals takes (expected, actual) in that order.
  Assertions.assertEquals(2, replicasToRemove.size());
  Assertions.assertEquals(replicasToBeRemoved, replicasToRemove);
}

@Test
public void testReplicasToRemoveWithoutReplicaIndex() {
  DummyPlacementPolicy dummyPlacementPolicy =
      new DummyPlacementPolicy(nodeManager, conf, 3);
  List<DatanodeDetails> list = nodeManager.getAllNodes();

  // Five index-less (Ratis-style) replicas on dn0..dn4, expected count 3
  // -> two must be removed, and they should come from the most-loaded
  // racks (racks 0 and 1 under the round-robin dn -> rack mapping).
  Set<ContainerReplica> replicas = Sets.newHashSet(HddsTestUtils.getReplicas(
      new ContainerID(1), CLOSED, 0, list.subList(0, 5)));

  Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
      .replicasToRemoveToFixOverreplication(replicas, 3);
  // JUnit 5 assertEquals takes (expected, actual) in that order.
  Assertions.assertEquals(2, replicasToRemove.size());
  Set<Node> racksToBeRemoved = Stream.of(0, 1)
      .map(dummyPlacementPolicy.racks::get).collect(Collectors.toSet());
  Assertions.assertEquals(racksToBeRemoved, replicasToRemove.stream()
      .map(ContainerReplica::getDatanodeDetails)
      .map(dummyPlacementPolicy::getPlacementGroup)
      .collect(Collectors.toSet()));
}

@Test
public void testReplicasToRemoveWithOverreplicationWithinSameRack() {
  DummyPlacementPolicy dummyPlacementPolicy =
      new DummyPlacementPolicy(nodeManager, conf, 3);
  List<DatanodeDetails> list = nodeManager.getAllNodes();

  // Indexes 1..5 on dn1..dn5, plus two extra copies of index 1 on dn6
  // and dn0 (both mapped to rack 0) -> index 1 has three copies and two
  // of them share rack 0.
  Set<ContainerReplica> replicas = Sets.newHashSet(
      HddsTestUtils.getReplicasWithReplicaIndex(
          new ContainerID(1), CLOSED, 0, 0, 0, list.subList(1, 6)));

  ContainerReplica replica1 = ContainerReplica.newBuilder()
      .setContainerID(new ContainerID(1))
      .setContainerState(CLOSED)
      .setReplicaIndex(1)
      .setDatanodeDetails(list.get(6)).build();
  replicas.add(replica1);
  ContainerReplica replica2 = ContainerReplica.newBuilder()
      .setContainerID(new ContainerID(1))
      .setContainerState(CLOSED)
      .setReplicaIndex(1)
      .setDatanodeDetails(list.get(0)).build();
  replicas.add(replica2);

  Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
      .replicasToRemoveToFixOverreplication(replicas, 1);
  Map<Node, Long> removedReplicasRackCntMap = replicasToRemove.stream()
      .map(ContainerReplica::getDatanodeDetails)
      .map(dummyPlacementPolicy::getPlacementGroup)
      .collect(Collectors.groupingBy(Function.identity(),
          Collectors.counting()));
  // JUnit 5 assertEquals takes (expected, actual) in that order.
  Assertions.assertEquals(2, replicasToRemove.size());
  // The most-loaded rack (rack 0) must lose at least one replica; the
  // second removal comes from rack 0 or rack 1 depending on tie-break.
  Long rack0Removals = removedReplicasRackCntMap
      .get(dummyPlacementPolicy.racks.get(0));
  Assertions.assertTrue(rack0Removals != null
      && (rack0Removals == 1 || rack0Removals == 2));
  // Exactly two removals in total, split between racks 0 and 1.
  // (Comparing with getOrDefault avoids the null-vs-0 mismatch when all
  // removals came from rack 0.)
  Assertions.assertEquals(2 - rack0Removals, (long) removedReplicasRackCntMap
      .getOrDefault(dummyPlacementPolicy.racks.get(1), 0L));
}

@Test
public void testReplicasToRemoveWithNoOverreplication() {
  DummyPlacementPolicy dummyPlacementPolicy =
      new DummyPlacementPolicy(nodeManager, conf, 5);
  List<DatanodeDetails> list = nodeManager.getAllNodes();
  // Exactly one copy of each index 1..5 -> nothing is overreplicated.
  Set<ContainerReplica> replicas = Sets.newHashSet(
      HddsTestUtils.getReplicasWithReplicaIndex(
          new ContainerID(1), CLOSED, 0, 0, 0, list.subList(1, 6)));

  Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
      .replicasToRemoveToFixOverreplication(replicas, 1);
  // JUnit 5 assertEquals takes (expected, actual) in that order.
  Assertions.assertEquals(0, replicasToRemove.size());
}



private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
private Map<DatanodeDetails, Node> rackMap;
private List<Node> racks;
private int rackCnt;


/**
* Creates Dummy Placement Policy with dn index to rack Mapping
* in round robin fashion (rack Index = dn Index % total number of racks).
* @param nodeManager
* @param conf
* @param rackCnt
*/
DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
int rackCnt) {
this(nodeManager, conf,
Expand All @@ -301,6 +453,12 @@ private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
idx -> idx % rackCnt)), rackCnt);
}

/**
* Creates Dummy Placement Policy with dn index -> rack index mapping.
* @param nodeManager
* @param conf
* @param rackCnt
*/
DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
Map<Integer, Integer> datanodeRackMap, int rackCnt) {
super(nodeManager, conf);
Expand All @@ -322,7 +480,7 @@ public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
}

@Override
public Node getPlacementGroup(DatanodeDetails dn) {
protected Node getPlacementGroup(DatanodeDetails dn) {
return rackMap.get(dn);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
Map<ContainerReplica, Boolean> replicas) {
return Collections.emptySet();
}

@Override
public Set<ContainerReplica> replicasToRemoveToFixOverreplication(
    Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
  // Test stub: return an empty set instead of null, consistent with the
  // sibling replicasToCopyToFixMisreplication stub, so callers that
  // iterate the result do not NPE.
  return Collections.emptySet();
}

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
}


@Override
public Set<ContainerReplica> replicasToRemoveToFixOverreplication(
    Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
  // Test stub — overreplication handling is not exercised by this
  // placement-policy double.
  // NOTE(review): returning null instead of an empty set will NPE any
  // caller that iterates the result; consider Collections.emptySet().
  return null;
}

private boolean isDnPresent(List<DatanodeDetails> dns) {
for (DatanodeDetails dn : dns) {
if (misRepWhenDnPresent != null
Expand Down