[SPARK-32986][SQL] Add bucketed scan info in query plan of data source v1#33698

Closed
c21 wants to merge 2 commits into apache:master from c21:scan-v1

Conversation

@c21 c21 commented Aug 11, 2021

What changes were proposed in this pull request?

As a follow-up to the discussion in #29804 (comment): currently the query plan for the data source v1 scan operator, FileSourceScanExec, has no information indicating whether the table is read as a bucketed table, and if not, what the reason is. Adding this info to the FileSourceScanExec physical query plan output helps users and developers understand query plans more easily, without spending a lot of time debugging why a table is not read as a bucketed table.

Why are the changes needed?

Helps users and developers debug query plans for bucketed tables.

Does this PR introduce any user-facing change?

Yes. Bucketed information is added to the physical query plan when reading a bucketed table.
Note that when reading a non-bucketed table, the query plan stays the same and nothing is changed.

Example:

Seq((1, 2), (2, 3)).toDF("i", "j").write.bucketBy(8, "i").saveAsTable("t1")
Seq(2, 3).toDF("i").write.bucketBy(8, "i").saveAsTable("t2")
val df1 = spark.table("t1")
val df2 = spark.table("t2")
df1.join(df2, df1("i") === df2("i")).explain()
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [i#20], [i#24], Inner
   :- Sort [i#20 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(i#20)
   :     +- FileScan parquet default.t1[i#20,j#21] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#20)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
   +- Sort [i#24 ASC NULLS FIRST], false, 0
      +- Filter isnotnull(i#24)
         +- FileScan parquet default.t2[i#24] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#24)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
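The reason strings asserted in the review threads below ("disabled by configuration", "disabled by query planner", "bucket column(s) not read") suggest the scan node renders its bucketing state as a single metadata entry. A minimal, hypothetical Scala sketch of that rendering, for illustration only — the real logic lives inside FileSourceScanExec, and the names here are not Spark's API:

```scala
// Hypothetical model of a scan's bucketing state (illustrative, not Spark code).
sealed trait BucketedScanState
case object BucketedScan extends BucketedScanState
case class NonBucketedScan(reason: Option[String]) extends BucketedScanState

object BucketedScanInfo {
  // Render the state the way the "Bucketed" entry appears in the explain output above.
  def render(state: BucketedScanState): String = state match {
    case BucketedScan                  => "Bucketed: true"
    case NonBucketedScan(Some(reason)) => s"Bucketed: false ($reason)"
    case NonBucketedScan(None)         => "Bucketed: false"
  }
}
```

For example, `BucketedScanInfo.render(NonBucketedScan(Some("disabled by configuration")))` produces `Bucketed: false (disabled by configuration)`, matching the string checked in the unit tests quoted in the review comments.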

How was this patch tested?

Added a unit test in ExplainSuite.scala.

@github-actions github-actions bot added the SQL label Aug 11, 2021
Contributor Author

c21 commented Aug 11, 2021

@cloud-fan and @maropu could you help take a look when you have time? Thanks!

SparkQA commented Aug 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46813/

SparkQA commented Aug 11, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46813/

@dongjoon-hyun dongjoon-hyun (Member) left a comment

It looks good to me.

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  checkKeywordsExistsInExplain(
    df1.join(df2, df1("i") === df2("i")),
    "Bucketed: true" :: Nil: _*)
Contributor

nit: "Bucketed: true" :: Nil: _* -> "Bucketed: true"?

Contributor Author

yeah, updated for this and other places.

withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
  checkKeywordsExistsInExplain(
    df1.join(df2, df1("i") === df2("i")),
    "Bucketed: false (disabled by configuration)" :: Nil: _*)
Contributor

ditto

"Bucketed: false (disabled by configuration)" :: Nil: _*)
}

checkKeywordsExistsInExplain(df1, "Bucketed: false (disabled by query planner)" :: Nil: _*)
Contributor

ditto


checkKeywordsExistsInExplain(
  df1.select("j"),
  "Bucketed: false (bucket column(s) not read)" :: Nil: _*)
Contributor

ditto

SparkQA commented Aug 11, 2021

Test build #142306 has finished for PR 33698 at commit b087cd3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46856/

SparkQA commented Aug 11, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46856/

SparkQA commented Aug 12, 2021

Test build #142348 has finished for PR 33698 at commit 2d42d37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

dongjoon-hyun (Member) commented

Merged to master for Apache Spark 3.3.0 according to the issue type, Improvement.

dongjoon-hyun (Member) commented

Thank you, @c21 and @cloud-fan .

Contributor Author

c21 commented Aug 12, 2021

Thank you @dongjoon-hyun and @cloud-fan for review!

@c21 c21 deleted the scan-v1 branch August 12, 2021 19:37