Changes from all commits. This patch groups the adaptive execution shuffle configs under the spark.sql.adaptive.shuffle.* namespace and gives BlockStoreShuffleReader's shouldBatchFetch parameter a default value of false, so call sites that do not care about batching no longer have to pass the flag.
@@ -36,7 +36,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
     mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
-    shouldBatchFetch: Boolean)
+    shouldBatchFetch: Boolean = false)
   extends ShuffleReader[K, C] with Logging {

   private val dep = handle.dependency
@@ -138,8 +138,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
       taskContext,
       metrics,
       serializerManager,
-      blockManager,
-      shouldBatchFetch = false)
+      blockManager)

     assert(shuffleReader.read().length === keyValuePairsPerMap * numMaps)
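With the default in place, the suite simply drops the explicit flag, and a caller that does want batch fetching opts in at construction time. A minimal sketch (the trailing argument order follows the constructor shown above; the leading argument names are illustrative):

// Sketch only: omitting shouldBatchFetch now yields the old one-block-at-a-time
// behavior, so only the code path that creates the reader for batched reads
// needs to pass the flag explicitly.
val reader = new BlockStoreShuffleReader(
  handle,
  blocksByAddress,
  taskContext,
  metrics,
  serializerManager,
  blockManager,
  mapOutputTracker,
  shouldBatchFetch = true) // omit for the default `false`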
@@ -349,53 +349,43 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
     .createWithDefault(200)

-  val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
[Member] We should update/add the corresponding descriptions for all the SQLConf entries that are affected by this conf. Otherwise, end users might not know the relation between these confs.

[Member Author] Thanks, done in #26664.

.doc("When true, enable adaptive query execution.")
.booleanConf
.createWithDefault(false)

val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
.doc("The target post-shuffle input size in bytes of a task.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(64 * 1024 * 1024)


val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled")
buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
[Member] It seems we need to improve the documentation, because it doesn't seem to support the old shuffle service:

19/11/21 01:02:51 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 33.2 (TID 192743, hdc49-mcc10-01-0710-2509-029-tess0035.stratus.rno.ebay.com, executor 734): FetchFailed(BlockManagerId(730, hdc49-mcc10-01-0710-4003-037-tess0035.stratus.rno.ebay.com, 7337, None), shuffleId=14, mapIndex=2991, mapId=2991, reduceId=6507, message=
org.apache.spark.shuffle.FetchFailedException: Failure while fetching StreamChunkId{streamId=1015050587051, chunkIndex=1}: java.lang.IndexOutOfBoundsException
	at java.nio.Buffer.checkIndex(Buffer.java:540)
	at java.nio.ByteBufferAsLongBufferB.get(ByteBufferAsLongBufferB.java:115)
	at org.apache.spark.network.shuffle.ShuffleIndexInformation.getIndex(ShuffleIndexInformation.java:64)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:242)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:175)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:252)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:208)
	at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:96)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
	at org.spark_project.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
	at org.spark_project.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)

	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:649)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:562)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:69)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:337)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1015050587051, chunkIndex=1}: java.lang.IndexOutOfBoundsException
	at java.nio.Buffer.checkIndex(Buffer.java:540)
	at java.nio.ByteBufferAsLongBufferB.get(ByteBufferAsLongBufferB.java:115)
	at org.apache.spark.network.shuffle.ShuffleIndexInformation.getIndex(ShuffleIndexInformation.java:64)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:242)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:175)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:252)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:208)
	at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:96)
	at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
	at org.spark_project.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
	at org.spark_project.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
	at org.spark_project.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)

	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:139)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more

[Contributor] We should improve the error message as well. cc @xuanyuanking

[Member Author] Thanks for reporting this; fixed in #26663.

.doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
"one by one, fetching continuous shuffle blocks for the same map task in batch can " +
"reduce IO and improve performance. Note, this feature also depends on a relocatable " +
"serializer and the concatenation support codec in use.")
.booleanConf
.createWithDefault(true)
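Per the report above, batch fetching (default true) does not appear to work against an old external shuffle service. Until the documentation and error message are improved (follow-ups #26664 and #26663 per the thread), the conf can simply be turned off; a minimal sketch (session setup is illustrative, the key is the renamed conf from this diff):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Old external shuffle services fail batched continuous-block fetches with the
// ChunkFetchFailureException shown above, so disable batching on such clusters.
spark.conf.set("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled", "false")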

+  val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
+    .doc("When true, enable adaptive query execution.")
+    .booleanConf
+    .createWithDefault(false)
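The parent flag now sits directly above the shuffle confs it gates: the spark.sql.adaptive.shuffle.* settings below only apply while adaptive execution is enabled, and it defaults to false. A usage sketch, reusing the session from the previous snippet (values illustrative):

// Enable the parent switch first, then tune the children under the new namespace.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "64MB")
spark.conf.set("spark.sql.adaptive.shuffle.minNumPostShufflePartitions", "1")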

-  val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
-    buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
-      .doc("The relation with a non-empty partition ratio lower than this config will not be " +
-        "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
-        "of its size.")
-      .doubleConf
-      .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
-      .createWithDefault(0.2)
-
   val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
-    buildConf("spark.sql.adaptive.reducePostShufflePartitions.enabled")
+    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
      .doc("When true and adaptive execution is enabled, this enables reducing the number of " +
        "post-shuffle partitions based on map output statistics.")
      .booleanConf
      .createWithDefault(true)
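How the reduction uses the sizing confs: conceptually, the post-shuffle partition count is the total map output size divided by targetPostShuffleInputSize, clamped by the min/max confs below. A simplified sketch of that arithmetic (the real coordinator packs adjacent partitions greedily; this helper is hypothetical):

// Simplified estimate only, using the defaults visible in this diff.
def estimateCoalescedPartitions(
    totalShuffleBytes: Long,
    targetInputSize: Long = 64L * 1024 * 1024, // targetPostShuffleInputSize default
    minPartitions: Int = 1,                    // minNumPostShufflePartitions default
    maxPartitions: Int = 200                   // max falls back to spark.sql.shuffle.partitions
  ): Int = {
  val raw = math.ceil(totalShuffleBytes.toDouble / targetInputSize).toInt
  math.min(math.max(raw, minPartitions), maxPartitions)
}

// e.g. 1 GiB of map output with the 64 MiB default target => 16 partitions.
assert(estimateCoalescedPartitions(1L << 30) == 16)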

   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
+    buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
      .intConf
      .checkValue(_ > 0, "The minimum shuffle partition number " +
        "must be a positive integer.")
      .createWithDefault(1)

   val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+    buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
        "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
        "spark.sql.shuffle.partitions")
@@ -405,13 +395,22 @@ object SQLConf {
      .createOptional

   val OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED =
-    buildConf("spark.sql.adaptive.optimizedLocalShuffleReader.enabled")
+    buildConf("spark.sql.adaptive.shuffle.optimizedLocalShuffleReader.enabled")
      .doc("When true and adaptive execution is enabled, this enables the optimization of" +
        " converting the shuffle reader to local shuffle reader for the shuffle exchange" +
        " of the broadcast hash join in probe side.")
      .booleanConf
      .createWithDefault(true)
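Like the other renamed keys, this one is only consulted under adaptive execution; switching it off keeps regular shuffle reads on the probe side of a broadcast-hash join (sketch, same illustrative session as above):

spark.conf.set("spark.sql.adaptive.shuffle.optimizedLocalShuffleReader.enabled", "false")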

+  val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
+    buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
+      .doc("The relation with a non-empty partition ratio lower than this config will not be " +
+        "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
+        "of its size.")
+      .doubleConf
+      .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
+      .createWithDefault(0.2)
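A concrete reading of the 0.2 default: a relation whose map output leaves only 2 of 100 partitions non-empty has a ratio of 0.02, so adaptive execution will not pick it as the broadcast build side even if it is tiny in bytes. A sketch of the check (the helper name is hypothetical):

// Hypothetical helper mirroring the rule in the doc string above.
def eligibleAsBroadcastBuildSide(
    nonEmptyPartitions: Int,
    totalPartitions: Int,
    ratioThreshold: Double = 0.2): Boolean =
  nonEmptyPartitions.toDouble / totalPartitions >= ratioThreshold

assert(!eligibleAsBroadcastBuildSide(2, 100)) // 0.02 < 0.2: rejected regardless of size
assert(eligibleAsBroadcastBuildSide(30, 100)) // 0.30 >= 0.2: allowed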

   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
      .internal()

@@ -2148,21 +2147,18 @@ class SQLConf extends Serializable with Logging {

   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

-  def targetPostShuffleInputSize: Long =
-    getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+  def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

-  def fetchShuffleBlocksInBatchEnabled: Boolean =
-    getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
+  def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

-  def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
+  def fetchShuffleBlocksInBatchEnabled: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)

   def nonEmptyPartitionRatioForBroadcastJoin: Double =
     getConf(NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN)

   def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)

-  def minNumPostShufflePartitions: Int =
-    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

   def maxNumPostShufflePartitions: Int =
     getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)