diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 6b1cdd8ad396..f16648d2abee 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -157,7 +157,7 @@ class LinearSVC @Since("2.2.0") ( /** * Set block size for stacking input data in matrices. - * Default is 4096. + * Default is 1024. * * @group expertSetParam */ @@ -240,7 +240,7 @@ class LinearSVC @Since("2.2.0") ( .persist(StorageLevel.MEMORY_AND_DISK) .setName(s"training dataset (blockSize=${$(blockSize)})") - val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept), $(blockSize))(_) + val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_) val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 50c14d086957..9b5b36257a58 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator.LogisticAggregator import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} @@ -50,7 +50,8 @@ import org.apache.spark.util.VersionUtils */ private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol - with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth { + with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth + with HasBlockSize { import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames @@ -430,6 +431,15 @@ class LogisticRegression @Since("1.2.0") ( @Since("2.2.0") def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value) + /** + * Set block size for stacking input data in matrices. + * Default is 1024. 
+ * + * @group expertSetParam + */ + @Since("3.0.0") + def setBlockSize(value: Int): this.type = set(blockSize, value) + private def assertBoundConstrainedOptimizationParamsValid( numCoefficientSets: Int, numFeatures: Int): Unit = { @@ -482,24 +492,17 @@ class LogisticRegression @Since("1.2.0") ( this } - override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - train(dataset, handlePersistence) - } - - protected[spark] def train( - dataset: Dataset[_], - handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr => - val instances = extractInstances(dataset) - - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - + override protected[spark] def train( + dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr => instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol, fitIntercept) + val sc = dataset.sparkSession.sparkContext + val instances = extractInstances(dataset) + val (summarizer, labelSummarizer) = instances.treeAggregate( (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))( seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) => @@ -582,8 +585,9 @@ class LogisticRegression @Since("1.2.0") ( s"dangerous ground, so the algorithm may not converge.") } - val featuresMean = summarizer.mean.toArray - val featuresStd = summarizer.std.toArray + val featuresMean = summarizer.mean.compressed + val featuresStd = summarizer.std.compressed + val bcFeaturesStd = sc.broadcast(featuresStd) if (!$(fitIntercept) && (0 until numFeatures).exists { i => featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { @@ -595,8 +599,7 @@ class LogisticRegression @Since("1.2.0") ( val regParamL1 = $(elasticNetParam) * $(regParam) val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) - val bcFeaturesStd = instances.context.broadcast(featuresStd) - val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), + val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept), multinomial = isMultinomial)(_) val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) { featuresStd(j / numCoefficientSets) @@ -612,7 +615,21 @@ class LogisticRegression @Since("1.2.0") ( None } - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, + val standardized = instances.map { + case Instance(label, weight, features) => + val featuresStd = bcFeaturesStd.value + val array = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = featuresStd(i) + if (std != 0) array(i) = v / std + } + Instance(label, weight, Vectors.dense(array)) + } + val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + .persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training dataset (blockSize=${$(blockSize)})") + + val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets @@ -806,6 +823,7 @@ class LogisticRegression @Since("1.2.0") ( state = states.next() arrayBuilder += state.adjustedValue } + blocks.unpersist() bcFeaturesStd.destroy() if (state == null) { @@ -875,8 +893,6 @@ class LogisticRegression @Since("1.2.0") ( } } - if 
(handlePersistence) instances.unpersist() - val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector, numClasses, isMultinomial)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 25f7c9ddab42..292187b3e146 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -35,8 +35,7 @@ import org.apache.spark.ml.linalg._ */ private[ml] class HingeAggregator( numFeatures: Int, - fitIntercept: Boolean, - blockSize: Int = 4096)(bcCoefficients: Broadcast[Vector]) + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] { private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures @@ -55,20 +54,6 @@ private[ml] class HingeAggregator( } } - @transient private lazy val intercept = - if (fitIntercept) coefficientsArray(numFeatures) else 0.0 - - @transient private lazy val linearGradSumVec = { - if (fitIntercept) { - new DenseVector(Array.ofDim[Double](numFeatures)) - } else { - null - } - } - - @transient private lazy val auxiliaryVec = - new DenseVector(Array.ofDim[Double](blockSize)) - /** * Add a new training instance to this HingeAggregator, and update the loss and gradient @@ -138,19 +123,14 @@ private[ml] class HingeAggregator( val localGradientSumArray = gradientSumArray // vec here represents dotProducts - val vec = if (size == blockSize) { - auxiliaryVec + val vec = if (fitIntercept && coefficientsArray.last != 0) { + val intercept = coefficientsArray.last + new DenseVector(Array.fill(size)(intercept)) } else { - // the last block within one partition may be of size less than blockSize new DenseVector(Array.ofDim[Double](size)) } if (fitIntercept) { - var i = 0 - while (i < size) { - vec.values(i) = intercept - i += 1 - } BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) } else { BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) @@ -185,6 +165,9 @@ private[ml] class HingeAggregator( if (vec.values.forall(_ == 0)) return this if (fitIntercept) { + // localGradientSumArray is of size numFeatures+1, so can not + // be directly used as the output of BLAS.gemv + val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } localGradientSumArray(numFeatures) += vec.values.sum diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index f2b3566f8f09..76d21995a2c5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -18,8 +18,8 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance -import org.apache.spark.ml.linalg.{DenseVector, Vector} +import org.apache.spark.ml.feature.{Instance, InstanceBlock} +import org.apache.spark.ml.linalg._ import org.apache.spark.mllib.util.MLUtils /** @@ -171,7 +171,6 @@ import org.apache.spark.mllib.util.MLUtils * * * @param bcCoefficients The broadcast 
coefficients corresponding to the features. - * @param bcFeaturesStd The broadcast standard deviation values of the features. * @param numClasses the number of possible outcomes for k classes classification problem in * Multinomial Logistic Regression. * @param fitIntercept Whether to fit an intercept term. @@ -183,13 +182,12 @@ import org.apache.spark.mllib.util.MLUtils * since this form is optimal for the matrix operations used for prediction. */ private[ml] class LogisticAggregator( - bcFeaturesStd: Broadcast[Array[Double]], + numFeatures: Int, numClasses: Int, fitIntercept: Boolean, multinomial: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[Instance, LogisticAggregator] with Logging { + extends DifferentiableLossAggregator[InstanceBlock, LogisticAggregator] with Logging { - private val numFeatures = bcFeaturesStd.value.length private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures private val coefficientSize = bcCoefficients.value.size protected override val dim: Int = coefficientSize @@ -209,6 +207,31 @@ private[ml] class LogisticAggregator( s"got type ${bcCoefficients.value.getClass}.)") } + @transient private lazy val binaryLinear = { + if (!multinomial) { + if (fitIntercept) { + new DenseVector(coefficientsArray.take(numFeatures)) + } else { + new DenseVector(coefficientsArray) + } + } else { + null + } + } + + @transient private lazy val multinomialLinear = { + if (multinomial) { + if (fitIntercept) { + new DenseMatrix(numClasses, numFeatures, coefficientsArray.take(numClasses * numFeatures)) + } else { + new DenseMatrix(numClasses, numFeatures, coefficientsArray) + } + } else { + null + } + } + + if (multinomial && numClasses <= 2) { logInfo(s"Multinomial logistic regression for binary classification yields separate " + s"coefficients for positive and negative classes. When no regularization is applied, the" + @@ -219,15 +242,12 @@ private[ml] class LogisticAggregator( /** Update gradient and loss using binary loss function. */ private def binaryUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { - val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray val margin = - { var sum = 0.0 features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - sum += localCoefficients(index) * value / localFeaturesStd(index) - } + sum += localCoefficients(index) * value } if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) sum @@ -236,9 +256,7 @@ private[ml] class LogisticAggregator( val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - localGradientArray(index) += multiplier * value / localFeaturesStd(index) - } + localGradientArray(index) += multiplier * value } if (fitIntercept) { @@ -253,6 +271,61 @@ private[ml] class LogisticAggregator( } } + /** Update gradient and loss using binary loss function. 
*/ + private def binaryUpdateInPlace(block: InstanceBlock): Unit = { + val size = block.size + val localGradientSumArray = gradientSumArray + + // vec here represents margins or negative dotProducts + val vec = if (fitIntercept && coefficientsArray.last != 0) { + val intercept = coefficientsArray.last + new DenseVector(Array.fill(size)(intercept)) + } else { + new DenseVector(Array.ofDim[Double](size)) + } + + if (fitIntercept) { + BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec) + } else { + BLAS.gemv(-1.0, block.matrix, binaryLinear, 0.0, vec) + } + + // in-place convert margins to multiplier + // then, vec represents multiplier + var i = 0 + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + val label = block.getLabel(i) + val margin = vec(i) + if (label > 0) { + // The following is equivalent to log(1 + exp(margin)) but more numerically stable. + lossSum += weight * MLUtils.log1pExp(margin) + } else { + lossSum += weight * (MLUtils.log1pExp(margin) - margin) + } + val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) + vec.values(i) = multiplier + } else { + vec.values(i) = 0.0 + } + i += 1 + } + + if (fitIntercept) { + // localGradientSumArray is of size numFeatures+1, so can not + // be directly used as the output of BLAS.gemv + val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) + BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) + linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } + localGradientSumArray(numFeatures) += vec.values.sum + } else { + val gradSumVec = new DenseVector(localGradientSumArray) + BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec) + } + } + /** Update gradient and loss using multinomial (softmax) loss function. */ private def multinomialUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { // TODO: use level 2 BLAS operations @@ -260,7 +333,6 @@ private[ml] class LogisticAggregator( Note: this can still be used when numClasses = 2 for binary logistic regression without pivoting. */ - val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray @@ -270,13 +342,10 @@ private[ml] class LogisticAggregator( val margins = new Array[Double](numClasses) features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - val stdValue = value / localFeaturesStd(index) - var j = 0 - while (j < numClasses) { - margins(j) += localCoefficients(index * numClasses + j) * stdValue - j += 1 - } + var j = 0 + while (j < numClasses) { + margins(j) += localCoefficients(index * numClasses + j) * value + j += 1 } } var i = 0 @@ -314,13 +383,10 @@ private[ml] class LogisticAggregator( multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) } features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - val stdValue = value / localFeaturesStd(index) - var j = 0 - while (j < numClasses) { - localGradientArray(index * numClasses + j) += weight * multipliers(j) * stdValue - j += 1 - } + var j = 0 + while (j < numClasses) { + localGradientArray(index * numClasses + j) += weight * multipliers(j) * value + j += 1 } } if (fitIntercept) { @@ -339,6 +405,112 @@ private[ml] class LogisticAggregator( lossSum += weight * loss } + /** Update gradient and loss using multinomial (softmax) loss function. 
*/ + private def multinomialUpdateInPlace(block: InstanceBlock): Unit = { + val size = block.size + val localGradientSumArray = gradientSumArray + + // mat here represents margins, shape: S X C + val mat = new DenseMatrix(size, numClasses, Array.ofDim[Double](size * numClasses)) + + if (fitIntercept) { + val intercept = coefficientsArray.takeRight(numClasses) + var i = 0 + while (i < size) { + var j = 0 + while (j < numClasses) { + mat.update(i, j, intercept(j)) + j += 1 + } + i += 1 + } + BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 1.0, mat) + } else { + BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 0.0, mat) + } + + // in-place convert margins to multipliers + // then, mat represents multipliers + var i = 0 + val tmp = Array.ofDim[Double](numClasses) + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + val label = block.getLabel(i) + + var maxMargin = Double.NegativeInfinity + var j = 0 + while (j < numClasses) { + tmp(j) = mat(i, j) + maxMargin = math.max(maxMargin, tmp(j)) + j += 1 + } + + // marginOfLabel is margins(label) in the formula + val marginOfLabel = tmp(label.toInt) + + var sum = 0.0 + j = 0 + while (j < numClasses) { + if (maxMargin > 0) tmp(j) -= maxMargin + val exp = math.exp(tmp(j)) + sum += exp + tmp(j) = exp + j += 1 + } + + j = 0 + while (j < numClasses) { + val multiplier = weight * (tmp(j) / sum - (if (label == j) 1.0 else 0.0)) + mat.update(i, j, multiplier) + j += 1 + } + + if (maxMargin > 0) { + lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin) + } else { + lossSum += weight * (math.log(sum) - marginOfLabel) + } + } else { + var j = 0 + while (j < numClasses) { + mat.update(i, j, 0.0) + j += 1 + } + } + i += 1 + } + + // block.matrix: S X F, unknown type + // mat (multipliers): S X C, dense + // gradSumMat(gradientSumArray): C X FPI (numFeaturesPlusIntercept), dense + block.matrix match { + case dm: DenseMatrix if !fitIntercept => + // If fitIntercept==false, gradientSumArray += mat.T X matrix + // GEMM requires block.matrix is dense + val gradSumMat = new DenseMatrix(numClasses, numFeatures, localGradientSumArray) + BLAS.gemm(1.0, mat.transpose, dm, 1.0, gradSumMat) + + case _ => + // Otherwise, use linearGradSumMat (F X C) as a temp matrix: + // linearGradSumMat = matrix.T X mat + val linearGradSumMat = new DenseMatrix(numFeatures, numClasses, + Array.ofDim[Double](numFeatures * numClasses)) + BLAS.gemm(1.0, block.matrix.transpose, mat, 0.0, linearGradSumMat) + linearGradSumMat.foreachActive { (i, j, v) => + if (v != 0) localGradientSumArray(i * numClasses + j) += v + } + + if (fitIntercept) { + val start = numClasses * numFeatures + mat.foreachActive { (i, j, v) => + if (v != 0) localGradientSumArray(start + j) += v + } + } + } + } + /** * Add a new training instance to this LogisticAggregator, and update the loss and gradient * of the objective function. @@ -363,4 +535,28 @@ private[ml] class LogisticAggregator( this } } + + /** + * Add a new training instance block to this LogisticAggregator, and update the loss and gradient + * of the objective function. + * + * @param block The instance block of data point to be added. + * @return This LogisticAggregator object. + */ + def add(block: InstanceBlock): this.type = { + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. 
Expecting $numFeatures but got ${block.numFeatures}.") + require(block.weightIter.forall(_ >= 0), + s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (block.weightIter.forall(_ == 0)) return this + + if (multinomial) { + multinomialUpdateInPlace(block) + } else { + binaryUpdateInPlace(block) + } + + this + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index eee75e7f5b72..3d1fab8692af 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -104,10 +104,10 @@ private[shared] object SharedParamsCodeGen { isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"), ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " + "each row is for training or for validation. False indicates training; true indicates " + - "validation."), + "validation"), ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " + "stacked within partitions. If block size is more than remaining data in a partition " + - "then it is adjusted to the size of this data.", Some("4096"), + "then it is adjusted to the size of this data", Some("1024"), isValid = "ParamValidators.gt(0)", isExpertParam = true) ) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 3d1c55a5eb42..7fe8ccd973a7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -570,29 +570,29 @@ trait HasDistanceMeasure extends Params { trait HasValidationIndicatorCol extends Params { /** - * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.. + * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation. * @group param */ - final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.") + final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation") /** @group getParam */ final def getValidationIndicatorCol: String = $(validationIndicatorCol) } /** - * Trait for shared param blockSize (default: 4096). This trait may be changed or + * Trait for shared param blockSize (default: 1024). This trait may be changed or * removed between minor versions. */ @DeveloperApi trait HasBlockSize extends Params { /** - * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.. + * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. 
* @group expertParam */ - final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0)) + final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data", ParamValidators.gt(0)) - setDefault(blockSize, 4096) + setDefault(blockSize, 1024) /** @group expertGetParam */ final def getBlockSize: Int = $(blockSize) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 21eb17dfaacb..f88f3fce61b3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -339,10 +339,8 @@ class LogisticRegressionWithLBFGS // Convert our input into a DataFrame val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) - // Determine if we should cache the DF - val handlePersistence = input.getStorageLevel == StorageLevel.NONE // Train our model - val mlLogisticRegressionModel = lr.train(df, handlePersistence) + val mlLogisticRegressionModel = lr.train(df) // convert the model val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray) createModel(weights, mlLogisticRegressionModel.intercept) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 6d31e6efc7e1..9e359ba098bf 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -542,7 +542,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { test("sparse coefficients in LogisticAggregator") { val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val binaryAgg = new LogisticAggregator(bcFeaturesStd, 2, + val binaryAgg = new LogisticAggregator(1, 2, fitIntercept = true, multinomial = false)(bcCoefficientsBinary) val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { @@ -552,7 +552,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { assert(thrownBinary.getMessage.contains("coefficients only supports dense")) val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0))) - val multinomialAgg = new LogisticAggregator(bcFeaturesStd, 3, + val multinomialAgg = new LogisticAggregator(1, 3, fitIntercept = true, multinomial = true)(bcCoefficientsMulti) val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala index e699adcc14c0..83718076dde7 100644 
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -32,21 +32,21 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { override def beforeAll(): Unit = { super.beforeAll() - instances = Array( + instances = standardize(Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), Instance(2.0, 0.3, Vectors.dense(4.0, 0.5)) - ) - instancesConstantFeature = Array( + )) + instancesConstantFeature = standardize(Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), Instance(2.0, 0.3, Vectors.dense(1.0, 0.5)) - ) - instancesConstantFeatureFiltered = Array( + )) + instancesConstantFeatureFiltered = standardize(Array( Instance(0.0, 0.1, Vectors.dense(2.0)), Instance(1.0, 0.5, Vectors.dense(1.0)), Instance(2.0, 0.3, Vectors.dense(0.5)) - ) + )) } /** Get summary statistics for some data and create a new LogisticAggregator. */ @@ -55,13 +55,27 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { coefficients: Vector, fitIntercept: Boolean, isMultinomial: Boolean): LogisticAggregator = { - val (featuresSummarizer, ySummarizer) = + val (_, ySummarizer) = DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) val numClasses = ySummarizer.histogram.length - val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) - val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) + val numFeatures = instances.head.features.size val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients) + new LogisticAggregator(numFeatures, numClasses, fitIntercept, isMultinomial)(bcCoefficients) + } + + private def standardize(instances: Array[Instance]): Array[Instance] = { + val (featuresSummarizer, _) = + DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) + val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt) + val numFeatures = stdArray.length + instances.map { case Instance(label, weight, features) => + val standardized = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = stdArray(i) + if (std != 0) standardized(i) = v / std + } + Instance(label, weight, Vectors.dense(standardized).compressed) + } } test("aggregator add method input size") { @@ -277,4 +291,24 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { validateGradient(aggConstantFeatureBinary.gradient, aggConstantFeatureBinaryFiltered.gradient, 1) } + + test("add instance block") { + val binaryInstances = instances.map { instance => + if (instance.label <= 1.0) instance else Instance(0.0, instance.weight, instance.features) + } + val coefArray = Array(1.0, 2.0) + val intercept = 1.0 + + val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true, isMultinomial = false) + binaryInstances.foreach(agg.add) + + val agg2 = 
getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true, isMultinomial = false) + val block = InstanceBlock.fromInstances(binaryInstances) + agg2.add(block) + + assert(agg.loss ~== agg2.loss relTol 1e-8) + assert(agg.gradient ~== agg2.gradient relTol 1e-8) + } } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 89d27fbfa316..bb9cd034808f 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -216,7 +216,7 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable >>> model.getThreshold() 0.5 >>> model.getBlockSize() - 4096 + 1024 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) >>> model.intercept @@ -255,19 +255,19 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSize=4096): + aggregationDepth=2, blockSize=1024): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSize=4096): + aggregationDepth=2, blockSize=1024): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LinearSVC", self.uid) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, threshold=0.0, aggregationDepth=2, - blockSize=4096) + blockSize=1024) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -276,12 +276,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSize=4096): + aggregationDepth=2, blockSize=1024): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSize=4096): + aggregationDepth=2, blockSize=1024): Sets params for Linear SVM Classifier. """ kwargs = self._input_kwargs @@ -388,7 +388,7 @@ def intercept(self): class _LogisticRegressionParams(_JavaProbabilisticClassifierParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, HasAggregationDepth, - HasThreshold): + HasThreshold, HasBlockSize): """ Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`. @@ -570,6 +570,8 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams, 10 >>> blor.clear(blor.maxIter) >>> blorModel = blor.fit(bdf) + >>> blorModel.getBlockSize() + 1024 >>> blorModel.setFeaturesCol("features") LogisticRegressionModel... 
>>> blorModel.setProbabilityCol("newProbability") @@ -638,7 +640,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ @@ -647,13 +649,14 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): If the threshold and thresholds Params are both set, they must be equivalent. """ super(LogisticRegression, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LogisticRegression", self.uid) - self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto") + self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto", + blockSize=1024) kwargs = self._input_kwargs self.setParams(**kwargs) self._checkThresholdConsistency() @@ -666,7 +669,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ @@ -674,7 +677,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): Sets params for logistic regression. If the threshold and thresholds Params are both set, they must be equivalent. """ @@ -769,6 +772,13 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) + @since("3.0.0") + def setBlockSize(self, value): + """ + Sets the value of :py:attr:`blockSize`. + """ + return self._set(blockSize=value) + class LogisticRegressionModel(JavaProbabilisticClassificationModel, _LogisticRegressionParams, JavaMLWritable, JavaMLReadable, HasTrainingSummary): diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 3994625c05f1..fb4d55d57a2d 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -167,7 +167,7 @@ def get$Name(self): None, "TypeConverters.toString"), ("blockSize", "block size for stacking input data in matrices. Data is stacked within " "partitions. 
If block size is more than remaining data in a partition then it is " - "adjusted to the size of this data.", "4096", "TypeConverters.toInt")] + "adjusted to the size of this data.", "1024", "TypeConverters.toInt")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 41ba7b9dc552..456463580878 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -591,7 +591,7 @@ class HasBlockSize(Params): def __init__(self): super(HasBlockSize, self).__init__() - self._setDefault(blockSize=4096) + self._setDefault(blockSize=1024) def getBlockSize(self): """