diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java b/api/src/main/java/org/apache/iceberg/UpdateSchema.java index c84c237f8d8f..cbcaa0ee2365 100644 --- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java +++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java @@ -20,6 +20,8 @@ import java.util.Collection; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; @@ -49,7 +51,7 @@ public interface UpdateSchema extends PendingUpdate { UpdateSchema allowIncompatibleChanges(); /** - * Add a new top-level column. + * Add a new optional top-level column. * *

Because "." may be interpreted as a column path separator or may be used in field names, it * is not allowed in names passed to this method. To add to nested structures or to add fields @@ -57,34 +59,83 @@ public interface UpdateSchema extends PendingUpdate { * *

If type is a nested type, its field IDs are reassigned when added to the existing schema. * + *

The added column will be optional with a null default value. + * * @param name name for the new column * @param type type for the new column * @return this for method chaining * @throws IllegalArgumentException If name contains "." */ default UpdateSchema addColumn(String name, Type type) { - return addColumn(name, type, null); + return addColumn(name, type, null, null); } /** - * Add a new top-level column. + * Add a new optional top-level column. * *

Because "." may be interpreted as a column path separator or may be used in field names, it * is not allowed in names passed to this method. To add to nested structures or to add fields - * with names that contain ".", use {@link #addColumn(String, String, Type)}. + * with names that contain ".", use {@link #addColumn(String, String, Type, String)}. + * + *

If type is a nested type, its field IDs are reassigned when added to the existing schema. + * + *

The added column will be optional with a null default value. + * + * @param name name for the new column + * @param type type for the new column + * @param doc documentation string for the new column + * @return this for method chaining + * @throws IllegalArgumentException If name contains "." + */ + default UpdateSchema addColumn(String name, Type type, String doc) { + return addColumn(name, type, doc, null); + } + + /** + * Add a new optional top-level column. + * + *

Because "." may be interpreted as a column path separator or may be used in field names, it + * is not allowed in names passed to this method. To add to nested structures or to add fields + * with names that contain ".", use {@link #addColumn(String, String, Type, Literal)}. + * + *

If type is a nested type, its field IDs are reassigned when added to the existing schema. + * + * @param name name for the new column + * @param type type for the new column + * @param defaultValue a default value for the column in existing rows + * @return this for method chaining + * @throws IllegalArgumentException If name contains "." + */ + default UpdateSchema addColumn(String name, Type type, Literal defaultValue) { + return addColumn(name, type, null, defaultValue); + } + + /** + * Add a new optional top-level column. + * + *

Because "." may be interpreted as a column path separator or may be used in field names, it + * is not allowed in names passed to this method. To add to nested structures or to add fields + * with names that contain ".", use {@link #addColumn(String, String, Type, String, Literal)}. * *

If type is a nested type, its field IDs are reassigned when added to the existing schema. * * @param name name for the new column * @param type type for the new column * @param doc documentation string for the new column + * @param defaultValue a default value for the column in existing rows * @return this for method chaining * @throws IllegalArgumentException If name contains "." */ - UpdateSchema addColumn(String name, Type type, String doc); + default UpdateSchema addColumn(String name, Type type, String doc, Literal defaultValue) { + Preconditions.checkArgument( + !name.contains("."), + "Cannot add column with ambiguous name: %s, use addColumn(parent, name, type)", + name); + return addColumn(null, name, type, doc, defaultValue); + } /** - * Add a new column to a nested struct. + * Add a new optional column to a nested struct. * *

The parent name is used to find the parent using {@link Schema#findField(String)}. If the * parent name is null, the new column will be added to the root as a top-level column. If parent @@ -97,6 +148,8 @@ default UpdateSchema addColumn(String name, Type type) { * *

If type is a nested type, its field IDs are reassigned when added to the existing schema. * + *

The added column will be optional with a null default value. + * * @param parent name of the parent struct to the column will be added to * @param name name for the new column * @param type type for the new column @@ -104,11 +157,63 @@ default UpdateSchema addColumn(String name, Type type) { * @throws IllegalArgumentException If parent doesn't identify a struct */ default UpdateSchema addColumn(String parent, String name, Type type) { - return addColumn(parent, name, type, null); + return addColumn(parent, name, type, null, null); + } + + /** + * Add a new optional column to a nested struct. + * + *

The parent name is used to find the parent using {@link Schema#findField(String)}. If the + * parent name is null, the new column will be added to the root as a top-level column. If parent + * identifies a struct, a new column is added to that struct. If it identifies a list, the column + * is added to the list element struct, and if it identifies a map, the new column is added to the + * map's value struct. + * + *

The given name is used to name the new column and names containing "." are not handled + * differently. + * + *

If type is a nested type, its field IDs are reassigned when added to the existing schema. + * + *

The added column will be optional with a null default value. + * + * @param parent name of the parent struct to the column will be added to + * @param name name for the new column + * @param type type for the new column + * @param doc documentation string for the new column + * @return this for method chaining + * @throws IllegalArgumentException If parent doesn't identify a struct + */ + default UpdateSchema addColumn(String parent, String name, Type type, String doc) { + return addColumn(parent, name, type, doc, null); + } + + /** + * Add a new optional column to a nested struct. + * + *

The parent name is used to find the parent using {@link Schema#findField(String)}. If the + * parent name is null, the new column will be added to the root as a top-level column. If parent + * identifies a struct, a new column is added to that struct. If it identifies a list, the column + * is added to the list element struct, and if it identifies a map, the new column is added to the + * map's value struct. + * + *

The given name is used to name the new column and names containing "." are not handled + * differently. + * + *

If type is a nested type, its field IDs are reassigned when added to the existing schema. + * + * @param parent name of the parent struct to the column will be added to + * @param name name for the new column + * @param type type for the new column + * @param defaultValue a default value for the column in existing rows + * @return this for method chaining + * @throws IllegalArgumentException If parent doesn't identify a struct + */ + default UpdateSchema addColumn(String parent, String name, Type type, Literal defaultValue) { + return addColumn(parent, name, type, null, defaultValue); } /** - * Add a new column to a nested struct. + * Add a new optional column to a nested struct. * *

The parent name is used to find the parent using {@link Schema#findField(String)}. If the * parent name is null, the new column will be added to the root as a top-level column. If parent @@ -125,16 +230,23 @@ default UpdateSchema addColumn(String parent, String name, Type type) { * @param name name for the new column * @param type type for the new column * @param doc documentation string for the new column + * @param defaultValue a default value for the column in existing rows * @return this for method chaining * @throws IllegalArgumentException If parent doesn't identify a struct */ - UpdateSchema addColumn(String parent, String name, Type type, String doc); + default UpdateSchema addColumn( + String parent, String name, Type type, String doc, Literal defaultValue) { + throw new UnsupportedOperationException("Default values are not supported"); + } /** * Add a new required top-level column. * - *

This is an incompatible change that can break reading older data. This method will result in - * an exception unless {@link #allowIncompatibleChanges()} has been called. + *

Adding a required column without a default is an incompatible change that can break reading + * older data. To make this a compatible change, add a default value by calling {@link + * #updateColumnDefault(String, Literal)} or use {@link #addRequiredColumn(String, Type, String, + * Literal)} instead. To suppress exceptions thrown when an incompatible change is detected, call + * {@link #allowIncompatibleChanges()}. * *

Because "." may be interpreted as a column path separator or may be used in field names, it * is not allowed in names passed to this method. To add to nested structures or to add fields @@ -148,18 +260,21 @@ default UpdateSchema addColumn(String parent, String name, Type type) { * @throws IllegalArgumentException If name contains "." */ default UpdateSchema addRequiredColumn(String name, Type type) { - return addRequiredColumn(name, type, null); + return addRequiredColumn(name, type, null, null); } /** * Add a new required top-level column. * - *

This is an incompatible change that can break reading older data. This method will result in - * an exception unless {@link #allowIncompatibleChanges()} has been called. + *

Adding a required column without a default is an incompatible change that can break reading + * older data. To make this a compatible change, add a default value by calling {@link + * #updateColumnDefault(String, Literal)} or use {@link #addRequiredColumn(String, Type, String, + * Literal)} instead. To suppress exceptions thrown when an incompatible change is detected, call + * {@link #allowIncompatibleChanges()}. * *

Because "." may be interpreted as a column path separator or may be used in field names, it * is not allowed in names passed to this method. To add to nested structures or to add fields - * with names that contain ".", use {@link #addRequiredColumn(String, String, Type)}. + * with names that contain ".", use {@link #addRequiredColumn(String, String, Type, String)}. * *

If type is a nested type, its field IDs are reassigned when added to the existing schema. * @@ -169,13 +284,63 @@ default UpdateSchema addRequiredColumn(String name, Type type) { * @return this for method chaining * @throws IllegalArgumentException If name contains "." */ - UpdateSchema addRequiredColumn(String name, Type type, String doc); + default UpdateSchema addRequiredColumn(String name, Type type, String doc) { + return addRequiredColumn(name, type, doc, null); + } /** * Add a new required top-level column. * - *

This is an incompatible change that can break reading older data. This method will result in - * an exception unless {@link #allowIncompatibleChanges()} has been called. + *

Because "." may be interpreted as a column path separator or may be used in field names, it + * is not allowed in names passed to this method. To add to nested structures or to add fields + * with names that contain ".", use {@link #addRequiredColumn(String, String, Type, Literal)}. + * + *

If type is a nested type, its field IDs are reassigned when added to the existing schema. + * + * @param name name for the new column + * @param type type for the new column + * @param defaultValue a default value for the column in existing rows + * @return this for method chaining + * @throws IllegalArgumentException If name contains "." + */ + default UpdateSchema addRequiredColumn(String name, Type type, Literal defaultValue) { + return addRequiredColumn(name, type, null, defaultValue); + } + + /** + * Add a new required top-level column. + * + *

Because "." may be interpreted as a column path separator or may be used in field names, it + * is not allowed in names passed to this method. To add to nested structures or to add fields + * with names that contain ".", use {@link #addRequiredColumn(String, String, Type, String, + * Literal)}. + * + *

If type is a nested type, its field IDs are reassigned when added to the existing schema. + * + * @param name name for the new column + * @param type type for the new column + * @param doc documentation string for the new column + * @param defaultValue a default value for the column in existing rows + * @return this for method chaining + * @throws IllegalArgumentException If name contains "." + */ + default UpdateSchema addRequiredColumn( + String name, Type type, String doc, Literal defaultValue) { + Preconditions.checkArgument( + !name.contains("."), + "Cannot add column with ambiguous name: %s, use addColumn(parent, name, type)", + name); + return addRequiredColumn(null, name, type, doc, defaultValue); + } + + /** + * Add a new required column to a nested struct. + * + *

Adding a required column without a default is an incompatible change that can break reading + * older data. To make this a compatible change, add a default value by calling {@link + * #updateColumnDefault(String, Literal)} or use {@link #addRequiredColumn(String, String, Type, + * String, Literal)} instead. To suppress exceptions thrown when an incompatible change is + * detected, call {@link #allowIncompatibleChanges()}. * *

The parent name is used to find the parent using {@link Schema#findField(String)}. If the * parent name is null, the new column will be added to the root as a top-level column. If parent @@ -195,14 +360,68 @@ default UpdateSchema addRequiredColumn(String name, Type type) { * @throws IllegalArgumentException If parent doesn't identify a struct */ default UpdateSchema addRequiredColumn(String parent, String name, Type type) { - return addRequiredColumn(parent, name, type, null); + return addRequiredColumn(parent, name, type, null, null); + } + + /** + * Add a new required column to a nested struct. + * + *

Adding a required column without a default is an incompatible change that can break reading + * older data. To make this a compatible change, add a default value by calling {@link + * #updateColumnDefault(String, Literal)} or use {@link #addRequiredColumn(String, String, Type, + * String, Literal)} instead. To suppress exceptions thrown when an incompatible change is + * detected, call {@link #allowIncompatibleChanges()}. + * + *

The parent name is used to find the parent using {@link Schema#findField(String)}. If the + * parent name is null, the new column will be added to the root as a top-level column. If parent + * identifies a struct, a new column is added to that struct. If it identifies a list, the column + * is added to the list element struct, and if it identifies a map, the new column is added to the + * map's value struct. + * + *

The given name is used to name the new column and names containing "." are not handled + * differently. + * + *

If type is a nested type, its field IDs are reassigned when added to the existing schema. + * + * @param parent name of the parent struct to the column will be added to + * @param name name for the new column + * @param type type for the new column + * @param doc documentation string for the new column + * @return this for method chaining + * @throws IllegalArgumentException If parent doesn't identify a struct + */ + default UpdateSchema addRequiredColumn(String parent, String name, Type type, String doc) { + return addRequiredColumn(parent, name, type, doc, null); } /** * Add a new required top-level column. * - *

This is an incompatible change that can break reading older data. This method will result in - * an exception unless {@link #allowIncompatibleChanges()} has been called. + *

The parent name is used to find the parent using {@link Schema#findField(String)}. If the + * parent name is null, the new column will be added to the root as a top-level column. If parent + * identifies a struct, a new column is added to that struct. If it identifies a list, the column + * is added to the list element struct, and if it identifies a map, the new column is added to the + * map's value struct. + * + *

The given name is used to name the new column and names containing "." are not handled + * differently. + * + *

If type is a nested type, its field IDs are reassigned when added to the existing schema. + * + * @param parent name of the parent struct to the column will be added to + * @param name name for the new column + * @param type type for the new column + * @param defaultValue a default value for the column in existing rows + * @return this for method chaining + * @throws IllegalArgumentException If parent doesn't identify a struct + */ + default UpdateSchema addRequiredColumn( + String parent, String name, Type type, Literal defaultValue) { + return addRequiredColumn(parent, name, type, null, defaultValue); + } + + /** + * Add a new required column to a nested struct. * *

The parent name is used to find the parent using {@link Schema#findField(String)}. If the * parent name is null, the new column will be added to the root as a top-level column. If parent @@ -219,10 +438,14 @@ default UpdateSchema addRequiredColumn(String parent, String name, Type type) { * @param name name for the new column * @param type type for the new column * @param doc documentation string for the new column + * @param defaultValue a default value for the column in existing rows * @return this for method chaining * @throws IllegalArgumentException If parent doesn't identify a struct */ - UpdateSchema addRequiredColumn(String parent, String name, Type type, String doc); + default UpdateSchema addRequiredColumn( + String parent, String name, Type type, String doc, Literal defaultValue) { + throw new UnsupportedOperationException("Default values are not supported"); + } /** * Rename a column in the schema. @@ -294,7 +517,24 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin UpdateSchema updateColumnDoc(String name, String newDoc); /** - * Update a column to optional. + * Update the default value for a column. + * + *

The name is used to find the column to update using {@link Schema#findField(String)}. + * + *

Note: Changing the default value for a column does not alter existing rows. + * + * @param name name of the column to update the default value for + * @param newDefault replacement default value for the column + * @return this for method chaining + * @throws IllegalArgumentException If name doesn't identify a column in the schema or if the + * column will be deleted + */ + default UpdateSchema updateColumnDefault(String name, Literal newDefault) { + throw new UnsupportedOperationException("Default values are not supported"); + } + + /** + * Update a column to be optional. * * @param name name of the column to mark optional * @return this for method chaining @@ -302,7 +542,7 @@ default UpdateSchema updateColumn(String name, Type.PrimitiveType newType, Strin UpdateSchema makeColumnOptional(String name); /** - * Update a column to required. + * Update a column to be required. * *

This is an incompatible change that can break reading older data. This method will result in * an exception unless {@link #allowIncompatibleChanges()} has been called. diff --git a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java index deeba664ec07..7020b259b1b5 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java +++ b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java @@ -24,6 +24,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.Transforms; +import org.apache.iceberg.util.NaNUtil; /** Factory methods for creating {@link Expression expressions}. */ public class Expressions { @@ -245,6 +246,9 @@ public static UnboundPredicate predicate(Operation op, String name, Liter && op != Operation.NOT_NAN, "Cannot create %s predicate inclusive a value", op); + Preconditions.checkArgument( + !NaNUtil.isNaN(lit.value()), + "Invalid expression literal: NaN, use isNaN or notNaN instead"); return new UnboundPredicate(op, ref(name), lit); } diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index 2c0f56b79ace..d1b4241bcba6 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -559,14 +560,20 @@ public static Builder optional(String name) { return new Builder(true, name); } + public static Builder builder() { + 
return new Builder(); + } + public static class Builder { - private final boolean isOptional; - private final String name; + private boolean isOptional = true; + private String name = null; private Integer id = null; private Type type = null; private String doc = null; - private Object initialDefault = null; - private Object writeDefault = null; + private Literal initialDefault = null; + private Literal writeDefault = null; + + private Builder() {} private Builder(boolean isFieldOptional, String fieldName) { isOptional = isFieldOptional; @@ -583,6 +590,26 @@ private Builder(NestedField toCopy) { this.writeDefault = toCopy.writeDefault; } + public Builder asRequired() { + this.isOptional = false; + return this; + } + + public Builder asOptional() { + this.isOptional = true; + return this; + } + + public Builder isOptional(boolean fieldIsOptional) { + this.isOptional = fieldIsOptional; + return this; + } + + public Builder withName(String fieldName) { + this.name = fieldName; + return this; + } + public Builder withId(int fieldId) { id = fieldId; return this; @@ -598,12 +625,32 @@ public Builder withDoc(String fieldDoc) { return this; } + /** + * Set the initial default using an Object. + * + * @deprecated will be removed in 2.0.0; use {@link #withInitialDefault(Literal)} instead. + */ + @Deprecated public Builder withInitialDefault(Object fieldInitialDefault) { + return withInitialDefault(Expressions.lit(fieldInitialDefault)); + } + + public Builder withInitialDefault(Literal fieldInitialDefault) { initialDefault = fieldInitialDefault; return this; } + /** + * Set the write default using an Object. + * + * @deprecated will be removed in 2.0.0; use {@link #withWriteDefault(Literal)} instead. 
+ */ + @Deprecated public Builder withWriteDefault(Object fieldWriteDefault) { + return withWriteDefault(Expressions.lit(fieldWriteDefault)); + } + + public Builder withWriteDefault(Literal fieldWriteDefault) { writeDefault = fieldWriteDefault; return this; } @@ -620,8 +667,8 @@ public NestedField build() { private final String name; private final Type type; private final String doc; - private final Object initialDefault; - private final Object writeDefault; + private final Literal initialDefault; + private final Literal writeDefault; private NestedField( boolean isOptional, @@ -629,8 +676,8 @@ private NestedField( String name, Type type, String doc, - Object initialDefault, - Object writeDefault) { + Literal initialDefault, + Literal writeDefault) { Preconditions.checkNotNull(name, "Name cannot be null"); Preconditions.checkNotNull(type, "Type cannot be null"); this.isOptional = isOptional; @@ -642,12 +689,12 @@ private NestedField( this.writeDefault = castDefault(writeDefault, type); } - private static Object castDefault(Object defaultValue, Type type) { + private static Literal castDefault(Literal defaultValue, Type type) { if (type.isNestedType() && defaultValue != null) { throw new IllegalArgumentException( String.format("Invalid default value for %s: %s (must be null)", type, defaultValue)); } else if (defaultValue != null) { - return Expressions.lit(defaultValue).to(type).value(); + return defaultValue.to(type); } return null; @@ -699,14 +746,22 @@ public String doc() { return doc; } - public Object initialDefault() { + public Literal initialDefaultLiteral() { return initialDefault; } - public Object writeDefault() { + public Object initialDefault() { + return initialDefault != null ? initialDefault.value() : null; + } + + public Literal writeDefaultLiteral() { return writeDefault; } + public Object writeDefault() { + return writeDefault != null ? 
writeDefault.value() : null; + } + @Override public String toString() { return String.format( diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java b/core/src/main/java/org/apache/iceberg/SchemaParser.java index 27e6ed048712..64d3f8795db0 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaParser.java +++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java @@ -26,6 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; @@ -197,9 +199,9 @@ private static Type typeFromJson(JsonNode json) { throw new IllegalArgumentException("Cannot parse type from json: " + json); } - private static Object defaultFromJson(String defaultField, Type type, JsonNode json) { + private static Literal defaultFromJson(String defaultField, Type type, JsonNode json) { if (json.has(defaultField)) { - return SingleValueParser.fromJson(type, json.get(defaultField)); + return Expressions.lit(SingleValueParser.fromJson(type, json.get(defaultField))); } return null; @@ -229,8 +231,8 @@ private static Types.StructType structFromJson(JsonNode json) { String name = JsonUtil.getString(NAME, field); Type type = typeFromJson(JsonUtil.get(TYPE, field)); - Object initialDefault = defaultFromJson(INITIAL_DEFAULT, type, field); - Object writeDefault = defaultFromJson(WRITE_DEFAULT, type, field); + Literal initialDefault = defaultFromJson(INITIAL_DEFAULT, type, field); + Literal writeDefault = defaultFromJson(WRITE_DEFAULT, type, field); String doc = JsonUtil.getStringOrNull(DOC, field); boolean isRequired = JsonUtil.getBool(REQUIRED, field); diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index 
2b541080ac72..4b6d7f6cdd6e 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; @@ -55,7 +56,7 @@ class SchemaUpdate implements UpdateSchema { private final Map idToParent; private final List deletes = Lists.newArrayList(); private final Map updates = Maps.newHashMap(); - private final Multimap adds = + private final Multimap parentToAddedIds = Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList); private final Map addedNameToId = Maps.newHashMap(); private final Multimap moves = @@ -94,40 +95,26 @@ public SchemaUpdate allowIncompatibleChanges() { } @Override - public UpdateSchema addColumn(String name, Type type, String doc) { - Preconditions.checkArgument( - !name.contains("."), - "Cannot add column with ambiguous name: %s, use addColumn(parent, name, type)", - name); - return addColumn(null, name, type, doc); - } - - @Override - public UpdateSchema addColumn(String parent, String name, Type type, String doc) { - internalAddColumn(parent, name, true, type, doc); + public UpdateSchema addColumn( + String parent, String name, Type type, String doc, Literal defaultValue) { + internalAddColumn(parent, name, true, type, doc, defaultValue); return this; } @Override - public UpdateSchema addRequiredColumn(String name, Type type, String doc) { - Preconditions.checkArgument( - !name.contains("."), - "Cannot add column with ambiguous name: %s, use addColumn(parent, name, type)", - name); - addRequiredColumn(null, name, type, doc); - return this; - } - - @Override - public UpdateSchema addRequiredColumn(String parent, String name, Type type, String doc) { - Preconditions.checkArgument( - 
allowIncompatibleChanges, "Incompatible change: cannot add required column: %s", name); - internalAddColumn(parent, name, false, type, doc); + public UpdateSchema addRequiredColumn( + String parent, String name, Type type, String doc, Literal defaultValue) { + internalAddColumn(parent, name, false, type, doc, defaultValue); return this; } private void internalAddColumn( - String parent, String name, boolean isOptional, Type type, String doc) { + String parent, + String name, + boolean isOptional, + Type type, + String doc, + Literal defaultValue) { int parentId = TABLE_ROOT_ID; String fullName; if (parent != null) { @@ -168,6 +155,11 @@ private void internalAddColumn( fullName = name; } + Preconditions.checkArgument( + defaultValue != null || isOptional || allowIncompatibleChanges, + "Incompatible change: cannot add required column without a default value: %s", + fullName); + // assign new IDs in order int newId = assignNewColumnId(); @@ -177,10 +169,19 @@ private void internalAddColumn( idToParent.put(newId, parentId); } - adds.put( - parentId, - Types.NestedField.of( - newId, isOptional, name, TypeUtil.assignFreshIds(type, this::assignNewColumnId), doc)); + Types.NestedField newField = + Types.NestedField.builder() + .withName(name) + .isOptional(isOptional) + .withId(newId) + .ofType(TypeUtil.assignFreshIds(type, this::assignNewColumnId)) + .withDoc(doc) + .withInitialDefault(defaultValue) + .withWriteDefault(defaultValue) + .build(); + + updates.put(newId, newField); + parentToAddedIds.put(parentId, newId); } @Override @@ -188,7 +189,9 @@ public UpdateSchema deleteColumn(String name) { Types.NestedField field = findField(name); Preconditions.checkArgument(field != null, "Cannot delete missing column: %s", name); Preconditions.checkArgument( - !adds.containsKey(field.fieldId()), "Cannot delete a column that has additions: %s", name); + !parentToAddedIds.containsKey(field.fieldId()), + "Cannot delete a column that has additions: %s", + name); 
Preconditions.checkArgument( !updates.containsKey(field.fieldId()), "Cannot delete a column that has updates: %s", name); deletes.add(field.fieldId()); @@ -209,15 +212,9 @@ public UpdateSchema renameColumn(String name, String newName) { // merge with an update, if present int fieldId = field.fieldId(); Types.NestedField update = updates.get(fieldId); - if (update != null) { - updates.put( - fieldId, - Types.NestedField.of(fieldId, update.isOptional(), newName, update.type(), update.doc())); - } else { - updates.put( - fieldId, - Types.NestedField.of(fieldId, field.isOptional(), newName, field.type(), field.doc())); - } + Types.NestedField newField = + Types.NestedField.from(update != null ? update : field).withName(newName).build(); + updates.put(fieldId, newField); if (identifierFieldNames.contains(name)) { identifierFieldNames.remove(name); @@ -240,7 +237,7 @@ public UpdateSchema makeColumnOptional(String name) { } private void internalUpdateColumnRequirement(String name, boolean isOptional) { - Types.NestedField field = findField(name); + Types.NestedField field = findForUpdate(name); Preconditions.checkArgument(field != null, "Cannot update missing column: %s", name); if ((!isOptional && field.isRequired()) || (isOptional && field.isOptional())) { @@ -248,8 +245,10 @@ private void internalUpdateColumnRequirement(String name, boolean isOptional) { return; } + boolean isDefaultedAdd = isAdded(name) && field.initialDefault() != null; + Preconditions.checkArgument( - isOptional || allowIncompatibleChanges, + isOptional || isDefaultedAdd || allowIncompatibleChanges, "Cannot change column nullability: %s: optional -> required", name); Preconditions.checkArgument( @@ -258,22 +257,19 @@ private void internalUpdateColumnRequirement(String name, boolean isOptional) { field.name()); int fieldId = field.fieldId(); - Types.NestedField update = updates.get(fieldId); - - if (update != null) { - updates.put( - fieldId, - Types.NestedField.of(fieldId, isOptional, update.name(), 
update.type(), update.doc())); + Types.NestedField.Builder builder = Types.NestedField.from(field); + if (isOptional) { + builder.asOptional(); } else { - updates.put( - fieldId, - Types.NestedField.of(fieldId, isOptional, field.name(), field.type(), field.doc())); + builder.asRequired(); } + + updates.put(fieldId, builder.build()); } @Override public UpdateSchema updateColumn(String name, Type.PrimitiveType newType) { - Types.NestedField field = findField(name); + Types.NestedField field = findForUpdate(name); Preconditions.checkArgument(field != null, "Cannot update missing column: %s", name); Preconditions.checkArgument( !deletes.contains(field.fieldId()), @@ -293,23 +289,15 @@ public UpdateSchema updateColumn(String name, Type.PrimitiveType newType) { // merge with a rename, if present int fieldId = field.fieldId(); - Types.NestedField update = updates.get(fieldId); - if (update != null) { - updates.put( - fieldId, - Types.NestedField.of(fieldId, update.isOptional(), update.name(), newType, update.doc())); - } else { - updates.put( - fieldId, - Types.NestedField.of(fieldId, field.isOptional(), field.name(), newType, field.doc())); - } + Types.NestedField newField = Types.NestedField.from(field).ofType(newType).build(); + updates.put(fieldId, newField); return this; } @Override public UpdateSchema updateColumnDoc(String name, String doc) { - Types.NestedField field = findField(name); + Types.NestedField field = findForUpdate(name); Preconditions.checkArgument(field != null, "Cannot update missing column: %s", name); Preconditions.checkArgument( !deletes.contains(field.fieldId()), @@ -322,17 +310,33 @@ public UpdateSchema updateColumnDoc(String name, String doc) { // merge with a rename or update, if present int fieldId = field.fieldId(); - Types.NestedField update = updates.get(fieldId); - if (update != null) { - updates.put( - fieldId, - Types.NestedField.of(fieldId, update.isOptional(), update.name(), update.type(), doc)); - } else { - updates.put( - fieldId, 
- Types.NestedField.of(fieldId, field.isOptional(), field.name(), field.type(), doc)); + Types.NestedField newField = Types.NestedField.from(field).withDoc(doc).build(); + updates.put(fieldId, newField); + + return this; + } + + @Override + public UpdateSchema updateColumnDefault(String name, Literal newDefault) { + Types.NestedField field = findForUpdate(name); + Preconditions.checkArgument(field != null, "Cannot update missing column: %s", name); + Preconditions.checkArgument( + !deletes.contains(field.fieldId()), + "Cannot update a column that will be deleted: %s", + field.name()); + + // if the value can be converted to the expected type, check if it is already set + // if it can't be converted, the builder will throw an exception + Literal converted = newDefault != null ? newDefault.to(field.type()) : null; + if (converted != null && Objects.equals(field.writeDefault(), converted.value())) { + return this; } + // write default is always set and initial default is only set if the field requires one + int fieldId = field.fieldId(); + Types.NestedField newField = Types.NestedField.from(field).withWriteDefault(newDefault).build(); + updates.put(fieldId, newField); + return this; } @@ -386,6 +390,29 @@ public UpdateSchema caseSensitive(boolean caseSensitivity) { return this; } + private boolean isAdded(String name) { + return addedNameToId.containsKey(name); + } + + private Types.NestedField findForUpdate(String name) { + Types.NestedField existing = findField(name); + if (existing != null) { + Types.NestedField pendingUpdate = updates.get(existing.fieldId()); + if (pendingUpdate != null) { + return pendingUpdate; + } + + return existing; + } + + Integer addedId = addedNameToId.get(name); + if (addedId != null) { + return updates.get(addedId); + } + + return null; + } + private Integer findForMove(String name) { Integer addedId = addedNameToId.get(name); if (addedId != null) { @@ -436,10 +463,8 @@ private void internalMove(String name, Move move) { */ @Override 
public Schema apply() { - Schema newSchema = - applyChanges(schema, deletes, updates, adds, moves, identifierFieldNames, caseSensitive); - - return newSchema; + return applyChanges( + schema, deletes, updates, parentToAddedIds, moves, identifierFieldNames, caseSensitive); } @Override @@ -461,7 +486,7 @@ private TableMetadata applyChangesToMetadata(TableMetadata metadata) { try { // parse and update the mapping NameMapping mapping = NameMappingParser.fromJson(mappingJson); - NameMapping updated = MappingUtil.update(mapping, updates, adds); + NameMapping updated = MappingUtil.update(mapping, updates, parentToAddedIds); // replace the table property Map updatedProperties = Maps.newHashMap(); @@ -484,6 +509,7 @@ private TableMetadata applyChangesToMetadata(TableMetadata metadata) { deletes.stream().map(schema::findColumnName).collect(Collectors.toList()); Map renamedColumns = updates.keySet().stream() + .filter(id -> !addedNameToId.containsValue(id)) // remove added columns .filter(id -> !schema.findColumnName(id).equals(newSchema.findColumnName(id))) .collect(Collectors.toMap(schema::findColumnName, newSchema::findColumnName)); if (!deletedColumns.isEmpty() || !renamedColumns.isEmpty()) { @@ -505,7 +531,7 @@ private static Schema applyChanges( Schema schema, List deletes, Map updates, - Multimap adds, + Multimap parentToAddedIds, Multimap moves, Set identifierFieldNames, boolean caseSensitive) { @@ -535,7 +561,7 @@ private static Schema applyChanges( // apply schema changes Types.StructType struct = - TypeUtil.visit(schema, new ApplyChanges(deletes, updates, adds, moves)) + TypeUtil.visit(schema, new ApplyChanges(deletes, updates, parentToAddedIds, moves)) .asNestedType() .asStructType(); @@ -560,27 +586,29 @@ private static Schema applyChanges( private static class ApplyChanges extends TypeUtil.SchemaVisitor { private final List deletes; private final Map updates; - private final Multimap adds; + private final Multimap parentToAddedIds; private final Multimap moves; 
private ApplyChanges( List deletes, Map updates, - Multimap adds, + Multimap parentToAddedIds, Multimap moves) { this.deletes = deletes; this.updates = updates; - this.adds = adds; + this.parentToAddedIds = parentToAddedIds; this.moves = moves; } @Override public Type schema(Schema schema, Type structResult) { + List addedFields = + parentToAddedIds.get(TABLE_ROOT_ID).stream() + .map(updates::get) + .collect(Collectors.toList()); List fields = addAndMoveFields( - structResult.asStructType().fields(), - adds.get(TABLE_ROOT_ID), - moves.get(TABLE_ROOT_ID)); + structResult.asStructType().fields(), addedFields, moves.get(TABLE_ROOT_ID)); if (fields != null) { return Types.StructType.of(fields); @@ -601,24 +629,15 @@ public Type struct(Types.StructType struct, List fieldResults) { } Types.NestedField field = struct.fields().get(i); - String name = field.name(); - String doc = field.doc(); - boolean isOptional = field.isOptional(); Types.NestedField update = updates.get(field.fieldId()); - if (update != null) { - name = update.name(); - doc = update.doc(); - isOptional = update.isOptional(); - } + Types.NestedField updated = + Types.NestedField.from(update != null ? update : field).ofType(resultType).build(); - if (name.equals(field.name()) - && isOptional == field.isOptional() - && field.type() == resultType - && Objects.equals(doc, field.doc())) { + if (field.equals(updated)) { newFields.add(field); } else { hasChange = true; - newFields.add(Types.NestedField.of(field.fieldId(), isOptional, name, resultType, doc)); + newFields.add(updated); } } @@ -647,7 +666,8 @@ public Type field(Types.NestedField field, Type fieldResult) { } // handle adds - Collection newFields = adds.get(fieldId); + Collection newFields = + parentToAddedIds.get(fieldId).stream().map(updates::get).collect(Collectors.toList()); Collection columnsToMove = moves.get(fieldId); if (!newFields.isEmpty() || !columnsToMove.isEmpty()) { // if either collection is non-null, then this must be a struct type. 
try to apply the @@ -694,7 +714,7 @@ public Type map(Types.MapType map, Type kResult, Type valueResult) { throw new IllegalArgumentException("Cannot delete map keys: " + map); } else if (updates.containsKey(keyId)) { throw new IllegalArgumentException("Cannot update map keys: " + map); - } else if (adds.containsKey(keyId)) { + } else if (parentToAddedIds.containsKey(keyId)) { throw new IllegalArgumentException("Cannot add fields to map keys: " + map); } else if (!map.keyType().equals(kResult)) { throw new IllegalArgumentException("Cannot alter map keys: " + map); diff --git a/core/src/main/java/org/apache/iceberg/mapping/MappingUtil.java b/core/src/main/java/org/apache/iceberg/mapping/MappingUtil.java index de6ce2ad0425..a3817b8ad911 100644 --- a/core/src/main/java/org/apache/iceberg/mapping/MappingUtil.java +++ b/core/src/main/java/org/apache/iceberg/mapping/MappingUtil.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -64,7 +65,7 @@ public static NameMapping create(Schema schema) { public static NameMapping update( NameMapping mapping, Map updates, - Multimap adds) { + Multimap adds) { return new NameMapping(visit(mapping, new UpdateMapping(updates, adds))); } @@ -78,10 +79,10 @@ static Map indexByName(MappedFields mapping) { private static class UpdateMapping implements Visitor { private final Map updates; - private final Multimap adds; + private final Multimap adds; private UpdateMapping( - Map updates, Multimap adds) { + Map updates, Multimap adds) { this.updates = updates; this.adds = adds; } @@ -121,8 +122,9 @@ public MappedField field(MappedField field, MappedFields fieldResult) { } private MappedFields addNewFields(MappedFields mapping, int parentId) { - Collection fieldsToAdd = adds.get(parentId); - if 
(fieldsToAdd == null || fieldsToAdd.isEmpty()) { + Collection fieldsToAdd = + adds.get(parentId).stream().map(updates::get).collect(Collectors.toList()); + if (fieldsToAdd.isEmpty()) { return mapping; } diff --git a/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java b/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java index 68172b7062a6..b7ac23816a02 100644 --- a/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java +++ b/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java @@ -157,7 +157,10 @@ private Type findFieldType(int fieldId) { private void addColumn(int parentId, Types.NestedField field) { String parentName = partnerSchema.findColumnName(parentId); - api.addColumn(parentName, field.name(), field.type(), field.doc()); + String fullName = (parentName != null ? parentName + "." : "") + field.name(); + api.addColumn( + parentName, field.name(), field.type(), field.doc(), field.initialDefaultLiteral()) + .updateColumnDefault(fullName, field.writeDefaultLiteral()); } private void updateColumn(Types.NestedField field, Types.NestedField existingField) { @@ -166,6 +169,8 @@ private void updateColumn(Types.NestedField field, Types.NestedField existingFie boolean needsOptionalUpdate = field.isOptional() && existingField.isRequired(); boolean needsTypeUpdate = !isIgnorableTypeUpdate(existingField.type(), field.type()); boolean needsDocUpdate = field.doc() != null && !field.doc().equals(existingField.doc()); + boolean needsDefaultUpdate = + field.writeDefault() != null && !field.writeDefault().equals(existingField.writeDefault()); if (needsOptionalUpdate) { api.makeColumnOptional(fullName); @@ -178,6 +183,10 @@ private void updateColumn(Types.NestedField field, Types.NestedField existingFie if (needsDocUpdate) { api.updateColumnDoc(fullName, field.doc()); } + + if (needsDefaultUpdate) { + api.updateColumnDefault(fullName, field.writeDefaultLiteral()); + } } private boolean isIgnorableTypeUpdate(Type 
existingType, Type newType) { diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUnionByFieldName.java b/core/src/test/java/org/apache/iceberg/TestSchemaUnionByFieldName.java index 656e72a0c19c..3a61ce8a1513 100644 --- a/core/src/test/java/org/apache/iceberg/TestSchemaUnionByFieldName.java +++ b/core/src/test/java/org/apache/iceberg/TestSchemaUnionByFieldName.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types; @@ -86,6 +87,21 @@ public void testAddTopLevelPrimitives() { assertThat(applied.asStruct()).isEqualTo(newSchema.asStruct()); } + @Test + public void testAddFieldWithDefault() { + Schema newSchema = + new Schema( + optional("test") + .withId(1) + .ofType(LongType.get()) + .withDoc("description") + .withInitialDefault(Literal.of(34)) + .withWriteDefault(Literal.of(35)) + .build()); + Schema applied = new SchemaUpdate(new Schema(), 0).unionByNameWith(newSchema).apply(); + assertThat(applied.asStruct()).isEqualTo(newSchema.asStruct()); + } + @Test public void testAddTopLevelListOfPrimitives() { for (PrimitiveType primitiveType : primitiveTypes()) { @@ -279,6 +295,40 @@ public void testDetectInvalidTopLevelMapKey() { .hasMessage("Cannot change column type: aMap.key: string -> uuid"); } + @Test + public void testUpdateColumnDoc() { + Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get())); + Schema newSchema = new Schema(required(1, "aCol", IntegerType.get(), "description")); + + Schema applied = new SchemaUpdate(currentSchema, 1).unionByNameWith(newSchema).apply(); + assertThat(applied.asStruct()).isEqualTo(newSchema.asStruct()); + } + + @Test + public void testUpdateColumnDefaults() { + Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get())); + Schema newSchema = + new 
Schema( + required("aCol") + .withId(1) + .ofType(IntegerType.get()) + .withInitialDefault(Literal.of(34)) + .withWriteDefault(Literal.of(35)) + .build()); + + // the initial default is not modified for existing columns + Schema expected = + new Schema( + required("aCol") + .withId(1) + .ofType(IntegerType.get()) + .withWriteDefault(Literal.of(35)) + .build()); + + Schema applied = new SchemaUpdate(currentSchema, 1).unionByNameWith(newSchema).apply(); + assertThat(applied.asStruct()).isEqualTo(expected.asStruct()); + } + @Test // int 32-bit signed integers -> Can promote to long public void testTypePromoteIntegerToLong() { diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java index 2b91a408850e..38cf6da18a3e 100644 --- a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java +++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -203,6 +204,85 @@ public void testUpdateTypes() { assertThat(updated.asStruct()).isEqualTo(expected); } + @Test + public void testUpdateTypePreservesOtherMetadata() { + Schema schema = + new Schema( + required("i") + .withId(1) + .ofType(Types.IntegerType.get()) + .withDoc("description") + .withInitialDefault(Literal.of(34)) + .withWriteDefault(Literal.of(35)) + .build()); + Schema expected = + new Schema( + required("i") + .withId(1) + .withDoc("description") + .ofType(Types.LongType.get()) + .withInitialDefault(Literal.of(34)) + .withWriteDefault(Literal.of(35)) + .build()); + + Schema updated = new SchemaUpdate(schema, 1).updateColumn("i", Types.LongType.get()).apply(); + + 
assertThat(updated.asStruct()).isEqualTo(expected.asStruct()); + } + + @Test + public void testUpdateDocPreservesOtherMetadata() { + Schema schema = + new Schema( + required("i") + .withId(1) + .ofType(Types.IntegerType.get()) + .withDoc("description") + .withInitialDefault(Literal.of(34)) + .withWriteDefault(Literal.of(35)) + .build()); + Schema expected = + new Schema( + required("i") + .withId(1) + .ofType(Types.IntegerType.get()) + .withDoc("longer description") + .withInitialDefault(Literal.of(34)) + .withWriteDefault(Literal.of(35)) + .build()); + + Schema updated = new SchemaUpdate(schema, 1).updateColumnDoc("i", "longer description").apply(); + + assertThat(updated.asStruct()).isEqualTo(expected.asStruct()); + } + + @Test + public void testUpdateDefaultPreservesOtherMetadata() { + Schema schema = + new Schema( + required("i") + .withId(1) + .ofType(Types.IntegerType.get()) + .withDoc("description") + .withInitialDefault(Literal.of(34)) + .withWriteDefault(Literal.of(35)) + .build()); + Schema expected = + new Schema( + required("i") + .withId(1) + .ofType(Types.IntegerType.get()) + .withDoc("description") + .withInitialDefault(Literal.of(34)) + .withWriteDefault(Literal.of(123456)) + .build()); + + Schema updated = + new SchemaUpdate(schema, 1).updateColumnDefault("i", Literal.of(123456)).apply(); + + assertThat(updated.asStruct()).isEqualTo(expected.asStruct()); + } + @Test public void testUpdateTypesCaseInsensitive() { Types.StructType expected = @@ -479,6 +559,53 @@ public void testAddFields() { assertThat(added.asStruct()).isEqualTo(expected.asStruct()); } + @Test + public void testAddColumnWithDefault() { + Schema schema = new Schema(optional(1, "id", Types.IntegerType.get())); + Schema expected = + new Schema( + optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withDoc("description") + .withInitialDefault(Literal.of("unknown")) + .withWriteDefault(Literal.of("unknown")) + 
.build()); + + // when default value is passed to add column, both initial and write defaults are set + Schema result = + new SchemaUpdate(schema, 1) + .addColumn("data", Types.StringType.get(), "description", Literal.of("unknown")) + .apply(); + + assertThat(result.asStruct()).isEqualTo(expected.asStruct()); + } + + @Test + public void testAddColumnWithUpdateColumnDefault() { + Schema schema = new Schema(optional(1, "id", Types.IntegerType.get())); + Schema expected = + new Schema( + optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault(null) + .withWriteDefault(Literal.of("unknown")) + .build()); + + // changes only the write default because the initial default is null when adding an optional + // column, unless the default value is given in the addColumn call + Schema result = + new SchemaUpdate(schema, 1) + .addColumn("data", Types.StringType.get()) + .updateColumnDefault("data", Literal.of("unknown")) + .apply(); + + assertThat(result.asStruct()).isEqualTo(expected.asStruct()); + } + @Test public void testAddNestedStruct() { Schema schema = new Schema(required(1, "id", Types.IntegerType.get())); @@ -569,7 +696,7 @@ public void testAddNestedListOfStructs() { } @Test - public void testAddRequiredColumn() { + public void testAddRequiredColumnWithoutDefault() { Schema schema = new Schema(required(1, "id", Types.IntegerType.get())); Schema expected = new Schema( @@ -577,9 +704,13 @@ public void testAddRequiredColumn() { required(2, "data", Types.StringType.get())); assertThatThrownBy( - () -> new SchemaUpdate(schema, 1).addRequiredColumn("data", Types.StringType.get())) + () -> + new SchemaUpdate(schema, 1) + .addRequiredColumn("data", Types.StringType.get()) + .apply()) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Incompatible change: cannot add required column: data"); + .hasMessage( + "Incompatible change: cannot add required column without a default 
value: data"); Schema result = new SchemaUpdate(schema, 1) @@ -590,6 +721,44 @@ public void testAddRequiredColumn() { assertThat(result.asStruct()).isEqualTo(expected.asStruct()); } + @Test + public void testAddRequiredColumnWithDefault() { + Schema schema = new Schema(optional(1, "id", Types.IntegerType.get())); + Schema expected = + new Schema( + optional(1, "id", Types.IntegerType.get()), + Types.NestedField.required("data") + .withId(2) + .ofType(Types.StringType.get()) + .withDoc("description") + .withInitialDefault(Literal.of("unknown")) + .withWriteDefault(Literal.of("unknown")) + .build()); + + // when default value is passed to add column, both initial and write defaults are set + Schema result = + new SchemaUpdate(schema, 1) + .addRequiredColumn("data", Types.StringType.get(), "description", Literal.of("unknown")) + .apply(); + + assertThat(result.asStruct()).isEqualTo(expected.asStruct()); + } + + @Test + public void testAddRequiredColumnWithUpdateColumnDefault() { + Schema schema = new Schema(optional(1, "id", Types.IntegerType.get())); + + // fails because the initial default is only set when adding + assertThatThrownBy( + () -> + new SchemaUpdate(schema, 1) + .addRequiredColumn("data", Types.StringType.get()) + .updateColumnDefault("data", Literal.of("unknown"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Incompatible change: cannot add required column without a default value: data"); + } + @Test public void testAddRequiredColumnCaseInsensitive() { Schema schema = new Schema(required(1, "id", Types.IntegerType.get())); @@ -633,6 +802,43 @@ public void testRequireColumn() { assertThat(result.asStruct()).isEqualTo(expected.asStruct()); } + @Test + public void testAddColumnWithDefaultToRequiredColumn() { + Schema schema = new Schema(optional(1, "id", Types.IntegerType.get())); + Schema expected = + new Schema( + optional(1, "id", Types.IntegerType.get()), + Types.NestedField.required("data") + .withId(2) + 
.ofType(Types.StringType.get()) + .withInitialDefault(Literal.of("unknown")) + .withWriteDefault(Literal.of("unknown")) + .build()); + + Schema result = + new SchemaUpdate(schema, 1) + .addColumn("data", Types.StringType.get(), Literal.of("unknown")) + .requireColumn("data") + .apply(); + + assertThat(result.asStruct()).isEqualTo(expected.asStruct()); + } + + @Test + public void testAddColumnWithUpdateColumnDefaultToRequiredColumn() { + Schema schema = new Schema(optional(1, "id", Types.IntegerType.get())); + + // updateColumnDefault does not set the initial default so the column cannot be set to required + assertThatThrownBy( + () -> + new SchemaUpdate(schema, 1) + .addColumn("data", Types.StringType.get()) + .updateColumnDefault("data", Literal.of("unknown")) + .requireColumn("data")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot change column nullability: data: optional -> required"); + } + @Test public void testRequireColumnCaseInsensitive() { Schema schema = new Schema(optional(1, "id", Types.IntegerType.get())); @@ -901,6 +1107,26 @@ public void testUpdateMissingColumn() { .hasMessage("Cannot update missing column: col"); } + @Test + public void testUpdateMissingColumnDoc() { + assertThatThrownBy( + () -> + new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .updateColumnDoc("col", "description")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot update missing column: col"); + } + + @Test + public void testUpdateMissingColumnDefaultValue() { + assertThatThrownBy( + () -> + new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID) + .updateColumnDefault("col", Literal.of(34))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot update missing column: col"); + } + @Test public void testUpdateDeleteConflict() { assertThatThrownBy( @@ -970,17 +1196,37 @@ public void testUpdateMapKey() { .hasMessage("Cannot update map keys: map"); } + @Test + public void testUpdateAddedColumnType() { + Schema schema = new 
Schema(required(1, "i", Types.IntegerType.get())); + Schema expected = + new Schema( + required(1, "i", Types.IntegerType.get()), optional(2, "value", Types.LongType.get())); + + Schema updated = + new SchemaUpdate(schema, 1) + .addColumn("value", Types.IntegerType.get()) + .updateColumn("value", Types.LongType.get()) + .apply(); + + assertThat(updated.asStruct()).isEqualTo(expected.asStruct()); + } + @Test public void testUpdateAddedColumnDoc() { Schema schema = new Schema(required(1, "i", Types.IntegerType.get())); - assertThatThrownBy( - () -> - new SchemaUpdate(schema, 3) - .addColumn("value", Types.LongType.get()) - .updateColumnDoc("value", "a value") - .apply()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot update missing column: value"); + Schema expected = + new Schema( + required(1, "i", Types.IntegerType.get()), + optional(2, "value", Types.LongType.get(), "a value")); + + Schema updated = + new SchemaUpdate(schema, 1) + .addColumn("value", Types.LongType.get()) + .updateColumnDoc("value", "a value") + .apply(); + + assertThat(updated.asStruct()).isEqualTo(expected.asStruct()); } @Test diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 7a364b856398..87e2c5065e11 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -339,11 +339,6 @@ public void testAlterTableAddColumn() { 3, "col1", Types.StringType.get(), "comment for col1"), Types.NestedField.optional(4, "col2", Types.LongType.get())) .asStruct()); - // Adding a required field should fail because Iceberg's SchemaUpdate does not allow - // incompatible changes. 
- assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage("Incompatible change: cannot add required column: pk"); // Adding an existing field should fail due to Flink's internal validation. assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) @@ -446,12 +441,6 @@ public void testAlterTableModifyColumnNullability() { Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "dt", Types.StringType.get())) .asStruct()); - // Changing nullability from optional to required should fail - // because Iceberg's SchemaUpdate does not allow incompatible changes. - assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) - .isInstanceOf(TableException.class) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage("Cannot change column nullability: dt: optional -> required"); // Set nullability from required to optional sql("ALTER TABLE tl MODIFY (id INTEGER)"); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 04d7b8da6b9c..de086bc9e451 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -339,11 +339,6 @@ public void testAlterTableAddColumn() { 3, "col1", Types.StringType.get(), "comment for col1"), Types.NestedField.optional(4, "col2", Types.LongType.get())) .asStruct()); - // Adding a required field should fail because Iceberg's SchemaUpdate does not allow - // incompatible changes. 
- assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage("Incompatible change: cannot add required column: pk"); // Adding an existing field should fail due to Flink's internal validation. assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) @@ -446,12 +441,6 @@ public void testAlterTableModifyColumnNullability() { Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "dt", Types.StringType.get())) .asStruct()); - // Changing nullability from optional to required should fail - // because Iceberg's SchemaUpdate does not allow incompatible changes. - assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) - .isInstanceOf(TableException.class) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage("Cannot change column nullability: dt: optional -> required"); // Set nullability from required to optional sql("ALTER TABLE tl MODIFY (id INTEGER)"); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 04d7b8da6b9c..de086bc9e451 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -339,11 +339,6 @@ public void testAlterTableAddColumn() { 3, "col1", Types.StringType.get(), "comment for col1"), Types.NestedField.optional(4, "col2", Types.LongType.get())) .asStruct()); - // Adding a required field should fail because Iceberg's SchemaUpdate does not allow - // incompatible changes. 
- assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage("Incompatible change: cannot add required column: pk"); // Adding an existing field should fail due to Flink's internal validation. assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) @@ -446,12 +441,6 @@ public void testAlterTableModifyColumnNullability() { Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "dt", Types.StringType.get())) .asStruct()); - // Changing nullability from optional to required should fail - // because Iceberg's SchemaUpdate does not allow incompatible changes. - assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) - .isInstanceOf(TableException.class) - .hasRootCauseInstanceOf(IllegalArgumentException.class) - .hasRootCauseMessage("Cannot change column nullability: dt: optional -> required"); // Set nullability from required to optional sql("ALTER TABLE tl MODIFY (id INTEGER)");