diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 072ac86129e05..6fc31c116fa37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1022,6 +1022,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val MERGE_SMALL_OUTPUT_FILE = buildConf("spark.sql.files.mergeSmallFile.enabled")
+    .doc("When true, merge neighbouring output partitions of the final stage until their " +
+      "estimated size reaches spark.sql.files.mergeSmallFile.maxBytes.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val MERGE_SMALL_OUTPUT_FILE_MAX_BYTES = buildConf("spark.sql.files.mergeSmallFile.maxBytes")
+    .doc("Maximum estimated size in bytes of a group of merged neighbouring small partitions.")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefault(128 * 1024 * 1024) // parquet.block.size
+
   val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files. " +
       "This configuration is effective only when using file-based sources such as Parquet, JSON " +
@@ -2222,6 +2233,10 @@ class SQLConf extends Serializable with Logging {
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
+  def mergeSmallOutputFiles: Boolean = getConf(MERGE_SMALL_OUTPUT_FILE)
+
+  def mergeSmallOutputFilesMaxSize: Long = getConf(MERGE_SMALL_OUTPUT_FILE_MAX_BYTES)
+
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
 
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
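The two configs above gate the feature; user code only needs to set them on the session. A minimal sketch of enabling them, assuming this patch is applied (the 64m cap, the 200-partition dataset, and the output path are illustrative only):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[4]").appName("merge-small-files").getOrCreate()
    // Illustrative settings: turn the merge on and cap each merged group at roughly 64 MB.
    spark.conf.set("spark.sql.files.mergeSmallFile.enabled", "true")
    spark.conf.set("spark.sql.files.mergeSmallFile.maxBytes", "64m")
    // Any subsequent file write goes through FileFormatWriter.write and picks the settings up;
    // the 200 small input partitions would otherwise produce 200 small output files.
    spark.range(0L, 1000000L, 1L, 200)
      .write.mode("overwrite").parquet("/tmp/merge_small_files_demo")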
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 6b6ca531c6d3b..bcd9c3d763af0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -24,7 +24,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, GenerateOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -203,3 +203,42 @@ case class SortExec(
     super.cleanupResources()
   }
 }
+
+object SortExec {
+
+  def createSorter(plan: SparkPlan,
+      enableRadixSort: Boolean,
+      sortOrder: Seq[SortOrder]): UnsafeExternalRowSorter = {
+    val ordering = GenerateOrdering.generate(sortOrder, plan.output)
+
+    // The comparator for comparing the prefix of the first sort key.
+    val boundSortExpression = BindReferences.bindReference(sortOrder.head, plan.output)
+    val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
+
+    val canUseRadixSort =
+      enableRadixSort &&
+        sortOrder.length == 1 &&
+        SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
+
+    // The generator that computes the prefix for each row.
+    val prefixExpr = SortPrefix(boundSortExpression)
+    val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
+    val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
+      private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
+
+      override def computePrefix(row: InternalRow):
+          UnsafeExternalRowSorter.PrefixComputer.Prefix = {
+        val prefix = prefixProjection.apply(row)
+        result.isNull = prefix.isNullAt(0)
+        result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
+        result
+      }
+    }
+
+    val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
+    val sorter = UnsafeExternalRowSorter.create(
+      plan.schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
+
+    sorter
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 219c778b9164a..7a97f724598bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -168,18 +168,51 @@ object FileFormatWriter extends Logging {
     committer.setupJob(job)
 
     try {
-      val rdd = if (orderingMatched) {
-        empty2NullPlan.execute()
+      val rdd = if (sparkSession.sessionState.conf.mergeSmallOutputFiles) {
+        val originRdd = empty2NullPlan.execute()
+        val maxSize = sparkSession.sessionState.conf.mergeSmallOutputFilesMaxSize
+        val coalescer = new SizeBasedCoalescer(maxSize, plan.output)
+        val shufflePartitionNum = if (originRdd.partitions.length > 0) {
+          originRdd.partitions.length
+        } else {
+          1
+        }
+        val coalescedRdd =
+          originRdd.coalesce(shufflePartitionNum, shuffle = true, Some(coalescer))
+        if (orderingMatched) {
+          coalescedRdd
+        } else {
+          // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
+          // the physical plan may have different attribute ids due to optimizer removing some
+          // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
+          val orderingExpr = requiredOrdering
+            .map(SortOrder(_, Ascending))
+            .map(BindReferences.bindReference(_, outputSpec.outputColumns))
+          val enableRadixSort = plan.sqlContext.conf.enableRadixSort
+          coalescedRdd.mapPartitionsInternal { iter =>
+            val sorter = SortExec.createSorter(plan, enableRadixSort, orderingExpr)
+            val metrics = TaskContext.get().taskMetrics()
+            // Sort the rows of this coalesced partition and record the sorter's peak memory
+            // usage in the task metrics, mirroring what SortExec does for a regular sort.
+            val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
+            metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
+            sortedIterator
+          }
+        }
       } else {
-        // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
-        // the physical plan may have different attribute ids due to optimizer removing some
-        // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
-        val orderingExpr = bindReferences(
-          requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
-        SortExec(
-          orderingExpr,
-          global = false,
-          child = empty2NullPlan).execute()
+        if (orderingMatched) {
+          empty2NullPlan.execute()
+        } else {
+          // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
+          // the physical plan may have different attribute ids due to optimizer removing some
+          // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
+          val orderingExpr = bindReferences(
+            requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
+          SortExec(
+            orderingExpr,
+            global = false,
+            child = empty2NullPlan).execute()
+        }
       }
 
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
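For intuition, the new branch coalesces the final stage's partitions by estimated size and then re-sorts the rows inside each merged partition before handing them to the writer. A rough public-API analogue of that shape, not the internal code path itself (the fixed partition count, the bucket column, and the output path are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[4]").appName("write-path-analogue").getOrCreate()
    val df = spark.range(0L, 100000L).selectExpr("id", "id % 10 AS bucket")
    df.coalesce(8)                     // stand-in for the size-based partition grouping
      .sortWithinPartitions("bucket")  // stand-in for the per-partition UnsafeExternalRowSorter
      .write.mode("overwrite").partitionBy("bucket").parquet("/tmp/write_path_analogue")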
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SizeBasedCoalescer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SizeBasedCoalescer.scala
new file mode 100644
index 0000000000000..7113c4d50e724
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SizeBasedCoalescer.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.Partition
+import org.apache.spark.rdd._
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
+
+class SizeBasedCoalescer(val maxSize: Long, val schema: Seq[Attribute])
+  extends PartitionCoalescer with Serializable {
+  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
+    val rowSize = EstimationUtils.getSizePerRow(schema).toLong
+    val partitionIndexToSize = parent.mapPartitionsWithIndexInternal((index, part) => {
+      // TODO make it more accurate
+      Map(index -> rowSize * part.size).iterator
+    }).collectAsMap()
+
+    val partitions = parent.partitions
+
+    val groups = ArrayBuffer[PartitionGroup]()
+    var currentGroup = new PartitionGroup()
+    var currentSum = 0L
+    var totalSum = 0L
+    var index = 0
+
+    def updateGroups(): Unit = {
+      groups += currentGroup
+      currentGroup = new PartitionGroup()
+      currentSum = 0
+    }
+
+    def addPartition(partition: Partition, splitSize: Long): Unit = {
+      currentGroup.partitions += partition
+      currentSum += splitSize
+      totalSum += splitSize
+    }
+
+    while (index < partitions.length) {
+      val partition = partitions(index)
+      val partitionSize = partitionIndexToSize(index)
+      if (currentSum + partitionSize < maxSize) {
+        addPartition(partition, partitionSize)
+      } else {
+        if (currentGroup.partitions.isEmpty) {
+          addPartition(partition, partitionSize)
+        } else {
+          updateGroups()
+          addPartition(partition, partitionSize)
+        }
+      }
+      index += 1
+      if (index == partitions.length) {
+        updateGroups()
+      }
+    }
+    groups.toArray
+  }
+}
+
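The coalescer walks neighbouring partitions and greedily groups them until a group's estimated size reaches maxSize. Since RDD.coalesce accepts a custom PartitionCoalescer, the class can also be exercised directly; a minimal sketch under that assumption (the single-column schema, the 200 input partitions, and the 1 MB threshold are made up for illustration):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.execution.datasources.SizeBasedCoalescer
    import org.apache.spark.sql.types.LongType

    val spark = SparkSession.builder().master("local[4]").appName("coalescer-sketch").getOrCreate()
    // 200 tiny partitions of longs; the coalescer estimates each partition's size from the
    // per-row width implied by the schema (a single LongType attribute here).
    val rdd = spark.sparkContext.parallelize(1L to 100000L, 200)
    val schema = Seq(AttributeReference("id", LongType)())
    val coalesced = rdd.coalesce(
      rdd.getNumPartitions, shuffle = false, Some(new SizeBasedCoalescer(1024 * 1024, schema)))
    println(s"${rdd.getNumPartitions} partitions grouped into ${coalesced.getNumPartitions}")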