@@ -40,7 +40,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
   val TaskEnded = "TASK_ENDED"
   val JobEnded = "JOB_ENDED"
 
-  test(s"verify that an already running task which is going to cache data succeeds " +
+  testRetry(s"verify that an already running task which is going to cache data succeeds " +
     s"on a decommissioned executor after task start") {
     runDecomTest(true, false, TaskStarted)
   }
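The switch from test to testRetry gives this timing-sensitive test a bounded number of re-runs before it is reported as failed. Spark's SparkFunSuite provides such a helper; the following is only a minimal sketch of the idea on top of plain ScalaTest (the trait name RetrySuite and the default retry count here are illustrative, not Spark's actual implementation):

    import org.scalatest.funsuite.AnyFunSuite

    trait RetrySuite extends AnyFunSuite {
      // Register a test that is re-run until it passes or the retry budget
      // is exhausted; the last failure is rethrown if every attempt fails.
      def testRetry(name: String, retries: Int = 2)(body: => Unit): Unit = {
        test(name) {
          var lastFailure: Throwable = null
          var attempts = 0
          var passed = false
          while (!passed && attempts <= retries) {
            try {
              body
              passed = true
            } catch {
              case t: Throwable =>
                lastFailure = t
                attempts += 1
            }
          }
          if (!passed) throw lastFailure
        }
      }
    }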
@@ -89,7 +89,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS

     val sleepIntervalMs = whenToDecom match {
       // Increase the window of time b/w task started and ended so that we can decom within that.
-      case TaskStarted => 2000
+      case TaskStarted => 10000
       // Make one task take a really short time so that we can decommission right after it is
       // done but before its peers are done.
       case TaskEnded =>
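Raising the TaskStarted sleep from 2 to 10 seconds widens the window in which the decommission message can land while a task is still running. Roughly, the interval is consumed by a sleep inside each task before the job's count is awaited; a hypothetical condensed view (the real test builds a more elaborate RDD, but numParts and sleepIntervalMs follow the surrounding code):

    // Each task sleeps before yielding its partition, keeping it alive
    // long enough for the driver to decommission its executor mid-task.
    val rdd = sc.parallelize(1 to numParts, numParts).mapPartitions { itr =>
      Thread.sleep(sleepIntervalMs)
      itr
    }
    val asyncCount = rdd.countAsync() // the future awaited later in the test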
@@ -176,11 +176,11 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
      } else {
        10.milliseconds
      }
-      eventually(timeout(6.seconds), interval(intervalMs)) {
+      eventually(timeout(20.seconds), interval(intervalMs)) {
        assert(getCandidateExecutorToDecom.isDefined)
      }
    } else {
-      ThreadUtils.awaitResult(asyncCount, 15.seconds)
+      ThreadUtils.awaitResult(asyncCount, 1.minute)
    }
 
    // Decommission one of the executors.
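eventually here comes from ScalaTest's Eventually trait: it re-evaluates the assertion block every interval until it passes or the timeout elapses, so bumping 6.seconds to 20.seconds only lengthens the worst case, not the happy path. A self-contained sketch of that polling behavior:

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    @volatile var ready = false
    new Thread(() => { Thread.sleep(500); ready = true }).start()

    // Re-evaluates the block every 10 ms; returns as soon as the assertion
    // passes, and rethrows the last failure if 20 s elapse first.
    eventually(timeout(20.seconds), interval(10.milliseconds)) {
      assert(ready)
    }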
@@ -194,7 +194,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
    val decomTime = new SystemClock().getTimeMillis()
 
    // Wait for job to finish.
-    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
+    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 1.minute)
    assert(asyncCountResult === numParts)
    // All tasks finished, so accum should have been increased numParts times.
    assert(accum.value === numParts)
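ThreadUtils.awaitResult is Spark's wrapper around scala.concurrent.Await.result, so the longer 1.minute bound only adds headroom for the slow decommission path; a job that finishes early still returns immediately. A simplified stand-in for what the wrapper does (Spark's real version wraps failures in SparkException; this sketch uses RuntimeException):

    import scala.concurrent.{Await, Awaitable}
    import scala.concurrent.duration.Duration

    // Blocks for at most `atMost`; rethrows with extra context so a test
    // failure points at the await site rather than a worker thread.
    def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T =
      try {
        Await.result(awaitable, atMost)
      } catch {
        case e: Exception =>
          throw new RuntimeException("Exception thrown in awaitResult", e)
      }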
@@ -226,7 +226,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
    }
 
    // Wait for our respective blocks to have migrated
-    eventually(timeout(30.seconds), interval(10.milliseconds)) {
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
      if (persist) {
        // One of our blocks should have moved.
        val rddUpdates = blocksUpdated.filter { update =>
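This final eventually waits for the cached blocks to be re-reported from a surviving executor, and migration across executors can take noticeably longer than the original 30-second bound on a loaded CI machine. A hypothetical condensed version of such a migration check over the captured SparkListenerBlockUpdated events (blocksUpdated and the field names follow the surrounding test; treat the exact shape as an assumption):

    // A cached RDD block has migrated if more than one block manager has
    // reported it over the lifetime of the job.
    val locationsPerBlock = blocksUpdated
      .map(_.blockUpdatedInfo)
      .filter(_.blockId.isRDD)
      .groupBy(_.blockId)
      .map { case (blockId, infos) => blockId -> infos.map(_.blockManagerId).toSet }

    assert(locationsPerBlock.values.exists(_.size > 1),
      "expected at least one cached block to be re-reported after decommission")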