diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst index e7b22f8f3e80..c04f9932253f 100644 --- a/docs/src/main/sphinx/connector/iceberg.rst +++ b/docs/src/main/sphinx/connector/iceberg.rst @@ -238,6 +238,25 @@ otherwise the procedure will fail with similar message: ``Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d)``. The default value for this property is ``7d``. +.. _iceberg-alter-table-set-properties: + +ALTER TABLE SET PROPERTIES +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The connector supports modifying the properties on existing tables using +:ref:`ALTER TABLE SET PROPERTIES `. + +The following table properties can be updated after a table is created: + +* ``format`` +* ``format_version`` + +For example, to update a table from v1 of the Iceberg specification to v2: + +.. code-block:: sql + + ALTER TABLE table_name SET PROPERTIES format_version = 2; + .. _iceberg-type-mapping: Type mapping diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 3be20b50f75c..4bc321262b65 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -104,6 +104,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Type; @@ -146,6 +147,7 @@ import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle; import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH; @@ -190,6 +192,7 @@ import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; public class IcebergMetadata @@ -1146,6 +1149,52 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand catalog.renameTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), newTable); } + @Override + public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map> properties) + { + IcebergTableHandle table = (IcebergTableHandle) tableHandle; + BaseTable icebergTable = (BaseTable) catalog.loadTable(session, table.getSchemaTableName()); + + transaction = icebergTable.newTransaction(); + UpdateProperties updateProperties = transaction.updateProperties(); + + for (Map.Entry> propertyEntry : properties.entrySet()) { + String trinoPropertyName = propertyEntry.getKey(); + Optional propertyValue = propertyEntry.getValue(); + + switch (trinoPropertyName) { + case FILE_FORMAT_PROPERTY: + updateProperties.defaultFormat(((IcebergFileFormat) propertyValue.orElseThrow()).toIceberg()); + break; + case FORMAT_VERSION_PROPERTY: + // UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version + updateProperty(updateProperties, FORMAT_VERSION, propertyValue, formatVersion -> Integer.toString((int) formatVersion)); + break; + default: + // TODO: Support updating partitioning https://github.com/trinodb/trino/issues/12174 + throw new TrinoException(NOT_SUPPORTED, "Updating the " + trinoPropertyName + " property is not supported"); + } + } + + try { + updateProperties.commit(); + transaction.commitTransaction(); + } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit new table properties", e); + } + } + + private static void updateProperty(UpdateProperties updateProperties, String icebergPropertyName, Optional value, Function toIcebergString) + { + if (value.isPresent()) { + updateProperties.set(icebergPropertyName, toIcebergString.apply(value.get())); + } + else { + updateProperties.remove(icebergPropertyName); + } + } + @Override public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 2621040007ce..698b1c88ecc4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -94,7 +94,6 @@ import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST; import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; -import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_DELETE; import static io.trino.testing.TestingSession.testSessionBuilder; import static io.trino.testing.assertions.Assert.assertEquals; import static io.trino.testing.assertions.Assert.assertEventually; @@ -3439,6 +3438,35 @@ public void testIfDeletesReturnsNumberOfRemovedRows() assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 2); } + @Test + public void testUpdatingFileFormat() + { + String tableName = "test_updating_file_format_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " WITH (format = 'orc') AS SELECT * FROM nation WHERE nationkey < 10", "SELECT count(*) FROM nation WHERE nationkey < 10"); + assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'write.format.default'", "VALUES 'ORC'"); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format = 'parquet'"); + assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'write.format.default'", "VALUES 'PARQUET'"); + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", "SELECT count(*) FROM nation WHERE nationkey >= 10"); + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + assertQuery("SELECT count(*) FROM \"" + tableName + "$files\" WHERE file_path LIKE '%.orc'", "VALUES 1"); + assertQuery("SELECT count(*) FROM \"" + tableName + "$files\" WHERE file_path LIKE '%.parquet'", "VALUES 1"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testUpdatingInvalidTableProperty() + { + String tableName = "test_updating_invalid_table_property_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (a INT, b INT)"); + assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES not_a_valid_table_property = 'a value'")) + .hasMessage("Catalog 'iceberg' table property 'not_a_valid_table_property' does not exist"); + assertUpdate("DROP TABLE " + tableName); + } + private Session prepareCleanUpSession() { return Session.builder(getSession()) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index d23f53814b58..920f0d2c988e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -64,6 +64,8 @@ import static io.trino.testing.sql.TestTable.randomTableSuffix; import static io.trino.tpch.TpchTable.NATION; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; public class TestIcebergV2 extends AbstractTestQueryFramework @@ -172,6 +174,39 @@ public void testV2TableWithEqualityDelete() assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1"); } + @Test + public void testUpgradeTableToV2FromTrino() + { + String tableName = "test_upgrade_table_to_v2_from_trino_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25); + assertEquals(loadTable(tableName).operations().current().formatVersion(), 1); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2"); + assertEquals(loadTable(tableName).operations().current().formatVersion(), 2); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + } + + @Test + public void testDowngradingV2TableToV1Fails() + { + String tableName = "test_downgrading_v2_table_to_v1_fails_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25); + assertEquals(loadTable(tableName).operations().current().formatVersion(), 2); + assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1")) + .hasMessage("Failed to commit new table properties") + .getRootCause() + .hasMessage("Cannot downgrade v2 table to v1"); + } + + @Test + public void testUpgradingToInvalidVersionFails() + { + String tableName = "test_upgrading_to_invalid_version_fails_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25); + assertEquals(loadTable(tableName).operations().current().formatVersion(), 2); + assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 42")) + .hasMessage("Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2"); + } + private void writeEqualityDeleteToNationTable(Table icebergTable) throws Exception {