diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 5703cc4ad4d1..8041a2f0e5ee 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; @@ -57,31 +58,30 @@ public class TableMetadata implements Serializable { private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1); - /** - * @deprecated will be removed in 0.9.0; use newTableMetadata(Schema, PartitionSpec, String, Map) instead. - */ - @Deprecated - public static TableMetadata newTableMetadata(TableOperations ops, - Schema schema, - PartitionSpec spec, - String location, - Map properties) { - return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION); - } - public static TableMetadata newTableMetadata(Schema schema, PartitionSpec spec, SortOrder sortOrder, String location, Map properties) { - return newTableMetadata(schema, spec, sortOrder, location, properties, DEFAULT_TABLE_FORMAT_VERSION); + int formatVersion = PropertyUtil.propertyAsInt(properties, TableProperties.FORMAT_VERSION, + DEFAULT_TABLE_FORMAT_VERSION); + return newTableMetadata(schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion); } public static TableMetadata newTableMetadata(Schema schema, PartitionSpec spec, String location, Map properties) { - return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION); + SortOrder sortOrder = SortOrder.unsorted(); + int formatVersion = PropertyUtil.propertyAsInt(properties, TableProperties.FORMAT_VERSION, + DEFAULT_TABLE_FORMAT_VERSION); + return newTableMetadata(schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion); + } + + private static Map unreservedProperties(Map rawProperties) { + return rawProperties.entrySet().stream() + .filter(e -> !TableProperties.RESERVED_PROPERTIES.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } static TableMetadata newTableMetadata(Schema schema, @@ -90,6 +90,9 @@ static TableMetadata newTableMetadata(Schema schema, String location, Map properties, int formatVersion) { + Preconditions.checkArgument(properties.keySet().stream().noneMatch(TableProperties.RESERVED_PROPERTIES::contains), + "Table properties should not contain reserved properties, but got %s", properties); + // reassign all column ids to ensure consistency AtomicInteger lastColumnId = new AtomicInteger(0); Schema freshSchema = TypeUtil.assignFreshIds(INITIAL_SCHEMA_ID, schema, lastColumnId::incrementAndGet); @@ -677,12 +680,20 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) { newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); } - public TableMetadata replaceProperties(Map newProperties) { - ValidationException.check(newProperties != null, "Cannot set properties to null"); - return new TableMetadata(null, formatVersion, uuid, location, + public TableMetadata replaceProperties(Map rawProperties) { + ValidationException.check(rawProperties != null, "Cannot set properties to null"); + Map newProperties = unreservedProperties(rawProperties); + TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, location, lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties)); + + int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion); + if (formatVersion != newFormatVersion) { + metadata = metadata.upgradeToFormatVersion(newFormatVersion); + } + + return metadata; } public TableMetadata removeSnapshotLogEntries(Set snapshotIds) { @@ -754,7 +765,10 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update Map newProperties = Maps.newHashMap(); newProperties.putAll(this.properties); - newProperties.putAll(updatedProperties); + newProperties.putAll(unreservedProperties(updatedProperties)); + + // check if there is format version override + int newFormatVersion = PropertyUtil.propertyAsInt(updatedProperties, TableProperties.FORMAT_VERSION, formatVersion); // determine the next schema id int freshSchemaId = reuseOrCreateNewSchemaId(freshSchema); @@ -764,11 +778,17 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(), freshSchema.identifierFieldIds())); } - return new TableMetadata(null, formatVersion, uuid, newLocation, + TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, newLocation, lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(), specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, freshSpec.lastAssignedFieldId()), orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties), -1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties)); + + if (formatVersion != newFormatVersion) { + metadata = metadata.upgradeToFormatVersion(newFormatVersion); + } + + return metadata; } public TableMetadata updateLocation(String newLocation) { diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index ce965a384852..fe19a2a850f0 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -19,11 +19,37 @@ package org.apache.iceberg; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; + public class TableProperties { private TableProperties() { } + /** + * Reserved table property for table format version. + *

+ * Iceberg will default a new table's format version to the latest stable and recommended version. + * This reserved property keyword allows users to override the Iceberg format version of the table metadata. + *

+ * If this table property exists when creating a table, the table will use the specified format version. + * If a table updates this property, it will try to upgrade to the specified format version. + *

+ * Note: incomplete or unstable versions cannot be selected using this property. + */ + public static final String FORMAT_VERSION = "format-version"; + + /** + * Reserved Iceberg table properties list. + *

+ * Reserved table properties are only used to control behaviors when creating or updating a table. + * The value of these properties are not persisted as a part of the table metadata. + */ + public static final Set RESERVED_PROPERTIES = ImmutableSet.of( + FORMAT_VERSION + ); + public static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries"; public static final int COMMIT_NUM_RETRIES_DEFAULT = 4; diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index d3cebec1f328..91aad8a00b75 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -756,4 +756,68 @@ public void testUpdateSchema() { Assert.assertEquals("Should return expected last column id", 6, threeSchemaTable.lastColumnId()); } + + @Test + public void testCreateV2MetadataThroughTableProperty() { + Schema schema = new Schema( + Types.NestedField.required(10, "x", Types.StringType.get()) + ); + + TableMetadata meta = TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), null, + ImmutableMap.of(TableProperties.FORMAT_VERSION, "2", "key", "val")); + + Assert.assertEquals("format version should be configured based on the format-version key", + 2, meta.formatVersion()); + Assert.assertEquals("should not contain format-version in properties", + ImmutableMap.of("key", "val"), meta.properties()); + } + + @Test + public void testReplaceV1MetadataToV2ThroughTableProperty() { + Schema schema = new Schema( + Types.NestedField.required(10, "x", Types.StringType.get()) + ); + + TableMetadata meta = TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), null, + ImmutableMap.of(TableProperties.FORMAT_VERSION, "1", "key", "val")); + + meta = meta.buildReplacement(meta.schema(), meta.spec(), meta.sortOrder(), meta.location(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "2", "key2", "val2")); + + Assert.assertEquals("format version should be configured based on the format-version key", + 2, meta.formatVersion()); + Assert.assertEquals("should not contain format-version but should contain old and new properties", + ImmutableMap.of("key", "val", "key2", "val2"), meta.properties()); + } + + @Test + public void testUpgradeV1MetadataToV2ThroughTableProperty() { + Schema schema = new Schema( + Types.NestedField.required(10, "x", Types.StringType.get()) + ); + + TableMetadata meta = TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), null, + ImmutableMap.of(TableProperties.FORMAT_VERSION, "1", "key", "val")); + + meta = meta.replaceProperties(ImmutableMap.of(TableProperties.FORMAT_VERSION, + "2", "key2", "val2")); + + Assert.assertEquals("format version should be configured based on the format-version key", + 2, meta.formatVersion()); + Assert.assertEquals("should not contain format-version but should contain new properties", + ImmutableMap.of("key2", "val2"), meta.properties()); + } + + @Test + public void testNoReservedPropertyForTableMetadataCreation() { + Schema schema = new Schema( + Types.NestedField.required(10, "x", Types.StringType.get()) + ); + + AssertHelpers.assertThrows("should not allow reserved table property when creating table metadata", + IllegalArgumentException.class, + "Table properties should not contain reserved properties, but got {format-version=1}", + () -> TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), null, "/tmp", + ImmutableMap.of(TableProperties.FORMAT_VERSION, "1"), 1)); + } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 7c691bd4cdad..20e8721eb5de 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -35,6 +35,7 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; @@ -44,6 +45,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -239,6 +241,44 @@ public void testCreatePartitionTable() throws TableNotExistException { Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys()); } + @Test + public void testCreateTableWithFormatV2ThroughTableProperty() throws Exception { + sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')"); + + Table table = table("tl"); + Assert.assertEquals("should create table using format v2", + 2, ((BaseTable) table).operations().current().formatVersion()); + } + + @Test + public void testUpgradeTableWithFormatV2ThroughTableProperty() throws Exception { + sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='1')"); + + Table table = table("tl"); + TableOperations ops = ((BaseTable) table).operations(); + Assert.assertEquals("should create table using format v1", + 1, ops.refresh().formatVersion()); + + sql("ALTER TABLE tl SET('format-version'='2')"); + Assert.assertEquals("should update table to use format v2", + 2, ops.refresh().formatVersion()); + } + + @Test + public void testDowngradeTableToFormatV1ThroughTablePropertyFails() throws Exception { + sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')"); + + Table table = table("tl"); + TableOperations ops = ((BaseTable) table).operations(); + Assert.assertEquals("should create table using format v2", + 2, ops.refresh().formatVersion()); + + AssertHelpers.assertThrowsCause("should fail to downgrade to v1", + IllegalArgumentException.class, + "Cannot downgrade v2 table to v1", + () -> sql("ALTER TABLE tl SET('format-version'='1')")); + } + @Test public void testLoadTransformPartitionTable() throws TableNotExistException { Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())); diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index 8febaf293974..9ba73a3d4cd2 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -273,6 +273,26 @@ public void testCreateTableWithUnpartitionedSpec() { Assert.assertEquals(SPEC, icebergTable.spec()); } + @Test + public void testCreateTableWithFormatV2ThroughTableProperty() { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + // We need the location for HadoopTable based tests only + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + + SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " + + "'" + InputFormatConfig.PARTITION_SPEC + "'='" + + PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " + + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "', " + + "'" + TableProperties.FORMAT_VERSION + "'='" + 2 + "')"); + + // Check the Iceberg table partition data + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals("should create table using format v2", + 2, ((BaseTable) icebergTable).operations().current().formatVersion()); + } + @Test public void testDeleteBackingTable() throws TException, IOException, InterruptedException { TableIdentifier identifier = TableIdentifier.of("default", "customers"); diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java index 9d8711c24403..303cbb5f932b 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java @@ -22,9 +22,11 @@ import java.io.File; import java.util.Map; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.spark.SparkCatalogTestBase; @@ -221,4 +223,60 @@ public void testCreateTableProperties() { Assert.assertEquals("Should have property p1", "2", table.properties().get("p1")); Assert.assertEquals("Should have property p2", "x", table.properties().get("p2")); } + + @Test + public void testCreateTableWithFormatV2ThroughTableProperty() { + Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent)); + + sql("CREATE TABLE %s " + + "(id BIGINT NOT NULL, data STRING) " + + "USING iceberg " + + "TBLPROPERTIES ('format-version'='2')", + tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertEquals("should create table using format v2", + 2, ((BaseTable) table).operations().current().formatVersion()); + } + + @Test + public void testUpgradeTableWithFormatV2ThroughTableProperty() { + Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent)); + + sql("CREATE TABLE %s " + + "(id BIGINT NOT NULL, data STRING) " + + "USING iceberg " + + "TBLPROPERTIES ('format-version'='1')", + tableName); + + Table table = validationCatalog.loadTable(tableIdent); + TableOperations ops = ((BaseTable) table).operations(); + Assert.assertEquals("should create table using format v1", + 1, ops.refresh().formatVersion()); + + sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='2')", tableName); + Assert.assertEquals("should update table to use format v2", + 2, ops.refresh().formatVersion()); + } + + @Test + public void testDowngradeTableToFormatV1ThroughTablePropertyFails() { + Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent)); + + sql("CREATE TABLE %s " + + "(id BIGINT NOT NULL, data STRING) " + + "USING iceberg " + + "TBLPROPERTIES ('format-version'='2')", + tableName); + + Table table = validationCatalog.loadTable(tableIdent); + TableOperations ops = ((BaseTable) table).operations(); + Assert.assertEquals("should create table using format v2", + 2, ops.refresh().formatVersion()); + + AssertHelpers.assertThrowsCause("should fail to downgrade to v1", + IllegalArgumentException.class, + "Cannot downgrade v2 table to v1", + () -> sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='1')", tableName)); + } }