Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
Expand Down Expand Up @@ -76,7 +77,9 @@ public final class SystemSessionProperties
public static final String TASK_CONCURRENCY = "task_concurrency";
public static final String TASK_SHARE_INDEX_LOADING = "task_share_index_loading";
public static final String QUERY_MAX_MEMORY = "query_max_memory";
public static final String QUERY_MAX_MEMORY_PER_NODE = "query_max_memory_per_node";
public static final String QUERY_MAX_TOTAL_MEMORY = "query_max_total_memory";
public static final String QUERY_MAX_TOTAL_MEMORY_PER_NODE = "query_max_total_memory_per_node";
public static final String QUERY_MAX_EXECUTION_TIME = "query_max_execution_time";
public static final String QUERY_MAX_RUN_TIME = "query_max_run_time";
public static final String RESOURCE_OVERCOMMIT = "resource_overcommit";
Expand Down Expand Up @@ -147,15 +150,16 @@ public final class SystemSessionProperties

public SystemSessionProperties()
{
this(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), new FeaturesConfig());
this(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), new FeaturesConfig(), new NodeMemoryConfig());
}

@Inject
public SystemSessionProperties(
QueryManagerConfig queryManagerConfig,
TaskManagerConfig taskManagerConfig,
MemoryManagerConfig memoryManagerConfig,
FeaturesConfig featuresConfig)
FeaturesConfig featuresConfig,
NodeMemoryConfig nodeMemoryConfig)
{
sessionProperties = ImmutableList.of(
stringProperty(
Expand Down Expand Up @@ -343,6 +347,15 @@ public SystemSessionProperties(
true,
value -> DataSize.valueOf((String) value),
DataSize::toString),
new PropertyMetadata<>(
QUERY_MAX_MEMORY_PER_NODE,
"Maximum amount of user task memory a query can use",
VARCHAR,
DataSize.class,
nodeMemoryConfig.getMaxQueryMemoryPerNode(),
true,
value -> DataSize.valueOf((String) value),
DataSize::toString),
new PropertyMetadata<>(
QUERY_MAX_TOTAL_MEMORY,
"Maximum amount of distributed total memory a query can use",
Expand All @@ -352,6 +365,15 @@ public SystemSessionProperties(
true,
value -> DataSize.valueOf((String) value),
DataSize::toString),
new PropertyMetadata<>(
QUERY_MAX_TOTAL_MEMORY_PER_NODE,
"Maximum amount of total (user + system) task memory a query can use",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to mention task here? or just
Maximum amount of total (user + system) memory a query can use

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes otherwise the description makes it seem like this a distributed memory setting.

VARCHAR,
DataSize.class,
nodeMemoryConfig.getMaxQueryTotalMemoryPerNode(),
true,
value -> DataSize.valueOf((String) value),
DataSize::toString),
booleanProperty(
RESOURCE_OVERCOMMIT,
"Use resources which are not guaranteed to be available to the query",
Expand Down Expand Up @@ -856,11 +878,21 @@ public static DataSize getQueryMaxMemory(Session session)
return session.getSystemProperty(QUERY_MAX_MEMORY, DataSize.class);
}

public static DataSize getQueryMaxMemoryPerNode(Session session)
{
return session.getSystemProperty(QUERY_MAX_MEMORY_PER_NODE, DataSize.class);
}

public static DataSize getQueryMaxTotalMemory(Session session)
{
return session.getSystemProperty(QUERY_MAX_TOTAL_MEMORY, DataSize.class);
}

public static DataSize getQueryMaxTotalMemoryPerNode(Session session)
{
return session.getSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, DataSize.class);
}

public static Duration getQueryMaxRunTime(Session session)
{
return session.getSystemProperty(QUERY_MAX_RUN_TIME, Duration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode;
import static com.facebook.presto.SystemSessionProperties.resourceOvercommit;
import static com.facebook.presto.execution.SqlTask.createSqlTask;
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
Expand Down Expand Up @@ -157,7 +159,7 @@ public SqlTaskManager(
DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode();

queryContexts = CacheBuilder.newBuilder().weakValues().build(CacheLoader.from(
queryId -> createQueryContext(queryId, localMemoryManager, nodeMemoryConfig, localSpillManager, gcMonitor, maxQueryUserMemoryPerNode, maxQueryTotalMemoryPerNode, maxQuerySpillPerNode)));
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryUserMemoryPerNode, maxQueryTotalMemoryPerNode, maxQuerySpillPerNode)));

tasks = CacheBuilder.newBuilder().build(CacheLoader.from(
taskId -> createSqlTask(
Expand All @@ -179,7 +181,6 @@ public SqlTaskManager(
private QueryContext createQueryContext(
QueryId queryId,
LocalMemoryManager localMemoryManager,
NodeMemoryConfig nodeMemoryConfig,
LocalSpillManager localSpillManager,
GcMonitor gcMonitor,
DataSize maxQueryUserMemoryPerNode,
Expand Down Expand Up @@ -377,6 +378,12 @@ public TaskInfo updateTask(
// TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
}
else {
queryContexts.getUnchecked(
taskId.getQueryId()).setMemoryLimits(
getQueryMaxMemoryPerNode(session),
getQueryMaxTotalMemoryPerNode(session));
}

SqlTask sqlTask = tasks.getUnchecked(taskId);
sqlTask.recordHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ public QueryId getQueryId()
return queryId;
}

public synchronized void setMemoryLimits(DataSize queryMaxTaskMemory, DataSize queryMaxTotalTaskMemory)
{
// Don't allow session properties to increase memory beyond configured limits
maxUserMemory = Math.min(maxUserMemory, queryMaxTaskMemory.toBytes());
maxTotalMemory = Math.min(maxTotalMemory, queryMaxTotalTaskMemory.toBytes());
}

private static class QueryMemoryReservationHandler
implements MemoryReservationHandler
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import com.facebook.presto.execution.warnings.WarningCollectorConfig;
import com.facebook.presto.index.IndexManager;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.AnalyzePropertyManager;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.ColumnPropertyManager;
Expand Down Expand Up @@ -334,7 +335,13 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
featuresConfig,
typeRegistry,
blockEncodingManager,
new SessionPropertyManager(new SystemSessionProperties(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), featuresConfig)),
new SessionPropertyManager(
new SystemSessionProperties(
new QueryManagerConfig(),
new TaskManagerConfig(),
new MemoryManagerConfig(),
featuresConfig,
new NodeMemoryConfig())),
new SchemaPropertyManager(),
new TablePropertyManager(),
new ColumnPropertyManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.warnings.WarningCollector;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.AnalyzePropertyManager;
import com.facebook.presto.metadata.Catalog;
import com.facebook.presto.metadata.CatalogManager;
Expand Down Expand Up @@ -538,7 +539,8 @@ public void testTooManyGroupingElements()
new QueryManagerConfig(),
new TaskManagerConfig(),
new MemoryManagerConfig(),
new FeaturesConfig().setMaxGroupingSets(2048)))).build();
new FeaturesConfig().setMaxGroupingSets(2048),
new NodeMemoryConfig()))).build();
analyze(session, "SELECT a, b, c, d, e, f, g, h, i, j, k, SUM(l)" +
"FROM (VALUES (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))\n" +
"t (a, b, c, d, e, f, g, h, i, j, k, l)\n" +
Expand Down