From 32d4bece0e63913a7cbbdcfee51ea43b64e4085a Mon Sep 17 00:00:00 2001 From: felixYyu Date: Thu, 21 Apr 2022 18:42:51 +0800 Subject: [PATCH 1/6] fix #4563 issue --- .../org/apache/iceberg/PartitionSpec.java | 3 + .../apache/iceberg/UnboundPartitionSpec.java | 6 +- .../TestAlterTablePartitionFields.java | 60 +++++++++++++++++++ 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java index 288445f82a17..74675c9cc28d 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -539,6 +539,9 @@ PartitionSpec buildUnchecked() { static void checkCompatibility(PartitionSpec spec, Schema schema) { for (PartitionField field : spec.fields) { Type sourceType = schema.findType(field.sourceId()); + if (sourceType == null && field.transform().toString().equalsIgnoreCase("void")) { + continue; + } ValidationException.check(sourceType != null, "Cannot find source column for partition field: %s", field); ValidationException.check(sourceType.isPrimitiveType(), diff --git a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java index 5cde0f6324a3..897673e8217e 100644 --- a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; public class UnboundPartitionSpec { @@ -52,10 +53,9 @@ private PartitionSpec.Builder copyToBuilder(Schema schema) { PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId); for (UnboundPartitionField field : fields) { - if (field.partitionId != null) { + Types.NestedField column = schema.findField(field.sourceId); + if (column != null) { builder.add(field.sourceId, field.partitionId, field.name, field.transformAsString); - } else { - builder.add(field.sourceId, field.name, field.transformAsString); } } diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java index 9d630508b6e4..478c7fa39a79 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java @@ -399,6 +399,66 @@ public void testSparkTableAddDropPartitions() throws Exception { Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); } + @Test + public void testUnboundPartitionSpecFormatVersion1() throws Exception { + sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg " + + "TBLPROPERTIES ('format-version' = 1, 'write.delete.mode' = 'merge-on-read')", tableName); + Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); + + sql("INSERT INTO %s VALUES (1, current_timestamp(), 'format-version-1-first-data')", tableName); + Assert.assertEquals("Should have 1 rows after insert", 1L, + scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName); + assertPartitioningEquals(sparkTable(), 1, "truncate(data, 4)"); + + sql("INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-1-second-data')", tableName); + Assert.assertEquals("Should have 2 rows after insert", 2L, + scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName); + Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); + + sql("INSERT INTO %s VALUES (3, current_timestamp(), 'format-version-1-third-data')", tableName); + Assert.assertEquals("Should have 3 rows after insert", 3L, + scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("ALTER TABLE %s DROP COLUMN data", tableName); + + Assert.assertEquals("Should have 3 rows after insert", 3L, + scalarSql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testUnboundPartitionSpecFormatVersion2() throws Exception { + sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg " + + "TBLPROPERTIES ('format-version' = 2, 'write.delete.mode' = 'merge-on-read')", tableName); + Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); + + sql("INSERT INTO %s VALUES (1, current_timestamp(), 'format-version-2-first-data')", tableName); + Assert.assertEquals("Should have 1 rows after insert", 1L, + scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName); + assertPartitioningEquals(sparkTable(), 1, "truncate(data, 4)"); + + sql("INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-2-second-data')", tableName); + Assert.assertEquals("Should have 2 rows after insert", 2L, + scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName); + Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); + + sql("INSERT INTO %s VALUES (3, current_timestamp(), 'format-version-2-third-data')", tableName); + Assert.assertEquals("Should have 3 rows after insert", 3L, + scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("ALTER TABLE %s DROP COLUMN data", tableName); + + Assert.assertEquals("Should have 3 rows after insert", 3L, + scalarSql("SELECT count(*) FROM %s", tableName)); + } + private void assertPartitioningEquals(SparkTable table, int len, String transform) { Assert.assertEquals("spark table partition should be " + len, len, table.partitioning().length); Assert.assertEquals("latest spark table partition transform should match", From 98ee0fe8029b62f49bd95641969dd55470381d63 Mon Sep 17 00:00:00 2001 From: felixYyu Date: Thu, 21 Apr 2022 19:32:59 +0800 Subject: [PATCH 2/6] fix NullPointerException --- .../main/java/org/apache/iceberg/UnboundPartitionSpec.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java index 897673e8217e..68c5ca423b13 100644 --- a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java @@ -55,7 +55,11 @@ private PartitionSpec.Builder copyToBuilder(Schema schema) { for (UnboundPartitionField field : fields) { Types.NestedField column = schema.findField(field.sourceId); if (column != null) { - builder.add(field.sourceId, field.partitionId, field.name, field.transformAsString); + if (field.partitionId != null) { + builder.add(field.sourceId, field.partitionId, field.name, field.transformAsString); + } else { + builder.add(field.sourceId, field.name, field.transformAsString); + } } } From 831ec01739c3b8173b749f6ddb01e4346039c4a0 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 26 Sep 2022 12:33:38 +0200 Subject: [PATCH 3/6] Fix Spotless --- api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java index 1a3764e1a448..187a3adafd79 100644 --- a/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java +++ b/api/src/main/java/org/apache/iceberg/UnboundPartitionSpec.java @@ -20,9 +20,9 @@ import java.util.List; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.Transforms; +import org.apache.iceberg.types.Types; public class UnboundPartitionSpec { From ca05e06a4994cce76adbafaa96c208f3f3344667 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 26 Sep 2022 14:25:48 +0200 Subject: [PATCH 4/6] Fix spotless --- .../TestAlterTablePartitionFields.java | 64 +++++++++++-------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java index a591e8f79159..a7c95228627a 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java @@ -423,62 +423,74 @@ public void testSparkTableAddDropPartitions() throws Exception { @Test public void testUnboundPartitionSpecFormatVersion1() throws Exception { - sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg " + - "TBLPROPERTIES ('format-version' = 1, 'write.delete.mode' = 'merge-on-read')", tableName); - Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); + sql( + "CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg " + + "TBLPROPERTIES ('format-version' = 1, 'write.delete.mode' = 'merge-on-read')", + tableName); + Assert.assertEquals( + "spark table partition should be empty", 0, sparkTable().partitioning().length); sql("INSERT INTO %s VALUES (1, current_timestamp(), 'format-version-1-first-data')", tableName); - Assert.assertEquals("Should have 1 rows after insert", 1L, - scalarSql("SELECT count(*) FROM %s", tableName)); + Assert.assertEquals( + "Should have 1 rows after insert", 1L, scalarSql("SELECT count(*) FROM %s", tableName)); sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName); assertPartitioningEquals(sparkTable(), 1, "truncate(data, 4)"); - sql("INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-1-second-data')", tableName); - Assert.assertEquals("Should have 2 rows after insert", 2L, - scalarSql("SELECT count(*) FROM %s", tableName)); + sql( + "INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-1-second-data')", + tableName); + Assert.assertEquals( + "Should have 2 rows after insert", 2L, scalarSql("SELECT count(*) FROM %s", tableName)); sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName); - Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); + Assert.assertEquals( + "spark table partition should be empty", 0, sparkTable().partitioning().length); sql("INSERT INTO %s VALUES (3, current_timestamp(), 'format-version-1-third-data')", tableName); - Assert.assertEquals("Should have 3 rows after insert", 3L, - scalarSql("SELECT count(*) FROM %s", tableName)); + Assert.assertEquals( + "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); sql("ALTER TABLE %s DROP COLUMN data", tableName); - Assert.assertEquals("Should have 3 rows after insert", 3L, - scalarSql("SELECT count(*) FROM %s", tableName)); + Assert.assertEquals( + "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); } @Test public void testUnboundPartitionSpecFormatVersion2() throws Exception { - sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg " + - "TBLPROPERTIES ('format-version' = 2, 'write.delete.mode' = 'merge-on-read')", tableName); - Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); + sql( + "CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg " + + "TBLPROPERTIES ('format-version' = 2, 'write.delete.mode' = 'merge-on-read')", + tableName); + Assert.assertEquals( + "spark table partition should be empty", 0, sparkTable().partitioning().length); sql("INSERT INTO %s VALUES (1, current_timestamp(), 'format-version-2-first-data')", tableName); - Assert.assertEquals("Should have 1 rows after insert", 1L, - scalarSql("SELECT count(*) FROM %s", tableName)); + Assert.assertEquals( + "Should have 1 rows after insert", 1L, scalarSql("SELECT count(*) FROM %s", tableName)); sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName); assertPartitioningEquals(sparkTable(), 1, "truncate(data, 4)"); - sql("INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-2-second-data')", tableName); - Assert.assertEquals("Should have 2 rows after insert", 2L, - scalarSql("SELECT count(*) FROM %s", tableName)); + sql( + "INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-2-second-data')", + tableName); + Assert.assertEquals( + "Should have 2 rows after insert", 2L, scalarSql("SELECT count(*) FROM %s", tableName)); sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName); - Assert.assertEquals("spark table partition should be empty", 0, sparkTable().partitioning().length); + Assert.assertEquals( + "spark table partition should be empty", 0, sparkTable().partitioning().length); sql("INSERT INTO %s VALUES (3, current_timestamp(), 'format-version-2-third-data')", tableName); - Assert.assertEquals("Should have 3 rows after insert", 3L, - scalarSql("SELECT count(*) FROM %s", tableName)); + Assert.assertEquals( + "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); sql("ALTER TABLE %s DROP COLUMN data", tableName); - Assert.assertEquals("Should have 3 rows after insert", 3L, - scalarSql("SELECT count(*) FROM %s", tableName)); + Assert.assertEquals( + "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); } private void assertPartitioningEquals(SparkTable table, int len, String transform) { From fee7bc9d0e9ff61254ce62f73e95815637b6438b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 4 Oct 2022 18:27:28 +0200 Subject: [PATCH 5/6] Fix spotless --- .../iceberg/spark/extensions/TestAlterTablePartitionFields.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java index bd2da91163ef..4e7e9b9a53ef 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java @@ -492,7 +492,7 @@ public void testUnboundPartitionSpecFormatVersion2() throws Exception { Assert.assertEquals( "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); } - + @Test public void testDropColumnOfOldPartitionFieldV1() { // default table created in v1 format From f7e2fa9102790ac4f2d6ae8cd2ec2cc8f185b879 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 4 Oct 2022 23:46:43 +0200 Subject: [PATCH 6/6] Add missing imports --- .../iceberg/spark/extensions/TestAlterTablePartitionFields.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java index 4e7e9b9a53ef..9bba4ed339d7 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java @@ -18,9 +18,11 @@ */ package org.apache.iceberg.spark.extensions; +import java.sql.Timestamp; import java.util.Map; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.source.SparkTable; import org.apache.spark.sql.connector.catalog.CatalogManager; import org.apache.spark.sql.connector.catalog.Identifier;