Spark 3.3: Discard filters that can be pushed down completely #6524

aokolnychyi merged 6 commits into apache:master
Conversation
```java
// to get Spark to handle record-level filtering
return filters;

private boolean requiresRecordLevelFiltering(Expression expr) {
  return table.specs().values().stream()
```
For now, we simply check each spec in the table. In the future, we may optimize this to only look at selected specs but that won't be trivial. I think it is a reasonable start.
I think another optimization that may be worth doing would be to group all expressions based on bound column and operation. In the bad case we are considering, we would end up checking whether or not we can filter a "column = literal" for a ton of different literal values.
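A minimal sketch of that grouping idea, using made-up names (`ColumnFilter`, `pushability`, and the `canPushDown` callback are illustrative, not Iceberg or Spark APIs): pushability is decided once per (column, operation) pair and the answer is reused for every literal, so `dep = d1`, `dep = d2`, ... trigger only one partition-spec check.

```java
import java.util.*;
import java.util.function.*;

class FilterGrouping {
  // Made-up stand-in for a Spark filter such as EqualTo(dep, "d1").
  record ColumnFilter(String column, String op, Object literal) {}

  // Checks pushability once per (column, op) pair instead of once per filter,
  // caching the result so repeated literals reuse the same answer.
  static Map<ColumnFilter, Boolean> pushability(
      List<ColumnFilter> filters, BiPredicate<String, String> canPushDown) {
    Map<String, Boolean> byColumnOp = new HashMap<>();
    Map<ColumnFilter, Boolean> result = new HashMap<>();
    for (ColumnFilter f : filters) {
      boolean pushable = byColumnOp.computeIfAbsent(
          f.column() + "/" + f.op(), key -> canPushDown.test(f.column(), f.op()));
      result.put(f, pushable);
    }
    return result;
  }
}
```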
```java
List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
List<Filter> pushableFilters = Lists.newArrayListWithExpectedSize(filters.length);
List<Filter> postScanFilters = Lists.newArrayListWithExpectedSize(filters.length);
```
This may just be me, but I would mark this as "sparkFilters" and "pushableFilters" as "icebergFilters" to be clear about which filters are being performed by which system?
To me, icebergFilters would imply it is an Iceberg filter, while it is actually a Spark filter that can be pushed down to Iceberg. I picked these names based on what Spark uses in its code and in the Javadoc for this method.
```
[Test worker] INFO org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown -
Pushing operators to testhadoop.default.table
Pushed Filters: IsNotNull(dep), IsNotNull(id), EqualTo(dep,d1), EqualTo(id,1)
Post-Scan Filters: (id#22 = 1)
```
I agree it is hard to navigate without context. In order to make it a bit clearer, I added a comment above. Could you check if it's any better?
I think that's fine. I don't really like the Spark nomenclature here, but I think your comment does a good job defining it. I would add that (1) and (2) are placed in "pushableFilters" and (2) and (3) are returned to Spark in "postScanFilters".

Really, I just think IcebergPushedFilters and SparkPostScanFilters would also be nice, but I think your comment provides enough context for a non-Spark expert to get what's going on.
```java
      postScanFilters.add(filter);
    }
  } catch (Exception e) {
    LOG.warn("Failed to check if {} can be pushed down: {}", filter, e.getMessage());
```
This is now a WARN instead of an INFO.
I was not sure about this. If there is an exception anywhere in this path, it indicates something went wrong. Seems like something we should warn the user about? I can revert it too. What do you think?
I'm not sure the user can really do anything about the exceptions in this path. It's really only something a dev can fix when working on the Iceberg library, correct?
I would prefer to be more specific about the error here. Logging the ValidationException at INFO is correct because it's okay for some filters not to be convertible -- we want to catch those and support them in the future, but there is no problem for the job. I'm not sure what other errors filter through here now and whether WARN is appropriate for those. But I'd prefer to have them broken out more specifically.
```java
for (Filter filter : filters) {
  Expression expr = null;
  try {
    expr = SparkFilters.convert(filter);
```
I think this is a little hard for me since everything is in the try-catch here; maybe instead we keep the older pattern of

```
if (expr != null) {
  try {
    // bind
    // push down to Iceberg
    if (not fully pushable) {
      // add to Spark
    }
  } catch {
    // leave for Spark
  }
} else {
  // leave for Spark
}
```

But I'm pretty sure this is correct either way.
We also have to wrap the code that converts the filter. It is unlikely to throw an exception, but we have to make sure it does not fail the query. There are 3 calls that can throw an exception now. I did not want to have a nested try-catch because it looked too complicated with the added logic.

GitHub renders it in a way that's really hard to read. It does not seem to be that bad in an IDE.
```java
try {
  Expression expr = SparkFilters.convert(filter);

  if (expr != null) {
    // try binding the expression to ensure it can be pushed down
    Binder.bind(schema.asStruct(), expr, caseSensitive);
    expressions.add(expr);
    pushableFilters.add(filter);
  }

  if (expr == null || requiresSparkFiltering(expr)) {
    postScanFilters.add(filter);
  } else {
    LOG.info("Evaluating completely on Iceberg side: {}", filter);
  }
} catch (Exception e) {
  LOG.warn("Failed to check if {} can be pushed down: {}", filter, e.getMessage());
  postScanFilters.add(filter);
}
```
For me it was just thinking about the different failure positions. I didn't know SparkFilters.convert could also throw; I thought it was just the binding.
We had some bugs when SparkFilters.convert threw an exception. That's why there was a separate try-catch around it. Let me think a bit more here.
RussellSpitzer left a comment

Looks good to me! I added a few test suggestions and a few notes on the code layout.
I think you'll have to do a few tweaks to some of the existing tests that check filters now.
RussellSpitzer left a comment
One last comment: we should probably log or add to the scan description something identifying which predicates were completely evaluated by Iceberg, so the user doesn't have to comb the UI to determine which pushed filters were eliminated from Spark's filtering.
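One hedged way to surface that (the `describe` helper and its output format are illustrative, not the PR's actual implementation): fold the fully-pushed predicates into the string a `Scan.description()`-style method returns, so they show up directly in the Spark UI next to the post-scan filters.

```java
import java.util.List;

class ScanDescription {
  // Builds a description that separates filters Iceberg evaluated completely
  // from those Spark will still re-check after the scan.
  static String describe(String table, List<String> fullyPushed, List<String> postScan) {
    return table
        + " [evaluated by Iceberg: " + String.join(", ", fullyPushed) + "]"
        + " [post-scan: " + String.join(", ", postScan) + "]";
  }
}
```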
```scala
val prefix = String.format("SELECT 42 from %s where ", tableName)
val logicalPlan = session.sessionState.sqlParser.parsePlan(prefix + where)
val optimizedLogicalPlan = session.sessionState.executePlan(logicalPlan, CommandExecutionMode.ALL).optimizedPlan
val tableAttrs = session.table(tableName).queryExecution.analyzed.output
```
Thanks for reviewing, @RussellSpitzer! I'll follow up with a change to consume this in MERGE operations.
```java
if (expr == null || requiresSparkFiltering(expr)) {
  postScanFilters.add(filter);
} else {
  LOG.info("Evaluating completely on Iceberg side: {}", filter);
```
Is this worth an INFO message? That seems verbose to me. How about debug?
Hi @aokolnychyi, do you plan on cherry-picking this to Spark v3.2?
This PR enhances our filter pushdown logic in Spark 3.3 to discard filters that can be completely evaluated using partition metadata. That way, we can skip evaluating some predicates which we know are always true or false for rows produced by a scan.
This change leverages `ExpressionUtil.selectsPartitions()`, which checks whether an expression selects whole partitions by comparing its strict and inclusive projections.
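A toy illustration of the strict-vs-inclusive idea (not the Iceberg implementation, which lives in `ExpressionUtil` and `Projections`): for an identity-partitioned column, a predicate like `col = v` projects to the same partition predicate under both projections, so it selects whole partitions and can be discarded after planning. For a non-partition column, the inclusive projection degenerates to "always true" (any partition might contain matching rows) while the strict projection degenerates to "always false" (no partition is guaranteed to match fully), so the two disagree and record-level filtering is still required.

```java
import java.util.Set;

class SelectsPartitions {
  enum Projected { ALWAYS_TRUE, ALWAYS_FALSE, PARTITION_PREDICATE }

  // Inclusive projection: matches at least every partition that may contain
  // rows satisfying the filter.
  static Projected inclusive(String column, Set<String> identityPartitionColumns) {
    return identityPartitionColumns.contains(column)
        ? Projected.PARTITION_PREDICATE
        : Projected.ALWAYS_TRUE; // can't narrow: any partition might match
  }

  // Strict projection: matches only partitions where every row satisfies
  // the filter.
  static Projected strict(String column, Set<String> identityPartitionColumns) {
    return identityPartitionColumns.contains(column)
        ? Projected.PARTITION_PREDICATE
        : Projected.ALWAYS_FALSE; // no partition is guaranteed to fully match
  }

  // The filter selects whole partitions iff both projections agree.
  static boolean selectsPartitions(String column, Set<String> identityPartitionColumns) {
    return inclusive(column, identityPartitionColumns)
        == strict(column, identityPartitionColumns);
  }
}
```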