diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java index 6d402d3d6474..3ec2735f0cc8 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -49,7 +49,7 @@ */ public class PartitionSpec implements Serializable { // IDs for partition fields start at 1000 - private static final int PARTITION_DATA_ID_START = 1000; + static final int PARTITION_DATA_ID_START = 1000; private final Schema schema; @@ -174,7 +174,7 @@ public String partitionToPath(StructLike data) { } /** - * Returns true if this spec is equivalent to the other, with field names and partition field ids ignored. + * Returns true if this spec is compatible to the other, with field names and partition field ids ignored. * That is, if both specs have the same number of fields, field order, source columns, and transforms. * * @param other another PartitionSpec @@ -201,6 +201,21 @@ public boolean compatibleWith(PartitionSpec other) { return true; } + /** + * Returns true if this spec is equivalent to the other. + * That is, if both specs have the same number of fields, field order, and partition fields. + * + * @param other another PartitionSpec + * @return true if the specs have the same partition fields. 
+ */ + boolean equivalentTo(PartitionSpec other) { + if (equals(other)) { + return true; + } + return Arrays.equals(fields, other.fields); + } + + @Override public boolean equals(Object other) { if (this == other) { @@ -311,7 +326,7 @@ public static class Builder { private final Schema schema; private final List fields = Lists.newArrayList(); private final Set partitionNames = Sets.newHashSet(); - private Map timeFields = Maps.newHashMap(); + private Map partitionFields = Maps.newHashMap(); private int specId = 0; private final AtomicInteger lastAssignedFieldId = new AtomicInteger(PARTITION_DATA_ID_START - 1); @@ -346,11 +361,29 @@ private void checkAndAddPartitionName(String name, Integer identitySourceColumnI partitionNames.add(name); } + private String getDedupKey(PartitionField field) { + String transformName = field.transform().getName(); + if (transformName != null) { + return transformName + "(" + field.sourceId() + ")"; + } else { + return null; + } + } + private void checkForRedundantPartitions(PartitionField field) { - PartitionField timeField = timeFields.get(field.sourceId()); - Preconditions.checkArgument(timeField == null, - "Cannot add redundant partition: %s conflicts with %s", timeField, field); - timeFields.put(field.sourceId(), field); + String dedupKey = getDedupKey(field); + if (dedupKey == null) { + return; + } + PartitionField partitionField = partitionFields.get(dedupKey); + Preconditions.checkArgument(partitionField == null, + "Cannot add redundant partition: %s conflicts with %s", partitionField, field); + partitionFields.put(dedupKey, field); + } + + private void checkDuplicateFieldId(int fieldId) { + Preconditions.checkArgument(fields.stream().allMatch(f -> f.fieldId() != fieldId), + "Cannot add a partition that duplicates another within %s.", fields); } public Builder withSpecId(int newSpecId) { @@ -377,11 +410,11 @@ public Builder identity(String sourceName) { } public Builder year(String sourceName, String targetName) { - 
checkAndAddPartitionName(targetName); Types.NestedField sourceColumn = findSourceColumn(sourceName); PartitionField field = new PartitionField( sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.year(sourceColumn.type())); checkForRedundantPartitions(field); + checkAndAddPartitionName(targetName); fields.add(field); return this; } @@ -391,11 +424,11 @@ public Builder year(String sourceName) { } public Builder month(String sourceName, String targetName) { - checkAndAddPartitionName(targetName); Types.NestedField sourceColumn = findSourceColumn(sourceName); PartitionField field = new PartitionField( sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.month(sourceColumn.type())); checkForRedundantPartitions(field); + checkAndAddPartitionName(targetName); fields.add(field); return this; } @@ -405,11 +438,11 @@ public Builder month(String sourceName) { } public Builder day(String sourceName, String targetName) { - checkAndAddPartitionName(targetName); Types.NestedField sourceColumn = findSourceColumn(sourceName); PartitionField field = new PartitionField( sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.day(sourceColumn.type())); checkForRedundantPartitions(field); + checkAndAddPartitionName(targetName); fields.add(field); return this; } @@ -419,11 +452,11 @@ public Builder day(String sourceName) { } public Builder hour(String sourceName, String targetName) { - checkAndAddPartitionName(targetName); Types.NestedField sourceColumn = findSourceColumn(sourceName); PartitionField field = new PartitionField( sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.hour(sourceColumn.type())); checkForRedundantPartitions(field); + checkAndAddPartitionName(targetName); fields.add(field); return this; } @@ -433,10 +466,12 @@ public Builder hour(String sourceName) { } public Builder bucket(String sourceName, int numBuckets, String targetName) { - checkAndAddPartitionName(targetName); Types.NestedField sourceColumn = 
findSourceColumn(sourceName); - fields.add(new PartitionField( - sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.bucket(sourceColumn.type(), numBuckets))); + PartitionField field = new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.bucket(sourceColumn.type(), numBuckets)); + checkForRedundantPartitions(field); + checkAndAddPartitionName(targetName); + fields.add(field); return this; } @@ -476,11 +511,20 @@ Builder add(int sourceId, int fieldId, String name, String transform) { Types.NestedField column = schema.findField(sourceId); checkAndAddPartitionName(name, column.fieldId()); Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId); + checkDuplicateFieldId(fieldId); fields.add(new PartitionField(sourceId, fieldId, name, Transforms.fromString(column.type(), transform))); lastAssignedFieldId.getAndAccumulate(fieldId, Math::max); return this; } + Builder addAll(Iterable fieldsToAdd) { + fieldsToAdd.forEach(field -> { + checkForRedundantPartitions(field); + add(field.sourceId(), field.fieldId(), field.name(), field.transform().toString()); + }); + return this; + } + public PartitionSpec build() { PartitionSpec spec = new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get()); checkCompatibility(spec, schema); diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 4536bb2b50e5..e66b4d1a0412 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -115,6 +115,13 @@ public interface Table { */ UpdateSchema updateSchema(); + /** + * Create a new {@link UpdatePartitionSpec} to alter the partition spec of this table and commit the change. + * + * @return a new {@link UpdatePartitionSpec} + */ + UpdatePartitionSpec updateSpec(); + /** * Create a new {@link UpdateProperties} to update table properties and commit the changes. 
* diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java index 3f12db0dd493..d5c8a3ca736f 100644 --- a/api/src/main/java/org/apache/iceberg/Transaction.java +++ b/api/src/main/java/org/apache/iceberg/Transaction.java @@ -40,6 +40,13 @@ public interface Transaction { */ UpdateSchema updateSchema(); + /** + * Create a new {@link UpdatePartitionSpec} to alter the partition spec of this table. + * + * @return a new {@link UpdatePartitionSpec} + */ + UpdatePartitionSpec updateSpec(); + /** * Create a new {@link UpdateProperties} to update table properties. * diff --git a/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java b/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java new file mode 100644 index 000000000000..249e0d81e4ca --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import org.apache.iceberg.exceptions.CommitFailedException; + +/** + * API for partition spec evolution. + *

+ * When committing, these changes will be applied to the current table metadata. Commit conflicts + * will not be resolved and will result in a {@link CommitFailedException}. + */ +public interface UpdatePartitionSpec extends PendingUpdate { + + /** + * Add a new partition field with identity transform to the partition spec. + *

+ * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param targetName the name of this partition field + * @return this for method chaining + */ + UpdatePartitionSpec addIdentityField(String sourceName, String targetName); + + /** + * Add a new partition field with identity transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @return this for method chaining + */ + UpdatePartitionSpec addIdentityField(String sourceName); + + /** + * Add a new partition field with year transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param targetName the name of this partition field + * @return this for method chaining + */ + UpdatePartitionSpec addYearField(String sourceName, String targetName); + + /** + * Add a new partition field with year transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @return this for method chaining + */ + UpdatePartitionSpec addYearField(String sourceName); + + /** + * Add a new partition field with month transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param targetName the name of this partition field + * @return this for method chaining + */ + UpdatePartitionSpec addMonthField(String sourceName, String targetName); + + /** + * Add a new partition field with month transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @return this for method chaining + */ + UpdatePartitionSpec addMonthField(String sourceName); + + /** + * Add a new partition field with day transform to the partition spec. 
+ * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param targetName the name of this partition field + * @return this for method chaining + */ + UpdatePartitionSpec addDayField(String sourceName, String targetName); + + /** + * Add a new partition field with day transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @return this for method chaining + */ + UpdatePartitionSpec addDayField(String sourceName); + + /** + * Add a new partition field with hour transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param targetName the name of this partition field + * @return this for method chaining + */ + UpdatePartitionSpec addHourField(String sourceName, String targetName); + + /** + * Add a new partition field with hour transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @return this for method chaining + */ + UpdatePartitionSpec addHourField(String sourceName); + + /** + * Add a new partition field with bucket transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param numBuckets the number of buckets + * @param targetName the name of this partition field + * @return this for method chaining + */ + UpdatePartitionSpec addBucketField(String sourceName, int numBuckets, String targetName); + + /** + * Add a new partition field with bucket transform to the partition spec. 
+ * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param numBuckets the number of buckets + * @return this for method chaining + */ + UpdatePartitionSpec addBucketField(String sourceName, int numBuckets); + + /** + * Add a new partition field with truncate transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param width the width of truncation + * @param targetName the name of this partition field + * @return this for method chaining + */ + UpdatePartitionSpec addTruncateField(String sourceName, int width, String targetName); + + /** + * Add a new partition field with truncate transform to the partition spec. + * + * @param sourceName the field name of the source field in the {@link PartitionSpec spec's} table schema + * @param width the width of truncation + * @return this for method chaining + */ + UpdatePartitionSpec addTruncateField(String sourceName, int width); + + /** + * Rename a partition field in the partition spec. + * + * @param name the name of a partition field to be renamed + * @param newName the new name of the partition field + * @return this for method chaining + */ + UpdatePartitionSpec renameField(String name, String newName); + + /** + * Remove a partition field in the partition spec. + *

+ * The partition field will be soft deleted for a table with V1 metadata and hard deleted in a higher version. + * + * @param name the name of a partition field to be removed + * @return this for method chaining + */ + UpdatePartitionSpec removeField(String name); + +} diff --git a/api/src/main/java/org/apache/iceberg/transforms/Bucket.java b/api/src/main/java/org/apache/iceberg/transforms/Bucket.java index 9ab1fd44a5c9..0458b355c098 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Bucket.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Bucket.java @@ -154,6 +154,11 @@ public Type getResultType(Type sourceType) { return Types.IntegerType.get(); } + @Override + public String getName() { + return "bucket"; + } + private static class BucketInteger extends Bucket { private BucketInteger(int numBuckets) { super(numBuckets); diff --git a/api/src/main/java/org/apache/iceberg/transforms/Dates.java b/api/src/main/java/org/apache/iceberg/transforms/Dates.java index d1a1583f6f36..6b389014e307 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Dates.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Dates.java @@ -125,4 +125,9 @@ public String toHumanString(Integer value) { public String toString() { return name; } + + @Override + public String getName() { + return "date"; + } } diff --git a/api/src/main/java/org/apache/iceberg/transforms/Identity.java b/api/src/main/java/org/apache/iceberg/transforms/Identity.java index e53a8f115f9c..53fa1069d87e 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Identity.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Identity.java @@ -128,4 +128,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hashCode(type); } + + @Override + public String getName() { + return "identity"; + } } diff --git a/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java b/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java index 97086211da83..ed3d771e106b 
100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java @@ -129,4 +129,9 @@ public String toHumanString(Integer value) { public String toString() { return name; } + + @Override + public String getName() { + return "timestamp"; + } } diff --git a/api/src/main/java/org/apache/iceberg/transforms/Transform.java b/api/src/main/java/org/apache/iceberg/transforms/Transform.java index ba0fb1b219df..d4f5169c78ba 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Transform.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Transform.java @@ -102,4 +102,13 @@ default boolean isIdentity() { default String toHumanString(T value) { return String.valueOf(value); } + + /** + * Returns the transform name + * + * @return the transform name in String format + */ + default String getName() { + return null; + } } diff --git a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java index 39120b372ccc..28b4668e7465 100644 --- a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java +++ b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java @@ -63,6 +63,11 @@ public Type getResultType(Type sourceType) { return sourceType; } + @Override + public String getName() { + return "truncate"; + } + private static class TruncateInteger extends Truncate { private final int width; diff --git a/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java b/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java index b22bc7b6c401..30ff5732ed2f 100644 --- a/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java +++ b/api/src/test/java/org/apache/iceberg/TestPartitionSpecValidation.java @@ -34,10 +34,54 @@ public class TestPartitionSpecValidation { NestedField.required(6, "s", Types.StringType.get()) ); + @Test + public void testPartitionNameCollisions() { + 
AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA).year("ts", "year").year("another_ts", "year").build()); + AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA).month("ts", "month").month("another_ts", "month").build()); + AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA).day("ts", "day").day("another_ts", "day").build()); + AssertHelpers.assertThrows("Should not allow partition fields with the same name)", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA).hour("ts", "hour").hour("another_ts", "hour").build()); + + AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA).year("d", "year").year("another_d", "year").build()); + AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA).month("d", "month").month("another_d", "month").build()); + AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA).day("d", "day").day("another_d", "day").build()); + + AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> 
PartitionSpec.builderFor(SCHEMA) + .bucket("id", 8, "bucket") + .bucket("s", 16, "bucket").build()); + + AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA) + .truncate("id", 8, "bucket") + .truncate("s", 16, "bucket").build()); + + AssertHelpers.assertThrows("Should not allow partition fields with the same name", + IllegalArgumentException.class, "Cannot use partition name more than once", + () -> PartitionSpec.builderFor(SCHEMA) + .identity("id", "identity") + .identity("s", "identity").build()); + } + @Test public void testMultipleTimestampPartitions() { AssertHelpers.assertThrows("Should not allow year(ts) and year(ts)", - IllegalArgumentException.class, "Cannot use partition name more than once", + IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).year("ts").year("ts").build()); AssertHelpers.assertThrows("Should not allow year(ts) and month(ts)", IllegalArgumentException.class, "Cannot add redundant partition", @@ -50,7 +94,7 @@ public void testMultipleTimestampPartitions() { () -> PartitionSpec.builderFor(SCHEMA).year("ts").hour("ts").build()); AssertHelpers.assertThrows("Should not allow month(ts) and month(ts)", - IllegalArgumentException.class, "Cannot use partition name more than once", + IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).month("ts").month("ts").build()); AssertHelpers.assertThrows("Should not allow month(ts) and day(ts)", IllegalArgumentException.class, "Cannot add redundant partition", @@ -60,21 +104,21 @@ public void testMultipleTimestampPartitions() { () -> PartitionSpec.builderFor(SCHEMA).month("ts").hour("ts").build()); AssertHelpers.assertThrows("Should not allow day(ts) and day(ts)", - IllegalArgumentException.class, "Cannot use partition name more than once", + 
IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).day("ts").day("ts").build()); AssertHelpers.assertThrows("Should not allow day(ts) and hour(ts)", IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).day("ts").hour("ts").build()); AssertHelpers.assertThrows("Should not allow hour(ts) and hour(ts)", - IllegalArgumentException.class, "Cannot use partition name more than once", + IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).hour("ts").hour("ts").build()); } @Test public void testMultipleDatePartitions() { AssertHelpers.assertThrows("Should not allow year(d) and year(d)", - IllegalArgumentException.class, "Cannot use partition name more than once", + IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).year("d").year("d").build()); AssertHelpers.assertThrows("Should not allow year(d) and month(d)", IllegalArgumentException.class, "Cannot add redundant partition", @@ -84,14 +128,14 @@ public void testMultipleDatePartitions() { () -> PartitionSpec.builderFor(SCHEMA).year("d").day("d").build()); AssertHelpers.assertThrows("Should not allow month(d) and month(d)", - IllegalArgumentException.class, "Cannot use partition name more than once", + IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).month("d").month("d").build()); AssertHelpers.assertThrows("Should not allow month(d) and day(d)", IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).month("d").day("d").build()); AssertHelpers.assertThrows("Should not allow day(d) and day(d)", - IllegalArgumentException.class, "Cannot use partition name more than once", + IllegalArgumentException.class, "Cannot add redundant partition", () -> PartitionSpec.builderFor(SCHEMA).day("d").day("d").build()); } @@ 
-241,11 +285,38 @@ public void testAddPartitionFieldsWithAndWithoutFieldIds() { .add(1, "id_partition2", "bucket[5]") .add(1, 1005, "id_partition1", "bucket[4]") .truncate("s", 1, "custom_truncate") + .add(1, 1002, "id_partition3", "bucket[3]") .build(); Assert.assertEquals(1000, spec.fields().get(0).fieldId()); Assert.assertEquals(1005, spec.fields().get(1).fieldId()); Assert.assertEquals(1006, spec.fields().get(2).fieldId()); + Assert.assertEquals(1002, spec.fields().get(3).fieldId()); Assert.assertEquals(1006, spec.lastAssignedFieldId()); } + + @Test + public void testAddPartitionFieldsWithInvalidFieldId() { + AssertHelpers.assertThrows("Should detect invalid duplicate field id", + IllegalArgumentException.class, + "Cannot add a partition that duplicates another within", + () -> PartitionSpec.builderFor(SCHEMA) + .add(1, "id_partition2", "bucket[5]") + .add(1, 1005, "id_partition1", "bucket[4]") + .add(1, 1005, "id_partition3", "bucket[3]") + .build()); + } + + @Test + public void testMultipleBucketPartitions() { + AssertHelpers.assertThrows("Should not allow bucket[8](id) and bucket[16](id)", + IllegalArgumentException.class, "Cannot add redundant partition", + () -> PartitionSpec.builderFor(SCHEMA).bucket("id", 8).bucket("id", 16).build()); + + AssertHelpers.assertThrows("Should not allow bucket[8](id) and bucket[16](id)", + IllegalArgumentException.class, "Cannot add redundant partition", + () -> PartitionSpec.builderFor(SCHEMA) + .bucket("id", 8, "id_bucket1") + .bucket("id", 16, "id_bucket2").build()); + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 009cfbda8edc..15d31e2f5dd7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -97,6 +97,11 @@ public UpdateSchema updateSchema() { throw new UnsupportedOperationException("Cannot update the schema of a metadata table"); } 
+ @Override + public UpdatePartitionSpec updateSpec() { + throw new UnsupportedOperationException("Cannot update the partition spec of a metadata table"); + } + @Override public UpdateProperties updateProperties() { throw new UnsupportedOperationException("Cannot update the properties of a metadata table"); diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 15b8742691d9..d7fc5052c1cc 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -104,6 +104,10 @@ public UpdateSchema updateSchema() { return new SchemaUpdate(ops); } + public UpdatePartitionSpec updateSpec() { + return new PartitionSpecUpdate(ops); + } + @Override public UpdateProperties updateProperties() { return new PropertiesUpdate(ops); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index f1ea1913d1df..e9bd49254ebe 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -102,6 +102,14 @@ public UpdateSchema updateSchema() { return schemaChange; } + @Override + public UpdatePartitionSpec updateSpec() { + checkLastOperationCommitted("UpdateSpec"); + UpdatePartitionSpec partitionSpecChange = new PartitionSpecUpdate(transactionOps); + updates.add(partitionSpecChange); + return partitionSpecChange; + } + @Override public UpdateProperties updateProperties() { checkLastOperationCommitted("UpdateProperties"); @@ -541,6 +549,11 @@ public UpdateSchema updateSchema() { return BaseTransaction.this.updateSchema(); } + @Override + public UpdatePartitionSpec updateSpec() { + return BaseTransaction.this.updateSpec(); + } + @Override public UpdateProperties updateProperties() { return BaseTransaction.this.updateProperties(); diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java 
b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java index 6fa713509b0a..ef030b401951 100644 --- a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java +++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java @@ -42,6 +42,11 @@ public UpdateSchema updateSchema() { return wrapped.updateSchema(); } + @Override + public UpdatePartitionSpec updateSpec() { + return wrapped.updateSpec(); + } + @Override public UpdateProperties updateProperties() { return wrapped.updateProperties(); diff --git a/core/src/main/java/org/apache/iceberg/PartitionSpecUpdate.java b/core/src/main/java/org/apache/iceberg/PartitionSpecUpdate.java new file mode 100644 index 000000000000..69a525924eb3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/PartitionSpecUpdate.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.transforms.Transforms; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * PartitionSpec evolution API implementation. + */ +class PartitionSpecUpdate implements UpdatePartitionSpec { + + private static final String SOFT_DELETE_POSTFIX = "__[removed]"; + + private final TableMetadata base; + private final TableOperations ops; + private final List specs; + private final Schema schema; + private final Map curSpecFields; + private final List fields = Lists.newArrayList(); + private final AtomicInteger lastAssignedFieldId = new AtomicInteger(0); + private final Map partitionFieldIdByKey = Maps.newHashMap(); + + PartitionSpecUpdate(TableOperations ops) { + this.ops = ops; + this.base = ops.current(); + this.specs = ImmutableList.builder().addAll(base.specs()).add(base.spec()).build(); + this.schema = base.schema(); + this.curSpecFields = buildSpecByNameMap(base.spec()); + init(); + } + + /** + * For testing only. 
+ */ + @VisibleForTesting + PartitionSpecUpdate(List partitionSpecs) { + this.ops = null; + this.base = null; + this.specs = partitionSpecs; + this.schema = partitionSpecs.get(partitionSpecs.size() - 1).schema(); + this.curSpecFields = buildSpecByNameMap(partitionSpecs.get(partitionSpecs.size() - 1)); + init(); + } + + private Map buildSpecByNameMap(PartitionSpec spec) { + return spec.fields().stream() + .filter(PartitionSpecUpdate::notSoftDeleted).collect( + Collectors.toMap( + PartitionField::name, + Function.identity())); + } + + private void init() { + for (PartitionSpec spec : specs) { + for (PartitionField field : spec.fields()) { + if (notSoftDeleted(field)) { + partitionFieldIdByKey.put(getKey(field.transform(), field.sourceId()), field.fieldId()); + } + } + lastAssignedFieldId.getAndAccumulate(spec.lastAssignedFieldId(), Math::max); + } + } + + @Override + public void commit() { + PartitionSpec newSpec = apply(); // V2 ready partition spec + if (base.formatVersion() == 1) { + newSpec = fillGapsByNullFields(newSpec); + } + TableMetadata updated = base.updatePartitionSpec(newSpec); + + if (updated == base) { + // do not commit if the metadata has not changed. For example, this may happen + // when the committing partition spec is already current. Note that this check uses identity. 
+ return; + } + + ops.commit(base, updated); + } + + @Override + public PartitionSpec apply() { + fields.addAll(curSpecFields.values()); + fields.sort(Comparator.comparingInt(PartitionField::fieldId)); + return PartitionSpec.builderFor(schema).addAll(fields).build(); + } + + private UpdatePartitionSpec addFieldWithType(String sourceName, + String targetName, + Function> func) { + Types.NestedField sourceColumn = schema.findField(sourceName); + Preconditions.checkArgument(sourceColumn != null, "Cannot find source column: %s", sourceName); + + Transform transform = func.apply(sourceColumn.type()); + Integer assignedFieldId = partitionFieldIdByKey.get(getKey(transform, sourceColumn.fieldId())); + if (assignedFieldId == null) { + assignedFieldId = lastAssignedFieldId.incrementAndGet(); + } + + fields.add(new PartitionField(sourceColumn.fieldId(), assignedFieldId, targetName, transform)); + return this; + } + + @Override + public UpdatePartitionSpec addIdentityField(String sourceName, String targetName) { + return addFieldWithType(sourceName, targetName, Transforms::identity); + } + + @Override + public UpdatePartitionSpec addIdentityField(String sourceName) { + return addIdentityField(sourceName, sourceName); + } + + @Override + public UpdatePartitionSpec addYearField(String sourceName, String targetName) { + return addFieldWithType(sourceName, targetName, Transforms::year); + } + + @Override + public UpdatePartitionSpec addYearField(String sourceName) { + return addYearField(sourceName, sourceName + "_year"); + } + + @Override + public UpdatePartitionSpec addMonthField(String sourceName, String targetName) { + return addFieldWithType(sourceName, targetName, Transforms::month); + } + + @Override + public UpdatePartitionSpec addMonthField(String sourceName) { + return addMonthField(sourceName, sourceName + "_month"); + } + + @Override + public UpdatePartitionSpec addDayField(String sourceName, String targetName) { + return addFieldWithType(sourceName, targetName, 
Transforms::day); + } + + @Override + public UpdatePartitionSpec addDayField(String sourceName) { + return addDayField(sourceName, sourceName + "_day"); + } + + @Override + public UpdatePartitionSpec addHourField(String sourceName, String targetName) { + return addFieldWithType(sourceName, targetName, Transforms::hour); + } + + @Override + public UpdatePartitionSpec addHourField(String sourceName) { + return addHourField(sourceName, sourceName + "_hour"); + } + + @Override + public UpdatePartitionSpec addBucketField(String sourceName, int numBuckets, String targetName) { + return addFieldWithType(sourceName, targetName, type -> Transforms.bucket(type, numBuckets)); + } + + @Override + public UpdatePartitionSpec addBucketField(String sourceName, int numBuckets) { + return addBucketField(sourceName, numBuckets, sourceName + "_bucket"); + } + + @Override + public UpdatePartitionSpec addTruncateField(String sourceName, int width, String targetName) { + return addFieldWithType(sourceName, targetName, type -> Transforms.truncate(type, width)); + } + + @Override + public UpdatePartitionSpec addTruncateField(String sourceName, int width) { + return addTruncateField(sourceName, width, sourceName + "_trunc"); + } + + @Override + public UpdatePartitionSpec renameField(String name, String newName) { + Preconditions.checkArgument(curSpecFields.containsKey(name), + "Cannot find an existing partition field with the name: %s", name); + Preconditions.checkArgument(newName != null && !newName.isEmpty(), + "Cannot use an empty or null partition name: %s", newName); + + PartitionField field = curSpecFields.get(name); + curSpecFields.put(name, + new PartitionField(field.sourceId(), field.fieldId(), newName, field.transform())); + return this; + } + + @Override + public UpdatePartitionSpec removeField(String name) { + Preconditions.checkArgument(curSpecFields.containsKey(name), + "Cannot find an existing partition field with the name: %s", name); + curSpecFields.remove(name); + return 
this; + } + + private static boolean notSoftDeleted(PartitionField field) { + return !(field.name().endsWith(SOFT_DELETE_POSTFIX) && Transforms.alwaysNull().equals(field.transform())); + } + + private static String getKey(Transform transform, int sourceId) { + return transform + "(" + sourceId + ")"; + } + + private PartitionSpec fillGapsByNullFields(PartitionSpec partitionSpec) { + Map sourceIdByFieldId = specs.stream().flatMap(spec -> spec.fields().stream()).collect( + Collectors.toMap( + PartitionField::fieldId, + PartitionField::sourceId, + (n1, n2) -> n2, + HashMap::new + )); + + int startId = PartitionSpec.PARTITION_DATA_ID_START; + PartitionField[] partitionFields = new PartitionField[partitionSpec.lastAssignedFieldId() - startId + 1]; + partitionSpec.fields().forEach(field -> partitionFields[field.fieldId() - startId] = field); + for (int i = 0; i < partitionFields.length; ++i) { + if (partitionFields[i] == null) { + int fieldId = startId + i; + ValidationException.check(sourceIdByFieldId.containsKey(fieldId), + "Invalid partition specs, which miss partition field info for id %s.", fieldId); + partitionFields[i] = new PartitionField( + sourceIdByFieldId.get(fieldId), + fieldId, + fieldId + SOFT_DELETE_POSTFIX, + Transforms.alwaysNull()); + } + } + + return PartitionSpec.builderFor(schema).withSpecId(partitionSpec.specId()) + .addAll(Arrays.asList(partitionFields)) + .build(); + } + +} diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index b7fb7cb5a3d3..1fe4f4e4a663 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -429,11 +429,21 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) { } } - Preconditions.checkArgument(defaultSpecId != newDefaultSpecId, - "Cannot set default partition spec to the current default"); + if (defaultSpecId == newDefaultSpecId && 
newPartitionSpec.equivalentTo(spec())) { + // the new spec is already current and return the current table metadata + return this; + } + + // Always setting default partition spec to the new partition spec + ImmutableList.Builder builder = ImmutableList.builder(); + for (PartitionSpec spec : specs) { + if (spec.specId() == newDefaultSpecId) { + builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec)); + } else { + builder.add(spec); + } + } - ImmutableList.Builder builder = ImmutableList.builder() - .addAll(specs); if (!specsById.containsKey(newDefaultSpecId)) { // get a fresh spec to ensure the spec ID is set to the new default builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec)); diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index 4e91131b6db3..0d17e684d08d 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -79,11 +79,12 @@ public void testManifestReaderWithPartitionMetadata() throws IOException { } @Test - public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IOException { + public void testManifestReaderWithUpdatedPartitionMetadata() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(table.schema()) .bucket("id", 8) .bucket("data", 16) .build(); + // commit the new partition spec to the table manually without spec evolution table.ops().commit(table.ops().current(), table.ops().current().updatePartitionSpec(spec)); ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A)); @@ -103,4 +104,27 @@ public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IO } } + @Test + public void testManifestReaderWithPartitionMetadataEvolution() throws IOException { + table.updateSpec() + .addBucketField("id", 8) + .commit(); + + ManifestFile manifest = writeManifest(1000L, 
manifestEntry(Status.EXISTING, 123L, FILE_A)); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + ManifestEntry entry = Iterables.getOnlyElement(reader.entries()); + Assert.assertEquals(123L, (long) entry.snapshotId()); + + List fields = ((PartitionData) entry.file().partition()).getPartitionType().fields(); + Assert.assertEquals(2, fields.size()); + Assert.assertEquals(1000, fields.get(0).fieldId()); + Assert.assertEquals("data_bucket", fields.get(0).name()); + Assert.assertEquals(Types.IntegerType.get(), fields.get(0).type()); + + Assert.assertEquals(1001, fields.get(1).fieldId()); + Assert.assertEquals("id_bucket", fields.get(1).name()); + Assert.assertEquals(Types.IntegerType.get(), fields.get(1).type()); + } + } + } diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index bc6c6a3d5bc2..ddfdceef415b 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -389,7 +389,7 @@ public void testChangedPartitionSpec() { .bucket("id", 4) .build(); - // commit the new partition spec to the table manually + // commit the new partition spec to the table manually without spec evolution table.ops().commit(base, base.updatePartitionSpec(newSpec)); DataFile newFileC = DataFiles.builder(newSpec) @@ -435,7 +435,7 @@ public void testChangedPartitionSpecMergeExisting() { .bucket("id", 4) .build(); - // commit the new partition spec to the table manually + // commit the new partition spec to the table manually without spec evolution table.ops().commit(base, base.updatePartitionSpec(newSpec)); DataFile newFileC = DataFiles.builder(newSpec) @@ -665,21 +665,14 @@ public void testInvalidAppendManifest() throws IOException { .commit()); } - @Test - public void testUpdatePartitionSpecFieldIdsForV1Table() { - TableMetadata base = readMetadata(); - - // build the new spec using the table's schema, which 
uses fresh IDs - PartitionSpec newSpec = PartitionSpec.builderFor(base.schema()) - .bucket("id", 16) - .identity("data") - .bucket("data", 4) - .bucket("data", 16, "data_partition") // reuse field id although different target name - .build(); - - // commit the new partition spec to the table manually - table.ops().commit(base, base.updatePartitionSpec(newSpec)); + @Test + public void testUpdatePartitionSpecFieldIdsWithSpecEvolution() { + table.updateSpec() + .removeField("data_bucket") + .addBucketField("id", 16) + .addIdentityField("data") + .addBucketField("data", 4) + .commit(); List partitionSpecs = table.ops().current().specs(); PartitionSpec partitionSpec = partitionSpecs.get(0); @@ -696,19 +689,19 @@ public void testUpdatePartitionSpecFieldIdsForV1Table() { structType = partitionSpec.partitionType(); fields = structType.fields(); - Assert.assertEquals(4, fields.size()); - Assert.assertEquals("id_bucket", fields.get(0).name()); - Assert.assertEquals(1000, fields.get(0).fieldId()); - Assert.assertEquals("data", fields.get(1).name()); - Assert.assertEquals(1001, fields.get(1).fieldId()); - Assert.assertEquals("data_bucket", fields.get(2).name()); - Assert.assertEquals(1002, fields.get(2).fieldId()); - Assert.assertEquals("data_partition", fields.get(3).name()); - Assert.assertEquals(1003, fields.get(3).fieldId()); + + int offset = 2 - formatVersion; // 1 for V1 and 0 for V2 + Assert.assertEquals("field size should be 4 in V1 and 3 in V2", 3 + offset, fields.size()); + V1Assert.assertEquals("removed field should be soft deleted in V1", + "1000: 1000__[removed]: optional string", fields.get(0).toString()); + + Assert.assertEquals("id bucket field", "1001: id_bucket: optional int", fields.get(0 + offset).toString()); + Assert.assertEquals("data identity field", "1002: data: optional string", fields.get(1 + offset).toString()); + Assert.assertEquals("data bucket field", "1003: data_bucket: optional int", fields.get(2 + offset).toString()); } @Test - public void 
testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { + public void testManifestEntryFieldIdsForChangedPartitionSpecWithSpecEvolution() { table.newAppend() .appendFile(FILE_A) .commit(); @@ -718,14 +711,9 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { 1, base.currentSnapshot().allManifests().size()); ManifestFile initialManifest = base.currentSnapshot().allManifests().get(0); - // build the new spec using the table's schema, which uses fresh IDs - PartitionSpec newSpec = PartitionSpec.builderFor(base.schema()) - .bucket("id", 8) - .bucket("data", 8) - .build(); - - // commit the new partition spec to the table manually - table.ops().commit(base, base.updatePartitionSpec(newSpec)); + table.updateSpec() + .addBucketField("id", 8) + .commit(); DataFile newFile = DataFiles.builder(table.spec()) .copy(FILE_B) @@ -744,15 +732,14 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { Assert.assertEquals("Second manifest should be the initial manifest with the old spec", initialManifest, pending.allManifests().get(1)); - // field ids of manifest entries in two manifests with different specs of the same source field should be different ManifestEntry entry = ManifestFiles.read(pending.allManifests().get(0), FILE_IO) .entries().iterator().next(); Types.NestedField field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0); Assert.assertEquals(1000, field.fieldId()); - Assert.assertEquals("id_bucket", field.name()); + Assert.assertEquals("data_bucket", field.name()); field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(1); Assert.assertEquals(1001, field.fieldId()); - Assert.assertEquals("data_bucket", field.name()); + Assert.assertEquals("id_bucket", field.name()); entry = ManifestFiles.read(pending.allManifests().get(1), FILE_IO).entries().iterator().next(); field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0); diff --git 
a/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java b/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java index b0395f004323..7515fa870483 100644 --- a/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java +++ b/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java @@ -91,7 +91,7 @@ public void testSpecInfoPartitionedTable() { } @Test - public void testSpecInfoPartitionSpecEvolutionForV1Table() { + public void testSpecInfoPartitionSpecEvolution() { PartitionSpec spec = PartitionSpec.builderFor(schema) .bucket("data", 4) .build(); @@ -99,17 +99,29 @@ public void testSpecInfoPartitionSpecEvolutionForV1Table() { Assert.assertEquals(spec, table.spec()); - TableMetadata base = TestTables.readMetadata("test"); - PartitionSpec newSpec = PartitionSpec.builderFor(table.schema()) - .bucket("data", 10) - .withSpecId(1) - .build(); - table.ops().commit(base, base.updatePartitionSpec(newSpec)); - - Assert.assertEquals(newSpec, table.spec()); - Assert.assertEquals(newSpec, table.specs().get(newSpec.specId())); + table.updateSpec() + .removeField("data_bucket") + .addBucketField("data", 10) + .commit(); + + PartitionSpec expectedNewSpec; + if (formatVersion == 1) { + expectedNewSpec = PartitionSpec.builderFor(table.schema()) + .add(2, 1000, "1000__[removed]", "void") + .add(2, 1001, "data_bucket", "bucket[10]") + .withSpecId(1) + .build(); + } else { + expectedNewSpec = PartitionSpec.builderFor(table.schema()) + .add(2, 1001, "data_bucket", "bucket[10]") + .withSpecId(1) + .build(); + } + + Assert.assertEquals(expectedNewSpec, table.spec()); + Assert.assertEquals(expectedNewSpec, table.specs().get(expectedNewSpec.specId())); Assert.assertEquals(spec, table.specs().get(spec.specId())); - Assert.assertEquals(ImmutableMap.of(spec.specId(), spec, newSpec.specId(), newSpec), table.specs()); + Assert.assertEquals(ImmutableMap.of(spec.specId(), spec, expectedNewSpec.specId(), expectedNewSpec), table.specs()); 
Assert.assertNull(table.specs().get(Integer.MAX_VALUE)); } } diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java b/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java index 847ff4283ab1..6f774202e39b 100644 --- a/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java +++ b/core/src/test/java/org/apache/iceberg/TestPartitionSpecParser.java @@ -19,16 +19,20 @@ package org.apache.iceberg; +import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Test; -public class TestPartitionSpecParser extends TableTestBase { - public TestPartitionSpecParser() { - super(1); - } +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestPartitionSpecParser { + private static final Schema SCHEMA = new Schema( + required(1, "id", Types.IntegerType.get()), + required(2, "data", Types.StringType.get()) + ); @Test - public void testToJsonForV1Table() { + public void testToJson() { String expected = "{\n" + " \"spec-id\" : 0,\n" + " \"fields\" : [ {\n" + @@ -38,30 +42,36 @@ public void testToJsonForV1Table() { " \"field-id\" : 1000\n" + " } ]\n" + "}"; - Assert.assertEquals(expected, PartitionSpecParser.toJson(table.spec(), true)); + PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build(); + Assert.assertEquals(expected, PartitionSpecParser.toJson(initialSpec, true)); - PartitionSpec spec = PartitionSpec.builderFor(table.schema()) + PartitionSpec evolvedSpec = PartitionSpec.builderFor(SCHEMA) + .withSpecId(1) + .alwaysNull("data", "1000__[removed]") .bucket("id", 8) - .bucket("data", 16) + .bucket("data", 4) .build(); - table.ops().commit(table.ops().current(), table.ops().current().updatePartitionSpec(spec)); - expected = "{\n" + " \"spec-id\" : 1,\n" + " \"fields\" : [ {\n" + + " \"name\" : \"1000__[removed]\",\n" + + " \"transform\" : \"void\",\n" + + " \"source-id\" : 2,\n" + + " \"field-id\" : 1000\n" + + " }, {\n" + " \"name\" : 
\"id_bucket\",\n" + " \"transform\" : \"bucket[8]\",\n" + " \"source-id\" : 1,\n" + - " \"field-id\" : 1000\n" + + " \"field-id\" : 1001\n" + " }, {\n" + " \"name\" : \"data_bucket\",\n" + - " \"transform\" : \"bucket[16]\",\n" + + " \"transform\" : \"bucket[4]\",\n" + " \"source-id\" : 2,\n" + - " \"field-id\" : 1001\n" + + " \"field-id\" : 1002\n" + " } ]\n" + "}"; - Assert.assertEquals(expected, PartitionSpecParser.toJson(table.spec(), true)); + Assert.assertEquals(expected, PartitionSpecParser.toJson(evolvedSpec, true)); } @Test @@ -81,7 +91,7 @@ public void testFromJsonWithFieldId() { " } ]\n" + "}"; - PartitionSpec spec = PartitionSpecParser.fromJson(table.schema(), specString); + PartitionSpec spec = PartitionSpecParser.fromJson(SCHEMA, specString); Assert.assertEquals(2, spec.fields().size()); // should be the field ids in the JSON @@ -104,7 +114,7 @@ public void testFromJsonWithoutFieldId() { " } ]\n" + "}"; - PartitionSpec spec = PartitionSpecParser.fromJson(table.schema(), specString); + PartitionSpec spec = PartitionSpecParser.fromJson(SCHEMA, specString); Assert.assertEquals(2, spec.fields().size()); // should be the default assignment diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionSpecUpdate.java b/core/src/test/java/org/apache/iceberg/TestPartitionSpecUpdate.java new file mode 100644 index 000000000000..bbb6b783cb3f --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestPartitionSpecUpdate.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestPartitionSpecUpdate { + + private static final Schema SCHEMA = new Schema( + required(1, "id", Types.IntegerType.get()), + required(2, "data", Types.StringType.get()), + optional(3, "ts", Types.TimestampType.withZone()) + ); + + List initialSpecs = Collections.singletonList( + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build()); + PartitionSpecUpdate specUpdate; + + @Test + public void testAddAndRemoveField() { + specUpdate = new PartitionSpecUpdate(initialSpecs); + Assert.assertEquals( + PartitionSpec.builderFor(SCHEMA) + .add(2, 1001, "data_bucket", "bucket[8]") + .build(), + specUpdate + .removeField("data_bucket") + .addBucketField("data", 8) + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + Assert.assertEquals( + PartitionSpec.builderFor(SCHEMA) + .add(2, 1001, "data_bucket", "bucket[8]") + .build(), + specUpdate + .addBucketField("data", 8) + .removeField("data_bucket") + .apply()); + } + + @Test + public void testAddRemovedSamePartitionField() { + specUpdate = new PartitionSpecUpdate(initialSpecs); + Assert.assertEquals( + "remove and add operations should cancel each other, equivalent to noop", + PartitionSpec.builderFor(SCHEMA) + .add(2, 1000, 
"data_bucket", "bucket[16]") + .add(1, 1001, "id_bucket", "bucket[8]") + .build(), + specUpdate + .removeField("data_bucket") + .addBucketField("id", 8) + .addBucketField("data", 16) + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + Assert.assertEquals( + "remove and add operations should cancel each other, equivalent to noop", + PartitionSpec.builderFor(SCHEMA) + .add(2, 1000, "data_bucket", "bucket[16]") + .add(1, 1001, "id_bucket", "bucket[8]") + .build(), + specUpdate + .addBucketField("id", 8) + .addBucketField("data", 16) + .removeField("data_bucket") + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + Assert.assertEquals( + "remove and add operations are equivalent to rename data_bucket to data_partition", + PartitionSpec.builderFor(SCHEMA) + .add(2, 1000, "data_partition", "bucket[16]") + .build(), + specUpdate + .addBucketField("data", 16, "data_partition") + .removeField("data_bucket") + .apply()); + } + + @Test + public void testUpdateException() { + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if there is an invalid partition field", + IllegalArgumentException.class, "Cannot find an existing partition field with the name: id_bucket", + () -> specUpdate + .removeField("id_bucket") + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if there is a redundant partition field", + IllegalArgumentException.class, "Cannot add redundant partition", + () -> specUpdate + .removeField("data_bucket") + .addBucketField("id", 8) + .addBucketField("id", 16) + .apply()); + } + + @Test + public void testAddDuplicateFieldException() { + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if adding a duplicate partition field", + IllegalArgumentException.class, "Cannot use partition name more than 
once: data_bucket", + () -> specUpdate + .addTruncateField("data", 16, "data_bucket") + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if adding a duplicate partition field", + IllegalArgumentException.class, "Cannot add redundant partition", + () -> specUpdate + .addBucketField("data", 16) + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if adding a redundant partition field", + IllegalArgumentException.class, + "Cannot add redundant partition:", + () -> specUpdate + .addBucketField("data", 16, "data_partition") + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if adding a redundant partition field", + IllegalArgumentException.class, "Cannot add redundant partition", + () -> specUpdate + .addBucketField("data", 8, "data_partition") + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if adding a redundant partition field", + IllegalArgumentException.class, "Cannot add redundant partition", + () -> specUpdate + .addBucketField("id", 8, "id_partition1") + .addBucketField("id", 16, "id_partition2") + .apply()); + } + + @Test + public void testAddField() { + specUpdate = new PartitionSpecUpdate(initialSpecs); + PartitionSpec newSpec = specUpdate + .addBucketField("id", 8) + .addTruncateField("data", 8) + .apply(); + + PartitionSpec evolvedSpec = PartitionSpec.builderFor(SCHEMA) + .bucket("data", 16) + .bucket("id", 8) + .truncate("data", 8) + .build(); + + Assert.assertEquals("should match evolved spec", evolvedSpec, newSpec); + Assert.assertEquals(1002, newSpec.lastAssignedFieldId()); + + specUpdate = new PartitionSpecUpdate( + Arrays.asList( + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).bucket("id", 
8).build(), + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build())); + + Assert.assertEquals("Should not reuse field id for a new field", + PartitionSpec.builderFor(SCHEMA) + .add(2, 1000, "data_bucket", "bucket[16]") + .add(2, 1002, "data", "identity") + .build(), + specUpdate + .addIdentityField("data") + .apply()); + } + + @Test + public void testRenameField() { + specUpdate = new PartitionSpecUpdate(initialSpecs); + PartitionSpec newSpec = specUpdate + .addBucketField("id", 8) + .renameField("data_bucket", "data_partition") + .apply(); + + PartitionSpec evolvedSpec = PartitionSpec.builderFor(SCHEMA) + .bucket("data", 16, "data_partition") + .bucket("id", 8, "id_bucket") + .build(); + + Assert.assertEquals("should match evolved spec", evolvedSpec, newSpec); + Assert.assertEquals(1001, newSpec.lastAssignedFieldId()); + } + + @Test + public void testRenameFieldExceptions() { + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if renaming a non-existing partition field", + IllegalArgumentException.class, + "Cannot find an existing partition field with the name: not_existing", + () -> specUpdate + .renameField("not_existing", "data_partition") + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if renaming a partition field to null", + IllegalArgumentException.class, + "Cannot use an empty or null partition name: null", + () -> specUpdate + .renameField("data_bucket", null) + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if renaming a not committed new partition field", + IllegalArgumentException.class, + "Cannot find an existing partition field with the name: not_committed", + () -> specUpdate + .addBucketField("data", 6, "not_committed") + .renameField("not_committed", "data_partition") + .apply()); + + 
specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if renaming a removed field", + IllegalArgumentException.class, + "Cannot find an existing partition field with the name: data_bucket", + () -> specUpdate + .removeField("data_bucket") + .renameField("data_bucket", "data_partition") + .apply()); + + specUpdate = new PartitionSpecUpdate( + Arrays.asList( + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build(), + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).bucket("id", 8).build())); + + AssertHelpers.assertThrows( + "Should throw IllegalArgumentException if renaming a field to the same name as another field", + IllegalArgumentException.class, + "Cannot use partition name more than once", + () -> specUpdate + .renameField("data_bucket", "id_bucket") + .apply()); + } + + @Test + public void testRemoveFieldException() { + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalStateException if removing a non-existing partition field", + IllegalArgumentException.class, + "Cannot find an existing partition field with the name: not_existing", + () -> specUpdate + .removeField("not_existing") + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalStateException if removing a not committed new partition field", + IllegalArgumentException.class, + "Cannot find an existing partition field with the name: not_committed", + () -> specUpdate + .addBucketField("id", 6, "not_committed") + .removeField("not_committed") + .apply()); + + specUpdate = new PartitionSpecUpdate(initialSpecs); + AssertHelpers.assertThrows( + "Should throw IllegalStateException if removing a partition field more than once", + IllegalArgumentException.class, + "Cannot find an existing partition field with the name: data_bucket", + () -> specUpdate + .removeField("data_bucket") + .removeField("data_bucket") + 
.apply()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index ac139e94855d..6730f826a533 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -509,7 +509,7 @@ public void testNewTableMetadataReassignmentAllIds() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5) .add(3, 1005, "x_partition", "bucket[4]") - .add(5, 1005, "z_partition", "bucket[8]") + .add(5, 1003, "z_partition", "bucket[8]") .build(); String location = "file://tmp/db/table"; TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, location, ImmutableMap.of()); @@ -533,7 +533,7 @@ public void testInvalidUpdatePartitionSpecForV1Table() throws Exception { .add(1, 1005, "x_partition", "bucket[4]") .build(); String location = "file://tmp/db/table"; - TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, location, ImmutableMap.of()); + TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, location, ImmutableMap.of(), 1); AssertHelpers.assertThrows("Should fail to update an invalid partition spec", ValidationException.class, "Spec does not use sequential IDs that are required in v1", diff --git a/core/src/test/java/org/apache/iceberg/TestTablePartitionSpecUpdate.java b/core/src/test/java/org/apache/iceberg/TestTablePartitionSpecUpdate.java new file mode 100644 index 000000000000..a3a785e46518 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestTablePartitionSpecUpdate.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests partition spec evolution through {@code Table.updateSpec()}: adding, removing, and
 * renaming partition fields, the resulting spec ids, and partition field id assignment.
 *
 * <p>Runs against table format versions 1 and 2 (see {@link #parameters()}). V1 and V2 differ in
 * how removed fields are represented: the assertions below show V1 keeping removed fields as
 * {@code alwaysNull}/void placeholders while V2 drops them entirely. {@code V1Assert} and
 * {@code V2Assert} are presumably version-gated assertion helpers inherited from
 * {@code TableTestBase} that only apply under the matching format version — confirm there.
 */
@RunWith(Parameterized.class)
public class TestTablePartitionSpecUpdate extends TableTestBase {

  /** Format versions to exercise: every test runs once for V1 and once for V2. */
  @Parameterized.Parameters
  public static Object[][] parameters() {
    return new Object[][] {
        new Object[] { 1 },
        new Object[] { 2 },
    };
  }

  public TestTablePartitionSpecUpdate(int formatVersion) {
    super(formatVersion);
  }

  /**
   * Sanity-checks the fixture before each test: the table starts with a single
   * {@code bucket(data, 16)} partition field, spec id 0, and the first partition field id (1000)
   * already assigned.
   */
  @Before
  public void verifyInitialSpec() {
    PartitionSpec initialSpec = PartitionSpec.builderFor(table.schema()).bucket("data", 16).build();
    Assert.assertEquals("Should use the expected initial spec", initialSpec, table.spec());
    Assert.assertEquals(1000, table.spec().lastAssignedFieldId());
    Assert.assertEquals(0, table.spec().specId());
  }

  /**
   * Adding a field appends it with the next field id and bumps the spec id; removing fields is a
   * soft delete (void placeholders) in V1 but a hard delete in V2, while the new field still gets
   * a fresh id (1002) in both versions.
   */
  @Test
  public void testCommitUpdatedSpec() {
    table.updateSpec()
        .addBucketField("id", 8)
        .commit();

    PartitionSpec evolvedSpec = PartitionSpec.builderFor(table.schema())
        .withSpecId(1)
        .bucket("data", 16)
        .bucket("id", 8)
        .build();
    Assert.assertEquals("Should append a partition field to the spec", evolvedSpec, table.spec());
    Assert.assertEquals(1001, table.spec().lastAssignedFieldId());

    table.updateSpec()
        .removeField("id_bucket")
        .removeField("data_bucket")
        .addTruncateField("data", 8)
        .commit();

    V1Assert.assertEquals("Should soft delete id and data buckets", PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .alwaysNull("data", "1000__[removed]")
        .alwaysNull("id", "1001__[removed]")
        .truncate("data", 8)
        .build(), table.spec());

    V2Assert.assertEquals("Should hard delete id and data buckets", PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .add(2, 1002, "data_trunc", "truncate[8]")
        .build(), table.spec());

    Assert.assertEquals(1002, table.spec().lastAssignedFieldId());
  }

  /**
   * Updates that leave the spec unchanged — an empty update, a rename to the same name, and a
   * remove immediately cancelled by re-adding the identical field (in either order) — must not
   * produce a new metadata version or table metadata.
   */
  @Test
  public void testNoopCommit() {
    TableMetadata current = table.ops().current();
    Integer currentVersion = TestTables.metadataVersion("test");

    // no-op commit due to an empty update
    table.updateSpec().commit();
    TableMetadata updated = table.ops().current();
    Integer updatedVersion = TestTables.metadataVersion("test");
    Assert.assertEquals(current, updated);
    Assert.assertEquals(currentVersion, updatedVersion);

    // no-op commit due to a rename that keeps the same name
    table.updateSpec()
        .renameField("data_bucket", "data_bucket")
        .commit();
    updated = table.ops().current();
    updatedVersion = TestTables.metadataVersion("test");
    Assert.assertEquals(current, updated);
    Assert.assertEquals(currentVersion, updatedVersion);

    // no-op commit due to cancelling ops (remove + add for the same field)
    table.updateSpec()
        .removeField("data_bucket")
        .addBucketField("data", 16)
        .commit();
    updated = table.ops().current();
    updatedVersion = TestTables.metadataVersion("test");
    Assert.assertEquals(current, updated);
    Assert.assertEquals(currentVersion, updatedVersion);

    // no-op commit due to cancelling ops (add + remove for the same field)
    table.updateSpec()
        .addBucketField("data", 16)
        .removeField("data_bucket")
        .commit();
    updated = table.ops().current();
    updatedVersion = TestTables.metadataVersion("test");
    Assert.assertEquals(current, updated);
    Assert.assertEquals(currentVersion, updatedVersion);
  }

  /**
   * Renames keep the field's id and source: renaming alongside an add works in one commit, a
   * rename combined with a new field produces a new spec id, and a pure rename reuses the current
   * spec id (stays at 2) without allocating a new field id.
   */
  @Test
  public void testRenameField() {
    table.updateSpec()
        .renameField("data_bucket", "data_partition")
        .addBucketField("id", 8)
        .commit();

    PartitionSpec evolvedSpec = PartitionSpec.builderFor(table.schema())
        .withSpecId(1)
        .bucket("data", 16, "data_partition")
        .bucket("id", 8, "id_bucket")
        .build();

    Assert.assertEquals("should match evolved spec", evolvedSpec, table.spec());
    Assert.assertEquals(1001, table.spec().lastAssignedFieldId());

    table.updateSpec()
        .addTruncateField("id", 4)
        .renameField("data_partition", "data_bucket")
        .commit();

    evolvedSpec = PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .bucket("data", 16, "data_bucket")
        .bucket("id", 8, "id_bucket")
        .truncate("id", 4)
        .build();

    Assert.assertEquals("should match evolved spec", evolvedSpec, table.spec());
    Assert.assertEquals(1002, table.spec().lastAssignedFieldId());

    table.updateSpec()
        .renameField("id_bucket", "id_partition")
        .commit();

    // a rename-only update keeps the same spec id and allocates no new field id
    evolvedSpec = PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .bucket("data", 16, "data_bucket")
        .bucket("id", 8, "id_partition")
        .truncate("id", 4)
        .build();
    Assert.assertEquals("should match evolved spec", evolvedSpec, table.spec());
    Assert.assertEquals(1002, table.spec().lastAssignedFieldId());
  }

  /**
   * Removing a field while adding another: V1 keeps the removed field as a void placeholder named
   * {@code 1000__[removed]}; V2 drops it, leaving only the new field with the next id (1001).
   */
  @Test
  public void testRemoveField() {
    table.updateSpec()
        .removeField("data_bucket")
        .addBucketField("id", 8)
        .commit();

    V1Assert.assertEquals("Should soft delete data bucket", PartitionSpec.builderFor(table.schema())
        .withSpecId(1)
        .alwaysNull("data", "1000__[removed]")
        .bucket("id", 8)
        .build(), table.spec());

    V2Assert.assertEquals("Should hard delete data bucket", PartitionSpec.builderFor(table.schema())
        .withSpecId(1)
        .add(1, 1001, "id_bucket", "bucket[8]")
        .build(), table.spec());

    Assert.assertEquals(1001, table.spec().lastAssignedFieldId());
  }

  /**
   * Removal followed (in later commits) by re-adding fields on the same source column: removing
   * the only field hard-deletes it in both versions; subsequent adds allocate fresh field ids
   * (1001, then 1002), and V1 accumulates one void placeholder per removed field.
   */
  @Test
  public void testRemoveAndAddField() {
    table.updateSpec()
        .removeField("data_bucket")
        .commit();

    Assert.assertEquals("Should hard delete data bucket for both V1 and V2", PartitionSpec.builderFor(table.schema())
        .withSpecId(1)
        .build(), table.spec());

    table.updateSpec()
        .addBucketField("data", 8)
        .commit();

    V1Assert.assertEquals("Should add a new data bucket", PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .alwaysNull("data", "1000__[removed]")
        .bucket("data", 8)
        .build(), table.spec());
    V2Assert.assertEquals("Should add a new data bucket", PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .add(2, 1001, "data_bucket", "bucket[8]")
        .build(), table.spec());
    Assert.assertEquals(1001, table.spec().lastAssignedFieldId());

    table.updateSpec()
        .addBucketField("data", 6)
        .removeField("data_bucket")
        .commit();

    V1Assert.assertEquals("Should evolve to a new spec", PartitionSpec.builderFor(table.schema())
        .withSpecId(3)
        .alwaysNull("data", "1000__[removed]")
        .alwaysNull("data", "1001__[removed]")
        .bucket("data", 6)
        .build(), table.spec());
    V2Assert.assertEquals("Should evolve back to the initial spec", PartitionSpec.builderFor(table.schema())
        .withSpecId(3)
        .add(2, 1002, "data_bucket", "bucket[6]")
        .build(), table.spec());
    Assert.assertEquals("Should allocate a new field id", 1002, table.spec().lastAssignedFieldId());
  }

  /**
   * A rename and a remove of the same field in one update: the remove wins (the renamed field is
   * deleted), and re-adding a bucket on the same source allocates a new field id rather than
   * reviving the removed one.
   */
  @Test
  public void testRenameAndRemoveField() {
    table.updateSpec()
        .renameField("data_bucket", "data_partition")
        .removeField("data_bucket")
        .addBucketField("data", 8)
        .commit();

    V1Assert.assertEquals("Should remove the renamed field", PartitionSpec.builderFor(table.schema())
        .withSpecId(1)
        .alwaysNull("data", "1000__[removed]")
        .bucket("data", 8)
        .build(), table.spec());
    V2Assert.assertEquals("Should remove the renamed field", PartitionSpec.builderFor(table.schema())
        .withSpecId(1)
        .add(2, 1001, "data_bucket", "bucket[8]")
        .build(), table.spec());

    table.updateSpec()
        .addBucketField("data", 6)
        .removeField("data_bucket")
        .commit();

    V1Assert.assertEquals("Should remove and then add a bucket field", PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .alwaysNull("data", "1000__[removed]")
        .alwaysNull("data", "1001__[removed]")
        .bucket("data", 6)
        .build(), table.spec());
    V2Assert.assertEquals("Should remove and then add a bucket field", PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .add(2, 1002, "data_bucket", "bucket[6]")
        .build(), table.spec());
    Assert.assertEquals(1002, table.spec().lastAssignedFieldId());
  }

  /**
   * Field id reuse across spec history: re-adding a partition field identical to one in an older
   * spec reuses that spec's field id and spec id (trailing removed fields are truncated), while a
   * genuinely new field after gaps gets the next never-used id (1003) — with V1 filling the gaps
   * with void placeholders.
   */
  @Test
  public void testFieldIdEvolution() {
    table.updateSpec()
        .addBucketField("id", 8)
        .commit();

    PartitionSpec evolvedSpec = PartitionSpec.builderFor(table.schema())
        .withSpecId(1)
        .bucket("data", 16)
        .bucket("id", 8)
        .build();
    Assert.assertEquals("Should append a partition field to the spec", evolvedSpec, table.spec());
    Assert.assertEquals(1001, table.spec().lastAssignedFieldId());

    table.updateSpec()
        .removeField("data_bucket")
        .addBucketField("data", 8)
        .commit();

    V1Assert.assertEquals("Should add a new data bucket", PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .alwaysNull("data", "1000__[removed]")
        .bucket("id", 8)
        .bucket("data", 8)
        .build(), table.spec());
    V2Assert.assertEquals("Should add a new data bucket", PartitionSpec.builderFor(table.schema())
        .withSpecId(2)
        .add(1, 1001, "id_bucket", "bucket[8]")
        .add(2, 1002, "data_bucket", "bucket[8]")
        .build(), table.spec());
    Assert.assertEquals(1002, table.spec().lastAssignedFieldId());

    table.updateSpec()
        .removeField("data_bucket")
        .addBucketField("data", 16, "data_partition")
        .commit();

    // matches spec 1's data bucket, so the old spec id and field id (1001 high-water mark) return
    Assert.assertEquals(
        "Should add back a removed data bucket, truncate the trailing removed fields, and reuse the spec id",
        PartitionSpec.builderFor(table.schema())
            .withSpecId(1)
            .bucket("data", 16, "data_partition")
            .bucket("id", 8)
            .build(),
        table.spec());
    Assert.assertEquals("Should reuse the field id", 1001, table.spec().lastAssignedFieldId());

    table.updateSpec()
        .removeField("id_bucket")
        .renameField("data_partition", "data_bucket")
        .commit();

    // now equivalent to the original spec 0, modulo the rename back to data_bucket
    Assert.assertEquals(
        "Should rename an existing data bucket, truncate the trailing removed fields, and reuse the spec id",
        PartitionSpec.builderFor(table.schema())
            .withSpecId(0)
            .add(2, 1000, "data_bucket", "bucket[16]")
            .build(),
        table.spec());
    Assert.assertEquals("Should not add a new field id", 1000, table.spec().lastAssignedFieldId());

    table.updateSpec()
        .addTruncateField("id", 6)
        .commit();

    V1Assert.assertEquals("Should add a new data bucket and fill the gaps", PartitionSpec.builderFor(table.schema())
        .withSpecId(3)
        .add(2, 1000, "data_bucket", "bucket[16]")
        .add(1, 1001, "1001__[removed]", "void")
        .add(2, 1002, "1002__[removed]", "void")
        .add(1, 1003, "id_trunc", "truncate[6]")
        .build(), table.spec());
    V2Assert.assertEquals("Should add a new data bucket", PartitionSpec.builderFor(table.schema())
        .withSpecId(3)
        .add(2, 1000, "data_bucket", "bucket[16]")
        .add(1, 1003, "id_trunc", "truncate[6]")
        .build(), table.spec());
    Assert.assertEquals(1003, table.spec().lastAssignedFieldId());
  }
}