@@ -85,7 +85,19 @@ private[spark] class CoarseGrainedExecutorBackend(

logInfo("Connecting to driver: " + driverUrl)
try {
if (PlatformDependent.directBufferPreferred() &&
// The following logic originally comes from the constructor of
@Ngone51 (Member) commented on Dec 6, 2022:

Shall we add this logic into a NettyUtils function?

Author:

I'm afraid not. NettyUtils is located in network-common, so it cannot access SparkConf, which lives in core.

Contributor:

How about having NettyUtils just accept boolean inputs instead of the config?
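A minimal sketch of the suggested approach (the object and parameter names here are hypothetical, not the actual helper added to the PR): the caller in core resolves the SparkConf values itself and passes plain booleans down to network-common, so network-common never needs a SparkConf dependency.

```scala
// Hypothetical helper shape for NettyUtils: accept already-resolved booleans
// instead of a config object (names are illustrative only).
object NettyUtilsSketch {
  def preferDirectBufs(
      sharedByteBufAllocators: Boolean,
      sharedPreferDirect: Boolean,
      modulePreferDirect: Boolean): Boolean = {
    // Mirrors the selection in the diff: when shared allocators are enabled,
    // the shared preference wins; otherwise the per-module one applies.
    if (sharedByteBufAllocators) sharedPreferDirect else modulePreferDirect
  }
}
```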

Author:

Updated. @Ngone51, would you please take a look again?

// org.apache.spark.network.client.TransportClientFactory
val sharedByteBufAllocators =
env.conf.getBoolean("spark.network.sharedByteBufAllocators.enabled", true)
val preferDirectBufsForSharedByteBufAllocators =
env.conf.getBoolean("spark.network.io.preferDirectBufs", true)
val preferDirectBufs =
env.conf.getBoolean("spark.shuffle.io.preferDirectBufs", true)
Member:

Did you see where this conf is accessed? I didn't find it used anywhere.

Author:

It's a combination of ("spark", module, "io.preferDirectBufs"); for the shuffle client, the module is "shuffle". See org.apache.spark.network.util.TransportConf.
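The key construction described above can be sketched as follows (an illustrative standalone reconstruction, not the actual code in org.apache.spark.network.util.TransportConf):

```scala
// Illustrative sketch of how a module-scoped Spark network config key is
// assembled from ("spark", module, suffix); see TransportConf for the
// real implementation.
def moduleConfKey(module: String, suffix: String): String =
  s"spark.$module.$suffix"

// For the shuffle client the module is "shuffle", yielding
// "spark.shuffle.io.preferDirectBufs".
</scala-comment-end>
```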

val shuffleClientPreferDirectBufs = PlatformDependent.directBufferPreferred() && {
if (sharedByteBufAllocators) preferDirectBufsForSharedByteBufAllocators
else preferDirectBufs
}
if (shuffleClientPreferDirectBufs &&
PlatformDependent.maxDirectMemory() < env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)) {
throw new SparkException(s"Netty direct memory should at least be bigger than " +
s"'${MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key}', but got " +