From e7002c13d0f9f19bcbec3de75560abf53f903bb2 Mon Sep 17 00:00:00 2001 From: jinxing Date: Tue, 24 Jan 2017 17:33:23 +0800 Subject: [PATCH 1/2] [SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry --- .../org/apache/spark/rpc/RpcEndpointRef.scala | 39 +++++++++++++++++-- .../receiver/ReceiverSupervisorImpl.scala | 2 +- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index 994e18676ec4..cc358eff5db1 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -19,6 +19,7 @@ package org.apache.spark.rpc import scala.concurrent.Future import scala.reflect.ClassTag +import scala.util.control.NonFatal import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging @@ -63,8 +64,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout) /** - * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default - * timeout, or throw a SparkException if this fails even after the default number of retries. + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a + * default timeout, throw an exception if this fails. + * + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message + * loop of [[RpcEndpoint]]. + + * @param message the message to send + * @tparam T type of the reply message + * @return the reply message from the corresponding [[RpcEndpoint]] + */ + def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout) + + /** + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a + * specified timeout, throw an exception if this fails. + * + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message + * loop of [[RpcEndpoint]]. + * + * @param message the message to send + * @param timeout the timeout duration + * @tparam T type of the reply message + * @return the reply message from the corresponding [[RpcEndpoint]] + */ + def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = { + val future = ask[T](message, timeout) + timeout.awaitResult(future) + } + + /** + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a + * default timeout, throw a SparkException if this fails even after the default number of retries. * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this * method retries, the message handling in the receiver side should be idempotent. * @@ -75,10 +106,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ + @deprecated("use 'askSync' instead.", "2.1.0") def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout) /** - * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a * specified timeout, throw a SparkException if this fails even after the specified number of * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method * retries, the message handling in the receiver side should be idempotent. @@ -91,6 +123,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ + @deprecated("use 'askSync' instead.", "2.1.0") def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = { // TODO: Consider removing multiple attempts var attempts = 0 diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index eca7c79465c6..722024b8a6d5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -159,7 +159,7 @@ private[streaming] class ReceiverSupervisorImpl( logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) - trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) + trackerEndpoint.askSync[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") } From 42eb540f8617ee3dc22c81014452896eedec97a3 Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 1 Feb 2017 11:46:50 +0800 Subject: [PATCH 2/2] Fix versions. --- .../src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index cc358eff5db1..a5778876d490 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -19,7 +19,6 @@ package org.apache.spark.rpc import scala.concurrent.Future import scala.reflect.ClassTag -import scala.util.control.NonFatal import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging @@ -106,7 +105,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ - @deprecated("use 'askSync' instead.", "2.1.0") + @deprecated("use 'askSync' instead.", "2.2.0") def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout) /** @@ -123,7 +122,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ - @deprecated("use 'askSync' instead.", "2.1.0") + @deprecated("use 'askSync' instead.", "2.2.0") def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = { // TODO: Consider removing multiple attempts var attempts = 0