Skip to content

Enable dynamic filtering in JDBC connector#8137

Closed
jerryleooo wants to merge 2 commits intotrinodb:masterfrom
jerryleooo:impl-jdbc-dynamic-filtering
Closed

Enable dynamic filtering in JDBC connector#8137
jerryleooo wants to merge 2 commits intotrinodb:masterfrom
jerryleooo:impl-jdbc-dynamic-filtering

Conversation

@jerryleooo
Copy link
Copy Markdown
Member

@jerryleooo jerryleooo commented May 29, 2021

#7968

depends on #8046

@cla-bot cla-bot bot added the cla-signed label May 29, 2021
@jerryleooo jerryleooo marked this pull request as draft May 29, 2021 08:47
@jerryleooo jerryleooo changed the title implement dynamic filtering for ConnectorRecordSetProvider Enable dynamic filtering for JDBC connectors May 29, 2021
@jerryleooo jerryleooo changed the title Enable dynamic filtering for JDBC connectors Enable dynamic filtering in JDBC connector May 29, 2021
@jerryleooo jerryleooo force-pushed the impl-jdbc-dynamic-filtering branch 2 times, most recently from 74c2852 to fb02adb Compare June 3, 2021 15:06
@jerryleooo jerryleooo marked this pull request as ready for review June 3, 2021 15:06
@jerryleooo jerryleooo requested a review from findepi June 3, 2021 15:06
@jerryleooo jerryleooo marked this pull request as draft June 4, 2021 04:12
@jerryleooo jerryleooo force-pushed the impl-jdbc-dynamic-filtering branch from 2fa8276 to f98a4d1 Compare June 7, 2021 05:44
@jerryleooo jerryleooo marked this pull request as ready for review June 7, 2021 05:44
@jerryleooo jerryleooo added the jdbc Relates to Trino JDBC driver label Jun 9, 2021
@jerryleooo jerryleooo requested a review from ebyhr June 9, 2021 08:48
@ebyhr ebyhr added the needs-docs This pull request requires changes to the documentation label Jun 12, 2021
Copy link
Copy Markdown
Member

@ebyhr ebyhr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some initial comments.

@jerryleooo
Copy link
Copy Markdown
Member Author

@ebyhr updated as commented, can see if need further review, thanks for the hard work!

@jerryleooo jerryleooo requested review from ebyhr and sopel39 June 16, 2021 08:55
@jerryleooo
Copy link
Copy Markdown
Member Author

Hi @ebyhr @sopel39 thanks for the hard work and can help suggest what I still need to do to get this PR approved?

@haldes
Copy link
Copy Markdown
Contributor

haldes commented Jun 26, 2021

@jerryleooo Thanks for this feature. We also want to use this feature for few of the massive join use case. Waiting for this ticket to be merged.
Also wanted to understand in case the build side emits a large no. of keys (say a 100K keys), how will this behave. For columns like varchar, integers , how will these keys be pushed to the probe side? Databases like PostgresSQL has a IN clause limit of 32767. So will it split the 100K keys into batches of 32767?

@jerryleooo
Copy link
Copy Markdown
Member Author

Hi @haldes thanks for the interest!
I think "dynamic filtering" has some configurations about the maximum values on per driver, like max-distinct-values-per-driver and max-size-per-driver. Can check them on https://trino.io/docs/current/admin/dynamic-filtering.html

Correct me if I misunderstood @sopel39 @findepi @ebyhr

@wolfgangcolsman
Copy link
Copy Markdown

For the "massive join case" I created #8519. I kept this separate as this is a principle issue, not only related to joins.
One issue we found when developing this functionality is the tuple domain simplification on VACHAR values when they contain random text such as guids that are not suitable for range queries. We configured compaction threashold to use Integer.MAX

@jibinpt
Copy link
Copy Markdown

jibinpt commented Jul 26, 2021

@jerryleooo Thanks for working on this feature. Would like to use this feature. Is there any chance that this feature would be merged to Trino master soon? Was able to see dynamic filtering kicking in after CP the commits.

@ebyhr ebyhr removed the jdbc Relates to Trino JDBC driver label Aug 16, 2021
@wolfgangcolsman
Copy link
Copy Markdown

What needs to be done to merge this? Can we move this forward?

@lhofhansl
Copy link
Copy Markdown
Member

lhofhansl commented Jun 8, 2022

Just started looking into this (see my slack comment on dev). Then found your PR (I started almost exactly the same way.) Let's finish one and get it in.
I'll try this out today.

@lhofhansl
Copy link
Copy Markdown
Member

I had a fix up a few things, but this works nicely. Should we finish this?

@jerryleooo jerryleooo force-pushed the impl-jdbc-dynamic-filtering branch from 1be1a71 to cd1674c Compare June 8, 2022 08:14
@jerryleooo
Copy link
Copy Markdown
Member Author

Thanks, @lhofhansl, I just rebased the code to resolve the conflict, let's see how the organization thinks?

@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns, DynamicFilter dynamicFilter)
{
return getRecordSet(transaction, session, split, table, columns);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should actually pass the DynamicFIlter along.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, should be something like

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            return new ClassLoaderSafeRecordSet(delegate.getRecordSet(transaction, session, split, table, columns, dynamicFilter), classLoader);
        }

configBinder(binder).bindConfig(JdbcMetadataConfig.class);
configBinder(binder).bindConfig(JdbcWriteConfig.class);

configBinder(binder).bindConfig(DynamicFilteringJdbcConfig.class);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this is needed because phoenix doesn't derive from JdbcPlugin.
Changes for Phoenix connector should go into separate commit and need to be explicitly covered by query runner tests.


public class DynamicFilteringJdbcConfig
{
private boolean enableDynamicFiltering;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default off? Should it be on?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can turn it on in a follow-up change after running benchmarks on it. We'll also want tests with actual connectors rather than just H2QueryRunner before we turn it on by default.

@lhofhansl
Copy link
Copy Markdown
Member

@raunaqmorarka FYI :)

public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns, DynamicFilter dynamicFilter)
{
if (isEnableDynamicFiltering(session)) {
long timeoutMillis = getDynamicFilteringWaitTimeout(session).toMillis();
Copy link
Copy Markdown
Member

@lhofhansl lhofhansl Jun 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find some weird behavior when trying this Postgres as a probe side. It almost seems random whether dynamic filtering is used or not.

I've set join_reordering_strategy='NONE' and also set dynamic_filtering_wait_timeout='10000ms'.

Executing the same join over and over I find some sometimes it uses dynamic filters and sometimes it does not.

In the debugger I see that a dynamic filter is passed here, but its tupledomain is "all".
I think this is outside of this code, but wondering if you have seen this too.
(The performance difference between the two cases is 250ms vs 38 seconds.)

All single node between two different JDBC datasources.

Copy link
Copy Markdown
Member

@lhofhansl lhofhansl Jun 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: This happens only with PARTITIONED joins... As expected I guess since I am testing single node.

Is this waiting for something? Looks good to me.

@lhofhansl
Copy link
Copy Markdown
Member

Ping... Seems to be quite useful, especially when Trino is used as a general query platform between many big and small data sources.

@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns, DynamicFilter dynamicFilter)
{
return getRecordSet(transaction, session, split, table, columns);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, should be something like

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            return new ClassLoaderSafeRecordSet(delegate.getRecordSet(transaction, session, split, table, columns, dynamicFilter), classLoader);
        }


public class DynamicFilteringJdbcConfig
{
private boolean enableDynamicFiltering;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can turn it on in a follow-up change after running benchmarks on it. We'll also want tests with actual connectors rather than just H2QueryRunner before we turn it on by default.

public class DynamicFilteringJdbcConfig
{
private boolean enableDynamicFiltering;
private Duration dynamicFilteringWaitTimeout = new Duration(0, TimeUnit.MINUTES);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a default of 20 seconds here, that will ensure that DF collection has sufficient time to complete without hanging too long. Also, if the build side gets too big, the DF collection will automatically bail out sooner.

this.nextSyntheticColumnId = nextSyntheticColumnId;
}

public JdbcTableHandle withConstraint(TupleDomain<ColumnHandle> newConstraint)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withConstraint -> intersectedWithConstraint

Comment on lines +109 to +111
if (newDomain == constraint) {
return this;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition isn't really needed, we can keep it simple and remove it

return getRecordSet(transaction, session, split, handle, columns);
}

try {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the try/catch we shouldn't be hiding any errors that result from DF pushdown

}

@Test
public void testSemiJoinDynamicFilteringNone()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default semi joins with the query shapes we're using here get rewritten to inner joins by the optimizer. We don't need explicit testing of semi join in these tests, so you can just remove semijoin tests.

1, 1, 1);
}

private void assertDynamicFiltering(@Language("SQL") String selectQuery, Session session, int expectedRowCount, int... expectedOperatorRowsRead)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to assert on specific values of expectedRowCount and expectedOperatorRowsRead here.
We should run the query with DF explicitly disabled, getPhysicalInputPositions from QueryStats and assert that the value is less than the same value from running the query with DF enabled.
We should also use assertEqualsIgnoreOrder to ensure that results are the same with DF enabled/disabled.

}

try {
dynamicFilter.isBlocked().get(timeoutMillis, MILLISECONDS);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will block an execution thread in the engine and prevent other splits from running, so we need to avoid this somehow.
You can apply dynamic filtering in JdbcSplitManager instead and use ConnectorSplitSource#getNextBatch to provide splits with DF using a future there. Changes to ConnectorRecordSetProvider SPI won't be necessary then.

configBinder(binder).bindConfig(JdbcMetadataConfig.class);
configBinder(binder).bindConfig(JdbcWriteConfig.class);

configBinder(binder).bindConfig(DynamicFilteringJdbcConfig.class);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this is needed because phoenix doesn't derive from JdbcPlugin.
Changes for Phoenix connector should go into separate commit and need to be explicitly covered by query runner tests.

@raunaqmorarka
Copy link
Copy Markdown
Member

raunaqmorarka commented Jul 25, 2022

Thank you for your work on this. I'm closing this in favour of #13334 which will implement this functionality with a more holistic approach and more complete testing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cla-signed needs-docs This pull request requires changes to the documentation performance

Development

Successfully merging this pull request may close these issues.

9 participants