Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ private[spark] class BlockStoreShuffleReader[K, C](
true
}
val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
// SPARK-34790: Fetching continuous blocks in batch is incompatible with io encryption.
val ioEncryption = conf.get(config.IO_ENCRYPTION_ENABLED)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still have enough time to take a look and make AQE supported with IO encryption. Does it require a lot of change to make it supported?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. This PR makes a simple read control, which may sacrifice shuffle read performance in AQE. It would be better to allow batch fetching for contiguous shuffle blocks while IO encryption enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, it seems to me that when io encryption enabled, the demarcation point (end of blocks) between consecutive blocks cannot be resolved correctly during deserialization (shuffle read).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could store the size info of each consecutive block in ShuffleBlockBatchId to resolve it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like a good idea but we need to think about how to support it in the current shuffle protocol.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can have this bandage fix first. The encryption shuffle mode breaks the assumption of batch serialized and deserialize, similar to the issue with codec concatenation.


val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
(!compressed || codecConcatenation) && !useOldFetchProtocol
(!compressed || codecConcatenation) && !useOldFetchProtocol && !ioEncryption
if (shouldBatchFetch && !doBatchFetch) {
logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
"we can not enable the feature because other conditions are not satisfied. " +
s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " +
s"$useOldFetchProtocol.")
s"$useOldFetchProtocol, io encryption: $ioEncryption.")
}
doBatchFetch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,8 @@ object SQLConf {
"reduce IO and improve performance. Note, multiple contiguous blocks exist in single " +
s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature also depends " +
"on a relocatable serializer, the concatenation support codec in use and the new version " +
"shuffle fetch protocol.")
"on a relocatable serializer, the concatenation support codec in use, the new version " +
"shuffle fetch protocol and io encryption is disabled.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql._
import org.apache.spark.sql.execution.adaptive._
Expand Down Expand Up @@ -57,15 +58,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
def withSparkSession(
f: SparkSession => Unit,
targetPostShuffleInputSize: Int,
minNumPostShufflePartitions: Option[Int]): Unit = {
minNumPostShufflePartitions: Option[Int],
enableIOEncryption: Boolean = false): Unit = {
val sparkConf =
new SparkConf(false)
.setMaster("local[*]")
.setAppName("test")
.set(UI_ENABLED, false)
.set(IO_ENCRYPTION_ENABLED, enableIOEncryption)
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
.set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.FETCH_SHUFFLE_BLOCKS_IN_BATCH.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
.set(
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
Expand Down Expand Up @@ -408,6 +412,25 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
}
withSparkSession(test, 100, None)
}

test("SPARK-34790: enable IO encryption in AQE partition coalescing") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't reproduce the issue by this test. @hezuojiao Could you double-check it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can reproduce the issue because my laptop has only 4 cores :)
I updated the test case by using fixed numInputPartitions instead of sparkContext.defaultParallelism which is related to the number of computer cores.
You can try to run this test again now. @Ngone51

val test: SparkSession => Unit = { spark: SparkSession =>
val ds = spark.range(0, 100, 1, numInputPartitions)
val resultDf = ds.repartition(ds.col("id"))
resultDf.collect()

val finalPlan = resultDf.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
assert(
finalPlan.collect {
case r @ CoalescedShuffleReader() => r
}.isDefinedAt(0))
}
Seq(true, false).foreach { enableIOEncryption =>
// Before SPARK-34790, it will throw an exception when io encryption enabled.
withSparkSession(test, Int.MaxValue, None, enableIOEncryption)
}
}
}

object CoalescedShuffleReader {
Expand Down