8 changes: 8 additions & 0 deletions docs/sql-data-sources-jdbc.md
@@ -246,6 +246,14 @@ logging into the data sources.
<td>read</td>
</tr>

<tr>
<td><code>pushDownTableSample</code></td>
<td><code>false</code></td>
<td>
The option to enable or disable TABLESAMPLE push-down into the JDBC data source. The default value is false, in which case Spark does not push down TABLESAMPLE to the JDBC data source. If set to true, TABLESAMPLE is pushed down to the JDBC data source.
</td>
<td>read</td>
</tr>
<tr>
<td><code>keytab</code></td>
<td>(none)</td>
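For illustration, a minimal sketch of enabling this option on a JDBC read (the connection URL, table name, and the `spark` SparkSession are placeholders, not taken from this PR):

// Hypothetical usage: ask Spark to push TABLESAMPLE down to the database.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")
  .option("dbtable", "public.events")
  .option("pushDownTableSample", "true")
  .load()
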
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -491,6 +491,7 @@ Below is a list of all the keywords in Spark SQL.
|REGEXP|non-reserved|non-reserved|not a keyword|
|RENAME|non-reserved|non-reserved|non-reserved|
|REPAIR|non-reserved|non-reserved|non-reserved|
|REPEATABLE|non-reserved|non-reserved|non-reserved|
|REPLACE|non-reserved|non-reserved|non-reserved|
|RESET|non-reserved|non-reserved|non-reserved|
|RESPECT|non-reserved|non-reserved|non-reserved|
@@ -49,6 +49,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest
override def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.catalog.postgresql", classOf[JDBCTableCatalog].getName)
.set("spark.sql.catalog.postgresql.url", db.getJdbcUrl(dockerIp, externalPort))
.set("spark.sql.catalog.postgresql.pushDownTableSample", "true")
Review comment: I am not grasping all the context. Does this work require the underlying data source to support sample expressions?

Contributor (author): Yes, it requires the underlying support. Sample is not part of the ANSI SQL standard, so not all data sources support it.

Review comment: What about the CSV and Parquet formats? Will it make a difference when sampling is pushed down to the scan, and would that be supported?

Contributor (author): It should work if you make the Parquet scan or CSV scan implement the SupportsPushDownTableSample interface, but I am not sure how Parquet or CSV handles sampling.

override def dataPreparation(conn: Connection): Unit = {}

override def testUpdateColumnType(tbl: String): Unit = {
@@ -75,4 +77,6 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest
val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
}

override def supportsTableSample: Boolean = true
}
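
With the catalog configured as above, a TABLESAMPLE query against a Postgres-backed table can be evaluated by the database itself. A minimal sketch (the table name is illustrative and assumes the suite's catalog setup):

// Hypothetical query through the "postgresql" catalog configured above;
// with pushDownTableSample=true the sampling runs inside Postgres.
spark.sql("SELECT * FROM postgresql.public.events TABLESAMPLE (50 PERCENT)").show()
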
@@ -23,6 +23,7 @@ import org.apache.log4j.Level

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.catalyst.plans.logical.Sample
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
@@ -284,4 +285,32 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite
testIndexUsingSQL(s"$catalogName.new_table")
}
}

def supportsTableSample: Boolean = false

test("Test TABLESAMPLE") {
withTable(s"$catalogName.new_table") {
sql(s"CREATE TABLE $catalogName.new_table (col1 INT, col2 INT)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (1, 2)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (3, 4)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (5, 6)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (7, 8)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (9, 10)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (11, 12)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (13, 14)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (15, 16)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (17, 18)")
sql(s"INSERT INTO TABLE $catalogName.new_table values (19, 20)")
if (supportsTableSample) {
Contributor: If supportsTableSample is false, there would be no need to create the test table or insert test data at all.
val df = sql(s"SELECT * FROM $catalogName.new_table TABLESAMPLE (BUCKET 6 OUT OF 10)" +
s" REPEATABLE (12345)")
df.explain(true)
val sample = df.queryExecution.optimizedPlan.collect {
case s: Sample => s
}
assert(sample.isEmpty)
assert(df.collect().length <= 7)
}
}
}
}
@@ -674,7 +674,7 @@ joinCriteria
;

sample
: TABLESAMPLE '(' sampleMethod? ')'
: TABLESAMPLE '(' sampleMethod? ')' (REPEATABLE '(' seed=INTEGER_VALUE ')')?
Contributor: Can we make a separate PR for this SQL syntax change?

Contributor (author): Submitted #34442 for the syntax change.

;

sampleMethod
@@ -1194,6 +1194,7 @@ ansiNonReserved
| REFRESH
| RENAME
| REPAIR
| REPEATABLE
| REPLACE
| RESET
| RESPECT
@@ -1460,6 +1461,7 @@ nonReserved
| REFRESH
| RENAME
| REPAIR
| REPEATABLE
| REPLACE
| RESET
| RESPECT
@@ -1726,6 +1728,7 @@ REFERENCES: 'REFERENCES';
REFRESH: 'REFRESH';
RENAME: 'RENAME';
REPAIR: 'REPAIR';
REPEATABLE: 'REPEATABLE';
REPLACE: 'REPLACE';
RESET: 'RESET';
RESPECT: 'RESPECT';
@@ -190,4 +190,26 @@ public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering
public static SortOrder sort(Expression expr, SortDirection direction) {
return LogicalExpressions.sort(expr, direction, direction.defaultNullOrdering());
}

/**
* Create a tableSample expression.
*
* @param methodName the sample method name
* @param lowerBound the lower-bound of the sampling probability (usually 0.0)
* @param upperBound the upper-bound of the sampling probability
* @param withReplacement whether to sample with replacement
* @param seed the random seed
* @return a TableSample
*
* @since 3.3.0
*/
public static TableSample tableSample(
String methodName,
double lowerBound,
double upperBound,
boolean withReplacement,
long seed) {
return LogicalExpressions.tableSample(
methodName, lowerBound, upperBound, withReplacement, seed);
}
}
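
A short sketch of building a TableSample through this factory from Scala (the method name "BERNOULLI" and the bounds are illustrative values, not mandated by the API):

import org.apache.spark.sql.connector.expressions.{Expressions, TableSample}

// Sample roughly 10% of rows without replacement, using a fixed seed.
// The expected fraction sampled is upperBound - lowerBound.
val sample: TableSample =
  Expressions.tableSample("BERNOULLI", 0.0, 0.1, false, 12345L)
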
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

/**
* Represents a TableSample in the public expression API.
*
* @since 3.3.0
*/
@Experimental
public interface TableSample extends Expression {

/**
* Returns the sample method name.
*/
String methodName();

/**
* Returns the lower-bound of the sampling probability (usually 0.0).
*/
double lowerBound();

/**
* Returns the upper-bound of the sampling probability. The expected fraction sampled
* will be upperBound - lowerBound.
*/
double upperBound();

/**
* Returns whether to sample with replacement.
*/
boolean withReplacement();

/**
* Returns the random seed.
*/
long seed();
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.TableSample;

/**
* A mix-in interface for {@link Scan}. Data sources can implement this interface to
* push down SAMPLE.
*
* @since 3.3.0
*/
@Evolving
public interface SupportsPushDownTableSample extends ScanBuilder {

/**
* Pushes down SAMPLE to the data source.
*/
boolean pushTableSample(TableSample tableSample);
}
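
A sketch of how a connector's ScanBuilder might mix this interface in (MyScan and MyScanBuilder are hypothetical names; only the interface itself comes from this PR):

import org.apache.spark.sql.connector.expressions.TableSample
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownTableSample}
import org.apache.spark.sql.types.StructType

// Hypothetical scan that would translate the pushed sample into the
// source's native sampling clause when generating its query.
class MyScan(sample: Option[TableSample]) extends Scan {
  override def readSchema(): StructType = new StructType()
}

class MyScanBuilder extends ScanBuilder with SupportsPushDownTableSample {
  private var pushedSample: Option[TableSample] = None

  override def pushTableSample(tableSample: TableSample): Boolean = {
    // Return true only when the source can honor the sample; this
    // sketch accepts unconditionally.
    pushedSample = Some(tableSample)
    true
  }

  override def build(): Scan = new MyScan(pushedSample)
}
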
@@ -1180,29 +1180,35 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
*/
private def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
// Create a sampled plan if we need one.
def sample(fraction: Double): Sample = {
def sample(fraction: Double, seed: Long): Sample = {
// The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling
// function takes X PERCENT as the input and the range of X is [0, 100], we need to
// adjust the fraction.
val eps = RandomSampler.roundingEpsilon
validate(fraction >= 0.0 - eps && fraction <= 1.0 + eps,
s"Sampling fraction ($fraction) must be on interval [0, 1]",
ctx)
Sample(0.0, fraction, withReplacement = false, (math.random * 1000).toInt, query)
Sample(0.0, fraction, withReplacement = false, seed, query)
}

if (ctx.sampleMethod() == null) {
throw QueryParsingErrors.emptyInputForTableSampleError(ctx)
}

val seed = if (ctx.seed != null) {
ctx.seed.getText.toLong
} else {
(math.random * 1000).toLong
}

ctx.sampleMethod() match {
case ctx: SampleByRowsContext =>
Limit(expression(ctx.expression), query)

case ctx: SampleByPercentileContext =>
val fraction = ctx.percentage.getText.toDouble
val sign = if (ctx.negativeSign == null) 1 else -1
sample(sign * fraction / 100.0d)
sample(sign * fraction / 100.0d, seed)

case ctx: SampleByBytesContext =>
val bytesStr = ctx.bytes.getText
@@ -1222,7 +1228,7 @@ }
}

case ctx: SampleByBucketContext =>
sample(ctx.numerator.getText.toDouble / ctx.denominator.getText.toDouble)
sample(ctx.numerator.getText.toDouble / ctx.denominator.getText.toDouble, seed)
}
}

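With the seed threaded through the parser, REPEATABLE makes the sampled rows deterministic across runs. A sketch (the table t is illustrative; determinism assumes the same data and partitioning on each run):

// Two queries with the same REPEATABLE seed sample the same rows;
// omitting REPEATABLE draws a fresh random seed per query.
val first = spark.sql(
  "SELECT * FROM t TABLESAMPLE (50 PERCENT) REPEATABLE (12345)").collect()
val second = spark.sql(
  "SELECT * FROM t TABLESAMPLE (50 PERCENT) REPEATABLE (12345)").collect()
assert(first.sameElements(second))
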
@@ -61,6 +61,15 @@ private[sql] object LogicalExpressions {
nullOrdering: NullOrdering): SortOrder = {
SortValue(reference, direction, nullOrdering)
}

def tableSample(
methodName: String,
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long): TableSample = {
TableSampleValue(methodName, lowerBound, upperBound, withReplacement, seed)
}
}

/**
@@ -357,3 +366,14 @@ private[sql] object SortValue {
None
}
}

private[sql] final case class TableSampleValue(
methodName: String,
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long) extends TableSample {

override def describe(): String = s"$methodName $lowerBound $lowerBound $upperBound" +
Contributor: Two lowerBounds?
s" $withReplacement $seed"
}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.expressions.TableSample
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -104,6 +105,7 @@ case class RowDataSourceScanExec(
filters: Set[Filter],
handledFilters: Set[Filter],
aggregation: Option[Aggregation],
sample: Option[TableSample],
Contributor: So many pushdown-related parameters; would it be better to wrap them in a parent case class?

rdd: RDD[InternalRow],
@transient relation: BaseRelation,
tableIdentifier: Option[TableIdentifier])
@@ -153,7 +155,10 @@ case class RowDataSourceScanExec(
"ReadSchema" -> requiredSchema.catalogString,
"PushedFilters" -> seqToString(markedFilters.toSeq),
"PushedAggregates" -> aggString,
"PushedGroupby" -> groupByString)
"PushedGroupby" -> groupByString) ++
sample.map(v => "PushedSample" ->
s"SAMPLE ${v.methodName} ${v.lowerBound} ${v.upperBound} ${v.withReplacement} ${v.seed}"
)
}

// Don't care about `rdd` and `tableIdentifier` when canonicalizing.
@@ -336,6 +336,7 @@ object DataSourceStrategy
Set.empty,
Set.empty,
None,
null,
Contributor: I think this should be None.

toCatalystRDD(l, baseRelation.buildScan()),
baseRelation,
None) :: Nil
@@ -410,6 +411,7 @@
pushedFilters.toSet,
handledFilters,
None,
null,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation,
relation.catalogTable.map(_.identifier))
@@ -433,6 +435,7 @@
pushedFilters.toSet,
handledFilters,
None,
null,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation,
relation.catalogTable.map(_.identifier))
@@ -191,6 +191,9 @@ class JDBCOptions(
// An option to allow/disallow pushing down aggregate into JDBC data source
val pushDownAggregate = parameters.getOrElse(JDBC_PUSHDOWN_AGGREGATE, "false").toBoolean

// An option to allow/disallow pushing down TABLESAMPLE into JDBC data source
val pushDownTableSample = parameters.getOrElse(JDBC_PUSHDOWN_TABLESAMPLE, "false").toBoolean

// The local path of user's keytab file, which is assumed to be pre-uploaded to all nodes either
// by --files option of spark-submit or manually
val keytab = {
@@ -266,6 +269,7 @@ object JDBCOptions {
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
val JDBC_PUSHDOWN_AGGREGATE = newOption("pushDownAggregate")
val JDBC_PUSHDOWN_TABLESAMPLE = newOption("pushDownTableSample")
val JDBC_KEYTAB = newOption("keytab")
val JDBC_PRINCIPAL = newOption("principal")
val JDBC_TABLE_COMMENT = newOption("tableComment")