From f69da1085e2e9766f2a60984b81209aff482ddcf Mon Sep 17 00:00:00 2001
From: Devesh Agrawal
Date: Sat, 8 Aug 2020 00:25:44 -0700
Subject: [PATCH] More BlockManager decommission test hardening

Reduce flakiness in BlockManagerDecommissionIntegrationSuite: run the
timing-sensitive TaskStarted variant through testRetry, widen the window
between task start and task end from 2 to 10 seconds so the decommission
can land inside it, and raise the wait timeouts (6s -> 20s, 15s -> 1
minute, 30s -> 1 minute) so slow machines do not fail spuriously.
---
 .../BlockManagerDecommissionIntegrationSuite.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 25145dac5268..7cf008381af6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -40,7 +40,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
   val TaskEnded = "TASK_ENDED"
   val JobEnded = "JOB_ENDED"
 
-  test(s"verify that an already running task which is going to cache data succeeds " +
+  testRetry(s"verify that an already running task which is going to cache data succeeds " +
     s"on a decommissioned executor after task start") {
     runDecomTest(true, false, TaskStarted)
   }
@@ -89,7 +89,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
 
     val sleepIntervalMs = whenToDecom match {
       // Increase the window of time b/w task started and ended so that we can decom within that.
-      case TaskStarted => 2000
+      case TaskStarted => 10000
       // Make one task take a really short time so that we can decommission right after it is
       // done but before its peers are done.
       case TaskEnded =>
@@ -176,11 +176,11 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       } else {
         10.milliseconds
       }
-      eventually(timeout(6.seconds), interval(intervalMs)) {
+      eventually(timeout(20.seconds), interval(intervalMs)) {
         assert(getCandidateExecutorToDecom.isDefined)
       }
     } else {
-      ThreadUtils.awaitResult(asyncCount, 15.seconds)
+      ThreadUtils.awaitResult(asyncCount, 1.minute)
     }
 
     // Decommission one of the executors.
@@ -194,7 +194,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val decomTime = new SystemClock().getTimeMillis()
 
     // Wait for job to finish.
-    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
+    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 1.minute)
     assert(asyncCountResult === numParts)
     // All tasks finished, so accum should have been increased numParts times.
     assert(accum.value === numParts)
@@ -226,7 +226,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     }
 
     // Wait for our respective blocks to have migrated
-    eventually(timeout(30.seconds), interval(10.milliseconds)) {
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
       if (persist) {
         // One of our blocks should have moved.
        val rddUpdates = blocksUpdated.filter { update =>
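
For context on the test(...) -> testRetry(...) change: the suite now registers
the TaskStarted variant through a retry helper, since decommissioning an
executor inside a running task's window is timing-sensitive and can fail
spuriously. A minimal sketch of how such a helper can be layered on plain
ScalaTest (the trait name, the maxRetries parameter, and this implementation
are illustrative assumptions, not Spark's actual SparkFunSuite code):

    import org.scalatest.funsuite.AnyFunSuite

    trait RetryTests { self: AnyFunSuite =>
      // Registers a test whose body is re-run up to `maxRetries` extra times
      // before the last failure is reported (illustrative sketch).
      def testRetry(name: String, maxRetries: Int = 2)(body: => Unit): Unit = {
        test(name) {
          var attempt = 0
          var passed = false
          while (!passed) {
            try {
              body            // by-name parameter: re-evaluated on each attempt
              passed = true
            } catch {
              case t: Throwable =>
                attempt += 1
                if (attempt > maxRetries) throw t
            }
          }
        }
      }
    }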
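
The eventually(timeout(...), interval(...)) blocks come from ScalaTest's
org.scalatest.concurrent.Eventually: the body is retried at the given interval
until it stops throwing or the timeout budget is exhausted, so raising 6s to
20s only changes the worst case, not the common case. A self-contained
illustration of those polling semantics (the deadline condition is a made-up
stand-in for "an executor has become a decommission candidate"):

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    object EventuallyDemo extends App {
      // A condition that only becomes true after ~2 seconds.
      val deadline = System.currentTimeMillis() + 2000

      // Poll every 10 ms; fail only if 20 seconds elapse first. A generous
      // timeout costs nothing when the condition turns true early, while a
      // tight one fails spuriously on slow CI machines.
      eventually(timeout(20.seconds), interval(10.milliseconds)) {
        assert(System.currentTimeMillis() >= deadline)
      }
      println("condition reached")
    }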
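
ThreadUtils.awaitResult is Spark's wrapper around scala.concurrent.Await.result
(it rewraps failures in a SparkException); the patch only widens its budget
from 15 seconds to a minute because a decommission-triggered block migration
can legitimately stretch the job past the old bound. Roughly equivalent
plain-Scala usage, without the rewrapping (the future below is a made-up
stand-in for the suite's asyncCount job):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    object AwaitDemo extends App {
      // Stands in for a count() job running asynchronously on the cluster.
      val asyncCount: Future[Long] = Future {
        Thread.sleep(500) // simulated job work
        6L                // illustrative partition count
      }

      // Block until the job finishes, or fail after 1 minute.
      val result = Await.result(asyncCount, 1.minute)
      assert(result == 6L)
    }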