Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,24 @@ Spilling Properties

This config property can be overridden by the ``join_spill_enabled`` session property.

``experimental.aggregation-spill-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for aggregations to
avoid exceeding memory limits for the query.

This config property can be overridden by the ``aggregation_spill_enabled`` session property.

``experimental.distinct-aggregation-spill-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for distinct
When ``spill_enabled`` and ``aggregation_spill_enabled`` are both ``true``, this determines whether Presto will try spilling memory to disk for distinct
aggregations to avoid exceeding memory limits for the query.

This config property can be overridden by the ``distinct_aggregation_spill_enabled`` session property.
Expand All @@ -179,11 +190,33 @@ Spilling Properties
* **Type:** ``boolean``
* **Default value:** ``true``

When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for order by
When ``spill_enabled`` and ``aggregation_spill_enabled`` are both ``true``, this determines whether Presto will try spilling memory to disk for order by
aggregations to avoid exceeding memory limits for the query.

This config property can be overridden by the ``order_by_aggregation_spill_enabled`` session property.

``experimental.window-spill-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for window functions to
avoid exceeding memory limits for the query.

This config property can be overridden by the ``window_spill_enabled`` session property.

``experimental.order-by-spill-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for ``ORDER BY`` operations to
avoid exceeding memory limits for the query.

This config property can be overridden by the ``order_by_spill_enabled`` session property.

``experimental.spiller.task-spilling-strategy``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* **Type:** ``string``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ public final class SystemSessionProperties
public static final String QUERY_PRIORITY = "query_priority";
public static final String SPILL_ENABLED = "spill_enabled";
public static final String JOIN_SPILL_ENABLED = "join_spill_enabled";
public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled";
public static final String DISTINCT_AGGREGATION_SPILL_ENABLED = "distinct_aggregation_spill_enabled";
public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled";
public static final String WINDOW_SPILL_ENABLED = "window_spill_enabled";
public static final String ORDER_BY_SPILL_ENABLED = "order_by_spill_enabled";
public static final String AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT = "aggregation_operator_unspill_memory_limit";
public static final String QUERY_MAX_REVOCABLE_MEMORY_PER_NODE = "query_max_revocable_memory_per_node";
public static final String TEMP_STORAGE_SPILLER_BUFFER_SIZE = "temp_storage_spiller_buffer_size";
Expand Down Expand Up @@ -602,16 +605,31 @@ public SystemSessionProperties(
"Enable join spilling",
featuresConfig.isJoinSpillingEnabled(),
false),
booleanProperty(
AGGREGATION_SPILL_ENABLED,
"Enable aggregate spilling if spill_enabled",
featuresConfig.isAggregationSpillEnabled(),
false),
booleanProperty(
DISTINCT_AGGREGATION_SPILL_ENABLED,
"Enable spill for distinct aggregations if spill_enabled",
"Enable spill for distinct aggregations if spill_enabled and aggregation_spill_enabled",
featuresConfig.isDistinctAggregationSpillEnabled(),
false),
booleanProperty(
ORDER_BY_AGGREGATION_SPILL_ENABLED,
"Enable spill for order-by aggregations if spill_enabled",
"Enable spill for order-by aggregations if spill_enabled and aggregation_spill_enabled",
featuresConfig.isOrderByAggregationSpillEnabled(),
false),
booleanProperty(
WINDOW_SPILL_ENABLED,
"Enable window spilling if spill_enabled",
featuresConfig.isWindowSpillEnabled(),
false),
booleanProperty(
ORDER_BY_SPILL_ENABLED,
"Enable order by spilling if spill_enabled",
featuresConfig.isOrderBySpillEnabled(),
false),
new PropertyMetadata<>(
AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT,
"Experimental: How much memory can should be allocated per aggragation operator in unspilling process",
Expand Down Expand Up @@ -1394,14 +1412,29 @@ public static boolean isJoinSpillingEnabled(Session session)
return session.getSystemProperty(JOIN_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
}

/**
 * Reports whether aggregation spilling is active for the given session.
 *
 * @param session the session whose system properties are consulted
 * @return {@code true} only when the {@code aggregation_spill_enabled} session property is set
 *         and {@code isSpillEnabled(session)} also returns {@code true}
 */
public static boolean isAggregationSpillEnabled(Session session)
{
    // Read the operator-specific flag first; the general spill check is only evaluated when it is set.
    boolean aggregationSpillRequested = session.getSystemProperty(AGGREGATION_SPILL_ENABLED, Boolean.class);
    return aggregationSpillRequested && isSpillEnabled(session);
}

public static boolean isDistinctAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
return session.getSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class) && isAggregationSpillEnabled(session);
}

public static boolean isOrderByAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
return session.getSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, Boolean.class) && isAggregationSpillEnabled(session);
}

/**
 * Reports whether window-operator spilling is active for the given session.
 *
 * @param session the session whose system properties are consulted
 * @return {@code true} only when the {@code window_spill_enabled} session property is set
 *         and {@code isSpillEnabled(session)} also returns {@code true}
 */
public static boolean isWindowSpillEnabled(Session session)
{
    // Read the operator-specific flag first; the general spill check is only evaluated when it is set.
    boolean windowSpillRequested = session.getSystemProperty(WINDOW_SPILL_ENABLED, Boolean.class);
    return windowSpillRequested && isSpillEnabled(session);
}

/**
 * Reports whether order-by spilling is active for the given session.
 *
 * @param session the session whose system properties are consulted
 * @return {@code true} only when the {@code order_by_spill_enabled} session property is set
 *         and {@code isSpillEnabled(session)} also returns {@code true}
 */
public static boolean isOrderBySpillEnabled(Session session)
{
    // Read the operator-specific flag first; the general spill check is only evaluated when it is set.
    boolean orderBySpillRequested = session.getSystemProperty(ORDER_BY_SPILL_ENABLED, Boolean.class);
    return orderBySpillRequested && isSpillEnabled(session);
}

public static DataSize getAggregationOperatorUnspillMemoryLimit(Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ public class FeaturesConfig
private MultimapAggGroupImplementation multimapAggGroupImplementation = MultimapAggGroupImplementation.NEW;
private boolean spillEnabled;
private boolean joinSpillingEnabled = true;
private boolean aggregationSpillEnabled = true;
private boolean distinctAggregationSpillEnabled = true;
private boolean orderByAggregationSpillEnabled = true;
private boolean windowSpillEnabled = true;
private boolean orderBySpillEnabled = true;
private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, DataSize.Unit.MEGABYTE);
private List<Path> spillerSpillPaths = ImmutableList.of();
private int spillerThreads = 4;
Expand Down Expand Up @@ -903,8 +906,21 @@ public FeaturesConfig setJoinSpillingEnabled(boolean joinSpillingEnabled)
return this;
}

/**
 * Stores the aggregation spill flag, bound to the {@code experimental.aggregation-spill-enabled} config property.
 *
 * @param aggregationSpillEnabled the new value of the flag
 * @return this {@code FeaturesConfig}, so that setter calls can be chained
 */
@Config("experimental.aggregation-spill-enabled")
@ConfigDescription("Spill aggregations if spill is enabled")
public FeaturesConfig setAggregationSpillEnabled(boolean aggregationSpillEnabled)
{
this.aggregationSpillEnabled = aggregationSpillEnabled;
return this;
}

/**
 * Returns the aggregation spill flag currently held by this configuration.
 *
 * @return the value of the {@code aggregationSpillEnabled} field
 */
public boolean isAggregationSpillEnabled()
{
return aggregationSpillEnabled;
}

@Config("experimental.distinct-aggregation-spill-enabled")
@ConfigDescription("Spill distinct aggregations if spill is enabled")
@ConfigDescription("Spill distinct aggregations if aggregation spill is enabled")
public FeaturesConfig setDistinctAggregationSpillEnabled(boolean distinctAggregationSpillEnabled)
{
this.distinctAggregationSpillEnabled = distinctAggregationSpillEnabled;
Expand All @@ -917,7 +933,7 @@ public boolean isDistinctAggregationSpillEnabled()
}

@Config("experimental.order-by-aggregation-spill-enabled")
@ConfigDescription("Spill order-by aggregations if spill is enabled")
@ConfigDescription("Spill order-by aggregations if aggregation spill is enabled")
public FeaturesConfig setOrderByAggregationSpillEnabled(boolean orderByAggregationSpillEnabled)
{
this.orderByAggregationSpillEnabled = orderByAggregationSpillEnabled;
Expand All @@ -929,6 +945,32 @@ public boolean isOrderByAggregationSpillEnabled()
return orderByAggregationSpillEnabled;
}

/**
 * Stores the window spill flag, bound to the {@code experimental.window-spill-enabled} config property.
 *
 * @param windowSpillEnabled the new value of the flag
 * @return this {@code FeaturesConfig}, so that setter calls can be chained
 */
@Config("experimental.window-spill-enabled")
@ConfigDescription("Enable Window Operator Spilling if spill is enabled")
public FeaturesConfig setWindowSpillEnabled(boolean windowSpillEnabled)
{
this.windowSpillEnabled = windowSpillEnabled;
return this;
}

/**
 * Returns the window spill flag currently held by this configuration.
 *
 * @return the value of the {@code windowSpillEnabled} field
 */
public boolean isWindowSpillEnabled()
{
return windowSpillEnabled;
}

/**
 * Stores the order-by spill flag, bound to the {@code experimental.order-by-spill-enabled} config property.
 *
 * @param orderBySpillEnabled the new value of the flag
 * @return this {@code FeaturesConfig}, so that setter calls can be chained
 */
@Config("experimental.order-by-spill-enabled")
@ConfigDescription("Enable Order-by Operator Spilling if spill is enabled")
public FeaturesConfig setOrderBySpillEnabled(boolean orderBySpillEnabled)
{
this.orderBySpillEnabled = orderBySpillEnabled;
return this;
}

/**
 * Returns the order-by spill flag currently held by this configuration.
 *
 * @return the value of the {@code orderBySpillEnabled} field
 */
public boolean isOrderBySpillEnabled()
{
return orderBySpillEnabled;
}

public boolean isIterativeOptimizerEnabled()
{
return iterativeOptimizerEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@
import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency;
import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount;
import static com.facebook.presto.SystemSessionProperties.isAggregationSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isDistinctAggregationSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isEnableDynamicFiltering;
import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled;
Expand All @@ -261,7 +262,9 @@
import static com.facebook.presto.SystemSessionProperties.isOptimizeCommonSubExpressions;
import static com.facebook.presto.SystemSessionProperties.isOptimizedRepartitioningEnabled;
import static com.facebook.presto.SystemSessionProperties.isOrderByAggregationSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isOrderBySpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isWindowSpillEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.common.type.TypeUtils.writeNativeValue;
Expand Down Expand Up @@ -1085,7 +1088,7 @@ public PhysicalOperation visitWindow(WindowNode node, LocalExecutionPlanContext
node.getPreSortedOrderPrefix(),
10_000,
pagesIndexFactory,
isSpillEnabled(session),
isWindowSpillEnabled(session),
spillerFactory,
orderingCompiler);

Expand Down Expand Up @@ -1136,7 +1139,7 @@ public PhysicalOperation visitSort(SortNode node, LocalExecutionPlanContext cont
outputChannels.add(i);
}

boolean spillEnabled = isSpillEnabled(context.getSession());
boolean spillEnabled = isOrderBySpillEnabled(context.getSession());

OperatorFactory operator = new OrderByOperatorFactory(
context.getNextOperatorId(),
Expand Down Expand Up @@ -1247,7 +1250,7 @@ public PhysicalOperation visitAggregation(AggregationNode node, LocalExecutionPl
return planGroupByAggregation(
node,
source,
spillEnabled,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove the spillEnabled variable now since its unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel : This variable is already removed. We now only pass isAggregationSpillEnabled flag along with isDistinctAggregationSpillEnabled and isOrderByAggregationSpillEnabled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry got it confused due to collapsed code in github. Looks good.

isAggregationSpillEnabled(session),
isDistinctAggregationSpillEnabled(session),
isOrderByAggregationSpillEnabled(session),
unspillMemoryLimit,
Expand Down Expand Up @@ -3043,7 +3046,7 @@ private AggregationOperatorFactory createAggregationOperatorFactory(
private PhysicalOperation planGroupByAggregation(
AggregationNode node,
PhysicalOperation source,
boolean spillEnabled,
boolean aggregationSpillEnabled,
boolean distinctAggregationSpillEnabled,
boolean orderByAggregationSpillEnabled,
DataSize unspillMemoryLimit,
Expand All @@ -3060,7 +3063,7 @@ private PhysicalOperation planGroupByAggregation(
node.getGroupIdVariable(),
source,
node.hasDefaultOutput(),
spillEnabled,
aggregationSpillEnabled,
distinctAggregationSpillEnabled,
orderByAggregationSpillEnabled,
node.isStreamable(),
Expand All @@ -3084,7 +3087,7 @@ private OperatorFactory createHashAggregationOperatorFactory(
Optional<VariableReferenceExpression> groupIdVariable,
PhysicalOperation source,
boolean hasDefaultOutput,
boolean spillEnabled,
boolean aggregationSpillEnabled,
boolean distinctSpillEnabled,
boolean orderBySpillEnabled,
boolean isStreamable,
Expand All @@ -3098,7 +3101,7 @@ private OperatorFactory createHashAggregationOperatorFactory(
{
List<VariableReferenceExpression> aggregationOutputVariables = new ArrayList<>();
List<AccumulatorFactory> accumulatorFactories = new ArrayList<>();
boolean useSpill = spillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled);
boolean useSpill = aggregationSpillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled);
for (Map.Entry<VariableReferenceExpression, Aggregation> entry : aggregations.entrySet()) {
VariableReferenceExpression variable = entry.getKey();
Aggregation aggregation = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ public void testDefaults()
.setRe2JDfaRetries(5)
.setSpillEnabled(false)
.setJoinSpillingEnabled(true)
.setAggregationSpillEnabled(true)
.setDistinctAggregationSpillEnabled(true)
.setOrderByAggregationSpillEnabled(true)
.setWindowSpillEnabled(true)
.setOrderBySpillEnabled(true)
.setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("4MB"))
.setSpillerSpillPaths("")
.setSpillerThreads(4)
Expand Down Expand Up @@ -236,8 +239,11 @@ public void testExplicitPropertyMappings()
.put("re2j.dfa-retries", "42")
.put("experimental.spill-enabled", "true")
.put("experimental.join-spill-enabled", "false")
.put("experimental.aggregation-spill-enabled", "false")
.put("experimental.distinct-aggregation-spill-enabled", "false")
.put("experimental.order-by-aggregation-spill-enabled", "false")
.put("experimental.window-spill-enabled", "false")
.put("experimental.order-by-spill-enabled", "false")
.put("experimental.aggregation-operator-unspill-memory-limit", "100MB")
.put("experimental.spiller-spill-path", "/tmp/custom/spill/path1,/tmp/custom/spill/path2")
.put("experimental.spiller-threads", "42")
Expand Down Expand Up @@ -355,8 +361,11 @@ public void testExplicitPropertyMappings()
.setRe2JDfaRetries(42)
.setSpillEnabled(true)
.setJoinSpillingEnabled(false)
.setAggregationSpillEnabled(false)
.setDistinctAggregationSpillEnabled(false)
.setOrderByAggregationSpillEnabled(false)
.setWindowSpillEnabled(false)
.setOrderBySpillEnabled(false)
.setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("100MB"))
.setSpillerSpillPaths("/tmp/custom/spill/path1,/tmp/custom/spill/path2")
.setSpillerThreads(42)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.testing.QueryRunner;
import org.testng.annotations.Test;

import static com.facebook.presto.SystemSessionProperties.AGGREGATION_SPILL_ENABLED;
import static com.facebook.presto.SystemSessionProperties.DISTINCT_AGGREGATION_SPILL_ENABLED;
import static com.facebook.presto.SystemSessionProperties.ORDER_BY_AGGREGATION_SPILL_ENABLED;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_REVOCABLE_MEMORY_PER_NODE;
Expand Down Expand Up @@ -113,4 +114,24 @@ public void testMultipleDistinctAggregations()
{
assertQuery("SELECT custkey, count(DISTINCT orderpriority), count(DISTINCT orderstatus), count(DISTINCT totalprice), count(DISTINCT clerk) FROM orders GROUP BY custkey");
}

@Test
public void testDoesNotSpillWhenAggregationSpillDisabled()
{
    String orderByAggregationQuery =
            "SELECT orderpriority, custkey, array_agg(orderstatus ORDER BY orderstatus) FROM orders GROUP BY orderpriority, custkey";
    // the sum() is necessary so that the aggregation isn't optimized into multiple aggregation nodes
    String distinctAggregationQuery =
            "SELECT custkey, sum(custkey), count(DISTINCT orderpriority) FROM orders GROUP BY custkey";

    Session.SessionBuilder sessionBuilder = Session.builder(getSession());
    sessionBuilder.setSystemProperty(AGGREGATION_SPILL_ENABLED, "false");
    // Leaving the distinct/order-by spill flags on must not matter: aggregation spill is disabled above,
    // so neither query is expected to spill.
    sessionBuilder.setSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, "true");
    sessionBuilder.setSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, "true");
    // Keep the revocable memory limit tiny so that a run which did spill would fail the query.
    sessionBuilder.setSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, "1B");
    Session aggregationSpillDisabledSession = sessionBuilder.build();

    assertQuery(aggregationSpillDisabledSession, orderByAggregationQuery);
    assertQuery(aggregationSpillDisabledSession, distinctAggregationQuery);
}
}
Loading