Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -57,31 +58,30 @@ public class TableMetadata implements Serializable {

private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1);

/**
* @deprecated will be removed in 0.9.0; use newTableMetadata(Schema, PartitionSpec, String, Map) instead.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for removing this!

*/
@Deprecated
public static TableMetadata newTableMetadata(TableOperations ops,
Schema schema,
PartitionSpec spec,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION);
}

public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
SortOrder sortOrder,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, sortOrder, location, properties, DEFAULT_TABLE_FORMAT_VERSION);
int formatVersion = PropertyUtil.propertyAsInt(properties, TableProperties.FORMAT_VERSION,
DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion);
}

public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION);
SortOrder sortOrder = SortOrder.unsorted();
int formatVersion = PropertyUtil.propertyAsInt(properties, TableProperties.FORMAT_VERSION,
DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion);
}

private static Map<String, String> unreservedProperties(Map<String, String> rawProperties) {
return rawProperties.entrySet().stream()
.filter(e -> !TableProperties.RESERVED_PROPERTIES.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

static TableMetadata newTableMetadata(Schema schema,
Expand All @@ -90,6 +90,9 @@ static TableMetadata newTableMetadata(Schema schema,
String location,
Map<String, String> properties,
int formatVersion) {
Preconditions.checkArgument(properties.keySet().stream().noneMatch(TableProperties.RESERVED_PROPERTIES::contains),
"Table properties should not contain reserved properties, but got %s", properties);

// reassign all column ids to ensure consistency
AtomicInteger lastColumnId = new AtomicInteger(0);
Schema freshSchema = TypeUtil.assignFreshIds(INITIAL_SCHEMA_ID, schema, lastColumnId::incrementAndGet);
Expand Down Expand Up @@ -677,12 +680,20 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

public TableMetadata replaceProperties(Map<String, String> newProperties) {
ValidationException.check(newProperties != null, "Cannot set properties to null");
return new TableMetadata(null, formatVersion, uuid, location,
public TableMetadata replaceProperties(Map<String, String> rawProperties) {
ValidationException.check(rawProperties != null, "Cannot set properties to null");
Map<String, String> newProperties = unreservedProperties(rawProperties);
TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots,
snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties));

int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion);
if (formatVersion != newFormatVersion) {
metadata = metadata.upgradeToFormatVersion(newFormatVersion);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a bit weird to call another instance method to produce a second TableMetadata here. I'm thinking that maybe the builder pattern would work better for complicated cases... Seems fine for now though.

}

return metadata;
}

public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
Expand Down Expand Up @@ -754,7 +765,10 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update

Map<String, String> newProperties = Maps.newHashMap();
newProperties.putAll(this.properties);
newProperties.putAll(updatedProperties);
newProperties.putAll(unreservedProperties(updatedProperties));

// check if there is format version override
int newFormatVersion = PropertyUtil.propertyAsInt(updatedProperties, TableProperties.FORMAT_VERSION, formatVersion);

// determine the next schema id
int freshSchemaId = reuseOrCreateNewSchemaId(freshSchema);
Expand All @@ -764,11 +778,17 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(), freshSchema.identifierFieldIds()));
}

return new TableMetadata(null, formatVersion, uuid, newLocation,
TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, newLocation,
lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(),
specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, freshSpec.lastAssignedFieldId()),
orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties),
-1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));

if (formatVersion != newFormatVersion) {
metadata = metadata.upgradeToFormatVersion(newFormatVersion);
}

return metadata;
}

public TableMetadata updateLocation(String newLocation) {
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,37 @@

package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

public class TableProperties {

private TableProperties() {
}

/**
* Reserved table property for table format version.
* <p>
* Iceberg will default a new table's format version to the latest stable and recommended version.
* This reserved property keyword allows users to override the Iceberg format version of the table metadata.
* <p>
* If this table property exists when creating a table, the table will use the specified format version.
* If a table updates this property, it will try to upgrade to the specified format version.
* <p>
* Note: incomplete or unstable versions cannot be selected using this property.
*/
public static final String FORMAT_VERSION = "format-version";

/**
* Reserved Iceberg table properties list.
* <p>
* Reserved table properties are only used to control behaviors when creating or updating a table.
* The value of these properties are not persisted as a part of the table metadata.
*/
public static final Set<String> RESERVED_PROPERTIES = ImmutableSet.of(
FORMAT_VERSION
);

public static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries";
public static final int COMMIT_NUM_RETRIES_DEFAULT = 4;

Expand Down
64 changes: 64 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -756,4 +756,68 @@ public void testUpdateSchema() {
Assert.assertEquals("Should return expected last column id",
6, threeSchemaTable.lastColumnId());
}

@Test
public void testCreateV2MetadataThroughTableProperty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add the unit tests in flink/spark/hive module to verify the end-to-end DDL work ? If so, I think we could make it to be a separate PR for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit hard to test this in unit test because format version is not publicly accessible. I have tested with Spark on EMR and manually verified that the metadata file shows the correct version.

I think we could access the format version by using the following code:

Table table = ...
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
int formatVersion = meta.formatVersion();

Schema schema = new Schema(
Types.NestedField.required(10, "x", Types.StringType.get())
);

TableMetadata meta = TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), null,
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2", "key", "val"));

Assert.assertEquals("format version should be configured based on the format-version key",
2, meta.formatVersion());
Assert.assertEquals("should not contain format-version in properties",
ImmutableMap.of("key", "val"), meta.properties());
}

@Test
public void testReplaceV1MetadataToV2ThroughTableProperty() {
Schema schema = new Schema(
Types.NestedField.required(10, "x", Types.StringType.get())
);

TableMetadata meta = TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), null,
ImmutableMap.of(TableProperties.FORMAT_VERSION, "1", "key", "val"));

meta = meta.buildReplacement(meta.schema(), meta.spec(), meta.sortOrder(), meta.location(),
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2", "key2", "val2"));

Assert.assertEquals("format version should be configured based on the format-version key",
2, meta.formatVersion());
Assert.assertEquals("should not contain format-version but should contain old and new properties",
ImmutableMap.of("key", "val", "key2", "val2"), meta.properties());
}

@Test
public void testUpgradeV1MetadataToV2ThroughTableProperty() {
Schema schema = new Schema(
Types.NestedField.required(10, "x", Types.StringType.get())
);

TableMetadata meta = TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), null,
ImmutableMap.of(TableProperties.FORMAT_VERSION, "1", "key", "val"));

meta = meta.replaceProperties(ImmutableMap.of(TableProperties.FORMAT_VERSION,
"2", "key2", "val2"));

Assert.assertEquals("format version should be configured based on the format-version key",
2, meta.formatVersion());
Assert.assertEquals("should not contain format-version but should contain new properties",
ImmutableMap.of("key2", "val2"), meta.properties());
}

@Test
public void testNoReservedPropertyForTableMetadataCreation() {
Schema schema = new Schema(
Types.NestedField.required(10, "x", Types.StringType.get())
);

AssertHelpers.assertThrows("should not allow reserved table property when creating table metadata",
IllegalArgumentException.class,
"Table properties should not contain reserved properties, but got {format-version=1}",
() -> TableMetadata.newTableMetadata(schema, PartitionSpec.unpartitioned(), null, "/tmp",
ImmutableMap.of(TableProperties.FORMAT_VERSION, "1"), 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
Expand All @@ -44,6 +45,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -239,6 +241,44 @@ public void testCreatePartitionTable() throws TableNotExistException {
Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys());
}

@Test
public void testCreateTableWithFormatV2ThroughTableProperty() throws Exception {
sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')");

Table table = table("tl");
Assert.assertEquals("should create table using format v2",
2, ((BaseTable) table).operations().current().formatVersion());
}

@Test
public void testUpgradeTableWithFormatV2ThroughTableProperty() throws Exception {
sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='1')");

Table table = table("tl");
TableOperations ops = ((BaseTable) table).operations();
Assert.assertEquals("should create table using format v1",
1, ops.refresh().formatVersion());

sql("ALTER TABLE tl SET('format-version'='2')");
Assert.assertEquals("should update table to use format v2",
2, ops.refresh().formatVersion());
}

@Test
public void testDowngradeTableToFormatV1ThroughTablePropertyFails() throws Exception {
sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')");

Table table = table("tl");
TableOperations ops = ((BaseTable) table).operations();
Assert.assertEquals("should create table using format v2",
2, ops.refresh().formatVersion());

AssertHelpers.assertThrowsCause("should fail to downgrade to v1",
IllegalArgumentException.class,
"Cannot downgrade v2 table to v1",
() -> sql("ALTER TABLE tl SET('format-version'='1')"));
}

@Test
public void testLoadTransformPartitionTable() throws TableNotExistException {
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,26 @@ public void testCreateTableWithUnpartitionedSpec() {
Assert.assertEquals(SPEC, icebergTable.spec());
}

@Test
public void testCreateTableWithFormatV2ThroughTableProperty() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
// We need the location for HadoopTable based tests only
shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
"'" + InputFormatConfig.PARTITION_SPEC + "'='" +
PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
"'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "', " +
"'" + TableProperties.FORMAT_VERSION + "'='" + 2 + "')");

// Check the Iceberg table partition data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
Assert.assertEquals("should create table using format v2",
2, ((BaseTable) icebergTable).operations().current().formatVersion());
}

@Test
public void testDeleteBackingTable() throws TException, IOException, InterruptedException {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.io.File;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.spark.SparkCatalogTestBase;
Expand Down Expand Up @@ -221,4 +223,60 @@ public void testCreateTableProperties() {
Assert.assertEquals("Should have property p1", "2", table.properties().get("p1"));
Assert.assertEquals("Should have property p2", "x", table.properties().get("p2"));
}

@Test
public void testCreateTableWithFormatV2ThroughTableProperty() {
Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));

sql("CREATE TABLE %s " +
"(id BIGINT NOT NULL, data STRING) " +
"USING iceberg " +
"TBLPROPERTIES ('format-version'='2')",
tableName);

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("should create table using format v2",
2, ((BaseTable) table).operations().current().formatVersion());
}

@Test
public void testUpgradeTableWithFormatV2ThroughTableProperty() {
Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));

sql("CREATE TABLE %s " +
"(id BIGINT NOT NULL, data STRING) " +
"USING iceberg " +
"TBLPROPERTIES ('format-version'='1')",
tableName);

Table table = validationCatalog.loadTable(tableIdent);
TableOperations ops = ((BaseTable) table).operations();
Assert.assertEquals("should create table using format v1",
1, ops.refresh().formatVersion());

sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='2')", tableName);
Assert.assertEquals("should update table to use format v2",
2, ops.refresh().formatVersion());
}

@Test
public void testDowngradeTableToFormatV1ThroughTablePropertyFails() {
Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));

sql("CREATE TABLE %s " +
"(id BIGINT NOT NULL, data STRING) " +
"USING iceberg " +
"TBLPROPERTIES ('format-version'='2')",
tableName);

Table table = validationCatalog.loadTable(tableIdent);
TableOperations ops = ((BaseTable) table).operations();
Assert.assertEquals("should create table using format v2",
2, ops.refresh().formatVersion());

AssertHelpers.assertThrowsCause("should fail to downgrade to v1",
IllegalArgumentException.class,
"Cannot downgrade v2 table to v1",
() -> sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='1')", tableName));
}
}