
Materialize tables during planning #23426

Closed

electrum wants to merge 8 commits into trinodb:master from electrum:materialize

Conversation

@electrum (Member) commented Sep 15, 2024

Release notes

(x) Release notes are required, with the following suggested text:

# General
* Improve query performance by materializing small tables during planning. ({issue}`23426`)

@cla-bot cla-bot bot added the cla-signed label Sep 15, 2024
@electrum electrum force-pushed the materialize branch 9 times, most recently from 6a2b8d6 to 64ff37b Compare September 17, 2024 02:03
@wendigo (Contributor) commented Sep 17, 2024

Can you share benchmarks?

@github-actions github-actions bot added the jdbc Relates to Trino JDBC driver label Sep 17, 2024
@sajjoseph (Contributor) commented:

Could this help with CTEs, especially ones with similar subplans? I know @sopel39 has this PR: #22827.

Looking forward to seeing these PRs merged soon.

@electrum electrum force-pushed the materialize branch 2 times, most recently from 58b509a to c01ebd9 Compare September 18, 2024 02:35
@github-actions github-actions bot added the iceberg Iceberg connector label Sep 18, 2024
@sopel39 (Member) commented Sep 18, 2024:

Will this support dynamic filters? IMO it should; then it could potentially replace #22527. cc @raunaqmorarka @Dith3r

@electrum (author) replied:

What would it mean to support them? This seems like an alternative.

@sopel39 (Member) replied Sep 18, 2024:

> What would it mean to support them?

There are queries (both user queries and benchmarks) where there are cascading DFs. So you have 3 table scans:

-> date_dim#1
-> date_dim#2
-> super_large_fact_table

where date_dim#2 depends on DF originating from date_dim#1 and
super_large_fact_table depends on DF originating from date_dim#2.

super_large_fact_table scan size depends on date_dim#2 getting filtered by DF from date_dim#1, which is what #22527 addresses by explicitly waiting for DFs.

To support DFs here you would have to collect them during planning when you materialize table scans (just as it happens during actual execution) and apply them on depending table scans.
Alternatively, we could still use #22527, but it would have to be adjusted so that it waits for DFs on top of ValuesNode.

If this PR clashes with #22527, it will introduce pretty substantial regressions.

cc @raunaqmorarka @Dith3r
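To make the cascading concrete, here is a minimal sketch using plain Java collections (illustrative only, not Trino code; the `scan` helper and table names are made up) of how a filter collected from one scan's output prunes the next scan:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch of cascading dynamic filters: the values produced by one
// scan become a filter that narrows the next scan before it reads its data.
public class CascadingFilterSketch
{
    // Simulate a table scan that keeps only rows whose key is in the dynamic filter
    static List<Integer> scan(List<Integer> rows, Set<Integer> dynamicFilter)
    {
        return rows.stream()
                .filter(dynamicFilter::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        List<Integer> dateDim1 = List.of(1, 2, 3);
        List<Integer> dateDim2 = List.of(2, 3, 4, 5);
        List<Integer> superLargeFactTable = List.of(1, 2, 3, 4, 5, 6, 7, 8);

        // DF originating from date_dim#1 narrows date_dim#2
        List<Integer> narrowedDim2 = scan(dateDim2, Set.copyOf(dateDim1));

        // DF originating from the narrowed date_dim#2 narrows the fact table
        List<Integer> factRows = scan(superLargeFactTable, Set.copyOf(narrowedDim2));

        System.out.println(factRows); // prints [2, 3]
    }
}
```

The fact-table scan only shrinks because date_dim#2 is narrowed first; materializing during planning would have to replay the same chain statically.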

@electrum (author) replied:

Thanks for the detailed explanation. I'm still confused as to how dynamic filters are relevant here as this happens during planning, so it's not dynamic at all. After the table is materialized, we should apply further pushdown based on the actual values.

My expectation is that there is a lot of follow-up work to be done in the planner to take better advantage of values, and to fix bugs such as unnecessary cross joins being introduced. This is follow-up work for someone such as yourself who understands the optimizer.

Also, how can conflicts with the other PR be a regression if it's not merged yet?

Member replied:

> Thanks for the detailed explanation. I'm still confused as to how dynamic filters are relevant here as this happens during planning

DFs in this case are used for narrowing materialized table content, which is important for downstream joins. This can be done "statically" during planning as you materialize tables.

> After the table is materialized, we should apply further pushdown based on the actual values.

Yes, that could work if the table is not too large.

@electrum (author) replied:

I see that DynamicFilter is simply TupleDomain which is actually a subset of the pushdown that happens in applyFilter(), so we shouldn't need anything additional here. We seem to collect that properly for values today.

Member replied:

> I see that DynamicFilter is simply TupleDomain which is actually a subset of the pushdown that happens

Keep in mind that TupleDomain was recently optimized to be more memory efficient and to handle larger sets. When we start doing predicate pushdown using a large number of Expressions, I think this will kill the planner, so the materialization row limit should be quite low IMO.
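As a rough illustration of the trade-off above, here is a hedged sketch (plain Java, not Trino's actual TupleDomain API; all names are illustrative): a discrete-value domain stored as a set intersects cheaply, while the equivalent pushdown expressed as OR-of-equalities grows linearly with the value count and has to be walked by every optimizer rule:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TreeSet;

// Illustrative contrast between a compact value-set domain and the
// expression form that generic predicate pushdown would otherwise produce.
public class DomainSketch
{
    // Domain-style representation: intersection is a cheap set operation
    static Set<Long> intersect(Set<Long> a, Set<Long> b)
    {
        Set<Long> result = new HashSet<>(a);
        result.retainAll(b);
        return result;
    }

    // Expression-style representation: one equality disjunct per value,
    // sorted for deterministic output
    static String asExpression(Set<Long> domain, String column)
    {
        StringJoiner joiner = new StringJoiner(" OR ");
        for (long value : new TreeSet<>(domain)) {
            joiner.add(column + " = " + value);
        }
        return joiner.toString();
    }
}
```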

Member replied:

JDBC queries can be huge while yielding only a few results. DFs can change JDBC query duration from hours to seconds.

@electrum (author) replied:

Do we have a way to estimate that? We would want to skip those.

Member replied:

I don't think we have a way to estimate the cost of the work done by the pushed down operations into table scan.
Every pushdown into a JDBC table handle will result in io.trino.sql.planner.plan.TableScanNode#statistics being revised with the estimate of the resulting output and we lose the information about estimate of original table at that point. One possibility is to remember the original table scan output estimate in a separate field and rely on that.
We had a somewhat similar need for avoiding a particular optimization on JDBC connectors in #22355 and ended up adding a new API io.trino.spi.connector.ConnectorMetadata#allowSplittingReadIntoMultipleSubQueries for that. Maybe we can explore generalizing/re-using that for this case too.
cc: @martint

```java
private boolean materializeTable = true;
private int materializeTableMaxEstimatedRowCount = 50_000;
private int materializeTableMaxActualRowCount = 100_000;
private Duration materializeTableTimeout = new Duration(5, SECONDS);
```

Member commented on the defaults above:

5 seconds is quite long as a default; 1-2 seconds seems more reasonable.

```java
}
List<Split> batch = getFutureValue(splitSource.getNextBatch(1000)).getSplits();
for (Split split : batch) {
    if (!split.isRemotelyAccessible() && !split.getAddresses().contains(currentNode)) {
```

Member commented:

For distributed caching implementations, split.getAddresses() is never going to contain the current node when the coordinator is excluded from scheduling (which is the normal case in production), so I suggest dropping that check.

@electrum (author) replied Sep 20, 2024:

We want to materialize as many tables as possible. Checking for the current node allows us to materialize certain system tables, which allows queries such as the following to work:

```sql
SELECT * FROM t WHERE ds = (SELECT max(ds) FROM "t$partitions")
```

```java
        implements Rule<T>
        permits MaterializeFilteredTableScan, MaterializeTableScan
{
    private static final int MAX_SPLITS = 10_000;
```

Member commented:

This seems rather big for 100K rows.

```java
    }
}
splits.addAll(batch);
if (splits.size() > MAX_SPLITS) {
```

Member commented:

Do we need to pull out a large number of splits upfront? It would be nicer to iterate over smaller batches of splits as they become available. This way we can cut off on the maxRows threshold or timeout without having to generate a lot of splits, which also potentially take up a lot of memory.
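The incremental approach suggested above could look roughly like this sketch (`SplitSource` here is a minimal stand-in, not Trino's API; the batch size and cutoffs are illustrative):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of pulling splits in small batches with split-count and
// deadline cutoffs, instead of enumerating a large number of splits upfront.
public class BatchedSplitFetch
{
    // Minimal stand-in for a split source (illustrative, not Trino's SplitSource)
    interface SplitSource
    {
        List<String> nextBatch(int maxSize);
        boolean isFinished();
    }

    static SplitSource fromList(List<String> all)
    {
        Iterator<String> iterator = all.iterator();
        return new SplitSource()
        {
            @Override
            public List<String> nextBatch(int maxSize)
            {
                List<String> batch = new ArrayList<>();
                while (iterator.hasNext() && batch.size() < maxSize) {
                    batch.add(iterator.next());
                }
                return batch;
            }

            @Override
            public boolean isFinished()
            {
                return !iterator.hasNext();
            }
        };
    }

    // Empty result means "bail out: don't materialize this table"
    static Optional<List<String>> fetchSplits(SplitSource source, int maxSplits, long deadlineNanos)
    {
        List<String> splits = new ArrayList<>();
        while (!source.isFinished()) {
            if (System.nanoTime() > deadlineNanos) {
                return Optional.empty(); // timed out: give up on materialization
            }
            splits.addAll(source.nextBatch(100)); // small batches bound memory
            if (splits.size() > maxSplits) {
                return Optional.empty(); // too many splits: table too large
            }
        }
        return Optional.of(splits);
    }
}
```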

```java
TableScanNode tableScan = captures.get(TABLE_SCAN);

Constraint constraint = Optional.of(filter.getPredicate())
        .map(predicate -> filterConjuncts(predicate, expression -> !DynamicFilters.isDynamicFilter(expression)))
```

Member commented:

static import

```java
    return Optional.empty();
}

private Optional<List<Expression>> doMaterializeTable(Session session, TableHandle table, List<ColumnHandle> columns, List<Type> types, Constraint constraint)
```

Member commented:

How do we ensure that we don't end up re-running this for the same table many times during the planning process? I think any change to Filter or TableScan nodes in the pushIntoTableScanRulesExceptJoins iterative optimizer loop would keep re-triggering this rule.

@electrum (author) replied:

Wouldn't we want to run this again after a filter is pushed? A table might be too large initially, but become eligible when more information is available.

Member replied:

> A table might be too large initially, but become eligible when more information is available.

But that would mean multiple few-second delays before bailing out, right? Splits would be enumerated multiple times. JDBC queries would also be triggered on the remote system multiple times.

I think it should have just one shot after all eligible pushdowns, BUT then we also want the materialized predicate to be used for further pushdowns. It's a chicken-and-egg problem.

My guess is it should just try once, and predicates derived from materialized tables should be applied on ValuesNodes. It would solve 90% of small-table cases this way.
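A simple way to get the "just try once" behavior would be a memoizing guard keyed by table identity; this is a hypothetical sketch, not Trino code:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical one-shot guard: remember table handles we already tried to
// materialize so the iterative optimizer doesn't re-run the expensive probe
// (split enumeration, remote JDBC queries) every time a filter is pushed down.
public class MaterializeOnceGuard
{
    private final Set<String> attempted = ConcurrentHashMap.newKeySet();

    // Returns true the first time a table handle is seen, false afterwards
    public boolean tryClaim(String tableHandleId)
    {
        return attempted.add(tableHandleId);
    }
}
```

The key would need to be some stable identity of the original table, surviving pushdown-induced handle changes, which is itself a non-trivial design question.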

```java
@Override
public Pattern<TableScanNode> getPattern()
{
    return tableScan()
```

Member commented:

We usually store the Pattern in a private static final member variable.

```java
public final Result apply(T node, Captures captures, Context context)
{
    int maxRows = getMaterializeTableMaxEstimatedRowCount(context.getSession());
    PlanNodeStatsEstimate stats = context.getStatsProvider().getStats(node);
```

Member commented:

I think this is potentially problematic for filtered tables. The scan materialization is only going to benefit from the part of the filter that is enforced by the connector page or split source, but the estimate is going to include the filtering from the entire predicate. Maybe this should just be the estimate of the output of the TableScan, as that should include any filtering that we gain from enforcedConstraint.

@electrum (author) replied:

That's a good point. How do I fetch the stats for that?

Member replied:

I don't think we have a way to estimate the cost of the work done by the pushed down operations into table scan.
Every pushdown into a JDBC table handle will result in io.trino.sql.planner.plan.TableScanNode#statistics being revised with the estimate of the resulting output and we lose the information about estimate of original table at that point. One possibility is to remember the original table scan output estimate in a separate field and rely on that.
We had a somewhat similar need for avoiding a particular optimization on JDBC connectors in #22355 and ended up adding a new API io.trino.spi.connector.ConnectorMetadata#allowSplittingReadIntoMultipleSubQueries for that. Maybe we can explore generalizing/re-using that for this case too.
cc: @martint

Comment on lines +657 to +658:

```java
.add(new MaterializeFilteredTableScan(plannerContext, splitManager, pageSourceManager, nodeManager, executor))
.add(new MaterializeTableScan(plannerContext, splitManager, pageSourceManager, nodeManager, executor))
```

Member commented:

This might be too early in the planning process for this optimization. Ideally, we want to delay it until after as much predicate and projection pushdown as possible has happened. I think we should at least do it after or near where io.trino.sql.planner.optimizations.MetadataQueryOptimizer runs, as that is a similar optimization and we probably want that one to run first.

@electrum (author) replied:

I have NO IDEA where to put this optimization. I just looked around and guessed. If you can tell me exactly where to put it, that would be great. Or if you want to take over this PR, I'm happy with that too.

@electrum (author) commented:

Thanks @sopel39 and @raunaqmorarka for the valuable feedback. Based on your feedback and discussions with @dain, it seems like trying to do this generically in the engine for all connectors is problematic. I'll close this and go back to the original idea of letting connectors make this decision.

@electrum electrum closed this Sep 20, 2024
@electrum electrum deleted the materialize branch September 20, 2024 17:12

Labels

cla-signed, iceberg (Iceberg connector), jdbc (Relates to Trino JDBC driver)
