diff --git a/client-flink/flink-1.16-shaded/pom.xml b/client-flink/flink-1.16-shaded/pom.xml
index 4eb0fbfb565..c856b11083b 100644
--- a/client-flink/flink-1.16-shaded/pom.xml
+++ b/client-flink/flink-1.16-shaded/pom.xml
@@ -120,11 +120,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-flink/flink-1.17-shaded/pom.xml b/client-flink/flink-1.17-shaded/pom.xml
index 83223ae6068..fd8512fe7df 100644
--- a/client-flink/flink-1.17-shaded/pom.xml
+++ b/client-flink/flink-1.17-shaded/pom.xml
@@ -120,11 +120,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-flink/flink-1.18-shaded/pom.xml b/client-flink/flink-1.18-shaded/pom.xml
index 4dbb4b9a664..04d5b965129 100644
--- a/client-flink/flink-1.18-shaded/pom.xml
+++ b/client-flink/flink-1.18-shaded/pom.xml
@@ -120,11 +120,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-flink/flink-1.19-shaded/pom.xml b/client-flink/flink-1.19-shaded/pom.xml
index 7f1070c41f6..060ce16446d 100644
--- a/client-flink/flink-1.19-shaded/pom.xml
+++ b/client-flink/flink-1.19-shaded/pom.xml
@@ -120,11 +120,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-flink/flink-1.20-shaded/pom.xml b/client-flink/flink-1.20-shaded/pom.xml
index d215d6eefa5..3b004cbed5a 100644
--- a/client-flink/flink-1.20-shaded/pom.xml
+++ b/client-flink/flink-1.20-shaded/pom.xml
@@ -120,11 +120,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-flink/flink-2.0-shaded/pom.xml b/client-flink/flink-2.0-shaded/pom.xml
index 7be10544bc3..020728018f6 100644
--- a/client-flink/flink-2.0-shaded/pom.xml
+++ b/client-flink/flink-2.0-shaded/pom.xml
@@ -120,11 +120,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-flink/flink-2.1-shaded/pom.xml b/client-flink/flink-2.1-shaded/pom.xml
index 0faa1f65155..d2c874ba53d 100644
--- a/client-flink/flink-2.1-shaded/pom.xml
+++ b/client-flink/flink-2.1-shaded/pom.xml
@@ -120,11 +120,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-mr/mr-shaded/pom.xml b/client-mr/mr-shaded/pom.xml
index 089d8301900..2ffa40e1aad 100644
--- a/client-mr/mr-shaded/pom.xml
+++ b/client-mr/mr-shaded/pom.xml
@@ -131,11 +131,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-spark/spark-2-shaded/pom.xml b/client-spark/spark-2-shaded/pom.xml
index cfbc020797d..9db62b423e0 100644
--- a/client-spark/spark-2-shaded/pom.xml
+++ b/client-spark/spark-2-shaded/pom.xml
@@ -121,11 +121,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-spark/spark-3-shaded/pom.xml b/client-spark/spark-3-shaded/pom.xml
index 8cce8577a4a..bc8c2065e2d 100644
--- a/client-spark/spark-3-shaded/pom.xml
+++ b/client-spark/spark-3-shaded/pom.xml
@@ -125,11 +125,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-spark/spark-4-shaded/pom.xml b/client-spark/spark-4-shaded/pom.xml
index 633eabbecc5..5e741d60179 100644
--- a/client-spark/spark-4-shaded/pom.xml
+++ b/client-spark/spark-4-shaded/pom.xml
@@ -125,11 +125,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/client-tez/tez-shaded/pom.xml b/client-tez/tez-shaded/pom.xml
index e192d3bdbed..e8060d95a81 100644
--- a/client-tez/tez-shaded/pom.xml
+++ b/client-tez/tez-shaded/pom.xml
@@ -145,11 +145,24 @@
[XML hunk content lost in extraction: 5 lines removed, 18 lines added]
diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/IOMode.java b/common/src/main/java/org/apache/celeborn/common/network/util/IOMode.java
index 7d2ddd89588..ed66464c6dc 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/util/IOMode.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/util/IOMode.java
@@ -17,11 +17,12 @@
 
 package org.apache.celeborn.common.network.util;
 
-/**
- * Selector for which form of low-level IO we should use. NIO is always available, while EPOLL is
- * only available on Linux. AUTO is used to select EPOLL if it's available, or NIO otherwise.
- */
+/** Selector for which form of low-level IO we should use. */
 public enum IOMode {
+  /** Java NIO (Selector), cross-platform portable */
   NIO,
-  EPOLL
+  /** Native EPOLL via JNI, Linux only */
+  EPOLL,
+  /** Native KQUEUE via JNI, MacOS/BSD only */
+  KQUEUE
 }
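Note for reviewers: both native transports load lazily and can be unavailable for reasons that only surface at runtime (missing native artifact, wrong classifier, unsupported OS). A minimal standalone probe, not part of this patch, using Netty's public availability API to show which of these modes are actually loadable on the current host; it assumes the netty-transport-native-epoll and netty-transport-native-kqueue artifacts are on the classpath:

```java
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;

// Illustrative probe only: prints whether each native transport is loadable,
// and Netty's recorded cause when it is not.
public class TransportProbe {
  public static void main(String[] args) {
    System.out.println("EPOLL available:  " + Epoll.isAvailable());
    if (!Epoll.isAvailable()) {
      System.out.println("  cause: " + Epoll.unavailabilityCause());
    }
    System.out.println("KQUEUE available: " + KQueue.isAvailable());
    if (!KQueue.isAvailable()) {
      System.out.println("  cause: " + KQueue.unavailabilityCause());
    }
  }
}
```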
diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 2f04179d7a4..109328233d0 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -36,6 +36,9 @@
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueServerSocketChannel;
+import io.netty.channel.kqueue.KQueueSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -47,7 +50,10 @@
 import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.util.JavaUtils;
 
-/** Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO. */
+/**
+ * Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL, or
+ * KQUEUE.
+ */
 public class NettyUtils {
   private static final ByteBufAllocator[] _sharedByteBufAllocator = new ByteBufAllocator[2];
   private static final ConcurrentHashMap allocatorsIndex =
@@ -80,6 +86,8 @@ public static EventLoopGroup createEventLoop(
             : new NioEventLoopGroup(numThreads, threadFactory);
       case EPOLL:
         return new EpollEventLoopGroup(numThreads, threadFactory);
+      case KQUEUE:
+        return new KQueueEventLoopGroup(numThreads, threadFactory);
       default:
         throw new IllegalArgumentException("Unknown io mode: " + mode);
     }
@@ -92,6 +100,8 @@ public static Class getClientChannelClass(IOMode mode) {
         return NioSocketChannel.class;
       case EPOLL:
         return EpollSocketChannel.class;
+      case KQUEUE:
+        return KQueueSocketChannel.class;
       default:
         throw new IllegalArgumentException("Unknown io mode: " + mode);
     }
@@ -104,6 +114,8 @@ public static Class getServerChannelClass(IOMode mode)
         return NioServerSocketChannel.class;
       case EPOLL:
         return EpollServerSocketChannel.class;
+      case KQUEUE:
+        return KQueueServerSocketChannel.class;
       default:
         throw new IllegalArgumentException("Unknown io mode: " + mode);
     }
diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
index 55b22306d6b..26a0ce25e6e 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
@@ -20,6 +20,7 @@ import java.io.File;
 
 import io.netty.channel.epoll.Epoll;
+import io.netty.channel.kqueue.KQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,9 +45,11 @@ public String getModuleName() {
     return module;
   }
 
-  /** IO mode: nio or epoll */
+  /** IO mode: NIO, EPOLL, or KQUEUE */
   public String ioMode() {
-    return Epoll.isAvailable() ? celebornConf.networkIoMode(module) : IOMode.NIO.name();
+    return Epoll.isAvailable() || KQueue.isAvailable()
+        ? celebornConf.networkIoMode(module)
+        : IOMode.NIO.name();
   }
 
   /** If true, we will prefer allocating off-heap byte buffers within Netty. */
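To make the three factory methods concrete: for IOMode.KQUEUE they hand back a KQueueEventLoopGroup plus the KQueue channel classes, which plug into a Netty bootstrap exactly like their NIO and EPOLL counterparts. A self-contained sketch under those assumptions (illustrative only, not Celeborn code; the class name is invented, and it only runs where the kqueue transport is loadable):

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.socket.SocketChannel;

public class KQueueServerSketch {
  public static void main(String[] args) throws InterruptedException {
    if (!KQueue.isAvailable()) {
      System.err.println("kqueue transport not loadable here: " + KQueue.unavailabilityCause());
      return;
    }
    // For IOMode.KQUEUE, createEventLoop(...) returns a KQueueEventLoopGroup
    // and getServerChannelClass(...) resolves to KQueueServerSocketChannel.
    EventLoopGroup group = new KQueueEventLoopGroup(1);
    try {
      ServerBootstrap bootstrap =
          new ServerBootstrap()
              .group(group)
              .channel(KQueueServerSocketChannel.class)
              .childHandler(
                  new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                      // The real pipeline (codecs, handlers) would be installed here.
                    }
                  });
      // Bind an ephemeral port just to prove the native transport works, then shut down.
      bootstrap.bind(0).sync().channel().close().sync();
    } finally {
      group.shutdownGracefully();
    }
  }
}
```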
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 2fa19f5e38c..010cd57a4e0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -28,6 +28,7 @@ import scala.util.Try
 import scala.util.matching.Regex
 
 import io.netty.channel.epoll.Epoll
+import io.netty.channel.kqueue.KQueue
 
 import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
 import org.apache.celeborn.common.client.{ApplicationInfoProvider, DefaultApplicationInfoProvider}
@@ -543,7 +544,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def networkIoMode(module: String): String = {
     get(
       NETWORK_IO_MODE.key.replace("<module>", module),
-      if (Epoll.isAvailable) IOMode.EPOLL.name() else IOMode.NIO.name())
+      if (Epoll.isAvailable) { IOMode.EPOLL.name() }
+      else if (KQueue.isAvailable) { IOMode.KQUEUE.name() }
+      else { IOMode.NIO.name() })
   }
 
   def networkIoPreferDirectBufs(module: String): Boolean = {
@@ -2089,10 +2092,12 @@ object CelebornConf extends Logging {
   val NETWORK_IO_MODE: OptionalConfigEntry[String] =
     buildConf("celeborn.<module>.io.mode")
       .categories("network")
-      .doc("Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO.")
+      .doc("Netty EventLoopGroup backend, available options: NIO, EPOLL, KQUEUE. " +
+        "For Linux environments, EPOLL is used if available before using NIO. " +
+        "For MacOS/BSD environments, KQUEUE is used if available before using NIO.")
       .stringConf
       .transform(_.toUpperCase)
-      .checkValues(Set(IOMode.NIO.name(), IOMode.EPOLL.name()))
+      .checkValues(Set(IOMode.NIO.name(), IOMode.EPOLL.name(), IOMode.KQUEUE.name()))
       .createOptional
 
   val NETWORK_IO_PREFER_DIRECT_BUFS: ConfigEntry[Boolean] =
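The new default-selection rule reads: EPOLL first (Linux), then KQUEUE (MacOS/BSD), then NIO everywhere else. The same precedence rendered in plain Java for readers tracing the Scala above; the class and method names are hypothetical, invented for illustration:

```java
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;

// Hypothetical helper mirroring the default for celeborn.<module>.io.mode:
// prefer EPOLL on Linux, then KQUEUE on MacOS/BSD, then portable NIO.
public final class DefaultIoMode {
  private DefaultIoMode() {}

  public static String resolve() {
    if (Epoll.isAvailable()) {
      return "EPOLL";
    } else if (KQueue.isAvailable()) {
      return "KQUEUE";
    } else {
      return "NIO";
    }
  }
}
```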
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index ec809f57d21..2cdad5aa096 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -29,7 +29,7 @@ license: |
 | celeborn.<module>.io.connectionTimeout | <value of celeborn.network.timeout> | false | Connection active timeout. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | |
 | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to `fetch`, it works for worker fetch server. | | |
 | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting to `push`, it works for Flink shuffle client push data. | | |
-| celeborn.<module>.io.mode | <undefined> | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | |
+| celeborn.<module>.io.mode | <undefined> | false | Netty EventLoopGroup backend, available options: NIO, EPOLL, KQUEUE. For Linux environments, EPOLL is used if available before using NIO. For MacOS/BSD environments, KQUEUE is used if available before using NIO. | | |
 | celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | |
 | celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | |
 | celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | |
diff --git a/docs/migration.md b/docs/migration.md
index 81babe0ab07..2c302a53414 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -29,6 +29,8 @@ license: |
 
 - Since 0.7.0, Celeborn worker metrics `FlushDataTime` is renamed as `FlushLocalDataTime`.
 
+- Since 0.7.0, Celeborn changed the default value of `celeborn.<module>.io.mode` from `NIO` to `KQUEUE` on MacOS/BSD environments where kqueue is available, falling back to `NIO` otherwise.
+
 # Upgrading from 0.5 to 0.6
 
 - Since 0.6.0, Celeborn deprecate `celeborn.client.spark.fetch.throwsFetchFailure`. Please use `celeborn.client.spark.stageRerun.enabled` instead.
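Operators who want identical transports across a mixed-OS fleet can still pin the mode per module instead of relying on the platform-dependent default. A hypothetical override for the `data` module, assuming the key-value format of `celeborn-defaults.conf` (the module name is just an example; any module from the table above works):

```
# Force the portable transport regardless of platform
celeborn.data.io.mode NIO
```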