diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 1dbb556f38109..61130e0a9f10a 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -17,6 +17,7 @@ import com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy; import com.facebook.presto.execution.TaskManagerConfig; import com.facebook.presto.memory.MemoryManagerConfig; +import com.facebook.presto.memory.NodeMemoryConfig; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.session.PropertyMetadata; import com.facebook.presto.sql.analyzer.FeaturesConfig; @@ -76,7 +77,9 @@ public final class SystemSessionProperties public static final String TASK_CONCURRENCY = "task_concurrency"; public static final String TASK_SHARE_INDEX_LOADING = "task_share_index_loading"; public static final String QUERY_MAX_MEMORY = "query_max_memory"; + public static final String QUERY_MAX_MEMORY_PER_NODE = "query_max_memory_per_node"; public static final String QUERY_MAX_TOTAL_MEMORY = "query_max_total_memory"; + public static final String QUERY_MAX_TOTAL_MEMORY_PER_NODE = "query_max_total_memory_per_node"; public static final String QUERY_MAX_EXECUTION_TIME = "query_max_execution_time"; public static final String QUERY_MAX_RUN_TIME = "query_max_run_time"; public static final String RESOURCE_OVERCOMMIT = "resource_overcommit"; @@ -147,7 +150,7 @@ public final class SystemSessionProperties public SystemSessionProperties() { - this(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), new FeaturesConfig()); + this(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), new FeaturesConfig(), new NodeMemoryConfig()); } @Inject @@ -155,7 +158,8 @@ public SystemSessionProperties( QueryManagerConfig queryManagerConfig, TaskManagerConfig taskManagerConfig, MemoryManagerConfig memoryManagerConfig, - FeaturesConfig featuresConfig) + FeaturesConfig featuresConfig, + NodeMemoryConfig nodeMemoryConfig) { sessionProperties = ImmutableList.of( stringProperty( @@ -343,6 +347,15 @@ public SystemSessionProperties( true, value -> DataSize.valueOf((String) value), DataSize::toString), + new PropertyMetadata<>( + QUERY_MAX_MEMORY_PER_NODE, + "Maximum amount of user task memory a query can use", + VARCHAR, + DataSize.class, + nodeMemoryConfig.getMaxQueryMemoryPerNode(), + true, + value -> DataSize.valueOf((String) value), + DataSize::toString), new PropertyMetadata<>( QUERY_MAX_TOTAL_MEMORY, "Maximum amount of distributed total memory a query can use", @@ -352,6 +365,15 @@ public SystemSessionProperties( true, value -> DataSize.valueOf((String) value), DataSize::toString), + new PropertyMetadata<>( + QUERY_MAX_TOTAL_MEMORY_PER_NODE, + "Maximum amount of total (user + system) task memory a query can use", + VARCHAR, + DataSize.class, + nodeMemoryConfig.getMaxQueryTotalMemoryPerNode(), + true, + value -> DataSize.valueOf((String) value), + DataSize::toString), booleanProperty( RESOURCE_OVERCOMMIT, "Use resources which are not guaranteed to be available to the query", @@ -856,11 +878,21 @@ public static DataSize getQueryMaxMemory(Session session) return session.getSystemProperty(QUERY_MAX_MEMORY, DataSize.class); } + public static DataSize getQueryMaxMemoryPerNode(Session session) + { + return session.getSystemProperty(QUERY_MAX_MEMORY_PER_NODE, DataSize.class); + } + public static DataSize getQueryMaxTotalMemory(Session session) { return session.getSystemProperty(QUERY_MAX_TOTAL_MEMORY, DataSize.class); } + public static DataSize getQueryMaxTotalMemoryPerNode(Session session) + { + return session.getSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, DataSize.class); + } + public static Duration getQueryMaxRunTime(Session session) { return session.getSystemProperty(QUERY_MAX_RUN_TIME, Duration.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java index f0ef4dd6b79e6..2436dd21440b4 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java @@ -68,6 +68,8 @@ import java.util.concurrent.TimeUnit; import static com.facebook.airlift.concurrent.Threads.threadsNamed; +import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode; +import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.resourceOvercommit; import static com.facebook.presto.execution.SqlTask.createSqlTask; import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL; @@ -157,7 +159,7 @@ public SqlTaskManager( DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode(); queryContexts = CacheBuilder.newBuilder().weakValues().build(CacheLoader.from( - queryId -> createQueryContext(queryId, localMemoryManager, nodeMemoryConfig, localSpillManager, gcMonitor, maxQueryUserMemoryPerNode, maxQueryTotalMemoryPerNode, maxQuerySpillPerNode))); + queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryUserMemoryPerNode, maxQueryTotalMemoryPerNode, maxQuerySpillPerNode))); tasks = CacheBuilder.newBuilder().build(CacheLoader.from( taskId -> createSqlTask( @@ -179,7 +181,6 @@ public SqlTaskManager( private QueryContext createQueryContext( QueryId queryId, LocalMemoryManager localMemoryManager, - NodeMemoryConfig nodeMemoryConfig, LocalSpillManager localSpillManager, GcMonitor gcMonitor, DataSize maxQueryUserMemoryPerNode, @@ -377,6 +378,12 @@ public TaskInfo updateTask( // TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point. queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit(); } + else { + queryContexts.getUnchecked( + taskId.getQueryId()).setMemoryLimits( + getQueryMaxMemoryPerNode(session), + getQueryMaxTotalMemoryPerNode(session)); + } SqlTask sqlTask = tasks.getUnchecked(taskId); sqlTask.recordHeartbeat(); diff --git a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java index ed23c34ac9f2f..63f542955b03d 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java @@ -302,6 +302,13 @@ public QueryId getQueryId() return queryId; } + public synchronized void setMemoryLimits(DataSize queryMaxTaskMemory, DataSize queryMaxTotalTaskMemory) + { + // Don't allow session properties to increase memory beyond configured limits + maxUserMemory = Math.min(maxUserMemory, queryMaxTaskMemory.toBytes()); + maxTotalMemory = Math.min(maxTotalMemory, queryMaxTotalTaskMemory.toBytes()); + } + private static class QueryMemoryReservationHandler implements MemoryReservationHandler { diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index c119579b1c20e..63fb9af98e19e 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -75,6 +75,7 @@ import com.facebook.presto.execution.warnings.WarningCollectorConfig; import com.facebook.presto.index.IndexManager; import com.facebook.presto.memory.MemoryManagerConfig; +import com.facebook.presto.memory.NodeMemoryConfig; import com.facebook.presto.metadata.AnalyzePropertyManager; import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.metadata.ColumnPropertyManager; @@ -334,7 +335,13 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, featuresConfig, typeRegistry, blockEncodingManager, - new SessionPropertyManager(new SystemSessionProperties(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), featuresConfig)), + new SessionPropertyManager( + new SystemSessionProperties( + new QueryManagerConfig(), + new TaskManagerConfig(), + new MemoryManagerConfig(), + featuresConfig, + new NodeMemoryConfig())), new SchemaPropertyManager(), new TablePropertyManager(), new ColumnPropertyManager(), diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java index a23d4a14c1644..a208f58bac66b 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java @@ -23,6 +23,7 @@ import com.facebook.presto.execution.TaskManagerConfig; import com.facebook.presto.execution.warnings.WarningCollector; import com.facebook.presto.memory.MemoryManagerConfig; +import com.facebook.presto.memory.NodeMemoryConfig; import com.facebook.presto.metadata.AnalyzePropertyManager; import com.facebook.presto.metadata.Catalog; import com.facebook.presto.metadata.CatalogManager; @@ -538,7 +539,8 @@ public void testTooManyGroupingElements() new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), - new FeaturesConfig().setMaxGroupingSets(2048)))).build(); + new FeaturesConfig().setMaxGroupingSets(2048), + new NodeMemoryConfig()))).build(); analyze(session, "SELECT a, b, c, d, e, f, g, h, i, j, k, SUM(l)" + "FROM (VALUES (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))\n" + "t (a, b, c, d, e, f, g, h, i, j, k, l)\n" +