Commit cbaf287

Merge branch 'master' into SPARK-3454

Conflicts:
	core/src/main/scala/org/apache/spark/ui/SparkUI.scala

2 parents: 56db31e + 5c1faba

52 files changed (+1648, -618 lines)


bin/spark-shell2.cmd

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ rem

 set SPARK_HOME=%~dp0..

-echo "%*" | findstr " --help -h" >nul
+echo "%*" | findstr " \<--help\> \<-h\>" >nul
 if %ERRORLEVEL% equ 0 (
   call :usage
   exit /b 0

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 9 additions & 18 deletions
@@ -105,18 +105,23 @@ private[spark] object TestUtils {
     URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
   }

-  private[spark] class JavaSourceFromString(val name: String, val code: String)
+  private class JavaSourceFromString(val name: String, val code: String)
     extends SimpleJavaFileObject(createURI(name), SOURCE) {
     override def getCharContent(ignoreEncodingErrors: Boolean): String = code
   }

-  /** Creates a compiled class with the source file. Class file will be placed in destDir. */
+  /** Creates a compiled class with the given name. Class file will be placed in destDir. */
   def createCompiledClass(
       className: String,
       destDir: File,
-      sourceFile: JavaSourceFromString,
-      classpathUrls: Seq[URL]): File = {
+      toStringValue: String = "",
+      baseClass: String = null,
+      classpathUrls: Seq[URL] = Seq()): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
+    val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
+    val sourceFile = new JavaSourceFromString(className,
+      "public class " + className + extendsText + " implements java.io.Serializable {" +
+      " @Override public String toString() { return \"" + toStringValue + "\"; }}")

     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.
@@ -139,18 +144,4 @@ private[spark] object TestUtils {
     assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath())
     out
   }
-
-  /** Creates a compiled class with the given name. Class file will be placed in destDir. */
-  def createCompiledClass(
-      className: String,
-      destDir: File,
-      toStringValue: String = "",
-      baseClass: String = null,
-      classpathUrls: Seq[URL] = Seq()): File = {
-    val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
-    val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + extendsText + " implements java.io.Serializable {" +
-      " @Override public String toString() { return \"" + toStringValue + "\"; }}")
-    createCompiledClass(className, destDir, sourceFile, classpathUrls)
-  }
 }
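
For context, a minimal sketch (not part of this commit) of how a test might call the consolidated createCompiledClass after this change. The class name, scratch directory, and toString value are made up for illustration, and because TestUtils is private[spark] the caller has to live under the org.apache.spark package.

package org.apache.spark

import java.io.File

object CreateCompiledClassSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical scratch directory for the generated .class file.
    val destDir = new File(System.getProperty("java.io.tmpdir"), "spark-testutils-demo")
    destDir.mkdirs()

    // The merged signature generates the Java source internally, so callers only
    // supply the class name plus optional toStringValue / baseClass / classpath.
    val classFile: File = TestUtils.createCompiledClass(
      "HelloSpark",
      destDir,
      toStringValue = "hello")

    println(s"Compiled ${classFile.getName} into ${destDir.getAbsolutePath}")
  }
}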

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 79 additions & 2 deletions
@@ -17,12 +17,16 @@

 package org.apache.spark.deploy

+import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
+import java.util.{Arrays, Comparator}

+import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,14 +36,17 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils

 import scala.collection.JavaConversions._
+import scala.concurrent.duration._
+import scala.language.postfixOps

 /**
  * :: DeveloperApi ::
  * Contains util methods to interact with Hadoop from Spark.
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  val conf: Configuration = newConfiguration(new SparkConf())
+  private val sparkConf = new SparkConf()
+  val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)

   /**
@@ -201,6 +208,61 @@ class SparkHadoopUtil extends Logging {
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }

+  /**
+   * Lists all the files in a directory with the specified prefix, and does not end with the
+   * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
+   * the respective files.
+   */
+  def listFilesSorted(
+      remoteFs: FileSystem,
+      dir: Path,
+      prefix: String,
+      exclusionSuffix: String): Array[FileStatus] = {
+    val fileStatuses = remoteFs.listStatus(dir,
+      new PathFilter {
+        override def accept(path: Path): Boolean = {
+          val name = path.getName
+          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+        }
+      })
+    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+      override def compare(o1: FileStatus, o2: FileStatus): Int = {
+        Longs.compare(o1.getModificationTime, o2.getModificationTime)
+      }
+    })
+    fileStatuses
+  }
+
+  /**
+   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
+   * is valid the latest)?
+   * This will return -ve (or 0) value if the fraction of validity has already expired.
+   */
+  def getTimeFromNowToRenewal(
+      sparkConf: SparkConf,
+      fraction: Double,
+      credentials: Credentials): Long = {
+    val now = System.currentTimeMillis()
+
+    val renewalInterval =
+      sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
+
+    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+      .map { t =>
+        val identifier = new DelegationTokenIdentifier()
+        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+      }.foldLeft(0L)(math.max)
+  }
+
+  private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+    val fileName = credentialsPath.getName
+    fileName.substring(
+      fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+  }
+
   private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored

   /**
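
As an illustration only (not part of the diff), a small sketch of how the two new helpers above might be exercised. The staging path, file prefix, and fraction are hypothetical values chosen for the example.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

object CredentialHelpersSketch {
  def main(args: Array[String]): Unit = {
    val util = SparkHadoopUtil.get
    val fs = FileSystem.get(util.conf)

    // List credential files under a hypothetical staging directory, oldest first,
    // skipping files that still carry a temporary suffix.
    val credFiles = util.listFilesSorted(
      fs,
      new Path("/user/alice/.sparkStaging/app_0001"),   // hypothetical directory
      prefix = "credentials",
      exclusionSuffix = ".tmp")
    credFiles.foreach(f => println(s"${f.getModificationTime}\t${f.getPath}"))

    // Milliseconds until 75% of the renewal interval has elapsed for the latest-issued
    // HDFS delegation token; a value <= 0 means that fraction of validity is already gone.
    val creds = UserGroupInformation.getCurrentUser.getCredentials
    val msLeft = util.getTimeFromNowToRenewal(new SparkConf(), 0.75, creds)
    println(s"Milliseconds until renewal threshold: $msLeft")
  }
}
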
@@ -231,6 +293,17 @@ class SparkHadoopUtil extends Logging {
       }
     }
   }
+
+  /**
+   * Start a thread to periodically update the current user's credentials with new delegation
+   * tokens so that writes to HDFS do not fail.
+   */
+  private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}
+
+  /**
+   * Stop the thread that does the delegation token updates.
+   */
+  private[spark] def stopExecutorDelegationTokenRenewer() {}
 }

 object SparkHadoopUtil {
@@ -251,6 +324,10 @@ object SparkHadoopUtil {
     }
   }

+  val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
+
+  val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+
   def get: SparkHadoopUtil = {
     hadoop
   }
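
A short sketch (illustrative, with a made-up path, prefix, and counter) of how the new constants and getSuffixForCredentialsPath could fit together when naming versioned credentials files. Since the helper is private[spark], the sketch is placed under the org.apache.spark package.

package org.apache.spark

import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

object CredsNamingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical naming scheme: <prefix><DELIM><counter>, written first with the
    // temporary extension and renamed once fully flushed.
    val counter = 3
    val finalName = "credentials" + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + counter
    val tempName = finalName + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION

    val finalPath = new Path("/user/alice/.sparkStaging/app_0001", finalName)
    println(s"temp file:  $tempName")
    println(s"final file: $finalPath")

    // getSuffixForCredentialsPath recovers the counter from the final file name.
    val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(finalPath)
    println(s"parsed counter: $suffix")  // prints 3
  }
}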

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 64 additions & 69 deletions
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
 import java.io.{File, PrintStream}
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
-import java.nio.file.{Path => JavaPath}
 import java.security.PrivilegedExceptionAction

 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -401,6 +400,10 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

+      // Yarn client or cluster
+      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
+      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
+
       // Other options
       OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),
@@ -709,9 +712,7 @@ private[deploy] object SparkSubmitUtils {
    * @param artifactId the artifactId of the coordinate
    * @param version the version of the coordinate
    */
-  private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
-    override def toString: String = s"$groupId:$artifactId:$version"
-  }
+  private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String)

   /**
    * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
@@ -734,10 +735,6 @@ private[deploy] object SparkSubmitUtils {
     }
   }

-  /** Path of the local Maven cache. */
-  private[spark] def m2Path: JavaPath = new File(System.getProperty("user.home"),
-    ".m2" + File.separator + "repository" + File.separator).toPath
-
   /**
    * Extracts maven coordinates from a comma-delimited string
    * @param remoteRepos Comma-delimited string of remote repositories
@@ -751,7 +748,8 @@ private[deploy] object SparkSubmitUtils {

     val localM2 = new IBiblioResolver
     localM2.setM2compatible(true)
-    localM2.setRoot(m2Path.toUri.toString)
+    val m2Path = ".m2" + File.separator + "repository" + File.separator
+    localM2.setRoot(new File(System.getProperty("user.home"), m2Path).toURI.toString)
     localM2.setUsepoms(true)
     localM2.setName("local-m2-cache")
     cr.add(localM2)
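
For orientation (not in the diff), a hedged sketch of the MavenCoordinate helpers touched in the hunks above. It assumes extractMavenCoordinates still returns Seq[MavenCoordinate], and since SparkSubmitUtils is private[deploy], it would have to live in the org.apache.spark.deploy package. The --packages string is made up.

package org.apache.spark.deploy

object MavenCoordinateSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical --packages value.
    val packages = "com.databricks:spark-csv_2.10:1.0.3,joda-time:joda-time:2.7"

    val coords = SparkSubmitUtils.extractMavenCoordinates(packages)

    // With the custom toString removed from MavenCoordinate, callers that want the
    // "group:artifact:version" form now format it themselves from the fields.
    coords.foreach { c =>
      println(s"${c.groupId}:${c.artifactId}:${c.version}")
    }
  }
}
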
@@ -876,72 +874,69 @@ private[deploy] object SparkSubmitUtils {
       ""
     } else {
       val sysOut = System.out
-      try {
-        // To prevent ivy from logging to system out
-        System.setOut(printStream)
-        val artifacts = extractMavenCoordinates(coordinates)
-        // Default configuration name for ivy
-        val ivyConfName = "default"
-        // set ivy settings for location of cache
-        val ivySettings: IvySettings = new IvySettings
-        // Directories for caching downloads through ivy and storing the jars when maven coordinates
-        // are supplied to spark-submit
-        val alternateIvyCache = ivyPath.getOrElse("")
-        val packagesDirectory: File =
-          if (alternateIvyCache.trim.isEmpty) {
-            new File(ivySettings.getDefaultIvyUserDir, "jars")
-          } else {
-            ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
-            ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
-            new File(alternateIvyCache, "jars")
-          }
-        printStream.println(
-          s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
-        printStream.println(s"The jars for the packages stored in: $packagesDirectory")
-        // create a pattern matcher
-        ivySettings.addMatcher(new GlobPatternMatcher)
-        // create the dependency resolvers
-        val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
-        ivySettings.addResolver(repoResolver)
-        ivySettings.setDefaultResolver(repoResolver.getName)
-
-        val ivy = Ivy.newInstance(ivySettings)
-        // Set resolve options to download transitive dependencies as well
-        val resolveOptions = new ResolveOptions
-        resolveOptions.setTransitive(true)
-        val retrieveOptions = new RetrieveOptions
-        // Turn downloading and logging off for testing
-        if (isTest) {
-          resolveOptions.setDownload(false)
-          resolveOptions.setLog(LogOptions.LOG_QUIET)
-          retrieveOptions.setLog(LogOptions.LOG_QUIET)
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
+      val artifacts = extractMavenCoordinates(coordinates)
+      // Default configuration name for ivy
+      val ivyConfName = "default"
+      // set ivy settings for location of cache
+      val ivySettings: IvySettings = new IvySettings
+      // Directories for caching downloads through ivy and storing the jars when maven coordinates
+      // are supplied to spark-submit
+      val alternateIvyCache = ivyPath.getOrElse("")
+      val packagesDirectory: File =
+        if (alternateIvyCache.trim.isEmpty) {
+          new File(ivySettings.getDefaultIvyUserDir, "jars")
         } else {
-          resolveOptions.setDownload(true)
+          ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
+          ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
+          new File(alternateIvyCache, "jars")
         }
+      printStream.println(
+        s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
+      printStream.println(s"The jars for the packages stored in: $packagesDirectory")
+      // create a pattern matcher
+      ivySettings.addMatcher(new GlobPatternMatcher)
+      // create the dependency resolvers
+      val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
+      ivySettings.addResolver(repoResolver)
+      ivySettings.setDefaultResolver(repoResolver.getName)
+
+      val ivy = Ivy.newInstance(ivySettings)
+      // Set resolve options to download transitive dependencies as well
+      val resolveOptions = new ResolveOptions
+      resolveOptions.setTransitive(true)
+      val retrieveOptions = new RetrieveOptions
+      // Turn downloading and logging off for testing
+      if (isTest) {
+        resolveOptions.setDownload(false)
+        resolveOptions.setLog(LogOptions.LOG_QUIET)
+        retrieveOptions.setLog(LogOptions.LOG_QUIET)
+      } else {
+        resolveOptions.setDownload(true)
+      }

-        // A Module descriptor must be specified. Entries are dummy strings
-        val md = getModuleDescriptor
-        md.setDefaultConf(ivyConfName)
+      // A Module descriptor must be specified. Entries are dummy strings
+      val md = getModuleDescriptor
+      md.setDefaultConf(ivyConfName)

-        // Add exclusion rules for Spark and Scala Library
-        addExclusionRules(ivySettings, ivyConfName, md)
-        // add all supplied maven artifacts as dependencies
-        addDependenciesToIvy(md, artifacts, ivyConfName)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
+      addDependenciesToIvy(md, artifacts, ivyConfName)

-        // resolve dependencies
-        val rr: ResolveReport = ivy.resolve(md, resolveOptions)
-        if (rr.hasError) {
-          throw new RuntimeException(rr.getAllProblemMessages.toString)
-        }
-        // retrieve all resolved dependencies
-        ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
-          packagesDirectory.getAbsolutePath + File.separator +
-          "[organization]_[artifact]-[revision].[ext]",
-          retrieveOptions.setConfs(Array(ivyConfName)))
-        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
-      } finally {
-        System.setOut(sysOut)
+      // resolve dependencies
+      val rr: ResolveReport = ivy.resolve(md, resolveOptions)
+      if (rr.hasError) {
+        throw new RuntimeException(rr.getAllProblemMessages.toString)
       }
+      // retrieve all resolved dependencies
+      ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
+        packagesDirectory.getAbsolutePath + File.separator +
+        "[organization]_[artifact]-[revision].[ext]",
+        retrieveOptions.setConfs(Array(ivyConfName)))
+      System.setOut(sysOut)
+      resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     }
   }
 }
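
As a hedged sketch (not part of this commit), here is how the restructured resolution path above might be driven from a caller. It assumes the signature implied by the body, roughly resolveMavenCoordinates(coordinates: String, remoteRepos: Option[String], ivyPath: Option[String], isTest: Boolean = false): String, and since SparkSubmitUtils is private[deploy] the sketch sits in the org.apache.spark.deploy package. The coordinate is made up.

package org.apache.spark.deploy

object ResolvePackagesSketch {
  def main(args: Array[String]): Unit = {
    // Resolve a hypothetical --packages coordinate through the local m2 cache,
    // the default Ivy cache, and the remote repositories, as spark-submit does.
    val jarPaths: String = SparkSubmitUtils.resolveMavenCoordinates(
      coordinates = "joda-time:joda-time:2.7",
      remoteRepos = None,   // no extra repositories beyond the defaults
      ivyPath = None,       // use the default Ivy cache location
      isTest = false)

    // With the try/finally removed above, System.out is restored and the resolved
    // jar path list is returned on the success path only.
    println(s"Resolved jars: $jarPaths")
  }
}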
