@@ -17,6 +17,8 @@

package org.apache.spark.sql.connector.expressions.filter;

import java.io.Serializable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -27,7 +29,7 @@
* @since 3.3.0
*/
@Evolving
public abstract class Filter implements Expression {
public abstract class Filter implements Expression, Serializable {

protected static final NamedReference[] EMPTY_REFERENCE = new NamedReference[0];

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.filter.Filter;

/**
* A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
* push down filters to the data source and reduce the size of the data to be read.
Contributor comment: We should highlight the difference between this new interface and the old one. We can do this in a followup.

*
* @since 3.3.0
*/
@Evolving
public interface SupportsPushDownV2Filters extends ScanBuilder {

/**
* Pushes down filters, and returns filters that need to be evaluated after scanning.
* <p>
* Rows should be returned from the data source if and only if all of the filters match. That is,
* filters must be interpreted as ANDed together.
*/
Filter[] pushFilters(Filter[] filters);

/**
* Returns the filters that are pushed to the data source via {@link #pushFilters(Filter[])}.
* <p>
* There are 3 kinds of filters:
* <ol>
* <li>pushable filters which don't need to be evaluated again after scanning.</li>
* <li>pushable filters which still need to be evaluated after scanning, e.g. parquet row
* group filter.</li>
* <li>non-pushable filters.</li>
* </ol>
* <p>
* Both cases 1 and 2 are considered pushed filters and should be returned by this method.
* <p>
* It's possible that there are no filters in the query and {@link #pushFilters(Filter[])}
* is never called; an empty array should be returned in this case.
*/
Filter[] pushedFilters();
}
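For anyone trying the new interface out, here is a minimal, hypothetical sketch (not part of this patch) of a ScanBuilder implementing SupportsPushDownV2Filters. The name `SimpleScanBuilder` and the "only IsNotNull is handled natively" rule are made up purely for illustration.

```scala
import org.apache.spark.sql.connector.expressions.filter.{Filter, IsNotNull}
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownV2Filters}

class SimpleScanBuilder extends SupportsPushDownV2Filters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Keep only the filters this (hypothetical) source can evaluate natively; everything
    // else is handed back to Spark to be evaluated after the scan.
    val (supported, unsupported) = filters.partition(_.isInstanceOf[IsNotNull])
    pushed = supported
    unsupported
  }

  // Filters accepted in pushFilters, or an empty array if pushFilters was never called.
  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = ??? // build the Scan using `pushed`
}
```

Per the javadoc above, the array returned from pushFilters is what makes Spark plan an extra filter operator for those predicates, while `pushed` is what pushedFilters() reports back.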
@@ -18,24 +18,30 @@
package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, EmptyRow, Expression, Literal, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.expressions.{FieldReference, Literal => V2Literal, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, EqualNullSafe => V2EqualNullSafe, EqualTo => V2EqualTo, Filter => V2Filter, GreaterThan => V2GreaterThan, GreaterThanOrEqual => V2GreaterThanOrEqual, In => V2In, IsNotNull => V2IsNotNull, IsNull => V2IsNull, LessThan => V2LessThan, LessThanOrEqual => V2LessThanOrEqual, Not => V2Not, Or => V2Or, StringContains => V2StringContains, StringEndsWith => V2StringEndsWith, StringStartsWith => V2StringStartsWith}
import org.apache.spark.sql.connector.read.LocalScan
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PushableColumn, PushableColumnBase}
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{BooleanType, StringType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String

class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {

@@ -427,3 +433,158 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
case _ => Nil
}
}

private[sql] object DataSourceV2Strategy {

private def translateLeafNodeFilterV2(
predicate: Expression,
pushableColumn: PushableColumnBase): Option[V2Filter] = predicate match {
case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>
Some(new V2EqualTo(FieldReference(name), LiteralValue(v, t)))
case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>
Some(new V2EqualTo(FieldReference(name), LiteralValue(v, t)))

case expressions.EqualNullSafe(pushableColumn(name), Literal(v, t)) =>
Some(new V2EqualNullSafe(FieldReference(name), LiteralValue(v, t)))
case expressions.EqualNullSafe(Literal(v, t), pushableColumn(name)) =>
Some(new V2EqualNullSafe(FieldReference(name), LiteralValue(v, t)))

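// When the literal is on the left-hand side (e.g. 1 > a), the comparison below is flipped
// so that the column always ends up on the left of the pushed-down V2 filter.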
case expressions.GreaterThan(pushableColumn(name), Literal(v, t)) =>
Some(new V2GreaterThan(FieldReference(name), LiteralValue(v, t)))
case expressions.GreaterThan(Literal(v, t), pushableColumn(name)) =>
Some(new V2LessThan(FieldReference(name), LiteralValue(v, t)))

case expressions.LessThan(pushableColumn(name), Literal(v, t)) =>
Some(new V2LessThan(FieldReference(name), LiteralValue(v, t)))
case expressions.LessThan(Literal(v, t), pushableColumn(name)) =>
Some(new V2GreaterThan(FieldReference(name), LiteralValue(v, t)))

case expressions.GreaterThanOrEqual(pushableColumn(name), Literal(v, t)) =>
Some(new V2GreaterThanOrEqual(FieldReference(name), LiteralValue(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), pushableColumn(name)) =>
Some(new V2LessThanOrEqual(FieldReference(name), LiteralValue(v, t)))

case expressions.LessThanOrEqual(pushableColumn(name), Literal(v, t)) =>
Some(new V2LessThanOrEqual(FieldReference(name), LiteralValue(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), pushableColumn(name)) =>
Some(new V2GreaterThanOrEqual(FieldReference(name), LiteralValue(v, t)))

case expressions.InSet(child @ pushableColumn(name), set) =>
val values: Array[V2Literal[_]] =
set.toSeq.map(elem => LiteralValue(elem, child.dataType)).toArray
Some(new V2In(FieldReference(name), values))

// The Optimizer only converts In to InSet when the value list exceeds a certain size,
// so it is still possible to get an In expression here that needs to be pushed down.
case in @ expressions.In(pushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) =>
val hSet = list.map(_.eval(EmptyRow))
Some(new V2In(FieldReference(name),
hSet.toArray.map(LiteralValue(_, in.value.dataType))))

case expressions.IsNull(pushableColumn(name)) =>
Some(new V2IsNull(FieldReference(name)))
case expressions.IsNotNull(pushableColumn(name)) =>
Some(new V2IsNotNull(FieldReference(name)))

case expressions.StartsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
Some(new V2StringStartsWith(FieldReference(name), v))

case expressions.EndsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
Some(new V2StringEndsWith(FieldReference(name), v))

case expressions.Contains(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
Some(new V2StringContains(FieldReference(name), v))

case expressions.Literal(true, BooleanType) =>
Some(new V2AlwaysTrue)

case expressions.Literal(false, BooleanType) =>
Some(new V2AlwaysFalse)

case _ => None
}

/**
* Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
*
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
*/
protected[sql] def translateFilterV2(
predicate: Expression,
supportNestedPredicatePushdown: Boolean): Option[V2Filter] = {
translateFilterV2WithMapping(predicate, None, supportNestedPredicatePushdown)
}

/**
* Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
*
* @param predicate The input [[Expression]] to be translated as [[Filter]]
* @param translatedFilterToExpr An optional map from translated data source [[Filter]]s to
*                               their original leaf node filter [[Expression]]s. The map is
*                               used for rebuilding [[Expression]]s from [[Filter]]s.
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
*/
protected[sql] def translateFilterV2WithMapping(
predicate: Expression,
translatedFilterToExpr: Option[mutable.HashMap[V2Filter, Expression]],
nestedPredicatePushdownEnabled: Boolean)
: Option[V2Filter] = {
predicate match {
case expressions.And(left, right) =>
// See SPARK-12218 for detailed discussion
// It is not safe to just convert one side if we do not understand the
// other side. Here is an example used to explain the reason.
// Let's say we have (a = 2 AND trim(b) = 'blah') OR (c > 0)
// and we do not understand how to convert trim(b) = 'blah'.
// If we only convert a = 2, we will end up with
// (a = 2) OR (c > 0), which will generate wrong results.
// Pushing one leg of AND down is only safe to do at the top level.
// You can see ParquetFilters' createFilter for more details.
for {
leftFilter <- translateFilterV2WithMapping(
left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
rightFilter <- translateFilterV2WithMapping(
right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
} yield new V2And(leftFilter, rightFilter)

case expressions.Or(left, right) =>
for {
leftFilter <- translateFilterV2WithMapping(
left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
rightFilter <- translateFilterV2WithMapping(
right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
} yield new V2Or(leftFilter, rightFilter)

case expressions.Not(child) =>
translateFilterV2WithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
.map(new V2Not(_))

case other =>
val filter = translateLeafNodeFilterV2(
other, PushableColumn(nestedPredicatePushdownEnabled))
if (filter.isDefined && translatedFilterToExpr.isDefined) {
translatedFilterToExpr.get(filter.get) = predicate
}
filter
}
}

protected[sql] def rebuildExpressionFromFilter(
filter: V2Filter,
translatedFilterToExpr: mutable.HashMap[V2Filter, Expression]): Expression = {
filter match {
case and: V2And =>
expressions.And(rebuildExpressionFromFilter(and.left, translatedFilterToExpr),
rebuildExpressionFromFilter(and.right, translatedFilterToExpr))
case or: V2Or =>
expressions.Or(rebuildExpressionFromFilter(or.left, translatedFilterToExpr),
rebuildExpressionFromFilter(or.right, translatedFilterToExpr))
case not: V2Not =>
expressions.Not(rebuildExpressionFromFilter(not.child, translatedFilterToExpr))
case other =>
translatedFilterToExpr.getOrElse(other,
throw new IllegalStateException("Failed to rebuild Expression for filter: " + filter))
}
}
}
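To see the two new helpers together, here is a hedged round-trip sketch (not from this patch). It assumes the snippet lives somewhere under the org.apache.spark.sql package (e.g. a test suite), since both helpers are protected[sql]; the column names `a` and `b` are illustrative.

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.expressions.filter.{Filter => V2Filter}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy

val predicate: Expression = ('a.int > 1) && 'b.string.isNotNull
val translatedFilterToExpr = mutable.HashMap.empty[V2Filter, Expression]

val translated = DataSourceV2Strategy.translateFilterV2WithMapping(
  predicate, Some(translatedFilterToExpr), nestedPredicatePushdownEnabled = true)
// Expected shape: Some(And(GreaterThan(a, 1), IsNotNull(b))) in the V2 filter classes,
// with translatedFilterToExpr now mapping each leaf V2 filter back to its Catalyst leaf.

// A filter handed back by the data source can then be turned into a Catalyst expression again:
val rebuilt = translated.map(
  DataSourceV2Strategy.rebuildExpressionFromFilter(_, translatedFilterToExpr))
```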
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.connector.expressions.filter.{Filter => V2Filter}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PushableColumnWithoutNestedColumn}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
@@ -38,7 +39,7 @@ object PushDownUtils extends PredicateHelper {
*/
def pushFilters(
scanBuilder: ScanBuilder,
filters: Seq[Expression]): (Seq[sources.Filter], Seq[Expression]) = {
filters: Seq[Expression]): (Either[Seq[sources.Filter], Seq[V2Filter]], Seq[Expression]) = {
scanBuilder match {
case r: SupportsPushDownFilters =>
// A map from translated data source leaf node filters to original catalyst filter
@@ -67,12 +68,41 @@
val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
(r.pushedFilters(), (untranslatableExprs ++ postScanFilters).toSeq)
(Left(r.pushedFilters()), (untranslatableExprs ++ postScanFilters).toSeq)

case r: SupportsPushDownV2Filters =>
// A map from translated data source leaf node filters to original catalyst filter
// expressions. For an `And`/`Or` predicate, it is possible that the predicate is partially
// pushed down. This map can be used to construct a catalyst filter expression from the
// input filter, or a superset (partial push-down filter) of the input filter.
val translatedFilterToExpr = mutable.HashMap.empty[V2Filter, Expression]
val translatedFilters = mutable.ArrayBuffer.empty[V2Filter]
// Catalyst filter expression that can't be translated to data source filters.
val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]

for (filterExpr <- filters) {
val translated =
DataSourceV2Strategy.translateFilterV2WithMapping(
filterExpr, Some(translatedFilterToExpr), nestedPredicatePushdownEnabled = true)
if (translated.isEmpty) {
untranslatableExprs += filterExpr
} else {
translatedFilters += translated.get
}
}

// Data source filters that need to be evaluated again after scanning, which means
// the data source cannot guarantee that the returned rows pass these filters.
// As a result we must return them so Spark can plan an extra filter operator.
val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
DataSourceV2Strategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
(Right(r.pushedFilters), (untranslatableExprs ++ postScanFilters).toSeq)

case f: FileScanBuilder =>
val postScanFilters = f.pushFilters(filters)
(f.pushedFilters, postScanFilters)
case _ => (Nil, filters)
(Left(f.pushedFilters), postScanFilters)
case _ => (Left(Nil), filters)
}
}
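As a usage note for the new Either-shaped return type, here is a hedged caller sketch (not from this patch); the `???` placeholder stands in for whatever ScanBuilder and predicates the caller already has.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.execution.datasources.v2.PushDownUtils

val scanBuilder: ScanBuilder = ???
val filterExprs: Seq[Expression] = Seq.empty

val (pushed, postScanFilters) = PushDownUtils.pushFilters(scanBuilder, filterExprs)
pushed match {
  case Left(v1Filters) =>  // SupportsPushDownFilters or FileScanBuilder path
    println(s"Pushed V1 filters: ${v1Filters.mkString(", ")}")
  case Right(v2Filters) => // SupportsPushDownV2Filters path
    println(s"Pushed V2 filters: ${v2Filters.mkString(", ")}")
}
// postScanFilters are Catalyst expressions Spark must still evaluate after the scan.
```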

@@ -58,12 +58,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
sHolder.builder, normalizedFiltersWithoutSubquery)
val pushedFiltersStr = if (pushedFilters.isLeft) {
pushedFilters.left.get.mkString(", ")
} else {
pushedFilters.right.get.mkString(", ")
}

val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery

logInfo(
s"""
|Pushing operators to ${sHolder.relation.name}
|Pushed Filters: ${pushedFilters.mkString(", ")}
|Pushed Filters: $pushedFiltersStr
|Post-Scan Filters: ${postScanFilters.mkString(",")}
""".stripMargin)
