
SPARK-23660: Fix exception in yarn cluster mode when application ended fast #20807

Closed
gaborgsomogyi wants to merge 5 commits into apache:master from gaborgsomogyi:SPARK-23660

Conversation

@gaborgsomogyi (Contributor) commented Mar 13, 2018

What changes were proposed in this pull request?

Yarn throws the following exception in cluster mode when the application is really small:

18/03/07 23:34:22 WARN netty.NettyRpcEnv: Ignored failure: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@7c974942 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@1eea9d2d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
18/03/07 23:34:22 ERROR yarn.ApplicationMaster: Uncaught exception: 
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
	at org.apache.spark.deploy.yarn.YarnAllocator.<init>(YarnAllocator.scala:102)
	at org.apache.spark.deploy.yarn.YarnRMClient.register(YarnRMClient.scala:77)
	at org.apache.spark.deploy.yarn.ApplicationMaster.registerAM(ApplicationMaster.scala:450)
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:493)
	at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:810)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:809)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:834)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:158)
	at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
	at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
	... 17 more
18/03/07 23:34:22 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult: )

Example application:

import org.apache.spark.{SparkConf, SparkContext}

object ExampleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ExampleApp")
    val sc = new SparkContext(conf)
    try {
      // Do nothing
    } finally {
      sc.stop()
    }
  }
}
This PR pauses the user class thread after the SparkContext has been created and keeps it paused until the application master has initialized properly.
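For illustration, here is a minimal, self-contained sketch of that handshake, with plain Scala threads standing in for the user class thread and the AM's runDriver thread (all names here are hypothetical, not the actual patch):

```
object HandshakeSketch {
  private val lock = new Object  // stands in for the shared monitor
  private var amInitialized = false

  def main(args: Array[String]): Unit = {
    val userClassThread = new Thread(() => {
      println("user class: SparkContext created, pausing")
      lock.synchronized {
        while (!amInitialized) lock.wait()  // paused until the AM is ready
      }
      println("user class: resumed, stopping SparkContext")
    })
    userClassThread.start()

    Thread.sleep(100)  // stands in for AM registration work
    lock.synchronized {
      amInitialized = true
      lock.notify()  // resume the user class thread
    }
    userClassThread.join()
  }
}
```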

How was this patch tested?

Automated: Existing unit tests
Manual: Application submitted to a small cluster


SparkQA commented Mar 13, 2018

Test build #88193 has finished for PR 20807 at commit 114ac05.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val executorHostname = container.getNodeId.getHost
val containerId = container.getId
- val executorId = executorIdCounter.toString
+ val executorId = (initialExecutorIdCounter + executorIdCounter).toString
Member

it seems a bit strange to me to "add" the Ids?
@vanzin @jerryshao

Contributor Author

The initial problem was that initialExecutorIdCounter comes from the driver, which is already stopped. Making it lazy solved this. The other integer is necessary because a lazy var is not possible.
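A hedged sketch of what that means (hypothetical names, not the actual YarnAllocator code): the base value is fetched lazily so the driver is never asked during construction, and a separate mutable counter is added on top because Scala has lazy val but no lazy var:

```
// Sketch only: lazy base + mutable offset for generating executor IDs.
class AllocatorSketch(fetchInitialIdFromDriver: () => Int) {
  // Deferred until first use, so a stopped driver is never asked
  // while the allocator is being constructed.
  private lazy val initialExecutorIdCounter: Int = fetchInitialIdFromDriver()
  private var executorIdCounter: Int = 0

  def nextExecutorId(): String = {
    executorIdCounter += 1
    (initialExecutorIdCounter + executorIdCounter).toString
  }
}
```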

Contributor

I get the point of the fix, but it also seems a little strange to me.

Besides, do we really need to fix this issue? As far as I know, the case here is not a normal one.

Contributor Author

Yeah, this is an issue only when the application is quite fast. Do you have concerns about solving this in general, or about the fix in the first commit? I'm asking because pausing the user class thread would definitely be better, as I've written below.

@gaborgsomogyi (Contributor Author)

I've executed more invasive tests on the cluster, and this PR didn't solve all the issues.

As another, less invasive approach, I tried to catch the exception in runDriver, but that failed in a different way because the ApplicationMaster was only halfway initialized.

I see that the only option is to pause the userClassThread until the initialization finishes.


SparkQA commented Mar 13, 2018

Test build #88202 has finished for PR 20807 at commit 442cfb4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) left a comment

Some suggestions for making things a bit clearer, otherwise looks good.

}

- private def sparkContextInitialized(sc: SparkContext) = {
+ private def sparkContextInitialized(sc: SparkContext) = synchronized {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some other code in this class uses synchronization on this, so I think it would be better to synchronize on sparkContextPromise in this case.

Contributor Author

Used sparkContextPromise as the lock.
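To make the suggestion concrete, a standalone sketch (hypothetical names; the ordering question is picked up in the race discussion further down): wait() and notify() only pair up when both threads synchronize on the same monitor, so the natural choice is the one object both sides already share, the promise itself:

```
import scala.concurrent.Promise

object MonitorChoiceSketch {
  // The object both the user class thread and runDriver already share.
  private val sparkContextPromise = Promise[String]()

  def pauseSide(): Unit = sparkContextPromise.synchronized {
    sparkContextPromise.wait()   // wakes only via notify() on this monitor
  }

  def resumeSide(): Unit = sparkContextPromise.synchronized {
    sparkContextPromise.notify() // pairs with the wait() above
  }
}
```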

throw new IllegalStateException("User did not initialize spark context!")
}
+ // After initialisation notify user class thread to continue
+ synchronized { notify() }
Contributor

Since you have to do this in two places, I'd create a method (e.g. resumeDriver) close to where sparkContextInitialized is declared, so that it's easier to find the context of why this is needed.

Contributor Author

Moved into a resumeDriver function right below sparkContextInitialized.
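A sketch of the extracted helper, assuming the monitor and notify() shown in the diffs around this thread (the name resumeDriver comes from the review comment; the exact body is an assumption):

```
// Declared right below sparkContextInitialized so the pause/resume pair
// reads together; called on both the success and the failure paths.
private def resumeDriver(): Unit = {
  // After initialization in runDriver, release the paused user class thread.
  sparkContextPromise.synchronized {
    sparkContextPromise.notify()
  }
}
```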

// if the user app did not create a SparkContext.
throw new IllegalStateException("User did not initialize spark context!")
}
+ // After initialisation notify user class thread to continue
Contributor

nit: rest of the code uses American spelling ("initialization"), so this should be consistent.

Contributor Author

Fixed and switched to a US spell checker.

Contributor

You should remove this comment and add one to the resumeDriver method.

Contributor Author

Moved.


SparkQA commented Mar 16, 2018

Test build #88289 has finished for PR 20807 at commit 5b304d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sparkContextPromise.success(sc)
+ // Pause the user class thread in order to make proper initialization in runDriver function.
+ // When it happened the thread has to be resumed with resumeDriver function.
+ sparkContextPromise.synchronized {
Contributor

Hmm, there's a race now.

You're updating the promise outside the lock, so it's possible that the runDriver thread can see that and notify the lock before this thread grabs it, so this thread would hang forever in that case.

Contributor Author

Fixed.
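A sketch of what "fixed" plausibly looks like given the discussion (assumed shape, not a verbatim quote of the final commit): completing the promise moves inside the synchronized block, so runDriver can only acquire the monitor for its notify() after this thread has released it by calling wait(), and the signal can no longer be lost:

```
private def sparkContextInitialized(sc: SparkContext) = {
  sparkContextPromise.synchronized {
    // Complete the promise while holding the monitor: runDriver sees the
    // SparkContext, but its resumeDriver() blocks on this monitor until
    // wait() below releases it, so the notify() cannot arrive too early.
    sparkContextPromise.success(sc)
    sparkContextPromise.wait()
  }
}
```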


SparkQA commented Mar 16, 2018

Test build #88319 has finished for PR 20807 at commit 284ed68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor Author)

@vanzin sorry, one useless comment was left in the code. I've just removed it.


SparkQA commented Mar 16, 2018

Test build #88326 has finished for PR 20807 at commit 33ca59d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Mar 20, 2018

Merging to master / 2.3.

@asfgit closed this in 5f4deff Mar 20, 2018
asfgit pushed a commit that referenced this pull request Mar 20, 2018

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20807 from gaborgsomogyi/SPARK-23660.

(cherry picked from commit 5f4deff)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes apache#20807 from gaborgsomogyi/SPARK-23660.

(cherry picked from commit 5f4deff)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>