Skip to content

Commit

Permalink
[Spark] Fix flaky test suite DeltaSourceDeletionVectorsSuite
Browse files Browse the repository at this point in the history
Remove flakiness from a previously flaky test case by ensuring that the stream has stopped processing before any new DML commands are run on the source table.

Fixes #1982
Closes #1989

GitOrigin-RevId: a47f5fdd533e4f4c7ff2a044085cfd99367a7287
  • Loading branch information
larsk-db authored and vkorukanti committed Sep 6, 2023
1 parent 22ade20 commit 67c4b98
Showing 1 changed file with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
command1ShouldProduceDVs: Option[Boolean] = None,
command2ShouldProduceDVs: Option[Boolean] = None,
expectations: List[StreamAction]): Unit = {
val clock = new StreamManualClock

(0 until 15 by 3).foreach { i =>
Seq(i, i + 1, i + 2).toDF().coalesce(1).write.format("delta").mode("append").save(inputDir)
Expand All @@ -159,15 +160,22 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
val shouldFailAfterCommands = expectations.exists(_.isInstanceOf[ExpectFailure[_]])

val baseActions: Seq[StreamAction] = Seq(
StartStream(
Trigger.ProcessingTime("10 seconds"),
new StreamManualClock(System.currentTimeMillis())),
AdvanceManualClock(10L * 1000L),
StartStream(Trigger.ProcessingTime(1000), clock),
AdvanceManualClock(1000L),
CheckAnswer((0 until 15): _*),
AssertOnQuery { q =>
// Make sure we only processed a single batch since the initial data load.
// Ensure we only processed a single batch since the initial data load.
q.commitLog.getLatestBatchId().get == 0
},
AssertOnQuery { q =>
eventually("Stream never stopped processing") {
// Wait until the stream stops processing, so we aren't racing with the next two
// commands on whether or not they end up in the same batch.
assert(!q.status.isTriggerActive)
assert(!q.status.isDataAvailable)
}
true
},
AssertOnQuery { q =>
sql(sqlCommand1)
deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand1)
Expand All @@ -176,7 +184,12 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
sql(sqlCommand2)
deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand2)
},
AdvanceManualClock(20L * 1000L)) ++
AssertOnQuery { q =>
// Ensure we still didn't process the DML commands.
q.commitLog.getLatestBatchId().get == 0
},
// Advance the clock, so that we process the two DML commands.
AdvanceManualClock(2000L)) ++
(if (shouldFailAfterCommands) {
Seq.empty[StreamAction]
} else {
Expand All @@ -185,7 +198,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
AssertOnQuery(waitUntilBatchProcessed(1, _)),
AssertOnQuery { q =>
eventually("Next batch was never processed") {
// Make sure we only processed a single batch since the initial data load.
// Ensure we only processed a single batch with the DML commands.
assert(q.commitLog.getLatestBatchId().get === 1)
}
true
Expand Down Expand Up @@ -378,11 +391,8 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
}

for (sourceOption <- allSourceOptions)
// TODO(larsk-db): Reinstate once flakiness is fixed: testQuietly(
ignore(
"subsequent DML commands are processed correctly in a batch - INSERT->UPDATE" +
s" - $sourceOption"
) {
testQuietly("subsequent DML commands are processed correctly in a batch - INSERT->DELETE" +
s" - $sourceOption") {
val expectations: List[StreamAction] = sourceOption.map(_._1) match {
case List(DeltaOptions.IGNORE_DELETES_OPTION) | Nil =>
// These two do not allow updates.
Expand Down

0 comments on commit 67c4b98

Please sign in to comment.