@@ -113,7 +113,7 @@ private[sql] case class PhysicalRDD(
 private[sql] object PhysicalRDD {
   // Metadata keys
   val INPUT_PATHS = "InputPaths"
-  val PUSHED_FILTERS = "PushedFilters"
+  val HANDLED_FILTERS = "HandledFilters"
Contributor:
HandledFilters here means the filters that will be applied to every row inside the data source, right? Is there a better name?

Member Author:
How about FilteredAtSource?

Contributor:
How about we just delete PUSHED_FILTERS, since it is not used? I think HandledFilters is a better name.

Member Author:
sgtm


   def createFromDataSource(
       output: Seq[Attribute],
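Aside: these metadata keys surface in the physical plan's string form, which is what explain prints and what the PlannerSuite test at the end of this diff asserts on. A minimal, self-contained sketch of that kind of rendering (MetadataRenderSketch and formatMetadata are illustrative names, not Spark internals):

```scala
// Sketch only: how a metadata map such as Map("HandledFilters" -> "[EqualTo(key,15)]")
// might be folded into a one-line plan description of the sort explain prints.
object MetadataRenderSketch {
  // Render each pair as "Key: value", sorted for deterministic output.
  def formatMetadata(nodeName: String, metadata: Map[String, String]): String = {
    val entries = metadata.toSeq.sorted.map { case (k, v) => s"$k: $v" }
    (nodeName +: entries).mkString(", ")
  }

  def main(args: Array[String]): Unit = {
    val metadata = Map(
      "HandledFilters" -> "[EqualTo(key,15)]",
      "InputPaths" -> "file:/tmp/testData"
    )
    // Prints: Scan ParquetRelation, HandledFilters: [EqualTo(key,15)], InputPaths: file:/tmp/testData
    println(formatMetadata("Scan ParquetRelation", metadata))
  }
}
```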
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
-import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.PhysicalRDD.{HANDLED_FILTERS, INPUT_PATHS}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -307,8 +307,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

     // A set of column attributes that are only referenced by pushed down filters. We can eliminate
     // them from requested columns.
+    val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
     val handledSet = {
-      val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
       val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
       AttributeSet(handledPredicates.flatMap(_.references)) --
         (projectSet ++ unhandledSet).map(relation.attributeMap)
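Aside: the set arithmetic above prunes columns that only handled filters reference. A minimal sketch of the same logic using plain Sets of column names in place of AttributeSet (all names here are illustrative):

```scala
// Sketch of the pruning idea: a column referenced only by handled predicates
// can be dropped from the requested columns, because the data source already
// evaluated those filters and Spark never needs the column's values.
object HandledSetSketch {
  def main(args: Array[String]): Unit = {
    val projectSet    = Set("name")        // columns the query selects
    val handledRefs   = Set("age", "name") // columns referenced by handled filters
    val unhandledRefs = Set("name")        // columns referenced by unhandled filters

    // Drop columns needed neither by the projection nor by any unhandled filter.
    val handledSet = handledRefs -- (projectSet ++ unhandledRefs)
    println(handledSet) // Set(age): filtered at the source, never selected
  }
}
```

In the real planner the same subtraction runs over expression references, so a column like age that is only tested by a source-handled filter never has to be requested from the scan.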
@@ -321,8 +321,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     val metadata: Map[String, String] = {
       val pairs = ArrayBuffer.empty[(String, String)]

-      if (pushedFilters.nonEmpty) {
-        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+      if (handledPredicates.nonEmpty) {
+        pairs += (HANDLED_FILTERS -> handledPredicates.mkString("[", ", ", "]"))
Contributor:
Should we also keep pushed filters? For some data sources, like ORC, a pushed filter is evaluated at a coarse-grained level instead of on every row. I think it is better to keep that information.

Member Author:
I thought 11663 meant all filters are pushed down regardless, so I wondered if that was redundant. It's also still a bit confusing: although it says the filters are "pushed", there is no guarantee that the underlying source will do anything with them at all.

Contributor:
Ah, sorry. I think I understand the change now. handledPredicates contains all filters that are pushed to the data source, except those returned by the unhandledFilters method. I think the change is good and HandledFilters is a proper name.

       }

       relation.relation match {
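Aside, to summarize the thread: handledPredicates is everything pushed to the source minus whatever the source reports back from BaseRelation.unhandledFilters. A sketch of a hypothetical relation against the 1.6-era sources API that claims to handle only EqualTo (the class is invented for illustration; the scan body is elided):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical relation that fully evaluates only EqualTo filters itself.
// Whatever it returns from unhandledFilters is re-evaluated by Spark, and
// only the remainder would show up under HandledFilters in explain output.
class EqualToOnlyRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(StructField("key", IntegerType) :: Nil)

  // Report back every pushed filter we do NOT evaluate on each row.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[EqualTo])

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    ??? // actual scan omitted in this sketch; EqualTo filters would be applied here
}
```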
@@ -160,7 +160,7 @@ class PlannerSuite extends SharedSQLContext {
     }
   }

-  test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") {
+  test("SPARK-11390 explain should print HandledFilters of PhysicalRDD") {
     withTempPath { file =>
       val path = file.getCanonicalPath
       testData.write.parquet(path)
@@ -169,7 +169,7 @@ class PlannerSuite extends SharedSQLContext {

withTempTable("testPushed") {
val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
assert(exp.toString.contains("HandledFilters: [EqualTo(key,15)]"))
}
}
}