diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 66fe1d7f2ba13..cd8784a3ad951 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -380,6 +380,7 @@ class SparkContext(config: SparkConf) extends Logging {
     try {
       _conf = config.clone()
       _conf.validateSettings()
+      _conf.set("spark.app.startTime", startTime.toString)
 
       if (!_conf.contains("spark.master")) {
         throw new SparkException("A master URL must be set in your configuration")
@@ -487,11 +488,17 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Add each JAR given through the constructor
     if (jars != null) {
-      jars.foreach(addJar)
+      jars.foreach(jar => addJar(jar, true))
+      if (addedJars.nonEmpty) {
+        _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
+      }
     }
 
     if (files != null) {
-      files.foreach(addFile)
+      files.foreach(file => addFile(file, false, true))
+      if (addedFiles.nonEmpty) {
+        _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
+      }
     }
 
     _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1495,7 +1502,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String): Unit = {
-    addFile(path, false)
+    addFile(path, false, false)
   }
 
   /**
@@ -1517,6 +1524,10 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
+    addFile(path, recursive, false)
+  }
+
+  private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
     val uri = new Path(path).toUri
     val schemeCorrectedURI = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI
@@ -1554,7 +1565,7 @@ class SparkContext(config: SparkConf) extends Logging {
         path
       }
     }
-    val timestamp = System.currentTimeMillis
+    val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
     if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
       logInfo(s"Added file $path at $key with timestamp $timestamp")
       // Fetch the file locally so that closures which are run on the driver can still use the
@@ -1564,7 +1575,7 @@ class SparkContext(config: SparkConf) extends Logging {
       postEnvironmentUpdate()
     } else {
       logWarning(s"The path $path has been added already. Overwriting of added paths " +
-          "is not supported in the current version.")
+        "is not supported in the current version.")
     }
   }
 
@@ -1827,6 +1838,10 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addJar(path: String): Unit = {
+    addJar(path, false)
+  }
+
+  private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
     def addLocalJarFile(file: File): String = {
       try {
         if (!file.exists()) {
@@ -1891,7 +1906,7 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
     if (key != null) {
-      val timestamp = System.currentTimeMillis
+      val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
       if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
         logInfo(s"Added JAR $path at $key with timestamp $timestamp")
         postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e9f1d9c250ad2..a90ec2e028e4f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -213,6 +213,20 @@ private[spark] class Executor(
 
   heartbeater.start()
 
+  private val appStartTime = conf.getLong("spark.app.startTime", 0)
+
+  // To allow users to distribute plugins and their required files
+  // specified by --jars and --files on application submission, those jars/files should be
+  // downloaded and added to the class loader via updateDependencies.
+  // This should be done before plugin initialization below
+  // because executors search plugins from the class loader and initialize them.
+  private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
+    conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
+      Map(urls.split(",").map(url => (url, appStartTime)): _*)
+    }.getOrElse(Map.empty)
+  }
+  updateDependencies(initialUserFiles, initialUserJars)
+
   // Plugins need to load using a class loader that includes the executor's user classpath.
   // Plugins also needs to be initialized after the heartbeater started
   // to avoid blocking to send heartbeat (see SPARK-32175).
diff --git a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
index 0f489fb219010..b188ee16b97d0 100644
--- a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
@@ -28,6 +28,7 @@
 
 import org.apache.spark.api.java.*;
 import org.apache.spark.*;
+import org.apache.spark.util.Utils;
 
 /**
  * Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext.
@@ -35,14 +36,16 @@ public class JavaSparkContextSuite implements Serializable {
 
   @Test
-  public void javaSparkContext() {
+  public void javaSparkContext() throws IOException {
+    File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
+    String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString();
     String[] jars = new String[] {};
     java.util.Map<String, String> environment = new java.util.HashMap<>();
 
     new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
     new JavaSparkContext("local", "name", new SparkConf()).stop();
     new JavaSparkContext("local", "name").stop();
-    new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop();
+    new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop();
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 071508713e405..fb2a65e8c07cf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -566,7 +566,8 @@ class SparkSubmitSuite
       }
     }
 
-    val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
+    val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty)
+    val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString)
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
@@ -1216,6 +1217,86 @@ class SparkSubmitSuite
     testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
   }
 
+  test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
+    val tempDir = Utils.createTempDir()
+    val tempFileName = "test.txt"
+    val tempFile = new File(tempDir, tempFileName)
+
+    // scalastyle:off println
+    Utils.tryWithResource {
+      new PrintWriter(tempFile)
+    } { writer =>
+      writer.println("SparkPluginTest")
+    }
+    // scalastyle:on println
+
+    val sparkPluginCodeBody =
+      """
+        |@Override
+        |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+        |  return new TestExecutorPlugin();
+        |}
+        |
+        |@Override
+        |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+      """.stripMargin
+    val executorPluginCodeBody =
+      s"""
+        |@Override
+        |public void init(
+        |    org.apache.spark.api.plugin.PluginContext ctx,
+        |    java.util.Map<String, String> extraConf) {
+        |  String str = null;
+        |  try (java.io.BufferedReader reader =
+        |    new java.io.BufferedReader(new java.io.InputStreamReader(
+        |      new java.io.FileInputStream("$tempFileName")))) {
+        |    str = reader.readLine();
+        |  } catch (java.io.IOException e) {
+        |    throw new RuntimeException(e);
+        |  } finally {
+        |    assert str == "SparkPluginTest";
+        |  }
+        |}
+      """.stripMargin
+
+    val compiledExecutorPlugin = TestUtils.createCompiledClass(
+      "TestExecutorPlugin",
+      tempDir,
+      "",
+      null,
+      Seq.empty,
+      Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+      executorPluginCodeBody)
+
+    val thisClassPath =
+      sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
+    val compiledSparkPlugin = TestUtils.createCompiledClass(
+      "TestSparkPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.SparkPlugin"),
+      sparkPluginCodeBody)
+
+    val jarUrl = TestUtils.createJar(
+      Seq(compiledSparkPlugin, compiledExecutorPlugin),
+      new File(tempDir, "testplugin.jar"))
+
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null)
+    val args = Seq(
+      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[1,1,1024]",
+      "--conf", "spark.plugins=TestSparkPlugin",
+      "--conf", "spark.ui.enabled=false",
+      "--jars", jarUrl.toString + "," + unusedJar.toString,
+      "--files", tempFile.toString + "," + unusedFile.toString,
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   private def testRemoteResources(
       enableHttpFs: Boolean,
       blacklistSchemes: Seq[String] = Nil): Unit = {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 4608a4e61a41c..8471417184974 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1326,9 +1326,3 @@ Both take a comma-separated list of class names that implement the
 possible for one list to be placed in the Spark default config file, allowing users to
 easily add other plugins from the command line without overwriting the config file's
 list. Duplicate plugins are ignored.
-
-Distribution of the jar files containing the plugin code is currently not done by Spark. The user
-or admin should make sure that the jar files are available to Spark applications, for example, by
-including the plugin jar with the Spark distribution. The exception to this rule is the YARN
-backend, where the --jars command line option (or equivalent config entry) can be
-used to make the plugin code available to both executors and cluster-mode drivers.
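For context on what this patch enables, here is an illustrative sketch (not part of the diff) of a plugin whose executor-side component reads a file shipped with --files. The class name MyFilePlugin and the file name plugin-settings.txt are made-up examples; the point is that, with this change, files and jars passed on spark-submit are fetched by executors before plugins are initialized, so init() can open the distributed file from the executor's working directory.

import java.util.{Map => JMap}

import scala.io.Source

import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Hypothetical example plugin: distribute its jar with --jars and the settings
// file with --files, then enable it via spark.plugins=MyFilePlugin.
class MyFilePlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // With this patch, the file distributed via --files has already been fetched
      // into the executor's working directory, so a relative path is sufficient.
      val source = Source.fromFile("plugin-settings.txt")
      try {
        println(s"Loaded plugin settings: ${source.getLines().mkString(";")}")
      } finally {
        source.close()
      }
    }
  }
}

A submission along the lines of spark-submit --conf spark.plugins=MyFilePlugin --jars my-file-plugin.jar --files plugin-settings.txt (all names hypothetical) would then exercise the same code path as the SPARK-32119 test added above.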