diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 2a9e6b849d89..81b22be0b8bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -342,55 +342,51 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {
-    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
-      import testImplicits._
-      val clock = new StreamManualClock
-      val tz = TimeZone.getDefault.getID
-      val inputData = MemoryStream[Long]
-      val aggregated =
-        inputData.toDF()
-          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
-          .toDF("value")
-          .groupBy($"value")
-          .agg(count("*"))
-          .where($"value".cast("date") >= date_sub(current_date(), 10))
-          .select(
-            ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-      testStream(aggregated, Complete)(
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // advance clock to 10 days, should retain all keys
-        AddData(inputData, 0L, 5L, 5L, 10L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-        // advance clock to 20 days, should retain keys >= 10
-        AddData(inputData, 15L, 15L, 20L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-        // advance clock to 30 days, should retain keys >= 20
-        AddData(inputData, 85L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // bounce stream and ensure correct batch timestamp is used
-        // i.e., we don't take it from the clock, which is at 90 days.
-        StopStream,
-        AssertOnQuery { q => // clear the sink
-          q.sink.asInstanceOf[MemorySink].clear()
-          q.commitLog.purge(3)
-          // advance by 60 days i.e., 90 days total
-          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-          true
-        },
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // Commit log blown, causing a re-run of the last batch
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // advance clock to 100 days, should retain keys >= 90
-        AddData(inputData, 85L, 90L, 100L, 105L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-      )
-    }
+    import testImplicits._
+    val clock = new StreamManualClock
+    val inputData = MemoryStream[Long]
+    val aggregated =
+      inputData.toDF()
+        .select(($"value" * DateTimeUtils.SECONDS_PER_DAY).cast("timestamp").as("value"))
+        .groupBy($"value")
+        .agg(count("*"))
+        .where($"value".cast("date") >= date_sub(current_timestamp().cast("date"), 10))
+        .select(
+          ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+    testStream(aggregated, Complete)(
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+      // advance clock to 10 days, should retain all keys
+      AddData(inputData, 0L, 5L, 5L, 10L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+      // advance clock to 20 days, should retain keys >= 10
+      AddData(inputData, 15L, 15L, 20L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+      // advance clock to 30 days, should retain keys >= 20
+      AddData(inputData, 85L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((20L, 1), (85L, 1)),
+
+      // bounce stream and ensure correct batch timestamp is used
+      // i.e., we don't take it from the clock, which is at 90 days.
+      StopStream,
+      AssertOnQuery { q => // clear the sink
+        q.sink.asInstanceOf[MemorySink].clear()
+        q.commitLog.purge(3)
+        // advance by 60 days i.e., 90 days total
+        clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+        true
+      },
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+      // Commit log blown, causing a re-run of the last batch
+      CheckLastBatch((20L, 1), (85L, 1)),
+
+      // advance clock to 100 days, should retain keys >= 90
+      AddData(inputData, 85L, 90L, 100L, 105L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+    )
   }
 
   testWithAllStateVersions("SPARK-19690: do not convert batch aggregation in streaming query " +
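Note on the change above: the old test built its timestamp keys via `from_unixtime` plus `to_utc_timestamp`, which tied the aggregation keys to the default time zone (hence the `tz` variable and the `UTC_TIMESTAMP_FUNC_ENABLED` conf wrapper), and filtered on `current_date()`. The new version casts epoch seconds directly to `timestamp` and derives the cutoff from `current_timestamp().cast("date")`, removing the time-zone round trip entirely. Below is a minimal standalone sketch of just that conversion, assuming a local `SparkSession`; the session setup and object name are illustrative and not part of the patch:

```scala
// Illustrative sketch, not part of the patch: shows that casting epoch
// seconds to timestamp and back is a time-zone-independent round trip,
// which is the property the rewritten test relies on.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CastVsUtcRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("cast-vs-utc-round-trip")
      .getOrCreate()
    import spark.implicits._

    // Same constant the suite reads from DateTimeUtils.SECONDS_PER_DAY.
    val secondsPerDay = 86400L
    val days = Seq(0L, 5L, 10L).toDF("value")

    // The patch's approach: multiply day counts into epoch seconds and
    // cast straight to timestamp, with no from_unixtime/to_utc_timestamp.
    val direct = days.select(
      ($"value" * secondsPerDay).cast("timestamp").as("ts"))

    // Recover the day number the same way the test's final projection does.
    direct
      .select(($"ts".cast("long") / secondsPerDay).cast("long").as("day"))
      .show()
    // Expected: days 0, 5, 10 come back unchanged in any session time zone.

    spark.stop()
  }
}
```

Casting a numeric column to `timestamp` interprets it as seconds since the epoch, and casting back to `long` recovers the same value regardless of the session time zone; only the rendered string depends on the zone. That is why the rewritten test no longer needs `TimeZone.getDefault` or the UTC-function conf.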