Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.stats.Partish;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
import org.apache.hadoop.hive.serde2.Deserializer;
Expand Down Expand Up @@ -450,44 +449,45 @@ public void appendFiles(org.apache.hadoop.hive.metastore.api.Table table, URI fr
}

@Override
public Map<String, String> getBasicStatistics(Partish partish) {
org.apache.hadoop.hive.ql.metadata.Table hmsTable = partish.getTable();
public Map<String, String> getBasicStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
// For write queries where rows got modified, don't fetch from cache as values could have changed.
Table table = getTable(hmsTable);
Map<String, String> stats = Maps.newHashMap();
if (getStatsSource().equals(HiveMetaHook.ICEBERG)) {
if (table.currentSnapshot() != null) {
Map<String, String> summary = table.currentSnapshot().summary();
if (summary != null) {
if (!getStatsSource().equals(HiveMetaHook.ICEBERG)) {
return hmsTable.getParameters();
}
Table table = getTable(hmsTable);

if (summary.containsKey(SnapshotSummary.TOTAL_DATA_FILES_PROP)) {
stats.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
}
if (table.currentSnapshot() != null) {
Map<String, String> summary = table.currentSnapshot().summary();
if (summary != null) {

if (summary.containsKey(SnapshotSummary.TOTAL_RECORDS_PROP)) {
long totalRecords = Long.parseLong(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
if (summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
if (summary.containsKey(SnapshotSummary.TOTAL_DATA_FILES_PROP)) {
stats.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
}

long totalEqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
long totalPosDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
if (summary.containsKey(SnapshotSummary.TOTAL_RECORDS_PROP)) {
long totalRecords = Long.parseLong(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
if (summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {

long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
totalRecords = actualRecords > 0 ? actualRecords : totalRecords;
// actualRecords maybe -ve in edge cases
}
stats.put(StatsSetupConst.ROW_COUNT, String.valueOf(totalRecords));
}
long totalEqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
long totalPosDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));

if (summary.containsKey(SnapshotSummary.TOTAL_FILE_SIZE_PROP)) {
stats.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
totalRecords = actualRecords > 0 ? actualRecords : totalRecords;
// actualRecords maybe -ve in edge cases
}
stats.put(StatsSetupConst.ROW_COUNT, String.valueOf(totalRecords));
}

if (summary.containsKey(SnapshotSummary.TOTAL_FILE_SIZE_PROP)) {
stats.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
}
} else {
stats.put(StatsSetupConst.NUM_FILES, "0");
stats.put(StatsSetupConst.ROW_COUNT, "0");
stats.put(StatsSetupConst.TOTAL_SIZE, "0");
}
} else {
stats.put(StatsSetupConst.NUM_FILES, "0");
stats.put(StatsSetupConst.ROW_COUNT, "0");
stats.put(StatsSetupConst.TOTAL_SIZE, "0");
}
return stats;
}
Expand Down Expand Up @@ -600,7 +600,10 @@ private ColumnStatistics readColStats(Table table, Path statsPath) {

@Override
public boolean canComputeQueryUsingStats(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
if (getStatsSource().equals(HiveMetaHook.ICEBERG) && hmsTable.getMetaTable() == null) {
if (hmsTable.getMetaTable() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query against Iceberg Branch/Tag can also benefit from the stats. We can optimize this later.

return false;
}
if (getStatsSource().equals(HiveMetaHook.ICEBERG)) {
Table table = getTable(hmsTable);
if (table.currentSnapshot() != null) {
Map<String, String> summary = table.currentSnapshot().summary();
Expand All @@ -613,7 +616,7 @@ public boolean canComputeQueryUsingStats(org.apache.hadoop.hive.ql.metadata.Tabl
}
}
}
return false;
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of delete files, if Iceberg statsSource can not compute query( eg. count(*)) using stats, i think HMS stats can't either. They both come from the same place -- SnapshotSummary.

We should not give a wrong impression that HMS can give the accurate stats.

Copy link
Member Author

@deniskuzZ deniskuzZ Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even if we do alter table compute stats?
need to check how HMS stats works for ACID table deletes, does it stay accurate or not

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!
In case of delete files, the analyze table compute stats job can get accurate stats, as it launches a Tez task to compute them.

And after the analyze table compute stats job runs, the HMS stats will be updated and accurate, and iceberg.hive.keep.stats will be true, so we can use the HMS stats to optimize the count query.

But if the statsSource is Iceberg & in case of delete files, even we have done the job analyze table compute stats, we won't update the Iceberg SnapshotSummary, so we can not optimize the count query.

This will look a little weird. Users do a job analyze table compute stats to update the stats, but they can not optimize the count query if the statsSource is Iceberg & in case of delete files.

}

private String getStatsSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,12 @@ default Map<String, String> getOperatorDescProperties(OperatorDesc operatorDesc,
* @param partish a partish wrapper class
* @return map of basic statistics, can be null
*/
/**
 * @deprecated use {@link #getBasicStatistics(org.apache.hadoop.hive.ql.metadata.Table)} instead;
 * this overload only unwraps the table from the {@code Partish} wrapper and delegates.
 */
@Deprecated
default Map<String, String> getBasicStatistics(Partish partish) {
return getBasicStatistics(partish.getTable());
}

// Default: the storage handler provides no basic statistics (callers fall back to HMS
// table parameters); handlers such as the Iceberg one override this to supply stats.
default Map<String, String> getBasicStatistics(org.apache.hadoop.hive.ql.metadata.Table table) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,10 @@ private Long getRowCnt(
if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(tbl, tbl.getParameters())) {
return null;
}
rowCnt = Long.valueOf(tbl.getProperty(StatsSetupConst.ROW_COUNT));
Map<String, String> basicStats = MetaStoreUtils.isNonNativeTable(tbl.getTTable()) ?
tbl.getStorageHandler().getBasicStatistics(tbl) : tbl.getParameters();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discuss:
Should we regard the stats as always accurate when statsSource is iceberg?
If so, we need to always keep the configuration iceberg.hive.keep.stats set to true when statsSource is iceberg, so that we can optimize count(*) via StatsOptimizer when statsSource is iceberg. The same idea I wanted to implement is #5215

boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);

if (!keepHiveStats) {
StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
StatsSetupConst.clearColumnStatsState(tbl.getParameters());

if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(tbl, tbl.getParameters())) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry, I missed the #5215
I am not very certain how accurate is the TOTAL_RECORDS_PROP from the snapshot summary especially when there are deletes.
Since we have a statsSource flag I just wanted to be consistent where we take a stats.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i think iceberg.hive.keep.stats should be enabled when stats source is not iceberg

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhangbutao, could you please check the comments in #5215 and maybe incorporate changes from this PR into yours

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not very certain how accurate is the TOTAL_RECORDS_PROP from the snapshot summary especially when there are deletes.

An Iceberg table with equality deletes should not be handled by the count query optimization, so we can skip fetching the stats or return null when deletes exist.

yes, i think iceberg.hive.keep.stats should be enabled when stats source is not iceberg

iceberg.hive.keep.stats should always be enabled (true) when the stats source is iceberg; iceberg.hive.keep.stats being true means that the stats are accurate.
HMS stats for an Iceberg table can be inaccurate if other engines (Spark, Trino) write to the table but do not update the HMS stats.
But if the statsSource is iceberg, the stats are retrieved from the Iceberg SnapshotSummary, which is real-time and accurate.

could you please check the comments in #5215 and maybe incorporate changes from this PR into yours

In #5215 I want to optimize the count query regardless of the value of iceberg.hive.keep.stats. But I do not add a restriction like your change does, which avoids using HMS stats when statsSource is Iceberg. I am OK to incorporate this change, as well as some other supplements, into #5215; we can continue to discuss there.


rowCnt = Long.valueOf(basicStats.get(StatsSetupConst.ROW_COUNT));
}
return rowCnt;
}
Expand Down