Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 91 additions & 15 deletions core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.BoundTerm;
Expand Down Expand Up @@ -50,7 +51,7 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
private final Map<String, PartitionField> nameToField;
private final Map<Pair<Integer, String>, PartitionField> transformToField;

private final List<PartitionField> adds = Lists.newArrayList();
private final List<Pair<PartitionField, String>> adds = Lists.newArrayList();
private final Map<Integer, PartitionField> addedTimeFields = Maps.newHashMap();
private final Map<Pair<Integer, String>, PartitionField> transformToAddedField = Maps.newHashMap();
private final Map<String, PartitionField> nameToAddedField = Maps.newHashMap();
Expand Down Expand Up @@ -145,11 +146,29 @@ public BaseUpdatePartitionSpec addField(String name, Term term) {
checkForRedundantAddedPartitions(newField);

transformToAddedField.put(validationKey, newField);
if (name != null) {
nameToAddedField.put(name, newField);

String partitionName;
if (newField.name() != null) {
partitionName = newField.name();
} else {
partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
}

adds.add(newField);
PartitionField existingField = nameToField.get(partitionName);
if (existingField != null) {
if (isVoidTransform(existingField)) {
// rename the old deleted field that is being replaced by the new field
renameField(existingField.name(), existingField.name() + "_" + UUID.randomUUID());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here as well, I'd prefer not to use a UUID. This should be able to use the existing field's ID instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the existing field's ID instead.

} else {
throw new IllegalArgumentException(String.format("Cannot add duplicate partition field name: %s", name));
}
}

if (partitionName != null) {
nameToAddedField.put(partitionName, newField);
}

adds.add(Pair.of(newField, partitionName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not create a new PartitionField with its name set to partitionName? Then you wouldn't need to change as much, including the type of adds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If name is null, we generate default name for it at the end of apply() method. We need all name of added field before detect the conflict. So i change the type of adds to save generated name , avoid generate it twice.
if i don't change the type of adds, i think there are four other options:

  1. just generate name twice
  2. add a set method setName(String name) to PartitionField
  3. generate newField again with generated name: just like
if (newField.name() == null) {
    String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
    newField = new PartitionField(
        newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
}

adds.add(newField);
  1. write a new static visit method in PartitionSpecVisitor, the signature is like:
static <R> R visit(Schema schema, int sourceId, Transform<?, ?> transform, PartitionSpecVisitor<R> visitor)

Which do you think is better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PartitionField is created in this method, as is partitionName. I think that by reordering a couple of statements here, you can use the correct name on the field from the start and avoid more changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regenerate PartitionField with partitionName if name is null.


return this;
}
Expand Down Expand Up @@ -192,6 +211,12 @@ public BaseUpdatePartitionSpec removeField(Term term) {

@Override
public BaseUpdatePartitionSpec renameField(String name, String newName) {
PartitionField existingField = nameToField.get(newName);
if (existingField != null && isVoidTransform(existingField)) {
// rename the old deleted field that is being replaced by the new field
renameField(existingField.name(), existingField.name() + "_" + UUID.randomUUID());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use UUID.randomUUID() instead of the partition field ID? I think it makes more sense to use existingField.fieldId().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to existingField.fieldId()

}

PartitionField added = nameToAddedField.get(name);
Preconditions.checkArgument(added == null,
"Cannot rename newly added partition field: %s", name);
Expand Down Expand Up @@ -227,14 +252,9 @@ public PartitionSpec apply() {
}
}

for (PartitionField newField : adds) {
String partitionName;
if (newField.name() != null) {
partitionName = newField.name();
} else {
partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
}

for (Pair<PartitionField, String> newFieldAndNamePair : adds) {
PartitionField newField = newFieldAndNamePair.first();
String partitionName = newFieldAndNamePair.second();
builder.add(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
}

Expand Down Expand Up @@ -287,13 +307,13 @@ private static Map<String, PartitionField> indexSpecByName(PartitionSpec spec) {
}

private static Map<Pair<Integer, String>, PartitionField> indexSpecByTransform(PartitionSpec spec) {
ImmutableMap.Builder<Pair<Integer, String>, PartitionField> builder = ImmutableMap.builder();
Map<Pair<Integer, String>, PartitionField> indexSpecs = Maps.newHashMap();
List<PartitionField> fields = spec.fields();
for (PartitionField field : fields) {
builder.put(Pair.of(field.sourceId(), field.transform().toString()), field);
indexSpecs.put(Pair.of(field.sourceId(), field.transform().toString()), field);
}

return builder.build();
return indexSpecs;
}

private boolean isTimeTransform(PartitionField field) {
Expand Down Expand Up @@ -352,6 +372,62 @@ public Boolean unknown(int fieldId, String sourceName, int sourceId, String tran
}
}

private boolean isVoidTransform(PartitionField field) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for this approach to detect void transforms.

return PartitionSpecVisitor.visit(schema, field, IsVoidTransform.INSTANCE);
}

private static class IsVoidTransform implements PartitionSpecVisitor<Boolean> {
private static final IsVoidTransform INSTANCE = new IsVoidTransform();

private IsVoidTransform() {
}

@Override
public Boolean identity(int fieldId, String sourceName, int sourceId) {
return false;
}

@Override
public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
return false;
}

@Override
public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) {
return false;
}

@Override
public Boolean year(int fieldId, String sourceName, int sourceId) {
return false;
}

@Override
public Boolean month(int fieldId, String sourceName, int sourceId) {
return false;
}

@Override
public Boolean day(int fieldId, String sourceName, int sourceId) {
return false;
}

@Override
public Boolean hour(int fieldId, String sourceName, int sourceId) {
return false;
}

@Override
public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) {
return true;
}

@Override
public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) {
return false;
}
}

private static class PartitionNameGenerator implements PartitionSpecVisitor<String> {
private static final PartitionNameGenerator INSTANCE = new PartitionNameGenerator();

Expand Down
45 changes: 45 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestUpdatePartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,51 @@ public void testRenameAndDelete() {
.renameField("shard", "id_bucket"));
}

@Test
public void testRemoveAndAddMultiTimes() {
PartitionSpec addFirstTime = new BaseUpdatePartitionSpec(formatVersion, UNPARTITIONED)
.addField("ts_date", day("ts"))
.apply();
PartitionSpec removeFirstTime = new BaseUpdatePartitionSpec(formatVersion, addFirstTime)
.removeField(day("ts"))
.apply();
PartitionSpec addSecondTime = new BaseUpdatePartitionSpec(formatVersion, removeFirstTime)
.addField("ts_date", day("ts"))
.apply();
PartitionSpec removeSecondTime = new BaseUpdatePartitionSpec(formatVersion, addSecondTime)
.removeField(day("ts"))
.apply();
PartitionSpec addThirdTime = new BaseUpdatePartitionSpec(formatVersion, removeSecondTime)
.addField(month("ts"))
.apply();
PartitionSpec updated = new BaseUpdatePartitionSpec(formatVersion, addThirdTime)
.renameField("ts_month", "ts_date")
.apply();

if (formatVersion == 1) {
Assert.assertEquals("Should match expected spec field size", 3, updated.fields().size());

Assert.assertTrue("Should match expected field name",
updated.fields().get(0).name().matches("^ts_date(?:_\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12})+$"));
Assert.assertTrue("Should match expected field name",
updated.fields().get(1).name().matches("^ts_date_(?:\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12})+$"));
Assert.assertEquals("Should match expected field name", "ts_date", updated.fields().get(2).name());

Assert.assertEquals("Should match expected field transform", "void",
updated.fields().get(0).transform().toString());
Assert.assertEquals("Should match expected field transform", "void",
updated.fields().get(1).transform().toString());
Assert.assertEquals("Should match expected field transform", "month",
updated.fields().get(2).transform().toString());
}

PartitionSpec v2Expected = PartitionSpec.builderFor(SCHEMA)
.month("ts", "ts_date")
.build();

V2Assert.assertEquals("Should match expected spec", v2Expected, updated);
}

private static int id(String name) {
return SCHEMA.findField(name).fieldId();
}
Expand Down