diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 4f070f02a125..cc4657efe39a 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -179,4 +179,18 @@ public static PooledByteBufAllocator createPooledByteBufAllocator( allowCache ? PooledByteBufAllocator.defaultUseCacheForAllThreads() : false ); } + + /** + * ByteBuf allocator prefers to allocate direct ByteBuf iif both Spark allows to create direct + * ByteBuf and Netty enables directBufferPreferred. + */ + public static boolean preferDirectBufs(TransportConf conf) { + boolean allowDirectBufs; + if (conf.sharedByteBufAllocators()) { + allowDirectBufs = conf.preferDirectBufsForSharedByteBufAllocators(); + } else { + allowDirectBufs = conf.preferDirectBufs(); + } + return allowDirectBufs && PlatformDependent.directBufferPreferred(); + } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index a94e63656e1a..4903421f9063 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -35,6 +35,8 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.util.NettyUtils import org.apache.spark.resource.ResourceInformation import org.apache.spark.resource.ResourceProfile import org.apache.spark.resource.ResourceProfile._ @@ -85,7 +87,8 @@ private[spark] class CoarseGrainedExecutorBackend( logInfo("Connecting to driver: " + driverUrl) try { - if (PlatformDependent.directBufferPreferred() && + val shuffleClientTransportConf = SparkTransportConf.fromSparkConf(env.conf, "shuffle") + if (NettyUtils.preferDirectBufs(shuffleClientTransportConf) && PlatformDependent.maxDirectMemory() < env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)) { throw new SparkException(s"Netty direct memory should at least be bigger than " + s"'${MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key}', but got " +