diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 4e189aaf7e9d..d611a102ac28 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -182,7 +182,7 @@ import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; -import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInAllSpecs; +import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle; import static io.trino.plugin.iceberg.IcebergUtil.getColumns; @@ -1771,6 +1771,11 @@ public Optional> applyFilter(C else { Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + Long snapshotId = table.getSnapshotId().orElseThrow(() -> new IllegalStateException("Snapshot id must be present")); + Set partitionSpecIds = icebergTable.snapshot(snapshotId).allManifests().stream() + .map(ManifestFile::partitionSpecId) + .collect(toImmutableSet()); + Map unsupported = new LinkedHashMap<>(); Map newEnforced = new LinkedHashMap<>(); Map newUnenforced = new LinkedHashMap<>(); @@ -1783,7 +1788,7 @@ public Optional> applyFilter(C (columnHandle.getType() == UUID && !(domain.isOnlyNull() || domain.getValues().isAll()))) { unsupported.put(columnHandle, domain); } - else if (canEnforceColumnConstraintInAllSpecs(typeOperators, icebergTable, columnHandle, domain)) { + else if (canEnforceColumnConstraintInSpecs(typeOperators, icebergTable, partitionSpecIds, columnHandle, domain)) { newEnforced.put(columnHandle, domain); } else { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 5c839eb9e9a8..f72c282f0e99 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -337,9 +337,15 @@ private static String quotedName(String name) return '"' + name.replace("\"", "\"\"") + '"'; } - public static boolean canEnforceColumnConstraintInAllSpecs(TypeOperators typeOperators, Table table, IcebergColumnHandle columnHandle, Domain domain) + public static boolean canEnforceColumnConstraintInSpecs( + TypeOperators typeOperators, + Table table, + Set partitionSpecIds, + IcebergColumnHandle columnHandle, + Domain domain) { return table.specs().values().stream() + .filter(partitionSpec -> partitionSpecIds.contains(partitionSpec.specId())) .allMatch(spec -> canEnforceConstraintWithinPartitioningSpec(typeOperators, spec, columnHandle, domain)); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index a4f0978647bd..052e84959fbb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -1338,6 +1338,44 @@ else if (format == AVRO) { dropTable("test_hour_transform"); } + @Test + public void testPartitionPredicatePushdownWithHistoricalPartitionSpecs() + { + // Start with a bucket transform, which cannot be used for predicate pushdown + String tableName = "test_partition_predicate_pushdown_with_historical_partition_specs"; + assertUpdate("CREATE TABLE " + tableName + " (d TIMESTAMP(6), b INTEGER) WITH (partitioning = ARRAY['bucket(b, 3)'])"); + @Language("SQL") String selectQuery = "SELECT b FROM " + tableName + " WHERE CAST(d AS date) < DATE '2015-01-02'"; + + @Language("SQL") String initialValues = + "(TIMESTAMP '1969-12-31 22:22:22.222222', 8)," + + "(TIMESTAMP '1969-12-31 23:33:11.456789', 9)," + + "(TIMESTAMP '1969-12-31 23:44:55.567890', 10)"; + assertUpdate("INSERT INTO " + tableName + " VALUES " + initialValues, 3); + assertThat(query(selectQuery)) + .containsAll("VALUES 8, 9, 10") + .isNotFullyPushedDown(FilterNode.class); + + @Language("SQL") String hourTransformValues = + "(TIMESTAMP '2015-01-01 10:01:23.123456', 1)," + + "(TIMESTAMP '2015-01-02 10:10:02.987654', 2)," + + "(TIMESTAMP '2015-01-03 10:55:00.456789', 3)"; + // While the bucket transform is still used the hour transform still cannot be used for pushdown + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['hour(d)']"); + assertUpdate("INSERT INTO " + tableName + " VALUES " + hourTransformValues, 3); + assertThat(query(selectQuery)) + .containsAll("VALUES 1, 8, 9, 10") + .isNotFullyPushedDown(FilterNode.class); + + // The old partition scheme is no longer used so pushdown using the hour transform is allowed + assertUpdate("DELETE FROM " + tableName + " WHERE year(d) = 1969", 3); + assertUpdate("INSERT INTO " + tableName + " VALUES " + initialValues, 3); + assertThat(query(selectQuery)) + .containsAll("VALUES 1, 8, 9, 10") + .isFullyPushedDown(); + + dropTable(tableName); + } + @Test public void testDayTransformDate() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergProjectionPushdownPlans.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergProjectionPushdownPlans.java index 8f53a955c7df..9b5d17671027 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergProjectionPushdownPlans.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergProjectionPushdownPlans.java @@ -129,7 +129,7 @@ public void testDereferencePushdown() getQueryRunner().execute(format( "CREATE TABLE %s (col0, col1) WITH (partitioning = ARRAY['col1']) AS" + - " SELECT CAST(row(5, 6) AS row(x bigint, y bigint)) AS col0, 5 AS col1 WHERE false", + " SELECT CAST(row(5, 6) AS row(x bigint, y bigint)) AS col0, 5 AS col1", testTable)); Session session = getQueryRunner().getDefaultSession(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index b1c25554cfc2..85db3b10cc0c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -243,6 +243,7 @@ public void testPredicatePushdown() { String tableName = "predicate_test"; tester().getQueryRunner().execute(format("CREATE TABLE %s (a, b) AS SELECT 5, 6", tableName)); + Long snapshotId = (Long) tester().getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" LIMIT 1", tableName)).getOnlyValue(); PushPredicateIntoTableScan pushPredicateIntoTableScan = new PushPredicateIntoTableScan(tester().getPlannerContext(), tester().getTypeAnalyzer()); @@ -250,7 +251,7 @@ public void testPredicatePushdown() SCHEMA_NAME, tableName, DATA, - Optional.of(1L), + Optional.of(snapshotId), "", "", 1,