Skip to content

Commit

Permalink
[Spark][2.4] Fix flaky test suite DeltaSourceDeletionVectorsSuite
Browse files Browse the repository at this point in the history
## Description
(Cherrypick of 67c4b98 to branch-2.4)

- Fix a previously flaky test case by ensuring that the stream has stopped processing before running new DML commands on the source table.

Fixes #1982

## How was this patch tested?
Testing-only PR.

## Does this PR introduce _any_ user-facing changes?
No.
  • Loading branch information
larsk-db authored Sep 7, 2023
1 parent 9ef19f1 commit 68608d7
Showing 1 changed file with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest with DeletionVectorsTest
command1ShouldProduceDVs: Option[Boolean] = None,
command2ShouldProduceDVs: Option[Boolean] = None,
expectations: List[StreamAction]): Unit = {
val clock = new StreamManualClock

(0 until 15 by 3).foreach { i =>
Seq(i, i + 1, i + 2).toDF().coalesce(1).write.format("delta").mode("append").save(inputDir)
Expand All @@ -157,15 +158,22 @@ trait DeltaSourceDeletionVectorTests extends StreamTest with DeletionVectorsTest
val shouldFailAfterCommands = expectations.exists(_.isInstanceOf[ExpectFailure[_]])

val baseActions: Seq[StreamAction] = Seq(
StartStream(
Trigger.ProcessingTime("10 seconds"),
new StreamManualClock(System.currentTimeMillis())),
AdvanceManualClock(10L * 1000L),
StartStream(Trigger.ProcessingTime(1000), clock),
AdvanceManualClock(1000L),
CheckAnswer((0 until 15): _*),
AssertOnQuery { q =>
// Make sure we only processed a single batch since the initial data load.
// Ensure we only processed a single batch since the initial data load.
q.commitLog.getLatestBatchId().get == 0
},
AssertOnQuery { q =>
eventually("Stream never stopped processing") {
// Wait until the stream stops processing, so we aren't racing with the next two
// commands on whether or not they end up in the same batch.
assert(!q.status.isTriggerActive)
assert(!q.status.isDataAvailable)
}
true
},
AssertOnQuery { q =>
sql(sqlCommand1)
deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand1)
Expand All @@ -174,7 +182,12 @@ trait DeltaSourceDeletionVectorTests extends StreamTest with DeletionVectorsTest
sql(sqlCommand2)
deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand2)
},
AdvanceManualClock(20L * 1000L)) ++
AssertOnQuery { q =>
// Ensure we still didn't process the DML commands.
q.commitLog.getLatestBatchId().get == 0
},
// Advance the clock, so that we process the two DML commands.
AdvanceManualClock(2000L)) ++
(if (shouldFailAfterCommands) {
Seq.empty[StreamAction]
} else {
Expand All @@ -183,7 +196,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest with DeletionVectorsTest
AssertOnQuery(waitUntilBatchProcessed(1, _)),
AssertOnQuery { q =>
eventually("Next batch was never processed") {
// Make sure we only processed a single batch since the initial data load.
// Ensure we only processed a single batch with the DML commands.
assert(q.commitLog.getLatestBatchId().get === 1)
}
true
Expand Down Expand Up @@ -375,7 +388,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest with DeletionVectorsTest
}

for (sourceOption <- allSourceOptions)
testQuietly("subsequent DML commands are processed correctly in a batch - INSERT->UPDATE" +
testQuietly("subsequent DML commands are processed correctly in a batch - INSERT->DELETE" +
s" - $sourceOption") {
val expectations: List[StreamAction] = sourceOption.map(_._1) match {
case List(DeltaOptions.IGNORE_DELETES_OPTION) | Nil =>
Expand Down

0 comments on commit 68608d7

Please sign in to comment.