66 changes: 62 additions & 4 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -40,7 +41,9 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;

/**
@@ -704,6 +707,57 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(partitionSpec.schema())
.withSpecId(partitionSpec.specId());

if (formatVersion > 1) {
// for v2 and later, reproduce the given spec, reusing existing field IDs for equivalent fields
Map<Pair<Integer, String>, Integer> transformToFieldId = specs.stream()
.flatMap(spec -> spec.fields().stream())
.collect(Collectors.toMap(
field -> Pair.of(field.sourceId(), field.transform().toString()),
PartitionField::fieldId,
Math::max));

for (PartitionField field : partitionSpec.fields()) {
// reassign the partition field ids
int partitionFieldId = transformToFieldId.computeIfAbsent(
Pair.of(field.sourceId(), field.transform().toString()), k -> nextID.get());
specBuilder.add(
field.sourceId(),
partitionFieldId,
field.name(),
field.transform());
}

} else {
// for v1, preserve the existing spec and carry forward all fields, replacing missing fields with void
Map<Pair<Integer, String>, PartitionField> newFields = Maps.newLinkedHashMap();
for (PartitionField newField : partitionSpec.fields()) {
newFields.put(Pair.of(newField.sourceId(), newField.transform().toString()), newField);
}

for (PartitionField field : spec().fields()) {
// ensure each field is either carried forward or replaced with void
PartitionField newField = newFields.remove(Pair.of(field.sourceId(), field.transform().toString()));
if (newField != null) {
// copy the new field with the existing field ID
specBuilder.add(newField.sourceId(), field.fieldId(), newField.name(), newField.transform());
} else {
specBuilder.add(field.sourceId(), field.fieldId(), field.name(), Transforms.alwaysNull());
}
}

// add any remaining new fields at the end and assign new partition field IDs
for (PartitionField newField : newFields.values()) {
specBuilder.add(newField.sourceId(), nextID.get(), newField.name(), newField.transform());
}
}

return specBuilder.build();
}

// The caller is responsible for passing an updatedPartitionSpec with correct partition field IDs
public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
SortOrder updatedSortOrder, String newLocation,
@@ -721,16 +775,20 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
// rebuild the partition spec using the new column ids
PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, updatedPartitionSpec);

// reassign partition field IDs, reusing IDs already assigned by the table's existing partition specs
AtomicInteger lastPartitionId = new AtomicInteger(lastAssignedPartitionId);
PartitionSpec newSpec = reassignPartitionIds(freshSpec, lastPartitionId::incrementAndGet);

// if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
int specId = specs.stream()
.filter(freshSpec::compatibleWith)
.filter(newSpec::compatibleWith)
.findFirst()
.map(PartitionSpec::specId)
.orElse(nextSpecId);

ImmutableList.Builder<PartitionSpec> specListBuilder = ImmutableList.<PartitionSpec>builder().addAll(specs);
if (!specsById.containsKey(specId)) {
specListBuilder.add(freshSpec);
specListBuilder.add(newSpec);
}

// determine the next order id
@@ -766,7 +824,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update

return new TableMetadata(null, formatVersion, uuid, newLocation,
lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(),
specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, freshSpec.lastAssignedFieldId()),
specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId()),
orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties),
-1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
}
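
For v2 tables, reassignPartitionIds keys every previously assigned partition field by the pair (source column ID, transform string): a field in the replacement spec that matches an existing pair reuses that field ID, and anything unmatched is assigned the next unused ID. A minimal standalone sketch of that keying, using plain JDK collections and hypothetical IDs rather than the Iceberg types (the real code uses Pair.of(field.sourceId(), field.transform().toString()) as the key):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration only; not part of this change and not the Iceberg API.
public class FieldIdReuseSketch {
  public static void main(String[] args) {
    // field IDs already assigned by the table's existing specs (hypothetical values)
    Map<String, Integer> transformToFieldId = new LinkedHashMap<>();
    transformToFieldId.put("1:identity", 1000);  // identity(x) -> field ID 1000
    transformToFieldId.put("2:identity", 1001);  // identity(y) -> field ID 1001

    AtomicInteger lastPartitionId = new AtomicInteger(1001);

    // the replacement spec declares identity(x) and bucket[8](z): the first key is known
    // and keeps ID 1000, the second is new and receives the next ID, 1002
    for (String key : new String[] {"1:identity", "3:bucket[8]"}) {
      int fieldId = transformToFieldId.computeIfAbsent(key, k -> lastPartitionId.incrementAndGet());
      System.out.println(key + " -> " + fieldId);
    }
  }
}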
@@ -826,7 +884,7 @@ private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec parti

// add all of the fields to the builder. IDs should not change.
for (PartitionField field : partitionSpec.fields()) {
specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform().toString());
specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform());
}

return specBuilder.build();
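For v1 tables, reassignPartitionIds instead carries forward every field of the current spec, substituting a void transform (Transforms.alwaysNull()) for any field the replacement spec no longer declares, so existing field IDs and positions stay stable; the test changes below assert exactly that shape. A minimal sketch of the resulting spec, assuming a table originally partitioned by a bucket transform on a "data" column and replaced with an unpartitioned spec (mirroring the test fixtures; column IDs are hypothetical):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Simplified illustration only; not part of this change.
public class V1VoidCarryForwardSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()));

    // after replacing the table with an unpartitioned spec, a v1 table keeps the old
    // bucket field under its original name, now bound to a void (always-null) transform
    PartitionSpec expected = PartitionSpec.builderFor(schema)
        .alwaysNull("data", "data_bucket")
        .withSpecId(1)
        .build();

    System.out.println(expected);
  }
}
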
44 changes: 38 additions & 6 deletions core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
@@ -32,6 +32,7 @@
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -79,8 +80,18 @@ public void testReplaceTransactionWithCustomSortOrder() {
Assert.assertNull("Table should not have a current snapshot", table.currentSnapshot());
Assert.assertEquals("Schema should match previous schema",
schema.asStruct(), table.schema().asStruct());
Assert.assertEquals("Partition spec should have no fields",
0, table.spec().fields().size());

PartitionSpec v2Expected = PartitionSpec.builderFor(table.schema()).withSpecId(1).build();
V2Assert.assertEquals("Table should have an unpartitioned spec",
v2Expected, table.spec());

PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
.alwaysNull("data", "data_bucket")
.withSpecId(1)
.build();
V1Assert.assertEquals("Table should have a spec with one void field",
v1Expected, table.spec());

Assert.assertEquals("Table should have 2 orders", 2, table.sortOrders().size());
SortOrder sortOrder = table.sortOrder();
Assert.assertEquals("Order ID must match", 1, sortOrder.orderId());
@@ -117,15 +128,27 @@ public void testReplaceTransaction() {
Assert.assertNull("Table should not have a current snapshot", table.currentSnapshot());
Assert.assertEquals("Schema should match previous schema",
schema.asStruct(), table.schema().asStruct());
Assert.assertEquals("Partition spec should have no fields",
0, table.spec().fields().size());

PartitionSpec v2Expected = PartitionSpec.builderFor(table.schema()).withSpecId(1).build();
V2Assert.assertEquals("Table should have an unpartitioned spec",
v2Expected, table.spec());

PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
.alwaysNull("data", "data_bucket")
.withSpecId(1)
.build();
V1Assert.assertEquals("Table should have a spec with one void field",
v1Expected, table.spec());

Assert.assertEquals("Table should have 1 order", 1, table.sortOrders().size());
Assert.assertEquals("Table order ID should match", 0, table.sortOrder().orderId());
Assert.assertTrue("Table should be unsorted", table.sortOrder().isUnsorted());
}

@Test
public void testReplaceWithIncompatibleSchemaUpdate() {
Assume.assumeTrue("Fails early for v1 tables because partition spec cannot drop a field", formatVersion == 2);

Schema newSchema = new Schema(
required(4, "obj_id", Types.IntegerType.get()));

@@ -175,8 +198,17 @@ public void testReplaceWithNewPartitionSpec() {
Assert.assertNull("Table should not have a current snapshot", table.currentSnapshot());
Assert.assertEquals("Schema should use new schema, not compatible with previous",
schema.asStruct(), table.schema().asStruct());
Assert.assertEquals("Table should have new unpartitioned spec",
0, table.spec().fields().size());

PartitionSpec v2Expected = PartitionSpec.builderFor(table.schema()).withSpecId(1).build();
V2Assert.assertEquals("Table should have an unpartitioned spec",
v2Expected, table.spec());

PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
.alwaysNull("data", "data_bucket")
.withSpecId(1)
.build();
V1Assert.assertEquals("Table should have a spec with one void field",
v1Expected, table.spec());
}

@Test
70 changes: 70 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -600,6 +600,76 @@ public void testInvalidUpdatePartitionSpecForV1Table() throws Exception {
() -> metadata.updatePartitionSpec(spec));
}

@Test
public void testBuildReplacementForV1Table() {
Schema schema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "y", Types.LongType.get())
);
PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(0)
.identity("x")
.identity("y")
.build();
String location = "file://tmp/db/table";
TableMetadata metadata = TableMetadata.newTableMetadata(
schema, spec, SortOrder.unsorted(), location, ImmutableMap.of(), 1);
Assert.assertEquals(spec, metadata.spec());

Schema updatedSchema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "z", Types.StringType.get()),
Types.NestedField.required(3, "y", Types.LongType.get())
);
PartitionSpec updatedSpec = PartitionSpec.builderFor(updatedSchema).withSpecId(0)
.bucket("z", 8)
.identity("x")
.build();
TableMetadata updated = metadata.buildReplacement(
updatedSchema, updatedSpec, SortOrder.unsorted(), location, ImmutableMap.of());
PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
    .add(1, 1000, "x", "identity")          // identity(x) keeps its existing field ID
    .add(2, 1001, "y", "void")              // dropped field is kept as a void transform in v1
    .add(3, 1002, "z_bucket", "bucket[8]")  // new field is assigned the next partition field ID
    .build();
Assert.assertEquals(
"Should reassign the partition field IDs and reuse any existing IDs for equivalent fields",
expected, updated.spec());
}

@Test
public void testBuildReplacementForV2Table() {
Schema schema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "y", Types.LongType.get())
);
PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(0)
.identity("x")
.identity("y")
.build();
String location = "file://tmp/db/table";
TableMetadata metadata = TableMetadata.newTableMetadata(
schema, spec, SortOrder.unsorted(), location, ImmutableMap.of(), 2);
Assert.assertEquals(spec, metadata.spec());

Schema updatedSchema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "z", Types.StringType.get())
);
PartitionSpec updatedSpec = PartitionSpec.builderFor(updatedSchema).withSpecId(0)
.bucket("z", 8)
.identity("x")
.build();
TableMetadata updated = metadata.buildReplacement(
updatedSchema, updatedSpec, SortOrder.unsorted(), location, ImmutableMap.of());
PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
    .add(3, 1002, "z_bucket", "bucket[8]")  // new field is assigned the next partition field ID
    .add(1, 1000, "x", "identity")          // identity(x) reuses the ID from the previous spec
    .build();
Assert.assertEquals(
"Should reassign the partition field IDs and reuse any existing IDs for equivalent fields",
expected, updated.spec());
}

@Test
public void testSortOrder() {
Schema schema = new Schema(
@@ -118,7 +118,13 @@ public void testReplaceTxnBuilder() throws Exception {

table = catalog.loadTable(tableIdent);
Assert.assertNull(table.currentSnapshot());
Assert.assertTrue(table.spec().isUnpartitioned());
PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
.alwaysNull("data", "data_bucket")
.withSpecId(1)
.build();
Assert.assertEquals("Table should have a spec with one void field",
v1Expected, table.spec());

Assert.assertEquals("value1", table.properties().get("key1"));
Assert.assertEquals("value2", table.properties().get("key2"));
}
@@ -195,7 +195,13 @@ public void testReplaceTxnBuilder() {

table = catalog.loadTable(tableIdent);
Assert.assertNull(table.currentSnapshot());
Assert.assertTrue(table.spec().isUnpartitioned());
PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
.alwaysNull("data", "data_bucket")
.withSpecId(1)
.build();
Assert.assertEquals("Table should have a spec with one void field",
v1Expected, table.spec());

Assert.assertEquals("value1", table.properties().get("key1"));
Assert.assertEquals("value2", table.properties().get("key2"));
}
@@ -155,7 +155,12 @@ public void testReplaceTableTxn() {
txn.commitTransaction();

Table table = catalog.loadTable(TABLE_IDENTIFIER);
Assert.assertEquals("Partition spec should be unpartitioned", 0, table.spec().fields().size());
PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
.alwaysNull("id", "id")
.withSpecId(1)
.build();
Assert.assertEquals("Table should have a spec with one void field",
v1Expected, table.spec());
}

@Test
@@ -233,7 +238,12 @@ public void testCreateOrReplaceTableTxnTableExists() {
txn.commitTransaction();

Table table = catalog.loadTable(TABLE_IDENTIFIER);
Assert.assertEquals("Partition spec should be unpartitioned", 0, table.spec().fields().size());
PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
.alwaysNull("id", "id")
.withSpecId(1)
.build();
Assert.assertEquals("Table should have a spec with one void field",
v1Expected, table.spec());
}

@Test
@@ -176,7 +176,13 @@ public void testReplaceTxnBuilder() throws Exception {
table = catalog.loadTable(tableIdent);
Assert.assertEquals(newLocation, table.location());
Assert.assertNull(table.currentSnapshot());
Assert.assertTrue(table.spec().isUnpartitioned());
PartitionSpec v1Expected = PartitionSpec.builderFor(table.schema())
.alwaysNull("data", "data_bucket")
.withSpecId(1)
.build();
Assert.assertEquals("Table should have a spec with one void field",
v1Expected, table.spec());

Assert.assertEquals("value1", table.properties().get("key1"));
Assert.assertEquals("value2", table.properties().get("key2"));
} finally {