Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,24 @@ public class DynamicFilterConfig
private boolean enableCoordinatorDynamicFiltersDistribution = true;
private boolean enableLargeDynamicFilters;

private int smallBroadcastMaxDistinctValuesPerDriver = 200;
private DataSize smallBroadcastMaxSizePerDriver = DataSize.of(20, KILOBYTE);
private int smallBroadcastRangeRowLimitPerDriver = 400;
private DataSize smallBroadcastMaxSizePerOperator = DataSize.of(200, KILOBYTE);
private int smallPartitionedMaxDistinctValuesPerDriver = 20;
private DataSize smallPartitionedMaxSizePerDriver = DataSize.of(10, KILOBYTE);
private int smallPartitionedRangeRowLimitPerDriver = 100;
private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(100, KILOBYTE);
private DataSize smallMaxSizePerFilter = DataSize.of(1, MEGABYTE);

private int largeBroadcastMaxDistinctValuesPerDriver = 5_000;
private DataSize largeBroadcastMaxSizePerDriver = DataSize.of(500, KILOBYTE);
private int largeBroadcastRangeRowLimitPerDriver = 10_000;
private int smallBroadcastMaxDistinctValuesPerDriver = 1_000;
private DataSize smallBroadcastMaxSizePerDriver = DataSize.of(100, KILOBYTE);
private int smallBroadcastRangeRowLimitPerDriver = 2_000;
private DataSize smallBroadcastMaxSizePerOperator = DataSize.of(1, MEGABYTE);
private int smallPartitionedMaxDistinctValuesPerDriver = 100;
private DataSize smallPartitionedMaxSizePerDriver = DataSize.of(50, KILOBYTE);
private int smallPartitionedRangeRowLimitPerDriver = 500;
private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE);
private DataSize smallMaxSizePerFilter = DataSize.of(5, MEGABYTE);

private int largeBroadcastMaxDistinctValuesPerDriver = 10_000;
private DataSize largeBroadcastMaxSizePerDriver = DataSize.of(2, MEGABYTE);
private int largeBroadcastRangeRowLimitPerDriver = 20_000;
private DataSize largeBroadcastMaxSizePerOperator = DataSize.of(5, MEGABYTE);
private int largePartitionedMaxDistinctValuesPerDriver = 500;
private DataSize largePartitionedMaxSizePerDriver = DataSize.of(50, KILOBYTE);
private int largePartitionedRangeRowLimitPerDriver = 1_000;
private DataSize largePartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE);
private int largePartitionedMaxDistinctValuesPerDriver = 1_000;
private DataSize largePartitionedMaxSizePerDriver = DataSize.of(200, KILOBYTE);
private int largePartitionedRangeRowLimitPerDriver = 2_000;
private DataSize largePartitionedMaxSizePerOperator = DataSize.of(2, MEGABYTE);
private DataSize largeMaxSizePerFilter = DataSize.of(5, MEGABYTE);

public boolean isEnableDynamicFiltering()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
public class DynamicFilterSourceOperator
implements Operator
{
private static final int EXPECTED_BLOCK_BUILDER_SIZE = 64;

public static class Channel
{
private final DynamicFilterId filterId;
Expand Down Expand Up @@ -410,14 +412,13 @@ private ChannelFilter(
if (collectMinMax) {
minMaxComparison = blockTypeOperators.getComparisonUnorderedLastOperator(type);
}
int expectedSize = Math.min(maxDistinctValues, 8192) * 2;
blockBuilder = type.createBlockBuilder(null, expectedSize);
blockBuilder = type.createBlockBuilder(null, EXPECTED_BLOCK_BUILDER_SIZE);
valueSet = createUnboundedEqualityTypedSet(
type,
blockTypeOperators.getEqualOperator(type),
blockTypeOperators.getHashCodeOperator(type),
blockBuilder,
expectedSize,
Math.min(maxDistinctValues, 2048),
format("DynamicFilterSourceOperator_%s_%d", planNodeId, channel.index));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public CompletableFuture<?> isBlocked()
public boolean isComplete()
{
return dynamicFilters.stream()
.allMatch(context.getDynamicFilterSummaries()::containsKey);
.allMatch(filterId -> context.getDynamicFilterSummary(filterId).isPresent());
}

@Override
Expand All @@ -341,9 +341,12 @@ public boolean isAwaitable()
@Override
public TupleDomain<ColumnHandle> getCurrentPredicate()
{
Set<DynamicFilterId> completedDynamicFilters = dynamicFilters.stream()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the trick here? Avoiding stream?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each invocation of getDynamicFilterSummaries created a new map of completed dynamic filters, and we would then look up the filter in that map.
I simplified that to look up the filter directly in the existing map.

.filter(filter -> context.getDynamicFilterSummaries().containsKey(filter))
.collect(toImmutableSet());
ImmutableMap.Builder<DynamicFilterId, Domain> completedFiltersBuilder = ImmutableMap.builder();
for (DynamicFilterId filterId : dynamicFilters) {
Optional<Domain> summary = context.getDynamicFilterSummary(filterId);
summary.ifPresent(domain -> completedFiltersBuilder.put(filterId, domain));
}
Map<DynamicFilterId, Domain> completedDynamicFilters = completedFiltersBuilder.buildOrThrow();

CurrentDynamicFilter currentFilter = currentDynamicFilter.get();
if (currentFilter.getCompletedDynamicFiltersCount() >= completedDynamicFilters.size()) {
Expand All @@ -352,8 +355,8 @@ public TupleDomain<ColumnHandle> getCurrentPredicate()
}

TupleDomain<ColumnHandle> dynamicFilter = TupleDomain.intersect(
completedDynamicFilters.stream()
.map(filter -> translateSummaryToTupleDomain(filter, context, symbolsMap, columnHandles, typeProvider))
completedDynamicFilters.entrySet().stream()
.map(filter -> translateSummaryToTupleDomain(context.getSession(), filter.getKey(), filter.getValue(), symbolsMap, columnHandles, typeProvider))
.collect(toImmutableList()));

// It could happen that two threads update currentDynamicFilter concurrently.
Expand Down Expand Up @@ -425,18 +428,18 @@ public static Set<DynamicFilterId> getOutboundDynamicFilters(PlanFragment plan)
@VisibleForTesting
Optional<Domain> getSummary(QueryId queryId, DynamicFilterId filterId)
{
return Optional.ofNullable(dynamicFilterContexts.get(queryId).getDynamicFilterSummaries().get(filterId));
return dynamicFilterContexts.get(queryId).getDynamicFilterSummary(filterId);
}

private TupleDomain<ColumnHandle> translateSummaryToTupleDomain(
Session session,
DynamicFilterId filterId,
DynamicFilterContext dynamicFilterContext,
Domain summary,
Multimap<DynamicFilterId, DynamicFilters.Descriptor> descriptorMultimap,
Map<Symbol, ColumnHandle> columnHandles,
TypeProvider typeProvider)
{
Collection<DynamicFilters.Descriptor> descriptors = descriptorMultimap.get(filterId);
Domain summary = dynamicFilterContext.getDynamicFilterSummaries().get(filterId);
return TupleDomain.withColumnDomains(descriptors.stream()
.collect(toImmutableMap(
descriptor -> {
Expand All @@ -447,7 +450,7 @@ private TupleDomain<ColumnHandle> translateSummaryToTupleDomain(
Type targetType = typeProvider.get(Symbol.from(descriptor.getInput()));
Domain updatedSummary = descriptor.applyComparison(summary);
if (!updatedSummary.getType().equals(targetType)) {
return applySaturatedCasts(metadata, functionManager, typeOperators, dynamicFilterContext.getSession(), updatedSummary, targetType);
return applySaturatedCasts(metadata, functionManager, typeOperators, session, updatedSummary, targetType);
}
return updatedSummary;
})));
Expand Down Expand Up @@ -1005,6 +1008,15 @@ private Map<DynamicFilterId, Domain> getDynamicFilterSummaries()
.collect(toImmutableMap(Map.Entry::getKey, entry -> getFutureValue(entry.getValue().getCollectedDomainFuture())));
}

/**
 * Looks up the collected summary for a single dynamic filter.
 *
 * @param filterId id of the dynamic filter to look up
 * @return the collected domain when a collection context is registered for
 *         {@code filterId} and its collected-domain future is done;
 *         otherwise {@link Optional#empty()}
 */
private Optional<Domain> getDynamicFilterSummary(DynamicFilterId filterId)
{
    DynamicFilterCollectionContext collectionContext = dynamicFilterCollectionContexts.get(filterId);
    // Only read the value once the future has completed
    if (collectionContext != null && collectionContext.getCollectedDomainFuture().isDone()) {
        return Optional.of(getFutureValue(collectionContext.getCollectedDomainFuture()));
    }
    return Optional.empty();
}

private Map<DynamicFilterId, SettableFuture<Void>> getLazyDynamicFilters()
{
return lazyDynamicFilters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ public void testDefaults()
.setEnableDynamicFiltering(true)
.setEnableCoordinatorDynamicFiltersDistribution(true)
.setEnableLargeDynamicFilters(false)
.setSmallBroadcastMaxDistinctValuesPerDriver(200)
.setSmallBroadcastMaxSizePerDriver(DataSize.of(20, KILOBYTE))
.setSmallBroadcastRangeRowLimitPerDriver(400)
.setSmallPartitionedMaxDistinctValuesPerDriver(20)
.setSmallBroadcastMaxSizePerOperator(DataSize.of(200, KILOBYTE))
.setSmallPartitionedMaxSizePerDriver(DataSize.of(10, KILOBYTE))
.setSmallPartitionedRangeRowLimitPerDriver(100)
.setSmallPartitionedMaxSizePerOperator(DataSize.of(100, KILOBYTE))
.setSmallMaxSizePerFilter(DataSize.of(1, MEGABYTE))
.setLargeBroadcastMaxDistinctValuesPerDriver(5000)
.setLargeBroadcastMaxSizePerDriver(DataSize.of(500, KILOBYTE))
.setLargeBroadcastRangeRowLimitPerDriver(10_000)
.setSmallBroadcastMaxDistinctValuesPerDriver(1_000)
.setSmallBroadcastMaxSizePerDriver(DataSize.of(100, KILOBYTE))
.setSmallBroadcastRangeRowLimitPerDriver(2_000)
.setSmallPartitionedMaxDistinctValuesPerDriver(100)
.setSmallBroadcastMaxSizePerOperator(DataSize.of(1, MEGABYTE))
.setSmallPartitionedMaxSizePerDriver(DataSize.of(50, KILOBYTE))
.setSmallPartitionedRangeRowLimitPerDriver(500)
.setSmallPartitionedMaxSizePerOperator(DataSize.of(500, KILOBYTE))
.setSmallMaxSizePerFilter(DataSize.of(5, MEGABYTE))
.setLargeBroadcastMaxDistinctValuesPerDriver(10_000)
.setLargeBroadcastMaxSizePerDriver(DataSize.of(2, MEGABYTE))
.setLargeBroadcastRangeRowLimitPerDriver(20_000)
.setLargeBroadcastMaxSizePerOperator(DataSize.of(5, MEGABYTE))
.setLargePartitionedMaxDistinctValuesPerDriver(500)
.setLargePartitionedMaxSizePerDriver(DataSize.of(50, KILOBYTE))
.setLargePartitionedRangeRowLimitPerDriver(1_000)
.setLargePartitionedMaxSizePerOperator(DataSize.of(500, KILOBYTE))
.setLargePartitionedMaxDistinctValuesPerDriver(1_000)
.setLargePartitionedMaxSizePerDriver(DataSize.of(200, KILOBYTE))
.setLargePartitionedRangeRowLimitPerDriver(2_000)
.setLargePartitionedMaxSizePerOperator(DataSize.of(2, MEGABYTE))
.setLargeMaxSizePerFilter(DataSize.of(5, MEGABYTE)));
}

Expand All @@ -63,20 +63,20 @@ public void testExplicitPropertyMappings()
.put("enable-large-dynamic-filters", "true")
.put("dynamic-filtering.small-broadcast.max-distinct-values-per-driver", "256")
.put("dynamic-filtering.small-broadcast.max-size-per-driver", "64kB")
.put("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "10000")
.put("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "20000")
.put("dynamic-filtering.small-broadcast.max-size-per-operator", "640kB")
.put("dynamic-filtering.small-partitioned.max-distinct-values-per-driver", "256")
.put("dynamic-filtering.small-partitioned.max-size-per-driver", "64kB")
.put("dynamic-filtering.small-partitioned.range-row-limit-per-driver", "10000")
.put("dynamic-filtering.small-partitioned.range-row-limit-per-driver", "20000")
.put("dynamic-filtering.small-partitioned.max-size-per-operator", "641kB")
.put("dynamic-filtering.small.max-size-per-filter", "341kB")
.put("dynamic-filtering.large-broadcast.max-distinct-values-per-driver", "256")
.put("dynamic-filtering.large-broadcast.max-size-per-driver", "64kB")
.put("dynamic-filtering.large-broadcast.range-row-limit-per-driver", "100000")
.put("dynamic-filtering.large-broadcast.range-row-limit-per-driver", "200000")
.put("dynamic-filtering.large-broadcast.max-size-per-operator", "642kB")
.put("dynamic-filtering.large-partitioned.max-distinct-values-per-driver", "256")
.put("dynamic-filtering.large-partitioned.max-size-per-driver", "64kB")
.put("dynamic-filtering.large-partitioned.range-row-limit-per-driver", "100000")
.put("dynamic-filtering.large-partitioned.range-row-limit-per-driver", "200000")
.put("dynamic-filtering.large-partitioned.max-size-per-operator", "643kB")
.put("dynamic-filtering.large.max-size-per-filter", "3411kB")
.buildOrThrow();
Expand All @@ -87,20 +87,20 @@ public void testExplicitPropertyMappings()
.setEnableLargeDynamicFilters(true)
.setSmallBroadcastMaxDistinctValuesPerDriver(256)
.setSmallBroadcastMaxSizePerDriver(DataSize.of(64, KILOBYTE))
.setSmallBroadcastRangeRowLimitPerDriver(10000)
.setSmallBroadcastRangeRowLimitPerDriver(20000)
.setSmallBroadcastMaxSizePerOperator(DataSize.of(640, KILOBYTE))
.setSmallPartitionedMaxDistinctValuesPerDriver(256)
.setSmallPartitionedMaxSizePerDriver(DataSize.of(64, KILOBYTE))
.setSmallPartitionedRangeRowLimitPerDriver(10000)
.setSmallPartitionedRangeRowLimitPerDriver(20000)
.setSmallPartitionedMaxSizePerOperator(DataSize.of(641, KILOBYTE))
.setSmallMaxSizePerFilter(DataSize.of(341, KILOBYTE))
.setLargeBroadcastMaxDistinctValuesPerDriver(256)
.setLargeBroadcastMaxSizePerDriver(DataSize.of(64, KILOBYTE))
.setLargeBroadcastRangeRowLimitPerDriver(100000)
.setLargeBroadcastRangeRowLimitPerDriver(200000)
.setLargeBroadcastMaxSizePerOperator(DataSize.of(642, KILOBYTE))
.setLargePartitionedMaxDistinctValuesPerDriver(256)
.setLargePartitionedMaxSizePerDriver(DataSize.of(64, KILOBYTE))
.setLargePartitionedRangeRowLimitPerDriver(100000)
.setLargePartitionedRangeRowLimitPerDriver(200000)
.setLargePartitionedMaxSizePerOperator(DataSize.of(643, KILOBYTE))
.setLargeMaxSizePerFilter(DataSize.of(3411, KILOBYTE));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,7 @@ public boolean isAll()
if (getRangeCount() != 1) {
return false;
}
RangeView onlyRange = getRangeView(0);
return onlyRange.isLowUnbounded() && onlyRange.isHighUnbounded();
return isRangeLowUnbounded(0) && isRangeHighUnbounded(0);
}

@Override
Expand Down Expand Up @@ -446,6 +445,16 @@ private RangeView getRangeView(int rangeIndex)
rangeRight);
}

/**
 * Tells whether the range at {@code rangeIndex} has no lower bound.
 * The check is a null test on position {@code 2 * rangeIndex} of {@code sortedRanges}.
 */
private boolean isRangeLowUnbounded(int rangeIndex)
{
    int lowPosition = 2 * rangeIndex;
    return sortedRanges.isNull(lowPosition);
}

/**
 * Tells whether the range at {@code rangeIndex} has no upper bound.
 * The check is a null test on position {@code 2 * rangeIndex + 1} of {@code sortedRanges}.
 */
private boolean isRangeHighUnbounded(int rangeIndex)
{
    int highPosition = 2 * rangeIndex + 1;
    return sortedRanges.isNull(highPosition);
}

@Override
public Ranges getRanges()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ protected QueryRunner createQueryRunner()
.put("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "100")
.put("dynamic-filtering.large-broadcast.max-distinct-values-per-driver", "100")
.put("dynamic-filtering.large-broadcast.range-row-limit-per-driver", "100000")
.put("dynamic-filtering.small-partitioned.max-distinct-values-per-driver", "100")
.put("dynamic-filtering.small-partitioned.range-row-limit-per-driver", "200")
.put("dynamic-filtering.large-partitioned.max-distinct-values-per-driver", "100")
.put("dynamic-filtering.large-partitioned.range-row-limit-per-driver", "100000")
// disable semi join to inner join rewrite to test semi join operators explicitly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ protected QueryRunner createQueryRunner()
// keep limits lower to test edge cases
.addExtraProperty("dynamic-filtering.small-partitioned.max-distinct-values-per-driver", "10")
.addExtraProperty("dynamic-filtering.small-broadcast.max-distinct-values-per-driver", "10")
.addExtraProperty("dynamic-filtering.small-partitioned.range-row-limit-per-driver", "100")
.addExtraProperty("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "400")
.build();
}

Expand Down