@@ -0,0 +1,19 @@
set hive.stats.autogather=false;

create external table test_iceberg_stats (strcol string, intcol integer) partitioned by (pcol int) stored by iceberg;

insert into table test_iceberg_stats values ('abc', 1, 1);
insert into table test_iceberg_stats values ('def', 2, 2);
insert into table test_iceberg_stats values ('ghi', 3, 3);

set hive.iceberg.stats.source=iceberg;
-- No column stats are written in Puffin files yet.
explain analyze table test_iceberg_stats compute statistics for columns;

-- Column stats are written in Puffin files.
analyze table test_iceberg_stats compute statistics for columns;
explain analyze table test_iceberg_stats compute statistics for columns;

set hive.iceberg.stats.source=metastore;
-- No column stats should be seen when accessing the metastore.
explain analyze table test_iceberg_stats compute statistics for columns;
@@ -0,0 +1,258 @@
PREHOOK: query: create external table test_iceberg_stats (strcol string, intcol integer) partitioned by (pcol int) stored by iceberg
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@test_iceberg_stats
POSTHOOK: query: create external table test_iceberg_stats (strcol string, intcol integer) partitioned by (pcol int) stored by iceberg
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@test_iceberg_stats
PREHOOK: query: insert into table test_iceberg_stats values ('abc', 1, 1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@test_iceberg_stats
POSTHOOK: query: insert into table test_iceberg_stats values ('abc', 1, 1)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@test_iceberg_stats
PREHOOK: query: insert into table test_iceberg_stats values ('def', 2, 2)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@test_iceberg_stats
POSTHOOK: query: insert into table test_iceberg_stats values ('def', 2, 2)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@test_iceberg_stats
PREHOOK: query: insert into table test_iceberg_stats values ('ghi', 3, 3)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@test_iceberg_stats
POSTHOOK: query: insert into table test_iceberg_stats values ('ghi', 3, 3)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@test_iceberg_stats
PREHOOK: query: explain analyze table test_iceberg_stats compute statistics for columns
PREHOOK: type: ANALYZE_TABLE
PREHOOK: Input: default@test_iceberg_stats
PREHOOK: Output: default@test_iceberg_stats
#### A masked pattern was here ####
POSTHOOK: query: explain analyze table test_iceberg_stats compute statistics for columns
POSTHOOK: type: ANALYZE_TABLE
POSTHOOK: Input: default@test_iceberg_stats
POSTHOOK: Output: default@test_iceberg_stats
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-2 depends on stages: Stage-0

STAGE PLANS:
Stage: Stage-0
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: test_iceberg_stats
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: strcol (type: string), intcol (type: int), pcol (type: int)
outputColumnNames: strcol, intcol, pcol
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: max(length(strcol)), avg(COALESCE(length(strcol),0)), count(1), count(strcol), compute_bit_vector_hll(strcol), min(intcol), max(intcol), count(intcol), compute_bit_vector_hll(intcol), min(pcol), max(pcol), count(pcol), compute_bit_vector_hll(pcol)
minReductionHashAggr: 0.99
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: struct<count:bigint,sum:double,input:int>), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: int), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: max(VALUE._col0), avg(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), min(VALUE._col5), max(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col0,0)) (type: bigint), COALESCE(_col1,0) (type: double), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'LONG' (type: string), UDFToLong(_col5) (type: bigint), UDFToLong(_col6) (type: bigint), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-2
Stats Work
Basic Stats Work:
Column Stats Desc:
Columns: strcol, intcol, pcol
Column Types: string, int, int
Table: default.test_iceberg_stats

PREHOOK: query: analyze table test_iceberg_stats compute statistics for columns
PREHOOK: type: ANALYZE_TABLE
PREHOOK: Input: default@test_iceberg_stats
PREHOOK: Output: default@test_iceberg_stats
#### A masked pattern was here ####
POSTHOOK: query: analyze table test_iceberg_stats compute statistics for columns
POSTHOOK: type: ANALYZE_TABLE
POSTHOOK: Input: default@test_iceberg_stats
POSTHOOK: Output: default@test_iceberg_stats
#### A masked pattern was here ####
PREHOOK: query: explain analyze table test_iceberg_stats compute statistics for columns
PREHOOK: type: ANALYZE_TABLE
PREHOOK: Input: default@test_iceberg_stats
PREHOOK: Output: default@test_iceberg_stats
#### A masked pattern was here ####
POSTHOOK: query: explain analyze table test_iceberg_stats compute statistics for columns
POSTHOOK: type: ANALYZE_TABLE
POSTHOOK: Input: default@test_iceberg_stats
POSTHOOK: Output: default@test_iceberg_stats
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-2 depends on stages: Stage-0

STAGE PLANS:
Stage: Stage-0
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: test_iceberg_stats
Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: strcol (type: string), intcol (type: int), pcol (type: int)
outputColumnNames: strcol, intcol, pcol
Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: max(length(strcol)), avg(COALESCE(length(strcol),0)), count(1), count(strcol), compute_bit_vector_hll(strcol), min(intcol), max(intcol), count(intcol), compute_bit_vector_hll(intcol), min(pcol), max(pcol), count(pcol), compute_bit_vector_hll(pcol)
minReductionHashAggr: 0.6666666
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: int), _col1 (type: struct<count:bigint,sum:double,input:int>), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: int), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: max(VALUE._col0), avg(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), min(VALUE._col5), max(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
Statistics: Num rows: 1 Data size: 492 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col0,0)) (type: bigint), COALESCE(_col1,0) (type: double), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'LONG' (type: string), UDFToLong(_col5) (type: bigint), UDFToLong(_col6) (type: bigint), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17
Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-2
Stats Work
Basic Stats Work:
Column Stats Desc:
Columns: strcol, intcol, pcol
Column Types: string, int, int
Table: default.test_iceberg_stats

PREHOOK: query: explain analyze table test_iceberg_stats compute statistics for columns
PREHOOK: type: ANALYZE_TABLE
PREHOOK: Input: default@test_iceberg_stats
PREHOOK: Output: default@test_iceberg_stats
#### A masked pattern was here ####
POSTHOOK: query: explain analyze table test_iceberg_stats compute statistics for columns
POSTHOOK: type: ANALYZE_TABLE
POSTHOOK: Input: default@test_iceberg_stats
POSTHOOK: Output: default@test_iceberg_stats
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-2 depends on stages: Stage-0

STAGE PLANS:
Stage: Stage-0
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: test_iceberg_stats
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: strcol (type: string), intcol (type: int), pcol (type: int)
outputColumnNames: strcol, intcol, pcol
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: max(length(strcol)), avg(COALESCE(length(strcol),0)), count(1), count(strcol), compute_bit_vector_hll(strcol), min(intcol), max(intcol), count(intcol), compute_bit_vector_hll(intcol), min(pcol), max(pcol), count(pcol), compute_bit_vector_hll(pcol)
minReductionHashAggr: 0.99
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: struct<count:bigint,sum:double,input:int>), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: int), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
Execution mode: vectorized, llap
Reduce Operator Tree:
Group By Operator
aggregations: max(VALUE._col0), avg(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), min(VALUE._col5), max(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col0,0)) (type: bigint), COALESCE(_col1,0) (type: double), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'LONG' (type: string), UDFToLong(_col5) (type: bigint), UDFToLong(_col6) (type: bigint), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 752 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-2
Stats Work
Basic Stats Work:
Column Stats Desc:
Columns: strcol, intcol, pcol
Column Types: string, int, int
Table: default.test_iceberg_stats

@@ -226,9 +226,11 @@ public int persistColumnStats(Hive db, Table tbl) throws HiveException, MetaException
if (!(tbl.isMaterializedView() || tbl.isView() || tbl.isTemporary())) {
setOrRemoveColumnStatsAccurateProperty(db, tbl, colStatDesc.getColName(), success);
}
tbl.getStorageHandler().setColStatistics(tbl, colStats);
} else {
// Set table or partition column statistics in metastore.
db.setPartitionColumnStatistics(request);
}
// TODO: Write stats for native tables only (See HIVE-27421)
db.setPartitionColumnStatistics(request);

Contributor:
I'm a little confused about this change.
If we cannot get stats from Puffin due to some exception, we can fall back to getting stats from the metastore, so I think writing stats to both places may be meaningful. Please correct me if I misunderstand. Thanks.


SourabhBadhya (Contributor, Author), Jun 9, 2023:
@zhangbutao I agree with your point. However, storing stats in two places has its pros and cons.

Pros:

  1. We can fall back to the metastore by changing the config (hive.iceberg.stats.source=metastore) if we are not able to get stats from the Puffin files.

Cons:

  1. Any change made to the Puffin files by external clients is not visible to the metastore.
  2. The performance cost of the extra metastore DB calls needed to store the column stats.

With the approach in this PR, users who want the metastore to serve stats when they cannot be read from Puffin can set hive.iceberg.stats.source=metastore and execute ANALYZE TABLE <tableName> COMPUTE STATISTICS FOR COLUMNS; this has the overhead of one more ANALYZE query. A minimal sketch of that workflow is shown below.

I will leave it to the community to decide whether it is best to store stats in two places or whether a single place is sufficient. If the community thinks it is best to store them in two places, I won't proceed further; otherwise, I will continue with the patch.
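
A minimal sketch of that fallback workflow over Hive JDBC; the endpoint URL is illustrative, and the table name is taken from the qtest above:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MetastoreStatsFallback {
    public static void main(String[] args) throws Exception {
        // Assumes the Hive JDBC driver is on the classpath and a HiveServer2
        // instance is reachable at this illustrative URL.
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement stmt = conn.createStatement()) {
            // Point the stats reader at the metastore instead of the Puffin files.
            stmt.execute("SET hive.iceberg.stats.source=metastore");
            // Recompute column stats so they are persisted where the reader now looks.
            stmt.execute("ANALYZE TABLE test_iceberg_stats COMPUTE STATISTICS FOR COLUMNS");
        }
    }
}
```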


Member:
> If we cannot get stats from Puffin due to some exception, we can fall back to getting stats from the metastore, so I think writing stats to both places may be meaningful.

Storing in two places has an additional cost during writes, and we currently have two modes, "iceberg" and "metastore", each of which denotes where the stats are stored.

Storing on both sides is effectively a third mode, something like "both", and at present there is no fallback logic on the read side either, i.e. nothing that falls back to the metastore when the Puffin files are inaccessible; a hypothetical sketch of such a fallback follows below.

If we feel such a thing is required in a future stage, we can add a new mode then.

As of now, I think the "iceberg" mode should store stats only in Puffin and the "metastore" mode should store them only in the metastore.
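
For illustration only, a hypothetical sketch of what such a read-side fallback mode could look like; none of this exists in Hive today, and ColumnStats, TableHandle, and both readers are placeholders rather than Hive APIs:

```java
import java.util.Collections;
import java.util.List;

// Placeholder types standing in for Hive's column-statistics objects and table handle.
interface ColumnStats {}
interface TableHandle {}

class FallbackStatsReader {
    // Stand-ins for the two existing read paths selected by hive.iceberg.stats.source.
    List<ColumnStats> readFromPuffin(TableHandle tbl) throws Exception {
        throw new Exception("simulating unreadable Puffin files");
    }

    List<ColumnStats> readFromMetastore(TableHandle tbl) {
        return Collections.emptyList();
    }

    // A hypothetical "fallback" mode: try Puffin first, degrade to the
    // metastore copy when the Puffin files are missing or unreadable.
    List<ColumnStats> readWithFallback(TableHandle tbl) {
        try {
            return readFromPuffin(tbl);
        } catch (Exception e) {
            return readFromMetastore(tbl);
        }
    }
}
```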


Contributor:
Long rowCnt = getRowCnt(pctx, tsOp, tbl);
// If we cannot get correct table stats, then neither the table stats nor the column stats are useful.
if (rowCnt == null) {

private Long getRowCnt(
    ParseContext pCtx, TableScanOperator tsOp, Table tbl) throws HiveException {
  Long rowCnt = 0L;
  if (tbl.isPartitioned()) {
    for (Partition part : pCtx.getPrunedPartitions(
        tsOp.getConf().getAlias(), tsOp).getPartitions()) {
      if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(part.getTable(), part.getParameters())) {
        return null;
      }
      long partRowCnt = Long.parseLong(part.getParameters().get(StatsSetupConst.ROW_COUNT));
      rowCnt += partRowCnt;
    }
  } else { // unpartitioned table
    if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(tbl, tbl.getParameters())) {
      return null;
    }
    rowCnt = Long.valueOf(tbl.getProperty(StatsSetupConst.ROW_COUNT));
  }
  return rowCnt;
}

Currently, as in the example in HIVE-27347, Hive always uses the Iceberg basic stats from the metastore to optimize count(*) queries. We should consider how to do this if only the Puffin stats are used.
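
One possible direction, sketched here purely as an illustration (not part of this patch): answer count(*) from the Iceberg snapshot summary rather than the metastore. The HadoopTables catalog and table location below are assumptions for the sake of a self-contained example.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class IcebergRowCount {
    public static void main(String[] args) {
        // Illustrative table location; any Iceberg catalog would work the same way.
        Table table = new HadoopTables(new Configuration())
            .load("/warehouse/default/test_iceberg_stats");
        Snapshot current = table.currentSnapshot();
        if (current == null) {
            System.out.println("Empty table, no snapshot yet");
            return;
        }
        // Iceberg maintains "total-records" in every snapshot summary,
        // so a count(*) answer does not need the metastore at all.
        String totalRecords = current.summary().get(SnapshotSummary.TOTAL_RECORDS_PROP);
        System.out.println("row count = " + totalRecords);
    }
}
```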


Member:
Created #5400 to address the above.


Member:
The above item is resolved now, so we can proceed with the merge.

end = System.currentTimeMillis();
LOG.info("Time taken to update " + colStats.size() + " stats : " + ((end - start)/1000F) + " seconds.");
}