Pull dataSize statistics from Hive for VARCHAR columns #11107
findepi wants to merge 15 commits into prestodb:master
Conversation
Force-pushed from 3a72288 to ad9bb5c
Force-pushed from a1a589e to 0549095
This is an alternative to #11043 -- it slipped my mind that we had this.
Why 7? Is this an approximation error?
This is NDVs -- did you look at the `Fix NATION_PARTITIONED_BY_REGIONKEY data files` commit alone?
Yes. There are only 5 rows per partition in the data files.
Yea -- 5 or 7, I think Hive's stats don't really care, do they?
Obviously they don't use a sparse HLL implementation to increase precision for the low-cardinality cases.
Move the rowCount.isValueUnknown() check to the beginning of the method.
I think we should take the partitions' row counts into account for a better estimate:
private Estimate calculateDataSize(Map<String, PartitionStatistics> statisticsSample, String columnName, Estimate totalRowCount)
{
    if (totalRowCount.isValueUnknown()) {
        return Estimate.unknownValue();
    }
    long knownRowCount = 0;
    double knownDataSize = 0;
    for (PartitionStatistics statistics : statisticsSample.values()) {
        OptionalLong partitionRowCount = statistics.getBasicStatistics().getRowCount();
        HiveColumnStatistics columnStatistics = statistics.getColumnStatistics().get(columnName);
        if (columnStatistics == null) {
            continue;
        }
        OptionalDouble averageColumnLength = columnStatistics.getAverageColumnLength();
        if (partitionRowCount.isPresent() && averageColumnLength.isPresent()) {
            knownRowCount += partitionRowCount.getAsLong();
            knownDataSize += averageColumnLength.getAsDouble() * partitionRowCount.getAsLong();
        }
    }
    if (knownRowCount == 0) {
        return Estimate.unknownValue();
    }
    return new Estimate((knownDataSize * totalRowCount.getValue()) / knownRowCount);
}
The code assumed that an average of averages is, on average, a "good enough" estimate, but of course we can make it more precise.
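As a hypothetical illustration of why the weighting matters: with one partition of 10 rows averaging 100 bytes and another of 1,000 rows averaging 10 bytes, the average of averages gives (100 + 10) / 2 = 55 bytes per row, while the row-weighted form above gives (10 * 100 + 1000 * 10) / 1010 ≈ 10.9 bytes per row, which matches the true total of 11,000 bytes when multiplied back by the row count.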
nit: Unrelated change. I'm fine with keeping it here though.
nit: Move metadata.getColumnHandles(session, tableHandle) out of the loop
How about:
private TableStatisticsData addPartitionStats(TableStatisticsData first, TableStatisticsData second, TpchColumn<?> partitionColumn)
{
    verify(first.getColumns().keySet().equals(second.getColumns().keySet()));
    Set<String> columns = first.getColumns().keySet();
    Map<String, ColumnStatisticsData> columnStatistics = columns.stream()
            .collect(toImmutableMap(
                    column -> column,
                    column -> combineColumnStatistics(first.getColumns().get(column), second.getColumns().get(column), column.equals(partitionColumn.getColumnName()))));
    return new TableStatisticsData(
            first.getRowCount() + second.getRowCount(),
            columnStatistics);
}

private ColumnStatisticsData combineColumnStatistics(ColumnStatisticsData first, ColumnStatisticsData second, boolean isPartitionColumn)
{
    Optional<Long> ndv = isPartitionColumn ? Optional.empty() : combine(first.getDistinctValuesCount(), second.getDistinctValuesCount(), (a, b) -> a + b);
    Optional<Object> min = combine(first.getMin(), second.getMin(), this::min);
    Optional<Object> max = combine(first.getMax(), second.getMax(), this::max);
    // Sum data sizes only if both known
    Optional<Long> dataSize = first.getDataSize()
            .flatMap(leftDataSize -> second.getDataSize().map(rightDataSize -> leftDataSize + rightDataSize));
    return new ColumnStatisticsData(ndv, min, max, dataSize);
}
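The snippet above calls a combine helper that is not shown. A minimal sketch of what it might look like -- the name comes from the snippet, but this signature and body are assumptions, not code from the PR:

// Hypothetical helper: combines two optional statistics only when both are present.
// Assumes imports of java.util.Optional and java.util.function.BinaryOperator.
private static <T> Optional<T> combine(Optional<T> first, Optional<T> second, BinaryOperator<T> combiner)
{
    if (first.isPresent() && second.isPresent()) {
        return Optional.of(combiner.apply(first.get(), second.get()));
    }
    return Optional.empty();
}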
Passing the partitionColumn and the columnName all the way around seems unnatural. Also, the trick with filter is obfuscation. We can filter out the partition column in the method above. Remove this.
Make all the methods but estimateStats static
I would rather tidy up this class, since you are changing it anyway. The way it is implemented right now is hard to read. It is very confusing why the TpchColumn<?> partitionColumn and String columnName are being passed all around. So I would either drop this commit altogether and pretend I never saw this class, or try to make it tidy =)
Consider moving the statistics recorder to presto-tests, so you can reuse it in both TPC-H and TPC-DS.
I don't think they are identical. Anyway, let's consider this outside of this PR.
Is averageDataSize the average over all rows or only over non-null rows? If non-null rows, you need to account for that.
@sopel39's internal PR to update this says it is for non-null rows only; will update.
@arhimondr comments applied, except for the TPC-H stats estimations. Do you feel strongly about them?
Force-pushed from 0549095 to 44d90ff
Before the change, the tests would report something like
java.lang.AssertionError: distinctValuesCount-s differ expected [false] but found [true]
Expected :false
Actual :true
which is not clear -- `distinctValuesCount` is not a boolean.
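A minimal sketch of the kind of fix, assuming a TestNG-style assertEquals(actual, expected, message) and hypothetical accessor names -- this is not the actual test code:

// Compare the counts directly so a failure prints the differing values
// instead of "expected [false] but found [true]".
assertEquals(actual.getDistinctValuesCount(), expected.getDistinctValuesCount(), "distinctValuesCount");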
Force-pushed from 44d90ff to 2b278ca
LGTM % comments
This would mean that, for ASCII text, stats computed with Hive would always be 2x what we would calculate. This can be a problem for other tools. Also, it can be a problem for ourselves -- not knowing who calculated the stats, we won't know how to interpret them into the internal representation (or "just" be 2x off).
No. For ASCII text the stat is going to be exactly the same, since ASCII characters take 1 byte per symbol.
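A small side illustration of that point (a standalone sketch, not code from this PR): in UTF-8, an ASCII character occupies exactly one byte, so a byte-based and a character-based average length agree for ASCII data and diverge only for non-ASCII text.

import java.nio.charset.StandardCharsets;

public class Utf8LengthExample
{
    public static void main(String[] args)
    {
        String ascii = "nation";
        String nonAscii = "naród";
        // ASCII: 6 characters, 6 UTF-8 bytes
        System.out.println(ascii.length() + " chars, " + ascii.getBytes(StandardCharsets.UTF_8).length + " bytes");
        // Non-ASCII: 5 characters, 6 UTF-8 bytes
        System.out.println(nonAscii.length() + " chars, " + nonAscii.getBytes(StandardCharsets.UTF_8).length + " bytes");
    }
}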
.filter(partition -> partition.getKeys().get(partitionColumn).isNull())
.map(HivePartition::getPartitionId)
.mapToLong(partitionId -> statisticsSample.get(partitionId).getBasicStatistics().getRowCount().orElse((long) rowsPerPartition.getAsDouble()))
.mapToDouble(partitionId -> orElse(statisticsSample.get(partitionId).getBasicStatistics().getRowCount(), rowsPerPartition.getAsDouble()))
How about max(statisticsSample.get(partitionId).getBasicStatistics().getRowCount().orElse(0), rowsPerPartition.getAsDouble())? Then you could remove this weird MetastoreHiveStatisticsProvider#orElse.
This would work without this "unnice" orElse method, but it doesn't convey the actual meaning. Also, why should we assume a partition cannot have 0 rows?
I meant -1. I don't feel strongly about this though.
    return unmodifiableList(result);
}

private static double orElse(OptionalLong value, double other)
Well, yes. But without it, it's even uglier. E.g. to preserve readability we were casting double to long, and would get wrong results if the average rows per partition was within [0, 1).
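For context, a minimal sketch of what that helper presumably does -- only its signature appears in the diff above, so the body here is an assumption:

// Returns the contained long as a double when present, otherwise the fallback,
// avoiding the lossy double-to-long cast discussed above.
// Assumes an import of java.util.OptionalLong.
private static double orElse(OptionalLong value, double other)
{
    if (value.isPresent()) {
        return value.getAsLong();
    }
    return other;
}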
This method is going to be way nicer once I change HiveColumnStatistics#averageColumnLength to HiveColumnStatistics#totalValuesSizeInBytes.
nullsFraction.isValueUnknown() ? 0 : nullsFraction.getValue() ?
== 0
p.s.: I used to like this kind of defensive programming, but then I realized that although it is more reliable, it introduces additional confusion for future readers. Plus, I don't think we do something like this in Presto.
I don't like == with doubles. It scares me a lot. Updated.
Nope, we calculate including nulls (nulls just have length 0).
If the value is null, do not increment row count
If the value is null, I have length = 0.
If I don't increment the row count, I cannot then multiply by the total row count (I need to consider the nulls fraction as well).
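A hypothetical worked example of the two conventions: 100 rows, 20 of them null, 8 bytes per non-null value. Averaging over all rows (nulls counted as length 0) gives (80 * 8) / 100 = 6.4 bytes, and 6.4 * 100 rows = 640 bytes; averaging over non-null rows only gives 8 bytes, which must then be multiplied by the non-null count, 100 * (1 - 0.2) = 80, to reach the same 640 bytes. Either convention works as long as the average and the row count it is multiplied by are consistent.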
nit: you can make it Map<String, Type> columnTypes to make it even nicer.
No, I'm confused about whether you have seen this method. It is totally fine if you don't want to implement it for VARBINARY. Just want to make sure you didn't do this by mistake.
Force-pushed from 2b278ca to f653640
Oh, I thought they were counting.
Yeah, that's why I said:
I'm storing just the number of bytes. That seems to be the most reasonable thing we can do.
I think it was omitted to simplify things. I think binary partition keys are pretty unusual.
Once you merge this, I will move forward with https://github.com/arhimondr/presto/tree/collect-colunm-data-size
        Estimate nullsFraction,
        OptionalDouble rowsPerPartition)
{
    if (rowCount.isValueUnknown() || !rowsPerPartition.isPresent())
What's the usefulness of rowsPerPartition? From my perspective it doesn't seem that useful.
- It adds complexity to the method (there's this extra calculation, the ugly long->double orElse, etc.)
- It seems unlikely that we'd have a partition that had data size stats but not a number of rows.
- It'll give weird results if you have very variable partitions.
I think we'd be better off only getting averageColumnLength where rowCount.isPresent() && averageColumnSize.isPresent() (and possibly nullCount.isPresent() too).
This is what I had in the latest version from my PR. I think it's simpler.
private Estimate calculateDataSize(Map<String, PartitionStatistics> statisticsByPartitionName, String column, int numberOfPartitions)
{
    List<Double> knownPartitionDataSizes = statisticsByPartitionName.values().stream()
            .filter(stats -> stats.getBasicStatistics().getRowCount().isPresent()
                    && stats.getColumnStatistics().containsKey(column)
                    && stats.getColumnStatistics().get(column).getAverageColumnLength().isPresent()
                    && stats.getColumnStatistics().get(column).getNullsCount().isPresent())
            .map(stats -> {
                double averageColumnLength = stats.getColumnStatistics().get(column).getAverageColumnLength().getAsDouble();
                long rowCount = stats.getBasicStatistics().getRowCount().getAsLong();
                long nullsCount = stats.getColumnStatistics().get(column).getNullsCount().getAsLong();
                long nonNullsCount = rowCount - nullsCount;
                return averageColumnLength * nonNullsCount;
            })
            .collect(toImmutableList());
    double knownPartitionDataSizesSum = knownPartitionDataSizes.stream().mapToDouble(a -> a).sum();
    long partitionsWithStatsCount = knownPartitionDataSizes.size();
    if (partitionsWithStatsCount == 0) {
        return Estimate.unknownValue();
    }
    return new Estimate(knownPartitionDataSizesSum * numberOfPartitions / partitionsWithStatsCount);
}
What's the usefulness of rowsPerPartition? From my perspective it doesn't seem that useful.
This was extracted opportunistically -- this value was used in 2 existing places and I added 2 new usages.
It'll give weird results if you have very variable partitions.
There is no ultimate cure for that, is there?
This is what I had in the latest version from my PR. I think it's simpler.
Well, this PR was initially a lot simpler, but grew complicated along the way. If you don't feel strongly about this, I will keep it as it is.
To clarify -- my question wasn't about passing in rowsPerPartition vs. computing it. It was: why is rowsPerPartition (which means something like "the average number of rows per partition, for the partitions where we know how many rows they have") needed in this method at all? IMO it complicates the method without providing a whole lot of value. I don't think it would be very common for the value to actually be used (because generally we either won't know the averageColumnLength, or we'll also know the number of rows, or possibly we won't know the number of rows for any partition). For the rare case where the value would get used, something weird is probably going on with the partitions, and so the estimate it provides probably isn't that good. Please let me know if I'm misunderstanding something.
I don't care about using my code vs. a version of this. That was to illustrate how some of the complexity could be trimmed.
@arhimondr says his upcoming PR is going to change this code anyway, so I'll approve.
Test failures fixed, rebased: #11176
sopel39 left a comment
some retrospective comments
    return new Estimate(totalNullsCount.getValue() / totalRowsCount.getValue());
double nonNullCount = rowCount.getValue() * (1 - (nullsFraction.isValueUnknown() ? 0 : nullsFraction.getValue()));
return new Estimate(knownDataSize / knownNonNullRowCount * nonNullCount);
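Reading the last line: knownDataSize / knownNonNullRowCount is the observed average size of a non-null value over the partitions that have statistics, and multiplying by nonNullCount (derived from the total row count and the nulls fraction) extrapolates it to the whole table. As a hypothetical example, if the sampled partitions cover 800 non-null values totalling 6,400 bytes and the table is estimated to have 10,000 non-null values, the estimate is 6400 / 800 * 10000 = 80,000 bytes.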
Is it so much more complex than: https://github.com/starburstdata/presto-private/blob/0a47179b9d0df73724863fba226372a31c9355f5/presto-hive/src/main/java/com/facebook/presto/hive/statistics/MetastoreHiveStatisticsProvider.java#L381 ?
Why is that the case? Are there any practical examples where the code here is better/more accurate?
Is there an advantage in computing knownNonNullRowCount and knownDataSize
vs using totalRowsCount and nullsFraction estimates?
Keep in mind that we are still using rowCount and nullsFraction at the end, so we still risk measurement errors.
I see that we want to do a weighted average over the partitions. This could still be done in a more streaming fashion.
This method was simplified as part of #11185. Instead of having avgColumnLength in HiveColumnStatistics we have totalSizeInBytes. We convert avgColumnLength to totalSizeInBytes when pulling the statistics from the Hive metastore.
    continue;
}
double partitionNonNullCount = partitionRowCount - partitionColumnStatistics.getNullsCount().orElse(0);
if (partitionNonNullCount < 0) {
You should probably cap it at 0 in such a case, otherwise you might end up with negative total data size estimates.
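A sketch of the suggested guard, reusing the variable names from the diff above:

// Clamp at zero so inconsistent metastore statistics (nullsCount > rowCount)
// cannot contribute a negative amount to the data size estimate.
double partitionNonNullCount = Math.max(0, partitionRowCount - partitionColumnStatistics.getNullsCount().orElse(0));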

No description provided.