diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index 5e5ea8cebc6f7..45f4d6b73dbb8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -40,10 +40,10 @@ import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.CuratorCache; -import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; -import org.apache.curator.framework.recipes.cache.CuratorCacheListener; -import org.apache.curator.framework.recipes.shared.SharedCount; +//import org.apache.curator.framework.recipes.cache.CuratorCache; +//import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +//import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +//import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; @@ -136,10 +136,10 @@ public static void setCurator(CuratorFramework curator) { private final boolean isExternalClient; protected final CuratorFramework zkClient; - private SharedCount delTokSeqCounter; - private SharedCount keyIdSeqCounter; - private CuratorCacheBridge keyCache; - private CuratorCacheBridge tokenCache; +// private SharedCount delTokSeqCounter; +// private SharedCount keyIdSeqCounter; +// private CuratorCacheBridge keyCache; +// private CuratorCacheBridge tokenCache; private final int seqNumBatchSize; private int currentSeqNum; private int currentMaxSeqNum; @@ -310,108 +310,108 @@ private String getKrb5LoginModuleName() { @Override public void startThreads() throws IOException { - if (!isExternalClient) { - try { - zkClient.start(); - } catch (Exception e) { - throw new IOException("Could not start Curator Framework", e); - } - } else { - // If namespace parents are implicitly created, they won't have ACLs. - // So, let's explicitly create them. - CuratorFramework nullNsFw = zkClient.usingNamespace(null); - EnsurePath ensureNs = - nullNsFw.newNamespaceAwareEnsurePath("/" + zkClient.getNamespace()); - try { - ensureNs.ensure(nullNsFw.getZookeeperClient()); - } catch (Exception e) { - throw new IOException("Could not create namespace", e); - } - } - try { - delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); - if (delTokSeqCounter != null) { - delTokSeqCounter.start(); - } - // the first batch range should be allocated during this starting window - // by calling the incrSharedCount - currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); - currentMaxSeqNum = currentSeqNum + seqNumBatchSize; - LOG.info("Fetched initial range of seq num, from {} to {} ", - currentSeqNum+1, currentMaxSeqNum); - } catch (Exception e) { - throw new IOException("Could not start Sequence Counter", e); - } - try { - keyIdSeqCounter = new SharedCount(zkClient, ZK_DTSM_KEYID_ROOT, 0); - if (keyIdSeqCounter != null) { - keyIdSeqCounter.start(); - } - } catch (Exception e) { - throw new IOException("Could not start KeyId Counter", e); - } - try { - createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT); - createPersistentNode(ZK_DTSM_TOKENS_ROOT); - } catch (Exception e) { - throw new RuntimeException("Could not create ZK paths"); - } - try { - keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT) - .build(); - CuratorCacheListener keyCacheListener = CuratorCacheListener.builder() - .forCreatesAndChanges((oldNode, node) -> { - try { - processKeyAddOrUpdate(node.getData()); - } catch (IOException e) { - LOG.error("Error while processing Curator keyCacheListener " - + "NODE_CREATED / NODE_CHANGED event"); - throw new UncheckedIOException(e); - } - }) - .forDeletes(childData -> processKeyRemoved(childData.getPath())) - .build(); - keyCache.listenable().addListener(keyCacheListener); - keyCache.start(); - loadFromZKCache(false); - } catch (Exception e) { - throw new IOException("Could not start Curator keyCacheListener for keys", - e); - } - if (isTokenWatcherEnabled) { - LOG.info("TokenCache is enabled"); - try { - tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT) - .build(); - CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder() - .forCreatesAndChanges((oldNode, node) -> { - try { - processTokenAddOrUpdate(node.getData()); - } catch (IOException e) { - LOG.error("Error while processing Curator tokenCacheListener " - + "NODE_CREATED / NODE_CHANGED event"); - throw new UncheckedIOException(e); - } - }) - .forDeletes(childData -> { - try { - processTokenRemoved(childData); - } catch (IOException e) { - LOG.error("Error while processing Curator tokenCacheListener " - + "NODE_DELETED event"); - throw new UncheckedIOException(e); - } - }) - .build(); - tokenCache.listenable().addListener(tokenCacheListener); - tokenCache.start(); - loadFromZKCache(true); - } catch (Exception e) { - throw new IOException( - "Could not start Curator tokenCacheListener for tokens", e); - } - } - super.startThreads(); +// if (!isExternalClient) { +// try { +// zkClient.start(); +// } catch (Exception e) { +// throw new IOException("Could not start Curator Framework", e); +// } +// } else { +// // If namespace parents are implicitly created, they won't have ACLs. +// // So, let's explicitly create them. +// CuratorFramework nullNsFw = zkClient.usingNamespace(null); +// EnsurePath ensureNs = +// nullNsFw.newNamespaceAwareEnsurePath("/" + zkClient.getNamespace()); +// try { +// ensureNs.ensure(nullNsFw.getZookeeperClient()); +// } catch (Exception e) { +// throw new IOException("Could not create namespace", e); +// } +// } +// try { +// delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); +// if (delTokSeqCounter != null) { +// delTokSeqCounter.start(); +// } +// // the first batch range should be allocated during this starting window +// // by calling the incrSharedCount +// currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); +// currentMaxSeqNum = currentSeqNum + seqNumBatchSize; +// LOG.info("Fetched initial range of seq num, from {} to {} ", +// currentSeqNum+1, currentMaxSeqNum); +// } catch (Exception e) { +// throw new IOException("Could not start Sequence Counter", e); +// } +// try { +// keyIdSeqCounter = new SharedCount(zkClient, ZK_DTSM_KEYID_ROOT, 0); +// if (keyIdSeqCounter != null) { +// keyIdSeqCounter.start(); +// } +// } catch (Exception e) { +// throw new IOException("Could not start KeyId Counter", e); +// } +// try { +// createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT); +// createPersistentNode(ZK_DTSM_TOKENS_ROOT); +// } catch (Exception e) { +// throw new RuntimeException("Could not create ZK paths"); +// } +// try { +//// keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT) +//// .build(); +//// CuratorCacheListener keyCacheListener = CuratorCacheListener.builder() +//// .forCreatesAndChanges((oldNode, node) -> { +//// try { +//// processKeyAddOrUpdate(node.getData()); +//// } catch (IOException e) { +//// LOG.error("Error while processing Curator keyCacheListener " +//// + "NODE_CREATED / NODE_CHANGED event"); +//// throw new UncheckedIOException(e); +//// } +//// }) +//// .forDeletes(childData -> processKeyRemoved(childData.getPath())) +//// .build(); +//// keyCache.listenable().addListener(keyCacheListener); +//// keyCache.start(); +// loadFromZKCache(false); +// } catch (Exception e) { +// throw new IOException("Could not start Curator keyCacheListener for keys", +// e); +// } +// if (isTokenWatcherEnabled) { +// LOG.info("TokenCache is enabled"); +// try { +//// tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT) +//// .build(); +//// CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder() +//// .forCreatesAndChanges((oldNode, node) -> { +//// try { +//// processTokenAddOrUpdate(node.getData()); +//// } catch (IOException e) { +//// LOG.error("Error while processing Curator tokenCacheListener " +//// + "NODE_CREATED / NODE_CHANGED event"); +//// throw new UncheckedIOException(e); +//// } +//// }) +//// .forDeletes(childData -> { +//// try { +//// processTokenRemoved(childData); +//// } catch (IOException e) { +//// LOG.error("Error while processing Curator tokenCacheListener " +//// + "NODE_DELETED event"); +//// throw new UncheckedIOException(e); +//// } +//// }) +//// .build(); +//// tokenCache.listenable().addListener(tokenCacheListener); +//// tokenCache.start(); +// loadFromZKCache(true); +// } catch (Exception e) { +// throw new IOException( +// "Could not start Curator tokenCacheListener for tokens", e); +// } +// } +// super.startThreads(); } /** @@ -424,27 +424,27 @@ private void loadFromZKCache(final boolean isTokenCache) { final String cacheName = isTokenCache ? "token" : "key"; LOG.info("Starting to load {} cache.", cacheName); final Stream children; - if (isTokenCache) { - children = tokenCache.stream(); - } else { - children = keyCache.stream(); - } +// if (isTokenCache) { +// children = tokenCache.stream(); +// } else { +// children = keyCache.stream(); +// } final AtomicInteger count = new AtomicInteger(0); - children.forEach(childData -> { - try { - if (isTokenCache) { - processTokenAddOrUpdate(childData.getData()); - } else { - processKeyAddOrUpdate(childData.getData()); - } - } catch (Exception e) { - LOG.info("Ignoring node {} because it failed to load.", - childData.getPath()); - LOG.debug("Failure exception:", e); - count.getAndIncrement(); - } - }); +// children.forEach(childData -> { +// try { +// if (isTokenCache) { +// processTokenAddOrUpdate(childData.getData()); +// } else { +// processKeyAddOrUpdate(childData.getData()); +// } +// } catch (Exception e) { +// LOG.info("Ignoring node {} because it failed to load.", +// childData.getPath()); +// LOG.debug("Failure exception:", e); +// count.getAndIncrement(); +// } +// }); if (isTokenCache) { syncTokenOwnerStats(); } @@ -504,41 +504,41 @@ private void processTokenRemoved(ChildData data) throws IOException { @Override public void stopThreads() { super.stopThreads(); - try { - if (tokenCache != null) { - tokenCache.close(); - } - } catch (Exception e) { - LOG.error("Could not stop Delegation Token Cache", e); - } - try { - if (delTokSeqCounter != null) { - delTokSeqCounter.close(); - } - } catch (Exception e) { - LOG.error("Could not stop Delegation Token Counter", e); - } - try { - if (keyIdSeqCounter != null) { - keyIdSeqCounter.close(); - } - } catch (Exception e) { - LOG.error("Could not stop Key Id Counter", e); - } - try { - if (keyCache != null) { - keyCache.close(); - } - } catch (Exception e) { - LOG.error("Could not stop KeyCache", e); - } - try { - if (!isExternalClient && (zkClient != null)) { - zkClient.close(); - } - } catch (Exception e) { - LOG.error("Could not stop Curator Framework", e); - } +// try { +// if (tokenCache != null) { +// tokenCache.close(); +// } +// } catch (Exception e) { +// LOG.error("Could not stop Delegation Token Cache", e); +// } +// try { +// if (delTokSeqCounter != null) { +// delTokSeqCounter.close(); +// } +// } catch (Exception e) { +// LOG.error("Could not stop Delegation Token Counter", e); +// } +// try { +// if (keyIdSeqCounter != null) { +// keyIdSeqCounter.close(); +// } +// } catch (Exception e) { +// LOG.error("Could not stop Key Id Counter", e); +// } +// try { +// if (keyCache != null) { +// keyCache.close(); +// } +// } catch (Exception e) { +// LOG.error("Could not stop KeyCache", e); +// } +// try { +// if (!isExternalClient && (zkClient != null)) { +// zkClient.close(); +// } +// } catch (Exception e) { +// LOG.error("Could not stop Curator Framework", e); +// } } private void createPersistentNode(String nodePath) throws Exception { @@ -553,42 +553,43 @@ private void createPersistentNode(String nodePath) throws Exception { @Override protected int getDelegationTokenSeqNum() { - return delTokSeqCounter.getCount(); - } - - private int incrSharedCount(SharedCount sharedCount, int batchSize) - throws Exception { - while (true) { - // Loop until we successfully increment the counter - VersionedValue versionedValue = sharedCount.getVersionedValue(); - if (sharedCount.trySetCount( - versionedValue, versionedValue.getValue() + batchSize)) { - return versionedValue.getValue(); - } - } - } + //return delTokSeqCounter.getCount(); + return 0; + } + +// private int incrSharedCount(SharedCount sharedCount, int batchSize) +// throws Exception { +// while (true) { +// // Loop until we successfully increment the counter +// VersionedValue versionedValue = sharedCount.getVersionedValue(); +// if (sharedCount.trySetCount( +// versionedValue, versionedValue.getValue() + batchSize)) { +// return versionedValue.getValue(); +// } +// } +// } @Override protected int incrementDelegationTokenSeqNum() { // The secret manager will keep a local range of seq num which won't be // seen by peers, so only when the range is exhausted it will ask zk for // another range again - if (currentSeqNum >= currentMaxSeqNum) { - try { - // after a successful batch request, we can get the range starting point - currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); - currentMaxSeqNum = currentSeqNum + seqNumBatchSize; - LOG.info("Fetched new range of seq num, from {} to {} ", - currentSeqNum+1, currentMaxSeqNum); - } catch (InterruptedException e) { - // The ExpirationThread is just finishing.. so dont do anything.. - LOG.debug( - "Thread interrupted while performing token counter increment", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new RuntimeException("Could not increment shared counter !!", e); - } - } +// if (currentSeqNum >= currentMaxSeqNum) { +// try { +// // after a successful batch request, we can get the range starting point +// //currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); +// currentMaxSeqNum = currentSeqNum + seqNumBatchSize; +// LOG.info("Fetched new range of seq num, from {} to {} ", +// currentSeqNum+1, currentMaxSeqNum); +// } catch (InterruptedException e) { +// // The ExpirationThread is just finishing.. so dont do anything.. +// LOG.debug( +// "Thread interrupted while performing token counter increment", e); +// Thread.currentThread().interrupt(); +// } catch (Exception e) { +// throw new RuntimeException("Could not increment shared counter !!", e); +// } +// } return ++currentSeqNum; } @@ -596,7 +597,7 @@ protected int incrementDelegationTokenSeqNum() { @Override protected void setDelegationTokenSeqNum(int seqNum) { try { - delTokSeqCounter.setCount(seqNum); + // delTokSeqCounter.setCount(seqNum); } catch (Exception e) { throw new RuntimeException("Could not set shared counter !!", e); } @@ -604,21 +605,23 @@ protected void setDelegationTokenSeqNum(int seqNum) { @Override protected int getCurrentKeyId() { - return keyIdSeqCounter.getCount(); +// return keyIdSeqCounter.getCount(); + return 0; } @Override protected int incrementCurrentKeyId() { - try { - incrSharedCount(keyIdSeqCounter, 1); - } catch (InterruptedException e) { - // The ExpirationThread is just finishing.. so dont do anything.. - LOG.debug("Thread interrupted while performing keyId increment", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new RuntimeException("Could not increment shared keyId counter !!", e); - } - return keyIdSeqCounter.getCount(); +// try { +// //incrSharedCount(keyIdSeqCounter, 1); +// } catch (InterruptedException e) { +// // The ExpirationThread is just finishing.. so dont do anything.. +// LOG.debug("Thread interrupted while performing keyId increment", e); +// Thread.currentThread().interrupt(); +// } catch (Exception e) { +// throw new RuntimeException("Could not increment shared keyId counter !!", e); +// } +// return keyIdSeqCounter.getCount(); + return 0; } @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java index 1fd339bfc359b..1c717d22f5607 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java @@ -132,7 +132,7 @@ public void testRandomHealthAndDisconnects() throws Exception { long st = Time.now(); while (Time.now() - st < runFor) { cluster.getTestContext().checkException(); - serverFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); + //serverFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); Thread.sleep(50); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 3252535ca4876..54f5785605069 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -5360,4 +5360,14 @@ public void disableSPS() { public StoragePolicySatisfyManager getSPSManager() { return spsManager; } + + public void setExculeSlowDataNodesForWriteEnabled(boolean enable) { + placementPolicies.getPolicy(CONTIGUOUS).setExculeSlowDataNodesEnabled(enable); + placementPolicies.getPolicy(STRIPED).setExculeSlowDataNodesEnabled(enable); + } + + @VisibleForTesting + public boolean getEnableExculeSlowDataNodesForWrite(BlockType blockType) { + return placementPolicies.getPolicy(blockType).getExculeSlowDataNodesEnabled(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 2a212d6261598..de7649e3ac838 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -261,4 +261,16 @@ public void splitNodesWithRack( } } } + + /** + * Updates the value used for excludeSlowNodesEnabled, which is set by + * {@code DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY} + * initially. + * + * @param enable true, we will filter out slow nodes + * when choosing targets for blocks, otherwise false not filter. + */ + public abstract void setExculeSlowDataNodesEnabled(boolean enable); + + public abstract boolean getExculeSlowDataNodesEnabled(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 3ea232258aa2b..75e530fabc9f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -103,7 +103,7 @@ private String getText() { protected double considerLoadFactor; private boolean preferLocalNode; private boolean dataNodePeerStatsEnabled; - private boolean excludeSlowNodesEnabled; + private volatile boolean excludeSlowNodesEnabled; protected NetworkTopology clusterMap; protected Host2NodesMap host2datanodeMap; private FSClusterStats stats; @@ -1359,5 +1359,15 @@ protected Collection pickupReplicaSet( void setPreferLocalNode(boolean prefer) { this.preferLocalNode = prefer; } + + @Override + public void setExculeSlowDataNodesEnabled(boolean enable) { + this.excludeSlowNodesEnabled = enable; + } + + @Override + public boolean getExculeSlowDataNodesEnabled() { + return excludeSlowNodesEnabled; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 578656ea43d53..7ec7356ebd264 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -189,6 +189,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT; import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ToolRunner.confirmPrompt; @@ -331,7 +333,8 @@ public enum OperationCategory { DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, DFS_IMAGE_PARALLEL_LOAD_KEY, - DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)); + DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, + DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)); private static final String USAGE = "Usage: hdfs namenode [" + StartupOption.BACKUP.getName() + "] | \n\t[" @@ -2200,7 +2203,8 @@ protected String reconfigurePropertyImpl(String property, String newVal) return newVal; } else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) { return reconfigureParallelLoad(newVal); - } else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) { + } else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY) + || (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))) { return reconfigureSlowNodesParameters(datanodeManager, property, newVal); } else { throw new ReconfigurationException(property, newVal, getConf().get( @@ -2234,7 +2238,7 @@ private String reconfReplicationParameters(final String newVal, newSetting = bm.getBlocksReplWorkMultiplier(); } else { throw new IllegalArgumentException("Unexpected property " + - property + "in reconfReplicationParameters"); + property + " in reconfReplicationParameters"); } LOG.info("RECONFIGURE* changed {} to {}", property, newSetting); return String.valueOf(newSetting); @@ -2390,15 +2394,24 @@ String reconfigureParallelLoad(String newVal) { String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager, final String property, final String newVal) throws ReconfigurationException { + BlockManager bm = namesystem.getBlockManager(); namesystem.writeLock(); boolean enable; try { - if (newVal == null) { - enable = DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT; + if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) { + enable = (newVal == null ? DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT : + Boolean.parseBoolean(newVal)); + datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable); + } else if (property.equals( + DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) { + enable = (newVal == null ? + DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT : + Boolean.parseBoolean(newVal)); + bm.setExculeSlowDataNodesForWriteEnabled(enable); } else { - enable = Boolean.parseBoolean(newVal); + throw new IllegalArgumentException("Unexpected property " + + property + " in reconfigureSlowNodesParameters"); } - datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable); LOG.info("RECONFIGURE* changed {} to {}", property, newVal); return Boolean.toString(enable); } catch (IllegalArgumentException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java index eed442c24096f..2631fd53610af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java @@ -34,10 +34,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; +import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.test.GenericTestUtils; @@ -52,6 +54,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT; public class TestNameNodeReconfigure { @@ -399,8 +402,8 @@ public void testEnableParallelLoadAfterReconfigured() public void testEnableSlowNodesParametersAfterReconfigured() throws ReconfigurationException { final NameNode nameNode = cluster.getNameNode(); - final DatanodeManager datanodeManager = nameNode.namesystem - .getBlockManager().getDatanodeManager(); + final BlockManager blockManager = nameNode.namesystem.getBlockManager(); + final DatanodeManager datanodeManager = blockManager.getDatanodeManager(); // By default, avoidSlowDataNodesForRead is false. assertEquals(false, datanodeManager.getEnableAvoidSlowDataNodesForRead()); @@ -410,6 +413,21 @@ public void testEnableSlowNodesParametersAfterReconfigured() // After reconfigured, avoidSlowDataNodesForRead is true. assertEquals(true, datanodeManager.getEnableAvoidSlowDataNodesForRead()); + + // By default, excludeSlowNodesEnabled is false. + assertEquals(false, blockManager. + getEnableExculeSlowDataNodesForWrite(BlockType.CONTIGUOUS)); + assertEquals(false, blockManager. + getEnableExculeSlowDataNodesForWrite(BlockType.STRIPED)); + + nameNode.reconfigureProperty( + DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, Boolean.toString(true)); + + // After reconfigured, excludeSlowNodesEnabled is true. + assertEquals(true, blockManager. + getEnableExculeSlowDataNodesForWrite(BlockType.CONTIGUOUS)); + assertEquals(true, blockManager. + getEnableExculeSlowDataNodesForWrite(BlockType.STRIPED)); } @After diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 04776fa79b760..21634a829c1ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -39,6 +39,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY; import org.apache.commons.io.FileUtils; import org.apache.commons.text.TextStringBuilder; @@ -431,13 +432,14 @@ public void testNameNodeGetReconfigurableProperties() throws IOException { final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("namenode", address, outs, errs); - assertEquals(14, outs.size()); + assertEquals(15, outs.size()); assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1)); assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2)); assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3)); assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4)); assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5)); - assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(6)); + assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6)); + assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7)); assertEquals(errs.size(), 0); }