Skip to content

Commit 479e64e

Browse files
allenmaGitHub Enterprise
authored andcommitted
[CARMEL-6314] Make sure registerExecutor event must be handled in HeartBeatReceiver (#1097)
1 parent a726a47 commit 479e64e

File tree

3 files changed

+6
-8
lines changed

3 files changed

+6
-8
lines changed

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,6 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
175175
Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
176176
}
177177

178-
/**
179-
* If the heartbeat receiver is not stopped, notify it of executor registrations.
180-
*/
181-
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
182-
addExecutor(executorAdded.executorId)
183-
}
184-
185178
/**
186179
* Send ExecutorRemoved to the event loop to remove an executor. Only for test.
187180
*

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ class SparkContext(config: SparkConf) extends Logging {
262262
def master: String = _conf.get("spark.master")
263263
def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE)
264264
def appName: String = _conf.get("spark.app.name")
265+
def heartbeatReceiverRef: RpcEndpointRef = _heartbeatReceiver
265266

266267
private[spark] def isEventLogEnabled: Boolean = _conf.get(EVENT_LOG_ENABLED)
267268
private[spark] def eventLogDir: Option[URI] = _eventLogDir

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import scala.util.Failure
2929

3030
import org.apache.hadoop.security.UserGroupInformation
3131

32-
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
32+
import org.apache.spark.{ExecutorAllocationClient, ExecutorRegistered, HeartbeatReceiver, SparkEnv, SparkException, TaskState}
3333
import org.apache.spark.deploy.SparkHadoopUtil
3434
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
3535
import org.apache.spark.executor.ExecutorLogUrlHandler
@@ -150,6 +150,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
150150
@volatile
151151
protected var lastOfferWarnLogTime = 0L
152152

153+
private val heartbeatReceiverRef = scheduler.sc.heartbeatReceiverRef
154+
153155
class DriverEndpoint extends IsolatedRpcEndpoint with Logging {
154156

155157
override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
@@ -298,6 +300,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
298300
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
299301
}
300302
}
303+
// notify the heartbeatReceiver the executor is registered
304+
heartbeatReceiverRef.ask[Boolean](ExecutorRegistered(executorId))
301305
listenerBus.post(
302306
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
303307
// Note: some tests expect the reply to come after we put the executor in the map

0 commit comments

Comments
 (0)