diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java index ec3bacd01cde..b630f9c7a9b7 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java @@ -639,9 +639,9 @@ public String predicate(UnboundPredicate pred) { case NOT_EQ: return pred.ref().name() + " != " + sqlString(pred.literal()); case STARTS_WITH: - return pred.ref().name() + " LIKE '" + pred.literal() + "%'"; + return pred.ref().name() + " LIKE '" + pred.literal().value() + "%'"; case NOT_STARTS_WITH: - return pred.ref().name() + " NOT LIKE '" + pred.literal() + "%'"; + return pred.ref().name() + " NOT LIKE '" + pred.literal().value() + "%'"; case IN: return pred.ref().name() + " IN (" + sqlString(pred.literals()) + ")"; case NOT_IN: diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 1dee9b6128cb..90c8ae3f92e2 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -29,9 +29,9 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Binder; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionUtil; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -105,42 +105,52 @@ public SparkScanBuilder caseSensitive(boolean isCaseSensitive) { @Override public Filter[] pushFilters(Filter[] filters) { + // there are 3 kinds of filters: + // (1) filters that can be pushed down completely and don't have to evaluated by Spark + // (e.g. filters that select entire partitions) + // (2) filters that can be pushed down partially and require record-level filtering in Spark + // (e.g. filters that may select some but not necessarily all rows in a file) + // (3) filters that can't be pushed down at all and have to be evaluated by Spark + // (e.g. unsupported filters) + // filters (1) and (2) are used prune files during job planning in Iceberg + // filters (2) and (3) form a set of post scan filters and must be evaluated by Spark + List expressions = Lists.newArrayListWithExpectedSize(filters.length); - List pushed = Lists.newArrayListWithExpectedSize(filters.length); + List pushableFilters = Lists.newArrayListWithExpectedSize(filters.length); + List postScanFilters = Lists.newArrayListWithExpectedSize(filters.length); for (Filter filter : filters) { - Expression expr = null; try { - expr = SparkFilters.convert(filter); - } catch (IllegalArgumentException e) { - // converting to Iceberg Expression failed, so this expression cannot be pushed down - LOG.info( - "Failed to convert filter to Iceberg expression, skipping push down for this expression: {}. {}", - filter, - e.getMessage()); - } + Expression expr = SparkFilters.convert(filter); - if (expr != null) { - try { + if (expr != null) { + // try binding the expression to ensure it can be pushed down Binder.bind(schema.asStruct(), expr, caseSensitive); expressions.add(expr); - pushed.add(filter); - } catch (ValidationException e) { - // binding to the table schema failed, so this expression cannot be pushed down - LOG.info( - "Failed to bind expression to table schema, skipping push down for this expression: {}. {}", - filter, - e.getMessage()); + pushableFilters.add(filter); + } + + if (expr == null || requiresSparkFiltering(expr)) { + postScanFilters.add(filter); + } else { + LOG.info("Evaluating completely on Iceberg side: {}", filter); } + + } catch (Exception e) { + LOG.warn("Failed to check if {} can be pushed down: {}", filter, e.getMessage()); + postScanFilters.add(filter); } } this.filterExpressions = expressions; - this.pushedFilters = pushed.toArray(new Filter[0]); + this.pushedFilters = pushableFilters.toArray(new Filter[0]); + + return postScanFilters.toArray(new Filter[0]); + } - // Spark doesn't support residuals per task, so return all filters - // to get Spark to handle record-level filtering - return filters; + private boolean requiresSparkFiltering(Expression expr) { + return table.specs().values().stream() + .anyMatch(spec -> !ExpressionUtil.selectsPartitions(expr, spec, caseSensitive)); } @Override diff --git a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala index c4b5a7c0ce14..554fa7f66dc6 100644 --- a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala +++ b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala @@ -22,9 +22,10 @@ package org.apache.spark.sql.execution.datasources import org.apache.iceberg.spark.SparkFilters import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.Filter -import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.catalyst.plans.logical.LeafNode object SparkExpressionConverter { @@ -37,15 +38,14 @@ object SparkExpressionConverter { @throws[AnalysisException] def collectResolvedSparkExpression(session: SparkSession, tableName: String, where: String): Expression = { - var expression: Expression = null - // Add a dummy prefix linking to the table to collect the resolved spark expression from optimized plan. - val prefix = String.format("SELECT 42 from %s where ", tableName) - val logicalPlan = session.sessionState.sqlParser.parsePlan(prefix + where) - val optimizedLogicalPlan = session.sessionState.executePlan(logicalPlan, CommandExecutionMode.ALL).optimizedPlan + val tableAttrs = session.table(tableName).queryExecution.analyzed.output + val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where) + val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs)) + val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan optimizedLogicalPlan.collectFirst { - case filter: Filter => - expression = filter.expressions.head - } - expression + case filter: Filter => filter.condition + }.getOrElse(throw new AnalysisException("Failed to find filter expression")) } + + case class DummyRelation(output: Seq[Attribute]) extends LeafNode } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java index 20b3ab3c3fc7..893051fc226c 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java @@ -28,6 +28,7 @@ import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.TimeZone; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -229,6 +230,16 @@ protected void withUnavailableLocations(Iterable locations, Action actio } } + protected void withDefaultTimeZone(String zoneId, Action action) { + TimeZone currentZone = TimeZone.getDefault(); + try { + TimeZone.setDefault(TimeZone.getTimeZone(zoneId)); + action.invoke(); + } finally { + TimeZone.setDefault(currentZone); + } + } + protected void withSQLConf(Map conf, Action action) { SQLConf sqlConf = SQLConf.get(); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java new file mode 100644 index 000000000000..0ea34e187f1d --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java @@ -0,0 +1,556 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.spark.sql.execution.SparkPlan; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Test; + +public class TestFilterPushDown extends SparkTestBaseWithCatalog { + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS tmp_view"); + } + + @Test + public void testFilterPushdownWithIdentityTransform() { + sql( + "CREATE TABLE %s (id INT, salary INT, dep STRING)" + + "USING iceberg " + + "PARTITIONED BY (dep)", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName); + sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName); + sql("INSERT INTO %s VALUES (3, 300, 'd3')", tableName); + sql("INSERT INTO %s VALUES (4, 400, 'd4')", tableName); + sql("INSERT INTO %s VALUES (5, 500, 'd5')", tableName); + sql("INSERT INTO %s VALUES (6, 600, null)", tableName); + + checkOnlyIcebergFilters( + "dep IS NULL" /* query predicate */, + "dep IS NULL" /* Iceberg scan filters */, + ImmutableList.of(row(6, 600, null))); + + checkOnlyIcebergFilters( + "dep IS NOT NULL" /* query predicate */, + "dep IS NOT NULL" /* Iceberg scan filters */, + ImmutableList.of( + row(1, 100, "d1"), + row(2, 200, "d2"), + row(3, 300, "d3"), + row(4, 400, "d4"), + row(5, 500, "d5"))); + + checkOnlyIcebergFilters( + "dep = 'd3'" /* query predicate */, + "dep IS NOT NULL, dep = 'd3'" /* Iceberg scan filters */, + ImmutableList.of(row(3, 300, "d3"))); + + checkOnlyIcebergFilters( + "dep > 'd3'" /* query predicate */, + "dep IS NOT NULL, dep > 'd3'" /* Iceberg scan filters */, + ImmutableList.of(row(4, 400, "d4"), row(5, 500, "d5"))); + + checkOnlyIcebergFilters( + "dep >= 'd5'" /* query predicate */, + "dep IS NOT NULL, dep >= 'd5'" /* Iceberg scan filters */, + ImmutableList.of(row(5, 500, "d5"))); + + checkOnlyIcebergFilters( + "dep < 'd2'" /* query predicate */, + "dep IS NOT NULL, dep < 'd2'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + + checkOnlyIcebergFilters( + "dep <= 'd2'" /* query predicate */, + "dep IS NOT NULL, dep <= 'd2'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"))); + + checkOnlyIcebergFilters( + "dep <=> 'd3'" /* query predicate */, + "dep = 'd3'" /* Iceberg scan filters */, + ImmutableList.of(row(3, 300, "d3"))); + + checkOnlyIcebergFilters( + "dep IN (null, 'd1')" /* query predicate */, + "dep IN ('d1')" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + + checkOnlyIcebergFilters( + "dep NOT IN ('d2', 'd4')" /* query predicate */, + "(dep IS NOT NULL AND dep NOT IN ('d2', 'd4'))" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"), row(3, 300, "d3"), row(5, 500, "d5"))); + + checkOnlyIcebergFilters( + "dep = 'd1' AND dep IS NOT NULL" /* query predicate */, + "dep = 'd1', dep IS NOT NULL" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + + checkOnlyIcebergFilters( + "dep = 'd1' OR dep = 'd2' OR dep = 'd3'" /* query predicate */, + "((dep = 'd1' OR dep = 'd2') OR dep = 'd3')" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"), row(3, 300, "d3"))); + + checkFilters( + "dep = 'd1' AND id = 1" /* query predicate */, + "isnotnull(id) AND (id = 1)" /* Spark post scan filter */, + "dep IS NOT NULL, id IS NOT NULL, dep = 'd1', id = 1" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + + checkFilters( + "dep = 'd2' OR id = 1" /* query predicate */, + "(dep = d2) OR (id = 1)" /* Spark post scan filter */, + "(dep = 'd2' OR id = 1)" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"))); + + checkFilters( + "dep LIKE 'd1%' AND id = 1" /* query predicate */, + "isnotnull(id) AND (id = 1)" /* Spark post scan filter */, + "dep IS NOT NULL, id IS NOT NULL, dep LIKE 'd1%', id = 1" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + + checkFilters( + "dep NOT LIKE 'd5%' AND (id = 1 OR id = 5)" /* query predicate */, + "(id = 1) OR (id = 5)" /* Spark post scan filter */, + "dep IS NOT NULL, NOT (dep LIKE 'd5%'), (id = 1 OR id = 5)" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + + checkFilters( + "dep LIKE '%d5' AND id IN (1, 5)" /* query predicate */, + "EndsWith(dep, d5) AND id IN (1,5)" /* Spark post scan filter */, + "dep IS NOT NULL, id IN (1, 5)" /* Iceberg scan filters */, + ImmutableList.of(row(5, 500, "d5"))); + } + + @Test + public void testFilterPushdownWithHoursTransform() { + sql( + "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)" + + "USING iceberg " + + "PARTITIONED BY (hours(t))", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-30T01:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-06-30T02:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (3, 300, null)", tableName); + + withDefaultTimeZone( + "UTC", + () -> { + checkOnlyIcebergFilters( + "t IS NULL" /* query predicate */, + "t IS NULL" /* Iceberg scan filters */, + ImmutableList.of(row(3, 300, null))); + + // strict/inclusive projections for t < TIMESTAMP '2021-06-30T02:00:00.000Z' are equal, + // so this filter selects entire partitions and can be pushed down completely + checkOnlyIcebergFilters( + "t < TIMESTAMP '2021-06-30T02:00:00.000Z'" /* query predicate */, + "t IS NOT NULL, t < 1625018400000000" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.0Z")))); + + // strict/inclusive projections for t < TIMESTAMP '2021-06-30T01:00:00.001Z' differ, + // so this filter does NOT select entire partitions and can't be pushed down completely + checkFilters( + "t < TIMESTAMP '2021-06-30T01:00:00.001Z'" /* query predicate */, + "t < 2021-06-30 01:00:00.001" /* Spark post scan filter */, + "t IS NOT NULL, t < 1625014800001000" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.0Z")))); + + // strict/inclusive projections for t <= TIMESTAMP '2021-06-30T01:00:00.000Z' differ, + // so this filter does NOT select entire partitions and can't be pushed down completely + checkFilters( + "t <= TIMESTAMP '2021-06-30T01:00:00.000Z'" /* query predicate */, + "t <= 2021-06-30 01:00:00" /* Spark post scan filter */, + "t IS NOT NULL, t <= 1625014800000000" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.0Z")))); + }); + } + + @Test + public void testFilterPushdownWithDaysTransform() { + sql( + "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)" + + "USING iceberg " + + "PARTITIONED BY (days(t))", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-15T01:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-06-30T02:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (3, 300, TIMESTAMP '2021-07-15T10:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (4, 400, null)", tableName); + + withDefaultTimeZone( + "UTC", + () -> { + checkOnlyIcebergFilters( + "t IS NULL" /* query predicate */, + "t IS NULL" /* Iceberg scan filters */, + ImmutableList.of(row(4, 400, null))); + + // strict/inclusive projections for t < TIMESTAMP '2021-07-05T00:00:00.000Z' are equal, + // so this filter selects entire partitions and can be pushed down completely + checkOnlyIcebergFilters( + "t < TIMESTAMP '2021-07-05T00:00:00.000Z'" /* query predicate */, + "t IS NOT NULL, t < 1625443200000000" /* Iceberg scan filters */, + ImmutableList.of( + row(1, 100, timestamp("2021-06-15T01:00:00.000Z")), + row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); + + // strict/inclusive projections for t < TIMESTAMP '2021-06-30T03:00:00.000Z' differ, + // so this filter does NOT select entire partitions and can't be pushed down completely + checkFilters( + "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */, + "t < 2021-06-30 03:00:00" /* Spark post scan filter */, + "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */, + ImmutableList.of( + row(1, 100, timestamp("2021-06-15T01:00:00.000Z")), + row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); + }); + } + + @Test + public void testFilterPushdownWithMonthsTransform() { + sql( + "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)" + + "USING iceberg " + + "PARTITIONED BY (months(t))", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-30T01:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-06-30T02:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (3, 300, TIMESTAMP '2021-07-15T10:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (4, 400, null)", tableName); + + withDefaultTimeZone( + "UTC", + () -> { + checkOnlyIcebergFilters( + "t IS NULL" /* query predicate */, + "t IS NULL" /* Iceberg scan filters */, + ImmutableList.of(row(4, 400, null))); + + // strict/inclusive projections for t < TIMESTAMP '2021-07-01T00:00:00.000Z' are equal, + // so this filter selects entire partitions and can be pushed down completely + checkOnlyIcebergFilters( + "t < TIMESTAMP '2021-07-01T00:00:00.000Z'" /* query predicate */, + "t IS NOT NULL, t < 1625097600000000" /* Iceberg scan filters */, + ImmutableList.of( + row(1, 100, timestamp("2021-06-30T01:00:00.000Z")), + row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); + + // strict/inclusive projections for t < TIMESTAMP '2021-06-30T03:00:00.000Z' differ, + // so this filter does NOT select entire partitions and can't be pushed down completely + checkFilters( + "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */, + "t < 2021-06-30 03:00:00" /* Spark post scan filter */, + "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */, + ImmutableList.of( + row(1, 100, timestamp("2021-06-30T01:00:00.000Z")), + row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); + }); + } + + @Test + public void testFilterPushdownWithYearsTransform() { + sql( + "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)" + + "USING iceberg " + + "PARTITIONED BY (years(t))", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-30T01:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-06-30T02:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2022-09-25T02:00:00.000Z')", tableName); + sql("INSERT INTO %s VALUES (3, 300, null)", tableName); + + withDefaultTimeZone( + "UTC", + () -> { + checkOnlyIcebergFilters( + "t IS NULL" /* query predicate */, + "t IS NULL" /* Iceberg scan filters */, + ImmutableList.of(row(3, 300, null))); + + // strict/inclusive projections for t < TIMESTAMP '2022-01-01T00:00:00.000Z' are equal, + // so this filter selects entire partitions and can be pushed down completely + checkOnlyIcebergFilters( + "t < TIMESTAMP '2022-01-01T00:00:00.000Z'" /* query predicate */, + "t IS NOT NULL, t < 1640995200000000" /* Iceberg scan filters */, + ImmutableList.of( + row(1, 100, timestamp("2021-06-30T01:00:00.000Z")), + row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); + + // strict/inclusive projections for t < TIMESTAMP '2021-06-30T03:00:00.000Z' differ, + // so this filter does NOT select entire partitions and can't be pushed down completely + checkFilters( + "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */, + "t < 2021-06-30 03:00:00" /* Spark post scan filter */, + "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */, + ImmutableList.of( + row(1, 100, timestamp("2021-06-30T01:00:00.000Z")), + row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); + }); + } + + @Test + public void testFilterPushdownWithBucketTransform() { + sql( + "CREATE TABLE %s (id INT, salary INT, dep STRING)" + + "USING iceberg " + + "PARTITIONED BY (dep, bucket(8, id))", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName); + sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName); + + checkFilters( + "dep = 'd1' AND id = 1" /* query predicate */, + "id = 1" /* Spark post scan filter */, + "dep IS NOT NULL, id IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + } + + @Test + public void testFilterPushdownWithTruncateTransform() { + sql( + "CREATE TABLE %s (id INT, salary INT, dep STRING)" + + "USING iceberg " + + "PARTITIONED BY (truncate(1, dep))", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName); + sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName); + sql("INSERT INTO %s VALUES (3, 300, 'a3')", tableName); + + checkOnlyIcebergFilters( + "dep LIKE 'd%'" /* query predicate */, + "dep IS NOT NULL, dep LIKE 'd%'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"))); + + checkFilters( + "dep = 'd1'" /* query predicate */, + "dep = d1" /* Spark post scan filter */, + "dep IS NOT NULL" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + } + + @Test + public void testFilterPushdownWithSpecEvolutionAndIdentityTransforms() { + sql( + "CREATE TABLE %s (id INT, salary INT, dep STRING, sub_dep STRING)" + + "USING iceberg " + + "PARTITIONED BY (dep)", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, 'd1', 'sd1')", tableName); + + // the filter can be pushed completely because all specs include identity(dep) + checkOnlyIcebergFilters( + "dep = 'd1'" /* query predicate */, + "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1", "sd1"))); + + Table table = validationCatalog.loadTable(tableIdent); + + table.updateSpec().addField("sub_dep").commit(); + sql("REFRESH TABLE %s", tableName); + sql("INSERT INTO %s VALUES (2, 200, 'd2', 'sd2')", tableName); + + // the filter can be pushed completely because all specs include identity(dep) + checkOnlyIcebergFilters( + "dep = 'd1'" /* query predicate */, + "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1", "sd1"))); + + table.updateSpec().removeField("sub_dep").removeField("dep").commit(); + sql("REFRESH TABLE %s", tableName); + sql("INSERT INTO %s VALUES (3, 300, 'd3', 'sd3')", tableName); + + // the filter can't be pushed completely because not all specs include identity(dep) + checkFilters( + "dep = 'd1'" /* query predicate */, + "isnotnull(dep) AND (dep = d1)" /* Spark post scan filter */, + "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1", "sd1"))); + } + + @Test + public void testFilterPushdownWithSpecEvolutionAndTruncateTransform() { + sql( + "CREATE TABLE %s (id INT, salary INT, dep STRING)" + + "USING iceberg " + + "PARTITIONED BY (truncate(2, dep))", + tableName); + + sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName); + + // the filter can be pushed completely because the current spec supports it + checkOnlyIcebergFilters( + "dep LIKE 'd1%'" /* query predicate */, + "dep IS NOT NULL, dep LIKE 'd1%'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + + Table table = validationCatalog.loadTable(tableIdent); + table + .updateSpec() + .removeField(Expressions.truncate("dep", 2)) + .addField(Expressions.truncate("dep", 1)) + .commit(); + sql("REFRESH TABLE %s", tableName); + sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName); + + // the filter can be pushed completely because both specs support it + checkOnlyIcebergFilters( + "dep LIKE 'd%'" /* query predicate */, + "dep IS NOT NULL, dep LIKE 'd%'" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"))); + + // the filter can't be pushed completely because the second spec is truncate(dep, 1) and + // the predicate literal is d1, which is two chars + checkFilters( + "dep LIKE 'd1%' AND id = 1" /* query predicate */, + "(isnotnull(id) AND StartsWith(dep, d1)) AND (id = 1)" /* Spark post scan filter */, + "dep IS NOT NULL, id IS NOT NULL, dep LIKE 'd1%', id = 1" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, "d1"))); + } + + @Test + public void testFilterPushdownWithSpecEvolutionAndTimeTransforms() { + sql( + "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)" + + "USING iceberg " + + "PARTITIONED BY (hours(t))", + tableName); + + withDefaultTimeZone( + "UTC", + () -> { + sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-30T01:00:00.000Z')", tableName); + + // the filter can be pushed completely because the current spec supports it + checkOnlyIcebergFilters( + "t < TIMESTAMP '2021-07-01T00:00:00.000Z'" /* query predicate */, + "t IS NOT NULL, t < 1625097600000000" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.000Z")))); + + Table table = validationCatalog.loadTable(tableIdent); + table + .updateSpec() + .removeField(Expressions.hour("t")) + .addField(Expressions.month("t")) + .commit(); + sql("REFRESH TABLE %s", tableName); + sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-05-30T01:00:00.000Z')", tableName); + + // the filter can be pushed completely because both specs support it + checkOnlyIcebergFilters( + "t < TIMESTAMP '2021-06-01T00:00:00.000Z'" /* query predicate */, + "t IS NOT NULL, t < 1622505600000000" /* Iceberg scan filters */, + ImmutableList.of(row(2, 200, timestamp("2021-05-30T01:00:00.000Z")))); + }); + } + + @Test + public void testFilterPushdownWithSpecialFloatingPointPartitionValues() { + sql( + "CREATE TABLE %s (id INT, salary DOUBLE)" + "USING iceberg " + "PARTITIONED BY (salary)", + tableName); + + sql("INSERT INTO %s VALUES (1, 100.5)", tableName); + sql("INSERT INTO %s VALUES (2, double('NaN'))", tableName); + sql("INSERT INTO %s VALUES (3, double('infinity'))", tableName); + sql("INSERT INTO %s VALUES (4, double('-infinity'))", tableName); + + checkOnlyIcebergFilters( + "salary = 100.5" /* query predicate */, + "salary IS NOT NULL, salary = 100.5" /* Iceberg scan filters */, + ImmutableList.of(row(1, 100.5))); + + checkOnlyIcebergFilters( + "salary = double('NaN')" /* query predicate */, + "salary IS NOT NULL, is_nan(salary)" /* Iceberg scan filters */, + ImmutableList.of(row(2, Double.NaN))); + + checkOnlyIcebergFilters( + "salary != double('NaN')" /* query predicate */, + "salary IS NOT NULL, NOT (is_nan(salary))" /* Iceberg scan filters */, + ImmutableList.of( + row(1, 100.5), row(3, Double.POSITIVE_INFINITY), row(4, Double.NEGATIVE_INFINITY))); + + checkOnlyIcebergFilters( + "salary = double('infinity')" /* query predicate */, + "salary IS NOT NULL, salary = Infinity" /* Iceberg scan filters */, + ImmutableList.of(row(3, Double.POSITIVE_INFINITY))); + + checkOnlyIcebergFilters( + "salary = double('-infinity')" /* query predicate */, + "salary IS NOT NULL, salary = -Infinity" /* Iceberg scan filters */, + ImmutableList.of(row(4, Double.NEGATIVE_INFINITY))); + } + + private void checkOnlyIcebergFilters( + String predicate, String icebergFilters, List expectedRows) { + + checkFilters(predicate, null, icebergFilters, expectedRows); + } + + private void checkFilters( + String predicate, String sparkFilter, String icebergFilters, List expectedRows) { + + Action check = + () -> { + assertEquals( + "Rows must match", + expectedRows, + sql("SELECT * FROM %s WHERE %s ORDER BY id", tableName, predicate)); + }; + SparkPlan sparkPlan = executeAndKeepPlan(check); + String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", ""); + + if (sparkFilter != null) { + Assertions.assertThat(planAsString) + .as("Post scan filter should match") + .contains("Filter (" + sparkFilter + ")"); + } else { + Assertions.assertThat(planAsString) + .as("Should be no post scan filter") + .doesNotContain("Filter ("); + } + + Assertions.assertThat(planAsString) + .as("Pushed filters must match") + .contains("[filters=" + icebergFilters + ","); + } + + private Timestamp timestamp(String timestampAsString) { + return Timestamp.from(Instant.parse(timestampAsString)); + } +}