@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.optim.aggregator

import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}

/**
* A parent trait for aggregators used in fitting MLlib models. This trait implements
* some of the common code shared between concrete aggregator implementations. Subclasses
* need only implement the `add` method.
*
* @tparam Datum The type of the instances added to the aggregator to update the loss and gradient.
* @tparam Agg Specialization of [[DifferentiableLossAggregator]]. Classes that subclass this
* type need to use this parameter to specify the concrete type of the aggregator.
*/
private[ml] trait DifferentiableLossAggregator[
Datum,
Agg <: DifferentiableLossAggregator[Datum, Agg]] extends Serializable {

self: Agg => // enforce classes that extend this to be the same type as `Agg`

protected var weightSum: Double = 0.0
protected var lossSum: Double = 0.0

/** The dimension of the gradient array. */
protected val dim: Int

/** Array of gradient values that are mutated when new instances are added to the aggregator. */
protected lazy val gradientSumArray: Array[Double] = Array.ofDim[Double](dim)

/** Add a single data point to this aggregator. */
def add(instance: Datum): Agg

/** Merge two aggregators. The `this` object will be modified in place and returned. */
def merge(other: Agg): Agg = {
require(dim == other.dim, s"Dimensions mismatch when merging with another " +
s"${getClass.getSimpleName}. Expecting $dim but got ${other.dim}.")

if (other.weightSum != 0) {
weightSum += other.weightSum
lossSum += other.lossSum

var i = 0
val localThisGradientSumArray = this.gradientSumArray
val localOtherGradientSumArray = other.gradientSumArray
while (i < dim) {
localThisGradientSumArray(i) += localOtherGradientSumArray(i)
i += 1
}
}
this
}

/** The current weighted averaged gradient. */
def gradient: Vector = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but was $weightSum.")
val result = Vectors.dense(gradientSumArray.clone())
BLAS.scal(1.0 / weightSum, result)
result
}

/** Weighted count of instances in this aggregator. */
def weight: Double = weightSum

/** The current loss value of this aggregator. */
def loss: Double = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but was $weightSum.")
lossSum / weightSum
}

}
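
For orientation, here is a minimal sketch of a concrete aggregator built on this trait. The class name and the plain, unscaled squared-error loss are illustrative assumptions, not part of this patch: a subclass supplies `dim` and `add`, and inherits `merge`, `gradient`, `weight`, and `loss`. The `self: Agg =>` self-type is what lets `add` and `merge` return the concrete subtype rather than the trait.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector}

// Hypothetical example aggregator: plain squared-error loss, no feature scaling.
private[ml] class SquaredErrorAggregator(bcCoefficients: Broadcast[Vector])
  extends DifferentiableLossAggregator[Instance, SquaredErrorAggregator] {

  protected override val dim: Int = bcCoefficients.value.size

  def add(instance: Instance): SquaredErrorAggregator = {
    val Instance(label, weight, features) = instance
    require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
    if (weight == 0.0) return this
    // residual of the linear model for this instance
    val diff = BLAS.dot(features, bcCoefficients.value) - label
    features.foreachActive { (index, value) =>
      gradientSumArray(index) += weight * diff * value
    }
    lossSum += weight * diff * diff / 2.0
    weightSum += weight
    this
  }
}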

@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.optim.aggregator

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}

/**
* LeastSquaresAggregator computes the gradient and loss for the least-squares loss function,
* as used in linear regression, for samples in sparse or dense vectors in an online fashion.
*
* Two LeastSquaresAggregators can be merged together to produce a summary of the loss and
* gradient of the corresponding joint dataset.
*
* To improve the convergence rate during the optimization process, and to prevent features with
* very large variances from exerting an overly large influence during model training, packages
* like R's GLMNET scale the features to unit variance and remove the mean to reduce the
* condition number. They then train the model in the scaled space but return the coefficients in
* the original scale. See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
*
* However, we don't want to apply the `StandardScaler` to the training dataset and then cache
* the standardized dataset, since that would create a lot of overhead. Instead, we perform the
* scaling implicitly when we compute the objective function. The following is the mathematical
* derivation.
*
* Note that we don't handle the intercept by adding a bias term here, because the intercept
* can be computed in closed form after the coefficients have converged.
* See this discussion for details.
* http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
*
* When training with the intercept enabled,
* the objective function in the scaled space is given by
*
* <blockquote>
* $$
* L = 1/2n ||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2,
* $$
* </blockquote>
*
* where $\bar{x_i}$ is the mean of $x_i$, $\hat{x_i}$ is the standard deviation of $x_i$,
* $\bar{y}$ is the mean of label, and $\hat{y}$ is the standard deviation of label.
*
* If we are fitting with the intercept disabled (that is, forced through 0.0),
* we can use the same equation, except that we set $\bar{y}$ and $\bar{x_i}$ to 0 instead
* of the respective means.
*
* This can be rewritten as
*
* <blockquote>
* $$
* \begin{align}
* L &= 1/2n ||\sum_i (w_i/\hat{x_i})x_i - \sum_i (w_i/\hat{x_i})\bar{x_i} - y / \hat{y}
* + \bar{y} / \hat{y}||^2 \\
* &= 1/2n ||\sum_i w_i^\prime x_i - y / \hat{y} + offset||^2 = 1/2n diff^2
* \end{align}
* $$
* </blockquote>
*
* where $w_i^\prime$ are the effective coefficients defined by $w_i/\hat{x_i}$, offset is
*
* <blockquote>
* $$
* - \sum_i (w_i/\hat{x_i})\bar{x_i} + \bar{y} / \hat{y}.
* $$
* </blockquote>
*
* and diff is
*
* <blockquote>
* $$
* \sum_i w_i^\prime x_i - y / \hat{y} + offset
* $$
* </blockquote>
*
* Note that the effective coefficients and offset don't depend on the training dataset,
* so they can be precomputed.
*
* Now, the first derivative of the objective function in scaled space is
*
* <blockquote>
* $$
* \frac{\partial L}{\partial w_i} = diff/N (x_i - \bar{x_i}) / \hat{x_i}
* $$
* </blockquote>
*
* However, $(x_i - \bar{x_i})$ will densify the computation, so it's not
* an ideal formula when the training dataset is in a sparse format.
*
* This can be addressed by adding the dense $\bar{x_i} / \hat{x_i}$ terms
* at the end, by keeping track of the sum of diff. The first derivative of the total
* objective function over all the samples is
*
*
* <blockquote>
* $$
* \begin{align}
* \frac{\partial L}{\partial w_i} &=
* 1/N \sum_j diff_j (x_{ij} - \bar{x_i}) / \hat{x_i} \\
* &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) - diffSum \bar{x_i} / \hat{x_i}) \\
* &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) + correction_i)
* \end{align}
* $$
* </blockquote>
*
* where $correction_i = - diffSum \bar{x_i} / \hat{x_i}$
*
* Simple math shows that diffSum is actually zero, so we don't even
* need to add the correction terms at the end. From the definition of diff,
*
* <blockquote>
* $$
* \begin{align}
* diffSum &= \sum_j (\sum_i w_i(x_{ij} - \bar{x_i})
* / \hat{x_i} - (y_j - \bar{y}) / \hat{y}) \\
* &= N * (\sum_i w_i(\bar{x_i} - \bar{x_i}) / \hat{x_i} - (\bar{y} - \bar{y}) / \hat{y}) \\
* &= 0
* \end{align}
* $$
* </blockquote>
*
* As a result, the first derivative of the total objective function only depends on
* the training dataset, can be easily computed in a distributed fashion, and is
* friendly to sparse formats.
*
* <blockquote>
* $$
* \frac{\partial L}{\partial w_i} = 1/N (\sum_j diff_j x_{ij} / \hat{x_i})
* $$
* </blockquote>
*
* @note The constructor is curried, since the cost function will repeatedly create new versions
* of this class for different coefficient vectors.
*
* @param labelStd The standard deviation value of the label.
* @param labelMean The mean value of the label.
* @param fitIntercept Whether to fit an intercept term.
* @param bcFeaturesStd The broadcast standard deviation values of the features.
* @param bcFeaturesMean The broadcast mean values of the features.
* @param bcCoefficients The broadcast coefficients corresponding to the features.
*/
private[ml] class LeastSquaresAggregator(
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
bcFeaturesMean: Broadcast[Array[Double]])(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[Instance, LeastSquaresAggregator] {
require(labelStd > 0.0, s"${this.getClass.getName} requires the label standard " +
s"deviation to be positive.")

private val numFeatures = bcFeaturesStd.value.length
protected override val dim: Int = numFeatures
// make transient so we do not serialize between aggregation stages
@transient private lazy val featuresStd = bcFeaturesStd.value
@transient private lazy val effectiveCoefAndOffset = {
val coefficientsArray = bcCoefficients.value.toArray.clone()
val featuresMean = bcFeaturesMean.value
var sum = 0.0
var i = 0
val len = coefficientsArray.length
while (i < len) {
if (featuresStd(i) != 0.0) {
coefficientsArray(i) /= featuresStd(i)
sum += coefficientsArray(i) * featuresMean(i)
} else {
coefficientsArray(i) = 0.0
}
i += 1
}
val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
(Vectors.dense(coefficientsArray), offset)
}
// do not use tuple assignment above because it will circumvent the @transient tag
@transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
@transient private lazy val offset = effectiveCoefAndOffset._2

/**
* Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
* of the objective function.
*
* @param instance The instance of data point to be added.
* @return This LeastSquaresAggregator object.
*/
def add(instance: Instance): LeastSquaresAggregator = {
instance match { case Instance(label, weight, features) =>
require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
s" Expecting $numFeatures but got ${features.size}.")
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

if (weight == 0.0) return this

val diff = BLAS.dot(features, effectiveCoefficientsVector) - label / labelStd + offset

if (diff != 0) {
val localGradientSumArray = gradientSumArray
val localFeaturesStd = featuresStd
features.foreachActive { (index, value) =>
val fStd = localFeaturesStd(index)
if (fStd != 0.0 && value != 0.0) {
localGradientSumArray(index) += weight * diff * value / fStd
}
}
lossSum += weight * diff * diff / 2.0
}
weightSum += weight
this
}
}
}
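
As a rough usage sketch, the aggregator is typically folded over an RDD of instances with `treeAggregate`, using `add` within partitions and `merge` across them; the curried constructor lets the first parameter list, which is fixed for a training run, be supplied once while a fresh broadcast of the coefficients is passed on each optimizer iteration. The helper method and argument names below are assumptions for illustration; in Spark the equivalent wiring lives in the cost-function code that drives the optimizer, not in this file.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical helper: all arguments are assumed to be prepared by the caller.
def lossAndGradient(
    instances: RDD[Instance],
    labelStd: Double,
    labelMean: Double,
    bcFeaturesStd: Broadcast[Array[Double]],
    bcFeaturesMean: Broadcast[Array[Double]],
    bcCoefficients: Broadcast[Vector]): (Double, Vector) = {
  val zero = new LeastSquaresAggregator(
    labelStd, labelMean, /* fitIntercept = */ true, bcFeaturesStd, bcFeaturesMean)(bcCoefficients)
  // Fold each partition's instances into a local aggregator, then merge the partials pairwise.
  val agg = instances.treeAggregate(zero)(
    seqOp = (a, instance) => a.add(instance),
    combOp = (a1, a2) => a1.merge(a2))
  (agg.loss, agg.gradient)
}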
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.optim.loss

import breeze.optimize.DiffFunction

/**
* A Breeze diff function which represents a cost function for differentiable regularization
* of parameters, e.g. L2 regularization: 0.5 * regParam * (beta dot beta).
*
* @tparam T The type of the coefficients being regularized.
*/
private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] {

/** Magnitude of the regularization penalty. */
def regParam: Double

}

/**
* A Breeze diff function for computing the L2 regularized loss and gradient of an array of
* coefficients.
*
* @param regParam The magnitude of the regularization.
* @param shouldApply A function (Int => Boolean) indicating whether a given index should have
* regularization applied to it.
* @param featuresStd Optional standard deviations of the features. When provided, the penalty
* for each coefficient is scaled by the inverse variance of the corresponding feature.
*/
private[ml] class L2Regularization(
val regParam: Double,
shouldApply: Int => Boolean,
featuresStd: Option[Array[Double]]) extends DifferentiableRegularization[Array[Double]] {

override def calculate(coefficients: Array[Double]): (Double, Array[Double]) = {
var sum = 0.0
val gradient = new Array[Double](coefficients.length)
coefficients.indices.filter(shouldApply).foreach { j =>
val coef = coefficients(j)
featuresStd match {
case Some(stds) =>
val std = stds(j)
if (std != 0.0) {
val temp = coef / (std * std)
sum += coef * temp
gradient(j) = regParam * temp
} else {
// features with zero standard deviation are not regularized; their gradient stays 0.0
}
case None =>
sum += coef * coef
gradient(j) = coef * regParam
}
}
(0.5 * sum * regParam, gradient)
}
}
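
A small self-contained usage sketch follows; the numbers, the intercept-in-the-last-slot convention, and calling this from inside the `ml` package scope are assumptions for illustration. It regularizes every coefficient except the last slot, which plays the role of an unregularized intercept.

// Hypothetical usage: no feature scaling (featuresStd = None), last slot is an intercept.
val numFeatures = 3
val l2 = new L2Regularization(
  regParam = 0.1,
  shouldApply = (idx: Int) => idx < numFeatures,
  featuresStd = None)

val coefficients = Array(0.5, -1.0, 2.0, 0.3) // last entry is the intercept
val (penalty, penaltyGradient) = l2.calculate(coefficients)
// penalty = 0.5 * 0.1 * (0.25 + 1.0 + 4.0) = 0.2625
// penaltyGradient = Array(0.05, -0.1, 0.2, 0.0); the intercept slot is left untouched

Because the penalty is itself a Breeze `DiffFunction[Array[Double]]`, its value and gradient can simply be added to the aggregator's loss and gradient when assembling the overall objective.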