From 25f268880940079693c984c1952b75b74cbd1b56 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 12 Oct 2020 10:38:18 +0800 Subject: [PATCH 01/11] resend --- .../spark/ml/classification/LinearSVC.scala | 91 ++++++------------- .../apache/spark/ml/feature/Instance.scala | 78 ++++++++++++++++ .../ml/param/shared/SharedParamsCodeGen.scala | 7 +- .../spark/ml/param/shared/sharedParams.scala | 18 ++++ .../ml/classification/LinearSVCSuite.scala | 4 +- .../spark/ml/feature/InstanceSuite.scala | 32 +++++++ python/pyspark/ml/classification.py | 26 +++--- python/pyspark/ml/classification.pyi | 9 +- .../ml/param/_shared_params_code_gen.py | 6 +- python/pyspark/ml/param/shared.py | 18 ++++ python/pyspark/ml/param/shared.pyi | 5 + 11 files changed, 209 insertions(+), 85 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 77272c65eb23..21df75eeca7e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasAggregationDepth with HasThreshold with HasBlockSize { + with HasAggregationDepth with HasThreshold with HasBlockSizeInMB { /** * Param for threshold in binary classification prediction. @@ -57,7 +57,7 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR "threshold in binary classification prediction applied to rawPrediction") setDefault(regParam -> 0.0, maxIter -> 100, fitIntercept -> true, tol -> 1E-6, - standardization -> true, threshold -> 0.0, aggregationDepth -> 2, blockSize -> 1) + standardization -> true, threshold -> 0.0, aggregationDepth -> 2, blockSizeInMB -> 0.0) } /** @@ -153,22 +153,13 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) /** - * Set block size for stacking input data in matrices. - * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; - * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines - * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). - * Recommended size is between 10 and 1000. An appropriate choice of the block size depends - * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example, - * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads). - * Note that existing BLAS implementations are mainly optimized for dense matrices, if the - * input dataset is sparse, stacking may bring no performance gain, the worse is possible - * performance regression. - * Default is 1. + * Sets the value of param [[blockSizeInMB]]. + * Default is 0.0. 
* * @group expertSetParam */ @Since("3.1.0") - def setBlockSize(value: Int): this.type = set(blockSize, value) + def setBlockSizeInMB(value: Double): this.type = set(blockSizeInMB, value) @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) @@ -177,17 +168,20 @@ class LinearSVC @Since("2.2.0") ( instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, - regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize) + regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, + blockSizeInMB) + + if (dataset.storageLevel != StorageLevel.NONE) { + instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + + s"then cached during training. Be careful of double caching!") + } val instances = extractInstances(dataset) .setName("training instances") - if (dataset.storageLevel == StorageLevel.NONE && $(blockSize) == 1) { - instances.persist(StorageLevel.MEMORY_AND_DISK) - } - + var actualBlockSizeInMB = $(blockSizeInMB) var requestedMetrics = Seq("mean", "std", "count") - if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros" + if (actualBlockSizeInMB == 0) requestedMetrics +:= "numNonZeros" val (summarizer, labelSummarizer) = Summarizer .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics) @@ -199,14 +193,11 @@ class LinearSVC @Since("2.2.0") ( instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) instr.logSumOfWeights(summarizer.weightSum) - if ($(blockSize) > 1) { - val scale = 1.0 / summarizer.count / numFeatures - val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum - instr.logNamedValue("sparsity", sparsity.toString) - if (sparsity > 0.5) { - instr.logWarning(s"sparsity of input dataset is $sparsity, " + - s"which may hurt performance in high-level BLAS.") - } + if (actualBlockSizeInMB == 0) { + val avgNNZ = summarizer.numNonzeros.activeIterator.map(_._2 / summarizer.count).sum + actualBlockSizeInMB = InstanceBlock.inferBlockSizeInMB(numFeatures, avgNNZ) + require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0") + instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString) } val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match { @@ -245,12 +236,8 @@ class LinearSVC @Since("2.2.0") ( Note that the intercept in scaled space and original space is the same; as a result, no scaling is needed. */ - val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) { - trainOnRows(instances, featuresStd, regularization, optimizer) - } else { - trainOnBlocks(instances, featuresStd, regularization, optimizer) - } - if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() + val (rawCoefficients, objectiveHistory) = + trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer) if (rawCoefficients == null) { val msg = s"${optimizer.getClass.getName} failed." 
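A minimal, self-contained sketch of the size-capped stacking that this patch moves to may help as context for the trainImpl change below: rows are appended to a buffer until an estimated block size reaches the byte budget, so a block may slightly overshoot the budget but always holds at least one row. The Row/Block types and the 8-bytes-per-double size model here are simplifications for illustration only, not the Instance/InstanceBlock/getBlockSize code in this patch.

import scala.collection.mutable.ArrayBuffer

object BlockifySketch {
  final case class Row(label: Double, weight: Double, features: Array[Double])
  final case class Block(rows: Seq[Row])

  // Assumed size model: 8 bytes per double for the stacked features,
  // plus label and weight per row (the patch uses Matrices.getDenseSize/getSparseSize).
  private def estimatedBytes(numRows: Long, numCols: Long): Long =
    8L * numRows * (numCols + 2L)

  // Keep appending rows until the estimated size reaches the budget; a block may
  // slightly exceed maxMemUsage, and an oversized single row still forms a block.
  def blockify(rows: Iterator[Row], maxMemUsage: Long): Iterator[Block] =
    new Iterator[Block] {
      override def hasNext: Boolean = rows.hasNext
      override def next(): Block = {
        val buff = ArrayBuffer.empty[Row]
        var numCols = -1L
        var usage = 0L
        while (rows.hasNext && usage < maxMemUsage) {
          val row = rows.next()
          if (numCols < 0L) numCols = row.features.length.toLong
          require(numCols == row.features.length, "all rows must have the same dimension")
          buff += row
          usage = estimatedBytes(buff.length.toLong, numCols)
        }
        Block(buff.toSeq)
      }
    }

  def main(args: Array[String]): Unit = {
    val rows = Iterator.tabulate(10)(i => Row((i % 2).toDouble, 1.0, Array.fill(100)(1.0)))
    // ~0.8 KB per row with a 2 KB budget: blocks of 3, 3, 3 and 1 rows.
    blockify(rows, 2 * 1024).zipWithIndex.foreach { case (block, i) =>
      println(s"block $i holds ${block.rows.length} rows")
    }
  }
}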
@@ -284,35 +271,9 @@ class LinearSVC @Since("2.2.0") ( model.setSummary(Some(summary)) } - private def trainOnRows( - instances: RDD[Instance], - featuresStd: Array[Double], - regularization: Option[L2Regularization], - optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = { - val numFeatures = featuresStd.length - val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures - - val bcFeaturesStd = instances.context.broadcast(featuresStd) - val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) - val costFun = new RDDLossFunction(instances, getAggregatorFunc, - regularization, $(aggregationDepth)) - - val states = optimizer.iterations(new CachedDiffFunction(costFun), - Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector) - - val arrayBuilder = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null - while (states.hasNext) { - state = states.next() - arrayBuilder += state.adjustedValue - } - bcFeaturesStd.destroy() - - (if (state != null) state.x.toArray else null, arrayBuilder.result) - } - - private def trainOnBlocks( + private def trainImpl( instances: RDD[Instance], + actualBlockSizeInMB: Double, featuresStd: Array[Double], regularization: Option[L2Regularization], optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = { @@ -326,9 +287,11 @@ class LinearSVC @Since("2.2.0") ( val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } - val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + + val maxMemoryUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong + val blocks = InstanceBlock.blokifyWithMaxMemoryUsage(standardized, maxMemoryUsage) .persist(StorageLevel.MEMORY_AND_DISK) - .setName(s"training blocks (blockSize=${$(blockSize)})") + .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)") val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_) val costFun = new RDDLossFunction(blocks, getAggregatorFunc, diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index db5f88d5dddc..cdbd8ff11296 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.feature +import scala.collection.mutable + import org.apache.spark.ml.linalg._ import org.apache.spark.rdd.RDD @@ -100,6 +102,23 @@ private[spark] case class InstanceBlock( private[spark] object InstanceBlock { + private def getBlockSize( + numCols: Long, + numRows: Long, + nnz: Long, + allUnitWeight: Boolean): Long = { + val doubleBytes = java.lang.Double.BYTES + val arrayHeader = 12L + val denseSize = Matrices.getDenseSize(numCols, numRows) + val sparseSize = Matrices.getSparseSize(nnz, numRows + 1) + val matrixSize = math.min(denseSize, sparseSize) + if (allUnitWeight) { + matrixSize + doubleBytes * numRows + arrayHeader * 2 + } else { + matrixSize + doubleBytes * numRows * 2 + arrayHeader * 2 + } + } + def fromInstances(instances: Seq[Instance]): InstanceBlock = { val labels = instances.map(_.label).toArray val weights = if (instances.exists(_.weight != 1)) { @@ -114,6 +133,65 @@ private[spark] object InstanceBlock { def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = { 
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances)) } + + def blokifyWithMaxMemoryUsage( + iterator: Iterator[Instance], + maxMemoryUsage: Long): Iterator[InstanceBlock] = { + require(maxMemoryUsage > 0) + val buff = mutable.ArrayBuilder.make[Instance] + var numCols = -1L + var count = 0L + var nnz = 0L + var allUnitWeight = true + + iterator.flatMap { instance => + if (numCols < 0L) numCols = instance.features.size + require(numCols == instance.features.size) + val n = instance.features.numNonzeros + var block = Option.empty[InstanceBlock] + // Check if enough memory remains to add this instance to the block. + if (getBlockSize(numCols, count + 1L, nnz + n, + allUnitWeight && (instance.weight == 1)) > maxMemoryUsage) { + // Check if this instance is too large + require(count > 0, s"instance $instance exceeds memory limit $maxMemoryUsage, " + + s"please increase block size") + + block = Some(InstanceBlock.fromInstances(buff.result())) + buff.clear() + count = 0L + nnz = 0L + allUnitWeight = true + } + buff += instance + count += 1L + nnz += n + allUnitWeight &&= (instance.weight == 1) + block.iterator + } ++ { + val instances = buff.result() + if (instances.nonEmpty) { + Iterator.single(InstanceBlock.fromInstances(instances)) + } else Iterator.empty + } + } + + def blokifyWithMaxMemoryUsage( + instances: RDD[Instance], + maxMemoryUsage: Long): RDD[InstanceBlock] = { + require(maxMemoryUsage > 0) + instances.mapPartitions(iter => blokifyWithMaxMemoryUsage(iter, maxMemoryUsage)) + } + + def inferBlockSizeInMB( + dim: Int, + avgNNZ: Double, + blasLevel: Int = 2): Double = { + if (dim <= avgNNZ * 3) { + 0.25 + } else { + 64.0 + } + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 7fd5f5938b56..64261bdfac7d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -108,7 +108,12 @@ private[shared] object SharedParamsCodeGen { ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " + "stacked within partitions. If block size is more than remaining data in a partition " + "then it is adjusted to the size of this data.", - isValid = "ParamValidators.gt(0)", isExpertParam = true) + isValid = "ParamValidators.gt(0)", isExpertParam = true), + ParamDesc[Double]("blockSizeInMB", "Maximum memory in MB for stacking input data " + + "in blocks. Data is stacked within partitions. If more than remaining data size in a " + + "partition then it is adjusted to the data size. If 0, try to infer an appropriate value " + + "based on the statistics of dataset. Must be >= 0.", + Some("0.0"), isValid = "ParamValidators.gtEq(0.0)", isExpertParam = true) ) val code = genSharedParams(params) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 60203eba61ea..1c741545dade 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -562,4 +562,22 @@ trait HasBlockSize extends Params { /** @group expertGetParam */ final def getBlockSize: Int = $(blockSize) } + +/** + * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or + * removed between minor versions. 
+ */ +trait HasBlockSizeInMB extends Params { + + /** + * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.. + * @group expertParam + */ + final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB", "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", ParamValidators.gtEq(0.0)) + + setDefault(blockSizeInMB, 0.0) + + /** @group expertGetParam */ + final def getBlockSizeInMB: Double = $(blockSizeInMB) +} // scalastyle:on diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index a66397324c1a..55558f06ee36 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -214,8 +214,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { .setFitIntercept(fitIntercept) .setMaxIter(5) val model = lsvc.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = lsvc.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = lsvc.setBlockSizeInMB(s).fit(dataset) assert(model.intercept ~== model2.intercept relTol 1e-9) assert(model.coefficients ~== model2.coefficients relTol 1e-9) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index d780bdf5f5dc..a861aa8f8301 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -74,4 +74,36 @@ class InstanceSuite extends SparkFunSuite{ } } + test("InstanceBlock: blokify with max memory usage") { + val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)) + val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse) + val instances = Seq(instance1, instance2) + + val blocks = InstanceBlock + .blokifyWithMaxMemoryUsage(Iterator.apply(instance1, instance2), 128).toArray + require(blocks.length == 1) + val block = blocks.head + assert(block.size === 2) + assert(block.numFeatures === 2) + block.instanceIterator.zipWithIndex.foreach { + case (instance, i) => + assert(instance.label === instances(i).label) + assert(instance.weight === instances(i).weight) + assert(instance.features.toArray === instances(i).features.toArray) + } + Seq(0, 1).foreach { i => + val nzIter = block.getNonZeroIter(i) + val vec = Vectors.sparse(2, nzIter.toSeq) + assert(vec.toArray === instances(i).features.toArray) + } + + val bigInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(10000)(1.0))) + val inputIter1 = Iterator.apply(bigInstance) + val inputIter2 = Iterator.apply(instance1, instance2, bigInstance) + Seq(inputIter1, inputIter2).foreach { inputIter => + intercept[IllegalArgumentException] { + InstanceBlock.blokifyWithMaxMemoryUsage(inputIter, 1024).toArray + } + } + } } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index d6c861361a24..f96bbd4d3357 100644 --- 
a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -26,8 +26,8 @@ from pyspark.ml import Estimator, Predictor, PredictionModel, Model from pyspark.ml.param.shared import HasRawPredictionCol, HasProbabilityCol, HasThresholds, \ HasRegParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, \ - HasAggregationDepth, HasThreshold, HasBlockSize, Param, Params, TypeConverters, \ - HasElasticNetParam, HasSeed, HasStepSize, HasSolver, HasParallelism + HasAggregationDepth, HasThreshold, HasBlockSize, HasBlockSizeInMB, Param, Params, \ + TypeConverters, HasElasticNetParam, HasSeed, HasStepSize, HasSolver, HasParallelism from pyspark.ml.tree import _DecisionTreeModel, _DecisionTreeParams, \ _TreeEnsembleModel, _RandomForestParams, _GBTParams, \ _HasVarianceImpurity, _TreeClassifierParams @@ -504,7 +504,7 @@ def recallByThreshold(self): class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold, - HasBlockSize): + HasBlockSizeInMB): """ Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`. @@ -521,7 +521,7 @@ def __init__(self, *args): super(_LinearSVCParams, self).__init__(*args) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, threshold=0.0, aggregationDepth=2, - blockSize=1) + blockSizeInMB=0.0) @inherit_doc @@ -565,8 +565,8 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl LinearSVCModel... >>> model.getThreshold() 0.5 - >>> model.getBlockSize() - 1 + >>> model.getBlockSizeInMB() + 0.0 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) >>> model.intercept @@ -605,12 +605,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSize=1): + aggregationDepth=2, blockSizeInMB=0.0): """ __init__(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSize=1): + aggregationDepth=2, blockSizeInMB=0.0): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( @@ -623,12 +623,12 @@ def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="p def setParams(self, *, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSize=1): + aggregationDepth=2, blockSizeInMB=0.0): """ setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSize=1): + aggregationDepth=2, blockSizeInMB=0.0): Sets params for Linear SVM Classifier. 
""" kwargs = self._input_kwargs @@ -694,11 +694,11 @@ def setAggregationDepth(self, value): return self._set(aggregationDepth=value) @since("3.1.0") - def setBlockSize(self, value): + def setBlockSizeInMB(self, value): """ - Sets the value of :py:attr:`blockSize`. + Sets the value of :py:attr:`blockSizeInMB`. """ - return self._set(blockSize=value) + return self._set(blockSizeInMB=value) class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable, diff --git a/python/pyspark/ml/classification.pyi b/python/pyspark/ml/classification.pyi index 55afc20a54cb..241f5baf8dfd 100644 --- a/python/pyspark/ml/classification.pyi +++ b/python/pyspark/ml/classification.pyi @@ -26,6 +26,7 @@ from pyspark.ml.base import _PredictorParams from pyspark.ml.param.shared import ( HasAggregationDepth, HasBlockSize, + HasBlockSizeInMB, HasElasticNetParam, HasFitIntercept, HasMaxIter, @@ -172,7 +173,7 @@ class _LinearSVCParams( HasWeightCol, HasAggregationDepth, HasThreshold, - HasBlockSize, + HasBlockSizeInMB, ): threshold: Param[float] def __init__(self, *args: Any) -> None: ... @@ -198,7 +199,7 @@ class LinearSVC( threshold: float = ..., weightCol: Optional[str] = ..., aggregationDepth: int = ..., - blockSize: int = ... + blockSizeInMB: float = ... ) -> None: ... def setParams( self, @@ -215,7 +216,7 @@ class LinearSVC( threshold: float = ..., weightCol: Optional[str] = ..., aggregationDepth: int = ..., - blockSize: int = ... + blockSizeInMB: float = ... ) -> LinearSVC: ... def setMaxIter(self, value: int) -> LinearSVC: ... def setRegParam(self, value: float) -> LinearSVC: ... @@ -225,7 +226,7 @@ class LinearSVC( def setThreshold(self, value: float) -> LinearSVC: ... def setWeightCol(self, value: str) -> LinearSVC: ... def setAggregationDepth(self, value: int) -> LinearSVC: ... - def setBlockSize(self, value: int) -> LinearSVC: ... + def setBlockSizeInMB(self, value: float) -> LinearSVC: ... class LinearSVCModel( _JavaClassificationModel[Vector], diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index bc1ea87ad629..b6fc170abe78 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -165,7 +165,11 @@ def get$Name(self): None, "TypeConverters.toString"), ("blockSize", "block size for stacking input data in matrices. Data is stacked within " "partitions. If block size is more than remaining data in a partition then it is " - "adjusted to the size of this data.", None, "TypeConverters.toInt")] + "adjusted to the size of this data.", None, "TypeConverters.toInt"), + ("blockSizeInMB", "maximum memory in MB for stacking input data in blocks. Data is " + + "stacked within partitions. If more than remaining data size in a partition then it " + + "is adjusted to the data size. If 0, try to infer an appropriate value based on the " + + "statistics of dataset. Must be >= 0.", "0.0", "TypeConverters.toFloat")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 24fb0d3e2554..a829a2e76b38 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -597,3 +597,21 @@ def getBlockSize(self): Gets the value of blockSize or its default value. """ return self.getOrDefault(self.blockSize) + + +class HasBlockSizeInMB(Params): + """ + Mixin for param blockSizeInMB: maximum memory in MB for stacking input data in blocks. 
Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0. + """ + + blockSizeInMB = Param(Params._dummy(), "blockSizeInMB", "maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", typeConverter=TypeConverters.toFloat) + + def __init__(self): + super(HasBlockSizeInMB, self).__init__() + self._setDefault(blockSizeInMB=0.0) + + def getBlockSizeInMB(self): + """ + Gets the value of blockSizeInMB or its default value. + """ + return self.getOrDefault(self.blockSizeInMB) diff --git a/python/pyspark/ml/param/shared.pyi b/python/pyspark/ml/param/shared.pyi index 5999c0eaa466..bbb4890455de 100644 --- a/python/pyspark/ml/param/shared.pyi +++ b/python/pyspark/ml/param/shared.pyi @@ -185,3 +185,8 @@ class HasBlockSize(Params): blockSize: Param[int] def __init__(self) -> None: ... def getBlockSize(self) -> int: ... + +class HasBlockSizeInMB(Params): + blockSizeInMB: Param[float] + def __init__(self) -> None: ... + def getBlockSizeInMB(self) -> float: ... From a862d4813deff12556ea123081dfb996dcebec09 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 13 Oct 2020 09:16:09 +0800 Subject: [PATCH 02/11] address comments --- .../spark/ml/classification/LinearSVC.scala | 4 +-- .../apache/spark/ml/feature/Instance.scala | 35 +++++++++---------- .../spark/ml/feature/InstanceSuite.scala | 4 +-- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 21df75eeca7e..6ce1c9893b22 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -288,8 +288,8 @@ class LinearSVC @Since("2.2.0") ( iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } - val maxMemoryUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong - val blocks = InstanceBlock.blokifyWithMaxMemoryUsage(standardized, maxMemoryUsage) + val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong + val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage) .persist(StorageLevel.MEMORY_AND_DISK) .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)") diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index cdbd8ff11296..9db44f28e445 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -102,7 +102,7 @@ private[spark] case class InstanceBlock( private[spark] object InstanceBlock { - private def getBlockSize( + private def getBlockMemUsage( numCols: Long, numRows: Long, nnz: Long, @@ -134,52 +134,51 @@ private[spark] object InstanceBlock { instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances)) } - def blokifyWithMaxMemoryUsage( + def blokifyWithMaxMemUsage( iterator: Iterator[Instance], - maxMemoryUsage: Long): Iterator[InstanceBlock] = { - require(maxMemoryUsage > 0) + maxMemUsage: Long): Iterator[InstanceBlock] = { + require(maxMemUsage > 0) val buff = 
mutable.ArrayBuilder.make[Instance] var numCols = -1L var count = 0L - var nnz = 0L + var buffNnz = 0L var allUnitWeight = true iterator.flatMap { instance => if (numCols < 0L) numCols = instance.features.size require(numCols == instance.features.size) - val n = instance.features.numNonzeros + val nnz = instance.features.numNonzeros var block = Option.empty[InstanceBlock] // Check if enough memory remains to add this instance to the block. - if (getBlockSize(numCols, count + 1L, nnz + n, - allUnitWeight && (instance.weight == 1)) > maxMemoryUsage) { + if (getBlockMemUsage(numCols, count + 1L, buffNnz + nnz, + allUnitWeight && (instance.weight == 1)) > maxMemUsage) { // Check if this instance is too large - require(count > 0, s"instance $instance exceeds memory limit $maxMemoryUsage, " + + require(count > 0, s"instance $instance exceeds memory limit $maxMemUsage, " + s"please increase block size") block = Some(InstanceBlock.fromInstances(buff.result())) buff.clear() count = 0L - nnz = 0L + buffNnz = 0L allUnitWeight = true } buff += instance count += 1L - nnz += n + buffNnz += nnz allUnitWeight &&= (instance.weight == 1) block.iterator } ++ { - val instances = buff.result() - if (instances.nonEmpty) { - Iterator.single(InstanceBlock.fromInstances(instances)) + if (count > 0) { + Iterator.single(InstanceBlock.fromInstances(buff.result())) } else Iterator.empty } } - def blokifyWithMaxMemoryUsage( + def blokifyWithMaxMemUsage( instances: RDD[Instance], - maxMemoryUsage: Long): RDD[InstanceBlock] = { - require(maxMemoryUsage > 0) - instances.mapPartitions(iter => blokifyWithMaxMemoryUsage(iter, maxMemoryUsage)) + maxMemUsage: Long): RDD[InstanceBlock] = { + require(maxMemUsage > 0) + instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage)) } def inferBlockSizeInMB( diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index a861aa8f8301..da29ebb8509c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -80,7 +80,7 @@ class InstanceSuite extends SparkFunSuite{ val instances = Seq(instance1, instance2) val blocks = InstanceBlock - .blokifyWithMaxMemoryUsage(Iterator.apply(instance1, instance2), 128).toArray + .blokifyWithMaxMemUsage(Iterator.apply(instance1, instance2), 128).toArray require(blocks.length == 1) val block = blocks.head assert(block.size === 2) @@ -102,7 +102,7 @@ class InstanceSuite extends SparkFunSuite{ val inputIter2 = Iterator.apply(instance1, instance2, bigInstance) Seq(inputIter1, inputIter2).foreach { inputIter => intercept[IllegalArgumentException] { - InstanceBlock.blokifyWithMaxMemoryUsage(inputIter, 1024).toArray + InstanceBlock.blokifyWithMaxMemUsage(inputIter, 1024).toArray } } } From 9f96739da838864be90b89115125f84a5c42fa03 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 13 Oct 2020 17:29:44 +0800 Subject: [PATCH 03/11] directly gen new Iter --- .../apache/spark/ml/feature/Instance.scala | 76 ++++++++++++------- 1 file changed, 48 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 9db44f28e445..6724dc6d1c6c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -138,39 +138,59 @@ private[spark] object InstanceBlock { 
iterator: Iterator[Instance], maxMemUsage: Long): Iterator[InstanceBlock] = { require(maxMemUsage > 0) - val buff = mutable.ArrayBuilder.make[Instance] - var numCols = -1L - var count = 0L - var buffNnz = 0L - var allUnitWeight = true - - iterator.flatMap { instance => - if (numCols < 0L) numCols = instance.features.size - require(numCols == instance.features.size) - val nnz = instance.features.numNonzeros - var block = Option.empty[InstanceBlock] - // Check if enough memory remains to add this instance to the block. - if (getBlockMemUsage(numCols, count + 1L, buffNnz + nnz, - allUnitWeight && (instance.weight == 1)) > maxMemUsage) { - // Check if this instance is too large - require(count > 0, s"instance $instance exceeds memory limit $maxMemUsage, " + - s"please increase block size") + new Iterator[InstanceBlock] { + private var numCols = -1L + private val buff = mutable.ArrayBuilder.make[Instance] + private var buffCnt = 0L + private var buffNnz = 0L + private var buffUnitWeight = true + private var block = Option.empty[InstanceBlock] + + private def flush(): Unit = { block = Some(InstanceBlock.fromInstances(buff.result())) buff.clear() - count = 0L + buffCnt = 0L buffNnz = 0L - allUnitWeight = true + buffUnitWeight = true + } + + private def blockify(): Unit = { + block = None + + while (block.isEmpty && iterator.hasNext) { + val instance = iterator.next() + if (numCols < 0L) numCols = instance.features.size + require(numCols == instance.features.size) + val nnz = instance.features.numNonzeros + + // Check if enough memory remains to add this instance to the block. + if (getBlockMemUsage(numCols, buffCnt + 1L, buffNnz + nnz, + buffUnitWeight && (instance.weight == 1)) > maxMemUsage) { + // Check if this instance is too large + require(buffCnt > 0, s"instance $instance exceeds memory limit $maxMemUsage, " + + s"please increase block size") + flush() + } + + buff += instance + buffCnt += 1L + buffNnz += nnz + buffUnitWeight &&= (instance.weight == 1) + } + + if (block.isEmpty && buffCnt > 0) flush() + } + + override def hasNext: Boolean = { + block.nonEmpty || { blockify(); block.nonEmpty } + } + + override def next(): InstanceBlock = { + val ret = block.get + blockify() + ret } - buff += instance - count += 1L - buffNnz += nnz - allUnitWeight &&= (instance.weight == 1) - block.iterator - } ++ { - if (count > 0) { - Iterator.single(InstanceBlock.fromInstances(buff.result())) - } else Iterator.empty } } From 7ceb5dd756cdda82de16eca7bbec81cc0540c9b3 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 13 Oct 2020 17:32:31 +0800 Subject: [PATCH 04/11] directly gen new Iter --- mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 6724dc6d1c6c..a7aa61b59a0e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -187,6 +187,7 @@ private[spark] object InstanceBlock { } override def next(): InstanceBlock = { + if (block.isEmpty) blockify() val ret = block.get blockify() ret From 2493562d6ba21c589392c319c242ce451a7ecda2 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 14 Oct 2020 14:23:19 +0800 Subject: [PATCH 05/11] update blockify strategy --- .../apache/spark/ml/feature/Instance.scala | 47 +++++-------------- .../spark/ml/feature/InstanceSuite.scala | 12 ++--- 2 files changed, 18 insertions(+), 41 
deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index a7aa61b59a0e..c16d020a1ac1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -142,55 +142,32 @@ private[spark] object InstanceBlock { new Iterator[InstanceBlock] { private var numCols = -1L private val buff = mutable.ArrayBuilder.make[Instance] - private var buffCnt = 0L - private var buffNnz = 0L - private var buffUnitWeight = true - private var block = Option.empty[InstanceBlock] - private def flush(): Unit = { - block = Some(InstanceBlock.fromInstances(buff.result())) - buff.clear() - buffCnt = 0L - buffNnz = 0L - buffUnitWeight = true - } + override def hasNext: Boolean = iterator.hasNext - private def blockify(): Unit = { - block = None + override def next(): InstanceBlock = { + buff.clear() + var buffCnt = 0L + var buffNnz = 0L + var buffUnitWeight = true + var blockMemUsage = 0L - while (block.isEmpty && iterator.hasNext) { + while (iterator.hasNext && blockMemUsage < maxMemUsage) { val instance = iterator.next() if (numCols < 0L) numCols = instance.features.size require(numCols == instance.features.size) val nnz = instance.features.numNonzeros - // Check if enough memory remains to add this instance to the block. - if (getBlockMemUsage(numCols, buffCnt + 1L, buffNnz + nnz, - buffUnitWeight && (instance.weight == 1)) > maxMemUsage) { - // Check if this instance is too large - require(buffCnt > 0, s"instance $instance exceeds memory limit $maxMemUsage, " + - s"please increase block size") - flush() - } - buff += instance buffCnt += 1L buffNnz += nnz buffUnitWeight &&= (instance.weight == 1) + blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) } - if (block.isEmpty && buffCnt > 0) flush() - } - - override def hasNext: Boolean = { - block.nonEmpty || { blockify(); block.nonEmpty } - } - - override def next(): InstanceBlock = { - if (block.isEmpty) blockify() - val ret = block.get - blockify() - ret + // the block mem usage may slightly exceed threshold, not a big issue. 
+ // and this ensure even if one row exceed block limit, each block has one row + InstanceBlock.fromInstances(buff.result()) } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index da29ebb8509c..330380d6efe0 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -97,13 +97,13 @@ class InstanceSuite extends SparkFunSuite{ assert(vec.toArray === instances(i).features.toArray) } + // instances larger than maxMemUsage val bigInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(10000)(1.0))) - val inputIter1 = Iterator.apply(bigInstance) - val inputIter2 = Iterator.apply(instance1, instance2, bigInstance) - Seq(inputIter1, inputIter2).foreach { inputIter => - intercept[IllegalArgumentException] { - InstanceBlock.blokifyWithMaxMemUsage(inputIter, 1024).toArray - } + InstanceBlock.blokifyWithMaxMemUsage(Iterator.fill(10)(bigInstance), 64).size + + // different numFeatures + intercept[IllegalArgumentException] { + InstanceBlock.blokifyWithMaxMemUsage(Iterator.apply(instance1, bigInstance), 64).size } } } From 5daa49a1502031ead875d696fe78b89a8cc2761a Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Fri, 16 Oct 2020 16:20:33 +0800 Subject: [PATCH 06/11] address comments --- .../apache/spark/ml/feature/Instance.scala | 14 ++++++++++ .../spark/ml/feature/InstanceSuite.scala | 28 +++++++++++++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index c16d020a1ac1..0a4a14c9f9a6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -179,13 +179,27 @@ private[spark] object InstanceBlock { instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage)) } + + /** + * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation. + * + * @param dim size of vector. + * @param avgNNZ average nnz of vectors. + * @param blasLevel level of BLAS operation. + */ def inferBlockSizeInMB( dim: Int, avgNNZ: Double, blasLevel: Int = 2): Double = { if (dim <= avgNNZ * 3) { + // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical + // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL), + // and fallback to the Java implementation (f2jBLAS) if necessary. + // The suggested value for dense cases is 0.25. 0.25 } else { + // When the dataset is sparse, Spark will use its own Scala implementation. + // The suggested value for sparse cases is 64.0. 
64.0 } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index 330380d6efe0..f1e071357bab 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -98,12 +98,34 @@ class InstanceSuite extends SparkFunSuite{ } // instances larger than maxMemUsage - val bigInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(10000)(1.0))) - InstanceBlock.blokifyWithMaxMemUsage(Iterator.fill(10)(bigInstance), 64).size + val denseInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(1000)(1.0))) + InstanceBlock.blokifyWithMaxMemUsage(Iterator.single(denseInstance), 64).size + InstanceBlock.blokifyWithMaxMemUsage(Iterator.fill(10)(denseInstance), 64).size // different numFeatures intercept[IllegalArgumentException] { - InstanceBlock.blokifyWithMaxMemUsage(Iterator.apply(instance1, bigInstance), 64).size + InstanceBlock.blokifyWithMaxMemUsage(Iterator.apply(instance1, denseInstance), 64).size } + + // nnz = 10 + val sparseInstance = Instance(-2.0, 3.0, + Vectors.sparse(1000, Array.range(0, 1000, 100), Array.fill(10)(0.1))) + + // normally, memory usage of a block does not exceed maxMemUsage too much + val maxMemUsage = 1 << 18 + val mixedIter = Iterator.fill(100)(denseInstance) ++ + Iterator.fill(1000)(sparseInstance) ++ + Iterator.fill(10)(denseInstance) ++ + Iterator.fill(10)(sparseInstance) ++ + Iterator.fill(100)(denseInstance) ++ + Iterator.fill(100)(sparseInstance) + InstanceBlock.blokifyWithMaxMemUsage(mixedIter, maxMemUsage) + .foreach { block => + val doubleBytes = java.lang.Double.BYTES + val arrayHeader = 12L + val blockMemUsage = block.matrix.getSizeInBytes + + (block.labels.length + block.weights.length) * doubleBytes + arrayHeader * 2 + require(blockMemUsage < maxMemUsage * 1.05) + } } } From 823563c1aad156935fb7bda98ca3291e01d187a0 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 19 Oct 2020 18:58:30 +0800 Subject: [PATCH 07/11] try to fix 2.13 --- mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 0a4a14c9f9a6..00b9c9dbcd5e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -153,7 +153,7 @@ private[spark] object InstanceBlock { var blockMemUsage = 0L while (iterator.hasNext && blockMemUsage < maxMemUsage) { - val instance = iterator.next() + val instance: Instance = iterator.next() if (numCols < 0L) numCols = instance.features.size require(numCols == instance.features.size) val nnz = instance.features.numNonzeros From 6d3000b9633af81d53248fe7055fbe8e79ce5413 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 3 Nov 2020 13:28:58 +0800 Subject: [PATCH 08/11] try to fix scala 2.13 --- .../apache/spark/ml/feature/Instance.scala | 56 +++++++++---------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 00b9c9dbcd5e..ca14b4c5acd8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -139,36 +139,34 @@ private[spark] object 
InstanceBlock { maxMemUsage: Long): Iterator[InstanceBlock] = { require(maxMemUsage > 0) - new Iterator[InstanceBlock] { - private var numCols = -1L - private val buff = mutable.ArrayBuilder.make[Instance] - - override def hasNext: Boolean = iterator.hasNext - - override def next(): InstanceBlock = { + var numCols = -1L + val buff = mutable.ArrayBuilder.make[Instance] + var buffCnt = 0L + var buffNnz = 0L + var buffUnitWeight = true + + iterator.flatMap { instance => + if (numCols < 0L) numCols = instance.features.size + require(numCols == instance.features.size) + val nnz = instance.features.numNonzeros + buff += instance + buffCnt += 1L + buffNnz += nnz + buffUnitWeight &&= (instance.weight == 1) + + if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) { + val block = InstanceBlock.fromInstances(buff.result()) buff.clear() - var buffCnt = 0L - var buffNnz = 0L - var buffUnitWeight = true - var blockMemUsage = 0L - - while (iterator.hasNext && blockMemUsage < maxMemUsage) { - val instance: Instance = iterator.next() - if (numCols < 0L) numCols = instance.features.size - require(numCols == instance.features.size) - val nnz = instance.features.numNonzeros - - buff += instance - buffCnt += 1L - buffNnz += nnz - buffUnitWeight &&= (instance.weight == 1) - blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) - } - - // the block mem usage may slightly exceed threshold, not a big issue. - // and this ensure even if one row exceed block limit, each block has one row - InstanceBlock.fromInstances(buff.result()) - } + buffCnt = 0L + buffNnz = 0L + buffUnitWeight = true + Iterator.single(block) + } else Iterator.empty + } ++ { + if (buffCnt > 0) { + val block = InstanceBlock.fromInstances(buff.result()) + Iterator.single(block) + } else Iterator.empty } } From 88004a40ae8ce36e049d951f2cfc37be2e50060a Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 10 Nov 2020 10:26:53 +0800 Subject: [PATCH 09/11] use 1.0 as the default value for gemv --- .../spark/ml/classification/LinearSVC.scala | 10 ++--- .../apache/spark/ml/feature/Instance.scala | 37 +++++-------------- 2 files changed, 14 insertions(+), 33 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 6ce1c9893b22..a2e7b0fadd4c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -179,11 +179,8 @@ class LinearSVC @Since("2.2.0") ( val instances = extractInstances(dataset) .setName("training instances") - var actualBlockSizeInMB = $(blockSizeInMB) - var requestedMetrics = Seq("mean", "std", "count") - if (actualBlockSizeInMB == 0) requestedMetrics +:= "numNonZeros" val (summarizer, labelSummarizer) = Summarizer - .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics) + .getClassificationSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count")) val histogram = labelSummarizer.histogram val numInvalid = labelSummarizer.countInvalid @@ -193,9 +190,10 @@ class LinearSVC @Since("2.2.0") ( instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) instr.logSumOfWeights(summarizer.weightSum) + + var actualBlockSizeInMB = $(blockSizeInMB) if (actualBlockSizeInMB == 0) { - val avgNNZ = summarizer.numNonzeros.activeIterator.map(_._2 
/ summarizer.count).sum - actualBlockSizeInMB = InstanceBlock.inferBlockSizeInMB(numFeatures, avgNNZ) + actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0") instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index ca14b4c5acd8..3c7ffd58e9c9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -102,6 +102,15 @@ private[spark] case class InstanceBlock( private[spark] object InstanceBlock { + /** + * Suggested value for BlockSizeInMB in Level-2 routine cases. + * According to performance tests of BLAS routine (see SPARK-31714) and + * LinearSVC (see SPARK-32907), 1.0 MB should be an acceptable value for + * linear models using Level-2 routine (GEMV) to perform prediction and + * gradient computation. + */ + val DefaultBlockSizeInMB = 1.0 + private def getBlockMemUsage( numCols: Long, numRows: Long, @@ -148,10 +157,9 @@ private[spark] object InstanceBlock { iterator.flatMap { instance => if (numCols < 0L) numCols = instance.features.size require(numCols == instance.features.size) - val nnz = instance.features.numNonzeros buff += instance buffCnt += 1L - buffNnz += nnz + buffNnz += instance.features.numNonzeros buffUnitWeight &&= (instance.weight == 1) if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) { @@ -176,31 +184,6 @@ private[spark] object InstanceBlock { require(maxMemUsage > 0) instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage)) } - - - /** - * Suggested value for BlockSizeInMB, based on performance tests of BLAS operation. - * - * @param dim size of vector. - * @param avgNNZ average nnz of vectors. - * @param blasLevel level of BLAS operation. - */ - def inferBlockSizeInMB( - dim: Int, - avgNNZ: Double, - blasLevel: Int = 2): Double = { - if (dim <= avgNNZ * 3) { - // When the dataset is relatively dense, Spark will use netlib-java for optimised numerical - // processing, which will try to use nativeBLAS implementations (like OpenBLAS, Intel MKL), - // and fallback to the Java implementation (f2jBLAS) if necessary. - // The suggested value for dense cases is 0.25. - 0.25 - } else { - // When the dataset is sparse, Spark will use its own Scala implementation. - // The suggested value for sparse cases is 64.0. 
- 64.0 - } - } } From ecf3dfebe1f5f6381920a4b0daaad354c3c42fc2 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Wed, 11 Nov 2020 15:02:29 +0800 Subject: [PATCH 10/11] update --- .../apache/spark/ml/feature/Instance.scala | 58 ++++++++++--------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 3c7ffd58e9c9..0b47c48e9a92 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -144,37 +144,39 @@ private[spark] object InstanceBlock { } def blokifyWithMaxMemUsage( - iterator: Iterator[Instance], + instanceIterator: Iterator[Instance], maxMemUsage: Long): Iterator[InstanceBlock] = { require(maxMemUsage > 0) - var numCols = -1L - val buff = mutable.ArrayBuilder.make[Instance] - var buffCnt = 0L - var buffNnz = 0L - var buffUnitWeight = true - - iterator.flatMap { instance => - if (numCols < 0L) numCols = instance.features.size - require(numCols == instance.features.size) - buff += instance - buffCnt += 1L - buffNnz += instance.features.numNonzeros - buffUnitWeight &&= (instance.weight == 1) - - if (getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) >= maxMemUsage) { - val block = InstanceBlock.fromInstances(buff.result()) - buff.clear() - buffCnt = 0L - buffNnz = 0L - buffUnitWeight = true - Iterator.single(block) - } else Iterator.empty - } ++ { - if (buffCnt > 0) { - val block = InstanceBlock.fromInstances(buff.result()) - Iterator.single(block) - } else Iterator.empty + new Iterator[InstanceBlock]() { + private var numCols = -1L + + override def hasNext: Boolean = instanceIterator.hasNext + + override def next(): InstanceBlock = { + val buff = mutable.ArrayBuilder.make[Instance] + var buffCnt = 0L + var buffNnz = 0L + var buffUnitWeight = true + var blockMemUsage = 0L + + while (instanceIterator.hasNext && blockMemUsage < maxMemUsage) { + val instance: Instance = instanceIterator.next() + if (numCols < 0L) numCols = instance.features.size + require(numCols == instance.features.size) + val nnz = instance.features.numNonzeros + + buff += instance + buffCnt += 1L + buffNnz += nnz + buffUnitWeight &&= (instance.weight == 1) + blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) + } + + // the block mem usage may slightly exceed threshold, not a big issue. 
+ // and this ensure even if one row exceed block limit, each block has one row + InstanceBlock.fromInstances(buff.result()) + } } } From a69ca83c393f63a1fed13393ee5b3e04cffa384f Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Thu, 12 Nov 2020 11:19:55 +0800 Subject: [PATCH 11/11] rename param --- .../spark/ml/classification/LinearSVC.scala | 12 +++++----- .../apache/spark/ml/feature/Instance.scala | 9 ++++---- .../ml/param/shared/SharedParamsCodeGen.scala | 8 +++---- .../spark/ml/param/shared/sharedParams.scala | 12 +++++----- .../ml/classification/LinearSVCSuite.scala | 2 +- python/pyspark/ml/classification.py | 22 +++++++++---------- python/pyspark/ml/classification.pyi | 10 ++++----- .../ml/param/_shared_params_code_gen.py | 6 ++--- python/pyspark/ml/param/shared.py | 16 +++++++------- python/pyspark/ml/param/shared.pyi | 6 ++--- 10 files changed, 51 insertions(+), 52 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index a2e7b0fadd4c..95f37671e139 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasAggregationDepth with HasThreshold with HasBlockSizeInMB { + with HasAggregationDepth with HasThreshold with HasMaxBlockSizeInMB { /** * Param for threshold in binary classification prediction. @@ -57,7 +57,7 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR "threshold in binary classification prediction applied to rawPrediction") setDefault(regParam -> 0.0, maxIter -> 100, fitIntercept -> true, tol -> 1E-6, - standardization -> true, threshold -> 0.0, aggregationDepth -> 2, blockSizeInMB -> 0.0) + standardization -> true, threshold -> 0.0, aggregationDepth -> 2, maxBlockSizeInMB -> 0.0) } /** @@ -153,13 +153,13 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) /** - * Sets the value of param [[blockSizeInMB]]. + * Sets the value of param [[maxBlockSizeInMB]]. * Default is 0.0. 
* * @group expertSetParam */ @Since("3.1.0") - def setBlockSizeInMB(value: Double): this.type = set(blockSizeInMB, value) + def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value) @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) @@ -169,7 +169,7 @@ class LinearSVC @Since("2.2.0") ( instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, - blockSizeInMB) + maxBlockSizeInMB) if (dataset.storageLevel != StorageLevel.NONE) { instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + @@ -191,7 +191,7 @@ class LinearSVC @Since("2.2.0") ( instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) instr.logSumOfWeights(summarizer.weightSum) - var actualBlockSizeInMB = $(blockSizeInMB) + var actualBlockSizeInMB = $(maxBlockSizeInMB) if (actualBlockSizeInMB == 0) { actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 0b47c48e9a92..c237366ec5c3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -161,20 +161,19 @@ private[spark] object InstanceBlock { var blockMemUsage = 0L while (instanceIterator.hasNext && blockMemUsage < maxMemUsage) { - val instance: Instance = instanceIterator.next() + val instance = instanceIterator.next() if (numCols < 0L) numCols = instance.features.size require(numCols == instance.features.size) - val nnz = instance.features.numNonzeros buff += instance buffCnt += 1L - buffNnz += nnz + buffNnz += instance.features.numNonzeros buffUnitWeight &&= (instance.weight == 1) blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight) } - // the block mem usage may slightly exceed threshold, not a big issue. - // and this ensure even if one row exceed block limit, each block has one row + // the block memory usage may slightly exceed threshold, not a big issue. + // and this ensure even if one row exceed block limit, each block has one row. InstanceBlock.fromInstances(buff.result()) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 64261bdfac7d..0640fe355fdd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -109,10 +109,10 @@ private[shared] object SharedParamsCodeGen { "stacked within partitions. If block size is more than remaining data in a partition " + "then it is adjusted to the size of this data.", isValid = "ParamValidators.gt(0)", isExpertParam = true), - ParamDesc[Double]("blockSizeInMB", "Maximum memory in MB for stacking input data " + - "in blocks. Data is stacked within partitions. If more than remaining data size in a " + - "partition then it is adjusted to the data size. If 0, try to infer an appropriate value " + - "based on the statistics of dataset. Must be >= 0.", + ParamDesc[Double]("maxBlockSizeInMB", "Maximum memory in MB for stacking input data " + + "into blocks. Data is stacked within partitions. 
If more than remaining data size in a " + + "partition then it is adjusted to the data size. If 0, try to infer an appropriate " + + "value. Must be >= 0.", Some("0.0"), isValid = "ParamValidators.gtEq(0.0)", isExpertParam = true) ) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 1c741545dade..2fbda45a9e97 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -564,20 +564,20 @@ trait HasBlockSize extends Params { } /** - * Trait for shared param blockSizeInMB (default: 0.0). This trait may be changed or + * Trait for shared param maxBlockSizeInMB (default: 0.0). This trait may be changed or * removed between minor versions. */ -trait HasBlockSizeInMB extends Params { +trait HasMaxBlockSizeInMB extends Params { /** - * Param for Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.. + * Param for Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.. * @group expertParam */ - final val blockSizeInMB: DoubleParam = new DoubleParam(this, "blockSizeInMB", "Maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", ParamValidators.gtEq(0.0)) + final val maxBlockSizeInMB: DoubleParam = new DoubleParam(this, "maxBlockSizeInMB", "Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. 
Must be >= 0.", ParamValidators.gtEq(0.0)) - setDefault(blockSizeInMB, 0.0) + setDefault(maxBlockSizeInMB, 0.0) /** @group expertGetParam */ - final def getBlockSizeInMB: Double = $(blockSizeInMB) + final def getMaxBlockSizeInMB: Double = $(maxBlockSizeInMB) } // scalastyle:on diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index 55558f06ee36..d8b9c6a606ec 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -215,7 +215,7 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { .setMaxIter(5) val model = lsvc.fit(dataset) Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => - val model2 = lsvc.setBlockSizeInMB(s).fit(dataset) + val model2 = lsvc.setMaxBlockSizeInMB(s).fit(dataset) assert(model.intercept ~== model2.intercept relTol 1e-9) assert(model.coefficients ~== model2.coefficients relTol 1e-9) } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index f96bbd4d3357..8f13f3275cb5 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -26,7 +26,7 @@ from pyspark.ml import Estimator, Predictor, PredictionModel, Model from pyspark.ml.param.shared import HasRawPredictionCol, HasProbabilityCol, HasThresholds, \ HasRegParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, \ - HasAggregationDepth, HasThreshold, HasBlockSize, HasBlockSizeInMB, Param, Params, \ + HasAggregationDepth, HasThreshold, HasBlockSize, HasMaxBlockSizeInMB, Param, Params, \ TypeConverters, HasElasticNetParam, HasSeed, HasStepSize, HasSolver, HasParallelism from pyspark.ml.tree import _DecisionTreeModel, _DecisionTreeParams, \ _TreeEnsembleModel, _RandomForestParams, _GBTParams, \ @@ -504,7 +504,7 @@ def recallByThreshold(self): class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold, - HasBlockSizeInMB): + HasMaxBlockSizeInMB): """ Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`. @@ -521,7 +521,7 @@ def __init__(self, *args): super(_LinearSVCParams, self).__init__(*args) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, threshold=0.0, aggregationDepth=2, - blockSizeInMB=0.0) + maxBlockSizeInMB=0.0) @inherit_doc @@ -565,7 +565,7 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl LinearSVCModel... 
>>> model.getThreshold() 0.5 - >>> model.getBlockSizeInMB() + >>> model.getMaxBlockSizeInMB() 0.0 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) @@ -605,12 +605,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSizeInMB=0.0): + aggregationDepth=2, maxBlockSizeInMB=0.0): """ __init__(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSizeInMB=0.0): + aggregationDepth=2, maxBlockSizeInMB=0.0): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( @@ -623,12 +623,12 @@ def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="p def setParams(self, *, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSizeInMB=0.0): + aggregationDepth=2, maxBlockSizeInMB=0.0): """ setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSizeInMB=0.0): + aggregationDepth=2, maxBlockSizeInMB=0.0): Sets params for Linear SVM Classifier. """ kwargs = self._input_kwargs @@ -694,11 +694,11 @@ def setAggregationDepth(self, value): return self._set(aggregationDepth=value) @since("3.1.0") - def setBlockSizeInMB(self, value): + def setMaxBlockSizeInMB(self, value): """ - Sets the value of :py:attr:`blockSizeInMB`. + Sets the value of :py:attr:`maxBlockSizeInMB`. """ - return self._set(blockSizeInMB=value) + return self._set(maxBlockSizeInMB=value) class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable, diff --git a/python/pyspark/ml/classification.pyi b/python/pyspark/ml/classification.pyi index 241f5baf8dfd..9f72d24f6311 100644 --- a/python/pyspark/ml/classification.pyi +++ b/python/pyspark/ml/classification.pyi @@ -26,7 +26,7 @@ from pyspark.ml.base import _PredictorParams from pyspark.ml.param.shared import ( HasAggregationDepth, HasBlockSize, - HasBlockSizeInMB, + HasMaxBlockSizeInMB, HasElasticNetParam, HasFitIntercept, HasMaxIter, @@ -173,7 +173,7 @@ class _LinearSVCParams( HasWeightCol, HasAggregationDepth, HasThreshold, - HasBlockSizeInMB, + HasMaxBlockSizeInMB, ): threshold: Param[float] def __init__(self, *args: Any) -> None: ... @@ -199,7 +199,7 @@ class LinearSVC( threshold: float = ..., weightCol: Optional[str] = ..., aggregationDepth: int = ..., - blockSizeInMB: float = ... + maxBlockSizeInMB: float = ... ) -> None: ... def setParams( self, @@ -216,7 +216,7 @@ class LinearSVC( threshold: float = ..., weightCol: Optional[str] = ..., aggregationDepth: int = ..., - blockSizeInMB: float = ... + maxBlockSizeInMB: float = ... ) -> LinearSVC: ... def setMaxIter(self, value: int) -> LinearSVC: ... def setRegParam(self, value: float) -> LinearSVC: ... 
@@ -226,7 +226,7 @@ class LinearSVC( def setThreshold(self, value: float) -> LinearSVC: ... def setWeightCol(self, value: str) -> LinearSVC: ... def setAggregationDepth(self, value: int) -> LinearSVC: ... - def setBlockSizeInMB(self, value: float) -> LinearSVC: ... + def setMaxBlockSizeInMB(self, value: float) -> LinearSVC: ... class LinearSVCModel( _JavaClassificationModel[Vector], diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index b6fc170abe78..53d26972c4b4 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -166,10 +166,10 @@ def get$Name(self): ("blockSize", "block size for stacking input data in matrices. Data is stacked within " "partitions. If block size is more than remaining data in a partition then it is " "adjusted to the size of this data.", None, "TypeConverters.toInt"), - ("blockSizeInMB", "maximum memory in MB for stacking input data in blocks. Data is " + + ("maxBlockSizeInMB", "maximum memory in MB for stacking input data into blocks. Data is " + "stacked within partitions. If more than remaining data size in a partition then it " + - "is adjusted to the data size. If 0, try to infer an appropriate value based on the " + - "statistics of dataset. Must be >= 0.", "0.0", "TypeConverters.toFloat")] + "is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", + "0.0", "TypeConverters.toFloat")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index a829a2e76b38..cbef7386e221 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -599,19 +599,19 @@ def getBlockSize(self): return self.getOrDefault(self.blockSize) -class HasBlockSizeInMB(Params): +class HasMaxBlockSizeInMB(Params): """ - Mixin for param blockSizeInMB: maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0. + Mixin for param maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0. """ - blockSizeInMB = Param(Params._dummy(), "blockSizeInMB", "maximum memory in MB for stacking input data in blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value based on the statistics of dataset. Must be >= 0.", typeConverter=TypeConverters.toFloat) + maxBlockSizeInMB = Param(Params._dummy(), "maxBlockSizeInMB", "maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", typeConverter=TypeConverters.toFloat) def __init__(self): - super(HasBlockSizeInMB, self).__init__() - self._setDefault(blockSizeInMB=0.0) + super(HasMaxBlockSizeInMB, self).__init__() + self._setDefault(maxBlockSizeInMB=0.0) - def getBlockSizeInMB(self): + def getMaxBlockSizeInMB(self): """ - Gets the value of blockSizeInMB or its default value. 
+ Gets the value of maxBlockSizeInMB or its default value. """ - return self.getOrDefault(self.blockSizeInMB) + return self.getOrDefault(self.maxBlockSizeInMB) diff --git a/python/pyspark/ml/param/shared.pyi b/python/pyspark/ml/param/shared.pyi index bbb4890455de..0ff4d544205b 100644 --- a/python/pyspark/ml/param/shared.pyi +++ b/python/pyspark/ml/param/shared.pyi @@ -186,7 +186,7 @@ class HasBlockSize(Params): def __init__(self) -> None: ... def getBlockSize(self) -> int: ... -class HasBlockSizeInMB(Params): - blockSizeInMB: Param[float] +class HasMaxBlockSizeInMB(Params): + maxBlockSizeInMB: Param[float] def __init__(self) -> None: ... - def getBlockSizeInMB(self) -> float: ... + def getMaxBlockSizeInMB(self) -> float: ...
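
Usage note (reviewer aid, not part of the patch series): a minimal Scala sketch of how the renamed param is expected to be driven once this series lands. The dataset, column names, and the 0.25 value below are hypothetical placeholders; leaving maxBlockSizeInMB at its default of 0.0 lets the implementation pick the block size itself.

import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object MaxBlockSizeInMBSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("maxBlockSizeInMB-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical toy data using the default "label"/"features" column names.
    val training = Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    ).toDF("label", "features")

    val svc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)
      // 0.0 (the default) lets the implementation choose the block size;
      // a positive value caps each stacked block at roughly that many MB.
      .setMaxBlockSizeInMB(0.25)

    val model = svc.fit(training)
    println(s"coefficients=${model.coefficients} intercept=${model.intercept}")

    spark.stop()
  }
}

The cap is a soft limit: as the comment added in Instance.scala notes, a block may slightly exceed it so that every block contains at least one row.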