diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 465c0d20de481..bb929c27b6a65 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -132,4 +132,6 @@ private[spark] object CoarseGrainedClusterMessages {
   // Used internally by executors to shut themselves down.
   case object Shutdown extends CoarseGrainedClusterMessage
 
+  // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
+  case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 67638a5f9593c..e4f4000d3574d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -285,6 +285,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             Option(delegationTokens.get()),
             rp)
         context.reply(reply)
+
+      case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId))
+
       case e =>
         logError(s"Received unexpected ask ${e}")
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3cfa5d2a25818..f571e428522c5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -142,7 +142,8 @@ class BlockManagerMaster(
       logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
     )(ThreadUtils.sameThread)
     if (blocking) {
-      timeout.awaitResult(future)
+      // the underlying Futures will time out anyway, so it's safe to use infinite timeout here
+      RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
     }
   }
 
@@ -153,7 +154,8 @@ class BlockManagerMaster(
       logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
     )(ThreadUtils.sameThread)
     if (blocking) {
-      timeout.awaitResult(future)
+      // the underlying Futures will time out anyway, so it's safe to use infinite timeout here
+      RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
     }
   }
 
@@ -166,7 +168,8 @@ class BlockManagerMaster(
         s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
     )(ThreadUtils.sameThread)
     if (blocking) {
-      timeout.awaitResult(future)
+      // the underlying Futures will time out anyway, so it's safe to use infinite timeout here
+      RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d936420a99276..f90216b973776 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future, TimeoutException}
 import scala.util.Random
+import scala.util.control.NonFatal
 
 import com.google.common.cache.CacheBuilder
 
@@ -32,8 +33,9 @@ import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.shuffle.ExternalBlockStoreClient
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
@@ -95,6 +97,9 @@ class BlockManagerMasterEndpoint(
   private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined
   private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)
 
+  private lazy val driverEndpoint =
+    RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv)
+
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
       context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
@@ -168,6 +173,50 @@ class BlockManagerMasterEndpoint(
       stop()
   }
 
+  /**
+   * A function that is used to handle the failures when removing blocks. In general, the failure
+   * should be considered as non-fatal since it won't cause any correctness issue. Therefore,
+   * this function would prefer to log the exception and return the default value. We only throw
+   * the exception when there's a TimeoutException from an active executor, which implies the
+   * unhealthy status of the executor while the driver is still not aware of it.
+   * @param blockType should be one of "RDD", "shuffle", "broadcast", "block", used for log
+   * @param blockId the string value of a certain block id, used for log
+   * @param bmId the BlockManagerId of the BlockManager, where we're trying to remove the block
+   * @param defaultValue the return value of a failed removal. e.g., 0 means no blocks are removed
+   * @tparam T the generic type for defaultValue, Int or Boolean.
+   * @return the defaultValue or throw exception if the executor is active but replies late.
+   */
+  private def handleBlockRemovalFailure[T](
+      blockType: String,
+      blockId: String,
+      bmId: BlockManagerId,
+      defaultValue: T): PartialFunction[Throwable, T] = {
+    case e: IOException =>
+      logWarning(s"Error trying to remove $blockType $blockId" +
+        s" from block manager $bmId", e)
+      defaultValue
+
+    case t: TimeoutException =>
+      val executorId = bmId.executorId
+      val isAlive = try {
+        driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(executorId))
+      } catch {
+        // ignore the non-fatal error from driverEndpoint since the caller doesn't really
+        // care about the return result of removing blocks. And so we could avoid breaking
+        // down the whole application.
+        case NonFatal(e) =>
+          logError(s"Fail to know the executor $executorId is alive or not.", e)
+          false
+      }
+      if (!isAlive) {
+        logWarning(s"Error trying to remove $blockType $blockId. " +
+          s"The executor $executorId may have been lost.", t)
+        defaultValue
+      } else {
+        throw t
+      }
+  }
+
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
     // First remove the metadata for the given RDD, and then asynchronously remove the blocks
     // from the slaves.
@@ -207,10 +256,8 @@ class BlockManagerMasterEndpoint(
     }
     val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
       bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
-        case e: IOException =>
-          logWarning(s"Error trying to remove RDD ${removeMsg.rddId} " +
-            s"from block manager ${bmInfo.blockManagerId}", e)
-          0 // zero blocks were removed
+        // use 0 as default value means no blocks were removed
+        handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
       }
     }.toSeq
 
@@ -235,7 +282,10 @@ class BlockManagerMasterEndpoint(
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
       blockManagerInfo.values.map { bm =>
-        bm.slaveEndpoint.ask[Boolean](removeMsg)
+        bm.slaveEndpoint.ask[Boolean](removeMsg).recover {
+          // use false as default value means no shuffle data were removed
+          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+        }
       }.toSeq
     )
   }
@@ -252,10 +302,8 @@ class BlockManagerMasterEndpoint(
     }
     val futures = requiredBlockManagers.map { bm =>
       bm.slaveEndpoint.ask[Int](removeMsg).recover {
-        case e: IOException =>
-          logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
-            s"${bm.blockManagerId}", e)
-          0 // zero blocks were removed
+        // use 0 as default value means no blocks were removed
+        handleBlockRemovalFailure("broadcast", broadcastId.toString, bm.blockManagerId, 0)
       }
     }.toSeq
 
@@ -350,11 +398,14 @@ class BlockManagerMasterEndpoint(
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
         val blockManager = blockManagerInfo.get(blockManagerId)
-        if (blockManager.isDefined) {
+        blockManager.foreach { bm =>
           // Remove the block from the slave's BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
-          blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
+          bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId)).recover {
+            // use false as default value means no blocks were removed
+            handleBlockRemovalFailure("block", blockId.toString, bm.blockManagerId, false)
+          }
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index 7272b375e5388..0e4debc595345 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import scala.concurrent.duration._
+
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Network._
@@ -54,6 +56,14 @@ private[spark] object RpcUtils {
     RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
   }
 
+  /**
+   * Infinite timeout is used internally, so there's no timeout configuration property that
+   * controls it. Therefore, we use "infinite" without any specific reason as its timeout
+   * configuration property. And its timeout property should never be accessed since infinite
+   * means we never timeout.
+   */
+  val INFINITE_TIMEOUT: RpcTimeout = new RpcTimeout(Long.MaxValue.nanos, "infinite")
+
   private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
 
   /** Returns the configured max message size for messages in bytes. */
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bfef8f1ab29d8..75e755f70ab0a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Future
+import scala.concurrent.{Future, TimeoutException}
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
@@ -49,8 +49,9 @@ import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransport
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalBlockStoreClient}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
-import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
 import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerBlockUpdated}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -93,6 +94,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(MEMORY_STORAGE_FRACTION, 0.999)
       .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
+      .set(Network.RPC_ASK_TIMEOUT, "5s")
   }
 
   private def makeBlockManager(
@@ -137,8 +139,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     conf = new SparkConf(false)
     init(conf)
 
-    rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
+    rpcEnv = RpcEnv.create("test", conf.get(config.DRIVER_HOST_ADDRESS),
+      conf.get(config.DRIVER_PORT), conf, securityMgr)
     conf.set(DRIVER_PORT, rpcEnv.address.port)
+    conf.set(DRIVER_HOST_ADDRESS, rpcEnv.address.host)
 
     // Mock SparkContext to reduce the memory usage of tests. It's fine since the only reason we
     // need to create a SparkContext is to initialize LiveListenerBus.
@@ -177,6 +181,105 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     blockManager.stop()
   }
 
+  /**
+   * Setup driverEndpoint, executor-1(BlockManager), executor-2(BlockManager) to simulate
+   * the real cluster before the tests. Any requests from driver to executor-1 will be responded
+   * in time. However, any requests from driver to executor-2 will time out, in order to test
+   * the specific handling of `TimeoutException`, which is raised at driver side.
+   *
+   * And, when `withLost` is true, we will not register the executor-2 to the driver. Therefore,
+   * it behaves like a lost executor in terms of driver's view. When `withLost` is false, we'll
+   * register the executor-2 normally.
+   */
+  private def setupBlockManagerMasterWithBlocks(withLost: Boolean): Unit = {
+    // set up a simple DriverEndpoint which simply adds executorIds and
+    // checks whether a certain executorId has been added before.
+    val driverEndpoint = rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
+      new RpcEndpoint {
+        private val executorSet = mutable.HashSet[String]()
+        override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
+        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+          case CoarseGrainedClusterMessages.RegisterExecutor(executorId, _, _, _, _, _, _, _) =>
+            executorSet += executorId
+            context.reply(true)
+          case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
+            context.reply(executorSet.contains(executorId))
+        }
+      }
+    )
+
+    def createAndRegisterBlockManager(timeout: Boolean): BlockManagerId = {
+      val id = if (timeout) "timeout" else "normal"
+      val bmRef = rpcEnv.setupEndpoint(s"bm-$id", new RpcEndpoint {
+        override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
+        private def reply[T](context: RpcCallContext, response: T): Unit = {
+          if (timeout) {
+            Thread.sleep(conf.getTimeAsMs(Network.RPC_ASK_TIMEOUT.key) + 1000)
+          }
+          context.reply(response)
+        }
+
+        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+          case RemoveRdd(_) => reply(context, 1)
+          case RemoveBroadcast(_, _) => reply(context, 1)
+          case RemoveShuffle(_) => reply(context, true)
+        }
+      })
+      val bmId = BlockManagerId(s"exec-$id", "localhost", 1234, None)
+      master.registerBlockManager(bmId, Array.empty, 2000, 0, bmRef)
+    }
+
+    // set up normal bm1
+    val bm1Id = createAndRegisterBlockManager(false)
+    // set up bm2, which intentionally takes more time than RPC_ASK_TIMEOUT to
+    // remove rdd/broadcast/shuffle in order to raise timeout error
+    val bm2Id = createAndRegisterBlockManager(true)
+
+    driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.RegisterExecutor(
+      bm1Id.executorId, null, bm1Id.host, 1, Map.empty, Map.empty,
+      Map.empty, 0))
+
+    if (!withLost) {
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.RegisterExecutor(
+        bm2Id.executorId, null, bm2Id.host, 1, Map.empty, Map.empty, Map.empty, 0))
+    }
+
+    eventually(timeout(5.seconds)) {
+      // make sure both bm1 and bm2 are registered at driver side BlockManagerMaster
+      verify(master, times(2))
+        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+      assert(driverEndpoint.askSync[Boolean](
+        CoarseGrainedClusterMessages.IsExecutorAlive(bm1Id.executorId)))
+      assert(driverEndpoint.askSync[Boolean](
+        CoarseGrainedClusterMessages.IsExecutorAlive(bm2Id.executorId)) === !withLost)
+    }
+
+    // update RDD block info for bm1 and bm2 (Broadcast and shuffle don't report block
+    // locations to BlockManagerMaster)
+    master.updateBlockInfo(bm1Id, RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100, 0)
+    master.updateBlockInfo(bm2Id, RDDBlockId(0, 1), StorageLevel.MEMORY_ONLY, 100, 0)
+  }
+
+  test("SPARK-32091: count failures from active executors when remove rdd/broadcast/shuffle") {
+    setupBlockManagerMasterWithBlocks(false)
+    // fail because bm2 will timeout and it's not lost anymore
+    assert(intercept[Exception](master.removeRdd(0, true))
+      .getCause.isInstanceOf[TimeoutException])
+    assert(intercept[Exception](master.removeBroadcast(0, true, true))
+      .getCause.isInstanceOf[TimeoutException])
+    assert(intercept[Exception](master.removeShuffle(0, true))
+      .getCause.isInstanceOf[TimeoutException])
+  }
+
+  test("SPARK-32091: ignore failures from lost executors when remove rdd/broadcast/shuffle") {
+    setupBlockManagerMasterWithBlocks(true)
+    // succeed because bm1 will remove rdd/broadcast successfully and bm2 will
+    // timeout but ignored as it's lost
+    master.removeRdd(0, true)
+    master.removeBroadcast(0, true, true)
+    master.removeShuffle(0, true)
+  }
+
   test("StorageLevel object caching") {
     val level1 = StorageLevel(false, false, false, 3)
     // this should return the same object as level1