Use only latest Puffin files for Iceberg stats#16745

Merged
findepi merged 2 commits into trinodb:master from homar:homar/read_only_newest_stats_file_for_iceberg
May 18, 2023
Conversation

@homar
Member

@homar homar commented Mar 27, 2023

Description

The goal of this change is to improve the performance of reading stats. Additionally, older stats files may contain skewed data, which is not optimal for the CBO.

Additional context and related issues

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Iceberg
* Use only latest puffin file for statistics. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Mar 27, 2023
@homar homar requested a review from findepi March 27, 2023 12:12
@findepi findepi requested review from alexjo2144 and findinpath and removed request for findepi March 27, 2023 12:51
@github-actions github-actions bot added the iceberg Iceberg connector label Mar 27, 2023
Member

Why create this here, if it's not needed until the loop below?

Member

Why remove here? The map isn't used if the value is found, so I don't see the point in modifying it.

This seems to be leftover from before; we can change to the three-argument version of toMap() that returns an unmodifiable map.
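A minimal sketch of the collector suggested here, assuming the JDK's Collectors.toUnmodifiableMap (the Trino codebase may instead use Guava's toImmutableMap; StatsFile and its fields are illustrative names, not from the PR):

```java
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toUnmodifiableMap;

public class UnmodifiableMapDemo {
    // Hypothetical stand-in for a statistics file keyed by snapshot ID
    record StatsFile(long snapshotId, String path) {}

    public static Map<Long, StatsFile> bySnapshot(List<StatsFile> files) {
        // Three-argument collector: key mapper, value mapper, merge function.
        // The merge function keeps the last file seen for a duplicate snapshot ID,
        // and the resulting map rejects further modification.
        return files.stream()
                .collect(toUnmodifiableMap(StatsFile::snapshotId, file -> file, (a, b) -> b));
    }

    public static void main(String[] args) {
        Map<Long, StatsFile> map = bySnapshot(List.of(
                new StatsFile(1, "s1.puffin"),
                new StatsFile(2, "s2.puffin")));
        System.out.println(map.size());
        try {
            map.put(3L, new StatsFile(3, "s3.puffin"));
        } catch (UnsupportedOperationException e) {
            System.out.println("unmodifiable"); // mutation is rejected
        }
    }
}
```

With an unmodifiable result there is nothing to "remove" later, which is the point of the review comment.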

Contributor

This is no longer the result of the previous analyze, right?
ANALYZE " + tableName + " WITH (columns = ARRAY['nationkey', 'regionkey']) has replaced the stats for comment and name as well.
Does the comment still make sense?

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from 58cad8f to b174d9a Compare March 28, 2023 10:10
@findinpath
Contributor

Could you please also add to the commit message an explanation of why this change is being done?

@homar homar requested a review from findinpath April 3, 2023 09:06
@findepi
Member

findepi commented Apr 3, 2023

(x) Release notes are required, with the following suggested text:

# Iceberg
* Use only latest puffin file for statistics. ({issue}`issuenumber`)

I think we should skip this from release notes.
Per our knowledge, this should have no end-user visible impact for currently existing tables.

@findepi findepi added the no-release-notes This pull request does not require release notes entry label Apr 3, 2023
Member

getOnlyElement(statsFileBySnapshot.values())

More importantly, this special-case path can return a stats file that the longer version below wouldn't (when there is a stats file for a newer snapshot).

Let's remove the == 1 special case.
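A hedged sketch of the idea behind dropping the special case: always walk the current snapshot's ancestors, newest first, and take the first one that has a stats file. The method and parameter names below are illustrative, not the actual Trino code:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LatestStatsFileDemo {
    // Instead of special-casing "exactly one stats file", select by snapshot
    // lineage: the first ancestor (newest first) with a stats file wins.
    public static Optional<String> latestStatsFile(
            List<Long> ancestorsNewestFirst, Map<Long, String> statsFileBySnapshot) {
        return ancestorsNewestFirst.stream()
                .filter(statsFileBySnapshot::containsKey)
                .map(statsFileBySnapshot::get)
                .findFirst();
    }

    public static void main(String[] args) {
        // Snapshot lineage 30 -> 20 -> 10; stats files exist for 20 and 10.
        // The file for snapshot 20 is picked, never the older one for 10.
        Optional<String> latest = latestStatsFile(
                List.of(30L, 20L, 10L), Map.of(10L, "old.puffin", 20L, "new.puffin"));
        System.out.println(latest.orElseThrow());
    }
}
```

This behaves identically whether there is one stats file or many, so the size == 1 branch adds nothing.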

Member

// No statistics files

?

Member

getLatestStatisticsFile

Member

unused?

Member Author

Used now ;)

Member

Reanalyzing with a subset of columns should carry the unchanged stats over to the new stats file.

That's the basic idea behind this whole change.

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch 2 times, most recently from f3df999 to 42545df Compare April 3, 2023 21:01
@homar homar requested a review from findepi April 4, 2023 08:51
Member

what would happen if we used INCREMENTAL_UPDATE always?

Member Author

You would unnecessarily call code that merges changes instead of just replacing them. Nothing else, I think.

Member

INCREMENTAL_UPDATE facilitates stats updates during inserts.
We cannot update stats during deletions, though. That's why we need REPLACE mode. ANALYZE uses REPLACE mode to "erase" values that are no longer in the table. (This is a challenge for incremental analyze, but that's another story.)

Now, ANALYZE wants to use REPLACE mode regardless of whether all data columns got analyzed, or only a subset. Otherwise it wouldn't be able to fix stats drifting over time. See #16889
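The REPLACE vs INCREMENTAL_UPDATE distinction described above can be sketched as follows. This is a toy model, not the Trino implementation: the real code unions theta sketches per column, while here a plain Long NDV estimate and Math.max stand in for a sketch union, just to show the control flow:

```java
import java.util.HashMap;
import java.util.Map;

public class StatsUpdateModeDemo {
    enum UpdateMode { REPLACE, INCREMENTAL_UPDATE }

    public static Map<String, Long> update(
            UpdateMode mode, Map<String, Long> previous, Map<String, Long> collected) {
        Map<String, Long> result = new HashMap<>(previous);
        switch (mode) {
            case REPLACE ->
                    // ANALYZE: freshly collected values win outright,
                    // erasing stats that may have drifted over time
                    result.putAll(collected);
            case INCREMENTAL_UPDATE ->
                    // INSERT: merge the new data's stats into the existing ones
                    collected.forEach((column, ndv) -> result.merge(column, ndv, Math::max));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> previous = Map.of("nationkey", 25L);
        System.out.println(update(UpdateMode.REPLACE, previous, Map.of("nationkey", 20L)));
        System.out.println(update(UpdateMode.INCREMENTAL_UPDATE, previous, Map.of("nationkey", 20L)));
    }
}
```

In REPLACE mode the fresh (possibly lower) value 20 survives; in INCREMENTAL_UPDATE the merge keeps 25, which is why deletions cannot be reflected incrementally.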

Member Author

I cherry-picked your PR and the tests passed, so I'd assume we are fine. Or what am I missing?

Member

That test doesn't cover the case where a subset of columns is being analyzed. Can you modify the test accordingly?

Member

I've updated #16889 to include analyze with a subset of columns.
The current version of this PR will break that test.

Member

I am not sure we need to capture that in a handle, as the finish* method can know the analyzed vs all column names.

Member Author

So I did it for 2 reasons, and probably both are invalid :|
Initially I wanted to do that based on column IDs, but that is not feasible because they come as Strings in ANALYZE properties.
When I realised that, I didn't revert this because I knew I would need it for INCREMENTAL ANALYZE anyway. But as it is premature here, I will do what you suggest.

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch 3 times, most recently from 4f04088 to 1670550 Compare April 7, 2023 14:53
@homar homar requested a review from findepi April 7, 2023 15:14
Member

@alexjo2144 alexjo2144 left a comment

Overall a good change, just a couple style nitpicks and things

Member

This is the same as columns.size(), right?

Member

You can use Stream.distinct() rather than collecting to a set.

Member Author

Yes, but then I would have to collect to a List and get the size of that list, so I thought this was simpler ;)
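For what it's worth, no intermediate List is needed: Stream.count() already returns a long after distinct(). A small sketch of the two options being debated (column names are illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

public class DistinctCountDemo {
    // Reviewer's suggestion: distinct() followed by count(), no collection built.
    public static long viaDistinct(List<String> columns) {
        return columns.stream().distinct().count();
    }

    // Original approach: collect to a Set and take its size.
    public static int viaSet(List<String> columns) {
        return columns.stream().collect(Collectors.toSet()).size();
    }

    public static void main(String[] args) {
        List<String> columns = List.of("nationkey", "regionkey", "nationkey");
        System.out.println(viaDistinct(columns));
        System.out.println(viaSet(columns));
    }
}
```

Both return 2 here; the distinct()/count() form simply avoids materializing the set.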

Member

Reminder to clean this up, why are they commented out?

Member Author

Oh, silly me, I just commented them out for debugging purposes.

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from 1670550 to 4bbc03e Compare April 11, 2023 09:02
Member

I've updated #16889 to include analyze with a subset of columns

Member

redundant?

Member

I've updated #16889 to include analyze with a subset of columns.
The current version of this PR will break that test.

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch 2 times, most recently from bed55cc to 4736938 Compare April 11, 2023 15:25
Member

I don't see a need to distinguish between the two cases.

mergeStatisticsIfNecessary gets some statistics to write.
In REPLACE mode, it should write those new statistics, and should carry forward all statistics that haven't been replaced (other columns, or other stats).

Member

Good point, we can remove the extra case and always run what's in the REPLACE_PARTIALLY block now, right? It may just be worth refactoring the updateStats method slightly to make the reading of the previous Puffin file lazy.

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from 5f62ccf to d3315e9 Compare April 18, 2023 22:31
@homar homar requested a review from findepi April 18, 2023 22:33
@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch 3 times, most recently from 92fe842 to 3aa690f Compare April 20, 2023 08:40
Member

remove, the class is used

The right way to deal with error-prone reporting problems is to add a record compact constructor making a defensive copy and null checks on the provided parameters. Thank you, error-prone, for reminding us about that :)
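The pattern being suggested looks roughly like this (the record name and fields are illustrative, not from the PR): a compact constructor performs the null checks and a defensive copy, so the record can never hold a null or externally mutable list.

```java
import java.util.List;
import static java.util.Objects.requireNonNull;

public class RecordConstructorDemo {
    record ColumnStats(String columnName, List<Long> values) {
        // Compact constructor: runs before the fields are assigned.
        ColumnStats {
            requireNonNull(columnName, "columnName is null");
            // List.copyOf null-checks and makes an immutable defensive copy,
            // so later mutation of the caller's list cannot leak in.
            values = List.copyOf(requireNonNull(values, "values is null"));
        }
    }

    public static void main(String[] args) {
        java.util.ArrayList<Long> mutable = new java.util.ArrayList<>(List.of(1L, 2L));
        ColumnStats stats = new ColumnStats("nationkey", mutable);
        mutable.add(3L); // does not affect the record's copy
        System.out.println(stats.values().size());
    }
}
```

This also satisfies error-prone style checks about records exposing mutable state.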

Member

that looks like validation belonging to the (compact) constructor?

Member

what is .map(entry -> Map.entry(entry.getKey(), entry.getValue())) for?

Member Author

I didn't come up with a better way to make one stream of type Stream&lt;Map.Entry&lt;Integer, Object&gt;&gt; out of these two...
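One hedged reading of the exchange above: entrySet().stream() already yields Map.Entry values, so re-wrapping each entry with Map.entry(...) is a no-op; Stream.concat alone produces the desired Stream&lt;Map.Entry&lt;Integer, Object&gt;&gt;. A small sketch (the map names are illustrative, and the keys are assumed disjoint as they would be for fresh vs retained sketches, since Collectors.toMap throws on duplicates):

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EntryStreamConcatDemo {
    public static Map<Integer, Object> concatEntries(
            Map<Integer, Object> fresh, Map<Integer, Object> retained) {
        // No .map(entry -> Map.entry(entry.getKey(), entry.getValue())) needed:
        // both entrySet() streams are already Stream<Map.Entry<Integer, Object>>.
        return Stream.concat(fresh.entrySet().stream(), retained.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<Integer, Object> merged = concatEntries(
                Map.of(1, "ndv-sketch-a"), Map.of(2, "ndv-sketch-b"));
        System.out.println(merged.size());
    }
}
```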

Member

What is oldSketches for?

Why isn't io.trino.plugin.iceberg.TableStatisticsWriter#copyRetainedStatistics covering it?

(I think I found out -- the writer should use getLatestStatisticsFile, and then reading oldSketches isn't needed here.)

Member

This should use getLatestStatisticsFile

getLatestStatisticsFile(table, snapshotId)
                        .ifPresent(previousStatisticsFile -> copyRetainedStatistics

Member

My IDE shows a warning here. Is it because of my specific configuration, or something to address?

Member

Readability: orElse for an optional created over the previous 37 lines isn't readable to me.

Correctness: in INCREMENTAL_UPDATE, the collectedNdvSketches need to be merged with existing stats, otherwise they are incorrect.
I'd expect #17227 to fail (but it doesn't). Can you explain to me where the logic is that ensures we don't write NDV(1) after inserting one row into a table that already has millions of rows, but doesn't have stats?

Member

found the code, it's here

Set<String> columnsWithExtendedStatistics = tableStatistics.getColumnStatistics().entrySet().stream()
.filter(entry -> !entry.getValue().getDistinctValuesCount().isUnknown())
.map(entry -> ((IcebergColumnHandle) entry.getKey()).getName())
.collect(toImmutableSet());
return getStatisticsCollectionMetadata(tableMetadata, Optional.of(columnsWithExtendedStatistics), availableColumnNames -> {});

still, the stats writer needs to be smarter, as if that code didn't exist.

Member Author

  1. I added that test.
  2. I will refactor that orElse.
  3. It is merged here:

if (finalUpdateMode == INCREMENTAL_UPDATE) {
    newSketch = SetOperation.builder().buildUnion().union(readPreviousSketch(read), newSketch);
}

Member

Per the comment elsewhere, the else { shouldn't be needed; the old values should be taken care of using copyRetainedStatistics.

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from 3aa690f to 734bc7f Compare April 26, 2023 10:48
Member

@findepi findepi left a comment

Make sure TestIcebergStatistics passes also when

  • you include test added in #17227
  • apply the below change
Index: plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java	(revision 734bc7f5a61c503044fac6ce73aa432751a3550b)
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java	(date 1682514123649)
@@ -1789,7 +1789,7 @@
             return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
         }
         Set<String> columnsWithExtendedStatistics = tableStatistics.getColumnStatistics().entrySet().stream()
-                .filter(entry -> !entry.getValue().getDistinctValuesCount().isUnknown())
+                // .filter(entry -> !entry.getValue().getDistinctValuesCount().isUnknown()) -- this is for optimization only, not correctness, so let's see what happens without it
                 .map(entry -> ((IcebergColumnHandle) entry.getKey()).getName())
                 .collect(toImmutableSet());
         return getStatisticsCollectionMetadata(tableMetadata, Optional.of(columnsWithExtendedStatistics), availableColumnNames -> {});

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch 2 times, most recently from 02c3db4 to 5181c1f Compare April 26, 2023 22:31
Member

Why read anything in REPLACE mode?
I think we're not going to use the read results anyway?

Comment on lines 225 to 226
Member

What if updateMode is something other than REPLACE or INCREMENTAL_UPDATE? (Future-proofing the code.)
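One common way to future-proof code like this is an exhaustive switch expression over the enum, so that adding a new constant without a matching case becomes a compile-time error rather than a silent wrong branch. A sketch (enum name and cases mirror the modes discussed in this thread; the strings are illustrative):

```java
public class ExhaustiveSwitchDemo {
    enum UpdateMode { REPLACE, INCREMENTAL_UPDATE }

    // A switch *expression* with no default: the compiler requires every enum
    // constant to be handled, so a future third mode cannot slip through.
    public static String describe(UpdateMode mode) {
        return switch (mode) {
            case REPLACE -> "rewrite all statistics";
            case INCREMENTAL_UPDATE -> "merge with previous statistics";
        };
    }

    public static void main(String[] args) {
        System.out.println(describe(UpdateMode.REPLACE));
    }
}
```

The alternative is a default branch that throws (e.g. IllegalStateException), which fails at runtime instead of compile time.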

Comment on lines 198 to 200
Member

I am looking at this diff and I don't understand why this is being removed.
Wasn't the previous structure OK?

Same for the mergeStatisticsIfNecessary return type. What was wrong about it?
If, in the future, we collect anything else besides NDV (CollectedStatistics changes), we want mergeStatisticsIfNecessary to be updated appropriately, so having the method return CollectedStatistics looked right to me.

Member

Why do this only if no previous statistics file? Shouldn't REPLACE mean always replace?

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from 973b638 to fbe72e0 Compare April 27, 2023 19:09
@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from fbe72e0 to c8b9e0f Compare May 5, 2023 07:43
@homar
Member Author

homar commented May 5, 2023

rebasing

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from c8b9e0f to 024cca9 Compare May 5, 2023 17:25
Member

It is needed only in one case, so it should be inside the switch below.

I pushed a fixup with this (and some other small changes).

Member

this should be same as hasUsefulData above, i.e.

.filter(blobMetadata -> columnsWithRecentlyComputedStats.contains(getOnlyElement(blobMetadata.fields())))

Member

I fail to understand why this .filter(blobMetadata -> pendingPreviousNdvSketches.contains(getOnlyElement(blobMetadata.inputFields()))) is removed.
Can you please explain?

Member Author

I probably lost it while changing the implementation a few times. Thanks for catching it.

Member

remove if (columnsWithRecentlyComputedStats.remove(fieldId)), here we shouldn't be reading anything that we're not joining with computed NDVs

everything else should go with copyRetainedStatistics path

Member

columnsWithRecentlyComputedStats will become immutable, so there is no need to initialize it with new HashSet&lt;&gt;(
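An illustrative sketch of the point: the wrapping new HashSet&lt;&gt;(...) copy is only needed while the code still mutates the set (an immutable set throws on remove()); once the remove() calls are gone, the copy can be dropped. Names below echo the thread but the code is a stand-alone toy:

```java
import java.util.HashSet;
import java.util.Set;

public class ImmutableSetDemo {
    // A mutable defensive copy is only required if mutation is still needed.
    public static Set<String> mutableCopyWithout(Set<String> columns, String column) {
        Set<String> copy = new HashSet<>(columns);
        copy.remove(column);
        return copy;
    }

    public static void main(String[] args) {
        Set<String> columnsWithRecentlyComputedStats = Set.of("nationkey", "regionkey");
        try {
            // Immutable sets (Set.of, ImmutableSet, ...) reject mutation.
            columnsWithRecentlyComputedStats.remove("nationkey");
        } catch (UnsupportedOperationException e) {
            System.out.println("immutable set rejects remove()");
        }
        System.out.println(mutableCopyWithout(columnsWithRecentlyComputedStats, "nationkey"));
    }
}
```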

Member

This means "for any column where we have only partial data (previously had no NDV information), treat this partial data as complete information".
So if I insert 1 row into a table with 1M rows, I can have an NDV of 1 for such a column.

Am I reading this right?

Member Author

Yes, you are right.

Member

you can inline it back now

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from eeb1af4 to f524e09 Compare May 9, 2023 19:13
Member

style

CompactSketch union = SetOperation.builder().buildUnion().union(
        readPreviousSketch(read),
        requireNonNull(collectedNdvSketches.get(fieldId), "collectedNdvSketches.get(fieldId) is null"));

However, if you look at the PR diff and want to avoid unnecessary changes, the code should look like this:

Memory memory = Memory.wrap(ByteBuffers.getBytes(read.second())); // Memory.wrap(ByteBuffer) results in a different deserialized state
CompactSketch previousSketch = CompactSketch.wrap(memory);
CompactSketch newSketch = requireNonNull(collectedNdvSketches.get(fieldId), "collectedNdvSketches.get(fieldId) is null");
ndvSketches.put(fieldId, SetOperation.builder().buildUnion().union(previousSketch, newSketch));

You can view the PR diff at https://github.com/trinodb/trino/pull/16745/files

@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from f524e09 to d8eebd1 Compare May 11, 2023 09:39
@findepi findepi changed the title Use only latest puffin files for Iceberg stats Use only latest Puffin files for Iceberg stats May 12, 2023
@findepi findepi force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from d8eebd1 to ce4cd66 Compare May 12, 2023 09:17
@findepi
Member

findepi commented May 12, 2023

pushed a small change to the writer & commit message

We should require that one Puffin file has all the stats that are still
relevant, thus we should not need to traverse stat files.
@homar homar force-pushed the homar/read_only_newest_stats_file_for_iceberg branch from ce4cd66 to 21d80fd Compare May 14, 2023 10:02
@findepi findepi merged commit 6c80092 into trinodb:master May 18, 2023
@findepi
Member

findepi commented May 18, 2023

thank you

@github-actions github-actions bot added this to the 419 milestone May 18, 2023

Labels

cla-signed iceberg Iceberg connector no-release-notes This pull request does not require release notes entry

Development


5 participants