File tree Expand file tree Collapse file tree 1 file changed +6
-4
lines changed
core/src/main/scala/org/apache/spark/scheduler Expand file tree Collapse file tree 1 file changed +6
-4
lines changed Original file line number Diff line number Diff line change @@ -156,14 +156,16 @@ private[spark] object OutputCommitCoordinator {
156156 override val rpcEnv : RpcEnv , outputCommitCoordinator : OutputCommitCoordinator )
157157 extends RpcEndpoint with Logging {
158158
159+ override def receive : PartialFunction [Any , Unit ] = {
160+ case StopCoordinator =>
161+ logInfo(" OutputCommitCoordinator stopped!" )
162+ stop()
163+ }
164+
159165 override def receiveAndReply (context : RpcCallContext ): PartialFunction [Any , Unit ] = {
160166 case AskPermissionToCommitOutput (stage, partition, taskAttempt) =>
161167 context.reply(
162168 outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
163- case StopCoordinator =>
164- logInfo(" OutputCommitCoordinator stopped!" )
165- context.reply(true )
166- stop()
167169 }
168170 }
169171}
You can’t perform that action at this time.
0 commit comments