Subquery cache & friends 2#22827

Open
sopel39 wants to merge 12 commits into trinodb:master from sopel39:ks/subquery_cache_ks

Conversation

@sopel39
Member

@sopel39 sopel39 commented Jul 26, 2024

Implement subquery cache for Hive/Iceberg/Delta

Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cached
results are never stale since data is cached per split. Dedicated
"cache split ids" include create time and change set
(in the case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then a common subplan is extracted.
2. The query plan is rewritten using caching plan alternatives
   (fallback to the original subquery, cache data, load from cache).
3. SPI PlanSignatures are computed for each cached subquery.
4. Splits are scheduled deterministically on nodes based on the (PlanSignature, SplitId) pair.
5. On the worker, a cache plugin (currently only memory based) determines
   whether cached data is available for a given split.
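The deterministic scheduling in step 4 can be sketched roughly as follows. This is a simplified stand-in, not the PR's actual code: the class and the string-based ids are hypothetical, and the real node selection in Trino is more involved.

```java
import java.util.List;

// Minimal sketch of step 4: derive a worker deterministically from the
// (planSignature, splitId) pair, so repeated executions of an equivalent
// subquery route the same split to the same node, where its cached pages
// may still reside. Names and types are illustrative only.
class DeterministicSplitScheduler {
    private final List<String> nodes;

    DeterministicSplitScheduler(List<String> nodes) {
        this.nodes = List.copyOf(nodes);
    }

    String pickNode(String planSignature, String splitId) {
        int hash = (planSignature + "/" + splitId).hashCode();
        // floorMod keeps the index non-negative even for negative hash codes
        return nodes.get(Math.floorMod(hash, nodes.size()));
    }
}
```

The key property is only that the mapping is a pure function of the pair, so cache hits do not depend on scheduling order.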

Supersedes: #21888

Fixes: #22117

@sopel39 sopel39 requested a review from martint July 26, 2024 12:56
@cla-bot cla-bot bot added the cla-signed label Jul 26, 2024
@github-actions github-actions bot added ui Web UI iceberg Iceberg connector delta-lake Delta Lake connector hive Hive connector labels Jul 26, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache_ks branch 2 times, most recently from 639385b to 64ed26c Compare July 26, 2024 13:16
@sopel39 sopel39 force-pushed the ks/subquery_cache_ks branch from 64ed26c to f9a0e1a Compare July 26, 2024 13:54
@sopel39 sopel39 force-pushed the ks/subquery_cache_ks branch 3 times, most recently from 831731a to beb14bb Compare July 30, 2024 13:03
* Might be empty if there isn't sufficient memory or split data is
* already cached.
*/
Optional<ConnectorPageSink> storePages(CacheSplitId splitId, TupleDomain<CacheColumnId> predicate, TupleDomain<CacheColumnId> unenforcedPredicate);
Member

Is it possible to get a different predicate per split? If not, would it make more sense to pass it in PlanSignature alongside output columns and group by columns?

Member Author

@sopel39 sopel39 Jul 31, 2024

It is possible to get a different predicate for different splits with the same PlanSignature, as predicates are pruned/simplified against split domains.
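A toy illustration of that pruning follows. The types are hypothetical simplifications (ranges over longs in place of Trino's richer `TupleDomain`); the point is only that the same predicate can reduce differently per split.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of per-split predicate pruning: a predicate range on a
// column is dropped when the split's own domain already lies entirely
// within it, so two splits sharing a PlanSignature can end up with
// different effective predicates. Simplified stand-in for TupleDomain.
class PredicatePruner {
    record Range(long low, long high) {
        boolean contains(Range other) {
            return low <= other.low && other.high <= high;
        }
    }

    static Map<String, Range> prune(Map<String, Range> predicate, Map<String, Range> splitDomain) {
        Map<String, Range> remaining = new HashMap<>();
        predicate.forEach((column, range) -> {
            Range domain = splitDomain.get(column);
            // keep the predicate only if the split's domain is not fully covered by it
            if (domain == null || !range.contains(domain)) {
                remaining.put(column, range);
            }
        });
        return remaining;
    }
}
```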

.build();
TupleDomain<CacheColumnId> retainedTupleDomain = extractedTupleDomain
.filter((columnId, domain) -> retainedColumnIds.contains(columnId));
// Remaining expression and non-projected domains must be part of signature key
Member

Why so? Instead of encoding remainingTupleDomain in signatureKey, can we make it a field of PlanSignature, for the case where CacheManager is able to apply it on the cached data?
For instance, assume CacheManager stored colA and colB. Then colA is projected with a predicate on colA and colB; the predicate on colA is passed as an argument to SplitCache#loadPages while the predicate on colB is encoded in the key. But CacheManager might be able to apply the predicate on colA just as well as it might be able to apply the predicate on colB.

Member Author

@sopel39 sopel39 Jul 31, 2024

When you cache data and the PlanSignature has only colB, then a predicate on colA is not meaningful for the CacheManager, hence it needs to be stored in the signatureKey.

Indeed, when loading data we could pass predicates on extra columns in loadPages.
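The split discussed above can be sketched as follows. This is a hypothetical simplification (string conditions instead of real domain objects): predicates on columns that are part of the cached data could be passed to loadPages for the CacheManager to apply, while predicates on columns absent from the cache cannot be evaluated on cached pages and must be encoded into the signature key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch: partition a predicate by whether its column is in the cached
// column set. "loadable" parts can be handed to the cache manager at load
// time; "keyEncoded" parts must become part of the cache key, since the
// cached pages do not contain those columns. Illustrative types only.
record SplitPredicate(Map<String, String> loadable, Map<String, String> keyEncoded) {
    static SplitPredicate partition(Map<String, String> predicate, Set<String> cachedColumns) {
        Map<String, String> loadable = new HashMap<>();
        Map<String, String> keyEncoded = new HashMap<>();
        predicate.forEach((column, condition) -> {
            if (cachedColumns.contains(column)) {
                loadable.put(column, condition);
            }
            else {
                keyEncoded.put(column, condition);
            }
        });
        return new SplitPredicate(loadable, keyEncoded);
    }
}
```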

@sopel39
Member Author

sopel39 commented Aug 6, 2024

Rebased and added #22117

@sopel39 sopel39 force-pushed the ks/subquery_cache_ks branch 2 times, most recently from 768d847 to a4411ee Compare August 7, 2024 14:02
Contributor

In the case of a limit query, we assume this method will be called while pageSink is still not null.
But if the limit was pushed down to the connector, then connectorPageSource#isFinished may return true after returning one page with the required number of positions. Then tableScanOperator#isFinished will return true as well, and CacheDataOperator#finish will be called before CacheDataOperator#close. pageSink won't get aborted and the data will be stored in the cache without awareness of the limit.

@github-actions

github-actions bot commented Sep 4, 2024

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Sep 4, 2024
@mosabua
Member

mosabua commented Sep 4, 2024

I assume you will continue on this @sopel39

@assaf2
Member

assaf2 commented Sep 5, 2024

I assume you will continue on this @sopel39

We need a review from @martint to actually move forward

@sopel39
Member Author

sopel39 commented Sep 5, 2024

Indeed, waiting for @martint review

@github-actions github-actions bot removed the stale label Sep 5, 2024
sopel39 and others added 8 commits December 6, 2024 14:21
These methods are required by subquery cache to describe
split data for cache key purpose.

ConnectorPageSourceProvider#getUnenforcedPredicate
is used to describe what unenforced predicate will be
applied on split data.

ConnectorPageSourceProvider#prunePredicate is used
to simplify filter predicates on a per-split basis
(e.g. removing partitioning predicates that fully
contain split data).

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
CacheManager is a set of SPI classes for implementing
split level cache storage.

MemoryCacheManager is a high-performance implementation of
CacheManager that keeps cached data in revocable memory.
Cache table id together with split id and column id represents
rows produced by ConnectorPageSource for a given split.

Cache ids can also be used to canonicalise query plans
for the purpose of comparison or cache key generation.

This commit implements cache ids for Hive, Iceberg, Delta and TPCH
connectors.

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Co-authored-by: lukasz-stec <lukasz.stec@starburstdata.com>
Dynamic filter id might be registered both by a local join
and as coming from the coordinator.
CanonicalSubplanExtractor creates a canonical
representation of a subplan using cache ids
provided by the connector. Canonical subplans
are used to compare plans against each other
and enable extraction of common subplans.

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cached
results are never stale since data is cached per split. Dedicated
"cache split ids" include create time and change set
(in the case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then a common subplan is extracted.
2. The query plan is rewritten using caching plan alternatives
   (fallback to the original subquery, cache data, load from cache).
3. SPI PlanSignatures are computed for each cached subquery.
4. Splits are scheduled deterministically on nodes based on the (PlanSignature, SplitId) pair.
5. On the worker, a cache plugin (currently only memory based) determines
   whether cached data is available for a given split.

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Co-authored-by: lukasz-stec <lukasz.stec@starburstdata.com>
Co-authored-by: Raunaq Morarka <raunaqmorarka@gmail.com>
Goal: Try not to schedule splits with the same
cache id at the same time.

This commit has changes at two levels:

First, we re-arrange splits coming out of
CacheSplitSource such that splits with the same
CacheSplitId have some separation and are not
processed in parallel.

Second, at the worker level, we try not to schedule
splits with the same CacheSplitId. However, to make
progress, splits with the same CacheSplitId will be
scheduled if there are no splits with a unique
CacheSplitId. This is needed to avoid potential deadlock.

Benchmarks:
Query:
select sum(orderkey / 10000), sum(partkey / 10000),
sum(suppkey / 10000), sum(linenumber), sum(quantity)
 from lineitem
union all
select sum(orderkey / 10000), sum(partkey / 10000),
sum(suppkey / 10000), sum(linenumber), sum(quantity)
from lineitem;

Query 20240614_234254_00006_ehfpk, FINISHED, 9 nodes
Splits: 9,909 total, 9,909 done (100.00%)
9.68 [6.21B rows, 107GB] [641M rows/s, 11.1GB/s]

As you can see, we only read ~6B rows as compared
to the 12B rows of actual data.
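The first-level change described in that commit (separating splits that share a CacheSplitId) can be sketched as a round-robin interleaving across id groups. This is an illustrative stand-in, not the PR's CacheSplitSource logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Sketch: group splits by CacheSplitId, then emit one split per group per
// pass, so identical ids are interleaved with others instead of being
// processed back to back (which would defeat caching, since the second
// split would start before the first has populated the cache).
class SplitSeparator {
    static List<String> separate(List<String> splitIds) {
        Map<String, Queue<String>> groups = new LinkedHashMap<>();
        for (String id : splitIds) {
            groups.computeIfAbsent(id, k -> new ArrayDeque<>()).add(id);
        }
        List<String> reordered = new ArrayList<>();
        while (reordered.size() < splitIds.size()) {
            // take at most one split from each group per pass
            for (Queue<String> queue : groups.values()) {
                if (!queue.isEmpty()) {
                    reordered.add(queue.poll());
                }
            }
        }
        return reordered;
    }
}
```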
@sopel39 sopel39 force-pushed the ks/subquery_cache_ks branch from 081a2f2 to e078c24 Compare December 6, 2024 13:23
@github-actions

This pull request has gone a while without any activity. Tagging for triage help: @mosabua

@github-actions github-actions bot added the stale label Dec 27, 2024
@github-actions

Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.

@github-actions github-actions bot closed this Jan 20, 2025
@sopel39 sopel39 reopened this Jan 20, 2025
@mosabua mosabua added stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed. and removed stale labels Jan 20, 2025
@mosabua
Member

mosabua commented Jan 20, 2025

I added stale-ignore to prevent closure of this PR @sopel39 under the assumption that you will continue to drive this.

@sopel39
Member Author

sopel39 commented Jan 20, 2025

It all depends on @martint if he wants to review it

@jirislav

jirislav commented Jul 8, 2025

The hard work that was done here is really useful; it's sad to see that the review is stuck. Maybe someone else could review this instead?

@hhhonzik

hhhonzik commented Jul 8, 2025

This feature would be very welcome for us as well.

@sopel39
Member Author

sopel39 commented Jul 8, 2025

@jirislav @hhhonzik if you are interested in this, please reach out to me on Trino Slack (sopel39) or LinkedIn (https://www.linkedin.com/in/karol-sobczak-a7b19a10/)

@sopel39
Member Author

sopel39 commented Nov 17, 2025

This PR introduces an important feature that significantly improves performance and also lays the foundations for future innovation in the domain of “smart” MVs and subquery caching. Competing OSS projects already have similar capabilities, while Trino is lagging behind. Although the code in this PR is already running in production, it has not received any review from @martint , who is supposed to be the language and planner lead according to https://trino.io/development/roles.html. However, @martint is also the CTO of Starburst and, in my view, there is a significant conflict of interest that most likely results in blocking this particular effort. If there is a corresponding effort from Starburst, then either it should be open-sourced or @martint should step aside and let other maintainers review and merge this code. In any case, community efforts should not be blocked, as this contradicts the OSS spirit and the Trino code of conduct.

cc @trinodb/maintainers

@rdsarvar

Is there any path forward for this feature? This would significantly improve my experience on Trino, as CTE caching is currently an expensive hurdle. Has it been brought up at the contributor call?

@sopel39
Member Author

sopel39 commented Dec 31, 2025

Is there any path forward for this feature? This would significantly improve my experience on Trino, as CTE caching is currently an expensive hurdle. Has it been brought up at the contributor call?

@rdsarvar PTAL at my #22827 (comment) above. I brought the issue up to Trino leadership, but to no avail. Please reach out to me via DM if you are interested in subquery cache.

@shazi7804

@sopel39 I'm very interested in this feature. Do we have any workaround until Trino supports it?

@sopel39
Member Author

sopel39 commented Feb 4, 2026

@sopel39 I'm very interested in this feature. Do we have any workaround until Trino supports it?

No workaround, unfortunately.

@shohamyamin
Contributor

@mosabua what's the best way to get this moving? maybe making the PR smaller so it's easier to review?

@shohamyamin
Contributor

@sopel39 do you have any performance benchmarks or metrics for this? I'd love to understand the scale of the impact this feature will have.

@sopel39
Member Author

sopel39 commented Mar 18, 2026

@mosabua what's the best way to get this moving? maybe making the PR smaller so it's easier to review?

It's not a matter of PR size

@sopel39 do you have any performance benchmarks or metrics for this? I'd love to understand the scale of the impact this feature will have.

Depending on the query, gains are substantial, especially in repeated workloads.


Labels

cla-signed delta-lake Delta Lake connector hive Hive connector iceberg Iceberg connector performance stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed. ui Web UI

Development

Successfully merging this pull request may close these issues.

Order split scheduling between stages for common subqueries