Merged
@@ -20,10 +20,12 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaInfo;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
@@ -178,6 +180,14 @@ ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException;

/**
 * Gets the lists of under-replicated and unclosed containers on a decommissioning node.
 *
 * @param dn - Datanode details
 * @return Map keyed by "UnderReplicated" and "UnClosed", each holding a list of container IDs
 */
Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException;
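
A minimal usage sketch of the new API (hypothetical: assumes "client" is any implementation of this interface, "dn" identifies a decommissioning datanode, and the caller handles IOException; the map keys follow the convention used in this PR):

  Map<String, List<ContainerID>> byState = client.getContainersOnDecomNode(dn);
  for (Map.Entry<String, List<ContainerID>> entry : byState.entrySet()) {
    // Keys produced in this PR are "UnderReplicated" and "UnClosed".
    System.out.println(entry.getKey() + ": " + entry.getValue().size() + " container(s)");
  }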

/**
* Returns a set of Nodes that meet a query criteria. Passing null for opState
* or nodeState acts like a wild card, returning all nodes in that state.
@@ -19,6 +19,7 @@

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
@@ -219,6 +220,14 @@ List<ContainerInfo> listContainer(long startContainerID,
*/
void deleteContainer(long containerID) throws IOException;

/**
 * Gets the lists of under-replicated and unclosed containers on a decommissioning node.
 *
 * @param dn - Datanode details
 * @return Map keyed by "UnderReplicated" and "UnClosed", each holding a list of container IDs
 */
Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException;

/**
* Queries a list of Node Statuses. Passing a null for either opState or
* state acts like a wildcard returning all nodes in that state.
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.GetScmInfoResponseProto;
@@ -55,6 +56,9 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerReplicasRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainersOnDecomNodeProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerTokenRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerTokenResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchRequestProto;
@@ -455,6 +459,23 @@ public void deleteContainer(long containerID)

}

@Override
public Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException {
GetContainersOnDecomNodeRequestProto request = GetContainersOnDecomNodeRequestProto.newBuilder()
.setDatanodeDetails(dn.getProtoBufMessage()).build();
GetContainersOnDecomNodeResponseProto response = submitRequest(Type.GetContainersOnDecomNode,
builder -> builder.setGetContainersOnDecomNodeRequest(request)).getGetContainersOnDecomNodeResponse();
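// The server groups container IDs under the state names "UnderReplicated"
// and "UnClosed" (see DatanodeAdminMonitorImpl.containersReplicatedOnNode below).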
Map<String, List<ContainerID>> containerMap = new HashMap<>();
for (ContainersOnDecomNodeProto containersProto : response.getContainersOnDecomNodeList()) {
List<ContainerID> containerIds = new ArrayList<>();
for (HddsProtos.ContainerID id : containersProto.getIdList()) {
containerIds.add(ContainerID.getFromProtobuf(id));
}
containerMap.put(containersProto.getName(), containerIds);
}
return containerMap;
}

/**
* Queries a list of Nodes based on their operational state or health state.
* Passing a null for either value acts as a wildcard for that state.
16 changes: 16 additions & 0 deletions hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -81,6 +81,7 @@ message ScmContainerLocationRequest {
optional TransferLeadershipRequestProto transferScmLeadershipRequest = 42;
optional GetFailedDeletedBlocksTxnRequestProto getFailedDeletedBlocksTxnRequest = 43;
optional DecommissionScmRequestProto decommissionScmRequest = 44;
optional GetContainersOnDecomNodeRequestProto getContainersOnDecomNodeRequest = 45;
}

message ScmContainerLocationResponse {
@@ -133,6 +134,7 @@ message ScmContainerLocationResponse {
optional TransferLeadershipResponseProto transferScmLeadershipResponse = 42;
optional GetFailedDeletedBlocksTxnResponseProto getFailedDeletedBlocksTxnResponse = 43;
optional DecommissionScmResponseProto decommissionScmResponse = 44;
optional GetContainersOnDecomNodeResponseProto getContainersOnDecomNodeResponse = 45;

enum Status {
OK = 1;
@@ -184,6 +186,7 @@ enum Type {
TransferLeadership = 38;
GetFailedDeletedBlocksTransaction = 39;
DecommissionScm = 40;
GetContainersOnDecomNode = 41;
}

@@ -591,6 +594,19 @@ message DecommissionScmResponseProto {
optional string errorMsg = 2;
}

message GetContainersOnDecomNodeRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
}

message ContainersOnDecomNodeProto {
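// In this PR, the name identifies a state bucket: "UnderReplicated" or "UnClosed".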
required string name = 1;
repeated ContainerID id = 2;
}

message GetContainersOnDecomNodeResponseProto {
repeated ContainersOnDecomNodeProto containersOnDecomNode = 1;
}

/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
@@ -18,7 +18,11 @@
package org.apache.hadoop.hdds.scm.node;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
@@ -31,4 +35,5 @@ public interface DatanodeAdminMonitor extends Runnable {
void stopMonitoring(DatanodeDetails dn);
Set<DatanodeAdminMonitorImpl.TrackedNode> getTrackedNodes();
void setMetrics(NodeDecommissionMetrics metrics);
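/** Returns the under-replicated and unclosed containers still held on the given node. */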
Map<String, List<ContainerID>> containersReplicatedOnNode(DatanodeDetails dn) throws NodeNotFoundException;
}
@@ -488,6 +488,54 @@ private boolean checkContainersReplicatedOnNode(TrackedNode dn)
return underReplicated == 0 && unclosed == 0;
}

Review thread on this method:

Contributor: Let's rename this method to something more appropriate, like getContainersPendingReplication.

Contributor: Also if it doesn't turn out to be too complicated, let's refactor the common code between this method and the one above to another common method. That way we don't have to maintain the same logic in two places.

Author: I have refactored the common code. Could you please take a look at it now?

Contributor: The refactor looks good, but I think you forgot to change the method's name.

Author: Thank you for the review! It is fixed in the latest code. Could you please review it again?

Contributor: I still see the method name as containersReplicatedOnNode in the latest commit... am I missing something?

Author: I have addressed the above in this PR: #6293

public Map<String, List<ContainerID>> containersReplicatedOnNode(DatanodeDetails dn)
    throws NodeNotFoundException {
List<ContainerID> underReplicatedIDs = new ArrayList<>();
List<ContainerID> unClosedIDs = new ArrayList<>();
Set<ContainerID> containers =
nodeManager.getContainers(dn);
for (ContainerID cid : containers) {
try {
ContainerReplicaCount replicaSet =
replicationManager.getContainerReplicaCount(cid);

HddsProtos.LifeCycleState containerState
= replicaSet.getContainer().getState();
if (containerState == HddsProtos.LifeCycleState.DELETED
|| containerState == HddsProtos.LifeCycleState.DELETING) {
continue;
}

boolean isHealthy = replicaSet.isHealthyEnoughForOffline();
if (!isHealthy) {
unClosedIDs.add(cid);
continue;
}

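// This flag selects between the legacy and the current ReplicationManager
// code paths when judging whether the container is sufficiently replicated.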
boolean legacyEnabled = conf.getBoolean(
    "hdds.scm.replication.enable.legacy", false);
boolean replicatedOK;
if (legacyEnabled) {
replicatedOK = replicaSet.isSufficientlyReplicatedForOffline(dn, nodeManager);
} else {
ReplicationManagerReport report = new ReplicationManagerReport();
replicationManager.checkContainerStatus(replicaSet.getContainer(), report);
replicatedOK = report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED) == 0;
}

if (!replicatedOK) {
underReplicatedIDs.add(cid);
}
} catch (ContainerNotFoundException e) {
LOG.warn("ContainerID {} present in node list for {} but not found in containerManager", cid, dn);
}
}
Map<String, List<ContainerID>> containerList = new HashMap<>();
containerList.put("UnderReplicated", underReplicatedIDs);
containerList.put("UnClosed", unClosedIDs);
return containerList;
}
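
The review thread above asks for the common logic between this method and checkContainersReplicatedOnNode to be extracted into a shared helper. A rough sketch of what that could look like, under the assumption that both callers classify one container at a time (method name hypothetical, not part of this PR):

  // Hypothetical shared helper: classify a single container on a decommissioning
  // node as "UnClosed", "UnderReplicated", or null when it can be skipped.
  private String classifyContainerForOffline(ContainerID cid, DatanodeDetails dn)
      throws ContainerNotFoundException {
    ContainerReplicaCount replicaSet =
        replicationManager.getContainerReplicaCount(cid);
    HddsProtos.LifeCycleState state = replicaSet.getContainer().getState();
    if (state == HddsProtos.LifeCycleState.DELETED
        || state == HddsProtos.LifeCycleState.DELETING) {
      return null; // nothing to track for deleted or deleting containers
    }
    if (!replicaSet.isHealthyEnoughForOffline()) {
      return "UnClosed";
    }
    // Same replication check as above: legacy or current ReplicationManager path.
    boolean legacyEnabled = conf.getBoolean(
        "hdds.scm.replication.enable.legacy", false);
    boolean replicatedOK;
    if (legacyEnabled) {
      replicatedOK = replicaSet.isSufficientlyReplicatedForOffline(dn, nodeManager);
    } else {
      ReplicationManagerReport report = new ReplicationManagerReport();
      replicationManager.checkContainerStatus(replicaSet.getContainer(), report);
      replicatedOK =
          report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED) == 0;
    }
    return replicatedOK ? null : "UnderReplicated";
  }

Both containersReplicatedOnNode and checkContainersReplicatedOnNode could then reduce to a loop that aggregates the returned bucket names.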

private String replicaDetails(Collection<ContainerReplica> replicas) {
StringBuilder sb = new StringBuilder();
sb.append("Replicas{");
@@ -308,7 +308,6 @@ public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
TimeUnit.SECONDS);
}

@VisibleForTesting
public DatanodeAdminMonitor getMonitor() {
return monitor;
}
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto;
@@ -51,6 +52,9 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerReplicasRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerReplicasResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainersOnDecomNodeProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerTokenRequestProto;
@@ -604,6 +608,12 @@ public ScmContainerLocationResponse processRequest(
.setDecommissionNodesResponse(decommissionNodes(
request.getDecommissionNodesRequest()))
.build();
case GetContainersOnDecomNode:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setGetContainersOnDecomNodeResponse(getContainersOnDecomNode(request.getGetContainersOnDecomNodeRequest()))
.build();
case RecommissionNodes:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
@@ -1140,6 +1150,22 @@ public DecommissionNodesResponseProto decommissionNodes(
return response.build();
}

public GetContainersOnDecomNodeResponseProto getContainersOnDecomNode(GetContainersOnDecomNodeRequestProto request)
throws IOException {
Map<String, List<ContainerID>> containerMap = impl.getContainersOnDecomNode(
DatanodeDetails.getFromProtoBuf(request.getDatanodeDetails()));
List<ContainersOnDecomNodeProto> containersProtoList = new ArrayList<>();
for (Map.Entry<String, List<ContainerID>> containerList : containerMap.entrySet()) {
List<HddsProtos.ContainerID> containerIdsProto = new ArrayList<>();
for (ContainerID id : containerList.getValue()) {
containerIdsProto.add(id.getProtobuf());
}
containersProtoList.add(ContainersOnDecomNodeProto.newBuilder().setName(containerList.getKey())
.addAllId(containerIdsProto).build());
}
return GetContainersOnDecomNodeResponseProto.newBuilder().addAllContainersOnDecomNode(containersProtoList).build();
}

public RecommissionNodesResponseProto recommissionNodes(
RecommissionNodesRequestProto request) throws IOException {
List<DatanodeAdminError> errors =
@@ -586,6 +586,15 @@ public void deleteContainer(long containerID) throws IOException {
}
}

@Override
public Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException {
try {
return scm.getScmDecommissionManager().getMonitor().containersReplicatedOnNode(dn);
} catch (NodeNotFoundException e) {
throw new IOException("Failed to get containers list. Unable to find required node", e);
}
}

@Override
public List<HddsProtos.Node> queryNode(
HddsProtos.NodeOperationalState opState, HddsProtos.NodeState state,
@@ -215,6 +215,11 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
}
}

@Override
public Map<String, List<ContainerID>> getContainersOnDecomNode(DatanodeDetails dn) throws IOException {
return storageContainerLocationClient.getContainersOnDecomNode(dn);
}

@Override
public List<HddsProtos.Node> queryNode(
HddsProtos.NodeOperationalState opState,
@@ -23,10 +23,12 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import picocli.CommandLine;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -83,6 +85,8 @@ public void execute(ScmClient scmClient) throws IOException {
DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf(
node.getNodeID());
printDetails(datanode);
Map<String, List<ContainerID>> containers = scmClient.getContainersOnDecomNode(datanode);
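// Relies on the default Map/List toString, e.g. {UnderReplicated=[...], UnClosed=[...]}.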
System.out.println(containers);
}
}
private void printDetails(DatanodeDetails datanode) {