@@ -132,4 +132,6 @@ private[spark] object CoarseGrainedClusterMessages {
// Used internally by executors to shut themselves down.
case object Shutdown extends CoarseGrainedClusterMessage

// The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
}
@@ -285,6 +285,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
Option(delegationTokens.get()),
rp)
context.reply(reply)

case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId))

case e =>
logError(s"Received unexpected ask ${e}")
}
@@ -142,7 +142,8 @@ class BlockManagerMaster(
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
// The underlying futures will time out on their own, so it's safe to use an infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}

@@ -153,7 +154,8 @@ class BlockManagerMaster(
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
// The underlying futures will time out on their own, so it's safe to use an infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}

@@ -166,7 +168,8 @@ class BlockManagerMaster(
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
// The underlying futures will time out on their own, so it's safe to use an infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}

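The comments above rely on each underlying `ask` already carrying its own RPC timeout, so an unbounded outer wait cannot hang indefinitely. Below is a minimal, self-contained sketch of that reasoning (illustration only, not part of the patch; `askWithOwnTimeout` stands in for `slaveEndpoint.ask`):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InfiniteAwaitSketch extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for slaveEndpoint.ask[Int](msg): each ask resolves on its own, either with a
  // value or with a failure produced by the RPC layer's per-ask timeout.
  def askWithOwnTimeout(i: Int): Future[Int] = Future { Thread.sleep(10); i }

  val combined = Future.sequence((1 to 3).map(askWithOwnTimeout))

  // Waiting with an unbounded timeout still returns promptly, because each member future
  // completes on its own; the outer await never needs its own deadline.
  println(Await.result(combined, Duration.Inf))
}
```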
@@ -23,17 +23,19 @@ import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.Random
import scala.util.control.NonFatal

import com.google.common.cache.CacheBuilder

import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

@@ -95,6 +97,9 @@ class BlockManagerMasterEndpoint(
private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined
private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)

private lazy val driverEndpoint =
RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv)

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
@@ -168,6 +173,50 @@ class BlockManagerMasterEndpoint(
stop()
}

/**
* A function used to handle failures when removing blocks. In general, a failure should be
* considered non-fatal since it won't cause any correctness issue. Therefore, this function
* prefers to log the exception and return the default value. We only rethrow the exception
* when it is a TimeoutException from an active executor, which implies the executor is
* unhealthy while the driver is not yet aware of it.
* @param blockType one of "RDD", "shuffle", "broadcast", "block", used for logging
* @param blockId the string value of a certain block id, used for logging
* @param bmId the BlockManagerId of the BlockManager where we're trying to remove the block
* @param defaultValue the return value for a failed removal, e.g. 0 means no blocks were removed
* @tparam T the type of defaultValue, Int or Boolean.
* @return the defaultValue, or throw the exception if the executor is active but replies late.
*/
private def handleBlockRemovalFailure[T](
blockType: String,
blockId: String,
bmId: BlockManagerId,
defaultValue: T): PartialFunction[Throwable, T] = {
case e: IOException =>
logWarning(s"Error trying to remove $blockType $blockId" +
s" from block manager $bmId", e)
defaultValue

case t: TimeoutException =>
val executorId = bmId.executorId
val isAlive = try {
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(executorId))
} catch {
// Ignore the non-fatal error from driverEndpoint since the caller doesn't really
// care about the result of removing blocks; this also avoids bringing down the
// whole application.
case NonFatal(e) =>
logError(s"Failed to check whether executor $executorId is alive or not.", e)
false
}
if (!isAlive) {
logWarning(s"Error trying to remove $blockType $blockId. " +
s"The executor $executorId may have been lost.", t)
defaultValue
} else {
throw t
}
}

private def removeRdd(rddId: Int): Future[Seq[Int]] = {
// First remove the metadata for the given RDD, and then asynchronously remove the blocks
// from the slaves.
@@ -207,10 +256,8 @@
}
val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove RDD ${removeMsg.rddId} " +
s"from block manager ${bmInfo.blockManagerId}", e)
0 // zero blocks were removed
// 0 as the default value means no blocks were removed
handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
}
}.toSeq

@@ -235,7 +282,10 @@
val removeMsg = RemoveShuffle(shuffleId)
Future.sequence(
blockManagerInfo.values.map { bm =>
bm.slaveEndpoint.ask[Boolean](removeMsg)
bm.slaveEndpoint.ask[Boolean](removeMsg).recover {
// false as the default value means no shuffle data was removed
handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
}
}.toSeq
)
}
@@ -252,10 +302,8 @@
}
val futures = requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
s"${bm.blockManagerId}", e)
0 // zero blocks were removed
// 0 as the default value means no blocks were removed
handleBlockRemovalFailure("broadcast", broadcastId.toString, bm.blockManagerId, 0)
}
}.toSeq

@@ -350,11 +398,14 @@
if (locations != null) {
locations.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
if (blockManager.isDefined) {
blockManager.foreach { bm =>
// Remove the block from the slave's BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId)).recover {
// false as the default value means no blocks were removed
handleBlockRemovalFailure("block", blockId.toString, bm.blockManagerId, false)
}
}
}
}
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,6 +17,8 @@

package org.apache.spark.util

import scala.concurrent.duration._

import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Network._
@@ -54,6 +56,14 @@ private[spark] object RpcUtils {
RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
}

/**
* An infinite timeout used internally, so there is no timeout configuration property that
* controls it. We therefore use "infinite" as a placeholder name for its timeout property;
* that property should never be accessed, since an infinite timeout never fires.
*/
val INFINITE_TIMEOUT = new RpcTimeout(Long.MaxValue.nanos, "infinite")

private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024

/** Returns the configured max message size for messages in bytes. */
109 changes: 106 additions & 3 deletions core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.{Future, TimeoutException}
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.reflect.ClassTag
@@ -49,8 +49,9 @@ import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransport
import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalBlockStoreClient}
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerBlockUpdated}
import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -93,6 +94,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
.set(MEMORY_STORAGE_FRACTION, 0.999)
.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
.set(Network.RPC_ASK_TIMEOUT, "5s")
Contributor:

Any particular reason why 5?

Member Author:

In the newly added tests, we need to simulate a timeout error from the BlockManager. At the same time, we don't want the tests to run too long, since the default timeout value is 120s. Therefore, we choose a fairly short timeout for the tests. On the other hand, we don't set it to an even smaller value, e.g. 1s, which could make the tests flaky.

Note that the best way to set the timeout value would be to set it locally for the newly added tests instead of setting it globally. However, with the limitations of the current test framework on the Core side, it's hard to set it locally since that would require more changes.
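As a hedged illustration of the per-test idea mentioned above (not something the current suite supports; `withConf` is a hypothetical helper and the names are illustrative), a local override could look roughly like this:

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch: temporarily override config keys for a single test body, assuming the
// suite could rebuild the SparkConf (and the RpcEnv derived from it) per test.
def withConf(conf: SparkConf)(overrides: (String, String)*)(body: => Unit): Unit = {
  val saved = overrides.map { case (k, _) => k -> conf.getOption(k) }
  overrides.foreach { case (k, v) => conf.set(k, v) }
  try body finally saved.foreach {
    case (k, Some(v)) => conf.set(k, v)
    case (k, None) => conf.remove(k)
  }
}

// Only the timeout-sensitive tests would then shorten the ask timeout:
// withConf(conf)("spark.rpc.askTimeout" -> "5s") {
//   setupBlockManagerMasterWithBlocks(withLost = false)
// }
```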

}

private def makeBlockManager(
Expand Down Expand Up @@ -137,8 +139,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf = new SparkConf(false)
init(conf)

rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
rpcEnv = RpcEnv.create("test", conf.get(config.DRIVER_HOST_ADDRESS),
conf.get(config.DRIVER_PORT), conf, securityMgr)
conf.set(DRIVER_PORT, rpcEnv.address.port)
conf.set(DRIVER_HOST_ADDRESS, rpcEnv.address.host)

// Mock SparkContext to reduce the memory usage of tests. It's fine since the only reason we
// need to create a SparkContext is to initialize LiveListenerBus.
@@ -177,6 +181,105 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
blockManager.stop()
}

/**
* Set up a driverEndpoint, executor-1 (BlockManager), and executor-2 (BlockManager) to simulate
* a real cluster before the tests. Any request from the driver to executor-1 is answered
* in time. However, any request from the driver to executor-2 times out, in order to test
* the specific handling of `TimeoutException`, which is raised on the driver side.
*
* When `withLost` is true, we do not register executor-2 with the driver, so it behaves like a
* lost executor from the driver's point of view. When `withLost` is false, we register
* executor-2 normally.
*/
private def setupBlockManagerMasterWithBlocks(withLost: Boolean): Unit = {
// set up a simple DriverEndpoint which simply adds executorIds and
// checks whether a certain executorId has been added before.
val driverEndpoint = rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
new RpcEndpoint {
private val executorSet = mutable.HashSet[String]()
override val rpcEnv: RpcEnv = this.rpcEnv
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case CoarseGrainedClusterMessages.RegisterExecutor(executorId, _, _, _, _, _, _, _) =>
executorSet += executorId
context.reply(true)
case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
context.reply(executorSet.contains(executorId))
}
}
)

def createAndRegisterBlockManager(timeout: Boolean): BlockManagerId = {
val id = if (timeout) "timeout" else "normal"
val bmRef = rpcEnv.setupEndpoint(s"bm-$id", new RpcEndpoint {
override val rpcEnv: RpcEnv = this.rpcEnv
private def reply[T](context: RpcCallContext, response: T): Unit = {
if (timeout) {
Thread.sleep(conf.getTimeAsMs(Network.RPC_ASK_TIMEOUT.key) + 1000)
}
context.reply(response)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RemoveRdd(_) => reply(context, 1)
case RemoveBroadcast(_, _) => reply(context, 1)
case RemoveShuffle(_) => reply(context, true)
}
})
val bmId = BlockManagerId(s"exec-$id", "localhost", 1234, None)
master.registerBlockManager(bmId, Array.empty, 2000, 0, bmRef)
}

// set up normal bm1
val bm1Id = createAndRegisterBlockManager(false)
// set up bm2, which intentionally takes more time than RPC_ASK_TIMEOUT to
// remove rdd/broadcast/shuffle in order to raise a timeout error
val bm2Id = createAndRegisterBlockManager(true)

driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.RegisterExecutor(
bm1Id.executorId, null, bm1Id.host, 1, Map.empty, Map.empty,
Map.empty, 0))

if (!withLost) {
Contributor:

What does withLost mean?

Member Author:

We always set up three roles, driver, executor-1, and executor-2, before these two tests. withLost here indicates whether to initialize executor-2 as a lost executor from the driver's point of view.

driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.RegisterExecutor(
bm2Id.executorId, null, bm1Id.host, 1, Map.empty, Map.empty, Map.empty, 0))
}

eventually(timeout(5.seconds)) {
// make sure both bm1 and bm2 are registered at driver side BlockManagerMaster
verify(master, times(2))
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
assert(driverEndpoint.askSync[Boolean](
CoarseGrainedClusterMessages.IsExecutorAlive(bm1Id.executorId)))
assert(driverEndpoint.askSync[Boolean](
CoarseGrainedClusterMessages.IsExecutorAlive(bm2Id.executorId)) === !withLost)
}

// update RDD block info for bm1 and bm2 (Broadcast and shuffle don't report block
// locations to BlockManagerMaster)
master.updateBlockInfo(bm1Id, RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100, 0)
master.updateBlockInfo(bm2Id, RDDBlockId(0, 1), StorageLevel.MEMORY_ONLY, 100, 0)
}

test("SPARK-32091: count failures from active executors when remove rdd/broadcast/shuffle") {
setupBlockManagerMasterWithBlocks(false)
// fail because bm2 will time out and it's not a lost executor
assert(intercept[Exception](master.removeRdd(0, true))
.getCause.isInstanceOf[TimeoutException])
assert(intercept[Exception](master.removeBroadcast(0, true, true))
.getCause.isInstanceOf[TimeoutException])
assert(intercept[Exception](master.removeShuffle(0, true))
.getCause.isInstanceOf[TimeoutException])
}

test("SPARK-32091: ignore failures from lost executors when remove rdd/broadcast/shuffle") {
setupBlockManagerMasterWithBlocks(true)
// succeed because bm1 removes the rdd/broadcast successfully, while bm2's
// timeout is ignored since it's a lost executor
master.removeRdd(0, true)
master.removeBroadcast(0, true, true)
master.removeShuffle(0, true)
}

test("StorageLevel object caching") {
val level1 = StorageLevel(false, false, false, 3)
// this should return the same object as level1