2 files changed, 10 additions (+), 1 deletion (-)

core/src/main/scala/org/apache/spark

@@ -46,6 +46,7 @@ class TaskContext(
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
+    // Process complete callbacks in the reverse order of registration
+    onCompleteCallbacks.reverse.foreach{_()}
   }
 }
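As a hedged illustration of why the reverse order matters, here is a minimal sketch in Scala. SimpleTaskContext and CallbackOrderDemo are made-up names, not Spark's API; the point is only that running callbacks LIFO lets a callback registered later (which may depend on state set up by an earlier one) fire first.

import scala.collection.mutable.ArrayBuffer

// Minimal sketch of LIFO completion callbacks; names are illustrative, not Spark's API.
class SimpleTaskContext {
  private val onCompleteCallbacks = new ArrayBuffer[() => Unit]

  def addOnCompleteCallback(f: () => Unit): Unit = {
    onCompleteCallbacks += f
  }

  // Run callbacks in the reverse order of registration, so a callback added last
  // (which may depend on resources registered earlier) fires first.
  def executeOnCompleteCallbacks(): Unit = {
    onCompleteCallbacks.reverse.foreach { _() }
  }
}

object CallbackOrderDemo extends App {
  val ctx = new SimpleTaskContext
  ctx.addOnCompleteCallback(() => println("released: python worker"))  // registered first, runs last
  ctx.addOnCompleteCallback(() => println("stopped: stdout reader"))   // registered last, runs first
  ctx.executeOnCompleteCallbacks()
}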
@@ -100,6 +100,14 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    /*
+     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
+     * other completion callbacks might invalidate the input. Because interruption
+     * is not synchronous this still leaves a potential race where the interruption is
+     * processed only after the stream becomes invalid.
+     */
+    context.addOnCompleteCallback(() => context.interrupted = true)
+
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {
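How the reading side reacts to the flag is what determines the remaining race. The sketch below is an assumption-laden illustration, not PythonRDD's actual iterator: InterruptibleReader, readRecord, and the length-prefixed framing are hypothetical. It polls the flag only between reads, so a read that is already blocked when the flag flips can still observe an invalidated stream, which is the race described in the diff comment.

import java.io.{BufferedInputStream, DataInputStream, EOFException, InputStream}

// Hypothetical reader that polls an `interrupted` flag between records.
// Setting the flag stops *future* reads; a read already blocked on the stream
// is not unblocked, mirroring the race noted in the comment above.
class InterruptibleReader(in: InputStream) {
  @volatile var interrupted: Boolean = false
  private val stream = new DataInputStream(new BufferedInputStream(in))

  def readRecord(): Option[Array[Byte]] = {
    if (interrupted) {
      None                              // task finished: stop consuming the stream
    } else {
      try {
        val length = stream.readInt()   // blocking call: the race window is here
        val bytes = new Array[Byte](length)
        stream.readFully(bytes)
        Some(bytes)
      } catch {
        case _: EOFException => None    // normal end of stream
      }
    }
  }
}

Under these assumptions, a completion callback would simply set the reader's interrupted flag to true, analogous to the context.addOnCompleteCallback(() => context.interrupted = true) line added in the diff.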