Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import java.nio.ByteBuffer
import java.nio.channels.{Channels, FileChannel, WritableByteChannel}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission
import java.security.SecureRandom
import java.util.{Locale, Properties, Random, UUID}
import java.util.{EnumSet, Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.zip.{GZIPInputStream, ZipInputStream}
Expand All @@ -49,6 +50,7 @@ import com.google.common.collect.Interners
import com.google.common.io.{ByteStreams, Files => GFiles}
import com.google.common.net.InetAddresses
import org.apache.commons.codec.binary.Hex
import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipFile}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -597,7 +599,7 @@ private[spark] object Utils extends Logging {
if (lowerSrc.endsWith(".jar")) {
RunJar.unJar(source, dest, RunJar.MATCH_ANY)
} else if (lowerSrc.endsWith(".zip")) {
FileUtil.unZip(source, dest)
unZip(source, dest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to chmod again after unzip? I'm not sure

} else if (
lowerSrc.endsWith(".tar.gz") || lowerSrc.endsWith(".tgz") || lowerSrc.endsWith(".tar")) {
FileUtil.unTar(source, dest)
Expand All @@ -607,6 +609,66 @@ private[spark] object Utils extends Logging {
}
}

def unZip(inFile: File, unzipDir: File): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we refer to other references of codes? e.g., https://cs.android.com/android/platform/superproject/+/master:tools/tradefederation/core/common_util/com/android/tradefed/util/ZipUtil2.java;drc=master;l=44

It has a bunch of corner cases handling. .e.g., we will have to skip this logic if the platform is non-Unix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, your suggestion is right, we really can not cover all scenarios, I refer to, for the logic of granting file permissions added judgment, for non-UNIX platforms, skip this logic.

val zipFile = new ZipFile(inFile)
try {
val entries = zipFile.getEntries
val targetDirPath = unzipDir.getCanonicalPath + File.separator
while ({entries.hasMoreElements}) {
val entry: ZipArchiveEntry = entries.nextElement
if (!entry.isDirectory) {
val in = zipFile.getInputStream(entry)
try {
val file = new File(unzipDir, entry.getName)
if (!file.getCanonicalPath.startsWith(targetDirPath)) {
throw new IOException(
"expanding " + entry.getName + " would create file outside of " + unzipDir
)
}
if (!file.getParentFile.mkdirs && !file.getParentFile.isDirectory) {
throw new IOException("Mkdirs failed to create " + file.getParentFile.toString)
}

val out = Files.newOutputStream(file.toPath)
try {
val buffer = new Array[Byte](8192)
var len = 0
while ({len = in.read(buffer); len} != -1) {
out.write(buffer, 0, len)
}
}
finally {
out.close()
if (entry.getPlatform == ZipArchiveEntry.PLATFORM_UNIX) {
Files.setPosixFilePermissions(file.toPath, permissionsFromMode(entry.getUnixMode))
}
}
} finally in.close()
}
}
} finally zipFile.close()
}

/** The permission to store each file */
def permissionsFromMode(mode: Int): java.util.Set[PosixFilePermission] = {
val permissions =
EnumSet.noneOf(classOf[PosixFilePermission])
addPermissions(permissions, "OTHERS", mode.toLong)
addPermissions(permissions, "GROUP", (mode >> 3).toLong)
addPermissions(permissions, "OWNER", (mode >> 6).toLong)
permissions
}

/** Assign the original permissions to the file */
def addPermissions(
permissions: java.util.Set[PosixFilePermission],
prefix: String,
mode: Long): Unit = {
if ((mode & 1L) == 1L) permissions.add(PosixFilePermission.valueOf(prefix + "_EXECUTE"))
if ((mode & 2L) == 2L) permissions.add(PosixFilePermission.valueOf(prefix + "_WRITE"))
if ((mode & 4L) == 4L) permissions.add(PosixFilePermission.valueOf(prefix + "_READ"))
}

/** Records the duration of running `body`. */
def timeTakenMs[T](body: => T): (T, Long) = {
val startTime = System.nanoTime()
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.Random

import com.google.common.io.Files
import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipArchiveOutputStream}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.commons.math3.stat.inference.ChiSquareTest
Expand Down Expand Up @@ -703,6 +704,77 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
}

test("unZip") {
def zipFile(srcFiles: Array[File], zipFile: File): Unit = {
assert(!zipFile.exists())
zipFile.createNewFile()
var fileOutputStream: FileOutputStream = null
var zipOutputStream: ZipArchiveOutputStream = null
var fileInStream: FileInputStream = null

try {
fileOutputStream = new FileOutputStream(zipFile)
zipOutputStream = new ZipArchiveOutputStream(fileOutputStream)
var zipEntry: ZipArchiveEntry = null
for (i <- 0 until srcFiles.length) {
fileInStream = new FileInputStream(srcFiles(i))
zipEntry = new ZipArchiveEntry(srcFiles(i).getName)
// Assign permissions to files to be decompressed
zipEntry.setUnixMode(866)
zipOutputStream.putArchiveEntry(zipEntry)
var len = 0
val buffer = new Array[Byte](1024)
while ({len = fileInStream.read(buffer); len} != -1) {
zipOutputStream.write(buffer, 0, len)
}
}
} catch {
case e: IOException => e.printStackTrace()
} finally {
zipOutputStream.closeArchiveEntry()
zipOutputStream.close()
fileInStream.close()
fileOutputStream.close()
}
}

val tempDir = Utils.createTempDir()
assert(tempDir.isDirectory)
val sourceDir = new File(tempDir, "source-dir")
assert(tempDir.exists())
sourceDir.mkdir()
val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
assert(innerSourceDir.isDirectory)
val sourceFile = File.createTempFile("tmp", "tmp", innerSourceDir)
Files.write("file", sourceFile, UTF_8)
assert(sourceFile.exists())
val sourceSecondFile = File.createTempFile("tmp2", "tmp2", innerSourceDir)
Files.write("second file", sourceSecondFile, UTF_8)
assert(sourceSecondFile.exists())
val files = new Array[File](2)
files(0) = sourceFile
files(1) = sourceSecondFile
val targetZip = new File(tempDir, "target-dir.zip")
// Compress the file into ZIP format
zipFile(files, targetZip)
assert(targetZip.exists())

val targetDir = new File(tempDir, "target-dir")
Utils.unZip(targetZip, targetDir)
assert(targetDir.exists())

// The file permissions must be the same as before the decompression
val targetFiles = targetDir.listFiles()
// The permissions of the two files are the same to avoid contingency
assert(targetFiles.length === 2)
assert(targetFiles(0).canRead === true)
assert(targetFiles(0).canWrite === false)
assert(targetFiles(0).canExecute === true)
assert(targetFiles(1).canRead === true)
assert(targetFiles(1).canWrite === false)
assert(targetFiles(1).canExecute === true)
}

test("deleteRecursively") {
val tempDir1 = Utils.createTempDir()
assert(tempDir1.exists())
Expand Down