Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for external engines #84

Merged
merged 8 commits into from
Jan 20, 2025
Merged

Add support for external engines #84

merged 8 commits into from
Jan 20, 2025

Conversation

ryannedolan
Copy link
Collaborator

@ryannedolan ryannedolan commented Jan 15, 2025

Summary

This adds support for installing remote query engines, e.g. Trino, DuckDB, or Flink SQL Gateway.

  • Added Engine CRD.
  • Added k8s.engines metadata table.
  • Added RemoteTableScan, RemoteJoin, associated optimizer rules.

Details

The Hoptimator JDBC Driver is able to talk to remote Databases, but it previously relied on Calcite's Enumerable engine to process queries locally. For example, joining tables in two different Databases would involve first fetching the rows from each table and then joining locally in the driver itself.

With Engines, we can outsource these operations to fast, distributed query engines like Trino. Queries are sent off to the remote engine, and the Driver simply collects the results.

We don't expect remote engines to have the same "catalog" of databases and tables that Hoptimator knows about -- Hoptimator's idea of foo.bar may or may not match Trino's, for example. For this reason, we fully-specify the tables used by each query, sending DDL statements ahead of each query. To do this, we leverage the same PipelineRel mechanism that we use for deploying a pipeline. This means that the remote query will use the same connectors and configuration as an equivalent pipeline.

Testing

Without an Engine installed, a query must be processed locally via the Enumerable convention:

0: Hoptimator> explain plan for select * from ads.ad_clicks, profile.members;
PLAN  EnumerableNestedLoopJoin(condition=[true], joinType=[inner])
  JdbcToEnumerableConverter
    JdbcTableScan(table=[[ADS, AD_CLICKS]])
  JdbcToEnumerableConverter
    JdbcTableScan(table=[[PROFILE, MEMBERS]])


1 row selected (0.062 seconds)

The EnumerableNestedLoopJoin would be very slow for large datasets.

After installing an engine, we see that the query plan now involves a RemoteJoin instead:

0: Hoptimator> explain plan for select * from ads.ad_clicks, profile.members;
PLAN  RemoteToEnumerableConverter
  RemoteJoin(condition=[true], joinType=[inner])
    PipelineTableScan(table=[[ADS, AD_CLICKS]])
    PipelineTableScan(table=[[PROFILE, MEMBERS]])


1 row selected (0.041 seconds)

The RemoteJoin is able to leverage Trino or similar distributed query engines. For example, we are able to leverage our Flink session cluster via the Flink SQL gateway:

0: Hoptimator> select * from ads.ad_clicks, profile.members;
+------------------------------------------------------------------------------+
|                                             CAMPAIGN_URN                     |
+------------------------------------------------------------------------------+
| dad91062e84d5979ec74b5b0d0bf8e838158bed8de7a5f1b7aa0e90ffc45ea47767945bbe577 |
| f2dddb0778af2fbd2d79cd6950d44e26112413443dce63c2673c30949a99a469201388c84485 |
+------------------------------------------------------------------------------+

In the above results, the data is generated via Flink's datagen connector.

Copy link
Collaborator

@jogrogan jogrogan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the planner able to distinguish when it should and shouldn't use these remote rules?

@ryannedolan
Copy link
Collaborator Author

Is the planner able to distinguish when it should and shouldn't use these remote rules?

Yes, via four mechanisms:

  1. Whether an engine is installed or not. If there are no engines in the current namespace, the planner will just run queries locally as before.
  2. The database(s) involved. Engines can target specific databases or all databases. If a query involves a database that isn't supported by any engine, that part of the query will fall back to local execution.
  3. The rules only match certain types of query/sub-query. Some queries won't match and fall back.
  4. Cost models. Right now I've hardcoded zero cost for remote queries, but we should be able to be smarter here.

Eventually we may want to add more details to the Engine CRD, e.g. to specify the engine's capabilities. That metadata could theoretically inform the planner better.

Internally, we can install different engines that target different databases, e.g. Trino can target offline while Flink targets nearline.

@ryannedolan ryannedolan force-pushed the engines branch 6 times, most recently from ad0b0c1 to eba9397 Compare January 16, 2025 19:08
@ryannedolan ryannedolan marked this pull request as ready for review January 16, 2025 19:14
Copy link
Collaborator

@jogrogan jogrogan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super cool functionality

@ryannedolan ryannedolan changed the title WIP: Add support for external engines Add support for external engines Jan 16, 2025
@ryannedolan ryannedolan enabled auto-merge (squash) January 20, 2025 19:57
@ryannedolan ryannedolan merged commit 0e19953 into main Jan 20, 2025
1 check passed
@ryannedolan ryannedolan deleted the engines branch January 20, 2025 20:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants