@@ -1022,6 +1022,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val MERGE_SMALL_OUTPUT_FILE = buildConf("spark.sql.files.mergeSmallFile.enabled")
.doc("When true, merge the final stage's small output partitions into larger ones, " +
"up to the size set by spark.sql.files.mergeSmallFile.maxBytes.")
.booleanConf
.createWithDefault(false)

val MERGE_SMALL_OUTPUT_FILE_MAX_BYTES = buildConf("spark.sql.files.mergeSmallFile.maxBytes")
.doc("The maximum size in bytes of a merged group when combining neighbouring small partitions.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(128 * 1024 * 1024) // parquet.block.size

val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files. " +
"This configuration is effective only when using file-based sources such as Parquet, JSON " +
@@ -2222,6 +2233,10 @@ class SQLConf extends Serializable with Logging {

def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)

def mergeSmallOutputFiles: Boolean = getConf(MERGE_SMALL_OUTPUT_FILE)

def mergeSmallOutputFilesMaxSize: Long = getConf(MERGE_SMALL_OUTPUT_FILE_MAX_BYTES)

def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)

def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
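A minimal usage sketch for the two new configs, assuming a spark-shell style `spark` session; the path and the 256 MB threshold below are illustrative only, not part of this patch:

// Enable small-file merging for file-based writes and cap each merged
// output partition at roughly 256 MB.
spark.conf.set("spark.sql.files.mergeSmallFile.enabled", "true")
spark.conf.set("spark.sql.files.mergeSmallFile.maxBytes", 256L * 1024 * 1024)

// A write whose final stage would otherwise emit many small files.
spark.range(1000000).repartition(400).write.mode("overwrite").parquet("/tmp/merged_output")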
@@ -24,7 +24,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, GenerateOrdering}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -203,3 +203,42 @@ case class SortExec(
super.cleanupResources()
}
}

object SortExec {

def createSorter(
plan: SparkPlan,
enableRadixSort: Boolean,
sortOrder: Seq[SortOrder]): UnsafeExternalRowSorter = {
val ordering = GenerateOrdering.generate(sortOrder, plan.output)

// The comparator for comparing prefix
val boundSortExpression = BindReferences.bindReference(sortOrder.head, plan.output)
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)

val canUseRadixSort =
enableRadixSort &&
sortOrder.length == 1 &&
SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)

// The generator for prefix
val prefixExpr = SortPrefix(boundSortExpression)
val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix

override def computePrefix(row: InternalRow):
UnsafeExternalRowSorter.PrefixComputer.Prefix = {
val prefix = prefixProjection.apply(row)
result.isNull = prefix.isNullAt(0)
result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
result
}
}

val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
UnsafeExternalRowSorter.create(
plan.schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
}
}
@@ -168,18 +168,51 @@ object FileFormatWriter extends Logging {
committer.setupJob(job)

try {
val rdd = if (orderingMatched) {
empty2NullPlan.execute()
val rdd = if (sparkSession.sessionState.conf.mergeSmallOutputFiles) {
val originRdd = empty2NullPlan.execute()
val maxSize = sparkSession.sessionState.conf.mergeSmallOutputFilesMaxSize
val coalescer = new SizeBasedCoalescer(maxSize, plan.output)
val shufflePartitionNum = if (originRdd.partitions.length > 0) {
originRdd.partitions.length
} else {
1
}
val coalescedRdd =
originRdd.coalesce(shufflePartitionNum, shuffle = true, Some(coalescer))
if (orderingMatched) {
coalescedRdd
} else {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
val orderingExpr = requiredOrdering
.map(SortOrder(_, Ascending))
.map(BindReferences.bindReference(_, outputSpec.outputColumns))
val enableRadixSort = plan.sqlContext.conf.enableRadixSort
coalescedRdd.mapPartitionsInternal { iter =>
val sorter = SortExec.createSorter(plan, enableRadixSort, orderingExpr)
val metrics = TaskContext.get().taskMetrics()
// Sort this partition's rows and record the sorter's peak memory usage
// in the task metrics.
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
sortedIterator
}
}
} else {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
val orderingExpr = bindReferences(
requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
SortExec(
orderingExpr,
global = false,
child = empty2NullPlan).execute()
if (orderingMatched) {
empty2NullPlan.execute()
} else {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
val orderingExpr = bindReferences(
requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
SortExec(
orderingExpr,
global = false,
child = empty2NullPlan).execute()
}
}

// SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Partition
import org.apache.spark.rdd._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils

class SizeBasedCoalescer(val maxSize: Long, val schema: Seq[Attribute])
extends PartitionCoalescer with Serializable {
override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
val rowSize = EstimationUtils.getSizePerRow(schema).toLong
val partitionIndexToSize = parent.mapPartitionsWithIndexInternal((index, part) => {
// TODO make it more accurate
Map(index -> rowSize * part.size).iterator
}).collectAsMap()
Contributor:
Is it too costly to trigger an action to compute all the RDDs?

Contributor Author:
> Is it too costly to trigger an action to compute all the RDDs?

This runs after the RDDs have been computed; we coalesce the already-computed RDD.

Contributor Author:
> Is it too costly to trigger an action to compute all the RDDs?

Sorry, I made a mistake earlier: this will re-compute the last stage, but it won't recompute all the stages.


val partitions = parent.partitions

val groups = ArrayBuffer[PartitionGroup]()
var currentGroup = new PartitionGroup()
var currentSum = 0L
var totalSum = 0L
var index = 0

def updateGroups(): Unit = {
groups += currentGroup
currentGroup = new PartitionGroup()
currentSum = 0
}

def addPartition(partition: Partition, splitSize: Long): Unit = {
currentGroup.partitions += partition
currentSum += splitSize
totalSum += splitSize
}

while (index < partitions.length) {
val partition = partitions(index)
val partitionSize = partitionIndexToSize(index)
if (currentSum + partitionSize < maxSize) {
addPartition(partition, partitionSize)
} else {
if (currentGroup.partitions.isEmpty) {
addPartition(partition, partitionSize)
} else {
updateGroups()
addPartition(partition, partitionSize)
}
}
index += 1
if (index == partitions.length) {
updateGroups()
}
}
groups.toArray
}
}
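As a hedged illustration of how SizeBasedCoalescer is driven (and of the reviewer's point above: the size-estimation collectAsMap() is its own job, so the last stage runs once for the estimate and again for the subsequent action, while earlier stages are served from shuffle files). The `spark` session, row counts, and 128 MB threshold below are assumptions for the sketch, not part of the patch:

import org.apache.spark.sql.execution.datasources.SizeBasedCoalescer

// Hypothetical query whose final stage produces many small partitions.
val df = spark.range(0, 1000000, 1, 200).toDF("id")
val rdd = df.queryExecution.toRdd  // RDD[InternalRow] of the final stage

// Group neighbouring partitions until each group's estimated size reaches ~128 MB,
// mirroring how FileFormatWriter invokes the coalescer in this patch.
val coalescer = new SizeBasedCoalescer(128L * 1024 * 1024, df.queryExecution.analyzed.output)
val merged = rdd.coalesce(rdd.partitions.length, shuffle = true, Some(coalescer))

// Accessing the partitions runs the size-estimation job (the collectAsMap above);
// any later action on `merged` recomputes only the post-shuffle stage.
println(s"${rdd.partitions.length} input partitions -> ${merged.partitions.length} merged groups")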