From 1f09e66e92bb9c0c4e65ac7b8d7939209aa493a0 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Thu, 28 Apr 2022 18:20:30 +0200 Subject: [PATCH 1/6] Core: Metadata table queries fail if a partition column was reused in V2 --- core/src/main/java/org/apache/iceberg/Partitioning.java | 7 ++++++- .../source/TestMetadataTablesWithPartitionEvolution.java | 6 ------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java index 2bf0b800db32..ae83f1d4891c 100644 --- a/core/src/main/java/org/apache/iceberg/Partitioning.java +++ b/core/src/main/java/org/apache/iceberg/Partitioning.java @@ -228,9 +228,14 @@ public static StructType partitionType(Table table) { PartitionField existingField = fieldMap.get(fieldId); if (existingField == null) { + String fieldName = structField.name(); + if (nameMap.containsValue(fieldName)) { + fieldName += "_" + fieldId; + } + fieldMap.put(fieldId, field); typeMap.put(fieldId, structField.type()); - nameMap.put(fieldId, structField.name()); + nameMap.put(fieldId, fieldName); } else { // verify the fields are compatible as they may conflict in v1 tables diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java index 7519486bc5c7..4494c2de1edf 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java @@ -45,7 +45,6 @@ import org.apache.spark.sql.types.StructType; import org.junit.After; import org.junit.Assert; -import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -482,11 +481,6 @@ public void testPartitionsTableRenameFields() throws ParseException { @Test public void testPartitionsTableSwitchFields() throws Exception { - // Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292 - // In V1, dropped partition fields show separately when field is re-added - // In V2, re-added field currently conflicts with its deleted form - Assume.assumeTrue(formatVersion == 1); - sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName); initTable(); Table table = validationCatalog.loadTable(tableIdent); From 31d4b0ebe65b70cbbc39a05c0124a1319bb86e17 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Wed, 11 May 2022 16:17:27 +0200 Subject: [PATCH 2/6] Core: Metadata table queries fail if a partition column was reused in V2 - the collapse method Change-Id: I91808def7cea2005e47b57d3033298afef26ca32 --- .../iceberg/BaseUpdatePartitionSpec.java | 26 ++++++++++++- .../java/org/apache/iceberg/Partitioning.java | 7 +--- ...tMetadataTablesWithPartitionEvolution.java | 38 ++++++++++++++----- 3 files changed, 53 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index a6a55a5fae7a..fcfa0ea3e35b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -103,7 +103,29 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec { this.lastAssignedPartitionId = lastAssignedPartitionId; } - private int assignFieldId() { + /** + * Calculates a field ID to the newly added PartitionField. + * New field ID is assigned in every case for V1 tables, but for V2 we try to recycle a former ID if possible. + * @param sourceTransform - pair of source ID and transform for this PartitionField addition + * @return - the calculated field ID + */ + private int assignFieldId(Pair> sourceTransform) { + if (formatVersion == 2) { + int sourceId = sourceTransform.first(); + Transform transform = sourceTransform.second(); + + Set allHistoricalFields = Sets.newHashSet(); + for (PartitionSpec partitionSpecpec : base.specs()) { + allHistoricalFields.addAll(partitionSpecpec.fields()); + } + + for (PartitionField field : allHistoricalFields) { + if (field.sourceId() == sourceId && field.transform().equals(transform)) { + return field.fieldId(); + } + } + } + this.lastAssignedPartitionId += 1; return lastAssignedPartitionId; } @@ -158,7 +180,7 @@ public BaseUpdatePartitionSpec addField(String name, Term term) { "Cannot add duplicate partition field %s=%s, already added: %s", name, term, added); PartitionField newField = new PartitionField( - sourceTransform.first(), assignFieldId(), name, sourceTransform.second()); + sourceTransform.first(), assignFieldId(sourceTransform), name, sourceTransform.second()); if (newField.name() == null) { String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE); newField = new PartitionField(newField.sourceId(), newField.fieldId(), partitionName, newField.transform()); diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java index ae83f1d4891c..2bf0b800db32 100644 --- a/core/src/main/java/org/apache/iceberg/Partitioning.java +++ b/core/src/main/java/org/apache/iceberg/Partitioning.java @@ -228,14 +228,9 @@ public static StructType partitionType(Table table) { PartitionField existingField = fieldMap.get(fieldId); if (existingField == null) { - String fieldName = structField.name(); - if (nameMap.containsValue(fieldName)) { - fieldName += "_" + fieldId; - } - fieldMap.put(fieldId, field); typeMap.put(fieldId, structField.type()); - nameMap.put(fieldId, fieldName); + nameMap.put(fieldId, structField.name()); } else { // verify the fields are compatible as they may conflict in v1 tables diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java index 4494c2de1edf..691e9f6f5481 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java @@ -45,6 +45,7 @@ import org.apache.spark.sql.types.StructType; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -525,17 +526,34 @@ public void testPartitionsTableSwitchFields() throws Exception { sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName); sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName); + sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd3')", tableName); - assertPartitions( - ImmutableList.of( - row(null, "c1", null), - row(null, "c1", "d1"), - row(null, "c2", null), - row(null, "c2", "d2"), - row("d1", "c1", null), - row("d2", "c2", null)), - "STRUCT", - PARTITIONS); + if (formatVersion == 1) { + assertPartitions( + ImmutableList.of( + row(null, "c1", null), + row(null, "c1", "d1"), + row(null, "c2", null), + row(null, "c2", "d2"), + row(null, "c3", "d3"), + row("d1", "c1", null), + row("d2", "c2", null)), + "STRUCT", + PARTITIONS); + } else { + // In V2 re-adding a former partition field that was part of an older spec will not change its name or its + // field ID either, thus values will be collapsed into a single common column (as opposed to V1 where any new + // partition field addition will result in a new column in this metadata table) + assertPartitions( + ImmutableList.of( + row(null, "c1"), + row(null, "c2"), + row("d1", "c1"), + row("d2", "c2"), + row("d3", "c3")), + "STRUCT", + PARTITIONS); + } } @Test From acb245493971ca2feaf505be3febc97c40ac591a Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Thu, 12 May 2022 16:44:28 +0200 Subject: [PATCH 3/6] NPE fix for tests Change-Id: Ibba90e196e43fbde998d95ad02785f341c77f7ee --- .../main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index fcfa0ea3e35b..e5c634bd29f9 100644 --- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -110,7 +110,7 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec { * @return - the calculated field ID */ private int assignFieldId(Pair> sourceTransform) { - if (formatVersion == 2) { + if (formatVersion == 2 && base != null) { int sourceId = sourceTransform.first(); Transform transform = sourceTransform.second(); From 8d744309332aaf8316ceba6661f73127447a369e Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Tue, 17 May 2022 17:56:22 +0200 Subject: [PATCH 4/6] review comments Change-Id: If25577df7655a13e21f4d6dd983b1b6c41601a2a --- .../iceberg/BaseUpdatePartitionSpec.java | 8 +-- .../java/org/apache/iceberg/ScanTestBase.java | 61 +++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index e5c634bd29f9..06d530270db1 100644 --- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -106,8 +106,8 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec { /** * Calculates a field ID to the newly added PartitionField. * New field ID is assigned in every case for V1 tables, but for V2 we try to recycle a former ID if possible. - * @param sourceTransform - pair of source ID and transform for this PartitionField addition - * @return - the calculated field ID + * @param sourceTransform pair of source ID and transform for this PartitionField addition + * @return the calculated field ID */ private int assignFieldId(Pair> sourceTransform) { if (formatVersion == 2 && base != null) { @@ -115,8 +115,8 @@ private int assignFieldId(Pair> sourceTransform) { Transform transform = sourceTransform.second(); Set allHistoricalFields = Sets.newHashSet(); - for (PartitionSpec partitionSpecpec : base.specs()) { - allHistoricalFields.addAll(partitionSpecpec.fields()); + for (PartitionSpec partitionSpec : base.specs()) { + allHistoricalFields.addAll(partitionSpec.fields()); } for (PartitionField field : allHistoricalFields) { diff --git a/core/src/test/java/org/apache/iceberg/ScanTestBase.java b/core/src/test/java/org/apache/iceberg/ScanTestBase.java index a800e4032997..5fe0c0419cea 100644 --- a/core/src/test/java/org/apache/iceberg/ScanTestBase.java +++ b/core/src/test/java/org/apache/iceberg/ScanTestBase.java @@ -19,6 +19,7 @@ package org.apache.iceberg; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.Executors; @@ -28,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.types.Types; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -133,4 +135,63 @@ public void testTableScanWithPlanExecutor() { Assert.assertEquals(2, Iterables.size(scan.planFiles())); Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0); } + + @Test + public void testReaddingPartitionField() throws Exception { + Assume.assumeTrue(formatVersion == 2); + Schema schema = new Schema( + required(1, "a", Types.IntegerType.get()), + required(2, "b", Types.StringType.get()), + required(3, "data", Types.IntegerType.get()) + ); + PartitionSpec spec_a = PartitionSpec.builderFor(schema).identity("a").build(); + File dir = temp.newFolder(); + dir.delete(); + this.table = TestTables.create(dir, "test_part_evolution", schema, spec_a, formatVersion); + table.newFastAppend().appendFile(DataFiles.builder(spec_a) + .withPath("/path/to/data/a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("a=1") + .withRecordCount(1) + .build()).commit(); + + table.updateSpec().addField("b").removeField("a").commit(); + table.newFastAppend().appendFile(DataFiles.builder(table.spec()) + .withPath("/path/to/data/b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("b=1") + .withRecordCount(1) + .build()).commit(); + + table.updateSpec().addField("a").commit(); + table.newFastAppend().appendFile(DataFiles.builder(table.spec()) + .withPath("/path/to/data/ab.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("b=1/a=1") + .withRecordCount(1) + .build()).commit(); + + table.newFastAppend().appendFile(DataFiles.builder(table.spec()) + .withPath("/path/to/data/a2b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("b=1/a=2") + .withRecordCount(1) + .build()).commit(); + + TableScan scan1 = table.newScan().filter(Expressions.equal("b", "1")); + try (CloseableIterable tasks = scan1.planTasks()) { + Assert.assertTrue("There should be 1 combined task", Iterables.size(tasks) == 1); + for (CombinedScanTask combinedScanTask : tasks) { + Assert.assertEquals("All 4 files should match b=1 filter", 4, combinedScanTask.files().size()); + } + } + + TableScan scan2 = table.newScan().filter(Expressions.equal("a", 2)); + try (CloseableIterable tasks = scan2.planTasks()) { + Assert.assertTrue("There should be 1 combined task", Iterables.size(tasks) == 1); + for (CombinedScanTask combinedScanTask : tasks) { + Assert.assertEquals("a=2 and file without a in spec should match", 2, combinedScanTask.files().size()); + } + } + } } From b057dc81db525dc1d9fc1025be9737e07dd9c417 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Wed, 18 May 2022 10:36:41 +0200 Subject: [PATCH 5/6] naming fixes Change-Id: Ib70d0cefbce629e07d670a2f7876b5f638d87963 --- core/src/test/java/org/apache/iceberg/ScanTestBase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/ScanTestBase.java b/core/src/test/java/org/apache/iceberg/ScanTestBase.java index 5fe0c0419cea..72136477da33 100644 --- a/core/src/test/java/org/apache/iceberg/ScanTestBase.java +++ b/core/src/test/java/org/apache/iceberg/ScanTestBase.java @@ -137,18 +137,18 @@ public void testTableScanWithPlanExecutor() { } @Test - public void testReaddingPartitionField() throws Exception { + public void testReAddingPartitionField() throws Exception { Assume.assumeTrue(formatVersion == 2); Schema schema = new Schema( required(1, "a", Types.IntegerType.get()), required(2, "b", Types.StringType.get()), required(3, "data", Types.IntegerType.get()) ); - PartitionSpec spec_a = PartitionSpec.builderFor(schema).identity("a").build(); + PartitionSpec initialSpec = PartitionSpec.builderFor(schema).identity("a").build(); File dir = temp.newFolder(); dir.delete(); - this.table = TestTables.create(dir, "test_part_evolution", schema, spec_a, formatVersion); - table.newFastAppend().appendFile(DataFiles.builder(spec_a) + this.table = TestTables.create(dir, "test_part_evolution", schema, initialSpec, formatVersion); + table.newFastAppend().appendFile(DataFiles.builder(initialSpec) .withPath("/path/to/data/a.parquet") .withFileSizeInBytes(10) .withPartitionPath("a=1") From 073edd0ac80370f28c5c335b8ff78ed7329eb270 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Fri, 10 Jun 2022 17:01:10 +0200 Subject: [PATCH 6/6] add name to matching criteria and adjust test case in TestMetadataTableScans Change-Id: I9b3e0cc02b728986607481de3b9bda1d7513b2c3 --- .../iceberg/BaseUpdatePartitionSpec.java | 26 ++++++++++++------- .../iceberg/TestMetadataTableScans.java | 6 ++++- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index 06d530270db1..0181b98ac8ea 100644 --- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -103,13 +103,19 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec { this.lastAssignedPartitionId = lastAssignedPartitionId; } + private int assignFieldId() { + this.lastAssignedPartitionId += 1; + return lastAssignedPartitionId; + } + /** - * Calculates a field ID to the newly added PartitionField. - * New field ID is assigned in every case for V1 tables, but for V2 we try to recycle a former ID if possible. + * In V2 it searches for a similar partition field in historical partition specs. Tries to match on source field + * ID, transform type and target name (optional). If not found or in V1 cases it creates a new PartitionField. * @param sourceTransform pair of source ID and transform for this PartitionField addition - * @return the calculated field ID + * @param name target partition field name, if specified + * @return the recycled or newly created partition field */ - private int assignFieldId(Pair> sourceTransform) { + private PartitionField recycleOrCreatePartitionField(Pair> sourceTransform, String name) { if (formatVersion == 2 && base != null) { int sourceId = sourceTransform.first(); Transform transform = sourceTransform.second(); @@ -121,13 +127,14 @@ private int assignFieldId(Pair> sourceTransform) { for (PartitionField field : allHistoricalFields) { if (field.sourceId() == sourceId && field.transform().equals(transform)) { - return field.fieldId(); + // if target name is specified then consider it too, otherwise not + if (name == null || field.name().equals(name)) { + return field; + } } } } - - this.lastAssignedPartitionId += 1; - return lastAssignedPartitionId; + return new PartitionField(sourceTransform.first(), assignFieldId(), name, sourceTransform.second()); } @Override @@ -179,8 +186,7 @@ public BaseUpdatePartitionSpec addField(String name, Term term) { Preconditions.checkArgument(added == null, "Cannot add duplicate partition field %s=%s, already added: %s", name, term, added); - PartitionField newField = new PartitionField( - sourceTransform.first(), assignFieldId(sourceTransform), name, sourceTransform.second()); + PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name); if (newField.name() == null) { String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE); newField = new PartitionField(newField.sourceId(), newField.fieldId(), partitionName, newField.transform()); diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index ee00bb8d801f..a357487ea1ba 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -514,7 +514,11 @@ public void testFilesTableScanWithDroppedPartition() throws IOException { table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); table.refresh(); - table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); + // Here we need to specify target name as 'data_bucket_16'. If unspecified a default name will be generated. As per + // https://github.com/apache/iceberg/pull/4868 there's an inconsistency of doing this: in V2, the above removed + // data_bucket would be recycled in favor of data_bucket_16. By specifying the target name, we explicitly require + // data_bucket not to be recycled. + table.updateSpec().addField("data_bucket_16", Expressions.bucket("data", 16)).commit(); table.refresh(); table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();