@@ -300,6 +300,84 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
300300 )
301301 }
302302
303+ test(" watermark with 2 streams" ) {
304+ import org .apache .spark .sql .functions .sum
305+ val first = MemoryStream [Int ]
306+
307+ val firstDf = first.toDF()
308+ .withColumn(" eventTime" , $" value" .cast(" timestamp" ))
309+ .withWatermark(" eventTime" , " 10 seconds" )
310+ .select(' value )
311+
312+ val second = MemoryStream [Int ]
313+
314+ val secondDf = second.toDF()
315+ .withColumn(" eventTime" , $" value" .cast(" timestamp" ))
316+ .withWatermark(" eventTime" , " 5 seconds" )
317+ .select(' value )
318+
319+ withTempDir { checkpointDir =>
320+ val unionWriter = firstDf.union(secondDf).agg(sum(' value ))
321+ .writeStream
322+ .option(" checkpointLocation" , checkpointDir.getCanonicalPath)
323+ .format(" memory" )
324+ .outputMode(" complete" )
325+ .queryName(" test" )
326+
327+ val union = unionWriter.start()
328+
329+ def getWatermarkAfterData (
330+ firstData : Seq [Int ] = Seq .empty,
331+ secondData : Seq [Int ] = Seq .empty,
332+ query : StreamingQuery = union): Long = {
333+ if (firstData.nonEmpty) first.addData(firstData)
334+ if (secondData.nonEmpty) second.addData(secondData)
335+ query.processAllAvailable()
336+ // add a dummy batch so lastExecution has the new watermark
337+ first.addData(0 )
338+ query.processAllAvailable()
339+ // get last watermark
340+ val lastExecution = query.asInstanceOf [StreamingQueryWrapper ].streamingQuery.lastExecution
341+ lastExecution.offsetSeqMetadata.batchWatermarkMs
342+ }
343+
344+ // Global watermark starts at 0 until we get data from both sides
345+ assert(getWatermarkAfterData(firstData = Seq (11 )) == 0 )
346+ assert(getWatermarkAfterData(secondData = Seq (6 )) == 1000 )
347+ // Global watermark stays at left watermark 1 when right watermark moves to 2
348+ assert(getWatermarkAfterData(secondData = Seq (8 )) == 1000 )
349+ // Global watermark switches to right side value 2 when left watermark goes higher
350+ assert(getWatermarkAfterData(firstData = Seq (21 )) == 3000 )
351+ // Global watermark goes back to left
352+ assert(getWatermarkAfterData(secondData = Seq (17 , 28 , 39 )) == 11000 )
353+ // Global watermark stays on left as long as it's below right
354+ assert(getWatermarkAfterData(firstData = Seq (31 )) == 21000 )
355+ assert(getWatermarkAfterData(firstData = Seq (41 )) == 31000 )
356+ // Global watermark switches back to right again
357+ assert(getWatermarkAfterData(firstData = Seq (51 )) == 34000 )
358+
359+ // Global watermark is updated correctly with simultaneous data from both sides
360+ assert(getWatermarkAfterData(firstData = Seq (100 ), secondData = Seq (100 )) == 90000 )
361+ assert(getWatermarkAfterData(firstData = Seq (120 ), secondData = Seq (110 )) == 105000 )
362+ assert(getWatermarkAfterData(firstData = Seq (130 ), secondData = Seq (125 )) == 120000 )
363+
364+ // Global watermark doesn't decrement with simultaneous data
365+ assert(getWatermarkAfterData(firstData = Seq (100 ), secondData = Seq (100 )) == 120000 )
366+ assert(getWatermarkAfterData(firstData = Seq (140 ), secondData = Seq (100 )) == 120000 )
367+ assert(getWatermarkAfterData(firstData = Seq (100 ), secondData = Seq (135 )) == 130000 )
368+
369+ // Global watermark recovers after restart, but left side watermark ahead of it does not.
370+ assert(getWatermarkAfterData(firstData = Seq (200 ), secondData = Seq (190 )) == 185000 )
371+ union.stop()
372+ val union2 = unionWriter.start()
373+ assert(getWatermarkAfterData(query = union2) == 185000 )
374+ // Even though the left side was ahead of 185000 in the last execution, the watermark won't
375+ // increment until it gets past it in this execution.
376+ assert(getWatermarkAfterData(secondData = Seq (200 ), query = union2) == 185000 )
377+ assert(getWatermarkAfterData(firstData = Seq (200 ), query = union2) == 190000 )
378+ }
379+ }
380+
303381 test(" complete mode" ) {
304382 val inputData = MemoryStream [Int ]
305383
0 commit comments