diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 91910b936e7c..b3904f3362e8 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -56,8 +56,12 @@ class NettyBlockRpcServer(
     message match {
       case openBlocks: OpenBlocks =>
         val blocksNum = openBlocks.blockIds.length
-        val blocks = for (i <- (0 until blocksNum).view)
-          yield blockManager.getLocalBlockData(BlockId.apply(openBlocks.blockIds(i)))
+        val blocks = (0 until blocksNum).map { i =>
+          val blockId = BlockId.apply(openBlocks.blockIds(i))
+          assert(!blockId.isInstanceOf[ShuffleBlockBatchId],
+            "Continuous shuffle block fetching only works for new fetch protocol.")
+          blockManager.getLocalBlockData(blockId)
+        }
         val streamId = streamManager.registerStream(appId, blocks.iterator.asJava,
           client.getChannel)
         logTrace(s"Registered streamId $streamId with $blocksNum buffers")
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index d5a66db23301..bc2a0fbc36d5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -50,14 +50,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
     } else {
       true
     }
+    val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
 
     val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
-      (!compressed || codecConcatenation)
+      (!compressed || codecConcatenation) && !useOldFetchProtocol
     if (shouldBatchFetch && !doBatchFetch) {
       logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
         "we can not enable the feature because other conditions are not satisfied. " +
         s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
-        s"codec concatenation: $codecConcatenation.")
+        s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " +
+        s"$useOldFetchProtocol.")
     }
     doBatchFetch
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9479fc21b599..105b2a857e5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -367,7 +367,8 @@ object SQLConf {
         "reduce IO and improve performance. Note, multiple continuous blocks exist in single " +
         s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
         s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled, this feature also depends " +
-        "on a relocatable serializer and the concatenation support codec in use.")
+        "on a relocatable serializer, the concatenation support codec in use and the new " +
+        "version shuffle fetch protocol.")
       .booleanConf
       .createWithDefault(true)
 
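
Reviewer note: a minimal sketch of how the new guard in `BlockStoreShuffleReader` interacts with the configs. The config keys are assumptions for this revision: `spark.shuffle.useOldFetchProtocol` for `config.SHUFFLE_USE_OLD_FETCH_PROTOCOL`, and `spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled` plus `spark.sql.adaptive.reducePostShufflePartitions.enabled` for the SQLConf entries referenced above. With the old protocol forced on, `doBatchFetch` now evaluates to false even though the SQL-side flag is set, and the logDebug branch reports why.

```scala
// Sketch only: config keys are assumptions for this revision, not verified.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("batch-fetch-interplay")
  // The SQL side asks for continuous (batched) shuffle block fetching...
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.reducePostShufflePartitions.enabled", "true")
  .config("spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled", "true")
  // ...but forcing the legacy OpenBlocks protocol vetoes it:
  // doBatchFetch = shouldBatchFetch && ... && !useOldFetchProtocol == false
  .config("spark.shuffle.useOldFetchProtocol", "true")
  .getOrCreate()
```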
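The new assert in `NettyBlockRpcServer` rejects batch block ids on the legacy `OpenBlocks` path, since only the new `FetchShuffleBlocks` protocol produces them. A small illustration of the two id shapes, assuming `BlockId.apply` already parses the batch format at this revision; the string layouts shown in the comments are assumptions, not verified against this commit:

```scala
import org.apache.spark.storage.{BlockId, ShuffleBlockBatchId, ShuffleBlockId}

// Plain shuffle block: shuffle_<shuffleId>_<mapId>_<reduceId>
// Legal inside an OpenBlocks request under either protocol.
val single = BlockId("shuffle_0_1_2")
assert(single.isInstanceOf[ShuffleBlockId])

// Batch id covering a contiguous range of reduce partitions:
// shuffle_<shuffleId>_<mapId>_<startReduceId>_<endReduceId>
// Only produced by the new fetch protocol, so the added assert treats
// its appearance inside an OpenBlocks message as a protocol violation.
val batch = BlockId("shuffle_0_1_2_4")
assert(batch.isInstanceOf[ShuffleBlockBatchId])
```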