Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public final class SystemSessionProperties
public static final String ENABLE_LARGE_DYNAMIC_FILTERS = "enable_large_dynamic_filters";
public static final String ENABLE_DYNAMIC_ROW_FILTERING = "enable_dynamic_row_filtering";
public static final String DYNAMIC_ROW_FILTERING_SELECTIVITY_THRESHOLD = "dynamic_row_filtering_selectivity_threshold";
// Maximum preferred time to wait for an awaitable dynamic filter before the table scan is started
public static final String PREFERRED_DYNAMIC_FILTER_WAIT_TIMEOUT = "preferred_dynamic_filter_wait_timeout";
// Maximum number of rows for a dynamic filter to be awaitable
public static final String AWAITED_DYNAMIC_FILTER_MAX_ROW_COUNT = "awaited_dynamic_filter_max_row_count";
// Maximum number of distinct values (NDV) for a dynamic filter to be awaitable
public static final String AWAITED_DYNAMIC_FILTER_MAX_NDV_COUNT = "awaited_dynamic_filter_max_ndv_count";
// Maximum amount of memory a query can use per node
public static final String QUERY_MAX_MEMORY_PER_NODE = "query_max_memory_per_node";
public static final String IGNORE_DOWNSTREAM_PREFERENCES = "ignore_downstream_preferences";
public static final String FILTERING_SEMI_JOIN_TO_INNER = "rewrite_filtering_semi_join_to_inner_join";
Expand Down Expand Up @@ -702,6 +705,23 @@ public SystemSessionProperties(
}
},
false),
durationProperty(
PREFERRED_DYNAMIC_FILTER_WAIT_TIMEOUT,
"Maximum preferred time to wait for awaitable dynamic filter before table scan is started",
dynamicFilterConfig.getPreferredDynamicFilterWaitTimeout(),
false),
longProperty(
AWAITED_DYNAMIC_FILTER_MAX_ROW_COUNT,
"Maximum number of rows for dynamic filter to be awaitable",
dynamicFilterConfig.getAwaitedDynamicFilterMaxRowCount(),
value -> validateNonNegativeLongValue(value, AWAITED_DYNAMIC_FILTER_MAX_ROW_COUNT),
false),
longProperty(
AWAITED_DYNAMIC_FILTER_MAX_NDV_COUNT,
"Maximum number of distinct values for dynamic filter to be awaitable",
dynamicFilterConfig.getAwaitedDynamicFilterMaxNdvCount(),
value -> validateNonNegativeLongValue(value, AWAITED_DYNAMIC_FILTER_MAX_NDV_COUNT),
false),
dataSizeProperty(
QUERY_MAX_MEMORY_PER_NODE,
"Maximum amount of memory a query can use per node",
Expand Down Expand Up @@ -1638,6 +1658,21 @@ public static double getDynamicRowFilterSelectivityThreshold(Session session)
return session.getSystemProperty(DYNAMIC_ROW_FILTERING_SELECTIVITY_THRESHOLD, Double.class);
}

/**
 * Looks up the {@code preferred_dynamic_filter_wait_timeout} system property on the given session.
 *
 * @param session session whose system properties are consulted
 * @return the property value as a {@link Duration}
 */
public static Duration getPreferredDynamicFilterWaitTimeout(Session session)
{
    Duration preferredWaitTimeout = session.getSystemProperty(PREFERRED_DYNAMIC_FILTER_WAIT_TIMEOUT, Duration.class);
    return preferredWaitTimeout;
}

/**
 * Looks up the {@code awaited_dynamic_filter_max_row_count} system property on the given session.
 *
 * @param session session whose system properties are consulted
 * @return the property value, unboxed to a primitive {@code long}
 */
public static long getAwaitedDynamicFilterMaxRowCount(Session session)
{
    Long maxRowCount = session.getSystemProperty(AWAITED_DYNAMIC_FILTER_MAX_ROW_COUNT, Long.class);
    // auto-unboxed to long on return
    return maxRowCount;
}

/**
 * Looks up the {@code awaited_dynamic_filter_max_ndv_count} system property on the given session.
 *
 * @param session session whose system properties are consulted
 * @return the property value, unboxed to a primitive {@code long}
 */
public static long getAwaitedDynamicFilterMaxNdvCount(Session session)
{
    Long maxNdvCount = session.getSystemProperty(AWAITED_DYNAMIC_FILTER_MAX_NDV_COUNT, Long.class);
    // auto-unboxed to long on return
    return maxNdvCount;
}

public static DataSize getQueryMaxMemoryPerNode(Session session)
{
return session.getSystemProperty(QUERY_MAX_MEMORY_PER_NODE, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDuration;
import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.SECONDS;

@DefunctConfig({
"dynamic-filtering-max-per-driver-row-count",
Expand All @@ -45,6 +48,9 @@ public class DynamicFilterConfig
private boolean enableDynamicRowFiltering = true;
private double dynamicRowFilterSelectivityThreshold = 0.7;

// Maximum preferred time to wait for an awaitable dynamic filter before the table scan is started (default: 20 seconds)
private Duration preferredDynamicFilterWaitTimeout = new Duration(20, SECONDS);
// Maximum number of rows for a dynamic filter to be awaited (default: 100,000)
private long awaitedDynamicFilterMaxRowCount = 100_000;
// Maximum number of distinct values for a dynamic filter to be awaited (default: 500)
private long awaitedDynamicFilterMaxNdvCount = 500;
/*
* dynamic-filtering.small.* and dynamic-filtering.large.* limits are applied when
* collected over a not pre-partitioned source (when join distribution type is
Expand Down Expand Up @@ -129,6 +135,48 @@ public DynamicFilterConfig setDynamicRowFilterSelectivityThreshold(double dynami
return this;
}

/**
 * Returns the maximum preferred time to wait for an awaitable dynamic filter
 * before the table scan is started. Constrained to at least {@code 0ms}.
 */
// NOTE(review): there is no @NotNull on this getter. Bean Validation constraints conventionally
// treat null as valid, so @MinDuration alone presumably does not reject a null timeout — confirm
// and consider adding @NotNull (already imported in this file).
@MinDuration("0ms")
public Duration getPreferredDynamicFilterWaitTimeout()
{
return preferredDynamicFilterWaitTimeout;
}

/**
 * Sets the maximum preferred time to wait for an awaitable dynamic filter before
 * the table scan is started. Annotated with the {@code preferred-dynamic-filter.wait-timeout}
 * configuration key.
 *
 * @param dynamicFilteringWaitTimeout new preferred wait timeout
 * @return this config instance, to allow call chaining
 */
@Config("preferred-dynamic-filter.wait-timeout")
@ConfigDescription("Maximum preferred time to wait for awaitable dynamic filter before table scan is started")
public DynamicFilterConfig setPreferredDynamicFilterWaitTimeout(Duration dynamicFilteringWaitTimeout)
{
this.preferredDynamicFilterWaitTimeout = dynamicFilteringWaitTimeout;
return this;
}

/**
 * Returns the maximum number of rows for a dynamic filter to be awaited.
 * Constrained to be non-negative by {@code @Min(0)}.
 */
@Min(0)
public long getAwaitedDynamicFilterMaxRowCount()
{
return awaitedDynamicFilterMaxRowCount;
}

/**
 * Sets the maximum number of rows for a dynamic filter to be awaited.
 * Annotated with the {@code awaited-dynamic-filter.max-row-count} configuration key.
 *
 * @param awaitedDynamicFilterMaxRowCount new row-count limit
 * @return this config instance, to allow call chaining
 */
@Config("awaited-dynamic-filter.max-row-count")
@ConfigDescription("Maximum number of rows for dynamic filter to be awaited")
public DynamicFilterConfig setAwaitedDynamicFilterMaxRowCount(long awaitedDynamicFilterMaxRowCount)
{
this.awaitedDynamicFilterMaxRowCount = awaitedDynamicFilterMaxRowCount;
return this;
}

/**
 * Returns the maximum number of distinct values for a dynamic filter to be awaited.
 * Constrained to be non-negative by {@code @Min(0)}.
 */
@Min(0)
public long getAwaitedDynamicFilterMaxNdvCount()
{
return awaitedDynamicFilterMaxNdvCount;
}

/**
 * Sets the maximum number of distinct values for a dynamic filter to be awaited.
 * Annotated with the {@code awaited-dynamic-filter.max-ndv-count} configuration key.
 *
 * @param awaitedDynamicFilterMaxNdvCount new distinct-value limit
 * @return this config instance, to allow call chaining
 */
@Config("awaited-dynamic-filter.max-ndv-count")
@ConfigDescription("Maximum number of distinct values for dynamic filter to be awaited")
public DynamicFilterConfig setAwaitedDynamicFilterMaxNdvCount(long awaitedDynamicFilterMaxNdvCount)
{
this.awaitedDynamicFilterMaxNdvCount = awaitedDynamicFilterMaxNdvCount;
return this;
}

@Min(0)
public int getSmallMaxDistinctValuesPerDriver()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -70,6 +71,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import static com.google.common.base.Functions.identity;
import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -286,10 +288,10 @@ public DynamicFilter createDynamicFilter(
return EMPTY;
}

List<ListenableFuture<Void>> lazyDynamicFilterFutures = dynamicFilters.stream()
.map(context.getLazyDynamicFilters()::get)
.filter(Objects::nonNull)
.collect(toImmutableList());
Map<DynamicFilterId, ListenableFuture<Void>> lazyDynamicFilterFutures = dynamicFilters.stream()
.filter(dynamicFilterId -> context.getLazyDynamicFilters().containsKey(dynamicFilterId))
.collect(toImmutableMap(Function.identity(), dynamicFilterId -> context.getLazyDynamicFilters().get(dynamicFilterId)));

AtomicReference<CurrentDynamicFilter> currentDynamicFilter = new AtomicReference<>(new CurrentDynamicFilter(0, TupleDomain.all()));

Set<ColumnHandle> columnsCovered = symbolsMap.values().stream()
Expand All @@ -298,6 +300,22 @@ public DynamicFilter createDynamicFilter(
.map(probeSymbol -> requireNonNull(columnHandles.get(probeSymbol), () -> "Missing probe column for " + probeSymbol))
.collect(toImmutableSet());

Map<DynamicFilterId, OptionalLong> dynamicFilterTimeouts = dynamicFilterDescriptors.stream()
.collect(toImmutableMap(
DynamicFilters.Descriptor::getId,
DynamicFilters.Descriptor::getPreferredTimeout,
(timeout1, timeout2) -> {
if (timeout1.isPresent() && timeout2.isPresent()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use `io.trino.util.Optionals#combine` here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would require a similar function that operates on OptionalLong.

return OptionalLong.of(Long.max(timeout1.getAsLong(), timeout2.getAsLong()));
}
else if (timeout1.isPresent()) {
return timeout1;
}
else {
return timeout2;
}
}));

return new DynamicFilter()
{
@Override
Expand All @@ -310,7 +328,7 @@ public Set<ColumnHandle> getColumnsCovered()
public CompletableFuture<?> isBlocked()
{
// wait for any of the requested dynamic filter domains to be completed
List<ListenableFuture<Void>> undoneFutures = lazyDynamicFilterFutures.stream()
List<ListenableFuture<Void>> undoneFutures = lazyDynamicFilterFutures.values().stream()
.filter(future -> !future.isDone())
.collect(toImmutableList());

Expand All @@ -331,7 +349,7 @@ public boolean isComplete()
@Override
public boolean isAwaitable()
{
return lazyDynamicFilterFutures.stream()
return lazyDynamicFilterFutures.values().stream()
.anyMatch(future -> !future.isDone());
}

Expand Down Expand Up @@ -363,6 +381,33 @@ public TupleDomain<ColumnHandle> getCurrentPredicate()
currentDynamicFilter.set(new CurrentDynamicFilter(completedDynamicFilters.size(), dynamicFilter));
return dynamicFilter;
}

/**
 * Computes the longest preferred timeout among the lazy dynamic filters whose futures
 * have not completed yet.
 *
 * <p>Returns an empty result when no pending filter supplied a timeout value, or when
 * at least one pending filter has no timeout estimate and the largest supplied value is 0.
 * Pending filters without an entry in the timeout map are ignored.
 */
@Override
public OptionalLong getPreferredDynamicFilterTimeout()
{
    // -1 acts as a sentinel: no pending dynamic filter has supplied a timeout value so far
    long maxTimeout = -1;
    boolean sawUnestimatedFilter = false;

    for (Map.Entry<DynamicFilterId, ListenableFuture<Void>> pending : lazyDynamicFilterFutures.entrySet()) {
        if (pending.getValue().isDone()) {
            // completed filters no longer need to be waited for
            continue;
        }
        OptionalLong timeout = dynamicFilterTimeouts.get(pending.getKey());
        if (timeout == null) {
            continue;
        }
        if (!timeout.isPresent()) {
            sawUnestimatedFilter = true;
            continue;
        }
        maxTimeout = Math.max(maxTimeout, timeout.getAsLong());
    }

    boolean noTimeoutSupplied = maxTimeout == -1;
    // an unestimated filter is pending and every supplied timeout is 0
    boolean unestimatedWithZeroTimeouts = sawUnestimatedFilter && maxTimeout == 0;
    if (noTimeoutSupplied || unestimatedWithZeroTimeouts) {
        return OptionalLong.empty();
    }
    return OptionalLong.of(maxTimeout);
}
};
}

Expand Down
Loading