Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public final class SystemSessionProperties
public static final String QUERY_PRIORITY = "query_priority";
public static final String SPILL_ENABLED = "spill_enabled";
public static final String JOIN_SPILL_ENABLED = "join_spill_enabled";
public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled";
public static final String DISTINCT_AGGREGATION_SPILL_ENABLED = "distinct_aggregation_spill_enabled";
public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled";
public static final String AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT = "aggregation_operator_unspill_memory_limit";
Expand Down Expand Up @@ -602,14 +603,19 @@ public SystemSessionProperties(
"Enable join spilling",
featuresConfig.isJoinSpillingEnabled(),
false),
booleanProperty(
AGGREGATION_SPILL_ENABLED,
"Enable aggregate spilling if spill_enabled",
featuresConfig.isAggregationSpillEnabled(),
false),
booleanProperty(
DISTINCT_AGGREGATION_SPILL_ENABLED,
"Enable spill for distinct aggregations if spill_enabled",
"Enable spill for distinct aggregations if spill_enabled and aggregation_spill_enabled",
featuresConfig.isDistinctAggregationSpillEnabled(),
false),
booleanProperty(
ORDER_BY_AGGREGATION_SPILL_ENABLED,
"Enable spill for order-by aggregations if spill_enabled",
"Enable spill for order-by aggregations if spill_enabled and aggregation_spill_enabled",
featuresConfig.isOrderByAggregationSpillEnabled(),
false),
new PropertyMetadata<>(
Expand Down Expand Up @@ -1394,6 +1400,11 @@ public static boolean isJoinSpillingEnabled(Session session)
return session.getSystemProperty(JOIN_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
}

public static boolean isAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
}

public static boolean isDistinctAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public class FeaturesConfig
private MultimapAggGroupImplementation multimapAggGroupImplementation = MultimapAggGroupImplementation.NEW;
private boolean spillEnabled;
private boolean joinSpillingEnabled = true;
private boolean aggregationSpillEnabled = true;
private boolean distinctAggregationSpillEnabled = true;
private boolean orderByAggregationSpillEnabled = true;
private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, DataSize.Unit.MEGABYTE);
Expand Down Expand Up @@ -903,8 +904,21 @@ public FeaturesConfig setJoinSpillingEnabled(boolean joinSpillingEnabled)
return this;
}

@Config("experimental.aggregation-spill-enabled")
@ConfigDescription("Spill aggregations if spill is enabled")
public FeaturesConfig setAggregationSpillEnabled(boolean aggregationSpillEnabled)
{
this.aggregationSpillEnabled = aggregationSpillEnabled;
return this;
}

public boolean isAggregationSpillEnabled()
{
return aggregationSpillEnabled;
}

@Config("experimental.distinct-aggregation-spill-enabled")
@ConfigDescription("Spill distinct aggregations if spill is enabled")
@ConfigDescription("Spill distinct aggregations if aggregation spill is enabled")
public FeaturesConfig setDistinctAggregationSpillEnabled(boolean distinctAggregationSpillEnabled)
{
this.distinctAggregationSpillEnabled = distinctAggregationSpillEnabled;
Expand All @@ -917,7 +931,7 @@ public boolean isDistinctAggregationSpillEnabled()
}

@Config("experimental.order-by-aggregation-spill-enabled")
@ConfigDescription("Spill order-by aggregations if spill is enabled")
@ConfigDescription("Spill order-by aggregations if aggregation spill is enabled")
public FeaturesConfig setOrderByAggregationSpillEnabled(boolean orderByAggregationSpillEnabled)
{
this.orderByAggregationSpillEnabled = orderByAggregationSpillEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@
import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency;
import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount;
import static com.facebook.presto.SystemSessionProperties.isAggregationSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isDistinctAggregationSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isEnableDynamicFiltering;
import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled;
Expand Down Expand Up @@ -1248,6 +1249,7 @@ public PhysicalOperation visitAggregation(AggregationNode node, LocalExecutionPl
node,
source,
spillEnabled,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: better to make this varilable as an inline as well to align with other invocation of split parameters

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#16581 adds config/session properties for controlling aggregates/window/orderby spilling.

isAggregationSpillEnabled(session),
isDistinctAggregationSpillEnabled(session),
isOrderByAggregationSpillEnabled(session),
unspillMemoryLimit,
Expand Down Expand Up @@ -2531,6 +2533,7 @@ public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPl
false,
false,
false,
false,
new DataSize(0, BYTE),
context,
STATS_START_CHANNEL,
Expand Down Expand Up @@ -2636,6 +2639,7 @@ public PhysicalOperation visitTableWriteMerge(TableWriterMergeNode node, LocalEx
false,
false,
false,
false,
new DataSize(0, BYTE),
context,
STATS_START_CHANNEL,
Expand Down Expand Up @@ -2690,6 +2694,7 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl
false,
false,
false,
false,
new DataSize(0, BYTE),
context,
0,
Expand Down Expand Up @@ -3044,6 +3049,7 @@ private PhysicalOperation planGroupByAggregation(
AggregationNode node,
PhysicalOperation source,
boolean spillEnabled,
boolean aggregationSpillEnabled,
boolean distinctAggregationSpillEnabled,
boolean orderByAggregationSpillEnabled,
DataSize unspillMemoryLimit,
Expand All @@ -3061,6 +3067,7 @@ private PhysicalOperation planGroupByAggregation(
source,
node.hasDefaultOutput(),
spillEnabled,
aggregationSpillEnabled,
distinctAggregationSpillEnabled,
orderByAggregationSpillEnabled,
node.isStreamable(),
Expand All @@ -3085,6 +3092,7 @@ private OperatorFactory createHashAggregationOperatorFactory(
PhysicalOperation source,
boolean hasDefaultOutput,
boolean spillEnabled,
boolean aggregationSpillEnabled,
boolean distinctSpillEnabled,
boolean orderBySpillEnabled,
boolean isStreamable,
Expand All @@ -3098,7 +3106,7 @@ private OperatorFactory createHashAggregationOperatorFactory(
{
List<VariableReferenceExpression> aggregationOutputVariables = new ArrayList<>();
List<AccumulatorFactory> accumulatorFactories = new ArrayList<>();
boolean useSpill = spillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled);
boolean useSpill = spillEnabled && aggregationSpillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled);
for (Map.Entry<VariableReferenceExpression, Aggregation> entry : aggregations.entrySet()) {
VariableReferenceExpression variable = entry.getKey();
Aggregation aggregation = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void testDefaults()
.setRe2JDfaRetries(5)
.setSpillEnabled(false)
.setJoinSpillingEnabled(true)
.setAggregationSpillEnabled(true)
.setDistinctAggregationSpillEnabled(true)
.setOrderByAggregationSpillEnabled(true)
.setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("4MB"))
Expand Down Expand Up @@ -236,6 +237,7 @@ public void testExplicitPropertyMappings()
.put("re2j.dfa-retries", "42")
.put("experimental.spill-enabled", "true")
.put("experimental.join-spill-enabled", "false")
.put("experimental.aggregation-spill-enabled", "false")
.put("experimental.distinct-aggregation-spill-enabled", "false")
.put("experimental.order-by-aggregation-spill-enabled", "false")
.put("experimental.aggregation-operator-unspill-memory-limit", "100MB")
Expand Down Expand Up @@ -355,6 +357,7 @@ public void testExplicitPropertyMappings()
.setRe2JDfaRetries(42)
.setSpillEnabled(true)
.setJoinSpillingEnabled(false)
.setAggregationSpillEnabled(false)
.setDistinctAggregationSpillEnabled(false)
.setOrderByAggregationSpillEnabled(false)
.setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("100MB"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.testing.QueryRunner;
import org.testng.annotations.Test;

import static com.facebook.presto.SystemSessionProperties.AGGREGATION_SPILL_ENABLED;
import static com.facebook.presto.SystemSessionProperties.DISTINCT_AGGREGATION_SPILL_ENABLED;
import static com.facebook.presto.SystemSessionProperties.ORDER_BY_AGGREGATION_SPILL_ENABLED;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_REVOCABLE_MEMORY_PER_NODE;
Expand Down Expand Up @@ -113,4 +114,24 @@ public void testMultipleDistinctAggregations()
{
assertQuery("SELECT custkey, count(DISTINCT orderpriority), count(DISTINCT orderstatus), count(DISTINCT totalprice), count(DISTINCT clerk) FROM orders GROUP BY custkey");
}

@Test
public void testDoesNotSpillWhenAggregationSpillDisabled()
{
Session session = Session.builder(getSession())
.setSystemProperty(AGGREGATION_SPILL_ENABLED, "false")
// This will not spill even when distinct/orderBy Spill is enabled since aggregationSpill is disabled above
.setSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, "true")
.setSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, "true")
// set this low so that if we ran with spill the query would fail
.setSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, "1B")
.build();

assertQuery(session,
"SELECT orderpriority, custkey, array_agg(orderstatus ORDER BY orderstatus) FROM orders GROUP BY orderpriority, custkey");

// the sum() is necessary so that the aggregation isn't optimized into multiple aggregation nodes
assertQuery(session,
"SELECT custkey, sum(custkey), count(DISTINCT orderpriority) FROM orders GROUP BY custkey");
}
}