diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java index 220e9e2835625..fdaf9a481e8c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java @@ -80,11 +80,13 @@ public void close() throws IOException { public void sendLifeline(DatanodeRegistration registration, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) throws IOException { + VolumeFailureSummary volumeFailureSummary, + float volumeUsageStdDev) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) - .setFailedVolumes(failedVolumes); + .setFailedVolumes(failedVolumes) + .setVolumeUsageStdDev(volumeUsageStdDev); builder.addAllReports(PBHelperClient.convertStorageReports(reports)); if (cacheCapacity != 0) { builder.setCacheCapacity(cacheCapacity); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java index 42c8fc9241251..8eeb7fb0025c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java @@ -62,7 +62,8 @@ public LifelineResponseProto sendLifeline(RpcController controller, impl.sendLifeline(PBHelper.convert(request.getRegistration()), report, request.getCacheCapacity(), request.getCacheUsed(), request.getXmitsInProgress(), request.getXceiverCount(), - request.getFailedVolumes(), volumeFailureSummary); + request.getFailedVolumes(), volumeFailureSummary, + request.getVolumeUsageStdDev()); return VOID_LIFELINE_RESPONSE_PROTO; } catch (IOException e) { throw new ServiceException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index add19e9e102ec..55f1dd82befab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -138,13 +138,15 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) + @Nonnull SlowDiskReports slowDisks, + float volumeUsageStdDev) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() 
.setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) .setFailedVolumes(failedVolumes) - .setRequestFullBlockReportLease(requestFullBlockReportLease); + .setRequestFullBlockReportLease(requestFullBlockReportLease) + .setVolumeUsageStdDev(volumeUsageStdDev); builder.addAllReports(PBHelperClient.convertStorageReports(reports)); if (cacheCapacity != 0) { builder.setCacheCapacity(cacheCapacity); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 9244b9fef8571..2b3a0badd6ab0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -122,7 +122,8 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller, request.getXceiverCount(), request.getFailedVolumes(), volumeFailureSummary, request.getRequestFullBlockReportLease(), PBHelper.convertSlowPeerInfo(request.getSlowPeersList()), - PBHelper.convertSlowDiskInfo(request.getSlowDisksList())); + PBHelper.convertSlowDiskInfo(request.getSlowDisksList()), + request.getVolumeUsageStdDev()); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index ead915f1d38de..be96dc4588d68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2657,24 +2657,24 @@ public long getProvidedCapacity() { void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) { + VolumeFailureSummary volumeFailureSummary, float volumeUsageStdDev) { for (StorageReport report: reports) { providedStorageMap.updateStorage(node, report.getStorage()); } node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount, - failedVolumes, volumeFailureSummary); + failedVolumes, volumeFailureSummary, volumeUsageStdDev); } void updateHeartbeatState(DatanodeDescriptor node, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) { + VolumeFailureSummary volumeFailureSummary, float volumeUsageStdDev) { for (StorageReport report: reports) { providedStorageMap.updateStorage(node, report.getStorage()); } node.updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount, - failedVolumes, volumeFailureSummary); + failedVolumes, volumeFailureSummary, volumeUsageStdDev); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index e0e5c354cb8f7..87ab577834609 100755 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -218,6 +218,7 @@ public Type getType() { private long lastBlocksScheduledRollTime = 0; private int volumeFailures = 0; private VolumeFailureSummary volumeFailureSummary = null; + private float volumeUsageStdDev; /** * When set to true, the node is not in include list and is not allowed @@ -340,7 +341,7 @@ boolean hasStaleStorages() { } public void resetBlocks() { - updateStorageStats(this.getStorageReports(), 0L, 0L, 0, 0, null); + updateStorageStats(this.getStorageReports(), 0L, 0L, 0, 0, null, 0.0f); synchronized (invalidateBlocks) { this.invalidateBlocks.clear(); } @@ -379,9 +380,9 @@ public int numBlocks() { */ void updateHeartbeat(StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int volFailures, - VolumeFailureSummary volumeFailureSummary) { + VolumeFailureSummary volumeFailureSummary, float volumeUsageSD) { updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount, - volFailures, volumeFailureSummary); + volFailures, volumeFailureSummary, volumeUsageSD); heartbeatedSinceRegistration = true; } @@ -390,9 +391,9 @@ void updateHeartbeat(StorageReport[] reports, long cacheCapacity, */ void updateHeartbeatState(StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int volFailures, - VolumeFailureSummary volumeFailureSummary) { + VolumeFailureSummary volumeFailureSummary, float volumeUsageSD) { updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount, - volFailures, volumeFailureSummary); + volFailures, volumeFailureSummary, volumeUsageSD); setLastUpdate(Time.now()); setLastUpdateMonotonic(Time.monotonicNow()); rollBlocksScheduled(getLastUpdateMonotonic()); @@ -400,7 +401,7 @@ void updateHeartbeatState(StorageReport[] reports, long cacheCapacity, private void updateStorageStats(StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int volFailures, - VolumeFailureSummary volumeFailureSummary) { + VolumeFailureSummary volumeFailureSummary, float volumeUsageSD) { long totalCapacity = 0; long totalRemaining = 0; long totalBlockPoolUsed = 0; @@ -454,6 +455,7 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity, setXceiverCount(xceiverCount); this.volumeFailures = volFailures; this.volumeFailureSummary = volumeFailureSummary; + this.volumeUsageStdDev = volumeUsageSD; for (StorageReport report : reports) { DatanodeStorageInfo storage = null; @@ -945,6 +947,13 @@ public VolumeFailureSummary getVolumeFailureSummary() { return volumeFailureSummary; } + /** + * @return the standard deviation of volume usage + */ + public float getVolumeUsageStdDev() { + return volumeUsageStdDev; + } + /** * @param nodeReg DatanodeID to update registration for. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 68ee16ca6f74b..1bbd9a319db67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1751,7 +1751,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, int maxTransfers, int failedVolumes, VolumeFailureSummary volumeFailureSummary, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) throws IOException { + @Nonnull SlowDiskReports slowDisks, + float volumeUsageStdDev) throws IOException { final DatanodeDescriptor nodeinfo; try { nodeinfo = getDatanode(nodeReg); @@ -1769,7 +1770,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, return new DatanodeCommand[]{RegisterCommand.REGISTER}; } heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity, - cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary); + cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary, + volumeUsageStdDev); // If we are in safemode, do not send back any recovery / replication // requests. Don't even drain the existing queue of work. @@ -1891,12 +1893,15 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, * @param xceiverCount estimated count of transfer threads running at DataNode * @param failedVolumes count of failed volumes at DataNode * @param volumeFailureSummary info on failed volumes at DataNode + * @param volumeUsageStdDev the standard deviation of volume usage + * * @throws IOException if there is an error */ public void handleLifeline(DatanodeRegistration nodeReg, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) throws IOException { + VolumeFailureSummary volumeFailureSummary, + float volumeUsageStdDev) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Received handleLifeline from nodeReg = " + nodeReg); } @@ -1917,7 +1922,7 @@ public void handleLifeline(DatanodeRegistration nodeReg, return; } heartbeatManager.updateLifeline(nodeinfo, reports, cacheCapacity, cacheUsed, - xceiverCount, failedVolumes, volumeFailureSummary); + xceiverCount, failedVolumes, volumeFailureSummary, volumeUsageStdDev); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index 5da47c4b2a8a8..74db46053ec0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -223,7 +223,8 @@ synchronized void register(final DatanodeDescriptor d) { addDatanode(d); //update its timestamp - d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null); + d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, + 0L, 0L, 0, 0, null, 0.0f); stats.add(d); } } @@ -254,24 +255,24 @@ synchronized void removeDatanode(DatanodeDescriptor node) { synchronized void updateHeartbeat(final DatanodeDescriptor node, StorageReport[] 
reports, long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) { + VolumeFailureSummary volumeFailureSummary, float volumeUsageStdDev) { stats.subtract(node); blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed, - xceiverCount, failedVolumes, volumeFailureSummary); + xceiverCount, failedVolumes, volumeFailureSummary, volumeUsageStdDev); stats.add(node); } synchronized void updateLifeline(final DatanodeDescriptor node, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) { + VolumeFailureSummary volumeFailureSummary, float volumeUsageStdDev) { stats.subtract(node); // This intentionally calls updateHeartbeatState instead of // updateHeartbeat, because we don't want to modify the // heartbeatedSinceRegistration flag. Arrival of a lifeline message does // not count as arrival of the first heartbeat. blockManager.updateHeartbeatState(node, reports, cacheCapacity, cacheUsed, - xceiverCount, failedVolumes, volumeFailureSummary); + xceiverCount, failedVolumes, volumeFailureSummary, volumeUsageStdDev); stats.add(node); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 54624140ac08a..be5ef23352483 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -536,6 +536,7 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) scheduler.scheduleNextHeartbeat(); StorageReport[] reports = dn.getFSDataset().getStorageReports(bpos.getBlockPoolId()); + float volumeUsageStdDev = dn.getFSDataset().getVolumeUsageStdDev(); if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat with " + reports.length + " storage reports from service actor: " + this); @@ -567,7 +568,8 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) volumeFailureSummary, requestBlockReportLease, slowPeers, - slowDisks); + slowDisks, + volumeUsageStdDev); if (outliersReportDue) { // If the report was due and successfully sent, schedule the next one. @@ -1117,6 +1119,7 @@ private void sendLifeline() throws IOException { .getVolumeFailureSummary(); int numFailedVolumes = volumeFailureSummary != null ? 
volumeFailureSummary.getFailedStorageLocations().length : 0; + float volumeUsageStdDev = dn.getFSDataset().getVolumeUsageStdDev(); lifelineNamenode.sendLifeline(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), @@ -1124,7 +1127,8 @@ private void sendLifeline() throws IOException { dn.getXmitsInProgress(), dn.getXceiverCount(), numFailedVolumes, - volumeFailureSummary); + volumeFailureSummary, + volumeUsageStdDev); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index f162ea1b3ae15..c16562d00dd98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -230,6 +230,9 @@ StorageReport[] getStorageReports(String bpid) /** @return a volume information map (name {@literal =>} info). */ Map<String, Object> getVolumeInfoMap(); + /** @return the standard deviation of volume usage across volumes. */ + float getVolumeUsageStdDev(); + /** * Returns info about volume failures. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 597ffbe806694..26f8b20518ba1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -3129,6 +3129,7 @@ private static class VolumeInfo { final String directory; final long usedSpace; // size of space used by HDFS final long freeSpace; // size of free space excluding reserved space + final float volumeUsagePercent; // usage of volume final long reservedSpace; // size of space reserved for non-HDFS final long reservedSpaceForReplicas; // size of space reserved RBW or // re-replication @@ -3139,6 +3140,7 @@ private static class VolumeInfo { this.directory = v.toString(); this.usedSpace = usedSpace; this.freeSpace = freeSpace; + this.volumeUsagePercent = (usedSpace * 100.0f) / (usedSpace + freeSpace); this.reservedSpace = v.getReserved(); this.reservedSpaceForReplicas = v.getReservedForReplicas(); this.numBlocks = v.getNumBlocks(); @@ -3175,6 +3177,7 @@ public Map<String, Object> getVolumeInfoMap() { final Map<String, Object> innerInfo = new HashMap<String, Object>(); innerInfo.put("usedSpace", v.usedSpace); innerInfo.put("freeSpace", v.freeSpace); + innerInfo.put("volumeUsagePercent", v.volumeUsagePercent); innerInfo.put("reservedSpace", v.reservedSpace); innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas); innerInfo.put("numBlocks", v.numBlocks); @@ -3184,6 +3187,26 @@ public Map<String, Object> getVolumeInfoMap() { return info; } + @Override + public float getVolumeUsageStdDev() { + Collection<VolumeInfo> volumeInfos = getVolumeInfo(); + ArrayList<Float> usages = new ArrayList<Float>(); + float totalDfsUsed = 0; + float dev = 0; + for (VolumeInfo v : volumeInfos) { + usages.add(v.volumeUsagePercent); + totalDfsUsed += v.volumeUsagePercent; + } + + totalDfsUsed /= volumeInfos.size(); + Collections.sort(usages); + for (Float usage : usages) { + dev += (usage - totalDfsUsed) * (usage - totalDfsUsed); + } + dev = (float) Math.sqrt(dev / usages.size()); + return dev; + } + @Override //FsDatasetSpi public void deleteBlockPool(String bpid, boolean force) throws IOException {
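
As an aside on the hunk above: getVolumeUsageStdDev() computes the population standard deviation of the per-volume usage percentages returned by getVolumeInfo(). Below is a minimal standalone sketch of the same calculation; the class name, the sample usage values, and the empty-list guard are illustrative additions for this sketch and are not part of the patch.

import java.util.Arrays;
import java.util.List;

/**
 * Illustrative only: mirrors the population standard deviation that
 * FsDatasetImpl#getVolumeUsageStdDev() computes over per-volume usage
 * percentages.
 */
public class VolumeUsageStdDevSketch {

  static float volumeUsageStdDev(List<Float> usagePercents) {
    if (usagePercents.isEmpty()) {
      // Guard added for this sketch; the patch assumes at least one volume.
      return 0.0f;
    }
    // Mean usage across volumes.
    float mean = 0.0f;
    for (float usage : usagePercents) {
      mean += usage;
    }
    mean /= usagePercents.size();
    // Sum of squared deviations, then population standard deviation.
    float dev = 0.0f;
    for (float usage : usagePercents) {
      dev += (usage - mean) * (usage - mean);
    }
    return (float) Math.sqrt(dev / usagePercents.size());
  }

  public static void main(String[] args) {
    // Three volumes at 10%, 50% and 90% used -> standard deviation of ~32.66.
    System.out.println(volumeUsageStdDev(Arrays.asList(10.0f, 50.0f, 90.0f)));
  }
}

With the three sample volumes above, the sketch prints roughly 32.66; this is the kind of value the patch reports as volumeUsageStdDev in heartbeats and on the NameNode web UI.
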
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 7ccaae9773e1f..4bb92f830ea47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4363,7 +4363,8 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) + @Nonnull SlowDiskReports slowDisks, + float volumeUsageStdDev) throws IOException { readLock(); try { @@ -4373,7 +4374,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary, - slowPeers, slowDisks); + slowPeers, slowDisks, volumeUsageStdDev); long blockReportLeaseId = 0; if (requestFullBlockReportLease) { blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg); @@ -4410,15 +4411,17 @@ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed, * @param xmitsInProgress count of transfers running at DataNode * @param failedVolumes count of failed volumes at DataNode * @param volumeFailureSummary info on failed volumes at DataNode + * @param volumeUsageStdDev the standard deviation of volume usage * @throws IOException if there is an error */ void handleLifeline(DatanodeRegistration nodeReg, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress, - int failedVolumes, VolumeFailureSummary volumeFailureSummary) + int failedVolumes, VolumeFailureSummary volumeFailureSummary, + float volumeUsageStdDev) throws IOException { blockManager.getDatanodeManager().handleLifeline(nodeReg, reports, cacheCapacity, cacheUsed, xceiverCount, - failedVolumes, volumeFailureSummary); + failedVolumes, volumeFailureSummary, volumeUsageStdDev); } /** @@ -6523,7 +6526,8 @@ .put("blockPoolUsedPercent", node.getBlockPoolUsedPercent()) .put("volfails", node.getVolumeFailures()) // Block report time in minutes - .put("lastBlockReport", getLastBlockReport(node)); + .put("lastBlockReport", getLastBlockReport(node)) + .put("volumeUsageStdDev", node.getVolumeUsageStdDev()); VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary(); if (volumeFailureSummary != null) { innerinfo diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 580a991f343a8..df3bf06b7bbde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1593,14 +1593,15 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg, int failedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) + @Nonnull SlowDiskReports slowDisks, + float volumeUsageStdDev) throws IOException { checkNNStartup(); verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary, requestFullBlockReportLease, - slowPeers, slowDisks); + slowPeers, slowDisks, volumeUsageStdDev); } @Override // DatanodeProtocol @@ -1727,11 +1728,13 @@ public NamespaceInfo versionRequest() throws IOException { public void sendLifeline(DatanodeRegistration nodeReg, StorageReport[] report, long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) throws IOException { + VolumeFailureSummary volumeFailureSummary, + float volumeUsageStdDev) throws IOException { checkNNStartup(); verifyRequest(nodeReg); namesystem.handleLifeline(nodeReg, report, dnCacheCapacity, dnCacheUsed, - xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary); + xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary, + volumeUsageStdDev); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java index b30e60b45d5a4..c70b984d341b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java @@ -38,5 +38,6 @@ public interface DatanodeLifelineProtocol { void sendLifeline(DatanodeRegistration registration, StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) throws IOException; + VolumeFailureSummary volumeFailureSummary, + float volumeUsageStdDev) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 24cd7aa1155fc..0cd379da74a46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -112,6 +112,7 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration * @param slowPeers Details of peer DataNodes that were detected as being * slow to respond to packet writes. Empty report if no * slow peers were detected by the DataNode. 
+ * @param volumeUsageStdDev the standard deviation of volume usage * @throws IOException on error */ @Idempotent @@ -125,7 +126,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) + @Nonnull SlowDiskReports slowDisks, + float volumeUsageStdDev) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 4a98f2d01e9e9..84c4582622cb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -211,6 +211,7 @@ message HeartbeatRequestProto { optional bool requestFullBlockReportLease = 9 [ default = false ]; repeated SlowPeerReportProto slowPeers = 10; repeated SlowDiskReportProto slowDisks = 11; + optional float volumeUsageStdDev = 12; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html index 7301064651e2b..de7476872f9b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html @@ -106,6 +106,7 @@ Directory StorageType Capacity Used + Capacity Used Percent Capacity Left Capacity Reserved Reserved Space for Replicas @@ -117,6 +118,7 @@ {name} {storageType} {usedSpace|fmt_bytes} + {volumeUsagePercent|fmt_percentage} {freeSpace|fmt_bytes} {reservedSpace|fmt_bytes} {reservedSpaceForReplicas|fmt_bytes} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html index c3ba37165c998..e81fda836c728 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html @@ -320,6 +320,7 @@ Capacity Blocks Block pool used + Volume Usage StdDev Version @@ -343,6 +344,7 @@ {numBlocks} {blockPoolUsed|fmt_bytes} ({blockPoolUsedPercent|fmt_percentage}) + {volumeUsageStdDev|fmt_percentage} {version} {/LiveNodes} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js index 9be19fefca95d..86502ddc132d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.js @@ -360,6 +360,7 @@ { 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0}, { 'type': 'num' , "defaultContent": 0}, { 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0}, + { 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0}, { 'type': 'string' , "defaultContent": ""} ], initComplete: function () { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java index ec86093ad54a2..af376064dfb97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java @@ -58,12 +58,13 @@ abstract public class BaseReplicationPolicyTest { static void updateHeartbeatWithUsage(DatanodeDescriptor dn, long capacity, long dfsUsed, long remaining, long blockPoolUsed, long dnCacheCapacity, long dnCacheUsed, int xceiverCount, - int volFailures) { + int volFailures, float volumeUsageStdDev) { dn.getStorageInfos()[0].setUtilizationForTesting( capacity, dfsUsed, remaining, blockPoolUsed); dn.updateHeartbeat( BlockManagerTestUtil.getStorageReportsForDatanode(dn), - dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); + dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null, + volumeUsageStdDev); } abstract DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf); @@ -107,7 +108,8 @@ void updateHeartbeatWithUsage() { for (int i=0; i < dataNodes.length; i++) { updateHeartbeatWithUsage(dataNodes[i], 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, + 0L, 0L, 0, 0, 0.0f); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBPPBalanceLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBPPBalanceLocal.java index a5920c4d9c8f1..7354af6efa47a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBPPBalanceLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBPPBalanceLocal.java @@ -101,11 +101,12 @@ public static void setupCluster() throws Exception { protected static void updateHeartbeatWithUsage(DatanodeDescriptor dn, long capacity, long dfsUsed, long remaining, long blockPoolUsed, long dnCacheCapacity, long dnCacheUsed, int xceiverCount, - int volFailures) { + int volFailures, float volumeUsageStdDev) { dn.getStorageInfos()[0] .setUtilizationForTesting(capacity, dfsUsed, remaining, blockPoolUsed); dn.updateHeartbeat(BlockManagerTestUtil.getStorageReportsForDatanode(dn), - dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); + dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null, + volumeUsageStdDev); } protected static void setupDataNodeCapacity() { @@ -115,14 +116,14 @@ protected static void setupDataNodeCapacity() { updateHeartbeatWithUsage(dataNodes[i], 4 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 4 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L, - 0L, 0, 0); + 0L, 0, 0, 0.0f); } else { // remaining 25% updateHeartbeatWithUsage(dataNodes[i], 4 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 3 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L, 0L, - 0, 0); + 0, 0, 0.0f); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java index f58961eb9d072..2d58bbb872153 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java @@ -99,12 +99,13 @@ public static void setupCluster() throws Exception { private static void updateHeartbeatWithUsage(DatanodeDescriptor dn, long capacity, long dfsUsed, long remaining, long blockPoolUsed, long dnCacheCapacity, long dnCacheUsed, int xceiverCount, - int volFailures) { + int volFailures, float volumeUsageStdDev) { dn.getStorageInfos()[0].setUtilizationForTesting( capacity, dfsUsed, remaining, blockPoolUsed); dn.updateHeartbeat( BlockManagerTestUtil.getStorageReportsForDatanode(dn), - dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); + dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null, + volumeUsageStdDev); } private static void setupDataNodeCapacity() { @@ -112,12 +113,13 @@ private static void setupDataNodeCapacity() { if ((i % 2) == 0) { // remaining 100% updateHeartbeatWithUsage(dataNodes[i], 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * blockSize, - 0L, 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * blockSize, 0L, 0L, 0L, 0, 0); + 0L, 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * blockSize, + 0L, 0L, 0L, 0, 0, 0.0f); } else { // remaining 50% updateHeartbeatWithUsage(dataNodes[i], 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * blockSize, HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * blockSize, HdfsServerConstants.MIN_BLOCKS_FOR_WRITE - * blockSize, 0L, 0L, 0L, 0, 0); + * blockSize, 0L, 0L, 0L, 0, 0, 0.0f); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceRackFaultTolerantBPP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceRackFaultTolerantBPP.java index 179c6c6b6931c..379fe30376349 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceRackFaultTolerantBPP.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceRackFaultTolerantBPP.java @@ -106,11 +106,12 @@ public static void setupCluster() throws Exception { private static void updateHeartbeatWithUsage(DatanodeDescriptor dn, long capacity, long dfsUsed, long remaining, long blockPoolUsed, long dnCacheCapacity, long dnCacheUsed, int xceiverCount, - int volFailures) { + int volFailures, float volumeUsageStdDev) { dn.getStorageInfos()[0] .setUtilizationForTesting(capacity, dfsUsed, remaining, blockPoolUsed); dn.updateHeartbeat(BlockManagerTestUtil.getStorageReportsForDatanode(dn), - dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); + dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null, + volumeUsageStdDev); } private static void setupDataNodeCapacity() { @@ -120,14 +121,14 @@ private static void setupDataNodeCapacity() { updateHeartbeatWithUsage(dataNodes[i], 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L, - 0L, 0, 0); + 0L, 0, 0, 0.0f); } else { // remaining 50% updateHeartbeatWithUsage(dataNodes[i], 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L, 0L, - 0, 0); + 0, 0, 0.0f); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index d5e0a99fe789b..6fdae47497abf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -198,7 +198,7 @@ private void addNodes(Iterable nodesToAdd) { 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L); dn.updateHeartbeat( BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0, - null); + null, 0.0f); bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn); } } @@ -1453,7 +1453,7 @@ public void testBlockManagerMachinesArray() throws Exception { } } failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport - .EMPTY_ARRAY), 0L, 0L, 0, 0, null); + .EMPTY_ARRAY), 0L, 0L, 0, 0, null, 0.0f); ns.writeLock(); DatanodeStorageInfo corruptStorageInfo= null; for(int i=0; i chosenNodes = new ArrayList<>(); chosenNodes.add(storagesInBoundaryCase[0]); @@ -778,7 +783,8 @@ public void testChooseMoreTargetsThanNodeGroups() throws Exception { for(int i=0; i chosenNodes = new ArrayList<>(); @@ -883,7 +890,7 @@ public void testChooseFavoredNodesNodeGroup() throws Exception { updateHeartbeatWithUsage(dataNodes[3], 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, (HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, - 0L, 0L, 0, 0); // no space + 0L, 0L, 0, 0, 0.0f); // no space DatanodeStorageInfo[] targets; List expectedTargets = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java index 6f7d67a8af6fd..b5a917d192919 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java @@ -85,7 +85,7 @@ public void testChooseTarget1() throws Exception { updateHeartbeatWithUsage(dataNodes[0], 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - 0L, 0L, 4, 0); + 0L, 0L, 4, 0, 0.0f); DatanodeStorageInfo[] targets; targets = chooseTarget(0); @@ -118,7 +118,8 @@ public void testChooseTarget1() throws Exception { updateHeartbeatWithUsage(dataNodes[0], 2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); + HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, + 0L, 0L, 0L, 0, 0, 0.0f); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestVolumeUsageStdDev.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestVolumeUsageStdDev.java new file mode 100644 index 0000000000000..42a568f20012d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestVolumeUsageStdDev.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.eclipse.jetty.util.ajax.JSON; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Map; + +public class TestVolumeUsageStdDev { + private final static int NUM_DATANODES = 5; + private final static int STORAGES_PER_DATANODE = 3; + private final static int DEFAULT_BLOCK_SIZE = 102400; + private final static int BUFFER_LENGTH = 1024; + private static Configuration conf; + private MiniDFSCluster cluster; + private FileSystem fs; + + @Before + public void setUp() throws Exception { + conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + // Ensure that each volume capacity is larger than the DEFAULT_BLOCK_SIZE. + long capacity = 8 * DEFAULT_BLOCK_SIZE; + long[][] capacities = new long[NUM_DATANODES][STORAGES_PER_DATANODE]; + String[] hostnames = new String[5]; + for (int i = 0; i < NUM_DATANODES; i++) { + hostnames[i] = i + "." + i + "." + i + "." + i; + for(int j = 0; j < STORAGES_PER_DATANODE; j++){ + capacities[i][j]=capacity; + } + } + + cluster = new MiniDFSCluster.Builder(conf) + .hosts(hostnames) + .numDataNodes(NUM_DATANODES) + .storagesPerDatanode(STORAGES_PER_DATANODE) + .storageCapacities(capacities).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + } + + /** + * Create files of different sizes for each datanode. + * Ensure that the file size is smaller than the blocksize + * and only one block is generated. In this way, data will + * be written to only one volume. + * + * Using favoredNodes, we can write files of a specified size + * to specified datanodes to create a batch of datanodes with + * different volumeUsageStdDev. + * + * Then, we assert the value of volumeUsageStdDev on namenode + * and datanode is the same. + */ + @SuppressWarnings("unchecked") + @Test + public void testVolumeUsageStdDev() throws IOException { + // Create file for each datanode. 
+ ArrayList<DataNode> dataNodes = cluster.getDataNodes(); + DFSTestUtil.createFile(fs, new Path("/file0"), false, BUFFER_LENGTH, 1000, + DEFAULT_BLOCK_SIZE, (short) 1, 0, false, + new InetSocketAddress[]{dataNodes.get(0).getXferAddress()}); + DFSTestUtil.createFile(fs, new Path("/file1"), false, BUFFER_LENGTH, 2000, + DEFAULT_BLOCK_SIZE, (short) 1, 0, false, + new InetSocketAddress[]{dataNodes.get(1).getXferAddress()}); + DFSTestUtil.createFile(fs, new Path("/file2"), false, BUFFER_LENGTH, 4000, + DEFAULT_BLOCK_SIZE, (short) 1, 0, false, + new InetSocketAddress[]{dataNodes.get(2).getXferAddress()}); + DFSTestUtil.createFile(fs, new Path("/file3"), false, BUFFER_LENGTH, 8000, + DEFAULT_BLOCK_SIZE, (short) 1, 0, false, + new InetSocketAddress[]{dataNodes.get(3).getXferAddress()}); + DFSTestUtil.createFile(fs, new Path("/file4"), false, BUFFER_LENGTH, 16000, + DEFAULT_BLOCK_SIZE, (short) 1, 0, false, + new InetSocketAddress[]{dataNodes.get(4).getXferAddress()}); + + // Trigger Heartbeats. + cluster.triggerHeartbeats(); + + // Assert that the volumeUsageStdDev reported by the NameNode and the DataNode is the same. + String liveNodes = cluster.getNameNode().getNamesystem().getLiveNodes(); + Map<String, Map<String, Object>> info = + (Map<String, Map<String, Object>>) JSON.parse(liveNodes); + for (DataNode dataNode : dataNodes) { + String dnAddress = dataNode.getDisplayName(); + float volumeUsageStdDev = dataNode.getFSDataset().getVolumeUsageStdDev(); + Assert.assertEquals( + String.valueOf(volumeUsageStdDev), + String.valueOf(info.get(dnAddress).get("volumeUsageStdDev"))); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java index 30fee2fddd99b..c5ee83d67e875 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java @@ -163,7 +163,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation) Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), - Mockito.any())).thenReturn( + Mockito.any(), + Mockito.anyFloat())).thenReturn( new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat( HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current() .nextLong() | 1L)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 73e8bf7cb618e..2702a394b1993 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1530,6 +1530,11 @@ public Map<String, Object> getVolumeInfoMap() { throw new UnsupportedOperationException(); } + @Override + public float getVolumeUsageStdDev() { + return 0.0f; + } + @Override public FsVolumeSpi getVolume(ExtendedBlock b) { return getStorage(b.getLocalBlock()).getVolume(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index fc2a998acb3d8..4deee3c132587 100644 ---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -178,7 +178,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class)); + Mockito.any(SlowDiskReports.class), + Mockito.anyFloat()); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; @@ -1131,7 +1132,8 @@ public void testRefreshLeaseId() throws Exception { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class))) + Mockito.any(SlowDiskReports.class), + Mockito.anyFloat())) //heartbeat to old NN instance .thenAnswer(new HeartbeatAnswer(0)) //heartbeat to new NN instance with Register Command diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 995a135c4e30f..6f90facd9da68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -218,7 +218,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation) Mockito.any(), Mockito.anyBoolean(), Mockito.any(), - Mockito.any())) + Mockito.any(), + Mockito.anyFloat())) .thenReturn(new HeartbeatResponse( new DatanodeCommand[0], new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java index 8d2df18711256..66dd55388c172 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java @@ -157,7 +157,8 @@ public void startUp() throws IOException { Mockito.any(), Mockito.anyBoolean(), Mockito.any(), - Mockito.any())) + Mockito.any(), + Mockito.anyFloat())) .thenReturn(new HeartbeatResponse( new DatanodeCommand[0], new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java index 44f90690e34a4..ad706c28e4bf9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -33,6 +33,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyFloat; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.atLeastOnce; @@ -171,7 +172,8 @@ public void 
testSendLifelineIfHeartbeatBlocked() throws Exception { any(), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + any(SlowDiskReports.class), + anyFloat()); // Intercept lifeline to trigger latch count-down on each call. doAnswer(new LatchCountingAnswer(lifelinesSent)) @@ -183,7 +185,8 @@ public void testSendLifelineIfHeartbeatBlocked() throws Exception { anyInt(), anyInt(), anyInt(), - any()); + any(), + anyFloat()); // While waiting on the latch for the expected number of lifeline messages, // poll DataNode tracking information. Thanks to the lifeline, we expect @@ -210,7 +213,8 @@ public void testSendLifelineIfHeartbeatBlocked() throws Exception { anyInt(), anyInt(), anyInt(), - any()); + any(), + anyFloat()); // Also verify lifeline call through metrics. We expect at least // numLifelines, guaranteed by waiting on the latch. There is a small @@ -240,7 +244,8 @@ public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception { any(), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + any(SlowDiskReports.class), + anyFloat()); // While waiting on the latch for the expected number of heartbeat messages, // poll DataNode tracking information. We expect that the DataNode always @@ -263,7 +268,8 @@ public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception { anyInt(), anyInt(), anyInt(), - any()); + any(), + anyFloat()); // Also verify no lifeline calls through metrics. assertEquals("Expect metrics to count no lifeline calls.", 0, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java index 905cc2a1ecc19..d469b3d9fa5f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java @@ -216,7 +216,8 @@ public HeartbeatResponse answer(InvocationOnMock invocation) Mockito.any(), Mockito.anyBoolean(), Mockito.any(), - Mockito.any()); + Mockito.any(), + Mockito.anyFloat()); dn = new DataNode(conf, locations, null, null) { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java index 9df6209e21a4b..4bfb1962f6f8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java @@ -45,6 +45,7 @@ import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyFloat; import static org.mockito.ArgumentMatchers.anyLong; public class TestStorageReport { @@ -110,7 +111,8 @@ public void testStorageReportHasStorageTypeAndState() throws IOException { anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), any(), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class)); + Mockito.any(SlowDiskReports.class), + anyFloat()); StorageReport[] reports = captor.getValue(); diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 3fbd4de721260..4790cc310b077 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -90,6 +90,11 @@ public Map getVolumeInfoMap() { return null; } + @Override + public float getVolumeUsageStdDev() { + return 0.0f; + } + @Override public List getFinalizedBlocks(String bpid) { return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java index 8b1a6c0814ca8..9a4c0829edbf2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyFloat; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; @@ -210,7 +211,7 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds) (StorageReport[]) any(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + any(SlowDiskReports.class), anyFloat()); } finally { lock.writeLock().unlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 7f6d572fceb39..044454601d662 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -1019,7 +1019,7 @@ void sendHeartbeat() throws IOException { DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, 0.0f) .getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { @@ -1070,7 +1070,7 @@ int replicateBlocks() throws IOException { false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, 0.0f) .getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 3731c2d4cad75..30566f6f3c053 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -158,7 +158,7 @@ public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg, return namesystem.handleHeartbeat(nodeReg, BlockManagerTestUtil.getStorageReportsForDatanode(dd), dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, 0.0f); } public static boolean setReplication(final FSNamesystem ns, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index 01ea148a5600e..ebd485e723eb9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -140,7 +140,7 @@ public void testDeadDatanode() throws Exception { false, 0, 0, 0, 0, 0) }; DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, 0.0f) .getCommands(); assertEquals(1, cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER