[SPARK-15453] [SQL] FileSourceScanExec to extract outputOrdering information
#14864
@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession, SQLContext}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,57 @@ case class FileSourceScanExec(
    false
  }

  override val outputPartitioning: Partitioning = {
  @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)

  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
      relation.bucketSpec
    } else {
      None
    }
    bucketSpec.map { spec =>
      val numBuckets = spec.numBuckets
      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
        output.find(_.name == n)
      }
      if (bucketColumns.size == spec.bucketColumnNames.size) {
        HashPartitioning(bucketColumns, numBuckets)
      } else {
        UnknownPartitioning(0)
      }
    }.getOrElse {
      UnknownPartitioning(0)
    bucketSpec match {
      case Some(spec) =>
        val numBuckets = spec.numBuckets

        def toAttribute(colName: String, columnType: String): Attribute =
          output.find(_.name == colName).getOrElse {
            throw new AnalysisException(s"Could not find $columnType column $colName for " +
              s"relation ${metastoreTableIdentifier.get.toString} in its existing " +
              s"columns : (${output.map(_.name).mkString(", ")})")
          }

        val bucketColumns = spec.bucketColumnNames.flatMap(n => Some(toAttribute(n, "bucketing")))
        if (bucketColumns.size == spec.bucketColumnNames.size) {
          val partitioning = HashPartitioning(bucketColumns, numBuckets)

          val sortOrder = if (spec.sortColumnNames.nonEmpty) {
            // In case of bucketing, its possible to have multiple files belonging to the
            // same bucket in a given relation. Each of these files are locally sorted
            // but those files combined together are not globally sorted. Given that,
            // the RDD partition will not be sorted even if the relation has sort columns set
            // Current solution is to check if all the buckets have a single file in it

            val files = selectedPartitions.flatMap(partition => partition.files)
            val bucketToFilesGrouping =
              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
Contributor

Listing files and grouping them by bucket id can be expensive if there are a lot of files. What's worse, we will do it again in the read path. Instead of doing this, I'd like to fix the sorting problem for bucketed tables first; then we don't need to scan file names to get the output ordering.

Contributor (Author)

For the sorting problem, one way to fix it would be to do what Hive does: create a single file per bucket. With any other approach, since there would be multiple files per bucket, one would have to globally sort them while reading, which would be sub-optimal: tables tend to be "write once, read many", so spending more CPU once in the write path to generate a single file is the better trade-off. When I came across this, I wondered why it was designed this way; I even posted about it to the dev list earlier today: http://apache-spark-developers-list.1001551.n3.nabble.com/Questions-about-bucketing-in-Spark-td18814.html

To give you some context, I am trying to drive adoption of Spark within Facebook. We have a lot of tables which would benefit from full bucketing support, so my high-level goal is to get Spark's bucketing on par with Hive's in terms of features and compatibility.

Contributor

Yea, that's a good question; a single file per bucket looks more reasonable, since it's more important to read a bucketed table fast than to write it fast. But how about data insertion? Does Hive support inserting into a bucketed table?

Contributor (Author)

@cloud-fan: Open source Hive allows INSERTing data into a bucketed table, but doing so breaks the guarantee of one file per bucket. We could do better in two ways:

I think the latter is the better model for the longer term, but we could start with the first one and improve it iteratively.
            if (singleFilePartitions) {
              // TODO Currently Spark does not support writing columns sorting in descending order
              // so using Ascending order. This can be fixed in future
              spec.sortColumnNames.map(c => SortOrder(toAttribute(c, "sort"), Ascending))
            } else {
              Nil
            }
          } else {
            Nil
          }
          (partitioning, sortOrder)
        } else {
          (UnknownPartitioning(0), Nil)
        }
      case _ =>
        (UnknownPartitioning(0), Nil)
    }
  }
@@ -187,8 +219,6 @@ case class FileSourceScanExec(
      "InputPaths" -> relation.location.paths.mkString(", "))

  private lazy val inputRDD: RDD[InternalRow] = {
    val selectedPartitions = relation.location.listFiles(partitionFilters)

    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
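The gist of the new ordering logic is that sort information can only survive when every bucket is backed by at most one file. Below is a minimal, self-contained sketch of that check; the file-name pattern and the `extractBucketId` helper are illustrative stand-ins for Spark's own `BucketingUtils.getBucketId` used in the diff above, not the real implementation.

```scala
// Sketch only: decide whether per-bucket sort order can be propagated, given the
// names of the data files. Assumes (illustratively) that bucket files carry an
// "_<bucketId>" suffix before the extension, e.g. "part-00000_00001.parquet".
object SortPropagationSketch {
  private val bucketIdPattern = """.*_(\d+)(?:\..*)?$""".r

  // Stand-in for BucketingUtils.getBucketId: extract the bucket id from a file name.
  def extractBucketId(fileName: String): Option[Int] = fileName match {
    case bucketIdPattern(id) => Some(id.toInt)
    case _ => None
  }

  // Sort order can only be trusted when every bucket has at most one file: several
  // locally sorted files concatenated into one bucket are not globally sorted.
  def canPropagateSortOrder(fileNames: Seq[String]): Boolean =
    fileNames.groupBy(extractBucketId).forall { case (_, files) => files.length <= 1 }
}

// Bucket 1 is backed by two files here, so the ordering must be dropped:
// SortPropagationSketch.canPropagateSortOrder(Seq(
//   "part-00000_00001.parquet",
//   "part-00001_00001.parquet",
//   "part-00002_00002.parquet"))   // => false
```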
@@ -21,6 +21,7 @@ import scala.language.existentials

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -61,6 +62,51 @@ class JoinSuite extends QueryTest with SharedSQLContext {
    }
  }

  test("SPARK-15453 : Sort Merge join on bucketed + sorted tables should not add `sort` step " +
    "if the join predicates are subset of the sorted columns of the tables") {
    withTable("SPARK_15453_table_a", "SPARK_15453_table_b") {
      withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
        val df =
          (0 until 8)
            .map(i => (i, i * 2, i.toString))
            .toDF("i", "j", "k")
            .coalesce(1)
        df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_a")
        df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_b")

        val query = """
          |SELECT *
          |FROM
          | SPARK_15453_table_a a
          |JOIN
          | SPARK_15453_table_b b
          |ON a.j=b.j AND
          | a.k=b.k
        """.stripMargin
        val joinDF = sql(query)

        val executedPlan = joinDF.queryExecution.executedPlan
        val operators = executedPlan.collect {
          case j: SortMergeJoinExec => j
          case j: SortExec => j
        }
        assert(operators.size === 1)
        assert(operators.head.getClass == classOf[SortMergeJoinExec])

        checkAnswer(joinDF,
          Row(0, 0, "0", 0, 0, "0") ::
          Row(1, 2, "1", 1, 2, "1") ::
          Row(2, 4, "2", 2, 4, "2") ::
          Row(3, 6, "3", 3, 6, "3") ::
          Row(4, 8, "4", 4, 8, "4") ::
          Row(5, 10, "5", 5, 10, "5") ::
          Row(6, 12, "6", 6, 12, "6") ::
          Row(7, 14, "7", 7, 14, "7") :: Nil)
      }
    }
  }

  test("join operator selection") {
    spark.sharedState.cacheManager.clearCache()
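Outside the test harness, a quick way to eyeball the same property is to look at the physical plan directly. A sketch for a spark-shell session (where `spark` is in scope), assuming the two bucketed and sorted tables created in the test above already exist and `spark.sql.autoBroadcastJoinThreshold` is set to 0:

```scala
// With outputOrdering propagated from the bucket spec, the plan should show
// SortMergeJoin sitting directly on top of the two file scans, with no Sort
// (and no Exchange) nodes in between.
val joined = spark.sql(
  """SELECT *
    |FROM SPARK_15453_table_a a
    |JOIN SPARK_15453_table_b b
    |ON a.j = b.j AND a.k = b.k
  """.stripMargin)

joined.explain()
```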
Contributor

My concern is that, if a table has 3 columns i, j, k, and is bucketed by i and j but sorted by j and k: now we wanna read i and j from this table, then the generated RDD should be bucketed, i.e. the number of partitions of this RDD should be equal to the number of buckets. For each RDD partition, can we treat it as sorted by j?

Contributor (Author)

I see what you meant earlier. I have made changes to the PR to follow this:

For bucketed columns: HashPartitioning would be used only when:

For sorted columns: sort ordering should be used when ALL these criteria match:
- HashPartitioning is being used

Sort ordering would be over the prefix subset of the sort columns being read from the table, e.g. assume (col0, col2, col3) are the columns read from the table
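A small sketch of the prefix rule described in this reply; the helper name and signature are illustrative, not Spark API:

```scala
// The sort order that can be reported is the longest prefix of the table's sort
// columns whose columns are all part of the projection being read.
def usableSortPrefix(sortColumns: Seq[String], readColumns: Set[String]): Seq[String] =
  sortColumns.takeWhile(readColumns.contains)

// For the table discussed above, bucketed by (i, j) and sorted by (j, k):
usableSortPrefix(Seq("j", "k"), Set("i", "j"))       // Seq("j")      -- k is not read
usableSortPrefix(Seq("j", "k"), Set("i", "j", "k"))  // Seq("j", "k")
```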