
[SPARK-14503][ML] spark.ml API for FPGrowth#15415

Closed
hhbyyh wants to merge 34 commits into apache:master from hhbyyh:mlfpm

Conversation

@hhbyyh
Contributor

@hhbyyh hhbyyh commented Oct 10, 2016

What changes were proposed in this pull request?

jira: https://issues.apache.org/jira/browse/SPARK-14503
Function parity: Add FPGrowth and AssociationRules to ML.

design doc: https://docs.google.com/document/d/1bVhABn5DiEj8bw0upqGMJT2L4nvO_0_cXdwu4uMT6uU/pub

Currently I make FPGrowthModel a Transformer. For each association rule, it will examine the input items against the antecedents and summarize the consequents.

Update:
Thinking again, FPGrowth is only the algorithm for finding the frequent itemsets, and it could be replaced by other algorithms. The frequent itemsets are used by AssociationRules to generate the association rules. Then we can use the association rules to make predictions on other records.

[diagram: FPGrowth → frequent itemsets → AssociationRules → association rules → prediction]

For reviewers: let's first decide whether the current transform function meets your expectations.

Current options:

  1. Current implementation: use the Estimator and Transformer pattern in ML; the transform function will examine the input items against all the association rules and summarize the consequents. Users can also access frequent items and association rules via other model members. (See the usage sketch after this list.)

  2. Keep the Estimator and Transformer pattern, but AssociationRulesModel and FPGrowthModel will have an empty transform function, meaning the DataFrame is unchanged after transform. Users can still access frequent items and association rules via other model members.

  3. (mentioned by @zhengruifeng) Keep the Estimator and Transformer pattern, but FPGrowthModel and AssociationRulesModel will just return the frequent itemsets and association rules DataFrames from the transform function. This means the resulting DataFrame after transform will not be related to the input DataFrame.

  4. Discard the Estimator and Transformer pattern. Both FPGrowth and FPGrowthModel will directly extend from PipelineStage, thus we don't need to have a transform function.

I'd like to hear more concrete suggestions. I would prefer option 1 or 2.
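To make option 1 concrete, here is a minimal usage sketch. It assumes the ml.fpm package and the param/column names proposed in this PR (itemsCol, minSupport, minConfidence, a prediction column of consequents); treat the names and data as illustrative.

  import org.apache.spark.ml.fpm.FPGrowth
  import spark.implicits._  // assumes a SparkSession named spark is in scope

  // Toy transactions; each row is one basket of items.
  val transactions = Seq(
    (0, Array("a", "b", "c")),
    (1, Array("a", "b")),
    (2, Array("a", "c"))
  ).toDF("id", "items")

  val model = new FPGrowth()
    .setItemsCol("items")
    .setMinSupport(0.5)
    .setMinConfidence(0.6)
    .fit(transactions)

  model.freqItemsets.show()      // frequent itemsets: ("items", "freq")
  model.associationRules.show()  // rules: ("antecedent", "consequent", "confidence")

  // Option 1: transform checks each row's items against the rule antecedents
  // and collects the matching, not-yet-present consequents into the prediction column.
  model.transform(transactions).show()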

update 2:

As discussed in the jira, we will not expose AssociationRules as a public API for now.

How was this patch tested?

new unit test suites

@SparkQA

SparkQA commented Oct 10, 2016

Test build #66630 has finished for PR 15415 at commit 2f1a08c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 2, 2016

Test build #67995 has finished for PR 15415 at commit e5574be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hhbyyh
Contributor Author

hhbyyh commented Nov 2, 2016

@yanboliang @jkbradley This should be ready for review. Thanks.

@SparkQA

SparkQA commented Nov 8, 2016

Test build #68337 has finished for PR 15415 at commit e5574be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 17, 2016

Test build #70283 has finished for PR 15415 at commit 63eaf08.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FPGrowth @Since("2.2.0") (

* Param for minimum confidence, range [0.0, 1.0].
* @group param
*/
final val minConfidence: DoubleParam = new DoubleParam(this, "minConfidence", "min confidence")
Contributor

there should be a ParamValidators.inRange(...)
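Something along these lines (a sketch; the doc string is illustrative):

  final val minConfidence: DoubleParam = new DoubleParam(this, "minConfidence",
    "minimal confidence for generating association rules, in range [0.0, 1.0]",
    ParamValidators.inRange(0.0, 1.0))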

/**
* Computes the association rules with confidence above [[minConfidence]].
* @param freqItemsets DataFrame containing frequent itemset obtained from algorithms like
* [[FPGrowth]]. Users can set itemsCol (frequent itemSet, Array[String])
Contributor

Array[String] conflicts with Array[Int] in https://github.com/apache/spark/pull/15415/files#diff-0a641720038f962d333ef38402a02207R41.
And is there some way to support general types?

val freqItemSetRdd = freqItemsets.select($(itemsCol), $(freqCol)).rdd
  .map(row => new FreqItemset(row.getSeq[String](0).toArray, row.getLong(1)))

val sqlContext = SparkSession.builder().getOrCreate()
Contributor

Since val sqlContext is of type SparkSession, what about renaming it spark?

*/
@Since("2.1.0")
@Experimental
class AssociationRules(override val uid: String) extends Params {
Contributor

Since AssociationRules transforms the freqItemsets DataFrame into a rules DataFrame, can it be a subclass of Transformer?

Contributor Author

freqItemsets and rules do not have a one-to-one mapping, so this would probably violate the semantics of Transformer.

*
*/
@Since("2.1.0")
def run(freqItemsets: Dataset[_]): DataFrame = {
Contributor

@zhengruifeng zhengruifeng Jan 5, 2017

If inheriting Transformer, this should be override def transform(dataset: Dataset[_]): DataFrame

* Generates association rules from frequent itemsets ("items", "freq"). This method only generates
* association rules which have a single item as the consequent.
*/
@Since("2.1.0")
Contributor

should be 2.2.0

*/
@Since("2.2.0")
val minSupport: DoubleParam = new DoubleParam(this, "minSupport",
"the minimal support level of the frequent pattern (Default: 0.3)")
Contributor

also need a ParamValidator here

/** @group getParam */
@Since("2.2.0")
def getMinSupport: Double = $(minSupport)

Contributor

MLlib's FPGrowth has a numPartitions param; will it be included here?

@hhbyyh
Contributor Author

hhbyyh commented Feb 20, 2017

@jkbradley Sent an update to refine the transform code and address the comments.

Regarding the behavior-change concern: I think a different partitioning strategy will only affect the overall efficiency and perhaps the order of the frequent itemsets and association rules. As long as the result set does not change, I don't think it will disturb users. Let me know if I missed anything.

Member

@jkbradley jkbradley left a comment

Thanks for the updates!

Regarding the behavior-change concern

I think you're right. I'd never read through the algorithm carefully, but now that I have, I agree.

refine the transform code

I'd prefer we hold off on this. If we're changing the transform code (rather than wrapping the old code), then we'll need to spend more time writing new tests and QAing it. We don't really have time to do that before the 2.2 release. Also, when we get around to optimizing it, I'd prefer to do everything using DataFrame operations if possible. Could you please save your code but revert the transform() code to wrap the old AssociationRules code? Thanks!


test("FPGrowth fit and transform with different data types") {
Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach { dt =>
val intData = dataset.withColumn("features", col("features").cast(ArrayType(dt)))
Member

rename intData -> data

assert(checkDF.count() == 3 && checkDF.filter(col("freq") === col("freqExp")).count() == 3)
}

test("FPGrowth getFreqItems with Null") {
Member

In FPGrowth, document that null values are treated as empty sequences.

Member

Btw, I could imagine us wanting to change this later. If we're recommending items a user could add to their basket, then we might want to suggest the most frequent item rather than nothing.

@hhbyyh
Contributor Author

hhbyyh commented Feb 21, 2017

Hi @jkbradley
We can hold off on the transform code.

wrap the old AssociationRules code

Do you mean to make transform return the association rules DataFrame, like the current getAssociationRules?

@jkbradley
Member

wrap the old AssociationRules code

Sorry, forget this comment from me; I was thinking that something like FPGrowthModel.transform had already been implemented, but it's new.

Btw, I'm adding more comments now, so please hold off on changes for an hour. Thanks!

Member

@jkbradley jkbradley left a comment

OK, done with another pass. I'll think more about genericTransform too. Thanks!

* @return a DataFrame("antecedent", "consequent", "confidence") containing the association
* rules.
*/
def getAssociationRulesFromFP[T: ClassTag](dataset: Dataset[_],
Member

fix style of function args

* :: Experimental ::
* A parallel FP-growth algorithm to mine frequent itemsets.
*
* @see [[http://dx.doi.org/10.1145/1454008.1454027 Li et al., PFP: Parallel FP-Growth for Query
Member

Could you please go ahead and copy the relevant text and links from the Scaladoc string for mllib.fpm.FPGrowth?

*/
@Since("2.2.0")
val minSupport: DoubleParam = new DoubleParam(this, "minSupport",
"the minimal support level of the frequent pattern (Default: 0.3)",
Member

"of the frequent pattern" -> "of a frequent pattern"

*/
@Since("2.2.0")
val minConfidence: DoubleParam = new DoubleParam(this, "minConfidence",
"minimal confidence for generating Association Rule (Default: 0.8)",
Member

Don't state default value in built-in Param doc

def setNumPartitions(value: Int): this.type = set(numPartitions, value)

/** @group setParam
* Note that minConfidence has no effect during fitting.
Member

Remove this line; it's already in the minConfidence doc

// Save metadata and Params
DefaultParamsWriter.saveMetadata(instance, path, sc)
val dataPath = new Path(path, "data").toString
instance.freqItemsets.write.save(dataPath)
Member

specify parquet explicitly
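i.e., roughly (a sketch of the suggested change, paired with the load side flagged in the next comment):

  // Save: name the format explicitly instead of relying on the session default.
  instance.freqItemsets.write.parquet(dataPath)
  // Load: mirror it with an explicit parquet read.
  val frequentItems = sparkSession.read.parquet(dataPath)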

override def load(path: String): FPGrowthModel = {
  val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
  val dataPath = new Path(path, "data").toString
  val frequentItems = sparkSession.read.load(dataPath)
Member

specify parquet

* rules.
*/
def getAssociationRulesFromFP[T: ClassTag](dataset: Dataset[_],
    itemsCol: String = "items",
Member

No need to specify default arg values. If we make this public, we will need to avoid default args to make it Java friendly.

* :: Experimental ::
* Model fitted by FPGrowth.
*
* @param freqItemsets frequent items in the format of DataFrame("items", "freq")
Member

Specify schema (Seq, Long)

private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = {
  val data = dataset.select($(featuresCol))
  val items = data.where(col($(featuresCol)).isNotNull).rdd.map(r => r.getSeq[T](0).toArray)
  val parentModel = new MLlibFPGrowth().setMinSupport($(minSupport)).run(items)
Member

Specify numPartitions
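For instance (a sketch; it assumes numPartitions is an optional Param, so it is only forwarded when explicitly set):

  val mllibFP = new MLlibFPGrowth().setMinSupport($(minSupport))
  if (isSet(numPartitions)) {
    mllibFP.setNumPartitions($(numPartitions))
  }
  val parentModel = mllibFP.run(items)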

@SparkQA

SparkQA commented Feb 23, 2017

Test build #73354 has finished for PR 15415 at commit d8e4884.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@jkbradley jkbradley left a comment

Thanks for the updates! I think this is almost done. We should try to optimize transform in the future, but I think it's OK for an MVP. It'd be interesting to write it using DataFrames instead of RDDs and benchmark (in the future).

Seq.empty
}
(id, consequents)
}.aggregateByKey(new ArrayBuffer[T])((ar, seq) => ar ++= seq, (ar, seq) => ar ++= seq)
Member

How about using an OpenHashSet here to avoid collecting duplicates during aggregation?

Contributor Author

I tried OpenHashSet and it's about 15% slower than ArrayBuffer.

.aggregateByKey(new OpenHashSet[T])((set, seq) => {
  seq.foreach(t => set.add(t))
  set
}, (set1, set2) => set1.union(set2))

Member

@jkbradley jkbradley Feb 23, 2017

Huh, that's surprising to me. Maybe it depends on how many duplicates are introduced. Let's leave it as is then.

Just curious: What dataset did you test it on?

Contributor Author

Random numbers, so the number of duplicates should be small.

.map { case (index, cons) => (index, cons.distinct) }

val rowAndConsequents = dataset.toDF().rdd.zipWithIndex().map(_.swap)
  .join(indexToConsequents).sortByKey(ascending = true, dataset.rdd.getNumPartitions)
Member

No need to sortByKey

Contributor Author

Shall we try to keep the original order of the input dataset? The time cost is about 5% of the total transform time.

Member

DataFrames don't really have a row ordering, so we don't need to maintain one.

Contributor Author

If that's the case, does zipWithIndex() guarantee the same result across multiple calls?
And if we can ignore the ordering, perhaps I have a faster way to write the transform.

Member

I think zipWithIndex guarantees the order if the data have not been shuffled between load/creation and the zipWithIndex. AFAIK, shuffling can reorder rows in a partition arbitrarily.

We can definitely ignore the ordering for DataFrames.

Contributor Author

Checked again, and the current implementation is just as quick. I will just remove the sort.

* independent group of mining tasks. The FP-Growth algorithm is described in
* <a href="http://dx.doi.org/10.1145/335191.335372">Han et al., Mining frequent patterns without
* candidate generation</a>.
*
Member

Here or elsewhere, comment that null featuresCol values are ignored during fit() and are treated as empty sets during transform().
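For example, something like this in the class-level Scaladoc (wording illustrative):

  /**
   * ...
   * @note null values in the items column are ignored during fit(), and are
   * treated as empty itemsets during transform().
   */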

@hhbyyh
Contributor Author

hhbyyh commented Feb 23, 2017

I tried a few different ways to implement the transform. https://gist.github.com/hhbyyh/889b88ae2176d1263fdc9dd3e29d1c2d.

The performance is actually similar, while the current one maintains the original order of the input dataset. I would be glad to see a more optimized version.

@SparkQA

SparkQA commented Feb 24, 2017

Test build #73384 has finished for PR 15415 at commit bfcef4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hhbyyh
Contributor Author

hhbyyh commented Feb 24, 2017

Hi @jkbradley, after further performance comparison, I found that using a broadcast gives much better performance for the transform.

I tested with some public data from http://fimi.ua.ac.be/data/.
For the kosarak (.gz) data (300K records), the current transform took more than 3 hours with only 2 rules, while the broadcast version took only 0.15 sec with 900 rules. (I adjusted the support and confidence.)

  private def genericTransform4[T: Manifest](dataset: Dataset[_]): DataFrame = {
    val rules = associationRules.rdd.map(r =>
      (r.getSeq[Int](0), r.getSeq[Int](1))
    ).collect()
    val brRules = dataset.sparkSession.sparkContext.broadcast(rules)

    // For each rule, examine the input items and summarize the consequents
    val predictUDF = udf { (items: Seq[Int]) =>
      val itemset = items.toSet
      brRules.value.flatMap { r =>
        if (r._1.forall(itemset.contains)) r._2.filterNot(itemset.contains) else Seq.empty[Int]
      }.distinct
    }
    dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
  }

The test can be verified by the code:
https://gist.github.com/hhbyyh/06fcf3fdc8f6edda971847bcb5783d99
https://gist.github.com/hhbyyh/889b88ae2176d1263fdc9dd3e29d1c2d

Thinking again, the broadcast implementation may give much better performance in any case, especially when the number of rules is large. The major issue is how to support generic types with the UDF; the only way I found is to enumerate the data types in a match expression. Any suggestions?

@jkbradley
Member

I agree that, if the set of rules is small (1-2 GB max), then collecting and broadcasting it is best. But for larger sets of rules, we'd have to keep it distributed.

I'm very surprised by the time difference in your comparison. I'll experiment a little myself and get back soon.

Member

@jkbradley jkbradley left a comment

Thinking about it more, I'm OK with keeping the broadcast. Later on, we could write this using DataFrames and give a broadcast hint. Can you please make the warning for transform() more distinct (maybe with a "WARNING" label) and specific about broadcasting if we're going with this option?

I got this to work:

  private def genericTransform4(dataset: Dataset[_]): DataFrame = {
    val rules: Array[(Seq[Any], Seq[Any])] = associationRules.rdd.map(r =>
      (r.getSeq(0), r.getSeq(1))
    ).collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]]
    val brRules = dataset.sparkSession.sparkContext.broadcast(rules)

    val dt = dataset.schema($(featuresCol)).dataType

    // For each rule, examine the input items and summarize the consequents
    val predictUDF = udf((items: Seq[_]) => brRules.value.flatMap( rule =>
      if (items != null && rule._1.forall(item => items.contains(item))) {
        rule._2.filter(item => !items.contains(item))
      } else {
        Seq.empty
      }
    ).distinct, dt)
    dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
  }

Thanks!

*/
@Since("2.2.0")
@transient lazy val associationRules: DataFrame = {
val freqItems = freqItemsets
Member

No need for this temp val

@hhbyyh
Contributor Author

hhbyyh commented Feb 25, 2017

Thanks @jkbradley for contributing the code. That helps a lot. I'll merge and send an update now.

@hhbyyh
Contributor Author

hhbyyh commented Feb 25, 2017

@jkbradley I changed it a little from your version to first convert the Seq to a Set, to speed up the contains operations (sketched below).
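The tweak reads roughly like this (a sketch reconstructed from the snippets quoted further down; brRules and dt are as in genericTransform4 above):

  val predictUDF = udf((items: Seq[Any]) => {
    if (items != null) {
      // Convert once so each contains() lookup is O(1) instead of O(n).
      val itemset = items.toSet
      brRules.value.flatMap { rule =>
        if (rule._1.forall(itemset.contains)) {
          rule._2.filterNot(itemset.contains)
        } else {
          Seq.empty
        }
      }.distinct
    } else {
      Seq.empty
    }
  }, dt)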

Btw, I could imagine us wanting to change this later. If we're recommending items a user could add to their basket, then we might want to suggest the most frequent item rather than nothing.

Do we need to support this now? That may not be expected in all scenarios, and it seems an Imputer could help with that issue.

@SparkQA

SparkQA commented Feb 25, 2017

Test build #73452 has finished for PR 15415 at commit 3d7ed0b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 25, 2017

Test build #73453 has finished for PR 15415 at commit 9940c47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley
Member

I don't think we need to support a default prediction (for empty/null inputs) now. I agree we could use an Imputer or add it as an option later on.

Will take a final look now

Member

@jkbradley jkbradley left a comment

Looks good, just small comments

if (items != null) {
  val itemset = items.toSet
  brRules.value.flatMap(rule =>
    if (items != null && rule._1.forall(item => itemset.contains(item))) {
Member

No need to check items != null here since you put that above.

val predictUDF = udf((items: Seq[_]) => {
  if (items != null) {
    val itemset = items.toSet
    brRules.value.flatMap(rule =>
Member

style:

      brRules.value.flatMap { rule =>
        ...
      }

@jkbradley
Member

test this please

@jkbradley
Member

I'm going to go ahead and merge this after tests to make sure it's in 2.2, but can you please send a follow-up for my last 2 comments? Thanks!

@hhbyyh
Contributor Author

hhbyyh commented Feb 28, 2017

Sorry to have missed your comments. I can send a follow-up together with the documentation.

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73616 has finished for PR 15415 at commit 9940c47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley
Member

No problem, thanks! Could you please create a subtask for docs?

Merging with master

@asfgit asfgit closed this in 0fe8020 Feb 28, 2017