2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -161,7 +161,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
     getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
 
   /** When true the planner will use the external sort, which may spill to disk. */
-  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
+  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "true").toBoolean
 
   /**
    * Sort merge join would sort the two side of join first, and then iterate both sides together
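Note on the SQLConf change: with EXTERNAL_SORT now defaulting to true, planned sorts go through the spillable ExternalSort operator unless the user opts out. A minimal sketch of overriding the new default per session, assuming the key behind EXTERNAL_SORT is "spark.sql.planner.externalSort" as in the 1.x SQLConf (check SQLConf.EXTERNAL_SORT in your build):

  // Assumed key name; fall back to the in-memory Sort operator for this session.
  sqlContext.setConf("spark.sql.planner.externalSort", "false")
  sqlContext.getConf("spark.sql.planner.externalSort")   // "false"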
sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -31,16 +31,6 @@ import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.util.MutablePair
 
-object Exchange {
-  /**
-   * Returns true when the ordering expressions are a subset of the key.
-   * if true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange]].
-   */
-  def canSortWithShuffle(partitioning: Partitioning, desiredOrdering: Seq[SortOrder]): Boolean = {
-    desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet)
-  }
-}
-
 /**
  * :: DeveloperApi ::
  * Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each
@@ -143,7 +133,6 @@ case class Exchange(
   private def getSerializer(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
-      hasKeyOrdering: Boolean,
       numPartitions: Int): Serializer = {
     // It is true when there is no field that needs to be write out.
     // For now, we will not use SparkSqlSerializer2 when noField is true.
@@ -159,7 +148,7 @@
 
     val serializer = if (useSqlSerializer2) {
       logInfo("Using SparkSqlSerializer2.")
-      new SparkSqlSerializer2(keySchema, valueSchema, hasKeyOrdering)
+      new SparkSqlSerializer2(keySchema, valueSchema)
     } else {
       logInfo("Using SparkSqlSerializer.")
       new SparkSqlSerializer(sparkConf)
@@ -173,7 +162,7 @@
       case HashPartitioning(expressions, numPartitions) =>
         val keySchema = expressions.map(_.dataType).toArray
         val valueSchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
+        val serializer = getSerializer(keySchema, valueSchema, numPartitions)
         val part = new HashPartitioner(numPartitions)
 
         val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
@@ -189,15 +178,12 @@
           }
         }
         val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
-        if (newOrdering.nonEmpty) {
-          shuffled.setKeyOrdering(keyOrdering)
-        }
         shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
         val keySchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)
+        val serializer = getSerializer(keySchema, null, numPartitions)
 
         val childRdd = child.execute()
         val part: Partitioner = {
@@ -222,15 +208,12 @@
         }
 
         val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
-        if (newOrdering.nonEmpty) {
-          shuffled.setKeyOrdering(keyOrdering)
-        }
         shuffled.setSerializer(serializer)
         shuffled.map(_._1)
 
       case SinglePartition =>
         val valueSchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
+        val serializer = getSerializer(null, valueSchema, numPartitions = 1)
         val partitioner = new HashPartitioner(1)
 
         val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
@@ -306,29 +289,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
           child: SparkPlan): SparkPlan = {
         val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
         val needsShuffle = child.outputPartitioning != partitioning
-        val canSortWithShuffle = Exchange.canSortWithShuffle(partitioning, rowOrdering)
 
-        if (needSort && needsShuffle && canSortWithShuffle) {
-          Exchange(partitioning, rowOrdering, child)
+        val withShuffle = if (needsShuffle) {
+          Exchange(partitioning, Nil, child)
         } else {
-          val withShuffle = if (needsShuffle) {
-            Exchange(partitioning, Nil, child)
-          } else {
-            child
-          }
+          child
+        }
 
-          val withSort = if (needSort) {
-            if (sqlContext.conf.externalSortEnabled) {
-              ExternalSort(rowOrdering, global = false, withShuffle)
-            } else {
-              Sort(rowOrdering, global = false, withShuffle)
-            }
+        val withSort = if (needSort) {
+          if (sqlContext.conf.externalSortEnabled) {
+            ExternalSort(rowOrdering, global = false, withShuffle)
           } else {
-            withShuffle
+            Sort(rowOrdering, global = false, withShuffle)
           }
-
-          withSort
+        } else {
+          withShuffle
         }
+
+        withSort
       }
 
       if (meetsRequirements && compatible && !needsAnySort) {
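After this change, addOperatorsIfNecessary always composes two independent operators when both are needed: an Exchange that only repartitions (ordering Nil) and a Sort or ExternalSort on top of it that orders rows within each partition. A simplified, self-contained sketch of that composition, using hypothetical stand-in plan nodes rather than the real SparkPlan classes:

  // Hypothetical, simplified stand-ins for plan nodes (not the real SparkPlan API),
  // only to show the operator nesting the rule now produces.
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class Shuffle(partitioning: String, child: Plan) extends Plan   // like Exchange(partitioning, Nil, child)
  case class SortOp(ordering: String, child: Plan) extends Plan        // like Sort / ExternalSort

  def addOperatorsIfNecessary(needsShuffle: Boolean, needSort: Boolean, child: Plan): Plan = {
    val withShuffle = if (needsShuffle) Shuffle("hash(a)", child) else child
    if (needSort) SortOp("a ASC", withShuffle) else withShuffle
  }

  // Both required: SortOp("a ASC", Shuffle("hash(a)", Leaf("scan"))),
  // whereas before the change this case produced a single Exchange that also sorted.
  addOperatorsIfNecessary(needsShuffle = true, needSort = true, Leaf("scan"))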
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -86,7 +86,6 @@ private[sql] class Serializer2SerializationStream(
 private[sql] class Serializer2DeserializationStream(
     keySchema: Array[DataType],
     valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean,
     in: InputStream)
   extends DeserializationStream with Logging {
 
@@ -96,14 +95,9 @@
     if (schema == null) {
       () => null
     } else {
-      if (hasKeyOrdering) {
-        // We have key ordering specified in a ShuffledRDD, it is not safe to reuse a mutable row.
-        () => new GenericMutableRow(schema.length)
-      } else {
-        // It is safe to reuse the mutable row.
-        val mutableRow = new SpecificMutableRow(schema)
-        () => mutableRow
-      }
+      // It is safe to reuse the mutable row.
+      val mutableRow = new SpecificMutableRow(schema)
+      () => mutableRow
     }
   }
 
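The removed branch existed because pushing the key ordering into the ShuffledRDD meant downstream code buffered rows in order to sort them, so handing out one shared mutable row was unsafe. With sorting no longer done inside the shuffle, the stream can always reuse a single SpecificMutableRow. A toy illustration of why reuse is only safe when the consumer does not hold on to the rows it receives (plain Scala, unrelated to the Spark classes):

  // A reader that reuses one buffer per record.
  class ReusingReader(fieldCount: Int) {
    private val row = new Array[Any](fieldCount)    // single shared buffer
    def next(values: Array[Any]): Array[Any] = {
      Array.copy(values, 0, row, 0, fieldCount)     // overwrite in place
      row                                           // same instance every call
    }
  }
  // A consumer that copies or processes each row immediately is fine; a consumer
  // that buffers the returned rows (e.g. to sort them) would see every buffered
  // reference mutate to the values of the last record read.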
@@ -133,8 +127,7 @@
 
 private[sql] class SparkSqlSerializer2Instance(
     keySchema: Array[DataType],
-    valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean)
+    valueSchema: Array[DataType])
   extends SerializerInstance {
 
   def serialize[T: ClassTag](t: T): ByteBuffer =
@@ -151,7 +144,7 @@
   }
 
   def deserializeStream(s: InputStream): DeserializationStream = {
-    new Serializer2DeserializationStream(keySchema, valueSchema, hasKeyOrdering, s)
+    new Serializer2DeserializationStream(keySchema, valueSchema, s)
   }
 }
 
@@ -164,14 +157,13 @@
  */
 private[sql] class SparkSqlSerializer2(
     keySchema: Array[DataType],
-    valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean)
+    valueSchema: Array[DataType])
   extends Serializer
   with Logging
   with Serializable{
 
   def newInstance(): SerializerInstance =
-    new SparkSqlSerializer2Instance(keySchema, valueSchema, hasKeyOrdering)
+    new SparkSqlSerializer2Instance(keySchema, valueSchema)
 
   override def supportsRelocationOfSerializedObjects: Boolean = {
     // SparkSqlSerializer2 is stateless and writes no stream headers
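The supportsRelocationOfSerializedObjects override in the last hunk holds because, as the comment states, the serializer is stateless and writes no stream headers: each record's bytes stand alone, so already-serialized records can be reordered byte-for-byte without a deserialize/re-serialize round trip. A toy, self-contained sketch of that property (not the real Serializer API):

  // Length-prefixed records with no stream header: concatenation order alone
  // determines read-back order, so serialized blocks can be relocated freely.
  def writeRecord(s: String): Array[Byte] = {
    val bytes = s.getBytes("UTF-8")
    Array(bytes.length.toByte) ++ bytes   // toy format: single-byte length prefix
  }
  def readAll(buf: Array[Byte]): Seq[String] = {
    val out = scala.collection.mutable.ArrayBuffer[String]()
    var i = 0
    while (i < buf.length) {
      val len = buf(i).toInt
      out += new String(buf, i + 1, len, "UTF-8")
      i += 1 + len
    }
    out.toSeq
  }
  // readAll(writeRecord("a") ++ writeRecord("b")) == Seq("a", "b")
  // readAll(writeRecord("b") ++ writeRecord("a")) == Seq("b", "a")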