@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession, SQLContext}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,72 @@ case class FileSourceScanExec(
false
}

override val outputPartitioning: Partitioning = {
@transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)

override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
relation.bucketSpec
} else {
None
}
bucketSpec.map { spec =>
val numBuckets = spec.numBuckets
val bucketColumns = spec.bucketColumnNames.flatMap { n =>
output.find(_.name == n)
}
if (bucketColumns.size == spec.bucketColumnNames.size) {
HashPartitioning(bucketColumns, numBuckets)
} else {
UnknownPartitioning(0)
}
}.getOrElse {
UnknownPartitioning(0)
bucketSpec match {
case Some(spec) =>
// For bucketed columns:
// -----------------------
// `HashPartitioning` would be used only when:
// 1. ALL the bucketing columns are being read from the table
//
// For sorted columns:
// ---------------------
// Sort ordering should be used when ALL these criteria match:
// 1. `HashPartitioning` is being used
// 2. A prefix (or all) of the sort columns are being read from the table.
//
// Sort ordering would be over the prefix subset of `sort columns` being read
// from the table.
// eg.
// Assume (col0, col2, col3) are the columns read from the table
// If sort columns are (col0, col1), then sort ordering would be considered as (col0)
// If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
// above

def toAttribute(colName: String): Option[Attribute] =
output.find(_.name == colName)

val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
if (bucketColumns.size == spec.bucketColumnNames.size) {
val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
val sortColumns =
spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)

val sortOrder = if (sortColumns.nonEmpty) {
// In case of bucketing, it's possible to have multiple files belonging to the
// same bucket in a given relation. Each of these files is locally sorted,
// but those files combined together are not globally sorted. Given that,
// the RDD partition will not be sorted even if the relation has sort columns set.
// The current solution is to check whether all the buckets have a single file in them.

val files = selectedPartitions.flatMap(partition => partition.files)
val bucketToFilesGrouping =
files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
Contributor:

Listing files and grouping by bucket id can be expensive if there are a lot of files. What's worse, we will do it again in createBucketedReadRDD.

Instead of doing this, I'd like to fix the sorting problem for bucketed tables first; then we don't need to scan file names to get the outputOrdering.

Contributor Author:

For the sorting problem, one way to fix it would be to do what Hive does: create a single file per bucket. With any other approach there would be multiple files per bucket, so one would have to globally sort them while reading. That would be sub-optimal, because tables tend to be "write-once, read many", and spending more CPU once on the write path to generate a single file would be better.

When I came across this, I wondered why it was designed this way. I even posted about this to the dev group earlier today: http://apache-spark-developers-list.1001551.n3.nabble.com/Questions-about-bucketing-in-Spark-td18814.html

To give you some context, I am trying to drive adoption of Spark within Facebook. We have a lot of tables which would benefit from having full bucketing support, so my high-level goal is to get Spark's bucketing on par with Hive's in terms of features and compatibility.

Contributor:

Yea, that's a good question. A single file per bucket looks more reasonable; it's more important to read a bucketed table fast than to write it fast. But how about data insertion? Does Hive support inserting into a bucketed table?

Contributor Author:

@cloud-fan: Open source Hive allows INSERTing data into a bucketed table, but it breaks the guarantee of one file per bucket. We could do better in two ways:

  1. Disallow operations which would break the bucketing guarantee, OR
  2. Always preserve bucketing across all operations, which would mean rewriting the entire table at times (e.g. INSERT INTO) and more complication in the code.

I think the latter is a better model for the longer term, but we could start with the first one and iterate on it.


if (singleFilePartitions) {
// TODO Currently Spark does not support writing columns sorted in descending order,
// so use Ascending order here. This can be fixed in the future.
sortColumns.map(attribute => SortOrder(attribute, Ascending))
} else {
Nil
}
} else {
Nil
}
(partitioning, sortOrder)
} else {
(UnknownPartitioning(0), Nil)
}
case _ =>
(UnknownPartitioning(0), Nil)
}
}
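
To illustrate the review discussion above about multiple files per bucket: a minimal, hypothetical sketch (made-up values, plain Scala collections rather than real bucket files) of why concatenating two locally sorted files does not give a globally sorted bucket, which is what the singleFilePartitions check guards against.

  // Hypothetical bucket 0 stored as two files, each written in sorted order.
  val bucket0FileA = Seq(1, 3, 5) // locally sorted
  val bucket0FileB = Seq(2, 4, 6) // locally sorted
  // Reading the bucket concatenates the files into one RDD partition:
  val bucket0AsRead = bucket0FileA ++ bucket0FileB // Seq(1, 3, 5, 2, 4, 6)
  // The concatenation is not sorted, so a sort ordering can only be reported
  // when every bucket has at most one file (or the files are merge-sorted at read time).
  assert(bucket0AsRead != bucket0AsRead.sorted)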

@@ -187,8 +234,6 @@ case class FileSourceScanExec(
"InputPaths" -> relation.location.paths.mkString(", "))

private lazy val inputRDD: RDD[InternalRow] = {
val selectedPartitions = relation.location.listFiles(partitionFilters)

val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
46 changes: 46 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -21,6 +21,7 @@ import scala.language.existentials

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -61,6 +62,51 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
}

test("SPARK-15453 : Sort Merge join on bucketed + sorted tables should not add `sort` step " +
"if the join predicates are subset of the sorted columns of the tables") {
withTable("SPARK_15453_table_a", "SPARK_15453_table_b") {
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
val df =
(0 until 8)
.map(i => (i, i * 2, i.toString))
.toDF("i", "j", "k")
.coalesce(1)
df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_a")
Contributor:

Should we bucket the table by i, j and sort it by j, k, to reflect the test name ("if the join predicates are a subset of the sorted columns")?

Contributor Author:

For SMB to happen, bucketing columns == sort columns == join keys. My naming for the test case was wrong. I have deleted this test, as I found a better place to add it.

df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_b")
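// (Illustrative note, not part of the original test.) As discussed in the review
// comment above, skipping the sort before the sort-merge join relies on the
// bucketing columns, sort columns and join keys all being ("j", "k"). A variant
// such as bucketBy(4, "i", "j").sortBy("j", "k") would hash-partition the data
// on (i, j), so a join on (j, k) would still need an exchange and a sort.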

val query = """
|SELECT *
|FROM
| SPARK_15453_table_a a
|JOIN
| SPARK_15453_table_b b
|ON a.j=b.j AND
| a.k=b.k
""".stripMargin
val joinDF = sql(query)

val executedPlan = joinDF.queryExecution.executedPlan
val operators = executedPlan.collect {
Contributor:

I'd like to collect twice here: the first collect is used to prove we do use sort-merge join, and the second collect is used to prove we don't sort.

Contributor Author:

L93 and L94 were ensuring that there is only one SortMergeJoinExec and no sort operations. Anyway, I have deleted this test.

case j: SortMergeJoinExec => j
case j: SortExec => j
}
assert(operators.size === 1)
assert(operators.head.getClass == classOf[SortMergeJoinExec])

checkAnswer(joinDF,
Row(0, 0, "0", 0, 0, "0") ::
Row(1, 2, "1", 1, 2, "1") ::
Row(2, 4, "2", 2, 4, "2") ::
Row(3, 6, "3", 3, 6, "3") ::
Row(4, 8, "4", 4, 8, "4") ::
Row(5, 10, "5", 5, 10, "5") ::
Row(6, 12, "6", 6, 12, "6") ::
Row(7, 14, "7", 7, 14, "7") :: Nil)
}
}
}
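
A minimal sketch of the "collect twice" structure suggested in the review comment above (a hypothetical rewrite of the assertions, not the final test): the first collect proves the join is planned as a sort-merge join, the second proves that no extra sort was inserted on top of the bucketed, sorted relations.

  val smjOperators = executedPlan.collect { case j: SortMergeJoinExec => j }
  assert(smjOperators.size === 1) // exactly one sort-merge join
  val sortOperators = executedPlan.collect { case s: SortExec => s }
  assert(sortOperators.isEmpty) // and no SortExec anywhere in the plan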


test("join operator selection") {
spark.sharedState.cacheManager.clearCache()
