@@ -355,29 +355,33 @@ object SQLConf {

   val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
     buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
-      .doc("The target post-shuffle input size in bytes of a task.")
+      .doc("The target post-shuffle input size in bytes of a task. This configuration only has " +
+        s"an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
Contributor: and also when spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled is true?

Member Author: Thanks, fixed the doc of SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS and SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.

       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(64 * 1024 * 1024)

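For context, a minimal sketch (not part of this diff) of how these keys fit together in a session; per the exchange above, the target size only takes effect once adaptive execution and partition coalescing are both on:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative settings only; the key names are the ones shown in this diff.
val spark = SparkSession.builder()
  .appName("aqe-target-size-demo")
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled", "true")
  // Aim for roughly 64 MiB of post-shuffle input per task (the default above).
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")
  .getOrCreate()
```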
   val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
     buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
       .doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
         "one by one, fetching continuous shuffle blocks for the same map task in batch can " +
-        "reduce IO and improve performance. Note, this feature also depends on a relocatable " +
-        "serializer and the concatenation support codec in use.")
+        "reduce IO and improve performance. Note, multiple continuous blocks are merged into a " +
+        s"single fetch request only when '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled; this " +
+        "feature also depends on a relocatable serializer and a compression codec that " +
+        "supports concatenation.")
       .booleanConf
       .createWithDefault(true)

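A hedged sketch of what the doc's prerequisites might look like in a session config; Kryo as a relocatable serializer and lz4 as a concatenation-capable codec are assumptions for illustration, not something this PR prescribes:

```scala
import org.apache.spark.sql.SparkSession

// Batched fetches need: AQE on, the flag on, a serializer that supports
// relocation of serialized objects, and a codec that can concatenate
// compressed streams. Kryo and lz4 are assumed examples of each.
val spark = SparkSession.builder()
  .appName("batch-fetch-demo")
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled", "true")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.io.compression.codec", "lz4")
  .getOrCreate()
```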
   val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
     buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
-      .doc("When true and adaptive execution is enabled, this enables reducing the number of " +
-        "post-shuffle partitions based on map output statistics.")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " +
+        "the number of post-shuffle partitions based on map output statistics.")
       .booleanConf
       .createWithDefault(true)

   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
-      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
+      .doc("The advisory minimum number of post-shuffle partitions used when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
       .intConf
       .checkValue(_ > 0, "The minimum shuffle partition number " +
         "must be a positive integer.")
@@ -387,25 +391,27 @@ object SQLConf {
buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
"This is used as the initial number of pre-shuffle partitions. By default it equals to " +
"spark.sql.shuffle.partitions")
"spark.sql.shuffle.partitions. This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
.intConf
.checkValue(_ > 0, "The maximum shuffle partition number " +
"must be a positive integer.")
.createOptional

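To make the interplay of the target size and the advisory min/max concrete, a simplified back-of-the-envelope sketch (a hypothetical helper, not Spark's actual coalescing code):

```scala
// Hypothetical helper for illustration only. The planner starts from
// maxNumPostShufflePartitions pre-shuffle partitions, then coalesces
// toward totalBytes / targetSize, bounded by the advisory min and max.
def estimatePostShufflePartitions(
    totalPostShuffleBytes: Long,
    targetSizeBytes: Long,
    minPartitions: Int,
    maxPartitions: Int): Int = {
  val raw = math.ceil(totalPostShuffleBytes.toDouble / targetSizeBytes).toInt
  math.min(math.max(raw, minPartitions), maxPartitions)
}

// 2 GiB of map output at the default 64 MiB target => 32 post-shuffle tasks,
// unless the advisory bounds say otherwise.
estimatePostShufflePartitions(2L << 30, 64L << 20, 1, 200) // = 32
```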
   val LOCAL_SHUFFLE_READER_ENABLED =
     buildConf("spark.sql.adaptive.shuffle.localShuffleReader.enabled")
-      .doc("When true and adaptive execution is enabled, this enables the optimization of" +
-        " converting the shuffle reader to local shuffle reader for the shuffle exchange" +
-        " of the broadcast hash join in probe side.")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables the " +
+        "optimization of converting the shuffle reader to a local shuffle reader for the " +
+        "shuffle exchange on the probe side of a broadcast hash join.")
       .booleanConf
       .createWithDefault(true)

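For illustration (assuming a SparkSession named `spark`; the query is made up), a join where the local shuffle reader can apply once AQE plans a broadcast hash join at runtime:

```scala
// When AQE replans this as a broadcast hash join at runtime, the probe
// side's already-written shuffle files can be read locally on each node
// instead of being fetched over the network.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.shuffle.localShuffleReader.enabled", "true")

val probe = spark.range(10000000L).toDF("k") // large probe side
val build = spark.range(100L).toDF("k")      // small enough to broadcast
probe.join(build, "k").count()
```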
   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
     buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
       .doc("The relation with a non-empty partition ratio lower than this config will not be " +
         "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
-        "of its size.")
+        "of its size. This configuration only has an effect when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
       .doubleConf
       .checkValue(_ >= 0, "The non-empty partition ratio must be a positive number.")
       .createWithDefault(0.2)
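A worked example of the 0.2 default (the numbers are hypothetical):

```scala
// Suppose the candidate build side has 100 shuffle partitions but only
// 15 contain any rows. Its non-empty partition ratio is 0.15 < 0.2, so
// AQE will not pick it as the broadcast build side, however small it is.
val nonEmptyPartitions = 15
val totalPartitions = 100
val ratio = nonEmptyPartitions.toDouble / totalPartitions // 0.15
val eligibleAsBuildSide = ratio >= 0.2                    // false
```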