diff --git a/presto-base-jdbc/src/test/java/com/facebook/presto/plugin/jdbc/TestJdbcClient.java b/presto-base-jdbc/src/test/java/com/facebook/presto/plugin/jdbc/TestJdbcClient.java index f80dc0ac8432c..40f600efd5fc0 100644 --- a/presto-base-jdbc/src/test/java/com/facebook/presto/plugin/jdbc/TestJdbcClient.java +++ b/presto-base-jdbc/src/test/java/com/facebook/presto/plugin/jdbc/TestJdbcClient.java @@ -151,7 +151,8 @@ public void testCreateWithNullableColumns() } } - @Test + // disabled due to https://github.com/prestodb/presto/issues/16081 + @Test(enabled = false) public void testAlterColumns() { String tableName = randomUUID().toString().toUpperCase(ENGLISH); diff --git a/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestPrestoDriver.java b/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestPrestoDriver.java index 0425fc56657db..76879f89dfb91 100644 --- a/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestPrestoDriver.java +++ b/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestPrestoDriver.java @@ -1435,7 +1435,8 @@ public void testSetRole() } } - @Test(timeOut = 10000) + // Disabled due to https://github.com/prestodb/presto/issues/16080 + @Test(enabled = false, timeOut = 10000) public void testQueryCancelByInterrupt() throws Exception { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java index 02d49a494114f..7a0fef391a37c 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/MemoryRevokingScheduler.java @@ -17,6 +17,7 @@ import com.facebook.presto.memory.LocalMemoryManager; import com.facebook.presto.memory.MemoryPool; import com.facebook.presto.memory.MemoryPoolListener; +import com.facebook.presto.memory.QueryContext; import com.facebook.presto.memory.VoidTraversingQueryContextVisitor; import com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.PipelineContext; @@ -29,28 +30,33 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; -import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; +import static com.facebook.airlift.concurrent.Threads.threadsNamed; import static com.facebook.presto.execution.MemoryRevokingUtils.getMemoryPools; import static com.facebook.presto.sql.analyzer.FeaturesConfig.TaskSpillingStrategy.PER_TASK_MEMORY_THRESHOLD; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.Executors.newSingleThreadExecutor; public class MemoryRevokingScheduler { @@ -58,56 +64,59 @@ public class MemoryRevokingScheduler private static final Ordering ORDER_BY_CREATE_TIME = Ordering.natural().onResultOf(SqlTask::getTaskCreatedTime); - private final List memoryPools; + private final Function queryContextSupplier; private final Supplier> currentTasksSupplier; - private final ScheduledExecutorService taskManagementExecutor; + private final ExecutorService memoryRevocationExecutor; private final double memoryRevokingThreshold; private final double memoryRevokingTarget; private final TaskSpillingStrategy spillingStrategy; + private final List memoryPools; private final MemoryPoolListener memoryPoolListener = this::onMemoryReserved; - @Nullable - private ScheduledFuture scheduledFuture; - - private final AtomicBoolean checkPending = new AtomicBoolean(); + private final boolean queryLimitSpillEnabled; @Inject public MemoryRevokingScheduler( LocalMemoryManager localMemoryManager, SqlTaskManager sqlTaskManager, - TaskManagementExecutor taskManagementExecutor, FeaturesConfig config) { this( ImmutableList.copyOf(getMemoryPools(localMemoryManager)), requireNonNull(sqlTaskManager, "sqlTaskManager cannot be null")::getAllTasks, - requireNonNull(taskManagementExecutor, "taskManagementExecutor cannot be null").getExecutor(), + requireNonNull(sqlTaskManager, "sqlTaskManager cannot be null")::getQueryContext, config.getMemoryRevokingThreshold(), config.getMemoryRevokingTarget(), - config.getTaskSpillingStrategy()); + config.getTaskSpillingStrategy(), + config.isQueryLimitSpillEnabled()); } @VisibleForTesting MemoryRevokingScheduler( List memoryPools, Supplier> currentTasksSupplier, - ScheduledExecutorService taskManagementExecutor, + Function queryContextSupplier, double memoryRevokingThreshold, double memoryRevokingTarget, - TaskSpillingStrategy taskSpillingStrategy) + TaskSpillingStrategy taskSpillingStrategy, + boolean queryLimitSpillEnabled) { this.memoryPools = ImmutableList.copyOf(requireNonNull(memoryPools, "memoryPools is null")); - this.currentTasksSupplier = requireNonNull(currentTasksSupplier, "currentTasksSupplier is null"); - this.taskManagementExecutor = requireNonNull(taskManagementExecutor, "taskManagementExecutor is null"); + this.currentTasksSupplier = requireNonNull(currentTasksSupplier, "allTasksSupplier is null"); + this.queryContextSupplier = requireNonNull(queryContextSupplier, "queryContextSupplier is null"); this.memoryRevokingThreshold = checkFraction(memoryRevokingThreshold, "memoryRevokingThreshold"); this.memoryRevokingTarget = checkFraction(memoryRevokingTarget, "memoryRevokingTarget"); + // by using a single thread executor, we don't need to worry about locking to ensure only + // one revocation request per-query/memory pool is processed at a time. + this.memoryRevocationExecutor = newSingleThreadExecutor(threadsNamed("memory-revocation")); this.spillingStrategy = requireNonNull(taskSpillingStrategy, "taskSpillingStrategy is null"); checkArgument(spillingStrategy != PER_TASK_MEMORY_THRESHOLD, "spilling strategy cannot be PER_TASK_MEMORY_THRESHOLD in MemoryRevokingScheduler"); checkArgument( memoryRevokingTarget <= memoryRevokingThreshold, "memoryRevokingTarget should be less than or equal memoryRevokingThreshold, but got %s and %s respectively", memoryRevokingTarget, memoryRevokingThreshold); + this.queryLimitSpillEnabled = queryLimitSpillEnabled; } private static double checkFraction(double value, String valueName) @@ -120,49 +129,43 @@ private static double checkFraction(double value, String valueName) @PostConstruct public void start() { - registerPeriodicCheck(); registerPoolListeners(); } - private void registerPeriodicCheck() - { - this.scheduledFuture = taskManagementExecutor.scheduleWithFixedDelay(() -> { - try { - requestMemoryRevokingIfNeeded(); - } - catch (Exception e) { - log.error(e, "Error requesting system memory revoking"); - } - }, 1, 1, SECONDS); - } - @PreDestroy public void stop() { - if (scheduledFuture != null) { - scheduledFuture.cancel(true); - scheduledFuture = null; - } - memoryPools.forEach(memoryPool -> memoryPool.removeListener(memoryPoolListener)); + memoryRevocationExecutor.shutdown(); } - @VisibleForTesting - void registerPoolListeners() + private void registerPoolListeners() { memoryPools.forEach(memoryPool -> memoryPool.addListener(memoryPoolListener)); } + @VisibleForTesting + void awaitAsynchronousCallbacksRun() + throws InterruptedException + { + memoryRevocationExecutor.invokeAll(singletonList((Callable) () -> null)); + } + private void onMemoryReserved(MemoryPool memoryPool, QueryId queryId, long queryMemoryReservation) { try { - if (!memoryRevokingNeeded(memoryPool)) { - return; + if (queryLimitSpillEnabled) { + QueryContext queryContext = queryContextSupplier.apply(queryId); + verify(queryContext != null, "QueryContext not found for queryId %s", queryId); + long maxTotalMemory = queryContext.getMaxTotalMemory(); + if (memoryRevokingNeededForQuery(queryMemoryReservation, maxTotalMemory)) { + log.debug("Scheduling check for %s", queryId); + scheduleQueryRevoking(queryContext, maxTotalMemory); + } } - - if (checkPending.compareAndSet(false, true)) { + if (memoryRevokingNeededForPool(memoryPool)) { log.debug("Scheduling check for %s", memoryPool); - scheduleRevoking(); + scheduleMemoryPoolRevoking(memoryPool); } } catch (Exception e) { @@ -170,19 +173,16 @@ private void onMemoryReserved(MemoryPool memoryPool, QueryId queryId, long query } } - @VisibleForTesting - void requestMemoryRevokingIfNeeded() + private boolean memoryRevokingNeededForQuery(long queryMemoryReservation, long maxTotalMemory) { - if (checkPending.compareAndSet(false, true)) { - runMemoryRevoking(); - } + return queryMemoryReservation >= maxTotalMemory; } - private void scheduleRevoking() + private void scheduleQueryRevoking(QueryContext queryContext, long maxTotalMemory) { - taskManagementExecutor.execute(() -> { + memoryRevocationExecutor.execute(() -> { try { - runMemoryRevoking(); + revokeQueryMemory(queryContext, maxTotalMemory); } catch (Exception e) { log.error(e, "Error requesting memory revoking"); @@ -190,25 +190,70 @@ private void scheduleRevoking() }); } - private synchronized void runMemoryRevoking() + private void revokeQueryMemory(QueryContext queryContext, long maxTotalMemory) { - if (checkPending.getAndSet(false)) { - Collection allTasks = null; - for (MemoryPool memoryPool : memoryPools) { - if (!memoryRevokingNeeded(memoryPool)) { - continue; + QueryId queryId = queryContext.getQueryId(); + MemoryPool memoryPool = queryContext.getMemoryPool(); + // get a fresh value for queryTotalMemory in case it's changed (e.g. by a previous revocation request) + long queryTotalMemory = getTotalQueryMemoryReservation(queryId, memoryPool); + // order tasks by decreasing revocableMemory so that we don't spill more tasks than needed + SortedMap queryTaskContextsMap = new TreeMap<>(Comparator.reverseOrder()); + queryContext.getAllTaskContexts() + .forEach(taskContext -> queryTaskContextsMap.put(taskContext.getTaskMemoryContext().getRevocableMemory(), taskContext)); + + AtomicLong remainingBytesToRevoke = new AtomicLong(queryTotalMemory - maxTotalMemory); + Collection queryTaskContexts = queryTaskContextsMap.values(); + remainingBytesToRevoke.addAndGet(-MemoryRevokingSchedulerUtils.getMemoryAlreadyBeingRevoked(queryTaskContexts, remainingBytesToRevoke.get())); + for (TaskContext taskContext : queryTaskContexts) { + if (remainingBytesToRevoke.get() <= 0) { + break; + } + taskContext.accept(new VoidTraversingQueryContextVisitor() + { + @Override + public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong remainingBytesToRevoke) + { + if (remainingBytesToRevoke.get() > 0) { + long revokedBytes = operatorContext.requestMemoryRevoking(); + if (revokedBytes > 0) { + remainingBytesToRevoke.addAndGet(-revokedBytes); + log.debug("taskId=%s: requested revoking %s; remaining %s", taskContext.getTaskId(), revokedBytes, remainingBytesToRevoke); + } + } + return null; } + }, remainingBytesToRevoke); + } + } - if (allTasks == null) { - allTasks = requireNonNull(currentTasksSupplier.get()); - } + private static long getTotalQueryMemoryReservation(QueryId queryId, MemoryPool memoryPool) + { + return memoryPool.getQueryMemoryReservation(queryId) + memoryPool.getQueryRevocableMemoryReservation(queryId); + } - requestMemoryRevoking(memoryPool, allTasks); + private void scheduleMemoryPoolRevoking(MemoryPool memoryPool) + { + memoryRevocationExecutor.execute(() -> { + try { + runMemoryPoolRevoking(memoryPool); + } + catch (Exception e) { + log.error(e, "Error requesting memory revoking"); } + }); + } + + @VisibleForTesting + void runMemoryPoolRevoking(MemoryPool memoryPool) + { + if (!memoryRevokingNeededForPool(memoryPool)) { + return; } + Collection allTasks = requireNonNull(currentTasksSupplier.get()); + requestMemoryPoolRevoking(memoryPool, allTasks); } - private void requestMemoryRevoking(MemoryPool memoryPool, Collection allTasks) + private void requestMemoryPoolRevoking(MemoryPool memoryPool, Collection allTasks) { long remainingBytesToRevoke = (long) (-memoryPool.getFreeBytes() + (memoryPool.getMaxBytes() * (1.0 - memoryRevokingTarget))); ArrayList runningTasksInPool = findRunningTasksInMemoryPool(allTasks, memoryPool); @@ -218,7 +263,7 @@ private void requestMemoryRevoking(MemoryPool memoryPool, Collection al } } - private boolean memoryRevokingNeeded(MemoryPool memoryPool) + private boolean memoryRevokingNeededForPool(MemoryPool memoryPool) { return memoryPool.getReservedRevocableBytes() > 0 && memoryPool.getFreeBytes() <= memoryPool.getMaxBytes() * (1.0 - memoryRevokingThreshold); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryLimitMemoryRevokingScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryLimitMemoryRevokingScheduler.java deleted file mode 100644 index 7c9105a8ba5f1..0000000000000 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryLimitMemoryRevokingScheduler.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.execution; - -import com.facebook.airlift.log.Logger; -import com.facebook.presto.memory.LocalMemoryManager; -import com.facebook.presto.memory.MemoryPool; -import com.facebook.presto.memory.MemoryPoolListener; -import com.facebook.presto.memory.QueryContext; -import com.facebook.presto.memory.VoidTraversingQueryContextVisitor; -import com.facebook.presto.operator.OperatorContext; -import com.facebook.presto.operator.TaskContext; -import com.facebook.presto.spi.QueryId; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.inject.Inject; - -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; - -import static com.facebook.presto.execution.MemoryRevokingSchedulerUtils.getMemoryAlreadyBeingRevoked; -import static com.facebook.presto.execution.MemoryRevokingUtils.getMemoryPools; -import static com.google.common.base.Verify.verify; -import static java.util.Objects.requireNonNull; - -public class QueryLimitMemoryRevokingScheduler -{ - private static final Logger log = Logger.get(QueryLimitMemoryRevokingScheduler.class); - - private final ScheduledExecutorService taskManagementExecutor; - private final Function queryContextSupplier; - - private final List memoryPools; - private final MemoryPoolListener memoryPoolListener = this::onMemoryReserved; - private final ConcurrentHashMap revocationRequestedByQuery = new ConcurrentHashMap<>(); - - @Inject - public QueryLimitMemoryRevokingScheduler( - LocalMemoryManager localMemoryManager, - SqlTaskManager sqlTaskManager, - TaskManagementExecutor taskManagementExecutor) - { - this( - ImmutableList.copyOf(getMemoryPools(localMemoryManager)), - requireNonNull(sqlTaskManager, "sqlTaskManager cannot be null")::getQueryContext, - requireNonNull(taskManagementExecutor, "taskManagementExecutor cannot be null").getExecutor()); - log.debug("Using QueryLimitMemoryRevokingScheduler spilling strategy"); - } - - @VisibleForTesting - QueryLimitMemoryRevokingScheduler( - List memoryPools, - Function queryContextSupplier, - ScheduledExecutorService taskManagementExecutor) - { - this.memoryPools = ImmutableList.copyOf(requireNonNull(memoryPools, "memoryPools is null")); - this.queryContextSupplier = requireNonNull(queryContextSupplier, "queryContextSupplier is null"); - this.taskManagementExecutor = requireNonNull(taskManagementExecutor, "taskManagementExecutor is null"); - } - - @PostConstruct - public void start() - { - registerPoolListeners(); - } - - @PreDestroy - public void stop() - { - memoryPools.forEach(memoryPool -> memoryPool.removeListener(memoryPoolListener)); - } - - @VisibleForTesting - void registerPoolListeners() - { - memoryPools.forEach(memoryPool -> memoryPool.addListener(memoryPoolListener)); - } - - private void onMemoryReserved(MemoryPool memoryPool, QueryId queryId, long queryTotalMemoryUse) - { - try { - QueryContext queryContext = queryContextSupplier.apply(queryId); - verify(queryContext != null, "QueryContext not found for queryId %s", queryId); - long maxTotalMemory = queryContext.getMaxTotalMemory(); - if (queryTotalMemoryUse <= maxTotalMemory) { - return; - } - - log.debug("Scheduling check for %s", queryId); - if (revocationRequestedByQuery.put(queryId, true) == null) { - scheduleRevoking(queryContext, maxTotalMemory); - } - } - catch (Exception e) { - log.error(e, "Error when acting on memory pool reservation"); - } - } - - private void scheduleRevoking(QueryContext queryContext, long maxTotalMemory) - { - taskManagementExecutor.execute(() -> { - try { - revokeMemory(queryContext, maxTotalMemory); - } - catch (Exception e) { - log.error(e, "Error requesting memory revoking"); - } - }); - } - - private void revokeMemory(QueryContext queryContext, long maxTotalMemory) - { - QueryId queryId = queryContext.getQueryId(); - MemoryPool memoryPool = queryContext.getMemoryPool(); - // (queryId, true) gets put into the revocationRequestByQueryId map in onMemoryReserved() whenever - // revocation is needed. scheduleRevoking() is called at that time if the map does not already - // have any entry for that queryId. This ensures that only one revokeMemory() invocation will be running - // for any queryId at a time. At the start of this loop, we set the value for that queryId to false in the map. - // If an additional memory revocation request comes in, the value will be changed to true in onMemoryReserve(). - // The condition for this while loop says that if the value for that queryId is false - // (meaning no further memory revocation requests have come in since our last iteration through the loop), - // then remove that queryId from the map and break out of the loop. - while (!revocationRequestedByQuery.remove(queryId, false)) { - revocationRequestedByQuery.put(queryId, false); - // get a fresh value for queryTotalMemory in case it's changed (e.g. by a previous revocation request) - long queryTotalMemory = getTotalQueryMemoryReservation(queryId, memoryPool); - // order tasks by decreasing revocableMemory so that we don't spill more tasks than needed - SortedMap queryTaskContextsMap = new TreeMap<>(Comparator.reverseOrder()); - queryContext.getAllTaskContexts() - .forEach(taskContext -> queryTaskContextsMap.put(taskContext.getTaskMemoryContext().getRevocableMemory(), taskContext)); - - AtomicLong remainingBytesToRevoke = new AtomicLong(queryTotalMemory - maxTotalMemory); - Collection queryTaskContexts = queryTaskContextsMap.values(); - remainingBytesToRevoke.addAndGet(-getMemoryAlreadyBeingRevoked(queryTaskContexts, remainingBytesToRevoke.get())); - for (TaskContext taskContext : queryTaskContexts) { - if (remainingBytesToRevoke.get() <= 0) { - break; - } - taskContext.accept(new VoidTraversingQueryContextVisitor() - { - @Override - public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong remainingBytesToRevoke) - { - if (remainingBytesToRevoke.get() > 0) { - long revokedBytes = operatorContext.requestMemoryRevoking(); - if (revokedBytes > 0) { - remainingBytesToRevoke.addAndGet(-revokedBytes); - log.debug("taskId=%s: requested revoking %s; remaining %s", taskContext.getTaskId(), revokedBytes, remainingBytesToRevoke); - } - } - return null; - } - }, remainingBytesToRevoke); - } - } - } - - private static long getTotalQueryMemoryReservation(QueryId queryId, MemoryPool memoryPool) - { - return memoryPool.getQueryMemoryReservation(queryId) + memoryPool.getQueryRevocableMemoryReservation(queryId); - } -} diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 0c3dfb7d1df29..351c0738a9060 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -47,7 +47,6 @@ import com.facebook.presto.execution.LocationFactory; import com.facebook.presto.execution.MemoryRevokingScheduler; import com.facebook.presto.execution.NodeTaskMap; -import com.facebook.presto.execution.QueryLimitMemoryRevokingScheduler; import com.facebook.presto.execution.QueryManagerConfig; import com.facebook.presto.execution.SqlTaskManager; import com.facebook.presto.execution.StageInfo; @@ -375,9 +374,6 @@ public ScheduledExecutorService createResourceManagerExecutor(ResourceManagerCon case PER_TASK_MEMORY_THRESHOLD: binder.bind(TaskThresholdMemoryRevokingScheduler.class).in(Scopes.SINGLETON); break; - case PER_QUERY_MEMORY_LIMIT: - binder.bind(QueryLimitMemoryRevokingScheduler.class).in(Scopes.SINGLETON); - break; default: binder.bind(MemoryRevokingScheduler.class).in(Scopes.SINGLETON); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index ac84dfd3b01ce..ef1031829b411 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -81,6 +81,7 @@ public class FeaturesConfig private boolean spatialJoinsEnabled = true; private boolean fastInequalityJoins = true; private TaskSpillingStrategy taskSpillingStrategy = ORDER_BY_CREATE_TIME; + private boolean queryLimitSpillEnabled; private SingleStreamSpillerChoice singleStreamSpillerChoice = SingleStreamSpillerChoice.LOCAL_FILE; private String spillerTempStorage = "local"; private DataSize maxRevocableMemoryPerTask = new DataSize(500, MEGABYTE); @@ -253,7 +254,6 @@ public enum TaskSpillingStrategy ORDER_BY_CREATE_TIME, // When spilling is triggered, revoke tasks in order of oldest to newest ORDER_BY_REVOCABLE_BYTES, // When spilling is triggered, revoke tasks by most allocated revocable memory to least allocated revocable memory PER_TASK_MEMORY_THRESHOLD, // Spill any task after it reaches the per task memory threshold defined by experimental.spiller.max-revocable-task-memory - PER_QUERY_MEMORY_LIMIT // Spill whenever a query's combined revocable, system, and user memory exceeds the per-node total memory limit } public enum SingleStreamSpillerChoice @@ -574,6 +574,19 @@ public FeaturesConfig setTaskSpillingStrategy(TaskSpillingStrategy taskSpillingS return this; } + @Config("experimental.query-limit-spill-enabled") + @ConfigDescription("Spill whenever the total memory used by the query (including revocable and non-revocable memory) exceeds maxTotalMemoryPerNode") + public FeaturesConfig setQueryLimitSpillEnabled(boolean queryLimitSpillEnabled) + { + this.queryLimitSpillEnabled = queryLimitSpillEnabled; + return this; + } + + public boolean isQueryLimitSpillEnabled() + { + return queryLimitSpillEnabled; + } + public SingleStreamSpillerChoice getSingleStreamSpillerChoice() { return singleStreamSpillerChoice; diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java b/presto-main/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java index 21c544c33dcd1..c2d521604f42e 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestMemoryRevokingScheduler.java @@ -56,6 +56,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -81,6 +82,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -94,7 +96,8 @@ public class TestMemoryRevokingScheduler private final Map queryContexts = new HashMap<>(); - private ScheduledExecutorService executor; + private ExecutorService singleThreadedExecutor; + private ScheduledExecutorService singleThreadedScheduledExecutor; private ScheduledExecutorService scheduledExecutor; private SqlTaskExecutionFactory sqlTaskExecutionFactory; private MemoryPool memoryPool; @@ -110,13 +113,14 @@ public void setUp() taskExecutor.start(); // Must be single threaded - executor = newScheduledThreadPool(1, threadsNamed("task-notification-%s")); + singleThreadedExecutor = newSingleThreadExecutor(threadsNamed("task-notification-%s")); + singleThreadedScheduledExecutor = newScheduledThreadPool(1, threadsNamed("task-notification-%s")); scheduledExecutor = newScheduledThreadPool(2, threadsNamed("task-notification-%s")); LocalExecutionPlanner planner = createTestingPlanner(); sqlTaskExecutionFactory = new SqlTaskExecutionFactory( - executor, + singleThreadedExecutor, taskExecutor, planner, new BlockEncodingManager(), @@ -137,19 +141,20 @@ public void tearDown() { queryContexts.clear(); memoryPool = null; - executor.shutdownNow(); + singleThreadedExecutor.shutdownNow(); + singleThreadedScheduledExecutor.shutdown(); scheduledExecutor.shutdownNow(); } @Test - public void testScheduleMemoryRevoking() + public void testMemoryPoolRevoking() throws Exception { - QueryContext q1 = getOrCreateQueryContext(new QueryId("q1")); - QueryContext q2 = getOrCreateQueryContext(new QueryId("q2")); + QueryContext q1 = getOrCreateQueryContext(new QueryId("q1"), memoryPool); + QueryContext q2 = getOrCreateQueryContext(new QueryId("q2"), memoryPool); - SqlTask sqlTask1 = newSqlTask(q1.getQueryId()); - SqlTask sqlTask2 = newSqlTask(q2.getQueryId()); + SqlTask sqlTask1 = newSqlTask(q1.getQueryId(), memoryPool); + SqlTask sqlTask2 = newSqlTask(q2.getQueryId(), memoryPool); TaskContext taskContext1 = getOrCreateTaskContext(sqlTask1); PipelineContext pipelineContext11 = taskContext1.addPipelineContext(0, false, false, false); @@ -166,57 +171,65 @@ public void testScheduleMemoryRevoking() OperatorContext operatorContext5 = driverContext211.addOperatorContext(5, new PlanNodeId("na"), "na"); List tasks = ImmutableList.of(sqlTask1, sqlTask2); - MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler(singletonList(memoryPool), () -> tasks, executor, 1.0, 1.0, ORDER_BY_CREATE_TIME); - - allOperatorContexts = ImmutableSet.of(operatorContext1, operatorContext2, operatorContext3, operatorContext4, operatorContext5); - assertMemoryRevokingNotRequested(); - - requestMemoryRevoking(scheduler); - assertEquals(10, memoryPool.getFreeBytes()); - assertMemoryRevokingNotRequested(); - - LocalMemoryContext revocableMemory1 = operatorContext1.localRevocableMemoryContext(); - LocalMemoryContext revocableMemory3 = operatorContext3.localRevocableMemoryContext(); - LocalMemoryContext revocableMemory4 = operatorContext4.localRevocableMemoryContext(); - LocalMemoryContext revocableMemory5 = operatorContext5.localRevocableMemoryContext(); - - revocableMemory1.setBytes(3); - revocableMemory3.setBytes(6); - assertEquals(1, memoryPool.getFreeBytes()); - requestMemoryRevoking(scheduler); - // we are still good - no revoking needed - assertMemoryRevokingNotRequested(); - - revocableMemory4.setBytes(7); - assertEquals(-6, memoryPool.getFreeBytes()); - requestMemoryRevoking(scheduler); - // we need to revoke 3 and 6 - assertMemoryRevokingRequestedFor(operatorContext1, operatorContext3); - - // yet another revoking request should not change anything - requestMemoryRevoking(scheduler); - assertMemoryRevokingRequestedFor(operatorContext1, operatorContext3); - - // lets revoke some bytes - revocableMemory1.setBytes(0); - operatorContext1.resetMemoryRevokingRequested(); - requestMemoryRevoking(scheduler); - assertMemoryRevokingRequestedFor(operatorContext3); - assertEquals(-3, memoryPool.getFreeBytes()); - - // and allocate some more - revocableMemory5.setBytes(3); - assertEquals(-6, memoryPool.getFreeBytes()); - requestMemoryRevoking(scheduler); - // we are still good with just OC3 in process of revoking - assertMemoryRevokingRequestedFor(operatorContext3); - - // and allocate some more - revocableMemory5.setBytes(4); - assertEquals(-7, memoryPool.getFreeBytes()); - requestMemoryRevoking(scheduler); - // now we have to trigger revoking for OC4 - assertMemoryRevokingRequestedFor(operatorContext3, operatorContext4); + MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler( + singletonList(memoryPool), + () -> tasks, + queryContexts::get, + 1.0, + 1.0, + ORDER_BY_CREATE_TIME, + false); + try { + scheduler.start(); + allOperatorContexts = ImmutableSet.of(operatorContext1, operatorContext2, operatorContext3, operatorContext4, operatorContext5); + assertMemoryRevokingNotRequested(); + + assertEquals(10, memoryPool.getFreeBytes()); + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingNotRequested(); + + LocalMemoryContext revocableMemory1 = operatorContext1.localRevocableMemoryContext(); + LocalMemoryContext revocableMemory3 = operatorContext3.localRevocableMemoryContext(); + LocalMemoryContext revocableMemory4 = operatorContext4.localRevocableMemoryContext(); + LocalMemoryContext revocableMemory5 = operatorContext5.localRevocableMemoryContext(); + + revocableMemory1.setBytes(3); + revocableMemory3.setBytes(6); + assertEquals(1, memoryPool.getFreeBytes()); + scheduler.awaitAsynchronousCallbacksRun(); + // we are still good - no revoking needed + assertMemoryRevokingNotRequested(); + + revocableMemory4.setBytes(7); + assertEquals(-6, memoryPool.getFreeBytes()); + scheduler.awaitAsynchronousCallbacksRun(); + // we need to revoke 3 and 6 + assertMemoryRevokingRequestedFor(operatorContext1, operatorContext3); + + // lets revoke some bytes + revocableMemory1.setBytes(0); + operatorContext1.resetMemoryRevokingRequested(); + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingRequestedFor(operatorContext3); + assertEquals(-3, memoryPool.getFreeBytes()); + + // and allocate some more + revocableMemory5.setBytes(3); + assertEquals(-6, memoryPool.getFreeBytes()); + scheduler.awaitAsynchronousCallbacksRun(); + // we are still good with just OC3 in process of revoking + assertMemoryRevokingRequestedFor(operatorContext3); + + // and allocate some more + revocableMemory5.setBytes(4); + assertEquals(-7, memoryPool.getFreeBytes()); + scheduler.awaitAsynchronousCallbacksRun(); + // now we have to trigger revoking for OC4 + assertMemoryRevokingRequestedFor(operatorContext3, operatorContext4); + } + finally { + scheduler.stop(); + } } @Test @@ -224,59 +237,46 @@ public void testCountAlreadyRevokedMemoryWithinAPool() throws Exception { // Given - SqlTask sqlTask1 = newSqlTask(new QueryId("q1")); MemoryPool anotherMemoryPool = new MemoryPool(new MemoryPoolId("test"), new DataSize(10, BYTE)); - sqlTask1.getQueryContext().setMemoryPool(anotherMemoryPool); + SqlTask sqlTask1 = newSqlTask(new QueryId("q1"), anotherMemoryPool); OperatorContext operatorContext1 = createContexts(sqlTask1); - SqlTask sqlTask2 = newSqlTask(new QueryId("q2")); + SqlTask sqlTask2 = newSqlTask(new QueryId("q2"), memoryPool); OperatorContext operatorContext2 = createContexts(sqlTask2); List tasks = ImmutableList.of(sqlTask1, sqlTask2); - MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler(asList(memoryPool, anotherMemoryPool), () -> tasks, executor, 1.0, 1.0, ORDER_BY_CREATE_TIME); - allOperatorContexts = ImmutableSet.of(operatorContext1, operatorContext2); - - /* - * sqlTask1 fills its pool - */ - operatorContext1.localRevocableMemoryContext().setBytes(12); - requestMemoryRevoking(scheduler); - assertMemoryRevokingRequestedFor(operatorContext1); - - /* - * When sqlTask2 fills its pool - */ - operatorContext2.localRevocableMemoryContext().setBytes(12); - requestMemoryRevoking(scheduler); - - /* - * Then sqlTask2 should be asked to revoke its memory too - */ - assertMemoryRevokingRequestedFor(operatorContext1, operatorContext2); - } - - /** - * Test that when a {@link MemoryPool} is over-allocated, revocable memory is revoked without delay (although asynchronously). - */ - @Test - public void testImmediateMemoryRevoking() - throws Exception - { - // Given - SqlTask sqlTask = newSqlTask(new QueryId("query")); - OperatorContext operatorContext = createContexts(sqlTask); - - allOperatorContexts = ImmutableSet.of(operatorContext); - List tasks = ImmutableList.of(sqlTask); - MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler(singletonList(memoryPool), () -> tasks, executor, 1.0, 1.0, ORDER_BY_CREATE_TIME); - scheduler.registerPoolListeners(); // no periodic check initiated - - // When - operatorContext.localRevocableMemoryContext().setBytes(12); - awaitAsynchronousCallbacksRun(); - - // Then - assertMemoryRevokingRequestedFor(operatorContext); + MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler( + asList(memoryPool, anotherMemoryPool), + () -> tasks, + queryContexts::get, + 1.0, + 1.0, + ORDER_BY_CREATE_TIME, + false); + try { + scheduler.start(); + allOperatorContexts = ImmutableSet.of(operatorContext1, operatorContext2); + + /* + * sqlTask1 fills its pool + */ + operatorContext1.localRevocableMemoryContext().setBytes(12); + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingRequestedFor(operatorContext1); + + /* + * When sqlTask2 fills its pool + */ + operatorContext2.localRevocableMemoryContext().setBytes(12); + scheduler.awaitAsynchronousCallbacksRun(); + /* + * Then sqlTask2 should be asked to revoke its memory too + */ + assertMemoryRevokingRequestedFor(operatorContext1, operatorContext2); + } + finally { + scheduler.stop(); + } } /** @@ -286,70 +286,93 @@ public void testImmediateMemoryRevoking() public void testTaskRevokingOrderForCreateTime() throws Exception { - SqlTask sqlTask1 = newSqlTask(new QueryId("query")); + SqlTask sqlTask1 = newSqlTask(new QueryId("query"), memoryPool); TestOperatorContext operatorContext1 = createTestingOperatorContexts(sqlTask1, "operator1"); - SqlTask sqlTask2 = newSqlTask(new QueryId("query")); + SqlTask sqlTask2 = newSqlTask(new QueryId("query"), memoryPool); TestOperatorContext operatorContext2 = createTestingOperatorContexts(sqlTask2, "operator2"); allOperatorContexts = ImmutableSet.of(operatorContext1, operatorContext2); List tasks = ImmutableList.of(sqlTask1, sqlTask2); - MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler(singletonList(memoryPool), () -> tasks, executor, 1.0, 1.0, ORDER_BY_CREATE_TIME); - scheduler.registerPoolListeners(); // no periodic check initiated - - assertMemoryRevokingNotRequested(); - - operatorContext1.localRevocableMemoryContext().setBytes(11); - operatorContext2.localRevocableMemoryContext().setBytes(12); - - requestMemoryRevoking(scheduler); - - assertMemoryRevokingRequestedFor(operatorContext1, operatorContext2); - assertEquals(TestOperatorContext.firstOperator, "operator1"); // operator1 should revoke first as it belongs to a task that was created earlier + MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler( + singletonList(memoryPool), + () -> tasks, + queryContexts::get, + 1.0, + 1.0, + ORDER_BY_CREATE_TIME, + false); + try { + scheduler.start(); // no periodic check initiated + + assertMemoryRevokingNotRequested(); + + operatorContext1.localRevocableMemoryContext().setBytes(11); + operatorContext2.localRevocableMemoryContext().setBytes(12); + + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingRequestedFor(operatorContext1, operatorContext2); + assertEquals(TestOperatorContext.firstOperator, "operator1"); // operator1 should revoke first as it belongs to a task that was created earlier + } + finally { + scheduler.stop(); + } } @Test public void testTaskRevokingOrderForRevocableBytes() throws Exception { - SqlTask sqlTask1 = newSqlTask(new QueryId("query")); + SqlTask sqlTask1 = newSqlTask(new QueryId("query"), memoryPool); TestOperatorContext operatorContext1 = createTestingOperatorContexts(sqlTask1, "operator1"); - SqlTask sqlTask2 = newSqlTask(new QueryId("query")); + SqlTask sqlTask2 = newSqlTask(new QueryId("query"), memoryPool); TestOperatorContext operatorContext2 = createTestingOperatorContexts(sqlTask2, "operator2"); allOperatorContexts = ImmutableSet.of(operatorContext1, operatorContext2); List tasks = ImmutableList.of(sqlTask1, sqlTask2); - MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler(singletonList(memoryPool), () -> tasks, executor, 1.0, 1.0, ORDER_BY_REVOCABLE_BYTES); - scheduler.registerPoolListeners(); // no periodic check initiated - - assertMemoryRevokingNotRequested(); - - operatorContext1.localRevocableMemoryContext().setBytes(11); - operatorContext2.localRevocableMemoryContext().setBytes(12); - - requestMemoryRevoking(scheduler); - - assertMemoryRevokingRequestedFor(operatorContext1, operatorContext2); - assertEquals(TestOperatorContext.firstOperator, "operator2"); // operator2 should revoke first since it (and it's encompassing task) has allocated more bytes + MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler( + singletonList(memoryPool), + () -> tasks, + queryContexts::get, + 1.0, + 1.0, + ORDER_BY_REVOCABLE_BYTES, + false); + try { + scheduler.start(); + + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingNotRequested(); + + operatorContext1.localRevocableMemoryContext().setBytes(11); + operatorContext2.localRevocableMemoryContext().setBytes(12); + + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingRequestedFor(operatorContext1, operatorContext2); + assertEquals(TestOperatorContext.firstOperator, "operator2"); // operator2 should revoke first since it (and it's encompassing task) has allocated more bytes + } + finally { + scheduler.stop(); + } } @Test public void testTaskThresholdRevokingScheduler() throws Exception { - SqlTask sqlTask1 = newSqlTask(new QueryId("query")); + SqlTask sqlTask1 = newSqlTask(new QueryId("query"), memoryPool); TestOperatorContext operatorContext11 = createTestingOperatorContexts(sqlTask1, "operator11"); TestOperatorContext operatorContext12 = createTestingOperatorContexts(sqlTask1, "operator12"); - SqlTask sqlTask2 = newSqlTask(new QueryId("query")); + SqlTask sqlTask2 = newSqlTask(new QueryId("query"), memoryPool); TestOperatorContext operatorContext2 = createTestingOperatorContexts(sqlTask2, "operator2"); allOperatorContexts = ImmutableSet.of(operatorContext11, operatorContext12, operatorContext2); List tasks = ImmutableList.of(sqlTask1, sqlTask2); ImmutableMap taskMap = ImmutableMap.of(sqlTask1.getTaskId(), sqlTask1, sqlTask2.getTaskId(), sqlTask2); TaskThresholdMemoryRevokingScheduler scheduler = new TaskThresholdMemoryRevokingScheduler( - singletonList(memoryPool), () -> tasks, taskMap::get, executor, 5L); + singletonList(memoryPool), () -> tasks, taskMap::get, singleThreadedScheduledExecutor, 5L); assertMemoryRevokingNotRequested(); @@ -400,18 +423,18 @@ public void testTaskThresholdRevokingScheduler() public void testTaskThresholdRevokingSchedulerImmediate() throws Exception { - SqlTask sqlTask1 = newSqlTask(new QueryId("query")); + SqlTask sqlTask1 = newSqlTask(new QueryId("query"), memoryPool); TestOperatorContext operatorContext11 = createTestingOperatorContexts(sqlTask1, "operator11"); TestOperatorContext operatorContext12 = createTestingOperatorContexts(sqlTask1, "operator12"); - SqlTask sqlTask2 = newSqlTask(new QueryId("query")); + SqlTask sqlTask2 = newSqlTask(new QueryId("query"), memoryPool); TestOperatorContext operatorContext2 = createTestingOperatorContexts(sqlTask2, "operator2"); allOperatorContexts = ImmutableSet.of(operatorContext11, operatorContext12, operatorContext2); List tasks = ImmutableList.of(sqlTask1, sqlTask2); ImmutableMap taskMap = ImmutableMap.of(sqlTask1.getTaskId(), sqlTask1, sqlTask2.getTaskId(), sqlTask2); TaskThresholdMemoryRevokingScheduler scheduler = new TaskThresholdMemoryRevokingScheduler( - singletonList(memoryPool), () -> tasks, taskMap::get, executor, 5L); + singletonList(memoryPool), () -> tasks, taskMap::get, singleThreadedScheduledExecutor, 5L); scheduler.registerPoolListeners(); // no periodic check initiated assertMemoryRevokingNotRequested(); @@ -421,13 +444,13 @@ public void testTaskThresholdRevokingSchedulerImmediate() // at this point, Task1 = 3 total bytes, Task2 = 2 total bytes // this ensures that we are waiting for the memory revocation listener and not using polling-based revoking - awaitAsynchronousCallbacksRun(); + awaitTaskThresholdAsynchronousCallbacksRun(); assertMemoryRevokingNotRequested(); operatorContext12.localRevocableMemoryContext().setBytes(3); // at this point, Task1 = 6 total bytes, Task2 = 2 total bytes - awaitAsynchronousCallbacksRun(); + awaitTaskThresholdAsynchronousCallbacksRun(); // only operator11 should revoke since we need to revoke only 1 byte // threshold - (operator11 + operator12) => 5 - (3 + 3) = 1 bytes to revoke assertMemoryRevokingRequestedFor(operatorContext11); @@ -436,12 +459,12 @@ public void testTaskThresholdRevokingSchedulerImmediate() operatorContext11.localRevocableMemoryContext().setBytes(1); // at this point, Task1 = 3 total bytes, Task2 = 2 total bytes operatorContext11.resetMemoryRevokingRequested(); - awaitAsynchronousCallbacksRun(); + awaitTaskThresholdAsynchronousCallbacksRun(); assertMemoryRevokingNotRequested(); operatorContext12.localRevocableMemoryContext().setBytes(6); // operator12 fills up // at this point, Task1 = 7 total bytes, Task2 = 2 total bytes - awaitAsynchronousCallbacksRun(); + awaitTaskThresholdAsynchronousCallbacksRun(); // both operator11 and operator 12 are revoking since we revoke in order of operator creation within the task until we are below the memory revoking threshold assertMemoryRevokingRequestedFor(operatorContext11, operatorContext12); @@ -451,17 +474,17 @@ public void testTaskThresholdRevokingSchedulerImmediate() operatorContext12.resetMemoryRevokingRequested(); // at this point, Task1 = 4 total bytes, Task2 = 2 total bytes - awaitAsynchronousCallbacksRun(); + awaitTaskThresholdAsynchronousCallbacksRun(); assertMemoryRevokingNotRequested(); // no need to revoke operatorContext2.localRevocableMemoryContext().setBytes(6); // at this point, Task1 = 4 total bytes, Task2 = 6 total bytes, operators in Task2 must be revoked - awaitAsynchronousCallbacksRun(); + awaitTaskThresholdAsynchronousCallbacksRun(); assertMemoryRevokingRequestedFor(operatorContext2); } @Test - public void testQueryLimitMemoryRevokingScheduler() + public void testQueryMemoryRevoking() throws Exception { // The various tasks created here use a small amount of system memory independent of what's set explicitly @@ -474,75 +497,193 @@ public void testQueryLimitMemoryRevokingScheduler() // To prevent flakiness in the test, we reset revoke memory requested for all operators, even if only one spilled. QueryId queryId = new QueryId("query"); - SqlTask sqlTask1 = newSqlTask(queryId); + // use a larger memory pool so that we don't trigger spilling due to filling the memory pool + MemoryPool queryLimitMemoryPool = new MemoryPool(new MemoryPoolId("test"), new DataSize(100, GIGABYTE)); + SqlTask sqlTask1 = newSqlTask(queryId, queryLimitMemoryPool); TestOperatorContext operatorContext11 = createTestingOperatorContexts(sqlTask1, "operator11"); TestOperatorContext operatorContext12 = createTestingOperatorContexts(sqlTask1, "operator12"); - SqlTask sqlTask2 = newSqlTask(queryId); + SqlTask sqlTask2 = newSqlTask(queryId, queryLimitMemoryPool); TestOperatorContext operatorContext2 = createTestingOperatorContexts(sqlTask2, "operator2"); allOperatorContexts = ImmutableSet.of(operatorContext11, operatorContext12, operatorContext2); - QueryLimitMemoryRevokingScheduler scheduler = new QueryLimitMemoryRevokingScheduler(singletonList(memoryPool), queryContexts::get, executor); - scheduler.registerPoolListeners(); - - assertMemoryRevokingNotRequested(); - - operatorContext11.localRevocableMemoryContext().setBytes(150_000); - operatorContext2.localRevocableMemoryContext().setBytes(100_000); - // at this point, Task1 = 150k total bytes, Task2 = 100k total bytes - - // this ensures that we are waiting for the memory revocation listener and not using polling-based revoking - awaitAsynchronousCallbacksRun(); - assertMemoryRevokingNotRequested(); + List tasks = ImmutableList.of(sqlTask1, sqlTask2); + MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler( + singletonList(queryLimitMemoryPool), + () -> tasks, + queryContexts::get, + 1.0, + 1.0, + ORDER_BY_REVOCABLE_BYTES, + true); + try { + scheduler.start(); + + assertMemoryRevokingNotRequested(); + + operatorContext11.localRevocableMemoryContext().setBytes(150_000); + operatorContext2.localRevocableMemoryContext().setBytes(100_000); + // at this point, Task1 = 150k total bytes, Task2 = 100k total bytes + + // this ensures that we are waiting for the memory revocation listener and not using polling-based revoking + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingNotRequested(); + + operatorContext12.localRevocableMemoryContext().setBytes(300_000); + // at this point, Task1 = 450k total bytes, Task2 = 100k total bytes + + scheduler.awaitAsynchronousCallbacksRun(); + // only operator11 should revoke since we need to revoke only 50k bytes + // limit - (task1 + task2) => 500k - (450k + 100k) = 50k byte to revoke + assertMemoryRevokingRequestedFor(operatorContext11); + + // revoke all bytes in operator11 + operatorContext11.localRevocableMemoryContext().setBytes(0); + // at this point, Task1 = 300k total bytes, Task2 = 100k total bytes + scheduler.awaitAsynchronousCallbacksRun(); + operatorContext11.resetMemoryRevokingRequested(); + operatorContext12.resetMemoryRevokingRequested(); + operatorContext2.resetMemoryRevokingRequested(); + assertMemoryRevokingNotRequested(); + + operatorContext11.localRevocableMemoryContext().setBytes(20_000); + // at this point, Task1 = 320,000 total bytes (oc11 - 20k, oc12 - 300k), Task2 = 100k total bytes + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingNotRequested(); + + operatorContext2.localSystemMemoryContext().setBytes(150_000); + // at this point, Task1 = 320K total bytes, Task2 = 250K total bytes + // both operator11 and operator 12 are revoking since we revoke in order of operator creation within the task until we are below the memory revoking threshold + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingRequestedFor(operatorContext11, operatorContext12); + + operatorContext11.localRevocableMemoryContext().setBytes(0); + operatorContext12.localRevocableMemoryContext().setBytes(0); + scheduler.awaitAsynchronousCallbacksRun(); + operatorContext11.resetMemoryRevokingRequested(); + operatorContext12.resetMemoryRevokingRequested(); + operatorContext2.resetMemoryRevokingRequested(); + assertMemoryRevokingNotRequested(); + + operatorContext11.localRevocableMemoryContext().setBytes(50_000); + operatorContext12.localRevocableMemoryContext().setBytes(50_000); + operatorContext2.localSystemMemoryContext().setBytes(150_000); + operatorContext2.localRevocableMemoryContext().setBytes(150_000); + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingNotRequested(); // no need to revoke + // at this point, Task1 = 75k total bytes, Task2 = 300k total bytes (150k revocable, 150k system) + + operatorContext12.localUserMemoryContext().setBytes(300_000); + // at this point, Task1 = 400K total bytes (100k revocable, 300k user), Task2 = 300k total bytes (150k revocable, 150k system) + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingRequestedFor(operatorContext2, operatorContext11); + } + finally { + scheduler.stop(); + } + } - operatorContext12.localRevocableMemoryContext().setBytes(300_000); - // at this point, Task1 = 450k total bytes, Task2 = 100k total bytes + @Test + public void testRevokesPoolWhenFullBeforeQueryLimit() + throws Exception + { + QueryContext q1 = getOrCreateQueryContext(new QueryId("q1"), memoryPool); + QueryContext q2 = getOrCreateQueryContext(new QueryId("q2"), memoryPool); - awaitAsynchronousCallbacksRun(); - // only operator11 should revoke since we need to revoke only 50k bytes - // limit - (task1 + task2) => 500k - (450k + 100k) = 50k byte to revoke - assertMemoryRevokingRequestedFor(operatorContext11); + SqlTask sqlTask1 = newSqlTask(q1.getQueryId(), memoryPool); + SqlTask sqlTask2 = newSqlTask(q2.getQueryId(), memoryPool); - // revoke all bytes in operator11 - operatorContext11.localRevocableMemoryContext().setBytes(0); - // at this point, Task1 = 300k total bytes, Task2 = 100k total bytes - awaitAsynchronousCallbacksRun(); - operatorContext11.resetMemoryRevokingRequested(); - operatorContext12.resetMemoryRevokingRequested(); - operatorContext2.resetMemoryRevokingRequested(); - assertMemoryRevokingNotRequested(); + TaskContext taskContext1 = getOrCreateTaskContext(sqlTask1); + PipelineContext pipelineContext11 = taskContext1.addPipelineContext(0, false, false, false); + DriverContext driverContext111 = pipelineContext11.addDriverContext(); + OperatorContext operatorContext1 = driverContext111.addOperatorContext(1, new PlanNodeId("na"), "na"); + OperatorContext operatorContext2 = driverContext111.addOperatorContext(2, new PlanNodeId("na"), "na"); + DriverContext driverContext112 = pipelineContext11.addDriverContext(); + OperatorContext operatorContext3 = driverContext112.addOperatorContext(3, new PlanNodeId("na"), "na"); - operatorContext11.localRevocableMemoryContext().setBytes(20_000); - // at this point, Task1 = 320,000 total bytes (oc11 - 20k, oc12 - 300k), Task2 = 100k total bytes - awaitAsynchronousCallbacksRun(); - assertMemoryRevokingNotRequested(); + TaskContext taskContext2 = getOrCreateTaskContext(sqlTask2); + PipelineContext pipelineContext21 = taskContext2.addPipelineContext(1, false, false, false); + DriverContext driverContext211 = pipelineContext21.addDriverContext(); + OperatorContext operatorContext4 = driverContext211.addOperatorContext(4, new PlanNodeId("na"), "na"); - operatorContext2.localSystemMemoryContext().setBytes(150_000); - // at this point, Task1 = 320K total bytes, Task2 = 250K total bytes - // both operator11 and operator 12 are revoking since we revoke in order of operator creation within the task until we are below the memory revoking threshold - awaitAsynchronousCallbacksRun(); - assertMemoryRevokingRequestedFor(operatorContext11, operatorContext12); + List tasks = ImmutableList.of(sqlTask1, sqlTask2); + MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler( + singletonList(memoryPool), + () -> tasks, + queryContexts::get, + 1.0, + 1.0, + ORDER_BY_CREATE_TIME, + true); + try { + scheduler.start(); + + allOperatorContexts = ImmutableSet.of(operatorContext1, operatorContext2, operatorContext3, operatorContext4); + assertMemoryRevokingNotRequested(); + + assertEquals(10, memoryPool.getFreeBytes()); + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingNotRequested(); + + LocalMemoryContext revocableMemory1 = operatorContext1.localRevocableMemoryContext(); + LocalMemoryContext revocableMemory3 = operatorContext3.localRevocableMemoryContext(); + LocalMemoryContext revocableMemory4 = operatorContext4.localRevocableMemoryContext(); + + revocableMemory1.setBytes(3); + revocableMemory3.setBytes(6); + assertEquals(1, memoryPool.getFreeBytes()); + scheduler.awaitAsynchronousCallbacksRun(); + // we are still good - no revoking needed + assertMemoryRevokingNotRequested(); + + revocableMemory4.setBytes(7); + assertEquals(-6, memoryPool.getFreeBytes()); + scheduler.awaitAsynchronousCallbacksRun(); + // we need to revoke 3 and 6 + assertMemoryRevokingRequestedFor(operatorContext1, operatorContext3); + } + finally { + scheduler.stop(); + } + } - operatorContext11.localRevocableMemoryContext().setBytes(0); - operatorContext12.localRevocableMemoryContext().setBytes(0); - awaitAsynchronousCallbacksRun(); - operatorContext11.resetMemoryRevokingRequested(); - operatorContext12.resetMemoryRevokingRequested(); - operatorContext2.resetMemoryRevokingRequested(); - assertMemoryRevokingNotRequested(); + @Test + public void testQueryMemoryNotRevokedWhenNotEnabled() + throws Exception + { + // The various tasks created here use a small amount of system memory independent of what's set explicitly + // in this test. Triggering spilling based on differences of thousands of bytes rather than hundreds + // makes the test resilient to any noise that creates. - operatorContext11.localRevocableMemoryContext().setBytes(50_000); - operatorContext12.localRevocableMemoryContext().setBytes(50_000); - operatorContext2.localSystemMemoryContext().setBytes(150_000); - operatorContext2.localRevocableMemoryContext().setBytes(150_000); - awaitAsynchronousCallbacksRun(); - assertMemoryRevokingNotRequested(); // no need to revoke - // at this point, Task1 = 75k total bytes, Task2 = 300k total bytes (150k revocable, 150k system) + QueryId queryId = new QueryId("query"); + // use a larger memory pool so that we don't trigger spilling due to filling the memory pool + MemoryPool queryLimitMemoryPool = new MemoryPool(new MemoryPoolId("test"), new DataSize(100, GIGABYTE)); + SqlTask sqlTask1 = newSqlTask(queryId, queryLimitMemoryPool); + TestOperatorContext operatorContext11 = createTestingOperatorContexts(sqlTask1, "operator11"); - operatorContext12.localUserMemoryContext().setBytes(300_000); - // at this point, Task1 = 400K total bytes (100k revocable, 300k user), Task2 = 300k total bytes (150k revocable, 150k system) - awaitAsynchronousCallbacksRun(); - assertMemoryRevokingRequestedFor(operatorContext2, operatorContext11); + allOperatorContexts = ImmutableSet.of(operatorContext11); + List tasks = ImmutableList.of(sqlTask1); + MemoryRevokingScheduler scheduler = new MemoryRevokingScheduler( + singletonList(queryLimitMemoryPool), + () -> tasks, + queryContexts::get, + 1.0, + 1.0, + ORDER_BY_REVOCABLE_BYTES, + false); + try { + scheduler.start(); + + assertMemoryRevokingNotRequested(); + + // exceed the query memory limit of 500KB + operatorContext11.localRevocableMemoryContext().setBytes(600_000); + scheduler.awaitAsynchronousCallbacksRun(); + assertMemoryRevokingNotRequested(); + } + finally { + scheduler.stop(); + } } private OperatorContext createContexts(SqlTask sqlTask) @@ -571,7 +712,7 @@ private TestOperatorContext createTestingOperatorContexts(SqlTask sqlTask, Strin new PlanNodeId("na"), "na", driverContext, - executor, + singleThreadedExecutor, driverContext.getDriverMemoryContext().newMemoryTrackingContext(), operatorName); driverContext.addOperatorContext(testOperatorContext); @@ -609,25 +750,18 @@ public long requestMemoryRevoking() } } - private void requestMemoryRevoking(MemoryRevokingScheduler scheduler) - throws Exception - { - scheduler.requestMemoryRevokingIfNeeded(); - awaitAsynchronousCallbacksRun(); - } - private void requestMemoryRevoking(TaskThresholdMemoryRevokingScheduler scheduler) throws Exception { scheduler.revokeHighMemoryTasksIfNeeded(); - awaitAsynchronousCallbacksRun(); + awaitTaskThresholdAsynchronousCallbacksRun(); } - private void awaitAsynchronousCallbacksRun() + private void awaitTaskThresholdAsynchronousCallbacksRun() throws Exception { // Make sure asynchronous callback got called (executor is single-threaded). - executor.invokeAll(singletonList((Callable) () -> null)); + singleThreadedScheduledExecutor.invokeAll(singletonList((Callable) () -> null)); } private void assertMemoryRevokingRequestedFor(OperatorContext... operatorContexts) @@ -644,9 +778,9 @@ private void assertMemoryRevokingNotRequested() assertMemoryRevokingRequestedFor(); } - private SqlTask newSqlTask(QueryId queryId) + private SqlTask newSqlTask(QueryId queryId, MemoryPool memoryPool) { - QueryContext queryContext = getOrCreateQueryContext(queryId); + QueryContext queryContext = getOrCreateQueryContext(queryId, memoryPool); TaskId taskId = new TaskId(queryId.getId(), 0, 0, idGeneator.incrementAndGet()); URI location = URI.create("fake://task/" + taskId); @@ -658,14 +792,14 @@ private SqlTask newSqlTask(QueryId queryId) queryContext, sqlTaskExecutionFactory, new MockExchangeClientSupplier(), - executor, + singleThreadedExecutor, Functions.identity(), new DataSize(32, MEGABYTE), new CounterStat(), new SpoolingOutputBufferFactory(new FeaturesConfig())); } - private QueryContext getOrCreateQueryContext(QueryId queryId) + private QueryContext getOrCreateQueryContext(QueryId queryId, MemoryPool memoryPool) { return queryContexts.computeIfAbsent(queryId, id -> new QueryContext(id, new DataSize(500, KILOBYTE), @@ -674,7 +808,7 @@ private QueryContext getOrCreateQueryContext(QueryId queryId) new DataSize(1, GIGABYTE), memoryPool, new TestingGcMonitor(), - executor, + singleThreadedExecutor, scheduledExecutor, new DataSize(1, GIGABYTE), spillSpaceTracker)); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 254f2cf199cea..1a6b6cd35667d 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -96,6 +96,7 @@ public void testDefaults() .setMemoryRevokingThreshold(0.9) .setMemoryRevokingTarget(0.5) .setTaskSpillingStrategy(ORDER_BY_CREATE_TIME) + .setQueryLimitSpillEnabled(false) .setSingleStreamSpillerChoice(SingleStreamSpillerChoice.LOCAL_FILE) .setSpillerTempStorage("local") .setMaxRevocableMemoryPerTask(new DataSize(500, MEGABYTE)) @@ -231,6 +232,7 @@ public void testExplicitPropertyMappings() .put("experimental.memory-revoking-threshold", "0.2") .put("experimental.memory-revoking-target", "0.8") .put("experimental.spiller.task-spilling-strategy", "PER_TASK_MEMORY_THRESHOLD") + .put("experimental.query-limit-spill-enabled", "true") .put("experimental.spiller.single-stream-spiller-choice", "TEMP_STORAGE") .put("experimental.spiller.spiller-temp-storage", "crail") .put("experimental.spiller.max-revocable-task-memory", "1GB") @@ -340,6 +342,7 @@ public void testExplicitPropertyMappings() .setMemoryRevokingThreshold(0.2) .setMemoryRevokingTarget(0.8) .setTaskSpillingStrategy(PER_TASK_MEMORY_THRESHOLD) + .setQueryLimitSpillEnabled(true) .setSingleStreamSpillerChoice(SingleStreamSpillerChoice.TEMP_STORAGE) .setSpillerTempStorage("crail") .setMaxRevocableMemoryPerTask(new DataSize(1, GIGABYTE))