
Commit bad921c

[SPARK-5979][SPARK-6032] Smaller safer fix
1 parent: c5ba975

File tree: 2 files changed, +51 -18 lines

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 37 additions & 16 deletions

@@ -655,8 +655,7 @@ private[spark] object SparkSubmitUtils {
 
   /**
    * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
-   * simplicity for Spark Package users.
+   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
    * @param coordinates Comma-delimited string of maven coordinates
    * @return Sequence of Maven coordinates
    */
@@ -747,6 +746,35 @@ private[spark] object SparkSubmitUtils {
       md.addDependency(dd)
     }
   }
+
+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private[spark] def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+    val scalaDependencyExcludeRule =
+      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+    md.addExcludeRule(scalaDependencyExcludeRule)
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      val sparkArtifacts =
+        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      md.addExcludeRule(sparkDependencyExcludeRule)
+    }
+  }
 
   /** A nice function to use in tests as well. Values are dummy strings. */
   private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
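For orientation, here is a minimal sketch of how the new helper is meant to be driven, pieced together from the hunks in this commit. The object wrapper, the package declaration (needed because SparkSubmitUtils and its helpers are private[spark]), and the bare IvySettings setup are assumptions for illustration only; the remaining SparkSubmit.scala hunks below show where these calls actually sit inside resolveMavenCoordinates.

  package org.apache.spark.deploy

  import org.apache.ivy.core.module.descriptor.DefaultModuleDescriptor
  import org.apache.ivy.core.settings.IvySettings

  object ExclusionSketch {
    def main(args: Array[String]): Unit = {
      val ivyConfName = "default"
      val ivySettings = new IvySettings()

      // Build a dummy module descriptor, as resolveMavenCoordinates does.
      val md: DefaultModuleDescriptor = SparkSubmitUtils.getModuleDescriptor
      md.setDefaultConf(ivyConfName)

      // Register the Spark/Scala exclusion rules introduced by this commit,
      // then attach the user-supplied coordinates as dependencies.
      SparkSubmitUtils.addExclusionRules(ivySettings, ivyConfName, md)
      val artifacts = SparkSubmitUtils.extractMavenCoordinates(
        "org.apache.spark:spark-core_2.10:1.2.0")  // coordinate borrowed from the old test
      SparkSubmitUtils.addDependenciesToIvy(md, artifacts, ivyConfName)
    }
  }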
@@ -768,6 +796,9 @@ private[spark] object SparkSubmitUtils {
     if (coordinates == null || coordinates.trim.isEmpty) {
       ""
     } else {
+      val sysOut = System.out
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
       val artifacts = extractMavenCoordinates(coordinates)
       // Default configuration name for ivy
       val ivyConfName = "default"
@@ -811,19 +842,9 @@ private[spark] object SparkSubmitUtils {
       val md = getModuleDescriptor
       md.setDefaultConf(ivyConfName)
 
-      // Add an exclusion rule for Spark and Scala Library
-      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-      val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-      val scalaDependencyExcludeRule =
-        new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-      scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
-      md.addExcludeRule(sparkDependencyExcludeRule)
-      md.addExcludeRule(scalaDependencyExcludeRule)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
       addDependenciesToIvy(md, artifacts, ivyConfName)
 
       // resolve dependencies
@@ -835,7 +856,7 @@ private[spark] object SparkSubmitUtils {
       ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
         packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
         retrieveOptions.setConfs(Array(ivyConfName)))
-
+      System.setOut(sysOut)
       resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     }
   }
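The sysOut bookkeeping in these hunks is the usual save/redirect/restore idiom for silencing a library that prints to System.out. A standalone sketch of that idiom follows; the ByteArrayOutputStream sink and the try/finally are illustrative choices, not what the commit itself does (the commit redirects to SparkSubmit's printStream and restores right after ivy.retrieve).

  import java.io.{ByteArrayOutputStream, PrintStream}

  object QuietStdoutSketch {
    def main(args: Array[String]): Unit = {
      val sysOut = System.out                      // remember the real stdout
      val quiet = new PrintStream(new ByteArrayOutputStream())
      try {
        System.setOut(quiet)                       // stdout output now lands in the buffer
        println("ivy would log here")              // stand-in for the chatty resolve/retrieve calls
      } finally {
        System.setOut(sysOut)                      // restore stdout even if resolution throws
      }
      println("back on the real stdout")
    }
  }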

core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala

Lines changed: 14 additions & 2 deletions

@@ -117,8 +117,20 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   test("neglects Spark and Spark's dependencies") {
-    val path = SparkSubmitUtils.resolveMavenCoordinates(
-      "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    val coordinates =
+      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
+      ",org.apache.spark:spark-core_fake:1.2.0"
+
+    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true)
     assert(path === "", "should return empty path")
+    // Should not exclude the following dependency. Will throw an error, because it doesn't exist,
+    // but the fact that it is checking means that it wasn't excluded.
+    intercept[RuntimeException] {
+      SparkSubmitUtils.resolveMavenCoordinates(coordinates +
+        ",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, None, true)
+    }
   }
 }
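The trailing underscore in each component name is what this test really exercises: the spark-$comp* glob should swallow the versioned Spark modules (including the nonexistent spark-core_fake) while leaving spark-streaming-kafka-assembly alone, so the second resolve attempt fails with a RuntimeException. Below is a rough approximation of that matching using plain prefix checks; Ivy's glob matcher does the real work in the commit, this sketch only illustrates the naming logic.

  object GlobExclusionSketch {
    def main(args: Array[String]): Unit = {
      // Same component list as the commit; each rule uses the glob "spark-" + comp + "*".
      val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
        "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

      def excluded(artifactId: String): Boolean =
        components.exists(comp => artifactId.startsWith(s"spark-$comp"))

      assert(excluded("spark-core_2.10"))                      // matched by spark-core_*, so resolution yields ""
      assert(excluded("spark-core_fake"))                      // also matched, which is why the first resolve succeeds
      assert(!excluded("spark-streaming-kafka-assembly_2.10")) // not matched, so Ivy actually tries (and fails) to resolve it
    }
  }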
