PhysicalResourceSettings.java
@@ -13,27 +13,25 @@
*/
package com.facebook.presto.spark;

import java.util.OptionalInt;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;

/**
* Class for storing physical resource settings.
* Resource settings can have conflicting overrides coming from either Presto or Spark.
* The class holds the final values that need to be applied to the query.
*/
public class PhysicalResourceSettings
{
public static final PhysicalResourceSettings DISABLED_PHYSICAL_RESOURCE_SETTING = new PhysicalResourceSettings(0, 0, () -> {});

private final int executorCount;
private final OptionalInt maxExecutorCount;
private final int hashPartitionCount;

public PhysicalResourceSettings(int executorCount, int hashPartitionCount)
{
this(
executorCount,
hashPartitionCount,
() -> checkArgument(executorCount >= 0 && hashPartitionCount >= 0, "executorCount and hashPartitionCount should be positive"));
}

public PhysicalResourceSettings(int executorCount, int hashPartitionCount, Runnable check)
public PhysicalResourceSettings(int hashPartitionCount, OptionalInt maxExecutorCount)
{
check.run();
this.executorCount = executorCount;
checkArgument(maxExecutorCount.orElse(0) >= 0 && hashPartitionCount >= 0, "maxExecutorCount and hashPartitionCount must be non-negative");
this.maxExecutorCount = maxExecutorCount;
this.hashPartitionCount = hashPartitionCount;
}

@@ -42,13 +40,22 @@ public int getHashPartitionCount()
return hashPartitionCount;
}

public int getExecutorCount()
/**
* maxExecutorCount is an optional field whose value is driven by the resource allocation tuning properties.
* An empty value means resource tuning is disabled and the values from SparkConf will be used.
* When present, resource tuning is enabled and the value is calculated by {@link PrestoSparkPhysicalResourceCalculator} based on the query plan.
*/
public OptionalInt getMaxExecutorCount()
{
return executorCount;
return maxExecutorCount;
}

public boolean isEnabled()
@Override
public String toString()
{
return ((executorCount > 0) && (hashPartitionCount > 0));
return toStringHelper(this)
.add("maxExecutorCount", maxExecutorCount)
.add("hashPartitionCount", hashPartitionCount)
.toString();
}
}
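
For illustration only (this snippet is not part of the PR), a minimal sketch of the two states the new class can represent, following the Javadoc above; the hash partition counts and the executor count used here are arbitrary:

import java.util.OptionalInt;

public class PhysicalResourceSettingsExample
{
    public static void main(String[] args)
    {
        // An empty maxExecutorCount models "executor tuning disabled": callers keep
        // whatever spark.dynamicAllocation.maxExecutors is already set to in SparkConf.
        PhysicalResourceSettings untuned = new PhysicalResourceSettings(4096, OptionalInt.empty());

        // A present value models "executor tuning enabled": PrestoSparkPhysicalResourceCalculator
        // derived it from the query plan, and callers are expected to apply it.
        PhysicalResourceSettings tuned = new PhysicalResourceSettings(1024, OptionalInt.of(200));

        System.out.println(untuned); // toString() above prints both fields, as used in the query logs
        tuned.getMaxExecutorCount().ifPresent(max -> System.out.println("tuned max executors: " + max));
    }
}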
PrestoSparkConfig.java
@@ -61,9 +61,10 @@ public class PrestoSparkConfig
private DataSize averageInputDataSizePerPartition = new DataSize(2, GIGABYTE);
private int maxHashPartitionCount = 4096;
private int minHashPartitionCount = 1024;
private boolean isResourceAllocationStrategyEnabled;

private boolean adaptiveJoinSideSwitchingEnabled;
private boolean resourceAllocationStrategyEnabled;
private boolean executorAllocationStrategyEnabled;
private boolean hashPartitionCountAllocationStrategyEnabled;

public boolean isSparkPartitionCountAutoTuneEnabled()
{
@@ -385,14 +386,14 @@ public PrestoSparkConfig setMinHashPartitionCount(int minHashPartitionCount)

public boolean isSparkResourceAllocationStrategyEnabled()
{
return isResourceAllocationStrategyEnabled;
return resourceAllocationStrategyEnabled;
}

@Config("spark.resource-allocation-strategy-enabled")
@ConfigDescription("Determines whether the resource allocation strategy for executor and partition count is enabled")
public PrestoSparkConfig setSparkResourceAllocationStrategyEnabled(boolean isResourceAllocationStrategyEnabled)
public PrestoSparkConfig setSparkResourceAllocationStrategyEnabled(boolean resourceAllocationStrategyEnabled)
{
this.isResourceAllocationStrategyEnabled = isResourceAllocationStrategyEnabled;
this.resourceAllocationStrategyEnabled = resourceAllocationStrategyEnabled;
return this;
}

@@ -436,4 +437,30 @@ public PrestoSparkConfig setAdaptiveJoinSideSwitchingEnabled(boolean adaptiveJoinSideSwitchingEnabled)
this.adaptiveJoinSideSwitchingEnabled = adaptiveJoinSideSwitchingEnabled;
return this;
}

public boolean isExecutorAllocationStrategyEnabled()
{
return executorAllocationStrategyEnabled;
}

@Config("spark.executor-allocation-strategy-enabled")
@ConfigDescription("Determines whether the executor allocation strategy is enabled. This will be suppressed if used alongside spark.dynamicAllocation.maxExecutors")
public PrestoSparkConfig setExecutorAllocationStrategyEnabled(boolean executorAllocationStrategyEnabled)
{
this.executorAllocationStrategyEnabled = executorAllocationStrategyEnabled;
return this;
}

public boolean isHashPartitionCountAllocationStrategyEnabled()
{
return hashPartitionCountAllocationStrategyEnabled;
}

@Config("spark.hash-partition-count-allocation-strategy-enabled")
@ConfigDescription("Determines whether the hash partition count strategy is enabled. This will be suppressed if used alongside hash_partition_count")
public PrestoSparkConfig setHashPartitionCountAllocationStrategyEnabled(boolean hashPartitionCountAllocationStrategyEnabled)
{
this.hashPartitionCountAllocationStrategyEnabled = hashPartitionCountAllocationStrategyEnabled;
return this;
}
}
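
Both new strategies default to off. In a Presto on Spark configuration file they would be enabled through the property names declared in the @Config annotations above (a hypothetical snippet, not taken from this PR); each can also be toggled per query via the session properties registered in PrestoSparkSessionProperties further down:

spark.executor-allocation-strategy-enabled=true
spark.hash-partition-count-allocation-strategy-enabled=true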
PrestoSparkPhysicalResourceCalculator.java
@@ -15,17 +15,21 @@

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.spi.plan.PlanNode;
import io.airlift.units.DataSize;

import static com.facebook.presto.spark.PhysicalResourceSettings.DISABLED_PHYSICAL_RESOURCE_SETTING;
import java.util.OptionalInt;

import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getAverageInputDataSizePerExecutor;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getAverageInputDataSizePerPartition;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMaxExecutorCount;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMaxHashPartitionCount;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMinExecutorCount;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMinHashPartitionCount;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isSparkExecutorAllocationStrategyEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isSparkHashPartitionCountAllocationStrategyEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isSparkResourceAllocationStrategyEnabled;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.BYTE;
@@ -34,29 +38,70 @@ public class PrestoSparkPhysicalResourceCalculator
{
private static final Logger log = Logger.get(PrestoSparkPhysicalResourceCalculator.class);

public PhysicalResourceSettings calculate(PlanNode plan, Metadata metaData, Session session)
/**
* Calculates the final resource settings for the query. This takes into account all override values
* with the following precedence:
* <ul>
* <li>
* Session property enabling allocation strategy {@link PrestoSparkSessionProperties#SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED}
* or {@link PrestoSparkSessionProperties#SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED}.
* If {@link PrestoSparkSessionProperties#SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED} is enabled,
* both properties are implicitly enabled.
* </li>
* <li>
* Session property with explicit value for {@link SystemSessionProperties#HASH_PARTITION_COUNT}
* or {@link PrestoSparkSettingsRequirements#SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG}
* </li>
* <li>
* System property as provided by {@link SystemSessionProperties#HASH_PARTITION_COUNT}
* or {@link PrestoSparkSettingsRequirements#SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG}
* </li>
* </ul>
*
*/
public PhysicalResourceSettings calculate(PlanNode plan, PrestoSparkSourceStatsCollector prestoSparkSourceStatsCollector, Session session)
{
if (!isSparkResourceAllocationStrategyEnabled(session)) {
return new PhysicalResourceSettings(0, 0);
}
int hashPartitionCount = getHashPartitionCount(session);
OptionalInt maxExecutorCount = OptionalInt.empty();
PhysicalResourceSettings defaultResourceSettings = new PhysicalResourceSettings(hashPartitionCount, maxExecutorCount);

double inputDataInBytes = new PrestoSparkSourceStatsCollector(metaData, session).collectSourceStats(plan);
if (!anyAllocationStrategyEnabled(session)) {
Review comment (Contributor): We can do the check for the main property here: isSparkResourceAllocationStrategyEnabled(session), and if it's disabled, we return. This is basically a single switch to disable the allocation strategy completely. My thinking is that spark_resource_allocation_strategy_enabled controls this whole logic: if it is disabled, we skip this logic completely; only if it is true do we check the individual strategy properties and trigger the one that is enabled. I know this is a bit weird. Ideally, we should just remove the spark_resource_allocation_strategy_enabled session property to make this logic cleaner. @highker: What is the right approach to deprecating/removing a property?

Reply: Regarding the right approach to deprecating/removing a property:
• We use the @DefunctConfig annotation in a config class. Usually, we also rename the property to deprecated.XXX; check FeatureConfig for examples. We don't usually delete the property directly.
• We add a release note to tell users which config the deprecated one has migrated to.

log.info(String.format("ResourceAllocationStrategy disabled. Executing query %s with %s", session.getQueryId(), defaultResourceSettings));
return defaultResourceSettings;
}

double inputDataInBytes = prestoSparkSourceStatsCollector.collectSourceStats(plan);
DataSize inputSize = new DataSize(inputDataInBytes, BYTE);
if (inputDataInBytes < 0) {
log.warn(String.format("Input data statistics missing, inputDataInBytes=%.2f skipping automatic resource tuning.", inputDataInBytes));
return DISABLED_PHYSICAL_RESOURCE_SETTING;
log.warn(String.format("Input data statistics missing, inputDataInBytes=%.2f skipping automatic resource tuning. Executing query %s with %s",
inputDataInBytes, session.getQueryId(), defaultResourceSettings));
return defaultResourceSettings;
}
else if (Double.isNaN(inputDataInBytes)) {
log.warn(String.format("Failed to retrieve correct size, inputDataInBytes=%.2f skipping automatic resource tuning. Executing query %s with %s",
inputDataInBytes, session.getQueryId(), defaultResourceSettings));
return defaultResourceSettings;
}
// update hashPartitionCount only if resource allocation or hash partition allocation is enabled
if (isSparkResourceAllocationStrategyEnabled(session) || isSparkHashPartitionCountAllocationStrategyEnabled(session)) {
Review comment (Contributor): The check for isSparkResourceAllocationStrategyEnabled(session) can be removed here.

Reply (Author): Will deprecate this config in a subsequent PR.

Reply (Contributor, @pgupta2, Dec 16, 2022): Should we make it an AND condition? That would give us the ability to control the individual optimizations separately. Otherwise, if isSparkResourceAllocationStrategyEnabled(session) is true, both optimizations will run regardless.

hashPartitionCount = calculateHashPartitionCount(session, inputSize);
}

if ((inputDataInBytes > Double.MAX_VALUE) || (Double.isNaN(inputDataInBytes))) {
log.warn(String.format("Failed to retrieve correct size, data read=%.2f, skipping automatic resource tuning.", inputDataInBytes));
return DISABLED_PHYSICAL_RESOURCE_SETTING;
// update maxExecutorCount only if resource allocation or executor allocation is enabled
if (isSparkResourceAllocationStrategyEnabled(session) || isSparkExecutorAllocationStrategyEnabled(session)) {
Review comment (Contributor): Same as the above comment.

Reply (Author): Will deprecate this config in a subsequent PR.

maxExecutorCount = OptionalInt.of(calculateExecutorCount(session, inputSize));
}
PhysicalResourceSettings finalResourceSettings = new PhysicalResourceSettings(hashPartitionCount, maxExecutorCount);

DataSize inputSize = new DataSize(inputDataInBytes, BYTE);
int executorCount = calculateExecutorCount(session, inputSize);
int hashPartitionCount = calculateHashPartitionCount(session, inputSize);
log.info(String.format("Executing query %s with %s based on resource allocation strategy", session.getQueryId(), finalResourceSettings));
return finalResourceSettings;
}

return new PhysicalResourceSettings(executorCount, hashPartitionCount);
private static boolean anyAllocationStrategyEnabled(Session session)
{
return isSparkResourceAllocationStrategyEnabled(session)
|| isSparkExecutorAllocationStrategyEnabled(session)
|| isSparkHashPartitionCountAllocationStrategyEnabled(session);
}

private static int calculateExecutorCount(Session session, DataSize inputData)
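
The bodies of calculateExecutorCount and calculateHashPartitionCount are collapsed in this view. Judging only from the session properties they read (average input data size per executor/partition and the min/max bounds), the partition-count side plausibly reduces to a size-driven estimate clamped to the configured bounds, roughly as in the sketch below; the actual implementation may differ:

    // Assumed logic, not copied from the PR: estimate the partition count from the input size
    // and clamp it between the configured minimum and maximum. Reuses the static imports above.
    private static int calculateHashPartitionCountSketch(Session session, DataSize inputData)
    {
        double bytesPerPartition = getAverageInputDataSizePerPartition(session).toBytes();
        int estimate = (int) Math.ceil(inputData.toBytes() / bytesPerPartition);
        return Math.min(getMaxHashPartitionCount(session), Math.max(getMinHashPartitionCount(session), estimate));
    }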
PrestoSparkSessionProperties.java
@@ -58,6 +58,8 @@ public class PrestoSparkSessionProperties
public static final String SPARK_MAX_HASH_PARTITION_COUNT = "spark_max_hash_partition_count";
public static final String SPARK_MIN_HASH_PARTITION_COUNT = "spark_min_hash_partition_count";
public static final String SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED = "spark_resource_allocation_strategy_enabled";
public static final String SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED = "spark_executor_allocation_strategy_enabled";
public static final String SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED = "spark_hash_partition_count_allocation_strategy_enabled";
public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED = "spark_retry_on_out_of_memory_higher_hash_partition_count_enabled";
public static final String SPARK_HASH_PARTITION_COUNT_SCALING_FACTOR_ON_OUT_OF_MEMORY = "spark_hash_partition_count_scaling_factor_on_out_of_memory";
public static final String ADAPTIVE_JOIN_SIDE_SWITCHING_ENABLED = "adaptive_join_side_switching_enabled";
@@ -196,6 +198,16 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig)
"Flag to enable optimized resource allocation strategy",
prestoSparkConfig.isSparkResourceAllocationStrategyEnabled(),
false),
booleanProperty(
SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED,
"Flag to enable optimized executor allocation strategy",
prestoSparkConfig.isExecutorAllocationStrategyEnabled(),
false),
booleanProperty(
SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED,
"Flag to enable optimized hash partition count allocation strategy",
prestoSparkConfig.isHashPartitionCountAllocationStrategyEnabled(),
false),
booleanProperty(
SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED,
"Increases hash partition count by scaling factor specified by spark.hash-partition-count-scaling-factor-on-out-of-memory if query fails due to low hash partition count",
@@ -333,6 +345,16 @@ public static boolean isSparkResourceAllocationStrategyEnabled(Session session)
return session.getSystemProperty(SPARK_RESOURCE_ALLOCATION_STRATEGY_ENABLED, Boolean.class);
}

public static boolean isSparkExecutorAllocationStrategyEnabled(Session session)
{
return session.getSystemProperty(SPARK_EXECUTOR_ALLOCATION_STRATEGY_ENABLED, Boolean.class);
}

public static boolean isSparkHashPartitionCountAllocationStrategyEnabled(Session session)
{
return session.getSystemProperty(SPARK_HASH_PARTITION_COUNT_ALLOCATION_STRATEGY_ENABLED, Boolean.class);
}

public static boolean isRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled(Session session)
{
return session.getSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, Boolean.class);
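
The review thread on PrestoSparkPhysicalResourceCalculator above suggests eventually retiring the umbrella spark_resource_allocation_strategy_enabled property in favor of the two granular properties registered here. Per the reply in that thread, the deprecation would look roughly like the sketch below in PrestoSparkConfig (illustrative only, not part of this PR):

import com.facebook.airlift.configuration.DefunctConfig;

// Illustrative sketch of the @DefunctConfig approach described in the review reply:
// once the property is declared defunct, setting spark.resource-allocation-strategy-enabled
// fails configuration validation, and a release note points users at the replacement properties.
@DefunctConfig("spark.resource-allocation-strategy-enabled")
public class PrestoSparkConfig
{
    // existing properties, including the two strategy flags added in this PR
}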
@@ -17,7 +17,6 @@
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsTracker;
@@ -65,7 +64,6 @@
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorCapabilities;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
@@ -91,7 +89,6 @@
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SimpleFutureAction;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -126,7 +123,6 @@
import static com.facebook.presto.execution.scheduler.TableWriteInfo.createTableWriteInfo;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getSparkBroadcastJoinMaxMemoryOverride;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isStorageBasedBroadcastJoinEnabled;
import static com.facebook.presto.spark.PrestoSparkSettingsRequirements.SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG;
import static com.facebook.presto.spark.SparkErrorCode.EXCEEDED_SPARK_DRIVER_MAX_RESULT_SIZE;
import static com.facebook.presto.spark.SparkErrorCode.GENERIC_SPARK_ERROR;
import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_LOST;
@@ -640,17 +636,6 @@ protected ConnectorNodePartitioningProvider getPartitioningProvider(Partitioning
return partitioningProviderManager.getPartitioningProvider(connectorId);
}

protected int getHashPartitionCount(SparkContext sparkContext, QueryId queryId, Session session, PlanAndMore planAndMore)
{
int hashPartitionCount = SystemSessionProperties.getHashPartitionCount(session);
if (planAndMore.getPhysicalResourceSettings().isEnabled()) {
log.info(String.format("Setting optimized executor count to %d for query with id:%s", planAndMore.getPhysicalResourceSettings().getExecutorCount(), queryId.getId()));
sparkContext.conf().set(SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG, Integer.toString(planAndMore.getPhysicalResourceSettings().getExecutorCount()));
hashPartitionCount = planAndMore.getPhysicalResourceSettings().getHashPartitionCount();
}
return hashPartitionCount;
}

protected SubPlan configureOutputPartitioning(Session session, SubPlan subPlan, int hashPartitionCount)
{
PlanFragment fragment = subPlan.getFragment();
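
The helper removed above pushed the tuned executor count into SparkConf and returned the tuned hash partition count. Its replacement is not visible in this collapsed diff; a sketch of the equivalent logic against the new PhysicalResourceSettings API (reusing log, sparkContext, queryId and planAndMore from the removed code) might look like:

    PhysicalResourceSettings settings = planAndMore.getPhysicalResourceSettings();

    // Apply the executor bound only when the allocation strategy actually produced one;
    // otherwise the value already present in SparkConf (or Spark's default) stays in effect.
    settings.getMaxExecutorCount().ifPresent(maxExecutors -> {
        log.info(String.format("Setting optimized executor count to %d for query with id: %s", maxExecutors, queryId.getId()));
        sparkContext.conf().set(SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG, Integer.toString(maxExecutors));
    });

    // The (possibly tuned) hash partition count feeds configureOutputPartitioning directly.
    int hashPartitionCount = settings.getHashPartitionCount();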