Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,35 @@ private int assignFieldId() {
return lastAssignedPartitionId;
}

/**
 * Returns a partition field for the given source/transform pair, recycling a historical field when possible.
 *
 * <p>When the format version is 2 and base metadata is available, the partition specs of the base metadata are
 * searched, in the iteration order of {@code base.specs()}, for a field with the same source field ID and an equal
 * transform. If a target name is specified, the historical field's name has to match it as well. The first match is
 * returned unchanged, so the re-added field keeps the field ID and name it had before. If nothing matches, or the
 * format version is not 2, or there is no base metadata, a new PartitionField with a newly assigned field ID is
 * created.
 *
 * @param sourceTransform pair of source ID and transform for this PartitionField addition
 * @param name target partition field name, or null if not specified
 * @return the recycled or newly created partition field
 */
private PartitionField recycleOrCreatePartitionField(Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
  int sourceId = sourceTransform.first();
  Transform<?, ?> transform = sourceTransform.second();

  if (formatVersion == 2 && base != null) {
    // Walk the specs directly instead of pooling their fields in a hash set first: hash iteration order is
    // unspecified, so with several candidates (e.g. same source and transform under different names) the field
    // picked for recycling would otherwise be arbitrary.
    for (PartitionSpec partitionSpec : base.specs()) {
      for (PartitionField field : partitionSpec.fields()) {
        boolean sameSourceAndTransform = field.sourceId() == sourceId && field.transform().equals(transform);
        // if target name is specified then consider it too, otherwise not
        if (sameSourceAndTransform && (name == null || field.name().equals(name))) {
          return field;
        }
      }
    }
  }

  // nothing to recycle: allocate a new field ID for a new partition field
  return new PartitionField(sourceId, assignFieldId(), name, transform);
}

@Override
public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
this.caseSensitive = isCaseSensitive;
Expand Down Expand Up @@ -157,8 +186,7 @@ public BaseUpdatePartitionSpec addField(String name, Term term) {
Preconditions.checkArgument(added == null,
"Cannot add duplicate partition field %s=%s, already added: %s", name, term, added);

PartitionField newField = new PartitionField(
sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name);
if (newField.name() == null) {
String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
newField = new PartitionField(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
Expand Down
61 changes: 61 additions & 0 deletions core/src/test/java/org/apache/iceberg/ScanTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Executors;
Expand All @@ -28,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -133,4 +135,63 @@ public void testTableScanWithPlanExecutor() {
Assert.assertEquals(2, Iterables.size(scan.planFiles()));
Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
}

/**
 * Checks scan planning after a partition field is removed and later re-added (runs for format version 2 only).
 *
 * <p>The table starts out partitioned by identity(a), is then changed to add b and remove a, and finally gets a
 * added again. One file is appended under each of the first two specs and two files under the last one; the scans
 * at the end assert how many of these files are planned for an equality filter on {@code b} and on {@code a}.
 */
@Test
public void testReAddingPartitionField() throws Exception {
  Assume.assumeTrue(formatVersion == 2);
  Schema schema = new Schema(
      required(1, "a", Types.IntegerType.get()),
      required(2, "b", Types.StringType.get()),
      required(3, "data", Types.IntegerType.get())
  );
  PartitionSpec initialSpec = PartitionSpec.builderFor(schema).identity("a").build();
  File dir = temp.newFolder();
  // NOTE(review): presumably TestTables.create expects the directory not to exist yet - confirm
  dir.delete();
  this.table = TestTables.create(dir, "test_part_evolution", schema, initialSpec, formatVersion);

  // file written with the initial spec (a only)
  table.newFastAppend().appendFile(DataFiles.builder(initialSpec)
      .withPath("/path/to/data/a.parquet")
      .withFileSizeInBytes(10)
      .withPartitionPath("a=1")
      .withRecordCount(1)
      .build()).commit();

  // switch partitioning from a to b and write a file with the new spec
  table.updateSpec().addField("b").removeField("a").commit();
  table.newFastAppend().appendFile(DataFiles.builder(table.spec())
      .withPath("/path/to/data/b.parquet")
      .withFileSizeInBytes(10)
      .withPartitionPath("b=1")
      .withRecordCount(1)
      .build()).commit();

  // re-add a, then write two files that differ only in their value of a
  table.updateSpec().addField("a").commit();
  table.newFastAppend().appendFile(DataFiles.builder(table.spec())
      .withPath("/path/to/data/ab.parquet")
      .withFileSizeInBytes(10)
      .withPartitionPath("b=1/a=1")
      .withRecordCount(1)
      .build()).commit();

  table.newFastAppend().appendFile(DataFiles.builder(table.spec())
      .withPath("/path/to/data/a2b.parquet")
      .withFileSizeInBytes(10)
      .withPartitionPath("b=1/a=2")
      .withRecordCount(1)
      .build()).commit();

  TableScan scan1 = table.newScan().filter(Expressions.equal("b", "1"));
  try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
    // assertEquals reports the actual task count on failure, unlike assertTrue on a comparison
    Assert.assertEquals("There should be 1 combined task", 1, Iterables.size(tasks));
    for (CombinedScanTask combinedScanTask : tasks) {
      Assert.assertEquals("All 4 files should match b=1 filter", 4, combinedScanTask.files().size());
    }
  }

  TableScan scan2 = table.newScan().filter(Expressions.equal("a", 2));
  try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
    Assert.assertEquals("There should be 1 combined task", 1, Iterables.size(tasks));
    for (CombinedScanTask combinedScanTask : tasks) {
      Assert.assertEquals("a=2 and file without a in spec should match", 2, combinedScanTask.files().size());
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,11 @@ public void testFilesTableScanWithDroppedPartition() throws IOException {
table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
table.refresh();

table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
// The target name 'data_bucket_16' has to be specified explicitly here. If it is left unspecified, a default name is
// generated, and as per https://github.com/apache/iceberg/pull/4868 that leads to an inconsistency: in V2, the
// data_bucket field removed above would be recycled instead of a new data_bucket_16 field being created. By
// specifying the target name, we explicitly require data_bucket not to be recycled.
table.updateSpec().addField("data_bucket_16", Expressions.bucket("data", 16)).commit();
table.refresh();

table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,6 @@ public void testPartitionsTableRenameFields() throws ParseException {

@Test
public void testPartitionsTableSwitchFields() throws Exception {
// Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
// In V1, dropped partition fields show separately when field is re-added
// In V2, re-added field currently conflicts with its deleted form
Assume.assumeTrue(formatVersion == 1);

sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
initTable();
Table table = validationCatalog.loadTable(tableIdent);
Expand Down Expand Up @@ -531,17 +526,34 @@ public void testPartitionsTableSwitchFields() throws Exception {

sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd3')", tableName);

assertPartitions(
ImmutableList.of(
row(null, "c1", null),
row(null, "c1", "d1"),
row(null, "c2", null),
row(null, "c2", "d2"),
row("d1", "c1", null),
row("d2", "c2", null)),
"STRUCT<data_1000:STRING,category:STRING,data:STRING>",
PARTITIONS);
if (formatVersion == 1) {
assertPartitions(
ImmutableList.of(
row(null, "c1", null),
row(null, "c1", "d1"),
row(null, "c2", null),
row(null, "c2", "d2"),
row(null, "c3", "d3"),
row("d1", "c1", null),
row("d2", "c2", null)),
"STRUCT<data_1000:STRING,category:STRING,data:STRING>",
PARTITIONS);
} else {
// In V2 re-adding a former partition field that was part of an older spec will not change its name or its
// field ID either, thus values will be collapsed into a single common column (as opposed to V1 where any new
// partition field addition will result in a new column in this metadata table)
assertPartitions(
ImmutableList.of(
row(null, "c1"),
row(null, "c2"),
row("d1", "c1"),
row("d2", "c2"),
row("d3", "c3")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS);
}
}

@Test
Expand Down