Skip to content

Commit 73bf3f2

Browse files
zsxwingJoshRosen
authored andcommitted
[SPARK-3741] Make ConnectionManager propagate errors properly and add mo...
...re logs to avoid Executors swallowing errors This PR made the following changes: * Register a callback to `Connection` so that the error will be propagated properly. * Add more logs so that the errors won't be swallowed by Executors. * Use trySuccess/tryFailure because `Promise` doesn't allow to call success/failure more than once. Author: zsxwing <[email protected]> Closes apache#2593 from zsxwing/SPARK-3741 and squashes the following commits: 1d5aed5 [zsxwing] Fix naming 0b8a61c [zsxwing] Merge branch 'master' into SPARK-3741 764aec5 [zsxwing] [SPARK-3741] Make ConnectionManager propagate errors properly and add more logs to avoid Executors swallowing errors
1 parent 1e0aa4d commit 73bf3f2

File tree

2 files changed

+172
-69
lines changed

2 files changed

+172
-69
lines changed

core/src/main/scala/org/apache/spark/network/nio/Connection.scala

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ package org.apache.spark.network.nio
2020
import java.net._
2121
import java.nio._
2222
import java.nio.channels._
23+
import java.util.concurrent.ConcurrentLinkedQueue
2324
import java.util.LinkedList
2425

2526
import org.apache.spark._
2627

28+
import scala.collection.JavaConversions._
2729
import scala.collection.mutable.{ArrayBuffer, HashMap}
30+
import scala.util.control.NonFatal
2831

2932
private[nio]
3033
abstract class Connection(val channel: SocketChannel, val selector: Selector,
@@ -51,7 +54,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
5154

5255
@volatile private var closed = false
5356
var onCloseCallback: Connection => Unit = null
54-
var onExceptionCallback: (Connection, Exception) => Unit = null
57+
val onExceptionCallbacks = new ConcurrentLinkedQueue[(Connection, Throwable) => Unit]
5558
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
5659

5760
val remoteAddress = getRemoteAddress()
@@ -130,20 +133,24 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
130133
onCloseCallback = callback
131134
}
132135

133-
def onException(callback: (Connection, Exception) => Unit) {
134-
onExceptionCallback = callback
136+
def onException(callback: (Connection, Throwable) => Unit) {
137+
onExceptionCallbacks.add(callback)
135138
}
136139

137140
def onKeyInterestChange(callback: (Connection, Int) => Unit) {
138141
onKeyInterestChangeCallback = callback
139142
}
140143

141-
def callOnExceptionCallback(e: Exception) {
142-
if (onExceptionCallback != null) {
143-
onExceptionCallback(this, e)
144-
} else {
145-
logError("Error in connection to " + getRemoteConnectionManagerId() +
146-
" and OnExceptionCallback not registered", e)
144+
def callOnExceptionCallbacks(e: Throwable) {
145+
onExceptionCallbacks foreach {
146+
callback =>
147+
try {
148+
callback(this, e)
149+
} catch {
150+
case NonFatal(e) => {
151+
logWarning("Ignored error in onExceptionCallback", e)
152+
}
153+
}
147154
}
148155
}
149156

@@ -323,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
323330
} catch {
324331
case e: Exception => {
325332
logError("Error connecting to " + address, e)
326-
callOnExceptionCallback(e)
333+
callOnExceptionCallbacks(e)
327334
}
328335
}
329336
}
@@ -348,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
348355
} catch {
349356
case e: Exception => {
350357
logWarning("Error finishing connection to " + address, e)
351-
callOnExceptionCallback(e)
358+
callOnExceptionCallbacks(e)
352359
}
353360
}
354361
true
@@ -393,7 +400,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
393400
} catch {
394401
case e: Exception => {
395402
logWarning("Error writing in connection to " + getRemoteConnectionManagerId(), e)
396-
callOnExceptionCallback(e)
403+
callOnExceptionCallbacks(e)
397404
close()
398405
return false
399406
}
@@ -420,7 +427,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
420427
case e: Exception =>
421428
logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
422429
e)
423-
callOnExceptionCallback(e)
430+
callOnExceptionCallbacks(e)
424431
close()
425432
}
426433

@@ -577,7 +584,7 @@ private[spark] class ReceivingConnection(
577584
} catch {
578585
case e: Exception => {
579586
logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e)
580-
callOnExceptionCallback(e)
587+
callOnExceptionCallbacks(e)
581588
close()
582589
return false
583590
}

0 commit comments

Comments
 (0)