diff --git a/presto-docs/src/main/sphinx/admin/properties-session.rst b/presto-docs/src/main/sphinx/admin/properties-session.rst index 185c2de7ac402..f8a46b4063e30 100644 --- a/presto-docs/src/main/sphinx/admin/properties-session.rst +++ b/presto-docs/src/main/sphinx/admin/properties-session.rst @@ -460,6 +460,32 @@ Set to ``true`` to use as shown in this example: ``SET SESSION schedule_splits_based_on_task_load=true;`` +``table_scan_shuffle_parallelism_threshold`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``double`` +* **Default value:** ``0.1`` + +Parallelism threshold for adding a shuffle above table scan. When the table's parallelism factor +is below this threshold (0.0-1.0) and ``table_scan_shuffle_strategy`` is ``COST_BASED``, +a round-robin shuffle exchange is added above the table scan to redistribute data. + +The corresponding configuration property is :ref:`admin/properties:\`\`optimizer.table-scan-shuffle-parallelism-threshold\`\``. + +``table_scan_shuffle_strategy`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``string`` +* **Allowed values:** ``DISABLED``, ``ALWAYS_ENABLED``, ``COST_BASED`` +* **Default value:** ``DISABLED`` + +Strategy for adding shuffle above table scan to redistribute data. When set to ``DISABLED``, +no shuffle is added. When set to ``ALWAYS_ENABLED``, a round-robin shuffle exchange is always +added above table scans. When set to ``COST_BASED``, a shuffle is added only when the table's +parallelism factor is below the ``table_scan_shuffle_parallelism_threshold``. + +The corresponding configuration property is :ref:`admin/properties:\`\`optimizer.table-scan-shuffle-strategy\`\``. + JDBC Properties --------------- diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index 898a6095bd54c..bb84818e9645f 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -1074,6 +1074,32 @@ being collected by ``ANALYZE``, and also prevents the existing histograms from b during query optimization. This behavior can be controlled on a per-query basis using the ``optimizer_use_histograms`` session property. +``optimizer.table-scan-shuffle-parallelism-threshold`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``double`` +* **Default value:** ``0.1`` + +Parallelism threshold for adding a shuffle above table scan. When the table's parallelism factor +is below this threshold (0.0-1.0) and ``optimizer.table-scan-shuffle-strategy`` is ``COST_BASED``, +a round-robin shuffle exchange is added above the table scan to redistribute data. + +The corresponding session property is :ref:`admin/properties-session:\`\`table_scan_shuffle_parallelism_threshold\`\``. + +``optimizer.table-scan-shuffle-strategy`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``string`` +* **Allowed values:** ``DISABLED``, ``ALWAYS_ENABLED``, ``COST_BASED`` +* **Default value:** ``DISABLED`` + +Strategy for adding shuffle above table scan to redistribute data. When set to ``DISABLED``, +no shuffle is added. When set to ``ALWAYS_ENABLED``, a round-robin shuffle exchange is always +added above table scans. When set to ``COST_BASED``, a shuffle is added only when the table's +parallelism factor is below the ``optimizer.table-scan-shuffle-parallelism-threshold``. + +The corresponding session property is :ref:`admin/properties-session:\`\`table_scan_shuffle_strategy\`\``. + Planner Properties ------------------ diff --git a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java index 8d42cde384a13..3440e22d3e809 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -48,6 +48,7 @@ import com.facebook.presto.sql.analyzer.FeaturesConfig.RandomizeNullSourceKeyInSemiJoinStrategy; import com.facebook.presto.sql.analyzer.FeaturesConfig.RandomizeOuterJoinNullKeyStrategy; import com.facebook.presto.sql.analyzer.FeaturesConfig.ShardedJoinStrategy; +import com.facebook.presto.sql.analyzer.FeaturesConfig.ShuffleForTableScanStrategy; import com.facebook.presto.sql.analyzer.FeaturesConfig.SingleStreamSpillerChoice; import com.facebook.presto.sql.analyzer.FunctionsConfig; import com.facebook.presto.sql.planner.CompilerConfig; @@ -357,6 +358,8 @@ public final class SystemSessionProperties public static final String PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS = "pushdown_subfields_for_map_functions"; public static final String MAX_SERIALIZABLE_OBJECT_SIZE = "max_serializable_object_size"; public static final String EXPRESSION_OPTIMIZER_IN_ROW_EXPRESSION_REWRITE = "expression_optimizer_in_row_expression_rewrite"; + public static final String TABLE_SCAN_SHUFFLE_PARALLELISM_THRESHOLD = "table_scan_shuffle_parallelism_threshold"; + public static final String TABLE_SCAN_SHUFFLE_STRATEGY = "table_scan_shuffle_strategy"; // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; @@ -2053,6 +2056,23 @@ public SystemSessionProperties( "Configure the maximum byte size of a serializable object in expression interpreters", featuresConfig.getMaxSerializableObjectSize(), false), + doubleProperty( + TABLE_SCAN_SHUFFLE_PARALLELISM_THRESHOLD, + "Parallelism threshold for adding a shuffle above table scan. When the table's parallelism factor is below this threshold (0.0-1.0) and TABLE_SCAN_SHUFFLE_STRATEGY is COST_BASED, a round-robin shuffle exchange is added above the table scan to redistribute data", + featuresConfig.getTableScanShuffleParallelismThreshold(), + false), + new PropertyMetadata<>( + TABLE_SCAN_SHUFFLE_STRATEGY, + format("Strategy for adding shuffle above table scan to redistribute data. Options are %s", + Stream.of(ShuffleForTableScanStrategy.values()) + .map(ShuffleForTableScanStrategy::name) + .collect(joining(","))), + VARCHAR, + ShuffleForTableScanStrategy.class, + featuresConfig.getTableScanShuffleStrategy(), + false, + value -> ShuffleForTableScanStrategy.valueOf(((String) value).toUpperCase()), + ShuffleForTableScanStrategy::name), new PropertyMetadata<>( QUERY_CLIENT_TIMEOUT, "Configures how long the query runs without contact from the client application, such as the CLI, before it's abandoned", @@ -3523,4 +3543,14 @@ public static long getMaxSerializableObjectSize(Session session) { return session.getSystemProperty(MAX_SERIALIZABLE_OBJECT_SIZE, Long.class); } + + public static double getTableScanShuffleParallelismThreshold(Session session) + { + return session.getSystemProperty(TABLE_SCAN_SHUFFLE_PARALLELISM_THRESHOLD, Double.class); + } + + public static ShuffleForTableScanStrategy getTableScanShuffleStrategy(Session session) + { + return session.getSystemProperty(TABLE_SCAN_SHUFFLE_STRATEGY, ShuffleForTableScanStrategy.class); + } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/cost/ConnectorFilterStatsCalculatorService.java b/presto-main-base/src/main/java/com/facebook/presto/cost/ConnectorFilterStatsCalculatorService.java index 85a51c2b6bb51..e400481929d25 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/cost/ConnectorFilterStatsCalculatorService.java +++ b/presto-main-base/src/main/java/com/facebook/presto/cost/ConnectorFilterStatsCalculatorService.java @@ -81,6 +81,9 @@ else if (!tableStatistics.getTotalSize().isUnknown() double totalSizeAfterFilter = filteredStatistics.getRowCount().getValue() / tableStatistics.getRowCount().getValue() * tableStatistics.getTotalSize().getValue(); filteredStatsWithSize.setTotalSize(Estimate.of(totalSizeAfterFilter)); } + if (!tableStatistics.getParallelismFactor().isUnknown()) { + filteredStatsWithSize.setParallelismFactor(tableStatistics.getParallelismFactor()); + } return filteredStatsWithSize.setConfidenceLevel(LOW).build(); } diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 4c3b7d0a6dd13..fc9f4077d79c1 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -327,6 +327,8 @@ public class FeaturesConfig private long maxSerializableObjectSize = 1000; private boolean utilizeUniquePropertyInQueryPlanning = true; private String expressionOptimizerUsedInRowExpressionRewrite = ""; + private double tableScanShuffleParallelismThreshold = 0.1; + private ShuffleForTableScanStrategy tableScanShuffleStrategy = ShuffleForTableScanStrategy.DISABLED; private boolean builtInSidecarFunctionsEnabled; @@ -485,6 +487,13 @@ public enum LeftJoinArrayContainsToInnerJoinStrategy ALWAYS_ENABLED } + public enum ShuffleForTableScanStrategy + { + DISABLED, + ALWAYS_ENABLED, + COST_BASED + } + @Min(1) @Config("max-prefixes-count") @ConfigDescription("Maximum number of prefixes (catalog/schema/table scopes used to narrow metadata lookups) that Presto generates when querying information_schema.") @@ -3298,6 +3307,32 @@ public long getMaxSerializableObjectSize() return maxSerializableObjectSize; } + public double getTableScanShuffleParallelismThreshold() + { + return tableScanShuffleParallelismThreshold; + } + + @Config("optimizer.table-scan-shuffle-parallelism-threshold") + @ConfigDescription("Parallelism threshold for adding a shuffle above table scan. When the table's parallelism factor is below this threshold (0.0-1.0) and TABLE_SCAN_SHUFFLE_STRATEGY is COST_BASED, a round-robin shuffle exchange is added above the table scan to redistribute data.") + public FeaturesConfig setTableScanShuffleParallelismThreshold(double tableScanShuffleParallelismThreshold) + { + this.tableScanShuffleParallelismThreshold = tableScanShuffleParallelismThreshold; + return this; + } + + public ShuffleForTableScanStrategy getTableScanShuffleStrategy() + { + return tableScanShuffleStrategy; + } + + @Config("optimizer.table-scan-shuffle-strategy") + @ConfigDescription("Strategy for adding shuffle above table scan to redistribute data. Options are DISABLED, ALWAYS_ENABLED, COST_BASED") + public FeaturesConfig setTableScanShuffleStrategy(ShuffleForTableScanStrategy tableScanShuffleStrategy) + { + this.tableScanShuffleStrategy = tableScanShuffleStrategy; + return this; + } + @Config("built-in-sidecar-functions-enabled") @ConfigDescription("Enable using CPP functions from sidecar over coordinator SQL implementations.") public FeaturesConfig setBuiltInSidecarFunctionsEnabled(boolean builtInSidecarFunctionsEnabled) diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 047326398832a..1b6741896e2df 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -18,7 +18,9 @@ import com.facebook.presto.connector.system.GlobalSystemConnector; import com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy; import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.GroupingProperty; import com.facebook.presto.spi.LocalProperty; import com.facebook.presto.spi.PrestoException; @@ -60,6 +62,7 @@ import com.facebook.presto.spi.plan.WindowNode; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.spi.statistics.TableStatistics; import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy; import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy; import com.facebook.presto.sql.planner.PartitioningProviderManager; @@ -82,6 +85,7 @@ import com.facebook.presto.sql.planner.plan.TableFunctionNode; import com.facebook.presto.sql.planner.plan.TableFunctionProcessorNode; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; +import com.facebook.presto.sql.planner.plan.UpdateNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -112,6 +116,8 @@ import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount; import static com.facebook.presto.SystemSessionProperties.getPartialMergePushdownStrategy; import static com.facebook.presto.SystemSessionProperties.getPartitioningProviderCatalog; +import static com.facebook.presto.SystemSessionProperties.getTableScanShuffleParallelismThreshold; +import static com.facebook.presto.SystemSessionProperties.getTableScanShuffleStrategy; import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount; import static com.facebook.presto.SystemSessionProperties.isAddPartialNodeForRowNumberWithLimit; import static com.facebook.presto.SystemSessionProperties.isColocatedJoinEnabled; @@ -131,6 +137,9 @@ import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.facebook.presto.spi.plan.LimitNode.Step.PARTIAL; +import static com.facebook.presto.sql.analyzer.FeaturesConfig.ShuffleForTableScanStrategy.ALWAYS_ENABLED; +import static com.facebook.presto.sql.analyzer.FeaturesConfig.ShuffleForTableScanStrategy.COST_BASED; +import static com.facebook.presto.sql.analyzer.FeaturesConfig.ShuffleForTableScanStrategy.DISABLED; import static com.facebook.presto.sql.planner.FragmentTableScanCounter.getNumberOfTableScans; import static com.facebook.presto.sql.planner.FragmentTableScanCounter.hasMultipleTableScans; import static com.facebook.presto.sql.planner.PlannerUtils.containsSystemTableScan; @@ -213,6 +222,7 @@ private class Rewriter private final ExchangeMaterializationStrategy exchangeMaterializationStrategy; private final PartitioningProviderManager partitioningProviderManager; private final boolean nativeExecution; + private boolean isDeleteOrUpdateQuery; public Rewriter( PlanNodeIdAllocator idAllocator, @@ -605,9 +615,17 @@ public PlanWithProperties visitTopN(TopNNode node, PreferredProperties preferred return rebaseAndDeriveProperties(node, child); } + @Override + public PlanWithProperties visitUpdate(UpdateNode node, PreferredProperties context) + { + isDeleteOrUpdateQuery = true; + return visitPlan(node, context); + } + @Override public PlanWithProperties visitDelete(DeleteNode node, PreferredProperties preferredProperties) { + isDeleteOrUpdateQuery = true; if (!node.getInputDistribution().isPresent()) { return visitPlan(node, preferredProperties); } @@ -900,6 +918,18 @@ private PlanWithProperties planTableScan(TableScanNode node, RowExpression predi if (nativeExecution && containsSystemTableScan(plan)) { plan = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan); } + else if (!getTableScanShuffleStrategy(session).equals(DISABLED) && !isDeleteOrUpdateQuery) { + if (getTableScanShuffleStrategy(session).equals(ALWAYS_ENABLED)) { + plan = roundRobinExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan); + } + else if (getTableScanShuffleStrategy(session).equals(COST_BASED)) { + Constraint constraint = new Constraint<>(node.getCurrentConstraint()); + TableStatistics tableStatistics = metadata.getTableStatistics(session, node.getTable(), ImmutableList.copyOf(node.getAssignments().values()), constraint); + if (!tableStatistics.getParallelismFactor().isUnknown() && tableStatistics.getParallelismFactor().getValue() < getTableScanShuffleParallelismThreshold(session)) { + plan = roundRobinExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan); + } + } + } // TODO: Support selecting layout with best local property once connector can participate in query optimization. return new PlanWithProperties(plan, derivePropertiesRecursively(plan)); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 4cdf93c51c0b6..1caa9566faec6 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -280,6 +280,8 @@ public void testDefaults() .setRewriteMinMaxByToTopNEnabled(false) .setPrestoSparkExecutionEnvironment(false) .setMaxSerializableObjectSize(1000) + .setTableScanShuffleParallelismThreshold(0.1) + .setTableScanShuffleStrategy(FeaturesConfig.ShuffleForTableScanStrategy.DISABLED) .setUseConnectorProvidedSerializationCodecs(false)); } @@ -506,6 +508,8 @@ public void testExplicitPropertyMappings() .put("optimizer.expression-optimizer-used-in-expression-rewrite", "custom") .put("optimizer.add-exchange-below-partial-aggregation-over-group-id", "true") .put("max_serializable_object_size", "50") + .put("optimizer.table-scan-shuffle-parallelism-threshold", "0.3") + .put("optimizer.table-scan-shuffle-strategy", "ALWAYS_ENABLED") .put("use-connector-provided-serialization-codecs", "true") .build(); @@ -731,6 +735,8 @@ public void testExplicitPropertyMappings() .setInnerJoinPushdownEnabled(true) .setPrestoSparkExecutionEnvironment(true) .setMaxSerializableObjectSize(50) + .setTableScanShuffleParallelismThreshold(0.3) + .setTableScanShuffleStrategy(FeaturesConfig.ShuffleForTableScanStrategy.ALWAYS_ENABLED) .setUseConnectorProvidedSerializationCodecs(true); assertFullMapping(properties, expected); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java index aca3982dfad43..edb1b3bdcca3f 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java @@ -38,6 +38,7 @@ import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY; import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY; import static com.facebook.presto.SystemSessionProperties.SIMPLIFY_PLAN_WITH_EMPTY_INPUT; +import static com.facebook.presto.SystemSessionProperties.TABLE_SCAN_SHUFFLE_STRATEGY; import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY; import static com.facebook.presto.SystemSessionProperties.USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT; import static com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy.ALL; @@ -383,8 +384,8 @@ public void testJoinExactlyPartitioned() " AND orders.orderstatus = t.orderstatus", anyTree( join(INNER, ImmutableList.of( - equiJoinClause("ORDERKEY_LEFT", "ORDERKEY_RIGHT"), - equiJoinClause("orderstatus", "ORDERSTATUS_RIGHT")), + equiJoinClause("ORDERKEY_LEFT", "ORDERKEY_RIGHT"), + equiJoinClause("orderstatus", "ORDERSTATUS_RIGHT")), exchange(REMOTE_STREAMING, REPARTITION, anyTree( aggregation( @@ -522,4 +523,22 @@ void assertExactDistributedPlan(String sql, PlanMatchPattern pattern) .build(), pattern); } + + @Test + public void testShuffleAboveTableScanAlwaysEnabled() + { + Session session = TestingSession.testSessionBuilder().setCatalog("local").setSchema("tiny").setSystemProperty(TABLE_SCAN_SHUFFLE_STRATEGY, "ALWAYS_ENABLED").build(); + + // When ALWAYS_ENABLED, a round robin exchange should be added above the table scan + assertDistributedPlan("SELECT nationkey FROM nation", session, anyTree(exchange(REMOTE_STREAMING, ExchangeNode.Type.GATHER, exchange(REMOTE_STREAMING, ExchangeNode.Type.REPARTITION, tableScan("nation"))))); + } + + @Test + public void testShuffleAboveTableScanDisabled() + { + Session session = TestingSession.testSessionBuilder().setCatalog("local").setSchema("tiny").setSystemProperty(TABLE_SCAN_SHUFFLE_STRATEGY, "DISABLED").build(); + + // When DISABLED, no extra round robin exchange should be added + assertDistributedPlan("SELECT nationkey FROM nation", session, anyTree(exchange(REMOTE_STREAMING, ExchangeNode.Type.GATHER, tableScan("nation")))); + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java index c6fece6d1a66c..5dc23b62fe485 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java @@ -34,6 +34,7 @@ public final class TableStatistics private final Estimate rowCount; private final Estimate totalSize; + private final Estimate parallelismFactor; private final Map columnStatistics; private ConfidenceLevel confidenceLevel; @@ -42,7 +43,7 @@ public static TableStatistics empty() return EMPTY; } - private TableStatistics(Estimate rowCount, Estimate totalSize, Map columnStatistics, ConfidenceLevel confidenceLevel) + private TableStatistics(Estimate rowCount, Estimate totalSize, Estimate parallelismFactor, Map columnStatistics, ConfidenceLevel confidenceLevel) { this.rowCount = requireNonNull(rowCount, "rowCount can not be null"); if (!rowCount.isUnknown() && rowCount.getValue() < 0) { @@ -52,6 +53,10 @@ private TableStatistics(Estimate rowCount, Estimate totalSize, Map 1)) { + throw new IllegalArgumentException(format("parallelismFactor must be between 0 and 1: %s", parallelismFactor.getValue())); + } this.columnStatistics = unmodifiableMap(requireNonNull(columnStatistics, "columnStatistics can not be null")); this.confidenceLevel = confidenceLevel; } @@ -68,6 +73,19 @@ public Estimate getTotalSize() return totalSize; } + /** + * Returns the estimated parallelism factor for scanning this table. + * Range: 0.0 to 1.0 + * - 0.0: Very low parallelism (few sources, may need shuffle to redistribute) + * - 1.0: Full parallelism (enough sources to utilize all available workers) + * - 0.5: 50% of available workers expected to be utilized + */ + @JsonProperty + public Estimate getParallelismFactor() + { + return parallelismFactor; + } + @JsonProperty public ConfidenceLevel getConfidence() { @@ -92,13 +110,14 @@ public boolean equals(Object o) TableStatistics that = (TableStatistics) o; return Objects.equals(rowCount, that.rowCount) && Objects.equals(totalSize, that.totalSize) && + Objects.equals(parallelismFactor, that.parallelismFactor) && Objects.equals(columnStatistics, that.columnStatistics); } @Override public int hashCode() { - return Objects.hash(rowCount, totalSize, columnStatistics); + return Objects.hash(rowCount, totalSize, parallelismFactor, columnStatistics); } @Override @@ -107,6 +126,7 @@ public String toString() return "TableStatistics{" + "rowCount=" + rowCount + ", totalSize=" + totalSize + + ", parallelismFactor=" + parallelismFactor + ", columnStatistics=" + columnStatistics + '}'; } @@ -121,6 +141,7 @@ public static Builder buildFrom(TableStatistics tableStatistics) return new Builder() .setRowCount(tableStatistics.getRowCount()) .setTotalSize(tableStatistics.getTotalSize()) + .setParallelismFactor(tableStatistics.getParallelismFactor()) .setConfidenceLevel(tableStatistics.getConfidence()) .setColumnStatistics(tableStatistics.getColumnStatistics()); } @@ -129,6 +150,7 @@ public static final class Builder { private Estimate rowCount = Estimate.unknown(); private Estimate totalSize = Estimate.unknown(); + private Estimate parallelismFactor = Estimate.unknown(); private Map columnStatisticsMap = new LinkedHashMap<>(); private ConfidenceLevel confidenceLevel = HIGH; @@ -155,6 +177,12 @@ public Builder setTotalSize(Estimate totalSize) return this; } + public Builder setParallelismFactor(Estimate parallelismFactor) + { + this.parallelismFactor = requireNonNull(parallelismFactor, "parallelismFactor can not be null"); + return this; + } + public Builder setColumnStatistics(ColumnHandle columnHandle, ColumnStatistics columnStatistics) { requireNonNull(columnHandle, "columnHandle can not be null"); @@ -177,7 +205,7 @@ public Map getColumnStatistics() public TableStatistics build() { - return new TableStatistics(rowCount, totalSize, columnStatisticsMap, confidenceLevel); + return new TableStatistics(rowCount, totalSize, parallelismFactor, columnStatisticsMap, confidenceLevel); } } } diff --git a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsMetadataStatistics.java b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsMetadataStatistics.java index 432ef7c665f43..83c1296ba59d6 100644 --- a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsMetadataStatistics.java +++ b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsMetadataStatistics.java @@ -192,6 +192,9 @@ public void testTableStatisticsSerialization() " \"totalSize\" : {\n" + " \"value\" : \"NaN\"\n" + " },\n" + + " \"parallelismFactor\" : {\n" + + " \"value\" : \"NaN\"\n" + + " },\n" + " \"columnStatistics\" : {\n" + " \"tpcds:web_site_sk\" : {\n" + " \"nullsFraction\" : {\n" +