@@ -99,7 +99,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
-val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
+val aggregate = ColumnPruning(Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan))
Contributor:
This looks a bit hacky. Can we use the catalyst framework to optimize it?

Contributor Author:
I'm afraid applying the whole set of optimizer rules would be overkill. The filterCreationSidePlan is simple enough that it can only contain Project and Filter nodes, so applying just ColumnPruning here seems safer.
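
For illustration only (not part of this PR), here is a minimal sketch of what a single ColumnPruning application does to a creation-side-like plan, written with the catalyst test DSL; the relation and the columns 'a, 'b, 'c are made up:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// A creation-side-like plan: only Filter and Aggregate on top of a scan.
val relation = LocalRelation('a.int, 'b.int, 'c.string)
val creationSide = relation.where('b > 1).groupBy('a)('a).analyze

// Applying the single rule inserts Project nodes so that the unreferenced
// column 'c is dropped right above the scan instead of flowing into the aggregate.
val pruned = ColumnPruning(creationSide)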

Contributor:
How about the filter pushdown rule?

Contributor Author:
The filterCreationSidePlan has already been optimized, so any filter that could be pushed down has already been pushed.

I think one more potentially useful rule would be CollapseProject, but it should be fine not to apply it here, since PhysicalOperation can collect adjacent Projects.
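
A tiny sketch (again not from the PR, with made-up columns) of why skipping CollapseProject is usually acceptable here: PhysicalOperation already walks through adjacent Project (and Filter) nodes when the plan is matched during physical planning:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val relation = LocalRelation('a.int, 'b.int)
// Two adjacent Projects that CollapseProject would normally merge.
val stacked = relation.select('a, 'b).select('a).analyze

stacked match {
  // PhysicalOperation collects the stacked Projects into one project list
  // over the leaf, so the extra Project does not hurt planning.
  case PhysicalOperation(projects, filters, leaf) =>
    println(s"projects=$projects, filters=$filters, leaf=${leaf.nodeName}")
  case _ =>
}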

if (!canBroadcastBySize(aggregate, conf)) {
// Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
// i.e., the semi-join will be a shuffled join, which is not worthwhile.
@@ -19,7 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
-import org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries
+import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, MergeScalarSubqueries}
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
@@ -257,6 +257,11 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled))
ensureLeftSemiJoinExists(planEnabled)
assert(normalizedEnabled != normalizedDisabled)
+val agg = planEnabled.collect {
+  case Join(_, agg: Aggregate, LeftSemi, _, _) => agg
+}
+assert(agg.size == 1)
+assert(agg.head.fastEquals(ColumnPruning(agg.head)))
Contributor Author:
This test can pass even without this PR, because of this optimizer batch:

Batch("Extract Python UDFs", Once,
ExtractPythonUDFFromJoinCondition,
// `ExtractPythonUDFFromJoinCondition` can convert a join to a cartesian product.
// Here, we rerun cartesian product check.
CheckCartesianProducts,
ExtractPythonUDFFromAggregate,
// This must be executed after `ExtractPythonUDFFromAggregate` and before `ExtractPythonUDFs`.
ExtractGroupingPythonUDFFromAggregate,
ExtractPythonUDFs,
// The eval-python node may be between Project/Filter and the scan node, which breaks
// column pruning and filter push-down. Here we rerun the related optimizer rules.
ColumnPruning,

I think it is just a coincidence, since by then we have already converted the subquery into a left semi join.
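
As a side note on the assertion itself: agg.head.fastEquals(ColumnPruning(agg.head)) checks that the creation-side aggregate is already a fixed point of ColumnPruning, i.e. re-applying the rule is a no-op. A hypothetical helper (not in the PR) spells out that intent:

import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper: a plan counts as fully column-pruned if running
// ColumnPruning on it again does not change the tree.
def isColumnPruned(plan: LogicalPlan): Boolean = plan.fastEquals(ColumnPruning(plan))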

} else {
comparePlans(planDisabled, planEnabled)
}