diff --git a/core/trino-main/src/main/java/io/trino/execution/DynamicFilterConfig.java b/core/trino-main/src/main/java/io/trino/execution/DynamicFilterConfig.java index b4460942de99..8f9ee326f7e4 100644 --- a/core/trino-main/src/main/java/io/trino/execution/DynamicFilterConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/DynamicFilterConfig.java @@ -41,24 +41,24 @@ public class DynamicFilterConfig private boolean enableCoordinatorDynamicFiltersDistribution = true; private boolean enableLargeDynamicFilters; - private int smallBroadcastMaxDistinctValuesPerDriver = 200; - private DataSize smallBroadcastMaxSizePerDriver = DataSize.of(20, KILOBYTE); - private int smallBroadcastRangeRowLimitPerDriver = 400; - private DataSize smallBroadcastMaxSizePerOperator = DataSize.of(200, KILOBYTE); - private int smallPartitionedMaxDistinctValuesPerDriver = 20; - private DataSize smallPartitionedMaxSizePerDriver = DataSize.of(10, KILOBYTE); - private int smallPartitionedRangeRowLimitPerDriver = 100; - private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(100, KILOBYTE); - private DataSize smallMaxSizePerFilter = DataSize.of(1, MEGABYTE); - - private int largeBroadcastMaxDistinctValuesPerDriver = 5_000; - private DataSize largeBroadcastMaxSizePerDriver = DataSize.of(500, KILOBYTE); - private int largeBroadcastRangeRowLimitPerDriver = 10_000; + private int smallBroadcastMaxDistinctValuesPerDriver = 1_000; + private DataSize smallBroadcastMaxSizePerDriver = DataSize.of(100, KILOBYTE); + private int smallBroadcastRangeRowLimitPerDriver = 2_000; + private DataSize smallBroadcastMaxSizePerOperator = DataSize.of(1, MEGABYTE); + private int smallPartitionedMaxDistinctValuesPerDriver = 100; + private DataSize smallPartitionedMaxSizePerDriver = DataSize.of(50, KILOBYTE); + private int smallPartitionedRangeRowLimitPerDriver = 500; + private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE); + private DataSize smallMaxSizePerFilter = DataSize.of(5, MEGABYTE); + + private int largeBroadcastMaxDistinctValuesPerDriver = 10_000; + private DataSize largeBroadcastMaxSizePerDriver = DataSize.of(2, MEGABYTE); + private int largeBroadcastRangeRowLimitPerDriver = 20_000; private DataSize largeBroadcastMaxSizePerOperator = DataSize.of(5, MEGABYTE); - private int largePartitionedMaxDistinctValuesPerDriver = 500; - private DataSize largePartitionedMaxSizePerDriver = DataSize.of(50, KILOBYTE); - private int largePartitionedRangeRowLimitPerDriver = 1_000; - private DataSize largePartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE); + private int largePartitionedMaxDistinctValuesPerDriver = 1_000; + private DataSize largePartitionedMaxSizePerDriver = DataSize.of(200, KILOBYTE); + private int largePartitionedRangeRowLimitPerDriver = 2_000; + private DataSize largePartitionedMaxSizePerOperator = DataSize.of(2, MEGABYTE); private DataSize largeMaxSizePerFilter = DataSize.of(5, MEGABYTE); public boolean isEnableDynamicFiltering() diff --git a/core/trino-main/src/main/java/io/trino/operator/DynamicFilterSourceOperator.java b/core/trino-main/src/main/java/io/trino/operator/DynamicFilterSourceOperator.java index 43aa623064fe..342e81d64e53 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DynamicFilterSourceOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/DynamicFilterSourceOperator.java @@ -58,6 +58,8 @@ public class DynamicFilterSourceOperator implements Operator { + private static final int EXPECTED_BLOCK_BUILDER_SIZE = 64; + public static class Channel { private final DynamicFilterId filterId; @@ -410,14 +412,13 @@ private ChannelFilter( if (collectMinMax) { minMaxComparison = blockTypeOperators.getComparisonUnorderedLastOperator(type); } - int expectedSize = Math.min(maxDistinctValues, 8192) * 2; - blockBuilder = type.createBlockBuilder(null, expectedSize); + blockBuilder = type.createBlockBuilder(null, EXPECTED_BLOCK_BUILDER_SIZE); valueSet = createUnboundedEqualityTypedSet( type, blockTypeOperators.getEqualOperator(type), blockTypeOperators.getHashCodeOperator(type), blockBuilder, - expectedSize, + Math.min(maxDistinctValues, 2048), format("DynamicFilterSourceOperator_%s_%d", planNodeId, channel.index)); } diff --git a/core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java b/core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java index 5d9d362401d4..ec4bad24cc03 100644 --- a/core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java +++ b/core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java @@ -328,7 +328,7 @@ public CompletableFuture isBlocked() public boolean isComplete() { return dynamicFilters.stream() - .allMatch(context.getDynamicFilterSummaries()::containsKey); + .allMatch(filterId -> context.getDynamicFilterSummary(filterId).isPresent()); } @Override @@ -341,9 +341,12 @@ public boolean isAwaitable() @Override public TupleDomain getCurrentPredicate() { - Set completedDynamicFilters = dynamicFilters.stream() - .filter(filter -> context.getDynamicFilterSummaries().containsKey(filter)) - .collect(toImmutableSet()); + ImmutableMap.Builder completedFiltersBuilder = ImmutableMap.builder(); + for (DynamicFilterId filterId : dynamicFilters) { + Optional summary = context.getDynamicFilterSummary(filterId); + summary.ifPresent(domain -> completedFiltersBuilder.put(filterId, domain)); + } + Map completedDynamicFilters = completedFiltersBuilder.buildOrThrow(); CurrentDynamicFilter currentFilter = currentDynamicFilter.get(); if (currentFilter.getCompletedDynamicFiltersCount() >= completedDynamicFilters.size()) { @@ -352,8 +355,8 @@ public TupleDomain getCurrentPredicate() } TupleDomain dynamicFilter = TupleDomain.intersect( - completedDynamicFilters.stream() - .map(filter -> translateSummaryToTupleDomain(filter, context, symbolsMap, columnHandles, typeProvider)) + completedDynamicFilters.entrySet().stream() + .map(filter -> translateSummaryToTupleDomain(context.getSession(), filter.getKey(), filter.getValue(), symbolsMap, columnHandles, typeProvider)) .collect(toImmutableList())); // It could happen that two threads update currentDynamicFilter concurrently. @@ -425,18 +428,18 @@ public static Set getOutboundDynamicFilters(PlanFragment plan) @VisibleForTesting Optional getSummary(QueryId queryId, DynamicFilterId filterId) { - return Optional.ofNullable(dynamicFilterContexts.get(queryId).getDynamicFilterSummaries().get(filterId)); + return dynamicFilterContexts.get(queryId).getDynamicFilterSummary(filterId); } private TupleDomain translateSummaryToTupleDomain( + Session session, DynamicFilterId filterId, - DynamicFilterContext dynamicFilterContext, + Domain summary, Multimap descriptorMultimap, Map columnHandles, TypeProvider typeProvider) { Collection descriptors = descriptorMultimap.get(filterId); - Domain summary = dynamicFilterContext.getDynamicFilterSummaries().get(filterId); return TupleDomain.withColumnDomains(descriptors.stream() .collect(toImmutableMap( descriptor -> { @@ -447,7 +450,7 @@ private TupleDomain translateSummaryToTupleDomain( Type targetType = typeProvider.get(Symbol.from(descriptor.getInput())); Domain updatedSummary = descriptor.applyComparison(summary); if (!updatedSummary.getType().equals(targetType)) { - return applySaturatedCasts(metadata, functionManager, typeOperators, dynamicFilterContext.getSession(), updatedSummary, targetType); + return applySaturatedCasts(metadata, functionManager, typeOperators, session, updatedSummary, targetType); } return updatedSummary; }))); @@ -1005,6 +1008,15 @@ private Map getDynamicFilterSummaries() .collect(toImmutableMap(Map.Entry::getKey, entry -> getFutureValue(entry.getValue().getCollectedDomainFuture()))); } + private Optional getDynamicFilterSummary(DynamicFilterId filterId) + { + DynamicFilterCollectionContext context = dynamicFilterCollectionContexts.get(filterId); + if (context == null || !context.getCollectedDomainFuture().isDone()) { + return Optional.empty(); + } + return Optional.of(getFutureValue(context.getCollectedDomainFuture())); + } + private Map> getLazyDynamicFilters() { return lazyDynamicFilters; diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDynamicFilterConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestDynamicFilterConfig.java index 33c927229952..ad704a6b51f5 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDynamicFilterConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDynamicFilterConfig.java @@ -34,23 +34,23 @@ public void testDefaults() .setEnableDynamicFiltering(true) .setEnableCoordinatorDynamicFiltersDistribution(true) .setEnableLargeDynamicFilters(false) - .setSmallBroadcastMaxDistinctValuesPerDriver(200) - .setSmallBroadcastMaxSizePerDriver(DataSize.of(20, KILOBYTE)) - .setSmallBroadcastRangeRowLimitPerDriver(400) - .setSmallPartitionedMaxDistinctValuesPerDriver(20) - .setSmallBroadcastMaxSizePerOperator(DataSize.of(200, KILOBYTE)) - .setSmallPartitionedMaxSizePerDriver(DataSize.of(10, KILOBYTE)) - .setSmallPartitionedRangeRowLimitPerDriver(100) - .setSmallPartitionedMaxSizePerOperator(DataSize.of(100, KILOBYTE)) - .setSmallMaxSizePerFilter(DataSize.of(1, MEGABYTE)) - .setLargeBroadcastMaxDistinctValuesPerDriver(5000) - .setLargeBroadcastMaxSizePerDriver(DataSize.of(500, KILOBYTE)) - .setLargeBroadcastRangeRowLimitPerDriver(10_000) + .setSmallBroadcastMaxDistinctValuesPerDriver(1_000) + .setSmallBroadcastMaxSizePerDriver(DataSize.of(100, KILOBYTE)) + .setSmallBroadcastRangeRowLimitPerDriver(2_000) + .setSmallPartitionedMaxDistinctValuesPerDriver(100) + .setSmallBroadcastMaxSizePerOperator(DataSize.of(1, MEGABYTE)) + .setSmallPartitionedMaxSizePerDriver(DataSize.of(50, KILOBYTE)) + .setSmallPartitionedRangeRowLimitPerDriver(500) + .setSmallPartitionedMaxSizePerOperator(DataSize.of(500, KILOBYTE)) + .setSmallMaxSizePerFilter(DataSize.of(5, MEGABYTE)) + .setLargeBroadcastMaxDistinctValuesPerDriver(10_000) + .setLargeBroadcastMaxSizePerDriver(DataSize.of(2, MEGABYTE)) + .setLargeBroadcastRangeRowLimitPerDriver(20_000) .setLargeBroadcastMaxSizePerOperator(DataSize.of(5, MEGABYTE)) - .setLargePartitionedMaxDistinctValuesPerDriver(500) - .setLargePartitionedMaxSizePerDriver(DataSize.of(50, KILOBYTE)) - .setLargePartitionedRangeRowLimitPerDriver(1_000) - .setLargePartitionedMaxSizePerOperator(DataSize.of(500, KILOBYTE)) + .setLargePartitionedMaxDistinctValuesPerDriver(1_000) + .setLargePartitionedMaxSizePerDriver(DataSize.of(200, KILOBYTE)) + .setLargePartitionedRangeRowLimitPerDriver(2_000) + .setLargePartitionedMaxSizePerOperator(DataSize.of(2, MEGABYTE)) .setLargeMaxSizePerFilter(DataSize.of(5, MEGABYTE))); } @@ -63,20 +63,20 @@ public void testExplicitPropertyMappings() .put("enable-large-dynamic-filters", "true") .put("dynamic-filtering.small-broadcast.max-distinct-values-per-driver", "256") .put("dynamic-filtering.small-broadcast.max-size-per-driver", "64kB") - .put("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "10000") + .put("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "20000") .put("dynamic-filtering.small-broadcast.max-size-per-operator", "640kB") .put("dynamic-filtering.small-partitioned.max-distinct-values-per-driver", "256") .put("dynamic-filtering.small-partitioned.max-size-per-driver", "64kB") - .put("dynamic-filtering.small-partitioned.range-row-limit-per-driver", "10000") + .put("dynamic-filtering.small-partitioned.range-row-limit-per-driver", "20000") .put("dynamic-filtering.small-partitioned.max-size-per-operator", "641kB") .put("dynamic-filtering.small.max-size-per-filter", "341kB") .put("dynamic-filtering.large-broadcast.max-distinct-values-per-driver", "256") .put("dynamic-filtering.large-broadcast.max-size-per-driver", "64kB") - .put("dynamic-filtering.large-broadcast.range-row-limit-per-driver", "100000") + .put("dynamic-filtering.large-broadcast.range-row-limit-per-driver", "200000") .put("dynamic-filtering.large-broadcast.max-size-per-operator", "642kB") .put("dynamic-filtering.large-partitioned.max-distinct-values-per-driver", "256") .put("dynamic-filtering.large-partitioned.max-size-per-driver", "64kB") - .put("dynamic-filtering.large-partitioned.range-row-limit-per-driver", "100000") + .put("dynamic-filtering.large-partitioned.range-row-limit-per-driver", "200000") .put("dynamic-filtering.large-partitioned.max-size-per-operator", "643kB") .put("dynamic-filtering.large.max-size-per-filter", "3411kB") .buildOrThrow(); @@ -87,20 +87,20 @@ public void testExplicitPropertyMappings() .setEnableLargeDynamicFilters(true) .setSmallBroadcastMaxDistinctValuesPerDriver(256) .setSmallBroadcastMaxSizePerDriver(DataSize.of(64, KILOBYTE)) - .setSmallBroadcastRangeRowLimitPerDriver(10000) + .setSmallBroadcastRangeRowLimitPerDriver(20000) .setSmallBroadcastMaxSizePerOperator(DataSize.of(640, KILOBYTE)) .setSmallPartitionedMaxDistinctValuesPerDriver(256) .setSmallPartitionedMaxSizePerDriver(DataSize.of(64, KILOBYTE)) - .setSmallPartitionedRangeRowLimitPerDriver(10000) + .setSmallPartitionedRangeRowLimitPerDriver(20000) .setSmallPartitionedMaxSizePerOperator(DataSize.of(641, KILOBYTE)) .setSmallMaxSizePerFilter(DataSize.of(341, KILOBYTE)) .setLargeBroadcastMaxDistinctValuesPerDriver(256) .setLargeBroadcastMaxSizePerDriver(DataSize.of(64, KILOBYTE)) - .setLargeBroadcastRangeRowLimitPerDriver(100000) + .setLargeBroadcastRangeRowLimitPerDriver(200000) .setLargeBroadcastMaxSizePerOperator(DataSize.of(642, KILOBYTE)) .setLargePartitionedMaxDistinctValuesPerDriver(256) .setLargePartitionedMaxSizePerDriver(DataSize.of(64, KILOBYTE)) - .setLargePartitionedRangeRowLimitPerDriver(100000) + .setLargePartitionedRangeRowLimitPerDriver(200000) .setLargePartitionedMaxSizePerOperator(DataSize.of(643, KILOBYTE)) .setLargeMaxSizePerFilter(DataSize.of(3411, KILOBYTE)); diff --git a/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java b/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java index 61c1a6637063..672c8ef0e993 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java +++ b/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java @@ -317,8 +317,7 @@ public boolean isAll() if (getRangeCount() != 1) { return false; } - RangeView onlyRange = getRangeView(0); - return onlyRange.isLowUnbounded() && onlyRange.isHighUnbounded(); + return isRangeLowUnbounded(0) && isRangeHighUnbounded(0); } @Override @@ -446,6 +445,16 @@ private RangeView getRangeView(int rangeIndex) rangeRight); } + private boolean isRangeLowUnbounded(int rangeIndex) + { + return sortedRanges.isNull(2 * rangeIndex); + } + + private boolean isRangeHighUnbounded(int rangeIndex) + { + return sortedRanges.isNull(2 * rangeIndex + 1); + } + @Override public Ranges getRanges() { diff --git a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java index b791e843781b..3eb201cd5f5d 100644 --- a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java +++ b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java @@ -67,6 +67,8 @@ protected QueryRunner createQueryRunner() .put("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "100") .put("dynamic-filtering.large-broadcast.max-distinct-values-per-driver", "100") .put("dynamic-filtering.large-broadcast.range-row-limit-per-driver", "100000") + .put("dynamic-filtering.small-partitioned.max-distinct-values-per-driver", "100") + .put("dynamic-filtering.small-partitioned.range-row-limit-per-driver", "200") .put("dynamic-filtering.large-partitioned.max-distinct-values-per-driver", "100") .put("dynamic-filtering.large-partitioned.range-row-limit-per-driver", "100000") // disable semi join to inner join rewrite to test semi join operators explicitly diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/TestFaultTolerantExecutionDynamicFiltering.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/TestFaultTolerantExecutionDynamicFiltering.java index f9c3d8b58a05..62ba8db2a1f2 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/TestFaultTolerantExecutionDynamicFiltering.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/TestFaultTolerantExecutionDynamicFiltering.java @@ -58,6 +58,8 @@ protected QueryRunner createQueryRunner() // keep limits lower to test edge cases .addExtraProperty("dynamic-filtering.small-partitioned.max-distinct-values-per-driver", "10") .addExtraProperty("dynamic-filtering.small-broadcast.max-distinct-values-per-driver", "10") + .addExtraProperty("dynamic-filtering.small-partitioned.range-row-limit-per-driver", "100") + .addExtraProperty("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "400") .build(); }