
Cache Iceberg equality and positional delete filters. #13112

Closed
lhofhansl wants to merge 2 commits into trinodb:master from lhofhansl:delete_filter_cache

Conversation

@lhofhansl
Member

@lhofhansl lhofhansl commented Jul 7, 2022

Description

This follows the apparent design choice of Iceberg's delete filters, which operate on a partition at a time.

Before this change, Iceberg DeleteFilters were reloaded and reparsed for each page. This PR keeps the filters for the lifetime of the split, which matches how Iceberg delete filters are designed.
It uses only the existing API of Iceberg's DeleteFilter.
This speeds up some queries involving delete filters by a factor of 1000 or more. See #13092 for an explanation.
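The core idea described above, building the expensive delete filter lazily once per split so every page reuses it, can be sketched roughly as follows. All names here are hypothetical stand-ins for illustration, not Trino's or Iceberg's actual classes:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of per-split delete-filter caching: build the
// (expensive) filter at most once, then reuse it for every page of the split.
public class DeleteFilterMemoSketch
{
    // Wraps an expensive supplier so the value is computed at most once.
    static <T> Supplier<T> memoize(Supplier<T> delegate)
    {
        return new Supplier<T>()
        {
            private T value;
            private boolean loaded;

            @Override
            public synchronized T get()
            {
                if (!loaded) {
                    value = delegate.get();
                    loaded = true;
                }
                return value;
            }
        };
    }

    static int loadCount; // counts how often the filter is (re)built

    // Stand-in for reading and parsing a positional-delete file.
    static Predicate<Long> loadDeleteFilter(List<Long> deletedPositions)
    {
        loadCount++;
        return position -> !deletedPositions.contains(position);
    }

    public static void main(String[] args)
    {
        Supplier<Predicate<Long>> filter = memoize(() -> loadDeleteFilter(List.of(2L, 5L)));

        // Simulate three pages read from the same split: the filter is built once.
        for (int page = 0; page < 3; page++) {
            Predicate<Long> keep = filter.get();
            if (keep.test(2L)) {
                throw new AssertionError("position 2 should be deleted");
            }
            if (!keep.test(3L)) {
                throw new AssertionError("position 3 should survive");
            }
        }
        if (loadCount != 1) {
            throw new AssertionError("filter was rebuilt: " + loadCount);
        }
    }
}
```

Without the memoization, `loadDeleteFilter` would run once per page instead of once per split, which is the per-page reload cost this PR removes.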

Is this change a fix, improvement, new feature, refactoring, or other?

Performance Fix

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

Iceberg Connector

How would you describe this change to a non-technical end user or system administrator?

Deleting rows in Iceberg V2 leads to very slow read performance following the delete.

Related issues, pull requests, and links

Documentation

(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

(x) No release notes entries required.
( ) Release notes entries required with the following suggested text:

@cla-bot cla-bot bot added the cla-signed label Jul 7, 2022
@lhofhansl lhofhansl requested review from alexjo2144 and findepi July 7, 2022 04:54
@lhofhansl
Member Author

lhofhansl commented Jul 7, 2022

Looking at the failures.

... should be fixed now.

@lhofhansl lhofhansl force-pushed the delete_filter_cache branch from d53f3a6 to b1d01c5 Compare July 7, 2022 16:45
@findinpath
Contributor

Can we add tests similar to what is found in io.trino.plugin.iceberg.TestIcebergMetadataFileOperations to see how the newly added functionality actually works?

Maybe add a preparatory commit with the corresponding tests before your changes, so that the improvements this PR brings can be seen in its main commit.

Contributor

@findinpath findinpath Jul 11, 2022


I am having a hard time understanding what is actually being cached here.

Can you please add a comment or rename the `xxxCached` methods to better match their purpose?

Member Author


Yeah. For that you'll need to look at Iceberg's DeleteFilter. Let me think about how best to comment on that.

Member Author


Added some comments. Lemme know what you think.

Contributor


Thank you for taking the time to point out the memoization technique in applying the delete filters.
I'm curious what the downsides of the memoization technique are that keep org.apache.iceberg.data.DeleteFilter#filter from using it.

cc @rdblue

Member Author


I assume this has to do with Spark's partition-at-a-time operation. In that case it can evaluate the filters in a streaming fashion.

See https://github.com/apache/iceberg/blob/c8b97c91ac04a2ee5ee8f746dcc4619a9c8d5ffe/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java#L232

So for many filtered rows, memoization might be slower in Spark. In Trino it's a "disaster" either way, since Trino processes a Page at a time.
Although I will say that when the set of deleted rows is large, one should try to formulate the delete as an equality delete anyway.

Note that the fix the Iceberg folks are proposing is to always memoize the filters.

@lhofhansl
Member Author

lhofhansl commented Jul 11, 2022

Can we add tests similar to what is found in io.trino.plugin.iceberg.TestIcebergMetadataFileOperations to see how the newly added functionality actually works?

Let me look at that test. Since this is a performance improvement with no correctness implications, it might be a bit tricky to decide what we should assert. Perhaps we can count the number of times the filter is re-read and parsed (update: I see that's exactly what TrackingFileIoProvider does). I'll think about it.

(I'll be on the road the next few days, so might go a bit more slowly)

@lhofhansl lhofhansl force-pushed the delete_filter_cache branch from b1d01c5 to 07ba39c Compare July 11, 2022 09:00
@lhofhansl
Member Author

Added comments and fixed the Filter<>() nit. Thinking about the test.

@findinpath
Contributor

Perhaps we can count the number of times the filter is re-read and parsed. I'll think about it.

Note that the trino-iceberg module has already TrackingFileIoProvider, TrackingFileIo which can be employed for such purposes.

@findinpath
Contributor

@lhofhansl FYI, related work is also underway in Iceberg: apache/iceberg#5195

@lhofhansl
Member Author

lhofhansl commented Jul 11, 2022

Note that the trino-iceberg module has already TrackingFileIoProvider

It seems to track only metadata operations (not data operations)...?

related work is also underway in Iceberg: apache/iceberg#5195

That would solve the problem in the exact same way. Should we wait for that instead?

Looking at the Iceberg code... if we do not cache the filter, we can stream the data rows through the filters without completely materializing them. So for Spark the caching could be detrimental.
It only helps Trino, because Trino operates on a Page at a time, and hence it is important to load the filters only once and then pass multiple Pages "through" them.

I'm fine either way :)
Let me know how you want to proceed.

@lhofhansl lhofhansl changed the title [WIP] Cache Iceberg equality and positional delete filters. Cache Iceberg equality and positional delete filters. Jul 11, 2022
@lhofhansl
Member Author

Since Iceberg is considering the same strategy (cache the filters) I removed the WIP annotation.

@findinpath
Contributor

That would solve the problem in the exact same way. Should we wait for that instead?

When the Iceberg PR lands, it may still take a while until the new Iceberg version is released and integrated into Trino.
I'd recommend going forward with this fix (and corresponding test) and add a TODO in the code to point to the ongoing Iceberg PR.

Having a test in this PR would ensure that when eventually switching to the Iceberg fix there will be no performance penalty.

@lhofhansl lhofhansl force-pushed the delete_filter_cache branch from 07ba39c to 33c46e4 Compare July 12, 2022 13:15
@lhofhansl
Member Author

Rebased for another test run. Still looking into the test.

@lhofhansl lhofhansl force-pushed the delete_filter_cache branch from 33c46e4 to 0ce2711 Compare July 12, 2022 15:36
@lhofhansl
Member Author

OK.. Added a test.

Took me a while to realize that (a) delete filters are not read via newStream, and (b) TrackingFileIo does not track calls to location on input files. Meh :(

Then I had to fix up TestIcebergMetadataFileOperations because it now also sees location calls on input files.

@rdblue
Contributor

rdblue commented Jul 12, 2022

@lhofhansl, I merged the Iceberg change to only load equality deletes once per DeleteFilter. Not sure how that affects this.

@lhofhansl
Member Author

@rdblue So the Iceberg change won't fix the Trino problem with positional deletes.
When there is a release of Iceberg with this, we can slightly simplify this PR (I included equality deletes in this PR because the snapshots might be written by tools other than Trino).

Curious, why only equality deletes and not also positional deletes?

@rdblue
Contributor

rdblue commented Jul 12, 2022

Curious, why only equality deletes and not also positional deletes?

The PR only covered equality. Doing the same for positional deletes is next, although those aren't always held in memory.

@lhofhansl
Member Author

I see. Thanks. This is mostly about positional deletes.

although those aren't always held in memory.

That's the part where I think it could be detrimental to Spark.

Member


I think this is a good short-term improvement, but we should try to avoid keeping all the deleted row numbers in memory if possible.

If we can assume splits for the same file will always come in together, and in order, we could use a streaming/iterative read approach:

  • First Split for a file arrives, initialize a DeleteFilter
  • Second Split arrives for the same file, reuse the existing DeleteFilter
  • First Split arrives for a different file, close the old DeleteFilter and open a new one for the new file
  • repeat

But I'd want some validation from @findepi on whether we can rely on ordering like that.
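The reuse-in-order idea from the steps above could look roughly like this sketch. Everything here is hypothetical and assumes splits for the same file arrive consecutively:

```java
// Sketch of reusing one delete filter across consecutive splits of the same
// file, replacing it only when a split for a different file arrives.
// All names are hypothetical stand-ins, not Trino's actual classes.
public class OrderedFilterReuseSketch
{
    static int opened; // how many filters were constructed

    static final class FileFilter
    {
        final String path;

        FileFilter(String path)
        {
            this.path = path;
            opened++; // stands in for the expensive delete-file load
        }
    }

    private FileFilter current;

    FileFilter filterFor(String path)
    {
        // Reuse the open filter while splits for the same file keep arriving.
        if (current == null || !current.path.equals(path)) {
            // A real implementation would close the previous filter here.
            current = new FileFilter(path);
        }
        return current;
    }

    public static void main(String[] args)
    {
        OrderedFilterReuseSketch cache = new OrderedFilterReuseSketch();
        cache.filterFor("data-1.parquet");
        cache.filterFor("data-1.parquet"); // second split, same file: reused
        cache.filterFor("data-2.parquet"); // different file: new filter
        if (opened != 2) {
            throw new AssertionError("expected 2 filters, got " + opened);
        }
    }
}
```

The whole approach depends on the ordering assumption the comment raises; if splits for one file can interleave with another file's splits, the single-slot cache would thrash.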

Member Author


A single delete filter file can affect any data file in the partition, so I think we need to load all filters for the partition.
(Unless we open them ahead of time, analyze them, and then determine which ones affect which files.)

Member


They should be filtered using the min/max values of the path column, so only some of the partition's filters are fully opened

Member


This is for positional deletes specifically

Member Author

@lhofhansl lhofhansl Jul 13, 2022


Thanks @alexjo2144. Are you saying that is already happening, or is it something we need to add?
(In any case, whatever it is doing, it is doing it for every page without this change. :) )

@lhofhansl
Member Author

I suggest we merge this. As is, Iceberg delete filters are useless with Trino (tiny queries just time out).
Then we can (and should) come back and improve this together with the Iceberg folks.

@lhofhansl lhofhansl force-pushed the delete_filter_cache branch from 0ce2711 to aba2dd8 Compare July 13, 2022 11:23
@lhofhansl lhofhansl force-pushed the delete_filter_cache branch 3 times, most recently from 85f0349 to cdf0ad6 Compare July 13, 2022 16:10
@findinpath
Contributor

nit: remove the `.` at the end of the commit message, as recommended in https://github.com/trinodb/trino/blob/master/.github/DEVELOPMENT.md#commits-and-pull-requests

@alexjo2144
Member

Another relevant PR from the Iceberg side: apache/iceberg#5264

@alexjo2144
Member

It looks like the Iceberg community is considering a release soon anyway. Maybe we can just wait for that?

@lhofhansl
Member Author

lhofhansl commented Jul 13, 2022

apache/iceberg#5264 won't fix the case where they decide not to materialize into a set (hardcoded to 100k rows).
Ironically for Trino those are the worst cases, where the giant filter is re-read for each page.

I am also not sure that this should be fixed in Iceberg itself. It should just provide the right API so we can implement this as we see fit. I prefer an explicit implementation in Trino, but that's just a preference.

In the end I do not really care as long as it gets fixed. :)

@lhofhansl lhofhansl force-pushed the delete_filter_cache branch from cdf0ad6 to d2e73c0 Compare July 13, 2022 18:30
@alexjo2144
Member

Another relevant issue I filed with the Iceberg community that would help this case: apache/iceberg#5272

Not to say we should not make this change. Reopening the delete files for every page is definitely an issue.

@lhofhansl
Member Author

Is this waiting for something from me? (Just making sure)

@alexjo2144
Member

So, I was thinking about this more, and I think we can do something better here without waiting for the next Iceberg release. The main thing I'm worried about is keeping all of the deleted rows in memory for each Split. The Iceberg cutoff at 100,000 records seems pretty reasonable, so I'm not sure we should circumvent it.

Can we try refactoring the code a bit so that there's only one call to DeleteFilter#filter per split, using an Iterable that we can append to? That way each page can add new rows to the existing Iterable. That should allow us to use the streaming comparison for large files and the in-memory approach for small ones.

I think that, combined with the other improvements in the Iceberg codebase, this should help a lot.

Does that make sense?
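The "streaming comparison" mentioned here can be illustrated with a merge-style sketch: when both the row positions and the positional deletes are sorted, they can be compared without materializing the delete set in memory. Names and structure are hypothetical, not Iceberg's actual implementation:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of streaming positional-delete filtering: walk a sorted
// iterator of deleted positions alongside row positions offered in ascending
// order, so the delete set never needs to be fully materialized.
public class StreamingPositionFilter
{
    private final Iterator<Long> deletes; // sorted deleted positions
    private Long nextDelete;

    StreamingPositionFilter(Iterator<Long> sortedDeletes)
    {
        this.deletes = sortedDeletes;
        advance();
    }

    private void advance()
    {
        nextDelete = deletes.hasNext() ? deletes.next() : null;
    }

    // Rows must be offered in ascending position order (pages arrive in order).
    boolean keep(long position)
    {
        while (nextDelete != null && nextDelete < position) {
            advance();
        }
        if (nextDelete != null && nextDelete == position) {
            advance();
            return false; // this row position was deleted
        }
        return true;
    }

    public static void main(String[] args)
    {
        // One filter instance serves two "pages": positions 0..2 and 3..5.
        StreamingPositionFilter filter =
                new StreamingPositionFilter(List.of(1L, 4L).iterator());
        boolean[] expected = {true, false, true, true, false, true};
        for (long pos = 0; pos < 6; pos++) {
            if (filter.keep(pos) != expected[(int) pos]) {
                throw new AssertionError("wrong verdict at position " + pos);
            }
        }
    }
}
```

This is also why the approach needs the appendable-Iterable refactoring: the single pass over sorted deletes only works if one filter instance sees all pages of the split in order.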

@lhofhansl
Member Author

lhofhansl commented Jul 18, 2022

Yep, if you can make that work. How would you match up the new delete positions with the rows passed by the current page? Want to open another PR and propose the change?

And let's also make sure we do not let the perfect be the enemy of the good. As is, V2 deletes are dangerous in Trino, and we should disable them or fail immediately until we have this fixed. Caching at least makes them usable. If there is a large set of deleted rows, yes, there's a risk of high memory usage, but folks should use a predicate (i.e. equality deletes) instead of positional deletes anyway.

@lhofhansl
Member Author

And how would you avoid having each page's worth of rows passed through the entire set of filters each time? We'd be trading more CPU for less memory.

@electrum
Member

I don't think keeping all the positions in memory for the split is a problem. RoaringBitmap is very efficient and most Iceberg data files won't have more than a few million rows. I'm working on a change to reimplement delete handling natively in Trino and didn't bother with a streaming approach.
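The memory argument above comes down to bitmap representation: one bit per row position. This sketch uses the JDK's `java.util.BitSet` as a stand-in for RoaringBitmap (which compresses sparse data far better) to show the rough scale:

```java
import java.util.BitSet;

// Illustrates why holding all deleted positions for a split can be cheap:
// a bitmap costs one bit per covered row position. java.util.BitSet is used
// here as a stdlib stand-in for RoaringBitmap.
public class PositionDeleteBitmapSketch
{
    public static void main(String[] args)
    {
        BitSet deleted = new BitSet();
        deleted.set(10);
        deleted.set(1_000_000);

        // Membership checks are O(1) bit lookups.
        if (!deleted.get(10) || deleted.get(11)) {
            throw new AssertionError("unexpected bitmap contents");
        }

        // Covering ~1M positions takes on the order of 128 KB in a plain
        // BitSet; a compressed bitmap would use far less for two set bits.
        long bytes = deleted.size() / 8;
        if (bytes > 256 * 1024) {
            throw new AssertionError("bitmap unexpectedly large: " + bytes);
        }
    }
}
```

For a data file with a few million rows, even the uncompressed form stays well under a megabyte per split, which supports the point that a streaming approach may not be necessary.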

@lhofhansl
Member Author

@electrum Should I close this one in favor of the coming native implementation?

@electrum
Member

@lhofhansl thanks for your work on this. I started my native implementation before I was aware of your fix. It should be ready now: #13219

If you are able to test it out on real data, that would be much appreciated.

@lhofhansl
Member Author

Closing in favor of #13219.

@lhofhansl lhofhansl closed this Jul 19, 2022

Development

Successfully merging this pull request may close these issues.

Iceberg scanning with Delete Files is extremely/unusably slow

5 participants