Skip to content

Commit 0c140c1

Browse files
uncleGenzsxwing
authored andcommitted
[SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen <[email protected]> Closes #17221 from uncleGen/SPARK-19859. (cherry picked from commit eeb1d6d) Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 00859e1 commit 0c140c1

File tree

2 files changed

+19
-9
lines changed

2 files changed

+19
-9
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ import org.apache.spark.unsafe.types.CalendarInterval
2424
object EventTimeWatermark {
2525
/** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */
2626
val delayKey = "spark.watermarkDelayMs"
27+
28+
def getDelayMs(delay: CalendarInterval): Long = {
29+
// We define month as `31 days` to simplify calculation.
30+
val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
31+
delay.milliseconds + delay.months * millisPerMonth
32+
}
2733
}
2834

2935
/**
@@ -37,9 +43,10 @@ case class EventTimeWatermark(
3743
// Update the metadata on the eventTime column to include the desired delay.
3844
override val output: Seq[Attribute] = child.output.map { a =>
3945
if (a semanticEquals eventTime) {
46+
val delayMs = EventTimeWatermark.getDelayMs(delay)
4047
val updatedMetadata = new MetadataBuilder()
4148
.withMetadata(a.metadata)
42-
.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
49+
.putLong(EventTimeWatermark.delayKey, delayMs)
4350
.build()
4451
a.withMetadata(updatedMetadata)
4552
} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,7 @@ case class EventTimeWatermarkExec(
8484
child: SparkPlan) extends SparkPlan {
8585

8686
val eventTimeStats = new EventTimeStatsAccum()
87-
val delayMs = {
88-
val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
89-
delay.milliseconds + delay.months * millisPerMonth
90-
}
87+
val delayMs = EventTimeWatermark.getDelayMs(delay)
9188

9289
sparkContext.register(eventTimeStats)
9390

@@ -105,10 +102,16 @@ case class EventTimeWatermarkExec(
105102
override val output: Seq[Attribute] = child.output.map { a =>
106103
if (a semanticEquals eventTime) {
107104
val updatedMetadata = new MetadataBuilder()
108-
.withMetadata(a.metadata)
109-
.putLong(EventTimeWatermark.delayKey, delayMs)
110-
.build()
111-
105+
.withMetadata(a.metadata)
106+
.putLong(EventTimeWatermark.delayKey, delayMs)
107+
.build()
108+
a.withMetadata(updatedMetadata)
109+
} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
110+
// Remove existing watermark
111+
val updatedMetadata = new MetadataBuilder()
112+
.withMetadata(a.metadata)
113+
.remove(EventTimeWatermark.delayKey)
114+
.build()
112115
a.withMetadata(updatedMetadata)
113116
} else {
114117
a

0 commit comments

Comments
 (0)