@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession, SQLContext}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,72 @@ case class FileSourceScanExec(
false
}

override val outputPartitioning: Partitioning = {
@transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)

override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
relation.bucketSpec
} else {
None
}
bucketSpec.map { spec =>
val numBuckets = spec.numBuckets
val bucketColumns = spec.bucketColumnNames.flatMap { n =>
output.find(_.name == n)
}
if (bucketColumns.size == spec.bucketColumnNames.size) {
HashPartitioning(bucketColumns, numBuckets)
} else {
UnknownPartitioning(0)
}
}.getOrElse {
UnknownPartitioning(0)
bucketSpec match {
case Some(spec) =>
// For bucketed columns:
// -----------------------
// `HashPartitioning` would be used only when:
// 1. ALL the bucketing columns are being read from the table
//
// For sorted columns:
// ---------------------
// Sort ordering should be used when ALL these criteria are met:
// 1. `HashPartitioning` is being used
// 2. A prefix (or all) of the sort columns are being read from the table.
//
// Sort ordering would be over the prefix subset of `sort columns` being read
// from the table.
// e.g.
// Assume (col0, col2, col3) are the columns read from the table
// If sort columns are (col0, col1), then sort ordering would be considered as (col0)
// If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
// above

def toAttribute(colName: String): Option[Attribute] =
output.find(_.name == colName)

val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
if (bucketColumns.size == spec.bucketColumnNames.size) {
val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
val sortColumns =
spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)

val sortOrder = if (sortColumns.nonEmpty) {
// In case of bucketing, it's possible to have multiple files belonging to the
// same bucket in a given relation. Each of these files is locally sorted,
// but those files combined together are not globally sorted. Given that,
// the RDD partition will not be sorted even if the relation has sort columns set.
// The current solution is to check whether each bucket contains at most a single file.

val files = selectedPartitions.flatMap(partition => partition.files)
val bucketToFilesGrouping =
files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
Contributor:

Listing files and grouping by bucket id can be expensive if there are a lot of files. What's worse, we will do it again in createBucketedReadRDD.

Instead of doing this, I'd like to fix the sorting problem for bucketed tables first; then we don't need to scan file names to get the outputOrdering.
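
For reference, the partition listing itself is shared through the @transient lazy val selectedPartitions added above; the extra work discussed in this thread is the pass that groups file names by bucket id. Below is a minimal standalone sketch of that single-file-per-bucket check; the file-name format and the getBucketId stand-in are hypothetical, whereas the real code uses BucketingUtils.getBucketId.

    object SingleFileBucketCheck {
      // Stand-in for BucketingUtils.getBucketId; the "bucket-N-..." naming is made up for illustration.
      private val BucketedFile = """bucket-(\d+)-.*""".r

      def getBucketId(fileName: String): Option[Int] = fileName match {
        case BucketedFile(id) => Some(id.toInt)
        case _                => None
      }

      // One pass over all file names: true iff no bucket id appears more than once.
      def singleFilePerBucket(fileNames: Seq[String]): Boolean =
        fileNames.groupBy(getBucketId).forall { case (_, files) => files.length <= 1 }

      def main(args: Array[String]): Unit = {
        println(singleFilePerBucket(Seq("bucket-0-a.parquet", "bucket-1-b.parquet")))  // true
        println(singleFilePerBucket(Seq("bucket-0-a.parquet", "bucket-0-b.parquet")))  // false
      }
    }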

Contributor (Author):

For the sorting problem, one way to fix it would be to do what Hive does: create a single file per bucket. With any other approach, since there would be multiple files per bucket, one would have to globally sort them while reading. This would in a way be sub-optimal, because tables tend to be "write-once, read many", and spending more CPU once on the write path to generate a single file would be better.

When I came across this, I wondered why it was designed this way. I even posted about this to the dev group earlier today: http://apache-spark-developers-list.1001551.n3.nabble.com/Questions-about-bucketing-in-Spark-td18814.html

To give you some context, I am trying to drive adoption of Spark within Facebook. We have a lot of tables which would benefit from having full bucketing support, so my high-level goal is to get Spark's bucketing on par with Hive's in terms of features and compatibility.
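
To make the point about local vs. global sort order concrete, here is a tiny made-up illustration of why a single sorted file per bucket matters once a bucket contains more than one file:

    // Two hypothetical files that landed in the same bucket; each one is sorted on its own.
    val file1 = Seq(1, 3, 5)
    val file2 = Seq(2, 4, 6)

    // Scanning the bucket reads the files one after another, so the combined output
    // is only locally sorted, not globally sorted.
    val bucketScan = file1 ++ file2          // Seq(1, 3, 5, 2, 4, 6)
    assert(bucketScan != bucketScan.sorted)  // passes: the bucket as a whole is not sorted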

Contributor:

Yea, that's a good question. A single file per bucket looks more reasonable; it's more important to read a bucketed table fast than to write it fast. But how about data insertion? Does Hive support inserting into bucketed tables?

Contributor (Author):

@cloud-fan : Open source Hive allows INSERTing data into a bucketed table, but it breaks the guarantee of one file per bucket. We could do better in two ways:

  1. Disallow operations which would break the bucketing guarantee, OR
  2. Always preserve bucketing across all operations, which would mean rewriting the entire table at times (e.g. INSERT INTO) and more complication in the code.

I think the latter is a better model for the longer term, but we could start with the first one and iterate from there.


if (singleFilePartitions) {
// TODO: Currently Spark does not support writing columns sorted in descending order,
// so Ascending order is used here. This can be fixed in the future.
sortColumns.map(attribute => SortOrder(attribute, Ascending))
} else {
Nil
}
} else {
Nil
}
(partitioning, sortOrder)
} else {
(UnknownPartitioning(0), Nil)
}
case _ =>
(UnknownPartitioning(0), Nil)
}
}
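
As an aside, here is a minimal standalone sketch of the prefix rule described in the outputOrdering comment above, using plain column names in place of Attribute objects; the column names are illustrative only.

    object SortPrefixExample {
      // Columns actually read from the table, matching the example in the comment.
      val output: Seq[String] = Seq("col0", "col2", "col3")

      def toAttribute(colName: String): Option[String] = output.find(_ == colName)

      // Longest prefix of the declared sort columns that is present in the output.
      def sortPrefix(sortColumnNames: Seq[String]): Seq[String] =
        sortColumnNames.map(toAttribute).takeWhile(_.isDefined).map(_.get)

      def main(args: Array[String]): Unit = {
        println(sortPrefix(Seq("col0", "col1")))  // List(col0): col1 is not read, so the prefix stops there
        println(sortPrefix(Seq("col1", "col0")))  // List(): the leading sort column is not read, so no ordering
      }
    }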

@@ -187,8 +234,6 @@ case class FileSourceScanExec(
"InputPaths" -> relation.location.paths.mkString(", "))

private lazy val inputRDD: RDD[InternalRow] = {
val selectedPartitions = relation.location.listFiles(partitionFilters)

val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -237,7 +237,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
bucketSpecRight: Option[BucketSpec],
joinColumns: Seq[String],
shuffleLeft: Boolean,
shuffleRight: Boolean): Unit = {
shuffleRight: Boolean,
sortLeft: Boolean = true,
sortRight: Boolean = true): Unit = {
withTable("bucketed_table1", "bucketed_table2") {
def withBucket(
writer: DataFrameWriter[Row],
@@ -247,6 +249,15 @@
spec.numBuckets,
spec.bucketColumnNames.head,
spec.bucketColumnNames.tail: _*)

if (spec.sortColumnNames.nonEmpty) {
writer.sortBy(
spec.sortColumnNames.head,
spec.sortColumnNames.tail: _*
)
} else {
writer
}
}.getOrElse(writer)
}

@@ -267,12 +278,21 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]

// check existence of shuffle
assert(
joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
assert(
joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")

// check existence of sort
assert(
joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
assert(
joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
}
}
}
@@ -321,6 +341,45 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("avoid shuffle and sort when bucket and sort columns are join keys") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
testBucketing(
bucketSpec, bucketSpec, Seq("i", "j"),
shuffleLeft = false, shuffleRight = false,
sortLeft = false, sortRight = false
)
}

test("avoid shuffle and sort when sort columns are a super set of join keys") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
testBucketing(
bucketSpec1, bucketSpec2, Seq("i"),
shuffleLeft = false, shuffleRight = false,
sortLeft = false, sortRight = false
)
}

test("only sort one side when sort columns are different") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
testBucketing(
bucketSpec1, bucketSpec2, Seq("i", "j"),
shuffleLeft = false, shuffleRight = false,
sortLeft = false, sortRight = true
)
}

test("only sort one side when sort columns are same but their ordering is different") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
testBucketing(
bucketSpec1, bucketSpec2, Seq("i", "j"),
shuffleLeft = false, shuffleRight = false,
sortLeft = false, sortRight = true
)
}

test("avoid shuffle when grouping keys are equal to bucket keys") {
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table")