diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 2a5b91f364b2c..aa901d6568b26 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -355,9 +355,12 @@ private[spark] class ExecutorMonitor( override def rddCleaned(rddId: Int): Unit = { } override def shuffleCleaned(shuffleId: Int): Unit = { - // Because this is called in a completely separate thread, we post a custom event to the - // listener bus so that the internal state is safely updated. - listenerBus.post(ShuffleCleanedEvent(shuffleId)) + // Only post the event if tracking is enabled + if (shuffleTrackingEnabled) { + // Because this is called in a completely separate thread, we post a custom event to the + // listener bus so that the internal state is safely updated. + listenerBus.post(ShuffleCleanedEvent(shuffleId)) + } } override def broadcastCleaned(broadcastId: Long): Unit = { } diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index e9330f8c22cef..d3feb35537b34 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -333,6 +333,19 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) } + + test("SPARK-28839: Avoids NPE in context cleaner when shuffle service is on") { + val bus = mockListenerBus() + conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, true) + monitor = new ExecutorMonitor(conf, client, bus, clock) { + override def onOtherEvent(event: SparkListenerEvent): Unit = { + throw new IllegalStateException("No event should be sent.") + } + } + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.shuffleCleaned(0) + } + test("shuffle tracking with multiple executors and concurrent jobs") { val bus = mockListenerBus() conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false)