Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import java.io.File
import java.nio.file.Files

import scala.collection.mutable.HashMap

import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration

import org.apache.spark.util.MutableURLClassLoader

/**
 * Helpers shared inside the `deploy` package for resolving Maven coordinates, fetching jars
 * into a temporary directory and adding them to a class loader.
 */
private[deploy] object DependencyUtils {

  /**
   * Resolves Maven coordinates through `SparkSubmitUtils`.
   *
   * The Ivy settings are loaded from the file named by the `spark.jars.ivySettings` system
   * property when that property is set; otherwise default settings are built from
   * `repositories` and `ivyRepoPath`.
   *
   * @param packagesExclusions comma-separated exclusions; null or blank means no exclusions
   * @param packages coordinates handed unchanged to `SparkSubmitUtils.resolveMavenCoordinates`
   * @param repositories extra repositories, may be null
   * @param ivyRepoPath Ivy repository path, may be null
   * @return the string produced by `SparkSubmitUtils.resolveMavenCoordinates`
   */
  def resolveMavenDependencies(
      packagesExclusions: String,
      packages: String,
      repositories: String,
      ivyRepoPath: String): String = {
    val exclusions: Seq[String] =
      if (!StringUtils.isBlank(packagesExclusions)) {
        packagesExclusions.split(",")
      } else {
        Nil
      }
    // Create the IvySettings, either load from file or build defaults
    val ivySettings = sys.props.get("spark.jars.ivySettings").map { ivySettingsFile =>
      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(repositories), Option(ivyRepoPath))
    }.getOrElse {
      SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
    }

    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
  }

  /**
   * Creates a fresh temporary directory (name prefix "tmp") and registers a JVM shutdown hook
   * that deletes it, ignoring any failure during deletion.
   *
   * @return the newly created directory
   */
  def createTempDir(): File = {
    val targetDir = Files.createTempDirectory("tmp").toFile
    // scalastyle:off runtimeaddshutdownhook
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        FileUtils.deleteQuietly(targetDir)
      }
    })
    // scalastyle:on runtimeaddshutdownhook
    targetDir
  }

  /**
   * Resolves glob paths in `jars`, drops every entry whose path contains the file name of
   * `userJar`, and passes the remaining list to `SparkSubmit.downloadFileList` together with a
   * new temporary directory.
   *
   * The SSL-related system properties listed below are forwarded to the download call when
   * they are set.
   *
   * @param jars comma-separated jar paths, may be null
   * @param userJar path of the user jar; only the segment after the last "/" is used
   * @return the result of `SparkSubmit.downloadFileList`, or null when `jars` is null or
   *         nothing is left after filtering
   */
  def resolveAndDownloadJars(jars: String, userJar: String): String = {
    val targetDir = DependencyUtils.createTempDir()
    val hadoopConf = new Configuration()
    val sparkProperties = new HashMap[String, String]()
    val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore",
      "spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword",
      "spark.ssl.fs.protocol", "spark.ssl.protocol")

    // Copy over only those security properties that are actually defined.
    securityProperties.foreach { pName =>
      sys.props.get(pName).foreach { pValue =>
        sparkProperties.put(pName, pValue)
      }
    }

    Option(jars)
      .map { paths =>
        val resolvedPaths = SparkSubmit.resolveGlobPaths(paths, hadoopConf).split(",")
        // The user jar's file name does not depend on the path being tested, so compute it
        // once here rather than once per resolved path.
        val userJarName = userJar.split("/").last
        // NOTE(review): this is a substring match, so any path that merely contains the user
        // jar's file name is dropped as well - confirm that this is intended.
        resolvedPaths.filterNot(_.contains(userJarName)).mkString(",")
      }
      .filterNot(_ == "")
      .map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf))
      .orNull
  }

  /**
   * Adds every jar in a comma-separated list to `loader` via `SparkSubmit.addJarToClasspath`.
   *
   * @param jars comma-separated jar paths; null means there is nothing to add
   * @param loader class loader that receives the jars
   */
  def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
    Option(jars).foreach { jarList =>
      jarList.split(",").foreach(SparkSubmit.addJarToClasspath(_, loader))
    }
  }
}
52 changes: 15 additions & 37 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.deploy
import java.io._
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.nio.file.Files
import java.security.{KeyStore, PrivilegedExceptionAction}
import java.security.cert.X509Certificate
import java.text.ParseException
Expand All @@ -31,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Properties

import com.google.common.io.ByteStreams
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
Expand Down Expand Up @@ -300,28 +298,13 @@ object SparkSubmit extends CommandLineUtils {
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER

if (!isMesosCluster) {
if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val exclusions: Seq[String] =
if (!StringUtils.isBlank(args.packagesExclusions)) {
args.packagesExclusions.split(",")
} else {
Nil
}

// Create the IvySettings, either load from file or build defaults
val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
Option(args.ivyRepoPath))
}.getOrElse {
SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
}

val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
ivySettings, exclusions = exclusions)

val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath)

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
Expand All @@ -338,14 +321,7 @@ object SparkSubmit extends CommandLineUtils {
}

val hadoopConf = new HadoopConfiguration()
val targetDir = Files.createTempDirectory("tmp").toFile
// scalastyle:off runtimeaddshutdownhook
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
FileUtils.deleteQuietly(targetDir)
}
})
// scalastyle:on runtimeaddshutdownhook
val targetDir = DependencyUtils.createTempDir()

// Resolve glob path for different resources.
args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
Expand Down Expand Up @@ -473,11 +449,13 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),

// Mesos only - propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
CLUSTER, sysProp = "spark.jars.excludes"),

// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
Expand Down Expand Up @@ -780,7 +758,7 @@ object SparkSubmit extends CommandLineUtils {
}
}

private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
private[deploy] def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
Expand Down Expand Up @@ -845,7 +823,7 @@ object SparkSubmit extends CommandLineUtils {
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
private def mergeFileLists(lists: String*): String = {
private[deploy] def mergeFileLists(lists: String*): String = {
val merged = lists.filterNot(StringUtils.isBlank)
.flatMap(_.split(","))
.mkString(",")
Expand Down Expand Up @@ -968,7 +946,7 @@ object SparkSubmit extends CommandLineUtils {
}
}

private def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
require(paths != null, "paths cannot be null.")
paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
val uri = Utils.resolveURI(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package org.apache.spark.deploy.worker

import java.io.File

import org.apache.commons.lang3.StringUtils

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DependencyUtils, SparkSubmit}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

Expand Down Expand Up @@ -51,6 +54,7 @@ object DriverWrapper {
new MutableURLClassLoader(Array(userJarUrl), currentLoader)
}
Thread.currentThread.setContextClassLoader(loader)
setupDependencies(loader, userJar)

// Delegate to supplied main class
val clazz = Utils.classForName(mainClass)
Expand All @@ -66,4 +70,23 @@ object DriverWrapper {
System.exit(-1)
}
}

/**
 * Resolves the driver's extra dependencies from system properties and adds them to `loader`.
 *
 * Maven coordinates are resolved from the `spark.jars.*` properties, merged with the
 * `spark.jars` list when the resolution yields anything, then handed to
 * `DependencyUtils.resolveAndDownloadJars` and finally added to the class loader.
 *
 * @param loader class loader that receives the resulting jars
 * @param userJar user jar path, forwarded to `DependencyUtils.resolveAndDownloadJars`
 */
private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
  // Looks up a system property, yielding null when it is not set.
  def propOrNull(key: String): String = sys.props.get(key).orNull

  val packagesExclusions = propOrNull("spark.jars.excludes")
  val packages = propOrNull("spark.jars.packages")
  val repositories = propOrNull("spark.jars.repositories")
  val ivyRepoPath = propOrNull("spark.jars.ivy")

  val mavenJars = DependencyUtils.resolveMavenDependencies(
    packagesExclusions, packages, repositories, ivyRepoPath)

  // Read only after Maven resolution, as before.
  val declaredJars = propOrNull("spark.jars")
  val allJars =
    if (StringUtils.isBlank(mavenJars)) {
      declaredJars
    } else {
      SparkSubmit.mergeFileLists(declaredJars, mavenJars)
    }

  val downloadedJars = DependencyUtils.resolveAndDownloadJars(allJars, userJar)
  DependencyUtils.addJarsToClassPath(downloadedJars, loader)
}
}
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/PagedTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,16 @@ private[ui] trait PagedTable[T] {
val _dataSource = dataSource
try {
val PageData(totalPages, data) = _dataSource.pageData(page)
val pageNavi = pageNavigation(page, _dataSource.pageSize, totalPages)
<div>
{pageNavigation(page, _dataSource.pageSize, totalPages)}
{pageNavi}
<table class={tableCssClass} id={tableId}>
{headers}
<tbody>
{data.map(row)}
</tbody>
</table>
{pageNavi}
</div>
} catch {
case e: IndexOutOfBoundsException =>
Expand Down
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,8 @@ private[ui] class TaskTableRowData(
val speculative: Boolean,
val status: String,
val taskLocality: String,
val executorIdAndHost: String,
val executorId: String,
val host: String,
val launchTime: Long,
val duration: Long,
val formatDuration: String,
Expand Down Expand Up @@ -1017,7 +1018,8 @@ private[ui] class TaskDataSource(
info.speculative,
info.status,
info.taskLocality.toString,
s"${info.executorId} / ${info.host}",
info.executorId,
info.host,
info.launchTime,
duration,
formatDuration,
Expand Down Expand Up @@ -1047,7 +1049,8 @@ private[ui] class TaskDataSource(
case "Attempt" => Ordering.by(_.attempt)
case "Status" => Ordering.by(_.status)
case "Locality Level" => Ordering.by(_.taskLocality)
case "Executor ID / Host" => Ordering.by(_.executorIdAndHost)
case "Executor ID" => Ordering.by(_.executorId)
case "Host" => Ordering.by(_.host)
case "Launch Time" => Ordering.by(_.launchTime)
case "Duration" => Ordering.by(_.duration)
case "Scheduler Delay" => Ordering.by(_.schedulerDelay)
Expand Down Expand Up @@ -1200,7 +1203,7 @@ private[ui] class TaskPagedTable(
val taskHeadersAndCssClasses: Seq[(String, String)] =
Seq(
("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
("Executor ID", ""), ("Host", ""), ("Launch Time", ""), ("Duration", ""),
("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
("GC Time", ""),
Expand Down Expand Up @@ -1271,8 +1274,9 @@ private[ui] class TaskPagedTable(
<td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td>
<td>{task.status}</td>
<td>{task.taskLocality}</td>
<td>{task.executorId}</td>
<td>
<div style="float: left">{task.executorIdAndHost}</div>
<div style="float: left">{task.host}</div>
<div style="float: right">
{
task.logs.map {
Expand Down
2 changes: 1 addition & 1 deletion dev/appveyor-install-dependencies.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Function InstallR {
}

$urlPath = ""
$latestVer = $(ConvertFrom-JSON $(Invoke-WebRequest http://rversions.r-pkg.org/r-release-win).Content).version
$latestVer = $(ConvertFrom-JSON $(Invoke-WebRequest https://rversions.r-pkg.org/r-release-win).Content).version
If ($rVer -ne $latestVer) {
$urlPath = ("old/" + $rVer + "/")
}
Expand Down
4 changes: 2 additions & 2 deletions dev/check-license
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

acquire_rat_jar () {

URL="http://repo1.maven.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"
URL="https://repo1.maven.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"

JAR="$rat_jar"

Expand Down Expand Up @@ -58,7 +58,7 @@ else
declare java_cmd=java
fi

export RAT_VERSION=0.11
export RAT_VERSION=0.12
export rat_jar="$FWDIR"/lib/apache-rat-${RAT_VERSION}.jar
mkdir -p "$FWDIR"/lib

Expand Down
2 changes: 1 addition & 1 deletion dev/mima
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ $JAVA_CMD \
-cp "$TOOLS_CLASSPATH:$OLD_DEPS_CLASSPATH" \
org.apache.spark.tools.GenerateMIMAIgnore

echo -e "q\n" | build/sbt -DcopyDependencies=false "$@" mimaReportBinaryIssues | grep -v -e "info.*Resolving"
echo -e "q\n" | build/sbt -mem 4096 -DcopyDependencies=false "$@" mimaReportBinaryIssues | grep -v -e "info.*Resolving"
ret_val=$?

if [ $ret_val != 0 ]; then
Expand Down
7 changes: 7 additions & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,13 @@ the following case-insensitive options:
</td>
</tr>

<tr>
<td><code>sessionInitStatement</code></td>
<td>
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
</td>
</tr>

<tr>
<td><code>truncate</code></td>
<td>
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
sbt.version=0.13.13
sbt.version=0.13.16
Loading