diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5cd6c2b9c380a..127c015f4ac08 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1775,7 +1775,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addJar(path: String) {
-    def addJarFile(file: File): String = {
+    def addLocalJarFile(file: File): String = {
       try {
         if (!file.exists()) {
           throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
@@ -1792,12 +1792,36 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
 
+    def checkRemoteJarFile(path: String): String = {
+      val hadoopPath = new Path(path)
+      val scheme = new URI(path).getScheme
+      if (!Array("http", "https", "ftp").contains(scheme)) {
+        try {
+          val fs = hadoopPath.getFileSystem(hadoopConfiguration)
+          if (!fs.exists(hadoopPath)) {
+            throw new FileNotFoundException(s"Jar ${path} not found")
+          }
+          if (fs.isDirectory(hadoopPath)) {
+            throw new IllegalArgumentException(
+              s"Directory ${path} is not allowed for addJar")
+          }
+          path
+        } catch {
+          case NonFatal(e) =>
+            logError(s"Failed to add $path to Spark environment", e)
+            null
+        }
+      } else {
+        path
+      }
+    }
+
     if (path == null) {
       logWarning("null specified as parameter to addJar")
     } else {
       val key = if (path.contains("\\")) {
         // For local paths with backslashes on Windows, URI throws an exception
-        addJarFile(new File(path))
+        addLocalJarFile(new File(path))
       } else {
         val uri = new URI(path)
         // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
@@ -1806,12 +1830,12 @@ class SparkContext(config: SparkConf) extends Logging {
           // A JAR file which exists only on the driver node
           case null =>
             // SPARK-22585 path without schema is not url encoded
-            addJarFile(new File(uri.getRawPath))
+            addLocalJarFile(new File(uri.getRawPath))
           // A JAR file which exists only on the driver node
-          case "file" => addJarFile(new File(uri.getPath))
+          case "file" => addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists locally on every worker node
           case "local" => "file:" + uri.getPath
-          case _ => path
+          case _ => checkRemoteJarFile(path)
         }
       }
       if (key != null) {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 7a16f7b715e63..620b5c4949178 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -165,6 +165,17 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
 
+  test("add FS jar files not exists") {
+    try {
+      val jarPath = "hdfs:///no/path/to/TestUDTF.jar"
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      sc.addJar(jarPath)
+      assert(sc.listJars().forall(!_.contains("TestUDTF.jar")))
+    } finally {
+      sc.stop()
+    }
+  }
+
   test("SPARK-17650: malformed url's throw exceptions before bricking Executors") {
     try {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
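
A minimal driver-side sketch of the contract the new test pins down; it is not part of the patch, and `AddJarDemo`, the app name, and the master URL are hypothetical stand-ins. It assumes a driver with no reachable HDFS, in which case the lookup inside `checkRemoteJarFile` fails, the error is logged, and `addJar` becomes a no-op rather than registering a jar that executors would fail to fetch at task start.

    import org.apache.spark.{SparkConf, SparkContext}

    object AddJarDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("addJar-demo").setMaster("local"))
        try {
          // Missing remote jar: with this patch the filesystem probe in
          // checkRemoteJarFile fails, the error is logged, and nothing is
          // registered with the driver.
          sc.addJar("hdfs:///no/path/to/TestUDTF.jar")
          assert(sc.listJars().forall(!_.contains("TestUDTF.jar")))
          // http/https/ftp URLs skip the Hadoop FileSystem probe entirely
          // and are still passed through unchanged, as before the patch.
        } finally {
          sc.stop()
        }
      }
    }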