diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a9d6180d2fd7..730a04329238 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -27,8 +27,9 @@ import java.nio.ByteBuffer
 import java.nio.channels.{Channels, FileChannel, WritableByteChannel}
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
 import java.security.SecureRandom
-import java.util.{Locale, Properties, Random, UUID}
+import java.util.{EnumSet, Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.zip.{GZIPInputStream, ZipInputStream}
@@ -49,6 +50,7 @@ import com.google.common.collect.Interners
 import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
+import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipFile}
 import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -597,7 +599,7 @@ private[spark] object Utils extends Logging {
     if (lowerSrc.endsWith(".jar")) {
       RunJar.unJar(source, dest, RunJar.MATCH_ANY)
     } else if (lowerSrc.endsWith(".zip")) {
-      FileUtil.unZip(source, dest)
+      unZip(source, dest)
     } else if (
       lowerSrc.endsWith(".tar.gz") || lowerSrc.endsWith(".tgz") || lowerSrc.endsWith(".tar")) {
       FileUtil.unTar(source, dest)
@@ -607,6 +609,93 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Unzips `inFile` into `unzipDir`, restoring the Unix permission bits stored with each entry.
+   *
+   * Directory entries are skipped; the parent directories of every file entry are created on
+   * demand. Permissions are restored only for entries whose platform is Unix and only when
+   * the target file system supports the POSIX attribute view.
+   *
+   * @param inFile the zip archive to extract
+   * @param unzipDir the directory to extract into
+   * @throws IOException if an entry would be written outside of `unzipDir` or a parent
+   *                     directory cannot be created
+   */
+  def unZip(inFile: File, unzipDir: File): Unit = {
+    val zipFile = new ZipFile(inFile)
+    try {
+      val entries = zipFile.getEntries
+      val targetDirPath = unzipDir.getCanonicalPath + File.separator
+      while (entries.hasMoreElements) {
+        val entry: ZipArchiveEntry = entries.nextElement
+        if (!entry.isDirectory) {
+          val in = zipFile.getInputStream(entry)
+          try {
+            val file = new File(unzipDir, entry.getName)
+            // Reject entries that would resolve outside of the target directory (zip slip).
+            if (!file.getCanonicalPath.startsWith(targetDirPath)) {
+              throw new IOException(
+                "expanding " + entry.getName + " would create file outside of " + unzipDir
+              )
+            }
+            if (!file.getParentFile.mkdirs && !file.getParentFile.isDirectory) {
+              throw new IOException("Mkdirs failed to create " + file.getParentFile.toString)
+            }
+
+            val out = Files.newOutputStream(file.toPath)
+            try {
+              val buffer = new Array[Byte](8192)
+              var len = 0
+              while ({len = in.read(buffer); len} != -1) {
+                out.write(buffer, 0, len)
+              }
+            } finally {
+              out.close()
+            }
+            // Restore permissions only after the entry was written successfully, so a failure
+            // here cannot mask an exception raised while copying, and only where the file
+            // system exposes POSIX attributes (setPosixFilePermissions throws otherwise).
+            if (entry.getPlatform == ZipArchiveEntry.PLATFORM_UNIX &&
+                file.toPath.getFileSystem.supportedFileAttributeViews().contains("posix")) {
+              Files.setPosixFilePermissions(file.toPath, permissionsFromMode(entry.getUnixMode))
+            }
+          } finally in.close()
+        }
+      }
+    } finally zipFile.close()
+  }
+
+  /**
+   * Converts the low nine bits of a Unix file mode into the matching set of permissions.
+   *
+   * @param mode Unix mode bits; anything above the low nine bits is ignored
+   * @return a set holding the owner, group and others permissions encoded in `mode`
+   */
+  def permissionsFromMode(mode: Int): java.util.Set[PosixFilePermission] = {
+    val permissions =
+      EnumSet.noneOf(classOf[PosixFilePermission])
+    addPermissions(permissions, "OTHERS", mode.toLong)
+    addPermissions(permissions, "GROUP", (mode >> 3).toLong)
+    addPermissions(permissions, "OWNER", (mode >> 6).toLong)
+    permissions
+  }
+
+  /**
+   * Adds to `permissions` the entries encoded in the low three bits of `mode`.
+   *
+   * @param permissions the set to add to
+   * @param prefix the permission class used to build the enum name: OWNER, GROUP or OTHERS
+   * @param mode permission bits, where 4 is read, 2 is write and 1 is execute
+   */
+  def addPermissions(
+      permissions: java.util.Set[PosixFilePermission],
+      prefix: String,
+      mode: Long): Unit = {
+    if ((mode & 1L) == 1L) permissions.add(PosixFilePermission.valueOf(prefix + "_EXECUTE"))
+    if ((mode & 2L) == 2L) permissions.add(PosixFilePermission.valueOf(prefix + "_WRITE"))
+    if ((mode & 4L) == 4L) permissions.add(PosixFilePermission.valueOf(prefix + "_READ"))
+  }
+
   /** Records the duration of running `body`. */
   def timeTakenMs[T](body: => T): (T, Long) = {
     val startTime = System.nanoTime()
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 62cd81917766..a955163c455a 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -31,6 +31,7 @@ import scala.collection.mutable.ListBuffer
 import scala.util.Random
 
 import com.google.common.io.Files
+import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipArchiveOutputStream}
 import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.commons.math3.stat.inference.ChiSquareTest
@@ -703,6 +704,67 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     }
   }
 
+  test("unZip") {
+    // Writes every file in `srcFiles` as one entry of a new zip archive at `zipFile`.
+    def zipFile(srcFiles: Array[File], zipFile: File): Unit = {
+      assert(!zipFile.exists())
+      val zipOutputStream = new ZipArchiveOutputStream(new FileOutputStream(zipFile))
+      try {
+        srcFiles.foreach { srcFile =>
+          val zipEntry = new ZipArchiveEntry(srcFile.getName)
+          // 866 (decimal) is 01542 in octal: owner r-x, group r--, others -w-.
+          zipEntry.setUnixMode(866)
+          zipOutputStream.putArchiveEntry(zipEntry)
+          val fileInStream = new FileInputStream(srcFile)
+          try {
+            val buffer = new Array[Byte](1024)
+            var len = 0
+            while ({len = fileInStream.read(buffer); len} != -1) {
+              zipOutputStream.write(buffer, 0, len)
+            }
+          } finally {
+            fileInStream.close()
+          }
+          zipOutputStream.closeArchiveEntry()
+        }
+      } finally {
+        zipOutputStream.close()
+      }
+    }
+
+    val tempDir = Utils.createTempDir()
+    assert(tempDir.isDirectory)
+    val sourceDir = new File(tempDir, "source-dir")
+    assert(sourceDir.mkdir())
+    val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
+    assert(innerSourceDir.isDirectory)
+    val sourceFile = File.createTempFile("tmp", "tmp", innerSourceDir)
+    Files.write("file", sourceFile, UTF_8)
+    assert(sourceFile.exists())
+    val sourceSecondFile = File.createTempFile("tmp2", "tmp2", innerSourceDir)
+    Files.write("second file", sourceSecondFile, UTF_8)
+    assert(sourceSecondFile.exists())
+    val files = Array(sourceFile, sourceSecondFile)
+    val targetZip = new File(tempDir, "target-dir.zip")
+    // Compress the files into ZIP format
+    zipFile(files, targetZip)
+    assert(targetZip.exists())
+
+    val targetDir = new File(tempDir, "target-dir")
+    Utils.unZip(targetZip, targetDir)
+    assert(targetDir.exists())
+
+    val targetFiles = targetDir.listFiles()
+    assert(targetFiles.length === 2)
+    assert(targetFiles.map(_.getName).toSet === files.map(_.getName).toSet)
+    // Every entry was stored with owner mode r-x, which must survive the extraction
+    targetFiles.foreach { targetFile =>
+      assert(targetFile.canRead === true)
+      assert(targetFile.canWrite === false)
+      assert(targetFile.canExecute === true)
+    }
+  }
+
   test("deleteRecursively") {
     val tempDir1 = Utils.createTempDir()
     assert(tempDir1.exists())