Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -463,16 +463,11 @@ class LogisticRegression @Since("1.2.0") (
}

/*
The coefficients are laid out in column major order during training. e.g. for
`numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:

Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
intercept_3)

where beta_jk corresponds to the coefficient for class `j` and feature `k`.
The coefficients are laid out in column major order during training. Here we initialize
a column major matrix of initial coefficients.
*/
val initialCoefficientsWithIntercept =
Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)
val initialCoefWithInterceptMatrix =
Matrices.zeros(numCoefficientSets, numFeaturesPlusIntercept)

val initialModelIsValid = optInitialModel match {
case Some(_initialModel) =>
Expand All @@ -491,18 +486,15 @@ class LogisticRegression @Since("1.2.0") (
}

if (initialModelIsValid) {
val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
val providedCoef = optInitialModel.get.coefficientMatrix
providedCoef.foreachActive { (row, col, value) =>
// convert matrix to column major for training
val flatIndex = col * numCoefficientSets + row
providedCoef.foreachActive { (classIndex, featureIndex, value) =>
// We need to scale the coefficients since they will be trained in the scaled space
initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
initialCoefWithInterceptMatrix.update(classIndex, featureIndex,
value * featuresStd(featureIndex))
}
if ($(fitIntercept)) {
optInitialModel.get.interceptVector.foreachActive { (index, value) =>
val coefIndex = numCoefficientSets * numFeatures + index
initialCoefWithInterceptArray(coefIndex) = value
optInitialModel.get.interceptVector.foreachActive { (classIndex, value) =>
initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value)
}
}
} else if ($(fitIntercept) && isMultinomial) {
Expand Down Expand Up @@ -532,8 +524,7 @@ class LogisticRegression @Since("1.2.0") (
val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
val rawMean = rawIntercepts.sum / rawIntercepts.length
rawIntercepts.indices.foreach { i =>
initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
rawIntercepts(i) - rawMean
initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean)
}
} else if ($(fitIntercept)) {
/*
Expand All @@ -549,12 +540,12 @@ class LogisticRegression @Since("1.2.0") (
b = \log{P(1) / P(0)} = \log{count_1 / count_0}
}}}
*/
initialCoefficientsWithIntercept.toArray(numFeatures) = math.log(
histogram(1) / histogram(0))
initialCoefWithInterceptMatrix.update(0, numFeatures,
math.log(histogram(1) / histogram(0)))
}

val states = optimizer.iterations(new CachedDiffFunction(costFun),
initialCoefficientsWithIntercept.asBreeze.toDenseVector)
new BDV[Double](initialCoefWithInterceptMatrix.toArray))

/*
Note that in Logistic Regression, the objective history (loss + regularization)
Expand Down Expand Up @@ -586,15 +577,24 @@ class LogisticRegression @Since("1.2.0") (
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val rawCoefficients = state.x.toArray.clone()
val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
val featureIndex = i % numFeatures
if (featuresStd(featureIndex) != 0.0) {
rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
} else {
0.0
val allCoefficients = state.x.toArray.clone()
val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept,
allCoefficients)
val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures,
new Array[Double](numCoefficientSets * numFeatures), isTransposed = true)
val interceptVec = if ($(fitIntercept) || !isMultinomial) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consistently use interceptVector?

Vectors.zeros(numCoefficientSets)
} else {
Vectors.sparse(numCoefficientSets, Seq())
}
// separate intercepts and coefficients from the combined matrix
allCoefMatrix.foreachActive { (classIndex, featureIndex, value) =>
val isIntercept = $(fitIntercept) && (featureIndex == numFeatures)
if (!isIntercept && featuresStd(featureIndex) != 0.0) {
denseCoefficientMatrix.update(classIndex, featureIndex,
value / featuresStd(featureIndex))
}
if (isIntercept) interceptVec.toArray(classIndex) = value
}

if ($(regParam) == 0.0 && isMultinomial) {
Expand All @@ -607,17 +607,16 @@ class LogisticRegression @Since("1.2.0") (
Friedman, et al. "Regularization Paths for Generalized Linear Models via
Coordinate Descent," https://core.ac.uk/download/files/153/6287975.pdf
*/
val coefficientMean = coefficientArray.sum / coefficientArray.length
coefficientArray.indices.foreach { i => coefficientArray(i) -= coefficientMean}
val denseValues = denseCoefficientMatrix.values
val coefficientMean = denseValues.sum / denseValues.length
denseCoefficientMatrix.update(_ - coefficientMean)
}

val denseCoefficientMatrix =
new DenseMatrix(numCoefficientSets, numFeatures, coefficientArray, isTransposed = true)
// TODO: use `denseCoefficientMatrix.compressed` after SPARK-17471
val compressedCoefficientMatrix = if (isMultinomial) {
denseCoefficientMatrix
} else {
val compressedVector = Vectors.dense(coefficientArray).compressed
val compressedVector = Vectors.dense(denseCoefficientMatrix.values).compressed
compressedVector match {
case dv: DenseVector => denseCoefficientMatrix
case sv: SparseVector =>
Expand All @@ -626,25 +625,13 @@ class LogisticRegression @Since("1.2.0") (
}
}

val interceptsArray: Array[Double] = if ($(fitIntercept)) {
Array.tabulate(numCoefficientSets) { i =>
val coefIndex = numFeatures * numCoefficientSets + i
rawCoefficients(coefIndex)
}
} else {
Array.empty[Double]
}
val interceptVector = if (interceptsArray.nonEmpty && isMultinomial) {
// The intercepts are never regularized, so we always center the mean.
val interceptMean = interceptsArray.sum / numClasses
interceptsArray.indices.foreach { i => interceptsArray(i) -= interceptMean }
Vectors.dense(interceptsArray)
} else if (interceptsArray.length == 1) {
Vectors.dense(interceptsArray)
} else {
Vectors.sparse(numCoefficientSets, Seq())
// Center the intercepts when using the multinomial algorithm.
if ($(fitIntercept) && isMultinomial) {
val interceptArray = interceptVec.toArray
val interceptMean = interceptArray.sum / interceptArray.length
(0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean }
}
(compressedCoefficientMatrix, interceptVector.compressed, arrayBuilder.result())
(compressedCoefficientMatrix, interceptVec.compressed, arrayBuilder.result())
}
}

Expand Down Expand Up @@ -1424,6 +1411,7 @@ private class LogisticAggregator(
private val numFeatures = bcFeaturesStd.value.length
private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
private val coefficientSize = bcCoefficients.value.size
private val numCoefficientSets = if (multinomial) numClasses else 1
if (multinomial) {
require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " +
s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize")
Expand Down Expand Up @@ -1633,12 +1621,12 @@ private class LogisticAggregator(
lossSum / weightSum
}

def gradient: Vector = {
def gradient: Matrix = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but $weightSum.")
val result = Vectors.dense(gradientSumArray.clone())
scal(1.0 / weightSum, result)
result
new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray)
}
}

Expand All @@ -1664,6 +1652,7 @@ private class LogisticCostFun(
val featuresStd = bcFeaturesStd.value
val numFeatures = featuresStd.length
val numCoefficientSets = if (multinomial) numClasses else 1
val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures

val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
Expand All @@ -1675,32 +1664,34 @@ private class LogisticCostFun(
)(seqOp, combOp, aggregationDepth)
}

val totalGradientArray = logisticAggregator.gradient.toArray
val totalGradientMatrix = logisticAggregator.gradient
val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray)
// regVal is the sum of squared coefficients, excluding intercepts, used for L2 regularization.
val regVal = if (regParamL2 == 0.0) {
0.0
} else {
var sum = 0.0
coeffs.foreachActive { case (index, value) =>
coefMatrix.foreachActive { case (classIndex, featureIndex, value) =>
// We do not apply regularization to the intercepts
val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
val isIntercept = fitIntercept && (featureIndex == numFeatures)
if (!isIntercept) {
// The following code accumulates the regularization loss term and also adds
// the gradient of the regularization term to the total gradient.
sum += {
if (standardization) {
totalGradientArray(index) += regParamL2 * value
val gradValue = totalGradientMatrix(classIndex, featureIndex)
totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value)
value * value
} else {
val featureIndex = index / numCoefficientSets
if (featuresStd(featureIndex) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex))
totalGradientArray(index) += regParamL2 * temp
val gradValue = totalGradientMatrix(classIndex, featureIndex)
totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp)
value * temp
} else {
0.0
Expand All @@ -1713,6 +1704,6 @@ private class LogisticCostFun(
}
bcCoeffs.destroy(blocking = false)

(logisticAggregator.loss + regVal, new BDV(totalGradientArray))
(logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray))
}
}