@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
2323import org .apache .spark .{TaskContext , Partition }
2424
2525private [spark]
26- class SlidingRDDPartition [T ](val idx : Int , val prev : Partition , val tail : Array [T ])
26+ class SlidingRDDPartition [T ](val idx : Int , val prev : Partition , val tail : Seq [T ])
2727 extends Partition with Serializable {
2828 override val index : Int = idx
2929}
@@ -42,16 +42,15 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[
4242 */
4343private [spark]
4444class SlidingRDD [T : ClassTag ](@ transient val parent : RDD [T ], val windowSize : Int )
45- extends RDD [Array [T ]](parent) {
45+ extends RDD [Seq [T ]](parent) {
4646
4747 require(windowSize > 1 , s " Window size must be greater than 1, but got $windowSize. " )
4848
49- override def compute (split : Partition , context : TaskContext ): Iterator [Array [T ]] = {
49+ override def compute (split : Partition , context : TaskContext ): Iterator [Seq [T ]] = {
5050 val part = split.asInstanceOf [SlidingRDDPartition [T ]]
5151 (firstParent[T ].iterator(part.prev, context) ++ part.tail)
5252 .sliding(windowSize)
53- .map(_.toArray)
54- .filter(_.size == windowSize)
53+ .withPartial(false )
5554 }
5655
5756 override def getPreferredLocations (split : Partition ): Seq [String ] =
@@ -63,7 +62,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
6362 if (n == 0 ) {
6463 Array .empty
6564 } else if (n == 1 ) {
66- Array (new SlidingRDDPartition [T ](0 , parentPartitions(0 ), Array .empty))
65+ Array (new SlidingRDDPartition [T ](0 , parentPartitions(0 ), Seq .empty))
6766 } else {
6867 val n1 = n - 1
6968 val w1 = windowSize - 1
@@ -75,7 +74,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
7574 var partitionIndex = 0
7675 while (i < n1) {
7776 var j = i
78- val tail = mutable.ArrayBuffer [T ]()
77+ val tail = mutable.ListBuffer [T ]()
7978 // Keep appending heads to the current tail until a head of size w1 has been appended.
8079 while (j < n1 && nextHeads(j).size < w1) {
8180 tail ++= nextHeads(j)
@@ -85,14 +84,14 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
8584 tail ++= nextHeads(j)
8685 j += 1
8786 }
88- partitions += new SlidingRDDPartition [T ](partitionIndex, parentPartitions(i), tail.toArray )
87+ partitions += new SlidingRDDPartition [T ](partitionIndex, parentPartitions(i), tail.toSeq )
8988 partitionIndex += 1
9089 // Skip appended heads.
9190 i = j
9291 }
9493 // If the head of the last partition has size w1, we also need to add this partition.
9493 if (nextHeads(n1 - 1 ).size == w1) {
95- partitions += new SlidingRDDPartition [T ](partitionIndex, parentPartitions(n1), Array .empty)
94+ partitions += new SlidingRDDPartition [T ](partitionIndex, parentPartitions(n1), Seq .empty)
9695 }
9796 partitions.toArray
9897 }
0 commit comments