Refactor Iceberg table statistics to be deterministic #9906
findepi merged 1 commit into trinodb:master
@alexjo2144 the build is red
d4515b5 to 248943a
@findepi finally had some time to get back to this. Mind taking a look?
add empty line between immutable and mutable state
computeIfAbsent (without another get) halves the number of map accesses and skips the IcebergStatisticsBuilder allocation.
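A minimal sketch of the computeIfAbsent point, not the PR's actual code. StatsBuilder is a hypothetical stand-in for IcebergStatisticsBuilder, the String key is an assumption, and the allocations counter exists only to make the saved allocation visible.

```java
import java.util.HashMap;
import java.util.Map;

final class ComputeIfAbsentSketch
{
    // Hypothetical stand-in for IcebergStatisticsBuilder
    static final class StatsBuilder
    {
        long recordCount;
    }

    private final Map<String, StatsBuilder> builders = new HashMap<>();

    // Counts how many builders were actually created (illustration only)
    int allocations;

    // Single map access: the mapping function, and therefore the allocation,
    // runs only when the key is absent
    StatsBuilder builderFor(String partition)
    {
        return builders.computeIfAbsent(partition, ignored -> {
            allocations++;
            return new StatsBuilder();
        });
    }
}
```

Calling builderFor twice with the same key returns the same instance and allocates only once, whereas a putIfAbsent(key, new StatsBuilder()) followed by get(key) would allocate on every call and touch the map twice.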
Functions.compose -> plain lambda: entry -> ...
You're doing 3 map lookups (contains, get, merge).
You can do this in one shot:
    nullCounts.merge(id, nullCount, (existingCount, newCount) ->
            existingCount.isPresent() && newCount.isPresent() ? Optional.of(existingCount.get() + newCount.get()) : Optional.empty());
(you can extract the lambda body to a method like sumOfOptionals)
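The one-shot merge with the lambda body extracted could look roughly like this. It is a sketch under assumptions: the Integer key, the class name, and the nullCount accessor are illustrative, and only the merge call and the sumOfOptionals name come from the comment above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class NullCountSketch
{
    private final Map<Integer, Optional<Long>> nullCounts = new HashMap<>();

    // One lookup: merge stores the value if the key is absent, otherwise combines old and new
    void updateNullCount(int id, Optional<Long> nullCount)
    {
        nullCounts.merge(id, nullCount, NullCountSketch::sumOfOptionals);
    }

    // Sum is known only if both sides are known; an empty side makes the result empty
    static Optional<Long> sumOfOptionals(Optional<Long> a, Optional<Long> b)
    {
        return a.isPresent() && b.isPresent() ? Optional.of(a.get() + b.get()) : Optional.empty();
    }

    Optional<Long> nullCount(int id)
    {
        return nullCounts.getOrDefault(id, Optional.empty());
    }
}
```

Because an empty operand always yields an empty result, a column stays "unknown" once any file reports no null count, regardless of which file is seen first.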
This method is also equivalent to this:
    nullCounts.merge(id, nullCount, (oldCount, newCount) ->
            oldCount.flatMap(oldValue ->
                    newCount.map(newValue -> newValue + oldValue)));
After looking at this, it's definitely less clear about the intent, though I would use this form if extracting a method to add optionals.
Actually, instead of just addition, maybe mergeOptionals would be better (I see somewhere else in this file it could be used). Here, it would be used like mergeOptionals(oldCount, newCount, Long::sum).
    /**
     * Apply a function to the values in two optionals, returning an optional containing the result.
     * If either argument is empty, return empty.
     */
    <A, B, C> Optional<C> mergeOptionals(Optional<A> a, Optional<B> b, BiFunction<A, B, C> mergeFunction)
    {
        return a.flatMap(aa -> b.map(bb -> mergeFunction.apply(aa, bb)));
    }
"return a.flatMap(aa -> b.map(bb -> mergeFunction.apply(aa, bb)));"
i thought about that, but i don't find it readable, that's why i suggested using ?:
Before my changes the ORC version of this table was deemed to have invalid column metrics, thus the NULL min/max/null count rows. I think that was a bug though, and the stats are the same now for both Parquet and ORC, with a few exceptions below.
jirassimok left a comment
I don't fully understand how this works, but overall it looks pretty good.
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergStatistics.java
Should the builder's method return this?
Do these need to be declared so long before they're used?
Any closer and they'd be inside the loop
I could see using partitionValue.ifPresentOrElse here to avoid Optional.get.
Or maybe just an ifPresent, leaving the updateNullCountStats call outside:
    partitionValues.get(id).ifPresent(partition -> {
        // ...
        updateMinMaxStats(...);
    });
    updateNullCountStats(id, partitionValue.map(v -> 0L).orElseGet(dataFile::recordCount));
I see what you're getting at. I kinda like the separation as it is because there's a "this partition value is non-null" block, and a "this partition value is null" block.
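For comparison, the ifPresentOrElse variant keeps the two blocks separate while still avoiding Optional.get. This is only a sketch: the class, the fields, and the long-typed partition value are assumptions, and recordCount stands in for dataFile.recordCount().

```java
import java.util.Optional;

final class PartitionValueSketch
{
    long min = Long.MAX_VALUE;
    long nullCount;

    void accept(Optional<Long> partitionValue, long recordCount)
    {
        partitionValue.ifPresentOrElse(
                // "this partition value is non-null" block: every row carries this value
                value -> min = Math.min(min, value),
                // "this partition value is null" block: every row in the file is null
                () -> nullCount += recordCount);
    }
}
```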
I'm not a fan of these null checks.
Why not inline upperBounds and lowerBounds here as optionals (changing the signature of convertBounds)?
    Object lowerBound = convertBounds(idToTypeMapping, dataFile.lowerBounds())
            .map(bounds -> convertIcebergValueToTrino(column.type(), bounds))
            .orElse(null);
Actually, maybe the bounds should be Optional themselves, rather than nullable.
This could be a stream.
    idToMetricMap.entrySet().stream().map(...).collect(toImmutableMap(Entry::getKey, Entry::getValue))
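A runnable sketch of the stream form. It uses the JDK's Collectors.toUnmodifiableMap instead of Guava's toImmutableMap so that it has no dependencies, and it folds the map step into the value function. The convertValues name and the convert parameter are hypothetical.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ConvertMapSketch
{
    // Converts every value of the input map, keeping the keys, and returns an unmodifiable map
    static <K, V, R> Map<K, R> convertValues(Map<K, V> idToMetricMap, Function<V, R> convert)
    {
        return idToMetricMap.entrySet().stream()
                .collect(Collectors.toUnmodifiableMap(
                        Map.Entry::getKey,
                        entry -> convert.apply(entry.getValue())));
    }
}
```

Like ImmutableMap, this collector rejects null keys and values with a NullPointerException, so a conversion that can return null needs to be filtered or wrapped before collecting.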
Or make the method return an Optional (also noted in an earlier comment).
If it is, ImmutableMap will throw an exception.
This could use ifPresent, or even better, it could use map or mergeOptionals (suggested above).
    this.min = this.min.map(currentMin ->
            min != null && compareTrinoValue(min, currentMin) < 0 ? min : currentMin);
Or, with mergeOptionals:
    this.min = mergeOptionals(this.min, Optional.ofNullable(min), (currentValue, newValue) ->
            compareTrinoValue(newValue, currentValue) < 0 ? newValue : currentValue);
(If you make the bound variables Optionals as I suggested in an earlier comment, then I think mergeOptionals is best here. Otherwise, I think the map version is better.)
These miss a case where we want to invalidate the stats. For example, if the first file has stats for a column and then the second file does not, we should treat that the same as if the order is reversed.
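The order-independence requirement above can be illustrated with a small accumulator. This is a sketch under assumptions, not the code in the PR: the class name, the long-typed bound, and the seenFile flag are all hypothetical.

```java
import java.util.Optional;

final class MinTrackerSketch
{
    // Distinguishes "no files seen yet" from "stats invalidated"
    private boolean seenFile;
    private Optional<Long> min = Optional.empty();

    void addFile(Optional<Long> fileMin)
    {
        if (!seenFile) {
            // The first file seeds the value, whether or not it has a bound
            seenFile = true;
            min = fileMin;
            return;
        }
        // If either the running value or the new file lacks a bound, the result is empty,
        // and it stays empty for all later files
        min = min.flatMap(current -> fileMin.map(next -> Math.min(current, next)));
    }

    Optional<Long> min()
    {
        return min;
    }
}
```

Feeding the files (5, missing, 3) or (missing, 5, 3) gives an empty minimum in both cases, so the result no longer depends on the order in which files are visited.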
Comments addressed in the fixup; however, I realized this doesn't work with tables that have gone through some schema evolution. I need to work on a fix for that before re-reviewing.
Are Iceberg Types safe to use as map keys?
(my initial thought was to have List<> columnTrinoType and correlate with columns based on list index.)
Mapping them here is useful when you have many columns of same type, with stats.
And it only matters when # files isn't huge.
Doing this in ColumnStatistics::new is simpler and IMO sufficient, as you do lookup per column.
(originally you had lookup per column x file)
Why Optional?
How does empty() differ from empty map?
This is what breaks the schema evolution.
I notice that you didn't have this check and deemed OK.
Maybe we want if (identityPartitionFieldIds.contains(id) && partitionValues.containsKey(id)), so that we still try to take min/max from file stats?
Actually, is identityPartitionFieldIds.contains(id) important?
if (partitionValues.containsKey(id)) should be enough. The current table partitioning is not important when calculating the stats.
"Actually, is identityPartitionFieldIds.contains(id) important?"
I was using that as a proxy for checking whether the partition has a transform. If the partitioning is on hour(ts), we can't use the partition information to calculate max(ts). But you're right that we can't use the current partitioning; it needs to be the spec for that file.
lowerBounds, upperBounds are unnecessarily Optional. "No entry" (null) is treated the same as "no map at all".
Do typeToComparisonHandle.get(type) only if constructing new ColumnStatistics
i missed that previously -- the ColumnStatistics captures initial bounds, so on the first round the call to updateMinMax is a no-op.
You can skip the call in a somewhat verbose manner with Map.compute
    columnStatistics.compute(id, (ignored, columnStatistics) -> {
        if (columnStatistics == null) {
            columnStatistics = new ColumnStatistics(lowerBound, upperBound, comparisonHandle);
        }
        else {
            columnStatistics.updateMinMax(lowerBound, upperBound);
        }
        return columnStatistics;
    });
(or document that you're doing what you're doing currently)
comparisonHandle is immutable state, so it fits better as the first arg (matching the field order you chose)
i'd remove Optional.of().
also Optional.empty() -> "Empty"
(the fact that value is wrapped in an Optional is obvious, and doesn't need to be talked about)
Linking a thread I started on the Iceberg Slack channel on how to deal with schema evolution vs missing metrics. I still don't have a clear answer for how to tell the two apart, though: https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1642177083095300
c041780 to 64bbf63
AC thanks
@nineinchnick what is
findepi left a comment
@alexjo2144 please squash (sans rebase)
Fix bug in Iceberg partition schema evolution
this will be squashed, right?
also, should we have a test, with two partitioning transformations over a column?
Yeah, I'll put it in a separate PR though
@findepi the It's marked as failed, because there were other failures. Marking it as successful would be a false positive if it contained error annotations.
@nineinchnick could it be attached to the CI flow somehow?
64bbf63 to 712277b
Squashed. Thanks @findepi
We have millions of files and find
@vincentpoon this PR probably didn't change how the stats are calculated; it just made the code saner & "more correct". In any case, let's have an issue
Fixes #9716
Existing table statistics were non-deterministic because they depended on the order in which data files were loaded from the Iceberg API. This hopefully cleans the code up a bit and makes it more consistent.