diff --git a/api/src/main/java/org/apache/iceberg/Schema.java b/api/src/main/java/org/apache/iceberg/Schema.java index 1b80a2deed5d..c02e17416235 100644 --- a/api/src/main/java/org/apache/iceberg/Schema.java +++ b/api/src/main/java/org/apache/iceberg/Schema.java @@ -209,6 +209,16 @@ public Set identifierFieldIds() { return lazyIdentifierFieldIdSet(); } + /** + * Returns the set of identifier field names. + */ + public Set identifierFieldNames() { + return identifierFieldIds() + .stream() + .map(id -> findField(id).name()) + .collect(Collectors.toSet()); + } + /** * Returns the {@link Type} of a sub-field identified by the field name. * diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index a6b8b62735f8..11c66f946e20 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -151,10 +152,8 @@ public static Type assignFreshIds(Type type, NextID nextId) { * @return a structurally identical schema with new ids assigned by the nextId function */ public static Schema assignFreshIds(Schema schema, NextID nextId) { - return new Schema(TypeUtil - .visit(schema.asStruct(), new AssignFreshIds(nextId)) - .asNestedType() - .fields()); + Types.StructType struct = TypeUtil.visit(schema.asStruct(), new AssignFreshIds(nextId)).asStructType(); + return new Schema(struct.fields(), refreshIdentifierFields(struct, schema)); } /** @@ -166,10 +165,8 @@ public static Schema assignFreshIds(Schema schema, NextID nextId) { * @return a structurally identical schema with new ids assigned by the nextId function */ public static Schema assignFreshIds(int schemaId, Schema schema, NextID nextId) { - return new Schema(schemaId, TypeUtil - .visit(schema.asStruct(), new AssignFreshIds(nextId)) - .asNestedType() - .fields()); + Types.StructType struct = TypeUtil.visit(schema.asStruct(), new AssignFreshIds(nextId)).asStructType(); + return new Schema(schemaId, struct.fields(), refreshIdentifierFields(struct, schema)); } /** @@ -181,10 +178,24 @@ public static Schema assignFreshIds(int schemaId, Schema schema, NextID nextId) * @return a structurally identical schema with new ids assigned by the nextId function */ public static Schema assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) { - return new Schema(TypeUtil + Types.StructType struct = TypeUtil .visit(schema.asStruct(), new AssignFreshIds(schema, baseSchema, nextId)) - .asNestedType() - .fields()); + .asStructType(); + return new Schema(struct.fields(), refreshIdentifierFields(struct, schema)); + } + + /** + * Get the identifier fields in the fresh schema based on the identifier fields in the base schema. + * @param freshSchema fresh schema + * @param baseSchema base schema + * @return identifier fields in the fresh schema + */ + public static Set refreshIdentifierFields(Types.StructType freshSchema, Schema baseSchema) { + Map nameToId = TypeUtil.indexByName(freshSchema); + Set identifierFieldNames = baseSchema.identifierFieldNames(); + identifierFieldNames.forEach(name -> Preconditions.checkArgument(nameToId.containsKey(name), + "Cannot find ID for identifier field %s in schema %s", name, freshSchema)); + return identifierFieldNames.stream().map(nameToId::get).collect(Collectors.toSet()); } /** @@ -213,7 +224,7 @@ public static Schema assignIncreasingFreshIds(Schema schema) { */ public static Schema reassignIds(Schema schema, Schema idSourceSchema) { Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType(); - return new Schema(struct.fields()); + return new Schema(struct.fields(), refreshIdentifierFields(struct, schema)); } public static Type find(Schema schema, Predicate predicate) { diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java index 400354ed95f1..c11c859edacf 100644 --- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java +++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java @@ -21,6 +21,8 @@ package org.apache.iceberg.types; import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; @@ -42,6 +44,65 @@ public void testReassignIdsDuplicateColumns() { Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct()); } + @Test + public void testReassignIdsWithIdentifier() { + Schema schema = new Schema( + Lists.newArrayList( + required(0, "a", Types.IntegerType.get()), + required(1, "A", Types.IntegerType.get())), + Sets.newHashSet(0) + ); + Schema sourceSchema = new Schema( + Lists.newArrayList( + required(1, "a", Types.IntegerType.get()), + required(2, "A", Types.IntegerType.get())), + Sets.newHashSet(1) + ); + final Schema actualSchema = TypeUtil.reassignIds(schema, sourceSchema); + Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct()); + Assert.assertEquals("identifier field ID should change based on source schema", + sourceSchema.identifierFieldIds(), actualSchema.identifierFieldIds()); + } + + @Test + public void testAssignIncreasingFreshIdWithIdentifier() { + Schema schema = new Schema( + Lists.newArrayList( + required(10, "a", Types.IntegerType.get()), + required(11, "A", Types.IntegerType.get())), + Sets.newHashSet(10) + ); + Schema expectedSchema = new Schema( + Lists.newArrayList( + required(1, "a", Types.IntegerType.get()), + required(2, "A", Types.IntegerType.get())), + Sets.newHashSet(1) + ); + final Schema actualSchema = TypeUtil.assignIncreasingFreshIds(schema); + Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct()); + Assert.assertEquals("identifier field ID should change based on source schema", + expectedSchema.identifierFieldIds(), actualSchema.identifierFieldIds()); + } + + @Test + public void testAssignIncreasingFreshIdNewIdentifier() { + Schema schema = new Schema( + Lists.newArrayList( + required(10, "a", Types.IntegerType.get()), + required(11, "A", Types.IntegerType.get())), + Sets.newHashSet(10) + ); + Schema sourceSchema = new Schema( + Lists.newArrayList( + required(1, "a", Types.IntegerType.get()), + required(2, "A", Types.IntegerType.get())) + ); + final Schema actualSchema = TypeUtil.reassignIds(schema, sourceSchema); + Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct()); + Assert.assertEquals("source schema missing identifier should not impact refreshing new identifier", + Sets.newHashSet(sourceSchema.findField("a").fieldId()), actualSchema.identifierFieldIds()); + } + @Test(expected = IllegalArgumentException.class) public void testReassignIdsIllegalArgumentException() { Schema schema = new Schema( diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index b387a6b83863..1b29e4c0756b 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; @@ -85,9 +84,7 @@ private SchemaUpdate(TableOperations ops, TableMetadata base, Schema schema, int this.schema = schema; this.lastColumnId = lastColumnId; this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct())); - this.identifierFieldNames = schema.identifierFieldIds().stream() - .map(id -> schema.findField(id).name()) - .collect(Collectors.toSet()); + this.identifierFieldNames = schema.identifierFieldNames(); } @Override diff --git a/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java b/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java index 872e7cc9c47d..a693acfce79c 100644 --- a/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java @@ -22,14 +22,18 @@ import java.io.File; import java.io.IOException; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import static org.apache.iceberg.PartitionSpec.unpartitioned; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; @RunWith(Parameterized.class) public class TestCreateTransaction extends TableTestBase { @@ -69,6 +73,48 @@ public void testCreateTransaction() throws IOException { Assert.assertEquals("Table should not have any snapshots", 0, meta.snapshots().size()); } + @Test + public void testCreateTransactionAndUpdateSchema() throws IOException { + File tableDir = temp.newFolder(); + Assert.assertTrue(tableDir.delete()); + + Transaction txn = TestTables.beginCreate(tableDir, "test_create", SCHEMA, unpartitioned()); + + Assert.assertNull("Starting a create transaction should not commit metadata", + TestTables.readMetadata("test_create")); + Assert.assertNull("Should have no metadata version", + TestTables.metadataVersion("test_create")); + + txn.updateSchema() + .addColumn("col", Types.StringType.get()) + .setIdentifierFields("id", "col") + .commit(); + + txn.commitTransaction(); + + TableMetadata meta = TestTables.readMetadata("test_create"); + Assert.assertNotNull("Table metadata should be created after transaction commits", meta); + Assert.assertEquals("Should have metadata version 0", + 0, (int) TestTables.metadataVersion("test_create")); + Assert.assertEquals("Should have 0 manifest files", + 0, listManifestFiles(tableDir).size()); + + Schema resultSchema = new Schema( + Lists.newArrayList( + required(1, "id", Types.IntegerType.get()), + required(2, "data", Types.StringType.get()), + optional(3, "col", Types.StringType.get())), + Sets.newHashSet(1, 3) + ); + + Assert.assertEquals("Table schema should match with reassigned IDs", + resultSchema.asStruct(), meta.schema().asStruct()); + Assert.assertEquals("Table schema identifier should match", + resultSchema.identifierFieldIds(), meta.schema().identifierFieldIds()); + Assert.assertEquals("Table spec should match", unpartitioned(), meta.spec()); + Assert.assertEquals("Table should not have any snapshots", 0, meta.snapshots().size()); + } + @Test public void testCreateAndAppendWithTransaction() throws IOException { File tableDir = temp.newFolder(); diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java index 25200bef88c9..782eb7b1aef4 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java @@ -158,6 +158,32 @@ public void testSchemaUpdateComplexType() throws Exception { Assert.assertEquals("Should contain 0 Avro manifest files", 0, manifests.size()); } + @Test + public void testSchemaUpdateIdentifierFields() throws Exception { + Assert.assertTrue("Should create v1 metadata", + version(1).exists() && version(1).isFile()); + Assert.assertFalse("Should not create v2 or newer versions", + version(2).exists()); + + Schema updatedSchema = new Schema(Lists.newArrayList( + required(1, "id", Types.IntegerType.get(), "unique ID"), + required(2, "data", Types.StringType.get()) + ), Sets.newHashSet(1)); + + table.updateSchema() + .setIdentifierFields("id") + .commit(); + + Assert.assertTrue("Should create v2 for the update", + version(2).exists() && version(2).isFile()); + Assert.assertEquals("Should write the current version to the hint file", + 2, readVersionHint()); + Assert.assertEquals("Table schema should match schema with reassigned ids", + updatedSchema.asStruct(), table.schema().asStruct()); + Assert.assertEquals("Identifier fields should match schema with reassigned ids", + updatedSchema.identifierFieldIds(), table.schema().identifierFieldIds()); + } + @Test public void testFailedCommit() throws Exception { // apply the change to metadata without committing