
Commit 5a55c96

brkyvz authored and pwendell committed
[SPARK-5979][SPARK-6032] Smaller safer --packages fix
pwendell tdas These are the safer parts of PR apache#4754:

- SPARK-5979: All dependencies with the groupId `org.apache.spark` passed through `--packages` were being excluded from the dependency tree on the assumption that they would be in the assembly jar. This is not the case, therefore the exclusion rules had to be defined more explicitly.
- SPARK-6032: Ivy prints a whole lot of logs while retrieving dependencies. These were printed to `System.out`. Moved the logging to `System.err`.

Author: Burak Yavuz <[email protected]>

Closes apache#4802 from brkyvz/simple-streaming-fix and squashes the following commits:

e0f38cb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into simple-streaming-fix
bad921c [Burak Yavuz] [SPARK-5979][SPARK-6032] Smaller safer fix

(cherry picked from commit 6d8e5fb)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent 1747e0a · commit 5a55c96

2 files changed: +51 −18 lines


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 37 additions & 16 deletions
@@ -655,8 +655,7 @@ private[spark] object SparkSubmitUtils {
 
   /**
    * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
-   * simplicity for Spark Package users.
+   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
    * @param coordinates Comma-delimited string of maven coordinates
    * @return Sequence of Maven coordinates
    */
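
For context, the scaladoc above only describes the coordinate format. Below is a simplified, self-contained sketch of a parser for that format; `MavenCoordinate` and `extractCoordinates` are illustrative names, not the code under change:

case class MavenCoordinate(groupId: String, artifactId: String, version: String)

// Parse a comma-delimited list of "groupId:artifactId:version" or
// "groupId/artifactId:version" coordinates, as the scaladoc describes.
def extractCoordinates(coordinates: String): Seq[MavenCoordinate] =
  coordinates.split(",").map { p =>
    val splits = p.replace("/", ":").split(":")
    require(splits.length == 3,
      s"Coordinate must be 'groupId:artifactId:version', got: $p")
    MavenCoordinate(splits(0), splits(1), splits(2))
  }

For example, `extractCoordinates("org.apache.spark/spark-core_2.10:1.2.0")` yields `MavenCoordinate("org.apache.spark", "spark-core_2.10", "1.2.0")`.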
@@ -747,6 +746,35 @@ private[spark] object SparkSubmitUtils {
       md.addDependency(dd)
     }
   }
+
+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private[spark] def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+    val scalaDependencyExcludeRule =
+      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+    md.addExcludeRule(scalaDependencyExcludeRule)
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      val sparkArtifacts =
+        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      md.addExcludeRule(sparkDependencyExcludeRule)
+    }
+  }
 
   /** A nice function to use in tests as well. Values are dummy strings. */
   private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
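
Why the per-component globs matter: the old rule excluded everything under the `org.apache.spark` group, which also swallowed artifacts users legitimately need, such as `spark-streaming-kafka-assembly_2.10`. Here is a standalone sketch of the idea; `globToRegex` is a stand-in that only approximates Ivy's glob matcher ('*' matching any run of characters) and is not part of the patch:

object ExclusionGlobDemo {
  // Approximate an Ivy-style glob as a regex: '*' matches any run of characters.
  private def globToRegex(glob: String) =
    glob.split("\\*", -1).map(java.util.regex.Pattern.quote).mkString(".*").r

  def excludedBy(glob: String, artifactId: String): Boolean =
    globToRegex(glob).pattern.matcher(artifactId).matches()

  def main(args: Array[String]): Unit = {
    val components = Seq("core_", "streaming_")  // abbreviated component list
    val artifacts = Seq("spark-core_2.10", "spark-streaming_2.10",
      "spark-streaming-kafka-assembly_2.10")
    artifacts.foreach { a =>
      val oldRule = excludedBy("*", a)  // old rule: any artifact in the group
      val newRules = components.exists(c => excludedBy(s"spark-$c*", a))
      println(f"$a%-40s old rule excludes: $oldRule%-6s new rules exclude: $newRules")
    }
  }
}

Running it shows `spark-streaming-kafka-assembly_2.10` caught by the old blanket rule but passed through by the new per-component rules, thanks to the trailing underscore in each component name; that is the SPARK-5979 fix in miniature.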
@@ -768,6 +796,9 @@ private[spark] object SparkSubmitUtils {
     if (coordinates == null || coordinates.trim.isEmpty) {
       ""
     } else {
+      val sysOut = System.out
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
       val artifacts = extractMavenCoordinates(coordinates)
       // Default configuration name for ivy
       val ivyConfName = "default"
@@ -811,19 +842,9 @@ private[spark] object SparkSubmitUtils {
       val md = getModuleDescriptor
       md.setDefaultConf(ivyConfName)
 
-      // Add an exclusion rule for Spark and Scala Library
-      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-      val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-      val scalaDependencyExcludeRule =
-        new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-      scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
-      md.addExcludeRule(sparkDependencyExcludeRule)
-      md.addExcludeRule(scalaDependencyExcludeRule)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
       addDependenciesToIvy(md, artifacts, ivyConfName)
 
       // resolve dependencies
@@ -835,7 +856,7 @@ private[spark] object SparkSubmitUtils {
       ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
         packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
         retrieveOptions.setConfs(Array(ivyConfName)))
-
+      System.setOut(sysOut)
       resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     }
   }
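
Together with the three lines added in the earlier hunk, the `System.setOut(sysOut)` above completes a save/redirect/restore pattern around the noisy Ivy resolution. A minimal standalone sketch of that pattern, using a `ByteArrayOutputStream` in place of SparkSubmit's `printStream` and adding a try/finally guard that the patch itself does not use:

import java.io.{ByteArrayOutputStream, PrintStream}

object StdoutRedirectDemo {
  def main(args: Array[String]): Unit = {
    val sysOut = System.out                    // keep a handle on the real stdout
    val captured = new ByteArrayOutputStream()
    System.setOut(new PrintStream(captured))   // redirect; Ivy logs via System.out
    try {
      System.out.println("chatty dependency-resolution output")  // lands in `captured`
    } finally {
      System.setOut(sysOut)                    // always restore the original stream
    }
    System.out.println(s"captured: ${captured.toString.trim}")
  }
}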

core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala

Lines changed: 14 additions & 2 deletions
@@ -117,8 +117,20 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   test("neglects Spark and Spark's dependencies") {
-    val path = SparkSubmitUtils.resolveMavenCoordinates(
-      "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    val coordinates =
+      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
+      ",org.apache.spark:spark-core_fake:1.2.0"
+
+    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true)
     assert(path === "", "should return empty path")
+    // Should not exclude the following dependency. Will throw an error, because it doesn't exist,
+    // but the fact that it is checking means that it wasn't excluded.
+    intercept[RuntimeException] {
+      SparkSubmitUtils.resolveMavenCoordinates(coordinates +
+        ",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, None, true)
+    }
   }
 }
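
The new assertion relies on ScalaTest's `intercept`, which passes only if the body throws the named exception type and fails the test otherwise. A minimal illustration, with a hand-thrown `RuntimeException` standing in for the failed resolution of a nonexistent artifact:

import org.scalatest.FunSuite

class InterceptDemoSuite extends FunSuite {
  test("intercept passes when the expected exception is thrown") {
    val e = intercept[RuntimeException] {
      throw new RuntimeException("unresolved dependency")  // stand-in for a failed resolve
    }
    assert(e.getMessage.contains("unresolved"))  // intercept returns the caught exception
  }
}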
