@@ -21,6 +21,8 @@ import java.util.{List => JList}
import java.util.UUID

import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.existentials

import org.apache.hadoop.fs.Path
@@ -33,7 +35,7 @@ import org.apache.spark.annotation.Since
import org.apache.spark.ml._
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamPair, Params, ParamValidators}
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
@@ -273,15 +275,38 @@ final class OneVsRest @Since("1.4.0") (
@Since("1.4.0") override val uid: String)
extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {

/**
* param for the number of processes to use when running parallel one vs. rest
* The implementation of parallel one vs. rest runs the classification for
* each class in a separate process.
* @group param
*/
@Since("2.3.0")
val parallelism = new IntParam(this, "parallelism",
"the number of processes to use when running parallel one vs. rest", ParamValidators.gtEq(1))

setDefault(
parallelism -> 4
)

Contributor: Can we move the HasParallelism into OneVsRestParams?

Contributor: It will be used by cross validator and train validation split; that's why it's shared.

Contributor: Maybe we can define

  trait OneVsRestParams extends PredictorParams with HasParallelism
  trait TrainValidationSplitParams extends ValidatorParams with HasParallelism

and so on, like other ML algorithms do? But I am not sure whether there are other reasons.

Member: @WeichenXu123 earlier in the discussion for this PR we had been coordinating with similar changes for parallel cross validator and train validation split at #16774.

If trait OneVsRestParams extends PredictorParams with HasParallelism were done instead, it would just give the parallelism param to the OneVsRestModel, which isn't really useful since the model has already been trained.
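A toy illustration of that trade-off in plain Scala, using hypothetical names rather than the real Spark types, just to make the two layouts under discussion concrete:

  trait HasParallelismLike { val parallelism: Int = 1 }

  // Layout 1: the shared params trait carries the param, so the fitted model inherits it too.
  trait SharedParamsA extends HasParallelismLike
  class EstimatorA extends SharedParamsA
  class ModelA extends SharedParamsA        // exposes a training-only param it can never use

  // Layout 2: only the estimator mixes the param in; the fitted model stays free of it.
  trait SharedParamsB
  class EstimatorB extends SharedParamsB with HasParallelismLike
  class ModelB extends SharedParamsB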

@Since("1.4.0")
def this() = this(Identifiable.randomUID("oneVsRest"))

/** @group getParam */
def getParallelism: Int = $(parallelism)

/** @group setParam */
@Since("1.4.0")
def setClassifier(value: Classifier[_, _, _]): this.type = {
set(classifier, value.asInstanceOf[ClassifierType])
}

/** @group setParam */
@Since("2.3.0")
def setParallelism(value: Int): this.type = {
set(parallelism, value)
}

/** @group setParam */
@Since("1.5.0")
def setLabelCol(value: String): this.type = set(labelCol, value)
@@ -325,8 +350,13 @@ final class OneVsRest @Since("1.4.0") (
multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
}

val iters = Range(0, numClasses).par
Member: CC @thunterdb just to double-check.

Contributor: @jkbradley thanks for calling this out. Indeed, the code as it stands may cause some non-deterministic issues in complex environments. I put a comment about that in that PR: #16774. See how it is done in this file:
https://github.com/apache/spark/pull/16774/files#diff-d14539cadce0fba9d8b7d970adaf8b26

It should be a quick change.

Member: Discussed offline with @thunterdb: setting the parallel collection taskSupport value is essentially doing the same thing as in @BryanCutler's PR, since ForkJoinTaskSupport uses the ExecutionContext created by ForkJoinPool under the hood.

iters.tasksupport = new ForkJoinTaskSupport(
new ForkJoinPool(Math.min(getParallelism, numClasses))
Member: I don't think ForkJoinPool is the best thing to use here. It's geared more toward lots of small tasks that might spawn other tasks; a regular thread pool is fine. Also, for parallelism 1 it is a little better to use sameThreadExecutor, which just runs in the current thread and has no overhead (equivalent to running serially), whereas a thread pool of size 1 will still create another thread and does carry some overhead.
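A minimal sketch of that suggestion, assuming Spark's internal ThreadUtils helpers; the helper names here are illustrative, not necessarily what this PR ends up using:

  import scala.concurrent.ExecutionContext
  import org.apache.spark.util.ThreadUtils

  def getExecutionContext(parallelism: Int): ExecutionContext = {
    if (parallelism == 1) {
      // Run on the calling thread: no extra thread, no overhead for the serial case.
      ThreadUtils.sameThread
    } else {
      ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonCachedThreadPool("one-vs-rest", parallelism))
    }
  }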

Member: That sounds like a good way to implement my suggestion above. +1

Member: Actually, quick question: where are the docs for sameThreadExecutor? I'm having trouble finding API docs on Google.

)

Member: It might be good to log a message here indicating the parallelism used.

Contributor: Can we use instr.log?

// create k columns, one for each binary classifier.
val models = Range(0, numClasses).par.map { index =>
val models = iters.map { index =>
Member: Using Futures is a simple change and, IMHO, is clearer than using parallel collections once you start messing around with the task support. It would just require changing a couple of lines to something like this:

  ...
  Future { classifier.fit(trainingDataset, paramMap) } (executionContext)
}.map(ThreadUtils.awaitResult(_, Duration.Inf))

Member: What do Futures add? They require more code, and the functionality seems to be the same for the purposes of OneVsRest.

Member: They don't necessarily add anything here, but they are a more standard way of achieving parallelism in Spark than using TaskSupport, and they are more flexible for setting an ExecutorService. I'm not sure whether you can point TaskSupport at sameThreadExecutor, or what really happens behind the scenes if you make a ThreadPoolTaskSupport with one thread.
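For reference, a rough sketch of the Future-based pattern being discussed, not this PR's final code; trainSingleClass stands in for the per-class dataset and param-map setup that OneVsRest.fit performs:

  import scala.concurrent.{ExecutionContext, Future}
  import scala.concurrent.duration.Duration
  import org.apache.spark.util.ThreadUtils

  def fitAll[M](numClasses: Int)(trainSingleClass: Int => M)
      (implicit ec: ExecutionContext): Seq[M] = {
    val modelFutures = Range(0, numClasses).map { index =>
      Future { trainSingleClass(index) }          // one binary problem per class
    }
    // Block until every class's model is trained, preserving class order.
    modelFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
  }

The pool sizing question is the same either way; the difference is mostly which API reads more clearly.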

// generate new label metadata for the binary problem.
val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
val labelColName = "mc2b$" + index
@@ -101,6 +101,37 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
}

test("one-vs-rest: tuning parallelism does not change output") {
Contributor: I know this would be annoying to do, but would it make sense to test that we're actually training the models in parallel? I think we're probably doing it correctly right now, but I could see this accidentally getting screwed up and using the wrong execution context in the future.

Member: Is there a good way to do that? I'm having trouble thinking of ways to do it that would not produce flaky tests.
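One possible approach, sketched here as a generic illustration rather than a real Spark test: use a latch that only opens once two fits have started, so a serial execution fails by timing out. A real test would need a stub classifier whose fit() does the countDown/await, and the timeout is exactly where the flakiness trade-off lives.

  import java.util.concurrent.{CountDownLatch, TimeUnit}
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  val bothStarted = new CountDownLatch(2)
  val tasks = Seq.fill(2) {
    Future {
      bothStarted.countDown()
      // Returns true only if the other task is running at the same time.
      bothStarted.await(10, TimeUnit.SECONDS)
    }
  }
  assert(tasks.forall(Await.result(_, 30.seconds)))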

val numClasses = 3
Contributor: Not really sure what this is doing; it is not used?

val ovaPar2 = new OneVsRest()
.setClassifier(new LogisticRegression)
.setParallelism(2)

val ovaModelPar2 = ovaPar2.fit(dataset)

val transformedDatasetPar2 = ovaModelPar2.transform(dataset)

val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val ovaPar4 = new OneVsRest()
.setClassifier(new LogisticRegression)
.setParallelism(4)

val ovaModelPar4 = ovaPar4.fit(dataset)

val transformedDatasetPar4 = ovaModelPar4.transform(dataset)

val ovaResultsPar4 = transformedDatasetPar4.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
val metricsPar4 = new MulticlassMetrics(ovaResultsPar4)
assert(metricsPar2.confusionMatrix ~== metricsPar4.confusionMatrix absTol 400)
Member: I think this test would be better if you just compared single-threaded vs. multi-threaded.

Member: +1

}

test("one-vs-rest: pass label metadata correctly during train") {
val numClasses = 3
val ova = new OneVsRest()
34 changes: 29 additions & 5 deletions python/pyspark/ml/classification.py
@@ -16,6 +16,7 @@
#

import operator
from multiprocessing.pool import ThreadPool

from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
@@ -1511,27 +1512,47 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable):
.. versionadded:: 2.0.0
"""

parallelism = Param(Params._dummy(), "parallelism",
Member: Since this is a shared Param in Scala, do you want to make it one in Python too? You can add it to _shared_params_code_gen.py and then re-generate the shared.py file.

"number of processors to use when fitting models in parallel",
Member: processors -> threads

typeConverter=TypeConverters.toInt)

@keyword_only
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
classifier=None):
classifier=None, parallelism=4):
Member: I think this should be the same default as Scala.

"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
classifier=None)
Member: Add parallelism=1 here too.

"""
super(OneVsRest, self).__init__()
self._setDefault(parallelism=4)
kwargs = self._input_kwargs
self._set(**kwargs)

@keyword_only
@since("2.0.0")
def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None):
def setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
classifier=None, parallelism=None):
Contributor: Discrepancy here between __init__ (default 1) and setParams (default None).

Member: Yeah, it's strange these were all None before; would you mind changing them all to match the defaults in the constructor?

"""
setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None):
Member: Ditto: add parallelism=1 to the doc.

Sets params for OneVsRest.
"""
kwargs = self._input_kwargs
return self._set(**kwargs)

@since("2.3.0")
def setParallelism(self, value):
"""
Sets the value of :py:attr:`parallelism`.
"""
return self._set(parallelism=value)

@since("2.3.0")
def getParallelism(self):
"""
Gets the value of parallelism or its default value.
"""
return self.getOrDefault(self.parallelism)

def _fit(self, dataset):
labelCol = self.getLabelCol()
featuresCol = self.getFeaturesCol()
@@ -1560,8 +1581,9 @@ def trainSingleClass(index):
(classifier.predictionCol, predictionCol)])
return classifier.fit(trainingDataset, paramMap)

# TODO: Parallel training for all classes.
models = [trainSingleClass(i) for i in range(numClasses)]
pool = ThreadPool(processes=min(self.getParallelism(), numClasses))
Contributor: We limit the pool here, but not in Scala. Is there a real benefit to limiting it?

Member: In Scala it doesn't matter, because that just sets the maximum size of the pool and threads are created as needed, so it will never go above numClasses anyway. In Python it's a little unclear what it's doing; the ThreadPool class is not well documented, so I'm not sure whether it makes a difference here.

Contributor: So in Scala the thread pool is cached; here we aren't doing that, and I think it's a bit more heavyweight in Python, so we might want to consider whether there is a reasonable way to reuse it (if not, that's probably OK, since this overhead pales in comparison to training serially).


models = pool.map(trainSingleClass, range(numClasses))
Contributor: If training the models takes different amounts of time and there is a relatively large number of models, we might end up blocking a worker here, since map effectively splits the work in advance. What do you think of providing a chunksize hint, or using imap or imap_unordered instead? The overhead of sending an element to a worker should be small compared to the time it takes to train the model.


if handlePersistence:
multiclassLabeled.unpersist()
@@ -1611,8 +1633,9 @@ def _from_java(cls, java_stage):
labelCol = java_stage.getLabelCol()
predictionCol = java_stage.getPredictionCol()
classifier = JavaParams._from_java(java_stage.getClassifier())
parallelism = java_stage.getParallelism()
py_stage = cls(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
classifier=classifier)
classifier=classifier, parallelism=parallelism)
py_stage._resetUid(java_stage.uid())
return py_stage

@@ -1625,6 +1648,7 @@ def _to_java(self):
_java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest",
self.uid)
_java_obj.setClassifier(self.getClassifier()._to_java())
_java_obj.setParallelism(self.getParallelism())
_java_obj.setFeaturesCol(self.getFeaturesCol())
_java_obj.setLabelCol(self.getLabelCol())
_java_obj.setPredictionCol(self.getPredictionCol())
17 changes: 14 additions & 3 deletions python/pyspark/ml/tests.py
@@ -951,7 +951,7 @@ def test_onevsrest(self):
(2.0, Vectors.dense(0.5, 0.5))] * 10,
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
ovr = OneVsRest(classifier=lr, parallelism=8)
Member: Just use the default here; don't set the param.

model = ovr.fit(df)
ovrPath = temp_path + "/ovr"
ovr.save(ovrPath)
@@ -1215,7 +1215,7 @@ def test_copy(self):
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
ovr = OneVsRest(classifier=lr, parallelism=1)
Member: If the default is 1, we don't need this.

Member: Not needed since the default is 1.

ovr1 = ovr.copy({lr.maxIter: 10})
self.assertEqual(ovr.getClassifier().getMaxIter(), 5)
self.assertEqual(ovr1.getClassifier().getMaxIter(), 10)
@@ -1229,11 +1229,22 @@ def test_output_columns(self):
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
ovr = OneVsRest(classifier=lr, parallelism=1)
Member: Same as above.

model = ovr.fit(df)
output = model.transform(df)
self.assertEqual(output.columns, ["label", "features", "prediction"])

def test_parallelism_doesnt_change_output(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
modelPar2 = ovrPar2.fit(df)
ovrPar4 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=4)
Member: Just test parallelism=1 vs parallelism=2.

modelPar4 = ovrPar4.fit(df)
self.assertEqual(modelPar2.getPredictionCol(), modelPar4.getPredictionCol())

Contributor: Similar comment: can we add a test to make sure that we are actually training in parallel? This is perhaps especially important in Python, because I could see us accidentally blocking on something unexpected.


class HashingTFTest(SparkSessionTestCase):
