diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PhysicalResourceSettings.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PhysicalResourceSettings.java index e3a6159908619..071d3b548cc85 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PhysicalResourceSettings.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PhysicalResourceSettings.java @@ -13,27 +13,25 @@ */ package com.facebook.presto.spark; +import java.util.OptionalInt; + +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; +/** + * Class for storing physical resource settings. + * Resource settings could have conflicting overrides coming from either presto or spark. + * The class holds the final values that needs to be applied to the query. + */ public class PhysicalResourceSettings { - public static final PhysicalResourceSettings DISABLED_PHYSICAL_RESOURCE_SETTING = new PhysicalResourceSettings(0, 0, () -> {}); - - private final int executorCount; + private final OptionalInt maxExecutorCount; private final int hashPartitionCount; - public PhysicalResourceSettings(int executorCount, int hashPartitionCount) - { - this( - executorCount, - hashPartitionCount, - () -> checkArgument(executorCount >= 0 && hashPartitionCount >= 0, "executorCount and hashPartitionCount should be positive")); - } - - public PhysicalResourceSettings(int executorCount, int hashPartitionCount, Runnable check) + public PhysicalResourceSettings(int hashPartitionCount, OptionalInt maxExecutorCount) { - check.run(); - this.executorCount = executorCount; + checkArgument(maxExecutorCount.orElse(0) >= 0 && hashPartitionCount >= 0, "executorCount and hashPartitionCount should be positive"); + this.maxExecutorCount = maxExecutorCount; this.hashPartitionCount = hashPartitionCount; } @@ -42,13 +40,22 @@ public int getHashPartitionCount() return hashPartitionCount; } - public int 
getExecutorCount() + /** + * maxExecutorCount is an optional field, the value is based on resource allocation tuning property + * An empty value, means resource tuning is disabled and values from SparkConf will be used. + * When not empty, resource tuning is enabled and calculated by {@link PrestoSparkPhysicalResourceCalculator} based on the query plan + */ + public OptionalInt getMaxExecutorCount() { - return executorCount; + return maxExecutorCount; } - public boolean isEnabled() + @Override + public String toString() { - return ((executorCount > 0) && (hashPartitionCount > 0)); + return toStringHelper(this) + .add("maxExecutorCount", maxExecutorCount) + .add("hashPartitionCount", hashPartitionCount) + .toString(); } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java index 9c3a58e3bf1b8..a33672a08f203 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java @@ -61,9 +61,10 @@ public class PrestoSparkConfig private DataSize averageInputDataSizePerPartition = new DataSize(2, GIGABYTE); private int maxHashPartitionCount = 4096; private int minHashPartitionCount = 1024; - private boolean isResourceAllocationStrategyEnabled; - private boolean adaptiveJoinSideSwitchingEnabled; + private boolean resourceAllocationStrategyEnabled; + private boolean executorAllocationStrategyEnabled; + private boolean hashPartitionCountAllocationStrategyEnabled; public boolean isSparkPartitionCountAutoTuneEnabled() { @@ -385,14 +386,14 @@ public PrestoSparkConfig setMinHashPartitionCount(int minHashPartitionCount) public boolean isSparkResourceAllocationStrategyEnabled() { - return isResourceAllocationStrategyEnabled; + return resourceAllocationStrategyEnabled; } @Config("spark.resource-allocation-strategy-enabled") 
@ConfigDescription("Determines whether the resource allocation strategy for executor and partition count is enabled") - public PrestoSparkConfig setSparkResourceAllocationStrategyEnabled(boolean isResourceAllocationStrategyEnabled) + public PrestoSparkConfig setSparkResourceAllocationStrategyEnabled(boolean resourceAllocationStrategyEnabled) { - this.isResourceAllocationStrategyEnabled = isResourceAllocationStrategyEnabled; + this.resourceAllocationStrategyEnabled = resourceAllocationStrategyEnabled; return this; } @@ -436,4 +437,30 @@ public PrestoSparkConfig setAdaptiveJoinSideSwitchingEnabled(boolean adaptiveJoi this.adaptiveJoinSideSwitchingEnabled = adaptiveJoinSideSwitchingEnabled; return this; } + + public boolean isExecutorAllocationStrategyEnabled() + { + return executorAllocationStrategyEnabled; + } + + @Config("spark.executor-allocation-strategy-enabled") + @ConfigDescription("Determines whether the executor allocation strategy is enabled. This will be suppressed if used alongside spark.dynamicAllocation.maxExecutors") + public PrestoSparkConfig setExecutorAllocationStrategyEnabled(boolean executorAllocationStrategyEnabled) + { + this.executorAllocationStrategyEnabled = executorAllocationStrategyEnabled; + return this; + } + + public boolean isHashPartitionCountAllocationStrategyEnabled() + { + return hashPartitionCountAllocationStrategyEnabled; + } + + @Config("spark.hash-partition-count-allocation-strategy-enabled") + @ConfigDescription("Determines whether the hash partition count strategy is enabled. 
This will be suppressed if used alongside hash_partition_count") + public PrestoSparkConfig setHashPartitionCountAllocationStrategyEnabled(boolean hashPartitionCountAllocationStrategyEnabled) + { + this.hashPartitionCountAllocationStrategyEnabled = hashPartitionCountAllocationStrategyEnabled; + return this; + } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkPhysicalResourceCalculator.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkPhysicalResourceCalculator.java index adfb42062bbeb..3b1e869ed62c1 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkPhysicalResourceCalculator.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkPhysicalResourceCalculator.java @@ -15,17 +15,21 @@ import com.facebook.airlift.log.Logger; import com.facebook.presto.Session; -import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.spi.plan.PlanNode; import io.airlift.units.DataSize; -import static com.facebook.presto.spark.PhysicalResourceSettings.DISABLED_PHYSICAL_RESOURCE_SETTING; +import java.util.OptionalInt; + +import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getAverageInputDataSizePerExecutor; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getAverageInputDataSizePerPartition; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMaxExecutorCount; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMaxHashPartitionCount; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMinExecutorCount; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMinHashPartitionCount; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.isSparkExecutorAllocationStrategyEnabled; +import static 
com.facebook.presto.spark.PrestoSparkSessionProperties.isSparkHashPartitionCountAllocationStrategyEnabled; import static com.facebook.presto.spark.PrestoSparkSessionProperties.isSparkResourceAllocationStrategyEnabled; import static com.google.common.base.Preconditions.checkState; import static io.airlift.units.DataSize.Unit.BYTE; @@ -34,29 +38,70 @@ public class PrestoSparkPhysicalResourceCalculator { private static final Logger log = Logger.get(PrestoSparkPhysicalResourceCalculator.class); - public PhysicalResourceSettings calculate(PlanNode plan, Metadata metaData, Session session) + /** + * Calculates the final resource settings for the query. This takes into account all override values + * with the following precedence: + * <ul> + *  <li>when no allocation strategy is enabled, the session hash_partition_count and SparkConf executor settings are used as-is</li> + *  <li>when input statistics are missing or invalid, the same defaults are used and automatic tuning is skipped</li> + *  <li>otherwise, values computed from the estimated query input size are applied for each enabled strategy</li> + * </ul> + */ + public PhysicalResourceSettings calculate(PlanNode plan, PrestoSparkSourceStatsCollector prestoSparkSourceStatsCollector, Session session) { - if (!isSparkResourceAllocationStrategyEnabled(session)) { - return new PhysicalResourceSettings(0, 0); - } + int hashPartitionCount = getHashPartitionCount(session); + OptionalInt maxExecutorCount = OptionalInt.empty(); + PhysicalResourceSettings defaultResourceSettings = new PhysicalResourceSettings(hashPartitionCount, maxExecutorCount); - double inputDataInBytes = new PrestoSparkSourceStatsCollector(metaData, session).collectSourceStats(plan); + if (!anyAllocationStrategyEnabled(session)) { + log.info(String.format("ResourceAllocationStrategy disabled. 
Executing query %s with %s", session.getQueryId(), defaultResourceSettings)); + return defaultResourceSettings; + } + double inputDataInBytes = prestoSparkSourceStatsCollector.collectSourceStats(plan); + DataSize inputSize = new DataSize(inputDataInBytes, BYTE); if (inputDataInBytes < 0) { - log.warn(String.format("Input data statistics missing, inputDataInBytes=%.2f skipping automatic resource tuning.", inputDataInBytes)); - return DISABLED_PHYSICAL_RESOURCE_SETTING; + log.warn(String.format("Input data statistics missing, inputDataInBytes=%.2f skipping automatic resource tuning. Executing query %s with %s", + inputDataInBytes, session.getQueryId(), defaultResourceSettings)); + return defaultResourceSettings; + } + else if (Double.isNaN(inputDataInBytes)) { + log.warn(String.format("Failed to retrieve correct size, inputDataInBytes=%.2f skipping automatic resource tuning. Executing query %s with %s", + inputDataInBytes, session.getQueryId(), defaultResourceSettings)); + return defaultResourceSettings; + } + // update hashPartitionCount only if resource allocation or hash partition allocation is enabled + if (isSparkResourceAllocationStrategyEnabled(session) || isSparkHashPartitionCountAllocationStrategyEnabled(session)) { + hashPartitionCount = calculateHashPartitionCount(session, inputSize); } - if ((inputDataInBytes > Double.MAX_VALUE) || (Double.isNaN(inputDataInBytes))) { - log.warn(String.format("Failed to retrieve correct size, data read=%.2f, skipping automatic resource tuning.", inputDataInBytes)); - return DISABLED_PHYSICAL_RESOURCE_SETTING; + // update maxExecutorCount only if resource allocation or executor allocation is enabled + if (isSparkResourceAllocationStrategyEnabled(session) || isSparkExecutorAllocationStrategyEnabled(session)) { + maxExecutorCount = OptionalInt.of(calculateExecutorCount(session, inputSize)); } + PhysicalResourceSettings finalResourceSettings = new PhysicalResourceSettings(hashPartitionCount, maxExecutorCount); - DataSize 
inputSize = new DataSize(inputDataInBytes, BYTE); - int executorCount = calculateExecutorCount(session, inputSize); - int hashPartitionCount = calculateHashPartitionCount(session, inputSize); + log.info(String.format("Executing query %s with %s based on resource allocation strategy", session.getQueryId(), finalResourceSettings)); + return finalResourceSettings; + } - return new PhysicalResourceSettings(executorCount, hashPartitionCount); + private static boolean anyAllocationStrategyEnabled(Session session) + { + return isSparkResourceAllocationStrategyEnabled(session) + || isSparkExecutorAllocationStrategyEnabled(session) + || isSparkHashPartitionCountAllocationStrategyEnabled(session); } private static int calculateExecutorCount(Session session, DataSize inputData) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java index dfe45647c4a96..ef944fc397a4c 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java @@ -58,6 +58,8 @@ public class PrestoSparkSessionProperties public static final String SPARK_MAX_HASH_PARTITION_COUNT = "spark_max_hash_partition_count"; public static final String SPARK_MIN_HASH_PARTITION_COUNT = "spark_min_hash_partition_count"; public static final String SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED = "spark_resource_allocation_strategy_enabled"; + public static final String SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED = "spark_executor_allocation_strategy_enabled"; + public static final String SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED = "spark_hash_partition_count_allocation_strategy_enabled"; public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED = 
"spark_retry_on_out_of_memory_higher_hash_partition_count_enabled"; public static final String SPARK_HASH_PARTITION_COUNT_SCALING_FACTOR_ON_OUT_OF_MEMORY = "spark_hash_partition_count_scaling_factor_on_out_of_memory"; public static final String ADAPTIVE_JOIN_SIDE_SWITCHING_ENABLED = "adaptive_join_side_switching_enabled"; @@ -196,6 +198,16 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig) "Flag to enable optimized resource allocation strategy", prestoSparkConfig.isSparkResourceAllocationStrategyEnabled(), false), + booleanProperty( + SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED, + "Flag to enable optimized executor allocation strategy", + prestoSparkConfig.isExecutorAllocationStrategyEnabled(), + false), + booleanProperty( + SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED, + "Flag to enable optimized hash partition count allocation strategy", + prestoSparkConfig.isHashPartitionCountAllocationStrategyEnabled(), + false), booleanProperty( SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, "Increases hash partition count by scaling factor specified by spark.hash-partition-count-scaling-factor-on-out-of-memory if query fails due to low hash partition count", @@ -333,6 +345,16 @@ public static boolean isSparkResourceAllocationStrategyEnabled(Session session) return session.getSystemProperty(SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED, Boolean.class); } + public static boolean isSparkExecutorAllocationStrategyEnabled(Session session) + { + return session.getSystemProperty(SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED, Boolean.class); + } + + public static boolean isSparkHashPartitionCountAllocationStrategyEnabled(Session session) + { + return session.getSystemProperty(SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED, Boolean.class); + } + public static boolean isRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled(Session session) { return session.getSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, 
Boolean.class); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/AbstractPrestoSparkQueryExecution.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/AbstractPrestoSparkQueryExecution.java index bc9a447f46ef9..469871b5f98e4 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/AbstractPrestoSparkQueryExecution.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/AbstractPrestoSparkQueryExecution.java @@ -17,7 +17,6 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.log.Logger; import com.facebook.presto.Session; -import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.common.Page; import com.facebook.presto.common.type.Type; import com.facebook.presto.cost.HistoryBasedPlanStatisticsTracker; @@ -65,7 +64,6 @@ import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PrestoException; -import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.WarningCollector; import com.facebook.presto.spi.connector.ConnectorCapabilities; import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider; @@ -91,7 +89,6 @@ import org.apache.spark.Partitioner; import org.apache.spark.ShuffleDependency; import org.apache.spark.SimpleFutureAction; -import org.apache.spark.SparkContext; import org.apache.spark.SparkException; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -126,7 +123,6 @@ import static com.facebook.presto.execution.scheduler.TableWriteInfo.createTableWriteInfo; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getSparkBroadcastJoinMaxMemoryOverride; import static com.facebook.presto.spark.PrestoSparkSessionProperties.isStorageBasedBroadcastJoinEnabled; -import static 
com.facebook.presto.spark.PrestoSparkSettingsRequirements.SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG; import static com.facebook.presto.spark.SparkErrorCode.EXCEEDED_SPARK_DRIVER_MAX_RESULT_SIZE; import static com.facebook.presto.spark.SparkErrorCode.GENERIC_SPARK_ERROR; import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_LOST; @@ -640,17 +636,6 @@ protected ConnectorNodePartitioningProvider getPartitioningProvider(Partitioning return partitioningProviderManager.getPartitioningProvider(connectorId); } - protected int getHashPartitionCount(SparkContext sparkContext, QueryId queryId, Session session, PlanAndMore planAndMore) - { - int hashPartitionCount = SystemSessionProperties.getHashPartitionCount(session); - if (planAndMore.getPhysicalResourceSettings().isEnabled()) { - log.info(String.format("Setting optimized executor count to %d for query with id:%s", planAndMore.getPhysicalResourceSettings().getExecutorCount(), queryId.getId())); - sparkContext.conf().set(SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG, Integer.toString(planAndMore.getPhysicalResourceSettings().getExecutorCount())); - hashPartitionCount = planAndMore.getPhysicalResourceSettings().getHashPartitionCount(); - } - return hashPartitionCount; - } - protected SubPlan configureOutputPartitioning(Session session, SubPlan subPlan, int hashPartitionCount) { PlanFragment fragment = subPlan.getFragment(); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkStaticQueryExecution.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkStaticQueryExecution.java index 012b01afe3274..60181e6e540c2 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkStaticQueryExecution.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkStaticQueryExecution.java @@ -73,6 +73,7 @@ import static com.facebook.presto.execution.QueryState.PLANNING; import static 
com.facebook.presto.spark.PrestoSparkQueryExecutionFactory.createQueryInfo; import static com.facebook.presto.spark.PrestoSparkQueryExecutionFactory.createStageInfo; +import static com.facebook.presto.spark.PrestoSparkSettingsRequirements.SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG; import static com.facebook.presto.spark.classloader_interface.ScalaUtils.collectScalaIterator; import static com.facebook.presto.spark.classloader_interface.ScalaUtils.emptyScalaIterator; import static com.facebook.presto.spark.util.PrestoSparkUtils.computeNextTimeout; @@ -163,6 +164,14 @@ protected List> doExecute( throws SparkException, TimeoutException { SubPlan rootFragmentedPlan = createFragmentedPlan(); + + // executor allocation is currently only supported at root level of the plan + // in future this could be extended to fragment level configuration + if (planAndMore.getPhysicalResourceSettings().getMaxExecutorCount().isPresent()) { + sparkContext.sc().conf().set(SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG, + Integer.toString(planAndMore.getPhysicalResourceSettings().getMaxExecutorCount().getAsInt())); + } + setFinalFragmentedPlan(rootFragmentedPlan); TableWriteInfo tableWriteInfo = getTableWriteInfo(session, rootFragmentedPlan); PlanFragment rootFragment = rootFragmentedPlan.getFragment(); @@ -256,7 +265,7 @@ public SubPlan createFragmentedPlan() warningCollector)); log.info(textDistributedPlan(rootFragmentedPlan, metadata.getFunctionAndTypeManager(), session, true)); - int hashPartitionCount = getHashPartitionCount(sparkContext.sc(), session.getQueryId(), session, planAndMore); + int hashPartitionCount = planAndMore.getPhysicalResourceSettings().getHashPartitionCount(); rootFragmentedPlan = configureOutputPartitioning(session, rootFragmentedPlan, hashPartitionCount); return rootFragmentedPlan; } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkQueryPlanner.java 
b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkQueryPlanner.java index edee558d8d179..27abd2a0bd0ce 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkQueryPlanner.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkQueryPlanner.java @@ -24,6 +24,7 @@ import com.facebook.presto.security.AccessControl; import com.facebook.presto.spark.PhysicalResourceSettings; import com.facebook.presto.spark.PrestoSparkPhysicalResourceCalculator; +import com.facebook.presto.spark.PrestoSparkSourceStatsCollector; import com.facebook.presto.spi.WarningCollector; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; import com.facebook.presto.sql.analyzer.Analysis; @@ -122,7 +123,8 @@ public PlanAndMore createQueryPlan(Session session, BuiltInPreparedQuery prepare Optional output = new OutputExtractor().extractOutput(plan.getRoot()); Optional queryType = getQueryType(preparedQuery.getStatement().getClass()); List columnNames = ((OutputNode) plan.getRoot()).getColumnNames(); - PhysicalResourceSettings physicalResourceSettings = new PrestoSparkPhysicalResourceCalculator().calculate(plan.getRoot(), metadata, session); + PhysicalResourceSettings physicalResourceSettings = new PrestoSparkPhysicalResourceCalculator() + .calculate(plan.getRoot(), new PrestoSparkSourceStatsCollector(metadata, session), session); return new PlanAndMore( plan, Optional.ofNullable(analysis.getUpdateType()), diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkConfig.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkConfig.java index e8ca15f720491..cdfe206c472d0 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkConfig.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkConfig.java @@ -59,7 +59,9 @@ public void testDefaults() .setSparkResourceAllocationStrategyEnabled(false) 
.setRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled(false) .setHashPartitionCountScalingFactorOnOutOfMemory(2.0) - .setAdaptiveJoinSideSwitchingEnabled(false)); + .setAdaptiveJoinSideSwitchingEnabled(false) + .setExecutorAllocationStrategyEnabled(false) + .setHashPartitionCountAllocationStrategyEnabled(false)); } @Test @@ -94,6 +96,8 @@ public void testExplicitPropertyMappings() .put("spark.retry-on-out-of-memory-higher-hash-partition-count-enabled", "true") .put("spark.hash-partition-count-scaling-factor-on-out-of-memory", "5.6") .put("optimizer.adaptive-join-side-switching-enabled", "true") + .put("spark.executor-allocation-strategy-enabled", "true") + .put("spark.hash-partition-count-allocation-strategy-enabled", "true") .build(); PrestoSparkConfig expected = new PrestoSparkConfig() .setSparkPartitionCountAutoTuneEnabled(false) @@ -123,7 +127,9 @@ public void testExplicitPropertyMappings() .setSparkResourceAllocationStrategyEnabled(true) .setRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled(true) .setHashPartitionCountScalingFactorOnOutOfMemory(5.6) - .setAdaptiveJoinSideSwitchingEnabled(true); + .setAdaptiveJoinSideSwitchingEnabled(true) + .setHashPartitionCountAllocationStrategyEnabled(true) + .setExecutorAllocationStrategyEnabled(true); assertFullMapping(properties, expected); } } diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestPrestoSparkPhysicalResourceAllocationStrategy.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestPrestoSparkPhysicalResourceAllocationStrategy.java new file mode 100644 index 0000000000000..fd25bc1c8bde4 --- /dev/null +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestPrestoSparkPhysicalResourceAllocationStrategy.java @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spark.planner; + +import com.facebook.presto.Session; +import com.facebook.presto.metadata.AbstractMockMetadata; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.spark.PhysicalResourceSettings; +import com.facebook.presto.spark.PrestoSparkPhysicalResourceCalculator; +import com.facebook.presto.spark.PrestoSparkSourceStatsCollector; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.spi.session.PropertyMetadata; +import com.facebook.presto.spi.statistics.Estimate; +import com.facebook.presto.spi.statistics.TableStatistics; +import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder; +import com.facebook.presto.sql.planner.plan.JoinNode; +import com.facebook.presto.testing.TestingMetadata; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; +import org.testng.annotations.Test; + +import java.util.List; + +import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR; +import 
static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_AVERAGE_INPUT_DATA_SIZE_PER_PARTITION; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_MAX_EXECUTOR_COUNT; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_MAX_HASH_PARTITION_COUNT; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_MIN_EXECUTOR_COUNT; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_MIN_HASH_PARTITION_COUNT; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +public class TestPrestoSparkPhysicalResourceAllocationStrategy +{ + // mocked metadata with table statistics generating random estimate count for the purpose of testing + // no other method is stubbed so will likely throw UnsupportedOperationException + private static class MockedMetadata + extends AbstractMockMetadata + { + @Override + public TableStatistics getTableStatistics(Session session, TableHandle tableHandle, List columnHandles, Constraint constraint) + { + return TableStatistics.builder().setRowCount(Estimate.of(100)).setTotalSize(Estimate.of(1000)).build(); + } + } + + // default properties passed as part of system property + private static final PropertyMetadata[] defaultPropertyMetadata = new PropertyMetadata[] { + PropertyMetadata.integerProperty(SPARK_MIN_EXECUTOR_COUNT, "SPARK_MIN_EXECUTOR_COUNT", 10, false), + PropertyMetadata.integerProperty(SPARK_MAX_EXECUTOR_COUNT, "SPARK_MAX_EXECUTOR_COUNT", 1000, false), + 
PropertyMetadata.integerProperty(SPARK_MIN_HASH_PARTITION_COUNT, "SPARK_MIN_HASH_PARTITION_COUNT", 10, false), + PropertyMetadata.integerProperty(SPARK_MAX_HASH_PARTITION_COUNT, "SPARK_MAX_HASH_PARTITION_COUNT", 1000, false), + PropertyMetadata.dataSizeProperty(SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR, "SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR", new DataSize(200, DataSize.Unit.BYTE), false), + PropertyMetadata.dataSizeProperty(SPARK_AVERAGE_INPUT_DATA_SIZE_PER_PARTITION, "SPARK_AVERAGE_INPUT_DATA_SIZE_PER_PARTITION", new DataSize(100, DataSize.Unit.BYTE), false), + PropertyMetadata.integerProperty(HASH_PARTITION_COUNT, "HASH_PARTITION_COUNT", 150, false) + }; + // system property with allocation based tuning enabled + private static final Session testSessionWithAllocation = testSessionBuilder(new SessionPropertyManager( + new ImmutableList.Builder>().add(defaultPropertyMetadata).add( + PropertyMetadata.booleanProperty(SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED, "SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED", true, false) + ).build())).build(); + // system property with allocation based tuning disabled + private static final Session testSessionWithoutAllocation = testSessionBuilder(new SessionPropertyManager( + new ImmutableList.Builder>().add(defaultPropertyMetadata).add( + PropertyMetadata.booleanProperty(SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED, "SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED", false, false), + PropertyMetadata.booleanProperty(SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED, "SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED", false, false), + PropertyMetadata.booleanProperty(SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED, "SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED", false, false) + ).build())).build(); + private static final Metadata mockedMetadata = new MockedMetadata(); + + /** + * Return any plan node, the node does not even need to be "correct", + * only used for the purpose of traversing and estimating the source stats + */ + 
private PlanNode getPlanToTest(Session session, Metadata metadata) + { + PlanBuilder planBuilder = new PlanBuilder(session, new PlanNodeIdAllocator(), metadata); + VariableReferenceExpression sourceJoin = planBuilder.variable("sourceJoin"); + + TableScanNode a = planBuilder.tableScan(ImmutableList.of(sourceJoin), ImmutableMap.of(sourceJoin, new TestingMetadata.TestingColumnHandle("sourceJoin"))); + VariableReferenceExpression filteringSource = planBuilder.variable("filteringSource"); + TableScanNode b = planBuilder.tableScan(ImmutableList.of(filteringSource), ImmutableMap.of(filteringSource, new TestingMetadata.TestingColumnHandle("filteringSource"))); + + return planBuilder.join(JoinNode.Type.LEFT, a, b); + } + + @Test + public void testHashPartitionCountAllocationStrategy() + { + PrestoSparkSourceStatsCollector prestoSparkSourceStatsCollector = new PrestoSparkSourceStatsCollector(mockedMetadata, testSessionWithAllocation); + PlanNode nodeToTest = getPlanToTest(testSessionWithAllocation, mockedMetadata); + + PhysicalResourceSettings settingsHolder = new PrestoSparkPhysicalResourceCalculator() + .calculate(nodeToTest, prestoSparkSourceStatsCollector, testSessionWithAllocation); + assertEquals(settingsHolder.getHashPartitionCount(), 20); + assertEquals(settingsHolder.getMaxExecutorCount().getAsInt(), 10); + } + + @Test + public void testHashPartitionCountWithoutAllocationStrategy() + { + PrestoSparkSourceStatsCollector prestoSparkSourceStatsCollector = new PrestoSparkSourceStatsCollector(mockedMetadata, testSessionWithoutAllocation); + PlanNode nodeToTest = getPlanToTest(testSessionWithoutAllocation, mockedMetadata); + + PhysicalResourceSettings settingsHolder = new PrestoSparkPhysicalResourceCalculator() + .calculate(nodeToTest, prestoSparkSourceStatsCollector, testSessionWithoutAllocation); + assertEquals(settingsHolder.getHashPartitionCount(), 150); + assertFalse(settingsHolder.getMaxExecutorCount().isPresent()); + } +}