Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 82 additions & 14 deletions core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,26 @@ public BaseUpdatePartitionSpec addField(String name, Term term) {

PartitionField newField = new PartitionField(
sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
checkForRedundantAddedPartitions(newField);
if (newField.name() == null) {
String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
newField = new PartitionField(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
}

checkForRedundantAddedPartitions(newField);
transformToAddedField.put(validationKey, newField);
if (name != null) {
nameToAddedField.put(name, newField);

PartitionField existingField = nameToField.get(newField.name());
if (existingField != null) {
if (isVoidTransform(existingField)) {
// rename the old deleted field that is being replaced by the new field
renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId());
} else {
throw new IllegalArgumentException(String.format("Cannot add duplicate partition field name: %s", name));
}
}

nameToAddedField.put(newField.name(), newField);

adds.add(newField);

return this;
Expand Down Expand Up @@ -192,6 +205,12 @@ public BaseUpdatePartitionSpec removeField(Term term) {

@Override
public BaseUpdatePartitionSpec renameField(String name, String newName) {
PartitionField existingField = nameToField.get(newName);
if (existingField != null && isVoidTransform(existingField)) {
// rename the old deleted field that is being replaced by the new field
renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId());
}

PartitionField added = nameToAddedField.get(name);
Preconditions.checkArgument(added == null,
"Cannot rename newly added partition field: %s", name);
Expand Down Expand Up @@ -228,14 +247,7 @@ public PartitionSpec apply() {
}

for (PartitionField newField : adds) {
String partitionName;
if (newField.name() != null) {
partitionName = newField.name();
} else {
partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
}

builder.add(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
builder.add(newField.sourceId(), newField.fieldId(), newField.name(), newField.transform());
}

return builder.build();
Expand Down Expand Up @@ -287,13 +299,13 @@ private static Map<String, PartitionField> indexSpecByName(PartitionSpec spec) {
}

/**
 * Indexes the fields of a partition spec by the pair (source column id, transform string).
 *
 * <p>Each key is built as {@code Pair.of(field.sourceId(), field.transform().toString())} and
 * maps to the corresponding {@link PartitionField}.
 *
 * <p>NOTE(review): this span shows both an {@code ImmutableMap.Builder} variant and a
 * {@code Maps.newHashMap()} variant of the same statements; presumably these are the removed and
 * added sides of a diff shown together. Confirm which side is intended before compiling.
 *
 * @param spec the partition spec whose fields are indexed
 * @return a map from (source id, transform string) to the partition field
 */
private static Map<Pair<Integer, String>, PartitionField> indexSpecByTransform(PartitionSpec spec) {
ImmutableMap.Builder<Pair<Integer, String>, PartitionField> builder = ImmutableMap.builder();
Map<Pair<Integer, String>, PartitionField> indexSpecs = Maps.newHashMap();
List<PartitionField> fields = spec.fields();
for (PartitionField field : fields) {
// key: source column id paired with the transform's string form
builder.put(Pair.of(field.sourceId(), field.transform().toString()), field);
indexSpecs.put(Pair.of(field.sourceId(), field.transform().toString()), field);
}

return builder.build();
return indexSpecs;
}

private boolean isTimeTransform(PartitionField field) {
Expand Down Expand Up @@ -352,6 +364,62 @@ public Boolean unknown(int fieldId, String sourceName, int sourceId, String tran
}
}

/**
 * Reports whether the given partition field is classified as a void transform.
 *
 * <p>Delegates to {@code PartitionSpecVisitor.visit} with the shared {@code IsVoidTransform}
 * visitor instance and this updater's {@code schema}.
 *
 * @param field the partition field to inspect
 * @return the visitor's result for the field
 */
private boolean isVoidTransform(PartitionField field) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for this approach to detect void transforms.

return PartitionSpecVisitor.visit(schema, field, IsVoidTransform.INSTANCE);
}

/**
 * Partition spec visitor that answers whether a field's transform is the always-null transform.
 *
 * <p>Only the {@link #alwaysNull(int, String, int)} callback yields {@code true}; every other
 * callback, including {@link #unknown(int, String, int, String)}, yields {@code false}.
 * The visitor holds no state, so a single shared {@link #INSTANCE} is used.
 */
private static class IsVoidTransform implements PartitionSpecVisitor<Boolean> {
  private static final IsVoidTransform INSTANCE = new IsVoidTransform();

  // Singleton: use INSTANCE instead of constructing new visitors.
  private IsVoidTransform() {
  }

  // The one callback that identifies a void field.
  @Override
  public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) {
    return Boolean.TRUE;
  }

  // Every remaining transform kind is reported as not void.

  @Override
  public Boolean identity(int fieldId, String sourceName, int sourceId) {
    return Boolean.FALSE;
  }

  @Override
  public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
    return Boolean.FALSE;
  }

  @Override
  public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) {
    return Boolean.FALSE;
  }

  @Override
  public Boolean year(int fieldId, String sourceName, int sourceId) {
    return Boolean.FALSE;
  }

  @Override
  public Boolean month(int fieldId, String sourceName, int sourceId) {
    return Boolean.FALSE;
  }

  @Override
  public Boolean day(int fieldId, String sourceName, int sourceId) {
    return Boolean.FALSE;
  }

  @Override
  public Boolean hour(int fieldId, String sourceName, int sourceId) {
    return Boolean.FALSE;
  }

  @Override
  public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) {
    return Boolean.FALSE;
  }
}

private static class PartitionNameGenerator implements PartitionSpecVisitor<String> {
private static final PartitionNameGenerator INSTANCE = new PartitionNameGenerator();

Expand Down
45 changes: 45 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestUpdatePartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,51 @@ public void testRenameAndDelete() {
.renameField("shard", "id_bucket"));
}

/**
 * Adds and removes a day("ts") partition named "ts_date" twice, then adds month("ts") and renames
 * it to "ts_date", and checks the resulting spec for both format versions.
 */
@Test
public void testRemoveAndAddMultiTimes() {
  // Round one: add ts_date, then remove it again.
  PartitionSpec firstAdd = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
      .addField("ts_date", day("ts"))
      .apply();
  PartitionSpec firstRemove = new BaseUpdatePartitionSpec(formatVersion, firstAdd)
      .removeField(day("ts"))
      .apply();

  // Round two: re-add under the same name, then remove once more.
  PartitionSpec secondAdd = new BaseUpdatePartitionSpec(formatVersion, firstRemove)
      .addField("ts_date", day("ts"))
      .apply();
  PartitionSpec secondRemove = new BaseUpdatePartitionSpec(formatVersion, secondAdd)
      .removeField(day("ts"))
      .apply();

  // Round three: add an unnamed month partition and rename it onto the previously used name.
  PartitionSpec thirdAdd = new BaseUpdatePartitionSpec(formatVersion, secondRemove)
      .addField(month("ts"))
      .apply();
  PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, thirdAdd)
      .renameField("ts_month", "ts_date")
      .apply();

  if (formatVersion == 1) {
    // v1 expectation: two void fields with suffixed names, followed by the live month field.
    Assert.assertEquals("Should match expected spec field size", 3, updated.fields().size());

    String firstName = updated.fields().get(0).name();
    Assert.assertTrue("Should match expected field name", firstName.matches("^ts_date(?:_\\d+)+$"));
    String secondName = updated.fields().get(1).name();
    Assert.assertTrue("Should match expected field name", secondName.matches("^ts_date_(?:\\d+)+$"));
    String thirdName = updated.fields().get(2).name();
    Assert.assertEquals("Should match expected field name", "ts_date", thirdName);

    String firstTransform = updated.fields().get(0).transform().toString();
    Assert.assertEquals("Should match expected field transform", "void", firstTransform);
    String secondTransform = updated.fields().get(1).transform().toString();
    Assert.assertEquals("Should match expected field transform", "void", secondTransform);
    String thirdTransform = updated.fields().get(2).transform().toString();
    Assert.assertEquals("Should match expected field transform", "month", thirdTransform);
  }

  // v2 expectation: only the month partition named ts_date.
  PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
      .month("ts", "ts_date")
      .build();

  V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
}

/**
 * Looks up the field with the given name in {@code SCHEMA} and returns its field id.
 *
 * @param name the name of the field to look up
 * @return the id of the matching field
 */
private static int id(String name) {
return SCHEMA.findField(name).fieldId();
}
Expand Down