diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 4448189601f9..fc152fc142eb 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -35,6 +35,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.transforms.UnknownTransform;
 import org.apache.iceberg.types.Type;
@@ -333,12 +334,12 @@ private void checkAndAddPartitionName(String name) {
     checkAndAddPartitionName(name, null);
   }
 
-  private void checkAndAddPartitionName(String name, Integer identitySourceColumnId) {
+  private void checkAndAddPartitionName(String name, Integer sourceColumnId) {
     Types.NestedField schemaField = schema.findField(name);
-    if (identitySourceColumnId != null) {
+    if (sourceColumnId != null) {
       // for identity transform case we allow conflicts between partition and schema field name as
       // long as they are sourced from the same schema field
-      Preconditions.checkArgument(schemaField == null || schemaField.fieldId() == identitySourceColumnId,
+      Preconditions.checkArgument(schemaField == null || schemaField.fieldId() == sourceColumnId,
           "Cannot create identity partition sourced from different field in schema: %s", name);
     } else {
       // for all other transforms we don't allow conflicts between partition name and schema field name
@@ -463,8 +464,8 @@ public Builder truncate(String sourceName, int width) {
     }
 
     public Builder alwaysNull(String sourceName, String targetName) {
-      checkAndAddPartitionName(targetName);
       Types.NestedField sourceColumn = findSourceColumn(sourceName);
+      checkAndAddPartitionName(targetName, sourceColumn.fieldId()); // can duplicate a source column name
       fields.add(new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.alwaysNull()));
       return this;
     }
@@ -480,9 +481,13 @@ Builder add(int sourceId, String name, String transform) {
 
     Builder add(int sourceId, int fieldId, String name, String transform) {
       Types.NestedField column = schema.findField(sourceId);
-      checkAndAddPartitionName(name, column.fieldId());
       Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId);
-      fields.add(new PartitionField(sourceId, fieldId, name, Transforms.fromString(column.type(), transform)));
+      return add(sourceId, fieldId, name, Transforms.fromString(column.type(), transform));
+    }
+
+    Builder add(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
+      checkAndAddPartitionName(name, sourceId);
+      fields.add(new PartitionField(sourceId, fieldId, name, transform));
       lastAssignedFieldId.getAndAccumulate(fieldId, Math::max);
       return this;
     }
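
Editor's note: the relaxed name check above lets an identity (or void) partition field reuse the name of the column it is sourced from, while any other transform still must not shadow a schema field. A minimal sketch of the rule through the public builder (hypothetical schema; failure message paraphrased):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionNameRuleExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "category", Types.StringType.get()),
        Types.NestedField.required(2, "id", Types.LongType.get()));

    // allowed: an identity partition field may share the name of its own source column
    PartitionSpec ok = PartitionSpec.builderFor(schema)
        .identity("category")
        .build();
    System.out.println(ok);

    // rejected: a non-identity transform must not shadow a schema field name;
    // checkAndAddPartitionName throws because "id" exists in the schema and is
    // not the identity source of this field
    PartitionSpec bad = PartitionSpec.builderFor(schema)
        .bucket("category", 16, "id")
        .build();
  }
}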
diff --git a/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java b/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java
new file mode 100644
index 000000000000..d15c272e164c
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.expressions.Term;
+
+/**
+ * API for partition spec evolution.
+ * <p>
+ * When committing, these changes will be applied to the current table metadata. Commit conflicts
+ * will not be resolved and will result in a {@link CommitFailedException}.
+ */
+public interface UpdatePartitionSpec extends PendingUpdate<PartitionSpec> {
+  UpdatePartitionSpec addField(String sourceName);
+
+  UpdatePartitionSpec addField(Term term);
+
+  UpdatePartitionSpec addField(String name, Term term);
+
+  UpdatePartitionSpec removeField(String name);
+
+  UpdatePartitionSpec removeField(Term term);
+
+  UpdatePartitionSpec renameField(String name, String newName);
+}
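
Editor's note: a brief usage sketch of this interface. It assumes the table exposes the builder through an updateSpec() method and that Expressions provides the term helpers; neither addition is shown in this diff, so treat both as assumptions:

import static org.apache.iceberg.expressions.Expressions.bucket;

table.updateSpec()
    .addField("category")               // identity partition from a source column name
    .addField(bucket("id", 16))         // transform supplied as an unbound term
    .renameField("shard", "id_bucket")  // rename keeps the transform and field id
    .removeField("ts_day")              // v1 specs replace this with a void transform
    .commit();                          // applies to current metadata; conflicts throw CommitFailedException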
diff --git a/api/src/main/java/org/apache/iceberg/transforms/PartitionSpecVisitor.java b/api/src/main/java/org/apache/iceberg/transforms/PartitionSpecVisitor.java
index cba2738fda15..1770a22a35c4 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/PartitionSpecVisitor.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/PartitionSpecVisitor.java
@@ -113,37 +113,44 @@ static <T> List<T> visit(PartitionSpec spec, PartitionSpecVisitor<T> visitor) {
    * @deprecated this will be removed in 0.11.0; use {@link #visit(PartitionSpec, PartitionSpecVisitor)} instead.
    */
   @Deprecated
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   static <T> List<T> visit(Schema schema, PartitionSpec spec, PartitionSpecVisitor<T> visitor) {
     List<T> results = Lists.newArrayListWithExpectedSize(spec.fields().size());
 
     for (PartitionField field : spec.fields()) {
-      String sourceName = schema.findColumnName(field.sourceId());
-      Transform<?, ?> transform = field.transform();
-
-      if (transform instanceof Identity) {
-        results.add(visitor.identity(field.fieldId(), sourceName, field.sourceId()));
-      } else if (transform instanceof Bucket) {
-        int numBuckets = ((Bucket<?>) transform).numBuckets();
-        results.add(visitor.bucket(field.fieldId(), sourceName, field.sourceId(), numBuckets));
-      } else if (transform instanceof Truncate) {
-        int width = ((Truncate<?>) transform).width();
-        results.add(visitor.truncate(field.fieldId(), sourceName, field.sourceId(), width));
-      } else if (transform == Dates.YEAR || transform == Timestamps.YEAR) {
-        results.add(visitor.year(field.fieldId(), sourceName, field.sourceId()));
-      } else if (transform == Dates.MONTH || transform == Timestamps.MONTH) {
-        results.add(visitor.month(field.fieldId(), sourceName, field.sourceId()));
-      } else if (transform == Dates.DAY || transform == Timestamps.DAY) {
-        results.add(visitor.day(field.fieldId(), sourceName, field.sourceId()));
-      } else if (transform == Timestamps.HOUR) {
-        results.add(visitor.hour(field.fieldId(), sourceName, field.sourceId()));
-      } else if (transform instanceof VoidTransform) {
-        results.add(visitor.alwaysNull(field.fieldId(), sourceName, field.sourceId()));
-      } else if (transform instanceof UnknownTransform) {
-        results.add(visitor.unknown(field.fieldId(), sourceName, field.sourceId(), transform.toString()));
-      }
+      results.add(visit(schema, field, visitor));
     }
 
     return results;
   }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  static <R> R visit(Schema schema, PartitionField field, PartitionSpecVisitor<R> visitor) {
+    String sourceName = schema.findColumnName(field.sourceId());
+    Transform<?, ?> transform = field.transform();
+
+    if (transform instanceof Identity) {
+      return visitor.identity(field.fieldId(), sourceName, field.sourceId());
+    } else if (transform instanceof Bucket) {
+      int numBuckets = ((Bucket<?>) transform).numBuckets();
+      return visitor.bucket(field.fieldId(), sourceName, field.sourceId(), numBuckets);
+    } else if (transform instanceof Truncate) {
+      int width = ((Truncate<?>) transform).width();
+      return visitor.truncate(field.fieldId(), sourceName, field.sourceId(), width);
+    } else if (transform == Dates.YEAR || transform == Timestamps.YEAR) {
+      return visitor.year(field.fieldId(), sourceName, field.sourceId());
+    } else if (transform == Dates.MONTH || transform == Timestamps.MONTH) {
+      return visitor.month(field.fieldId(), sourceName, field.sourceId());
+    } else if (transform == Dates.DAY || transform == Timestamps.DAY) {
+      return visitor.day(field.fieldId(), sourceName, field.sourceId());
+    } else if (transform == Timestamps.HOUR) {
+      return visitor.hour(field.fieldId(), sourceName, field.sourceId());
+    } else if (transform instanceof VoidTransform) {
+      return visitor.alwaysNull(field.fieldId(), sourceName, field.sourceId());
+    } else if (transform instanceof UnknownTransform) {
+      return visitor.unknown(field.fieldId(), sourceName, field.sourceId(), transform.toString());
+    }
+
+    throw new UnsupportedOperationException(
+        String.format("Unknown transform class %s", field.transform().getClass().getName()));
+  }
 }
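
Editor's note: the new per-field visit overload lets a caller inspect a single PartitionField without iterating the whole spec. A minimal hypothetical visitor (not part of the patch) that renders one field as text; BaseUpdatePartitionSpec below uses the same pattern for its IsTimeTransform and PartitionNameGenerator visitors:

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.transforms.PartitionSpecVisitor;

class DescribeField implements PartitionSpecVisitor<String> {
  @Override
  public String identity(int fieldId, String sourceName, int sourceId) {
    return sourceName;
  }

  @Override
  public String bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
    return "bucket(" + numBuckets + ", " + sourceName + ")";
  }

  @Override
  public String truncate(int fieldId, String sourceName, int sourceId, int width) {
    return "truncate(" + width + ", " + sourceName + ")";
  }

  @Override
  public String year(int fieldId, String sourceName, int sourceId) {
    return "year(" + sourceName + ")";
  }

  @Override
  public String month(int fieldId, String sourceName, int sourceId) {
    return "month(" + sourceName + ")";
  }

  @Override
  public String day(int fieldId, String sourceName, int sourceId) {
    return "day(" + sourceName + ")";
  }

  @Override
  public String hour(int fieldId, String sourceName, int sourceId) {
    return "hour(" + sourceName + ")";
  }

  @Override
  public String alwaysNull(int fieldId, String sourceName, int sourceId) {
    return "void(" + sourceName + ")";
  }

  @Override
  public String unknown(int fieldId, String sourceName, int sourceId, String transform) {
    return transform + "(" + sourceName + ")";
  }
}

// call site: dispatches on one field's transform and returns the visitor's result
// String text = PartitionSpecVisitor.visit(schema, field, new DescribeField());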
diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
new file mode 100644
index 000000000000..efcf0894abb3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.BoundReference;
+import org.apache.iceberg.expressions.BoundTerm;
+import org.apache.iceberg.expressions.BoundTransform;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
+import org.apache.iceberg.expressions.UnboundTerm;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.transforms.UnknownTransform;
+import org.apache.iceberg.util.Pair;
+
+class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
+  private final TableOperations ops;
+  private final boolean caseSensitive;
+  private final TableMetadata base;
+  private final int formatVersion;
+  private final PartitionSpec spec;
+  private final Schema schema;
+  private final Map<String, PartitionField> nameToField;
+  private final Map<Pair<Integer, String>, PartitionField> transformToField;
+
+  private final List<PartitionField> adds = Lists.newArrayList();
+  private final Map<Integer, PartitionField> addedTimeFields = Maps.newHashMap();
+  private final Map<Pair<Integer, String>, PartitionField> transformToAddedField = Maps.newHashMap();
+  private final Map<String, PartitionField> nameToAddedField = Maps.newHashMap();
+  private final Set<Integer> deletes = Sets.newHashSet();
+  private final Map<String, String> renames = Maps.newHashMap();
+
+  private int lastAssignedPartitionId;
+
+  BaseUpdatePartitionSpec(TableOperations ops, boolean caseSensitive) {
+    this.ops = ops;
+    this.caseSensitive = caseSensitive;
+    this.base = ops.current();
+    this.formatVersion = base.formatVersion();
+    this.spec = base.spec();
+    this.schema = spec.schema();
+    this.nameToField = indexSpecByName(spec);
+    this.transformToField = indexSpecByTransform(spec);
+    this.lastAssignedPartitionId = spec.fields().stream().mapToInt(PartitionField::fieldId).max().orElse(999);
+
+    spec.fields().stream()
+        .filter(field -> field.transform() instanceof UnknownTransform)
+        .findAny()
+        .ifPresent(field -> {
+          throw new IllegalArgumentException("Cannot update partition spec with unknown transform: " + field);
+        });
+  }
+
+  /**
+   * For testing only.
+   */
+  @VisibleForTesting
+  BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec) {
+    this(formatVersion, spec, spec.fields().stream().mapToInt(PartitionField::fieldId).max().orElse(999));
+  }
+
+  /**
+   * For testing only.
+   */
+  @VisibleForTesting
+  BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec, int lastAssignedPartitionId) {
+    this.ops = null;
+    this.base = null;
+    this.formatVersion = formatVersion;
+    this.caseSensitive = true;
+    this.spec = spec;
+    this.schema = spec.schema();
+    this.nameToField = indexSpecByName(spec);
+    this.transformToField = indexSpecByTransform(spec);
+    this.lastAssignedPartitionId = lastAssignedPartitionId;
+  }
+
+  private int assignFieldId() {
+    this.lastAssignedPartitionId += 1;
+    return lastAssignedPartitionId;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(String sourceName) {
+    return addField(Expressions.ref(sourceName));
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(Term term) {
+    return addField(null, term);
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(String name, Term term) {
+    PartitionField alreadyAdded = nameToAddedField.get(name);
+    Preconditions.checkArgument(alreadyAdded == null, "Cannot add duplicate partition field: %s", alreadyAdded);
+
+    Pair<Integer, Transform<?, ?>> sourceTransform = resolve(term);
+    Pair<Integer, String> validationKey = Pair.of(sourceTransform.first(), sourceTransform.second().toString());
+
+    PartitionField existing = transformToField.get(validationKey);
+    Preconditions.checkArgument(existing == null,
+        "Cannot add duplicate partition field %s=%s, conflicts with %s", name, term, existing);
+
+    PartitionField added = transformToAddedField.get(validationKey);
+    Preconditions.checkArgument(added == null,
+        "Cannot add duplicate partition field %s=%s, already added: %s", name, term, added);
+
+    PartitionField newField = new PartitionField(
+        sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+    checkForRedundantAddedPartitions(newField);
+
+    transformToAddedField.put(validationKey, newField);
+    if (name != null) {
+      nameToAddedField.put(name, newField);
+    }
+
+    adds.add(newField);
+
+    return this;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec removeField(String name) {
+    PartitionField alreadyAdded = nameToAddedField.get(name);
+    Preconditions.checkArgument(alreadyAdded == null, "Cannot delete newly added field: %s", alreadyAdded);
+
+    Preconditions.checkArgument(renames.get(name) == null,
+        "Cannot rename and delete partition field: %s", name);
+
+    PartitionField field = nameToField.get(name);
+    Preconditions.checkArgument(field != null,
+        "Cannot find partition field to remove: %s", name);
+
+    deletes.add(field.fieldId());
+
+    return this;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec removeField(Term term) {
+    Pair<Integer, Transform<?, ?>> sourceTransform = resolve(term);
+    Pair<Integer, String> key = Pair.of(sourceTransform.first(), sourceTransform.second().toString());
+
+    PartitionField added = transformToAddedField.get(key);
+    Preconditions.checkArgument(added == null, "Cannot delete newly added field: %s", added);
+
+    PartitionField field = transformToField.get(key);
+    Preconditions.checkArgument(field != null,
+        "Cannot find partition field to remove: %s", term);
+    Preconditions.checkArgument(renames.get(field.name()) == null,
+        "Cannot rename and delete partition field: %s", field.name());
+
+    deletes.add(field.fieldId());
+
+    return this;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec renameField(String name, String newName) {
+    PartitionField added = nameToAddedField.get(name);
+    Preconditions.checkArgument(added == null,
+        "Cannot rename newly added partition field: %s", name);
+
+    PartitionField field = nameToField.get(name);
+    Preconditions.checkArgument(field != null,
+        "Cannot find partition field to rename: %s", name);
+    Preconditions.checkArgument(!deletes.contains(field.fieldId()),
+        "Cannot delete and rename partition field: %s", name);
+
+    renames.put(name, newName);
+
+    return this;
+  }
+
+  @Override
+  public PartitionSpec apply() {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+
+    for (PartitionField field : spec.fields()) {
+      if (!deletes.contains(field.fieldId())) {
+        String newName = renames.get(field.name());
+        if (newName != null) {
+          builder.add(field.sourceId(), field.fieldId(), newName, field.transform());
+        } else {
+          builder.add(field.sourceId(), field.fieldId(), field.name(), field.transform());
+        }
+      } else if (formatVersion < 2) {
+        // field IDs were not required for v1 and were assigned sequentially in each partition spec starting at 1,000.
+        // to maintain consistent field ids across partition specs in v1 tables, any partition field that is removed
+        // must be replaced with a null transform. null values are always allowed in partition data.
+        builder.add(field.sourceId(), field.fieldId(), field.name(), Transforms.alwaysNull());
+      }
+    }
+
+    for (PartitionField newField : adds) {
+      String partitionName;
+      if (newField.name() != null) {
+        partitionName = newField.name();
+      } else {
+        partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
+      }
+
+      builder.add(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public void commit() {
+    TableMetadata update = base.updatePartitionSpec(apply());
+    ops.commit(base, update);
+  }
+
+  private Pair<Integer, Transform<?, ?>> resolve(Term term) {
+    Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound");
+
+    BoundTerm<?> boundTerm = ((UnboundTerm<?>) term).bind(schema.asStruct(), caseSensitive);
+    int sourceId = boundTerm.ref().fieldId();
+    Transform<?, ?> transform = toTransform(boundTerm);
+
+    return Pair.of(sourceId, transform);
+  }
+
+  private Transform<?, ?> toTransform(BoundTerm<?> term) {
+    if (term instanceof BoundReference) {
+      return Transforms.identity(term.type());
+    } else if (term instanceof BoundTransform) {
+      return ((BoundTransform<?, ?>) term).transform();
+    } else {
+      throw new ValidationException("Invalid term: %s, expected either a bound reference or transform", term);
+    }
+  }
+
+  private void checkForRedundantAddedPartitions(PartitionField field) {
+    if (isTimeTransform(field)) {
+      PartitionField timeField = addedTimeFields.get(field.sourceId());
+      Preconditions.checkArgument(timeField == null,
+          "Cannot add redundant partition field: %s conflicts with %s", timeField, field);
+      addedTimeFields.put(field.sourceId(), field);
+    }
+  }
+
+  private static Map<String, PartitionField> indexSpecByName(PartitionSpec spec) {
+    ImmutableMap.Builder<String, PartitionField> builder = ImmutableMap.builder();
+    List<PartitionField> fields = spec.fields();
+    for (PartitionField field : fields) {
+      builder.put(field.name(), field);
+    }
+
+    return builder.build();
+  }
+
+  private static Map<Pair<Integer, String>, PartitionField> indexSpecByTransform(PartitionSpec spec) {
+    ImmutableMap.Builder<Pair<Integer, String>, PartitionField> builder = ImmutableMap.builder();
+    List<PartitionField> fields = spec.fields();
+    for (PartitionField field : fields) {
+      builder.put(Pair.of(field.sourceId(), field.transform().toString()), field);
+    }
+
+    return builder.build();
+  }
+
+  private boolean isTimeTransform(PartitionField field) {
+    return PartitionSpecVisitor.visit(schema, field, IsTimeTransform.INSTANCE);
+  }
+
+  private static class IsTimeTransform implements PartitionSpecVisitor<Boolean> {
+    private static final IsTimeTransform INSTANCE = new IsTimeTransform();
+
+    private IsTimeTransform() {
+    }
+
+    @Override
+    public Boolean identity(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
+      return false;
+    }
+
+    @Override
+    public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) {
+      return false;
+    }
+
+    @Override
+    public Boolean year(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean month(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean day(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean hour(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) {
+      return false;
+    }
+  }
+
+  private static class PartitionNameGenerator implements PartitionSpecVisitor<String> {
+    private static final PartitionNameGenerator INSTANCE = new PartitionNameGenerator();
+
+    private PartitionNameGenerator() {
+    }
+
+    @Override
+    public String identity(int fieldId, String sourceName, int sourceId) {
+      return sourceName;
+    }
+
+    @Override
+    public String bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
+      return sourceName + "_bucket_" + numBuckets;
+    }
+
+    @Override
+    public String truncate(int fieldId, String sourceName, int sourceId, int width) {
+      return sourceName + "_trunc_" + width;
+    }
+
+    @Override
+    public String year(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_year";
+    }
+
+    @Override
+    public String month(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_month";
+    }
+
+    @Override
+    public String day(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_day";
+    }
+
+    @Override
+    public String hour(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_hour";
+    }
+
+    @Override
+    public String alwaysNull(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_null";
+    }
+  }
+}
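
Editor's note: to make the v1 branch in apply() concrete, removing a field from a v1 spec leaves a void-transform placeholder so the remaining field ids keep their positions, while a v2 spec drops the field outright. A sketch using the test fixtures defined below (PARTITIONED partitions by category, day(ts), and bucket(id, 16) named "shard"), as if written inside the test class:

// v1: "shard" is replaced by a void transform at the same position
PartitionSpec v1Spec = new BaseUpdatePartitionSpec(1, PARTITIONED)
    .removeField("shard")
    .apply();
// -> [category: identity, ts_day: day, shard: void]

// v2: explicit field ids make the placeholder unnecessary
PartitionSpec v2Spec = new BaseUpdatePartitionSpec(2, PARTITIONED)
    .removeField("shard")
    .apply();
// -> [1000 category: identity, 1001 ts_day: day]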
diff --git a/core/src/test/java/org/apache/iceberg/TestUpdatePartitionSpec.java b/core/src/test/java/org/apache/iceberg/TestUpdatePartitionSpec.java
new file mode 100644
index 000000000000..1c0fc76534a2
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestUpdatePartitionSpec.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.expressions.Expressions.bucket;
+import static org.apache.iceberg.expressions.Expressions.day;
+import static org.apache.iceberg.expressions.Expressions.hour;
+import static org.apache.iceberg.expressions.Expressions.month;
+import static org.apache.iceberg.expressions.Expressions.ref;
+import static org.apache.iceberg.expressions.Expressions.truncate;
+import static org.apache.iceberg.expressions.Expressions.year;
+
+@RunWith(Parameterized.class)
+public class TestUpdatePartitionSpec extends TableTestBase {
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.LongType.get()),
+      Types.NestedField.required(2, "ts", Types.TimestampType.withZone()),
+      Types.NestedField.required(3, "category", Types.StringType.get()),
+      Types.NestedField.optional(4, "data", Types.StringType.get())
+  );
+
+  private static final PartitionSpec UNPARTITIONED = PartitionSpec.builderFor(SCHEMA).build();
+  private static final PartitionSpec PARTITIONED = PartitionSpec.builderFor(SCHEMA)
+      .identity("category")
+      .day("ts")
+      .bucket("id", 16, "shard")
+      .build();
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] { 1, 2 };
+  }
+
+  public TestUpdatePartitionSpec(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testAddIdentityByName() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField("category")
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddIdentityByTerm() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(ref("category"))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddYear() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(year("ts"))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .year("ts")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddMonth() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(month("ts"))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .month("ts")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddDay() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(day("ts"))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .day("ts")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddHour() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(hour("ts"))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .hour("ts")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddBucket() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(bucket("id", 16))
+        .apply();
+
+    // added fields have different default names to avoid conflicts
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .bucket("id", 16, "id_bucket_16")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddTruncate() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(truncate("data", 4))
+        .apply();
+
+    // added fields have different default names to avoid conflicts
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .truncate("data", 4, "data_trunc_4")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddNamedPartition() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField("shard", bucket("id", 16))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .bucket("id", 16, "shard")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddToExisting() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .addField(truncate("data", 4))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .day("ts")
+        .bucket("id", 16, "shard")
+        .truncate("data", 4, "data_trunc_4")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testMultipleAdds() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField("category")
+        .addField(day("ts"))
+        .addField("shard", bucket("id", 16))
+        .addField("prefix", truncate("data", 4))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .day("ts")
+        .bucket("id", 16, "shard")
+        .truncate("data", 4, "prefix")
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testAddHourToDay() {
+    // multiple partitions for the same source with different time granularity are not allowed by the builder, but are
+    // allowed when updating a spec so that existing columns in metadata continue to work.
+    PartitionSpec byDay = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(day("ts"))
+        .apply();
+
+    PartitionSpec byHour = new BaseUpdatePartitionSpec(formatVersion, byDay)
+        .addField(hour("ts"))
+        .apply();
+
+    Assert.assertEquals("Should have a day and an hour time field",
+        ImmutableList.of(
+            new PartitionField(2, 1000, "ts_day", Transforms.day(Types.TimestampType.withZone())),
+            new PartitionField(2, 1001, "ts_hour", Transforms.hour(Types.TimestampType.withZone()))),
+        byHour.fields());
+  }
+
+  @Test
+  public void testAddMultipleBuckets() {
+    PartitionSpec bucket16 = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+        .addField(bucket("id", 16))
+        .apply();
+
+    PartitionSpec bucket8 = new BaseUpdatePartitionSpec(formatVersion, bucket16)
+        .addField(bucket("id", 8))
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .bucket("id", 16, "id_bucket_16")
+        .bucket("id", 8, "id_bucket_8")
+        .build();
+
+    Assert.assertEquals("Should have both bucket fields", expected, bucket8);
+  }
+
+  @Test
+  public void testRemoveIdentityByName() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .removeField("category")
+        .apply();
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(SCHEMA)
+        .alwaysNull("category", "category")
+        .day("ts")
+        .bucket("id", 16, "shard")
+        .build();
+
+    V1Assert.assertEquals("Should match expected spec", v1Expected, updated);
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
+        .add(id("ts"), 1001, "ts_day", Transforms.day(Types.TimestampType.withZone()))
+        .add(id("id"), 1002, "shard", Transforms.bucket(Types.LongType.get(), 16))
+        .build();
+
+    V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
+  }
+
+  @Test
+  public void testRemoveBucketByName() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .removeField("shard")
+        .apply();
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .day("ts")
+        .alwaysNull("id", "shard")
+        .build();
+
+    V1Assert.assertEquals("Should match expected spec", v1Expected, updated);
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
+        .add(id("category"), 1000, "category", Transforms.identity(Types.StringType.get()))
+        .add(id("ts"), 1001, "ts_day", Transforms.day(Types.TimestampType.withZone()))
+        .build();
+
+    V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
+  }
+
+  @Test
+  public void testRemoveIdentityByEquivalent() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .removeField(ref("category"))
+        .apply();
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(SCHEMA)
+        .alwaysNull("category", "category")
+        .day("ts")
+        .bucket("id", 16, "shard")
+        .build();
+
+    V1Assert.assertEquals("Should match expected spec", v1Expected, updated);
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
+        .add(id("ts"), 1001, "ts_day", Transforms.day(Types.TimestampType.withZone()))
+        .add(id("id"), 1002, "shard", Transforms.bucket(Types.LongType.get(), 16))
+        .build();
+
+    V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
+  }
+
+  @Test
+  public void testRemoveDayByEquivalent() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .removeField(day("ts"))
+        .apply();
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .alwaysNull("ts", "ts_day")
+        .bucket("id", 16, "shard")
+        .build();
+
+    V1Assert.assertEquals("Should match expected spec", v1Expected, updated);
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
+        .add(id("category"), 1000, "category", Transforms.identity(Types.StringType.get()))
+        .add(id("id"), 1002, "shard", Transforms.bucket(Types.LongType.get(), 16))
+        .build();
+
+    V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
+  }
+
+  @Test
+  public void testRemoveBucketByEquivalent() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .removeField(bucket("id", 16))
+        .apply();
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .day("ts")
+        .alwaysNull("id", "shard")
+        .build();
+
+    V1Assert.assertEquals("Should match expected spec", v1Expected, updated);
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .day("ts")
+        .build();
+
+    V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
+  }
+
+  @Test
+  public void testRename() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .renameField("shard", "id_bucket") // rename back to default
+        .apply();
+
+    PartitionSpec expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .day("ts")
+        .bucket("id", 16)
+        .build();
+
+    Assert.assertEquals("Should match expected spec", expected, updated);
+  }
+
+  @Test
+  public void testMultipleChanges() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .renameField("shard", "id_bucket") // rename back to default
+        .removeField(day("ts"))
+        .addField("prefix", truncate("data", 4))
+        .apply();
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .alwaysNull("ts", "ts_day")
+        .bucket("id", 16)
+        .truncate("data", 4, "prefix")
+        .build();
+
+    V1Assert.assertEquals("Should match expected spec", v1Expected, updated);
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
+        .add(id("category"), 1000, "category", Transforms.identity(Types.StringType.get()))
+        .add(id("id"), 1002, "id_bucket", Transforms.bucket(Types.LongType.get(), 16))
+        .add(id("data"), 1003, "prefix", Transforms.truncate(Types.StringType.get(), 4))
+        .build();
+
+    V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
+  }
+
+  @Test
+  public void testAddDeletedName() {
+    PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+        .removeField(bucket("id", 16))
+        .apply();
+
+    PartitionSpec v1Expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .day("ts")
+        .alwaysNull("id", "shard")
+        .build();
+
+    V1Assert.assertEquals("Should match expected spec", v1Expected, updated);
+
+    PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
+        .identity("category")
+        .day("ts")
+        .build();
+
+    V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
+  }
+
+  @Test
+  public void testRemoveNewlyAddedFieldByName() {
+    AssertHelpers.assertThrows("Should fail trying to remove a newly added field",
+        IllegalArgumentException.class, "Cannot delete newly added field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField("prefix", truncate("data", 4))
+            .removeField("prefix")
+    );
+  }
+
+  @Test
+  public void testRemoveNewlyAddedFieldByTransform() {
+    AssertHelpers.assertThrows("Should fail trying to remove a newly added field",
+        IllegalArgumentException.class, "Cannot delete newly added field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField("prefix", truncate("data", 4))
+            .removeField(truncate("data", 4)));
+  }
+
+  @Test
+  public void testAddAlreadyAddedFieldByTransform() {
+    AssertHelpers.assertThrows("Should fail adding a duplicate field",
+        IllegalArgumentException.class, "Cannot add duplicate partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField("prefix", truncate("data", 4))
+            .addField(truncate("data", 4)));
+  }
+
+  @Test
+  public void testAddAlreadyAddedFieldByName() {
+    AssertHelpers.assertThrows("Should fail adding a duplicate field",
+        IllegalArgumentException.class, "Cannot add duplicate partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField("prefix", truncate("data", 4))
+            .addField("prefix", truncate("data", 6)));
+  }
+
+  @Test
+  public void testAddRedundantTimePartition() {
+    AssertHelpers.assertThrows("Should fail adding a redundant time partition field",
+        IllegalArgumentException.class, "Cannot add redundant partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
+            .addField(day("ts"))
+            .addField(hour("ts"))); // conflicts with day
+
+    AssertHelpers.assertThrows("Should fail adding a redundant time partition field",
+        IllegalArgumentException.class, "Cannot add redundant partition",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField(hour("ts")) // does not conflict with day because day already exists
+            .addField(month("ts"))); // conflicts with hour
+  }
+
+  @Test
+  public void testAddDeletedField() {
+    AssertHelpers.assertThrows("Should fail adding a duplicate field",
+        IllegalArgumentException.class, "Cannot add duplicate partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .removeField("shard")
+            .addField(bucket("id", 16))); // duplicates shard
+  }
+
+  @Test
+  public void testAddDuplicateByName() {
+    AssertHelpers.assertThrows("Should fail adding a duplicate field",
+        IllegalArgumentException.class, "Cannot add duplicate partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField("category"));
+  }
+
+  @Test
+  public void testAddDuplicateByRef() {
+    AssertHelpers.assertThrows("Should fail adding a duplicate field",
+        IllegalArgumentException.class, "Cannot add duplicate partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField(ref("category")));
+  }
+
+  @Test
+  public void testAddDuplicateTransform() {
+    AssertHelpers.assertThrows("Should fail adding a duplicate field",
+        IllegalArgumentException.class, "Cannot add duplicate partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField(bucket("id", 16)));
+  }
+
+  @Test
+  public void testAddNamedDuplicate() {
+    AssertHelpers.assertThrows("Should fail adding a duplicate field",
+        IllegalArgumentException.class, "Cannot add duplicate partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField("b16", bucket("id", 16)));
+  }
+
+  @Test
+  public void testRemoveUnknownFieldByName() {
+    AssertHelpers.assertThrows("Should fail trying to remove unknown field",
+        IllegalArgumentException.class, "Cannot find partition field to remove",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED).removeField("moon")
+    );
+  }
+
+  @Test
+  public void testRemoveUnknownFieldByEquivalent() {
+    AssertHelpers.assertThrows("Should fail trying to remove unknown field",
+        IllegalArgumentException.class, "Cannot find partition field to remove",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED).removeField(hour("ts")) // day(ts) exists
+    );
+  }
+
+  @Test
+  public void testRenameUnknownField() {
+    AssertHelpers.assertThrows("Should fail trying to rename an unknown field",
+        IllegalArgumentException.class, "Cannot find partition field to rename",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED).renameField("shake", "seal")
+    );
+  }
+
+  @Test
+  public void testRenameAfterAdd() {
+    AssertHelpers.assertThrows("Should fail trying to rename an added field",
+        IllegalArgumentException.class, "Cannot rename newly added partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .addField("data_trunc", truncate("data", 4))
+            .renameField("data_trunc", "prefix")
+    );
+  }
+
+  @Test
+  public void testDeleteAndRename() {
+    AssertHelpers.assertThrows("Should fail trying to delete a renamed field",
+        IllegalArgumentException.class, "Cannot rename and delete partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .renameField("shard", "id_bucket")
+            .removeField(bucket("id", 16)));
+  }
+
+  @Test
+  public void testRenameAndDelete() {
+    AssertHelpers.assertThrows("Should fail trying to rename a deleted field",
+        IllegalArgumentException.class, "Cannot delete and rename partition field",
+        () -> new BaseUpdatePartitionSpec(formatVersion, PARTITIONED)
+            .removeField(bucket("id", 16))
+            .renameField("shard", "id_bucket"));
+  }
+
+  private static int id(String name) {
+    return SCHEMA.findField(name).fieldId();
+  }
+}
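
Editor's note: one last worked sketch consolidating the time-granularity rules the tests above pin down. Separate updates may layer a finer granularity over an existing time field, but a single update may not add two granularities for the same source (package-private access to BaseUpdatePartitionSpec assumed, as in the tests):

// allowed: hour(ts) on top of an already-committed day(ts)
PartitionSpec byDay = new BaseUpdatePartitionSpec(2, UNPARTITIONED)
    .addField(day("ts"))
    .apply();
PartitionSpec byHour = new BaseUpdatePartitionSpec(2, byDay)
    .addField(hour("ts"))
    .apply(); // ok: [ts_day, ts_hour]

// rejected: two granularities for the same source in one update
new BaseUpdatePartitionSpec(2, UNPARTITIONED)
    .addField(day("ts"))
    .addField(hour("ts")); // IllegalArgumentException: Cannot add redundant partition field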