@@ -464,41 +464,44 @@ public void appendFiles(org.apache.hadoop.hive.metastore.api.Table table, URI fr
   public Map<String, String> getBasicStatistics(Partish partish) {
     org.apache.hadoop.hive.ql.metadata.Table hmsTable = partish.getTable();
     // For write queries where rows got modified, don't fetch from cache as values could have changed.
-    Table table = getTable(hmsTable);
     Map<String, String> stats = Maps.newHashMap();
-    if (getStatsSource().equals(HiveMetaHook.ICEBERG)) {
-      if (table.currentSnapshot() != null) {
-        Map<String, String> summary = table.currentSnapshot().summary();
-        if (summary != null) {
+    if (!getStatsSource().equals(HiveMetaHook.ICEBERG)) {
+      return hmsTable.getParameters();
+    }
+    Table table = getTable(hmsTable);
 
-          if (summary.containsKey(SnapshotSummary.TOTAL_DATA_FILES_PROP)) {
-            stats.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
-          }
+    Snapshot snapshot = IcebergTableUtil.getTableSnapshot(hmsTable, table);
+    if (snapshot != null) {
+      Map<String, String> summary = snapshot.summary();
+      if (summary != null) {
 
+        if (summary.containsKey(SnapshotSummary.TOTAL_DATA_FILES_PROP)) {
+          stats.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+        }
 
-          if (summary.containsKey(SnapshotSummary.TOTAL_RECORDS_PROP)) {
-            long totalRecords = Long.parseLong(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
-            if (summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
-                summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
+        if (summary.containsKey(SnapshotSummary.TOTAL_RECORDS_PROP)) {
+          long totalRecords = Long.parseLong(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
+          if (summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
+              summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
 
-              long totalEqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
-              long totalPosDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
+            long totalEqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
+            long totalPosDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
 
-              long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
-              totalRecords = actualRecords > 0 ? actualRecords : totalRecords;
-              // actualRecords maybe -ve in edge cases
-            }
-            stats.put(StatsSetupConst.ROW_COUNT, String.valueOf(totalRecords));
+            long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
+            totalRecords = actualRecords > 0 ? actualRecords : totalRecords;
+            // actualRecords maybe -ve in edge cases
+          }
+          stats.put(StatsSetupConst.ROW_COUNT, String.valueOf(totalRecords));
         }
 
-          if (summary.containsKey(SnapshotSummary.TOTAL_FILE_SIZE_PROP)) {
-            stats.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
-          }
+        if (summary.containsKey(SnapshotSummary.TOTAL_FILE_SIZE_PROP)) {
+          stats.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
+        }
-      } else {
-        stats.put(StatsSetupConst.NUM_FILES, "0");
-        stats.put(StatsSetupConst.ROW_COUNT, "0");
-        stats.put(StatsSetupConst.TOTAL_SIZE, "0");
-      }
+    } else {
+      stats.put(StatsSetupConst.NUM_FILES, "0");
+      stats.put(StatsSetupConst.ROW_COUNT, "0");
+      stats.put(StatsSetupConst.TOTAL_SIZE, "0");
+    }
     return stats;
   }
@@ -613,8 +616,9 @@ private ColumnStatistics readColStats(Table table, Path statsPath) {
   public boolean canComputeQueryUsingStats(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     if (getStatsSource().equals(HiveMetaHook.ICEBERG) && hmsTable.getMetaTable() == null) {
       Table table = getTable(hmsTable);
-      if (table.currentSnapshot() != null) {
-        Map<String, String> summary = table.currentSnapshot().summary();
+      Snapshot snapshot = IcebergTableUtil.getTableSnapshot(hmsTable, table);
+      if (snapshot != null) {
+        Map<String, String> summary = snapshot.summary();
         if (summary != null && summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
             summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
 
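Side note on the arithmetic above, as a minimal standalone sketch (the class and method names are hypothetical and not part of this diff): positional deletes are subtracted from total-records only when the snapshot carries no equality deletes, presumably because a single equality delete can match any number of rows, so the summary counters alone cannot give an exact live-row count.

public final class RowCountEstimateSketch {
  /** Mirrors the snapshot-summary based estimate used in getBasicStatistics() above. */
  static long estimateRowCount(long totalRecords, long totalEqDeletes, long totalPosDeletes) {
    long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
    // actualRecords may be non-positive in edge cases; fall back to total-records then.
    return actualRecords > 0 ? actualRecords : totalRecords;
  }

  public static void main(String[] args) {
    // The ice03 scenario from the new q-test: 5 inserted rows, 2 positional deletes -> 3.
    System.out.println(estimateRowCount(5L, 0L, 2L));
  }
}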
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,9 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.TimestampTZ;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -42,6 +46,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -59,6 +64,7 @@
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -79,6 +85,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.StructProjection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -559,4 +566,32 @@ public static List<String> getPartitionNames(Table icebergTable, Map<String, Str
       throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
     }
   }
+
+  public static Snapshot getTableSnapshot(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Table table) {
+    String refName = HiveUtils.getTableSnapshotRef(hmsTable.getSnapshotRef());
+    Snapshot snapshot;
+    if (refName != null) {
+      snapshot = table.snapshot(refName);
+    } else if (hmsTable.getAsOfTimestamp() != null) {
+      ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() :
+          SessionState.get().getConf().getLocalTimeZone();
+      TimestampTZ time = TimestampTZUtil.parse(hmsTable.getAsOfTimestamp(), timeZone);
+      long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, time.toEpochMilli());
+      snapshot = table.snapshot(snapshotId);
+    } else if (hmsTable.getAsOfVersion() != null) {
+      try {
+        snapshot = table.snapshot(Long.parseLong(hmsTable.getAsOfVersion()));
+      } catch (NumberFormatException e) {
+        SnapshotRef ref = table.refs().get(hmsTable.getAsOfVersion());
+        if (ref == null) {
+          throw new RuntimeException("Cannot find matching snapshot ID or reference name for version " +
+              hmsTable.getAsOfVersion());
+        }
+        snapshot = table.snapshot(ref.snapshotId());
+      }
+    } else {
+      snapshot = table.currentSnapshot();
+    }
+    return snapshot;
+  }
 }
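A brief caller-side sketch (hypothetical class name; the Hive and Iceberg table handles are assumed to be obtained as in the handler code above): the intent of the new helper is that statistics and count(*) shortcuts follow the snapshot selected by a branch/tag read or AS OF time travel instead of always table.currentSnapshot().

import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.IcebergTableUtil;

public final class SnapshotSummarySketch {
  /** Summary of the snapshot Hive should read, or an empty map when the table has no snapshot. */
  static Map<String, String> summaryForRead(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Table table) {
    // Resolution order inside getTableSnapshot(): branch/tag ref, then AS OF timestamp,
    // then AS OF version (snapshot id, else a named ref), otherwise the current snapshot.
    Snapshot snapshot = IcebergTableUtil.getTableSnapshot(hmsTable, table);
    return snapshot != null && snapshot.summary() != null ? snapshot.summary() : Collections.emptyMap();
  }
}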
22 changes: 22 additions & 0 deletions iceberg/iceberg-handler/src/test/queries/positive/iceberg_stats.q
@@ -28,5 +28,27 @@ select count(*) from ice01;
insert overwrite table ice01 select * from ice01;
explain select count(*) from ice01;

-- false means that count(*) query won't use row count stored in HMS
set iceberg.hive.keep.stats=false;

create external table ice03 (id int, key int) Stored by Iceberg stored as ORC
TBLPROPERTIES('format-version'='2');

insert into ice03 values (1,1),(2,1),(3,1),(4,1),(5,1);
-- Iceberg table can utilize fetch task to directly retrieve the row count from iceberg SnapshotSummary
explain select count(*) from ice03;
select count(*) from ice03;

-- delete some values
delete from ice03 where id in (2,4);

explain select count(*) from ice03;
select count(*) from ice03;

-- iow
insert overwrite table ice03 select * from ice03;
explain select count(*) from ice03;

drop table ice01;
drop table ice02;
drop table ice03;
159 changes: 159 additions & 0 deletions iceberg/iceberg-handler/src/test/results/positive/iceberg_stats.q.out
@@ -192,6 +192,155 @@
Processor Tree:
ListSink

PREHOOK: query: create external table ice03 (id int, key int) Stored by Iceberg stored as ORC
TBLPROPERTIES('format-version'='2')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice03
POSTHOOK: query: create external table ice03 (id int, key int) Stored by Iceberg stored as ORC
TBLPROPERTIES('format-version'='2')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice03
PREHOOK: query: insert into ice03 values (1,1),(2,1),(3,1),(4,1),(5,1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice03
POSTHOOK: query: insert into ice03 values (1,1),(2,1),(3,1),(4,1),(5,1)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice03
PREHOOK: query: explain select count(*) from ice03
PREHOOK: type: QUERY
PREHOOK: Input: default@ice03
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: explain select count(*) from ice03
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice03
POSTHOOK: Output: hdfs://### HDFS PATH ###
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
Fetch Operator
limit: 1
Processor Tree:
ListSink

PREHOOK: query: select count(*) from ice03
PREHOOK: type: QUERY
PREHOOK: Input: default@ice03
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select count(*) from ice03
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice03
POSTHOOK: Output: hdfs://### HDFS PATH ###
5
PREHOOK: query: delete from ice03 where id in (2,4)
PREHOOK: type: QUERY
PREHOOK: Input: default@ice03
PREHOOK: Output: default@ice03
POSTHOOK: query: delete from ice03 where id in (2,4)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice03
POSTHOOK: Output: default@ice03
PREHOOK: query: explain select count(*) from ice03
PREHOOK: type: QUERY
PREHOOK: Input: default@ice03
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: explain select count(*) from ice03
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice03
POSTHOOK: Output: hdfs://### HDFS PATH ###
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: ice03
Statistics: Num rows: 3 Data size: #Masked# Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
Statistics: Num rows: 3 Data size: #Masked# Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count()
minReductionHashAggr: 0.6666666
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: #Masked# Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: #Masked# Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: bigint)
Execution mode: vectorized
Reducer 2
Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: #Masked# Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: #Masked# Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

PREHOOK: query: select count(*) from ice03
PREHOOK: type: QUERY
PREHOOK: Input: default@ice03
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select count(*) from ice03
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice03
POSTHOOK: Output: hdfs://### HDFS PATH ###
3
PREHOOK: query: insert overwrite table ice03 select * from ice03
PREHOOK: type: QUERY
PREHOOK: Input: default@ice03
PREHOOK: Output: default@ice03
POSTHOOK: query: insert overwrite table ice03 select * from ice03
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice03
POSTHOOK: Output: default@ice03
PREHOOK: query: explain select count(*) from ice03
PREHOOK: type: QUERY
PREHOOK: Input: default@ice03
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: explain select count(*) from ice03
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice03
POSTHOOK: Output: hdfs://### HDFS PATH ###
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
Fetch Operator
limit: 1
Processor Tree:
ListSink

PREHOOK: query: drop table ice01
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@ice01
@@ -212,3 +361,13 @@ POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@ice02
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice02
PREHOOK: query: drop table ice03
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@ice03
PREHOOK: Output: database:default
PREHOOK: Output: default@ice03
POSTHOOK: query: drop table ice03
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@ice03
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice03