Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
Expand All @@ -41,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;

/**
Expand Down Expand Up @@ -703,6 +705,35 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
if (formatVersion > 1) {
Map<Pair<Integer, String>, Integer> transformToFieldId = specs.stream()
.flatMap(spec -> spec.fields().stream())
.collect(Collectors.toMap(
field -> Pair.of(field.sourceId(), field.transform().toString()),
PartitionField::fieldId,
(n1, n2) -> n2));

PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(partitionSpec.schema())
.withSpecId(partitionSpec.specId());

for (PartitionField field : partitionSpec.fields()) {
// reassign the partition field ids
int fieldId = transformToFieldId.computeIfAbsent(
Pair.of(field.sourceId(), field.transform().toString()), k -> nextID.get());
specBuilder.add(
field.sourceId(),
fieldId,
field.name(),
field.transform());
}
return specBuilder.build();
} else {
// noop for v1 table
return partitionSpec;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the algorithm for v1 tables should be to find out what matches previous specs and re-use those. Then if there is a new field it is moved to the end, and if there is an old field that is unmatched it gets replaced with a void transform. Otherwise, this will create partition field ID conflicts in v1 tables.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in #2284 (comment), the issue to replace old one using void transform is that the table schema might be changed and cannot create void transform if the old source field is removed.
@rdblue do you think we should use a dummy source field in this case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jun-he, you can't drop a column if it is referenced by the partition spec. I don't think we could get into the case you mentioned because of that limitation. When this method runs, the column was referenced by the previous version of the field. So we know that the column exists. After this method runs, the void partition should prevent the column from being removed. Am I missing a case where the column can be dropped but still referenced by a void transform?

}
}

// The caller is responsible for passing an updatedPartitionSpec with correct partition field IDs
public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
SortOrder updatedSortOrder, String newLocation,
Expand All @@ -718,7 +749,11 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
int nextSpecId = maxSpecId.orElse(TableMetadata.INITIAL_SPEC_ID) + 1;

// rebuild the partition spec using the new column ids
PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec);
PartitionSpec newSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec);

// reassign partition field ids with existing partition specs in the table
AtomicInteger lastPartitionId = new AtomicInteger(lastAssignedPartitionId);
PartitionSpec freshSpec = reassignPartitionIds(newSpec, lastPartitionId::incrementAndGet);

// if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
int specId = specs.stream()
Expand Down
68 changes: 68 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,74 @@ public void testInvalidUpdatePartitionSpecForV1Table() throws Exception {
() -> metadata.updatePartitionSpec(spec));
}

@Test
public void testBuildReplacementForV1Table() {
Schema schema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "y", Types.LongType.get())
);
PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(0)
.identity("x")
.identity("y")
.build();
String location = "file://tmp/db/table";
TableMetadata metadata = TableMetadata.newTableMetadata(
schema, spec, SortOrder.unsorted(), location, ImmutableMap.of(), 1);
Assert.assertEquals(spec, metadata.spec());

Schema updatedSchema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "z", Types.StringType.get())
);
PartitionSpec updatedSpec = PartitionSpec.builderFor(updatedSchema).withSpecId(0)
.bucket("z", 8)
.identity("x")
.build();
TableMetadata updated = metadata.buildReplacement(
updatedSchema, updatedSpec, SortOrder.unsorted(), location, ImmutableMap.of());
PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
.add(3, 1000, "z_bucket", "bucket[8]")
.add(1, 1001, "x", "identity")
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this needs to be this:

    PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
        .add(1, 1000, "x", "identity")
        .add(2, 1001, "y", "void")
        .add(3, 1002, "z_bucket", "bucket[8]")
        .build();

We should be able to create that by updating the existing spec.

Copy link
Collaborator Author

@jun-he jun-he May 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is for v2 table, which does not need void transform partition field for removed column.
This .add(2, 1001, "y", "void") seems not working with the error fo Cannot find source column: 2 as no field y in the updated schema.
This also means v1 table cannot use void to fill the field id gap in this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test name is testBuildReplacementForV1Table, which is why I thought that the changes should be only v1 changes.

Assert.assertEquals(
"Should reassign the partition field IDs and reuse any existing IDs for equivalent fields",
expected, updated.spec());
}

@Test
public void testBuildReplacementForV2Table() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.

Schema schema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "y", Types.LongType.get())
);
PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(0)
.identity("x")
.identity("y")
.build();
String location = "file://tmp/db/table";
TableMetadata metadata = TableMetadata.newTableMetadata(
schema, spec, SortOrder.unsorted(), location, ImmutableMap.of(), 2);
Assert.assertEquals(spec, metadata.spec());

Schema updatedSchema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "z", Types.StringType.get())
);
PartitionSpec updatedSpec = PartitionSpec.builderFor(updatedSchema).withSpecId(0)
.bucket("z", 8)
.identity("x")
.build();
TableMetadata updated = metadata.buildReplacement(
updatedSchema, updatedSpec, SortOrder.unsorted(), location, ImmutableMap.of());
PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
.add(3, 1002, "z_bucket", "bucket[8]")
.add(1, 1000, "x", "identity")
.build();
Assert.assertEquals(
"Should reassign the partition field IDs and reuse any existing IDs for equivalent fields",
expected, updated.spec());
}

@Test
public void testSortOrder() {
Schema schema = new Schema(
Expand Down