diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index a3c2352d1bd5..49a13cd4fa1d 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -157,6 +157,10 @@ implementation is used: * - ``iceberg.register-table-procedure.enabled`` - Enable to allow user to call ``register_table`` procedure. - ``false`` + * - ``iceberg.query-partition-filter-required`` + - Set to ``true`` to force a query to use a partition filter. + You can use the ``query_partition_filter_required`` catalog session property for temporary, catalog-specific use. + - ``false`` ``` ## Type mapping diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index f59284388194..5dce684f6b50 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -73,6 +73,7 @@ public class IcebergConfig private double minimumAssignedSplitWeight = 0.05; private Optional materializedViewsStorageSchema = Optional.empty(); private boolean sortedWritingEnabled = true; + private boolean queryPartitionFilterRequired; public CatalogType getCatalogType() { @@ -367,4 +368,17 @@ public IcebergConfig setSortedWritingEnabled(boolean sortedWritingEnabled) this.sortedWritingEnabled = sortedWritingEnabled; return this; } + + @Config("iceberg.query-partition-filter-required") + @ConfigDescription("Require a filter on at least one partition column") + public IcebergConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired) + { + this.queryPartitionFilterRequired = queryPartitionFilterRequired; + return this; + } + + public boolean isQueryPartitionFilterRequired() + { + return queryPartitionFilterRequired; + } } diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 1252b16f93b4..b931a3229238 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -160,6 +160,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Deque; import java.util.HashMap; @@ -217,6 +218,7 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite; import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled; +import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionFilterRequired; import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled; import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY; @@ -258,6 +260,7 @@ import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.STALE; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN; @@ -429,6 +432,8 @@ public ConnectorTableHandle getTableHandle( table.location(), table.properties(), false, + Optional.empty(), + ImmutableSet.of(), Optional.empty()); } @@ -657,6 +662,41 @@ public ColumnMetadata 
getColumnMetadata(ConnectorSession session, ConnectorTable .build(); } + @Override + public void validateScan(ConnectorSession session, ConnectorTableHandle handle) + { + IcebergTableHandle table = (IcebergTableHandle) handle; + if (isQueryPartitionFilterRequired(session) && table.getEnforcedPredicate().isAll() && table.getAnalyzeColumns().isEmpty()) { + Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); + Optional partitionSpec = table.getPartitionSpecJson() + .map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson)); + if (partitionSpec.isEmpty() || partitionSpec.get().isUnpartitioned()) { + return; + } + Set columnsWithPredicates = new HashSet<>(); + table.getConstraintColumns().stream() + .map(IcebergColumnHandle::getId) + .forEach(columnsWithPredicates::add); + table.getUnenforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream() + .map(IcebergColumnHandle::getId) + .forEach(columnsWithPredicates::add)); + Set partitionColumns = partitionSpec.get().fields().stream() + .filter(field -> !field.transform().isVoid()) + .map(PartitionField::sourceId) + .collect(toImmutableSet()); + if (Collections.disjoint(columnsWithPredicates, partitionColumns)) { + String partitionColumnNames = partitionSpec.get().fields().stream() + .filter(field -> !field.transform().isVoid()) + .map(PartitionField::sourceId) + .map(id -> schema.idToName().get(id)) + .collect(joining(", ")); + throw new TrinoException( + QUERY_REJECTED, + format("Filter required for %s on at least one of the partition columns: %s", table.getSchemaTableName(), partitionColumnNames)); + } + } + } + @Override public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { @@ -2001,7 +2041,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession }); return new ConnectorAnalyzeMetadata( - tableHandle, + handle.withAnalyzeColumns(analyzeColumnNames.or(() -> Optional.of(ImmutableSet.of()))), 
getStatisticsCollectionMetadata( tableMetadata, analyzeColumnNames, @@ -2384,7 +2424,9 @@ public Optional> applyLimit(Connect table.getTableLocation(), table.getStorageProperties(), table.isRecordScannedFiles(), - table.getMaxScannedFileSize()); + table.getMaxScannedFileSize(), + table.getConstraintColumns(), + table.getAnalyzeColumns()); return Optional.of(new LimitApplicationResult<>(table, false, false)); } @@ -2395,7 +2437,7 @@ public Optional> applyFilter(C IcebergTableHandle table = (IcebergTableHandle) handle; ConstraintExtractor.ExtractionResult extractionResult = extractTupleDomain(constraint); TupleDomain predicate = extractionResult.tupleDomain(); - if (predicate.isAll()) { + if (predicate.isAll() && constraint.getPredicateColumns().isEmpty()) { return Optional.empty(); } if (table.getLimit().isPresent()) { @@ -2455,8 +2497,16 @@ else if (isMetadataColumnId(columnHandle.getId())) { remainingConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(TupleDomain.withColumnDomains(unsupported)); } + Set newConstraintColumns = constraint.getPredicateColumns() + .map(columnHandles -> columnHandles.stream() + .map(columnHandle -> (IcebergColumnHandle) columnHandle) + .collect(toImmutableSet())) + .orElse(ImmutableSet.of()); + if (newEnforcedConstraint.equals(table.getEnforcedPredicate()) - && newUnenforcedConstraint.equals(table.getUnenforcedPredicate())) { + && newUnenforcedConstraint.equals(table.getUnenforcedPredicate()) + && newConstraintColumns.equals(table.getConstraintColumns()) + && constraint.getPredicateColumns().isEmpty()) { return Optional.empty(); } @@ -2478,7 +2528,9 @@ else if (isMetadataColumnId(columnHandle.getId())) { table.getTableLocation(), table.getStorageProperties(), table.isRecordScannedFiles(), - table.getMaxScannedFileSize()), + table.getMaxScannedFileSize(), + Sets.union(table.getConstraintColumns(), newConstraintColumns), + table.getAnalyzeColumns()), remainingConstraint.transformKeys(ColumnHandle.class::cast), 
extractionResult.remainingExpression(), false)); @@ -2626,7 +2678,9 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab originalHandle.getTableLocation(), originalHandle.getStorageProperties(), originalHandle.isRecordScannedFiles(), - originalHandle.getMaxScannedFileSize()), + originalHandle.getMaxScannedFileSize(), + originalHandle.getConstraintColumns(), + originalHandle.getAnalyzeColumns()), handle -> { Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName()); return TableStatisticsReader.getTableStatistics(typeManager, session, handle, icebergTable); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index 9a639b285da7..3ee2f4a219c2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -88,6 +88,7 @@ public final class IcebergSessionProperties public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention"; private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write"; private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled"; + private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; private final List> sessionProperties; @@ -299,6 +300,11 @@ public IcebergSessionProperties( "Enable sorted writing to tables with a specified sort order", icebergConfig.isSortedWritingEnabled(), false)) + .add(booleanProperty( + QUERY_PARTITION_FILTER_REQUIRED, + "Require filter on partition column", + icebergConfig.isQueryPartitionFilterRequired(), + false)) .build(); } @@ -489,4 +495,9 @@ public static boolean isSortedWritingEnabled(ConnectorSession session) { return session.getProperty(SORTED_WRITING_ENABLED, Boolean.class); } + + 
public static boolean isQueryPartitionFilterRequired(ConnectorSession session) + { + return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index b3edb6165935..b71cd5913da9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -55,6 +55,9 @@ public class IcebergTableHandle // Filter guaranteed to be enforced by Iceberg connector private final TupleDomain enforcedPredicate; + // Columns that are present in {@link Constraint#predicate()} applied on the table scan + private final Set constraintColumns; + // semantically limit is applied after enforcedPredicate private final OptionalLong limit; @@ -65,6 +68,9 @@ public class IcebergTableHandle private final boolean recordScannedFiles; private final Optional maxScannedFileSize; + // ANALYZE only + private final Optional> analyzeColumns; + @JsonCreator public static IcebergTableHandle fromJsonForDeserializationOnly( @JsonProperty("catalog") CatalogHandle catalog, @@ -100,6 +106,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( tableLocation, storageProperties, false, + Optional.empty(), + ImmutableSet.of(), Optional.empty()); } @@ -120,7 +128,9 @@ public IcebergTableHandle( String tableLocation, Map storageProperties, boolean recordScannedFiles, - Optional maxScannedFileSize) + Optional maxScannedFileSize, + Set constraintColumns, + Optional> analyzeColumns) { this.catalog = requireNonNull(catalog, "catalog is null"); this.schemaName = requireNonNull(schemaName, "schemaName is null"); @@ -139,6 +149,8 @@ public IcebergTableHandle( this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null")); this.recordScannedFiles = 
recordScannedFiles; this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); + this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null")); + this.analyzeColumns = requireNonNull(analyzeColumns, "analyzeColumns is null"); } @JsonProperty @@ -244,6 +256,18 @@ public Optional getMaxScannedFileSize() return maxScannedFileSize; } + @JsonIgnore + public Set getConstraintColumns() + { + return constraintColumns; + } + + @JsonIgnore + public Optional> getAnalyzeColumns() + { + return analyzeColumns; + } + public SchemaTableName getSchemaTableName() { return new SchemaTableName(schemaName, tableName); @@ -273,7 +297,33 @@ public IcebergTableHandle withProjectedColumns(Set projecte tableLocation, storageProperties, recordScannedFiles, - maxScannedFileSize); + maxScannedFileSize, + constraintColumns, + analyzeColumns); + } + + public IcebergTableHandle withAnalyzeColumns(Optional> analyzeColumns) + { + return new IcebergTableHandle( + catalog, + schemaName, + tableName, + tableType, + snapshotId, + tableSchemaJson, + partitionSpecJson, + formatVersion, + unenforcedPredicate, + enforcedPredicate, + limit, + projectedColumns, + nameMappingJson, + tableLocation, + storageProperties, + recordScannedFiles, + maxScannedFileSize, + constraintColumns, + analyzeColumns); } public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize) @@ -295,7 +345,9 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc tableLocation, storageProperties, recordScannedFiles, - Optional.of(maxScannedFileSize)); + Optional.of(maxScannedFileSize), + constraintColumns, + analyzeColumns); } @Override @@ -325,7 +377,9 @@ public boolean equals(Object o) Objects.equals(nameMappingJson, that.nameMappingJson) && Objects.equals(tableLocation, that.tableLocation) && Objects.equals(storageProperties, that.storageProperties) && - Objects.equals(maxScannedFileSize, 
that.maxScannedFileSize); + Objects.equals(maxScannedFileSize, that.maxScannedFileSize) && + Objects.equals(constraintColumns, that.constraintColumns) && + Objects.equals(analyzeColumns, that.analyzeColumns); } @Override @@ -348,7 +402,9 @@ public int hashCode() tableLocation, storageProperties, recordScannedFiles, - maxScannedFileSize); + maxScannedFileSize, + constraintColumns, + analyzeColumns); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index 257d3bba5ce1..d3fede8a8282 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -637,6 +637,24 @@ public void testMetadataTables() } } + @Test + public void testPartitionFilterRequired() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", "query_partition_filter_required", "true") + .build(); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + String query = "SELECT id FROM " + tableName + " WHERE a = 'a'"; + String failureMessage = "Filter required for tpch.*\\." 
+ tableName + " on at least one of the partition columns: ds"; + assertQueryFails(session, query, failureMessage); + assertQueryFails(session, "EXPLAIN " + query, failureMessage); + assertUpdate(session, "DROP TABLE " + tableName); + } + protected abstract boolean isFileSorted(Location path, String sortColumnName); @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 2f4e50d3a741..c5bd339b1532 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -6984,6 +6984,222 @@ public void testTableChangesFunctionAfterSchemaChange() } } + @Test + public void testIdentityPartitionFilterMissing() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + assertQueryFails(session, "SELECT id FROM " + tableName + " WHERE ds IS NOT null OR true", "Filter required for tpch\\." 
+ tableName + " on at least one of the partition columns: ds"); + assertUpdate(session, "DROP TABLE " + tableName); + } + + @Test + public void testBucketPartitionFilterMissing() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['bucket(ds, 16)'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + assertQueryFails(session, "SELECT id FROM " + tableName + " WHERE ds IS NOT null OR true", "Filter required for tpch\\." + tableName + " on at least one of the partition columns: ds"); + assertUpdate(session, "DROP TABLE " + tableName); + } + + @Test + public void testIdentityPartitionFilterIncluded() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + String query = "SELECT id FROM " + tableName + " WHERE ds = 'a'"; + assertQuery(session, query, "VALUES 1"); + assertUpdate(session, "DROP TABLE " + tableName); + } + + @Test + public void testBucketPartitionFilterIncluded() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['bucket(ds, 16)'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a'), (2, 'b', 'b')", 2); + String query = "SELECT id FROM " + tableName + " WHERE ds = 'a'"; + assertQuery(session, query, "VALUES 1"); + assertUpdate(session, "DROP TABLE " + 
tableName); + } + + @Test + public void testMultiPartitionedTableFilterIncluded() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['id', 'bucket(ds, 16)'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a'), (2, 'b', 'b')", 2); + // include predicate only on 'id', not on 'ds' + String query = "SELECT id, ds FROM " + tableName + " WHERE id = 2"; + assertQuery(session, query, "VALUES (2, 'b')"); + assertUpdate(session, "DROP TABLE " + tableName); + } + + @Test + public void testIdentityPartitionIsNotNullFilter() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + assertQuery(session, "SELECT id FROM " + tableName + " WHERE ds IS NOT null", "VALUES 1"); + assertUpdate(session, "DROP TABLE " + tableName); + } + + @Test + public void testJoinPartitionFilterIncluded() + { + String tableName1 = "test_partition_" + randomNameSuffix(); + String tableName2 = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName1 + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName1 + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + assertUpdate(session, "CREATE TABLE " + tableName2 + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName2 + " (id, a, ds) VALUES (1, 'a', 
'a')", 1); + assertQuery(session, "SELECT a.id, b.id FROM " + tableName1 + " a JOIN " + tableName2 + " b ON (a.ds = b.ds) WHERE a.ds = 'a'", "VALUES (1, 1)"); + assertUpdate(session, "DROP TABLE " + tableName1); + assertUpdate(session, "DROP TABLE " + tableName2); + } + + @Test + public void testJoinWithMissingPartitionFilter() + { + String tableName1 = "test_partition_" + randomNameSuffix(); + String tableName2 = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName1 + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName1 + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + assertUpdate(session, "CREATE TABLE " + tableName2 + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName2 + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + assertQueryFails(session, "SELECT a.id, b.id FROM " + tableName1 + " a JOIN " + tableName2 + " b ON (a.id = b.id) WHERE a.ds = 'a'", "Filter required for tpch\\." 
+ tableName2 + " on at least one of the partition columns: ds"); + assertUpdate(session, "DROP TABLE " + tableName1); + assertUpdate(session, "DROP TABLE " + tableName2); + } + + @Test + public void testJoinWithPartitionFilterOnPartitionedTable() + { + String tableName1 = "test_partition_" + randomNameSuffix(); + String tableName2 = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName1 + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName1 + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + assertUpdate(session, "CREATE TABLE " + tableName2 + " (id integer, a varchar, b varchar, ds varchar)"); + assertUpdate(session, "INSERT INTO " + tableName2 + " (id, a, ds) VALUES (1, 'a', 'a')", 1); + assertQuery(session, "SELECT a.id, b.id FROM " + tableName1 + " a JOIN " + tableName2 + " b ON (a.id = b.id) WHERE a.ds = 'a'", "VALUES (1, 1)"); + assertUpdate(session, "DROP TABLE " + tableName1); + assertUpdate(session, "DROP TABLE " + tableName2); + } + + @Test + public void testPartitionPredicateWithCasting() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, '1', '1')", 1); + String query = "SELECT id FROM " + tableName + " WHERE cast(ds as integer) = 1"; + assertQuery(session, query, "VALUES 1"); + assertUpdate(session, "DROP TABLE " + tableName); + } + + @Test + public void testNestedQueryWithInnerPartitionPredicate() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName 
+ " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, '1', '1')", 1); + String query = "SELECT id FROM (SELECT * FROM " + tableName + " WHERE cast(ds as integer) = 1) WHERE cast(a as integer) = 1"; + assertQuery(session, query, "VALUES 1"); + assertUpdate(session, "DROP TABLE " + tableName + ""); + } + + @Test + public void testPredicateOnNonPartitionColumn() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, '1', '1')", 1); + String query = "SELECT id FROM " + tableName + " WHERE cast(b as integer) = 1"; + assertQueryFails(session, query, "Filter required for tpch\\." + tableName + " on at least one of the partition columns: ds"); + assertUpdate(session, "DROP TABLE " + tableName); + } + + @Test + public void testNonSelectStatementsWithPartitionFilterRequired() + { + String tableName1 = "test_partition_" + randomNameSuffix(); + String tableName2 = "test_partition_" + randomNameSuffix(); + + Session session = withPartitionFilterRequired(getSession()); + + assertUpdate(session, "CREATE TABLE " + tableName1 + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "CREATE TABLE " + tableName2 + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])"); + assertUpdate(session, "INSERT INTO " + tableName1 + " (id, a, ds) VALUES (1, '1', '1'), (2, '2', '2')", 2); + assertUpdate(session, "INSERT INTO " + tableName2 + " (id, a, ds) VALUES (1, '1', '1'), (3, '3', '3')", 2); + + // These non-SELECT statements fail without a partition filter + String errorMessage = "Filter required for 
tpch\\." + tableName1 + " on at least one of the partition columns: ds"; + assertQueryFails(session, "ALTER TABLE " + tableName1 + " EXECUTE optimize", errorMessage); + assertQueryFails(session, "UPDATE " + tableName1 + " SET a = 'New'", errorMessage); + assertQueryFails(session, "MERGE INTO " + tableName1 + " AS a USING " + tableName2 + " AS b ON (a.ds = b.ds) WHEN MATCHED THEN UPDATE SET a = 'New'", errorMessage); + assertQueryFails(session, "DELETE FROM " + tableName1 + " WHERE a = '1'", errorMessage); + + // Adding partition filters to each solves the problem + assertQuerySucceeds(session, "ALTER TABLE " + tableName1 + " EXECUTE optimize WHERE ds in ('2', '4')"); + assertQuerySucceeds(session, "UPDATE " + tableName1 + " SET a = 'New' WHERE ds = '2'"); + assertQuerySucceeds(session, "MERGE INTO " + tableName1 + " AS a USING (SELECT * FROM " + tableName2 + " WHERE ds = '1') AS b ON (a.ds = b.ds) WHEN MATCHED THEN UPDATE SET a = 'New'"); + assertQuerySucceeds(session, "DELETE FROM " + tableName1 + " WHERE ds = '1'"); + + // ANALYZE should always succeed because, unlike in Hive, it currently cannot take a partition argument + assertQuerySucceeds(session, "ANALYZE " + tableName1); + assertQuerySucceeds(session, "ANALYZE " + tableName2 + " WITH (columns = ARRAY['id', 'a'])"); + + assertUpdate(session, "DROP TABLE " + tableName1); + assertUpdate(session, "DROP TABLE " + tableName2); + } + + private static Session withPartitionFilterRequired(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty("iceberg", "query_partition_filter_required", "true") + .build(); + } + @Override protected void verifyTableNameLengthFailurePermissible(Throwable e) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 5711641500e5..eda6d6b8764d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java 
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -61,7 +61,8 @@ public void testDefaults() .setMinimumAssignedSplitWeight(0.05) .setMaterializedViewsStorageSchema(null) .setRegisterTableProcedureEnabled(false) - .setSortedWritingEnabled(true)); + .setSortedWritingEnabled(true) + .setQueryPartitionFilterRequired(false)); } @Test @@ -89,6 +90,7 @@ public void testExplicitPropertyMappings() .put("iceberg.materialized-views.storage-schema", "mv_storage_schema") .put("iceberg.register-table-procedure.enabled", "true") .put("iceberg.sorted-writing-enabled", "false") + .put("iceberg.query-partition-filter-required", "true") .buildOrThrow(); IcebergConfig expected = new IcebergConfig() @@ -112,7 +114,8 @@ public void testExplicitPropertyMappings() .setMinimumAssignedSplitWeight(0.01) .setMaterializedViewsStorageSchema("mv_storage_schema") .setRegisterTableProcedureEnabled(true) - .setSortedWritingEnabled(false); + .setSortedWritingEnabled(false) + .setQueryPartitionFilterRequired(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 019732d1af00..ae5f772ed110 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -186,6 +186,8 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle tablePath, ImmutableMap.of(), false, + Optional.empty(), + ImmutableSet.of(), Optional.empty()), transaction); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index af02fe6ec1ef..620e93e19dce 
100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -139,6 +139,8 @@ public void testIncompleteDynamicFilterTimeout() nationTable.location(), nationTable.properties(), false, + Optional.empty(), + ImmutableSet.of(), Optional.empty()); try (IcebergSplitSource splitSource = new IcebergSplitSource( diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index d5f0c9994c5d..519b80444b39 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -170,6 +170,8 @@ public void testProjectionPushdown() "", ImmutableMap.of(), false, + Optional.empty(), + ImmutableSet.of(), Optional.empty()); TableHandle table = new TableHandle(catalogHandle, icebergTable, new HiveTransactionHandle(false)); @@ -252,6 +254,8 @@ public void testPredicatePushdown() "", ImmutableMap.of(), false, + Optional.empty(), + ImmutableSet.of(), Optional.empty()); TableHandle table = new TableHandle(catalogHandle, icebergTable, new HiveTransactionHandle(false)); @@ -301,6 +305,8 @@ public void testColumnPruningProjectionPushdown() "", ImmutableMap.of(), false, + Optional.empty(), + ImmutableSet.of(), Optional.empty()); TableHandle table = new TableHandle(catalogHandle, icebergTable, new HiveTransactionHandle(false)); @@ -361,6 +367,8 @@ public void testPushdownWithDuplicateExpressions() "", ImmutableMap.of(), false, + Optional.empty(), + ImmutableSet.of(), Optional.empty()); TableHandle table = new TableHandle(catalogHandle, icebergTable, new HiveTransactionHandle(false));