From cdddecd355ed3e444d87efe09c1506c9b65f288c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 5 May 2015 16:07:26 -0700 Subject: [PATCH 1/5] add CrossValidator in Python --- python/pyspark/ml/pipeline.py | 11 ++- python/pyspark/ml/tuning.py | 165 +++++++++++++++++++++++++++++++++- python/pyspark/ml/wrapper.py | 4 +- 3 files changed, 175 insertions(+), 5 deletions(-) diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py index 7b875e4b7125..8253b208ce94 100644 --- a/python/pyspark/ml/pipeline.py +++ b/python/pyspark/ml/pipeline.py @@ -22,7 +22,7 @@ from pyspark.mllib.common import inherit_doc -__all__ = ['Estimator', 'Transformer', 'Pipeline', 'PipelineModel', 'Evaluator'] +__all__ = ['Estimator', 'Transformer', 'Pipeline', 'PipelineModel', 'Evaluator', 'Model'] @inherit_doc @@ -70,6 +70,15 @@ def transform(self, dataset, params={}): raise NotImplementedError() +@inherit_doc +class Model(Transformer): + """ + Abstract class for models that fitted by estimators. + """ + + __metaclass__ = ABCMeta + + @inherit_doc class Pipeline(Estimator): """ diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 1773ab5bdcdb..002adc6db96e 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -16,8 +16,13 @@ # import itertools +import numpy as np -__all__ = ['ParamGridBuilder'] +from pyspark.ml.param import Params, Param +from pyspark.ml import Estimator, Model +from pyspark.sql.functions import rand + +__all__ = ['ParamGridBuilder', 'CrossValidator'] class ParamGridBuilder(object): @@ -78,7 +83,163 @@ def build(self): grid_values = self._param_grid.values() return [dict(zip(keys, prod)) for prod in itertools.product(*grid_values)] +class CrossValidator(Estimator): + """ + K-fold cross validation. + + >>> from pyspark.ml.classification import LogisticRegression + >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator + >>> from pyspark.mllib.linalg import Vectors + >>> dataset = sqlContext.createDataFrame( + ... [(Vectors.dense([0.0, 1.0]), 0.0), + ... (Vectors.dense([1.0, 2.0]), 1.0), + ... (Vectors.dense([0.55, 3.0]), 0.0), + ... (Vectors.dense([0.45, 4.0]), 1.0), + ... (Vectors.dense([0.51, 5.0]), 1.0)] * 10, + ... ["features", "label"]) + >>> lr = LogisticRegression() + >>> grid = ParamGridBuilder() \ + .addGrid(lr.maxIter, [0, 1, 5]) \ + .build() + >>> evaluator = BinaryClassificationEvaluator() + >>> cv = CrossValidator() \ + .setEstimator(lr) \ + .setEstimatorParamMaps(grid) \ + .setEvaluator(evaluator) \ + .setNumFolds(3) + >>> cvModel = cv.fit(dataset) + >>> expected = lr.fit(dataset, {lr.maxIter: 5}).transform(dataset) + >>> cvModel.transform(dataset).collect() == expected.collect() + True + """ + + # a placeholder to make it appear in the generated doc + estimator = Param(Params._dummy(), "estimator", "estimator to be cross-validated") + + # a placeholder to make it appear in the generated doc + estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps") + + # a placeholder to make it appear in the generated doc + evaluator = Param(Params._dummy(), "evaluator", "evaluator for selection") + + # a placeholder to make it appear in the generated doc + numFolds = Param(Params._dummy(), "numFolds", "number of folds for cross validation") + + def __init__(self): + super(CrossValidator, self).__init__() + #: param for estimator to be cross-validated + self.estimator = Param(self, "estimator", "estimator to be cross-validated") + #: param for estimator param maps + self.estimatorParamMaps = Param(self, "estimatorParamMaps", "estimator param maps") + #: param for evaluator for selection + self.evaluator = Param(self, "evaluator", "evaluator for selection") + #: param for number of folds for cross validation + self.numFolds = Param(self, "numFolds", "number of folds for cross validation") + + def setEstimator(self, value): + """ + Sets the value of :py:attr:`estimator`. + """ + self.paramMap[self.estimator] = value + return self + + def getEstimator(self): + """ + Gets the value of estimator or its default value. + """ + return self.getOrDefault(self.estimator) + + def setEstimatorParamMaps(self, value): + """ + Sets the value of :py:attr:`estimatorParamMaps`. + """ + self.paramMap[self.estimatorParamMaps] = value + return self + + def getEstimatorParamMaps(self): + """ + Gets the value of estimatorParamMaps or its default value. + """ + return self.getOrDefault(self.estimatorParamMaps) + + def setEvaluator(self, value): + """ + Sets the value of :py:attr:`evaluator`. + """ + self.paramMap[self.evaluator] = value + return self + + def getEvaluator(self): + """ + Gets the value of evaluator or its default value. + """ + return self.getOrDefault(self.evaluator) + + def setNumFolds(self, value): + """ + Sets the value of :py:attr:`numFolds`. + """ + self.paramMap[self.numFolds] = value + return self + + def getNumFolds(self): + """ + Gets the value of numFolds or its default value. + """ + return self.getOrDefault(self.numFolds) + + def fit(self, dataset, params={}): + paramMap = self.extractParamMap(params) + est = paramMap[self.estimator] + epm = paramMap[self.estimatorParamMaps] + numModels = len(epm) + eva = paramMap[self.evaluator] + nFolds = paramMap[self.numFolds] + h = 1.0 / nFolds + randCol = self.uid + "_rand" + df = dataset.select("*", rand(0).alias(randCol)) + metrics = np.zeros(numModels) + for i in range(nFolds): + validateLB = i * h + validateUB = (i + 1) * h + condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB) + validation = df.filter(condition) + train = df.filter(~condition) + for j in range(numModels): + model = est.fit(train, epm[j]) + metric = eva.evaluate(model.transform(validation, epm[j])) + metrics[j] += metric + bestIndex = np.argmax(metrics) + bestModel = est.fit(dataset, epm[bestIndex]) + return CrossValidatorModel(bestModel) + + +class CrossValidatorModel(Model): + """ + Model from k-fold corss validation. + """ + + def __init__(self, bestModel): + #: best model from cross validation + self.bestModel = bestModel + + def transform(self, dataset, params={}): + return self.bestModel.transform(dataset, params) + if __name__ == "__main__": import doctest - doctest.testmod() + from pyspark.context import SparkContext + from pyspark.sql import SQLContext + globs = globals().copy() + # The small batch size here ensures that we see multiple batches, + # even in these small test examples: + sc = SparkContext("local[2]", "ml.tuning tests") + sqlContext = SQLContext(sc) + globs['sc'] = sc + globs['sqlContext'] = sqlContext + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) + sc.stop() + if failure_count: + exit(-1) diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 73741c4b40df..0634254bbd5c 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -20,7 +20,7 @@ from pyspark import SparkContext from pyspark.sql import DataFrame from pyspark.ml.param import Params -from pyspark.ml.pipeline import Estimator, Transformer, Evaluator +from pyspark.ml.pipeline import Estimator, Transformer, Evaluator, Model from pyspark.mllib.common import inherit_doc @@ -133,7 +133,7 @@ def transform(self, dataset, params={}): @inherit_doc -class JavaModel(JavaTransformer): +class JavaModel(Model, JavaTransformer): """ Base class for :py:class:`Model`s that wrap Java/Scala implementations. From acac72718e289fbbf651d60251820f76394bd1cd Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 5 May 2015 16:12:18 -0700 Subject: [PATCH 2/5] add keyword args --- python/pyspark/ml/tuning.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 002adc6db96e..1cb30a0420ab 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -21,6 +21,7 @@ from pyspark.ml.param import Params, Param from pyspark.ml import Estimator, Model from pyspark.sql.functions import rand +from pyspark.ml.util import keyword_only __all__ = ['ParamGridBuilder', 'CrossValidator'] @@ -102,11 +103,7 @@ class CrossValidator(Estimator): .addGrid(lr.maxIter, [0, 1, 5]) \ .build() >>> evaluator = BinaryClassificationEvaluator() - >>> cv = CrossValidator() \ - .setEstimator(lr) \ - .setEstimatorParamMaps(grid) \ - .setEvaluator(evaluator) \ - .setNumFolds(3) + >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) >>> cvModel = cv.fit(dataset) >>> expected = lr.fit(dataset, {lr.maxIter: 5}).transform(dataset) >>> cvModel.transform(dataset).collect() == expected.collect() @@ -125,7 +122,11 @@ class CrossValidator(Estimator): # a placeholder to make it appear in the generated doc numFolds = Param(Params._dummy(), "numFolds", "number of folds for cross validation") - def __init__(self): + @keyword_only + def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3): + """ + __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3) + """ super(CrossValidator, self).__init__() #: param for estimator to be cross-validated self.estimator = Param(self, "estimator", "estimator to be cross-validated") @@ -135,6 +136,18 @@ def __init__(self): self.evaluator = Param(self, "evaluator", "evaluator for selection") #: param for number of folds for cross validation self.numFolds = Param(self, "numFolds", "number of folds for cross validation") + self._setDefault(numFolds=3) + kwargs = self.__init__._input_kwargs + self._set(**kwargs) + + @keyword_only + def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3): + """ + setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3): + Sets params for cross validator. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) def setEstimator(self, value): """ From 060f7c340fd8281ca48ba99142d73a5786b8d737 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 5 May 2015 16:13:23 -0700 Subject: [PATCH 3/5] update doctest --- python/pyspark/ml/tuning.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 1cb30a0420ab..ad33b4b23c09 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -20,10 +20,10 @@ from pyspark.ml.param import Params, Param from pyspark.ml import Estimator, Model -from pyspark.sql.functions import rand from pyspark.ml.util import keyword_only +from pyspark.sql.functions import rand -__all__ = ['ParamGridBuilder', 'CrossValidator'] +__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel'] class ParamGridBuilder(object): @@ -84,6 +84,7 @@ def build(self): grid_values = self._param_grid.values() return [dict(zip(keys, prod)) for prod in itertools.product(*grid_values)] + class CrossValidator(Estimator): """ K-fold cross validation. @@ -99,9 +100,7 @@ class CrossValidator(Estimator): ... (Vectors.dense([0.51, 5.0]), 1.0)] * 10, ... ["features", "label"]) >>> lr = LogisticRegression() - >>> grid = ParamGridBuilder() \ - .addGrid(lr.maxIter, [0, 1, 5]) \ - .build() + >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1, 5]).build() >>> evaluator = BinaryClassificationEvaluator() >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) >>> cvModel = cv.fit(dataset) From 8285134fa970e94bb9c3da075e799f4b254be96f Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 5 May 2015 23:12:03 -0700 Subject: [PATCH 4/5] update doc --- .../org/apache/spark/ml/tuning/CrossValidator.scala | 6 ++++-- python/pyspark/ml/pipeline.py | 4 ++-- python/pyspark/ml/tuning.py | 13 +++++++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index cee2aa6e8552..d7bdb8a914ca 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -52,10 +52,12 @@ private[ml] trait CrossValidatorParams extends Params { def getEstimatorParamMaps: Array[ParamMap] = $(estimatorParamMaps) /** - * param for the evaluator for selection + * param for the evaluator used to select hyper-parameters that maximize the cross-validated + * metric * @group param */ - val evaluator: Param[Evaluator] = new Param(this, "evaluator", "evaluator for selection") + val evaluator: Param[Evaluator] = new Param(this, "evaluator", + "evaluator used to select hyper-parameters that maximize the cross-validated metric") /** @group getParam */ def getEvaluator: Evaluator = $(evaluator) diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py index 8253b208ce94..c1b2077c985c 100644 --- a/python/pyspark/ml/pipeline.py +++ b/python/pyspark/ml/pipeline.py @@ -73,7 +73,7 @@ def transform(self, dataset, params={}): @inherit_doc class Model(Transformer): """ - Abstract class for models that fitted by estimators. + Abstract class for models that are fitted by estimators. """ __metaclass__ = ABCMeta @@ -163,7 +163,7 @@ def fit(self, dataset, params={}): @inherit_doc -class PipelineModel(Transformer): +class PipelineModel(Model): """ Represents a compiled pipeline with transformers and fitted models. """ diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index ad33b4b23c09..b4d63ea55452 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -116,7 +116,9 @@ class CrossValidator(Estimator): estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps") # a placeholder to make it appear in the generated doc - evaluator = Param(Params._dummy(), "evaluator", "evaluator for selection") + evaluator = Param( + Params._dummy(), "evaluator", + "evaluator used to select hyper-parameters that maximize the cross-validated metric") # a placeholder to make it appear in the generated doc numFolds = Param(Params._dummy(), "numFolds", "number of folds for cross validation") @@ -131,8 +133,11 @@ def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numF self.estimator = Param(self, "estimator", "estimator to be cross-validated") #: param for estimator param maps self.estimatorParamMaps = Param(self, "estimatorParamMaps", "estimator param maps") - #: param for evaluator for selection - self.evaluator = Param(self, "evaluator", "evaluator for selection") + #: param for the evaluator used to select hyper-parameters that + #: maximize the cross-validated metric + self.evaluator = Param( + self, "evaluator", + "evaluator used to select hyper-parameters that maximize the cross-validated metric") #: param for number of folds for cross validation self.numFolds = Param(self, "numFolds", "number of folds for cross validation") self._setDefault(numFolds=3) @@ -228,7 +233,7 @@ def fit(self, dataset, params={}): class CrossValidatorModel(Model): """ - Model from k-fold corss validation. + Model from k-fold cross validation. """ def __init__(self, bestModel): From 6af181fe655843eae4dc4785b9d9f3532f848b0e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 5 May 2015 23:15:33 -0700 Subject: [PATCH 5/5] add TODOs --- .../main/scala/org/apache/spark/ml/tuning/CrossValidator.scala | 1 + python/pyspark/ml/tuning.py | 1 + 2 files changed, 2 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index d7bdb8a914ca..9208127eb1d7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -122,6 +122,7 @@ class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorP trainingDataset.unpersist() var i = 0 while (i < numModels) { + // TODO: duplicate evaluator to take extra params from input val metric = eval.evaluate(models(i).transform(validationDataset, epm(i))) logDebug(s"Got metric $metric for model trained with ${epm(i)}.") metrics(i) += metric diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index b4d63ea55452..f6cf2c3439ba 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -224,6 +224,7 @@ def fit(self, dataset, params={}): train = df.filter(~condition) for j in range(numModels): model = est.fit(train, epm[j]) + # TODO: duplicate evaluator to take extra params from input metric = eva.evaluate(model.transform(validation, epm[j])) metrics[j] += metric bestIndex = np.argmax(metrics)