Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
packagesExclusions = Option(packagesExclusions)
.orElse(sparkProperties.get("spark.jars.excludes")).orNull
repositories = Option(repositories)
.orElse(sparkProperties.get("spark.jars.repositories")).orNull
deployMode = Option(deployMode)
.orElse(sparkProperties.get("spark.submit.deployMode"))
.orElse(env.get("DEPLOY_MODE"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
// 2. The task set manager has been created but no tasks has been scheduled. In this case,
// simply abort the stage.
tsm.runningTasksSet.foreach { tid =>
val execId = taskIdToExecutorId(tid)
backend.killTask(tid, execId, interruptThread, reason = "stage cancelled")
taskIdToExecutorId.get(tid).foreach(execId =>
backend.killTask(tid, execId, interruptThread, reason = "Stage cancelled"))
}
tsm.abort("Stage %s cancelled".format(stageId))
logInfo("Stage %d was cancelled".format(stageId))
Expand Down
20 changes: 20 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,26 @@ class SparkSubmitSuite
}
}

test("includes jars passed through spark.jars.packages and spark.jars.repositories") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = MavenCoordinate("my.great.dep", "mylib", "0.1")
// Test using "spark.jars.packages" and "spark.jars.repositories" configurations.
IvyTestUtils.withRepository(main, Some(dep.toString), None) { repo =>
val args = Seq(
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local-cluster[2,1,1024]",
"--conf", "spark.jars.packages=my.great.lib:mylib:0.1,my.great.dep:mylib:0.1",
"--conf", s"spark.jars.repositories=$repo",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
unusedJar.toString,
"my.great.lib.MyLib", "my.great.dep.MyLib")
runSparkSubmit(args)
}
}

// TODO(SPARK-9603): Building a package is flaky on Jenkins Maven builds.
// See https://gist.github.com/shivaram/3a2fecce60768a603dac for a error log
ignore("correctly builds R packages included in a jar with --packages") {
Expand Down
13 changes: 11 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,19 @@ Apart from these, the following properties are also available, and may be useful
<td>
Path to an Ivy settings file to customize resolution of jars specified using <code>spark.jars.packages</code>
instead of the built-in defaults, such as maven central. Additional repositories given by the command-line
option <code>--repositories</code> will also be included. Useful for allowing Spark to resolve artifacts from behind
a firewall e.g. via an in-house artifact server like Artifactory. Details on the settings file format can be
option <code>--repositories</code> or <code>spark.jars.repositories</code> will also be included.
Useful for allowing Spark to resolve artifacts from behind a firewall e.g. via an in-house
artifact server like Artifactory. Details on the settings file format can be
found at http://ant.apache.org/ivy/history/latest-milestone/settings.html
</td>
</tr>
<tr>
<td><code>spark.jars.repositories</code></td>
<td></td>
<td>
Comma-separated list of additional remote repositories to search for the maven coordinates
given with <code>--packages</code> or <code>spark.jars.packages</code>.
</td>
</tr>
<tr>
<td><code>spark.pyspark.driver.python</code></td>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.optim.aggregator

import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}

/**
* A parent trait for aggregators used in fitting MLlib models. This parent trait implements
* some of the common code shared between concrete instances of aggregators. Subclasses of this
* aggregator need only implement the `add` method.
*
* @tparam Datum The type of the instances added to the aggregator to update the loss and gradient.
* @tparam Agg Specialization of [[DifferentiableLossAggregator]]. Classes that subclass this
* type need to use this parameter to specify the concrete type of the aggregator.
*/
private[ml] trait DifferentiableLossAggregator[
Datum,
Agg <: DifferentiableLossAggregator[Datum, Agg]] extends Serializable {

self: Agg => // enforce classes that extend this to be the same type as `Agg`

protected var weightSum: Double = 0.0
protected var lossSum: Double = 0.0

/** The dimension of the gradient array. */
protected val dim: Int

/** Array of gradient values that are mutated when new instances are added to the aggregator. */
protected lazy val gradientSumArray: Array[Double] = Array.ofDim[Double](dim)

/** Add a single data point to this aggregator. */
def add(instance: Datum): Agg

/** Merge two aggregators. The `this` object will be modified in place and returned. */
def merge(other: Agg): Agg = {
require(dim == other.dim, s"Dimensions mismatch when merging with another " +
s"${getClass.getSimpleName}. Expecting $dim but got ${other.dim}.")

if (other.weightSum != 0) {
weightSum += other.weightSum
lossSum += other.lossSum

var i = 0
val localThisGradientSumArray = this.gradientSumArray
val localOtherGradientSumArray = other.gradientSumArray
while (i < dim) {
localThisGradientSumArray(i) += localOtherGradientSumArray(i)
i += 1
}
}
this
}

/** The current weighted averaged gradient. */
def gradient: Vector = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but was $weightSum.")
val result = Vectors.dense(gradientSumArray.clone())
BLAS.scal(1.0 / weightSum, result)
result
}

/** Weighted count of instances in this aggregator. */
def weight: Double = weightSum

/** The current loss value of this aggregator. */
def loss: Double = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but was $weightSum.")
lossSum / weightSum
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.optim.aggregator

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}

/**
* LeastSquaresAggregator computes the gradient and loss for a Least-squared loss function,
* as used in linear regression for samples in sparse or dense vector in an online fashion.
*
* Two LeastSquaresAggregator can be merged together to have a summary of loss and gradient of
* the corresponding joint dataset.
*
* For improving the convergence rate during the optimization process, and also preventing against
* features with very large variances exerting an overly large influence during model training,
* package like R's GLMNET performs the scaling to unit variance and removing the mean to reduce
* the condition number, and then trains the model in scaled space but returns the coefficients in
* the original scale. See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
*
* However, we don't want to apply the `StandardScaler` on the training dataset, and then cache
* the standardized dataset since it will create a lot of overhead. As a result, we perform the
* scaling implicitly when we compute the objective function. The following is the mathematical
* derivation.
*
* Note that we don't deal with intercept by adding bias here, because the intercept
* can be computed using closed form after the coefficients are converged.
* See this discussion for detail.
* http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
*
* When training with intercept enabled,
* The objective function in the scaled space is given by
*
* <blockquote>
* $$
* L = 1/2n ||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2,
* $$
* </blockquote>
*
* where $\bar{x_i}$ is the mean of $x_i$, $\hat{x_i}$ is the standard deviation of $x_i$,
* $\bar{y}$ is the mean of label, and $\hat{y}$ is the standard deviation of label.
*
* If we fitting the intercept disabled (that is forced through 0.0),
* we can use the same equation except we set $\bar{y}$ and $\bar{x_i}$ to 0 instead
* of the respective means.
*
* This can be rewritten as
*
* <blockquote>
* $$
* \begin{align}
* L &= 1/2n ||\sum_i (w_i/\hat{x_i})x_i - \sum_i (w_i/\hat{x_i})\bar{x_i} - y / \hat{y}
* + \bar{y} / \hat{y}||^2 \\
* &= 1/2n ||\sum_i w_i^\prime x_i - y / \hat{y} + offset||^2 = 1/2n diff^2
* \end{align}
* $$
* </blockquote>
*
* where $w_i^\prime$ is the effective coefficients defined by $w_i/\hat{x_i}$, offset is
*
* <blockquote>
* $$
* - \sum_i (w_i/\hat{x_i})\bar{x_i} + \bar{y} / \hat{y}.
* $$
* </blockquote>
*
* and diff is
*
* <blockquote>
* $$
* \sum_i w_i^\prime x_i - y / \hat{y} + offset
* $$
* </blockquote>
*
* Note that the effective coefficients and offset don't depend on training dataset,
* so they can be precomputed.
*
* Now, the first derivative of the objective function in scaled space is
*
* <blockquote>
* $$
* \frac{\partial L}{\partial w_i} = diff/N (x_i - \bar{x_i}) / \hat{x_i}
* $$
* </blockquote>
*
* However, $(x_i - \bar{x_i})$ will densify the computation, so it's not
* an ideal formula when the training dataset is sparse format.
*
* This can be addressed by adding the dense $\bar{x_i} / \hat{x_i}$ terms
* in the end by keeping the sum of diff. The first derivative of total
* objective function from all the samples is
*
*
* <blockquote>
* $$
* \begin{align}
* \frac{\partial L}{\partial w_i} &=
* 1/N \sum_j diff_j (x_{ij} - \bar{x_i}) / \hat{x_i} \\
* &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) - diffSum \bar{x_i} / \hat{x_i}) \\
* &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) + correction_i)
* \end{align}
* $$
* </blockquote>
*
* where $correction_i = - diffSum \bar{x_i} / \hat{x_i}$
*
* A simple math can show that diffSum is actually zero, so we don't even
* need to add the correction terms in the end. From the definition of diff,
*
* <blockquote>
* $$
* \begin{align}
* diffSum &= \sum_j (\sum_i w_i(x_{ij} - \bar{x_i})
* / \hat{x_i} - (y_j - \bar{y}) / \hat{y}) \\
* &= N * (\sum_i w_i(\bar{x_i} - \bar{x_i}) / \hat{x_i} - (\bar{y} - \bar{y}) / \hat{y}) \\
* &= 0
* \end{align}
* $$
* </blockquote>
*
* As a result, the first derivative of the total objective function only depends on
* the training dataset, which can be easily computed in distributed fashion, and is
* sparse format friendly.
*
* <blockquote>
* $$
* \frac{\partial L}{\partial w_i} = 1/N ((\sum_j diff_j x_{ij} / \hat{x_i})
* $$
* </blockquote>
*
* @note The constructor is curried, since the cost function will repeatedly create new versions
* of this class for different coefficient vectors.
*
* @param labelStd The standard deviation value of the label.
* @param labelMean The mean value of the label.
* @param fitIntercept Whether to fit an intercept term.
* @param bcFeaturesStd The broadcast standard deviation values of the features.
* @param bcFeaturesMean The broadcast mean values of the features.
* @param bcCoefficients The broadcast coefficients corresponding to the features.
*/
private[ml] class LeastSquaresAggregator(
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
bcFeaturesMean: Broadcast[Array[Double]])(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[Instance, LeastSquaresAggregator] {
require(labelStd > 0.0, s"${this.getClass.getName} requires the label standard " +
s"deviation to be positive.")

private val numFeatures = bcFeaturesStd.value.length
protected override val dim: Int = numFeatures
// make transient so we do not serialize between aggregation stages
@transient private lazy val featuresStd = bcFeaturesStd.value
@transient private lazy val effectiveCoefAndOffset = {
val coefficientsArray = bcCoefficients.value.toArray.clone()
val featuresMean = bcFeaturesMean.value
var sum = 0.0
var i = 0
val len = coefficientsArray.length
while (i < len) {
if (featuresStd(i) != 0.0) {
coefficientsArray(i) /= featuresStd(i)
sum += coefficientsArray(i) * featuresMean(i)
} else {
coefficientsArray(i) = 0.0
}
i += 1
}
val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
(Vectors.dense(coefficientsArray), offset)
}
// do not use tuple assignment above because it will circumvent the @transient tag
@transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
@transient private lazy val offset = effectiveCoefAndOffset._2

/**
* Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
* of the objective function.
*
* @param instance The instance of data point to be added.
* @return This LeastSquaresAggregator object.
*/
def add(instance: Instance): LeastSquaresAggregator = {
instance match { case Instance(label, weight, features) =>
require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
s" Expecting $numFeatures but got ${features.size}.")
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

if (weight == 0.0) return this

val diff = BLAS.dot(features, effectiveCoefficientsVector) - label / labelStd + offset

if (diff != 0) {
val localGradientSumArray = gradientSumArray
val localFeaturesStd = featuresStd
features.foreachActive { (index, value) =>
val fStd = localFeaturesStd(index)
if (fStd != 0.0 && value != 0.0) {
localGradientSumArray(index) += weight * diff * value / fStd
}
}
lossSum += weight * diff * diff / 2.0
}
weightSum += weight
this
}
}
}
Loading