diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 77272c65eb23..95f37671e139 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
 /** Params for linear SVM Classifier. */
 private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
   with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
-  with HasAggregationDepth with HasThreshold with HasBlockSize {
+  with HasAggregationDepth with HasThreshold with HasMaxBlockSizeInMB {
 
   /**
    * Param for threshold in binary classification prediction.
@@ -57,7 +57,7 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR
     "threshold in binary classification prediction applied to rawPrediction")
 
   setDefault(regParam -> 0.0, maxIter -> 100, fitIntercept -> true, tol -> 1E-6,
-    standardization -> true, threshold -> 0.0, aggregationDepth -> 2, blockSize -> 1)
+    standardization -> true, threshold -> 0.0, aggregationDepth -> 2, maxBlockSizeInMB -> 0.0)
 }
 
 /**
@@ -153,22 +153,13 @@ class LinearSVC @Since("2.2.0") (
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
 
   /**
-   * Set block size for stacking input data in matrices.
-   * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
-   * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines
-   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
-   * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
-   * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
-   * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
-   * Note that existing BLAS implementations are mainly optimized for dense matrices, if the
-   * input dataset is sparse, stacking may bring no performance gain, the worse is possible
-   * performance regression.
-   * Default is 1.
+   * Sets the value of param [[maxBlockSizeInMB]].
+   * Default is 0.0.
    *
    * @group expertSetParam
    */
   @Since("3.1.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
+  def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value)
 
   @Since("2.2.0")
   override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
@@ -177,19 +168,19 @@ class LinearSVC @Since("2.2.0") (
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
-      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
+      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth,
+      maxBlockSizeInMB)
+
+    if (dataset.storageLevel != StorageLevel.NONE) {
+      instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
+        s"then cached during training. Be careful of double caching!")
+    }
 
     val instances = extractInstances(dataset)
       .setName("training instances")
 
-    if (dataset.storageLevel == StorageLevel.NONE && $(blockSize) == 1) {
-      instances.persist(StorageLevel.MEMORY_AND_DISK)
-    }
-
-    var requestedMetrics = Seq("mean", "std", "count")
-    if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
     val (summarizer, labelSummarizer) = Summarizer
-      .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)
+      .getClassificationSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count"))
 
     val histogram = labelSummarizer.histogram
     val numInvalid = labelSummarizer.countInvalid
@@ -199,14 +190,12 @@ class LinearSVC @Since("2.2.0") (
     instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
     instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
     instr.logSumOfWeights(summarizer.weightSum)
-    if ($(blockSize) > 1) {
-      val scale = 1.0 / summarizer.count / numFeatures
-      val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
-      instr.logNamedValue("sparsity", sparsity.toString)
-      if (sparsity > 0.5) {
-        instr.logWarning(s"sparsity of input dataset is $sparsity, " +
-          s"which may hurt performance in high-level BLAS.")
-      }
+
+    var actualBlockSizeInMB = $(maxBlockSizeInMB)
+    if (actualBlockSizeInMB == 0) {
+      actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB
+      require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must be > 0")
+      instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString)
     }
 
     val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
@@ -245,12 +234,8 @@ class LinearSVC @Since("2.2.0") (
        Note that the intercept in scaled space and original space is the same;
        as a result, no scaling is needed.
      */
-    val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
-      trainOnRows(instances, featuresStd, regularization, optimizer)
-    } else {
-      trainOnBlocks(instances, featuresStd, regularization, optimizer)
-    }
-    if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
+    val (rawCoefficients, objectiveHistory) =
+      trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer)
 
     if (rawCoefficients == null) {
       val msg = s"${optimizer.getClass.getName} failed."
@@ -284,35 +269,9 @@ class LinearSVC @Since("2.2.0") (
     model.setSummary(Some(summary))
   }
 
-  private def trainOnRows(
-      instances: RDD[Instance],
-      featuresStd: Array[Double],
-      regularization: Option[L2Regularization],
-      optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
-    val numFeatures = featuresStd.length
-    val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
-
-    val bcFeaturesStd = instances.context.broadcast(featuresStd)
-    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
-    val costFun = new RDDLossFunction(instances, getAggregatorFunc,
-      regularization, $(aggregationDepth))
-
-    val states = optimizer.iterations(new CachedDiffFunction(costFun),
-      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
-
-    val arrayBuilder = mutable.ArrayBuilder.make[Double]
-    var state: optimizer.State = null
-    while (states.hasNext) {
-      state = states.next()
-      arrayBuilder += state.adjustedValue
-    }
-    bcFeaturesStd.destroy()
-
-    (if (state != null) state.x.toArray else null, arrayBuilder.result)
-  }
-
-  private def trainOnBlocks(
+  private def trainImpl(
       instances: RDD[Instance],
+      actualBlockSizeInMB: Double,
       featuresStd: Array[Double],
       regularization: Option[L2Regularization],
       optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
@@ -326,9 +285,11 @@ class LinearSVC @Since("2.2.0") (
       val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
       iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
     }
-    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+
+    val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
+    val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage)
       .persist(StorageLevel.MEMORY_AND_DISK)
-      .setName(s"training blocks (blockSize=${$(blockSize)})")
+      .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")
 
     val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
     val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index db5f88d5dddc..c237366ec5c3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.feature
 
+import scala.collection.mutable
+
 import org.apache.spark.ml.linalg._
 import org.apache.spark.rdd.RDD
 
@@ -100,6 +102,32 @@ private[spark] case class InstanceBlock(
 
 private[spark] object InstanceBlock {
 
+  /**
+   * Suggested value for BlockSizeInMB in Level-2 routine cases.
+   * According to performance tests of BLAS routine (see SPARK-31714) and
+   * LinearSVC (see SPARK-32907), 1.0 MB should be an acceptable value for
+   * linear models using Level-2 routine (GEMV) to perform prediction and
+   * gradient computation.
+   */
+  val DefaultBlockSizeInMB = 1.0
+
+  private def getBlockMemUsage(
+      numCols: Long,
+      numRows: Long,
+      nnz: Long,
+      allUnitWeight: Boolean): Long = {
+    val doubleBytes = java.lang.Double.BYTES
+    val arrayHeader = 12L
+    val denseSize = Matrices.getDenseSize(numCols, numRows)
+    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+    val matrixSize = math.min(denseSize, sparseSize)
+    if (allUnitWeight) {
+      matrixSize + doubleBytes * numRows + arrayHeader * 2
+    } else {
+      matrixSize + doubleBytes * numRows * 2 + arrayHeader * 2
+    }
+  }
+
   def fromInstances(instances: Seq[Instance]): InstanceBlock = {
     val labels = instances.map(_.label).toArray
     val weights = if (instances.exists(_.weight != 1)) {
@@ -114,6 +142,49 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemUsage(
+      instanceIterator: Iterator[Instance],
+      maxMemUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemUsage > 0)
+
+    new Iterator[InstanceBlock]() {
+      private var numCols = -1L
+
+      override def hasNext: Boolean = instanceIterator.hasNext
+
+      override def next(): InstanceBlock = {
+        val buff = mutable.ArrayBuilder.make[Instance]
+        var buffCnt = 0L
+        var buffNnz = 0L
+        var buffUnitWeight = true
+        var blockMemUsage = 0L
+
+        while (instanceIterator.hasNext && blockMemUsage < maxMemUsage) {
+          val instance = instanceIterator.next()
+          if (numCols < 0L) numCols = instance.features.size
+          require(numCols == instance.features.size)
+
+          buff += instance
+          buffCnt += 1L
+          buffNnz += instance.features.numNonzeros
+          buffUnitWeight &&= (instance.weight == 1)
+          blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight)
+        }
+
+        // The block memory usage may slightly exceed the threshold; that is not a big issue.
+        // It also ensures that even if a single row exceeds the limit, each block still
+        // contains at least one row.
+        InstanceBlock.fromInstances(buff.result())
+      }
+    }
+  }
+
+  def blokifyWithMaxMemUsage(
+      instances: RDD[Instance],
+      maxMemUsage: Long): RDD[InstanceBlock] = {
+    require(maxMemUsage > 0)
+    instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
+  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 7fd5f5938b56..0640fe355fdd 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -108,7 +108,12 @@ private[shared] object SharedParamsCodeGen {
       ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
        "stacked within partitions. If block size is more than remaining data in a partition " +
        "then it is adjusted to the size of this data.",
-        isValid = "ParamValidators.gt(0)", isExpertParam = true)
+        isValid = "ParamValidators.gt(0)", isExpertParam = true),
+      ParamDesc[Double]("maxBlockSizeInMB", "Maximum memory in MB for stacking input data " +
+        "into blocks. Data is stacked within partitions. If more than remaining data size in a " +
+        "partition then it is adjusted to the data size. If 0, try to infer an appropriate " +
+        "value. Must be >= 0.",
+        Some("0.0"), isValid = "ParamValidators.gtEq(0.0)", isExpertParam = true)
     )
 
     val code = genSharedParams(params)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 60203eba61ea..2fbda45a9e97 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
   /** @group expertGetParam */
   final def getBlockSize: Int = $(blockSize)
 }
+
+/**
+ * Trait for shared param maxBlockSizeInMB (default: 0.0). This trait may be changed or
+ * removed between minor versions.
+ */
+trait HasMaxBlockSizeInMB extends Params {
+
+  /**
+   * Param for Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0..
+   * @group expertParam
+   */
+  final val maxBlockSizeInMB: DoubleParam = new DoubleParam(this, "maxBlockSizeInMB", "Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", ParamValidators.gtEq(0.0))
+
+  setDefault(maxBlockSizeInMB, 0.0)
+
+  /** @group expertGetParam */
+  final def getMaxBlockSizeInMB: Double = $(maxBlockSizeInMB)
+}
 // scalastyle:on
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index a66397324c1a..d8b9c6a606ec 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -214,8 +214,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
         .setFitIntercept(fitIntercept)
         .setMaxIter(5)
       val model = lsvc.fit(dataset)
-      Seq(4, 16, 64).foreach { blockSize =>
-        val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
+      Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s =>
+        val model2 = lsvc.setMaxBlockSizeInMB(s).fit(dataset)
         assert(model.intercept ~== model2.intercept relTol 1e-9)
         assert(model.coefficients ~== model2.coefficients relTol 1e-9)
       }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
index d780bdf5f5dc..f1e071357bab 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
@@ -74,4 +74,58 @@ class InstanceSuite extends SparkFunSuite{
     }
   }
 
+  test("InstanceBlock: blokify with max memory usage") {
+    val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
+    val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
+    val instances = Seq(instance1, instance2)
+
+    val blocks = InstanceBlock
+      .blokifyWithMaxMemUsage(Iterator.apply(instance1, instance2), 128).toArray
+    require(blocks.length == 1)
+    val block = blocks.head
+    assert(block.size === 2)
+    assert(block.numFeatures === 2)
+    block.instanceIterator.zipWithIndex.foreach {
+      case (instance, i) =>
+        assert(instance.label === instances(i).label)
+        assert(instance.weight === instances(i).weight)
+        assert(instance.features.toArray === instances(i).features.toArray)
+    }
+    Seq(0, 1).foreach { i =>
+      val nzIter = block.getNonZeroIter(i)
+      val vec = Vectors.sparse(2, nzIter.toSeq)
+      assert(vec.toArray === instances(i).features.toArray)
+    }
+
+    // instances larger than maxMemUsage
+    val denseInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(1000)(1.0)))
+    InstanceBlock.blokifyWithMaxMemUsage(Iterator.single(denseInstance), 64).size
+    InstanceBlock.blokifyWithMaxMemUsage(Iterator.fill(10)(denseInstance), 64).size
+
+    // different numFeatures
+    intercept[IllegalArgumentException] {
+      InstanceBlock.blokifyWithMaxMemUsage(Iterator.apply(instance1, denseInstance), 64).size
+    }
+
+    // nnz = 10
+    val sparseInstance = Instance(-2.0, 3.0,
+      Vectors.sparse(1000, Array.range(0, 1000, 100), Array.fill(10)(0.1)))
+
+    // normally, memory usage of a block does not exceed maxMemUsage too much
+    val maxMemUsage = 1 << 18
+    val mixedIter = Iterator.fill(100)(denseInstance) ++
+      Iterator.fill(1000)(sparseInstance) ++
+      Iterator.fill(10)(denseInstance) ++
+      Iterator.fill(10)(sparseInstance) ++
+      Iterator.fill(100)(denseInstance) ++
+      Iterator.fill(100)(sparseInstance)
+    InstanceBlock.blokifyWithMaxMemUsage(mixedIter, maxMemUsage)
+      .foreach { block =>
+        val doubleBytes = java.lang.Double.BYTES
+        val arrayHeader = 12L
+        val blockMemUsage = block.matrix.getSizeInBytes +
+          (block.labels.length + block.weights.length) * doubleBytes + arrayHeader * 2
+        require(blockMemUsage < maxMemUsage * 1.05)
+      }
+  }
 }
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index d6c861361a24..8f13f3275cb5 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -26,8 +26,8 @@ from pyspark.ml import Estimator, Predictor, PredictionModel, Model
 from pyspark.ml.param.shared import HasRawPredictionCol, HasProbabilityCol, HasThresholds, \
     HasRegParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, \
-    HasAggregationDepth, HasThreshold, HasBlockSize, Param, Params, TypeConverters, \
-    HasElasticNetParam, HasSeed, HasStepSize, HasSolver, HasParallelism
+    HasAggregationDepth, HasThreshold, HasBlockSize, HasMaxBlockSizeInMB, Param, Params, \
+    TypeConverters, HasElasticNetParam, HasSeed, HasStepSize, HasSolver, HasParallelism
 from pyspark.ml.tree import _DecisionTreeModel, _DecisionTreeParams, \
     _TreeEnsembleModel, _RandomForestParams, _GBTParams, \
     _HasVarianceImpurity, _TreeClassifierParams
@@ -504,7 +504,7 @@ def recallByThreshold(self):
 
 class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol,
                        HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold,
-                       HasBlockSize):
+                       HasMaxBlockSizeInMB):
     """
     Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`.
 
@@ -521,7 +521,7 @@ def __init__(self, *args):
         super(_LinearSVCParams, self).__init__(*args)
         self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True,
                          standardization=True, threshold=0.0, aggregationDepth=2,
-                         blockSize=1)
+                         maxBlockSizeInMB=0.0)
 
 
 @inherit_doc
@@ -565,8 +565,8 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
     LinearSVCModel...
     >>> model.getThreshold()
     0.5
-    >>> model.getBlockSize()
-    1
+    >>> model.getMaxBlockSizeInMB()
+    0.0
     >>> model.coefficients
     DenseVector([0.0, -0.2792, -0.1833])
     >>> model.intercept
@@ -605,12 +605,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
     def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="prediction",
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                 aggregationDepth=2, blockSize=1):
+                 aggregationDepth=2, maxBlockSizeInMB=0.0):
         """
         __init__(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                 aggregationDepth=2, blockSize=1):
+                 aggregationDepth=2, maxBlockSizeInMB=0.0):
         """
         super(LinearSVC, self).__init__()
         self._java_obj = self._new_java_obj(
@@ -623,12 +623,12 @@ def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="p
     def setParams(self, *, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                  aggregationDepth=2, blockSize=1):
+                  aggregationDepth=2, maxBlockSizeInMB=0.0):
         """
         setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                  aggregationDepth=2, blockSize=1):
+                  aggregationDepth=2, maxBlockSizeInMB=0.0):
         Sets params for Linear SVM Classifier.
         """
         kwargs = self._input_kwargs
@@ -694,11 +694,11 @@ def setAggregationDepth(self, value):
         return self._set(aggregationDepth=value)
 
     @since("3.1.0")
-    def setBlockSize(self, value):
+    def setMaxBlockSizeInMB(self, value):
         """
-        Sets the value of :py:attr:`blockSize`.
+        Sets the value of :py:attr:`maxBlockSizeInMB`.
         """
-        return self._set(blockSize=value)
+        return self._set(maxBlockSizeInMB=value)
 
 
 class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable,
diff --git a/python/pyspark/ml/classification.pyi b/python/pyspark/ml/classification.pyi
index 55afc20a54cb..9f72d24f6311 100644
--- a/python/pyspark/ml/classification.pyi
+++ b/python/pyspark/ml/classification.pyi
@@ -26,6 +26,7 @@ from pyspark.ml.base import _PredictorParams
 from pyspark.ml.param.shared import (
     HasAggregationDepth,
     HasBlockSize,
+    HasMaxBlockSizeInMB,
     HasElasticNetParam,
     HasFitIntercept,
     HasMaxIter,
@@ -172,7 +173,7 @@ class _LinearSVCParams(
     HasWeightCol,
     HasAggregationDepth,
     HasThreshold,
-    HasBlockSize,
+    HasMaxBlockSizeInMB,
 ):
     threshold: Param[float]
     def __init__(self, *args: Any) -> None: ...
@@ -198,7 +199,7 @@ class LinearSVC(
         threshold: float = ...,
         weightCol: Optional[str] = ...,
         aggregationDepth: int = ...,
-        blockSize: int = ...
+        maxBlockSizeInMB: float = ...
     ) -> None: ...
     def setParams(
         self,
@@ -215,7 +216,7 @@ class LinearSVC(
         threshold: float = ...,
         weightCol: Optional[str] = ...,
         aggregationDepth: int = ...,
-        blockSize: int = ...
+        maxBlockSizeInMB: float = ...
     ) -> LinearSVC: ...
     def setMaxIter(self, value: int) -> LinearSVC: ...
    def setRegParam(self, value: float) -> LinearSVC: ...
@@ -225,7 +226,7 @@ class LinearSVC(
     def setThreshold(self, value: float) -> LinearSVC: ...
     def setWeightCol(self, value: str) -> LinearSVC: ...
     def setAggregationDepth(self, value: int) -> LinearSVC: ...
-    def setBlockSize(self, value: int) -> LinearSVC: ...
+    def setMaxBlockSizeInMB(self, value: float) -> LinearSVC: ...
 
 class LinearSVCModel(
     _JavaClassificationModel[Vector],
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index bc1ea87ad629..53d26972c4b4 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -165,7 +165,11 @@ def get$Name(self):
          None, "TypeConverters.toString"),
         ("blockSize", "block size for stacking input data in matrices. Data is stacked within "
          "partitions. If block size is more than remaining data in a partition then it is "
-         "adjusted to the size of this data.", None, "TypeConverters.toInt")]
+         "adjusted to the size of this data.", None, "TypeConverters.toInt"),
+        ("maxBlockSizeInMB", "maximum memory in MB for stacking input data into blocks. Data is "
+         "stacked within partitions. If more than remaining data size in a partition then it "
+         "is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.",
+         "0.0", "TypeConverters.toFloat")]
 
     code = []
     for name, doc, defaultValueStr, typeConverter in shared:
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 24fb0d3e2554..cbef7386e221 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -597,3 +597,21 @@ def getBlockSize(self):
         Gets the value of blockSize or its default value.
         """
         return self.getOrDefault(self.blockSize)
+
+
+class HasMaxBlockSizeInMB(Params):
+    """
+    Mixin for param maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.
+    """
+
+    maxBlockSizeInMB = Param(Params._dummy(), "maxBlockSizeInMB", "maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", typeConverter=TypeConverters.toFloat)
+
+    def __init__(self):
+        super(HasMaxBlockSizeInMB, self).__init__()
+        self._setDefault(maxBlockSizeInMB=0.0)
+
+    def getMaxBlockSizeInMB(self):
+        """
+        Gets the value of maxBlockSizeInMB or its default value.
+        """
+        return self.getOrDefault(self.maxBlockSizeInMB)
diff --git a/python/pyspark/ml/param/shared.pyi b/python/pyspark/ml/param/shared.pyi
index 5999c0eaa466..0ff4d544205b 100644
--- a/python/pyspark/ml/param/shared.pyi
+++ b/python/pyspark/ml/param/shared.pyi
@@ -185,3 +185,8 @@ class HasBlockSize(Params):
     blockSize: Param[int]
     def __init__(self) -> None: ...
     def getBlockSize(self) -> int: ...
+
+class HasMaxBlockSizeInMB(Params):
+    maxBlockSizeInMB: Param[float]
+    def __init__(self) -> None: ...
+    def getMaxBlockSizeInMB(self) -> float: ...
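For context, a minimal usage sketch of the new parameter follows; it is not part of the patch. It assumes a spark-shell style environment where `spark` is a SparkSession and the sample libsvm file from the Spark repo is available. The point is simply that `setMaxBlockSizeInMB` replaces the old row-count based `setBlockSize`, and that 0.0 (the default) makes the estimator fall back to `InstanceBlock.DefaultBlockSizeInMB` (1.0 MB).

```scala
import org.apache.spark.ml.classification.LinearSVC

// Hypothetical training DataFrame with "label" and "features" columns.
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val svc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)
  // Stack input vectors into blocks of at most roughly 1 MB each; leaving this at
  // 0.0 (the default) would let the estimator infer InstanceBlock.DefaultBlockSizeInMB.
  .setMaxBlockSizeInMB(1.0)

val model = svc.fit(training)
println(s"coefficients=${model.coefficients} intercept=${model.intercept}")
```

Sizing blocks by estimated memory rather than by row count keeps the per-block footprint roughly constant regardless of feature dimension and sparsity, which is what the `getBlockMemUsage` estimate in `InstanceBlock` is used for.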