-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years #16304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
7fc3411
33a7d1b
8c603f4
67ad327
736c903
0a42fab
9d3175a
40c200f
b9a37bf
debc748
29f0037
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,15 +19,17 @@ package org.apache.spark.sql.streaming | |
|
|
||
| import java.{util => ju} | ||
| import java.text.SimpleDateFormat | ||
| import java.util.{Calendar, Date} | ||
|
|
||
| import org.scalatest.BeforeAndAfter | ||
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.{AnalysisException, Row} | ||
| import org.apache.spark.sql.execution.streaming._ | ||
| import org.apache.spark.sql.functions.{count, window} | ||
| import org.apache.spark.sql.InternalOutputModes.Complete | ||
|
|
||
| class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { | ||
| class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging { | ||
|
|
||
| import testImplicits._ | ||
|
|
||
|
|
@@ -52,48 +54,59 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { | |
| assert(e.getMessage contains "int") | ||
| } | ||
|
|
||
|
|
||
| test("event time and watermark metrics") { | ||
| val inputData = MemoryStream[Int] | ||
| // No event time metrics when there is no watermarking | ||
| val inputData1 = MemoryStream[Int] | ||
| val aggWithoutWatermark = inputData1.toDF() | ||
| .withColumn("eventTime", $"value".cast("timestamp")) | ||
| .groupBy(window($"eventTime", "5 seconds") as 'window) | ||
| .agg(count("*") as 'count) | ||
| .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) | ||
|
|
||
| val windowedAggregation = inputData.toDF() | ||
| testStream(aggWithoutWatermark, outputMode = Complete)( | ||
| AddData(inputData1, 15), | ||
| CheckAnswer((15, 1)), | ||
| assertEventStats { e => assert(e.isEmpty) }, | ||
| AddData(inputData1, 10, 12, 14), | ||
| CheckAnswer((10, 3), (15, 1)), | ||
| assertEventStats { e => assert(e.isEmpty) } | ||
| ) | ||
|
|
||
| // All event time metrics where watermarking is set | ||
| val inputData2 = MemoryStream[Int] | ||
| val aggWithWatermark = inputData2.toDF() | ||
| .withColumn("eventTime", $"value".cast("timestamp")) | ||
| .withWatermark("eventTime", "10 seconds") | ||
| .groupBy(window($"eventTime", "5 seconds") as 'window) | ||
| .agg(count("*") as 'count) | ||
| .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) | ||
|
|
||
| def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => | ||
| body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) | ||
| true | ||
| } | ||
|
|
||
| testStream(windowedAggregation)( | ||
| AddData(inputData, 15), | ||
| testStream(aggWithWatermark)( | ||
| AddData(inputData2, 15), | ||
| CheckAnswer(), | ||
| assertEventStats { e => | ||
| assert(e.get("max") === formatTimestamp(15)) | ||
| assert(e.get("min") === formatTimestamp(15)) | ||
| assert(e.get("avg") === formatTimestamp(15)) | ||
| assert(e.get("watermark") === formatTimestamp(0)) | ||
| }, | ||
| AddData(inputData, 10, 12, 14), | ||
| AddData(inputData2, 10, 12, 14), | ||
| CheckAnswer(), | ||
| assertEventStats { e => | ||
| assert(e.get("max") === formatTimestamp(14)) | ||
| assert(e.get("min") === formatTimestamp(10)) | ||
| assert(e.get("avg") === formatTimestamp(12)) | ||
| assert(e.get("watermark") === formatTimestamp(5)) | ||
| }, | ||
| AddData(inputData, 25), | ||
| AddData(inputData2, 25), | ||
| CheckAnswer(), | ||
| assertEventStats { e => | ||
| assert(e.get("max") === formatTimestamp(25)) | ||
| assert(e.get("min") === formatTimestamp(25)) | ||
| assert(e.get("avg") === formatTimestamp(25)) | ||
| assert(e.get("watermark") === formatTimestamp(5)) | ||
| }, | ||
| AddData(inputData, 25), | ||
| AddData(inputData2, 25), | ||
| CheckAnswer((10, 3)), | ||
| assertEventStats { e => | ||
| assert(e.get("max") === formatTimestamp(25)) | ||
|
|
@@ -124,6 +137,29 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { | |
| ) | ||
| } | ||
|
|
||
| test("delay in years handled correctly") { | ||
| val input = MemoryStream[Long] | ||
| val aggWithWatermark = input.toDF() | ||
| .withColumn("eventTime", $"value".cast("timestamp")) | ||
| .withWatermark("eventTime", "1 month") | ||
|
||
| .groupBy(window($"eventTime", "5 seconds") as 'window) | ||
| .agg(count("*") as 'count) | ||
| .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) | ||
|
|
||
| val currentTimeMs = System.currentTimeMillis | ||
| val millisPerYear = 1000L * 60 * 60 * 24 * 366 // assume leap year | ||
|
|
||
| testStream(aggWithWatermark)( | ||
| AddData(input, currentTimeMs / 1000), | ||
| CheckAnswer(), | ||
| assertEventStats { e => | ||
| assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000) | ||
| val watermarkTime = timestampFormat.parse(e.get("watermark")) | ||
| assert(currentTimeMs - watermarkTime.getTime >= 2 * millisPerYear) | ||
| } | ||
| ) | ||
| } | ||
|
|
||
| test("recovery") { | ||
| val inputData = MemoryStream[Int] | ||
| val df = inputData.toDF() | ||
|
|
@@ -231,6 +267,13 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { | |
| ) | ||
| } | ||
|
|
||
| private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = { | ||
| AssertOnQuery { q => | ||
| body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) | ||
| true | ||
| } | ||
| } | ||
|
|
||
| private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 | ||
| timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC")) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newWatermarkMscan be negative. Image the user is processing some very old data which is before 1970.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think a lot of things are going to break for that usecase. I dont think our sql functions, Java time format, etc are even designed to handle negative millis. The way I found this issue is that when i tried for convert -ve watermark to formatted string, it gave a very weird date. So I dont think we should add complexity for that use case.
@marmbrus any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Im not totally sure, but it does seem likely that we don't handle negative dates (and that is probably okay).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sounds like a SQL bug. IMO, a negative timestamp is valid, it's not a negative date, just a date before 1970.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I think I was doing something to goof up the conversion using SimpleDateFormat. I am fixing that.