Skip to content

Commit

Permalink
HIVE-28727: Iceberg: Refactor IcebergTableUtil.toPartitionData
Browse files Browse the repository at this point in the history
  • Loading branch information
okumin committed Feb 11, 2025
1 parent b914b6a commit 3876695
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.PartitionStatsHandler;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -569,17 +568,9 @@ private static Map<String, String> getPartishSummary(Partish partish, Table tabl
try (Closeable toClose = partitionStatsRecords) {
PartitionStats partitionStats = Iterables.tryFind(partitionStatsRecords, stats -> {
PartitionSpec spec = table.specs().get(stats.specId());
Schema readSchema = spec.partitionType().asSchema();
GenericRecord record = GenericRecord.create(readSchema);

List<Types.NestedField> fields = partitionType.fields();
for (int index = 0, pos = 0; index < fields.size(); index++) {
if (readSchema.findField(fields.get(index).fieldId()) != null) {
record.set(pos++, stats.partition().get(index, Object.class));
}
}
return spec.partitionToPath(record).equals(partish.getPartition().getName());

PartitionData data = IcebergTableUtil.toPartitionData(stats.partition(), partitionType,
spec.partitionType());
return spec.partitionToPath(data).equals(partish.getPartition().getName());
}).orNull();

if (partitionStats != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,36 +418,14 @@ public static void performMetadataDelete(Table icebergTable, String branchName,
}

/**
 * Copies a generic {@link StructLike} partition key into a {@link PartitionData}
 * instance described by the given struct type.
 *
 * @param key     the partition key values to copy
 * @param keyType the struct type describing the partition fields
 * @return a new {@code PartitionData} holding the values of {@code key}
 */
public static PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
  // PartitionData.copyFor populates a fresh copy from the supplied StructLike,
  // replacing the previous manual field-by-field copy loop.
  PartitionData keyTemplate = new PartitionData(keyType);
  return keyTemplate.copyFor(key);
}

/**
 * Converts a partition key laid out as {@code sourceKeyType} into a
 * {@link PartitionData} of the (possibly different) {@code targetKeyType}.
 *
 * @param sourceKey     the partition key values in the source layout
 * @param sourceKeyType the struct type describing {@code sourceKey}
 * @param targetKeyType the struct type of the resulting partition data
 * @return a new {@code PartitionData} containing the projected values
 */
public static PartitionData toPartitionData(StructLike sourceKey, Types.StructType sourceKeyType,
    Types.StructType targetKeyType) {
  // Project the source key onto the target type with Iceberg's StructProjection
  // (replaces the previous hand-rolled name-based field lookup; NOTE(review):
  // StructProjection resolves fields by field id — confirm this matches the
  // intended name/id semantics for evolved partition specs).
  StructProjection projection = StructProjection.create(sourceKeyType, targetKeyType).wrap(sourceKey);
  return toPartitionData(projection, targetKeyType);
}

public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg');
insert into ice_orc VALUES ('fn1','ln1', 1, 10, 100);
insert into ice_orc VALUES ('fn2','ln2', 1, 10, 100);
insert into ice_orc VALUES ('fn3','ln3', 1, 11, 100);
insert into ice_orc VALUES (null,null, null, null, null);
alter table ice_orc set partition spec(company_id, dept_id);
insert into ice_orc VALUES ('fn4','ln4', 1, 11, 100);
insert into ice_orc VALUES ('fn5','ln5', 2, 20, 100);
insert into ice_orc VALUES ('fn6','ln6', 2, 20, 100);
insert into ice_orc VALUES (null,null, null, null, null);
alter table ice_orc set partition spec(company_id, dept_id, team_id);
insert into ice_orc VALUES ('fn7','ln7', 2, 21, 100);
insert into ice_orc VALUES ('fn8','ln8', 2, 21, 100);
insert into ice_orc VALUES (null,null, null, null, null);

update ice_orc set last_name = 'ln1a' where first_name='fn1';
update ice_orc set last_name = 'ln2a' where first_name='fn2';
Expand All @@ -59,9 +62,17 @@ delete from ice_orc where last_name in ('ln1a', 'ln8a');
select * from ice_orc;
describe formatted ice_orc;

select `partition`, spec_id, content, record_count
from default.ice_orc.files
order by `partition`, spec_id, content, record_count;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc COMPACT 'major' and wait;

select * from ice_orc;
describe formatted ice_orc;
show compactions order by 'partition';
show compactions order by 'partition';

select `partition`, spec_id, content, record_count
from default.ice_orc.files
order by `partition`, spec_id, content, record_count;
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ POSTHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1, 11, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES (null,null, null, null, null)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES (null,null, null, null, null)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: alter table ice_orc set partition spec(company_id, dept_id)
PREHOOK: type: ALTERTABLE_SETPARTSPEC
PREHOOK: Input: default@ice_orc
Expand Down Expand Up @@ -77,6 +85,14 @@ POSTHOOK: query: insert into ice_orc VALUES ('fn6','ln6', 2, 20, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES (null,null, null, null, null)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES (null,null, null, null, null)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: alter table ice_orc set partition spec(company_id, dept_id, team_id)
PREHOOK: type: ALTERTABLE_SETPARTSPEC
PREHOOK: Input: default@ice_orc
Expand All @@ -100,6 +116,14 @@ POSTHOOK: query: insert into ice_orc VALUES ('fn8','ln8', 2, 21, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES (null,null, null, null, null)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES (null,null, null, null, null)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: update ice_orc set last_name = 'ln1a' where first_name='fn1'
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
Expand Down Expand Up @@ -203,6 +227,9 @@ POSTHOOK: query: select * from ice_orc
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
#### A masked pattern was here ####
NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL
fn2 ln2a 1 10 100
fn3 ln3a 1 11 100
fn4 ln4a 1 11 100
Expand Down Expand Up @@ -239,20 +266,20 @@ Table Parameters:
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"14\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"14\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-summary {\"deleted-data-files\":\"2\",\"deleted-records\":\"2\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"17\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"17\",\"total-delete-files\":\"8\",\"total-position-deletes\":\"8\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
format-version 2
hive.compactor.worker.pool iceberg
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 14
numRows 14
numFiles 17
numRows 17
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 17
snapshot-count 20
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
Expand All @@ -269,6 +296,43 @@ InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: []
PREHOOK: query: select `partition`, spec_id, content, record_count
from default.ice_orc.files
order by `partition`, spec_id, content, record_count
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
#### A masked pattern was here ####
POSTHOOK: query: select `partition`, spec_id, content, record_count
from default.ice_orc.files
order by `partition`, spec_id, content, record_count
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
#### A masked pattern was here ####
{"company_id":100,"dept_id":1,"team_id":10} 2 0 1
{"company_id":100,"dept_id":1,"team_id":11} 2 0 1
{"company_id":100,"dept_id":1,"team_id":11} 2 0 1
{"company_id":100,"dept_id":1,"team_id":null} 1 0 1
{"company_id":100,"dept_id":1,"team_id":null} 1 1 1
{"company_id":100,"dept_id":2,"team_id":21} 2 0 1
{"company_id":100,"dept_id":2,"team_id":21} 2 0 1
{"company_id":100,"dept_id":2,"team_id":21} 2 1 1
{"company_id":100,"dept_id":2,"team_id":21} 2 1 1
{"company_id":100,"dept_id":2,"team_id":null} 1 0 1
{"company_id":100,"dept_id":2,"team_id":null} 1 0 1
{"company_id":100,"dept_id":2,"team_id":null} 1 0 1
{"company_id":100,"dept_id":2,"team_id":null} 1 0 1
{"company_id":100,"dept_id":2,"team_id":null} 1 0 1
{"company_id":100,"dept_id":2,"team_id":null} 1 1 1
{"company_id":100,"dept_id":2,"team_id":null} 1 1 1
{"company_id":100,"dept_id":null,"team_id":null} 0 0 1
{"company_id":100,"dept_id":null,"team_id":null} 0 0 1
{"company_id":100,"dept_id":null,"team_id":null} 0 0 1
{"company_id":100,"dept_id":null,"team_id":null} 0 1 1
{"company_id":100,"dept_id":null,"team_id":null} 0 1 1
{"company_id":100,"dept_id":null,"team_id":null} 0 1 1
{"company_id":null,"dept_id":null,"team_id":null} 0 0 1
{"company_id":null,"dept_id":null,"team_id":null} 1 0 1
{"company_id":null,"dept_id":null,"team_id":null} 2 0 1
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
Expand Down Expand Up @@ -305,6 +369,9 @@ POSTHOOK: query: select * from ice_orc
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
#### A masked pattern was here ####
NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL
fn2 ln2a 1 10 100
fn3 ln3a 1 11 100
fn4 ln4a 1 11 100
Expand Down Expand Up @@ -341,20 +408,20 @@ Table Parameters:
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"8\",\"removed-position-delete-files\":\"5\",\"removed-delete-files\":\"5\",\"added-records\":\"3\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"5\",\"changed-partition-count\":\"5\",\"total-records\":\"6\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"2\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-summary {\"added-data-files\":\"2\",\"deleted-data-files\":\"10\",\"removed-position-delete-files\":\"5\",\"removed-delete-files\":\"5\",\"added-records\":\"5\",\"deleted-records\":\"10\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"5\",\"changed-partition-count\":\"8\",\"total-records\":\"9\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
format-version 2
hive.compactor.worker.pool iceberg
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 2
numRows 6
numFiles 4
numRows 9
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 20
snapshot-count 23
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
Expand All @@ -379,3 +446,19 @@ CompactionId Database Table Partition Type State Worker host Worker Enqueue Time
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
PREHOOK: query: select `partition`, spec_id, content, record_count
from default.ice_orc.files
order by `partition`, spec_id, content, record_count
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
#### A masked pattern was here ####
POSTHOOK: query: select `partition`, spec_id, content, record_count
from default.ice_orc.files
order by `partition`, spec_id, content, record_count
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
#### A masked pattern was here ####
{"company_id":100,"dept_id":1,"team_id":null} 1 0 3
{"company_id":100,"dept_id":2,"team_id":null} 1 0 3
{"company_id":null,"dept_id":null,"team_id":null} 1 0 1
{"company_id":null,"dept_id":null,"team_id":null} 1 0 2

0 comments on commit 3876695

Please sign in to comment.