diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst
index ef5692f41408d..e9b58124bf99f 100644
--- a/presto-docs/src/main/sphinx/connector/iceberg.rst
+++ b/presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -358,6 +358,23 @@ Property Name Description
 ``iceberg.delete-as-join-rewrite-enabled``            When enabled, equality delete row filtering is applied          ``true``   Yes   No, Equality delete read is not supported
                                                       as a join with the data of the equality delete files.
+
+                                                      Deprecated: This property is deprecated and will be removed
+                                                      in a future release. Use the
+                                                      ``iceberg.delete-as-join-rewrite-max-delete-columns``
+                                                      configuration property instead.
+
+``iceberg.delete-as-join-rewrite-max-delete-columns`` When set to a number greater than 0, this property enables      ``400``    Yes   No, Equality delete read is not supported
+                                                      equality delete row filtering as a join with the data of the
+                                                      equality delete files. The value of this property is the
+                                                      maximum number of columns that can be used in the equality
+                                                      delete files. If the number of columns in the equality delete
+                                                      files exceeds this value, then the optimization is not
+                                                      applied and the equality delete files are applied directly to
+                                                      each row in the data files.
+
+                                                      This property is only applicable when
+                                                      ``iceberg.delete-as-join-rewrite-enabled`` is set to
+                                                      ``true``.
 
 ``iceberg.enable-parquet-dereference-pushdown``       Enable parquet dereference pushdown.                             ``true``   Yes   No
@@ -499,6 +516,12 @@ Property Name Description
 ===================================================== ======================================================================= =================== =============================================
 ``iceberg.delete_as_join_rewrite_enabled``            Overrides the behavior of the connector property                        Yes                 No, Equality delete read is not supported
                                                       ``iceberg.delete-as-join-rewrite-enabled`` in the current session.
+
+                                                      Deprecated: This property is deprecated and will be removed. Use
+                                                      ``iceberg.delete_as_join_rewrite_max_delete_columns`` instead.
+``iceberg.delete_as_join_rewrite_max_delete_columns`` Overrides the behavior of the connector property                        Yes                 No, Equality delete read is not supported
+                                                      ``iceberg.delete-as-join-rewrite-max-delete-columns`` in the
+                                                      current session.
 ``iceberg.hive_statistics_merge_strategy``            Overrides the behavior of the connector property                        Yes                 Yes
                                                       ``iceberg.hive-statistics-merge-strategy`` in the current session.
 ``iceberg.rows_for_metadata_optimization_threshold``  Overrides the behavior of the connector property                        Yes                 Yes
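For reviewers who want to try the two knobs together, the sketch below mirrors the session setup used by the new distributed tests later in this diff. It is illustrative only: the limit value `100` is an arbitrary choice, not a default, and the builder assumes a test context where `getQueryRunner()` is available.

```java
// Minimal sketch, mirroring the new tests in this diff; values are illustrative.
Session session = Session.builder(getQueryRunner().getDefaultSession())
        // Deprecated switch, still honored while it exists
        .setCatalogSessionProperty("iceberg", "delete_as_join_rewrite_enabled", "true")
        // New knob: 0 disables the rewrite, any larger value caps the number of equality delete columns
        .setCatalogSessionProperty("iceberg", "delete_as_join_rewrite_max_delete_columns", "100")
        .build();
```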
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
index a90c7c888e6bf..d65c71df50b58 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
@@ -15,6 +15,7 @@
 import com.facebook.airlift.configuration.Config;
 import com.facebook.airlift.configuration.ConfigDescription;
+import com.facebook.airlift.configuration.LegacyConfig;
 import com.facebook.presto.hive.HiveCompressionCodec;
 import com.facebook.presto.spi.statistics.ColumnStatisticType;
 import com.google.common.base.Splitter;
@@ -60,6 +61,7 @@ public class IcebergConfig
     private double statisticSnapshotRecordDifferenceWeight;
     private boolean pushdownFilterEnabled;
     private boolean deleteAsJoinRewriteEnabled = true;
+    private int deleteAsJoinRewriteMaxDeleteColumns = 400;
     private int rowsForMetadataOptimizationThreshold = 1000;
     private int metadataPreviousVersionsMax = METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT;
     private boolean metadataDeleteAfterCommit = METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;
@@ -267,19 +269,39 @@ public boolean isPushdownFilterEnabled()
     {
         return pushdownFilterEnabled;
     }
 
-    @Config("iceberg.delete-as-join-rewrite-enabled")
-    @ConfigDescription("When enabled, equality delete row filtering will be implemented by rewriting the query plan to join with the delete keys.")
+    @LegacyConfig(value = "iceberg.delete-as-join-rewrite-enabled")
+    @Config("deprecated.iceberg.delete-as-join-rewrite-enabled")
+    @ConfigDescription("When enabled, equality delete row filtering will be implemented by rewriting the query plan to join with the delete keys. " +
+            "Deprecated: use 'iceberg.delete-as-join-rewrite-max-delete-columns' instead; set it to 0 to disable the rewrite. This property will be removed in a future release.")
+    @Deprecated
     public IcebergConfig setDeleteAsJoinRewriteEnabled(boolean deleteAsJoinPushdownEnabled)
     {
         this.deleteAsJoinRewriteEnabled = deleteAsJoinPushdownEnabled;
         return this;
     }
 
+    @Deprecated
     public boolean isDeleteAsJoinRewriteEnabled()
     {
         return deleteAsJoinRewriteEnabled;
     }
 
+    @Config("iceberg.delete-as-join-rewrite-max-delete-columns")
+    @ConfigDescription("The maximum number of columns that can be used in a delete as join rewrite. " +
+            "If the number of columns exceeds this value, the delete as join rewrite will not be applied.")
+    public IcebergConfig setDeleteAsJoinRewriteMaxDeleteColumns(int deleteAsJoinRewriteMaxDeleteColumns)
+    {
+        this.deleteAsJoinRewriteMaxDeleteColumns = deleteAsJoinRewriteMaxDeleteColumns;
+        return this;
+    }
+
+    @Min(0)
+    @Max(400)
+    public int getDeleteAsJoinRewriteMaxDeleteColumns()
+    {
+        return deleteAsJoinRewriteMaxDeleteColumns;
+    }
+
     @Config("iceberg.rows-for-metadata-optimization-threshold")
     @ConfigDescription("The max partitions number to utilize metadata optimization. 0 means skip the metadata optimization directly.")
     public IcebergConfig setRowsForMetadataOptimizationThreshold(int rowsForMetadataOptimizationThreshold)
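With `@LegacyConfig`, the old property name keeps working while the canonical name moves under the `deprecated.` prefix, and the new limit subsumes the old boolean. The snippet below is a sketch of the intended migration in terms of the config object itself; it only uses the setters introduced in this diff.

```java
// Sketch: the old boolean and the new limit express the same intent.
// Setting the limit to 0 replaces disabling the rewrite.
IcebergConfig disabled = new IcebergConfig()
        .setDeleteAsJoinRewriteMaxDeleteColumns(0);    // was: setDeleteAsJoinRewriteEnabled(false)
IcebergConfig capped = new IcebergConfig()
        .setDeleteAsJoinRewriteMaxDeleteColumns(400);  // default: rewrite applies up to 400 delete columns
```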
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
index a11338b2c5f64..3052ba7f88351 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
@@ -20,6 +20,7 @@
 import com.facebook.presto.iceberg.nessie.IcebergNessieConfig;
 import com.facebook.presto.iceberg.util.StatisticsUtil;
 import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.session.PropertyMetadata;
 import com.facebook.presto.spi.statistics.ColumnStatisticType;
 import com.google.common.base.Joiner;
@@ -33,15 +34,18 @@
 import java.util.List;
 import java.util.Optional;
 
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
 import static com.facebook.presto.common.type.VarcharType.VARCHAR;
 import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
 import static com.facebook.presto.iceberg.util.StatisticsUtil.SUPPORTED_MERGE_FLAGS;
 import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags;
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
 import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
 import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
 import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
 import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
 import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
+import static java.lang.String.format;
 
 public final class IcebergSessionProperties
 {
@@ -62,6 +66,7 @@ public final class IcebergSessionProperties
     public static final String MERGE_ON_READ_MODE_ENABLED = "merge_on_read_enabled";
     public static final String PUSHDOWN_FILTER_ENABLED = "pushdown_filter_enabled";
     public static final String DELETE_AS_JOIN_REWRITE_ENABLED = "delete_as_join_rewrite_enabled";
+    public static final String DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS = "delete_as_join_rewrite_max_delete_columns";
     public static final String HIVE_METASTORE_STATISTICS_MERGE_STRATEGY = "hive_statistics_merge_strategy";
     public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
     public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";
@@ -181,6 +186,23 @@ public IcebergSessionProperties(
                         "When enabled equality delete row filtering will be pushed down into a join.",
                         icebergConfig.isDeleteAsJoinRewriteEnabled(),
                         false))
+                .add(new PropertyMetadata<>(
+                        DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS,
+                        "The maximum number of columns that can be used in a delete as join rewrite. " +
+                                "If the number of columns exceeds this value, the delete as join rewrite will not be applied.",
+                        INTEGER,
+                        Integer.class,
+                        icebergConfig.getDeleteAsJoinRewriteMaxDeleteColumns(),
+                        false,
+                        value -> {
+                            int intValue = ((Number) value).intValue();
+                            if (intValue < 0 || intValue > 400) {
+                                throw new PrestoException(INVALID_SESSION_PROPERTY,
+                                        format("Invalid value for %s: %s. It must be between 0 and 400.", DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, intValue));
+                            }
+                            return intValue;
+                        },
+                        integer -> integer))
                 .add(integerProperty(
                         ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD,
                         "The max partitions number to utilize metadata optimization. When partitions number " +
@@ -311,6 +333,11 @@ public static boolean isDeleteToJoinPushdownEnabled(ConnectorSession session)
     {
         return session.getProperty(DELETE_AS_JOIN_REWRITE_ENABLED, Boolean.class);
     }
 
+    public static int getDeleteAsJoinRewriteMaxDeleteColumns(ConnectorSession session)
+    {
+        return session.getProperty(DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, Integer.class);
+    }
+
     public static int getRowsForMetadataOptimizationThreshold(ConnectorSession session)
     {
         return session.getProperty(ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD, Integer.class);
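The inline decoder is what enforces the 0-400 range when the session property is set. If it grows any further, it could be pulled into a small helper; the method below is a hypothetical refactor (not part of this diff) that shows the same rule using only names already present in this file.

```java
// Hypothetical helper equivalent to the decoder lambda above; not part of this change.
private static int validateMaxDeleteColumns(Object value)
{
    int intValue = ((Number) value).intValue();
    if (intValue < 0 || intValue > 400) {
        throw new PrestoException(INVALID_SESSION_PROPERTY,
                format("Invalid value for %s: %s. It must be between 0 and 400.",
                        DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, intValue));
    }
    return intValue;
}
```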
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java
index 18635bb4ce806..b81be47195b37 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java
@@ -85,11 +85,13 @@
 import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
 import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
 import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
+import static com.facebook.presto.iceberg.IcebergSessionProperties.getDeleteAsJoinRewriteMaxDeleteColumns;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.isDeleteToJoinPushdownEnabled;
 import static com.facebook.presto.iceberg.IcebergUtil.getDeleteFiles;
 import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
 import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
 import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -130,7 +132,9 @@ public class IcebergEqualityDeleteAsJoin
     @Override
     public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator)
     {
-        if (!isDeleteToJoinPushdownEnabled(session)) {
+        int maxDeleteColumns = getDeleteAsJoinRewriteMaxDeleteColumns(session);
+        checkArgument(maxDeleteColumns >= 0, "maxDeleteColumns must be non-negative, got %s", maxDeleteColumns);
+        if (!isDeleteToJoinPushdownEnabled(session) || maxDeleteColumns == 0) {
             return maxSubplan;
         }
         return rewriteWith(new DeleteAsJoinRewriter(functionResolution,
@@ -190,6 +194,10 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext context)
                 // no equality deletes
                 return node;
             }
+            if (deleteSchemas.keySet().stream().anyMatch(equalityIds -> equalityIds.size() > getDeleteAsJoinRewriteMaxDeleteColumns(session))) {
+                // Too many fields in the delete schema, don't rewrite
+                return node;
+            }
 
             // Add all the fields required by the join that were not added by the user's query
             ImmutableMap unselectedAssignments = createAssignmentsForUnselectedFields(node, deleteSchemas, icebergTable);
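The rewrite is now gated twice: once globally (legacy flag off, or limit set to 0) and once per table scan (any equality delete schema wider than the limit). The fragment below is a condensed restatement of those two gates using the names from the diff; it is not a drop-in method, since `session` and `deleteSchemas` come from the surrounding optimizer context.

```java
// Condensed view of the two gates added above (illustrative, not drop-in code).
int maxDeleteColumns = getDeleteAsJoinRewriteMaxDeleteColumns(session);
boolean rewriteGloballyEnabled = isDeleteToJoinPushdownEnabled(session) && maxDeleteColumns > 0;
// Per table scan: skip the rewrite if any delete file's equality schema is wider than the limit.
boolean scanTooWide = deleteSchemas.keySet().stream()
        .anyMatch(equalityIds -> equalityIds.size() > maxDeleteColumns);
// The join rewrite is applied only when rewriteGloballyEnabled && !scanTooWide;
// otherwise the equality deletes are applied directly to each row in the data files.
```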
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
index ed91212089c3a..570900319e5f2 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
@@ -45,6 +45,7 @@
 import com.facebook.presto.spi.TableHandle;
 import com.facebook.presto.spi.analyzer.MetadataResolver;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
+import com.facebook.presto.spi.plan.JoinNode;
 import com.facebook.presto.spi.security.AllowAllAccessControl;
 import com.facebook.presto.spi.statistics.ColumnStatistics;
 import com.facebook.presto.spi.statistics.ConnectorHistogram;
@@ -113,6 +114,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -143,12 +145,15 @@
 import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
 import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_ENABLED;
+import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
+import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyNot;
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange;
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter;
+import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node;
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.output;
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;
 import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
@@ -2822,4 +2827,149 @@ protected void dropTable(Session session, String table)
         assertUpdate(session, "DROP TABLE " + table);
         assertFalse(getQueryRunner().tableExists(session, table));
     }
+
+    @Test
+    public void testEqualityDeleteAsJoinWithMaximumFieldsLimitUnderLimit()
+            throws Exception
+    {
+        int maxColumns = 10;
+        String tableName = "test_eq_delete_under_max_cols_" + randomTableSuffix();
+
+        Session session = Session.builder(getQueryRunner().getDefaultSession())
+                .setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_ENABLED, "true")
+                // Set the limit to the total number of columns used by the equality delete (id plus maxColumns varchar columns)
+                .setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, String.valueOf(maxColumns + 1))
+                .build();
+
+        try {
+            // The delete will use exactly the limit's number of columns, so the rewrite should still apply
+            // Create table with an id column plus maxColumns varchar columns
+            List<String> columnDefinitions = IntStream.range(0, maxColumns)
+                    .mapToObj(i -> "col_" + i + " varchar")
+                    .collect(Collectors.toList());
+            columnDefinitions.add(0, "id bigint");
+
+            String createTableSql = "CREATE TABLE " + tableName + " (" +
+                    String.join(", ", columnDefinitions) + ")";
+            assertUpdate(session, createTableSql);
+
+            // Insert test rows
+            for (int row = 1; row <= 3; row++) {
+                final int currentRow = row;
+                List<String> values = IntStream.range(0, maxColumns)
+                        .mapToObj(i -> "'val_" + currentRow + "_" + i + "'")
+                        .collect(Collectors.toList());
+                values.add(0, String.valueOf(currentRow));
+
+                String insertSql = "INSERT INTO " + tableName + " VALUES (" +
+                        String.join(", ", values) + ")";
+                assertUpdate(session, insertSql, 1);
+            }
+
+            // Verify all rows exist
+            assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES (3)");
+
+            // Update table to format version 2 and create equality delete files
+            Table icebergTable = updateTable(tableName);
+
+            // Create equality delete using ALL columns
+            Map<String, Object> deleteRow = new HashMap<>();
+            deleteRow.put("id", 2L);
+            for (int i = 0; i < maxColumns; i++) {
+                deleteRow.put("col_" + i, "val_2_" + i);
+            }
+
+            // Write equality delete with ALL columns
+            writeEqualityDeleteToNationTable(icebergTable, deleteRow);
+
+            // Query should work correctly regardless of optimization
+            assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES (2)");
+            assertQuery(session, "SELECT id FROM " + tableName + " ORDER BY id", "VALUES (1), (3)");
+
+            // With the number of delete columns <= the limit, the query plan should use a JOIN (optimization enabled)
+            assertPlan(session, "SELECT * FROM " + tableName,
+                    anyTree(
+                            node(JoinNode.class,
+                                    anyTree(tableScan(tableName)),
+                                    anyTree(tableScan(tableName)))));
+        }
+        finally {
+            dropTable(session, tableName);
+        }
+    }
+
+    @Test
+    public void testEqualityDeleteAsJoinWithMaximumFieldsLimitOverLimit()
+            throws Exception
+    {
+        int maxColumns = 10;
+        String tableName = "test_eq_delete_max_cols_" + randomTableSuffix();
+
+        Session session = Session.builder(getQueryRunner().getDefaultSession())
+                .setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_ENABLED, "true")
+                .setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, String.valueOf(maxColumns))
+                .build();
+
+        try {
+            // The equality delete will use maxColumns + 1 columns, which exceeds the limit, so the join rewrite should be disabled
+            // Create table with an id column plus maxColumns varchar columns
+            List<String> columnDefinitions = IntStream.range(0, maxColumns)
+                    .mapToObj(i -> "col_" + i + " varchar")
+                    .collect(Collectors.toList());
+            columnDefinitions.add(0, "id bigint");
+
+            String createTableSql = "CREATE TABLE " + tableName + " (" +
+                    String.join(", ", columnDefinitions) + ")";
+            assertUpdate(session, createTableSql);
+
+            // Insert test rows
+            for (int row = 1; row <= 3; row++) {
+                final int currentRow = row;
+                List<String> values = IntStream.range(0, maxColumns)
+                        .mapToObj(i -> "'val_" + currentRow + "_" + i + "'")
+                        .collect(Collectors.toList());
+                values.add(0, String.valueOf(currentRow));
+
+                String insertSql = "INSERT INTO " + tableName + " VALUES (" +
+                        String.join(", ", values) + ")";
+                assertUpdate(session, insertSql, 1);
+            }
+
+            // Verify all rows exist
+            assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES (3)");
+
+            // Update table to format version 2 and create equality delete files
+            Table icebergTable = updateTable(tableName);
+
+            // Create equality delete using ALL columns
+            Map<String, Object> deleteRow = new HashMap<>();
+            deleteRow.put("id", 2L);
+            for (int i = 0; i < maxColumns; i++) {
+                deleteRow.put("col_" + i, "val_2_" + i);
+            }
+
+            // Write equality delete with ALL columns
+            writeEqualityDeleteToNationTable(icebergTable, deleteRow);
+
+            // Query should work correctly regardless of optimization
+            assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES (2)");
+            assertQuery(session, "SELECT id FROM " + tableName + " ORDER BY id", "VALUES (1), (3)");
+
+            // With more delete columns than the limit, the optimization is disabled - no JOIN in plan
+            // Verify the query still returns correct results without a join node
+            assertQuery(session, "SELECT * FROM " + tableName + " WHERE id = 1",
+                    "VALUES (" + Stream.concat(Stream.of("1"),
+                            IntStream.range(0, maxColumns).mapToObj(i -> "'val_1_" + i + "'"))
+                            .collect(Collectors.joining(", ")) + ")");
+
+            // To verify no join is present, check that the plan only contains a table scan
+            assertPlan(session, "SELECT * FROM " + tableName,
+                    anyTree(
+                            anyNot(JoinNode.class,
+                                    tableScan(tableName))));
+        }
+        finally {
+            dropTable(session, tableName);
+        }
+    }
 }
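Both tests build the same wide table and the same all-columns delete row inline; a shared helper could remove that duplication. The method below is a hypothetical extraction (not part of this diff), built only from the statements already present in the two tests.

```java
// Hypothetical shared helper for the two tests above; not part of this change.
private static String createWideTableSql(String tableName, int varcharColumns)
{
    List<String> columnDefinitions = IntStream.range(0, varcharColumns)
            .mapToObj(i -> "col_" + i + " varchar")
            .collect(Collectors.toList());
    columnDefinitions.add(0, "id bigint");
    return "CREATE TABLE " + tableName + " (" + String.join(", ", columnDefinitions) + ")";
}
```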
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
index 43f06897ad3c6..7ba659745c45e 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
@@ -60,6 +60,7 @@ public void testDefaults()
                 .setMergeOnReadModeEnabled(true)
                 .setPushdownFilterEnabled(false)
                 .setDeleteAsJoinRewriteEnabled(true)
+                .setDeleteAsJoinRewriteMaxDeleteColumns(400)
                 .setRowsForMetadataOptimizationThreshold(1000)
                 .setManifestCachingEnabled(true)
                 .setFileIOImpl(HadoopFileIO.class.getName())
@@ -93,7 +94,8 @@ public void testExplicitPropertyMappings()
                 .put("iceberg.statistic-snapshot-record-difference-weight", "1.0")
                 .put("iceberg.hive-statistics-merge-strategy", NUMBER_OF_DISTINCT_VALUES.name() + "," + TOTAL_SIZE_IN_BYTES.name())
                 .put("iceberg.pushdown-filter-enabled", "true")
-                .put("iceberg.delete-as-join-rewrite-enabled", "false")
+                .put("deprecated.iceberg.delete-as-join-rewrite-enabled", "false")
+                .put("iceberg.delete-as-join-rewrite-max-delete-columns", "1")
                 .put("iceberg.rows-for-metadata-optimization-threshold", "500")
                 .put("iceberg.io.manifest.cache-enabled", "false")
                 .put("iceberg.io-impl", "com.facebook.presto.iceberg.HdfsFileIO")
@@ -125,6 +127,7 @@ public void testExplicitPropertyMappings()
                 .setHiveStatisticsMergeFlags("NUMBER_OF_DISTINCT_VALUES,TOTAL_SIZE_IN_BYTES")
                 .setPushdownFilterEnabled(true)
                 .setDeleteAsJoinRewriteEnabled(false)
+                .setDeleteAsJoinRewriteMaxDeleteColumns(1)
                 .setRowsForMetadataOptimizationThreshold(500)
                 .setManifestCachingEnabled(false)
                 .setFileIOImpl("com.facebook.presto.iceberg.HdfsFileIO")
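The mapping test above exercises only the new canonical name (`deprecated.iceberg.delete-as-join-rewrite-enabled`). A separate check that the old name still binds through `@LegacyConfig` would follow the usual airlift pattern; the sketch below assumes `com.facebook.airlift`'s `ConfigAssertions` exposes `assertDeprecatedEquivalence` (adjust if the helper differs in this fork).

```java
// Sketch, assuming ConfigAssertions.assertDeprecatedEquivalence is available; not part of this diff.
@Test
public void testLegacyDeleteAsJoinRewriteProperty()
{
    assertDeprecatedEquivalence(IcebergConfig.class,
            ImmutableMap.of("deprecated.iceberg.delete-as-join-rewrite-enabled", "false"),
            ImmutableMap.of("iceberg.delete-as-join-rewrite-enabled", "false"));
}
```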