Skip to content

Commit bfe9502

Browse files
committed
Fixed Scala style
1 parent 9dd7826 commit bfe9502

File tree

5 files changed

+18
-8
lines changed

5 files changed

+18
-8
lines changed

streaming/src/main/scala/org/apache/spark/streaming/binning/BinStream.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
2121

2222
import org.apache.spark.streaming.dstream.DStream
2323

24-
class BinStream[T: ClassTag](@transient ds: DStream[T], sizeInNumBatches: Int, delayInNumBatches: Int) {
24+
class BinStream[T: ClassTag](
25+
@transient ds: DStream[T], sizeInNumBatches: Int, delayInNumBatches: Int) {
2526
def getDStream = ds
2627
}

streaming/src/main/scala/org/apache/spark/streaming/binning/BinStreamer.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import org.apache.spark.streaming.Time
2222
import scala.reflect.ClassTag
2323

2424

25-
class BinStreamer[T: ClassTag](@transient ds: DStream[T], getStartTime: (T) => Time, getEndTime: (T) => Time) extends Serializable {
25+
class BinStreamer[T: ClassTag](
26+
@transient ds: DStream[T], getStartTime: (T) => Time, getEndTime: (T) => Time
27+
) extends Serializable {
2628

2729
def prorate(binStart: Time, binEnd: Time)(x: T) = {
2830

@@ -34,7 +36,8 @@ class BinStreamer[T: ClassTag](@transient ds: DStream[T], getStartTime: (T) => T
3436
}
3537
else {
3638

37-
// Even though binStart is not inclusive, setting s = binStart implies limit s as x approaches binStart+
39+
// Even though binStart is not inclusive,
40+
// setting s = binStart implies limit s as x approaches binStart+
3841
val s = if (sx > binStart) sx else binStart
3942

4043
val e = if (ex < binEnd) ex else binEnd

streaming/src/main/scala/org/apache/spark/streaming/dstream/BinAlignedWindowDStream.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@ class BinAlignedWindowDStream[T: ClassTag](
3636

3737
override def slideDuration: Duration = parent.slideDuration
3838

39-
override def parentRememberDuration: Duration = rememberDuration + parent.slideDuration * sizeNumBatches * (delayNumBins + 1)
39+
override def parentRememberDuration: Duration =
40+
rememberDuration + parent.slideDuration * sizeNumBatches * (delayNumBins + 1)
4041

4142
override def compute(validTime: Time): Option[RDD[T]] = {
4243

43-
val binStart = (validTime - Duration(1)).floor(slideDuration * sizeNumBatches) - slideDuration * sizeNumBatches * delayNumBins
44+
val binStart =
45+
(validTime - Duration(1)).floor(slideDuration * sizeNumBatches) -
46+
slideDuration * sizeNumBatches * delayNumBins
4447

4548
if ((validTime - binStart).isMultipleOf(slideDuration * sizeNumBatches)) {
4649
val currentWindow = new Interval(binStart + slideDuration, validTime)

streaming/src/main/scala/org/apache/spark/streaming/dstream/ProratedEventDStream.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ class ProratedEventDStream[T: ClassTag](parent: DStream[T],
4444
// Assumption: start(x) <= end(x) <= boundaryEnd
4545
//
4646

47-
def binStart = (validTime - Duration(1)).floor(slideDuration * sizeNumBatches) - slideDuration * sizeNumBatches * delayNumBins
47+
def binStart = (validTime - Duration(1)).floor(slideDuration * sizeNumBatches) -
48+
slideDuration * sizeNumBatches * delayNumBins
4849
def binEnd = binStart + slideDuration * sizeNumBatches
4950

5051
parent.getOrCompute(validTime).map(

streaming/src/main/scala/org/apache/spark/streaming/dstream/PulsatingWindowDStream.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ class PulsatingWindowDStream[T: ClassTag](parent: DStream[T],
3535

3636
override def slideDuration: Duration = parent.slideDuration
3737

38-
override def parentRememberDuration: Duration = rememberDuration + parent.slideDuration * sizeNumBatches * (delayNumBins + 1)
38+
override def parentRememberDuration: Duration =
39+
rememberDuration + parent.slideDuration * sizeNumBatches * (delayNumBins + 1)
3940

4041
override def compute(validTime: Time): Option[RDD[T]] = {
4142

42-
val binStart = (validTime - Duration(1)).floor(slideDuration * sizeNumBatches) - slideDuration * sizeNumBatches * delayNumBins
43+
val binStart = (validTime - Duration(1)).floor(slideDuration * sizeNumBatches) -
44+
slideDuration * sizeNumBatches * delayNumBins
4345

4446
val currentWindow = new Interval(binStart + slideDuration, validTime)
4547

0 commit comments

Comments
 (0)