@wangshuo128
Contributor

What changes were proposed in this pull request?

PR #21356 stops the AsyncEventQueue when it is interrupted in postToAll.
However, if the interrupt occurs in AsyncEventQueue#dispatch, the SparkContext is stopped.
This PR proposes stopping the AsyncEventQueue when it is interrupted in dispatch, rather than stopping the SparkContext.
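
As a rough illustration of the proposed behavior (a minimal sketch, not the actual patch), the dispatch loop below treats an InterruptedException as a reason to stop only the queue, and reserves the heavier response for other errors. `SketchEventQueue`, `contextStopped`, and the helper methods are hypothetical stand-ins; this is not Spark's AsyncEventQueue.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical stand-in for an asynchronous listener queue (not Spark's AsyncEventQueue).
class SketchEventQueue(handler: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  private val stopped = new AtomicBoolean(false)

  // Stand-in for "the SparkContext was stopped"; only set for non-interrupt errors.
  @volatile var contextStopped: Boolean = false

  private val dispatchThread = new Thread(new Runnable {
    override def run(): Unit = dispatch()
  }, "sketch-dispatch")
  dispatchThread.setDaemon(true)

  def start(): Unit = dispatchThread.start()
  def post(event: String): Unit = if (!stopped.get()) queue.put(event)
  def isStopped: Boolean = stopped.get()
  def interruptDispatcher(): Unit = dispatchThread.interrupt()
  def awaitTermination(millis: Long): Unit = dispatchThread.join(millis)

  private def dispatch(): Unit = {
    try {
      while (!stopped.get()) {
        // take() blocks until an event arrives and may throw InterruptedException.
        handler(queue.take())
      }
    } catch {
      case _: InterruptedException =>
        // Interrupted while dispatching: stop only this queue.
        stopped.set(true)
      case NonFatal(_) =>
        // Any other error: escalate (here, just record it) and stop the queue.
        contextStopped = true
        stopped.set(true)
    }
  }
}
```

With this shape, interrupting the dispatch thread leaves `contextStopped` false and only marks the queue as stopped, while an exception thrown by the handler still escalates.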

Why are the changes needed?

Avoid stopping the SparkContext when interrupted in AsyncEventQueue#dispatch.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit test.

@wangshuo128
Contributor Author

wangshuo128 commented Nov 26, 2019

I applied patch #21356 in my cluster and found that the AsyncEventQueue thread was sometimes interrupted in queue.take(). I guess it is interrupted asynchronously by some other thread. Unfortunately, I couldn't find which thread (in Spark or HDFS) did this.

Here is the log:

java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.132.165.35:46887 remote=/10.132.78.10:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2319)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1087)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1056)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1197)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:942)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:453)
19/11/24 03:58:01 ERROR spark-listener-group-eventLog Utils: uncaught error in thread spark-listener-group-eventLog, stopping SparkContext
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1303)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
19/11/24 03:58:01 ERROR spark-listener-group-eventLog Utils: throw uncaught fatal error in thread spark-listener-group-eventLog
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1303)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)

Stopping the entire queue when interrupted in dispatch may not be the best choice. If it's an important queue (e.g. dynamic resource allocation), I think it's better to stop the SparkContext.
However, in this case the interrupt is in the event log queue, so I think we could keep the job running rather than stop the SparkContext. Stopping the AsyncEventQueue for the event log and stopping the SparkContext for other queues may be an option.

Do you have any advice? cc @squito @cloud-fan :)

@squito
Contributor

squito commented Nov 26, 2019

Can you please open another JIRA for this, since SPARK-24309 is already in shipped releases?

I haven't thought about this a lot, but I don't know if I really like this idea. You would be able to stop the running job if there were only one job, but what about with concurrent jobs? I wonder if we should just have some special case handling in the EventLoggingListener to retry once after interrupt?

@wangshuo128 changed the title from "[SPARK-24309][CORE][FOLLOWUP]Stop AsyncEventQueue when interrupted in dispatch" to "[SPARK-30059][CORE]Stop AsyncEventQueue when interrupted in dispatch" on Nov 27, 2019
@wangshuo128
Copy link
Contributor Author

Thanks for your reply, @squito.

You would be able to stop the running job if there were only one job, but what about with concurrent jobs?

I didn't get the point. What would happen if we just stopped the event log queue while concurrent jobs are running? Could you explain this in detail?

I wonder if we should just have some special case handling in the EventLoggingListener to retry once after interrupt?

I agree with this.
AFAIK, the interruption issue only appears in the event log queue. However, the current approach doesn't seem to cover all cases, e.g. an interrupt in queue.take().
One idea is to run EventLoggingListener#logEvent in an isolated thread and handle InterruptedException in that thread, so that the AsyncEventQueue thread isn't affected.
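
To make the isolated-thread idea concrete, here is a minimal sketch under stated assumptions: the write runs on a dedicated single-thread executor, so any interrupt status set during the write lands on the worker thread rather than on the calling (dispatch) thread. `IsolatedWriterSketch` and its methods are hypothetical names, and `write` stands in for something like logEvent; none of this is taken from Spark.

```scala
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}

// Hypothetical helper: not part of Spark or of this PR.
object IsolatedWriterSketch {

  /** Runs `write` on the calling thread; returns (and clears) the caller's interrupt status. */
  def callerInterruptedDirect(write: () => Unit): Boolean = {
    write()
    Thread.interrupted()
  }

  /** Runs `write` on a dedicated worker thread; returns (and clears) the caller's interrupt status. */
  def callerInterruptedIsolated(write: () => Unit): Boolean = {
    val worker: ExecutorService = Executors.newSingleThreadExecutor()
    try {
      val future = worker.submit(new Runnable {
        override def run(): Unit = {
          try write()
          finally Thread.interrupted() // clear any interrupt status left on the worker
        }
      })
      // Wait for the write to finish; the timeout is an arbitrary choice for the sketch.
      future.get(5, TimeUnit.SECONDS)
      Thread.interrupted()
    } finally {
      worker.shutdownNow()
    }
  }
}
```

If `write` is `() => Thread.currentThread().interrupt()`, the direct variant leaves the caller interrupted, while the isolated variant does not. Note that `future.get` is itself interruptible, so this only shields the caller from interrupt status set inside the write, not from interrupts aimed at the caller directly.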

@squito
Contributor

squito commented Nov 27, 2019

You would be able to stop the running job if there were only one job, but what about with concurrent jobs?

I didn't get the point. What would happen if just stopping event log queue when concurrent jobs running. Could you explain this in detail?

Sorry, please ignore that -- I misread your earlier comments; I had thought you were discussing stopping running jobs.

AFAIK, the interruption issue only appears in the event log queue, however, it seems that the current approach can't cover all the cases, e.g. interrupted in queue.take().
I came up with an idea that wrapping EventLoggingListener#logEvent in an isolated thread and handle InterruptedException in that thread, thus AsyncEventQueue thread wouldn't be affected.

Yes, good point. I'd need to walk through this very carefully, but that sounds reasonable to me.

@squito
Contributor

squito commented Nov 27, 2019

Do you know what version of Hadoop you are on? I am trying to compare with the code -- it's clearly not trunk (the DataStreamer class has been moved and plenty of other refactoring has happened).

Still, even looking at trunk, I have a guess at what is happening. The first part of your log shows the interrupt is coming from the DataStreamer, though that is running in a separate thread and isn't directly interrupting the event log queue thread. But my guess is that calls to flush() in the event log thread check the status of that data streamer, and will set the event log thread's interrupt status. (e.g. something like this, though probably not these lines: https://github.com/apache/hadoop/blob/7f2ea2ac46596883fb8f110f754a0eadeb69205e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java#L730-L734)

So it's possible that we're actually leaving the previous call to flush() with the interrupt status set, but we're ignoring it. And then when we get to the next call to queue.take(), it immediately throws an InterruptedException because the interrupt status has already been set.

That would probably be an HDFS bug, but it seems to at least fit the pattern of what we see here, and it is something we could at least check for.
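
The mechanism guessed at above can be reproduced in isolation. The sketch below is an assumption-labeled demo, not Spark or HDFS code: `Thread.currentThread().interrupt()` stands in for an earlier call (such as flush()) returning with the interrupt status set, and `InterruptStatusDemo` is a hypothetical name. It shows that LinkedBlockingQueue.take() throws even though an element is available, and that checking the status with Thread.interrupted() beforehand clears it.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical demo object: not part of Spark or of this PR.
object InterruptStatusDemo {

  /** Returns true if take() threw InterruptedException although an element was available. */
  def takeWithPendingInterrupt(): Boolean = {
    val queue = new LinkedBlockingQueue[String]()
    queue.put("event")
    // Stand-in for an earlier call that returned with the interrupt status set.
    Thread.currentThread().interrupt()
    try {
      queue.take()
      false
    } catch {
      case _: InterruptedException => true
    } finally {
      Thread.interrupted() // make sure no interrupt status leaks out of the demo
    }
  }

  /** Checks (and thereby clears) the interrupt status before calling take(). */
  def takeAfterClearingInterrupt(): (Boolean, String) = {
    val queue = new LinkedBlockingQueue[String]()
    queue.put("event")
    Thread.currentThread().interrupt()
    val wasInterrupted = Thread.interrupted() // reads and clears the status
    (wasInterrupted, queue.take())
  }

  def main(args: Array[String]): Unit = {
    println(takeWithPendingInterrupt())
    println(takeAfterClearingInterrupt())
  }
}
```

A check like the one in `takeAfterClearingInterrupt` is one way to detect a leftover interrupt status after a write, before the thread reaches the next blocking call.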

@steveloughran do you have any idea how this interrupt from DataStreamer is getting back to the spark event log writer?

@wangshuo128
Contributor Author

do you know what version of hadoop you are on?

My Hadoop version is 2.7.1.

@steveloughran
Contributor

@steveloughran do you have any idea how this interrupt from DataStreamer is getting back to the spark event log writer?

Errors that terminate the DataStreamer thread are caught and then escalated to whichever API call next uses the instance. Usually they are IO problems considered non-recoverable.

I don't know HDFS internals, but a look at the code hints this happened due to failures to talk to any datanode. Or at least, that's what the code is assuming: that any interrupt is a timeout in connections.

If you can show there's a problem happening on the 3.2.x libraries, you should be able to persuade someone (else!) to have a look at this.

Comment on lines +684 to +692
val listenerThread = Thread.currentThread()
new Thread(new Runnable {
  override def run(): Unit = {
    while (sleep) {
      Thread.sleep(10)
    }
    listenerThread.interrupt()
  }
}).start()
Member

Does this mean that EventLoggingListener does something similar internally?

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jul 12, 2020
@github-actions github-actions bot closed this Jul 13, 2020