diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java index 6ab401b9a0d5a..8709d30ef1be1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java @@ -32,5 +32,9 @@ public enum IOMode { /** * Native KQUEUE via JNI, MacOS/BSD only */ - KQUEUE + KQUEUE, + /** + * Prefer to use native EPOLL on Linux (or KQUEUE on MacOS) if available. Then, fallback to NIO. + */ + AUTO } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 627ebd0045f2d..c113b72f557cf 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -21,9 +21,11 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; +import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollIoHandler; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.kqueue.KQueue; import io.netty.channel.kqueue.KQueueIoHandler; import io.netty.channel.kqueue.KQueueServerSocketChannel; import io.netty.channel.kqueue.KQueueSocketChannel; @@ -35,7 +37,7 @@ /** * Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL, - * or KQUEUE. + * , KQUEUE, or AUTO. */ public class NettyUtils { @@ -71,6 +73,15 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String case NIO -> NioIoHandler.newFactory(); case EPOLL -> EpollIoHandler.newFactory(); case KQUEUE -> KQueueIoHandler.newFactory(); + case AUTO -> { + if (JavaUtils.isLinux && Epoll.isAvailable()) { + yield EpollIoHandler.newFactory(); + } else if (JavaUtils.isMac && KQueue.isAvailable()) { + yield KQueueIoHandler.newFactory(); + } else { + yield NioIoHandler.newFactory(); + } + } }; return new MultiThreadIoEventLoopGroup(numThreads, threadFactory, handlerFactory); } @@ -81,6 +92,15 @@ public static Class getClientChannelClass(IOMode mode) { case NIO -> NioSocketChannel.class; case EPOLL -> EpollSocketChannel.class; case KQUEUE -> KQueueSocketChannel.class; + case AUTO -> { + if (JavaUtils.isLinux && Epoll.isAvailable()) { + yield EpollSocketChannel.class; + } else if (JavaUtils.isMac && KQueue.isAvailable()) { + yield KQueueSocketChannel.class; + } else { + yield NioSocketChannel.class; + } + } }; } @@ -90,6 +110,15 @@ public static Class getServerChannelClass(IOMode mode) case NIO -> NioServerSocketChannel.class; case EPOLL -> EpollServerSocketChannel.class; case KQUEUE -> KQueueServerSocketChannel.class; + case AUTO -> { + if (JavaUtils.isLinux && Epoll.isAvailable()) { + yield EpollServerSocketChannel.class; + } else if (JavaUtils.isMac && KQueue.isAvailable()) { + yield KQueueServerSocketChannel.class; + } else { + yield NioServerSocketChannel.class; + } + } }; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 5718c20c7d119..849e58e5db4d7 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -87,7 +87,7 @@ public String getModuleName() { return module; } - /** IO mode: NIO, EPOLL, or KQUEUE */ + /** IO mode: NIO, EPOLL, KQUEUE, or AUTO */ public String ioMode() { String defaultIOMode = conf.get(SPARK_NETWORK_DEFAULT_IO_MODE_KEY, "NIO"); return conf.get(SPARK_NETWORK_IO_MODE_KEY, defaultIOMode).toUpperCase(Locale.ROOT); diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index 3d2108f11a6f6..d7f9e248dfe5f 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -55,3 +55,7 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite { override def shouldRunTests: Boolean = Utils.isMac override def ioMode: IOMode = IOMode.KQUEUE } + +class ShuffleNettyAutoSuite extends ShuffleNettySuite { + override def ioMode: IOMode = IOMode.AUTO +}