@@ -23,10 +23,10 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -356,7 +356,7 @@ private void updateContainerReplica(final DatanodeDetails datanodeDetails,
.setContainerID(containerId)
.setContainerState(replicaProto.getState())
.setDatanodeDetails(datanodeDetails)
.setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
.setOriginNodeId(DatanodeID.fromUuidString(replicaProto.getOriginNodeId()))
.setSequenceId(replicaProto.getBlockCommitSequenceId())
.setKeyCount(replicaProto.getKeyCount())
.setReplicaIndex(replicaProto.getReplicaIndex())
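Not part of the diff: a minimal sketch of the type change in this hunk, assuming (as the new call implies) that the replica proto still serializes the origin node as a UUID string and that DatanodeID.fromUuidString accepts that same representation. The originString value is made up.

import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeID;

class OriginIdParsingSketch {
  static void sketch() {
    // Hypothetical origin id as returned by ContainerReplicaProto.getOriginNodeId().
    String originString = "123e4567-e89b-12d3-a456-426614174000";
    UUID legacyOrigin = UUID.fromString(originString);                // representation before this change
    DatanodeID typedOrigin = DatanodeID.fromUuidString(originString); // typed wrapper used after this change
  }
}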
@@ -17,13 +17,12 @@

package org.apache.hadoop.hdds.scm.container;

import com.google.common.base.Preconditions;
import java.util.Optional;
import java.util.UUID;
import java.util.Objects;
import org.apache.commons.lang3.builder.CompareToBuilder;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;

/**
@@ -34,7 +33,12 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
private final ContainerID containerID;
private final ContainerReplicaProto.State state;
private final DatanodeDetails datanodeDetails;
private final UUID placeOfBirth;
/**
* The datanode where this replica was originally created;
* null means the origin is the same as {@link #datanodeDetails}.
*/
private final DatanodeID originDatanodeId;
/** The position of this replica in the pipeline. */
private final int replicaIndex;

private final Long sequenceId;
@@ -43,15 +47,15 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
private final boolean isEmpty;

private ContainerReplica(ContainerReplicaBuilder b) {
containerID = b.containerID;
state = b.state;
datanodeDetails = b.datanode;
placeOfBirth = Optional.ofNullable(b.placeOfBirth).orElse(datanodeDetails.getUuid());
keyCount = b.keyCount;
bytesUsed = b.bytesUsed;
replicaIndex = b.replicaIndex;
isEmpty = b.isEmpty;
sequenceId = b.sequenceId;
this.containerID = Objects.requireNonNull(b.containerID, "containerID == null");
this.state = Objects.requireNonNull(b.state, "state == null");
this.datanodeDetails = Objects.requireNonNull(b.datanode, "datanode == null");
this.originDatanodeId = b.placeOfBirth;
this.keyCount = b.keyCount;
this.bytesUsed = b.bytesUsed;
this.replicaIndex = b.replicaIndex;
this.isEmpty = b.isEmpty;
this.sequenceId = b.sequenceId;
}

public ContainerID getContainerID() {
@@ -72,8 +76,8 @@ public DatanodeDetails getDatanodeDetails() {
*
* @return the origin {@link DatanodeID}
*/
public UUID getOriginDatanodeId() {
return placeOfBirth;
public DatanodeID getOriginDatanodeId() {
return originDatanodeId != null ? originDatanodeId : datanodeDetails.getID();
}

/**
@@ -144,7 +148,7 @@ public boolean equals(Object o) {

@Override
public int compareTo(ContainerReplica that) {
Preconditions.checkNotNull(that);
Objects.requireNonNull(that);
return new CompareToBuilder()
.append(this.containerID, that.containerID)
.append(this.datanodeDetails, that.datanodeDetails)
@@ -167,26 +171,24 @@ public ContainerReplicaBuilder toBuilder() {
.setContainerState(state)
.setDatanodeDetails(datanodeDetails)
.setKeyCount(keyCount)
.setOriginNodeId(placeOfBirth)
.setOriginNodeId(originDatanodeId)
.setReplicaIndex(replicaIndex)
.setSequenceId(sequenceId)
.setEmpty(isEmpty);
}

@Override
public String toString() {
return "ContainerReplica{" +
"containerID=" + containerID +
", state=" + state +
", datanodeDetails=" + datanodeDetails +
", placeOfBirth=" + placeOfBirth +
", sequenceId=" + sequenceId +
", keyCount=" + keyCount +
", bytesUsed=" + bytesUsed + ((replicaIndex > 0) ?
",replicaIndex=" + replicaIndex :
"") +
", isEmpty=" + isEmpty +
'}';
return "ContainerReplica{" + containerID
+ " (" + state
+ ") currentDN=" + datanodeDetails
+ (originDatanodeId != null ? ", originDN=" + originDatanodeId : " (origin)")
+ ", bcsid=" + sequenceId
+ (replicaIndex > 0 ? ", replicaIndex=" + replicaIndex : "")
+ ", keyCount=" + keyCount
+ ", bytesUsed=" + bytesUsed
+ ", " + (isEmpty ? "empty" : "non-empty")
+ '}';
}

/**
@@ -197,7 +199,7 @@ public static class ContainerReplicaBuilder {
private ContainerID containerID;
private ContainerReplicaProto.State state;
private DatanodeDetails datanode;
private UUID placeOfBirth;
private DatanodeID placeOfBirth;
private Long sequenceId;
private long bytesUsed;
private long keyCount;
@@ -246,7 +248,7 @@ public ContainerReplicaBuilder setReplicaIndex(
* @param originNodeId origin node ID
* @return ContainerReplicaBuilder
*/
public ContainerReplicaBuilder setOriginNodeId(UUID originNodeId) {
public ContainerReplicaBuilder setOriginNodeId(DatanodeID originNodeId) {
placeOfBirth = originNodeId;
return this;
}
@@ -283,12 +285,6 @@ public ContainerReplicaBuilder setEmpty(boolean empty) {
* @return ContainerReplicaBuilder
*/
public ContainerReplica build() {
Preconditions.checkNotNull(containerID,
"Container Id can't be null");
Preconditions.checkNotNull(state,
"Container state can't be null");
Preconditions.checkNotNull(datanode,
"DatanodeDetails can't be null");
return new ContainerReplica(this);
}
}
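Not part of the diff: a rough usage sketch of the new null-origin fallback, assuming the usual ContainerReplica.newBuilder() and ContainerID.valueOf(long) factories exist outside this excerpt; DatanodeDetails.getID() is the accessor already used in getOriginDatanodeId() above.

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;

class OriginFallbackSketch {
  // Build a replica without calling setOriginNodeId: originDatanodeId stays null,
  // so getOriginDatanodeId() is expected to fall back to the hosting datanode's id.
  static boolean originDefaultsToSelf(DatanodeDetails dn) {
    ContainerReplica replica = ContainerReplica.newBuilder()
        .setContainerID(ContainerID.valueOf(1L))
        .setContainerState(ContainerReplicaProto.State.QUASI_CLOSED)
        .setDatanodeDetails(dn)
        .build();
    DatanodeID origin = replica.getOriginDatanodeId();
    return origin.equals(dn.getID());
  }
}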
@@ -24,7 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -34,18 +34,20 @@
*/
public class QuasiClosedStuckReplicaCount {

private final Map<UUID, Set<ContainerReplica>> replicasByOrigin = new HashMap<>();
private final Map<UUID, Set<ContainerReplica>> inServiceReplicasByOrigin = new HashMap<>();
private final Map<UUID, Set<ContainerReplica>> maintenanceReplicasByOrigin = new HashMap<>();
private boolean hasOutOfServiceReplicas = false;
private int minHealthyForMaintenance;
private boolean hasHealthyReplicas = false;
private final Map<DatanodeID, Set<ContainerReplica>> replicasByOrigin = new HashMap<>();
private final Map<DatanodeID, Set<ContainerReplica>> inServiceReplicasByOrigin = new HashMap<>();
private final Map<DatanodeID, Set<ContainerReplica>> maintenanceReplicasByOrigin = new HashMap<>();
private final int minHealthyForMaintenance;
private final boolean hasHealthyReplicas;
private final boolean hasOutOfServiceReplicas;

public QuasiClosedStuckReplicaCount(Set<ContainerReplica> replicas, int minHealthyForMaintenance) {
this.minHealthyForMaintenance = minHealthyForMaintenance;
boolean hasHealthy = false;
boolean hasOutOfService = false;
for (ContainerReplica r : replicas) {
if (r.getState() != StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY) {
hasHealthyReplicas = true;
hasHealthy = true;
}
replicasByOrigin.computeIfAbsent(r.getOriginDatanodeId(), k -> new HashSet<>()).add(r);
HddsProtos.NodeOperationalState opState = r.getDatanodeDetails().getPersistedOpState();
@@ -54,11 +56,14 @@ public QuasiClosedStuckReplicaCount(Set<ContainerReplica> replicas, int minHealt
} else if (opState == HddsProtos.NodeOperationalState.IN_MAINTENANCE
|| opState == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE) {
maintenanceReplicasByOrigin.computeIfAbsent(r.getOriginDatanodeId(), k -> new HashSet<>()).add(r);
hasOutOfServiceReplicas = true;
hasOutOfService = true;
} else {
hasOutOfServiceReplicas = true;
hasOutOfService = true;
}
}

this.hasHealthyReplicas = hasHealthy;
this.hasOutOfServiceReplicas = hasOutOfService;
}

public int availableOrigins() {
Expand All @@ -77,17 +82,23 @@ public boolean isUnderReplicated() {
return !getUnderReplicatedReplicas().isEmpty();
}

private Set<ContainerReplica> getInService(DatanodeID origin) {
final Set<ContainerReplica> set = inServiceReplicasByOrigin.get(origin);
return set == null ? Collections.emptySet() : set;
}

private int getMaintenanceCount(DatanodeID origin) {
final Set<ContainerReplica> maintenance = maintenanceReplicasByOrigin.get(origin);
return maintenance == null ? 0 : maintenance.size();
}

public List<MisReplicatedOrigin> getUnderReplicatedReplicas() {
List<MisReplicatedOrigin> misReplicatedOrigins = new ArrayList<>();

if (replicasByOrigin.size() == 1) {
Map.Entry<UUID, Set<ContainerReplica>> entry = replicasByOrigin.entrySet().iterator().next();
Set<ContainerReplica> inService = inServiceReplicasByOrigin.get(entry.getKey());
if (inService == null) {
inService = Collections.emptySet();
}
Set<ContainerReplica> maintenance = maintenanceReplicasByOrigin.get(entry.getKey());
int maintenanceCount = maintenance == null ? 0 : maintenance.size();
final Map.Entry<DatanodeID, Set<ContainerReplica>> entry = replicasByOrigin.entrySet().iterator().next();
final Set<ContainerReplica> inService = getInService(entry.getKey());
final int maintenanceCount = getMaintenanceCount(entry.getKey());

if (maintenanceCount > 0) {
if (inService.size() < minHealthyForMaintenance) {
@@ -105,13 +116,9 @@ public List<MisReplicatedOrigin> getUnderReplicatedReplicas() {

// If there are multiple origins, we expect 2 copies of each origin
// For maintenance, we expect 1 copy of each origin and ignore the minHealthyForMaintenance parameter
for (Map.Entry<UUID, Set<ContainerReplica>> entry : replicasByOrigin.entrySet()) {
Set<ContainerReplica> inService = inServiceReplicasByOrigin.get(entry.getKey());
if (inService == null) {
inService = Collections.emptySet();
}
Set<ContainerReplica> maintenance = maintenanceReplicasByOrigin.get(entry.getKey());
int maintenanceCount = maintenance == null ? 0 : maintenance.size();
for (Map.Entry<DatanodeID, Set<ContainerReplica>> entry : replicasByOrigin.entrySet()) {
final Set<ContainerReplica> inService = getInService(entry.getKey());
final int maintenanceCount = getMaintenanceCount(entry.getKey());

if (inService.size() < 2) {
if (maintenanceCount > 0) {
@@ -142,19 +149,19 @@ public boolean isOverReplicated() {
public List<MisReplicatedOrigin> getOverReplicatedOrigins() {
// If there is only a single origin, we expect 3 copies, otherwise we expect 2 copies of each origin
if (replicasByOrigin.size() == 1) {
UUID origin = replicasByOrigin.keySet().iterator().next();
Set<ContainerReplica> inService = inServiceReplicasByOrigin.get(origin);
if (inService != null && inService.size() > 3) {
final DatanodeID origin = replicasByOrigin.keySet().iterator().next();
final Set<ContainerReplica> inService = getInService(origin);
if (inService.size() > 3) {
return Collections.singletonList(new MisReplicatedOrigin(inService, inService.size() - 3));
}
return Collections.emptyList();
}

// If there are multiple origins, we expect 2 copies of each origin
List<MisReplicatedOrigin> overReplicatedOrigins = new ArrayList<>();
for (UUID origin : replicasByOrigin.keySet()) {
Set<ContainerReplica> replicas = inServiceReplicasByOrigin.get(origin);
if (replicas != null && replicas.size() > 2) {
for (DatanodeID origin : replicasByOrigin.keySet()) {
final Set<ContainerReplica> replicas = getInService(origin);
if (replicas.size() > 2) {
overReplicatedOrigins.add(new MisReplicatedOrigin(replicas, replicas.size() - 2));
}
}
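Not part of the diff: the expectations stated in the comments above, restated as a tiny helper for readability. This is a paraphrase of QuasiClosedStuckReplicaCount's logic, not an API it exposes; the class and method names are made up.

import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;

final class QuasiClosedStuckExpectationSketch {
  // Single-origin containers are expected to keep 3 in-service copies of that origin;
  // multi-origin containers are expected to keep 2 in-service copies of each origin
  // (and 1 per origin while replicas are in maintenance).
  static int expectedInServiceCopiesPerOrigin(Map<DatanodeID, Set<ContainerReplica>> replicasByOrigin) {
    return replicasByOrigin.size() == 1 ? 3 : 2;
  }
}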
@@ -30,10 +30,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -511,7 +511,7 @@ public List<ContainerReplica> getVulnerableUnhealthyReplicas(Function<DatanodeDe
vulnerable and need to be saved.
*/
// TODO should we also consider pending deletes?
Set<UUID> originsOfInServiceReplicas = new HashSet<>();
final Set<DatanodeID> originsOfInServiceReplicas = new HashSet<>();
for (ContainerReplica replica : replicas) {
if (replica.getDatanodeDetails().getPersistedOpState()
.equals(IN_SERVICE) && replica.getSequenceId().equals(container.getSequenceId())) {
@@ -25,10 +25,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -331,7 +331,7 @@ static List<ContainerReplica> findNonUniqueDeleteCandidates(
Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
// Gather the origin node IDs of replicas which are not candidates for
// deletion.
Set<UUID> existingOriginNodeIDs = allReplicas.stream()
final Set<DatanodeID> existingOriginNodeIDs = allReplicas.stream()
.filter(r -> !deleteCandidates.contains(r))
.filter(r -> {
NodeStatus status = nodeStatusFn.apply(r.getDatanodeDetails());
@@ -374,7 +374,7 @@ both the first and last replicas have the same origin node ID (and no
return nonUniqueDeleteCandidates;
}

private static void checkUniqueness(Set<UUID> existingOriginNodeIDs,
private static void checkUniqueness(Set<DatanodeID> existingOriginNodeIDs,
List<ContainerReplica> nonUniqueDeleteCandidates,
ContainerReplica replica) {
if (existingOriginNodeIDs.contains(replica.getOriginDatanodeId())) {
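Not part of the diff: the rule behind checkUniqueness, restated against the new DatanodeID key type. Only the fragment above is visible here, so this is a sketch of the visible condition, with made-up names.

import java.util.Set;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;

final class OriginUniquenessSketch {
  // A delete candidate is "non-unique" (and therefore removable) only when some
  // replica that is being kept already covers the same origin datanode.
  static boolean coveredByKeptReplica(Set<DatanodeID> existingOriginNodeIDs, ContainerReplica candidate) {
    return existingOriginNodeIDs.contains(candidate.getOriginDatanodeId());
  }
}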