Merged
@@ -20,10 +20,12 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaInfo;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
@@ -179,6 +181,14 @@ ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException;

/**
* Gets the lists of underReplicated and unClosed containers on a decommissioning node.
*
* @param dn - Datanode details
* @return Lists of underReplicated and unClosed containers
*/
Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException;
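
For reference, a minimal caller sketch of the new API (the client handle and printing are assumptions for illustration, not part of this patch; the map keys come from the monitor code further down):

// Sketch: query the under-replicated / unclosed containers for a decommissioning node.
Map<String, List<ContainerID>> byState = scmClient.getContainersOnDecomNode(datanode);
byState.forEach((state, ids) -> System.out.println(state + ": " + ids));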

/**
* Returns a set of Nodes that meet a query criteria. Passing null for opState
* or nodeState acts like a wild card, returning all nodes in that state.
@@ -19,6 +19,7 @@

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
@@ -220,6 +221,14 @@ List<ContainerInfo> listContainer(long startContainerID,
*/
void deleteContainer(long containerID) throws IOException;

/**
* Gets the lists of underReplicated and unClosed containers on a decommissioning node.
*
* @param dn - Datanode details
* @return Lists of underReplicated and unClosed containers
*/
Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException;

/**
* Queries a list of Node Statuses. Passing a null for either opState or
* state acts like a wildcard returning all nodes in that state.
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.GetScmInfoResponseProto;
@@ -55,6 +56,9 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerReplicasRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainersOnDecomNodeProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerTokenRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerTokenResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchRequestProto;
@@ -459,6 +463,23 @@ public void deleteContainer(long containerID)

}

@Override
public Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException {
GetContainersOnDecomNodeRequestProto request = GetContainersOnDecomNodeRequestProto.newBuilder()
.setDatanodeDetails(dn.getProtoBufMessage()).build();
GetContainersOnDecomNodeResponseProto response = submitRequest(Type.GetContainersOnDecomNode,
builder -> builder.setGetContainersOnDecomNodeRequest(request)).getGetContainersOnDecomNodeResponse();
Map<String, List<ContainerID>> containerMap = new HashMap<>();
for (ContainersOnDecomNodeProto containersProto : response.getContainersOnDecomNodeList()) {
List<ContainerID> containerIds = new ArrayList<>();
for (HddsProtos.ContainerID id : containersProto.getIdList()) {
containerIds.add(ContainerID.getFromProtobuf(id));
}
containerMap.put(containersProto.getName(), containerIds);
}
return containerMap;
}

/**
* Queries a list of Nodes based on their operational state or health state.
* Passing a null for either value acts as a wildcard for that state.
16 changes: 16 additions & 0 deletions hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -82,6 +82,7 @@ message ScmContainerLocationRequest {
optional GetFailedDeletedBlocksTxnRequestProto getFailedDeletedBlocksTxnRequest = 43;
optional DecommissionScmRequestProto decommissionScmRequest = 44;
optional SingleNodeQueryRequestProto singleNodeQueryRequest = 45;
optional GetContainersOnDecomNodeRequestProto getContainersOnDecomNodeRequest = 46;
}

message ScmContainerLocationResponse {
@@ -135,6 +136,7 @@ message ScmContainerLocationResponse {
optional GetFailedDeletedBlocksTxnResponseProto getFailedDeletedBlocksTxnResponse = 43;
optional DecommissionScmResponseProto decommissionScmResponse = 44;
optional SingleNodeQueryResponseProto singleNodeQueryResponse = 45;
optional GetContainersOnDecomNodeResponseProto getContainersOnDecomNodeResponse = 46;

enum Status {
OK = 1;
@@ -187,6 +189,7 @@ enum Type {
GetFailedDeletedBlocksTransaction = 39;
DecommissionScm = 40;
SingleNodeQuery = 41;
GetContainersOnDecomNode = 42;
}

/**
@@ -602,6 +605,19 @@ message DecommissionScmResponseProto {
optional string errorMsg = 2;
}

message GetContainersOnDecomNodeRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
}

message ContainersOnDecomNodeProto {
required string name = 1;
repeated ContainerID id = 2;
}

message GetContainersOnDecomNodeResponseProto {
repeated ContainersOnDecomNodeProto containersOnDecomNode = 1;
}
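
As an illustration of how these messages are assembled (invented values; this mirrors the server-side translation further down):

// Sketch: build a response carrying one under-replicated container, ID 1.
GetContainersOnDecomNodeResponseProto resp = GetContainersOnDecomNodeResponseProto.newBuilder()
.addContainersOnDecomNode(ContainersOnDecomNodeProto.newBuilder()
.setName("UnderReplicated")
.addId(HddsProtos.ContainerID.newBuilder().setId(1L).build()))
.build();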

/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
@@ -18,7 +18,11 @@
package org.apache.hadoop.hdds.scm.node;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
@@ -31,4 +35,6 @@ public interface DatanodeAdminMonitor extends Runnable {
void stopMonitoring(DatanodeDetails dn);
Set<DatanodeAdminMonitorImpl.TrackedNode> getTrackedNodes();
void setMetrics(NodeDecommissionMetrics metrics);
Map<String, List<ContainerID>> getContainersReplicatedOnNode(DatanodeDetails dn)
throws NodeNotFoundException;
}
@@ -96,8 +96,8 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor {
public static final class TrackedNode {

private DatanodeDetails datanodeDetails;

private long startTime = 0L;
private Map<String, List<ContainerID>> containersReplicatedOnNode = new ConcurrentHashMap<>();

public TrackedNode(DatanodeDetails datanodeDetails, long startTime) {
this.datanodeDetails = datanodeDetails;
@@ -122,6 +122,15 @@ public DatanodeDetails getDatanodeDetails() {
public long getStartTime() {
return startTime;
}

public Map<String, List<ContainerID>> getContainersReplicatedOnNode() {
return containersReplicatedOnNode;
}

public void setContainersReplicatedOnNode(List<ContainerID> underReplicated, List<ContainerID> unClosed) {
this.containersReplicatedOnNode.put("UnderReplicated", Collections.unmodifiableList(underReplicated));
this.containersReplicatedOnNode.put("UnClosed", Collections.unmodifiableList(unClosed));
}
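
A consumer reads the two fixed keys this setter populates (sketch only; getOrDefault guards a node whose replication check has not run yet):

// Sketch: key names match setContainersReplicatedOnNode above.
Map<String, List<ContainerID>> byState = trackedNode.getContainersReplicatedOnNode();
List<ContainerID> underReplicated = byState.getOrDefault("UnderReplicated", Collections.emptyList());
List<ContainerID> unClosed = byState.getOrDefault("UnClosed", Collections.emptyList());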
}

private Map<String, ContainerStateInWorkflow> containerStateByHost;
@@ -423,9 +432,7 @@ private boolean checkContainersReplicatedOnNode(TrackedNode dn)

boolean isHealthy = replicaSet.isHealthyEnoughForOffline();
if (!isHealthy) {
if (LOG.isDebugEnabled()) {
unClosedIDs.add(cid);
}
unClosedIDs.add(cid);
if (unclosed < containerDetailsLoggingLimit
|| LOG.isDebugEnabled()) {
LOG.info("Unclosed Container {} {}; {}", cid, replicaSet, replicaDetails(replicaSet.getReplicas()));
@@ -448,20 +455,18 @@ private boolean checkContainersReplicatedOnNode(TrackedNode dn)
replicationManager.checkContainerStatus(replicaSet.getContainer(), report);
replicatedOK = report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED) == 0;
}

if (replicatedOK) {
sufficientlyReplicated++;
} else {
if (LOG.isDebugEnabled()) {
underReplicatedIDs.add(cid);
}
underReplicatedIDs.add(cid);
if (underReplicated < containerDetailsLoggingLimit || LOG.isDebugEnabled()) {
LOG.info("Under Replicated Container {} {}; {}", cid, replicaSet, replicaDetails(replicaSet.getReplicas()));
}
underReplicated++;
}
} catch (ContainerNotFoundException e) {
LOG.warn("ContainerID {} present in node list for {} but not found in containerManager", cid, dn);
LOG.warn("ContainerID {} present in node list for {} but not found in containerManager", cid,
dn.getDatanodeDetails());
}
}
LOG.info("{} has {} sufficientlyReplicated, {} deleting, {} " +
@@ -485,9 +490,21 @@ private boolean checkContainersReplicatedOnNode(TrackedNode dn)
unclosed, unClosedIDs.stream().map(
Object::toString).collect(Collectors.joining(", ")));
}
dn.setContainersReplicatedOnNode(underReplicatedIDs, unClosedIDs);
return underReplicated == 0 && unclosed == 0;
}

@Override
public Map<String, List<ContainerID>> getContainersReplicatedOnNode(DatanodeDetails dn) {
// TrackedNode equality is keyed on the datanode alone, so the start time
// used for the probe object is irrelevant to the lookup.
for (TrackedNode trackedNode : trackedNodes) {
if (trackedNode.equals(new TrackedNode(dn, 0L))) {
return trackedNode.getContainersReplicatedOnNode();
}
}
return new HashMap<>();
}

private String replicaDetails(Collection<ContainerReplica> replicas) {
StringBuilder sb = new StringBuilder();
sb.append("Replicas{");
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -40,6 +41,7 @@
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -292,6 +294,11 @@ public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
TimeUnit.SECONDS);
}

public Map<String, List<ContainerID>> getContainersReplicatedOnNode(DatanodeDetails dn)
throws NodeNotFoundException {
return getMonitor().getContainersReplicatedOnNode(dn);
}

@VisibleForTesting
public DatanodeAdminMonitor getMonitor() {
return monitor;
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto;
@@ -51,6 +52,9 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerReplicasRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerReplicasResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainersOnDecomNodeProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerTokenRequestProto;
@@ -614,6 +618,12 @@ public ScmContainerLocationResponse processRequest(
.setDecommissionNodesResponse(decommissionNodes(
request.getDecommissionNodesRequest()))
.build();
case GetContainersOnDecomNode:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setGetContainersOnDecomNodeResponse(getContainersOnDecomNode(request.getGetContainersOnDecomNodeRequest()))
.build();
case RecommissionNodes:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
@@ -1160,6 +1170,22 @@ public DecommissionNodesResponseProto decommissionNodes(
return response.build();
}

public GetContainersOnDecomNodeResponseProto getContainersOnDecomNode(GetContainersOnDecomNodeRequestProto request)
throws IOException {
Map<String, List<ContainerID>> containerMap = impl.getContainersOnDecomNode(
DatanodeDetails.getFromProtoBuf(request.getDatanodeDetails()));
List<ContainersOnDecomNodeProto> containersProtoList = new ArrayList<>();
for (Map.Entry<String, List<ContainerID>> containerList : containerMap.entrySet()) {
List<HddsProtos.ContainerID> containerIdsProto = new ArrayList<>();
for (ContainerID id : containerList.getValue()) {
containerIdsProto.add(id.getProtobuf());
}
containersProtoList.add(ContainersOnDecomNodeProto.newBuilder().setName(containerList.getKey())
.addAllId(containerIdsProto).build());
}
return GetContainersOnDecomNodeResponseProto.newBuilder().addAllContainersOnDecomNode(containersProtoList).build();
}

public RecommissionNodesResponseProto recommissionNodes(
RecommissionNodesRequestProto request) throws IOException {
List<DatanodeAdminError> errors =
@@ -588,6 +588,15 @@ public void deleteContainer(long containerID) throws IOException {
}
}

@Override
public Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException {
try {
return scm.getScmDecommissionManager().getContainersReplicatedOnNode(dn);
} catch (NodeNotFoundException e) {
throw new IOException("Failed to get containers list. Unable to find required node", e);
}
}

@Override
public List<HddsProtos.Node> queryNode(
HddsProtos.NodeOperationalState opState, HddsProtos.NodeState state,
@@ -837,6 +837,50 @@ public void testCancelledNodesMovedToInService()
nodeManager.getNodeStatus(dn1).getOperationalState());
}

@Test
public void testContainersReplicatedOnDecomDnAPI()
throws NodeNotFoundException, ContainerNotFoundException {
conf.setBoolean("hdds.scm.replication.enable.legacy", false);

DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
nodeManager.register(dn1,
new NodeStatus(HddsProtos.NodeOperationalState.DECOMMISSIONING,
HddsProtos.NodeState.HEALTHY));

Set<ContainerID> containers = new HashSet<>();
containers.add(ContainerID.valueOf(1));
containers.add(ContainerID.valueOf(2));
nodeManager.setContainers(dn1, containers);
DatanodeAdminMonitorTestUtil
.mockGetContainerReplicaCount(repManager,
true,
HddsProtos.LifeCycleState.CLOSED,
DECOMMISSIONING,
IN_SERVICE,
IN_SERVICE);

monitor.startMonitoring(dn1);
monitor.run();
assertEquals(1, monitor.getTrackedNodeCount());
assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
nodeManager.getNodeStatus(dn1).getOperationalState());
assertEquals(2, monitor.getContainersReplicatedOnNode(dn1).get("UnderReplicated").size());
assertEquals(0, monitor.getContainersReplicatedOnNode(dn1).get("UnClosed").size());

DatanodeAdminMonitorTestUtil
.mockGetContainerReplicaCount(repManager,
true,
HddsProtos.LifeCycleState.OPEN,
IN_SERVICE);

monitor.run();
assertEquals(1, monitor.getTrackedNodeCount());
assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
nodeManager.getNodeStatus(dn1).getOperationalState());
assertEquals(0, monitor.getContainersReplicatedOnNode(dn1).get("UnderReplicated").size());
assertEquals(2, monitor.getContainersReplicatedOnNode(dn1).get("UnClosed").size());
}

/**
* Generate a set of ContainerID, starting from an ID of zero up to the given
* count minus 1.
@@ -216,6 +216,11 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
}
}

@Override
public Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException {
return storageContainerLocationClient.getContainersOnDecomNode(dn);
}

@Override
public List<HddsProtos.Node> queryNode(
HddsProtos.NodeOperationalState opState,