Skip to content

Conversation

@HyukjinKwon
Copy link
Member

@HyukjinKwon HyukjinKwon commented Aug 22, 2019

What changes were proposed in this pull request?

This PR proposes to avoid to thrown NPE at context cleaner when shuffle service is on - it is kind of a small followup of #24817

Seems like it sets null for shuffleIds to track when the service is on. Later, removeShuffle tries to remove an element at shuffleIds which leads to NPE. It fixes it by explicitly not sending the event (ShuffleCleanedEvent) in this case.

See the code path below:

doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)

def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
blockManagerMaster.removeShuffle(shuffleId, blocking)
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
}
}

override def shuffleCleaned(shuffleId: Int): Unit = {
// Because this is called in a completely separate thread, we post a custom event to the
// listener bus so that the internal state is safely updated.
listenerBus.post(ShuffleCleanedEvent(shuffleId))
}

case ShuffleCleanedEvent(id) => cleanupShuffle(id)

private def cleanupShuffle(id: Int): Unit = {
logDebug(s"Cleaning up state related to shuffle $id.")
shuffleToActiveJobs -= id
executors.asScala.foreach { case (_, exec) =>
exec.removeShuffle(id)
}
}

if (shuffleIds.remove(id) && shuffleIds.isEmpty) {

private val shuffleIds = if (shuffleTrackingEnabled) new mutable.HashSet[Int]() else null

Why are the changes needed?

This is a bug fix.

Does this PR introduce any user-facing change?

It prevents the exception:

19/08/21 06:44:01 ERROR AsyncEventQueue: Listener ExecutorMonitor threw an exception
java.lang.NullPointerException
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor$Tracker.removeShuffle(ExecutorMonitor.scala:479)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.$anonfun$cleanupShuffle$2(ExecutorMonitor.scala:408)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.$anonfun$cleanupShuffle$2$adapted(ExecutorMonitor.scala:407)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.cleanupShuffle(ExecutorMonitor.scala:407)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.onOtherEvent(ExecutorMonitor.sc

How was this patch test?

Unittest was added.

@HyukjinKwon
Copy link
Member Author

@vanzin do you mind if I ask to take a look and see if it makes sense?


def removeShuffle(id: Int): Unit = {
if (shuffleIds.remove(id) && shuffleIds.isEmpty) {
if (shuffleIds != null && shuffleIds.remove(id) && shuffleIds.isEmpty) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, is this only for 3.0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I think so.

@SparkQA
Copy link

SparkQA commented Aug 22, 2019

Test build #109568 has finished for PR 25551 at commit 4363ef1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member Author

retest this please

@SparkQA
Copy link

SparkQA commented Aug 22, 2019

Test build #109570 has finished for PR 25551 at commit 4363ef1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member Author

retest this please

@SparkQA
Copy link

SparkQA commented Aug 22, 2019

Test build #109577 has finished for PR 25551 at commit 4363ef1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

@HyukjinKwon HyukjinKwon changed the title [SPARK-28839][CORE] Avoids NPE in context cleaner when shuffle service is on [SPARK-28839][CORE] Avoids NPE in context cleaner when dynamic allocation and shuffle service are on Aug 23, 2019
@SparkQA
Copy link

SparkQA commented Aug 23, 2019

Test build #109599 has finished for PR 25551 at commit 4f5de95.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 23, 2019

Test build #109637 has finished for PR 25551 at commit c8b0da9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 23, 2019

Test build #4839 has finished for PR 25551 at commit c8b0da9.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member

wangyum commented Aug 23, 2019

retest this please

@SparkQA
Copy link

SparkQA commented Aug 23, 2019

Test build #109646 has finished for PR 25551 at commit c8b0da9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@vanzin vanzin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merging to master.

val bus = mockListenerBus()
conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, true)
monitor = new ExecutorMonitor(conf, client, bus, clock) {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could have used mockito's verify instead of this, but this is ok.

@vanzin vanzin closed this in d25cbd4 Aug 23, 2019
@HyukjinKwon
Copy link
Member Author

Thanks all!

@HyukjinKwon HyukjinKwon deleted the SPARK-28839 branch March 3, 2020 01:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants