@@ -97,14 +97,31 @@ class PrefixSpan private (
9797 if (sequences.getStorageLevel == StorageLevel .NONE ) {
9898 logWarning(" Input data is not cached." )
9999 }
100- val minCount = getMinCount(sequences)
101- val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
102- val prefixSuffixPairs = getPrefixSuffixPairs(
103- lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
100+
101+ // Convert min support to a min number of transactions for this dataset
102+ val minCount = if (minSupport == 0 ) 0L else math.ceil(sequences.count() * minSupport).toLong
103+
104+ val itemCounts = sequences
105+ .flatMap(_.distinct.map((_, 1L )))
106+ .reduceByKey(_ + _)
107+ .filter(_._2 >= minCount)
108+
109+ val prefixSuffixPairs = {
110+ val frequentItems = itemCounts.map(_._1).collect()
111+ val candidates = sequences.map { p =>
112+ p.filter (frequentItems.contains(_) )
113+ }
114+ candidates.flatMap { x =>
115+ frequentItems.map { y =>
116+ val sub = LocalPrefixSpan .getSuffix(y, x)
117+ (ArrayBuffer (y), sub)
118+ }.filter(_._2.nonEmpty)
119+ }
120+ }
104121 prefixSuffixPairs.persist(StorageLevel .MEMORY_AND_DISK )
105- var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => ( ArrayBuffer (x._1), x._2))
106- var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
107- splitPrefixSuffixPairs(prefixSuffixPairs)
122+
123+ var allPatternAndCounts = itemCounts.map(x => ( ArrayBuffer (x._1), x._2))
124+ var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs)
108125 while (largePrefixSuffixPairs.count() != 0 ) {
109126 val (nextPatternAndCounts, nextPrefixSuffixPairs) =
110127 getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
@@ -115,6 +132,7 @@ class PrefixSpan private (
115132 smallPrefixSuffixPairs ++= smallerPairsPart
116133 allPatternAndCounts ++= nextPatternAndCounts
117134 }
135+
118136 if (smallPrefixSuffixPairs.count() > 0 ) {
119137 val projectedDatabase = smallPrefixSuffixPairs
120138 .map(x => (x._1.toSeq, x._2))
@@ -189,29 +207,6 @@ class PrefixSpan private (
189207 (patternAndCounts, nextPrefixSuffixPairs)
190208 }
191209
192- /**
193- * Get the minimum count (sequences count * minSupport).
194- * @param sequences input data set, contains a set of sequences,
195- * @return minimum count,
196- */
197- private def getMinCount (sequences : RDD [Array [Int ]]): Long = {
198- if (minSupport == 0 ) 0L else math.ceil(sequences.count() * minSupport).toLong
199- }
200-
201- /**
202- * Generates frequent items by filtering the input data using minimal count level.
203- * @param minCount the absolute minimum count
204- * @param sequences original sequences data
205- * @return array of item and count pair
206- */
207- private def getFreqItemAndCounts (
208- minCount : Long ,
209- sequences : RDD [Array [Int ]]): RDD [(Int , Long )] = {
210- sequences.flatMap(_.distinct.map((_, 1L )))
211- .reduceByKey(_ + _)
212- .filter(_._2 >= minCount)
213- }
214-
215210 /**
216211 * Get the frequent prefixes and suffix pairs.
217212 * @param frequentPrefixes frequent prefixes
@@ -221,15 +216,6 @@ class PrefixSpan private (
221216 private def getPrefixSuffixPairs (
222217 frequentPrefixes : Array [Int ],
223218 sequences : RDD [Array [Int ]]): RDD [(ArrayBuffer [Int ], Array [Int ])] = {
224- val filteredSequences = sequences.map { p =>
225- p.filter (frequentPrefixes.contains(_) )
226- }
227- filteredSequences.flatMap { x =>
228- frequentPrefixes.map { y =>
229- val sub = LocalPrefixSpan .getSuffix(y, x)
230- (ArrayBuffer (y), sub)
231- }.filter(_._2.nonEmpty)
232- }
233219 }
234220
235221 /**
0 commit comments