From 3876695f9c62df3fbe33c27fc8e5e9c23a8e8d07 Mon Sep 17 00:00:00 2001 From: okumin Date: Sat, 25 Jan 2025 16:03:46 +0900 Subject: [PATCH] HIVE-28727: Iceberg: Refactor IcebergTableUtil.toPartitionData --- .../mr/hive/HiveIcebergStorageHandler.java | 15 +-- .../iceberg/mr/hive/IcebergTableUtil.java | 30 +----- ...erg_major_compaction_partition_evolution.q | 13 ++- ...major_compaction_partition_evolution.q.out | 99 +++++++++++++++++-- 4 files changed, 110 insertions(+), 47 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 4652df6bf480..05ab0e478d04 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -174,7 +174,6 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.actions.DeleteOrphanFiles; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.PartitionStatsHandler; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.ValidationException; @@ -569,17 +568,9 @@ private static Map getPartishSummary(Partish partish, Table tabl try (Closeable toClose = partitionStatsRecords) { PartitionStats partitionStats = Iterables.tryFind(partitionStatsRecords, stats -> { PartitionSpec spec = table.specs().get(stats.specId()); - Schema readSchema = spec.partitionType().asSchema(); - GenericRecord record = GenericRecord.create(readSchema); - - List fields = partitionType.fields(); - for (int index = 0, pos = 0; index < fields.size(); index++) { - if (readSchema.findField(fields.get(index).fieldId()) != null) { - record.set(pos++, stats.partition().get(index, Object.class)); - } - } - return spec.partitionToPath(record).equals(partish.getPartition().getName()); - + PartitionData data = IcebergTableUtil.toPartitionData(stats.partition(), partitionType, + spec.partitionType()); + return spec.partitionToPath(data).equals(partish.getPartition().getName()); }).orNull(); if (partitionStats != null) { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 3230363ad7fd..35c185a29e25 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -418,36 +418,14 @@ public static void performMetadataDelete(Table icebergTable, String branchName, } public static PartitionData toPartitionData(StructLike key, Types.StructType keyType) { - PartitionData data = new PartitionData(keyType); - for (int i = 0; i < keyType.fields().size(); i++) { - Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass()); - if (val != null) { - data.set(i, val); - } - } - return data; + PartitionData keyTemplate = new PartitionData(keyType); + return keyTemplate.copyFor(key); } public static PartitionData toPartitionData(StructLike sourceKey, Types.StructType sourceKeyType, Types.StructType targetKeyType) { - PartitionData data = new PartitionData(targetKeyType); - for (int i = 0; i < targetKeyType.fields().size(); i++) { - Types.NestedField targetKey = targetKeyType.fields().get(i); - - Optional val = sourceKeyType.fields().stream() - .filter(f -> f.name().equals(targetKey.name())) - .findFirst() - .map(sourceKeyElem -> - sourceKey.get( - sourceKeyType.fields().indexOf(sourceKeyElem), - targetKey.type().typeId().javaClass() - ) - ); - if (val.isPresent()) { - data.set(i, val.get()); - } - } - return data; + StructProjection projection = StructProjection.create(sourceKeyType, targetKeyType).wrap(sourceKey); + return toPartitionData(projection, targetKeyType); } public static Expression generateExpressionFromPartitionSpec(Table table, Map partitionSpec, diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q index e6c40dd20ccb..32bd401e081c 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q @@ -36,13 +36,16 @@ tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg'); insert into ice_orc VALUES ('fn1','ln1', 1, 10, 100); insert into ice_orc VALUES ('fn2','ln2', 1, 10, 100); insert into ice_orc VALUES ('fn3','ln3', 1, 11, 100); +insert into ice_orc VALUES (null,null, null, null, null); alter table ice_orc set partition spec(company_id, dept_id); insert into ice_orc VALUES ('fn4','ln4', 1, 11, 100); insert into ice_orc VALUES ('fn5','ln5', 2, 20, 100); insert into ice_orc VALUES ('fn6','ln6', 2, 20, 100); +insert into ice_orc VALUES (null,null, null, null, null); alter table ice_orc set partition spec(company_id, dept_id, team_id); insert into ice_orc VALUES ('fn7','ln7', 2, 21, 100); insert into ice_orc VALUES ('fn8','ln8', 2, 21, 100); +insert into ice_orc VALUES (null,null, null, null, null); update ice_orc set last_name = 'ln1a' where first_name='fn1'; update ice_orc set last_name = 'ln2a' where first_name='fn2'; @@ -59,9 +62,17 @@ delete from ice_orc where last_name in ('ln1a', 'ln8a'); select * from ice_orc; describe formatted ice_orc; +select `partition`, spec_id, content, record_count +from default.ice_orc.files +order by `partition`, spec_id, content, record_count; + explain alter table ice_orc COMPACT 'major' and wait; alter table ice_orc COMPACT 'major' and wait; select * from ice_orc; describe formatted ice_orc; -show compactions order by 'partition'; \ No newline at end of file +show compactions order by 'partition'; + +select `partition`, spec_id, content, record_count +from default.ice_orc.files +order by `partition`, spec_id, content, record_count; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out index 970467e9d99f..5a742f009255 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out @@ -46,6 +46,14 @@ POSTHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1, 11, 100) POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES (null,null, null, null, null) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES (null,null, null, null, null) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc PREHOOK: query: alter table ice_orc set partition spec(company_id, dept_id) PREHOOK: type: ALTERTABLE_SETPARTSPEC PREHOOK: Input: default@ice_orc @@ -77,6 +85,14 @@ POSTHOOK: query: insert into ice_orc VALUES ('fn6','ln6', 2, 20, 100) POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES (null,null, null, null, null) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES (null,null, null, null, null) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc PREHOOK: query: alter table ice_orc set partition spec(company_id, dept_id, team_id) PREHOOK: type: ALTERTABLE_SETPARTSPEC PREHOOK: Input: default@ice_orc @@ -100,6 +116,14 @@ POSTHOOK: query: insert into ice_orc VALUES ('fn8','ln8', 2, 21, 100) POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES (null,null, null, null, null) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES (null,null, null, null, null) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc PREHOOK: query: update ice_orc set last_name = 'ln1a' where first_name='fn1' PREHOOK: type: QUERY PREHOOK: Input: default@ice_orc @@ -203,6 +227,9 @@ POSTHOOK: query: select * from ice_orc POSTHOOK: type: QUERY POSTHOOK: Input: default@ice_orc #### A masked pattern was here #### +NULL NULL NULL NULL NULL +NULL NULL NULL NULL NULL +NULL NULL NULL NULL NULL fn2 ln2a 1 10 100 fn3 ln3a 1 11 100 fn4 ln4a 1 11 100 @@ -239,20 +266,20 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} + current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"17\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"17\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 hive.compactor.worker.pool iceberg iceberg.orc.files.only true #### A masked pattern was here #### - numFiles 14 - numRows 14 + numFiles 17 + numRows 17 parquet.compression zstd #### A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 17 + snapshot-count 20 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -269,6 +296,43 @@ InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat Compressed: No Sort Columns: [] +PREHOOK: query: select `partition`, spec_id, content, record_count +from default.ice_orc.files +order by `partition`, spec_id, content, record_count +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select `partition`, spec_id, content, record_count +from default.ice_orc.files +order by `partition`, spec_id, content, record_count +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +{"company_id":100,"dept_id":1,"team_id":10} 2 0 1 +{"company_id":100,"dept_id":1,"team_id":11} 2 0 1 +{"company_id":100,"dept_id":1,"team_id":11} 2 0 1 +{"company_id":100,"dept_id":1,"team_id":null} 1 0 1 +{"company_id":100,"dept_id":1,"team_id":null} 1 1 1 +{"company_id":100,"dept_id":2,"team_id":21} 2 0 1 +{"company_id":100,"dept_id":2,"team_id":21} 2 0 1 +{"company_id":100,"dept_id":2,"team_id":21} 2 1 1 +{"company_id":100,"dept_id":2,"team_id":21} 2 1 1 +{"company_id":100,"dept_id":2,"team_id":null} 1 0 1 +{"company_id":100,"dept_id":2,"team_id":null} 1 0 1 +{"company_id":100,"dept_id":2,"team_id":null} 1 0 1 +{"company_id":100,"dept_id":2,"team_id":null} 1 0 1 +{"company_id":100,"dept_id":2,"team_id":null} 1 0 1 +{"company_id":100,"dept_id":2,"team_id":null} 1 1 1 +{"company_id":100,"dept_id":2,"team_id":null} 1 1 1 +{"company_id":100,"dept_id":null,"team_id":null} 0 0 1 +{"company_id":100,"dept_id":null,"team_id":null} 0 0 1 +{"company_id":100,"dept_id":null,"team_id":null} 0 0 1 +{"company_id":100,"dept_id":null,"team_id":null} 0 1 1 +{"company_id":100,"dept_id":null,"team_id":null} 0 1 1 +{"company_id":100,"dept_id":null,"team_id":null} 0 1 1 +{"company_id":null,"dept_id":null,"team_id":null} 0 0 1 +{"company_id":null,"dept_id":null,"team_id":null} 1 0 1 +{"company_id":null,"dept_id":null,"team_id":null} 2 0 1 PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait PREHOOK: type: ALTERTABLE_COMPACT PREHOOK: Input: default@ice_orc @@ -305,6 +369,9 @@ POSTHOOK: query: select * from ice_orc POSTHOOK: type: QUERY POSTHOOK: Input: default@ice_orc #### A masked pattern was here #### +NULL NULL NULL NULL NULL +NULL NULL NULL NULL NULL +NULL NULL NULL NULL NULL fn2 ln2a 1 10 100 fn3 ln3a 1 11 100 fn4 ln4a 1 11 100 @@ -341,20 +408,20 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"8\",\"removed-position-delete-files\":\"5\",\"removed-delete-files\":\"5\",\"added-records\":\"3\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"5\",\"changed-partition-count\":\"5\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} + current-snapshot-summary {\"added-data-files\":\"2\",\"deleted-data-files\":\"10\",\"removed-position-delete-files\":\"5\",\"removed-delete-files\":\"5\",\"added-records\":\"5\",\"deleted-records\":\"10\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"5\",\"changed-partition-count\":\"8\",\"total-records\":\"9\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 hive.compactor.worker.pool iceberg iceberg.orc.files.only true #### A masked pattern was here #### - numFiles 2 - numRows 6 + numFiles 4 + numRows 9 parquet.compression zstd #### A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 20 + snapshot-count 23 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -379,3 +446,19 @@ CompactionId Database Table Partition Type State Worker host Worker Enqueue Time #Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 --- #Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 --- #Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 --- +PREHOOK: query: select `partition`, spec_id, content, record_count +from default.ice_orc.files +order by `partition`, spec_id, content, record_count +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_orc +#### A masked pattern was here #### +POSTHOOK: query: select `partition`, spec_id, content, record_count +from default.ice_orc.files +order by `partition`, spec_id, content, record_count +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_orc +#### A masked pattern was here #### +{"company_id":100,"dept_id":1,"team_id":null} 1 0 3 +{"company_id":100,"dept_id":2,"team_id":null} 1 0 3 +{"company_id":null,"dept_id":null,"team_id":null} 1 0 1 +{"company_id":null,"dept_id":null,"team_id":null} 1 0 2