diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index 5b6fb6fe9b81..a30f8414dce7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -364,6 +364,9 @@ public static DatanodeDetails.Builder newBuilder( if (datanodeDetailsProto.hasNetworkLocation()) { builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation()); } + if (datanodeDetailsProto.hasLevel()) { + builder.setLevel(datanodeDetailsProto.getLevel()); + } if (datanodeDetailsProto.hasPersistedOpState()) { builder.setPersistedOpState(datanodeDetailsProto.getPersistedOpState()); } @@ -456,6 +459,9 @@ public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder( if (!Strings.isNullOrEmpty(getNetworkLocation())) { builder.setNetworkLocation(getNetworkLocation()); } + if (getLevel() > 0) { + builder.setLevel(getLevel()); + } if (persistedOpState != null) { builder.setPersistedOpState(persistedOpState); } @@ -585,6 +591,7 @@ public static final class Builder { private String hostName; private String networkName; private String networkLocation; + private int level; private List ports; private String certSerialId; private String version; @@ -616,6 +623,7 @@ public Builder setDatanodeDetails(DatanodeDetails details) { this.hostName = details.getHostName(); this.networkName = details.getNetworkName(); this.networkLocation = details.getNetworkLocation(); + this.level = details.getLevel(); this.ports = details.getPorts(); this.certSerialId = details.getCertSerialId(); this.version = details.getVersion(); @@ -683,6 +691,11 @@ public Builder setNetworkLocation(String loc) { return this; } + public Builder setLevel(int level) { + this.level = level; + return this; + } + /** * Adds a DataNode Port. 
* @@ -807,6 +820,9 @@ public DatanodeDetails build() { if (networkName != null) { dn.setNetworkName(networkName); } + if (level > 0) { + dn.setLevel(level); + } return dn; } } @@ -1011,4 +1027,13 @@ public String getBuildDate() { public void setBuildDate(String date) { this.buildDate = date; } + + @Override + public HddsProtos.NetworkNode toProtobuf( + int clientVersion) { + HddsProtos.NetworkNode networkNode = + HddsProtos.NetworkNode.newBuilder() + .setDatanodeDetails(toProtoBuilder(clientVersion).build()).build(); + return networkNode; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java index c87d826d2529..6074e7da0afc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java @@ -20,6 +20,8 @@ import java.util.Collection; import java.util.List; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + /** * The interface defines an inner node in a network topology. 
* An inner node represents network topology entities, such as data center, @@ -89,4 +91,16 @@ N newInnerNode(String name, String location, InnerNode parent, int level, */ Node getLeaf(int leafIndex, List excludedScopes, Collection excludedNodes, int ancestorGen); + + @Override + HddsProtos.NetworkNode toProtobuf(int clientVersion); + + boolean equals(Object o); + + int hashCode(); + + static InnerNode fromProtobuf( + HddsProtos.InnerNode innerNode) { + return InnerNodeImpl.fromProtobuf(innerNode); + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java index f2648f3d294c..332dddac25c9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java @@ -27,6 +27,7 @@ import java.util.Map; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,10 +48,10 @@ public InnerNodeImpl newInnerNode(String name, String location, } } - static final Factory FACTORY = new Factory(); + public static final Factory FACTORY = new Factory(); // a map of node's network name to Node for quick search and keep // the insert order - private final HashMap childrenMap = + private HashMap childrenMap = new LinkedHashMap(); // number of descendant leaves under this node private int numOfLeaves; @@ -66,6 +67,76 @@ protected InnerNodeImpl(String name, String location, InnerNode parent, super(name, location, parent, level, cost); } + /** + * Construct an InnerNode from its name, network location, level, cost, + * childrenMap and number of leaves. This constructor is used as part of + * protobuf deserialization. 
+ */ + protected InnerNodeImpl(String name, String location, int level, int cost, + HashMap childrenMap, int numOfLeaves) { + super(name, location, null, level, cost); + this.childrenMap = childrenMap; + this.numOfLeaves = numOfLeaves; + } + + /** + * InnerNodeImpl Builder to help construct an InnerNodeImpl object from + * protobuf objects. + */ + public static class Builder { + private String name; + private String location; + private int cost; + private int level; + private HashMap childrenMap = new LinkedHashMap<>(); + private int numOfLeaves; + + public Builder setName(String name) { + this.name = name; + return this; + } + + public Builder setLocation(String location) { + this.location = location; + return this; + } + + public Builder setCost(int cost) { + this.cost = cost; + return this; + } + + public Builder setLevel(int level) { + this.level = level; + return this; + } + + public Builder setChildrenMap( + List childrenMapList) { + HashMap newChildrenMap = new LinkedHashMap<>(); + for (HddsProtos.ChildrenMap childrenMapProto : + childrenMapList) { + String networkName = childrenMapProto.hasNetworkName() ? + childrenMapProto.getNetworkName() : null; + Node node = childrenMapProto.hasNetworkNode() ? + Node.fromProtobuf(childrenMapProto.getNetworkNode()) : null; + newChildrenMap.put(networkName, node); + } + this.childrenMap = newChildrenMap; + return this; + } + + public Builder setNumOfLeaves(int numOfLeaves) { + this.numOfLeaves = numOfLeaves; + return this; + } + + public InnerNodeImpl build() { + return new InnerNodeImpl(name, location, level, cost, childrenMap, + numOfLeaves); + } + } + /** @return the number of children this node has */ private int getNumOfChildren() { return childrenMap.size(); @@ -77,6 +148,11 @@ public int getNumOfLeaves() { return numOfLeaves; } + /** @return a map of node's network name to Node. */ + public HashMap getChildrenMap() { + return childrenMap; + } + /** * @return number of its all nodes at level level. 
Here level is a * relative level. If level is 1, means node itself. If level is 2, means its @@ -390,14 +466,83 @@ public Node getLeaf(int leafIndex, List excludedScopes, } @Override - public boolean equals(Object to) { - if (to == null) { - return false; + public HddsProtos.NetworkNode toProtobuf( + int clientVersion) { + + HddsProtos.InnerNode.Builder innerNode = + HddsProtos.InnerNode.newBuilder() + .setNumOfLeaves(numOfLeaves) + .setNodeTopology( + NodeImpl.toProtobuf(getNetworkName(), getNetworkLocation(), + getLevel(), getCost())); + + if (childrenMap != null && !childrenMap.isEmpty()) { + for (Map.Entry entry : childrenMap.entrySet()) { + if (entry.getValue() != null) { + HddsProtos.ChildrenMap childrenMapProto = + HddsProtos.ChildrenMap.newBuilder() + .setNetworkName(entry.getKey()) + .setNetworkNode(entry.getValue().toProtobuf(clientVersion)) + .build(); + innerNode.addChildrenMap(childrenMapProto); + } + } + } + innerNode.build(); + + HddsProtos.NetworkNode networkNode = + HddsProtos.NetworkNode.newBuilder() + .setInnerNode(innerNode).build(); + + return networkNode; + } + + public static InnerNode fromProtobuf(HddsProtos.InnerNode innerNode) { + InnerNodeImpl.Builder builder = new InnerNodeImpl.Builder(); + + if (innerNode.hasNodeTopology()) { + HddsProtos.NodeTopology nodeTopology = innerNode.getNodeTopology(); + + if (nodeTopology.hasName()) { + builder.setName(nodeTopology.getName()); + } + if (nodeTopology.hasLocation()) { + builder.setLocation(nodeTopology.getLocation()); + } + if (nodeTopology.hasLevel()) { + builder.setLevel(nodeTopology.getLevel()); + } + if (nodeTopology.hasCost()) { + builder.setCost(nodeTopology.getCost()); + } + } + + if (!innerNode.getChildrenMapList().isEmpty()) { + builder.setChildrenMap(innerNode.getChildrenMapList()); + } + if (innerNode.hasNumOfLeaves()) { + builder.setNumOfLeaves(innerNode.getNumOfLeaves()); } - if (this == to) { + + return builder.build(); + } + + @Override + public boolean equals(Object o) { + if 
(this == o) {
       return true;
     }
-    return this.toString().equals(to.toString());
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    InnerNodeImpl innerNode = (InnerNodeImpl) o;
+    // Value-based equality over identity: trees rebuilt from protobuf on
+    // the client must compare equal to the server-side originals.
+    // Map.equals already implies equal sizes, so no separate size check.
+    // NOTE(review): hashCode is unchanged by this patch — confirm it stays
+    // consistent with this field-based equals.
+    return this.getNetworkName().equals(innerNode.getNetworkName()) &&
+        this.getNetworkLocation().equals(innerNode.getNetworkLocation()) &&
+        this.getLevel() == innerNode.getLevel() &&
+        this.getCost() == innerNode.getCost() &&
+        this.numOfLeaves == innerNode.numOfLeaves &&
+        this.childrenMap.equals(innerNode.childrenMap);
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 2dc86c1b6856..f6f013259c59 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -75,6 +75,19 @@ public NetworkTopologyImpl(ConfigurationSource conf) {
         schemaManager.getCost(NetConstants.ROOT_LEVEL));
   }
 
+  /**
+   * Builds a topology around an externally supplied cluster tree (e.g. one
+   * fetched from SCM over the wire) instead of registering nodes locally.
+   */
+  public NetworkTopologyImpl(String schemaFile, InnerNode clusterTree) {
+    schemaManager = NodeSchemaManager.getInstance();
+    schemaManager.init(schemaFile);
+    maxLevel = schemaManager.getMaxLevel();
+    shuffleOperation = Collections::shuffle;
+    factory = InnerNodeImpl.FACTORY;
+    this.clusterTree = clusterTree;
+  }
+
   @VisibleForTesting
   public NetworkTopologyImpl(NodeSchemaManager manager,
       Consumer<List<? extends Node>> shuffleOperation) {
@@ -726,8 +739,18 @@ public int getDistanceCost(Node node1, Node node2) {
     int cost = 0;
     netlock.readLock().lock();
     try {
-      if ((node1.getAncestor(level1 - 1) != clusterTree) ||
-          (node2.getAncestor(level2 - 1) != clusterTree)) {
+      Node ancestor1 = node1.getAncestor(level1 - 1);
+      // BUG FIX: a node is outside the topology iff its root ancestor is
+      // not value-equal to clusterTree. The previous
+      // "|| (ancestor != clusterTree)" clause fell back to identity, so a
+      // value-equal root deserialized from protobuf was wrongly rejected.
+      // Identity comparison is used only when either side is null.
+      boolean node1Topology = (ancestor1 == null || clusterTree == null)
+          ? ancestor1 != clusterTree : !ancestor1.equals(clusterTree);
+      Node ancestor2 = node2.getAncestor(level2 - 1);
+      boolean node2Topology = (ancestor2 == null || clusterTree == null)
+          ? ancestor2 != clusterTree : !ancestor2.equals(clusterTree);
+      if (node1Topology || node2Topology) {
         LOG.debug("One of the nodes is outside of network topology");
         return Integer.MAX_VALUE;
       }
@@ -741,7 +764,7 @@ public int getDistanceCost(Node node1, Node node2) {
         level2--;
         cost += node2 == null ? 0 : node2.getCost();
       }
-      while (node1 != null && node2 != null && node1 != node2) {
+      while (node1 != null && node2 != null && !node1.equals(node2)) {
         node1 = node1.getParent();
         node2 = node2.getParent();
         cost += node1 == null ? 0 : node1.getCost();
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
index 9884888a1dd4..50f702cce08e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdds.scm.net;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
 /**
  * The interface defines a node in a network topology.
 * A node may be a leave representing a data node or an inner
* A node may be a leave representing a data node or an inner @@ -126,4 +129,21 @@ public interface Node { * @return true if this node is under a specific scope */ boolean isDescendant(String nodePath); + + default HddsProtos.NetworkNode toProtobuf( + int clientVersion) { + return null; + } + + static Node fromProtobuf( + HddsProtos.NetworkNode networkNode) { + if (networkNode.hasDatanodeDetails()) { + return DatanodeDetails.getFromProtoBuf( + networkNode.getDatanodeDetails()); + } else if (networkNode.hasInnerNode()) { + return InnerNode.fromProtobuf(networkNode.getInnerNode()); + } else { + return null; + } + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java index e7a45f649b6e..e4d76cd3dbc7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.net; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT; import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR_STR; @@ -229,6 +230,20 @@ public boolean isDescendant(String nodePath) { NetUtils.addSuffix(nodePath)); } + public static HddsProtos.NodeTopology toProtobuf(String name, String location, + int level, int cost) { + + HddsProtos.NodeTopology.Builder nodeTopologyBuilder = + HddsProtos.NodeTopology.newBuilder() + .setName(name) + .setLocation(location) + .setLevel(level) + .setCost(cost); + + HddsProtos.NodeTopology nodeTopology = nodeTopologyBuilder.build(); + return nodeTopology; + } + @Override public boolean equals(Object to) { if (to == null) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java index eecd79876720..fb37b214cad1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java @@ -62,6 +62,14 @@ public void init(ConfigurationSource conf) { String schemaFile = conf.get( ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT); + loadSchemaFile(schemaFile); + } + + public void init(String schemaFile) { + loadSchemaFile(schemaFile); + } + + private void loadSchemaFile(String schemaFile) { NodeSchemaLoadResult result; try { result = NodeSchemaLoader.getInstance().loadSchemaFromFile(schemaFile); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 7bfda0184096..2d60e60e467d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -672,6 +672,11 @@ public final class OzoneConfigKeys { public static final String HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY = "hdds.scmclient.failover.max.retry"; + public static final String + OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION = + "ozone.om.network.topology.refresh.duration"; + public static final String + OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT = "1h"; /** * There is no need to instantiate this class. diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index e5e3726beb5d..25e3385e7dc0 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3786,6 +3786,14 @@ Wait duration before which close container is send to DN. 
+ + ozone.om.network.topology.refresh.duration + 1h + SCM, OZONE, OM + The duration at which we periodically fetch the updated network + topology cluster tree from SCM. + + ozone.scm.ha.ratis.server.snapshot.creation.gap 1024 diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java new file mode 100644 index 000000000000..2e42df957346 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmTopologyClient.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.client;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT;
+
+/**
+ * This client implements a background thread which periodically checks and
+ * gets the latest network topology cluster tree from SCM.
+ */
+public class ScmTopologyClient {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ScmTopologyClient.class);
+
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private final AtomicReference<InnerNode> cache = new AtomicReference<>();
+  private ScheduledExecutorService executorService;
+
+  public ScmTopologyClient(
+      ScmBlockLocationProtocol scmBlockLocationProtocol) {
+    this.scmBlockLocationProtocol = scmBlockLocationProtocol;
+  }
+
+  /**
+   * @return the most recently fetched cluster tree root.
+   * @throws NullPointerException if start() has not been invoked yet.
+   */
+  public InnerNode getClusterTree() {
+    return requireNonNull(cache.get(),
+        "ScmTopologyClient must have been initialized already.");
+  }
+
+  /**
+   * Fetches the initial topology synchronously, then schedules the
+   * periodic background refresh.
+   */
+  public void start(ConfigurationSource conf) throws IOException {
+    final InnerNode initialTopology =
+        scmBlockLocationProtocol.getNetworkTopology();
+    LOG.info("Initial network topology fetched from SCM: {}.",
+        initialTopology);
+    cache.set(initialTopology);
+    scheduleNetworkTopologyPoller(conf, Instant.now());
+  }
+
+  public void stop() {
+    if (executorService != null) {
+      executorService.shutdown();
+      try {
+        // BUG FIX: force-stop only when the poller has NOT terminated
+        // within the grace period. The previous condition was inverted
+        // (shutdownNow() ran on clean termination, never on a hang).
+        if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+          executorService.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while shutting down executor service.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private void scheduleNetworkTopologyPoller(ConfigurationSource conf,
+      Instant initialInvocation) {
+    Duration refreshDuration = parseRefreshDuration(conf);
+    Instant nextRefresh = initialInvocation.plus(refreshDuration);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("NetworkTopologyPoller")
+        .setDaemon(true)
+        .build();
+    executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    Duration initialDelay = Duration.between(Instant.now(), nextRefresh);
+
+    LOG.debug("Scheduling NetworkTopologyPoller with an initial delay of {}.",
+        initialDelay);
+    executorService.scheduleAtFixedRate(this::checkAndRefresh,
+        initialDelay.toMillis(), refreshDuration.toMillis(),
+        TimeUnit.MILLISECONDS);
+  }
+
+  public static Duration parseRefreshDuration(ConfigurationSource conf) {
+    long refreshDurationInMs = conf.getTimeDuration(
+        OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION,
+        OZONE_OM_NETWORK_TOPOLOGY_REFRESH_DURATION_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    return Duration.ofMillis(refreshDurationInMs);
+  }
+
+  private synchronized void checkAndRefresh() {
+    InnerNode current = cache.get();
+    try {
+      InnerNode newTopology = scmBlockLocationProtocol.getNetworkTopology();
+      if (!newTopology.equals(current)) {
+        cache.set(newTopology);
+        LOG.info("Updated network topology cluster tree fetched from " +
+            "SCM: {}.", newTopology);
+      }
+    } catch (IOException e) {
+      // BUG FIX: do not rethrow. An exception escaping a
+      // scheduleAtFixedRate task silently cancels all future executions,
+      // so one transient SCM failure would stop topology refresh forever.
+      // Log and keep serving the last known tree instead.
+      LOG.error(
+          "Error fetching updated network topology cluster tree from SCM.", e);
+    }
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
new file mode 100644
index 000000000000..8dc9cb3cca2f
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */
+
+/**
+ * Contains SCM client related classes.
+ */
+package org.apache.hadoop.hdds.scm.client;
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index ef2585488faa..8c84af859b4a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@@ -138,4 +139,11 @@ List allocateBlock(long size, int numBlocks,
    */
   List<DatanodeDetails> sortDatanodes(List<String> nodes,
       String clientMachine) throws IOException;
+
+  /**
+   * Retrieves the hierarchical cluster tree representing the network topology.
+   * @return the root node of the network topology cluster tree.
+ * @throws IOException + */ + InnerNode getNetworkTopology() throws IOException; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index 2e724969998b..1f114304ccaa 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -39,6 +40,8 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeResponseProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos .SortDatanodesRequestProto; @@ -49,6 +52,9 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.InnerNode; +import org.apache.hadoop.hdds.scm.net.InnerNodeImpl; +import org.apache.hadoop.hdds.scm.net.Node; import 
org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider; @@ -328,6 +334,43 @@ public List sortDatanodes(List nodes, return results; } + @Override + public InnerNode getNetworkTopology() throws IOException { + GetClusterTreeRequestProto request = + GetClusterTreeRequestProto.newBuilder().build(); + SCMBlockLocationRequest wrapper = createSCMBlockRequest(Type.GetClusterTree) + .setGetClusterTreeRequest(request) + .build(); + + final SCMBlockLocationResponse wrappedResponse = + handleError(submitRequest(wrapper)); + GetClusterTreeResponseProto resp = + wrappedResponse.getGetClusterTreeResponse(); + + return (InnerNode) setParent( + InnerNodeImpl.fromProtobuf(resp.getClusterTree())); + } + + /** + * Sets the parent field for the clusterTree nodes recursively. + * + * @param node cluster tree without parents set. + * @return updated cluster tree with parents set. 
+ */ + private Node setParent(Node node) { + if (node instanceof InnerNodeImpl) { + InnerNodeImpl innerNode = (InnerNodeImpl) node; + if (innerNode.getChildrenMap() != null) { + for (Map.Entry child : innerNode.getChildrenMap() + .entrySet()) { + child.getValue().setParent(innerNode); + setParent(child.getValue()); + } + } + } + return node; + } + @Override public Object getUnderlyingProxyObject() { return rpcProxy; diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 3f346300b3ed..405845312357 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -47,6 +47,7 @@ message DatanodeDetailsProto { optional int64 persistedOpStateExpiry = 9; // The seconds after the epoch when the OpState should expire // TODO(runzhiwang): when uuid is gone, specify 1 as the index of uuid128 and mark as required optional UUID uuid128 = 100; // UUID with 128 bits assigned to the Datanode. 
+ optional uint32 level = 101; } /** @@ -497,3 +498,26 @@ message CompactionLogEntryProto { repeated CompactionFileInfoProto outputFileIntoList = 4; optional string compactionReason = 5; } + +message NodeTopology { + optional string name = 1; + optional string location = 2; + optional uint32 cost = 3; + optional uint32 level = 4; +} + +message NetworkNode { + optional DatanodeDetailsProto datanodeDetails = 1; + optional InnerNode innerNode = 3; +} + +message ChildrenMap { + optional string networkName = 1; + optional NetworkNode networkNode = 2; +} + +message InnerNode { + optional NodeTopology nodeTopology = 1; + optional uint32 numOfLeaves = 2; + repeated ChildrenMap childrenMap = 3; +} diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto index 307c23a56202..3d281975f2b4 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto @@ -30,7 +30,6 @@ package hadoop.hdds.block; import "hdds.proto"; - // SCM Block protocol enum Type { @@ -39,6 +38,7 @@ enum Type { GetScmInfo = 13; SortDatanodes = 14; AddScm = 15; + GetClusterTree = 16; } message SCMBlockLocationRequest { @@ -56,6 +56,7 @@ message SCMBlockLocationRequest { optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13; optional SortDatanodesRequestProto sortDatanodesRequest = 14; optional hadoop.hdds.AddScmRequestProto addScmRequestProto = 15; + optional GetClusterTreeRequestProto getClusterTreeRequest = 16; } message SCMBlockLocationResponse { @@ -80,6 +81,7 @@ message SCMBlockLocationResponse { optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13; optional SortDatanodesResponseProto sortDatanodesResponse = 14; optional hadoop.hdds.AddScmResponseProto addScmResponse = 15; + optional GetClusterTreeResponseProto getClusterTreeResponse = 16; } /** @@ -230,6 +232,13 @@ message 
SortDatanodesResponseProto{ repeated DatanodeDetailsProto node = 1; } +message GetClusterTreeRequestProto { +} + +message GetClusterTreeResponseProto { + required InnerNode clusterTree = 1; +} + /** * Protocol used from OzoneManager to StorageContainerManager. * See request and response messages for details of the RPC calls. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java index 7893e90812dc..ab296fc52bf8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -354,5 +354,4 @@ public int hashCode() { public boolean equals(Object obj) { return super.equals(obj); } - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java index 7b1d6dd27d3a..3aff2f456e4f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java @@ -261,5 +261,4 @@ public int compareTo(NodeStatus o) { } return order; } - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java index 0914cdd90b22..e77e2aebb31f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -32,6 +32,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetClusterTreeResponseProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto; @@ -43,6 +44,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.RatisUtil; +import org.apache.hadoop.hdds.scm.net.InnerNode; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -159,6 +161,10 @@ private SCMBlockLocationResponse processMessage( request.getSortDatanodesRequest(), request.getVersion() )); break; + case GetClusterTree: + response.setGetClusterTreeResponse( + getClusterTree(request.getVersion())); + break; default: // Should never happen throw new IOException("Unknown Operation " + request.getCmdType() + @@ -276,4 +282,13 @@ public SortDatanodesResponseProto sortDatanodes( throw new ServiceException(ex); } } + + public GetClusterTreeResponseProto getClusterTree(int clientVersion) + throws IOException { + GetClusterTreeResponseProto.Builder resp = + GetClusterTreeResponseProto.newBuilder(); + InnerNode clusterTree = impl.getNetworkTopology(); + resp.setClusterTree(clusterTree.toProtobuf(clientVersion).getInnerNode()); + return resp.build(); + } } diff 
--git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java index 69f190c7fbd8..a0d0a85bbcbf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java @@ -73,6 +73,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.IO_EXCEPTION; import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT; import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer; import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName; import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; @@ -412,6 +413,11 @@ private Node getOtherNode(String clientMachine) { return null; } + @Override + public InnerNode getNetworkTopology() { + return (InnerNode) scm.getClusterMap().getNode(ROOT); + } + @Override public AuditMessage buildAuditMessageForSuccess( AuditAction op, Map auditMap) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java index a82a1a8be70a..77970ad4470b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.ScmConfig; +import 
org.apache.hadoop.hdds.scm.client.ScmTopologyClient; import org.apache.hadoop.hdds.scm.ha.HASecurityUtils; import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; @@ -46,6 +47,7 @@ import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.ozone.om.OMStorage; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; @@ -312,6 +314,8 @@ public void testDelegationToken(boolean useIp) throws Exception { try { // Start OM om.setCertClient(new CertificateClientTestImpl(conf)); + om.setScmTopologyClient(new ScmTopologyClient( + new ScmBlockLocationTestingClient(null, null, 0))); om.start(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); String username = ugi.getUserName(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java new file mode 100644 index 000000000000..463c8b5ae5d9 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone; + +import org.apache.hadoop.hdds.scm.net.InnerNode; +import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; + +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * + * This class is to test the serialization/deserialization of cluster tree + * information from SCM. 
+ */ +@Timeout(300) +public class TestGetClusterTreeInformation { + + public static final Logger LOG = + LoggerFactory.getLogger(TestGetClusterTreeInformation.class); + private static int numOfDatanodes = 3; + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static StorageContainerManager scm; + + @BeforeAll + public static void init() throws IOException, TimeoutException, + InterruptedException { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(numOfDatanodes) + .setNumOfOzoneManagers(3) + .setNumOfStorageContainerManagers(3) + .build(); + cluster.waitForClusterToBeReady(); + scm = cluster.getStorageContainerManager(); + } + + @AfterAll + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testGetClusterTreeInformation() throws IOException { + SCMBlockLocationFailoverProxyProvider failoverProxyProvider = + new SCMBlockLocationFailoverProxyProvider(conf); + failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId()); + ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = + new ScmBlockLocationProtocolClientSideTranslatorPB( + failoverProxyProvider); + + InnerNode expectedInnerNode = (InnerNode) scm.getClusterMap().getNode(ROOT); + InnerNode actualInnerNode = scmBlockLocationClient.getNetworkTopology(); + assertEquals(expectedInnerNode, actualInnerNode); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java new file mode 100644 index 000000000000..cef872597e43 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOMSortDatanodes.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone; + +import com.google.common.collect.ImmutableMap; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.DFSConfigKeysLegacy; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.scm.server.SCMConfigurator; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.net.StaticMapping; + +import org.apache.hadoop.ozone.om.KeyManagerImpl; +import org.apache.hadoop.ozone.om.OmTestManagers; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; +import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_LEVEL; +import static org.mockito.Mockito.mock; + +/** + * {@link org.apache.hadoop.hdds.scm.server.TestSCMBlockProtocolServer} + * sortDatanodes tests for + * {@link org.apache.hadoop.ozone.om.KeyManagerImpl#sortDatanodes(List, String)}. + */ +@Timeout(300) +public class TestOMSortDatanodes { + + private static OzoneConfiguration config; + private static StorageContainerManager scm; + private static NodeManager nodeManager; + private static KeyManagerImpl keyManager; + private static StorageContainerLocationProtocol mockScmContainerClient; + private static OzoneManager om; + private static File dir; + private static final int NODE_COUNT = 10; + private static final Map EDGE_NODES = ImmutableMap.of( + "edge0", "/rack0", + "edge1", "/rack1" + ); + + @BeforeAll + public static void setup() throws Exception { + config = new OzoneConfiguration(); + dir = GenericTestUtils.getRandomizedTestDir(); + config.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString()); + config.set(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + StaticMapping.class.getName()); + config.set(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, "true"); + List datanodes = new ArrayList<>(NODE_COUNT); + List nodeMapping = new ArrayList<>(NODE_COUNT); + for (int i = 0; i < NODE_COUNT; i++) { + DatanodeDetails dn = randomDatanodeDetails(); + final String rack = "/rack" + (i % 2); + nodeMapping.add(dn.getHostName() + "=" + rack); + nodeMapping.add(dn.getIpAddress() + "=" + rack); + datanodes.add(dn); + } + EDGE_NODES.forEach((n, r) -> nodeMapping.add(n + "=" + r)); + config.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING, + 
String.join(",", nodeMapping)); + + SCMConfigurator configurator = new SCMConfigurator(); + configurator.setSCMHAManager(SCMHAManagerStub.getInstance(true)); + configurator.setScmContext(SCMContext.emptyContext()); + scm = HddsTestUtils.getScm(config, configurator); + scm.start(); + scm.exitSafeMode(); + nodeManager = scm.getScmNodeManager(); + datanodes.forEach(dn -> nodeManager.register(dn, null, null)); + mockScmContainerClient = + mock(StorageContainerLocationProtocol.class); + OmTestManagers omTestManagers + = new OmTestManagers(config, scm.getBlockProtocolServer(), + mockScmContainerClient); + om = omTestManagers.getOzoneManager(); + keyManager = (KeyManagerImpl)omTestManagers.getKeyManager(); + } + + @AfterAll + public static void cleanup() throws Exception { + if (scm != null) { + scm.stop(); + scm.join(); + } + if (om != null) { + om.stop(); + } + FileUtils.deleteDirectory(dir); + } + + @Test + public void sortDatanodesRelativeToDatanode() { + for (DatanodeDetails dn : nodeManager.getAllNodes()) { + assertEquals(ROOT_LEVEL + 2, dn.getLevel()); + List sorted = + keyManager.sortDatanodes(nodeManager.getAllNodes(), nodeAddress(dn)); + assertEquals(dn, sorted.get(0), + "Source node should be sorted very first"); + assertRackOrder(dn.getNetworkLocation(), sorted); + } + } + + @Test + public void sortDatanodesRelativeToNonDatanode() { + for (Map.Entry entry : EDGE_NODES.entrySet()) { + assertRackOrder(entry.getValue(), + keyManager.sortDatanodes(nodeManager.getAllNodes(), entry.getKey())); + } + } + + @Test + public void testSortDatanodes() { + List nodes = nodeManager.getAllNodes(); + + // sort normal datanodes + String client; + client = nodeManager.getAllNodes().get(0).getIpAddress(); + List datanodeDetails = + keyManager.sortDatanodes(nodes, client); + assertEquals(NODE_COUNT, datanodeDetails.size()); + + // illegal client 1 + client += "X"; + datanodeDetails = keyManager.sortDatanodes(nodes, client); + assertEquals(NODE_COUNT, datanodeDetails.size()); + + 
// illegal client 2 + client = "/default-rack"; + datanodeDetails = keyManager.sortDatanodes(nodes, client); + assertEquals(NODE_COUNT, datanodeDetails.size()); + } + + private static void assertRackOrder(String rack, List list) { + int size = list.size(); + for (int i = 0; i < size / 2; i++) { + assertEquals(rack, list.get(i).getNetworkLocation(), + "Nodes in the same rack should be sorted first"); + } + for (int i = size / 2; i < size; i++) { + assertNotEquals(rack, list.get(i).getNetworkLocation(), + "Nodes in the other rack should be sorted last"); + } + } + + private String nodeAddress(DatanodeDetails dn) { + boolean useHostname = config.getBoolean( + DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME, + DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); + return useHostname ? dn.getHostName() : dn.getIpAddress(); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java index 1be5b64ac87d..3f4b8f6e1c25 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java @@ -50,10 +50,12 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.client.ScmTopologyClient; import org.apache.hadoop.hdds.scm.ha.HASecurityUtils; import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails; import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; 
@@ -87,6 +89,7 @@ import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMStorage; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; @@ -209,6 +212,7 @@ final class TestSecureOzoneCluster { private File testUserKeytab; private String testUserPrincipal; private StorageContainerManager scm; + private ScmBlockLocationProtocol scmBlockClient; private OzoneManager om; private HddsProtos.OzoneManagerDetailsProto omInfo; private String host; @@ -265,6 +269,7 @@ void init() { clusterId = UUID.randomUUID().toString(); scmId = UUID.randomUUID().toString(); omId = UUID.randomUUID().toString(); + scmBlockClient = new ScmBlockLocationTestingClient(null, null, 0); startMiniKdc(); setSecureConfig(); @@ -610,6 +615,7 @@ void testAccessControlExceptionOnClient() throws Exception { setupOm(conf); om.setCertClient(new CertificateClientTestImpl(conf)); + om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient)); om.start(); } catch (Exception ex) { // Expects timeout failure from scmClient in om but om user login via @@ -677,6 +683,7 @@ void testDelegationTokenRenewal() throws Exception { setupOm(conf); OzoneManager.setTestSecureOmFlag(true); om.setCertClient(new CertificateClientTestImpl(conf)); + om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient)); om.start(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); @@ -764,6 +771,7 @@ void testGetSetRevokeS3Secret() throws Exception { setupOm(conf); // Start OM om.setCertClient(new CertificateClientTestImpl(conf)); + om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient)); om.start(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); String username = ugi.getUserName(); @@ -1000,6 +1008,7 @@ void testCertificateRotation() 
throws Exception { // create Ozone Manager instance, it will start the monitor task conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost"); om = OzoneManager.createOm(conf); + om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient)); om.setCertClient(client); // check after renew, client will have the new cert ID @@ -1165,6 +1174,7 @@ void testCertificateRotationUnRecoverableFailure() throws Exception { // create Ozone Manager instance, it will start the monitor task conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost"); om = OzoneManager.createOm(conf); + om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient)); om.setCertClient(mockClient); // check error message during renew @@ -1203,6 +1213,7 @@ void testDelegationTokenRenewCrossCertificateRenew() throws Exception { String omCertId1 = omCert.getSerialNumber().toString(); // Start OM om.setCertClient(certClient); + om.setScmTopologyClient(new ScmTopologyClient(scmBlockClient)); om.start(); GenericTestUtils.waitFor(() -> om.isLeaderReady(), 100, 10000); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java index 50ff9c36a0a3..2ae69dc3c96f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java @@ -50,6 +50,9 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.net.InnerNode; +import org.apache.hadoop.hdds.scm.net.InnerNodeImpl; +import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import 
org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; @@ -162,6 +165,9 @@ public static void setUp() throws Exception { mockScmBlockLocationProtocol = mock(ScmBlockLocationProtocol.class); mockScmContainerClient = mock(StorageContainerLocationProtocol.class); + InnerNode.Factory factory = InnerNodeImpl.FACTORY; + when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn( + factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1)); OmTestManagers omTestManagers = new OmTestManagers(conf, mockScmBlockLocationProtocol, mockScmContainerClient); @@ -247,10 +253,13 @@ private static void createVolume(String volumeName) throws IOException { } @BeforeEach - public void beforeEach() { + public void beforeEach() throws IOException { CONTAINER_ID.getAndIncrement(); reset(mockScmBlockLocationProtocol, mockScmContainerClient, mockDn1Protocol, mockDn2Protocol); + InnerNode.Factory factory = InnerNodeImpl.FACTORY; + when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn( + factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1)); when(mockDn1Protocol.getPipeline()).thenReturn(createPipeline(DN1)); when(mockDn2Protocol.getPipeline()).thenReturn(createPipeline(DN2)); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java index 41f1c14f3727..72f1c3374b28 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java @@ -46,6 +46,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.client.ScmTopologyClient; import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.ozone.OzoneAcl; @@ -197,6 +198,8 @@ private void setupEnvironment(boolean aclEnabled, OzoneManager.setTestSecureOmFlag(true); om = OzoneManager.createOm(conf); + om.setScmTopologyClient(new ScmTopologyClient( + new ScmBlockLocationTestingClient(null, null, 0))); om.setCertClient(new CertificateClientTestImpl(conf)); om.start(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 3786601dd63a..f9c665e98066 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -45,11 +45,15 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.hdds.DFSConfigKeysLegacy; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.net.InnerNode; +import org.apache.hadoop.hdds.scm.net.Node; +import org.apache.hadoop.hdds.scm.net.NodeImpl; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.hdds.utils.BackgroundService; @@ -58,6 +62,9 @@ import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import 
org.apache.hadoop.net.CachedDNSToSwitchMapping; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.TableMapping; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.om.exceptions.OMException; @@ -96,6 +103,7 @@ import org.apache.hadoop.ozone.security.acl.OzoneObj; import org.apache.hadoop.ozone.security.acl.RequestContext; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; @@ -108,6 +116,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ; +import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT; import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; @@ -190,6 +199,7 @@ public class KeyManagerImpl implements KeyManager { private BackgroundService openKeyCleanupService; private BackgroundService multipartUploadCleanupService; private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService; + private DNSToSwitchMapping dnsToSwitchMapping; public KeyManagerImpl(OzoneManager om, ScmClient scmClient, OzoneConfiguration conf, OMPerformanceMetrics metrics) { @@ -339,6 +349,16 @@ public void start(OzoneConfiguration configuration) { ozoneManager, configuration); multipartUploadCleanupService.start(); } + + Class dnsToSwitchMappingClass = + configuration.getClass( + DFSConfigKeysLegacy.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + TableMapping.class, DNSToSwitchMapping.class); + DNSToSwitchMapping 
newInstance = ReflectionUtils.newInstance( + dnsToSwitchMappingClass, configuration); + dnsToSwitchMapping = + ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance + : new CachedDNSToSwitchMapping(newInstance)); } KeyProviderCryptoExtension getKMSProvider() { @@ -1850,8 +1870,7 @@ private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo) return encInfo; } - @VisibleForTesting - void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { + private void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { if (keyInfos != null && clientMachine != null) { Map, List> sortedPipelines = new HashMap<>(); for (OmKeyInfo keyInfo : keyInfos) { @@ -1871,8 +1890,7 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { LOG.warn("No datanodes in pipeline {}", pipeline.getId()); continue; } - sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo, - uuidList); + sortedNodes = sortDatanodes(nodes, clientMachine); if (sortedNodes != null) { sortedPipelines.put(uuidSet, sortedNodes); } @@ -1886,24 +1904,59 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { } } - private List sortDatanodes(String clientMachine, - List nodes, OmKeyInfo keyInfo, List nodeList) { - List sortedNodes = null; + @VisibleForTesting + public List sortDatanodes(List nodes, + String clientMachine) { + final Node client = getClientNode(clientMachine, nodes); + return ozoneManager.getClusterMap() + .sortByDistanceCost(client, nodes, nodes.size()); + } + + private Node getClientNode(String clientMachine, + List nodes) { + List matchingNodes = new ArrayList<>(); + boolean useHostname = ozoneManager.getConfiguration().getBoolean( + DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME, + DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); + for (DatanodeDetails node : nodes) { + if ((useHostname ? node.getHostName() : node.getIpAddress()).equals( + clientMachine)) { + matchingNodes.add(node); + } + } + return !matchingNodes.isEmpty() ? 
matchingNodes.get(0) : + getOtherNode(clientMachine); + } + + private Node getOtherNode(String clientMachine) { try { - sortedNodes = scmClient.getBlockClient() - .sortDatanodes(nodeList, clientMachine); - if (LOG.isDebugEnabled()) { - LOG.debug("Sorted datanodes {} for client {}, result: {}", nodes, - clientMachine, sortedNodes); + String clientLocation = resolveNodeLocation(clientMachine); + if (clientLocation != null) { + Node rack = ozoneManager.getClusterMap().getNode(clientLocation); + if (rack instanceof InnerNode) { + return new NodeImpl(clientMachine, clientLocation, + (InnerNode) rack, rack.getLevel() + 1, + NODE_COST_DEFAULT); + } } - } catch (IOException e) { - LOG.warn("Unable to sort datanodes based on distance to client, " - + " volume={}, bucket={}, key={}, client={}, datanodes={}, " - + " exception={}", - keyInfo.getVolumeName(), keyInfo.getBucketName(), - keyInfo.getKeyName(), clientMachine, nodeList, e.getMessage()); + } catch (Exception e) { + LOG.info("Could not resolve client {}: {}", + clientMachine, e.getMessage()); + } + return null; + } + + private String resolveNodeLocation(String hostname) { + List hosts = Collections.singletonList(hostname); + List resolvedHosts = dnsToSwitchMapping.resolve(hosts); + if (resolvedHosts != null && !resolvedHosts.isEmpty()) { + String location = resolvedHosts.get(0); + LOG.debug("Node {} resolved to location {}", hostname, location); + return location; + } else { + LOG.debug("Node resolution did not yield any result for {}", hostname); + return null; } - return sortedNodes; } private static List toNodeUuid(Collection nodes) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index fda68b416e4f..decd980aee4c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -82,7 +82,11 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.client.ScmTopologyClient; import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; +import org.apache.hadoop.hdds.scm.net.InnerNode; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.server.OzoneAdmins; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.Table.KeyValue; @@ -354,6 +358,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private OzoneBlockTokenSecretManager blockTokenMgr; private CertificateClient certClient; private SecretKeySignerClient secretKeyClient; + private ScmTopologyClient scmTopologyClient; private final Text omRpcAddressTxt; private OzoneConfiguration configuration; private RPC.Server omRpcServer; @@ -386,6 +391,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private OzoneManagerHttpServer httpServer; private final OMStorage omStorage; private ObjectName omInfoBeanName; + private NetworkTopology clusterMap; private Timer metricsTimer; private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask; private static final ObjectWriter WRITER = @@ -603,6 +609,7 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) final StorageContainerLocationProtocol scmContainerClient = getScmContainerClient(configuration); // verifies that the SCM info in the OM Version file is correct. 
final ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(configuration); + scmTopologyClient = new ScmTopologyClient(scmBlockClient); this.scmClient = new ScmClient(scmBlockClient, scmContainerClient, configuration); this.ozoneLockProvider = new OzoneLockProvider(getKeyPathLockEnabled(), @@ -1135,6 +1142,24 @@ public void setCertClient(CertificateClient newClient) throws IOException { serviceInfo = new ServiceInfoProvider(secConfig, this, certClient); } + /** + * For testing purpose only. This allows setting up ScmBlockLocationClient + * without having to fully setup a working cluster. + */ + @VisibleForTesting + public void setScmTopologyClient( + ScmTopologyClient scmTopologyClient) { + this.scmTopologyClient = scmTopologyClient; + } + + public NetworkTopology getClusterMap() { + InnerNode currentTree = scmTopologyClient.getClusterTree(); + return new NetworkTopologyImpl(configuration.get( + ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, + ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT), + currentTree); + } + /** * For testing purpose only. This allows testing token in integration test * without fully setting up a working secure cluster. @@ -1673,6 +1698,18 @@ public void start() throws IOException { keyManager.start(configuration); + try { + scmTopologyClient.start(configuration); + } catch (IOException ex) { + LOG.error("Unable to initialize network topology schema file. 
", ex); + throw new UncheckedIOException(ex); + } + + clusterMap = new NetworkTopologyImpl(configuration.get( + ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, + ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT), + scmTopologyClient.getClusterTree()); + try { httpServer = new OzoneManagerHttpServer(configuration, this); httpServer.start(); @@ -2231,6 +2268,11 @@ public boolean stop() { } keyManager.stop(); stopSecretManager(); + + if (scmTopologyClient != null) { + scmTopologyClient.stop(); + } + if (httpServer != null) { httpServer.stop(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java index 43d29c1608a8..edffd5ed74eb 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.client.ScmTopologyClient; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -105,12 +106,16 @@ public OmTestManagers(OzoneConfiguration conf, keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils .getInternalState(om, "keyManager"); ScmClient scmClient = new ScmClient(scmBlockClient, containerClient, conf); + ScmTopologyClient scmTopologyClient = + new ScmTopologyClient(scmBlockClient); HddsWhiteboxTestUtils.setInternalState(om, "scmClient", scmClient); HddsWhiteboxTestUtils.setInternalState(keyManager, "scmClient", scmClient); HddsWhiteboxTestUtils.setInternalState(keyManager, "secretManager", 
mock(OzoneBlockTokenSecretManager.class)); + HddsWhiteboxTestUtils.setInternalState(om, + "scmTopologyClient", scmTopologyClient); om.start(); waitFor(() -> om.getOmRatisServer().checkLeaderStatus() == RaftServerStatus.LEADER_AND_READY, diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java index 8847a2d51e3f..8ba5ca779c1e 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java @@ -31,6 +31,9 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.net.InnerNode; +import org.apache.hadoop.hdds.scm.net.InnerNodeImpl; +import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; @@ -200,6 +203,14 @@ public List sortDatanodes(List nodes, return null; } + @Override + public InnerNode getNetworkTopology() { + InnerNode.Factory factory = InnerNodeImpl.FACTORY; + InnerNode clusterTree = factory.newInnerNode("", "", null, + NetConstants.ROOT_LEVEL, 1); + return clusterTree; + } + /** * Return the number of blocks puesdo deleted by this testing client. 
*/ diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java index 278d96023c81..5e2e27e0c1f4 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java @@ -23,12 +23,10 @@ import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; -import java.util.HashMap; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -44,6 +42,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.net.InnerNode; +import org.apache.hadoop.hdds.scm.net.InnerNodeImpl; +import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -79,14 +80,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import static com.google.common.collect.Sets.newHashSet; -import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static java.util.Comparator.comparing; -import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertFalse; @@ -124,6 +120,9 @@ void setup(@TempDir Path testDir) throws Exception { configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.toString()); containerClient = mock(StorageContainerLocationProtocol.class); blockClient = mock(ScmBlockLocationProtocol.class); + InnerNode.Factory factory = InnerNodeImpl.FACTORY; + when(blockClient.getNetworkTopology()).thenReturn( + factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1)); OmTestManagers omTestManagers = new OmTestManagers(configuration, blockClient, containerClient); @@ -644,9 +643,6 @@ public void listStatus() throws Exception { OMRequestTestUtils.addBucketToDB(volume, bucket, metadataManager); final Pipeline pipeline = MockPipeline.createPipeline(3); - final List nodes = pipeline.getNodes().stream() - .map(DatanodeDetails::getUuidString) - .collect(toList()); Set containerIDs = new HashSet<>(); List containersWithPipeline = new ArrayList<>(); @@ -696,7 +692,6 @@ public void listStatus() throws Exception { assertEquals(10, fileStatusList.size()); verify(containerClient).getContainerWithPipelineBatch(containerIDs); - verify(blockClient).sortDatanodes(nodes, client); // call list status the second time, and verify no more calls to // SCM. 
@@ -704,67 +699,4 @@ public void listStatus() throws Exception { null, Long.MAX_VALUE, client); verify(containerClient, times(1)).getContainerWithPipelineBatch(anySet()); } - - @ParameterizedTest - @ValueSource(strings = {"anyhost", ""}) - public void sortDatanodes(String client) throws Exception { - // GIVEN - int pipelineCount = 3; - int keysPerPipeline = 5; - OmKeyInfo[] keyInfos = new OmKeyInfo[pipelineCount * keysPerPipeline]; - List> expectedSortDatanodesInvocations = new ArrayList<>(); - Map> expectedSortedNodes = new HashMap<>(); - int ki = 0; - for (int p = 0; p < pipelineCount; p++) { - final Pipeline pipeline = MockPipeline.createPipeline(3); - final List nodes = pipeline.getNodes().stream() - .map(DatanodeDetails::getUuidString) - .collect(toList()); - expectedSortDatanodesInvocations.add(nodes); - final List sortedNodes = pipeline.getNodes().stream() - .sorted(comparing(DatanodeDetails::getUuidString)) - .collect(toList()); - expectedSortedNodes.put(pipeline, sortedNodes); - - when(blockClient.sortDatanodes(nodes, client)) - .thenReturn(sortedNodes); - - for (int i = 1; i <= keysPerPipeline; i++) { - OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder() - .setBlockID(new BlockID(i, 1L)) - .setPipeline(pipeline) - .setOffset(0) - .setLength(256000) - .build(); - - OmKeyInfo keyInfo = new OmKeyInfo.Builder() - .setOmKeyLocationInfos(Arrays.asList( - new OmKeyLocationInfoGroup(0, emptyList()), - new OmKeyLocationInfoGroup(1, singletonList(keyLocationInfo)))) - .build(); - keyInfos[ki++] = keyInfo; - } - } - - // WHEN - keyManager.sortDatanodes(client, keyInfos); - - // THEN - // verify all key info locations got updated - for (OmKeyInfo keyInfo : keyInfos) { - OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations(); - assertNotNull(locations); - for (OmKeyLocationInfo locationInfo : locations.getLocationList()) { - Pipeline pipeline = locationInfo.getPipeline(); - List expectedOrder = expectedSortedNodes.get(pipeline); - 
assertEquals(expectedOrder, pipeline.getNodesInOrder()); - } - } - - // expect one invocation per pipeline - for (List nodes : expectedSortDatanodesInvocations) { - verify(blockClient).sortDatanodes(nodes, client); - } - } - }