
Conversation

@huaxingao
Contributor

@huaxingao huaxingao commented Oct 18, 2021

What changes were proposed in this pull request?

Push down Sample to the data source for better performance. If Sample is pushed down, it is removed from the logical plan, so it is no longer applied on the Spark side.

Current plan without Sample push down:

== Parsed Logical Plan ==
'Project [*]
+- 'Sample 0.0, 0.8, false, 157
   +- 'UnresolvedRelation [postgresql, new_table], [], false

== Analyzed Logical Plan ==
col1: int, col2: int
Project [col1#163, col2#164]
+- Sample 0.0, 0.8, false, 157
   +- SubqueryAlias postgresql.new_table
      +- RelationV2[col1#163, col2#164] new_table

== Optimized Logical Plan ==
Sample 0.0, 0.8, false, 157
+- RelationV2[col1#163, col2#164] new_table

== Physical Plan ==
*(1) Sample 0.0, 0.8, false, 157
+- *(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@6dde4769 [col1#163,col2#164] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], PushedLimit: [], PushedSample: TABLESAMPLE  0.0 0.8 false 157, ReadSchema: struct<col1:int,col2:int>

Plan after Sample push down:

== Parsed Logical Plan ==
'Project [*]
+- 'Sample 0.0, 0.8, false, 187
   +- 'UnresolvedRelation [postgresql, new_table], [], false

== Analyzed Logical Plan ==
col1: int, col2: int
Project [col1#163, col2#164]
+- Sample 0.0, 0.8, false, 187
   +- SubqueryAlias postgresql.new_table
      +- RelationV2[col1#163, col2#164] new_table

== Optimized Logical Plan ==
RelationV2[col1#163, col2#164] new_table

== Physical Plan ==
*(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@65b57543 [col1#163,col2#164] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], PushedLimit: [], PushedSample: TABLESAMPLE  0.0 0.8 false 187, ReadSchema: struct<col1:int,col2:int>

The new interface is implemented for JDBC as a proof of concept and for end-to-end testing. TABLESAMPLE is not supported by all databases; in this PR it is implemented for the PostgreSQL dialect.
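For PostgreSQL, the dialect translates the pushed-down sample into a TABLESAMPLE clause. Below is a minimal, self-contained sketch of that translation (illustrative only, not the exact code in this PR; SampleInfo stands in for the TableSample expression, and BERNOULLI is just one sampling method PostgreSQL accepts):

object PostgresTableSampleSketch {
  // Stand-in for the TableSample expression this PR introduces
  // (methodName, lowerBound, upperBound, withReplacement, seed).
  final case class SampleInfo(lowerBound: Double, upperBound: Double, seed: Long)

  // Render a pushed sample as PostgreSQL syntax, or an empty string if none was pushed.
  def getTableSample(sample: Option[SampleInfo]): String =
    sample
      .map(s => s"TABLESAMPLE BERNOULLI (${(s.upperBound - s.lowerBound) * 100}) REPEATABLE (${s.seed})")
      .getOrElse("")

  def main(args: Array[String]): Unit = {
    // Sample 0.0, 0.8, false, 157 from the plan above renders roughly as:
    // TABLESAMPLE BERNOULLI (80.0) REPEATABLE (157)
    println(getTableSample(Some(SampleInfo(0.0, 0.8, 157L))))
  }
}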

Why are the changes needed?

Reduce IO and improve performance.
For a sample query such as SELECT * FROM t TABLESAMPLE (1 PERCENT), Spark currently retrieves all the data from the table and then returns 1% of the rows. Pushing Sample down to the data source dramatically reduces the amount of data transferred and improves performance.
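As a concrete illustration (catalog, table, and seed are taken from the example plans above; the SQL and API calls below are a hedged sketch, not test code from this PR):

// Run in spark-shell (an existing SparkSession `spark` is assumed).
// REPEATABLE(seed) is the syntax addition split out into the follow-up PR mentioned below.
val df = spark.sql(
  "SELECT * FROM postgresql.new_table TABLESAMPLE (80 PERCENT) REPEATABLE (157)")
// Roughly equivalent DataFrame form: spark.table("postgresql.new_table").sample(0.8, 157)
df.explain(true) // with push down, no Sample node remains in the optimized plan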

Does this PR introduce any user-facing change?

Yes. A new interface, SupportsPushDownTableSample, is added.

How was this patch tested?

New test

@SparkQA

SparkQA commented Oct 18, 2021

Test build #144354 has finished for PR 34311 at commit aaca7fb.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48832/

@SparkQA

SparkQA commented Oct 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48833/

@SparkQA

SparkQA commented Oct 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48832/

@SparkQA

SparkQA commented Oct 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48833/

@SparkQA

SparkQA commented Oct 18, 2021

Test build #144356 has finished for PR 34311 at commit eb176f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@viirya viirya Oct 18, 2021

Why is this not on ScanBuilder like SupportsPushDownFilters and the others? This seems like an inconsistent API design.

Contributor Author

I made both SupportsPushDownLimit and SupportsPushDownTableSample extend Scan because, by the time we push down Limit or Sample, the Scan has already been created: the child inside Limit or Sample is a DataSourceV2ScanRelation.
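A rough sketch of the shape being described, purely for illustration; Scan and TableSample below are local stand-ins for the real connector types, and the method name pushTableSample is an assumption rather than the exact API in this PR:

// Hedged sketch: a Scan-level mix-in for accepting a pushed-down table sample.
trait Scan
trait TableSample // methodName, lowerBound, upperBound, withReplacement, seed

trait SupportsPushDownTableSample extends Scan {
  // Returns true if the data source accepts the sample and will apply it itself.
  def pushTableSample(sample: TableSample): Boolean
}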

@huaxingao
Contributor Author

cc @cloud-fan

@SparkQA

SparkQA commented Oct 20, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48906/

@SparkQA

SparkQA commented Oct 20, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48907/

@SparkQA

SparkQA commented Oct 20, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48907/

@SparkQA

SparkQA commented Oct 20, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48906/

@SparkQA

SparkQA commented Oct 20, 2021

Test build #144433 has finished for PR 34311 at commit 0d9158a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 20, 2021

Test build #144434 has finished for PR 34311 at commit 09e4e9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


Is this groupBy change mixed in here, even though it is not related to Sample push down?

Contributor Author

Right, it's not related to Sample push down.


I'm not grasping all the context; does this work require the underlying data source to support a sample expression?

Contributor Author

Yes, it requires underlying support. Sample is not part of the ANSI SQL standard, so not all data sources support it.


What about the treatment for the CSV and Parquet formats? Will it make a difference when sampling is pushed down to the scan? Would that be supported?

Contributor Author

It should work if you make the Parquet or CSV scan implement the SupportsPushDownTableSample interface, but I am not sure how Parquet or CSV would handle sampling.

@SparkQA

SparkQA commented Oct 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49146/

@SparkQA

SparkQA commented Oct 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49148/

@SparkQA

SparkQA commented Oct 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49146/

@SparkQA

SparkQA commented Oct 28, 2021

Test build #144679 has finished for PR 34311 at commit 1ee1105.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 28, 2021

Test build #144677 has finished for PR 34311 at commit bd947bb.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49148/

Contributor

@dohongdayi dohongdayi left a comment

Finishing my review

Contributor

Would Option[TableSample] be a better type, since a sample might not be provided?

sql(s"INSERT INTO TABLE $catalogName.new_table values (15, 16)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (17, 18)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (19, 20)")
if (supportsTableSample) {
Contributor

If supportsTableSample is false, there would be no need to create the test table or insert test data at all.
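One way to apply this suggestion, reusing the helpers already shown in the snippet above (a hedged sketch only; the CREATE TABLE statement and the elided assertions are illustrative):

// Guard the whole setup behind the capability flag, not just the assertions.
if (supportsTableSample) {
  sql(s"CREATE TABLE $catalogName.new_table (col1 INT, col2 INT)")
  sql(s"INSERT INTO TABLE $catalogName.new_table VALUES (15, 16), (17, 18), (19, 20)")
  // ... TABLESAMPLE assertions ...
}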

Set.empty,
Set.empty,
None,
null,
Contributor

I think this should be None here.

filters: Set[Filter],
handledFilters: Set[Filter],
aggregation: Option[Aggregation],
sample: Option[TableSample],
Contributor

There are so many pushdown-related parameters; would it be better to wrap them in a parent case class?
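One possible shape for such a wrapper, using the types from the parameter list shown above (a hedged sketch; the name PushedDownOperators is only a suggestion, not code from this PR):

// Bundle the pushdown results so the Scan-building parameter list stops growing.
case class PushedDownOperators(
    filters: Set[Filter],
    handledFilters: Set[Filter],
    aggregation: Option[Aggregation],
    sample: Option[TableSample])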


override def getTableSample(sample: Option[TableSample]): String = {
if (sample.nonEmpty) {
val method = if (sample.get.methodName.isEmpty) {
Contributor

If many of the dialects have default sample methods, would Option[String] be a better type for TableSample.methodName?

withReplacement: Boolean,
seed: Long) extends TableSample {

override def describe(): String = s"$methodName $lowerBound $lowerBound $upperBound" +
Contributor

Two lowerBounds?

new AnalysisException(message, cause = Some(e))
}

def supportsTableSample: Boolean = false
Contributor

Would supportsTableSample() need a methodName: Option[String] parameter, since a dialect might not support the specified sample method, or might not support any sample method at all?

case sample @ Sample(_, _, _, _, child) => child match {
case ScanOperation(_, _, sHolder: ScanBuilderHolder) =>
val tableSample = LogicalExpressions.tableSample(
"",
Contributor

I didn't see any possible value of TableSample.methodName other than "" here, so I'm not sure TableSample.methodName is important?


sample
: TABLESAMPLE '(' sampleMethod? ')'
: TABLESAMPLE '(' sampleMethod? ')' (REPEATABLE '('seed=INTEGER_VALUE')')?
Contributor

Can we make a separate PR for this SQL syntax change?

Contributor Author

Submitted #34442 for the syntax change.

@huaxingao
Contributor Author

It's too much work to rebase. I have submitted a new PR #34451 and will close this one.
@dohongdayi I have addressed your comments in the new PR. Thanks for reviewing!

@huaxingao huaxingao closed this Oct 31, 2021
@huaxingao huaxingao deleted the pushdownSample branch October 31, 2021 16:04
@dohongdayi
Contributor

It's too much work to rebase. I have submitted a new PR #34451 and will close this one. @dohongdayi I have addressed your comments in the new PR. Thanks for reviewing!

NP
