From 63fbfbffa187de3221160b692023b99a642e5196 Mon Sep 17 00:00:00 2001
From: "Doroszlai, Attila"
Date: Thu, 20 Mar 2025 15:07:44 +0100
Subject: [PATCH] HDDS-12636. Reduce code duplication for tarball creation

---
 .../org/apache/hadoop/hdds/HddsUtils.java     |   2 +-
 .../keyvalue/TarContainerPacker.java          | 101 +----------
 .../keyvalue/TestTarContainerPacker.java      |   9 +-
 .../apache/hadoop/hdds/utils/Archiver.java    | 160 ++++++++++++++++++
 .../hadoop/hdds/utils/HddsServerUtil.java     |  33 +---
 .../ozone/om/OMDBCheckpointServlet.java       |  14 +-
 .../apache/hadoop/ozone/recon/ReconUtils.java | 116 +------------
 7 files changed, 188 insertions(+), 247 deletions(-)
 create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index f35141d68aae..9950808a820e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -632,7 +632,7 @@ public static void validatePath(Path path, Path ancestor) {
         "Ancestor should not be null");
     Preconditions.checkArgument(
         path.normalize().startsWith(ancestor.normalize()),
-        "Path should be a descendant of %s", ancestor);
+        "Path %s should be a descendant of %s", path, ancestor);
   }
 
   public static File createDir(String dirPath) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index 415a5fa58c91..f3613a735473 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -17,13 +17,15 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
+import static org.apache.hadoop.hdds.utils.Archiver.extractEntry;
+import static org.apache.hadoop.hdds.utils.Archiver.includeFile;
+import static org.apache.hadoop.hdds.utils.Archiver.includePath;
+import static org.apache.hadoop.hdds.utils.Archiver.readEntry;
+import static org.apache.hadoop.hdds.utils.Archiver.tar;
+import static org.apache.hadoop.hdds.utils.Archiver.untar;
 import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
 
-import com.google.common.annotations.VisibleForTesting;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,16 +34,11 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
-import java.util.stream.Stream;
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.ArchiveInputStream;
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -108,37 +105,6 @@ public byte[] unpackContainerData(Container container,
     return descriptorFileContent;
   }
 
-  private void extractEntry(ArchiveEntry entry, InputStream input, long size,
-      Path ancestor, Path path) throws IOException {
-    HddsUtils.validatePath(path, ancestor);
-
-    if (entry.isDirectory()) {
-      Files.createDirectories(path);
-    } else {
-      Path parent = path.getParent();
-      if (parent != null) {
-        Files.createDirectories(parent);
-      }
-
-      try (OutputStream fileOutput = Files.newOutputStream(path);
-          OutputStream output = new BufferedOutputStream(fileOutput)) {
-        int bufferSize = 1024;
-        byte[] buffer = new byte[bufferSize + 1];
-        long remaining = size;
-        while (remaining > 0) {
-          int len = (int) Math.min(remaining, bufferSize);
-          int read = input.read(buffer, 0, len);
-          if (read >= 0) {
-            remaining -= read;
-            output.write(buffer, 0, read);
-          } else {
-            remaining = 0;
-          }
-        }
-      }
-    }
-  }
-
   /**
    * Given a containerData include all the required container data/metadata
    * in a tar file.
@@ -218,65 +184,10 @@ public static Path getChunkPath(Path baseDir,
     return KeyValueContainerLocationUtil.getChunksLocationPath(baseDir.toString()).toPath();
   }
 
-  private byte[] readEntry(InputStream input, final long size)
-      throws IOException {
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    int bufferSize = 1024;
-    byte[] buffer = new byte[bufferSize + 1];
-    long remaining = size;
-    while (remaining > 0) {
-      int len = (int) Math.min(remaining, bufferSize);
-      int read = input.read(buffer, 0, len);
-      remaining -= read;
-      output.write(buffer, 0, read);
-    }
-    return output.toByteArray();
-  }
-
-  private void includePath(Path dir, String subdir,
-      ArchiveOutputStream archiveOutput) throws IOException {
-
-    // Add a directory entry before adding files, in case the directory is
-    // empty.
-    TarArchiveEntry entry = archiveOutput.createArchiveEntry(dir.toFile(), subdir);
-    archiveOutput.putArchiveEntry(entry);
-    archiveOutput.closeArchiveEntry();
-
-    // Add files in the directory.
-    try (Stream dirEntries = Files.list(dir)) {
-      for (Path path : dirEntries.collect(toList())) {
-        String entryName = subdir + "/" + path.getFileName();
-        includeFile(path.toFile(), entryName, archiveOutput);
-      }
-    }
-  }
-
-  static void includeFile(File file, String entryName,
-      ArchiveOutputStream archiveOutput) throws IOException {
-    TarArchiveEntry entry = archiveOutput.createArchiveEntry(file, entryName);
-    archiveOutput.putArchiveEntry(entry);
-    try (InputStream input = Files.newInputStream(file.toPath())) {
-      IOUtils.copy(input, archiveOutput);
-    }
-    archiveOutput.closeArchiveEntry();
-  }
-
-  private static ArchiveInputStream untar(InputStream input) {
-    return new TarArchiveInputStream(input);
-  }
-
-  private static ArchiveOutputStream tar(OutputStream output) {
-    TarArchiveOutputStream os = new TarArchiveOutputStream(output);
-    os.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
-    return os;
-  }
-
-  @VisibleForTesting
   InputStream decompress(InputStream input) throws IOException {
     return compression.wrap(input);
   }
 
-  @VisibleForTesting
   OutputStream compress(OutputStream output) throws IOException {
     return compression.wrap(output);
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index 86068af32b87..538809678d62 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -48,6 +48,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
 import org.apache.ozone.test.SpyInputStream;
@@ -391,12 +392,10 @@ private File writeSingleFile(Path parentPath, String fileName,
   private File packContainerWithSingleFile(File file, String entryName)
       throws Exception {
     File targetFile = TEMP_DIR.resolve("container.tar").toFile();
-    try (OutputStream output = newOutputStream(targetFile.toPath());
-        OutputStream compressed = packer.compress(output);
-        TarArchiveOutputStream archive =
-            new TarArchiveOutputStream(compressed)) {
+    Path path = targetFile.toPath();
+    try (TarArchiveOutputStream archive = new TarArchiveOutputStream(packer.compress(newOutputStream(path)))) {
       archive.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
-      TarContainerPacker.includeFile(file, entryName, archive);
+      Archiver.includeFile(file, entryName, archive);
     }
     return targetFile;
   }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java
new file mode 100644
index 000000000000..5bd93609a906
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.HddsUtils;
+
+/** Create and extract archives. */
+public final class Archiver {
+
+  private Archiver() {
+    // no instances (for now)
+  }
+
+  /** Create tarball including contents of {@code from}. */
+  public static void create(File tarFile, Path from) throws IOException {
+    try (ArchiveOutputStream<TarArchiveEntry> out = tar(Files.newOutputStream(tarFile.toPath()))) {
+      includePath(from, "", out);
+    }
+  }
+
+  /** Extract {@code tarFile} to {@code dir}. */
+  public static void extract(File tarFile, Path dir) throws IOException {
+    Files.createDirectories(dir);
+    String parent = dir.toString();
+    try (ArchiveInputStream<TarArchiveEntry> in = untar(Files.newInputStream(tarFile.toPath()))) {
+      TarArchiveEntry entry;
+      while ((entry = in.getNextEntry()) != null) {
+        Path path = Paths.get(parent, entry.getName());
+        extractEntry(entry, in, entry.getSize(), dir, path);
+      }
+    }
+  }
+
+  public static byte[] readEntry(InputStream input, final long size)
+      throws IOException {
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    int bufferSize = 1024;
+    byte[] buffer = new byte[bufferSize + 1];
+    long remaining = size;
+    while (remaining > 0) {
+      int len = (int) Math.min(remaining, bufferSize);
+      int read = input.read(buffer, 0, len);
+      remaining -= read;
+      output.write(buffer, 0, read);
+    }
+    return output.toByteArray();
+  }
+
+  public static void includePath(Path dir, String subdir,
+      ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
+
+    // Add a directory entry before adding files, in case the directory is
+    // empty.
+    TarArchiveEntry entry = archiveOutput.createArchiveEntry(dir.toFile(), subdir);
+    archiveOutput.putArchiveEntry(entry);
+    archiveOutput.closeArchiveEntry();
+
+    // Add files in the directory.
+    try (Stream<Path> dirEntries = Files.list(dir)) {
+      for (Path path : dirEntries.collect(toList())) {
+        File file = path.toFile();
+        String entryName = subdir + "/" + path.getFileName();
+        if (file.isDirectory()) {
+          includePath(path, entryName, archiveOutput);
+        } else {
+          includeFile(file, entryName, archiveOutput);
+        }
+      }
+    }
+  }
+
+  public static long includeFile(File file, String entryName,
+      ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
+    final long bytes;
+    TarArchiveEntry entry = archiveOutput.createArchiveEntry(file, entryName);
+    archiveOutput.putArchiveEntry(entry);
+    try (InputStream input = Files.newInputStream(file.toPath())) {
+      bytes = IOUtils.copyLarge(input, archiveOutput);
+    }
+    archiveOutput.closeArchiveEntry();
+    return bytes;
+  }
+
+  public static void extractEntry(ArchiveEntry entry, InputStream input, long size,
+      Path ancestor, Path path) throws IOException {
+    HddsUtils.validatePath(path, ancestor);
+
+    if (entry.isDirectory()) {
+      Files.createDirectories(path);
+    } else {
+      Path parent = path.getParent();
+      if (parent != null) {
+        Files.createDirectories(parent);
+      }
+
+      try (OutputStream fileOutput = Files.newOutputStream(path);
+          OutputStream output = new BufferedOutputStream(fileOutput)) {
+        int bufferSize = 1024;
+        byte[] buffer = new byte[bufferSize + 1];
+        long remaining = size;
+        while (remaining > 0) {
+          int len = (int) Math.min(remaining, bufferSize);
+          int read = input.read(buffer, 0, len);
+          if (read >= 0) {
+            remaining -= read;
+            output.write(buffer, 0, read);
+          } else {
+            remaining = 0;
+          }
+        }
+      }
+    }
+  }
+
+  public static ArchiveInputStream<TarArchiveEntry> untar(InputStream input) {
+    return new TarArchiveInputStream(input);
+  }
+
+  public static ArchiveOutputStream<TarArchiveEntry> tar(OutputStream output) {
+    TarArchiveOutputStream os = new TarArchiveOutputStream(output);
+    os.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
+    os.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+    return os;
+  }
+
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 739a764939ef..eeb65cb48ea7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -39,13 +39,14 @@
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdds.server.ServerUtils.sanitizeUserArgs;
+import static org.apache.hadoop.hdds.utils.Archiver.includeFile;
+import static org.apache.hadoop.hdds.utils.Archiver.tar;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR;
 
 import com.google.common.base.Strings;
 import com.google.protobuf.BlockingService;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
@@ -60,10 +61,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -602,12 +601,9 @@ public static void writeDBCheckpointToStream(
       List toExcludeList,
       List excludedList)
       throws IOException {
-    try (TarArchiveOutputStream archiveOutputStream =
-        new TarArchiveOutputStream(destination);
+    try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination);
         Stream files = Files.list(checkpoint.getCheckpointLocation())) {
-      archiveOutputStream.setBigNumberMode(
-          TarArchiveOutputStream.BIGNUMBER_POSIX);
       for (Path path : files.collect(Collectors.toList())) {
         if (path != null) {
           Path fileNamePath = path.getFileName();
@@ -625,29 +621,12 @@ public static void writeDBCheckpointToStream(
     }
   }
 
-  public static long includeFile(File file, String entryName,
-      ArchiveOutputStream archiveOutputStream)
-      throws IOException {
-    ArchiveEntry archiveEntry =
-        archiveOutputStream.createArchiveEntry(file, entryName);
-    archiveOutputStream.putArchiveEntry(archiveEntry);
-    long bytesWritten;
-    try (InputStream fis = Files.newInputStream(file.toPath())) {
-      bytesWritten = IOUtils.copy(fis, archiveOutputStream);
-      archiveOutputStream.flush();
-    } finally {
-      archiveOutputStream.closeArchiveEntry();
-    }
-    return bytesWritten;
-  }
-
   // Mark tarball completed.
   public static void includeRatisSnapshotCompleteFlag(
-      ArchiveOutputStream archiveOutput) throws IOException {
+      ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
     File file = File.createTempFile(
         OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME, "");
-    String entryName = OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME;
-    includeFile(file, entryName, archiveOutput);
+    includeFile(file, OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME, archiveOutput);
   }
 
   static boolean ratisSnapshotComplete(Path dir) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index e7df608aed1e..727eb5e710cd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -18,7 +18,8 @@
 package org.apache.hadoop.ozone.om;
 
 import static org.apache.commons.io.filefilter.TrueFileFilter.TRUE;
-import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile;
+import static org.apache.hadoop.hdds.utils.Archiver.includeFile;
+import static org.apache.hadoop.hdds.utils.Archiver.tar;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
@@ -55,7 +56,7 @@
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOCase;
 import org.apache.commons.io.file.Counters;
@@ -160,12 +161,7 @@ public void writeDbDataToStream(DBCheckpoint checkpoint,
     // Map of link to path.
     Map hardLinkFiles = new HashMap<>();
 
-    try (TarArchiveOutputStream archiveOutputStream =
-        new TarArchiveOutputStream(destination)) {
-      archiveOutputStream
-          .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
-      archiveOutputStream
-          .setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
+    try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) {
       RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer();
       DirectoryData sstBackupDir = new DirectoryData(tmpdir,
@@ -582,7 +578,7 @@ private boolean includeSnapshotData(HttpServletRequest request) {
   private void writeFilesToArchive(
       Map copyFiles,
       Map hardLinkFiles,
-      ArchiveOutputStream archiveOutputStream,
+      ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
       boolean completed,
       Path checkpointLocation)
       throws IOException {
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 5786702bfc8e..a590b834d033 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -31,17 +31,11 @@
 import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
 import jakarta.annotation.Nonnull;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -60,24 +54,20 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import javax.ws.rs.core.Response;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
+import org.apache.hadoop.hdds.utils.Archiver;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -105,8 +95,6 @@
 @Singleton
 public class ReconUtils {
 
-  private static final int WRITE_BUFFER = 1048576; //1MB
-
   public ReconUtils() {
   }
 
@@ -165,59 +153,12 @@ public File getReconDbDir(ConfigurationSource conf, String dirConfigKey) {
    *
    * @param sourcePath the path to the directory to be archived.
   * @return tar file
-   * @throws IOException
    */
  public static File createTarFile(Path sourcePath) throws IOException {
-    TarArchiveOutputStream tarOs = null;
-    OutputStream fileOutputStream = null;
-    try {
-      String sourceDir = sourcePath.toString();
-      String fileName = sourceDir.concat(".tar");
-      fileOutputStream = Files.newOutputStream(Paths.get(fileName));
-      tarOs = new TarArchiveOutputStream(fileOutputStream);
-      tarOs.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
-      File folder = new File(sourceDir);
-      File[] filesInDir = folder.listFiles();
-      if (filesInDir != null) {
-        for (File file : filesInDir) {
-          addFilesToArchive(file.getName(), file, tarOs);
-        }
-      }
-      return new File(fileName);
-    } finally {
-      try {
-        org.apache.hadoop.io.IOUtils.closeStream(tarOs);
-        org.apache.hadoop.io.IOUtils.closeStream(fileOutputStream);
-      } catch (Exception e) {
-        log.error("Exception encountered when closing " +
-            "TAR file output stream: " + e);
-      }
-    }
-  }
-
-  private static void addFilesToArchive(String source, File file,
-                                        TarArchiveOutputStream
-                                            tarFileOutputStream)
-      throws IOException {
-    tarFileOutputStream.putArchiveEntry(new TarArchiveEntry(file, source));
-    if (file.isFile()) {
-      try (InputStream fileInputStream = Files.newInputStream(file.toPath())) {
-        BufferedInputStream bufferedInputStream =
-            new BufferedInputStream(fileInputStream);
-        org.apache.commons.compress.utils.IOUtils.copy(bufferedInputStream,
-            tarFileOutputStream);
-        tarFileOutputStream.closeArchiveEntry();
-      }
-    } else if (file.isDirectory()) {
-      tarFileOutputStream.closeArchiveEntry();
-      File[] filesInDir = file.listFiles();
-      if (filesInDir != null) {
-        for (File cFile : filesInDir) {
-          addFilesToArchive(cFile.getAbsolutePath(), cFile,
-              tarFileOutputStream);
-        }
-      }
-    }
+    String source = StringUtils.removeEnd(sourcePath.toString(), "/");
+    File tarFile = new File(source.concat(".tar"));
+    Archiver.create(tarFile, sourcePath);
+    return tarFile;
   }
 
   /**
@@ -229,52 +170,7 @@ private static void addFilesToArchive(String source, File file,
    */
   public void untarCheckpointFile(File tarFile, Path destPath)
       throws IOException {
-
-    InputStream fileInputStream = null;
-    try {
-      fileInputStream = Files.newInputStream(tarFile.toPath());
-
-      //Create Destination directory if it does not exist.
-      if (!destPath.toFile().exists()) {
-        boolean success = destPath.toFile().mkdirs();
-        if (!success) {
-          throw new IOException("Unable to create Destination directory.");
-        }
-      }
-
-      try (TarArchiveInputStream tarInStream =
-          new TarArchiveInputStream(fileInputStream)) {
-        TarArchiveEntry entry;
-
-        while ((entry = (TarArchiveEntry) tarInStream.getNextEntry()) != null) {
-          Path path = Paths.get(destPath.toString(), entry.getName());
-          HddsUtils.validatePath(path, destPath);
-          File f = path.toFile();
-          //If directory, create a directory.
-          if (entry.isDirectory()) {
-            boolean success = f.mkdirs();
-            if (!success) {
-              log.error("Unable to create directory found in tar.");
-            }
-          } else {
-            //Write contents of file in archive to a new file.
-            int count;
-            byte[] data = new byte[WRITE_BUFFER];
-
-            OutputStream fos = Files.newOutputStream(f.toPath());
-            try (BufferedOutputStream dest =
-                new BufferedOutputStream(fos, WRITE_BUFFER)) {
-              while ((count =
-                  tarInStream.read(data, 0, WRITE_BUFFER)) != -1) {
-                dest.write(data, 0, count);
-              }
-            }
-          }
-        }
-      }
-    } finally {
-      IOUtils.closeStream(fileInputStream);
-    }
+    Archiver.extract(tarFile, destPath);
   }
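
Note for reviewers: a minimal usage sketch of the new org.apache.hadoop.hdds.utils.Archiver utility introduced by this patch. It is illustrative only and not part of the change; the class name ArchiverRoundTrip and the temporary paths are made up, while create(), extract(), tar(), and includeFile() are the methods defined in the patch.

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.hadoop.hdds.utils.Archiver;

public final class ArchiverRoundTrip {
  public static void main(String[] args) throws Exception {
    // Prepare a small source directory with one file.
    Path src = Files.createTempDirectory("archiver-src");
    Files.write(src.resolve("data.txt"), "hello".getBytes(StandardCharsets.UTF_8));

    // Pack the directory into a tarball; Archiver.tar() configures POSIX
    // big-number and long-file-name modes before any entries are written.
    File tarFile = Files.createTempFile("archiver", ".tar").toFile();
    Archiver.create(tarFile, src);

    // Unpack into a fresh directory; extractEntry() validates each entry
    // path against the destination via HddsUtils.validatePath before writing.
    Path dest = Files.createTempDirectory("archiver-dest");
    Archiver.extract(tarFile, dest);

    System.out.println(Files.readAllLines(dest.resolve("data.txt")));
  }
}

The callers changed in the patch follow the same pattern: ReconUtils.createTarFile and untarCheckpointFile delegate to create()/extract(), while HddsServerUtil and OMDBCheckpointServlet use the lower-level tar() and includeFile() helpers directly.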