diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index bc2a0fbc36d5..30c752960d5d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -51,15 +51,17 @@ private[spark] class BlockStoreShuffleReader[K, C]( true } val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) + // SPARK-34790: Fetching continuous blocks in batch is incompatible with io encryption. + val ioEncryption = conf.get(config.IO_ENCRYPTION_ENABLED) val doBatchFetch = shouldBatchFetch && serializerRelocatable && - (!compressed || codecConcatenation) && !useOldFetchProtocol + (!compressed || codecConcatenation) && !useOldFetchProtocol && !ioEncryption if (shouldBatchFetch && !doBatchFetch) { logDebug("The feature tag of continuous shuffle block fetching is set to true, but " + "we can not enable the feature because other conditions are not satisfied. " + s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " + s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " + - s"$useOldFetchProtocol.") + s"$useOldFetchProtocol, io encryption: $ioEncryption.") } doBatchFetch } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 610f436050b0..130af3ae964c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -500,8 +500,8 @@ object SQLConf { "reduce IO and improve performance. Note, multiple contiguous blocks exist in single " + s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " + s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. 
This feature also depends " + - "on a relocatable serializer, the concatenation support codec in use and the new version " + - "shuffle fetch protocol.") + "on a relocatable serializer, the concatenation support codec in use, the new version " + + "shuffle fetch protocol and io encryption is disabled.") .version("3.0.0") .booleanConf .createWithDefault(true) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala index e56298102230..6b9f49409e48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql._ import org.apache.spark.sql.execution.adaptive._ @@ -57,15 +58,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl def withSparkSession( f: SparkSession => Unit, targetPostShuffleInputSize: Int, - minNumPostShufflePartitions: Option[Int]): Unit = { + minNumPostShufflePartitions: Option[Int], + enableIOEncryption: Boolean = false): Unit = { val sparkConf = new SparkConf(false) .setMaster("local[*]") .setAppName("test") .set(UI_ENABLED, false) + .set(IO_ENCRYPTION_ENABLED, enableIOEncryption) .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") .set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5") .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") + .set(SQLConf.FETCH_SHUFFLE_BLOCKS_IN_BATCH.key, "true") .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") .set( SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, @@ -408,6 +412,25 @@ class 
CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl } withSparkSession(test, 100, None) } + + test("SPARK-34790: enable IO encryption in AQE partition coalescing") { + val test: SparkSession => Unit = { spark: SparkSession => + val ds = spark.range(0, 100, 1, numInputPartitions) + val resultDf = ds.repartition(ds.col("id")) + resultDf.collect() + + val finalPlan = resultDf.queryExecution.executedPlan + .asInstanceOf[AdaptiveSparkPlanExec].executedPlan + assert( + finalPlan.collect { + case r @ CoalescedShuffleReader() => r + }.isDefinedAt(0)) + } + Seq(true, false).foreach { enableIOEncryption => + // Before SPARK-34790, this threw an exception when IO encryption was enabled. + withSparkSession(test, Int.MaxValue, None, enableIOEncryption) + } + } } object CoalescedShuffleReader {