Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 60 additions & 7 deletions api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,21 @@ public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<S
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
return writeCompatibilityErrors(readSchema, writeSchema, true);
return writeCompatibilityErrors(readSchema, writeSchema, true, 2);
}

/**
 * Returns a list of compatibility errors for writing with the given write schema, with field
 * ordering checked. Nullability is checked as well: writing optional (nullable) values to a
 * required field is an error.
 *
 * @param readSchema a read schema
 * @param writeSchema a write schema
 * @param formatVersion the table format version, passed on to the four-argument overload
 * @return a list of error details, or an empty list if there are no compatibility problems
 */
public static List<String> writeCompatibilityErrors(
    Schema readSchema, Schema writeSchema, int formatVersion) {
  // This overload always enforces ordering; only the format version is caller-supplied.
  final boolean checkOrdering = true;
  return writeCompatibilityErrors(readSchema, writeSchema, checkOrdering, formatVersion);
}

/**
Expand All @@ -53,7 +67,24 @@ public static List<String> writeCompatibilityErrors(Schema readSchema, Schema wr
*/
public static List<String> writeCompatibilityErrors(
Schema readSchema, Schema writeSchema, boolean checkOrdering) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, checkOrdering, true));
return writeCompatibilityErrors(readSchema, writeSchema, checkOrdering, 2);
}

/**
 * Returns a list of compatibility errors for writing with the given write schema. This includes
 * nullability: writing optional (nullable) values to a required field is an error. Optionally,
 * the input schema may be allowed to have a different field ordering than the table schema.
 *
 * @param readSchema a read schema
 * @param writeSchema a write schema
 * @param checkOrdering if false, allow input schema to have different ordering than table schema
 * @param formatVersion the table format version
 * @return a list of error details, or an empty list if there are no compatibility problems
 */
public static List<String> writeCompatibilityErrors(
    Schema readSchema, Schema writeSchema, boolean checkOrdering, int formatVersion) {
  // Write checks always enforce nullability (third constructor argument).
  CheckCompatibility visitor =
      new CheckCompatibility(writeSchema, checkOrdering, true, formatVersion);
  return TypeUtil.visit(readSchema, visitor);
}

/**
Expand All @@ -70,7 +101,13 @@ public static List<String> writeCompatibilityErrors(
*/
public static List<String> typeCompatibilityErrors(
Schema readSchema, Schema writeSchema, boolean checkOrdering) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, checkOrdering, false));
return typeCompatibilityErrors(readSchema, writeSchema, checkOrdering, 1);
}

/**
 * Returns a list of type compatibility errors between the given schemas. Nullability is not
 * checked by this method.
 *
 * @param readSchema a read schema
 * @param writeSchema a write schema
 * @param checkOrdering whether field ordering is checked
 * @param formatVersion the table format version
 * @return a list of error details, or an empty list if there are no compatibility problems
 */
public static List<String> typeCompatibilityErrors(
    Schema readSchema, Schema writeSchema, boolean checkOrdering, int formatVersion) {
  // Type-only checks skip nullability (third constructor argument is false).
  CheckCompatibility visitor =
      new CheckCompatibility(writeSchema, checkOrdering, false, formatVersion);
  return TypeUtil.visit(readSchema, visitor);
}

/**
Expand All @@ -84,7 +121,13 @@ public static List<String> typeCompatibilityErrors(
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> typeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true, false));
return typeCompatibilityErrors(readSchema, writeSchema, 2);
}

/**
 * Returns a list of type compatibility errors between the given schemas, with field ordering
 * checked and nullability ignored.
 *
 * @param readSchema a read schema
 * @param writeSchema a write schema
 * @param formatVersion the table format version
 * @return a list of error details, or an empty list if there are no compatibility problems
 */
public static List<String> typeCompatibilityErrors(
    Schema readSchema, Schema writeSchema, int formatVersion) {
  // Ordering is enforced (true); nullability is not (false).
  CheckCompatibility visitor = new CheckCompatibility(writeSchema, true, false, formatVersion);
  return TypeUtil.visit(readSchema, visitor);
}

/**
Expand All @@ -95,22 +138,31 @@ public static List<String> typeCompatibilityErrors(Schema readSchema, Schema wri
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> readCompatibilityErrors(Schema readSchema, Schema writeSchema) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, false, true));
return readCompatibilityErrors(readSchema, writeSchema, 2);
}

/**
 * Returns a list of compatibility errors for reading with the given read schema. Field ordering
 * is not checked; nullability is.
 *
 * @param readSchema a read schema
 * @param writeSchema a write schema
 * @param formatVersion the table format version
 * @return a list of error details, or an empty list if there are no compatibility problems
 */
public static List<String> readCompatibilityErrors(
    Schema readSchema, Schema writeSchema, int formatVersion) {
  // Ordering is not enforced (false); nullability is (true).
  CheckCompatibility visitor = new CheckCompatibility(writeSchema, false, true, formatVersion);
  return TypeUtil.visit(readSchema, visitor);
}

private static final ImmutableList<String> NO_ERRORS = ImmutableList.of();

private final Schema schema;
private final boolean checkOrdering;
private final boolean checkNullability;
private final int formatVersion;

// the current file schema, maintained while traversing a write schema
private Type currentType;

private CheckCompatibility(Schema schema, boolean checkOrdering, boolean checkNullability) {
/**
 * Creates a visitor that compares a traversed schema against {@code schema}.
 *
 * @param schema the schema to compare against
 * @param checkOrdering whether field ordering is checked
 * @param checkNullability whether nullability is checked
 * @param formatVersion the table format version, used when deciding primitive type promotion
 */
private CheckCompatibility(
    Schema schema, boolean checkOrdering, boolean checkNullability, int formatVersion) {
  this.formatVersion = formatVersion;
  this.checkNullability = checkNullability;
  this.checkOrdering = checkOrdering;
  this.schema = schema;
}

@Override
Expand Down Expand Up @@ -273,7 +325,8 @@ public List<String> primitive(Type.PrimitiveType readPrimitive) {
currentType.typeId().toString().toLowerCase(Locale.ENGLISH), readPrimitive));
}

if (!TypeUtil.isPromotionAllowed(currentType.asPrimitiveType(), readPrimitive)) {
if (!TypeUtil.isPromotionAllowed(
currentType.asPrimitiveType(), readPrimitive, formatVersion, false)) {
return ImmutableList.of(
String.format(": %s cannot be promoted to %s", currentType, readPrimitive));
}
Expand Down
26 changes: 26 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -423,14 +423,40 @@ public static Type find(Type type, Predicate<Type> predicate) {
return visit(type, new FindTypeVisitor(predicate));
}

/**
 * Checks whether {@code from} may be promoted to {@code to}, assuming format version 2 and no
 * source-id reference.
 *
 * @param from the current type
 * @param to the requested primitive type
 * @return true if the promotion is allowed under those assumptions
 * @deprecated will be removed in 2.0.0, use {@link #isPromotionAllowed(Type, Type.PrimitiveType,
 *     Integer, boolean)} instead. This method does not take advantage of table format or source
 *     id references
 */
@Deprecated
public static boolean isPromotionAllowed(Type from, Type.PrimitiveType to) {
  // Legacy callers carry no table context, so fixed values are supplied for them.
  final int legacyFormatVersion = 2;
  final boolean sourceIdReference = false;
  return TypeUtil.isPromotionAllowed(from, to, legacyFormatVersion, sourceIdReference);
}

public static boolean isPromotionAllowed(
Type from, Type.PrimitiveType to, Integer formatVersion, boolean sourceIdReference) {
// Warning! Before changing this function, make sure that the type change doesn't introduce
// compatibility problems in partitioning.
if (from.equals(to)) {
return true;
}

switch (from.typeId()) {
case DATE:
if (formatVersion < 3) {
return false;
} else if (sourceIdReference) {
return false;
} else if (to.typeId() == Type.TypeID.TIMESTAMP) {
// Timezone types cannot be promoted.
Types.TimestampType toTs = (Types.TimestampType) to;
return Types.TimestampType.withoutZone().equals(toTs);
} else if (to.typeId() == Type.TypeID.TIMESTAMP_NANO) {
// Timezone types cannot be promoted.
Types.TimestampNanoType toTs = (Types.TimestampNanoType) to;
return Types.TimestampNanoType.withoutZone().equals(toTs);
}
return false;
case INTEGER:
return to.typeId() == Type.TypeID.LONG;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testPrimitiveTypes() {
CheckCompatibility.writeCompatibilityErrors(
new Schema(required(1, "to_field", to)), fromSchema);

if (TypeUtil.isPromotionAllowed(from, to)) {
if (TypeUtil.isPromotionAllowed(from, to, 2, false)) {
assertThat(errors).as("Should produce 0 error messages").isEmpty();
} else {
assertThat(errors).hasSize(1);
Expand Down
29 changes: 29 additions & 0 deletions api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -945,4 +945,33 @@ public void ancestorFieldsInNestedSchema() {
assertThat(TypeUtil.ancestorFields(schema, 16)).containsExactly(pointsElement, points);
assertThat(TypeUtil.ancestorFields(schema, 17)).containsExactly(pointsElement, points);
}

/**
 * Verifies the date -> timestamp promotion rules: the promotion needs format version 3 or later,
 * a target without time zone, and a column that is not flagged as a source-id reference.
 */
@Test
public void testDateToTimestampPromotion() {
  Types.DateType date = Types.DateType.get();

  // Format version < 3 should not be accepted.
  assertThat(TypeUtil.isPromotionAllowed(date, Types.TimestampType.withoutZone(), 2, false))
      .isFalse();
  // Timezone should not be accepted.
  assertThat(TypeUtil.isPromotionAllowed(date, Types.TimestampType.withZone(), 3, false))
      .isFalse();
  // Timezone nano should not be accepted.
  assertThat(TypeUtil.isPromotionAllowed(date, Types.TimestampNanoType.withZone(), 3, false))
      .isFalse();
  // Timestamp without timezone should be accepted.
  assertThat(TypeUtil.isPromotionAllowed(date, Types.TimestampType.withoutZone(), 3, false))
      .isTrue();
  // Timestamp nano without timezone should be accepted.
  assertThat(TypeUtil.isPromotionAllowed(date, Types.TimestampNanoType.withoutZone(), 3, false))
      .isTrue();
  // A source-id reference should block the promotion even when everything else is valid.
  assertThat(TypeUtil.isPromotionAllowed(date, Types.TimestampType.withoutZone(), 3, true))
      .isFalse();
}
}
42 changes: 36 additions & 6 deletions core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,48 @@ class SchemaUpdate implements UpdateSchema {
private boolean allowIncompatibleChanges = false;
private Set<String> identifierFieldNames;
private boolean caseSensitive = true;
private final int formatVersion;

/**
 * Creates a schema update that uses the {@code TableMetadata} returned by {@code ops.current()}
 * as its base.
 *
 * @param ops table operations; dereferenced immediately, so it must not be null
 */
SchemaUpdate(TableOperations ops) {
this(ops, ops.current());
}

/**
 * For testing only. Builds an update from table metadata alone: the operations handle is left
 * null, and the schema, last column id and format version are all read from {@code base}.
 *
 * @param base table metadata supplying schema, last column id and format version
 */
SchemaUpdate(TableMetadata base) {
this(null, base, base.schema(), base.lastColumnId(), base.formatVersion());
}

/** For testing only. */
SchemaUpdate(Schema schema, int lastColumnId) {
this(null, null, schema, lastColumnId);
this(null, null, schema, lastColumnId, TableProperties.DEFAULT_FORMAT_VERSION);
}

private SchemaUpdate(TableOperations ops, TableMetadata base) {
this(ops, base, base.schema(), base.lastColumnId());
/**
 * For testing only. Builds an update from a bare schema with an explicit format version; both
 * the operations handle and the base metadata are left null.
 *
 * @param schema the schema to update
 * @param lastColumnId the last column id for the schema
 * @param formatVersion the table format version to assume
 */
SchemaUpdate(Schema schema, int lastColumnId, int formatVersion) {
this(null, null, schema, lastColumnId, formatVersion);
}

private SchemaUpdate(TableOperations ops, TableMetadata base, Schema schema, int lastColumnId) {
/**
 * Creates a schema update for the table described by {@code base}. The schema, last column id
 * and format version are all taken from the metadata.
 *
 * @param ops table operations
 * @param base current table metadata; dereferenced immediately, so it must not be null
 */
private SchemaUpdate(TableOperations ops, TableMetadata base) {
  // Take the version from the metadata itself, as the TableMetadata-only constructor does.
  // "format-version" is a reserved key that is not expected to be present in base.properties(),
  // so a property lookup would silently fall back to the default version.
  this(ops, base, base.schema(), base.lastColumnId(), base.formatVersion());
}

/**
 * Primary constructor; every other constructor delegates here.
 *
 * @param ops table operations, null when called from the test-only constructors
 * @param base base table metadata, null when only a schema is supplied
 * @param schema the schema to update; its parent index and identifier field names are captured
 * @param lastColumnId the last column id
 * @param formatVersion the table format version
 */
private SchemaUpdate(
    TableOperations ops, TableMetadata base, Schema schema, int lastColumnId, int formatVersion) {
  this.formatVersion = formatVersion;
  this.lastColumnId = lastColumnId;
  this.base = base;
  this.ops = ops;
  this.schema = schema;
  // Mutable copy of the parent index built from the schema's struct.
  this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
  this.identifierFieldNames = schema.identifierFieldNames();
}

@Override
Expand Down Expand Up @@ -280,8 +301,17 @@ public UpdateSchema updateColumn(String name, Type.PrimitiveType newType) {
return this;
}

// If field is listed in source-ids, we need to flag it for promoting date -> timestamp.
List<PartitionField> partitionFields =
this.base != null
? this.base.spec().getFieldsBySourceId(field.fieldId())
: Lists.newArrayList();

boolean isBucketPartitioned =
partitionFields.stream().anyMatch(pf -> pf.transform().toString().startsWith("bucket["));

Preconditions.checkArgument(
TypeUtil.isPromotionAllowed(field.type(), newType),
TypeUtil.isPromotionAllowed(field.type(), newType, formatVersion, isBucketPartitioned),
"Cannot change column type: %s: %s -> %s",
name,
field.type(),
Expand Down Expand Up @@ -374,7 +404,7 @@ public UpdateSchema moveAfter(String name, String afterName) {

@Override
public UpdateSchema unionByNameWith(Schema newSchema) {
UnionByNameVisitor.visit(this, schema, newSchema, caseSensitive);
UnionByNameVisitor.visit(this, schema, newSchema, caseSensitive, formatVersion);
return this;
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ private TableProperties() {}
*/
public static final String FORMAT_VERSION = "format-version";

/** Format version to fall back on when a caller has no explicit table format version. */
public static final int DEFAULT_FORMAT_VERSION = 2;

/** Reserved table property for table UUID. */
public static final String UUID = "uuid";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ public class UnionByNameVisitor extends SchemaWithPartnerVisitor<Integer, Boolea
private final UpdateSchema api;
private final Schema partnerSchema;
private final boolean caseSensitive;
private final int formatVersion;

private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema, boolean caseSensitive) {
/**
 * Creates a visitor; instances are only built by the static {@code visit} entry points.
 *
 * @param api the schema update API held by this visitor
 * @param partnerSchema the schema paired with the one being visited
 * @param caseSensitive whether field names are matched case-sensitively
 * @param formatVersion the table format version, used when checking type promotion
 */
private UnionByNameVisitor(
    UpdateSchema api, Schema partnerSchema, boolean caseSensitive, int formatVersion) {
  this.formatVersion = formatVersion;
  this.caseSensitive = caseSensitive;
  this.partnerSchema = partnerSchema;
  this.api = api;
}

/**
Expand All @@ -53,7 +56,17 @@ private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema, boolean caseS
* @param newSchema a new schema to compare with the existing
*/
public static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
visit(api, existingSchema, newSchema, true);
visit(api, existingSchema, newSchema, true, 2);
}

/**
 * Runs the union-by-name visit of {@code newSchema} against {@code existingSchema}, assuming
 * table format version 2.
 *
 * @param api an UpdateSchema for adding changes
 * @param existingSchema an existing schema
 * @param newSchema a new schema to compare with the existing
 * @param caseSensitive when false, the case of schema's fields are ignored
 */
public static void visit(
    UpdateSchema api, Schema existingSchema, Schema newSchema, boolean caseSensitive) {
  // No version supplied by the caller: use 2, like the three-argument overload.
  final int assumedFormatVersion = 2;
  visit(api, existingSchema, newSchema, caseSensitive, assumedFormatVersion);
}

/**
 * Runs the case-sensitive union-by-name visit of {@code newSchema} against {@code
 * existingSchema} for the given table format version.
 *
 * @param api an UpdateSchema for adding changes
 * @param existingSchema an existing schema
 * @param newSchema a new schema to compare with the existing
 * @param formatVersion the table format version
 */
public static void visit(
    UpdateSchema api, Schema existingSchema, Schema newSchema, int formatVersion) {
  // Field names are matched case-sensitively in this overload.
  final boolean caseSensitive = true;
  visit(api, existingSchema, newSchema, caseSensitive, formatVersion);
}

/**
Expand All @@ -65,13 +78,18 @@ public static void visit(UpdateSchema api, Schema existingSchema, Schema newSche
* @param existingSchema an existing schema
* @param caseSensitive when false, the case of schema's fields are ignored
* @param newSchema a new schema to compare with the existing
* @param formatVersion the table format version
*/
public static void visit(
UpdateSchema api, Schema existingSchema, Schema newSchema, boolean caseSensitive) {
UpdateSchema api,
Schema existingSchema,
Schema newSchema,
boolean caseSensitive,
int formatVersion) {
visit(
newSchema,
-1,
new UnionByNameVisitor(api, existingSchema, caseSensitive),
new UnionByNameVisitor(api, existingSchema, caseSensitive, formatVersion),
new PartnerIdByNameAccessors(existingSchema, caseSensitive));
}

Expand Down Expand Up @@ -204,7 +222,8 @@ private boolean isIgnorableTypeUpdate(Type existingType, Type newType) {
// existingType:long -> newType:int returns true, meaning it is ignorable
// existingType:int -> newType:long returns false, meaning it is not ignorable
return newType.isPrimitiveType()
&& TypeUtil.isPromotionAllowed(newType, existingType.asPrimitiveType());
&& TypeUtil.isPromotionAllowed(
newType, existingType.asPrimitiveType(), formatVersion, false);
} else {
// Complex -> Complex
return !newType.isPrimitiveType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,28 @@ public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() {
assertThat(applied.asStruct()).isEqualTo(newSchema.asStruct());
}

@Test
// date -> Can promote to timestamp (format version 3 and later only)
public void testTypePromoteDateToTimestamp() {
  Schema currentSchema = new Schema(required(1, "aCol", DateType.get()));
  Schema newSchema = new Schema(required(1, "aCol", TimestampType.withoutZone()));

  // date -> timestamp promotion is rejected below format version 3, and the two-argument
  // SchemaUpdate constructor defaults to version 2, so the version must be given explicitly.
  Schema applied = new SchemaUpdate(currentSchema, 1, 3).unionByNameWith(newSchema).apply();
  assertThat(applied.asStruct()).isEqualTo(newSchema.asStruct());
  assertThat(applied.asStruct().fields()).hasSize(1);
  assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(TimestampType.withoutZone());
}

@Test
// date -> Cannot promote to timestamp with time zone, even at format version 3
public void testTypePromoteDateToTimestampWithZone() {
  Schema currentSchema = new Schema(required(1, "aCol", DateType.get()));
  Schema newSchema = new Schema(required(1, "aCol", TimestampType.withZone()));

  // Run at format version 3 so the failure comes from the time-zone rule. Below version 3
  // every date promotion is rejected and this test would pass without exercising that rule.
  assertThatThrownBy(
          () -> new SchemaUpdate(currentSchema, 1, 3).unionByNameWith(newSchema).apply())
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage("Cannot change column type: aCol: date -> timestamptz");
}

@Test
public void testAddPrimitiveToNestedStruct() {
Schema schema =
Expand Down
Loading