1515 * limitations under the License.
1616 */
1717
18- package org .apache .spark .rdd
18+ package org .apache .spark .mllib . rdd
1919
2020import scala .collection .mutable
2121import scala .reflect .ClassTag
2222
2323import org .apache .spark .{TaskContext , Partition }
24+ import org .apache .spark .rdd .RDD
2425
/**
 * Partition of a [[SlidingRDD]]: a parent partition paired with extra trailing items.
 *
 * @param idx index of this partition within the sliding RDD
 * @param prev the parent RDD partition whose items this partition reads
 * @param tail items appended after the items of `prev` before the sliding window is applied
 */
private[mllib]
class SlidingRDDPartition[T](
    val idx: Int,
    val prev: Partition,
    val tail: Seq[T])
  extends Partition
  with Serializable {

  override val index: Int = idx
}
@@ -33,49 +34,50 @@ class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T
3334 * window over them. The ordering is first based on the partition index and then the ordering of
3435 * items within each partition. This is similar to sliding in Scala collections, except that it
3536 * becomes an empty RDD if the window size is greater than the total number of items. It needs to
36- * trigger a Spark job if the parent RDD has more than one partitions.
37+ * trigger a Spark job if the parent RDD has more than one partition. To make this operation
38+ * efficient, the number of items per partition should be larger than the window size and the
39+ * window size should be small, e.g., 2.
3740 *
3841 * @param parent the parent RDD
3942 * @param windowSize the window size, must be greater than 1
4043 *
41- * @see [[org.apache.spark.rdd.RDD #sliding ]]
44+ * @see [[org.apache.spark.mllib. rdd.RDDFunctions #sliding ]]
4245 */
43- private [spark ]
44- class SlidedRDD [T : ClassTag ](@ transient val parent : RDD [T ], val windowSize : Int )
45- extends RDD [Array [T ]](parent) {
46+ private [mllib ]
47+ class SlidingRDD [T : ClassTag ](@ transient val parent : RDD [T ], val windowSize : Int )
48+ extends RDD [Seq [T ]](parent) {
4649
47- require(windowSize > 1 , " Window size must be greater than 1." )
50+ require(windowSize > 1 , s " Window size must be greater than 1, but got $windowSize . " )
4851
/**
 * Produces the windows for one partition of this RDD.
 *
 * The items of the underlying parent partition are followed by the partition's `tail`, and a
 * sliding window of `windowSize` is passed over the combined sequence. Windows shorter than
 * `windowSize` are dropped.
 *
 * @param split the partition to compute; must be a [[SlidingRDDPartition]]
 * @param context the task context forwarded to the parent RDD's iterator
 * @return an iterator over full-size windows
 */
override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = {
  val slidingPartition = split.asInstanceOf[SlidingRDDPartition[T]]
  val parentItems = firstParent[T].iterator(slidingPartition.prev, context)
  // Chain the stored tail after the parent's items so the window runs across both.
  val itemsWithTail = parentItems ++ slidingPartition.tail
  // withPartial(false) discards the trailing windows that hold fewer than windowSize items.
  itemsWithTail.sliding(windowSize).withPartial(false)
}
5658
/**
 * Returns the preferred locations of the parent partition wrapped by `split`.
 *
 * @param split the partition to look up; must be a [[SlidingRDDPartition]]
 * @return the preferred locations reported by the parent RDD for the wrapped partition
 */
override def getPreferredLocations(split: Partition): Seq[String] = {
  val parentPartition = split.asInstanceOf[SlidingRDDPartition[T]].prev
  firstParent[T].preferredLocations(parentPartition)
}
5961
6062 override def getPartitions : Array [Partition ] = {
6163 val parentPartitions = parent.partitions
6264 val n = parentPartitions.size
6365 if (n == 0 ) {
6466 Array .empty
6567 } else if (n == 1 ) {
66- Array (new SlidedRDDPartition [T ](0 , parentPartitions(0 ), Array .empty))
68+ Array (new SlidingRDDPartition [T ](0 , parentPartitions(0 ), Seq .empty))
6769 } else {
6870 val n1 = n - 1
6971 val w1 = windowSize - 1
7072 // Get the first w1 items of each partition, starting from the second partition.
7173 val nextHeads =
7274 parent.context.runJob(parent, (iter : Iterator [T ]) => iter.take(w1).toArray, 1 until n, true )
73- val partitions = mutable.ArrayBuffer [SlidedRDDPartition [T ]]()
75+ val partitions = mutable.ArrayBuffer [SlidingRDDPartition [T ]]()
7476 var i = 0
7577 var partitionIndex = 0
7678 while (i < n1) {
7779 var j = i
78- val tail = mutable.ArrayBuffer [T ]()
80+ val tail = mutable.ListBuffer [T ]()
7981 // Keep appending to the current tail until appended a head of size w1.
8082 while (j < n1 && nextHeads(j).size < w1) {
8183 tail ++= nextHeads(j)
@@ -85,14 +87,14 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
8587 tail ++= nextHeads(j)
8688 j += 1
8789 }
88- partitions += new SlidedRDDPartition [T ](partitionIndex, parentPartitions(i), tail.toArray )
90+ partitions += new SlidingRDDPartition [T ](partitionIndex, parentPartitions(i), tail)
8991 partitionIndex += 1
9092 // Skip appended heads.
9193 i = j
9294 }
9395 // If the head of last partition has size w1, we also need to add this partition.
94- if (nextHeads(n1 - 1 ) .size == w1) {
95- partitions += new SlidedRDDPartition [T ](partitionIndex, parentPartitions(n1), Array .empty)
96+ if (nextHeads.last .size == w1) {
97+ partitions += new SlidingRDDPartition [T ](partitionIndex, parentPartitions(n1), Seq .empty)
9698 }
9799 partitions.toArray
98100 }
0 commit comments