[SPARK-5083][Core] Fix a flaky test in TaskResultGetterSuite #3894

@@ -19,7 +19,12 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.storage.TaskResultBlockId
@@ -34,6 +39,8 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
   extends TaskResultGetter(sparkEnv, scheduler) {
   var removedResult = false
 
+  @volatile var removeBlockSuccessfully = false
+
   override def enqueueSuccessfulTask(
     taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     if (!removedResult) {
@@ -42,6 +49,15 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
       serializer.get().deserialize[TaskResult[_]](serializedData) match {
         case IndirectTaskResult(blockId, size) =>
           sparkEnv.blockManager.master.removeBlock(blockId)
+          // removeBlock is asynchronous. Need to wait until it's removed successfully.
+          try {
+            eventually(timeout(3 seconds), interval(200 milliseconds)) {
+              assert(!sparkEnv.blockManager.master.contains(blockId))
+            }
+            removeBlockSuccessfully = true
+          } catch {
+            case NonFatal(e) => removeBlockSuccessfully = false
+          }
         case directResult: DirectTaskResult[_] =>
           taskSetManager.abort("Internal error: expect only indirect results")
       }
@@ -92,10 +108,12 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSpark
         assert(false, "Expect local cluster to use TaskSchedulerImpl")
         throw new ClassCastException
     }
-    scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+    val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+    scheduler.taskResultGetter = resultGetter
     val akkaFrameSize =
       sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
     val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    assert(resultGetter.removeBlockSuccessfully)
     assert(result === 1.to(akkaFrameSize).toArray)
 
     // Make sure two tasks were run (one failed one, and a second retried one).
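
For reference, the wait added in this patch is ScalaTest's standard Eventually polling pattern: retry the block every interval until the assertion passes or the timeout expires. Below is a minimal, self-contained sketch of that pattern outside the Spark test, assuming ScalaTest's Eventually is on the classpath; the EventuallyPollingSketch object, the asyncDone flag, the background thread, and the 500 ms delay are hypothetical stand-ins for the asynchronous removeBlock call, while the 3 second / 200 millisecond values mirror the ones used in the diff.

import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually._

object EventuallyPollingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical asynchronous operation: a flag that is set roughly 500 ms after start,
    // standing in for the block deletion completing in the background.
    val asyncDone = new AtomicBoolean(false)
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(500)
        asyncDone.set(true)
      }
    }).start()

    // Re-run the block every 200 ms until the assertion passes, giving up (and rethrowing
    // the last failure) after 3 seconds -- the same timeout/interval used in the test above.
    eventually(timeout(3 seconds), interval(200 milliseconds)) {
      assert(asyncDone.get())
    }
    println("asynchronous operation observed as complete")
  }
}

Catching the timeout and recording the outcome in a @volatile flag, as the patch does, lets the assertion happen later on the test thread instead of inside enqueueSuccessfulTask, which runs on the result getter's own thread.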