Bundle Iceberg reads into combined scan tasks#12359

Closed
alexjo2144 wants to merge 5 commits into trinodb:master from alexjo2144:iceberg/experiment-split-bin-packing

Conversation

@alexjo2144
Member

@alexjo2144 alexjo2144 commented May 12, 2022

Reuse the bin packing done by the Iceberg table scan
planning by keeping small reads bundled together in
a single Split.
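The idea above can be sketched as a greedy size-based packing. This is an illustrative sketch only, not the actual Trino or Iceberg code; the class name, method name, and target size are assumptions:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of bundling small reads: greedily pack file
// sizes into combined splits so each bundle stays at or under a target
// size, in the spirit of Iceberg's table scan bin packing.
public class SplitBinPacking
{
    public static List<List<Long>> pack(List<Long> fileSizes, long targetSplitSize)
    {
        List<List<Long>> bundles = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : fileSizes) {
            // start a new bundle if adding this file would exceed the target
            if (!current.isEmpty() && currentSize + size > targetSplitSize) {
                bundles.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            bundles.add(current);
        }
        return bundles;
    }
}
```

With many small files this collapses hundreds of thousands of per-file splits into a much smaller number of bundles, which is where the scheduling savings discussed below come from.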

Description

Is this change a fix, improvement, new feature, refactoring, or other?

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

How would you describe this change to a non-technical end user or system administrator?

Related issues, pull requests, and links

Fixes: #12162

Documentation

( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

( ) No release notes entries required.
( ) Release notes entries required with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label May 12, 2022
@alexjo2144 alexjo2144 marked this pull request as draft May 12, 2022 22:04
Member

@electrum electrum left a comment


Doing this for UPDATE and DELETE makes the code complicated and hard to follow, and the amount of test coverage is not clear. After we have MERGE, we'll eventually replace the existing UPDATE and DELETE implementations to use it, so we won't need UpdatablePageSource anymore.

We can support this only for reads, as we do in Raptor, by packing splits only for reads. The table handle can have a flag indicating whether it's a write (it already has one for UPDATE; it just needs one for DELETE). Then the split source can skip packing for writes. On the page source provider side, skip creating the combined page source if there is only one split.

I suspect almost all of this complexity goes away if we only support reads.

Member

This recomputes the values for every call. Instead, we can compute as we go, since the stats for completed page sources will not change. See io.trino.plugin.raptor.legacy.util.ConcatPageSource
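The incremental pattern being suggested might look like this. This is a minimal sketch with hypothetical names, not Trino's ConnectorPageSource API or the actual ConcatPageSource code:

```java
import java.util.Iterator;
import java.util.List;

// Sketch of computing stats as we go: once a delegate source is exhausted,
// fold its final counter into a running total, so getCompletedBytes() never
// re-sums every delegate on each call.
public class ConcatSource
{
    public interface Source
    {
        long getCompletedBytes();
    }

    private final Iterator<Source> sources;
    private Source current;
    private long completedBytesFromFinishedSources;

    public ConcatSource(List<Source> sources)
    {
        this.sources = sources.iterator();
    }

    // only the current source's counter can still change; finished
    // sources were folded into the running total exactly once
    public long getCompletedBytes()
    {
        long currentBytes = (current == null) ? 0 : current.getCompletedBytes();
        return completedBytesFromFinishedSources + currentBytes;
    }

    // move to the next delegate, folding the finished one's stats first
    public void advance()
    {
        if (current != null) {
            completedBytesFromFinishedSources += current.getCompletedBytes();
        }
        current = sources.hasNext() ? sources.next() : null;
    }
}
```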

Member

Simply add them all to a com.google.common.io.Closer which handles this automatically.
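For reference, the pattern Guava's Closer provides can be sketched with the JDK alone. This is an illustration of the idiom, not Guava's actual implementation:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// JDK-only sketch of the pattern com.google.common.io.Closer offers:
// register resources as they are created, then close them all in reverse
// order with one call, keeping the first exception and suppressing the rest.
public class SimpleCloser implements Closeable
{
    private final Deque<Closeable> stack = new ArrayDeque<>();

    public <C extends Closeable> C register(C closeable)
    {
        stack.push(closeable);
        return closeable;
    }

    @Override
    public void close()
            throws IOException
    {
        IOException first = null;
        while (!stack.isEmpty()) {
            try {
                stack.pop().close();
            }
            catch (IOException e) {
                if (first == null) {
                    first = e;
                }
                else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```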

Member

No need for a builder class. This is basically just an ImmutableList.Builder.

Member

@electrum electrum left a comment

With the recently added support for multiple handles, no need to make changes to IcebergSplit. You can add a new class CombinedIcebergSplit that contains List<IcebergSplit>. Then just do a simple instanceof check in IcebergPageSourceProvider and turn the existing createPageSource() method into a private helper method that you call in a loop.
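The suggested shape might look roughly like this. It is a self-contained sketch; the class and method names below are stand-ins for the Trino SPI types (ConnectorSplit, IcebergSplit, the page source provider), not the real API:

```java
import java.util.List;

// Illustrative sketch: a combined split just wraps a list of ordinary
// splits, and the provider dispatches on its type with instanceof, reusing
// the single-split helper in a loop.
public class SplitDispatch
{
    public interface Split {}

    public record FileSplit(String path) implements Split {}

    // the combined split is nothing more than a list of ordinary splits
    public record CombinedSplit(List<FileSplit> splits) implements Split {}

    public static List<String> createPageSources(Split split)
    {
        if (split instanceof CombinedSplit combined) {
            // reuse the existing single-split path for each wrapped split
            return combined.splits().stream()
                    .map(SplitDispatch::createPageSource)
                    .toList();
        }
        return List.of(createPageSource((FileSplit) split));
    }

    // stand-in for the private single-split helper method
    private static String createPageSource(FileSplit split)
    {
        return "pageSource(" + split.path() + ")";
    }
}
```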

@alexjo2144 alexjo2144 force-pushed the iceberg/experiment-split-bin-packing branch from 599a255 to 300bc6d on May 13, 2022 20:02
@alexjo2144
Member Author

alexjo2144 commented May 13, 2022

Doing this for UPDATE and DELETE makes the code complicated

Yeah, pretty much all of it is just there to get the rows back to the PageSource that generated them. I can try going with the original Split handling for update and delete.

Alternatively we could write one new data file and one delete file per batch, and not worry about delegating at all. The complexity there is just that the position delete file needs to be sorted by the data file path.
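The ordering constraint mentioned above can be illustrated as follows. Names here are hypothetical; the relevant fact is that the Iceberg spec requires rows in a position delete file to be sorted by data file path, then by row position:

```java
import java.util.Comparator;
import java.util.List;

// Sketch of ordering position delete rows by (data file path, position),
// as the Iceberg spec requires for position delete files.
public class PositionDeleteOrdering
{
    public record PositionDelete(String filePath, long position) {}

    public static List<PositionDelete> sorted(List<PositionDelete> deletes)
    {
        return deletes.stream()
                .sorted(Comparator.comparing(PositionDelete::filePath)
                        .thenComparingLong(PositionDelete::position))
                .toList();
    }
}
```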

@findepi
Member

findepi commented May 16, 2022

we could write one new data file and one delete file per batch

That sounds undesirable (more data to read at read time than with one deletion file per input data file).

I like the idea to keep this simple, and apply it to read queries only. We may always revisit this later if needed.

@alexjo2144 alexjo2144 force-pushed the iceberg/experiment-split-bin-packing branch 2 times, most recently from b696472 to 3398ae3 on May 17, 2022 20:35
@przemekak
Member

@alexjo2144
I can confirm that this PR increases the performance of Iceberg table scans with a large number of files.
Before:

trino> select count(*) from iceberg.iceberg_tpch_sf1000_orc.lineitem_small_files;
   _col0
------------
 5999989709
(1 row)

Query 20220518_232650_00102_fmvhd, FINISHED, 6 nodes
Splits: 237,965 total, 237,965 done (100.00%)
4:57 [6B rows, 151GB] [20.2M rows/s, 522MB/s]

after:

trino> select count(*) from iceberg.iceberg_tpch_sf1000_orc.lineitem_small_files;
   _col0
------------
 5999989709
(1 row)

Query 20220519_001812_00007_d54gs, FINISHED, 6 nodes
Splits: 7,469 total, 7,469 done (100.00%)
29.20 [6B rows, 8.05TB] [205M rows/s, 282GB/s]

So it's roughly 297s vs. 30s, around 10x better.

@sopel39
Member

sopel39 commented May 19, 2022

So with split packing there are fewer FileScanTasks? Can a single FileScanTask read multiple files? Why is FileScanTask a bottleneck? I suppose the same amount of IO needs to happen.

Generally we now have weighted splits support. It's used in Hive. Would it help here?

Member

Did you try assigning split weights (as in Hive) instead?

Member

How do weighted splits work?
Do they allow splits to be combined to avoid scheduling overhead?

Member

@sopel39 sopel39 May 19, 2022

How do weighted splits work?

Check #9059. It's described there pretty well.

Do they allow splits to be combined to avoid scheduling overhead?

Split scheduling overhead itself is minimal. Split queues might be too short if there are a lot of splits (so workers could get starved), but that's what weighted split scheduling solves. Overall, I would use it instead of gluing splits together. The additional complexity of weighted splits is minimal, and splits can also be balanced better.
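The size-proportional weighting described in #9059 and used in the Hive connector can be sketched like this. The constant and names below are illustrative, not Trino's actual configuration or SPI:

```java
// Hypothetical sketch of weighted splits: each split gets a weight
// proportional to its size relative to the target split size, clamped to a
// minimum so tiny splits still cost something, and capped at the standard
// weight of 1.0. The scheduler then fills worker queues by total weight
// rather than by split count, so many tiny splits no longer starve workers.
public class SplitWeights
{
    private static final double MINIMUM_WEIGHT = 0.05; // illustrative value

    public static double weightForSize(long splitSizeInBytes, long targetSplitSizeInBytes)
    {
        double proportional = (double) splitSizeInBytes / targetSplitSizeInBytes;
        // clamp: never below the minimum, never above the standard weight
        return Math.min(1.0, Math.max(MINIMUM_WEIGHT, proportional));
    }
}
```

Under this scheme a worker queue slot budget of, say, weight 10 can hold ten full-size splits or two hundred splits of weight 0.05, which is how weighted scheduling compensates for small files without merging them.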

Member

I like that, provided it solves the problem.

@alexjo2144 can you please make another version of the improvement, based on weighted splits?

Member Author

@alexjo2144 alexjo2144 May 19, 2022

I took a stab at this here. Results are:

trino> select count(*) from iceberg.iceberg_tpch_sf1000_orc.lineitem_small_files;
   _col0    
------------
 5999989709 
(1 row)

Query 20220519_184353_00008_68mz2, FINISHED, 6 nodes
Splits: 237,965 total, 237,965 done (100.00%)
1:02 [6B rows, 151GB] [97.4M rows/s, 2.46GB/s]

So still faster than we have on master, but there seems to still be some extra overhead from the number of splits being so high.

I'm going to try a third branch with both the code here and SplitWeights

Member Author

Here's combined splits + split weights:

trino> select count(*) from iceberg.iceberg_tpch_sf1000_orc.lineitem_small_files;
   _col0    
------------
 5999989709 
(1 row)

Query 20220519_200949_00011_empm9, FINISHED, 6 nodes
Splits: 7,469 total, 7,469 done (100.00%)
24.89 [6B rows, 7.25TB] [241M rows/s, 298GB/s]

Member

@sopel39 sopel39 May 20, 2022

So still faster than we have on master, but there seems to still be some extra overhead from the number of splits being so high.

How do you assign split weight? Is it similar to Hive connector approach?

but there seems to still be some extra overhead from the number of splits being so high.

That is an assumption. It can either be:

  • some kind of overhead (not yet known what)
  • workers still get starved as queue size is not sufficient even with weighted splits. You can actually try increasing split queue size (node-scheduler.max-splits-per-node, node-scheduler.max-pending-splits-per-task) to see if it helps. Is Iceberg generating splits fast enough? Check query.schedule-split-batch-size.

I would like to learn where the overhead comes from since we already have a mechanism (weighted splits) that should address the issue of multiple small splits. It would be better to fix/tune it rather than add yet another mechanism.

If the conclusion is that the overhead is X and it can be only addressed in Iceberg with merging splits, that's fine. However, if we start packing splits together for other connectors we should know what overhead we address that cannot be addressed by weighted splits.

Member

I would like to learn where the overhead comes from since we already have a mechanism (weighted splits) that should address the issue of multiple small splits. It would be better to fix/tune it rather than add yet another mechanism.

That's a good point.

If the conclusion is that the overhead is X and it can be only addressed in Iceberg with merging splits, that's fine.

Or we could even make an engine-side equivalent of these changes, so connectors don't have to do this.

Member

or we can even think of making engine-side equivalent of these changes, so connectors don't have to do this

Sure, but then you probably don't need weighted split scheduling. Also, large splits are not necessarily good because they provide less granularity of work, which leads to work skew (especially when there are more nodes).

@findepi
Member

findepi commented May 19, 2022

cc @losipiuk

@findepi findepi marked this pull request as ready for review May 19, 2022 10:16
@findepi findepi dismissed electrum’s stale review May 19, 2022 10:16

comments applied

@findepi findepi requested a review from electrum May 19, 2022 10:16
Member

@findepi findepi left a comment

skimming

Member

private final

Member

What are the semantics of the (undocumented) io.trino.spi.connector.ConnectorSplit#getAddresses?

Do we need to union or intersect these lists?
(concatenation is unlikely what we want)

Member

Initialize delegates lazily; they may allocate resources.

Member

Would it make sense to unify this with io.trino.plugin.raptor.legacy.util.ConcatPageSource within plugin toolkit?

Member

For simplicity we may checkArgument(!delegatePageSources.isEmpty()) to avoid dead code.

Member

Can you please extract some refactorings?

This shows up as a single huge change, but I guess most code is just being moved around.
Please make it apparent.

Member

finish has this.splitIterator = CloseableIterator.empty();

Let's decide whether splitIterator is nullable and simplify finish or isFinished.

Member

Sounds like something the engine could (should?) provide directly to ConnectorSplitManager.

cc @losipiuk

Member

redundant change?

Member

redundant else

Member

@findepi findepi left a comment

Pending discussion #12359 (comment)

@alexjo2144 alexjo2144 marked this pull request as draft May 20, 2022 19:37
alexjo2144 and others added 5 commits May 26, 2022 14:19
Reuse the bin packing done by the Iceberg table scan
planning by keeping small reads bundled together in
a single Split.
@alexjo2144 alexjo2144 force-pushed the iceberg/experiment-split-bin-packing branch from 0b48662 to a149426 on May 26, 2022 18:21
@alexjo2144
Member Author

Bundling did not end up providing a significant improvement over using SplitWeights. Replacing this PR with #12579.

@alexjo2144 alexjo2144 closed this May 27, 2022

Development

Successfully merging this pull request may close these issues.

Slow scan performance on Iceberg tables with many small files due to the 1-1 mapping of IcebergSplit to a single Iceberg FileScanTask

5 participants