From 9def9fc649ec94a0ccdf96fdaeb8d30dedb5f132 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Sun, 4 Dec 2022 21:17:19 +0800 Subject: [PATCH 1/4] [SPARK-41376][CORE] Executor netty direct memory check should respect spark.shuffle.io.preferDirectBufs --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index a94e63656e1a..5d1183cd332b 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -85,7 +85,8 @@ private[spark] class CoarseGrainedExecutorBackend( logInfo("Connecting to driver: " + driverUrl) try { - if (PlatformDependent.directBufferPreferred() && + if (env.conf.getBoolean("spark.shuffle.io.preferDirectBufs", true) && + PlatformDependent.directBufferPreferred() && PlatformDependent.maxDirectMemory() < env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)) { throw new SparkException(s"Netty direct memory should at least be bigger than " + s"'${MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key}', but got " + From 91515df0a2dfe4411dd8c9609df1113578d5cfa7 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 5 Dec 2022 10:30:26 +0800 Subject: [PATCH 2/4] address --- .../executor/CoarseGrainedExecutorBackend.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 5d1183cd332b..d0ab28a327d1 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -85,8 +85,19 @@ private[spark] class 
CoarseGrainedExecutorBackend( logInfo("Connecting to driver: " + driverUrl) try { - if (env.conf.getBoolean("spark.shuffle.io.preferDirectBufs", true) && - PlatformDependent.directBufferPreferred() && + // The following logic originally comes from the constructor of + // org.apache.spark.network.client.TransportClientFactory + val sharedByteBufAllocators = + env.conf.getBoolean("spark.network.sharedByteBufAllocators.enabled", true) + val preferDirectBufsForSharedByteBufAllocators = + env.conf.getBoolean("spark.network.io.preferDirectBufs", true) + val preferDirectBufs = + env.conf.getBoolean("spark.shuffle.io.preferDirectBufs", true) + val shuffleClientPreferDirectBufs = PlatformDependent.directBufferPreferred() && { + if (sharedByteBufAllocators) preferDirectBufsForSharedByteBufAllocators + else preferDirectBufs + } + if (shuffleClientPreferDirectBufs && PlatformDependent.maxDirectMemory() < env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)) { throw new SparkException(s"Netty direct memory should at least be bigger than " + s"'${MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key}', but got " + From 5fc50803e6ab4180e59c3f06ab24e657c838b0da Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 7 Dec 2022 10:47:51 +0800 Subject: [PATCH 3/4] address comment --- .../apache/spark/network/util/NettyUtils.java | 14 ++++++++++++++ .../executor/CoarseGrainedExecutorBackend.scala | 17 ++++------------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 4f070f02a125..10494a82ce55 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -179,4 +179,18 @@ public static PooledByteBufAllocator createPooledByteBufAllocator( allowCache ? 
PooledByteBufAllocator.defaultUseCacheForAllThreads() : false ); } + + /** + * ByteBuf allocator prefers to allocate direct ByteBuf iif both Spark configurations allows to + * create direct ByteBuf and Netty enables directBufferPreferred. + */ + public static boolean preferDirectBufs(TransportConf conf) { + boolean allowDirectBufs; + if (conf.sharedByteBufAllocators()) { + allowDirectBufs = conf.preferDirectBufsForSharedByteBufAllocators(); + } else { + allowDirectBufs = conf.preferDirectBufs(); + } + return allowDirectBufs && PlatformDependent.directBufferPreferred(); + } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index d0ab28a327d1..4903421f9063 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -35,6 +35,8 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.util.NettyUtils import org.apache.spark.resource.ResourceInformation import org.apache.spark.resource.ResourceProfile import org.apache.spark.resource.ResourceProfile._ @@ -85,19 +87,8 @@ private[spark] class CoarseGrainedExecutorBackend( logInfo("Connecting to driver: " + driverUrl) try { - // The following logic originally comes from the constructor of - // org.apache.spark.network.client.TransportClientFactory - val sharedByteBufAllocators = - env.conf.getBoolean("spark.network.sharedByteBufAllocators.enabled", true) - val preferDirectBufsForSharedByteBufAllocators = - env.conf.getBoolean("spark.network.io.preferDirectBufs", true) - val preferDirectBufs = - env.conf.getBoolean("spark.shuffle.io.preferDirectBufs", true) - 
val shuffleClientPreferDirectBufs = PlatformDependent.directBufferPreferred() && { - if (sharedByteBufAllocators) preferDirectBufsForSharedByteBufAllocators - else preferDirectBufs - } - if (shuffleClientPreferDirectBufs && + val shuffleClientTransportConf = SparkTransportConf.fromSparkConf(env.conf, "shuffle") + if (NettyUtils.preferDirectBufs(shuffleClientTransportConf) && PlatformDependent.maxDirectMemory() < env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)) { throw new SparkException(s"Netty direct memory should at least be bigger than " + s"'${MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key}', but got " + From e1198f4cc8213946c74b79671f4e0811c4e4d5df Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 7 Dec 2022 10:57:53 +0800 Subject: [PATCH 4/4] nit --- .../main/java/org/apache/spark/network/util/NettyUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 10494a82ce55..cc4657efe39a 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -181,8 +181,8 @@ public static PooledByteBufAllocator createPooledByteBufAllocator( } /** - * ByteBuf allocator prefers to allocate direct ByteBuf iif both Spark configurations allows to - * create direct ByteBuf and Netty enables directBufferPreferred. + * ByteBuf allocator prefers to allocate direct ByteBuf iff both Spark allows creating direct + * ByteBuf and Netty enables directBufferPreferred. */ public static boolean preferDirectBufs(TransportConf conf) { boolean allowDirectBufs;