Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -138,6 +139,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning";
public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy";
// When set, SHOW FUNCTIONS lists only built-in functions
public static final String LIST_BUILT_IN_FUNCTIONS_ONLY = "list_built_in_functions_only";
// Name of the session property holding the PartitioningPrecisionStrategy used to pick when to repartition
public static final String PARTITIONING_PRECISION_STRATEGY = "partitioning_precision_strategy";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -684,7 +686,19 @@ public SystemSessionProperties(
LIST_BUILT_IN_FUNCTIONS_ONLY,
"Only List built-in functions in SHOW FUNCTIONS",
featuresConfig.isListBuiltInFunctionsOnly(),
false));
false),
new PropertyMetadata<>(
PARTITIONING_PRECISION_STRATEGY,
format("The strategy to use to pick when to repartition. Options are %s",
Stream.of(PartitioningPrecisionStrategy.values())
.map(PartitioningPrecisionStrategy::name)
.collect(joining(","))),
VARCHAR,
PartitioningPrecisionStrategy.class,
featuresConfig.getPartitioningPrecisionStrategy(),
false,
value -> PartitioningPrecisionStrategy.valueOf(((String) value).toUpperCase()),
PartitioningPrecisionStrategy::name));
}

public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -1164,4 +1178,10 @@ public static boolean isListBuiltInFunctionsOnly(Session session)
{
return session.getSystemProperty(LIST_BUILT_IN_FUNCTIONS_ONLY, Boolean.class);
}

/**
 * Tells whether the session asks for exact partitioning.
 *
 * @param session the session whose system properties are consulted
 * @return {@code true} when the {@code partitioning_precision_strategy} session property
 * resolves to {@link PartitioningPrecisionStrategy#PREFER_EXACT_PARTITIONING}
 */
public static boolean isExactPartitioningPreferred(Session session)
{
    PartitioningPrecisionStrategy strategy = session.getSystemProperty(PARTITIONING_PRECISION_STRATEGY, PartitioningPrecisionStrategy.class);
    return PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING == strategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ public class FeaturesConfig

private boolean listBuiltInFunctionsOnly = true;

private PartitioningPrecisionStrategy partitioningPrecisionStrategy = PartitioningPrecisionStrategy.AUTOMATIC;

/**
 * Strategy used to determine whether to repartition.
 */
public enum PartitioningPrecisionStrategy
{
// Let Presto decide when to repartition
AUTOMATIC,
// Use exact partitioning until Presto becomes smarter with respect to picking when to repartition
PREFER_EXACT_PARTITIONING
}

public enum JoinReorderingStrategy
{
NONE,
Expand Down Expand Up @@ -1146,4 +1156,17 @@ public FeaturesConfig setListBuiltInFunctionsOnly(boolean listBuiltInFunctionsOn
this.listBuiltInFunctionsOnly = listBuiltInFunctionsOnly;
return this;
}

/**
 * @return the currently configured partitioning precision strategy
 */
public PartitioningPrecisionStrategy getPartitioningPrecisionStrategy()
{
    return this.partitioningPrecisionStrategy;
}

/**
 * Sets the strategy used to determine whether to repartition.
 * Bound to the {@code partitioning-precision-strategy} configuration property.
 *
 * @param partitioningPrecisionStrategy the strategy to store
 * @return this config object, so that setters can be chained
 */
@Config("partitioning-precision-strategy")
// The description must list the enum constants by their real names: the second constant is
// PREFER_EXACT_PARTITIONING, not PREFER_EXACT.
@ConfigDescription("Set strategy used to determine whether to repartition (AUTOMATIC, PREFER_EXACT_PARTITIONING)")
public FeaturesConfig setPartitioningPrecisionStrategy(PartitioningPrecisionStrategy partitioningPrecisionStrategy)
{
    this.partitioningPrecisionStrategy = partitioningPrecisionStrategy;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.concurrent.Immutable;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -241,6 +242,26 @@ public boolean isPartitionedOn(Collection<VariableReferenceExpression> columns,
return true;
}

/**
 * Checks whether the non-constant partitioning arguments are exactly the given columns.
 * <p>
 * Constant expressions and variables listed in {@code knownConstants} are left out of the
 * comparison. Any argument that is neither a constant nor a variable reference makes the
 * check fail.
 *
 * @param columns the columns the partitioning is compared against (duplicates are collapsed)
 * @param knownConstants variables known to be constant, which are ignored among the arguments
 * @return {@code true} if the remaining partitioning variables equal {@code columns} as a set
 */
public boolean isPartitionedOnExactly(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> knownConstants)
{
    Set<VariableReferenceExpression> partitioningVariables = new HashSet<>();
    for (RowExpression argument : arguments) {
        if (argument instanceof ConstantExpression) {
            // literal constants do not affect how rows are distributed
            continue;
        }
        if (!(argument instanceof VariableReferenceExpression)) {
            // anything other than a plain variable reference cannot be matched against columns
            return false;
        }
        VariableReferenceExpression variable = (VariableReferenceExpression) argument;
        if (!knownConstants.contains(variable)) {
            partitioningVariables.add(variable);
        }
    }
    return ImmutableSet.copyOf(columns).equals(partitioningVariables);
}

public boolean isEffectivelySinglePartition(Set<VariableReferenceExpression> knownConstants)
{
return isPartitionedOn(ImmutableSet.of(), knownConstants);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,34 @@ public boolean isNullsAndAnyReplicated()
return global.isNullsAndAnyReplicated();
}

/**
 * Overload of {@code isStreamPartitionedOn} that passes {@code false} for {@code nullsAndAnyReplicated}.
 * NOTE(review): this diff hunk lists each pre-change line directly followed by its replacement;
 * the replacement adds the {@code exactly} flag and forwards it.
 */
public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns)
public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, boolean exactly)
{
return isStreamPartitionedOn(columns, false);
return isStreamPartitionedOn(columns, false, exactly);
}

/**
 * Delegates the stream partitioning check to {@code global}, passing the key set of {@code constants}.
 * NOTE(review): this diff hunk lists each pre-change line directly followed by its replacement.
 * In the replacement, {@code exactly} selects {@code global.isStreamPartitionedOnExactly} instead of
 * {@code global.isStreamPartitionedOn}.
 */
public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated)
public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated, boolean exactly)
{
return global.isStreamPartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
if (exactly) {
return global.isStreamPartitionedOnExactly(columns, constants.keySet(), nullsAndAnyReplicated);
}
else {
return global.isStreamPartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
}
}

/**
 * Overload of {@code isNodePartitionedOn} that passes {@code false} for {@code nullsAndAnyReplicated}.
 * NOTE(review): this diff hunk lists each pre-change line directly followed by its replacement;
 * the replacement adds the {@code exactly} flag and forwards it.
 */
public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns)
public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, boolean exactly)
{
return isNodePartitionedOn(columns, false);
return isNodePartitionedOn(columns, false, exactly);
}

/**
 * Delegates the node partitioning check to {@code global}, passing the key set of {@code constants}.
 * NOTE(review): this diff hunk lists each pre-change line directly followed by its replacement.
 * In the replacement, {@code exactly} selects {@code global.isNodePartitionedOnExactly} instead of
 * {@code global.isNodePartitionedOn}.
 */
public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated)
public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated, boolean exactly)
{
return global.isNodePartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
if (exactly) {
return global.isNodePartitionedOnExactly(columns, constants.keySet(), nullsAndAnyReplicated);
}
else {
return global.isNodePartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
}
}

@Deprecated
Expand Down Expand Up @@ -368,6 +378,9 @@ public static final class Global
// will be executed on multiple servers, but only one server will get all the data.

// Description of whether rows with nulls in partitioning columns or some arbitrary rows have been replicated to all *nodes*
// For an IN predicate, NULL IN (empty set) is false, while NULL IN (non-empty set) is NULL. Likewise, a non-NULL
// value A (say 1) tested against a set that does not contain A (say (2, 3)) is false, but A IN (2, 3, NULL) is NULL.
// This follows because IN is equivalent to "a = b OR a = c OR a = d ...".
private final boolean nullsAndAnyReplicated;

private Global(Optional<Partitioning> nodePartitioning, Optional<Partitioning> streamPartitioning, boolean nullsAndAnyReplicated)
Expand Down Expand Up @@ -471,6 +484,11 @@ private boolean isNodePartitionedOn(Collection<VariableReferenceExpression> colu
return nodePartitioning.isPresent() && nodePartitioning.get().isPartitionedOn(columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
}

/**
 * @return true if a node partitioning is present, it is partitioned exactly on the given columns
 * (ignoring the given constants), and the replication flag matches the requested one
 */
private boolean isNodePartitionedOnExactly(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> constants, boolean nullsAndAnyReplicated)
{
    if (!nodePartitioning.isPresent()) {
        return false;
    }
    boolean exactMatch = nodePartitioning.get().isPartitionedOnExactly(columns, constants);
    return exactMatch && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
}

private boolean isCompatibleTablePartitioningWith(Partitioning partitioning, boolean nullsAndAnyReplicated, Metadata metadata, Session session)
{
return nodePartitioning.isPresent() && nodePartitioning.get().isCompatibleWith(partitioning, metadata, session) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
Expand Down Expand Up @@ -531,6 +549,11 @@ private boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> co
return streamPartitioning.isPresent() && streamPartitioning.get().isPartitionedOn(columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
}

/**
 * @return true if a stream partitioning is present, it is partitioned exactly on the given columns
 * (ignoring the given constants), and the replication flag matches the requested one
 */
private boolean isStreamPartitionedOnExactly(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> constants, boolean nullsAndAnyReplicated)
{
    if (!streamPartitioning.isPresent()) {
        return false;
    }
    boolean exactMatch = streamPartitioning.get().isPartitionedOnExactly(columns, constants);
    return exactMatch && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
}

/**
* @return true if all the data will effectively land in a single stream
*/
Expand Down
Loading