-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32091][CORE] Ignore timeout error when remove blocks on the lost executor #28924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32091][CORE] Ignore timeout error when remove blocks on the lost executor #28924
Conversation
|
Test build #124490 has finished for PR 28924 at commit
|
|
Retest this please. |
|
Test build #124606 has finished for PR 28924 at commit
|
|
retest this please. |
|
Test build #124620 has finished for PR 28924 at commit
|
holdenk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this, I left some minor concerns and questions when you've got a chance.
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
Outdated
Show resolved
Hide resolved
| .set(MEMORY_STORAGE_FRACTION, 0.999) | ||
| .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m") | ||
| .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) | ||
| .set(Network.RPC_ASK_TIMEOUT, "5s") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any particular reason why 5?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the newly added tests, we need to simulate the timeout error from BlockManager. But at the same time, we also don't want the test run too long since the default timeout value is 120s. Therefore, we choose a quite short timeout for the tests. On the other hand, we don't set it to a smaller value, e.g. 1s, which may cause test flaky.
Note that the best way to set the timeout value is to set it for the newly added tests locally instead of setting it globally. However, with the limitation of the current test framework in Core side, it's hard to set it locally since it requires more changes.
|
Also for |
|
@holdenk Thanks for review. I've also updated the PR description for the user-facing part. |
|
ping @tgravescs @jiangxb1987 Could you also take a look? thanks! |
tgravescs
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks good, few nits
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
Outdated
Show resolved
Hide resolved
|
Test build #124660 has finished for PR 28924 at commit
|
| bm.slaveEndpoint.ask[Boolean](removeMsg) | ||
| bm.slaveEndpoint.ask[Boolean](removeMsg).recover { | ||
| // use false as default value means no shuffle data were removed | ||
| handleFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously we were not swallowing IOException here for removing shuffle blocks? But handleFailure method will start swallowing it? is this intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we may miss it previously. The failure of removing blocks should be considered as non-fatal. For example, it's not worth failing the whole application if we fail to remove some blocks just after some heavy computation.
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
Show resolved
Hide resolved
|
thanks for the review. I've updated the PR. Please take another look:) |
|
Test build #124903 has finished for PR 28924 at commit
|
|
test this please |
|
Test build #125103 has finished for PR 28924 at commit
|
|
retest this please |
|
cc @cloud-fan Could you also take a look? Thanks. |
|
Test build #125169 has finished for PR 28924 at commit
|
|
retest this please |
|
Test build #125186 has finished for PR 28924 at commit
|
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
Outdated
Show resolved
Hide resolved
| bm1Id.executorId, null, bm1Id.host, 1, Map.empty, Map.empty, | ||
| Map.empty, 0)) | ||
|
|
||
| if (!withLost) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does withLost means?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always set up three roles, driver, executor-1, executor-2 before these two tests. withLost here indicates whether to initialize the executor-2 as a lost executor in terms of driver's view.
|
Test build #125568 has finished for PR 28924 at commit
|
|
Test build #125578 has finished for PR 28924 at commit
|
|
thanks, merging to master! |
|
thanks all!! |
….timeout` config
### What changes were proposed in this pull request?
SparkContext stop stuck on ContextCleaner
```
25/11/05 18:12:29 ERROR [shutdown-hook-0] ThreadUtils:
14 Driver BLOCKED Blocked by Thread 60 Lock(org.apache.spark.ContextCleaner1726738661})
org.apache.spark.ContextCleaner.stop(ContextCleaner.scala:145)
org.apache.spark.SparkContext.$anonfun$stop$9(SparkContext.scala:2094)
org.apache.spark.SparkContext.$anonfun$stop$9$adapted(SparkContext.scala:2094)
org.apache.spark.SparkContext$$Lambda$5309/807013918.apply(Unknown Source)
scala.Option.foreach(Option.scala:407)
org.apache.spark.SparkContext.$anonfun$stop$8(SparkContext.scala:2094)
org.apache.spark.SparkContext$$Lambda$5308/1445921225.apply$mcV$sp(Unknown Source)
org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1512)
org.apache.spark.SparkContext.stop(SparkContext.scala:2094)
org.apache.spark.SparkContext.stop(SparkContext.scala:2050)
org.apache.spark.sql.SparkSession.stop(SparkSession.scala:718)
com.shopee.data.content.ods.live_performance.Main$.main(Main.scala:62)
com.shopee.data.content.ods.live_performance.Main.main(Main.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:751)
```
ContextCleaner stop() will wait lock
```
def stop(): Unit = {
stopped = true
// Interrupt the cleaning thread, but wait until the current task has finished before
// doing so. This guards against the race condition where a cleaning thread may
// potentially clean similarly named variables created by a different SparkContext,
// resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
synchronized {
cleaningThread.interrupt()
}
cleaningThread.join()
periodicGCService.shutdown()
}
```
, but one call on keepCleaning() hold the lock
```
25/11/05 18:12:29 ERROR [shutdown-hook-0] ThreadUtils:
60 Spark Context Cleaner TIMED_WAITING Monitor(org.apache.spark.ContextCleaner1726738661})
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:294)
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:194)
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:351)
org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:78)
org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:254)
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:204)
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
org.apache.spark.ContextCleaner$$Lambda$1178/1994584033.apply(Unknown Source)
scala.Option.foreach(Option.scala:407)
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195) => holding Monitor(org.apache.spark.ContextCleaner1726738661})
org.apache.spark.ContextCleaner$$Lambda$1109/1496842179.apply$mcV$sp(Unknown Source)
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1474)
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
```
BlockManager stuck on removeBroadcast RpcUtils.INFINITE_TIMEOUT.awaitResult(future) 【PR #28924 change here】
```
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit = {
val future = driverEndpoint.askSync[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.failed.foreach(e =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}
```
For such case only reason should be RPC was missing handling
Driver OOM or A thread leak in yarn nm prevents the creation of new threads to handle RPC.
```
25/11/05 08:16:22 ERROR [metrics-paimon-push-gateway-reporter-2-thread-1] ScheduledReporter: Exception thrown from PushGatewayReporter#report. Exception was suppressed.
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1115)
at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1388)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1416)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1400)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
at io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:243)
at io.prometheus.client.exporter.PushGateway.push(PushGateway.java:134)
at org.apache.paimon.metrics.reporter.PushGatewayReporter.report(PushGatewayReporter.java:84)
at com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:253)
at com.codahale.metrics.ScheduledReporter.lambda$start$0(ScheduledReporter.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
For such case we can add a customized wait timeout here to avoid forever stuck
then whole process stuck on here
### Why are the changes needed?
Avoid app stuck
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52919 from AngersZhuuuu/SPARK-54219.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
….timeout` config
### What changes were proposed in this pull request?
SparkContext stop stuck on ContextCleaner
```
25/11/05 18:12:29 ERROR [shutdown-hook-0] ThreadUtils:
14 Driver BLOCKED Blocked by Thread 60 Lock(org.apache.spark.ContextCleaner1726738661})
org.apache.spark.ContextCleaner.stop(ContextCleaner.scala:145)
org.apache.spark.SparkContext.$anonfun$stop$9(SparkContext.scala:2094)
org.apache.spark.SparkContext.$anonfun$stop$9$adapted(SparkContext.scala:2094)
org.apache.spark.SparkContext$$Lambda$5309/807013918.apply(Unknown Source)
scala.Option.foreach(Option.scala:407)
org.apache.spark.SparkContext.$anonfun$stop$8(SparkContext.scala:2094)
org.apache.spark.SparkContext$$Lambda$5308/1445921225.apply$mcV$sp(Unknown Source)
org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1512)
org.apache.spark.SparkContext.stop(SparkContext.scala:2094)
org.apache.spark.SparkContext.stop(SparkContext.scala:2050)
org.apache.spark.sql.SparkSession.stop(SparkSession.scala:718)
com.shopee.data.content.ods.live_performance.Main$.main(Main.scala:62)
com.shopee.data.content.ods.live_performance.Main.main(Main.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:751)
```
ContextCleaner stop() will wait lock
```
def stop(): Unit = {
stopped = true
// Interrupt the cleaning thread, but wait until the current task has finished before
// doing so. This guards against the race condition where a cleaning thread may
// potentially clean similarly named variables created by a different SparkContext,
// resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
synchronized {
cleaningThread.interrupt()
}
cleaningThread.join()
periodicGCService.shutdown()
}
```
, but one call on keepCleaning() hold the lock
```
25/11/05 18:12:29 ERROR [shutdown-hook-0] ThreadUtils:
60 Spark Context Cleaner TIMED_WAITING Monitor(org.apache.spark.ContextCleaner1726738661})
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:294)
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:194)
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:351)
org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:78)
org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:254)
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:204)
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
org.apache.spark.ContextCleaner$$Lambda$1178/1994584033.apply(Unknown Source)
scala.Option.foreach(Option.scala:407)
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195) => holding Monitor(org.apache.spark.ContextCleaner1726738661})
org.apache.spark.ContextCleaner$$Lambda$1109/1496842179.apply$mcV$sp(Unknown Source)
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1474)
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
```
BlockManager stuck on removeBroadcast RpcUtils.INFINITE_TIMEOUT.awaitResult(future) 【PR apache#28924 change here】
```
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit = {
val future = driverEndpoint.askSync[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.failed.foreach(e =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}
```
For such case only reason should be RPC was missing handling
Driver OOM or A thread leak in yarn nm prevents the creation of new threads to handle RPC.
```
25/11/05 08:16:22 ERROR [metrics-paimon-push-gateway-reporter-2-thread-1] ScheduledReporter: Exception thrown from PushGatewayReporter#report. Exception was suppressed.
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1115)
at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1388)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1416)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1400)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
at io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:243)
at io.prometheus.client.exporter.PushGateway.push(PushGateway.java:134)
at org.apache.paimon.metrics.reporter.PushGatewayReporter.report(PushGatewayReporter.java:84)
at com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:253)
at com.codahale.metrics.ScheduledReporter.lambda$start$0(ScheduledReporter.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
For such case we can add a customized wait timeout here to avoid forever stuck
then whole process stuck on here
### Why are the changes needed?
Avoid app stuck
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
No
Closes apache#52919 from AngersZhuuuu/SPARK-54219.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
This PR adds the check to see whether the executor is lost (by asking the
CoarseGrainedSchedulerBackend) after timeout error raised inBlockManagerMasterEndponitdue to removing blocks(e.g. RDD, broadcast, shuffle). If the executor is lost, we will ignore the error. Otherwise, throw the error.Why are the changes needed?
When removing blocks(e.g. RDD, broadcast, shuffle),
BlockManagerMaserEndpointwill make RPC calls to each knownBlockManagerSlaveEndpointto remove the specific blocks. The PRC call sometimes could end in a timeout when the executor has been lost, but only notified theBlockManagerMasterEndpointafter the removing call has already happened. The timeout error could therefore fail the whole job.In this case, we actually could just ignore the error since those blocks on the lost executor could be considered as removed already.
Does this PR introduce any user-facing change?
Yes. In case of users hits this issue, they will have the job executed successfully instead of throwing the exception.
How was this patch tested?
Added unit tests.