diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index 4d071a205089b..e44da20d00404 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -111,6 +111,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; @@ -188,6 +189,7 @@ import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG; import static com.facebook.presto.iceberg.IcebergTableType.DATA; import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES; +import static com.facebook.presto.iceberg.IcebergUtil.MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS; import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE; import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getColumnsForWrite; @@ -357,6 +359,43 @@ public Optional getProcedureContext() return this.procedureContext; } + /** + * Validates that an Iceberg table does not use unsupported v3 features. + * TODO: Remove when Iceberg v3 is fully supported + */ + protected static void validateTableForPresto(BaseTable table, Optional tableSnapshotId) + { + Snapshot snapshot = tableSnapshotId + .map(table::snapshot) + .orElse(table.currentSnapshot()); + if (snapshot == null) { + // empty table, nothing to validate + return; + } + + TableMetadata metadata = table.operations().current(); + if (metadata.formatVersion() < 3) { + return; + } + + Schema schema = metadata.schemasById().get(snapshot.schemaId()); + if (schema == null) { + schema = metadata.schema(); + } + + // Reject schema default values (initial-default / write-default) + for (Types.NestedField field : schema.columns()) { + if (field.initialDefault() != null || field.writeDefault() != null) { + throw new PrestoException(NOT_SUPPORTED, "Iceberg v3 column default values are not supported"); + } + } + + // Reject Iceberg table encryption + if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) { + throw new PrestoException(NOT_SUPPORTED, "Iceberg table encryption is not supported"); + } + } + /** * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker */ @@ -836,10 +875,17 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName()); int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); - if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE || - !Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) - .map(mode -> mode.equals(MERGE_ON_READ.modeName())) - .orElse(false)) { + if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) { + throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION, + "Iceberg table updates require at least format version 2 and update mode must be merge-on-read"); + } + if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) { + throw new PrestoException(NOT_SUPPORTED, + format("Iceberg table updates for format version %s are not supported yet", formatVersion)); + } + if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) + .map(mode -> mode.equals(MERGE_ON_READ.modeName())) + .orElse(false)) { throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION, "Iceberg table updates require at least format version 2 and update mode must be merge-on-read"); } @@ -1134,6 +1180,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto verify(table.getIcebergTableName().getTableType() == DATA, "only the data table can have data inserted"); Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); validateTableMode(session, icebergTable); + BaseTable baseTable = (BaseTable) icebergTable; + validateTableForPresto(baseTable, Optional.ofNullable(baseTable.currentSnapshot()).map(Snapshot::snapshotId)); return beginIcebergTableInsert(session, table, icebergTable); } @@ -1324,6 +1372,10 @@ public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, Connecto if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) { throw new PrestoException(NOT_SUPPORTED, format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE)); } + if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) { + throw new PrestoException(NOT_SUPPORTED, + format("Iceberg table updates for format version %s are not supported yet", formatVersion)); + } if (getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) { throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely. Configure write.delete.mode table property to allow row level deletions."); } @@ -1580,11 +1632,17 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable IcebergTableHandle handle = (IcebergTableHandle) tableHandle; Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); - if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE || - !Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) - .map(mode -> mode.equals(MERGE_ON_READ.modeName())) - .orElse(false)) { - throw new RuntimeException("Iceberg table updates require at least format version 2 and update mode must be merge-on-read"); + if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) { + throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2"); + } + if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) { + throw new PrestoException(NOT_SUPPORTED, + format("Iceberg table updates for format version %s are not supported yet", formatVersion)); + } + if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) + .map(mode -> mode.equals(MERGE_ON_READ.modeName())) + .orElse(false)) { + throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require update mode to be merge-on-read"); } validateTableMode(session, icebergTable); transaction = icebergTable.newTransaction(); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java index 98ee9f2693450..9d4cd1a615636 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java @@ -18,6 +18,7 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SplitWeight; import com.facebook.presto.spi.connector.ConnectorPartitionHandle; import com.facebook.presto.spi.schedule.NodeSelectionStrategy; @@ -46,6 +47,7 @@ import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize; import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates; import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterators.limit; import static java.util.Objects.requireNonNull; @@ -124,6 +126,13 @@ private ConnectorSplit toIcebergSplit(FileScanTask task) PartitionSpec spec = task.spec(); Optional partitionData = partitionDataFromStructLike(spec, task.file().partition()); + // Validate no PUFFIN deletion vectors (Iceberg v3 feature not yet supported) + for (org.apache.iceberg.DeleteFile deleteFile : task.deletes()) { + if (deleteFile.format() == org.apache.iceberg.FileFormat.PUFFIN) { + throw new PrestoException(NOT_SUPPORTED, "Iceberg deletion vectors (PUFFIN format) are not supported"); + } + } + // TODO: We should leverage residual expression and convert that to TupleDomain. // The predicate here is used by readers for predicate push down at reader level, // so when we do not use residual expression, we are just wasting CPU cycles diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 6ec40b607a022..3dc9cbb699807 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -215,6 +215,8 @@ public final class IcebergUtil { private static final Logger log = Logger.get(IcebergUtil.class); public static final int MIN_FORMAT_VERSION_FOR_DELETE = 2; + public static final int MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS = 2; + public static final int MAX_SUPPORTED_FORMAT_VERSION = 3; public static final long DOUBLE_POSITIVE_ZERO = 0x0000000000000000L; public static final long DOUBLE_POSITIVE_INFINITE = 0x7ff0000000000000L; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java index 17a72c8bf03ec..1be59757168cd 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java @@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.slice.Slice; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; @@ -69,6 +70,7 @@ import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.IcebergAbstractMetadata.getSupportedSortFields; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; +import static com.facebook.presto.iceberg.IcebergUtil.MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS; import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; @@ -124,6 +126,16 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec Table icebergTable = procedureContext.getTable(); IcebergTableHandle tableHandle = layoutHandle.getTable(); + // Validate format version for OPTIMIZE operation + int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); + if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) { + throw new PrestoException(NOT_SUPPORTED, + format("OPTIMIZE is not supported for Iceberg table format version > %d. Table %s format version is %s.", + MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS, + icebergTable.name(), + formatVersion)); + } + SortOrder sortOrder = icebergTable.sortOrder(); List sortFieldStrings = ImmutableList.of(); if (sortOrderIndex.isPresent()) { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergV3.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergV3.java new file mode 100644 index 0000000000000..f08d407203413 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergV3.java @@ -0,0 +1,239 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.testng.annotations.Test; + +import java.io.File; +import java.nio.file.Path; +import java.util.Map; +import java.util.OptionalInt; + +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; +import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; + +public class TestIcebergV3 + extends AbstractTestQueryFramework +{ + private static final String TEST_SCHEMA = "tpch"; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.builder() + .setCatalogType(HADOOP) + .setFormat(PARQUET) + .setNodeCount(OptionalInt.of(1)) + .setCreateTpchTables(false) + .setAddJmxPlugin(false) + .build().getQueryRunner(); + } + + private void dropTable(String tableName) + { + assertQuerySucceeds("DROP TABLE IF EXISTS " + tableName); + } + + @Test + public void testCreateV3Table() + { + String tableName = "test_create_v3_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '3')"); + Table table = loadTable(tableName); + assertEquals(((BaseTable) table).operations().current().formatVersion(), 3); + assertQuery("SELECT * FROM " + tableName, "SELECT * WHERE false"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testUpgradeV2ToV3() + { + String tableName = "test_upgrade_v2_to_v3"; + try { + // Create v2 table + assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '2')"); + Table table = loadTable(tableName); + assertEquals(((BaseTable) table).operations().current().formatVersion(), 2); + + // Upgrade to v3 + BaseTable baseTable = (BaseTable) table; + TableOperations operations = baseTable.operations(); + TableMetadata currentMetadata = operations.current(); + operations.commit(currentMetadata, currentMetadata.upgradeToFormatVersion(3)); + + // Verify the upgrade + table = loadTable(tableName); + assertEquals(((BaseTable) table).operations().current().formatVersion(), 3); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testInsertIntoV3Table() + { + String tableName = "test_insert_v3_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '3')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one'), (2, 'two')", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'one'), (2, 'two')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 'three')", 1); + assertQuery("SELECT count(*) FROM " + tableName, "SELECT 3"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testDeleteOnV3TableNotSupported() + { + String tableName = "test_delete_v3_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '3', \"write.delete.mode\" = 'merge-on-read')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one'), (2, 'two')", 2); + assertThatThrownBy(() -> getQueryRunner().execute("DELETE FROM " + tableName + " WHERE id = 1")) + .hasMessageContaining("Iceberg table updates for format version 3 are not supported yet"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testUpdateOnV3TableNotSupported() + { + String tableName = "test_update_v3_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '3', \"write.update.mode\" = 'merge-on-read')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one'), (2, 'two')", 2); + assertThatThrownBy(() -> getQueryRunner().execute("UPDATE " + tableName + " SET value = 'updated' WHERE id = 1")) + .hasMessageContaining("Iceberg table updates for format version 3 are not supported yet"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testMergeOnV3TableNotSupported() + { + String tableName = "test_merge_v3_table"; + String sourceTable = "test_merge_v3_source"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '3', \"write.update.mode\" = 'merge-on-read')"); + assertUpdate("CREATE TABLE " + sourceTable + " (id integer, value varchar)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one')", 1); + assertUpdate("INSERT INTO " + sourceTable + " VALUES (1, 'updated')", 1); + assertThatThrownBy(() -> getQueryRunner().execute( + "MERGE INTO " + tableName + " t USING " + sourceTable + " s ON t.id = s.id WHEN MATCHED THEN UPDATE SET value = s.value")) + .hasMessageContaining("Iceberg table updates for format version 3 are not supported yet"); + } + finally { + dropTable(tableName); + dropTable(sourceTable); + } + } + + @Test + public void testOptimizeOnV3TableNotSupported() + { + String tableName = "test_optimize_v3_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '3')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one')", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'two')", 1); + assertThatThrownBy(() -> getQueryRunner().execute(format("CALL system.rewrite_data_files('%s', '%s')", TEST_SCHEMA, tableName))) + .hasMessageContaining("OPTIMIZE is not supported for Iceberg table format version > 2"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testSelectFromV3TableAfterInsert() + { + String tableName = "test_select_v3_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, name varchar, price decimal(10,2)) WITH (\"format-version\" = '3')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'apple', 1.50), (2, 'banana', 0.75), (3, 'cherry', 2.00)", 3); + assertQuery("SELECT * FROM " + tableName + " ORDER BY id", "VALUES (1, 'apple', 1.50), (2, 'banana', 0.75), (3, 'cherry', 2.00)"); + assertQuery("SELECT count(*) FROM " + tableName, "SELECT 3"); + assertQuery("SELECT sum(price) FROM " + tableName, "SELECT 4.25"); + assertQuery("SELECT name FROM " + tableName + " WHERE price > 1.00 ORDER BY name", "VALUES ('apple'), ('cherry')"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testV3TableWithPartitioning() + { + String tableName = "test_v3_partitioned_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, category varchar, value integer) " + + "WITH (\"format-version\" = '3', partitioning = ARRAY['category'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'A', 100), (2, 'B', 200), (3, 'A', 150)", 3); + assertQuery("SELECT * FROM " + tableName + " WHERE category = 'A' ORDER BY id", "VALUES (1, 'A', 100), (3, 'A', 150)"); + assertQuery("SELECT category, sum(value) FROM " + tableName + " GROUP BY category ORDER BY category", "VALUES ('A', 250), ('B', 200)"); + } + finally { + dropTable(tableName); + } + } + + private Table loadTable(String tableName) + { + Catalog catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), ICEBERG_CATALOG, getProperties(), new Configuration()); + return catalog.loadTable(TableIdentifier.of(TEST_SCHEMA, tableName)); + } + + private Map getProperties() + { + File metastoreDir = getCatalogDirectory(); + return ImmutableMap.of("warehouse", metastoreDir.toString()); + } + + private File getCatalogDirectory() + { + Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false); + return catalogDirectory.toFile(); + } +}