diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 2c1a8cd3339e7..95c20346684f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -51,17 +51,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
 
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
+import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -399,6 +389,11 @@ public static InetSocketAddress createSocketAddr(String target) {
   private final StorageLocationChecker storageLocationChecker;
 
   private final DatasetVolumeChecker volumeChecker;
+  volatile FsDatasetSpi<? extends FsVolumeSpi> allData = null;
+  private Thread checkDiskThread = null;
+  protected final int checkDiskInterval = 5 * 1000;
+  private Object checkDiskMutex = new Object();
+  private List<StorageLocation> errorDisk = null;
 
   private final SocketFactory socketFactory;
@@ -789,6 +784,7 @@ private synchronized void refreshVolumes(String newVolumes) throws IOException {
             public IOException call() {
               try {
                 data.addVolume(location, nsInfos);
+                allData.addVolume(location, nsInfos);
               } catch (IOException e) {
                 return e;
               }
@@ -810,6 +806,12 @@ public IOException call() {
         } else {
           effectiveVolumes.add(volume.toString());
           LOG.info("Successfully added volume: {}", volume);
+          if (errorDisk != null && !errorDisk.isEmpty()) {
+            if (errorDisk.contains(volume)) {
+              LOG.info("Removing {} from errorDisk since the disk was repaired", volume);
+              errorDisk.remove(volume);
+            }
+          }
         }
       } catch (Exception e) {
         errorMessageBuilder.append(
@@ -1690,6 +1692,9 @@ void initBlockPool(BPOfferService bpos) throws IOException {
     // failures.
     checkDiskError();
 
+    // ... and start the check-disk thread.
+    startCheckDiskThread();
+
     data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
     initDirectoryScanner(getConf());
@@ -1739,6 +1744,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException {
       synchronized(this) {
         if (data == null) {
           data = factory.newInstance(this, storage, getConf());
+          allData = factory.newInstance(this, storage, getConf());
         }
       }
     }
@@ -2150,6 +2156,43 @@ public void shutdown() {
     tracer.close();
   }
 
+
+  public void startCheckDiskThread() {
+    if (checkDiskThread == null) {
+      synchronized (checkDiskMutex) {
+        if (checkDiskThread == null) {
+          checkDiskThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+              while (shouldRun) {
+                LOG.info("CheckDiskThread running");
+                if (errorDisk != null && !errorDisk.isEmpty()) {
+                  try {
+                    checkDiskError();
+                  } catch (Exception e) {
+                    LOG.warn("Unexpected exception occurred while checking disk error", e);
+                    checkDiskThread = null;
+                    return;
+                  }
+                  lastDiskErrorCheck = Time.monotonicNow();
+                }
+                try {
+                  Thread.sleep(checkDiskInterval);
+                } catch (InterruptedException e) {
+                  LOG.debug("InterruptedException in check disk error thread", e);
+                  checkDiskThread = null;
+                  return;
+                }
+              }
+            }
+          });
+          LOG.info("Starting CheckDiskError thread");
+          checkDiskThread.start();
+        }
+      }
+    }
+  }
+
   /**
    * Check if there is a disk failure asynchronously
    * and if so, handle the error.
@@ -3361,7 +3404,8 @@ public ShortCircuitRegistry getShortCircuitRegistry() {
   public void checkDiskError() throws IOException {
     Set<FsVolumeSpi> unhealthyVolumes;
     try {
-      unhealthyVolumes = volumeChecker.checkAllVolumes(data);
+      // Check all volumes, including those already removed from 'data'.
+      unhealthyVolumes = volumeChecker.checkAllVolumes(allData);
       lastDiskErrorCheck = Time.monotonicNow();
     } catch (InterruptedException e) {
       LOG.error("Interrupted while running disk check", e);
@@ -3369,11 +3413,43 @@ public void checkDiskError() throws IOException {
     if (unhealthyVolumes.size() > 0) {
+      if (errorDisk == null) {
+        errorDisk = new ArrayList<>();
+      }
+      List<StorageLocation> tmpDisk = Lists.newArrayList(errorDisk);
+      for (FsVolumeSpi vol : unhealthyVolumes) {
+        if (!errorDisk.contains(vol.getStorageLocation())) {
+          LOG.info("Adding error disk {} to errorDisk", vol.getStorageLocation());
+          errorDisk.add(vol.getStorageLocation());
+        } else {
+          tmpDisk.remove(vol.getStorageLocation());
+        }
+      }
+
       LOG.warn("checkDiskError got {} failed volumes - {}",
           unhealthyVolumes.size(), unhealthyVolumes);
       handleVolumeFailures(unhealthyVolumes);
+
+      if (!tmpDisk.isEmpty()) {
+        try {
+          Configuration conf = getConf();
+          String newDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY) + "," + Joiner.on(",").join(tmpDisk);
+          refreshVolumes(newDataDirs);
+        } catch (IOException e) {
+          LOG.error("Auto refreshVolumes error", e);
+        }
+      }
     } else {
-      LOG.debug("checkDiskError encountered no failures");
+      LOG.debug("checkDiskError encountered no failures; checking errorDisk");
+      if (errorDisk != null && !errorDisk.isEmpty()) {
+        try {
+          Configuration conf = getConf();
+          String newDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY) + "," + Joiner.on(",").join(errorDisk);
+          refreshVolumes(newDataDirs);
+        } catch (IOException e) {
+          LOG.error("Auto refreshVolumes error", e);
+        }
+      }
     }
   }
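
A note on the recovery flow this patch implements: a failed volume is remembered in `errorDisk` and removed from `data` by `handleVolumeFailures`, but `allData` keeps a handle to every volume ever added, so `checkAllVolumes(allData)` keeps probing the failed disk. `tmpDisk` starts as a copy of `errorDisk`; every disk that is still unhealthy is removed from it, so whatever remains has been repaired and is appended back onto `dfs.datanode.data.dir` and re-added through `refreshVolumes`. The sketch below models that loop outside of Hadoop so it can be read and run in isolation; it is a minimal sketch only, the class, method, and callback names are invented for illustration and are not DataNode APIs, and a `ScheduledExecutorService` stands in for the hand-rolled `checkDiskThread`.

```java
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Minimal, hypothetical sketch of the recovery loop in the patch above:
 * remember failed disks, re-probe them on a fixed interval, and hand the
 * repaired ones back to a refresh callback. Names are illustrative only,
 * not Hadoop APIs.
 */
public class DiskRecheckSketch {
  private final Set<Path> failedDisks = ConcurrentHashMap.newKeySet();
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  /** Stand-in health probe: a disk counts as repaired if it can be listed. */
  private boolean isHealthy(Path disk) {
    try (DirectoryStream<Path> s = Files.newDirectoryStream(disk)) {
      return true;
    } catch (Exception e) {
      return false;
    }
  }

  /** Analogous to adding a StorageLocation to errorDisk. */
  public void recordFailure(Path disk) {
    failedDisks.add(disk);
  }

  /** Re-probe failed disks every intervalMs, like checkDiskThread's loop. */
  public void start(long intervalMs, Consumer<Path> readdVolume) {
    scheduler.scheduleWithFixedDelay(() -> {
      for (Path disk : failedDisks) {
        if (isHealthy(disk)) {
          failedDisks.remove(disk);  // mirrors removing from errorDisk
          readdVolume.accept(disk);  // mirrors refreshVolumes(newDataDirs)
        }
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  public void stop() {
    scheduler.shutdownNow();
  }
}
```

One design observation on the patch itself: a scheduled executor would avoid the `checkDiskThread = null` restart bookkeeping and the double-checked locking on a non-volatile field. The patch's version still works because `checkDiskThread` is only null-checked, never dereferenced, outside the lock, but note that after any unexpected exception the thread exits and rechecking stops until `initBlockPool` runs again.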