From b96206a1c784c78d79b215b949efcfa8620bbd77 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 26 Mar 2015 23:42:17 +0800 Subject: [PATCH 1/9] Support FPGrowth algorithm in Python API --- .../mllib/api/python/PythonMLLibAPI.scala | 28 +++++++ python/pyspark/mllib/fpm.py | 74 +++++++++++++++++++ python/run-tests | 1 + 3 files changed, 103 insertions(+) create mode 100644 python/pyspark/mllib/fpm.py diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 22fa684fd289..678d5e8f84ef 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -34,6 +34,7 @@ import org.apache.spark.api.python.SerDeUtil import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ import org.apache.spark.mllib.feature._ +import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel} import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.random.{RandomRDDs => RG} @@ -406,6 +407,33 @@ private[python] class PythonMLLibAPI extends Serializable { new MatrixFactorizationModelWrapper(model) } + /** + * A Wrapper of FPGrowthModel to provide helpfer method for Python + */ + private[python] class FPGrowthModelWrapper(model: FPGrowthModel[Any]) + extends FPGrowthModel(model.freqItemsets) { + def getFreqItemsets: RDD[Array[Any]] = { + SerDe.fromTuple2RDD(model.freqItemsets.map(x => (x.javaItems, x.freq))) + } + } + + /** + * Java stub for Python mllib FPGrowth.train(). This stub returns a handle + * to the Java object instead of the content of the Java object. Extra care + * needs to be taken in the Python code to ensure it gets freed on exit; see + * the Py4J documentation. + */ + def trainFPGrowthModel(data: JavaRDD[java.lang.Iterable[Any]], + minSupport: Double, + numPartition: Int): FPGrowthModel[Any] = { + val fpm = new FPGrowth() + .setMinSupport(minSupport) + .setNumPartitions(numPartition) + + val model = fpm.run(data.rdd.map(_.asScala.toArray)) + new FPGrowthModelWrapper(model) + } + /** * Java stub for Normalizer.transform() */ diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py new file mode 100644 index 000000000000..a001b8a6e291 --- /dev/null +++ b/python/pyspark/mllib/fpm.py @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark import SparkContext +from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc + +__all__ = ['FPGrowth','FPGrowthModel'] + + +@inherit_doc +class FPGrowthModel(JavaModelWrapper): + + """A FP-Growth model for mining frequent itemsets using Parallel FP-Growth algorithm. + + >>> r1 = ["r","z","h","k","p"] + >>> r2 = ["z","y","x","w","v","u","t","s"] + >>> r3 = ["s","x","o","n","r"] + >>> r4 = ["x","z","y","m","t","s","q","e"] + >>> r5 = ["z"] + >>> r6 = ["x","z","y","r","q","t","p"] + >>> rdd = sc.parallelize([r1,r2,r3,r4,r5,r6], 2) + >>> model = FPGrowth.train(rdd, 0.5, 2) + >>> result = model.freqItemsets().collect() + >>> expected = [([u"s"], 3), ([u"z"], 5), ([u"x"], 4), ([u"t"], 3), ([u"y"], 3), ([u"r"],3), + ... ([u"x", u"z"], 3), ([u"y", u"t"], 3), ([u"t", u"x"], 3), ([u"s",u"x"], 3), + ... ([u"y", u"x"], 3), ([u"y", u"z"], 3), ([u"t", u"z"], 3), ([u"y", u"x", u"z"], 3), + ... ([u"t", u"x", u"z"], 3), ([u"y", u"t", u"z"], 3), ([u"y", u"t", u"x"], 3), + ... ([u"y", u"t", u"x", u"z"], 3)] + >>> diff1 = [x for x in result if x not in expected] + >>> len(diff1) + 0 + >>> diff2 = [x for x in expected if x not in result] + >>> len(diff2) + 0 + """ + def freqItemsets(self): + return self.call("getFreqItemsets") + + +class FPGrowth(object): + + @classmethod + def train(cls, data, minSupport=0.3, numPartition=-1): + model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartition)) + return FPGrowthModel(model) + + +def _test(): + import doctest + import pyspark.mllib.fpm + globs = pyspark.mllib.fpm.__dict__.copy() + globs['sc'] = SparkContext('local[4]', 'PythonTest') + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/run-tests b/python/run-tests index b7630c356cfa..f569a56fb7a9 100755 --- a/python/run-tests +++ b/python/run-tests @@ -77,6 +77,7 @@ function run_mllib_tests() { run_test "pyspark/mllib/clustering.py" run_test "pyspark/mllib/evaluation.py" run_test "pyspark/mllib/feature.py" + run_test "pyspark/mllib/fpm.py" run_test "pyspark/mllib/linalg.py" run_test "pyspark/mllib/rand.py" run_test "pyspark/mllib/recommendation.py" From 7f62c8f3aafe60bd78a1ac4b893530e3cf4394cc Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 26 Mar 2015 23:55:40 +0800 Subject: [PATCH 2/9] add fpm to __init__.py --- python/pyspark/mllib/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py index 6449800d9c12..f2ef573fe9f6 100644 --- a/python/pyspark/mllib/__init__.py +++ b/python/pyspark/mllib/__init__.py @@ -25,7 +25,7 @@ if numpy.version.version < '1.4': raise Exception("MLlib requires NumPy 1.4+") -__all__ = ['classification', 'clustering', 'feature', 'linalg', 'random', +__all__ = ['classification', 'clustering', 'feature', 'fpm', 'linalg', 'random', 'recommendation', 'regression', 'stat', 'tree', 'util'] import sys From 2c951b8d8712530d9b7bd3eb595656808a5bf044 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 26 Mar 2015 23:56:52 +0800 Subject: [PATCH 3/9] fix typos --- python/pyspark/mllib/fpm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index a001b8a6e291..2122108e52a6 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -18,7 +18,7 @@ from pyspark import SparkContext from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc -__all__ = ['FPGrowth','FPGrowthModel'] +__all__ = ['FPGrowth', 'FPGrowthModel'] @inherit_doc From b18fd077fd5ecf74d3a1ad88e4ac46aa360b5c51 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Fri, 27 Mar 2015 00:26:22 +0800 Subject: [PATCH 4/9] trigger jenkins --- python/pyspark/mllib/fpm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index 2122108e52a6..b09dfd4fcc60 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -24,7 +24,7 @@ @inherit_doc class FPGrowthModel(JavaModelWrapper): - """A FP-Growth model for mining frequent itemsets using Parallel FP-Growth algorithm. + """A FP-Growth model for mining frequent itemsets using the Parallel FP-Growth algorithm. >>> r1 = ["r","z","h","k","p"] >>> r2 = ["z","y","x","w","v","u","t","s"] From dcf7d73a1f326ecc8e8548c7bc3c5e12df338350 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 29 Mar 2015 16:42:35 +0800 Subject: [PATCH 5/9] add python doc --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 14 +++++++------- python/docs/pyspark.mllib.rst | 7 +++++++ python/pyspark/mllib/fpm.py | 4 ++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 678d5e8f84ef..d6845c9e8510 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -345,9 +345,7 @@ private[python] class PythonMLLibAPI extends Serializable { val model = new GaussianMixtureModel(weight, gaussians) model.predictSoft(data) } - - - + /** * Java stub for Python mllib ALS.train(). This stub returns a handle * to the Java object instead of the content of the Java object. Extra care @@ -408,10 +406,11 @@ private[python] class PythonMLLibAPI extends Serializable { } /** - * A Wrapper of FPGrowthModel to provide helpfer method for Python + * A Wrapper of FPGrowthModel to provide helper method for Python */ private[python] class FPGrowthModelWrapper(model: FPGrowthModel[Any]) extends FPGrowthModel(model.freqItemsets) { + def getFreqItemsets: RDD[Array[Any]] = { SerDe.fromTuple2RDD(model.freqItemsets.map(x => (x.javaItems, x.freq))) } @@ -423,12 +422,13 @@ private[python] class PythonMLLibAPI extends Serializable { * needs to be taken in the Python code to ensure it gets freed on exit; see * the Py4J documentation. */ - def trainFPGrowthModel(data: JavaRDD[java.lang.Iterable[Any]], + def trainFPGrowthModel( + data: JavaRDD[java.lang.Iterable[Any]], minSupport: Double, - numPartition: Int): FPGrowthModel[Any] = { + numPartitions: Int): FPGrowthModel[Any] = { val fpm = new FPGrowth() .setMinSupport(minSupport) - .setNumPartitions(numPartition) + .setNumPartitions(numPartitions) val model = fpm.run(data.rdd.map(_.asScala.toArray)) new FPGrowthModelWrapper(model) diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst index 15101470afc0..26ece4c2c389 100644 --- a/python/docs/pyspark.mllib.rst +++ b/python/docs/pyspark.mllib.rst @@ -31,6 +31,13 @@ pyspark.mllib.feature module :undoc-members: :show-inheritance: +pyspark.mllib.fpm module +------------------------ + +.. automodule:: pyspark.mllib.fpm + :members: + :undoc-members: + pyspark.mllib.linalg module --------------------------- diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index b09dfd4fcc60..423fc7187521 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -54,8 +54,8 @@ def freqItemsets(self): class FPGrowth(object): @classmethod - def train(cls, data, minSupport=0.3, numPartition=-1): - model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartition)) + def train(cls, data, minSupport=0.3, numPartitions=-1): + model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartitions)) return FPGrowthModel(model) From a2d7cf797d7fc681ecf3a8dfd0908100d282f4ce Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 31 Mar 2015 14:56:47 +0800 Subject: [PATCH 6/9] add doc for FPGrowth.train() --- python/pyspark/mllib/fpm.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index 423fc7187521..c191e5ee0733 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -55,6 +55,13 @@ class FPGrowth(object): @classmethod def train(cls, data, minSupport=0.3, numPartitions=-1): + """ + Computes an FP-Growth model that contains frequent itemsets. + :param data: The input data set, each element contains a transaction. + :param minSupport: The minimal support level (default: `0.3`). + :param numPartitions: The number of partitions used by parallel FP-growth + (default: same as input data). + """ model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartitions)) return FPGrowthModel(model) From 544c725705540395c91f01b16cc63055546d3445 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sat, 4 Apr 2015 12:55:36 -0400 Subject: [PATCH 7/9] address comments --- .../api/python/FPGrowthModelWrapper.scala | 33 +++++++++++++++++++ .../mllib/api/python/PythonMLLibAPI.scala | 15 ++------- python/pyspark/mllib/fpm.py | 24 +++----------- 3 files changed, 40 insertions(+), 32 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/api/python/FPGrowthModelWrapper.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/FPGrowthModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/FPGrowthModelWrapper.scala new file mode 100644 index 000000000000..ee933f4cfcaf --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/FPGrowthModelWrapper.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.api.python + +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel} +import org.apache.spark.rdd.RDD + +/** + * A Wrapper of FPGrowthModel to provide helper method for Python + */ +private[python] class FPGrowthModelWrapper(model: FPGrowthModel[Any]) + extends FPGrowthModel(model.freqItemsets) { + + def getFreqItemsets: RDD[Array[Any]] = { + SerDe.fromTuple2RDD(model.freqItemsets.map(x => (x.javaItems, x.freq))) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index d6845c9e8510..1dc03aa722fd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -405,17 +405,6 @@ private[python] class PythonMLLibAPI extends Serializable { new MatrixFactorizationModelWrapper(model) } - /** - * A Wrapper of FPGrowthModel to provide helper method for Python - */ - private[python] class FPGrowthModelWrapper(model: FPGrowthModel[Any]) - extends FPGrowthModel(model.freqItemsets) { - - def getFreqItemsets: RDD[Array[Any]] = { - SerDe.fromTuple2RDD(model.freqItemsets.map(x => (x.javaItems, x.freq))) - } - } - /** * Java stub for Python mllib FPGrowth.train(). This stub returns a handle * to the Java object instead of the content of the Java object. Extra care @@ -426,11 +415,11 @@ private[python] class PythonMLLibAPI extends Serializable { data: JavaRDD[java.lang.Iterable[Any]], minSupport: Double, numPartitions: Int): FPGrowthModel[Any] = { - val fpm = new FPGrowth() + val fpg = new FPGrowth() .setMinSupport(minSupport) .setNumPartitions(numPartitions) - val model = fpm.run(data.rdd.map(_.asScala.toArray)) + val model = fpg.run(data.rdd.map(_.asScala.toArray)) new FPGrowthModelWrapper(model) } diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index c191e5ee0733..f274d2b3d48f 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -26,26 +26,12 @@ class FPGrowthModel(JavaModelWrapper): """A FP-Growth model for mining frequent itemsets using the Parallel FP-Growth algorithm. - >>> r1 = ["r","z","h","k","p"] - >>> r2 = ["z","y","x","w","v","u","t","s"] - >>> r3 = ["s","x","o","n","r"] - >>> r4 = ["x","z","y","m","t","s","q","e"] - >>> r5 = ["z"] - >>> r6 = ["x","z","y","r","q","t","p"] - >>> rdd = sc.parallelize([r1,r2,r3,r4,r5,r6], 2) - >>> model = FPGrowth.train(rdd, 0.5, 2) + >>> data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]] + >>> rdd = sc.parallelize(data, 2) + >>> model = FPGrowth.train(rdd, 0.6, 2) >>> result = model.freqItemsets().collect() - >>> expected = [([u"s"], 3), ([u"z"], 5), ([u"x"], 4), ([u"t"], 3), ([u"y"], 3), ([u"r"],3), - ... ([u"x", u"z"], 3), ([u"y", u"t"], 3), ([u"t", u"x"], 3), ([u"s",u"x"], 3), - ... ([u"y", u"x"], 3), ([u"y", u"z"], 3), ([u"t", u"z"], 3), ([u"y", u"x", u"z"], 3), - ... ([u"t", u"x", u"z"], 3), ([u"y", u"t", u"z"], 3), ([u"y", u"t", u"x"], 3), - ... ([u"y", u"t", u"x", u"z"], 3)] - >>> diff1 = [x for x in result if x not in expected] - >>> len(diff1) - 0 - >>> diff2 = [x for x in expected if x not in result] - >>> len(diff2) - 0 + >>> sorted(model.freqItemsets().collect()) + [([u'a'], 4), ([u'c'], 3), ([u'c', u'a'], 3)] """ def freqItemsets(self): return self.call("getFreqItemsets") From 8ce0359e42d05b095147ec121a3d868e580bae7d Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 9 Apr 2015 10:43:24 -0400 Subject: [PATCH 8/9] fix docstring style --- python/pyspark/mllib/fpm.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index f274d2b3d48f..03905e11c5b1 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -24,29 +24,43 @@ @inherit_doc class FPGrowthModel(JavaModelWrapper): - """A FP-Growth model for mining frequent itemsets using the Parallel FP-Growth algorithm. + """ + .. note:: Experimental + + A FP-Growth model for mining frequent itemsets + using the Parallel FP-Growth algorithm. >>> data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]] >>> rdd = sc.parallelize(data, 2) >>> model = FPGrowth.train(rdd, 0.6, 2) - >>> result = model.freqItemsets().collect() >>> sorted(model.freqItemsets().collect()) [([u'a'], 4), ([u'c'], 3), ([u'c', u'a'], 3)] """ + def freqItemsets(self): + """ + Get the frequent itemsets of this model + """ return self.call("getFreqItemsets") class FPGrowth(object): + """ + .. note:: Experimental + + A parallel FP-growth algorithm to mine frequent itemsets. + """ @classmethod def train(cls, data, minSupport=0.3, numPartitions=-1): """ Computes an FP-Growth model that contains frequent itemsets. - :param data: The input data set, each element contains a transaction. - :param minSupport: The minimal support level (default: `0.3`). - :param numPartitions: The number of partitions used by parallel FP-growth - (default: same as input data). + :param data: The input data set, each element + contains a transaction. + :param minSupport: The minimal support level + (default: `0.3`). + :param numPartitions: The number of partitions used by parallel + FP-growth (default: same as input data). """ model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartitions)) return FPGrowthModel(model) From ed62eadccc83599855bed162103dbafdc59d8226 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Fri, 10 Apr 2015 01:52:55 +0800 Subject: [PATCH 9/9] trigger jenkins --- python/pyspark/mllib/fpm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index 03905e11c5b1..3aa6d79d7093 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -48,7 +48,7 @@ class FPGrowth(object): """ .. note:: Experimental - A parallel FP-growth algorithm to mine frequent itemsets. + A Parallel FP-growth algorithm to mine frequent itemsets. """ @classmethod