Closed
Changes from 15 commits · 28 commits
91fd7e6
Add new algorithm PrefixSpan and test file.
zhangjiajin Jul 7, 2015
575995f
Modified the code according to the review comments.
zhangjiajin Jul 8, 2015
951fd42
Delete Prefixspan.scala
zhangjiajin Jul 8, 2015
a2eb14c
Delete PrefixspanSuite.scala
zhangjiajin Jul 8, 2015
89bc368
Fixed a Scala style error.
zhangjiajin Jul 8, 2015
1dd33ad
Modified the code according to the review comments.
zhangjiajin Jul 9, 2015
4c60fb3
Fix some Scala style errors.
zhangjiajin Jul 9, 2015
ba5df34
Fix a Scala style error.
zhangjiajin Jul 9, 2015
574e56c
Add new object LocalPrefixSpan, and do some optimization.
zhangjiajin Jul 10, 2015
ca9c4c8
Modified the code according to the review comments.
zhangjiajin Jul 11, 2015
22b0ef4
Add feature: Collect enough frequent prefixes before projection in Pr…
zhangjiajin Jul 14, 2015
078d410
fix a scala style error.
zhangjiajin Jul 14, 2015
4dd1c8a
initialize file before rebase.
zhangjiajin Jul 15, 2015
a8fde87
Merge branch 'master' of https://github.com/apache/spark
zhangjiajin Jul 15, 2015
6560c69
Add feature: Collect enough frequent prefixes before projection in Pr…
zhangjiajin Jul 15, 2015
baa2885
Modified the code according to the review comments.
zhangjiajin Jul 15, 2015
095aa3a
Modified the code according to the review comments.
zhangjiajin Jul 16, 2015
b07e20c
Merge branch 'master' of https://github.com/apache/spark into Collect…
zhangjiajin Jul 16, 2015
d2250b7
remove minPatternsBeforeLocalProcessing, add maxSuffixesBeforeLocalPr…
zhangjiajin Jul 18, 2015
64271b3
Modified codes according to comments.
zhangjiajin Jul 27, 2015
6e149fa
Fix splitPrefixSuffixPairs
Jul 28, 2015
01c9ae9
Add getters
Jul 28, 2015
cb2a4fc
Inline code for readability
Jul 28, 2015
da0091b
Use lists for prefixes to reuse data
Jul 28, 2015
1235cfc
Use Iterable[Array[_]] over Array[Array[_]] for database
Jul 28, 2015
c2caa5c
Readability improvements and comments
Jul 28, 2015
87fa021
Improve extend prefix readability
Jul 28, 2015
ad23aa9
Merge pull request #1 from feynmanliang/SPARK-8998-collectBeforeLocal
zhangjiajin Jul 29, 2015
75 changes: 65 additions & 10 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -43,6 +43,8 @@ class PrefixSpan private (
private var minSupport: Double,
private var maxPatternLength: Int) extends Logging with Serializable {

private val minPatternsBeforeShuffle: Int = 20
Contributor: minPatternsBeforeLocalProcessing

Contributor Author: OK

/**
* Constructs a default instance with default parameters
* {minSupport: `0.1`, maxPatternLength: `10`}.
@@ -86,16 +88,69 @@ class PrefixSpan private (
getFreqItemAndCounts(minCount, sequences).collect()
val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
Contributor: Every reference of "projected database" before L105 should be renamed to "prefixSuffixPairs" since the groupBy has not yet occurred

Contributor Author: OK.
lengthOnePatternsAndCounts.map(_._1), sequences)
val groupedProjectedDatabase = prefixAndProjectedDatabase
.map(x => (x._1.toSeq, x._2))
.groupByKey()
.map(x => (x._1.toArray, x._2.toArray))
val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase)
val lengthOnePatternsAndCountsRdd =
sequences.sparkContext.parallelize(
lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
allPatterns

var patternsCount = lengthOnePatternsAndCounts.length
var allPatternAndCounts = sequences.sparkContext.parallelize(
Contributor: No need to parallelize if you remove the collect on L88 (will still need collect on L90 for now)

Contributor Author: OK
lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
var currentProjectedDatabase = prefixAndProjectedDatabase
while (patternsCount <= minPatternsBeforeShuffle &&
currentProjectedDatabase.count() != 0) {
Contributor: No need for linebreak

Contributor Author: OK
val (nextPatternAndCounts, nextProjectedDatabase) =
getPatternCountsAndProjectedDatabase(minCount, currentProjectedDatabase)
patternsCount = nextPatternAndCounts.count().toInt
Contributor: I think just declaring var patternsCount: Long on L92 is better

Contributor Author: OK

Contributor: No need for toInt

Contributor Author: OK
currentProjectedDatabase = nextProjectedDatabase
allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
Contributor: nit: ++=

Contributor Author: OK
}
if (patternsCount > 0) {
val groupedProjectedDatabase = currentProjectedDatabase
Contributor: This is the projectedDatabases; everything before was (prefix, suffix) pairs. groupedProjectedDatabase -> projectedDatabase

Contributor Author: OK
.map(x => (x._1.toSeq, x._2))
.groupByKey()
Contributor: This assumes that all the values (suffixes) associated to a key (prefix) will fit on an executor, but I don't think that patternsCount > minPatternsBeforeShuffle will guarantee that. Better to count the suffixes for each prefix using aggregateByKey before doing local processing.
.map(x => (x._1.toArray, x._2.toArray))
val nextPatternAndCounts = getPatternsInLocal(minCount, groupedProjectedDatabase)
allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
Contributor: nit: ++=

Contributor Author: OK
}
allPatternAndCounts
}
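The reviewer's concern above is that groupByKey assumes all suffixes for one prefix fit on a single executor, and suggests counting suffixes per prefix with aggregateByKey first. A minimal driver-side sketch of that counting step, using plain Scala collections in place of the RDD (all names here are illustrative, not part of this patch):

```scala
object SuffixCountSketch {
  // (prefix, suffix) pairs as they exist before any groupByKey
  val prefixSuffixPairs: Seq[(Seq[Int], Array[Int])] = Seq(
    (Seq(1), Array(2, 3)),
    (Seq(1), Array(3)),
    (Seq(2), Array(1, 3))
  )

  // Local analogue of the suggested
  //   prefixSuffixPairs.aggregateByKey(0L)((c, _) => c + 1, _ + _)
  // i.e. count the suffixes per prefix without materializing the groups.
  def suffixCounts: Map[Seq[Int], Long] =
    prefixSuffixPairs.foldLeft(Map.empty[Seq[Int], Long]) {
      case (counts, (prefix, _)) =>
        counts.updated(prefix, counts.getOrElse(prefix, 0L) + 1L)
    }

  def main(args: Array[String]): Unit = {
    // prefixes whose group is too large could keep being expanded
    // distributively instead of being shipped to one executor
    println(suffixCounts(Seq(1))) // 2
  }
}
```

Only prefixes whose counted group is small enough would then be grouped and processed locally.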

/**
* Get the pattern and counts, and projected database
Contributor: "projected database" -> "suffix", ditto for all other occurrences in this method (projected database is all suffixes for a prefix and is an Array[Array[Int]]; the groupBy hasn't happened yet in this method)

Contributor Author: OK
* @param minCount minimum count
* @param prefixAndProjectedDatabase prefix and projected database,
* @return pattern and counts, and projected database
* (Array[pattern, count], RDD[prefix, projected database ])
*/
private def getPatternCountsAndProjectedDatabase(
minCount: Long,
prefixAndProjectedDatabase: RDD[(Array[Int], Array[Int])]):
(RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
val prefixAndFreqentItemAndCounts = prefixAndProjectedDatabase.flatMap{ x =>
x._2.distinct.map(y => ((x._1.toSeq, y), 1L))
}.reduceByKey(_ + _)
.filter(_._2 >= minCount)
val patternAndCounts = prefixAndFreqentItemAndCounts
.map(x => (x._1._1.toArray ++ Array(x._1._2), x._2))
Contributor: .map { case ((prefix, item), count) => (prefix.toArray :+ item, count) } (the tuples are starting to get confusing)

Contributor Author: OK
val prefixlength = prefixAndProjectedDatabase.take(1)(0)._1.length
Contributor: take(1)(0) -> first(). Actually, can we move the prefixlength check up into the loop on L96 and cut L133-L134?

Contributor Author: It is not easy to move the prefix length check into the loop on L96.
The current process:

  1. get length-1 patterns
  2. get length-1 prefix suffix pairs
  3. while ( (current patterns count < minPatternsBeforeShuffle) and (prefix suffix pairs is not empty) ) {
  4.   get next patterns and prefix suffix pairs
  5. }
  6. shuffle and continue

If we move the check into the loop, we must split getPatternCountsAndPrefixSuffixPairs() into two methods, getPatternCounts() and getPrefixSuffixPairs().
P.S. The methods that get the length-1 patterns and prefix suffix pairs differ from those that get the length-n (n > 1) patterns and prefix suffix pairs.

  1. get length-1 patterns
  2. get length-1 prefix suffix pairs
  3. get length-2 patterns ( call new method getPatternCounts() )
  4. while ( (current patterns count < minPatternsBeforeShuffle) and (prefix suffix pairs is not empty) and (pattern length < maxPatternLength) ) {
  5.   get next prefix suffix pairs
  6.   get next patterns
  7. }
  8. get length-(n+1) suffix pairs for shuffle
  9. shuffle and continue

Contributor: Continuing discussion on the latest diff.

if (prefixlength + 1 >= maxPatternLength) {
Contributor: prefixlength == prefixSuffixPairs.first()._1.length == currentPrefixSuffixPairs.first()._1.length, which can be checked on L93, so you should be able to do the check in the while loop and eliminate L130-L132

Contributor: Actually, doesn't prefixlength only grow by 1 each iteration of L93? You could just use a counter there

Contributor Author: OK
(patternAndCounts, prefixAndProjectedDatabase.filter(x => false))
} else {
val frequentItemsMap = prefixAndFreqentItemAndCounts
Contributor: frequentItemsMap -> prefixToFrequentNextItemsMap

Contributor Author: OK
.keys.map(x => (x._1, x._2))
Contributor: No need for the .map; it is an identity function here

Contributor Author: OK
.groupByKey()
Contributor: groupByKey().mapValues() -> aggregateByKey
.mapValues(_.toSet)
.collect
.toMap
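The aggregateByKey suggestion above would build the prefix-to-items map without shuffling whole value groups. A sketch of the equivalent combine logic on plain Scala collections (names are illustrative, not from this diff):

```scala
object PrefixToItemsSketch {
  // keys of the (prefix, frequent next item) -> count pairs
  val prefixItemPairs: Seq[(Seq[Int], Int)] =
    Seq((Seq(1), 2), (Seq(1), 3), (Seq(2), 3))

  // Local analogue of the suggested
  //   .keys.aggregateByKey(Set.empty[Int])(_ + _, _ ++ _)
  // which merges items into per-prefix sets as they arrive, instead of
  // materializing each full group the way groupByKey().mapValues(_.toSet) does.
  def prefixToItems: Map[Seq[Int], Set[Int]] =
    prefixItemPairs.foldLeft(Map.empty[Seq[Int], Set[Int]]) {
      case (m, (prefix, item)) =>
        m.updated(prefix, m.getOrElse(prefix, Set.empty[Int]) + item)
    }

  def main(args: Array[String]): Unit = {
    assert(prefixToItems(Seq(1)) == Set(2, 3))
    println("ok")
  }
}
```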
val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase
.filter(x => frequentItemsMap.contains(x._1))
.flatMap { x =>
Contributor: x -> case (prefix, suffix)

Contributor Author: OK
val frequentItemSet = frequentItemsMap(x._1)
Contributor: x._1 -> prefix

Contributor Author: OK
val filteredSequence = x._2.filter(frequentItemSet.contains(_))
Contributor: x._2 -> suffix

Contributor Author: OK
val subProjectedDabase = frequentItemSet.map{ y =>
Contributor: Space before {. Also, y -> case (item, count)

Contributor Author: OK
(y, LocalPrefixSpan.getSuffix(y, filteredSequence))
}.filter(_._2.nonEmpty)
subProjectedDabase.map(y => (x._1 ++ Array(y._1), y._2))
}
(patternAndCounts, nextPrefixAndProjectedDatabase)
}
}
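The tuple-destructuring renames suggested in the comments above (x -> case (prefix, suffix), y -> item, and so on) can be seen together in a self-contained sketch; LocalPrefixSpan.getSuffix is stubbed here with a guessed behavior since its definition is not part of this diff, and all data is illustrative:

```scala
object DestructuredFlatMap {
  // Stub standing in for LocalPrefixSpan.getSuffix (not shown in this diff):
  // the part of the sequence after the first occurrence of item.
  def getSuffix(item: Int, sequence: Array[Int]): Array[Int] =
    sequence.dropWhile(_ != item).drop(1)

  val prefixToFrequentNextItems: Map[Seq[Int], Set[Int]] =
    Map(Seq(1) -> Set(2, 3))

  val prefixSuffixPairs: Seq[(Array[Int], Array[Int])] =
    Seq((Array(1), Array(2, 3, 2)))

  // The reviewers' suggested style: pattern-match tuples instead of x._1 / x._2.
  def extended: Seq[(Array[Int], Array[Int])] =
    prefixSuffixPairs
      .filter { case (prefix, _) => prefixToFrequentNextItems.contains(prefix.toSeq) }
      .flatMap { case (prefix, suffix) =>
        val frequentItems = prefixToFrequentNextItems(prefix.toSeq)
        val filteredSuffix = suffix.filter(frequentItems.contains)
        frequentItems.toSeq.sorted
          .map(item => (item, getSuffix(item, filteredSuffix)))
          .filter { case (_, s) => s.nonEmpty }
          .map { case (item, s) => (prefix :+ item, s) }
      }

  def main(args: Array[String]): Unit = {
    println(extended.map { case (p, s) => (p.toSeq, s.toSeq) })
  }
}
```

On the real RDDs the same destructuring applies unchanged; only the collection type differs.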

/**