
Conversation

@scwf (Contributor) commented Oct 14, 2016

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-17929

Currently, `CoarseGrainedSchedulerBackend.reset` acquires the lock:

```
  protected def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()

    // Remove all the lingering executors that should be removed but not yet. The reason might be
    // because (1) disconnected event is not yet received; (2) executors die silently.
    executorDataMap.toMap.foreach { case (eid, _) =>
      driverEndpoint.askWithRetry[Boolean](
        RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
    }
  }
```

but `removeExecutor` in `DriverEndpoint` also needs the lock `CoarseGrainedSchedulerBackend.this.synchronized`, which causes a deadlock:

```
   private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
      logDebug(s"Asked to remove executor $executorId with reason $reason")
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          val killed = CoarseGrainedSchedulerBackend.this.synchronized {
            addressToExecutorId -= executorInfo.executorAddress
            executorDataMap -= executorId
            executorsPendingLossReason -= executorId
            executorsPendingToRemove.remove(executorId).getOrElse(false)
          }
     ...
```
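
To make the failure mode concrete, here is a minimal, self-contained Scala sketch of the same locking pattern (toy code, not Spark's): one thread takes a lock and then blocks waiting on a single-threaded event loop, while the event loop's task needs that same lock.

```
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

object DeadlockSketch {
  private val lock = new Object                                // plays the role of the backend's lock
  private val eventLoop = Executors.newSingleThreadExecutor()  // plays the role of DriverEndpoint

  // Stand-in for askWithRetry: hand work to the event loop and block for its result.
  private def ask(): Boolean = {
    val future = eventLoop.submit(new Callable[Boolean] {
      // Like DriverEndpoint.removeExecutor, the handler needs the same lock.
      override def call(): Boolean = lock.synchronized { true }
    })
    // In the real code this wait would hang forever; here it times out to demonstrate the hang.
    future.get(5, TimeUnit.SECONDS)
  }

  // Like reset(): hold the lock while waiting on the event loop -> deadlock.
  def reset(): Unit = lock.synchronized { ask() }

  def main(args: Array[String]): Unit = {
    try reset() catch {
      case _: TimeoutException => println("Deadlocked: timed out while holding the lock")
    }
    eventLoop.shutdown()
  }
}
```

The fixes discussed below avoid this shape by not holding the lock across the blocking call, or by not blocking on the call at all.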

## How was this patch tested?

Manual test.

@SparkQA commented Oct 14, 2016

Test build #66957 has finished for PR 15481 at commit 3681fae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Oct 14, 2016

@zsxwing or @andrewor14 might know best on this one.

@mridulm (Contributor) commented Oct 14, 2016

It would be cleaner to simply copy executorDataMap.keys and work off that, to ensure there is no coupling between the actor thread and the invoker.

@jerryshao (Contributor)

LGTM, sorry for introducing the deadlock issue.

@SparkQA commented Oct 17, 2016

Test build #67054 has finished for PR 15481 at commit 2997ccb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@scwf (Contributor, Author) commented Oct 17, 2016

retest this please.

@SparkQA commented Oct 17, 2016

Test build #67067 has finished for PR 15481 at commit 2997ccb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@scwf (Contributor, Author) commented Oct 18, 2016

retest this please.

@SparkQA commented Oct 18, 2016

Test build #67105 has finished for PR 15481 at commit 2997ccb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) left a comment

reset is called in YarnSchedulerEndpoint, which ideally should not be a blocking action.

@jerryshao Can we just make it fire-and-forget?

```
// Note: here copy the code of remove executor from DriverEndpoint to avoid deadlock(reset
// and removeExecutor both to get the lock of schedulerbackend.)
val reason = SlaveLost("Stale executor after cluster manager re-registered.")
executorDataMap.toMap.foreach { case (executorId, executorInfo) =>
```

Review comment (Member):

executorDataMap should not be modified outside DriverEndpoint. See the comment for executorDataMap.

```
@volatile protected var currentExecutorIdCounter = 0

// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]
```

Review comment (Member):

They were in DriverEndpoint to make sure they won't be accessed outside DriverEndpoint, since they are not thread-safe. This change breaks that.

@mridulm (Contributor) commented Oct 18, 2016

@scwf I think the initial fix with a small change might be sufficient.
What I meant was something like this:

```
protected def reset(): Unit = {
  val executors = synchronized {
    Set() ++ executorDataMap.keys
  }

  executors.foreach { eid =>
    driverEndpoint.askWithRetry[Boolean](
      RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
  }
}
```

This keeps the synchronized block restricted to copying the executor keys and leaves the rest as-is, removing the deadlock. Thoughts?
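
As a companion to the toy sketch in the description above, here is the same toy setup restructured this way (illustrative only; executorIds is a made-up stand-in for executorDataMap.keys): the lock only guards copying the keys, so the blocking call runs with the lock released and the event loop can always make progress.

```
import java.util.concurrent.{Callable, Executors, TimeUnit}

object NoDeadlockSketch {
  private val lock = new Object
  private val eventLoop = Executors.newSingleThreadExecutor()
  private var executorIds = Set("exec-1", "exec-2")      // stand-in for executorDataMap.keys

  // Stand-in for askWithRetry(RemoveExecutor(...)): the handler still needs the lock.
  private def ask(id: String): Boolean = {
    val future = eventLoop.submit(new Callable[Boolean] {
      override def call(): Boolean = lock.synchronized { executorIds -= id; true }
    })
    future.get(5, TimeUnit.SECONDS)                      // safe: the caller no longer holds the lock
  }

  def reset(): Unit = {
    val ids = lock.synchronized { executorIds }          // only the copy happens under the lock
    ids.foreach(ask)                                     // blocking calls happen lock-free
  }

  def main(args: Array[String]): Unit = {
    reset()
    println("reset completed without deadlock")
    eventLoop.shutdown()
  }
}
```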

@zsxwing (Member) commented Oct 18, 2016

reset is called in YarnSchedulerEndpoint, which ideally should not be a blocking action.

I'm wondering if we can also fix this.

@mridulm (Contributor) commented Oct 18, 2016

@zsxwing Ah, then simply making it send() instead of askWithRetry() should do, no?
That was actually in the initial PR; I was not sure whether we wanted to change the behavior from askWithRetry to send...

@jerryshao (Contributor)

Seems it could be changed to send instead.

@scwf (Contributor, Author) commented Oct 19, 2016

OK, I will revert to the initial commit.

```
     // because (1) disconnected event is not yet received; (2) executors die silently.
     executorDataMap.toMap.foreach { case (eid, _) =>
-      driverEndpoint.askWithRetry[Boolean](
+      driverEndpoint.send(
```

Review comment (Member):

When changed to send, we don't do retry anymore. It may become less tolerant.

Review comment (Contributor):

This will only be invoked in YARN client mode when the AM has failed and some lingering executors still exist; as far as I know this does not normally happen. So I think it should be OK to call send.

Review comment (Member):

This call is actually just sending the message to the same process, so retry is not needed.

Review comment (Contributor):

Also, IIUC driverEndpoint is an in-process endpoint, so it should be safe.

Review comment (Contributor):

This is synchronizing on the entire method, which is not required; we need to restrict the synchronized block to the subset that actually needs it: copying the executors to remove.

The snippet I posted earlier does the same; can you please modify it accordingly? If we had coded it that way initially, this bug wouldn't have existed to begin with (with or without send/askWithRetry).

@viirya (Member) commented Oct 19, 2016

Once we change this to send, don't we need to move the handling of RemoveExecutor from receiveAndReply to receive in DriverEndpoint?

Review comment (Member):

Good catch. I think here we can just call removeExecutor(...).

@SparkQA commented Oct 19, 2016

Test build #67155 has finished for PR 15481 at commit af6072a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@scwf (Contributor, Author) commented Oct 19, 2016

Updated, can you review again?

@mridulm (Contributor) commented Oct 19, 2016

LGTM. @zsxwing, any comments?

@mridulm (Contributor) commented Oct 19, 2016

BTW, it was interesting that the earlier change did not trigger a test failure (the issue @viirya pointed out about needing to move RemoveExecutor to receive).

@viirya (Member) commented Oct 19, 2016

@mridulm I checked #9963 and it looks like we don't have a test covering CoarseGrainedSchedulerBackend.reset.

@SparkQA commented Oct 19, 2016

Test build #67173 has finished for PR 15481 at commit 7d86054.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
      }

    case RemoveExecutor(executorId, reason) =>
      removeExecutor(executorId, reason)
```

Review comment (Member):

Why remove `executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))`?

@zsxwing (Member) commented Oct 19, 2016

Sorry that my comment was unclear. I meant we can just make the following change:

```
  protected def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()

    // Remove all the lingering executors that should be removed but not yet. The reason might be
    // because (1) disconnected event is not yet received; (2) executors die silently.
    executorDataMap.toMap.foreach { case (eid, _) =>
      removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
    }
  }
```

@mridulm (Contributor) commented Oct 19, 2016

@zsxwing To minimize the scope of the synchronized block.
The way @scwf has it now, the synchronized block is limited to duplicating the keys and setting some state.
The rest can happen outside the lock.

@zsxwing (Member) commented Oct 19, 2016

@mridulm ask is very cheap; it just puts the serialized message into a buffer. The current code needs to duplicate the logic and, as @viirya pointed out, it misses `executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))`.

How about just the following change? I agree that removeExecutor doesn't need to be in the synchronized block.

```
  protected def reset(): Unit = {
    val executors = synchronized {
      numPendingExecutors = 0
      executorsPendingToRemove.clear()
      Set() ++ executorDataMap.keys
    }

    // Remove all the lingering executors that should be removed but not yet. The reason might be
    // because (1) disconnected event is not yet received; (2) executors die silently.
    executors.foreach { eid =>
      removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
    }
  }
```

@mridulm (Contributor) commented Oct 19, 2016

@zsxwing I think the issue is that the case RemoveExecutor() in receive is not identical to what exists in receiveAndReply, which it should be. We should pull that out into a single method and have both pattern matches delegate to it. Any reason `executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))` is missing from case RemoveExecutor(executorId, reason) in receive?

If we add that in there, will it not be sufficient? I don't want to call removeExecutor from outside the event loop.

@zsxwing (Member) commented Oct 19, 2016

I meant CoarseGrainedSchedulerBackend.removeExecutor, not DriverEndpoint.removeExecutor. It's confusing that we have two methods with the same name :(

@mridulm (Contributor) commented Oct 19, 2016

Ah! Apologies, I got confused. Yes, I agree, that is a better approach.

It also means we can get rid of the RemoveExecutor pattern match from receive, right? As it stands now, that looks buggy.

@zsxwing (Member) commented Oct 19, 2016

It also means we can get rid of the RemoveExecutor pattern match from receive, right?

yep

@scwf (Contributor, Author) commented Oct 21, 2016

CoarseGrainedSchedulerBackend.removeExecutor also uses ask, but that does not matter, right? It just sends the message once and logs the error on failure. And if we use CoarseGrainedSchedulerBackend.removeExecutor, the removeExecutor call should not be inside the synchronized block.
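
For reference, a small self-contained sketch of the "send once, log on failure, never block" behavior described here (the names and the Future-based stand-in for the RPC are illustrative, not the actual Spark code):

```
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object AskOnceSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).
  private def ask(executorId: String): Future[Boolean] = Future { true }

  // Ask once and only log if it fails; the caller never blocks, so no lock is
  // held across the RPC and reset() cannot deadlock on it.
  def removeExecutor(executorId: String, reason: String): Unit =
    ask(executorId).onComplete {
      case Success(_) => ()
      case Failure(t) => println(s"Error removing executor $executorId ($reason): ${t.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    removeExecutor("exec-1", "stale executor")
    Thread.sleep(100)  // give the async callback a moment before the JVM exits
  }
}
```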

@SparkQA commented Oct 21, 2016

Test build #67312 has finished for PR 15481 at commit 7bf3bf8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Oct 21, 2016

retest this please

@SparkQA commented Oct 21, 2016

Test build #67322 has finished for PR 15481 at commit 7bf3bf8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Oct 21, 2016

LGTM now.

@zsxwing (Member) commented Oct 21, 2016

LGTM. Merging to master and 2.0. Thanks!

@asfgit asfgit closed this in c1f344f Oct 21, 2016
asfgit pushed a commit that referenced this pull request Oct 21, 2016

Author: w00228970 <[email protected]>

Closes #15481 from scwf/spark-17929.

(cherry picked from commit c1f344f)
Signed-off-by: Shixiong Zhu <[email protected]>
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017