Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ public ConnectorPageSource createPageSource(
List<ColumnHandle> columns,
DynamicFilter dynamicFilter)
{
return new RecordPageSource(recordSetProvider.getRecordSet(transaction, session, split, table, columns));
return new RecordPageSource(recordSetProvider.getRecordSet(transaction, session, split, table, columns, dynamicFilter));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@

public interface ConnectorRecordSetProvider
{
default RecordSet getRecordSet(
ConnectorTransactionHandle transaction,
ConnectorSession session,
ConnectorSplit split,
ConnectorTableHandle table,
List<? extends ColumnHandle> columns,
DynamicFilter dynamicFilter)
{
// By default, ignore dynamic filter (as it is an optimization and doesn't affect correctness).
return getRecordSet(transaction, session, split, table, columns);
}

@Deprecated
default RecordSet getRecordSet(
ConnectorTransactionHandle transaction,
ConnectorSession session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.RecordSet;

import javax.inject.Inject;
Expand All @@ -42,6 +43,12 @@ public ClassLoaderSafeConnectorRecordSetProvider(@ForClassLoaderSafe ConnectorRe
this.classLoader = requireNonNull(classLoader, "classLoader is null");
}

@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns, DynamicFilter dynamicFilter)
{
return getRecordSet(transaction, session, split, table, columns);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should actually pass the DynamicFIlter along.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, should be something like

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            return new ClassLoaderSafeRecordSet(delegate.getRecordSet(transaction, session, split, table, columns, dynamicFilter), classLoader);
        }

}

@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.plugin.jdbc;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.validation.constraints.NotNull;

import java.util.concurrent.TimeUnit;

public class DynamicFilteringJdbcConfig
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a TestDynamicFilteringJdbcConfig similar to the way test classes are defined for other configs

{
private boolean enableDynamicFiltering;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default off? Should it be on?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can turn it on in a follow-up change after running benchmarks on it. We'll also want tests with actual connectors rather than just H2QueryRunner before we turn it on by default.

private Duration dynamicFilteringWaitTimeout = new Duration(0, TimeUnit.MINUTES);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a default of 20 seconds here, that will ensure that DF collection has sufficient time to complete without hanging too long. Also, if the build side gets too big, the DF collection will automatically bail out sooner.


public boolean isEnableDynamicFiltering()
{
return enableDynamicFiltering;
}

@Config("dynamic-filtering.enabled")
public DynamicFilteringJdbcConfig setEnableDynamicFiltering(boolean enableDynamicFiltering)
{
this.enableDynamicFiltering = enableDynamicFiltering;
return this;
}

@NotNull
@MinDuration("0ms")
public Duration getDynamicFilteringWaitTimeout()
{
return dynamicFilteringWaitTimeout;
}

@Config("dynamic-filtering.wait-timeout")
@ConfigDescription("Duration to wait for completion of dynamic filters")
public DynamicFilteringJdbcConfig setDynamicFilteringWaitTimeout(Duration timeout)
{
this.dynamicFilteringWaitTimeout = timeout;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.plugin.jdbc;

import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;

import javax.inject.Inject;

import java.util.List;

import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;

public class DynamicFilteringJdbcSessionProperties
implements SessionPropertiesProvider
{
private static final String DYNAMIC_FILTERING_ENABLED = "dynamic_filtering_enabled";
private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";

private final List<PropertyMetadata<?>> properties;

@Inject
public DynamicFilteringJdbcSessionProperties(DynamicFilteringJdbcConfig config)
{
properties = ImmutableList.of(
booleanProperty(
DYNAMIC_FILTERING_ENABLED,
"If dynamic filtering is enabled",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Enable pushdown of dynamic filters to JDBC source"

config.isEnableDynamicFiltering(),
false),
durationProperty(
DYNAMIC_FILTERING_WAIT_TIMEOUT,
"Duration to wait for completion of dynamic filters",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Duration to wait for the collection of dynamic filters before starting JDBC query"

config.getDynamicFilteringWaitTimeout(),
false));
}

public static boolean isEnableDynamicFiltering(ConnectorSession session)
{
return session.getProperty(DYNAMIC_FILTERING_ENABLED, Boolean.class);
}

public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
{
return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ public void setup(Binder binder)
configBinder(binder).bindConfig(BaseJdbcConfig.class);

configBinder(binder).bindConfig(TypeHandlingJdbcConfig.class);
configBinder(binder).bindConfig(DynamicFilteringJdbcConfig.class);
bindSessionPropertiesProvider(binder, TypeHandlingJdbcSessionProperties.class);
bindSessionPropertiesProvider(binder, JdbcMetadataSessionProperties.class);
bindSessionPropertiesProvider(binder, JdbcWriteSessionProperties.class);
bindSessionPropertiesProvider(binder, DynamicFilteringJdbcSessionProperties.class);

binder.bind(CachingJdbcClient.class).in(Scopes.SINGLETON);
binder.bind(JdbcClient.class).to(Key.get(CachingJdbcClient.class)).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.RecordSet;

import javax.inject.Inject;
Expand All @@ -29,7 +30,10 @@
import java.util.concurrent.ExecutorService;

import static com.google.common.base.Verify.verify;
import static io.trino.plugin.jdbc.DynamicFilteringJdbcSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.jdbc.DynamicFilteringJdbcSessionProperties.isEnableDynamicFiltering;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class JdbcRecordSetProvider
implements ConnectorRecordSetProvider
Expand All @@ -44,6 +48,31 @@ public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorSer
this.executor = requireNonNull(executor, "executor is null");
}

@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns, DynamicFilter dynamicFilter)
{
if (isEnableDynamicFiltering(session)) {
long timeoutMillis = getDynamicFilteringWaitTimeout(session).toMillis();
Copy link
Copy Markdown
Member

@lhofhansl lhofhansl Jun 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find some weird behavior when trying this Postgres as a probe side. It almost seems random whether dynamic filtering is used or not.

I've set join_reordering_strategy='NONE' and also set dynamic_filtering_wait_timeout='10000ms'.

Executing the same join over and over I find some sometimes it uses dynamic filters and sometimes it does not.

In the debugger I see that a dynamic filter is passed here, but its tupledomain is "all".
I think this is outside of this code, but wondering if you have seen this too.
(The performance difference between the two cases is 250ms vs 38 seconds.)

All single node between two different JDBC datasources.

Copy link
Copy Markdown
Member

@lhofhansl lhofhansl Jun 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: This happens only with PARTITIONED joins... As expected I guess since I am testing single node.

Is this waiting for something? Looks good to me.


// no time for further narrow down or done already
if (timeoutMillis == 0 || !dynamicFilter.isAwaitable()) {
JdbcTableHandle handle = ((JdbcTableHandle) table).withConstraint(dynamicFilter.getCurrentPredicate());
return getRecordSet(transaction, session, split, handle, columns);
}

try {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the try/catch we shouldn't be hiding any errors that result from DF pushdown

dynamicFilter.isBlocked().get(timeoutMillis, MILLISECONDS);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will block an execution thread in the engine and prevent other splits from running, so we need to avoid this somehow.
You can apply dynamic filtering in JdbcSplitManager instead and use ConnectorSplitSource#getNextBatch to provide splits with DF using a future there. Changes to ConnectorRecordSetProvider SPI won't be necessary then.

JdbcTableHandle handle = ((JdbcTableHandle) table).withConstraint(dynamicFilter.getCurrentPredicate());
return getRecordSet(transaction, session, split, handle, columns);
}
catch (Exception e) {
return getRecordSet(transaction, session, split, table, columns);
}
}

return getRecordSet(transaction, session, split, table, columns);
}

@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ public JdbcTableHandle(
this.nextSyntheticColumnId = nextSyntheticColumnId;
}

public JdbcTableHandle withConstraint(TupleDomain<ColumnHandle> newConstraint)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withConstraint -> intersectedWithConstraint

{
TupleDomain<ColumnHandle> newDomain = constraint.intersect(newConstraint);
if (newDomain == constraint) {
return this;
}
Comment on lines +109 to +111
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition isn't really needed, we can keep it simple and remove it


return new JdbcTableHandle(relationHandle, newDomain, constraintExpressions, sortOrder, limit, columns, otherReferencedTables, nextSyntheticColumnId);
}

/**
* @deprecated Use {@code asPlainTable().getSchemaTableName()} instead, but see those methods for more information, as this is not a drop-in replacement.
*/
Expand Down
Loading