-
Notifications
You must be signed in to change notification settings - Fork 3k
API: Drop column of deleted partitioned field to Unbound partitionSpec #4602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
32d4bec
98ee0fe
f193aa2
831ec01
ca05e06
1e9c1c6
fee7bc9
f7e2fa9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,9 +18,11 @@ | |
| */ | ||
| package org.apache.iceberg.spark.extensions; | ||
|
|
||
| import java.sql.Timestamp; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
| import org.apache.iceberg.spark.source.SparkTable; | ||
| import org.apache.spark.sql.connector.catalog.CatalogManager; | ||
| import org.apache.spark.sql.connector.catalog.Identifier; | ||
|
|
@@ -421,16 +423,97 @@ public void testSparkTableAddDropPartitions() throws Exception { | |
| "spark table partition should be empty", 0, sparkTable().partitioning().length); | ||
| } | ||
|
|
||
| @Test | ||
| public void testUnboundPartitionSpecFormatVersion1() throws Exception { | ||
| sql( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just an idea: since both tests are identical except for the format version, they could be merged into a single parameterized test. This way it could be easily extended for future versions as well. WDYT? |
||
| "CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg " | ||
| + "TBLPROPERTIES ('format-version' = 1, 'write.delete.mode' = 'merge-on-read')", | ||
| tableName); | ||
| Assert.assertEquals( | ||
| "spark table partition should be empty", 0, sparkTable().partitioning().length); | ||
|
|
||
| sql("INSERT INTO %s VALUES (1, current_timestamp(), 'format-version-1-first-data')", tableName); | ||
| Assert.assertEquals( | ||
| "Should have 1 rows after insert", 1L, scalarSql("SELECT count(*) FROM %s", tableName)); | ||
|
|
||
| sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName); | ||
| assertPartitioningEquals(sparkTable(), 1, "truncate(data, 4)"); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-1-second-data')", | ||
| tableName); | ||
| Assert.assertEquals( | ||
| "Should have 2 rows after insert", 2L, scalarSql("SELECT count(*) FROM %s", tableName)); | ||
|
|
||
| sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName); | ||
| Assert.assertEquals( | ||
| "spark table partition should be empty", 0, sparkTable().partitioning().length); | ||
|
|
||
| sql("INSERT INTO %s VALUES (3, current_timestamp(), 'format-version-1-third-data')", tableName); | ||
| Assert.assertEquals( | ||
| "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); | ||
|
|
||
| sql("ALTER TABLE %s DROP COLUMN data", tableName); | ||
|
|
||
| Assert.assertEquals( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In addition to the data read test, shall we add a test to read back the partition spec as well? |
||
| "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testUnboundPartitionSpecFormatVersion2() throws Exception { | ||
| sql( | ||
| "CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, ts timestamp, data string) USING iceberg " | ||
| + "TBLPROPERTIES ('format-version' = 2, 'write.delete.mode' = 'merge-on-read')", | ||
| tableName); | ||
| Assert.assertEquals( | ||
| "spark table partition should be empty", 0, sparkTable().partitioning().length); | ||
|
|
||
| sql("INSERT INTO %s VALUES (1, current_timestamp(), 'format-version-2-first-data')", tableName); | ||
| Assert.assertEquals( | ||
| "Should have 1 rows after insert", 1L, scalarSql("SELECT count(*) FROM %s", tableName)); | ||
|
|
||
| sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName); | ||
| assertPartitioningEquals(sparkTable(), 1, "truncate(data, 4)"); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES (2, current_timestamp(), 'format-version-2-second-data')", | ||
| tableName); | ||
| Assert.assertEquals( | ||
| "Should have 2 rows after insert", 2L, scalarSql("SELECT count(*) FROM %s", tableName)); | ||
|
|
||
| sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName); | ||
| Assert.assertEquals( | ||
| "spark table partition should be empty", 0, sparkTable().partitioning().length); | ||
|
|
||
| sql("INSERT INTO %s VALUES (3, current_timestamp(), 'format-version-2-third-data')", tableName); | ||
| Assert.assertEquals( | ||
| "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); | ||
|
|
||
| sql("ALTER TABLE %s DROP COLUMN data", tableName); | ||
|
|
||
| Assert.assertEquals( | ||
| "Should have 3 rows after insert", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testDropColumnOfOldPartitionFieldV1() { | ||
| // default table created in v1 format | ||
| sql( | ||
| "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '1')", | ||
| tableName); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))", | ||
| tableName); | ||
|
|
||
| sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName); | ||
|
|
||
| sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName); | ||
|
|
||
| assertEquals( | ||
| "Should have expected rows", | ||
| ImmutableList.of(row(1L, Timestamp.valueOf("2022-01-01 10:00:00"))), | ||
| sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reading from the table is actually breaking on current master: |
||
| } | ||
|
|
||
| @Test | ||
|
|
@@ -439,9 +522,18 @@ public void testDropColumnOfOldPartitionFieldV2() { | |
| "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '2')", | ||
| tableName); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))", | ||
| tableName); | ||
|
|
||
| sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName); | ||
|
|
||
| sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName); | ||
|
|
||
| assertEquals( | ||
| "Should have expected rows", | ||
| ImmutableList.of(row(1L, Timestamp.valueOf("2022-01-01 10:00:00"))), | ||
| sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName)); | ||
| } | ||
|
|
||
| private void assertPartitioningEquals(SparkTable table, int len, String transform) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.