Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/src/main/sphinx/connector/jdbc-common-configurations.fragment
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,19 @@ connector:
Do not change this setting from the default. Non-default values may
negatively impact performance.
- ``1000``
* - ``dynamic-filtering.enabled``
- Push down dynamic filters into JDBC queries
- ``true``
* - ``dynamic-filtering.wait-timeout``
- Maximum duration for which Trino will wait for dynamic filters to be
collected from the build side of joins before starting a JDBC query.
Using a large timeout can potentially result in more detailed dynamic filters.
However, it can also increase latency for some queries.
- ``20s``
* - ``domain-compaction-threshold``
- Minimum size of query predicates above which Trino compacts the predicates.
Pushing down a large list of predicates to the data source can compromise
performance. In that situation, Trino can compact the large predicates into
a simpler range predicate to improve performance. If necessary, adjust the
threshold to balance performance against predicate pushdown.
- ``32``
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;

import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

/**
* Attaches dynamic filter to {@link JdbcSplit} after {@link JdbcDynamicFilteringSplitManager}
* has waited for the collection of dynamic filters.
* This allows JDBC based connectors to avoid waiting for dynamic filters again on the worker node
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
* in {@link JdbcRecordSetProvider}. The number of splits generated by JDBC based connectors are
* typically small, therefore attaching dynamic filter here does not add significant overhead.
* Waiting for dynamic filters in {@link JdbcDynamicFilteringSplitManager} is preferred over waiting
* for them on the worker node in {@link JdbcRecordSetProvider} to allow connectors to take advantage of
* dynamic filters during the splits generation phase.
*/
public class DynamicFilteringJdbcSplitSource
implements ConnectorSplitSource
{
private final ConnectorSplitSource delegateSplitSource;
private final DynamicFilter dynamicFilter;
private final JdbcTableHandle tableHandle;

DynamicFilteringJdbcSplitSource(ConnectorSplitSource delegateSplitSource, DynamicFilter dynamicFilter, JdbcTableHandle tableHandle)
{
this.delegateSplitSource = requireNonNull(delegateSplitSource, "delegateSplitSource is null");
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
}

@Override
public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
{
if (!isEligibleForDynamicFilter(tableHandle)) {
return delegateSplitSource.getNextBatch(maxSize);
}
return delegateSplitSource.getNextBatch(maxSize)
.thenApply(batch -> {
TupleDomain<JdbcColumnHandle> dynamicFilterPredicate = dynamicFilter.getCurrentPredicate()
Comment thread
sopel39 marked this conversation as resolved.
Outdated
.transformKeys(JdbcColumnHandle.class::cast);
return new ConnectorSplitBatch(
batch.getSplits().stream()
// attach dynamic filter constraint to JdbcSplit
.map(split -> {
JdbcSplit jdbcSplit = (JdbcSplit) split;
// If split was a subclass of JdbcSplit, there would be additional information
// that we would need to pass further on.
verify(jdbcSplit.getClass() == JdbcSplit.class, "Unexpected split type %s", jdbcSplit);
return jdbcSplit.withDynamicFilter(dynamicFilterPredicate);
})
.collect(toImmutableList()),
batch.isNoMoreSplits());
});
}

@Override
public void close()
{
delegateSplitSource.close();
}

@Override
public boolean isFinished()
{
return delegateSplitSource.isFinished();
}

public static boolean isEligibleForDynamicFilter(JdbcTableHandle tableHandle)
{
// don't pushdown predicate through limit as it could reduce performance
return tableHandle.getLimit().isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import io.airlift.stats.CounterStat;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
import io.trino.spi.connector.DynamicFilter;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * Managed statistics about dynamic filters: how many were processed, how many of
 * those were complete or awaitable, and how long was spent waiting for them.
 */
public class DynamicFilteringStats
{
    private final CounterStat total = new CounterStat();
    private final CounterStat completed = new CounterStat();
    private final CounterStat awaitable = new CounterStat();
    private final TimeStat waitingTime = new TimeStat(MILLISECONDS);

    /**
     * @return counter incremented once for every processed dynamic filter
     */
    @Managed
    @Nested
    public CounterStat getTotalDynamicFilters()
    {
        return total;
    }

    /**
     * @return counter incremented for every processed dynamic filter that was complete
     */
    @Managed
    @Nested
    public CounterStat getCompletedDynamicFilters()
    {
        return completed;
    }

    /**
     * @return counter incremented for every processed dynamic filter that was awaitable
     */
    @Managed
    @Nested
    public CounterStat getAwaitableDynamicFilters()
    {
        return awaitable;
    }

    /**
     * @return timer (millisecond unit) fed with the waiting time of every processed dynamic filter
     */
    @Managed
    @Nested
    public TimeStat getDynamicFilterWaitingTime()
    {
        return waitingTime;
    }

    /**
     * Records one dynamic filter in the statistics.
     *
     * @param dynamicFilter filter whose completeness and awaitability are sampled at call time
     * @param waitingTime duration to add to the waiting-time statistic
     */
    public void processDynamicFilter(DynamicFilter dynamicFilter, Duration waitingTime)
    {
        // every filter counts towards the total, regardless of its state
        total.update(1);

        if (dynamicFilter.isComplete()) {
            completed.update(1);
        }

        if (dynamicFilter.isAwaitable()) {
            awaitable.update(1);
        }

        this.waitingTime.add(waitingTime);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Qualifier annotation, retained at runtime, that can be placed on fields,
 * parameters and methods. Judging by its name, it marks injection points that
 * belong to JDBC dynamic filtering.
 */
@Qualifier
@Target({FIELD, PARAMETER, METHOD})
@Retention(RUNTIME)
public @interface ForJdbcDynamicFiltering
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import io.airlift.slice.SizeOf;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnSchema;
import io.trino.spi.type.Type;
import org.openjdk.jol.info.ClassLayout;

import java.util.Objects;
import java.util.Optional;

import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.util.Objects.requireNonNull;

public final class JdbcColumnHandle
implements ColumnHandle
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(JdbcColumnHandle.class).instanceSize();

private final String columnName;
private final JdbcTypeHandle jdbcTypeHandle;
private final Type columnType;
Expand Down Expand Up @@ -136,6 +142,16 @@ public String toString()
jdbcTypeHandle.getJdbcTypeName().orElse(null));
}

public long getRetainedSizeInBytes()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: separate commit

{
// columnType is not accounted for as the instances are cached (by TypeRegistry) and shared
return INSTANCE_SIZE
+ sizeOf(nullable)
+ estimatedSizeOf(columnName)
+ sizeOf(comment, SizeOf::estimatedSizeOf)
+ jdbcTypeHandle.getRetainedSizeInBytes();
}

public static Builder builder()
{
return new Builder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;

import javax.validation.constraints.NotNull;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Configuration holder for the {@code dynamic-filtering.enabled} and
 * {@code dynamic-filtering.wait-timeout} properties. Setters return {@code this}
 * so that calls can be chained.
 */
public class JdbcDynamicFilteringConfig
{
    // 20s allows DF collection from dimensional tables as well as
    // some larger build side subqueries
    private Duration waitTimeout = new Duration(20, SECONDS);
    private boolean enabled = true;

    /**
     * @return whether dynamic filtering is enabled; {@code true} unless overridden
     */
    public boolean isDynamicFilteringEnabled()
    {
        return enabled;
    }

    /**
     * @param dynamicFilteringEnabled new value of {@code dynamic-filtering.enabled}
     * @return this config instance
     */
    @Config("dynamic-filtering.enabled")
    @ConfigDescription("Wait for dynamic filters before starting JDBC query")
    public JdbcDynamicFilteringConfig setDynamicFilteringEnabled(boolean dynamicFilteringEnabled)
    {
        enabled = dynamicFilteringEnabled;
        return this;
    }

    /**
     * @return the configured wait timeout; 20 seconds unless overridden
     */
    @NotNull
    public Duration getDynamicFilteringWaitTimeout()
    {
        return waitTimeout;
    }

    /**
     * @param dynamicFilteringWaitTimeout new value of {@code dynamic-filtering.wait-timeout}
     * @return this config instance
     */
    @Config("dynamic-filtering.wait-timeout")
    @ConfigDescription("Duration to wait for completion of dynamic filters")
    public JdbcDynamicFilteringConfig setDynamicFilteringWaitTimeout(Duration dynamicFilteringWaitTimeout)
    {
        waitTimeout = dynamicFilteringWaitTimeout;
        return this;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;

import javax.inject.Inject;

import java.util.List;

import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;

/**
 * Provides the {@code dynamic_filtering_enabled} and {@code dynamic_filtering_wait_timeout}
 * session properties, with defaults taken from {@link JdbcDynamicFilteringConfig}, and
 * static accessors to read them from a {@link ConnectorSession}.
 */
public class JdbcDynamicFilteringSessionProperties
        implements SessionPropertiesProvider
{
    public static final String DYNAMIC_FILTERING_ENABLED = "dynamic_filtering_enabled";
    public static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";

    private final List<PropertyMetadata<?>> sessionProperties;

    /**
     * @param config source of the default values for both session properties
     */
    @Inject
    public JdbcDynamicFilteringSessionProperties(JdbcDynamicFilteringConfig config)
    {
        PropertyMetadata<Boolean> enabledProperty = booleanProperty(
                DYNAMIC_FILTERING_ENABLED,
                "Wait for dynamic filters before starting JDBC query",
                config.isDynamicFilteringEnabled(),
                false);
        PropertyMetadata<Duration> waitTimeoutProperty = durationProperty(
                DYNAMIC_FILTERING_WAIT_TIMEOUT,
                "Duration to wait for completion of dynamic filters",
                config.getDynamicFilteringWaitTimeout(),
                false);
        sessionProperties = ImmutableList.of(enabledProperty, waitTimeoutProperty);
    }

    /**
     * @return the two session properties built in the constructor
     */
    @Override
    public List<PropertyMetadata<?>> getSessionProperties()
    {
        return sessionProperties;
    }

    /**
     * @param session session to read from
     * @return value of the {@code dynamic_filtering_enabled} session property
     */
    public static boolean dynamicFilteringEnabled(ConnectorSession session)
    {
        Boolean enabled = session.getProperty(DYNAMIC_FILTERING_ENABLED, Boolean.class);
        return enabled;
    }

    /**
     * @param session session to read from
     * @return value of the {@code dynamic_filtering_wait_timeout} session property
     */
    public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
    {
        Duration waitTimeout = session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
        return waitTimeout;
    }
}
Loading