
Commit bb8dc88

jinxing authored and cmonkey committed
[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry.
## What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on the executor side reports a block's metadata back to `ReceiverTracker` on the driver side. The current code uses `askWithRetry`. However, for `AddBlock`, `ReceiverTracker` is not idempotent, so the message may be processed multiple times.

**To reproduce**:

1. Check whether it is the first time `ReceiverTracker` receives `AddBlock`; if so, sleep long enough (say 200 seconds) so that the first RPC call times out in `askWithRetry` and `AddBlock` is resent.
2. Rebuild Spark and run the following job:

```scala
def streamProcessing(): Unit = {
  val conf = new SparkConf()
    .setAppName("StreamingTest")
    .setMaster(masterUrl)
  val ssc = new StreamingContext(conf, Seconds(200))
  val stream = ssc.socketTextStream("localhost", 1234)
  stream.print()
  ssc.start()
  ssc.awaitTermination()
}
```

**To fix**:

It makes sense to provide a blocking version of `ask` in `RpcEndpointRef`, as mentioned in SPARK-18113 (apache#16503 (comment)), because the Netty RPC layer will not drop messages. `askWithRetry` is a leftover from the Akka days; it imposes restrictions on the caller (e.g. idempotency) that people generally don't pay much attention to when using it.

## How was this patch tested?

Tested manually. The scenario described above no longer happens with this patch.

Author: jinxing <[email protected]>

Closes apache#16690 from jinxing64/SPARK-19347.
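The failure mode in the description can be sketched in plain Scala, with no Spark dependencies (all names here are hypothetical stand-ins, not Spark code): a non-idempotent handler appends on every delivery, and a retrying caller whose first attempt timed out still got that first message delivered, so the retry registers the same block twice.

```scala
import scala.collection.mutable.ArrayBuffer

object RetryDuplicationSketch {
  val receivedBlocks = ArrayBuffer.empty[String]

  // Non-idempotent handler, in the spirit of AddBlock handling in
  // ReceiverTracker: every delivery appends, so a redelivery is counted again.
  def handleAddBlock(blockId: String): Boolean = {
    receivedBlocks += blockId
    true
  }

  // askWithRetry-style caller: a reply timeout on attempt 1 does not undo
  // the delivery, so the retry hands the same message to the handler again.
  def askWithRetry(blockId: String, attempts: Int): Boolean = {
    var result = false
    for (_ <- 1 to attempts) result = handleAddBlock(blockId)
    result
  }
}
```

Running `askWithRetry("input-0-0", attempts = 2)` leaves the same block registered twice, which is exactly the restriction on callers that the description calls out.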
1 parent 782ff82 commit bb8dc88

File tree

2 files changed: +36 -4 lines


core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala

Lines changed: 35 additions & 3 deletions

```diff
@@ -63,8 +63,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
 
   /**
-   * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
-   * timeout, or throw a SparkException if this fails even after the default number of retries.
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, throw an exception if this fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+
+   * @param message the message to send
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * specified timeout, throw an exception if this fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @param timeout the timeout duration
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
+    val future = ask[T](message, timeout)
+    timeout.awaitResult(future)
+  }
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, throw a SparkException if this fails even after the default number of retries.
    * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
    * method retries, the message handling in the receiver side should be idempotent.
    *
@@ -75,10 +105,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
+  @deprecated("use 'askSync' instead.", "2.2.0")
   def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
 
   /**
-   * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
    * specified timeout, throw a SparkException if this fails even after the specified number of
    * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
    * retries, the message handling in the receiver side should be idempotent.
@@ -91,6 +122,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
+  @deprecated("use 'askSync' instead.", "2.2.0")
   def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     // TODO: Consider removing multiple attempts
     var attempts = 0
```
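The `askSync` added in this diff boils down to "ask asynchronously, then block on the future once". That pattern can be sketched standalone with plain `scala.concurrent` (hypothetical names; this is not the Spark implementation, which uses `RpcTimeout.awaitResult` instead of a bare `Await.result`):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object AskSyncSketch {
  // Stand-in for RpcEndpointRef.ask: the endpoint replies asynchronously.
  def ask[T](message: Any, reply: => T): Future[T] = Future(reply)

  // askSync-style wrapper: block on the future and surface a timeout as a
  // single failure. No resend, so a non-idempotent handler sees one delivery.
  def askSync[T](message: Any, reply: => T, timeout: Duration): T =
    Await.result(ask(message, reply), timeout)
}
```

The design point is that failing fast with one exception is safe here because, as the commit message notes, the Netty RPC layer does not drop messages, so retrying buys nothing and only risks duplicate processing.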

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -159,7 +159,7 @@ private[streaming] class ReceiverSupervisorImpl(
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
     val numRecords = blockStoreResult.numRecords
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
-    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
+    trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
   }
 
```
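For contrast, the commit message notes that `askWithRetry` forces handlers to be idempotent. A minimal sketch of what an idempotent `AddBlock`-style handler could look like (hypothetical names, not `ReceiverTracker` code), keying on the block id so a redelivery is a harmless no-op:

```scala
import scala.collection.mutable

object IdempotentAddBlockSketch {
  private val seen = mutable.LinkedHashSet.empty[String]

  // Set#add returns false when the element was already present, so a
  // redelivered block id is recorded only once.
  def addBlock(blockId: String): Boolean = seen.add(blockId)
}
```

This patch takes the other route: instead of making every handler redelivery-safe, it removes the redelivery by replacing `askWithRetry` with the blocking, single-attempt `askSync`.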
