@@ -342,55 +342,51 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {
-    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
-      import testImplicits._
-      val clock = new StreamManualClock
-      val tz = TimeZone.getDefault.getID
-      val inputData = MemoryStream[Long]
-      val aggregated =
-        inputData.toDF()
-          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
-          .toDF("value")
-          .groupBy($"value")
-          .agg(count("*"))
-          .where($"value".cast("date") >= date_sub(current_date(), 10))
-          .select(
-            ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-      testStream(aggregated, Complete)(
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // advance clock to 10 days, should retain all keys
-        AddData(inputData, 0L, 5L, 5L, 10L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-        // advance clock to 20 days, should retain keys >= 10
-        AddData(inputData, 15L, 15L, 20L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-        // advance clock to 30 days, should retain keys >= 20
-        AddData(inputData, 85L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // bounce stream and ensure correct batch timestamp is used
-        // i.e., we don't take it from the clock, which is at 90 days.
-        StopStream,
-        AssertOnQuery { q => // clear the sink
-          q.sink.asInstanceOf[MemorySink].clear()
-          q.commitLog.purge(3)
-          // advance by 60 days i.e., 90 days total
-          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-          true
-        },
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // Commit log blown, causing a re-run of the last batch
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // advance clock to 100 days, should retain keys >= 90
-        AddData(inputData, 85L, 90L, 100L, 105L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-      )
-    }
+    import testImplicits._
+    val clock = new StreamManualClock
+    val inputData = MemoryStream[Long]
+    val aggregated =
+      inputData.toDF()
+        .select(($"value" * DateTimeUtils.SECONDS_PER_DAY).cast("timestamp").as("value"))
Review thread on the change above:

Member:
This is basically the only change, right? Can you remove tz too?
Seems OK, especially since it pulls timezones out of the equation here.

Contributor (author):
There is one more: I replaced current_date with current_timestamp().cast("date").

current_date returns the date in UTC, so we can't compare it with the result of casting the timestamp column to date: there is a timezone shift when casting between date and timestamp.
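
To make that shift concrete, here is a minimal, self-contained sketch (not part of this PR) that prints the two expressions side by side. It assumes a Spark build with the behavior the author describes, i.e. current_date evaluated in UTC while the timestamp-to-date cast uses the session timezone; Pacific/Auckland is an arbitrary zone far from UTC, chosen so the disagreement window is large:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_date, current_timestamp}

object TimezoneShiftSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("tz-shift-sketch")
      // A zone 12-13 hours ahead of UTC keeps the two dates apart for
      // roughly half of every day.
      .config("spark.sql.session.timeZone", "Pacific/Auckland")
      .getOrCreate()

    spark.range(1).select(
      current_date().as("utc_date"),                       // per the comment above: UTC date
      current_timestamp().cast("date").as("session_date")  // session-timezone date
    ).show()
    // After local midnight but before UTC midnight the columns differ by a
    // day (e.g. utc_date = 2019-03-31, session_date = 2019-04-01), which is
    // why the test's .where() predicate compares value.cast("date") against
    // current_timestamp().cast("date") rather than current_date.
    spark.stop()
  }
}
```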

.groupBy($"value")
.agg(count("*"))
.where($"value".cast("date") >= date_sub(current_timestamp().cast("date"), 10))
.select(
($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+    testStream(aggregated, Complete)(
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+      // advance clock to 10 days, should retain all keys
+      AddData(inputData, 0L, 5L, 5L, 10L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+      // advance clock to 20 days, should retain keys >= 10
+      AddData(inputData, 15L, 15L, 20L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+      // advance clock to 30 days, should retain keys >= 20
+      AddData(inputData, 85L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((20L, 1), (85L, 1)),
+
+      // bounce stream and ensure correct batch timestamp is used
+      // i.e., we don't take it from the clock, which is at 90 days.
+      StopStream,
+      AssertOnQuery { q => // clear the sink
+        q.sink.asInstanceOf[MemorySink].clear()
+        q.commitLog.purge(3)
+        // advance by 60 days i.e., 90 days total
+        clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+        true
+      },
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+      // Commit log blown, causing a re-run of the last batch
+      CheckLastBatch((20L, 1), (85L, 1)),
+
+      // advance clock to 100 days, should retain keys >= 90
+      AddData(inputData, 85L, 90L, 100L, 105L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+    )
   }
 
   testWithAllStateVersions("SPARK-19690: do not convert batch aggregation in streaming query " +