From 66502f901c3d5ec3410965ea5fdef2b31947d816 Mon Sep 17 00:00:00 2001 From: Hongbing Wang <284734261@qq.com> Date: Fri, 16 Apr 2021 17:15:21 +0800 Subject: [PATCH 1/4] HDFS-15987. Improve oiv tool to parse fsimage file in parallel with delimited format --- .../OfflineImageViewerPB.java | 10 +- .../PBImageCorruptionDetector.java | 2 +- .../PBImageDelimitedTextWriter.java | 8 +- .../offlineImageViewer/PBImageTextWriter.java | 277 ++++++++++++++++-- .../TestOfflineImageViewer.java | 59 +++- .../TestOfflineImageViewerForAcl.java | 2 +- 6 files changed, 318 insertions(+), 40 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java index dbcb452e166aa..c62a92027e633 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java @@ -107,6 +107,7 @@ public class OfflineImageViewerPB { + " Delimited outputs. If not set, the processor\n" + " constructs the namespace in memory \n" + " before outputting text.\n" + + "-threads Use multiple threads to process sub-sections.\n" + "-h,--help Display usage information and exit\n"; /** @@ -132,6 +133,7 @@ private static Options buildOptions() { options.addOption("delimiter", true, ""); options.addOption("sp", false, ""); options.addOption("t", "temp", true, ""); + options.addOption("threads", true, ""); return options; } @@ -185,6 +187,7 @@ public static int run(String[] args) throws Exception { String delimiter = cmd.getOptionValue("delimiter", PBImageTextWriter.DEFAULT_DELIMITER); String tempPath = cmd.getOptionValue("t", ""); + int threads = Integer.parseInt(cmd.getOptionValue("threads", "1")); Configuration conf = new Configuration(); PrintStream out = null; @@ -227,15 +230,14 @@ public static int run(String[] args) throws Exception { boolean printStoragePolicy = cmd.hasOption("sp"); try (PBImageDelimitedTextWriter writer = new PBImageDelimitedTextWriter(out, delimiter, - tempPath, printStoragePolicy); - RandomAccessFile r = new RandomAccessFile(inputFile, "r")) { - writer.visit(r); + tempPath, printStoragePolicy, threads, outputFile)) { + writer.visit(inputFile); } break; case "DETECTCORRUPTION": try (PBImageCorruptionDetector detector = new PBImageCorruptionDetector(out, delimiter, tempPath)) { - detector.visit(new RandomAccessFile(inputFile, "r")); + detector.visit(inputFile); } break; default: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java index 737e7384b9a7c..a4c42f783acd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java @@ -337,7 +337,7 @@ public void afterOutput() throws IOException { if (parentId != -1) { entryBuilder.setParentId(parentId); } - printIfNotEmpty(entryBuilder.build()); + printIfNotEmpty(out, entryBuilder.build()); } } } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java index 45d42f0396b1a..3e080ec8e65cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageDelimitedTextWriter.java @@ -146,7 +146,13 @@ public String build() { PBImageDelimitedTextWriter(PrintStream out, String delimiter, String tempPath, boolean printStoragePolicy) throws IOException { - super(out, delimiter, tempPath); + this(out, delimiter, tempPath, printStoragePolicy, 1, "-"); + } + + PBImageDelimitedTextWriter(PrintStream out, String delimiter, + String tempPath, boolean printStoragePolicy, int threads, + String parallelOut) throws IOException { + super(out, delimiter, tempPath, threads, parallelOut); this.printStoragePolicy = printStoragePolicy; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java index ef0b168658c68..e6a96923c93b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java @@ -21,17 +21,22 @@ import java.io.Closeable; import java.io.File; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.io.RandomAccessFile; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -455,20 +460,22 @@ public String getParentPath(long inode) throws IOException { return "/"; } long parent = getFromDirChildMap(inode); - if (!dirPathCache.containsKey(parent)) { - byte[] bytes = dirMap.get(toBytes(parent)); - if (parent != INodeId.ROOT_INODE_ID && bytes == null) { - // The parent is an INodeReference, which is generated from snapshot. - // For delimited oiv tool, no need to print out metadata in snapshots. - throw PBImageTextWriter.createIgnoredSnapshotException(inode); + byte[] bytes = dirMap.get(toBytes(parent)); + synchronized (this) { + if (!dirPathCache.containsKey(parent)) { + if (parent != INodeId.ROOT_INODE_ID && bytes == null) { + // The parent is an INodeReference, which is generated from snapshot. + // For delimited oiv tool, no need to print out metadata in snapshots. + throw PBImageTextWriter.createIgnoredSnapshotException(inode); + } + String parentName = toString(bytes); + String parentPath = + new Path(getParentPath(parent), + parentName.isEmpty() ? 
"/" : parentName).toString(); + dirPathCache.put(parent, parentPath); } - String parentName = toString(bytes); - String parentPath = - new Path(getParentPath(parent), - parentName.isEmpty() ? "/" : parentName).toString(); - dirPathCache.put(parent, parentPath); + return dirPathCache.get(parent); } - return dirPathCache.get(parent); } @Override @@ -493,9 +500,12 @@ public long getParentId(long id) throws IOException { } private SerialNumberManager.StringTable stringTable; - private PrintStream out; + protected final PrintStream out; private MetadataMap metadataMap = null; private String delimiter; + private File filename; + private int numThreads; + private String parallelOut; /** * Construct a PB FsImage writer to generate text file. @@ -503,8 +513,8 @@ public long getParentId(long id) throws IOException { * @param tempPath the path to store metadata. If it is empty, store metadata * in memory instead. */ - PBImageTextWriter(PrintStream out, String delimiter, String tempPath) - throws IOException { + PBImageTextWriter(PrintStream out, String delimiter, String tempPath, + int numThreads, String parallelOut) throws IOException { this.out = out; this.delimiter = delimiter; if (tempPath.isEmpty()) { @@ -512,6 +522,13 @@ public long getParentId(long id) throws IOException { } else { metadataMap = new LevelDBMetadataMap(tempPath); } + this.numThreads = numThreads; + this.parallelOut = parallelOut; + } + + PBImageTextWriter(PrintStream out, String delimiter, String tempPath) + throws IOException { + this(out, delimiter, tempPath, 1, "-"); } @Override @@ -562,7 +579,9 @@ void append(StringBuffer buffer, String field) { */ abstract protected void afterOutput() throws IOException; - public void visit(RandomAccessFile file) throws IOException { + public void visit(String filePath) throws IOException { + filename = new File(filePath); + RandomAccessFile file = new RandomAccessFile(filePath, "r"); Configuration conf = new Configuration(); if (!FSImageUtil.checkFileFormat(file)) { throw new IOException("Unrecognized FSImage"); @@ -642,6 +661,19 @@ long getParentId(long id) throws IOException { private void output(Configuration conf, FileSummary summary, FileInputStream fin, ArrayList sections) throws IOException { + ArrayList allINodeSubSections = + getINodeSubSections(sections); + if (numThreads > 1 && !parallelOut.equals("-") && + allINodeSubSections.size() > 1) { + outputInParallel(conf, summary, allINodeSubSections); + } else { + outputInSerial(conf, summary, fin, sections); + } + } + + private void outputInSerial(Configuration conf, FileSummary summary, + FileInputStream fin, ArrayList sections) + throws IOException { InputStream is; long startTime = Time.monotonicNow(); out.println(getHeader()); @@ -651,7 +683,10 @@ private void output(Configuration conf, FileSummary summary, is = FSImageUtil.wrapInputStreamForCompression(conf, summary.getCodec(), new BufferedInputStream(new LimitInputStream( fin, section.getLength()))); - outputINodes(is); + INodeSection s = INodeSection.parseDelimitedFrom(is); + LOG.info("Found {} INodes in the INode section", s.getNumInodes()); + int count = outputINodes(is, out); + LOG.info("Outputted {} INodes.", count); } } afterOutput(); @@ -659,6 +694,112 @@ private void output(Configuration conf, FileSummary summary, LOG.debug("Time to output inodes: {}ms", timeTaken); } + /** + * STEP1: Multi-threaded process sub-sections. + * Given n (n>1) threads to process k (k>=n) sections, + * E.g. 
10 sections and 4 threads, grouped as follows: + * |---------------------------------------------------------------| + * | (0 1 2) (3 4 5) (6 7) (8 9) | + * | thread[0] thread[1] thread[2] thread[3] | + * |---------------------------------------------------------------| + * + * STEP2: Merge files. + */ + private void outputInParallel(Configuration conf, FileSummary summary, + ArrayList subSections) + throws IOException { + int nThreads = Integer.min(numThreads, subSections.size()); + LOG.info("Outputting in parallel with {} sub-sections" + + " using {} threads", subSections.size(), nThreads); + final CopyOnWriteArrayList exceptions = + new CopyOnWriteArrayList<>(); + Thread[] threads = new Thread[nThreads]; + String[] paths = new String[nThreads]; + for (int i = 0; i < paths.length; i++) { + paths[i] = parallelOut + ".tmp." + i; + } + AtomicLong expectedINodes = new AtomicLong(0); + AtomicLong totalParsed = new AtomicLong(0); + String codec = summary.getCodec(); + + int mark = 0; + for (int i = 0; i < nThreads; i++) { + // Each thread processes different ordered sub-sections + // and outputs to different paths + int step = subSections.size() / nThreads + + (i < subSections.size() % nThreads ? 1 : 0); + int start = mark; + int end = start + step; + ArrayList subList = new ArrayList<>( + subSections.subList(start, end)); + mark = end; + String path = paths[i]; + + threads[i] = new Thread(() -> { + LOG.info("output iNodes of sub-sections: [{},{})", start, end); + InputStream is = null; + try (PrintStream theOut = new PrintStream(path, "UTF-8")) { + long startTime = Time.monotonicNow(); + for (int j = 0; j < subList.size(); j++) { + is = getInputStreamForSection(subList.get(j), codec, conf); + if (start == 0 && j == 0) { + // The first iNode section has a header which must be + // processed first + INodeSection s = INodeSection.parseDelimitedFrom(is); + expectedINodes.set(s.getNumInodes()); + } + totalParsed.addAndGet(outputINodes(is, theOut)); + } + long timeTaken = Time.monotonicNow() - startTime; + LOG.info("Time to output iNodes of sub-sections: [{},{}) {} ms", + start, end, timeTaken); + } catch (Exception e) { + exceptions.add(new IOException(e)); + } finally { + try { + is.close(); + } catch (IOException ioe) { + LOG.warn("Failed to close the input stream, ignoring", ioe); + } + } + }); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + try { + t.join(); + } catch (InterruptedException e) { + LOG.error("Interrupted when thread join.", e); + throw new IOException(e); + } + } + + if (exceptions.size() != 0) { + LOG.error("Failed to output INode sub-sections, {} exception(s)" + + " occurred.", exceptions.size()); + throw exceptions.get(0); + } + if (totalParsed.get() != expectedINodes.get()) { + throw new IOException("Expected to parse " + expectedINodes + " in " + + "parallel, but parsed " + totalParsed.get() + ". The image may " + + "be corrupt."); + } + LOG.info("Completed outputting all INode sub-sections to {} tmp files.", + paths.length); + + try (PrintStream pout = new PrintStream(parallelOut, "UTF-8")) { + pout.println(getHeader()); + } + + // merge tmp files + long startTime = Time.monotonicNow(); + mergeFiles(paths, parallelOut); + long timeTaken = Time.monotonicNow() - startTime; + LOG.info("Completed all stages. 
Time to merge files: {}ms", timeTaken); + } + protected PermissionStatus getPermission(long perm) { return FSImageFormatPBINode.Loader.loadPermission(perm, stringTable); } @@ -763,22 +904,27 @@ protected void buildNamespace(InputStream in, List refIdList) LOG.info("Scanned {} INode directories to build namespace.", count); } - void printIfNotEmpty(String line) { + void printIfNotEmpty(PrintStream outStream, String line) { if (!line.isEmpty()) { - out.println(line); + outStream.println(line); } } - private void outputINodes(InputStream in) throws IOException { - INodeSection s = INodeSection.parseDelimitedFrom(in); - LOG.info("Found {} INodes in the INode section", s.getNumInodes()); + private int outputINodes(InputStream in, PrintStream outStream) + throws IOException { long ignored = 0; long ignoredSnapshots = 0; - for (int i = 0; i < s.getNumInodes(); ++i) { + // As the input stream is a LimitInputStream, the reading will stop when + // EOF is encountered at the end of the stream. + int count = 0; + while (true) { INode p = INode.parseDelimitedFrom(in); + if (p == null) { + break; + } try { String parentPath = metadataMap.getParentPath(p.getId()); - printIfNotEmpty(getEntry(parentPath, p)); + printIfNotEmpty(outStream, getEntry(parentPath, p)); } catch (IOException ioe) { ignored++; if (!(ioe instanceof IgnoreSnapshotException)) { @@ -790,16 +936,16 @@ private void outputINodes(InputStream in) throws IOException { } } } - - if (LOG.isDebugEnabled() && i % 100000 == 0) { - LOG.debug("Outputted {} INodes.", i); + count++; + if (LOG.isDebugEnabled() && count % 100000 == 0) { + LOG.debug("Outputted {} INodes.", count); } } if (ignored > 0) { LOG.warn("Ignored {} nodes, including {} in snapshots. Please turn on" + " debug log for details", ignored, ignoredSnapshots); } - LOG.info("Outputted {} INodes.", s.getNumInodes()); + return count; } private static IgnoreSnapshotException createIgnoredSnapshotException( @@ -822,4 +968,79 @@ public int getStoragePolicy( } return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED; } + + private ArrayList getINodeSubSections( + ArrayList sections) { + ArrayList subSections = new ArrayList<>(); + Iterator iter = sections.iterator(); + while (iter.hasNext()) { + FileSummary.Section s = iter.next(); + if (SectionName.fromString(s.getName()) == SectionName.INODE_SUB) { + subSections.add(s); + } + } + return subSections; + } + + /** + * Given a FSImage FileSummary.section, return a LimitInput stream set to + * the starting position of the section and limited to the section length. 
+ * @param section The FileSummary.Section containing the offset and length + * @param compressionCodec The compression codec in use, if any + * @return An InputStream for the given section + * @throws IOException + */ + private InputStream getInputStreamForSection(FileSummary.Section section, + String compressionCodec, Configuration conf) + throws IOException { + // channel of RandomAccessFile is not thread safe, use File + FileInputStream fin = new FileInputStream(filename); + try { + FileChannel channel = fin.getChannel(); + channel.position(section.getOffset()); + InputStream in = new BufferedInputStream(new LimitInputStream(fin, + section.getLength())); + + in = FSImageUtil.wrapInputStreamForCompression(conf, + compressionCodec, in); + return in; + } catch (IOException e) { + fin.close(); + throw e; + } + } + + /** + * @param srcPaths Source files of contents to be merged + * @param resultPath Merged file path + * @throws IOException + */ + public static void mergeFiles(String[] srcPaths, String resultPath) + throws IOException { + if (srcPaths == null || srcPaths.length < 1) { + LOG.warn("no source files to merge."); + return; + } + + File[] files = new File[srcPaths.length]; + for (int i = 0; i < srcPaths.length; i++) { + files[i] = new File(srcPaths[i]); + } + + File resultFile = new File(resultPath); + try (FileChannel resultChannel = + new FileOutputStream(resultFile, true).getChannel()) { + for (File file : files) { + try (FileChannel src = new FileInputStream(file).getChannel()) { + resultChannel.transferFrom(src, resultChannel.size(), src.size()); + } + } + } + + for (File file : files) { + if (!file.delete() && file.exists()) { + LOG.warn("delete tmp file: {} returned false", file); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java index 5c91530bd9d58..6ba37a1adff13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java @@ -83,8 +83,10 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto; import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; +import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; import org.apache.hadoop.net.NetUtils; @@ -122,6 +124,7 @@ import static org.apache.hadoop.fs.permission.FsAction.ALL; import static org.apache.hadoop.fs.permission.FsAction.EXECUTE; import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE; +import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR; import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_NAME; import static org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.ERASURE_CODING_SECTION_POLICY; @@ -186,6 +189,12 @@ public static void createOriginalFSImage() throws IOException { 
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL, "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT"); + // fsimage with sub-section conf + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4"); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); DistributedFileSystem hdfs = cluster.getFileSystem(); @@ -791,6 +800,13 @@ public void testPBDelimitedWriter() throws IOException, InterruptedException { new FileSystemTestHelper().getTestRootDir() + "/delimited.db"); } + @Test + public void testParallelPBDelimitedWriter() throws Exception { + testParallelPBDelimitedWriter(""); // Test in memory db. + testParallelPBDelimitedWriter(new FileSystemTestHelper().getTestRootDir() + + "/parallel-delimited.db"); + } + @Test public void testCorruptionOutputEntryBuilder() throws IOException { PBImageCorruptionDetector corrDetector = @@ -882,11 +898,10 @@ private void testPBDelimitedWriter(String db) final String DELIMITER = "\t"; ByteArrayOutputStream output = new ByteArrayOutputStream(); - try (PrintStream o = new PrintStream(output); - RandomAccessFile r = new RandomAccessFile(originalFsimage, "r")) { + try (PrintStream o = new PrintStream(output)) { PBImageDelimitedTextWriter v = new PBImageDelimitedTextWriter(o, DELIMITER, db); - v.visit(r); + v.visit(originalFsimage.getAbsolutePath()); } Set fileNames = new HashSet<>(); @@ -920,6 +935,37 @@ private void testPBDelimitedWriter(String db) assertEquals(writtenFiles.keySet(), fileNames); } + private void testParallelPBDelimitedWriter(String db) throws Exception{ + String delemiter = "\t"; + int numThreads = 4; + + File parallelDelimitedOut = new File(tempDir, "parallelDelimitedOut"); + if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited", + "-i", originalFsimage.getAbsolutePath(), + "-o", parallelDelimitedOut.getAbsolutePath(), + "-delimiter", delemiter, + "-t", db, + "-threads", String.valueOf(numThreads)}) != 0) { + throw new IOException("oiv returned failure outputting in parallel."); + } + MD5Hash parallelMd5 = MD5FileUtils.computeMd5ForFile(parallelDelimitedOut); + + File serialDelimitedOut = new File(tempDir, "serialDelimitedOut"); + if (db != "") { + db = db + "/../serial.db"; + } + if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited", + "-i", originalFsimage.getAbsolutePath(), + "-o", serialDelimitedOut.getAbsolutePath(), + "-t", db, + "-delimiter", delemiter}) != 0) { + throw new IOException("oiv returned failure outputting in serial."); + } + MD5Hash serialMd5 = MD5FileUtils.computeMd5ForFile(serialDelimitedOut); + + assertEquals(parallelMd5, serialMd5); + } + private void testPBCorruptionDetector(String db) throws IOException, InterruptedException { final String delimiter = "\t"; @@ -928,7 +974,7 @@ private void testPBCorruptionDetector(String db) try (PrintStream o = new PrintStream(output)) { PBImageCorruptionDetector v = new PBImageCorruptionDetector(o, delimiter, db); - v.visit(new RandomAccessFile(originalFsimage, "r")); + v.visit(originalFsimage.getAbsolutePath()); } try ( @@ -1024,7 +1070,7 @@ private String testCorruptionDetectorRun(int runNumber, try (PrintStream o = new PrintStream(output)) { PBImageCorruptionDetector v = new PBImageCorruptionDetector(o, ",", db); - v.visit(new 
RandomAccessFile(corruptedImage, "r")); + v.visit(corruptedImage.getAbsolutePath()); } return output.toString(); } @@ -1178,6 +1224,9 @@ public void testReverseXmlWrongLayoutVersion() throws Throwable { public void testFileDistributionCalculatorForException() throws Exception { File fsimageFile = null; Configuration conf = new Configuration(); + // Avoid using the same cluster dir to cause the global originalFsimage + // file to be cleared. + conf.set(HDFS_MINIDFS_BASEDIR, GenericTestUtils.getRandomizedTempPath()); HashMap files = Maps.newHashMap(); // Create a initial fsimage file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java index 4955846432a60..b23ddf4afbcfb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java @@ -239,7 +239,7 @@ public void testPBDelimitedWriterForAcl() throws Exception { try (PrintStream o = new PrintStream(output)) { PBImageDelimitedTextWriter v = new PBImageDelimitedTextWriter(o, DELIMITER, ""); // run in memory. - v.visit(new RandomAccessFile(originalFsimage, "r")); + v.visit(originalFsimage.getAbsolutePath()); } try ( From f99527d85dfe93d4005f00ac68d1b2f48d1652b2 Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Tue, 15 Mar 2022 18:06:08 +0800 Subject: [PATCH 2/4] Replace threads with thread pools and some other small fixes. --- .../offlineImageViewer/PBImageTextWriter.java | 121 ++++++++---------- 1 file changed, 52 insertions(+), 69 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java index e6a96923c93b6..d0d63765a67ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java @@ -33,6 +33,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.HashMap; @@ -505,7 +508,7 @@ public long getParentId(long id) throws IOException { private String delimiter; private File filename; private int numThreads; - private String parallelOut; + private String parallelOutputFile; /** * Construct a PB FsImage writer to generate text file. @@ -514,7 +517,7 @@ public long getParentId(long id) throws IOException { * in memory instead. 
*/ PBImageTextWriter(PrintStream out, String delimiter, String tempPath, - int numThreads, String parallelOut) throws IOException { + int numThreads, String parallelOutputFile) throws IOException { this.out = out; this.delimiter = delimiter; if (tempPath.isEmpty()) { @@ -523,7 +526,7 @@ public long getParentId(long id) throws IOException { metadataMap = new LevelDBMetadataMap(tempPath); } this.numThreads = numThreads; - this.parallelOut = parallelOut; + this.parallelOutputFile = parallelOutputFile; } PBImageTextWriter(PrintStream out, String delimiter, String tempPath) @@ -663,10 +666,12 @@ private void output(Configuration conf, FileSummary summary, throws IOException { ArrayList allINodeSubSections = getINodeSubSections(sections); - if (numThreads > 1 && !parallelOut.equals("-") && + if (numThreads > 1 && !parallelOutputFile.equals("-") && allINodeSubSections.size() > 1) { outputInParallel(conf, summary, allINodeSubSections); } else { + LOG.info("Serial output due to threads num: {}, parallel output file: {}, " + + "subSections: {}.", numThreads, parallelOutputFile, allINodeSubSections.size()); outputInSerial(conf, summary, fin, sections); } } @@ -691,113 +696,91 @@ private void outputInSerial(Configuration conf, FileSummary summary, } afterOutput(); long timeTaken = Time.monotonicNow() - startTime; - LOG.debug("Time to output inodes: {}ms", timeTaken); + LOG.debug("Time to output inodes: {} ms", timeTaken); } /** * STEP1: Multi-threaded process sub-sections. * Given n (n>1) threads to process k (k>=n) sections, - * E.g. 10 sections and 4 threads, grouped as follows: - * |---------------------------------------------------------------| - * | (0 1 2) (3 4 5) (6 7) (8 9) | - * | thread[0] thread[1] thread[2] thread[3] | - * |---------------------------------------------------------------| - * - * STEP2: Merge files. + * output parsed results of each section to tmp file in order. + * + * STEP2: Merge tmp files. */ private void outputInParallel(Configuration conf, FileSummary summary, ArrayList subSections) throws IOException { int nThreads = Integer.min(numThreads, subSections.size()); - LOG.info("Outputting in parallel with {} sub-sections" + - " using {} threads", subSections.size(), nThreads); - final CopyOnWriteArrayList exceptions = - new CopyOnWriteArrayList<>(); - Thread[] threads = new Thread[nThreads]; - String[] paths = new String[nThreads]; - for (int i = 0; i < paths.length; i++) { - paths[i] = parallelOut + ".tmp." + i; - } + LOG.info("Outputting in parallel with {} sub-sections using {} threads", + subSections.size(), nThreads); + final CopyOnWriteArrayList exceptions = new CopyOnWriteArrayList<>(); + CountDownLatch latch = new CountDownLatch(subSections.size()); + ExecutorService executorService = Executors.newFixedThreadPool(nThreads); AtomicLong expectedINodes = new AtomicLong(0); AtomicLong totalParsed = new AtomicLong(0); String codec = summary.getCodec(); + String[] paths = new String[subSections.size()]; - int mark = 0; - for (int i = 0; i < nThreads; i++) { - // Each thread processes different ordered sub-sections - // and outputs to different paths - int step = subSections.size() / nThreads + - (i < subSections.size() % nThreads ? 
1 : 0); - int start = mark; - int end = start + step; - ArrayList subList = new ArrayList<>( - subSections.subList(start, end)); - mark = end; - String path = paths[i]; - - threads[i] = new Thread(() -> { - LOG.info("output iNodes of sub-sections: [{},{})", start, end); + for (int i = 0; i < subSections.size(); i++) { + paths[i] = parallelOutputFile + ".tmp." + i; + int index = i; + executorService.submit(() -> { + LOG.info("Output iNodes of section-{}", index); InputStream is = null; - try (PrintStream theOut = new PrintStream(path, "UTF-8")) { + try (PrintStream outStream = new PrintStream(paths[index], "UTF-8")) { long startTime = Time.monotonicNow(); - for (int j = 0; j < subList.size(); j++) { - is = getInputStreamForSection(subList.get(j), codec, conf); - if (start == 0 && j == 0) { - // The first iNode section has a header which must be - // processed first - INodeSection s = INodeSection.parseDelimitedFrom(is); - expectedINodes.set(s.getNumInodes()); - } - totalParsed.addAndGet(outputINodes(is, theOut)); + is = getInputStreamForSection(subSections.get(index), codec, conf); + if (index == 0) { + // The first iNode section has a header which must be processed first + INodeSection s = INodeSection.parseDelimitedFrom(is); + expectedINodes.set(s.getNumInodes()); } + totalParsed.addAndGet(outputINodes(is, outStream)); long timeTaken = Time.monotonicNow() - startTime; - LOG.info("Time to output iNodes of sub-sections: [{},{}) {} ms", - start, end, timeTaken); + LOG.info("Time to output iNodes of section-{}: {} ms", index, timeTaken); } catch (Exception e) { exceptions.add(new IOException(e)); } finally { + latch.countDown(); try { - is.close(); + if (is != null) { + is.close(); + } } catch (IOException ioe) { LOG.warn("Failed to close the input stream, ignoring", ioe); } } }); } - for (Thread t : threads) { - t.start(); - } - for (Thread t : threads) { - try { - t.join(); - } catch (InterruptedException e) { - LOG.error("Interrupted when thread join.", e); - throw new IOException(e); - } + + try { + latch.await(); + } catch (InterruptedException e) { + LOG.error("Interrupted waiting for countdown latch", e); + throw new IOException(e); } + executorService.shutdown(); if (exceptions.size() != 0) { - LOG.error("Failed to output INode sub-sections, {} exception(s)" + - " occurred.", exceptions.size()); + LOG.error("Failed to output INode sub-sections, {} exception(s) occurred.", + exceptions.size()); throw exceptions.get(0); } if (totalParsed.get() != expectedINodes.get()) { - throw new IOException("Expected to parse " + expectedINodes + " in " + - "parallel, but parsed " + totalParsed.get() + ". The image may " + - "be corrupt."); + throw new IOException("Expected to parse " + expectedINodes + " in parallel, " + + "but parsed " + totalParsed.get() + ". The image may be corrupt."); } LOG.info("Completed outputting all INode sub-sections to {} tmp files.", - paths.length); + subSections.size()); - try (PrintStream pout = new PrintStream(parallelOut, "UTF-8")) { - pout.println(getHeader()); + try (PrintStream ps = new PrintStream(parallelOutputFile, "UTF-8")) { + ps.println(getHeader()); } // merge tmp files long startTime = Time.monotonicNow(); - mergeFiles(paths, parallelOut); + mergeFiles(paths, parallelOutputFile); long timeTaken = Time.monotonicNow() - startTime; - LOG.info("Completed all stages. Time to merge files: {}ms", timeTaken); + LOG.info("Completed all stages. 
Time to merge files: {} ms", timeTaken); } protected PermissionStatus getPermission(long perm) { From 0dbfa74ab952e5f8c7cbacb6c122ec39b748949f Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Wed, 16 Mar 2022 16:42:00 +0800 Subject: [PATCH 3/4] rename threads option. --- .../tools/offlineImageViewer/OfflineImageViewerPB.java | 6 +++--- .../hdfs/tools/offlineImageViewer/PBImageTextWriter.java | 1 - .../tools/offlineImageViewer/TestOfflineImageViewer.java | 8 ++++---- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java index c62a92027e633..05e687ab97e43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java @@ -107,7 +107,7 @@ public class OfflineImageViewerPB { + " Delimited outputs. If not set, the processor\n" + " constructs the namespace in memory \n" + " before outputting text.\n" - + "-threads Use multiple threads to process sub-sections.\n" + + "-m,--multiThread Use multiThread to process sub-sections.\n" + "-h,--help Display usage information and exit\n"; /** @@ -133,7 +133,7 @@ private static Options buildOptions() { options.addOption("delimiter", true, ""); options.addOption("sp", false, ""); options.addOption("t", "temp", true, ""); - options.addOption("threads", true, ""); + options.addOption("m", "multiThread", true, ""); return options; } @@ -187,7 +187,7 @@ public static int run(String[] args) throws Exception { String delimiter = cmd.getOptionValue("delimiter", PBImageTextWriter.DEFAULT_DELIMITER); String tempPath = cmd.getOptionValue("t", ""); - int threads = Integer.parseInt(cmd.getOptionValue("threads", "1")); + int threads = Integer.parseInt(cmd.getOptionValue("m", "1")); Configuration conf = new Configuration(); PrintStream out = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java index d0d63765a67ca..404b14f2b9f85 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java @@ -703,7 +703,6 @@ private void outputInSerial(Configuration conf, FileSummary summary, * STEP1: Multi-threaded process sub-sections. * Given n (n>1) threads to process k (k>=n) sections, * output parsed results of each section to tmp file in order. - * * STEP2: Merge tmp files. 
*/ private void outputInParallel(Configuration conf, FileSummary summary, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java index 6ba37a1adff13..fe1c86ba538d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java @@ -936,16 +936,16 @@ private void testPBDelimitedWriter(String db) } private void testParallelPBDelimitedWriter(String db) throws Exception{ - String delemiter = "\t"; + String delimiter = "\t"; int numThreads = 4; File parallelDelimitedOut = new File(tempDir, "parallelDelimitedOut"); if (OfflineImageViewerPB.run(new String[] {"-p", "Delimited", "-i", originalFsimage.getAbsolutePath(), "-o", parallelDelimitedOut.getAbsolutePath(), - "-delimiter", delemiter, + "-delimiter", delimiter, "-t", db, - "-threads", String.valueOf(numThreads)}) != 0) { + "-m", String.valueOf(numThreads)}) != 0) { throw new IOException("oiv returned failure outputting in parallel."); } MD5Hash parallelMd5 = MD5FileUtils.computeMd5ForFile(parallelDelimitedOut); @@ -958,7 +958,7 @@ private void testParallelPBDelimitedWriter(String db) throws Exception{ "-i", originalFsimage.getAbsolutePath(), "-o", serialDelimitedOut.getAbsolutePath(), "-t", db, - "-delimiter", delemiter}) != 0) { + "-delimiter", delimiter}) != 0) { throw new IOException("oiv returned failure outputting in serial."); } MD5Hash serialMd5 = MD5FileUtils.computeMd5ForFile(serialDelimitedOut); From 362762d88d75c8d11cab324c3f9ce79d638e7be3 Mon Sep 17 00:00:00 2001 From: wanghongbing Date: Thu, 17 Mar 2022 21:15:12 +0800 Subject: [PATCH 4/4] fix checkstyle. 
--- .../offlineImageViewer/PBImageCorruptionDetector.java | 2 +- .../tools/offlineImageViewer/PBImageTextWriter.java | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java index a4c42f783acd5..90bae24f069bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageCorruptionDetector.java @@ -337,7 +337,7 @@ public void afterOutput() throws IOException { if (parentId != -1) { entryBuilder.setParentId(parentId); } - printIfNotEmpty(out, entryBuilder.build()); + printIfNotEmpty(serialOutStream(), entryBuilder.build()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java index 404b14f2b9f85..b6d32d54ad79d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java @@ -503,7 +503,7 @@ public long getParentId(long id) throws IOException { } private SerialNumberManager.StringTable stringTable; - protected final PrintStream out; + private final PrintStream out; private MetadataMap metadataMap = null; private String delimiter; private File filename; @@ -534,6 +534,10 @@ public long getParentId(long id) throws IOException { this(out, delimiter, tempPath, 1, "-"); } + protected PrintStream serialOutStream() { + return out; + } + @Override public void close() throws IOException { out.flush(); @@ -681,7 +685,7 @@ private void outputInSerial(Configuration conf, FileSummary summary, throws IOException { InputStream is; long startTime = Time.monotonicNow(); - out.println(getHeader()); + serialOutStream().println(getHeader()); for (FileSummary.Section section : sections) { if (SectionName.fromString(section.getName()) == SectionName.INODE) { fin.getChannel().position(section.getOffset()); @@ -690,7 +694,7 @@ private void outputInSerial(Configuration conf, FileSummary summary, fin, section.getLength()))); INodeSection s = INodeSection.parseDelimitedFrom(is); LOG.info("Found {} INodes in the INode section", s.getNumInodes()); - int count = outputINodes(is, out); + int count = outputINodes(is, serialOutStream()); LOG.info("Outputted {} INodes.", count); } }
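
Usage note (not part of the patch series; paths and thread count below are illustrative placeholders): with these patches applied, the Delimited processor can be asked to write INode sub-sections in parallel through the -m/--multiThread option introduced in PATCH 3/4, for example:

  hdfs oiv -p Delimited -i /data/fsimage_0000000000000000024 \
      -o /data/fsimage.txt -t /tmp/oiv.db -m 4

Per the output() logic in PBImageTextWriter, the parallel path is only taken when -m is greater than 1, the output is a real file rather than stdout ("-"), and the fsimage contains more than one INODE_SUB section (i.e., it was written with DFS_IMAGE_PARALLEL_LOAD_KEY enabled, as set up in TestOfflineImageViewer); in all other cases the tool falls back to the serial output path.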