mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion

/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
-with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
+with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {

import MultilayerPerceptronClassifier._

@@ -54,26 +54,6 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
@Since("1.5.0")
final def getLayers: Array[Int] = $(layers)

-/**
- * Block size for stacking input data in matrices to speed up the computation.
- * Data is stacked within partitions. If block size is more than remaining data in
- * a partition then it is adjusted to the size of this data.
- * Recommended size is between 10 and 1000.
- * Default: 128
- *
- * @group expertParam
- */
-@Since("1.5.0")
-final val blockSize: IntParam = new IntParam(this, "blockSize",
-  "Block size for stacking input data in matrices. Data is stacked within partitions." +
-  " If block size is more than remaining data in a partition then " +
-  "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
-  ParamValidators.gt(0))
-
-/** @group expertGetParam */
-@Since("1.5.0")
-final def getBlockSize: Int = $(blockSize)
Member:

This doesn't actually go away because it's in HasBlockSize, right? Just checking that the API doesn't change.

Contributor Author:

Yes, getBlockSize doesn't go away because it is in HasBlockSize.
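
For reference, the shared trait supplies both the param and the getter. A rough sketch of HasBlockSize as generated into ml/param/shared/sharedParams.scala (the exact doc string may differ):

  // Rough sketch of the shared param trait; doc text approximate.
  private[ml] trait HasBlockSize extends Params {

    /**
     * Param for block size for stacking input data in matrices (&gt; 0).
     * @group expertParam
     */
    final val blockSize: IntParam = new IntParam(this, "blockSize",
      "block size for stacking input data in matrices", ParamValidators.gt(0))

    /** @group expertGetParam */
    final def getBlockSize: Int = $(blockSize)
  }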

Contributor:

The default value of blockSize in MLP is 128, so should we explicitly setDefault(blockSize -> 128) in MLP?

Contributor Author:

It is set in MultilayerPerceptronParams at line 83:

  setDefault(maxIter -> 100, tol -> 1e-6, blockSize -> 128,
    solver -> LBFGS, stepSize -> 0.03)


/**
* The solver algorithm for optimization.
* Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
46 changes: 34 additions & 12 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -54,7 +54,8 @@ import org.apache.spark.util.random.XORShiftRandom
/**
* Common params for ALS and ALSModel.
*/
-private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
+private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
+with HasBlockSize {
/**
* Param for the column name for user ids. Ids must be integers. Other
* numeric types are supported for this column, but will be cast to integers as long as they
@@ -125,6 +126,8 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo

/** @group expertGetParam */
def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)

+setDefault(blockSize -> 4096)
Contributor Author:

I just realized that I should set the default of blockSize in ALSModelParams, so it applies to both ALS and ALSModel.

}

/**
@@ -288,6 +291,15 @@ class ALSModel private[ml] (
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)

+/**
+ * Set block size for stacking input data in matrices.
+ * Default is 4096.
Member:

Oh, I think this is about to become a default of 1024, once the very latest PR from @zhengruifeng is merged. I think it's probably good to go, so I will merge it.

Contributor Author:

Thanks for the comment. I actually saw the default changed to 1024 in that PR, but I want the default to be 4096; that's why I set it explicitly at line 675 in the Estimator:
setDefault(blockSize -> 4096).

I want the default to be 4096 because blockify has 4096 as its default; I don't want to change the current default value.

  private def blockify(
      factors: Dataset[(Int, Array[Float])],
      blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
    import factors.sparkSession.implicits._
    factors.mapPartitions(_.grouped(blockSize))
  }

Member:

OK, sounds fine.

+ *
+ * @group expertSetParam
+ */
+@Since("3.0.0")
+def setBlockSize(value: Int): this.type = set(blockSize, value)

private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
if (featuresA != null && featuresB != null) {
var dotProduct = 0.0f
@@ -351,7 +363,7 @@
*/
@Since("2.2.0")
def recommendForAllUsers(numItems: Int): DataFrame = {
-recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
+recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
}

/**
@@ -366,7 +378,7 @@
@Since("2.3.0")
def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
-recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
}

/**
@@ -377,7 +389,7 @@
*/
@Since("2.2.0")
def recommendForAllItems(numUsers: Int): DataFrame = {
-recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
+recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
}

/**
@@ -392,7 +404,7 @@
@Since("2.3.0")
def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
-recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
}

/**
@@ -441,11 +453,12 @@ class ALSModel private[ml] (
dstFactors: DataFrame,
srcOutputColumn: String,
dstOutputColumn: String,
-num: Int): DataFrame = {
+num: Int,
+blockSize: Int): DataFrame = {
import srcFactors.sparkSession.implicits._

-val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
-val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
+val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
.as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
.flatMap { case (srcIter, dstIter) =>
@@ -483,11 +496,10 @@ class ALSModel private[ml] (

/**
* Blockifies factors to improve the efficiency of cross join
-* TODO: SPARK-20443 - expose blockSize as a param?
*/
private def blockify(
factors: Dataset[(Int, Array[Float])],
Member (@viirya), Jan 30, 2020:

Do we still need the default blockSize in this method, i.e., blockSize: Int = 4096?

Contributor Author:

You are right. No need to have the default blockSize any more. I will update the code. Thanks!

-blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
+blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
import factors.sparkSession.implicits._
factors.mapPartitions(_.grouped(blockSize))
}
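
A side note on what blockify buys here: mapPartitions(_.grouped(blockSize)) batches each partition into Seqs of at most blockSize rows, so the subsequent crossJoin pairs blocks rather than individual rows — on the order of (m/b)·(n/b) joined rows instead of m·n. A minimal plain-Scala (non-Spark) sketch of the grouping behavior:

  // Plain-Scala sketch of the batching blockify relies on (no Spark needed):
  // Iterator.grouped(n) yields groups of up to n elements, so the last group
  // in a partition may be smaller than blockSize.
  val ids = (1 to 10).iterator
  val blocks = ids.grouped(4).toList
  // blocks: groups of up to 4 ids -> [1, 2, 3, 4], [5, 6, 7, 8], [9, 10]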
@@ -654,6 +666,15 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)

+/**
+ * Set block size for stacking input data in matrices.
+ * Default is 4096.
+ *
+ * @group expertSetParam
+ */
+@Since("3.0.0")
+def setBlockSize(value: Int): this.type = set(blockSize, value)

/**
* Sets both numUserBlocks and numItemBlocks to the specific value.
*
@@ -683,7 +704,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
instr.logDataset(dataset)
instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
-seed, intermediateStorageLevel, finalStorageLevel)
+seed, intermediateStorageLevel, finalStorageLevel, blockSize)

val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -694,7 +715,8 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
checkpointInterval = $(checkpointInterval), seed = $(seed))
val userDF = userFactors.toDF("id", "features")
val itemDF = itemFactors.toDF("id", "features")
-val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
+val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
+  .setParent(this)
copyValues(model)
}

22 changes: 8 additions & 14 deletions python/pyspark/ml/classification.py
@@ -2164,7 +2164,7 @@ def sigma(self):


class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
-HasTol, HasStepSize, HasSolver):
+HasTol, HasStepSize, HasSolver, HasBlockSize):
"""
Params for :py:class:`MultilayerPerceptronClassifier`.

@@ -2175,11 +2175,6 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
"E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
"neurons and output layer of 10 neurons.",
typeConverter=TypeConverters.toListInt)
-blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
-                  "matrices. Data is stacked within partitions. If block size is more than " +
-                  "remaining data in a partition then it is adjusted to the size of this " +
-                  "data. Recommended size is between 10 and 1000, default is 128.",
-                  typeConverter=TypeConverters.toInt)
solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
"options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2192,13 +2187,6 @@ def getLayers(self):
"""
return self.getOrDefault(self.layers)

@since("1.6.0")
def getBlockSize(self):
"""
Gets the value of blockSize or its default value.
"""
return self.getOrDefault(self.blockSize)

@since("2.0.0")
def getInitialWeights(self):
"""
@@ -2222,11 +2210,17 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer
... (1.0, Vectors.dense([0.0, 1.0])),
... (1.0, Vectors.dense([1.0, 0.0])),
... (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
>>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
>>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
>>> mlp.setMaxIter(100)
MultilayerPerceptronClassifier...
>>> mlp.getMaxIter()
100
>>> mlp.getBlockSize()
128
>>> mlp.setBlockSize(1)
MultilayerPerceptronClassifier...
>>> mlp.getBlockSize()
1
>>> model = mlp.fit(df)
>>> model.setFeaturesCol("features")
MultilayerPerceptronClassificationModel...
29 changes: 23 additions & 6 deletions python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@


@inherit_doc
-class _ALSModelParams(HasPredictionCol):
+class _ALSModelParams(HasPredictionCol, HasBlockSize):
"""
Params for :py:class:`ALS` and :py:class:`ALSModel`.

@@ -223,6 +223,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
0.1
>>> als.clear(als.regParam)
>>> model = als.fit(df)
>>> model.getBlockSize()
4096
>>> model.getUserCol()
'user'
>>> model.setUserCol("user")
@@ -282,21 +284,22 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
"""
__init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=false, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", lockSize=4096)
"""
super(ALS, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
blockSize=4096)
kwargs = self._input_kwargs
self.setParams(**kwargs)

@@ -306,13 +309,13 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItem
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
"""
setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=False, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
Sets params for ALS.
"""
kwargs = self._input_kwargs
@@ -443,6 +446,13 @@ def setSeed(self, value):
"""
return self._set(seed=value)

@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)
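
A quick usage sketch from the Python side (hypothetical variable names — ratings_df is assumed to be a DataFrame with user/item/rating columns; the blockSize calls are the ones added in this diff):

  from pyspark.ml.recommendation import ALS

  # ratings_df is a hypothetical DataFrame with "user", "item", "rating" columns.
  als = ALS(userCol="user", itemCol="item", ratingCol="rating", blockSize=2048)
  model = als.fit(ratings_df)             # model inherits the estimator's blockSize
  model.setBlockSize(1024)                # can also be adjusted on the model
  top10 = model.recommendForAllUsers(10)  # blocked cross join uses the model's blockSize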


class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
"""
@@ -479,6 +489,13 @@ def setPredictionCol(self, value):
"""
return self._set(predictionCol=value)

@since("3.0.0")
def setBlockSize(self, value):
"""
Sets the value of :py:attr:`blockSize`.
"""
return self._set(blockSize=value)

@property
@since("1.4.0")
def rank(self):