diff --git a/api/src/main/java/org/apache/iceberg/Schema.java b/api/src/main/java/org/apache/iceberg/Schema.java index f753a2b44e07..1b80a2deed5d 100644 --- a/api/src/main/java/org/apache/iceberg/Schema.java +++ b/api/src/main/java/org/apache/iceberg/Schema.java @@ -32,9 +32,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.BiMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; @@ -51,29 +54,47 @@ public class Schema implements Serializable { private final StructType struct; private final int schemaId; + private final int[] identifierFieldIds; + private transient BiMap aliasToId = null; private transient Map idToField = null; private transient Map nameToId = null; private transient Map lowerCaseNameToId = null; private transient Map> idToAccessor = null; private transient Map idToName = null; + private transient Set identifierFieldIdSet = null; public Schema(List columns, Map aliases) { - this.schemaId = DEFAULT_SCHEMA_ID; - this.struct = StructType.of(columns); - this.aliasToId = aliases != null ? 
ImmutableBiMap.copyOf(aliases) : null; + this(columns, aliases, ImmutableSet.of()); + } - // validate the schema through IndexByName visitor - lazyIdToName(); + public Schema(List columns, Map aliases, Set identifierFieldIds) { + this(DEFAULT_SCHEMA_ID, columns, aliases, identifierFieldIds); } public Schema(List columns) { - this(DEFAULT_SCHEMA_ID, columns); + this(columns, ImmutableSet.of()); + } + + public Schema(List columns, Set identifierFieldIds) { + this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds); } public Schema(int schemaId, List columns) { + this(schemaId, columns, ImmutableSet.of()); + } + + public Schema(int schemaId, List columns, Set identifierFieldIds) { + this(schemaId, columns, null, identifierFieldIds); + } + + private Schema(int schemaId, List columns, Map aliases, + Set identifierFieldIds) { this.schemaId = schemaId; this.struct = StructType.of(columns); + this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null; + this.identifierFieldIds = identifierFieldIds != null ? Ints.toArray(identifierFieldIds) : new int[0]; + lazyIdToName(); } @@ -120,6 +141,13 @@ private Map> lazyIdToAccessor() { return idToAccessor; } + private Set lazyIdentifierFieldIdSet() { + if (identifierFieldIdSet == null) { + identifierFieldIdSet = ImmutableSet.copyOf(Ints.asList(identifierFieldIds)); + } + return identifierFieldIdSet; + } + /** * Returns the schema ID for this schema. *

@@ -158,6 +186,29 @@ public List columns() { return struct.fields(); } + /** + * The set of identifier field IDs. + *

+ * Identifier is a concept similar to primary key in a relational database system. + * It consists of a unique set of primitive fields in the schema. + * An identifier field must be at root, or nested in a chain of structs (no maps or lists). + * A row should be unique in a table based on the values of the identifier fields. + * However, Iceberg identifier differs from primary key in the following ways: + *

    + * <ul>
+ * <li>Iceberg does not enforce the uniqueness of a row based on this identifier information.
+ * It is used for operations like upsert to define the default upsert key.</li>
+ * <li>NULL can be used as value of an identifier field. Iceberg ensures null-safe equality check.</li>
+ * <li>A nested field in a struct can be used as an identifier. For example, if there is a "last_name" field
+ * inside a "user" struct in a schema, field "user.last_name" can be set as a part of the identifier field.</li>
+ * </ul>
+ *

+ * + * @return the set of identifier field IDs in this schema. + */ + public Set identifierFieldIds() { + return lazyIdentifierFieldIdSet(); + } + /** * Returns the {@link Type} of a sub-field identified by the field name. * @@ -331,6 +382,16 @@ public Schema caseInsensitiveSelect(Collection names) { return internalSelect(names, false); } + /** + * Checks whether this schema is equivalent to another schema while ignoring the schema ID. + * @param anotherSchema another schema + * @return true if this schema is equivalent to the given schema + */ + public boolean sameSchema(Schema anotherSchema) { + return asStruct().equals(anotherSchema.asStruct()) && + identifierFieldIds().equals(anotherSchema.identifierFieldIds()); + } + private Schema internalSelect(Collection names, boolean caseSensitive) { if (names.contains(ALL_COLUMNS)) { return this; @@ -353,11 +414,15 @@ private Schema internalSelect(Collection names, boolean caseSensitive) { return TypeUtil.select(this, selected); } + private String identifierFieldToString(Types.NestedField field) { + return " " + field + (identifierFieldIds().contains(field.fieldId()) ? 
" (id)" : ""); + } + @Override public String toString() { return String.format("table {\n%s\n}", NEWLINE.join(struct.fields().stream() - .map(f -> " " + f) + .map(this::identifierFieldToString) .collect(Collectors.toList()))); } } diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java b/api/src/main/java/org/apache/iceberg/UpdateSchema.java index b3727581bd53..32c3092fc0ec 100644 --- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java +++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java @@ -19,7 +19,9 @@ package org.apache.iceberg; +import java.util.Set; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; /** @@ -384,4 +386,24 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin * with other additions, renames, or updates. */ UpdateSchema unionByNameWith(Schema newSchema); + + /** + * Set the identifier fields given a set of field names. + * See {@link Schema#identifierFieldIds()} to learn more about Iceberg identifier. + * + * @param names names of the columns to set as identifier fields + * @return this for method chaining + */ + UpdateSchema setIdentifierFields(Set names); + + /** + * Set the identifier fields given some field names. + * See {@link UpdateSchema#setIdentifierFields(Set)} for more details. + * + * @param names names of the columns to set as identifier fields + * @return this for method chaining + */ + default UpdateSchema setIdentifierFields(String... 
names) { + return setIdentifierFields(Sets.newHashSet(names)); + } } diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java b/core/src/main/java/org/apache/iceberg/SchemaParser.java index 534e085e1287..0a4f6f389868 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaParser.java +++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java @@ -27,6 +27,7 @@ import java.io.StringWriter; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -40,6 +41,7 @@ private SchemaParser() { } private static final String SCHEMA_ID = "schema-id"; + private static final String IDENTIFIER_FIELD_IDS = "identifier-field-ids"; private static final String TYPE = "type"; private static final String STRUCT = "struct"; private static final String LIST = "list"; @@ -59,10 +61,11 @@ private SchemaParser() { private static final String VALUE_REQUIRED = "value-required"; private static void toJson(Types.StructType struct, JsonGenerator generator) throws IOException { - toJson(struct, null, generator); + toJson(struct, null, null, generator); } - private static void toJson(Types.StructType struct, Integer schemaId, JsonGenerator generator) throws IOException { + private static void toJson(Types.StructType struct, Integer schemaId, Set identifierFieldIds, + JsonGenerator generator) throws IOException { generator.writeStartObject(); generator.writeStringField(TYPE, STRUCT); @@ -70,6 +73,14 @@ private static void toJson(Types.StructType struct, Integer schemaId, JsonGenera generator.writeNumberField(SCHEMA_ID, schemaId); } + if (identifierFieldIds != null && !identifierFieldIds.isEmpty()) { + generator.writeArrayFieldStart(IDENTIFIER_FIELD_IDS); + for (int id : identifierFieldIds) { + generator.writeNumber(id); + } + generator.writeEndArray(); + } + 
generator.writeArrayFieldStart(FIELDS); for (Types.NestedField field : struct.fields()) { generator.writeStartObject(); @@ -144,7 +155,7 @@ static void toJson(Type type, JsonGenerator generator) throws IOException { } public static void toJson(Schema schema, JsonGenerator generator) throws IOException { - toJson(schema.asStruct(), schema.schemaId(), generator); + toJson(schema.asStruct(), schema.schemaId(), schema.identifierFieldIds(), generator); } public static String toJson(Schema schema) { @@ -158,7 +169,7 @@ public static String toJson(Schema schema, boolean pretty) { if (pretty) { generator.useDefaultPrettyPrinter(); } - toJson(schema.asStruct(), generator); + toJson(schema.asStruct(), schema.schemaId(), schema.identifierFieldIds(), generator); generator.flush(); return writer.toString(); @@ -247,11 +258,12 @@ public static Schema fromJson(JsonNode json) { Preconditions.checkArgument(type.isNestedType() && type.asNestedType().isStructType(), "Cannot create schema, not a struct type: %s", type); Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, json); + Set identifierFieldIds = JsonUtil.getIntegerSetOrNull(IDENTIFIER_FIELD_IDS, json); if (schemaId == null) { - return new Schema(type.asNestedType().asStructType().fields()); + return new Schema(type.asNestedType().asStructType().fields(), identifierFieldIds); } else { - return new Schema(schemaId, type.asNestedType().asStructType().fields()); + return new Schema(schemaId, type.asNestedType().asStructType().fields(), identifierFieldIds); } } diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index e4f2593aecce..1fb21a01beac 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; import 
org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; @@ -33,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Multimap; import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.schema.UnionByNameVisitor; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; @@ -59,25 +62,32 @@ class SchemaUpdate implements UpdateSchema { private final Multimap moves = Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList); private int lastColumnId; private boolean allowIncompatibleChanges = false; - + private Set identifierFieldNames; SchemaUpdate(TableOperations ops) { - this.ops = ops; - this.base = ops.current(); - this.schema = base.schema(); - this.lastColumnId = base.lastColumnId(); - this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct())); + this(ops, ops.current()); } /** * For testing only. 
*/ SchemaUpdate(Schema schema, int lastColumnId) { - this.ops = null; - this.base = null; + this(null, null, schema, lastColumnId); + } + + private SchemaUpdate(TableOperations ops, TableMetadata base) { + this(ops, base, base.schema(), base.lastColumnId()); + } + + private SchemaUpdate(TableOperations ops, TableMetadata base, Schema schema, int lastColumnId) { + this.ops = ops; + this.base = base; this.schema = schema; this.lastColumnId = lastColumnId; this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct())); + this.identifierFieldNames = schema.identifierFieldIds().stream() + .map(id -> schema.findField(id).name()) + .collect(Collectors.toSet()); } @Override @@ -170,7 +180,6 @@ public UpdateSchema deleteColumn(String name) { "Cannot delete a column that has additions: %s", name); Preconditions.checkArgument(!updates.containsKey(field.fieldId()), "Cannot delete a column that has updates: %s", name); - deletes.add(field.fieldId()); return this; @@ -193,6 +202,11 @@ public UpdateSchema renameColumn(String name, String newName) { updates.put(fieldId, Types.NestedField.of(fieldId, field.isOptional(), newName, field.type(), field.doc())); } + if (identifierFieldNames.contains(name)) { + identifierFieldNames.remove(name); + identifierFieldNames.add(newName); + } + return this; } @@ -317,6 +331,12 @@ public UpdateSchema unionByNameWith(Schema newSchema) { return this; } + @Override + public UpdateSchema setIdentifierFields(Set names) { + this.identifierFieldNames = Sets.newHashSet(names); + return this; + } + private Integer findForMove(String name) { Types.NestedField field = schema.findField(name); if (field != null) { @@ -359,7 +379,7 @@ private void internalMove(String name, Move move) { */ @Override public Schema apply() { - Schema newSchema = applyChanges(schema, deletes, updates, adds, moves); + Schema newSchema = applyChanges(schema, deletes, updates, adds, moves, identifierFieldNames); // Validate the metrics if we have existing properties. 
if (base != null && base.properties() != null) { @@ -408,11 +428,53 @@ private TableMetadata applyChangesToMapping(TableMetadata metadata) { private static Schema applyChanges(Schema schema, List deletes, Map updates, Multimap adds, - Multimap moves) { + Multimap moves, + Set identifierFieldNames) { + // validate existing identifier fields are not deleted + for (String name : identifierFieldNames) { + Types.NestedField field = schema.findField(name); + if (field != null) { + Preconditions.checkArgument(!deletes.contains(field.fieldId()), + "Cannot delete identifier field %s. To force deletion, " + + "also call setIdentifierFields to update identifier fields.", field); + } + } + + // apply schema changes Types.StructType struct = TypeUtil .visit(schema, new ApplyChanges(deletes, updates, adds, moves)) .asNestedType().asStructType(); - return new Schema(struct.fields()); + + // validate identifier requirements based on the latest schema + Map nameToId = TypeUtil.indexByName(struct); + Set freshIdentifierFieldIds = Sets.newHashSet(); + for (String name : identifierFieldNames) { + Preconditions.checkArgument(nameToId.containsKey(name), + "Cannot add field %s as an identifier field: not found in current schema or added columns", name); + freshIdentifierFieldIds.add(nameToId.get(name)); + } + + Map idToParent = TypeUtil.indexParents(struct); + Map idToField = TypeUtil.indexById(struct); + freshIdentifierFieldIds.forEach(id -> validateIdentifierField(id, idToField, idToParent)); + + return new Schema(struct.fields(), freshIdentifierFieldIds); + } + + private static void validateIdentifierField(int fieldId, Map idToField, + Map idToParent) { + Types.NestedField field = idToField.get(fieldId); + Preconditions.checkArgument(field.type().isPrimitiveType(), + "Cannot add field %s as an identifier field: not a primitive type field", field.name()); + + // check whether the nested field is in a chain of struct fields + Integer parentId = idToParent.get(field.fieldId()); + 
while (parentId != null) { + Types.NestedField parent = idToField.get(parentId); + Preconditions.checkArgument(parent.type().isStructType(), + "Cannot add field %s as an identifier field: must not be nested in %s", field.name(), parent); + parentId = idToParent.get(parent.fieldId()); + } } private static class ApplyChanges extends TypeUtil.SchemaVisitor { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 1e33c54b8ec6..532519e5921c 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -491,7 +491,7 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) { ImmutableList.Builder builder = ImmutableList.builder().addAll(schemas); if (!schemasById.containsKey(newSchemaId)) { - builder.add(new Schema(newSchemaId, newSchema.columns())); + builder.add(new Schema(newSchemaId, newSchema.columns(), newSchema.identifierFieldIds())); } return new TableMetadata(null, formatVersion, uuid, location, @@ -760,7 +760,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update ImmutableList.Builder schemasBuilder = ImmutableList.builder().addAll(schemas); if (!schemasById.containsKey(freshSchemaId)) { - schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns())); + schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(), freshSchema.identifierFieldIds())); } return new TableMetadata(null, formatVersion, uuid, newLocation, @@ -916,7 +916,7 @@ private int reuseOrCreateNewSchemaId(Schema newSchema) { // if the schema already exists, use its id; otherwise use the highest id + 1 int newSchemaId = currentSchemaId; for (Schema schema : schemas) { - if (schema.asStruct().equals(newSchema.asStruct())) { + if (schema.sameSchema(newSchema)) { newSchemaId = schema.schemaId(); break; } else if (schema.schemaId() >= newSchemaId) { diff --git 
a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index 99b82a4d83cf..bcde95968484 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -25,9 +25,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; public class JsonUtil { @@ -117,18 +119,80 @@ public static Map getStringMap(String property, JsonNode node) { public static List getStringList(String property, JsonNode node) { Preconditions.checkArgument(node.has(property), "Cannot parse missing list %s", property); - JsonNode pNode = node.get(property); - Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isArray(), - "Cannot parse %s from non-array value: %s", property, pNode); + return ImmutableList.builder() + .addAll(new JsonStringArrayIterator(property, node)) + .build(); + } + + public static Set getIntegerSetOrNull(String property, JsonNode node) { + if (!node.has(property)) { + return null; + } + + return ImmutableSet.builder() + .addAll(new JsonIntegerArrayIterator(property, node)) + .build(); + } + + abstract static class JsonArrayIterator implements Iterator { - ImmutableList.Builder builder = ImmutableList.builder(); - Iterator elements = pNode.elements(); - while (elements.hasNext()) { + private final Iterator elements; + + JsonArrayIterator(String property, JsonNode node) { + JsonNode pNode = node.get(property); + Preconditions.checkArgument(pNode != null && !pNode.isNull() && pNode.isArray(), + "Cannot parse %s from non-array value: %s", property, pNode); + this.elements = pNode.elements(); 
+ } + + @Override + public boolean hasNext() { + return elements.hasNext(); + } + + @Override + public T next() { JsonNode element = elements.next(); - Preconditions.checkArgument(element.isTextual(), - "Cannot parse string from non-text value: %s", element); - builder.add(element.asText()); + validate(element); + return convert(element); + } + + abstract T convert(JsonNode element); + + abstract void validate(JsonNode element); + } + + static class JsonStringArrayIterator extends JsonArrayIterator { + + JsonStringArrayIterator(String property, JsonNode node) { + super(property, node); + } + + @Override + String convert(JsonNode element) { + return element.asText(); + } + + @Override + void validate(JsonNode element) { + Preconditions.checkArgument(element.isTextual(), "Cannot parse string from non-text value: %s", element); + } + } + + static class JsonIntegerArrayIterator extends JsonArrayIterator { + + JsonIntegerArrayIterator(String property, JsonNode node) { + super(property, node); + } + + @Override + Integer convert(JsonNode element) { + return element.asInt(); + } + + @Override + void validate(JsonNode element) { + Preconditions.checkArgument(element.isInt(), "Cannot parse integer from non-int value: %s", element); } - return builder.build(); } } diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java index ecd29b837788..6b8380e31b94 100644 --- a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java +++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java @@ -1223,4 +1223,239 @@ public void testMoveBetweenStructsFails() { .moveBefore("s2.x", "s1.a") .apply()); } + + @Test + public void testAddExistingIdentifierFields() { + Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("id") + .apply(); + + Assert.assertEquals("add an existing field as identifier field should succeed", + 
Sets.newHashSet(newSchema.findField("id").fieldId()), + newSchema.identifierFieldIds()); + } + + @Test + public void testAddNewIdentifierFieldColumns() { + Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .addColumn("new_field", Types.StringType.get()) + .setIdentifierFields("id", "new_field") + .apply(); + + Assert.assertEquals("add column then set as identifier should succeed", + Sets.newHashSet(newSchema.findField("id").fieldId(), newSchema.findField("new_field").fieldId()), + newSchema.identifierFieldIds()); + + newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("id", "new_field") + .addColumn("new_field", Types.StringType.get()) + .apply(); + + Assert.assertEquals("set identifier then add column should succeed", + Sets.newHashSet(newSchema.findField("id").fieldId(), newSchema.findField("new_field").fieldId()), + newSchema.identifierFieldIds()); + } + + @Test + public void testAddNestedIdentifierFieldColumns() { + Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("preferences.feature1") + .apply(); + + Assert.assertEquals("set existing nested field as identifier should succeed", + Sets.newHashSet(newSchema.findField("preferences.feature1").fieldId()), + newSchema.identifierFieldIds()); + + newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .addColumn("new", Types.StructType.of( + Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "field", Types.StringType.get()) + )) + .setIdentifierFields("new.field") + .apply(); + + Assert.assertEquals("set newly added nested field as identifier should succeed", + Sets.newHashSet(newSchema.findField("new.field").fieldId()), + newSchema.identifierFieldIds()); + + newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .addColumn("new", Types.StructType.of( + Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "field", Types.StructType.of( + Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 2, "nested", 
Types.StringType.get()))))) + .setIdentifierFields("new.field.nested") + .apply(); + + Assert.assertEquals("set newly added multi-layer nested field as identifier should succeed", + Sets.newHashSet(newSchema.findField("new.field.nested").fieldId()), + newSchema.identifierFieldIds()); + } + + @Test + public void testAddDottedIdentifierFieldColumns() { + Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .addColumn(null, "dot.field", Types.StringType.get()) + .setIdentifierFields("id", "dot.field") + .apply(); + + Assert.assertEquals("add a field with dot as identifier should succeed", + Sets.newHashSet(newSchema.findField("id").fieldId(), newSchema.findField("dot.field").fieldId()), + newSchema.identifierFieldIds()); + } + + @Test + public void testRemoveIdentifierFields() { + Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .addColumn("new_field", Types.StringType.get()) + .addColumn("new_field2", Types.StringType.get()) + .setIdentifierFields("id", "new_field", "new_field2") + .apply(); + + newSchema = new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("new_field", "new_field2") + .apply(); + + Assert.assertEquals("remove an identifier field should succeed", + Sets.newHashSet(newSchema.findField("new_field").fieldId(), newSchema.findField("new_field2").fieldId()), + newSchema.identifierFieldIds()); + + newSchema = new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields(Sets.newHashSet()) + .apply(); + + Assert.assertEquals("remove all identifier fields should succeed", + Sets.newHashSet(), + newSchema.identifierFieldIds()); + } + + @Test + public void testSetIdentifierFieldsFails() { + AssertHelpers.assertThrows("add a field with name not exist should fail", + IllegalArgumentException.class, + "not found in current schema or added columns", + () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("unknown") + .apply()); + + AssertHelpers.assertThrows("add a field 
of non-primitive type should fail", + IllegalArgumentException.class, + "not a primitive type field", + () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("locations") + .apply()); + + AssertHelpers.assertThrows("add a map key nested field should fail", + IllegalArgumentException.class, + "must not be nested in " + SCHEMA.findField("locations"), + () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("locations.key.zip") + .apply()); + + AssertHelpers.assertThrows("add a map value nested field should fail", + IllegalArgumentException.class, + "must not be nested in " + SCHEMA.findField("locations"), + () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("locations.value.lat") + .apply()); + + AssertHelpers.assertThrows("add a nested field in list should fail", + IllegalArgumentException.class, + "must not be nested in " + SCHEMA.findField("points"), + () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("points.element.x") + .apply()); + + Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .addColumn("new", Types.StructType.of( + Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "fields", Types.ListType.ofOptional( + SCHEMA_LAST_COLUMN_ID + 2, Types.StructType.of( + Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 3, "nested", Types.StringType.get()) + )) + ) + )) + .apply(); + + AssertHelpers.assertThrows("add a nested field in struct of a map should fail", + IllegalArgumentException.class, + "must not be nested in " + newSchema.findField("new.fields"), + () -> new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID + 3) + .setIdentifierFields("new.fields.element.nested") + .apply()); + } + + @Test + public void testDeleteIdentifierFieldColumns() { + Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("id") + .apply(); + + Assert.assertEquals("delete column and then reset identifier field should 
succeed", + Sets.newHashSet(), + new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID) + .deleteColumn("id").setIdentifierFields(Sets.newHashSet()).apply() + .identifierFieldIds()); + + Assert.assertEquals("delete reset identifier field and then delete column should succeed", + Sets.newHashSet(), + new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields(Sets.newHashSet()).deleteColumn("id").apply() + .identifierFieldIds()); + } + + @Test + public void testDeleteIdentifierFieldColumnsFails() { + Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("id") + .apply(); + + AssertHelpers.assertThrows("delete an identifier column without setting identifier fields should fail", + IllegalArgumentException.class, + "Cannot delete identifier field 1: id: required int. To force deletion, " + + "also call setIdentifierFields to update identifier fields.", + () -> new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID).deleteColumn("id").apply()); + } + + @Test + public void testRenameIdentifierFields() { + Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("id") + .apply(); + + Schema newSchema = new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID) + .renameColumn("id", "id2") + .apply(); + + Assert.assertEquals("rename should not affect identifier fields", + Sets.newHashSet(SCHEMA.findField("id").fieldId()), + newSchema.identifierFieldIds()); + } + + @Test + public void testMoveIdentifierFields() { + Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .setIdentifierFields("id") + .apply(); + + Schema newSchema = new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID) + .moveAfter("id", "locations") + .apply(); + + Assert.assertEquals("move after should not affect identifier fields", + Sets.newHashSet(SCHEMA.findField("id").fieldId()), + 
newSchema.identifierFieldIds()); + + newSchema = new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID) + .moveBefore("id", "locations") + .apply(); + + Assert.assertEquals("move before should not affect identifier fields", + Sets.newHashSet(SCHEMA.findField("id").fieldId()), + newSchema.identifierFieldIds()); + + newSchema = new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID) + .moveFirst("id") + .apply(); + + Assert.assertEquals("move first should not affect identifier fields", + Sets.newHashSet(SCHEMA.findField("id").fieldId()), + newSchema.identifierFieldIds()); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index a7b30a6cdd7a..c44ca63a126a 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -646,6 +646,33 @@ public void testUpdateSortOrder() { NullOrder.NULLS_FIRST, sortedByX.sortOrder().fields().get(0).nullOrder()); } + @Test + public void testParseSchemaIdentifierFields() throws Exception { + String data = readTableMetadataInputFile("TableMetadataV2Valid.json"); + TableMetadata parsed = TableMetadataParser.fromJson( + ops.io(), null, JsonUtil.mapper().readValue(data, JsonNode.class)); + Assert.assertEquals(Sets.newHashSet(), parsed.schemasById().get(0).identifierFieldIds()); + Assert.assertEquals(Sets.newHashSet(1, 2), parsed.schemasById().get(1).identifierFieldIds()); + } + + @Test + public void testUpdateSchemaIdentifierFields() { + Schema schema = new Schema( + Types.NestedField.required(10, "x", Types.StringType.get()) + ); + + TableMetadata meta = TableMetadata.newTableMetadata( + schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of()); + + Schema newSchema = new Schema( + Lists.newArrayList(Types.NestedField.required(1, "x", Types.StringType.get())), + Sets.newHashSet(1) + ); + TableMetadata newMeta = meta.updateSchema(newSchema, 
1); + Assert.assertEquals(2, newMeta.schemas().size()); + Assert.assertEquals(Sets.newHashSet(1), newMeta.schema().identifierFieldIds()); + } + @Test public void testUpdateSchema() { Schema schema = new Schema(0, diff --git a/core/src/test/resources/TableMetadataV2Valid.json b/core/src/test/resources/TableMetadataV2Valid.json index cf492f568ff3..d43e0a20619f 100644 --- a/core/src/test/resources/TableMetadataV2Valid.json +++ b/core/src/test/resources/TableMetadataV2Valid.json @@ -22,6 +22,10 @@ { "type": "struct", "schema-id": 1, + "identifier-field-ids": [ + 1, + 2 + ], "fields": [ { "id": 1, @@ -85,4 +89,4 @@ "snapshots": [], "snapshot-log": [], "metadata-log": [] -} \ No newline at end of file +}