Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 72 additions & 7 deletions api/src/main/java/org/apache/iceberg/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

Expand All @@ -51,29 +54,47 @@ public class Schema implements Serializable {

private final StructType struct;
private final int schemaId;
private final int[] identifierFieldIds;

private transient BiMap<String, Integer> aliasToId = null;
private transient Map<Integer, NestedField> idToField = null;
private transient Map<String, Integer> nameToId = null;
private transient Map<String, Integer> lowerCaseNameToId = null;
private transient Map<Integer, Accessor<StructLike>> idToAccessor = null;
private transient Map<Integer, String> idToName = null;
private transient Set<Integer> identifierFieldIdSet = null;

public Schema(List<NestedField> columns, Map<String, Integer> aliases) {
this.schemaId = DEFAULT_SCHEMA_ID;
this.struct = StructType.of(columns);
this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;
this(columns, aliases, ImmutableSet.of());
}

// validate the schema through IndexByName visitor
lazyIdToName();
public Schema(List<NestedField> columns, Map<String, Integer> aliases, Set<Integer> identifierFieldIds) {
this(DEFAULT_SCHEMA_ID, columns, aliases, identifierFieldIds);
}

public Schema(List<NestedField> columns) {
this(DEFAULT_SCHEMA_ID, columns);
this(columns, ImmutableSet.of());
}

public Schema(List<NestedField> columns, Set<Integer> identifierFieldIds) {
this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds);
}

public Schema(int schemaId, List<NestedField> columns) {
this(schemaId, columns, ImmutableSet.of());
}

public Schema(int schemaId, List<NestedField> columns, Set<Integer> identifierFieldIds) {
this(schemaId, columns, null, identifierFieldIds);
}

private Schema(int schemaId, List<NestedField> columns, Map<String, Integer> aliases,
Set<Integer> identifierFieldIds) {
this.schemaId = schemaId;
this.struct = StructType.of(columns);
this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;
this.identifierFieldIds = identifierFieldIds != null ? Ints.toArray(identifierFieldIds) : new int[0];

lazyIdToName();
}

Expand Down Expand Up @@ -120,6 +141,13 @@ private Map<Integer, Accessor<StructLike>> lazyIdToAccessor() {
return idToAccessor;
}

private Set<Integer> lazyIdentifierFieldIdSet() {
if (identifierFieldIdSet == null) {
identifierFieldIdSet = ImmutableSet.copyOf(Ints.asList(identifierFieldIds));
}
return identifierFieldIdSet;
}

/**
* Returns the schema ID for this schema.
* <p>
Expand Down Expand Up @@ -158,6 +186,29 @@ public List<NestedField> columns() {
return struct.fields();
}

/**
* The set of identifier field IDs.
* <p>
* Identifier is a concept similar to primary key in a relational database system.
* It consists of a unique set of primitive fields in the schema.
* An identifier field must be at root, or nested in a chain of structs (no maps or lists).
* A row should be unique in a table based on the values of the identifier fields.
* However, Iceberg identifier differs from primary key in the following ways:
* <ul>
* <li>Iceberg does not enforce the uniqueness of a row based on this identifier information.
* It is used for operations like upsert to define the default upsert key.</li>
* <li>NULL can be used as value of an identifier field. Iceberg ensures null-safe equality check.</li>
* <li>A nested field in a struct can be used as an identifier. For example, if there is a "last_name" field
* inside a "user" struct in a schema, field "user.last_name" can be set as a part of the identifier field.</li>
* </ul>
* <p>
*
* @return the set of identifier field IDs in this schema.
*/
public Set<Integer> identifierFieldIds() {
return lazyIdentifierFieldIdSet();
}

/**
* Returns the {@link Type} of a sub-field identified by the field name.
*
Expand Down Expand Up @@ -331,6 +382,16 @@ public Schema caseInsensitiveSelect(Collection<String> names) {
return internalSelect(names, false);
}

/**
* Checks whether this schema is equivalent to another schema while ignoring the schema ID.
* @param anotherSchema another schema
* @return true if this schema is equivalent to the given schema
*/
public boolean sameSchema(Schema anotherSchema) {
return asStruct().equals(anotherSchema.asStruct()) &&
identifierFieldIds().equals(anotherSchema.identifierFieldIds());
}

private Schema internalSelect(Collection<String> names, boolean caseSensitive) {
if (names.contains(ALL_COLUMNS)) {
return this;
Expand All @@ -353,11 +414,15 @@ private Schema internalSelect(Collection<String> names, boolean caseSensitive) {
return TypeUtil.select(this, selected);
}

private String identifierFieldToString(Types.NestedField field) {
return " " + field + (identifierFieldIds().contains(field.fieldId()) ? " (id)" : "");
}

@Override
public String toString() {
return String.format("table {\n%s\n}",
NEWLINE.join(struct.fields().stream()
.map(f -> " " + f)
.map(this::identifierFieldToString)
.collect(Collectors.toList())));
}
}
22 changes: 22 additions & 0 deletions api/src/main/java/org/apache/iceberg/UpdateSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;

/**
Expand Down Expand Up @@ -384,4 +386,24 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin
* with other additions, renames, or updates.
*/
UpdateSchema unionByNameWith(Schema newSchema);

/**
* Set the identifier fields given a set of field names.
* See {@link Schema#identifierFieldIds()} to learn more about Iceberg identifier.
*
* @param names names of the columns to set as identifier fields
* @return this for method chaining
*/
UpdateSchema setIdentifierFields(Set<String> names);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why require this to be a set instead of accepting any Collection or Iterable? Would a user never need to pass a List?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set was used to explicitly tell users that identifier fields need to be unique, instead of giving people an illusion that the update operation can still succeed with repeated value.

Technically we can use List, Iterable or Collection. If we would like to make a broader use case for the API, I can document the behavior in javadoc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to use Collection. It's fine to collapse that into a set internally since a set of columns or a multi-set of columns has the same identity behavior. Collection is easier to use for the caller.


/**
* Set the identifier fields given some field names.
* See {@link UpdateSchema#setIdentifierFields(Set)} for more details.
*
* @param names names of the columns to set as identifier fields
* @return this for method chaining
*/
default UpdateSchema setIdentifierFields(String... names) {
return setIdentifierFields(Sets.newHashSet(names));
}
}
24 changes: 18 additions & 6 deletions core/src/main/java/org/apache/iceberg/SchemaParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -40,6 +41,7 @@ private SchemaParser() {
}

private static final String SCHEMA_ID = "schema-id";
private static final String IDENTIFIER_FIELD_IDS = "identifier-field-ids";
private static final String TYPE = "type";
private static final String STRUCT = "struct";
private static final String LIST = "list";
Expand All @@ -59,17 +61,26 @@ private SchemaParser() {
private static final String VALUE_REQUIRED = "value-required";

private static void toJson(Types.StructType struct, JsonGenerator generator) throws IOException {
toJson(struct, null, generator);
toJson(struct, null, null, generator);
}

private static void toJson(Types.StructType struct, Integer schemaId, JsonGenerator generator) throws IOException {
private static void toJson(Types.StructType struct, Integer schemaId, Set<Integer> identifierFieldIds,
JsonGenerator generator) throws IOException {
generator.writeStartObject();

generator.writeStringField(TYPE, STRUCT);
if (schemaId != null) {
generator.writeNumberField(SCHEMA_ID, schemaId);
}

if (identifierFieldIds != null && !identifierFieldIds.isEmpty()) {
generator.writeArrayFieldStart(IDENTIFIER_FIELD_IDS);
for (int id : identifierFieldIds) {
generator.writeNumber(id);
}
generator.writeEndArray();
}

generator.writeArrayFieldStart(FIELDS);
for (Types.NestedField field : struct.fields()) {
generator.writeStartObject();
Expand Down Expand Up @@ -144,7 +155,7 @@ static void toJson(Type type, JsonGenerator generator) throws IOException {
}

public static void toJson(Schema schema, JsonGenerator generator) throws IOException {
toJson(schema.asStruct(), schema.schemaId(), generator);
toJson(schema.asStruct(), schema.schemaId(), schema.identifierFieldIds(), generator);
}

public static String toJson(Schema schema) {
Expand All @@ -158,7 +169,7 @@ public static String toJson(Schema schema, boolean pretty) {
if (pretty) {
generator.useDefaultPrettyPrinter();
}
toJson(schema.asStruct(), generator);
toJson(schema.asStruct(), schema.schemaId(), schema.identifierFieldIds(), generator);
generator.flush();
return writer.toString();

Expand Down Expand Up @@ -247,11 +258,12 @@ public static Schema fromJson(JsonNode json) {
Preconditions.checkArgument(type.isNestedType() && type.asNestedType().isStructType(),
"Cannot create schema, not a struct type: %s", type);
Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, json);
Set<Integer> identifierFieldIds = JsonUtil.getIntegerSetOrNull(IDENTIFIER_FIELD_IDS, json);

if (schemaId == null) {
return new Schema(type.asNestedType().asStructType().fields());
return new Schema(type.asNestedType().asStructType().fields(), identifierFieldIds);
} else {
return new Schema(schemaId, type.asNestedType().asStructType().fields());
return new Schema(schemaId, type.asNestedType().asStructType().fields(), identifierFieldIds);
}
}

Expand Down
Loading