diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index c7a8237849b5..6e8f92b9b1e6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /** Params for Multilayer Perceptron. */
 private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
-  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
+  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
 
   import MultilayerPerceptronClassifier._
 
@@ -54,26 +54,6 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
   @Since("1.5.0")
   final def getLayers: Array[Int] = $(layers)
 
-  /**
-   * Block size for stacking input data in matrices to speed up the computation.
-   * Data is stacked within partitions. If block size is more than remaining data in
-   * a partition then it is adjusted to the size of this data.
-   * Recommended size is between 10 and 1000.
-   * Default: 128
-   *
-   * @group expertParam
-   */
-  @Since("1.5.0")
-  final val blockSize: IntParam = new IntParam(this, "blockSize",
-    "Block size for stacking input data in matrices. Data is stacked within partitions." +
-      " If block size is more than remaining data in a partition then " +
-      "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
-    ParamValidators.gt(0))
-
-  /** @group expertGetParam */
-  @Since("1.5.0")
-  final def getBlockSize: Int = $(blockSize)
-
   /**
    * The solver algorithm for optimization.
    * Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 2fb9a276be88..002146f89e79 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -54,7 +54,8 @@ import org.apache.spark.util.random.XORShiftRandom
 /**
  * Common params for ALS and ALSModel.
  */
-private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
+private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
+  with HasBlockSize {
   /**
    * Param for the column name for user ids. Ids must be integers. Other
    * numeric types are supported for this column, but will be cast to integers as long as they
@@ -125,6 +126,8 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
 
   /** @group expertGetParam */
   def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
+
+  setDefault(blockSize -> 4096)
 }
 
 /**
@@ -288,6 +291,15 @@ class ALSModel private[ml] (
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * Default is 4096.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
     if (featuresA != null && featuresB != null) {
       var dotProduct = 0.0f
@@ -351,7 +363,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllUsers(numItems: Int): DataFrame = {
-    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
   }
 
   /**
@@ -366,7 +378,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
-    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
   }
 
   /**
@@ -377,7 +389,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllItems(numUsers: Int): DataFrame = {
-    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
   }
 
   /**
@@ -392,7 +404,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
-    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
   }
 
   /**
@@ -441,11 +453,12 @@ class ALSModel private[ml] (
       dstFactors: DataFrame,
       srcOutputColumn: String,
       dstOutputColumn: String,
-      num: Int): DataFrame = {
+      num: Int,
+      blockSize: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
 
-    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
-    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
       .flatMap { case (srcIter, dstIter) =>
@@ -483,11 +496,10 @@ class ALSModel private[ml] (
 
   /**
    * Blockifies factors to improve the efficiency of cross join
-   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
       factors: Dataset[(Int, Array[Float])],
-      blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
+      blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
     import factors.sparkSession.implicits._
     factors.mapPartitions(_.grouped(blockSize))
   }
@@ -654,6 +666,15 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * Default is 4096.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   /**
    * Sets both numUserBlocks and numItemBlocks to the specific value.
    *
@@ -683,7 +704,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     instr.logDataset(dataset)
     instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
       itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
-      seed, intermediateStorageLevel, finalStorageLevel)
+      seed, intermediateStorageLevel, finalStorageLevel, blockSize)
 
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -694,7 +715,8 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       checkpointInterval = $(checkpointInterval), seed = $(seed))
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
-    val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
+    val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
+      .setParent(this)
     copyValues(model)
   }
 
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 89d27fbfa316..5ae31c6728f1 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -2164,7 +2164,7 @@ def sigma(self):
 
 
 class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
-                                  HasTol, HasStepSize, HasSolver):
+                                  HasTol, HasStepSize, HasSolver, HasBlockSize):
     """
     Params for :py:class:`MultilayerPerceptronClassifier`.
 
@@ -2175,11 +2175,6 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
                    "E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
                    "neurons and output layer of 10 neurons.",
                    typeConverter=TypeConverters.toListInt)
-    blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
-                      "matrices. Data is stacked within partitions. If block size is more than " +
-                      "remaining data in a partition then it is adjusted to the size of this " +
-                      "data. Recommended size is between 10 and 1000, default is 128.",
-                      typeConverter=TypeConverters.toInt)
     solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
                    "options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
     initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2192,13 +2187,6 @@ def getLayers(self):
         """
         return self.getOrDefault(self.layers)
 
-    @since("1.6.0")
-    def getBlockSize(self):
-        """
-        Gets the value of blockSize or its default value.
-        """
-        return self.getOrDefault(self.blockSize)
-
     @since("2.0.0")
     def getInitialWeights(self):
         """
@@ -2222,11 +2210,17 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer
     ...     (1.0, Vectors.dense([0.0, 1.0])),
     ...     (1.0, Vectors.dense([1.0, 0.0])),
     ...     (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
-    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
+    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
     >>> mlp.setMaxIter(100)
     MultilayerPerceptronClassifier...
    >>> mlp.getMaxIter()
     100
+    >>> mlp.getBlockSize()
+    128
+    >>> mlp.setBlockSize(1)
+    MultilayerPerceptronClassifier...
+    >>> mlp.getBlockSize()
+    1
     >>> model = mlp.fit(df)
     >>> model.setFeaturesCol("features")
     MultilayerPerceptronClassificationModel...
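For context only (not part of the patch): a minimal PySpark sketch of how the MLP blockSize param behaves after it moves to the shared HasBlockSize mixin. The local SparkSession and the four-row toy dataset are illustrative assumptions.

# Minimal sketch: blockSize on MultilayerPerceptronClassifier now comes from the shared
# HasBlockSize mixin, while the existing getter/setter behave as in the doctest above.
# The SparkSession and the toy dataset below are assumptions for illustration only.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import MultilayerPerceptronClassifier

spark = SparkSession.builder.master("local[2]").appName("mlp-blocksize").getOrCreate()
df = spark.createDataFrame([
    (0.0, Vectors.dense([0.0, 0.0])),
    (1.0, Vectors.dense([0.0, 1.0])),
    (1.0, Vectors.dense([1.0, 0.0])),
    (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])

mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
print(mlp.getBlockSize())  # 128, the default inherited via HasBlockSize
mlp.setBlockSize(1)        # per-estimator setter is unchanged
model = mlp.fit(df)
spark.stop()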
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index ee276962c898..fe571e25c05f 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@
 
 
 @inherit_doc
-class _ALSModelParams(HasPredictionCol):
+class _ALSModelParams(HasPredictionCol, HasBlockSize):
     """
     Params for :py:class:`ALS` and :py:class:`ALSModel`.
 
@@ -223,6 +223,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
     0.1
     >>> als.clear(als.regParam)
     >>> model = als.fit(df)
+    >>> model.getBlockSize()
+    4096
     >>> model.getUserCol()
     'user'
     >>> model.setUserCol("user")
@@ -282,13 +284,13 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                  ratingCol="rating", nonnegative=False, checkpointInterval=10,
                  intermediateStorageLevel="MEMORY_AND_DISK",
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
         """
         __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=false, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
         """
         super(ALS, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
@@ -296,7 +298,8 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
                          implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
                          ratingCol="rating", nonnegative=False, checkpointInterval=10,
                          intermediateStorageLevel="MEMORY_AND_DISK",
-                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
+                         blockSize=4096)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -306,13 +309,13 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItem
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                   ratingCol="rating", nonnegative=False, checkpointInterval=10,
                   intermediateStorageLevel="MEMORY_AND_DISK",
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
         """
         setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                   ratingCol="rating", nonnegative=False, checkpointInterval=10, \
                   intermediateStorageLevel="MEMORY_AND_DISK", \
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
         Sets params for ALS.
         """
         kwargs = self._input_kwargs
@@ -443,6 +446,13 @@ def setSeed(self, value):
         """
         return self._set(seed=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
 
 class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
     """
@@ -479,6 +489,13 @@ def setPredictionCol(self, value):
         """
         return self._set(predictionCol=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
     @property
     @since("1.4.0")
     def rank(self):