Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ public Set<Integer> identifierFieldIds() {
return lazyIdentifierFieldIdSet();
}

/**
* Returns the set of identifier field names.
*/
public Set<String> identifierFieldNames() {
return identifierFieldIds()
.stream()
.map(id -> findField(id).name())
.collect(Collectors.toSet());
}

/**
* Returns the {@link Type} of a sub-field identified by the field name.
*
Expand Down
35 changes: 23 additions & 12 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -151,10 +152,8 @@ public static Type assignFreshIds(Type type, NextID nextId) {
* @return a structurally identical schema with new ids assigned by the nextId function
*/
public static Schema assignFreshIds(Schema schema, NextID nextId) {
return new Schema(TypeUtil
.visit(schema.asStruct(), new AssignFreshIds(nextId))
.asNestedType()
.fields());
Types.StructType struct = TypeUtil.visit(schema.asStruct(), new AssignFreshIds(nextId)).asStructType();
return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
}

/**
Expand All @@ -166,10 +165,8 @@ public static Schema assignFreshIds(Schema schema, NextID nextId) {
* @return a structurally identical schema with new ids assigned by the nextId function
*/
public static Schema assignFreshIds(int schemaId, Schema schema, NextID nextId) {
return new Schema(schemaId, TypeUtil
.visit(schema.asStruct(), new AssignFreshIds(nextId))
.asNestedType()
.fields());
Types.StructType struct = TypeUtil.visit(schema.asStruct(), new AssignFreshIds(nextId)).asStructType();
return new Schema(schemaId, struct.fields(), refreshIdentifierFields(struct, schema));
}

/**
Expand All @@ -181,10 +178,24 @@ public static Schema assignFreshIds(int schemaId, Schema schema, NextID nextId)
* @return a structurally identical schema with new ids assigned by the nextId function
*/
public static Schema assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) {
return new Schema(TypeUtil
Types.StructType struct = TypeUtil
.visit(schema.asStruct(), new AssignFreshIds(schema, baseSchema, nextId))
.asNestedType()
.fields());
.asStructType();
return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
}

/**
* Get the identifier fields in the fresh schema based on the identifier fields in the base schema.
* @param freshSchema fresh schema
* @param baseSchema base schema
* @return identifier fields in the fresh schema
*/
public static Set<Integer> refreshIdentifierFields(Types.StructType freshSchema, Schema baseSchema) {
Map<String, Integer> nameToId = TypeUtil.indexByName(freshSchema);
Set<String> identifierFieldNames = baseSchema.identifierFieldNames();
identifierFieldNames.forEach(name -> Preconditions.checkArgument(nameToId.containsKey(name),
"Cannot find ID for identifier field %s in schema %s", name, freshSchema));
return identifierFieldNames.stream().map(nameToId::get).collect(Collectors.toSet());
}

/**
Expand Down Expand Up @@ -213,7 +224,7 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
*/
public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
return new Schema(struct.fields());
return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since schema passed in to this method is almost always a schema just got constructed (and thus have to call this method to assign the right ids), I think refreshIdentifierFields(struct, schema) here will almost always be a no-op?

I wonder if we want to use idSourceSchema to get the identifier fields, although that might have a different problem that the input schema could be a subset of idSourceSchema and thus doesn't include all identifier fields. Though in this case identifier fields won't be useful and we may skip it, and we can visit two schema and verify they have the same number of columns to identify this case. I'm not sure about the use case of this method and if we really need to assign identifier fields here though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to call refreshIdentifierFields here to get an consistent view of identifier field id list for the schema, without caring about the idsourceSchema’s own identifier field id list. This makes the reassignIds method looks more general.

}

public static Type find(Schema schema, Predicate<Type> predicate) {
Expand Down
61 changes: 61 additions & 0 deletions api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package org.apache.iceberg.types;

import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -42,6 +44,65 @@ public void testReassignIdsDuplicateColumns() {
Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct());
}

@Test
public void testReassignIdsWithIdentifier() {
Schema schema = new Schema(
Lists.newArrayList(
required(0, "a", Types.IntegerType.get()),
required(1, "A", Types.IntegerType.get())),
Sets.newHashSet(0)
);
Schema sourceSchema = new Schema(
Lists.newArrayList(
required(1, "a", Types.IntegerType.get()),
required(2, "A", Types.IntegerType.get())),
Sets.newHashSet(1)
);
final Schema actualSchema = TypeUtil.reassignIds(schema, sourceSchema);
Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct());
Assert.assertEquals("identifier field ID should change based on source schema",
sourceSchema.identifierFieldIds(), actualSchema.identifierFieldIds());
}

@Test
public void testAssignIncreasingFreshIdWithIdentifier() {
Schema schema = new Schema(
Lists.newArrayList(
required(10, "a", Types.IntegerType.get()),
required(11, "A", Types.IntegerType.get())),
Sets.newHashSet(10)
);
Schema expectedSchema = new Schema(
Lists.newArrayList(
required(1, "a", Types.IntegerType.get()),
required(2, "A", Types.IntegerType.get())),
Sets.newHashSet(1)
);
final Schema actualSchema = TypeUtil.assignIncreasingFreshIds(schema);
Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
Assert.assertEquals("identifier field ID should change based on source schema",
expectedSchema.identifierFieldIds(), actualSchema.identifierFieldIds());
}

@Test
public void testAssignIncreasingFreshIdNewIdentifier() {
Schema schema = new Schema(
Lists.newArrayList(
required(10, "a", Types.IntegerType.get()),
required(11, "A", Types.IntegerType.get())),
Sets.newHashSet(10)
);
Schema sourceSchema = new Schema(
Lists.newArrayList(
required(1, "a", Types.IntegerType.get()),
required(2, "A", Types.IntegerType.get()))
);
final Schema actualSchema = TypeUtil.reassignIds(schema, sourceSchema);
Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct());
Assert.assertEquals("source schema missing identifier should not impact refreshing new identifier",
Sets.newHashSet(sourceSchema.findField("a").fieldId()), actualSchema.identifierFieldIds());
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to add a case to call assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) with schema doesn't have identifier fields but baseSchema has, and the output schema will have identifier fields? I think it's an existing use case we have in TableMetadata that may worth explicit testing.

@Test(expected = IllegalArgumentException.class)
public void testReassignIdsIllegalArgumentException() {
Schema schema = new Schema(
Expand Down
5 changes: 1 addition & 4 deletions core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
Expand Down Expand Up @@ -85,9 +84,7 @@ private SchemaUpdate(TableOperations ops, TableMetadata base, Schema schema, int
this.schema = schema;
this.lastColumnId = lastColumnId;
this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
this.identifierFieldNames = schema.identifierFieldIds().stream()
.map(id -> schema.findField(id).name())
.collect(Collectors.toSet());
this.identifierFieldNames = schema.identifierFieldNames();
}

@Override
Expand Down
46 changes: 46 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import java.io.File;
import java.io.IOException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.PartitionSpec.unpartitioned;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

@RunWith(Parameterized.class)
public class TestCreateTransaction extends TableTestBase {
Expand Down Expand Up @@ -69,6 +73,48 @@ public void testCreateTransaction() throws IOException {
Assert.assertEquals("Table should not have any snapshots", 0, meta.snapshots().size());
}

@Test
public void testCreateTransactionAndUpdateSchema() throws IOException {
File tableDir = temp.newFolder();
Assert.assertTrue(tableDir.delete());

Transaction txn = TestTables.beginCreate(tableDir, "test_create", SCHEMA, unpartitioned());

Assert.assertNull("Starting a create transaction should not commit metadata",
TestTables.readMetadata("test_create"));
Assert.assertNull("Should have no metadata version",
TestTables.metadataVersion("test_create"));

txn.updateSchema()
.addColumn("col", Types.StringType.get())
.setIdentifierFields("id", "col")
.commit();

txn.commitTransaction();

TableMetadata meta = TestTables.readMetadata("test_create");
Assert.assertNotNull("Table metadata should be created after transaction commits", meta);
Assert.assertEquals("Should have metadata version 0",
0, (int) TestTables.metadataVersion("test_create"));
Assert.assertEquals("Should have 0 manifest files",
0, listManifestFiles(tableDir).size());

Schema resultSchema = new Schema(
Lists.newArrayList(
required(1, "id", Types.IntegerType.get()),
required(2, "data", Types.StringType.get()),
optional(3, "col", Types.StringType.get())),
Sets.newHashSet(1, 3)
);

Assert.assertEquals("Table schema should match with reassigned IDs",
resultSchema.asStruct(), meta.schema().asStruct());
Assert.assertEquals("Table schema identifier should match",
resultSchema.identifierFieldIds(), meta.schema().identifierFieldIds());
Assert.assertEquals("Table spec should match", unpartitioned(), meta.spec());
Assert.assertEquals("Table should not have any snapshots", 0, meta.snapshots().size());
}

@Test
public void testCreateAndAppendWithTransaction() throws IOException {
File tableDir = temp.newFolder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,32 @@ public void testSchemaUpdateComplexType() throws Exception {
Assert.assertEquals("Should contain 0 Avro manifest files", 0, manifests.size());
}

@Test
public void testSchemaUpdateIdentifierFields() throws Exception {
Assert.assertTrue("Should create v1 metadata",
version(1).exists() && version(1).isFile());
Assert.assertFalse("Should not create v2 or newer versions",
version(2).exists());

Schema updatedSchema = new Schema(Lists.newArrayList(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
), Sets.newHashSet(1));

table.updateSchema()
.setIdentifierFields("id")
.commit();

Assert.assertTrue("Should create v2 for the update",
version(2).exists() && version(2).isFile());
Assert.assertEquals("Should write the current version to the hint file",
2, readVersionHint());
Assert.assertEquals("Table schema should match schema with reassigned ids",
updatedSchema.asStruct(), table.schema().asStruct());
Assert.assertEquals("Identifier fields should match schema with reassigned ids",
updatedSchema.identifierFieldIds(), table.schema().identifierFieldIds());
}

@Test
public void testFailedCommit() throws Exception {
// apply the change to metadata without committing
Expand Down