@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{TaskContext, Partition}
 
 private[spark]
-class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
+class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
   extends Partition with Serializable {
   override val index: Int = idx
 }
@@ -41,36 +41,36 @@ class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T
  * @see [[org.apache.spark.rdd.RDD#sliding]]
  */
 private[spark]
-class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
+class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
   extends RDD[Array[T]](parent) {
 
-  require(windowSize > 1, "Window size must be greater than 1.")
+  require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")
 
   override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
-    val part = split.asInstanceOf[SlidedRDDPartition[T]]
+    val part = split.asInstanceOf[SlidingRDDPartition[T]]
     (firstParent[T].iterator(part.prev, context) ++ part.tail)
       .sliding(windowSize)
       .map(_.toArray)
       .filter(_.size == windowSize)
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] =
-    firstParent[T].preferredLocations(split.asInstanceOf[SlidedRDDPartition[T]].prev)
+    firstParent[T].preferredLocations(split.asInstanceOf[SlidingRDDPartition[T]].prev)
 
   override def getPartitions: Array[Partition] = {
     val parentPartitions = parent.partitions
     val n = parentPartitions.size
     if (n == 0) {
       Array.empty
     } else if (n == 1) {
-      Array(new SlidedRDDPartition[T](0, parentPartitions(0), Array.empty))
+      Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty))
     } else {
       val n1 = n - 1
       val w1 = windowSize - 1
       // Get the first w1 items of each partition, starting from the second partition.
       val nextHeads =
         parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true)
-      val partitions = mutable.ArrayBuffer[SlidedRDDPartition[T]]()
+      val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]()
       var i = 0
       var partitionIndex = 0
       while (i < n1) {
@@ -85,14 +85,14 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
           tail ++= nextHeads(j)
           j += 1
         }
-        partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
         partitionIndex += 1
         // Skip appended heads.
         i = j
       }
       // If the head of last partition has size w1, we also need to add this partition.
       if (nextHeads(n1 - 1).size == w1) {
-        partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
       }
       partitions.toArray
     }
0 commit comments