Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ public final class SystemSessionProperties
public static final String FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT = "fault_tolerant_execution_max_partition_count";
Comment thread
linzebing marked this conversation as resolved.
Outdated
public static final String FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT = "fault_tolerant_execution_min_partition_count";
public static final String FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE = "fault_tolerant_execution_min_partition_count_for_write";
public static final String FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_ENABLED = "fault_tolerant_execution_runtime_adaptive_partitioning_enabled";
public static final String FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_PARTITION_COUNT = "fault_tolerant_execution_runtime_adaptive_partitioning_partition_count";
public static final String FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_MAX_TASK_SIZE = "fault_tolerant_execution_runtime_adaptive_partitioning_max_task_size";
public static final String FAULT_TOLERANT_EXECUTION_MIN_SOURCE_STAGE_PROGRESS = "fault_tolerant_execution_min_source_stage_progress";
private static final String FAULT_TOLERANT_EXECUTION_SMALL_STAGE_ESTIMATION_ENABLED = "fault_tolerant_execution_small_stage_estimation_enabled";
private static final String FAULT_TOLERANT_EXECUTION_SMALL_STAGE_ESTIMATION_THRESHOLD = "fault_tolerant_execution_small_stage_estimation_threshold";
Expand Down Expand Up @@ -949,6 +952,22 @@ public SystemSessionProperties(
queryManagerConfig.getFaultTolerantExecutionMinPartitionCountForWrite(),
value -> validateIntegerValue(value, FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE, 1, FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT_LIMIT, false),
false),
booleanProperty(
FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_ENABLED,
"Enables change of number of partitions at runtime when intermediate data size is large",
queryManagerConfig.isFaultTolerantExecutionRuntimeAdaptivePartitioningEnabled(),
true),
integerProperty(
FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_PARTITION_COUNT,
"The partition count to use for runtime adaptive partitioning when enabled",
queryManagerConfig.getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount(),
value -> validateIntegerValue(value, FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_PARTITION_COUNT, 1, FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT_LIMIT, false),
true),
dataSizeProperty(
FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_MAX_TASK_SIZE,
"Max average task input size when deciding runtime adaptive partitioning",
queryManagerConfig.getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize(),
true),
doubleProperty(
FAULT_TOLERANT_EXECUTION_MIN_SOURCE_STAGE_PROGRESS,
"Minimal progress of source stage to consider scheduling of parent stage",
Expand Down Expand Up @@ -1790,6 +1809,21 @@ public static int getFaultTolerantExecutionMinPartitionCountForWrite(Session ses
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE, Integer.class);
}

public static boolean isFaultTolerantExecutionRuntimeAdaptivePartitioningEnabled(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_ENABLED, Boolean.class);
}

public static int getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_PARTITION_COUNT, Integer.class);
}

public static DataSize getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_RUNTIME_ADAPTIVE_PARTITIONING_MAX_TASK_SIZE, DataSize.class);
}

public static double getFaultTolerantExecutionMinSourceStageProgress(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_SOURCE_STAGE_PROGRESS, Double.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public class QueryManagerConfig
private int faultTolerantExecutionMaxPartitionCount = 50;
private int faultTolerantExecutionMinPartitionCount = 4;
private int faultTolerantExecutionMinPartitionCountForWrite = 50;
private boolean faultTolerantExecutionRuntimeAdaptivePartitioningEnabled;
private int faultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount = FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT_LIMIT;
// Currently, initial setup is 5GB of task memory processing 4GB data. Given that we triple the memory in case of
// task OOM, max task size is set to 12GB such that tasks of stages below threshold will succeed within one retry.
private DataSize faultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize = DataSize.of(12, GIGABYTE);
Comment thread
linzebing marked this conversation as resolved.
Outdated
private boolean faultTolerantExecutionForcePreferredWritePartitioningEnabled = true;
private double faultTolerantExecutionMinSourceStageProgress = 0.2;

Expand Down Expand Up @@ -965,6 +970,46 @@ public QueryManagerConfig setFaultTolerantExecutionMinPartitionCountForWrite(int
return this;
}

public boolean isFaultTolerantExecutionRuntimeAdaptivePartitioningEnabled()
{
return faultTolerantExecutionRuntimeAdaptivePartitioningEnabled;
}

@Config("fault-tolerant-execution-runtime-adaptive-partitioning-enabled")
public QueryManagerConfig setFaultTolerantExecutionRuntimeAdaptivePartitioningEnabled(boolean faultTolerantExecutionRuntimeAdaptivePartitioningEnabled)
{
this.faultTolerantExecutionRuntimeAdaptivePartitioningEnabled = faultTolerantExecutionRuntimeAdaptivePartitioningEnabled;
return this;
}

@Min(1)
@Max(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT_LIMIT)
public int getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount()
{
return faultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount;
}

@Config("fault-tolerant-execution-runtime-adaptive-partitioning-partition-count")
@ConfigDescription("The partition count to use for runtime adaptive partitioning when enabled")
public QueryManagerConfig setFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount(int faultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount)
{
this.faultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount = faultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount;
return this;
}

public DataSize getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize()
{
return faultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize;
}

@Config("fault-tolerant-execution-runtime-adaptive-partitioning-max-task-size")
@ConfigDescription("Max average task input size when deciding runtime adaptive partitioning")
public QueryManagerConfig setFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize(DataSize faultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize)
{
this.faultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize = faultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize;
return this;
}

public boolean isFaultTolerantExecutionForcePreferredWritePartitioningEnabled()
{
return faultTolerantExecutionForcePreferredWritePartitioningEnabled;
Expand Down
Loading