diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 55ac2c410953..cdaab599e2a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -502,7 +502,6 @@ private[serializer] object KryoSerializer {
     "org.apache.spark.ml.attribute.NumericAttribute",
 
     "org.apache.spark.ml.feature.Instance",
-    "org.apache.spark.ml.feature.InstanceBlock",
     "org.apache.spark.ml.feature.LabeledPoint",
     "org.apache.spark.ml.feature.OffsetInstance",
     "org.apache.spark.ml.linalg.DenseMatrix",
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 00e5b61dbdc1..e054a15fc9b7 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -682,6 +682,7 @@ private[spark] object BLAS extends Serializable {
         val xTemp = xValues(k) * alpha
         while (i < indEnd) {
-          yValues(Arows(i)) += Avals(i) * xTemp
+          val rowIndex = Arows(i)
+          yValues(rowIndex) += Avals(i) * xTemp
           i += 1
         }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index f16648d2abee..905789090d62 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.optim.aggregator.HingeAggregator
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
 /** Params for linear SVM Classifier. */
 private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
   with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
-  with HasAggregationDepth with HasThreshold with HasBlockSize {
+  with HasAggregationDepth with HasThreshold {
 
   /**
    * Param for threshold in binary classification prediction.
@@ -155,26 +155,19 @@ class LinearSVC @Since("2.2.0") (
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
   setDefault(aggregationDepth -> 2)
 
-  /**
-   * Set block size for stacking input data in matrices.
-   * Default is 1024.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
   @Since("2.2.0")
   override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
 
   override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+
+    val instances = extractInstances(dataset)
+    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
-      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
-
-    val sc = dataset.sparkSession.sparkContext
-    val instances = extractInstances(dataset)
+      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
 
     val (summarizer, labelSummarizer) = instances.treeAggregate(
       (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
       seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
@@ -215,33 +208,20 @@ class LinearSVC @Since("2.2.0") (
       throw new SparkException(msg)
     }
 
-    val featuresStd = summarizer.std.compressed
-    val bcFeaturesStd = sc.broadcast(featuresStd)
+    val featuresStd = summarizer.std.toArray
+    val getFeaturesStd = (j: Int) => featuresStd(j)
     val regParamL2 = $(regParam)
+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
     val regularization = if (regParamL2 != 0.0) {
       val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
       Some(new L2Regularization(regParamL2, shouldApply,
-        if ($(standardization)) None else Some(featuresStd.apply)))
+        if ($(standardization)) None else Some(getFeaturesStd)))
     } else {
       None
     }
 
-    val standardized = instances.map {
-      case Instance(label, weight, features) =>
-        val featuresStd = bcFeaturesStd.value
-        val array = Array.ofDim[Double](numFeatures)
-        features.foreachNonZero { (i, v) =>
-          val std = featuresStd(i)
-          if (std != 0) array(i) = v / std
-        }
-        Instance(label, weight, Vectors.dense(array))
-    }
-    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
-      .persist(StorageLevel.MEMORY_AND_DISK)
-      .setName(s"training dataset (blockSize=${$(blockSize)})")
-
-    val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_)
-    val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
+    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+    val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
       $(aggregationDepth))
 
     def regParamL1Fun = (index: Int) => 0D
@@ -258,7 +238,6 @@ class LinearSVC @Since("2.2.0") (
         scaledObjectiveHistory += state.adjustedValue
       }
 
-      blocks.unpersist()
       bcFeaturesStd.destroy()
       if (state == null) {
         val msg = s"${optimizer.getClass.getName} failed."
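For context on the pattern the LinearSVC hunks above restore, here is a minimal sketch (not Spark's internal code; `withPersistence` and the simplified `Instance` are hypothetical names for illustration): the extracted instances RDD is cached only when the caller has not already persisted the input dataset, and it is unpersisted once optimization finishes, instead of always materializing a standardized, blockified copy of the training data.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical stand-in for org.apache.spark.ml.feature.Instance.
case class Instance(label: Double, weight: Double, features: Array[Double])

// Sketch of the restored caching convention: persist only if the caller
// has not already cached the data (handlePersistence), and release the
// cache once training completes.
def withPersistence[T](instances: RDD[Instance], alreadyCached: Boolean)(
    train: RDD[Instance] => T): T = {
  val handlePersistence = !alreadyCached
  if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
  try train(instances)
  finally if (handlePersistence) instances.unpersist()
}
```

The design point is that caching is conditional: if the user already persisted the dataset at a custom storage level, the algorithm should not second-guess it with a second copy.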
@@ -289,6 +268,8 @@ class LinearSVC @Since("2.2.0") (
       (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
     }
 
+    if (handlePersistence) instances.unpersist()
+
     copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 9b5b36257a58..50c14d086957 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.optim.aggregator.LogisticAggregator
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -50,8 +50,7 @@ import org.apache.spark.util.VersionUtils
  */
 private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
   with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
-  with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
-  with HasBlockSize {
+  with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {
 
   import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames
 
@@ -431,15 +430,6 @@ class LogisticRegression @Since("1.2.0") (
   @Since("2.2.0")
   def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)
 
-  /**
-   * Set block size for stacking input data in matrices.
-   * Default is 1024.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
   private def assertBoundConstrainedOptimizationParamsValid(
       numCoefficientSets: Int,
       numFeatures: Int): Unit = {
@@ -492,17 +482,24 @@ class LogisticRegression @Since("1.2.0") (
     this
   }
 
-  override protected[spark] def train(
-      dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
+  override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+    train(dataset, handlePersistence)
+  }
+
+  protected[spark] def train(
+      dataset: Dataset[_],
+      handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
+    val instances = extractInstances(dataset)
+
+    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
       probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol,
       fitIntercept)
 
-    val sc = dataset.sparkSession.sparkContext
-    val instances = extractInstances(dataset)
-
     val (summarizer, labelSummarizer) = instances.treeAggregate(
       (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
       seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
@@ -585,9 +582,8 @@ class LogisticRegression @Since("1.2.0") (
           s"dangerous ground, so the algorithm may not converge.")
       }
 
-      val featuresMean = summarizer.mean.compressed
-      val featuresStd = summarizer.std.compressed
-      val bcFeaturesStd = sc.broadcast(featuresStd)
+      val featuresMean = summarizer.mean.toArray
+      val featuresStd = summarizer.std.toArray
 
       if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
         featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -599,7 +595,8 @@ class LogisticRegression @Since("1.2.0") (
       val regParamL1 = $(elasticNetParam) * $(regParam)
       val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
 
-      val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept),
+      val bcFeaturesStd = instances.context.broadcast(featuresStd)
+      val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
         multinomial = isMultinomial)(_)
       val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) {
         featuresStd(j / numCoefficientSets)
@@ -615,21 +612,7 @@ class LogisticRegression @Since("1.2.0") (
         None
       }
 
-      val standardized = instances.map {
-        case Instance(label, weight, features) =>
-          val featuresStd = bcFeaturesStd.value
-          val array = Array.ofDim[Double](numFeatures)
-          features.foreachNonZero { (i, v) =>
-            val std = featuresStd(i)
-            if (std != 0) array(i) = v / std
-          }
-          Instance(label, weight, Vectors.dense(array))
-      }
-      val blocks = InstanceBlock.blokify(standardized, $(blockSize))
-        .persist(StorageLevel.MEMORY_AND_DISK)
-        .setName(s"training dataset (blockSize=${$(blockSize)})")
-
-      val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
+      val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
         $(aggregationDepth))
 
       val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets
@@ -823,7 +806,6 @@ class LogisticRegression @Since("1.2.0") (
         state = states.next()
         arrayBuilder += state.adjustedValue
       }
-      blocks.unpersist()
       bcFeaturesStd.destroy()
 
       if (state == null) {
@@ -893,6 +875,8 @@ class LogisticRegression @Since("1.2.0") (
       }
     }
 
+    if (handlePersistence) instances.unpersist()
+
     val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
       numClasses, isMultinomial))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 6e8f92b9b1e6..c7a8237849b5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /** Params for Multilayer Perceptron. */
 private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
-  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
+  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
 
   import MultilayerPerceptronClassifier._
 
@@ -54,6 +54,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
   @Since("1.5.0")
   final def getLayers: Array[Int] = $(layers)
 
+  /**
+   * Block size for stacking input data in matrices to speed up the computation.
+   * Data is stacked within partitions. If block size is more than remaining data in
+   * a partition then it is adjusted to the size of this data.
+   * Recommended size is between 10 and 1000.
+   * Default: 128
+   *
+   * @group expertParam
+   */
+  @Since("1.5.0")
+  final val blockSize: IntParam = new IntParam(this, "blockSize",
+    "Block size for stacking input data in matrices. Data is stacked within partitions." +
+      " If block size is more than remaining data in a partition then " +
+      "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
+    ParamValidators.gt(0))
+
+  /** @group expertGetParam */
+  @Since("1.5.0")
+  final def getBlockSize: Int = $(blockSize)
+
   /**
    * The solver algorithm for optimization.
    * Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 5476a86eb9d7..11d0c4689cbb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.ml.feature
 
-import scala.collection.mutable
-
-import org.apache.spark.ml.linalg._
-import org.apache.spark.rdd.RDD
+import org.apache.spark.ml.linalg.Vector
 
 /**
  * Class that represents an instance of weighted data point with label and features.
@@ -31,131 +28,6 @@ import org.apache.spark.rdd.RDD
  */
 private[spark] case class Instance(label: Double, weight: Double, features: Vector)
 
-
-/**
- * Class that represents an block of instance.
- * If all weights are 1, then an empty array is stored.
- */
-private[spark] case class InstanceBlock(
-    labels: Array[Double],
-    weights: Array[Double],
-    matrix: Matrix) {
-  require(labels.length == matrix.numRows)
-  require(matrix.isTransposed)
-  if (weights.nonEmpty) {
-    require(labels.length == weights.length)
-  }
-
-  def size: Int = labels.length
-
-  def numFeatures: Int = matrix.numCols
-
-  def instanceIterator: Iterator[Instance] = {
-    if (weights.nonEmpty) {
-      labels.iterator.zip(weights.iterator).zip(matrix.rowIter)
-        .map { case ((label, weight), vec) => Instance(label, weight, vec) }
-    } else {
-      labels.iterator.zip(matrix.rowIter)
-        .map { case (label, vec) => Instance(label, 1.0, vec) }
-    }
-  }
-
-  def getLabel(i: Int): Double = labels(i)
-
-  def labelIter: Iterator[Double] = labels.iterator
-
-  @transient lazy val getWeight: Int => Double = {
-    if (weights.nonEmpty) {
-      (i: Int) => weights(i)
-    } else {
-      (i: Int) => 1.0
-    }
-  }
-
-  def weightIter: Iterator[Double] = {
-    if (weights.nonEmpty) {
-      weights.iterator
-    } else {
-      Iterator.fill(size)(1.0)
-    }
-  }
-
-  // directly get the non-zero iterator of i-th row vector without array copy or slice
-  @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = {
-    matrix match {
-      case dm: DenseMatrix =>
-        (i: Int) =>
-          val start = numFeatures * i
-          Iterator.tabulate(numFeatures)(j =>
-            (j, dm.values(start + j))
-          ).filter(_._2 != 0)
-      case sm: SparseMatrix =>
-        (i: Int) =>
-          val start = sm.colPtrs(i)
-          val end = sm.colPtrs(i + 1)
-          Iterator.tabulate(end - start)(j =>
-            (sm.rowIndices(start + j), sm.values(start + j))
-          ).filter(_._2 != 0)
-    }
-  }
-}
-
-private[spark] object InstanceBlock {
-
-  def fromInstances(instances: Seq[Instance]): InstanceBlock = {
-    val labels = instances.map(_.label).toArray
-    val weights = if (instances.exists(_.weight != 1)) {
-      instances.map(_.weight).toArray
-    } else {
-      Array.emptyDoubleArray
-    }
-    val numRows = instances.length
-    val numCols = instances.head.features.size
-    val denseSize = Matrices.getDenseSize(numCols, numRows)
-    val nnz = instances.iterator.map(_.features.numNonzeros).sum
-    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
-    val matrix = if (denseSize < sparseSize) {
-      val values = Array.ofDim[Double](numRows * numCols)
-      var offset = 0
-      var j = 0
-      while (j < numRows) {
-        instances(j).features.foreachNonZero { (i, v) =>
-          values(offset + i) = v
-        }
-        offset += numCols
-        j += 1
-      }
-      new DenseMatrix(numRows, numCols, values, true)
-    } else {
-      val colIndices = mutable.ArrayBuilder.make[Int]
-      val values = mutable.ArrayBuilder.make[Double]
-      val rowPtrs = mutable.ArrayBuilder.make[Int]
-      var rowPtr = 0
-      rowPtrs += 0
-      var j = 0
-      while (j < numRows) {
-        var nnz = 0
-        instances(j).features.foreachNonZero { (i, v) =>
-          colIndices += i
-          values += v
-          nnz += 1
-        }
-        rowPtr += nnz
-        rowPtrs += rowPtr
-        j += 1
-      }
-      new SparseMatrix(numRows, numCols, rowPtrs.result(),
-        colIndices.result(), values.result(), true)
-    }
-    InstanceBlock(labels, weights, matrix)
-  }
-
-  def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
-    instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
-  }
-}
-
-
 /**
  * Case class that represents an instance of data point with
  * label, weight, offset and features.
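The aggregator diffs that follow all share one idea, which is worth stating once: instead of pre-dividing every feature vector by its standard deviation and stacking the results into an `InstanceBlock` matrix (the code removed above), each aggregator divides by the broadcast standard deviation inside its own margin and gradient loops. A minimal sketch of that on-the-fly standardization, assuming a CSR-style `(indices, values)` representation of a sparse vector — the names here are illustrative, not Spark's API:

```scala
// Sketch of the restored per-instance path: compute the margin of a linear
// model over standardized features without materializing x / std.
def margin(
    indices: Array[Int],       // nonzero indices of the feature vector
    values: Array[Double],     // nonzero values of the feature vector
    coefficients: Array[Double],
    featuresStd: Array[Double],
    intercept: Double): Double = {
  var sum = 0.0
  var k = 0
  while (k < indices.length) {
    val i = indices(k)
    // Constant features (std == 0) carry no signal and are skipped,
    // exactly as the `localFeaturesStd(index) != 0.0` guards below do.
    if (featuresStd(i) != 0.0) {
      sum += coefficients(i) * values(k) / featuresStd(i)
    }
    k += 1
  }
  sum + intercept
}
```

The trade-off is per-element divisions in exchange for not caching a second, densified copy of the dataset, which is what the removed blockified path required.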
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
index 292187b3e146..b0906f1b0651 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
 
 /**
@@ -32,28 +32,21 @@ import org.apache.spark.ml.linalg._
  *
  * @param bcCoefficients The coefficients corresponding to the features.
  * @param fitIntercept Whether to fit an intercept term.
+ * @param bcFeaturesStd The standard deviation values of the features.
  */
 private[ml] class HingeAggregator(
-    numFeatures: Int,
+    bcFeaturesStd: Broadcast[Array[Double]],
     fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
-  extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] {
+  extends DifferentiableLossAggregator[Instance, HingeAggregator] {
 
+  private val numFeatures: Int = bcFeaturesStd.value.length
   private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
-  protected override val dim: Int = numFeaturesPlusIntercept
   @transient private lazy val coefficientsArray = bcCoefficients.value match {
     case DenseVector(values) => values
     case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
       s" but got type ${bcCoefficients.value.getClass}.")
   }
-
-  @transient private lazy val linear = {
-    if (fitIntercept) {
-      new DenseVector(coefficientsArray.take(numFeatures))
-    } else {
-      new DenseVector(coefficientsArray)
-    }
-  }
-
+  protected override val dim: Int = numFeaturesPlusIntercept
 
   /**
    * Add a new training instance to this HingeAggregator, and update the loss and gradient
@@ -69,13 +62,16 @@ private[ml] class HingeAggregator(
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
       if (weight == 0.0) return this
 
+      val localFeaturesStd = bcFeaturesStd.value
       val localCoefficients = coefficientsArray
       val localGradientSumArray = gradientSumArray
 
       val dotProduct = {
         var sum = 0.0
         features.foreachNonZero { (index, value) =>
-          sum += localCoefficients(index) * value
+          if (localFeaturesStd(index) != 0.0) {
+            sum += localCoefficients(index) * value / localFeaturesStd(index)
+          }
         }
         if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
         sum
@@ -92,7 +88,9 @@ private[ml] class HingeAggregator(
       if (1.0 > labelScaled * dotProduct) {
         val gradientScale = -labelScaled * weight
         features.foreachNonZero { (index, value) =>
-          localGradientSumArray(index) += value * gradientScale
+          if (localFeaturesStd(index) != 0.0) {
+            localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
+          }
         }
         if (fitIntercept) {
           localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
@@ -104,78 +102,4 @@ private[ml] class HingeAggregator(
       this
     }
   }
-
-  /**
-   * Add a new training instance block to this HingeAggregator, and update the loss and gradient
-   * of the objective function.
-   *
-   * @param block The InstanceBlock to be added.
-   * @return This HingeAggregator object.
-   */
-  def add(block: InstanceBlock): this.type = {
-    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
-      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
-    require(block.weightIter.forall(_ >= 0),
-      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
-
-    if (block.weightIter.forall(_ == 0)) return this
-    val size = block.size
-    val localGradientSumArray = gradientSumArray
-
-    // vec here represents dotProducts
-    val vec = if (fitIntercept && coefficientsArray.last != 0) {
-      val intercept = coefficientsArray.last
-      new DenseVector(Array.fill(size)(intercept))
-    } else {
-      new DenseVector(Array.ofDim[Double](size))
-    }
-
-    if (fitIntercept) {
-      BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
-    } else {
-      BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
-    }
-
-    // in-place convert dotProducts to gradient scales
-    // then, vec represents gradient scales
-    var i = 0
-    while (i < size) {
-      val weight = block.getWeight(i)
-      if (weight > 0) {
-        weightSum += weight
-        // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
-        // Therefore the gradient is -(2y - 1)*x
-        val label = block.getLabel(i)
-        val labelScaled = 2 * label - 1.0
-        val loss = (1.0 - labelScaled * vec(i)) * weight
-        if (loss > 0) {
-          lossSum += loss
-          val gradScale = -labelScaled * weight
-          vec.values(i) = gradScale
-        } else {
-          vec.values(i) = 0.0
-        }
-      } else {
-        vec.values(i) = 0.0
-      }
-      i += 1
-    }
-
-    // predictions are all correct, no gradient signal
-    if (vec.values.forall(_ == 0)) return this
-
-    if (fitIntercept) {
-      // localGradientSumArray is of size numFeatures+1, so can not
-      // be directly used as the output of BLAS.gemv
-      val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-      BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
-      linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
-      localGradientSumArray(numFeatures) += vec.values.sum
-    } else {
-      val gradSumVec = new DenseVector(localGradientSumArray)
-      BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
-    }
-
-    this
-  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
index f83621506500..8a1a41b2950c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
-import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.Vector
 
 /**
  * HuberAggregator computes the gradient and loss for a huber loss function,
@@ -62,17 +62,19 @@ import org.apache.spark.ml.linalg._
  *
  * @param fitIntercept Whether to fit an intercept term.
  * @param epsilon The shape parameter to control the amount of robustness.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
 * @param bcParameters including three parts: the regression coefficients corresponding
 *                     to the features, the intercept (if fitIntercept is true)
 *                     and the scale parameter (sigma).
 */
 private[ml] class HuberAggregator(
-    numFeatures: Int,
     fitIntercept: Boolean,
-    epsilon: Double)(bcParameters: Broadcast[Vector])
-  extends DifferentiableLossAggregator[InstanceBlock, HuberAggregator] {
+    epsilon: Double,
+    bcFeaturesStd: Broadcast[Array[Double]])(bcParameters: Broadcast[Vector])
+  extends DifferentiableLossAggregator[Instance, HuberAggregator] {
 
   protected override val dim: Int = bcParameters.value.size
+  private val numFeatures: Int = if (fitIntercept) dim - 2 else dim - 1
   private val sigma: Double = bcParameters.value(dim - 1)
   private val intercept: Double = if (fitIntercept) {
     bcParameters.value(dim - 2)
@@ -80,8 +82,7 @@ private[ml] class HuberAggregator(
     0.0
   }
   // make transient so we do not serialize between aggregation stages
-  @transient private lazy val linear =
-    new DenseVector(bcParameters.value.toArray.take(numFeatures))
+  @transient private lazy val coefficients = bcParameters.value.toArray.slice(0, numFeatures)
 
   /**
    * Add a new training instance to this HuberAggregator, and update the loss and gradient
@@ -97,13 +98,16 @@ private[ml] class HuberAggregator(
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
       if (weight == 0.0) return this
 
-      val localCoefficients = linear.values
+      val localFeaturesStd = bcFeaturesStd.value
+      val localCoefficients = coefficients
       val localGradientSumArray = gradientSumArray
 
       val margin = {
         var sum = 0.0
         features.foreachNonZero { (index, value) =>
-          sum += localCoefficients(index) * value
+          if (localFeaturesStd(index) != 0.0) {
+            sum += localCoefficients(index) * (value / localFeaturesStd(index))
+          }
         }
         if (fitIntercept) sum += intercept
         sum
@@ -115,7 +119,10 @@ private[ml] class HuberAggregator(
         val linearLossDivSigma = linearLoss / sigma
 
         features.foreachNonZero { (index, value) =>
-          localGradientSumArray(index) -= weight * linearLossDivSigma * value
+          if (localFeaturesStd(index) != 0.0) {
+            localGradientSumArray(index) +=
+              -1.0 * weight * linearLossDivSigma * (value / localFeaturesStd(index))
+          }
         }
         if (fitIntercept) {
           localGradientSumArray(dim - 2) += -1.0 * weight * linearLossDivSigma
@@ -127,7 +134,10 @@ private[ml] class HuberAggregator(
           (sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
 
         features.foreachNonZero { (index, value) =>
-          localGradientSumArray(index) += weight * sign * epsilon * value
+          if (localFeaturesStd(index) != 0.0) {
+            localGradientSumArray(index) +=
+              weight * sign * epsilon * (value / localFeaturesStd(index))
+          }
         }
         if (fitIntercept) {
           localGradientSumArray(dim - 2) += weight * sign * epsilon
@@ -139,75 +149,4 @@ private[ml] class HuberAggregator(
       this
     }
   }
-
-  /**
-   * Add a new training instance block to this HuberAggregator, and update the loss and gradient
-   * of the objective function.
-   *
-   * @param block The instance block of data point to be added.
-   * @return This HuberAggregator object.
-   */
-  def add(block: InstanceBlock): HuberAggregator = {
-    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
-      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
-    require(block.weightIter.forall(_ >= 0),
-      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
-
-    if (block.weightIter.forall(_ == 0)) return this
-    val size = block.size
-    val localGradientSumArray = gradientSumArray
-
-    // vec here represents margins or dotProducts
-    val vec = if (fitIntercept && intercept != 0) {
-      new DenseVector(Array.fill(size)(intercept))
-    } else {
-      new DenseVector(Array.ofDim[Double](size))
-    }
-
-    if (fitIntercept) {
-      BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
-    } else {
-      BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
-    }
-
-    // in-place convert margins to multipliers
-    // then, vec represents multipliers
-    var i = 0
-    while (i < size) {
-      val weight = block.getWeight(i)
-      if (weight > 0) {
-        weightSum += weight
-        val label = block.getLabel(i)
-        val margin = vec(i)
-        val linearLoss = label - margin
-
-        if (math.abs(linearLoss) <= sigma * epsilon) {
-          lossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma)
-          val linearLossDivSigma = linearLoss / sigma
-          val multiplier = -1.0 * weight * linearLossDivSigma
-          vec.values(i) = multiplier
-          localGradientSumArray(dim - 1) += 0.5 * weight * (1.0 - math.pow(linearLossDivSigma, 2.0))
-        } else {
-          lossSum += 0.5 * weight *
-            (sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
-          val sign = if (linearLoss >= 0) -1.0 else 1.0
-          val multiplier = weight * sign * epsilon
-          vec.values(i) = multiplier
-          localGradientSumArray(dim - 1) += 0.5 * weight * (1.0 - epsilon * epsilon)
-        }
-      } else {
-        vec.values(i) = 0.0
-      }
-      i += 1
-    }
-
-    val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-    BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
-    linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
-    if (fitIntercept) {
-      localGradientSumArray(dim - 2) += vec.values.sum
-    }
-
-    this
-  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
index a8bda9ca5d24..7a5806dc24ae 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
-import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 
 /**
  * LeastSquaresAggregator computes the gradient and loss for a Least-squared loss function,
@@ -157,25 +157,26 @@ private[ml] class LeastSquaresAggregator(
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    bcFeaturesStd: Broadcast[Vector],
-    bcFeaturesMean: Broadcast[Vector])(bcCoefficients: Broadcast[Vector])
-  extends DifferentiableLossAggregator[InstanceBlock, LeastSquaresAggregator] {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]])(bcCoefficients: Broadcast[Vector])
+  extends DifferentiableLossAggregator[Instance, LeastSquaresAggregator] {
   require(labelStd > 0.0, s"${this.getClass.getName} requires the label standard " +
     s"deviation to be positive.")
 
-  private val numFeatures = bcFeaturesStd.value.size
+  private val numFeatures = bcFeaturesStd.value.length
   protected override val dim: Int = numFeatures
   // make transient so we do not serialize between aggregation stages
+  @transient private lazy val featuresStd = bcFeaturesStd.value
   @transient private lazy val effectiveCoefAndOffset = {
     val coefficientsArray = bcCoefficients.value.toArray.clone()
     val featuresMean = bcFeaturesMean.value
-    val featuresStd = bcFeaturesStd.value
     var sum = 0.0
     var i = 0
     val len = coefficientsArray.length
     while (i < len) {
       if (featuresStd(i) != 0.0) {
-        sum += coefficientsArray(i) / featuresStd(i) * featuresMean(i)
+        coefficientsArray(i) /= featuresStd(i)
+        sum += coefficientsArray(i) * featuresMean(i)
       } else {
         coefficientsArray(i) = 0.0
       }
@@ -185,7 +186,7 @@ private[ml] class LeastSquaresAggregator(
     (Vectors.dense(coefficientsArray), offset)
   }
   // do not use tuple assignment above because it will circumvent the @transient tag
-  @transient private lazy val effectiveCoefficientsVec = effectiveCoefAndOffset._1
+  @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
   @transient private lazy val offset = effectiveCoefAndOffset._2
 
   /**
@@ -203,20 +204,16 @@ private[ml] class LeastSquaresAggregator(
 
       if (weight == 0.0) return this
 
-      val localEffectiveCoefficientsVec = effectiveCoefficientsVec
-
-      val diff = {
-        var dot = 0.0
-        features.foreachNonZero { (index, value) =>
-          dot += localEffectiveCoefficientsVec(index) * value
-        }
-        dot - label / labelStd + offset
-      }
+      val diff = BLAS.dot(features, effectiveCoefficientsVector) - label / labelStd + offset
 
       if (diff != 0) {
         val localGradientSumArray = gradientSumArray
+        val localFeaturesStd = featuresStd
         features.foreachNonZero { (index, value) =>
-          localGradientSumArray(index) += weight * diff * value
+          val fStd = localFeaturesStd(index)
+          if (fStd != 0.0) {
+            localGradientSumArray(index) += weight * diff * value / fStd
+          }
         }
         lossSum += weight * diff * diff / 2.0
       }
@@ -224,43 +221,4 @@ private[ml] class LeastSquaresAggregator(
       this
     }
   }
-
-  /**
-   * Add a new training instance block to this LeastSquaresAggregator, and update the loss
-   * and gradient of the objective function.
-   *
-   * @param block The instance block of data point to be added.
-   * @return This LeastSquaresAggregator object.
-   */
-  def add(block: InstanceBlock): LeastSquaresAggregator = {
-    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
-      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
-    require(block.weightIter.forall(_ >= 0),
-      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
-
-    if (block.weightIter.forall(_ == 0)) return this
-    val size = block.size
-
-    // vec here represents diffs
-    val vec = new DenseVector(Array.tabulate(size)(i => offset - block.getLabel(i) / labelStd))
-    BLAS.gemv(1.0, block.matrix, effectiveCoefficientsVec, 1.0, vec)
-
-    // in-place convert diffs to multipliers
-    // then, vec represents multipliers
-    var i = 0
-    while (i < size) {
-      val weight = block.getWeight(i)
-      val diff = vec(i)
-      lossSum += weight * diff * diff / 2
-      weightSum += weight
-      val multiplier = weight * diff
-      vec.values(i) = multiplier
-      i += 1
-    }
-
-    val gradSumVec = new DenseVector(gradientSumArray)
-    BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
-
-    this
-  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala
index 76d21995a2c5..f2b3566f8f09 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
-import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{DenseVector, Vector}
 import org.apache.spark.mllib.util.MLUtils
 
 /**
@@ -171,6 +171,7 @@ import org.apache.spark.mllib.util.MLUtils
 *
 *
 * @param bcCoefficients The broadcast coefficients corresponding to the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
 * @param numClasses the number of possible outcomes for k classes classification problem in
 *                   Multinomial Logistic Regression.
 * @param fitIntercept Whether to fit an intercept term.
@@ -182,12 +183,13 @@ import org.apache.spark.mllib.util.MLUtils
 *                    since this form is optimal for the matrix operations used for prediction.
*/ private[ml] class LogisticAggregator( - numFeatures: Int, + bcFeaturesStd: Broadcast[Array[Double]], numClasses: Int, fitIntercept: Boolean, multinomial: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[InstanceBlock, LogisticAggregator] with Logging { + extends DifferentiableLossAggregator[Instance, LogisticAggregator] with Logging { + private val numFeatures = bcFeaturesStd.value.length private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures private val coefficientSize = bcCoefficients.value.size protected override val dim: Int = coefficientSize @@ -207,31 +209,6 @@ private[ml] class LogisticAggregator( s"got type ${bcCoefficients.value.getClass}.)") } - @transient private lazy val binaryLinear = { - if (!multinomial) { - if (fitIntercept) { - new DenseVector(coefficientsArray.take(numFeatures)) - } else { - new DenseVector(coefficientsArray) - } - } else { - null - } - } - - @transient private lazy val multinomialLinear = { - if (multinomial) { - if (fitIntercept) { - new DenseMatrix(numClasses, numFeatures, coefficientsArray.take(numClasses * numFeatures)) - } else { - new DenseMatrix(numClasses, numFeatures, coefficientsArray) - } - } else { - null - } - } - - if (multinomial && numClasses <= 2) { logInfo(s"Multinomial logistic regression for binary classification yields separate " + s"coefficients for positive and negative classes. When no regularization is applied, the" + @@ -242,12 +219,15 @@ private[ml] class LogisticAggregator( /** Update gradient and loss using binary loss function. */ private def binaryUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { + val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray val margin = - { var sum = 0.0 features.foreachNonZero { (index, value) => - sum += localCoefficients(index) * value + if (localFeaturesStd(index) != 0.0) { + sum += localCoefficients(index) * value / localFeaturesStd(index) + } } if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) sum @@ -256,7 +236,9 @@ private[ml] class LogisticAggregator( val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) features.foreachNonZero { (index, value) => - localGradientArray(index) += multiplier * value + if (localFeaturesStd(index) != 0.0) { + localGradientArray(index) += multiplier * value / localFeaturesStd(index) + } } if (fitIntercept) { @@ -271,61 +253,6 @@ private[ml] class LogisticAggregator( } } - /** Update gradient and loss using binary loss function. */ - private def binaryUpdateInPlace(block: InstanceBlock): Unit = { - val size = block.size - val localGradientSumArray = gradientSumArray - - // vec here represents margins or negative dotProducts - val vec = if (fitIntercept && coefficientsArray.last != 0) { - val intercept = coefficientsArray.last - new DenseVector(Array.fill(size)(intercept)) - } else { - new DenseVector(Array.ofDim[Double](size)) - } - - if (fitIntercept) { - BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec) - } else { - BLAS.gemv(-1.0, block.matrix, binaryLinear, 0.0, vec) - } - - // in-place convert margins to multiplier - // then, vec represents multiplier - var i = 0 - while (i < size) { - val weight = block.getWeight(i) - if (weight > 0) { - weightSum += weight - val label = block.getLabel(i) - val margin = vec(i) - if (label > 0) { - // The following is equivalent to log(1 + exp(margin)) but more numerically stable. 
- lossSum += weight * MLUtils.log1pExp(margin) - } else { - lossSum += weight * (MLUtils.log1pExp(margin) - margin) - } - val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) - vec.values(i) = multiplier - } else { - vec.values(i) = 0.0 - } - i += 1 - } - - if (fitIntercept) { - // localGradientSumArray is of size numFeatures+1, so can not - // be directly used as the output of BLAS.gemv - val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) - BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) - linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } - localGradientSumArray(numFeatures) += vec.values.sum - } else { - val gradSumVec = new DenseVector(localGradientSumArray) - BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec) - } - } - /** Update gradient and loss using multinomial (softmax) loss function. */ private def multinomialUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { // TODO: use level 2 BLAS operations @@ -333,6 +260,7 @@ private[ml] class LogisticAggregator( Note: this can still be used when numClasses = 2 for binary logistic regression without pivoting. */ + val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray @@ -342,10 +270,13 @@ private[ml] class LogisticAggregator( val margins = new Array[Double](numClasses) features.foreachNonZero { (index, value) => - var j = 0 - while (j < numClasses) { - margins(j) += localCoefficients(index * numClasses + j) * value - j += 1 + if (localFeaturesStd(index) != 0.0) { + val stdValue = value / localFeaturesStd(index) + var j = 0 + while (j < numClasses) { + margins(j) += localCoefficients(index * numClasses + j) * stdValue + j += 1 + } } } var i = 0 @@ -383,10 +314,13 @@ private[ml] class LogisticAggregator( multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) } features.foreachNonZero { (index, value) => - var j = 0 - while (j < numClasses) { - localGradientArray(index * numClasses + j) += weight * multipliers(j) * value - j += 1 + if (localFeaturesStd(index) != 0.0) { + val stdValue = value / localFeaturesStd(index) + var j = 0 + while (j < numClasses) { + localGradientArray(index * numClasses + j) += weight * multipliers(j) * stdValue + j += 1 + } } } if (fitIntercept) { @@ -405,112 +339,6 @@ private[ml] class LogisticAggregator( lossSum += weight * loss } - /** Update gradient and loss using multinomial (softmax) loss function. 
*/ - private def multinomialUpdateInPlace(block: InstanceBlock): Unit = { - val size = block.size - val localGradientSumArray = gradientSumArray - - // mat here represents margins, shape: S X C - val mat = new DenseMatrix(size, numClasses, Array.ofDim[Double](size * numClasses)) - - if (fitIntercept) { - val intercept = coefficientsArray.takeRight(numClasses) - var i = 0 - while (i < size) { - var j = 0 - while (j < numClasses) { - mat.update(i, j, intercept(j)) - j += 1 - } - i += 1 - } - BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 1.0, mat) - } else { - BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 0.0, mat) - } - - // in-place convert margins to multipliers - // then, mat represents multipliers - var i = 0 - val tmp = Array.ofDim[Double](numClasses) - while (i < size) { - val weight = block.getWeight(i) - if (weight > 0) { - weightSum += weight - val label = block.getLabel(i) - - var maxMargin = Double.NegativeInfinity - var j = 0 - while (j < numClasses) { - tmp(j) = mat(i, j) - maxMargin = math.max(maxMargin, tmp(j)) - j += 1 - } - - // marginOfLabel is margins(label) in the formula - val marginOfLabel = tmp(label.toInt) - - var sum = 0.0 - j = 0 - while (j < numClasses) { - if (maxMargin > 0) tmp(j) -= maxMargin - val exp = math.exp(tmp(j)) - sum += exp - tmp(j) = exp - j += 1 - } - - j = 0 - while (j < numClasses) { - val multiplier = weight * (tmp(j) / sum - (if (label == j) 1.0 else 0.0)) - mat.update(i, j, multiplier) - j += 1 - } - - if (maxMargin > 0) { - lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin) - } else { - lossSum += weight * (math.log(sum) - marginOfLabel) - } - } else { - var j = 0 - while (j < numClasses) { - mat.update(i, j, 0.0) - j += 1 - } - } - i += 1 - } - - // block.matrix: S X F, unknown type - // mat (multipliers): S X C, dense - // gradSumMat(gradientSumArray): C X FPI (numFeaturesPlusIntercept), dense - block.matrix match { - case dm: DenseMatrix if !fitIntercept => - // If fitIntercept==false, gradientSumArray += mat.T X matrix - // GEMM requires block.matrix is dense - val gradSumMat = new DenseMatrix(numClasses, numFeatures, localGradientSumArray) - BLAS.gemm(1.0, mat.transpose, dm, 1.0, gradSumMat) - - case _ => - // Otherwise, use linearGradSumMat (F X C) as a temp matrix: - // linearGradSumMat = matrix.T X mat - val linearGradSumMat = new DenseMatrix(numFeatures, numClasses, - Array.ofDim[Double](numFeatures * numClasses)) - BLAS.gemm(1.0, block.matrix.transpose, mat, 0.0, linearGradSumMat) - linearGradSumMat.foreachActive { (i, j, v) => - if (v != 0) localGradientSumArray(i * numClasses + j) += v - } - - if (fitIntercept) { - val start = numClasses * numFeatures - mat.foreachActive { (i, j, v) => - if (v != 0) localGradientSumArray(start + j) += v - } - } - } - } - /** * Add a new training instance to this LogisticAggregator, and update the loss and gradient * of the objective function. @@ -535,28 +363,4 @@ private[ml] class LogisticAggregator( this } } - - /** - * Add a new training instance block to this LogisticAggregator, and update the loss and gradient - * of the objective function. - * - * @param block The instance block of data point to be added. - * @return This LogisticAggregator object. - */ - def add(block: InstanceBlock): this.type = { - require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + - s"instance. 
Expecting $numFeatures but got ${block.numFeatures}.") - require(block.weightIter.forall(_ >= 0), - s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") - - if (block.weightIter.forall(_ == 0)) return this - - if (multinomial) { - multinomialUpdateInPlace(block) - } else { - binaryUpdateInPlace(block) - } - - this - } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 3d1fab8692af..7ac680ec1183 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -104,11 +104,7 @@ private[shared] object SharedParamsCodeGen { isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"), ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " + "each row is for training or for validation. False indicates training; true indicates " + - "validation"), - ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " + - "stacked within partitions. If block size is more than remaining data in a partition " + - "then it is adjusted to the size of this data", Some("1024"), - isValid = "ParamValidators.gt(0)", isExpertParam = true) + "validation.") ) val code = genSharedParams(params) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 7fe8ccd973a7..44c993eeafdd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -570,31 +570,12 @@ trait HasDistanceMeasure extends Params { trait HasValidationIndicatorCol extends Params { /** - * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation. + * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.. * @group param */ - final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation") + final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.") /** @group getParam */ final def getValidationIndicatorCol: String = $(validationIndicatorCol) } - -/** - * Trait for shared param blockSize (default: 1024). This trait may be changed or - * removed between minor versions. - */ -@DeveloperApi -trait HasBlockSize extends Params { - - /** - * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. - * @group expertParam - */ - final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. 
If block size is more than remaining data in a partition then it is adjusted to the size of this data", ParamValidators.gt(0)) - - setDefault(blockSize, 1024) - - /** @group expertGetParam */ - final def getBlockSize: Int = $(blockSize) -} // scalastyle:on diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 002146f89e79..2fb9a276be88 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -54,8 +54,7 @@ import org.apache.spark.util.random.XORShiftRandom /** * Common params for ALS and ALSModel. */ -private[recommendation] trait ALSModelParams extends Params with HasPredictionCol - with HasBlockSize { +private[recommendation] trait ALSModelParams extends Params with HasPredictionCol { /** * Param for the column name for user ids. Ids must be integers. Other * numeric types are supported for this column, but will be cast to integers as long as they @@ -126,8 +125,6 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo /** @group expertGetParam */ def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT) - - setDefault(blockSize -> 4096) } /** @@ -291,15 +288,6 @@ class ALSModel private[ml] ( @Since("2.2.0") def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value) - /** - * Set block size for stacking input data in matrices. - * Default is 4096. - * - * @group expertSetParam - */ - @Since("3.0.0") - def setBlockSize(value: Int): this.type = set(blockSize, value) - private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) => if (featuresA != null && featuresB != null) { var dotProduct = 0.0f @@ -363,7 +351,7 @@ class ALSModel private[ml] ( */ @Since("2.2.0") def recommendForAllUsers(numItems: Int): DataFrame = { - recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize)) + recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems) } /** @@ -378,7 +366,7 @@ class ALSModel private[ml] ( @Since("2.3.0") def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = { val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol)) - recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize)) + recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems) } /** @@ -389,7 +377,7 @@ class ALSModel private[ml] ( */ @Since("2.2.0") def recommendForAllItems(numUsers: Int): DataFrame = { - recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize)) + recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers) } /** @@ -404,7 +392,7 @@ class ALSModel private[ml] ( @Since("2.3.0") def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = { val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol)) - recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize)) + recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers) } /** @@ -453,12 +441,11 @@ class ALSModel private[ml] ( dstFactors: DataFrame, srcOutputColumn: String, dstOutputColumn: String, - num: Int, - blockSize: Int): DataFrame = { + num: Int): DataFrame = { import srcFactors.sparkSession.implicits._ - val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize) - val 
dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize) + val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])]) + val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])]) val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked) .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])] .flatMap { case (srcIter, dstIter) => @@ -496,10 +483,11 @@ class ALSModel private[ml] ( /** * Blockifies factors to improve the efficiency of cross join + * TODO: SPARK-20443 - expose blockSize as a param? */ private def blockify( factors: Dataset[(Int, Array[Float])], - blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = { + blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = { import factors.sparkSession.implicits._ factors.mapPartitions(_.grouped(blockSize)) } @@ -666,15 +654,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] @Since("2.2.0") def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value) - /** - * Set block size for stacking input data in matrices. - * Default is 4096. - * - * @group expertSetParam - */ - @Since("3.0.0") - def setBlockSize(value: Int): this.type = set(blockSize, value) - /** * Sets both numUserBlocks and numItemBlocks to the specific value. * @@ -704,7 +683,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] instr.logDataset(dataset) instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol, itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval, - seed, intermediateStorageLevel, finalStorageLevel, blockSize) + seed, intermediateStorageLevel, finalStorageLevel) val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank), numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks), @@ -715,8 +694,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] checkpointInterval = $(checkpointInterval), seed = $(seed)) val userDF = userFactors.toDF("id", "features") val itemDF = itemFactors.toDF("id", "features") - val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize)) - .setParent(this) + val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this) copyValues(model) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index fc59da8a9c16..64e5e191ffd1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -28,7 +28,7 @@ import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml.{PipelineStage, PredictorParams} -import org.apache.spark.ml.feature.{Instance, InstanceBlock} +import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.linalg.BLAS._ import org.apache.spark.ml.optim.WeightedLeastSquares @@ -55,7 +55,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion private[regression] trait LinearRegressionParams extends PredictorParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver - with HasAggregationDepth with HasLoss with HasBlockSize { + with HasAggregationDepth with HasLoss { import LinearRegression._ @@ 
-316,15 +316,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String def setEpsilon(value: Double): this.type = set(epsilon, value) setDefault(epsilon -> 1.35) - /** - * Set block size for stacking input data in matrices. - * Default is 1024. - * - * @group expertSetParam - */ - @Since("3.0.0") - def setBlockSize(value: Int): this.type = set(blockSize, value) - override protected def train(dataset: Dataset[_]): LinearRegressionModel = instrumented { instr => // Extract the number of features before deciding optimization solver. val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol)) @@ -363,6 +354,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String return lrModel.setSummary(Some(trainingSummary)) } + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + val (featuresSummarizer, ySummarizer) = instances.treeAggregate( (Summarizer.createSummarizerBuffer("mean", "std"), Summarizer.createSummarizerBuffer("mean", "std", "count")))( @@ -398,6 +392,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String s"will be zeros and the intercept will be the mean of the label; as a result, " + s"training is not needed.") } + if (handlePersistence) instances.unpersist() val coefficients = Vectors.sparse(numFeatures, Seq.empty) val intercept = yMean @@ -426,8 +421,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String // if y is constant (rawYStd is zero), then y cannot be scaled. In this case // setting yStd=abs(yMean) ensures that y is not scaled anymore in l-bfgs algorithm. val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean) - val featuresMean = featuresSummarizer.mean.compressed - val featuresStd = featuresSummarizer.std.compressed + val featuresMean = featuresSummarizer.mean.toArray + val featuresStd = featuresSummarizer.std.toArray val bcFeaturesMean = instances.context.broadcast(featuresMean) val bcFeaturesStd = instances.context.broadcast(featuresStd) @@ -447,36 +442,23 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam + val getFeaturesStd = (j: Int) => if (j >= 0 && j < numFeatures) featuresStd(j) else 0.0 val regularization = if (effectiveL2RegParam != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures Some(new L2Regularization(effectiveL2RegParam, shouldApply, - if ($(standardization)) None else Some(featuresStd.apply))) + if ($(standardization)) None else Some(getFeaturesStd))) } else { None } - val standardized = instances.map { - case Instance(label, weight, features) => - val featuresStd = bcFeaturesStd.value - val array = Array.ofDim[Double](numFeatures) - features.foreachNonZero { (i, v) => - val std = featuresStd(i) - if (std != 0) array(i) = v / std - } - Instance(label, weight, Vectors.dense(array)) - } - val blocks = InstanceBlock.blokify(standardized, $(blockSize)) - .persist(StorageLevel.MEMORY_AND_DISK) - .setName(s"training dataset (blockSize=${$(blockSize)})") - val costFun = $(loss) match { case SquaredError => val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept), bcFeaturesStd, bcFeaturesMean)(_) - new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) + new RDDLossFunction(instances, getAggregatorFunc, 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index f88f3fce61b3..21eb17dfaacb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -339,8 +339,10 @@ class LogisticRegressionWithLBFGS
     // Convert our input into a DataFrame
     val spark = SparkSession.builder().sparkContext(input.context).getOrCreate()
     val df = spark.createDataFrame(input.map(_.asML))
+    // Determine if we should cache the DF
+    val handlePersistence = input.getStorageLevel == StorageLevel.NONE
     // Train our model
-    val mlLogisticRegressionModel = lr.train(df)
+    val mlLogisticRegressionModel = lr.train(df, handlePersistence)
     // convert the model
     val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray)
     createModel(weights, mlLogisticRegressionModel.intercept)
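The restored handlePersistence flag is the usual Spark idiom: only take ownership of caching when the caller has not persisted the input already, and only unpersist what you persisted yourself. The pattern in isolation (a sketch; the helper name is ours):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CacheOwnership {
  // Run `body` with `data` cached, taking ownership of the cache only when the
  // caller has not persisted the RDD themselves (storage level NONE).
  def withCaching[T, R](data: RDD[T])(body: RDD[T] => R): R = {
    val handlePersistence = data.getStorageLevel == StorageLevel.NONE
    if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)
    try body(data)
    finally if (handlePersistence) data.unpersist()
  }
}
```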
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index 2b63dc259a14..c2072cea1185 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -179,7 +179,7 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
   test("sparse coefficients in HingeAggregator") {
     val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
     val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
-    val agg = new HingeAggregator(1, true)(bcCoefficients)
+    val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients)
     val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") {
       intercept[IllegalArgumentException] {
         agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 9e359ba098bf..6d31e6efc7e1 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -542,7 +542,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
   test("sparse coefficients in LogisticAggregator") {
     val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
     val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
-    val binaryAgg = new LogisticAggregator(1, 2,
+    val binaryAgg = new LogisticAggregator(bcFeaturesStd, 2,
       fitIntercept = true, multinomial = false)(bcCoefficientsBinary)
     val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") {
       intercept[IllegalArgumentException] {
@@ -552,7 +552,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
     assert(thrownBinary.getMessage.contains("coefficients only supports dense"))

     val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0)))
-    val multinomialAgg = new LogisticAggregator(1, 3,
+    val multinomialAgg = new LogisticAggregator(bcFeaturesStd, 3,
       fitIntercept = true, multinomial = true)(bcCoefficientsMulti)
     val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") {
       intercept[IllegalArgumentException] {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
index d780bdf5f5dc..5a7449005839 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
@@ -42,36 +42,5 @@ class InstanceSuite extends SparkFunSuite{
       val o2 = ser.deserialize[OffsetInstance](ser.serialize(o))
       assert(o === o2)
     }
-
-    val block1 = InstanceBlock.fromInstances(Seq(instance1))
-    val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2))
-    Seq(block1, block2).foreach { o =>
-      val o2 = ser.deserialize[InstanceBlock](ser.serialize(o))
-      assert(o.labels === o2.labels)
-      assert(o.weights === o2.weights)
-      assert(o.matrix === o2.matrix)
-    }
-  }
-
-  test("InstanceBlock: check correctness") {
-    val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
-    val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
-    val instances = Seq(instance1, instance2)
-
-    val block = InstanceBlock.fromInstances(instances)
-    assert(block.size === 2)
-    assert(block.numFeatures === 2)
-    block.instanceIterator.zipWithIndex.foreach {
-      case (instance, i) =>
-        assert(instance.label === instances(i).label)
-        assert(instance.weight === instances(i).weight)
-        assert(instance.features.toArray === instances(i).features.toArray)
-    }
-    Seq(0, 1).foreach { i =>
-      val nzIter = block.getNonZeroIter(i)
-      val vec = Vectors.sparse(2, nzIter.toSeq)
-      assert(vec.toArray === instances(i).features.toArray)
-    }
   }
-}
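For readers without the deleted class at hand: InstanceBlock stacked a run of instances into parallel label/weight arrays plus one feature matrix, which is what the removed assertions on labels, weights and matrix were checking. A rough, self-contained approximation of that layout (the real class also picked dense vs. sparse storage):

```scala
object InstanceBlockSketch {
  final case class Instance(label: Double, weight: Double, features: Array[Double])

  // Labels and weights become parallel arrays; feature rows are concatenated
  // into one row-major buffer of size (numRows * numFeatures).
  final case class Block(labels: Array[Double], weights: Array[Double],
                         numFeatures: Int, matrix: Array[Double]) {
    def size: Int = labels.length
    def instanceIterator: Iterator[Instance] = Iterator.tabulate(size) { i =>
      Instance(labels(i), weights(i), matrix.slice(i * numFeatures, (i + 1) * numFeatures))
    }
  }

  def fromInstances(instances: Seq[Instance]): Block = {
    val numFeatures = instances.head.features.length
    Block(instances.map(_.label).toArray, instances.map(_.weight).toArray,
      numFeatures, instances.flatMap(_.features).toArray)
  }

  def main(args: Array[String]): Unit = {
    val block = fromInstances(Seq(
      Instance(19.0, 2.0, Array(1.0, 7.0)),
      Instance(17.0, 1.0, Array(0.0, 5.0))))
    assert(block.size == 2 && block.numFeatures == 2)
    block.instanceIterator.foreach(i => println(i.features.mkString(",")))
  }
}
```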
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
index c02a0a5e5e7d..61b48ffa1094 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {

   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = standardize(Array(
+    instances = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(0.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
-    instancesConstantFeature = standardize(Array(
+    )
+    instancesConstantFeature = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(1.0, 0.3, Vectors.dense(1.0, 0.5))
-    ))
-    instancesConstantFeatureFiltered = standardize(Array(
+    )
+    instancesConstantFeatureFiltered = Array(
       Instance(0.0, 0.1, Vectors.dense(2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
-    ))
+    )
   }

   /** Get summary statistics for some data and create a new HingeAggregator. */
@@ -54,23 +54,12 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       instances: Array[Instance],
       coefficients: Vector,
       fitIntercept: Boolean): HingeAggregator = {
-    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
-    new HingeAggregator(instances.head.features.size, fitIntercept)(bcCoefficients)
-  }
-
-  private def standardize(instances: Array[Instance]): Array[Instance] = {
-    val (featuresSummarizer, _) =
+    val (featuresSummarizer, ySummarizer) =
       DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
-    val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt)
-    val numFeatures = stdArray.length
-    instances.map { case Instance(label, weight, features) =>
-      val standardized = Array.ofDim[Double](numFeatures)
-      features.foreachNonZero { (i, v) =>
-        val std = stdArray(i)
-        if (std != 0) standardized(i) = v / std
-      }
-      Instance(label, weight, Vectors.dense(standardized).compressed)
-    }
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
+    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+    new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
   }

   test("aggregator add method input size") {
@@ -171,21 +160,4 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
   }

-  test("add instance block") {
-    val coefArray = Array(1.0, 2.0)
-    val intercept = 1.0
-
-    val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
-      fitIntercept = true)
-    instances.foreach(agg.add)
-
-    val agg2 = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
-      fitIntercept = true)
-    val block = InstanceBlock.fromInstances(instances)
-    agg2.add(block)
-
-    assert(agg.loss ~== agg2.loss relTol 1e-8)
-    assert(agg.gradient ~== agg2.gradient relTol 1e-8)
-  }
-
 }
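As background for the hinge tests above: the objective the aggregator accumulates is, per instance, weight * max(0, 1 - (2 * label - 1) * margin). A toy single-instance version of loss and gradient (plain Scala, not the HingeAggregator API):

```scala
object HingeSketch {
  // Weighted hinge loss and gradient for one instance; label is 0.0 or 1.0.
  def lossAndGradient(coef: Array[Double], intercept: Double, x: Array[Double],
                      label: Double, weight: Double): (Double, Array[Double]) = {
    var margin = intercept
    for (i <- x.indices) margin += coef(i) * x(i)
    val labelScaled = 2.0 * label - 1.0 // map {0, 1} to {-1, +1}
    if (1.0 - labelScaled * margin > 0) {
      (weight * (1.0 - labelScaled * margin), x.map(v => -labelScaled * v * weight))
    } else {
      (0.0, Array.fill(x.length)(0.0))
    }
  }

  def main(args: Array[String]): Unit = {
    val (loss, grad) = lossAndGradient(Array(1.0, 2.0), 1.0, Array(1.0, 2.0), 0.0, 0.1)
    println(s"loss=$loss grad=${grad.mkString(",")}") // margin 6.0 violates for label 0
  }
}
```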
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
index 7c544e99f88b..718ffa230a74 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {

   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = standardize(Array(
+    instances = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
-    instancesConstantFeature = standardize(Array(
+    )
+    instancesConstantFeature = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
-    ))
-    instancesConstantFeatureFiltered = standardize(Array(
+    )
+    instancesConstantFeatureFiltered = Array(
       Instance(0.0, 0.1, Vectors.dense(2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
-    ))
+    )
   }

   /** Get summary statistics for some data and create a new HuberAggregator. */
@@ -56,28 +56,10 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       fitIntercept: Boolean,
       epsilon: Double): HuberAggregator = {
     val (featuresSummarizer, _) = getRegressionSummarizers(instances)
-    val numFeatures = featuresSummarizer.variance.size
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
     val bcParameters = spark.sparkContext.broadcast(parameters)
-    new HuberAggregator(numFeatures, fitIntercept, epsilon)(bcParameters)
-  }
-
-  private def standardize(
-      instances: Array[Instance],
-      std: Array[Double] = null): Array[Instance] = {
-    val stdArray = if (std == null) {
-      getRegressionSummarizers(instances)._1.variance.toArray.map(math.sqrt)
-    } else {
-      std
-    }
-    val numFeatures = stdArray.length
-    instances.map { case Instance(label, weight, features) =>
-      val standardized = Array.ofDim[Double](numFeatures)
-      features.foreachNonZero { (i, v) =>
-        val std = stdArray(i)
-        if (std != 0) standardized(i) = v / std
-      }
-      Instance(label, weight, Vectors.dense(standardized).compressed)
-    }
+    new HuberAggregator(fitIntercept, epsilon, bcFeaturesStd)(bcParameters)
   }

   test("aggregator add method should check input size") {
@@ -173,15 +155,9 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     val parametersFiltered = Vectors.dense(2.0, 3.0, 4.0)
     val aggConstantFeature = getNewAggregator(instancesConstantFeature,
       parameters, fitIntercept = true, epsilon = 1.35)
-    // std of instancesConstantFeature
-    val stdConstantFeature = getRegressionSummarizers(instancesConstantFeature)
-      ._1.variance.toArray.map(math.sqrt)
-    // Since 3.0.0, we start to standardize input outside of gradient computation,
-    // so here we use std of instancesConstantFeature to standardize instances
-    standardize(instances, stdConstantFeature).foreach(aggConstantFeature.add)
-
     val aggConstantFeatureFiltered = getNewAggregator(instancesConstantFeatureFiltered,
       parametersFiltered, fitIntercept = true, epsilon = 1.35)
+    instances.foreach(aggConstantFeature.add)
     instancesConstantFeatureFiltered.foreach(aggConstantFeatureFiltered.add)

     // constant features should not affect gradient
     def validateGradient(grad: Vector, gradFiltered: Vector): Unit = {
@@ -191,19 +167,4 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     validateGradient(aggConstantFeature.gradient, aggConstantFeatureFiltered.gradient)
   }
-
-  test("add instance block") {
-    val paramWithIntercept = Vectors.dense(1.0, 2.0, 3.0, 4.0)
-    val agg1 = getNewAggregator(instances, paramWithIntercept,
-      fitIntercept = true, epsilon = 1.35)
-    instances.foreach(agg1.add)
-
-    val agg2 = getNewAggregator(instances, paramWithIntercept,
-      fitIntercept = true, epsilon = 1.35)
-    val block = InstanceBlock.fromInstances(instances)
-    agg2.add(block)
-
-    assert(agg1.loss ~== agg2.loss relTol 1e-8)
-    assert(agg1.gradient ~== agg2.gradient relTol 1e-8)
-  }
 }
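The epsilon = 1.35 threaded through these tests is the point where Huber loss switches from quadratic to linear. The loss in isolation (the real aggregator also fits a scale parameter sigma, omitted here):

```scala
object HuberSketch {
  // Quadratic for |residual| <= epsilon, linear beyond; continuous at the joint.
  def huber(residual: Double, epsilon: Double = 1.35): Double =
    if (math.abs(residual) <= epsilon) 0.5 * residual * residual
    else epsilon * (math.abs(residual) - 0.5 * epsilon)

  def main(args: Array[String]): Unit = {
    println(huber(1.0))  // 0.5: inside the quadratic zone
    println(huber(10.0)) // 12.58875 = 1.35 * (10 - 0.675): linear growth
  }
}
```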
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
index 5eb4e41c5826..35b694462470 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte

   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = standardize(Array(
+    instances = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
-    instancesConstantFeature = standardize(Array(
+    )
+    instancesConstantFeature = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
-    ))
-    instancesConstantLabel = standardize(Array(
+    )
+    instancesConstantLabel = Array(
       Instance(1.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(1.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
+    )
   }

   /** Get summary statistics for some data and create a new LeastSquaresAggregator. */
@@ -57,34 +57,15 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
     val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances)
     val yStd = math.sqrt(ySummarizer.variance(0))
     val yMean = ySummarizer.mean(0)
-    val featuresStd = Vectors.dense(featuresSummarizer.variance.toArray.map(math.sqrt))
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
     val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
-    val featuresMean = featuresSummarizer.mean.asML
-    val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.compressed)
-    val bcCoefficients = spark.sparkContext.broadcast(coefficients.compressed)
+    val featuresMean = featuresSummarizer.mean
+    val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.toArray)
+    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
     new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd,
       bcFeaturesMean)(bcCoefficients)
   }

-  private def standardize(
-      instances: Array[Instance],
-      std: Array[Double] = null): Array[Instance] = {
-    val stdArray = if (std == null) {
-      getRegressionSummarizers(instances)._1.variance.toArray.map(math.sqrt)
-    } else {
-      std
-    }
-    val numFeatures = stdArray.length
-    instances.map { case Instance(label, weight, features) =>
-      val standardized = Array.ofDim[Double](numFeatures)
-      features.foreachNonZero { (i, v) =>
-        val std = stdArray(i)
-        if (std != 0) standardized(i) = v / std
-      }
-      Instance(label, weight, Vectors.dense(standardized).compressed)
-    }
-  }
-
   test("aggregator add method input size") {
     val coefficients = Vectors.dense(1.0, 2.0)
     val agg = getNewAggregator(instances, coefficients, fitIntercept = true)
@@ -164,15 +145,9 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
   test("check with zero standard deviation") {
     val coefficients = Vectors.dense(1.0, 2.0)
-    // aggConstantFeature contains std of instancesConstantFeature, and the std of dim=0 is 0
     val aggConstantFeature = getNewAggregator(instancesConstantFeature, coefficients,
       fitIntercept = true)
-    // std of instancesConstantFeature
-    val stdConstantFeature = getRegressionSummarizers(instancesConstantFeature)
-      ._1.variance.toArray.map(math.sqrt)
-    // Since 3.0.0, we start to standardize input outside of gradient computation,
-    // so here we use std of instancesConstantFeature to standardize instances
-    standardize(instances, stdConstantFeature).foreach(aggConstantFeature.add)
+    instances.foreach(aggConstantFeature.add)

     // constant features should not affect gradient
     assert(aggConstantFeature.gradient(0) === 0.0)
@@ -182,17 +157,4 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
       }
     }
   }
-
-  test("add instance block") {
-    val coefficients = Vectors.dense(1.0, 2.0)
-    val agg1 = getNewAggregator(instances, coefficients, fitIntercept = true)
-    instances.foreach(agg1.add)
-
-    val agg2 = getNewAggregator(instances, coefficients, fitIntercept = true)
-    val block = InstanceBlock.fromInstances(instances)
-    agg2.add(block)
-
-    assert(agg1.loss ~== agg2.loss relTol 1e-8)
-    assert(agg1.gradient ~== agg2.gradient relTol 1e-8)
-  }
 }
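These zero-std tests pin down the invariant both sides of the revert must preserve: a constant feature has variance 0, its standardized value is treated as 0, so its gradient coordinate must come out exactly 0.0. A toy end-to-end check of that chain (sample variance here; the summarizers use weighted moments):

```scala
object ZeroStdSketch {
  def columnStd(rows: Seq[Array[Double]]): Array[Double] = {
    val n = rows.size.toDouble
    Array.tabulate(rows.head.length) { j =>
      val mean = rows.map(_(j)).sum / n
      math.sqrt(rows.map(r => math.pow(r(j) - mean, 2)).sum / (n - 1))
    }
  }

  def main(args: Array[String]): Unit = {
    // First column constant, like instancesConstantFeature above.
    val rows = Seq(Array(1.0, 2.0), Array(1.0, 1.0), Array(1.0, 0.5))
    println(columnStd(rows).mkString(",")) // 0.0 for the constant column
    // Any gradient accumulation guarded by std(i) != 0 therefore leaves
    // coordinate 0 untouched, matching gradient(0) === 0.0 in the test.
  }
}
```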
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
index 83718076dde7..e699adcc14c0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {

   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = standardize(Array(
+    instances = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
-    instancesConstantFeature = standardize(Array(
+    )
+    instancesConstantFeature = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
-    ))
-    instancesConstantFeatureFiltered = standardize(Array(
+    )
+    instancesConstantFeatureFiltered = Array(
       Instance(0.0, 0.1, Vectors.dense(2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
-    ))
+    )
   }

   /** Get summary statistics for some data and create a new LogisticAggregator. */
@@ -55,27 +55,13 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       coefficients: Vector,
       fitIntercept: Boolean,
       isMultinomial: Boolean): LogisticAggregator = {
-    val (_, ySummarizer) =
+    val (featuresSummarizer, ySummarizer) =
       DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
     val numClasses = ySummarizer.histogram.length
-    val numFeatures = instances.head.features.size
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
     val bcCoefficients = spark.sparkContext.broadcast(coefficients)
-    new LogisticAggregator(numFeatures, numClasses, fitIntercept, isMultinomial)(bcCoefficients)
-  }
-
-  private def standardize(instances: Array[Instance]): Array[Instance] = {
-    val (featuresSummarizer, _) =
-      DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
-    val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt)
-    val numFeatures = stdArray.length
-    instances.map { case Instance(label, weight, features) =>
-      val standardized = Array.ofDim[Double](numFeatures)
-      features.foreachNonZero { (i, v) =>
-        val std = stdArray(i)
-        if (std != 0) standardized(i) = v / std
-      }
-      Instance(label, weight, Vectors.dense(standardized).compressed)
-    }
+    new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients)
   }

   test("aggregator add method input size") {
@@ -291,24 +277,4 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     validateGradient(aggConstantFeatureBinary.gradient,
       aggConstantFeatureBinaryFiltered.gradient, 1)
   }
-
-  test("add instance block") {
-    val binaryInstances = instances.map { instance =>
-      if (instance.label <= 1.0) instance else Instance(0.0, instance.weight, instance.features)
-    }
-    val coefArray = Array(1.0, 2.0)
-    val intercept = 1.0
-
-    val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)),
-      fitIntercept = true, isMultinomial = false)
-    binaryInstances.foreach(agg.add)
-
-    val agg2 = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)),
-      fitIntercept = true, isMultinomial = false)
-    val block = InstanceBlock.fromInstances(binaryInstances)
-    agg2.add(block)
-
-    assert(agg.loss ~== agg2.loss relTol 1e-8)
-    assert(agg.gradient ~== agg2.gradient relTol 1e-8)
-  }
 }
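One detail worth keeping in mind when reading the LogisticAggregator constructors: coefficients arrive as a single flat vector of numClasses * numFeatures entries, plus numClasses intercepts when fitIntercept is set. The sketch below uses a class-major layout for readability; the actual aggregator's indexing may differ, so treat this as illustrative only:

```scala
object MultinomialMarginsSketch {
  // Per-class margins from a flat, class-major coefficient array with optional
  // intercepts appended after all numClasses * numFeatures weights.
  def margins(coef: Array[Double], numClasses: Int, numFeatures: Int,
              fitIntercept: Boolean, x: Array[Double]): Array[Double] =
    Array.tabulate(numClasses) { k =>
      var m = if (fitIntercept) coef(numClasses * numFeatures + k) else 0.0
      var j = 0
      while (j < numFeatures) { m += coef(k * numFeatures + j) * x(j); j += 1 }
      m
    }

  def main(args: Array[String]): Unit = {
    // 3 classes x 2 features + 3 intercepts = 9 parameters, the same shape the
    // multinomial test builds with Vectors.sparse(6, ...) plus intercepts.
    val coef = Array(1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.1, 0.2, 0.3)
    println(margins(coef, 3, 2, fitIntercept = true, Array(2.0, 3.0)).mkString(","))
  }
}
```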
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 5ee42318afd4..5ab8e606bda0 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -165,8 +165,7 @@ def predictProbability(self, value):

 class _LinearSVCParams(_JavaClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol,
-                       HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold,
-                       HasBlockSize):
+                       HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold):
     """
     Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`.
@@ -215,8 +214,6 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
     LinearSVCModel...
     >>> model.getThreshold()
     0.5
-    >>> model.getBlockSize()
-    1024
     >>> model.coefficients
     DenseVector([0.0, -0.2792, -0.1833])
     >>> model.intercept
@@ -255,19 +252,18 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
     def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                 aggregationDepth=2, blockSize=1024):
+                 aggregationDepth=2):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                 aggregationDepth=2, blockSize=1024):
+                 aggregationDepth=2):
         """
         super(LinearSVC, self).__init__()
         self._java_obj = self._new_java_obj(
             "org.apache.spark.ml.classification.LinearSVC", self.uid)
         self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True,
-                         standardization=True, threshold=0.0, aggregationDepth=2,
-                         blockSize=1024)
+                         standardization=True, threshold=0.0, aggregationDepth=2)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)

@@ -276,12 +272,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                  aggregationDepth=2, blockSize=1024):
+                  aggregationDepth=2):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                  aggregationDepth=2, blockSize=1024):
+                  aggregationDepth=2):
         Sets params for Linear SVM Classifier.
         """
         kwargs = self._input_kwargs
@@ -346,13 +342,6 @@ def setAggregationDepth(self, value):
         """
         return self._set(aggregationDepth=value)

-    @since("3.0.0")
-    def setBlockSize(self, value):
-        """
-        Sets the value of :py:attr:`blockSize`.
-        """
-        return self._set(blockSize=value)
-

 class LinearSVCModel(JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable):
     """
@@ -388,7 +377,7 @@ def intercept(self):

 class _LogisticRegressionParams(_JavaProbabilisticClassifierParams, HasRegParam,
                                 HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol,
                                 HasStandardization, HasWeightCol, HasAggregationDepth,
-                                HasThreshold, HasBlockSize):
+                                HasThreshold):
     """
     Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`.
- """ - return self._set(blockSize=value) - class LinearSVCModel(JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable): """ @@ -388,7 +377,7 @@ def intercept(self): class _LogisticRegressionParams(_JavaProbabilisticClassifierParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, HasAggregationDepth, - HasThreshold, HasBlockSize): + HasThreshold): """ Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`. @@ -570,8 +559,6 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams, 10 >>> blor.clear(blor.maxIter) >>> blorModel = blor.fit(bdf) - >>> blorModel.getBlockSize() - 1024 >>> blorModel.setFeaturesCol("features") LogisticRegressionModel... >>> blorModel.setProbabilityCol("newProbability") @@ -640,7 +627,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ @@ -649,14 +636,13 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): If the threshold and thresholds Params are both set, they must be equivalent. """ super(LogisticRegression, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LogisticRegression", self.uid) - self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto", - blockSize=1024) + self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto") kwargs = self._input_kwargs self.setParams(**kwargs) self._checkThresholdConsistency() @@ -669,7 +655,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ @@ -677,7 +663,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): Sets params for logistic regression. If the threshold and thresholds Params are both set, they must be equivalent. 
""" @@ -772,13 +758,6 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) - @since("3.0.0") - def setBlockSize(self, value): - """ - Sets the value of :py:attr:`blockSize`. - """ - return self._set(blockSize=value) - class LogisticRegressionModel(JavaProbabilisticClassificationModel, _LogisticRegressionParams, JavaMLWritable, JavaMLReadable, HasTrainingSummary): @@ -2174,7 +2153,7 @@ def sigma(self): class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter, - HasTol, HasStepSize, HasSolver, HasBlockSize): + HasTol, HasStepSize, HasSolver): """ Params for :py:class:`MultilayerPerceptronClassifier`. @@ -2185,6 +2164,11 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H "E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " + "neurons and output layer of 10 neurons.", typeConverter=TypeConverters.toListInt) + blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " + + "matrices. Data is stacked within partitions. If block size is more than " + + "remaining data in a partition then it is adjusted to the size of this " + + "data. Recommended size is between 10 and 1000, default is 128.", + typeConverter=TypeConverters.toInt) solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " + "options: l-bfgs, gd.", typeConverter=TypeConverters.toString) initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.", @@ -2197,6 +2181,13 @@ def getLayers(self): """ return self.getOrDefault(self.layers) + @since("1.6.0") + def getBlockSize(self): + """ + Gets the value of blockSize or its default value. + """ + return self.getOrDefault(self.blockSize) + @since("2.0.0") def getInitialWeights(self): """ @@ -2220,17 +2211,11 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer ... (1.0, Vectors.dense([0.0, 1.0])), ... (1.0, Vectors.dense([1.0, 0.0])), ... (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"]) - >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123) + >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123) >>> mlp.setMaxIter(100) MultilayerPerceptronClassifier... >>> mlp.getMaxIter() 100 - >>> mlp.getBlockSize() - 128 - >>> mlp.setBlockSize(1) - MultilayerPerceptronClassifier... - >>> mlp.getBlockSize() - 1 >>> model = mlp.fit(df) >>> model.setFeaturesCol("features") MultilayerPerceptronClassificationModel... diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index fb4d55d57a2d..ded3ca84b30f 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -164,10 +164,7 @@ def get$Name(self): "'euclidean'", "TypeConverters.toString"), ("validationIndicatorCol", "name of the column that indicates whether each row is for " + "training or for validation. False indicates training; true indicates validation.", - None, "TypeConverters.toString"), - ("blockSize", "block size for stacking input data in matrices. Data is stacked within " - "partitions. 
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 456463580878..8fc115691f1a 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -580,21 +580,3 @@ def getValidationIndicatorCol(self):
         Gets the value of validationIndicatorCol or its default value.
         """
         return self.getOrDefault(self.validationIndicatorCol)
-
-
-class HasBlockSize(Params):
-    """
-    Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
-    """
-
-    blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt)
-
-    def __init__(self):
-        super(HasBlockSize, self).__init__()
-        self._setDefault(blockSize=1024)
-
-    def getBlockSize(self):
-        """
-        Gets the value of blockSize or its default value.
-        """
-        return self.getOrDefault(self.blockSize)
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index fe571e25c05f..ee276962c898 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@

 @inherit_doc
-class _ALSModelParams(HasPredictionCol, HasBlockSize):
+class _ALSModelParams(HasPredictionCol):
     """
     Params for :py:class:`ALS` and :py:class:`ALSModel`.
@@ -223,8 +223,6 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
     0.1
     >>> als.clear(als.regParam)
     >>> model = als.fit(df)
-    >>> model.getBlockSize()
-    4096
     >>> model.getUserCol()
     'user'
     >>> model.setUserCol("user")
@@ -284,13 +282,13 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                  ratingCol="rating", nonnegative=False, checkpointInterval=10,
                  intermediateStorageLevel="MEMORY_AND_DISK",
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
         """
         __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=false, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", lockSize=4096)
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         """
         super(ALS, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
@@ -298,8 +296,7 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
                          implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
                          ratingCol="rating", nonnegative=False, checkpointInterval=10,
                          intermediateStorageLevel="MEMORY_AND_DISK",
-                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
-                         blockSize=4096)
+                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         kwargs = self._input_kwargs
         self.setParams(**kwargs)

@@ -309,13 +306,13 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItem
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                   ratingCol="rating", nonnegative=False, checkpointInterval=10,
                   intermediateStorageLevel="MEMORY_AND_DISK",
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
         """
         setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                   ratingCol="rating", nonnegative=False, checkpointInterval=10, \
                   intermediateStorageLevel="MEMORY_AND_DISK", \
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         Sets params for ALS.
         """
         kwargs = self._input_kwargs
@@ -446,13 +443,6 @@ def setSeed(self, value):
         """
         return self._set(seed=value)

-    @since("3.0.0")
-    def setBlockSize(self, value):
-        """
-        Sets the value of :py:attr:`blockSize`.
-        """
-        return self._set(blockSize=value)
-

 class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
     """
@@ -489,13 +479,6 @@ def setPredictionCol(self, value):
         """
         return self._set(predictionCol=value)

-    @since("3.0.0")
-    def setBlockSize(self, value):
-        """
-        Sets the value of :py:attr:`blockSize`.
- """ - return self._set(blockSize=value) - @property @since("1.4.0") def rank(self): diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index a74ba3485bcd..a4c97827d55d 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -62,7 +62,7 @@ class JavaRegressionModel(JavaPredictionModel, _JavaPredictorParams): class _LinearRegressionParams(_JavaPredictorParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasTol, HasFitIntercept, HasStandardization, HasWeightCol, HasSolver, - HasAggregationDepth, HasLoss, HasBlockSize): + HasAggregationDepth, HasLoss): """ Params for :py:class:`LinearRegression` and :py:class:`LinearRegressionModel`. @@ -124,8 +124,6 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J >>> lr.setRegParam(0.0) LinearRegression... >>> model = lr.fit(df) - >>> model.getBlockSize() - 1024 >>> model.setFeaturesCol("features") LinearRegressionModel... >>> model.setPredictionCol("newPrediction") @@ -171,18 +169,17 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2, - loss="squaredError", epsilon=1.35, blockSize=1024): + loss="squaredError", epsilon=1.35): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \ - loss="squaredError", epsilon=1.35, blockSize=1024) + loss="squaredError", epsilon=1.35) """ super(LinearRegression, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.regression.LinearRegression", self.uid) - self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35, - blockSize=1024) + self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -191,12 +188,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2, - loss="squaredError", epsilon=1.35, blockSize=1024): + loss="squaredError", epsilon=1.35): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \ - loss="squaredError", epsilon=1.35, blockSize=1024) + loss="squaredError", epsilon=1.35) Sets params for linear regression. """ kwargs = self._input_kwargs @@ -272,13 +269,6 @@ def setLoss(self, value): """ return self._set(lossType=value) - @since("3.0.0") - def setBlockSize(self, value): - """ - Sets the value of :py:attr:`blockSize`. - """ - return self._set(blockSize=value) - class LinearRegressionModel(JavaRegressionModel, _LinearRegressionParams, GeneralJavaMLWritable, JavaMLReadable, HasTrainingSummary):