47 changes: 23 additions & 24 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1801,40 +1801,39 @@ class SparkContext(config: SparkConf) extends Logging {
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
def addJarFile(file: File): String = {
try {
if (!file.exists()) {
throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
}
if (file.isDirectory) {
throw new IllegalArgumentException(
s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
}
env.rpcEnv.fileServer.addJar(file)
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
null
}
}

if (path == null) {
logWarning("null specified as parameter to addJar")
} else {
var key = ""
if (path.contains("\\")) {
val key = if (path.contains("\\")) {
// For local paths with backslashes on Windows, URI throws an exception
key = env.rpcEnv.fileServer.addJar(new File(path))
addJarFile(new File(path))
} else {
val uri = new URI(path)
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
Utils.validateURL(uri)
key = uri.getScheme match {
uri.getScheme match {
// A JAR file which exists only on the driver node
case null | "file" =>
try {
Member Author commented:
Here, I tried to move this try-catch logic into addJarFile and reuse it for local paths with backslashes on Windows; a standalone sketch of why those paths need the special case follows this hunk. This is covered by the "add jar with invalid path" test in SparkContextSuite.

val file = new File(uri.getPath)
if (!file.exists()) {
throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
}
if (file.isDirectory) {
throw new IllegalArgumentException(
s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
}
env.rpcEnv.fileServer.addJar(new File(uri.getPath))
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
null
}
case null | "file" => addJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
case "local" =>
"file:" + uri.getPath
case _ =>
path
case "local" => "file:" + uri.getPath
case _ => path
}
}
if (key != null) {
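
For context on the backslash branch above: java.net.URI cannot parse Windows-style paths, which is why addJar hands them straight to addJarFile instead of building a URI first. A minimal standalone sketch, assuming a made-up path purely for illustration:

import java.io.File
import java.net.{URI, URISyntaxException}

// A Windows-style local path; the value is hypothetical.
val windowsPath = "C:\\Users\\someone\\libs\\example.jar"

try {
  // java.net.URI rejects the backslashes and throws URISyntaxException...
  new URI(windowsPath)
} catch {
  case e: URISyntaxException =>
    // ...which is why addJar skips URI parsing for such paths and wraps the
    // string in a java.io.File directly.
    println(s"URI parsing failed: ${e.getMessage}")
    val jarFile = new File(windowsPath)
    println(s"Falling back to a File: $jarFile")
}
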
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -301,13 +301,13 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addJar(tmpJar.getAbsolutePath)

// Invaid jar path will only print the error log, will not add to file server.
// Invalid jar path will only print the error log, will not add to file server.
sc.addJar("dummy.jar")
sc.addJar("")
sc.addJar(tmpDir.getAbsolutePath)

sc.listJars().size should be (1)
sc.listJars().head should include (tmpJar.getName)
assert(sc.listJars().size == 1)
Member Author commented:
This gives a better error message.

Before

2 was not equal to 1 (SparkContextSuite.scala:309)

After

ArrayBuffer("spark://172.24.17.81:2411/jars/spark-7e45e7da-ca1b-4e33-8a64-1fb44860ee76", "spark://172.24.17.81:2411/jars/test7132431731623035882.jar") had size 2 instead of expected size 1 (SparkContextSuite.scala:309)

assert(sc.listJars().head.contains(tmpJar.getName))
}

test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
33 changes: 30 additions & 3 deletions core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -37,27 +37,50 @@ class LocalDirsSuite extends SparkFunSuite with BeforeAndAfter {
Utils.clearLocalRootDirs()
}

private def assumeNonExistentAndNotCreatable(f: File): Unit = {
try {
assume(!f.exists() && !f.mkdirs())
} finally {
Utils.deleteRecursively(f)
}
}

test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
Member Author commented:
The problem here is that Utils.getLocalDir -> Utils.getOrCreateLocalRootDirs actually creates the directory, so even if the path does not exist, it can end up being created. In most cases a non-existent directory will not be created under the root, but on Windows it is considerably more likely (at least that is what happens on AppVeyor), since the directory is created under C: as below:

scala> val a = new java.io.File("/NENEXISTENT_PATH")
a: java.io.File = \NENEXISTENT_PATH

scala> a.exists()
res3: Boolean = false

scala> a.mkdirs()
res4: Boolean = true

Member Author commented:
I am not sure which path to use here for Windows, so I added assumeNonExistentAndNotCreatable to skip the test when the directory turns out to be creatable (see the standalone sketch after this file's hunks).

// Regression test for SPARK-2974
assert(!new File("/NONEXISTENT_PATH").exists())
val f = new File("/NONEXISTENT_PATH")
assumeNonExistentAndNotCreatable(f)

val conf = new SparkConf(false)
.set("spark.local.dir", s"/NONEXISTENT_PATH,${System.getProperty("java.io.tmpdir")}")
assert(new File(Utils.getLocalDir(conf)).exists())

// This directory should not be created.
assert(!f.exists())
}

test("SPARK_LOCAL_DIRS override also affects driver") {
// Regression test for SPARK-2975
assert(!new File("/NONEXISTENT_PATH").exists())
// Regression test for SPARK-2974
val f = new File("/NONEXISTENT_PATH")
assumeNonExistentAndNotCreatable(f)

// spark.local.dir only contains invalid directories, but that's not a problem since
// SPARK_LOCAL_DIRS will override it on both the driver and workers:
val conf = new SparkConfWithEnv(Map("SPARK_LOCAL_DIRS" -> System.getProperty("java.io.tmpdir")))
.set("spark.local.dir", "/NONEXISTENT_PATH")
assert(new File(Utils.getLocalDir(conf)).exists())

// This directory should not be created.
assert(!f.exists())
}

test("Utils.getLocalDir() throws an exception if any temporary directory cannot be retrieved") {
val path1 = "/NONEXISTENT_PATH_ONE"
val path2 = "/NONEXISTENT_PATH_TWO"
val f1 = new File(path1)
val f2 = new File(path2)
assumeNonExistentAndNotCreatable(f1)
assumeNonExistentAndNotCreatable(f2)

assert(!new File(path1).exists())
assert(!new File(path2).exists())
val conf = new SparkConf(false).set("spark.local.dir", s"$path1,$path2")
@@ -67,5 +90,9 @@ class LocalDirsSuite extends SparkFunSuite with BeforeAndAfter {
// If any temporary directory could not be retrieved under the given paths above, it should
// throw an exception with the message that includes the paths.
assert(message.contains(s"$path1,$path2"))

// These directories should not be created.
assert(!f1.exists())
assert(!f2.exists())
}
}
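
On the assumeNonExistentAndNotCreatable helper above: ScalaTest's assume cancels a test (rather than failing it) when its condition does not hold, so on platforms where "/NONEXISTENT_PATH" can in fact be created, as in the Windows REPL session quoted above, these tests are skipped instead of producing false failures. A minimal self-contained sketch of the pattern, assuming a ScalaTest 3.0-style FunSuite; the suite and test names are made up, and plain File.delete stands in for Utils.deleteRecursively:

import java.io.File
import org.scalatest.FunSuite

// Hypothetical, standalone illustration of the skip-if-creatable pattern.
class NonCreatableDirSketchSuite extends FunSuite {

  private def assumeNonExistentAndNotCreatable(f: File): Unit = {
    try {
      // `assume` throws TestCanceledException when the condition is false,
      // so the enclosing test is reported as canceled, not failed.
      assume(!f.exists() && !f.mkdirs())
    } finally {
      // If mkdirs() did succeed (e.g. on Windows), remove the directory again.
      if (f.exists()) f.delete()
    }
  }

  test("only runs where /NONEXISTENT_PATH really cannot be created") {
    val f = new File("/NONEXISTENT_PATH")
    assumeNonExistentAndNotCreatable(f)
    // ... the real assertions would follow here ...
    assert(!f.exists())
  }
}
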
@@ -647,7 +647,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
withTable("t") {
withTempPath { path =>
Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
sql(s"CREATE TABLE t USING parquet LOCATION '$path'")
sql(s"CREATE TABLE t USING parquet LOCATION '${path.toURI}'")
spark.catalog.cacheTable("t")
spark.table("t").select($"i").cache()
checkAnswer(spark.table("t").select($"i"), Row(1))
@@ -38,7 +38,7 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {

val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
.asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
assert(rootPath.toString.contains(basePath.toString))
assert(rootPath.toString.contains(dir.toURI.getPath.stripSuffix("/")))

assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
assert(!df.queryExecution.executedPlan.treeString(verbose = true).contains(rootPath.getName))