Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,32 @@ Set to ``true`` to use as shown in this example:

``SET SESSION schedule_splits_based_on_task_load=true;``

``table_scan_shuffle_parallelism_threshold``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``double``
* **Default value:** ``0.1``

Parallelism threshold for adding a shuffle above table scan. When the table's parallelism factor
is below this threshold (0.0-1.0) and ``table_scan_shuffle_strategy`` is ``COST_BASED``,
a round-robin shuffle exchange is added above the table scan to redistribute data.

The corresponding configuration property is :ref:`admin/properties:\`\`optimizer.table-scan-shuffle-parallelism-threshold\`\``.

``table_scan_shuffle_strategy``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Allowed values:** ``DISABLED``, ``ALWAYS_ENABLED``, ``COST_BASED``
* **Default value:** ``DISABLED``

Strategy for adding shuffle above table scan to redistribute data. When set to ``DISABLED``,
no shuffle is added. When set to ``ALWAYS_ENABLED``, a round-robin shuffle exchange is always
added above table scans. When set to ``COST_BASED``, a shuffle is added only when the table's
parallelism factor is below the ``table_scan_shuffle_parallelism_threshold``.

The corresponding configuration property is :ref:`admin/properties:\`\`optimizer.table-scan-shuffle-strategy\`\``.


JDBC Properties
---------------
Expand Down
26 changes: 26 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,32 @@ being collected by ``ANALYZE``, and also prevents the existing histograms from b
during query optimization. This behavior can be controlled on a per-query basis using the
``optimizer_use_histograms`` session property.

``optimizer.table-scan-shuffle-parallelism-threshold``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``double``
* **Default value:** ``0.1``

Parallelism threshold for adding a shuffle above table scan. When the table's parallelism factor
is below this threshold (0.0-1.0) and ``optimizer.table-scan-shuffle-strategy`` is ``COST_BASED``,
a round-robin shuffle exchange is added above the table scan to redistribute data.

The corresponding session property is :ref:`admin/properties-session:\`\`table_scan_shuffle_parallelism_threshold\`\``.

``optimizer.table-scan-shuffle-strategy``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Allowed values:** ``DISABLED``, ``ALWAYS_ENABLED``, ``COST_BASED``
* **Default value:** ``DISABLED``

Strategy for adding shuffle above table scan to redistribute data. When set to ``DISABLED``,
no shuffle is added. When set to ``ALWAYS_ENABLED``, a round-robin shuffle exchange is always
added above table scans. When set to ``COST_BASED``, a shuffle is added only when the table's
parallelism factor is below the ``optimizer.table-scan-shuffle-parallelism-threshold``.

The corresponding session property is :ref:`admin/properties-session:\`\`table_scan_shuffle_strategy\`\``.

Planner Properties
------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig.RandomizeNullSourceKeyInSemiJoinStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.RandomizeOuterJoinNullKeyStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.ShardedJoinStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.ShuffleForTableScanStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.SingleStreamSpillerChoice;
import com.facebook.presto.sql.analyzer.FunctionsConfig;
import com.facebook.presto.sql.planner.CompilerConfig;
Expand Down Expand Up @@ -357,6 +358,8 @@ public final class SystemSessionProperties
public static final String PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS = "pushdown_subfields_for_map_functions";
public static final String MAX_SERIALIZABLE_OBJECT_SIZE = "max_serializable_object_size";
public static final String EXPRESSION_OPTIMIZER_IN_ROW_EXPRESSION_REWRITE = "expression_optimizer_in_row_expression_rewrite";
public static final String TABLE_SCAN_SHUFFLE_PARALLELISM_THRESHOLD = "table_scan_shuffle_parallelism_threshold";
public static final String TABLE_SCAN_SHUFFLE_STRATEGY = "table_scan_shuffle_strategy";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
Expand Down Expand Up @@ -2053,6 +2056,23 @@ public SystemSessionProperties(
"Configure the maximum byte size of a serializable object in expression interpreters",
featuresConfig.getMaxSerializableObjectSize(),
false),
doubleProperty(
TABLE_SCAN_SHUFFLE_PARALLELISM_THRESHOLD,
"Parallelism threshold for adding a shuffle above table scan. When the table's parallelism factor is below this threshold (0.0-1.0) and TABLE_SCAN_SHUFFLE_STRATEGY is COST_BASED, a round-robin shuffle exchange is added above the table scan to redistribute data",
featuresConfig.getTableScanShuffleParallelismThreshold(),
false),
new PropertyMetadata<>(
TABLE_SCAN_SHUFFLE_STRATEGY,
format("Strategy for adding shuffle above table scan to redistribute data. Options are %s",
Stream.of(ShuffleForTableScanStrategy.values())
.map(ShuffleForTableScanStrategy::name)
.collect(joining(","))),
VARCHAR,
ShuffleForTableScanStrategy.class,
featuresConfig.getTableScanShuffleStrategy(),
false,
value -> ShuffleForTableScanStrategy.valueOf(((String) value).toUpperCase()),
ShuffleForTableScanStrategy::name),
new PropertyMetadata<>(
QUERY_CLIENT_TIMEOUT,
"Configures how long the query runs without contact from the client application, such as the CLI, before it's abandoned",
Expand Down Expand Up @@ -3523,4 +3543,14 @@ public static long getMaxSerializableObjectSize(Session session)
{
return session.getSystemProperty(MAX_SERIALIZABLE_OBJECT_SIZE, Long.class);
}

public static double getTableScanShuffleParallelismThreshold(Session session)
{
return session.getSystemProperty(TABLE_SCAN_SHUFFLE_PARALLELISM_THRESHOLD, Double.class);
}

public static ShuffleForTableScanStrategy getTableScanShuffleStrategy(Session session)
{
return session.getSystemProperty(TABLE_SCAN_SHUFFLE_STRATEGY, ShuffleForTableScanStrategy.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ else if (!tableStatistics.getTotalSize().isUnknown()
double totalSizeAfterFilter = filteredStatistics.getRowCount().getValue() / tableStatistics.getRowCount().getValue() * tableStatistics.getTotalSize().getValue();
filteredStatsWithSize.setTotalSize(Estimate.of(totalSizeAfterFilter));
}
if (!tableStatistics.getParallelismFactor().isUnknown()) {
filteredStatsWithSize.setParallelismFactor(tableStatistics.getParallelismFactor());
}
return filteredStatsWithSize.setConfidenceLevel(LOW).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ public class FeaturesConfig
private long maxSerializableObjectSize = 1000;
private boolean utilizeUniquePropertyInQueryPlanning = true;
private String expressionOptimizerUsedInRowExpressionRewrite = "";
private double tableScanShuffleParallelismThreshold = 0.1;
private ShuffleForTableScanStrategy tableScanShuffleStrategy = ShuffleForTableScanStrategy.DISABLED;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid leaving this disabled by default, I'm wondering if you considered always making it cost based, but set a default estimate if not provided to 1, which would effectively leave the optimization disabled until connectors modify their code to provide a better estimate?

Please also add documentation for these session properties and config properties.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to keep it more explicit, and it's also consistent with other existing optimization config.

Added documentation in the corresponding rst files.


private boolean builtInSidecarFunctionsEnabled;

Expand Down Expand Up @@ -485,6 +487,13 @@ public enum LeftJoinArrayContainsToInnerJoinStrategy
ALWAYS_ENABLED
}

public enum ShuffleForTableScanStrategy
{
DISABLED,
ALWAYS_ENABLED,
COST_BASED
}

@Min(1)
@Config("max-prefixes-count")
@ConfigDescription("Maximum number of prefixes (catalog/schema/table scopes used to narrow metadata lookups) that Presto generates when querying information_schema.")
Expand Down Expand Up @@ -3298,6 +3307,32 @@ public long getMaxSerializableObjectSize()
return maxSerializableObjectSize;
}

public double getTableScanShuffleParallelismThreshold()
{
return tableScanShuffleParallelismThreshold;
}

@Config("optimizer.table-scan-shuffle-parallelism-threshold")
@ConfigDescription("Parallelism threshold for adding a shuffle above table scan. When the table's parallelism factor is below this threshold (0.0-1.0) and TABLE_SCAN_SHUFFLE_STRATEGY is COST_BASED, a round-robin shuffle exchange is added above the table scan to redistribute data.")
public FeaturesConfig setTableScanShuffleParallelismThreshold(double tableScanShuffleParallelismThreshold)
{
this.tableScanShuffleParallelismThreshold = tableScanShuffleParallelismThreshold;
return this;
}

public ShuffleForTableScanStrategy getTableScanShuffleStrategy()
{
return tableScanShuffleStrategy;
}

@Config("optimizer.table-scan-shuffle-strategy")
@ConfigDescription("Strategy for adding shuffle above table scan to redistribute data. Options are DISABLED, ALWAYS_ENABLED, COST_BASED")
public FeaturesConfig setTableScanShuffleStrategy(ShuffleForTableScanStrategy tableScanShuffleStrategy)
{
this.tableScanShuffleStrategy = tableScanShuffleStrategy;
return this;
}

@Config("built-in-sidecar-functions-enabled")
@ConfigDescription("Enable using CPP functions from sidecar over coordinator SQL implementations.")
public FeaturesConfig setBuiltInSidecarFunctionsEnabled(boolean builtInSidecarFunctionsEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import com.facebook.presto.connector.system.GlobalSystemConnector;
import com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.GroupingProperty;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.spi.PrestoException;
Expand Down Expand Up @@ -60,6 +62,7 @@
import com.facebook.presto.spi.plan.WindowNode;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
Expand All @@ -82,6 +85,7 @@
import com.facebook.presto.sql.planner.plan.TableFunctionNode;
import com.facebook.presto.sql.planner.plan.TableFunctionProcessorNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.facebook.presto.sql.planner.plan.UpdateNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
Expand Down Expand Up @@ -112,6 +116,8 @@
import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.getPartialMergePushdownStrategy;
import static com.facebook.presto.SystemSessionProperties.getPartitioningProviderCatalog;
import static com.facebook.presto.SystemSessionProperties.getTableScanShuffleParallelismThreshold;
import static com.facebook.presto.SystemSessionProperties.getTableScanShuffleStrategy;
import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.isAddPartialNodeForRowNumberWithLimit;
import static com.facebook.presto.SystemSessionProperties.isColocatedJoinEnabled;
Expand All @@ -131,6 +137,9 @@
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR;
import static com.facebook.presto.spi.plan.LimitNode.Step.PARTIAL;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.ShuffleForTableScanStrategy.ALWAYS_ENABLED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.ShuffleForTableScanStrategy.COST_BASED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.ShuffleForTableScanStrategy.DISABLED;
import static com.facebook.presto.sql.planner.FragmentTableScanCounter.getNumberOfTableScans;
import static com.facebook.presto.sql.planner.FragmentTableScanCounter.hasMultipleTableScans;
import static com.facebook.presto.sql.planner.PlannerUtils.containsSystemTableScan;
Expand Down Expand Up @@ -213,6 +222,7 @@ private class Rewriter
private final ExchangeMaterializationStrategy exchangeMaterializationStrategy;
private final PartitioningProviderManager partitioningProviderManager;
private final boolean nativeExecution;
private boolean isDeleteOrUpdateQuery;

public Rewriter(
PlanNodeIdAllocator idAllocator,
Expand Down Expand Up @@ -605,9 +615,17 @@ public PlanWithProperties visitTopN(TopNNode node, PreferredProperties preferred
return rebaseAndDeriveProperties(node, child);
}

@Override
public PlanWithProperties visitUpdate(UpdateNode node, PreferredProperties context)
{
isDeleteOrUpdateQuery = true;
return visitPlan(node, context);
}

@Override
public PlanWithProperties visitDelete(DeleteNode node, PreferredProperties preferredProperties)
{
isDeleteOrUpdateQuery = true;
if (!node.getInputDistribution().isPresent()) {
return visitPlan(node, preferredProperties);
}
Expand Down Expand Up @@ -900,6 +918,18 @@ private PlanWithProperties planTableScan(TableScanNode node, RowExpression predi
if (nativeExecution && containsSystemTableScan(plan)) {
plan = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan);
}
else if (!getTableScanShuffleStrategy(session).equals(DISABLED) && !isDeleteOrUpdateQuery) {
if (getTableScanShuffleStrategy(session).equals(ALWAYS_ENABLED)) {
plan = roundRobinExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan);
}
else if (getTableScanShuffleStrategy(session).equals(COST_BASED)) {
Constraint<ColumnHandle> constraint = new Constraint<>(node.getCurrentConstraint());
TableStatistics tableStatistics = metadata.getTableStatistics(session, node.getTable(), ImmutableList.copyOf(node.getAssignments().values()), constraint);
if (!tableStatistics.getParallelismFactor().isUnknown() && tableStatistics.getParallelismFactor().getValue() < getTableScanShuffleParallelismThreshold(session)) {
plan = roundRobinExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan);
}
}
}
// TODO: Support selecting layout with best local property once connector can participate in query optimization.
return new PlanWithProperties(plan, derivePropertiesRecursively(plan));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public void testDefaults()
.setRewriteMinMaxByToTopNEnabled(false)
.setPrestoSparkExecutionEnvironment(false)
.setMaxSerializableObjectSize(1000)
.setTableScanShuffleParallelismThreshold(0.1)
.setTableScanShuffleStrategy(FeaturesConfig.ShuffleForTableScanStrategy.DISABLED)
.setUseConnectorProvidedSerializationCodecs(false));
}

Expand Down Expand Up @@ -506,6 +508,8 @@ public void testExplicitPropertyMappings()
.put("optimizer.expression-optimizer-used-in-expression-rewrite", "custom")
.put("optimizer.add-exchange-below-partial-aggregation-over-group-id", "true")
.put("max_serializable_object_size", "50")
.put("optimizer.table-scan-shuffle-parallelism-threshold", "0.3")
.put("optimizer.table-scan-shuffle-strategy", "ALWAYS_ENABLED")
.put("use-connector-provided-serialization-codecs", "true")
.build();

Expand Down Expand Up @@ -731,6 +735,8 @@ public void testExplicitPropertyMappings()
.setInnerJoinPushdownEnabled(true)
.setPrestoSparkExecutionEnvironment(true)
.setMaxSerializableObjectSize(50)
.setTableScanShuffleParallelismThreshold(0.3)
.setTableScanShuffleStrategy(FeaturesConfig.ShuffleForTableScanStrategy.ALWAYS_ENABLED)
.setUseConnectorProvidedSerializationCodecs(true);
assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.SIMPLIFY_PLAN_WITH_EMPTY_INPUT;
import static com.facebook.presto.SystemSessionProperties.TABLE_SCAN_SHUFFLE_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY;
import static com.facebook.presto.SystemSessionProperties.USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT;
import static com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy.ALL;
Expand Down Expand Up @@ -383,8 +384,8 @@ public void testJoinExactlyPartitioned()
" AND orders.orderstatus = t.orderstatus",
anyTree(
join(INNER, ImmutableList.of(
equiJoinClause("ORDERKEY_LEFT", "ORDERKEY_RIGHT"),
equiJoinClause("orderstatus", "ORDERSTATUS_RIGHT")),
equiJoinClause("ORDERKEY_LEFT", "ORDERKEY_RIGHT"),
equiJoinClause("orderstatus", "ORDERSTATUS_RIGHT")),
exchange(REMOTE_STREAMING, REPARTITION,
anyTree(
aggregation(
Expand Down Expand Up @@ -522,4 +523,22 @@ void assertExactDistributedPlan(String sql, PlanMatchPattern pattern)
.build(),
pattern);
}

@Test
public void testShuffleAboveTableScanAlwaysEnabled()
{
Session session = TestingSession.testSessionBuilder().setCatalog("local").setSchema("tiny").setSystemProperty(TABLE_SCAN_SHUFFLE_STRATEGY, "ALWAYS_ENABLED").build();

// When ALWAYS_ENABLED, a round robin exchange should be added above the table scan
assertDistributedPlan("SELECT nationkey FROM nation", session, anyTree(exchange(REMOTE_STREAMING, ExchangeNode.Type.GATHER, exchange(REMOTE_STREAMING, ExchangeNode.Type.REPARTITION, tableScan("nation")))));
}

@Test
public void testShuffleAboveTableScanDisabled()
{
Session session = TestingSession.testSessionBuilder().setCatalog("local").setSchema("tiny").setSystemProperty(TABLE_SCAN_SHUFFLE_STRATEGY, "DISABLED").build();

// When DISABLED, no extra round robin exchange should be added
assertDistributedPlan("SELECT nationkey FROM nation", session, anyTree(exchange(REMOTE_STREAMING, ExchangeNode.Type.GATHER, tableScan("nation"))));
}
}
Loading
Loading