diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index a01744f7b67f..853eeb39bf8d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -137,8 +137,8 @@ class CrossValidatorSuite cv.setParallelism(2) val cvParallelModel = cv.fit(dataset) - val serialMetrics = cvSerialModel.avgMetrics.sorted - val parallelMetrics = cvParallelModel.avgMetrics.sorted + val serialMetrics = cvSerialModel.avgMetrics + val parallelMetrics = cvParallelModel.avgMetrics assert(serialMetrics === parallelMetrics) val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression] diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index 2ed4fbb601b6..f8d9c66be2c4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -138,8 +138,8 @@ class TrainValidationSplitSuite cv.setParallelism(2) val cvParallelModel = cv.fit(dataset) - val serialMetrics = cvSerialModel.validationMetrics.sorted - val parallelMetrics = cvParallelModel.validationMetrics.sorted + val serialMetrics = cvSerialModel.validationMetrics + val parallelMetrics = cvParallelModel.validationMetrics assert(serialMetrics === parallelMetrics) val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression] diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 8b8bcc7b13a3..2f1f3af957e4 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -836,6 +836,27 @@ def test_save_load_simple_estimator(self): loadedModel = CrossValidatorModel.load(cvModelPath) self.assertEqual(loadedModel.bestModel.uid, 
cvModel.bestModel.uid) + def test_parallel_evaluation(self): + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + + lr = LogisticRegression() + grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build() + evaluator = BinaryClassificationEvaluator() + + # test parallelism in CrossValidator fit + cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + cv.setParallelism(1) + cvSerialModel = cv.fit(dataset) + cv.setParallelism(2) + cvParallelModel = cv.fit(dataset) + self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics) + def test_save_load_nested_estimator(self): temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( @@ -986,6 +1007,24 @@ def test_save_load_simple_estimator(self): loadedModel = TrainValidationSplitModel.load(tvsModelPath) self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid) + def test_parallel_evaluation(self): + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + lr = LogisticRegression() + grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build() + evaluator = BinaryClassificationEvaluator() + tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + tvs.setParallelism(1) + tvsSerialModel = tvs.fit(dataset) + tvs.setParallelism(2) + tvsParallelModel = tvs.fit(dataset) + self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics) + def test_save_load_nested_estimator(self): # This tests saving and loading the trained model only. 
# Save/load for TrainValidationSplit will be added later: SPARK-13786 diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 00c348aa9f7d..47351133524e 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -14,15 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. # - import itertools import numpy as np +from multiprocessing.pool import ThreadPool from pyspark import since, keyword_only from pyspark.ml import Estimator, Model from pyspark.ml.common import _py2java from pyspark.ml.param import Params, Param, TypeConverters -from pyspark.ml.param.shared import HasSeed +from pyspark.ml.param.shared import HasParallelism, HasSeed from pyspark.ml.util import * from pyspark.ml.wrapper import JavaParams from pyspark.sql.functions import rand @@ -170,7 +170,7 @@ def _to_java_impl(self): return java_estimator, java_epms, java_evaluator -class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable): +class CrossValidator(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable): """ K-fold cross validation performs model selection by splitting the dataset into a set of @@ -193,7 +193,8 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable): >>> lr = LogisticRegression() >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() >>> evaluator = BinaryClassificationEvaluator() - >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, + ... 
parallelism=2) >>> cvModel = cv.fit(dataset) >>> cvModel.avgMetrics[0] 0.5 @@ -208,23 +209,23 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable): @keyword_only def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, - seed=None): + seed=None, parallelism=1): """ __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ - seed=None) + seed=None, parallelism=1) """ super(CrossValidator, self).__init__() - self._setDefault(numFolds=3) + self._setDefault(numFolds=3, parallelism=1) kwargs = self._input_kwargs self._set(**kwargs) @keyword_only @since("1.4.0") def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, - seed=None): + seed=None, parallelism=1): """ setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ - seed=None): + seed=None, parallelism=1): Sets params for cross validator. """ kwargs = self._input_kwargs @@ -255,18 +256,27 @@ def _fit(self, dataset): randCol = self.uid + "_rand" df = dataset.select("*", rand(seed).alias(randCol)) metrics = [0.0] * numModels + + pool = ThreadPool(processes=min(self.getParallelism(), numModels)) + for i in range(nFolds): validateLB = i * h validateUB = (i + 1) * h condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB) - validation = df.filter(condition) - train = df.filter(~condition) - models = est.fit(train, epm) - for j in range(numModels): - model = models[j] + validation = df.filter(condition).cache() + train = df.filter(~condition).cache() + + def singleTrain(paramMap): + model = est.fit(train, paramMap) # TODO: duplicate evaluator to take extra params from input - metric = eva.evaluate(model.transform(validation, epm[j])) - metrics[j] += metric/nFolds + metric = eva.evaluate(model.transform(validation, paramMap)) + return metric + + currentFoldMetrics = pool.map(singleTrain, epm) + for j in range(numModels): + metrics[j] += (currentFoldMetrics[j] / nFolds) + 
validation.unpersist() + train.unpersist() if eva.isLargerBetter(): bestIndex = np.argmax(metrics) @@ -316,9 +326,10 @@ def _from_java(cls, java_stage): estimator, epms, evaluator = super(CrossValidator, cls)._from_java_impl(java_stage) numFolds = java_stage.getNumFolds() seed = java_stage.getSeed() + parallelism = java_stage.getParallelism() # Create a new instance of this stage. py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, - numFolds=numFolds, seed=seed) + numFolds=numFolds, seed=seed, parallelism=parallelism) py_stage._resetUid(java_stage.uid()) return py_stage @@ -337,6 +348,7 @@ def _to_java(self): _java_obj.setEstimator(estimator) _java_obj.setSeed(self.getSeed()) _java_obj.setNumFolds(self.getNumFolds()) + _java_obj.setParallelism(self.getParallelism()) return _java_obj @@ -427,7 +439,7 @@ def _to_java(self): return _java_obj -class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable): +class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable): """ .. note:: Experimental @@ -448,7 +460,8 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable): >>> lr = LogisticRegression() >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() >>> evaluator = BinaryClassificationEvaluator() - >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, + ... parallelism=2) >>> tvsModel = tvs.fit(dataset) >>> evaluator.evaluate(tvsModel.transform(dataset)) 0.8333... 
@@ -461,23 +474,23 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable): @keyword_only def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, - seed=None): + parallelism=1, seed=None): """ __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ - seed=None) + parallelism=1, seed=None) """ super(TrainValidationSplit, self).__init__() - self._setDefault(trainRatio=0.75) + self._setDefault(trainRatio=0.75, parallelism=1) kwargs = self._input_kwargs self._set(**kwargs) @since("2.0.0") @keyword_only def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, - seed=None): + parallelism=1, seed=None): """ setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ - seed=None): + parallelism=1, seed=None): Sets params for the train validation split. """ kwargs = self._input_kwargs @@ -506,15 +519,20 @@ def _fit(self, dataset): seed = self.getOrDefault(self.seed) randCol = self.uid + "_rand" df = dataset.select("*", rand(seed).alias(randCol)) - metrics = [0.0] * numModels condition = (df[randCol] >= tRatio) - validation = df.filter(condition) - train = df.filter(~condition) - models = est.fit(train, epm) - for j in range(numModels): - model = models[j] - metric = eva.evaluate(model.transform(validation, epm[j])) - metrics[j] += metric + validation = df.filter(condition).cache() + train = df.filter(~condition).cache() + + def singleTrain(paramMap): + model = est.fit(train, paramMap) + metric = eva.evaluate(model.transform(validation, paramMap)) + return metric + + pool = ThreadPool(processes=min(self.getParallelism(), numModels)) + metrics = pool.map(singleTrain, epm) + train.unpersist() + validation.unpersist() + if eva.isLargerBetter(): bestIndex = np.argmax(metrics) else: @@ -563,9 +581,10 @@ def _from_java(cls, java_stage): estimator, epms, evaluator = super(TrainValidationSplit, 
cls)._from_java_impl(java_stage) trainRatio = java_stage.getTrainRatio() seed = java_stage.getSeed() + parallelism = java_stage.getParallelism() # Create a new instance of this stage. py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, - trainRatio=trainRatio, seed=seed) + trainRatio=trainRatio, seed=seed, parallelism=parallelism) py_stage._resetUid(java_stage.uid()) return py_stage @@ -584,6 +603,7 @@ def _to_java(self): _java_obj.setEstimator(estimator) _java_obj.setTrainRatio(self.getTrainRatio()) _java_obj.setSeed(self.getSeed()) + _java_obj.setParallelism(self.getParallelism()) return _java_obj