@@ -199,13 +199,52 @@ class RateStreamSource(
     }

     val localStartTimeMs = startTimeMs + TimeUnit.SECONDS.toMillis(startSeconds)
-    val relativeMsPerValue =
-      TimeUnit.SECONDS.toMillis(endSeconds - startSeconds) / (rangeEnd - rangeStart)

Contributor:
I thought that you would only change this to TimeUnit.SECONDS.toMillis(endSeconds - startSeconds).toDouble. Wasn't expecting all this change!

Member Author:
Just to avoid floating point inaccuracy :)

Contributor:
I guess that's acceptable, though now the code has gotten overcomplicated :(
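
Since the exchange above is terse, here is a minimal, hypothetical sketch (not part of the patch; TimestampSchemes, exactMs, and doubleMs are made-up names) contrasting the quotient/remainder scheme the author chose with the .toDouble version the reviewer expected, for tuplesPerSecond = 1500 over a 1000 ms interval:

    object TimestampSchemes {
      def main(args: Array[String]): Unit = {
        val intervalMs = 1000L
        val rows = 1500L                          // tuplesPerSecond = 1500
        val q = rows / intervalMs                 // valueSizePerMs = 1
        val r = rows % intervalMs                 // remainderValue = 500

        // Exact integer scheme from the diff below.
        def exactMs(v: Long): Long =
          if (v - r < q * r) v / (q + 1) else (v - r) / q

        // Double-based scheme the reviewer expected (.toDouble plus rounding).
        def doubleMs(v: Long): Long = math.round(v * (intervalMs.toDouble / rows))

        // Prints Vector((0,0), (0,1), (1,1), (1,2), (2,3), (2,3)): the two
        // schemes diverge at v = 1 already, so the integer version is not just
        // an optimization; it pins down one exact distribution.
        println((0L until 6L).map(v => (exactMs(v), doubleMs(v))))
      }
    }

The integer version also stays exact for arbitrarily large row indices, where a Double product can drift once values exceed the 53-bit mantissa.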

+    val timeIntervalSizeMs = TimeUnit.SECONDS.toMillis(endSeconds - startSeconds)
+
-    val rdd = sqlContext.sparkContext.range(rangeStart, rangeEnd, 1, numPartitions).map { v =>
-      val relative = (v - rangeStart) * relativeMsPerValue
-      InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), v)
-    }
+    val func =
+      if (timeIntervalSizeMs < rangeEnd - rangeStart) {
+        // Different rows may have the same timestamp
+        val valueSizePerMs = (rangeEnd - rangeStart) / timeIntervalSizeMs
+        val remainderValue = (rangeEnd - rangeStart) % timeIntervalSizeMs
+
+        (v: Long) => {
+          val relativeValue = v - rangeStart
+          val relativeMs = {
+            // Increase the timestamp per "valueSizePerMs + 1" values before
+            // "(valueSizePerMs + 1) * remainderValue", and increase the timestamp per
+            // "valueSizePerMs" values for remaining values.
+
+            // The following condition is the same as
+            // "relativeValue < (valueSizePerMs + 1) * remainderValue", just rewritten to avoid
+            // overflow.
+            if (relativeValue - remainderValue < valueSizePerMs * remainderValue) {

Contributor:
Can we add parentheses around relativeValue - remainderValue?

Contributor:
Also around valueSizePerMs * remainderValue:
=> (relativeValue - remainderValue) < (valueSizePerMs * remainderValue)
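
Note (a quick check, not part of the patch): the rewrite is sound because subtracting remainderValue from both sides of relativeValue < (valueSizePerMs + 1) * remainderValue yields relativeValue - remainderValue < valueSizePerMs * remainderValue, and valueSizePerMs * remainderValue <= valueSizePerMs * timeIntervalSizeMs <= rangeEnd - rangeStart always fits in a Long, whereas (valueSizePerMs + 1) * remainderValue can exceed rangeEnd - rangeStart.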

+              relativeValue / (valueSizePerMs + 1)
+            } else {
+              (relativeValue - remainderValue) / valueSizePerMs
+            }
+          }
+          InternalRow(DateTimeUtils.fromMillis(relativeMs + localStartTimeMs), v)
+        }
+      } else {
+        // Different rows never have the same timestamp
+        val relativeMsPerValue = timeIntervalSizeMs / (rangeEnd - rangeStart)
+        val remainderMs = timeIntervalSizeMs % (rangeEnd - rangeStart)
+
+        (v: Long) => {
+          val relativeValue = v - rangeStart
+          // The interval size for the first "remainderMs" values will be "relativeMsPerValue + 1",
+          // and the interval size for remaining values will be "relativeMsPerValue".
+          val relativeMs =
+            if (relativeValue < remainderMs) {
+              relativeValue * (relativeMsPerValue + 1)
+            } else {
+              remainderMs + relativeValue * relativeMsPerValue
+            }
+          InternalRow(DateTimeUtils.fromMillis(relativeMs + localStartTimeMs), v)
+        }
+      }
+
+    val rdd = sqlContext.sparkContext.range(rangeStart, rangeEnd, 1, numPartitions).map(func)
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
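
The two branches above are easy to get wrong at the boundaries, so here is a small, hypothetical property check (not part of the patch; UniformityCheck and relativeMs are made-up names) that mirrors both branches of func, minus the Spark plumbing, and confirms the per-timestamp row counts never differ by more than one:

    object UniformityCheck {
      // Mirrors the two branches of `func` in the diff above.
      def relativeMs(v: Long, rows: Long, intervalMs: Long): Long =
        if (intervalMs < rows) {
          val q = rows / intervalMs
          val r = rows % intervalMs
          if (v - r < q * r) v / (q + 1) else (v - r) / q
        } else {
          val q = intervalMs / rows
          val r = intervalMs % rows
          if (v < r) v * (q + 1) else r + v * q
        }

      def main(args: Array[String]): Unit = {
        for ((rows, intervalMs) <- Seq((1500L, 1000L), (400L, 1000L), (6L, 1000L))) {
          val countsPerTimestamp = (0L until rows)
            .map(relativeMs(_, rows, intervalMs))
            .groupBy(identity).values.map(_.size).toSet
          // Expect at most two adjacent sizes, e.g. Set(2, 1) for 1500 rows/s.
          println(s"rows=$rows intervalMs=$intervalMs -> $countsPerTimestamp")
        }
      }
    }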

@@ -56,6 +56,39 @@ class RateSourceSuite extends StreamTest {
     )
   }

test("uniform distribution of event timestamps: tuplesPerSecond > 1000") {
val input = spark.readStream
.format("rate")
.option("tuplesPerSecond", "1500")
.option("useManualClock", "true")
.load()
.as[(java.sql.Timestamp, Long)]
.map(v => (v._1.getTime, v._2))
val expectedAnswer =
(0 until 1000).map(v => (v / 2, v)) ++ // Two values share the same timestamp.

Contributor:
why is this?

Member Author:
Because there are 1000 timestamps in one second but we have 1500 values.

+      ((1000 until 1500).map(v => (v - 500, v))) // Each value has one timestamp
+    testStream(input)(
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch(expectedAnswer: _*)
+    )
+  }
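
Note (worked numbers, not part of the patch): with 1500 rows in a 1000 ms interval, valueSizePerMs = 1500 / 1000 = 1 and remainderValue = 500, so per the author's reply above the first (1 + 1) * 500 = 1000 values land two per millisecond at timestamp v / 2 (covering ms 0 through 499), and the remaining 500 values land one per millisecond at v - 500 (ms 500 through 999).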

test("uniform distribution of event timestamps: tuplesPerSecond < 1000") {
val input = spark.readStream
.format("rate")
.option("tuplesPerSecond", "400")
.option("useManualClock", "true")
.load()
.as[(java.sql.Timestamp, Long)]
.map(v => (v._1.getTime, v._2))
val expectedAnswer = (0 until 200).map(v => (v * 3, v)) ++
((200 until 400).map(v => (600 + (v - 200) * 2, v)))
testStream(input)(
AdvanceRateManualClock(seconds = 1),
CheckLastBatch(expectedAnswer: _*)
)
}
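
Note (worked numbers, not part of the patch): here timeIntervalSizeMs >= rangeEnd - rangeStart, so relativeMsPerValue = 1000 / 400 = 2 and remainderMs = 200; the first 200 values are spaced 3 ms apart (0, 3, 6, ...), and the remaining 200 are spaced 2 ms apart starting at 200 * 3 = 600.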

test("valueAtSecond") {
import RateStreamSource._

@@ -87,7 +120,9 @@ class RateSourceSuite extends StreamTest {
       AdvanceRateManualClock(seconds = 1),
       CheckLastBatch((2 until 6).map(v => 1000 + (v - 2) * 250 -> v): _*), // speed = 4
       AdvanceRateManualClock(seconds = 1),
-      CheckLastBatch((6 until 12).map(v => 2000 + (v - 6) * 166 -> v): _*), // speed = 6
+      CheckLastBatch({
+        Seq(2000 -> 6, 2167 -> 7, 2334 -> 8, 2501 -> 9, 2668 -> 10, 2834 -> 11)
+      }: _*), // speed = 6
       AdvanceRateManualClock(seconds = 1),
       CheckLastBatch((12 until 20).map(v => 3000 + (v - 12) * 125 -> v): _*), // speed = 8
       AdvanceRateManualClock(seconds = 1),
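
Note (worked numbers, not part of the patch): at speed 6, 1000 ms / 6 rows gives relativeMsPerValue = 166 with remainderMs = 4, so the first four gaps are 167 ms and the last two are 166 ms, producing 2000, 2167, 2334, 2501, 2668, 2834. The old expectation 2000 + (v - 6) * 166 assumed a constant 166 ms gap and no longer matches.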