ScmClient.java
@@ -39,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/**
* The interface to call into underlying container layer.
@@ -194,6 +195,14 @@ List<HddsProtos.Node> queryNode(HddsProtos.NodeOperationalState opState,
HddsProtos.NodeState nodeState, HddsProtos.QueryScope queryScope,
String poolName) throws IOException;

/**
* Returns the node with the given UUID.
* @param uuid - datanode UUID
* @return The node that matches the requested UUID.
* @throws IOException if the query fails.
*/
HddsProtos.Node queryNode(UUID uuid) throws IOException;

/**
* Allows a list of hosts to be decommissioned. The hosts are identified
* by their hostname and optionally port in the format foo.com:port.
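Taken together with the client classes further down, this new overload gives admin tooling a direct point lookup instead of fetching and filtering the full node list. A minimal, hedged usage sketch (the class and method names here are illustrative, not part of the change; any `ScmClient` implementation such as `ContainerOperationClient` would serve):

```java
import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.ScmClient;

public final class SingleNodeQueryExample {
  // Looks up one datanode by UUID via the new single-node overload,
  // rather than filtering the list returned by the multi-node queryNode.
  public static void printNode(ScmClient client, String uuidString)
      throws IOException {
    HddsProtos.Node node = client.queryNode(UUID.fromString(uuidString));
    if (node != null) {  // defensive: treat an absent node as "not found"
      System.out.println("Health:   " + node.getNodeStates(0));
      System.out.println("Op state: " + node.getNodeOperationalStates(0));
    }
  }
}
```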
StorageContainerLocationProtocol.java
@@ -44,6 +44,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

/**
* ContainerLocationProtocol is used by an HDFS node to find the set of nodes
@@ -232,6 +233,8 @@ List<HddsProtos.Node> queryNode(HddsProtos.NodeOperationalState opState,
HddsProtos.NodeState state, HddsProtos.QueryScope queryScope,
String poolName, int clientVersion) throws IOException;

HddsProtos.Node queryNode(UUID uuid) throws IOException;

List<DatanodeAdminError> decommissionNodes(List<String> nodes)
throws IOException;

StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -89,6 +89,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartMaintenanceNodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartMaintenanceNodesResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto;
@@ -114,6 +116,7 @@
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ProtobufUtils;

import java.io.Closeable;
import java.io.IOException;
@@ -123,6 +126,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.UUID;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto.Status.CONTAINER_ALREADY_CLOSED;
@@ -486,6 +490,18 @@ public List<HddsProtos.Node> queryNode(
return response.getDatanodesList();
}

@Override
public HddsProtos.Node queryNode(UUID uuid) throws IOException {
SingleNodeQueryRequestProto request = SingleNodeQueryRequestProto.newBuilder()
.setUuid(ProtobufUtils.toProtobuf(uuid))
.build();
SingleNodeQueryResponseProto response =
submitRequest(Type.SingleNodeQuery,
builder -> builder.setSingleNodeQueryRequest(request))
.getSingleNodeQueryResponse();
return response.getDatanode();
}

/**
* Attempts to decommission the list of nodes.
* @param nodes The list of hostnames or hostname:ports to decommission
hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto (13 additions, 2 deletions)
@@ -78,9 +78,10 @@ message ScmContainerLocationRequest {
optional GetContainerReplicasRequestProto getContainerReplicasRequest = 39;
optional ReplicationManagerReportRequestProto replicationManagerReportRequest = 40;
optional ResetDeletedBlockRetryCountRequestProto resetDeletedBlockRetryCountRequest = 41;
optional TransferLeadershipRequestProto transferScmLeadershipRequest = 42;
optional GetFailedDeletedBlocksTxnRequestProto getFailedDeletedBlocksTxnRequest = 43;
optional DecommissionScmRequestProto decommissionScmRequest = 44;
optional SingleNodeQueryRequestProto singleNodeQueryRequest = 45;
}

message ScmContainerLocationResponse {
@@ -130,9 +131,10 @@ message ScmContainerLocationResponse {
optional GetContainerReplicasResponseProto getContainerReplicasResponse = 39;
optional ReplicationManagerReportResponseProto getReplicationManagerReportResponse = 40;
optional ResetDeletedBlockRetryCountResponseProto resetDeletedBlockRetryCountResponse = 41;
optional TransferLeadershipResponseProto transferScmLeadershipResponse = 42;
optional GetFailedDeletedBlocksTxnResponseProto getFailedDeletedBlocksTxnResponse = 43;
optional DecommissionScmResponseProto decommissionScmResponse = 44;
optional SingleNodeQueryResponseProto singleNodeQueryResponse = 45;

enum Status {
OK = 1;
@@ -184,6 +186,7 @@ enum Type {
TransferLeadership = 38;
GetFailedDeletedBlocksTransaction = 39;
DecommissionScm = 40;
SingleNodeQuery = 41;
}

/**
@@ -326,6 +329,14 @@ message NodeQueryResponseProto {
repeated Node datanodes = 1;
}

message SingleNodeQueryRequestProto {
required UUID uuid = 1;
}

message SingleNodeQueryResponseProto {
optional Node datanode = 1;
}

/*
Datanode usage info request message.
*/
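Since `datanode` is declared `optional` in `SingleNodeQueryResponseProto` (the UUID may not match any registered node), a receiver should use the generated presence check before reading the field. A hedged sketch, assuming the response comes from `getSingleNodeQueryResponse()` as in the client-side translator above (the helper class and method names are illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryResponseProto;

final class SingleNodeQueryResponses {
  // The 'datanode' field is optional, so check presence with the
  // generated hasDatanode() rather than reading a default (empty) message.
  static HddsProtos.Node extractNode(SingleNodeQueryResponseProto response)
      throws IOException {
    if (!response.hasDatanode()) {
      throw new IOException("No datanode matched the requested UUID");
    }
    return response.getDatanode();
  }
}
```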
StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -92,6 +92,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartMaintenanceNodesRequestProto;
@@ -120,6 +122,7 @@
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import org.apache.hadoop.util.ProtobufUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -460,6 +463,13 @@ public ScmContainerLocationResponse processRequest(
.setNodeQueryResponse(queryNode(request.getNodeQueryRequest(),
request.getVersion()))
.build();
case SingleNodeQuery:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setSingleNodeQueryResponse(querySingleNode(request
.getSingleNodeQueryRequest()))
.build();
case CloseContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
@@ -866,6 +876,16 @@ public NodeQueryResponseProto queryNode(
.build();
}

public SingleNodeQueryResponseProto querySingleNode(
SingleNodeQueryRequestProto request)
throws IOException {

HddsProtos.Node datanode = impl.queryNode(ProtobufUtils.fromProtobuf(request.getUuid()));
return SingleNodeQueryResponseProto.newBuilder()
.setDatanode(datanode)
.build();
}

public SCMCloseContainerResponseProto closeContainer(
SCMCloseContainerRequestProto request)
throws IOException {
SCMClientProtocolServer.java
@@ -109,6 +109,7 @@
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.UUID;

import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService.newReflectiveBlockingService;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
@@ -613,6 +614,27 @@ public List<HddsProtos.Node> queryNode(
return result;
}

@Override
public HddsProtos.Node queryNode(UUID uuid)
throws IOException {
HddsProtos.Node result = null;
try {
DatanodeDetails node = scm.getScmNodeManager().getNodeByUuid(uuid);
if (node != null) {
NodeStatus ns = scm.getScmNodeManager().getNodeStatus(node);
result = HddsProtos.Node.newBuilder()
.setNodeID(node.getProtoBufMessage())
.addNodeStates(ns.getHealth())
.addNodeOperationalStates(ns.getOperationalState())
.build();
}
} catch (NodeNotFoundException e) {
throw new IOException(
"An unexpected error occurred querying the NodeStatus", e);
}
return result;
}

@Override
public List<DatanodeAdminError> decommissionNodes(List<String> nodes)
throws IOException {
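The implementation above establishes a two-part contract: an unknown UUID yields a `null` result, while a `NodeNotFoundException` from the status lookup for a known node surfaces as an `IOException`. A hedged caller sketch distinguishing the two outcomes (the class and variable names are illustrative):

```java
import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;

final class NodeLookup {
  // Distinguishes "unknown UUID" (null result) from a failed status
  // lookup (IOException wrapping NodeNotFoundException).
  static void describe(StorageContainerLocationProtocol server, UUID uuid) {
    try {
      HddsProtos.Node node = server.queryNode(uuid);
      if (node == null) {
        System.err.println("No datanode registered with UUID " + uuid);
      } else {
        System.out.println("Health: " + node.getNodeStates(0));
      }
    } catch (IOException e) {
      System.err.println("Status lookup failed: " + e.getMessage());
    }
  }
}
```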
ContainerOperationClient.java
@@ -59,6 +59,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED_DEFAULT;
@@ -225,6 +226,11 @@ public List<HddsProtos.Node> queryNode(
queryScope, poolName, ClientVersion.CURRENT_VERSION);
}

@Override
public HddsProtos.Node queryNode(UUID uuid) throws IOException {
return storageContainerLocationClient.queryNode(uuid);
}

@Override
public List<DatanodeAdminError> decommissionNodes(List<String> hosts)
throws IOException {
ListInfoSubcommand.java
@@ -29,6 +29,7 @@

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -82,6 +83,15 @@ public class ListInfoSubcommand extends ScmSubcommand {
@Override
public void execute(ScmClient scmClient) throws IOException {
pipelines = scmClient.listPipelines();
if (!Strings.isNullOrEmpty(uuid)) {
HddsProtos.Node node = scmClient.queryNode(UUID.fromString(uuid));
DatanodeWithAttributes dwa = new DatanodeWithAttributes(DatanodeDetails
.getFromProtoBuf(node.getNodeID()),
node.getNodeOperationalStates(0),
node.getNodeStates(0));
printDatanodeInfo(dwa);
return;
}
Stream<DatanodeWithAttributes> allNodes = getAllNodes(scmClient).stream();
if (!Strings.isNullOrEmpty(ipaddress)) {
allNodes = allNodes.filter(p -> p.getDatanodeDetails().getIpAddress()
@@ -91,10 +101,6 @@ public void execute(ScmClient scmClient) throws IOException {
allNodes = allNodes.filter(p -> p.getDatanodeDetails().getHostName()
.compareToIgnoreCase(hostname) == 0);
}
if (!Strings.isNullOrEmpty(uuid)) {
allNodes = allNodes.filter(p ->
p.getDatanodeDetails().getUuidString().equals(uuid));
}
if (!Strings.isNullOrEmpty(nodeOperationalState)) {
allNodes = allNodes.filter(p -> p.getOpState().toString()
.compareToIgnoreCase(nodeOperationalState) == 0);
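With this change, supplying a datanode UUID makes the subcommand issue a single targeted SCM query and print just that node, rather than listing every node and filtering client-side; the old UUID filter on the full list is removed accordingly. Assuming this subcommand keeps its usual wiring under the admin CLI, an invocation might look like `ozone admin datanode list --id <datanode-uuid>` (the `--id` option name is taken from the test below).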
TestListInfoSubcommand.java
@@ -32,6 +32,7 @@
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import picocli.CommandLine;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -101,6 +102,32 @@ public void testDataNodeOperationalStateAndHealthIncludedInOutput()
assertTrue(m.find());
}

@Test
public void testDataNodeByUuidOutput()
throws Exception {
List<HddsProtos.Node> nodes = getNodeDetails();

ScmClient scmClient = mock(ScmClient.class);
when(scmClient.queryNode(any()))
.thenAnswer(invocation -> nodes.get(0));
when(scmClient.listPipelines())
.thenReturn(new ArrayList<>());

CommandLine c = new CommandLine(cmd);
c.parseArgs("--id", nodes.get(0).getNodeID().getUuid());
cmd.execute(scmClient);

Pattern p = Pattern.compile(
"^Operational State:\\s+IN_SERVICE$", Pattern.MULTILINE);
Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
assertTrue(m.find());

p = Pattern.compile(nodes.get(0).getNodeID().getUuid().toString(),
Pattern.MULTILINE);
m = p.matcher(outContent.toString(DEFAULT_ENCODING));
assertTrue(m.find());
}

private List<HddsProtos.Node> getNodeDetails() {
List<HddsProtos.Node> nodes = new ArrayList<>();
