diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 50280a8a0abe..9bbc6d3566dd 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -464,41 +464,44 @@ public void appendFiles(org.apache.hadoop.hive.metastore.api.Table table, URI fr
   public Map<String, String> getBasicStatistics(Partish partish) {
     org.apache.hadoop.hive.ql.metadata.Table hmsTable = partish.getTable();
     // For write queries where rows got modified, don't fetch from cache as values could have changed.
-    Table table = getTable(hmsTable);
     Map<String, String> stats = Maps.newHashMap();
-    if (getStatsSource().equals(HiveMetaHook.ICEBERG)) {
-      if (table.currentSnapshot() != null) {
-        Map<String, String> summary = table.currentSnapshot().summary();
-        if (summary != null) {
+    if (!getStatsSource().equals(HiveMetaHook.ICEBERG)) {
+      return hmsTable.getParameters();
+    }
+    Table table = getTable(hmsTable);
-          if (summary.containsKey(SnapshotSummary.TOTAL_DATA_FILES_PROP)) {
-            stats.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-          }
+    Snapshot snapshot = IcebergTableUtil.getTableSnapshot(hmsTable, table);
+    if (snapshot != null) {
+      Map<String, String> summary = snapshot.summary();
+      if (summary != null) {
+
+        if (summary.containsKey(SnapshotSummary.TOTAL_DATA_FILES_PROP)) {
+          stats.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+        }
-          if (summary.containsKey(SnapshotSummary.TOTAL_RECORDS_PROP)) {
-            long totalRecords = Long.parseLong(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
-            if (summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
-                summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
+        if (summary.containsKey(SnapshotSummary.TOTAL_RECORDS_PROP)) {
+          long totalRecords = Long.parseLong(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
+          if (summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
+              summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
-              long totalEqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
-              long totalPosDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
+            long totalEqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
+            long totalPosDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
-              long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
-              totalRecords = actualRecords > 0 ? actualRecords : totalRecords;
-              // actualRecords maybe -ve in edge cases
-            }
-            stats.put(StatsSetupConst.ROW_COUNT, String.valueOf(totalRecords));
+            long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
+            totalRecords = actualRecords > 0 ? actualRecords : totalRecords;
+            // actualRecords maybe -ve in edge cases
           }
+          stats.put(StatsSetupConst.ROW_COUNT, String.valueOf(totalRecords));
+        }
-          if (summary.containsKey(SnapshotSummary.TOTAL_FILE_SIZE_PROP)) {
-            stats.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
-          }
+        if (summary.containsKey(SnapshotSummary.TOTAL_FILE_SIZE_PROP)) {
+          stats.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
         }
-      } else {
-        stats.put(StatsSetupConst.NUM_FILES, "0");
-        stats.put(StatsSetupConst.ROW_COUNT, "0");
-        stats.put(StatsSetupConst.TOTAL_SIZE, "0");
       }
+    } else {
+      stats.put(StatsSetupConst.NUM_FILES, "0");
+      stats.put(StatsSetupConst.ROW_COUNT, "0");
+      stats.put(StatsSetupConst.TOTAL_SIZE, "0");
     }
     return stats;
   }
@@ -613,8 +616,9 @@ private ColumnStatistics readColStats(Table table, Path statsPath) {
   public boolean canComputeQueryUsingStats(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     if (getStatsSource().equals(HiveMetaHook.ICEBERG) && hmsTable.getMetaTable() == null) {
       Table table = getTable(hmsTable);
-      if (table.currentSnapshot() != null) {
-        Map<String, String> summary = table.currentSnapshot().summary();
+      Snapshot snapshot = IcebergTableUtil.getTableSnapshot(hmsTable, table);
+      if (snapshot != null) {
+        Map<String, String> summary = snapshot.summary();
         if (summary != null && summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
             summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 88dd006b7721..c096246bc53a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,9 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.TimestampTZ;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -42,6 +46,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -59,6 +64,7 @@
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -79,6 +85,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.StructProjection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -559,4 +566,32 @@ public static List<String> getPartitionNames(Table icebergTable, Map<String, St
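The body of this final hunk, the 28 added lines that introduce the new IcebergTableUtil#getTableSnapshot helper called from HiveIcebergStorageHandler above, is missing from the source. Below is a plausible sketch only, inferred from the imports added above (ZoneId, TimestampTZ, TimestampTZUtil, HiveConf, SessionState, Snapshot, SnapshotUtil) and from the call sites; the fallback order and the hmsTable accessors (getSnapshotRef(), getAsOfVersion(), getAsOfTimestamp()) are assumptions, not the verbatim patch:

  // Sketch only -- assumes: import org.apache.hadoop.hive.ql.metadata.HiveUtils;
  public static Snapshot getTableSnapshot(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Table table) {
    // A branch/tag reference on the table (e.g. "branch_test1") wins when present.
    String refName = HiveUtils.getTableSnapshotRef(hmsTable.getSnapshotRef());
    if (refName != null && table.refs().containsKey(refName)) {
      return table.snapshot(table.refs().get(refName).snapshotId());
    }
    // FOR SYSTEM_VERSION AS OF <snapshot-id>
    if (hmsTable.getAsOfVersion() != null) {
      return table.snapshot(Long.parseLong(hmsTable.getAsOfVersion()));
    }
    // FOR SYSTEM_TIME AS OF <timestamp>, resolved in the session's local time zone.
    if (hmsTable.getAsOfTimestamp() != null) {
      ZoneId timeZone = SessionState.get() == null
          ? new HiveConf().getLocalTimeZone()
          : SessionState.get().getConf().getLocalTimeZone();
      TimestampTZ time = TimestampTZUtil.parse(hmsTable.getAsOfTimestamp(), timeZone);
      long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, time.getZonedDateTime().toInstant().toEpochMilli());
      return table.snapshot(snapshotId);
    }
    // No ref and no time travel: fall back to the current snapshot, matching the old behavior.
    return table.currentSnapshot();
  }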
[... the diff header and the start of the first hunk of the affected .q.out test file are missing from the source; the diff resumes mid-hunk below ...]
                   ... 100)) (type: boolean)
                   Statistics: Num rows: 1 Data size: 581 Basic stats: COMPLETE Column stats: COMPLETE
@@ -498,14 +498,14 @@ STAGE PLANS:
                         name: default.ice01
                 Filter Operator
                   predicate: ((_col10 = _col1) and (_col10 <= 100)) (type: boolean)
-                  Statistics: Num rows: 1 Data size: 581 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 2 Data size: 1162 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col7 (type: int), _col6 (type: bigint), _col2 (type: string), _col5 (type: bigint), _col3 (type: string), _col10 (type: int), _col9 (type: string), _col8 (type: int)
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
-                    Statistics: Num rows: 1 Data size: 485 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 2 Data size: 970 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
-                      Statistics: Num rows: 1 Data size: 485 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 2 Data size: 970 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
                           output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
@@ -513,14 +513,14 @@ STAGE PLANS:
                         name: default.ice01
                 Filter Operator
                   predicate: ((_col10 = _col1) and (_col10 <= 100)) (type: boolean)
-                  Statistics: Num rows: 1 Data size: 581 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 2 Data size: 1162 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col10 (type: int), 'Merged' (type: string), (_col8 + 10) (type: int)
                     outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 1 Data size: 98 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 2 Data size: 196 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
-                      Statistics: Num rows: 1 Data size: 98 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 2 Data size: 196 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
                           output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
@@ -543,24 +543,24 @@ STAGE PLANS:
                         name: default.ice01
                 Filter Operator
                   predicate: (_col10 = _col1) (type: boolean)
-                  Statistics: Num rows: 1 Data size: 581 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 2 Data size: 1162 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col2 (type: string), _col5 (type: bigint), _col6 (type: bigint), _col7 (type: int)
                     outputColumnNames: _col2, _col5, _col6, _col7
-                    Statistics: Num rows: 1 Data size: 581 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 2 Data size: 1162 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
                       aggregations: count()
                       keys: _col7 (type: int), _col6 (type: bigint), _col2 (type: string), _col5 (type: bigint)
                       minReductionHashAggr: 0.4
                       mode: hash
                       outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                      Statistics: Num rows: 1 Data size: 212 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 2 Data size: 424 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
                         key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
                         null sort order: zzzz
                         sort order: ++++
                         Map-reduce partition columns: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
-                        Statistics: Num rows: 1 Data size: 212 Basic stats: COMPLETE Column stats: COMPLETE
+                        Statistics: Num rows: 2 Data size: 424 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col4 (type: bigint)
         Reducer 3
             Execution mode: vectorized
@@ -570,7 +570,7 @@
                 keys: KEY._col0 (type: int), KEY._col1 (type: bigint), KEY._col2 (type: string), KEY._col3 (type: bigint)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2, _col3, _col4
-                Statistics: Num rows: 1 Data size: 212 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 2 Data size: 424 Basic stats: COMPLETE Column stats: COMPLETE
                 Filter Operator
                   predicate: (_col4 > 1L) (type: boolean)
                   Statistics: Num rows: 1 Data size: 212 Basic stats: COMPLETE Column stats: COMPLETE
@@ -795,14 +795,14 @@ STAGE PLANS:
                 TableScan
                   alias: ice01
                   Snapshot ref: branch_test1
-                  Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 1 Data size: 95 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: a (type: int), b (type: string), c (type: int)
                     outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 1 Data size: 95 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
-                      Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 1 Data size: 95 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                           output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
index 5343a1bb3bb4..e4076d113678 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
@@ -518,11 +518,8 @@ public static Path getDumpPath(Path root, String dbName, String tableName) {
   }
 
   public static String getTableSnapshotRef(String refName) {
-    Matcher ref = SNAPSHOT_REF.matcher(refName);
-    if (ref.matches()) {
-      return ref.group(1);
-    }
-    return null;
+    Matcher ref = SNAPSHOT_REF.matcher(String.valueOf(refName));
+    return ref.matches() ? ref.group(1) : null;
   }
 
   public static Boolean isTableTag(String refName) {
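One detail worth calling out in the HiveUtils change above: String.valueOf(refName) maps a null ref to the literal string "null", so the matcher no longer throws a NullPointerException for tables without a snapshot ref (which the new stats path now probes for); the regex simply fails to match and the method returns null. A minimal self-contained illustration -- the pattern below is a hypothetical stand-in, since the real SNAPSHOT_REF is a private constant in HiveUtils:

  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  public class SnapshotRefNullSafety {
    // Hypothetical stand-in for the private HiveUtils.SNAPSHOT_REF pattern.
    private static final Pattern SNAPSHOT_REF = Pattern.compile("snapshot_ref_(\\w+)");

    public static String getTableSnapshotRef(String refName) {
      // String.valueOf((String) null) returns "null": no NPE, the match just fails.
      Matcher ref = SNAPSHOT_REF.matcher(String.valueOf(refName));
      return ref.matches() ? ref.group(1) : null;
    }

    public static void main(String[] args) {
      System.out.println(getTableSnapshotRef(null));                    // null
      System.out.println(getTableSnapshotRef("snapshot_ref_branch_1")); // branch_1
    }
  }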
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
index 38d67660c639..1de37c421051 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.stats.Partish;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
@@ -942,10 +943,16 @@ private Long getRowCnt(
         rowCnt += partRowCnt;
       }
     } else { // unpartitioned table
-      if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(tbl, tbl.getParameters())) {
+      Map<String, String> basicStats = tbl.getParameters();
+      if (MetaStoreUtils.isNonNativeTable(tbl.getTTable())) {
+        if (!tbl.getStorageHandler().canComputeQueryUsingStats(tbl)) {
+          return null;
+        }
+        basicStats = tbl.getStorageHandler().getBasicStatistics(Partish.buildFor(tbl));
+      } else if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(tbl, tbl.getParameters())) {
         return null;
       }
-      rowCnt = Long.valueOf(tbl.getProperty(StatsSetupConst.ROW_COUNT));
+      rowCnt = Long.valueOf(basicStats.get(StatsSetupConst.ROW_COUNT));
     }
     return rowCnt;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 239f57b69b3e..81fde429cb31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -2036,10 +2036,7 @@ public static Range combineRange(Range range1, Range range2) {
 
   public static boolean checkCanProvideStats(Table table) {
     if (MetaStoreUtils.isExternalTable(table.getTTable())) {
-      if (MetaStoreUtils.isNonNativeTable(table.getTTable()) && table.getStorageHandler().canProvideBasicStatistics()) {
-        return true;
-      }
-      return false;
+      return MetaStoreUtils.isNonNativeTable(table.getTTable()) && table.getStorageHandler().canProvideBasicStatistics();
     }
     return true;
   }
@@ -2049,7 +2046,7 @@ public static boolean checkCanProvideStats(Table table) {
    * Can run additional checks compared to the version in StatsSetupConst.
    */
   public static boolean areBasicStatsUptoDateForQueryAnswering(Table table, Map<String, String> params) {
-    return checkCanProvideStats(table) == true ? StatsSetupConst.areBasicStatsUptoDate(params) : false;
+    return checkCanProvideStats(table) && StatsSetupConst.areBasicStatsUptoDate(params);
   }
 
   /**
@@ -2057,7 +2054,7 @@ public static boolean areBasicStatsUptoDateForQueryAnswering(Table table, Map<S
    */
   public static boolean areColumnStatsUptoDateForQueryAnswering(Table table, Map<String, String> params, String colName) {
-    return checkCanProvideStats(table) == true ? StatsSetupConst.areColumnStatsUptoDate(params, colName) : false;
+    return checkCanProvideStats(table) && StatsSetupConst.areColumnStatsUptoDate(params, colName);
   }
 
   /**
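For reference, the row-count adjustment that getBasicStatistics keeps applying (see the first hunk above) is easiest to follow with concrete numbers. A standalone sketch with invented values; the interpretation of the delete counters in the comments is ours, while the arithmetic is verbatim from the patch:

  public class RowCountAdjustment {
    public static void main(String[] args) {
      long totalRecords = 100L;   // SnapshotSummary.TOTAL_RECORDS_PROP
      long totalEqDeletes = 0L;   // SnapshotSummary.TOTAL_EQ_DELETES_PROP
      long totalPosDeletes = 30L; // SnapshotSummary.TOTAL_POS_DELETES_PROP

      // A positional delete marks exactly one row, so with no equality deletes
      // the positional count can be subtracted directly. An equality delete can
      // match any number of rows, so if any exist the raw total is kept.
      long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
      // actualRecords may be negative in edge cases, hence the guard.
      totalRecords = actualRecords > 0 ? actualRecords : totalRecords;

      System.out.println(totalRecords); // prints 70
    }
  }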