diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java index 02e202e1eded8..26baa8b3b6e20 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java @@ -24,6 +24,8 @@ import io.airlift.units.MaxDuration; import io.airlift.units.MinDuration; +import javax.validation.constraints.DecimalMax; +import javax.validation.constraints.DecimalMin; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -88,6 +90,8 @@ public class TaskManagerConfig private Duration interruptRunawaySplitsTimeout = new Duration(600, SECONDS); + private double memoryBasedSlowDownThreshold = 1.0; + @MinDuration("1ms") @MaxDuration("10s") @NotNull @@ -575,4 +579,20 @@ public TaskManagerConfig setInterruptRunawaySplitsTimeout(Duration interruptRuna this.interruptRunawaySplitsTimeout = interruptRunawaySplitsTimeout; return this; } + + //Allowing low value to be 70 percent to avoid slowing down overall cluster by setting it too low + @DecimalMin("0.7") + @DecimalMax("1.0") + public double getMemoryBasedSlowDownThreshold() + { + return memoryBasedSlowDownThreshold; + } + + @Config("task.memory-based-slowdown-threshold") + @ConfigDescription("Pause processing new leaf split if heap memory usage crosses the threshold") + public TaskManagerConfig setMemoryBasedSlowDownThreshold(double memoryBasedSlowDownThreshold) + { + this.memoryBasedSlowDownThreshold = memoryBasedSlowDownThreshold; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java index 03fb55a0d6971..43b237531c370 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java +++ 
b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java @@ -174,12 +174,15 @@ public class TaskExecutor // shared between SplitRunners private final CounterStat globalCpuTimeMicros = new CounterStat(); private final CounterStat globalScheduledTimeMicros = new CounterStat(); + private final CounterStat splitSkippedDueToMemoryPressure = new CounterStat(); private final TimeStat blockedQuantaWallTime = new TimeStat(MICROSECONDS); private final TimeStat unblockedQuantaWallTime = new TimeStat(MICROSECONDS); private volatile boolean closed; + private volatile boolean lowMemory; + @Inject public TaskExecutor(TaskManagerConfig config, EmbedVersion embedVersion, MultilevelSplitQueue splitQueue) { @@ -483,6 +486,13 @@ private void splitFinished(PrioritizedSplitRunner split) private synchronized void scheduleTaskIfNecessary(TaskHandle taskHandle) { + // Worker skips processing splits if JVM heap usage crosses the configured threshold + // Helps reduce memory pressure on the worker and avoid OOMs + if (isLowMemory()) { + log.debug("Skip task scheduling due to low memory"); + splitSkippedDueToMemoryPressure.update(1); + return; + } // if task has less than the minimum guaranteed splits running, // immediately schedule a new split for this task. This assures // that a task gets its fair amount of consideration (you have to @@ -498,6 +508,14 @@ private synchronized void scheduleTaskIfNecessary(TaskHandle taskHandle) private synchronized void addNewEntrants() { + // Worker skips processing splits if JVM heap usage crosses the configured threshold + // Helps reduce memory pressure on the worker and avoid OOMs + if (isLowMemory()) { + log.debug("Skip polling next split worker due to low memory"); + splitSkippedDueToMemoryPressure.update(1); + return; + } + // Ignore intermediate splits when checking minimumNumberOfDrivers. 
// Otherwise with (for example) minimumNumberOfDrivers = 100, 200 intermediate splits // and 100 leaf splits, depending on order of appearing splits, number of @@ -904,6 +922,13 @@ public CounterStat getGlobalCpuTimeMicros() return globalCpuTimeMicros; } + @Managed + @Nested + public CounterStat getSplitSkippedDueToMemoryPressure() + { + return splitSkippedDueToMemoryPressure; + } + private synchronized int getRunningTasksForLevel(int level) { int count = 0; @@ -1032,4 +1057,14 @@ public ThreadPoolExecutorMBean getProcessorExecutor() { return executorMBean; } + + public void setLowMemory(boolean lowMemory) + { + this.lowMemory = lowMemory; + } + + public boolean isLowMemory() + { + return this.lowMemory; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/LowMemoryMonitor.java b/presto-main/src/main/java/com/facebook/presto/memory/LowMemoryMonitor.java new file mode 100644 index 0000000000000..634e26648fb9f --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/memory/LowMemoryMonitor.java @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.memory; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.execution.TaskManagerConfig; +import com.facebook.presto.execution.executor.TaskExecutor; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newScheduledThreadPool; + +public class LowMemoryMonitor +{ + private static final Logger log = Logger.get(LowMemoryMonitor.class); + private final ScheduledExecutorService lowMemoryExecutor = newScheduledThreadPool(1, daemonThreadsNamed("low-memory-monitor-executor")); + private final TaskExecutor taskExecutor; + private final double threshold; + private static final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + + @Inject + public LowMemoryMonitor(TaskExecutor taskExecutor, TaskManagerConfig taskManagerConfig) + { + this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor is null"); + this.threshold = taskManagerConfig.getMemoryBasedSlowDownThreshold(); + } + + @PostConstruct + public void start() + { + if (threshold < 1.0) { + lowMemoryExecutor.scheduleWithFixedDelay(() -> checkLowMemory(), 1, 1, TimeUnit.SECONDS); + } + } + + @PreDestroy + public void stop() + { + lowMemoryExecutor.shutdown(); + } + + private void checkLowMemory() + { + MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage(); + + long usedMemory = memoryUsage.getUsed(); + long maxMemory = memoryUsage.getMax(); + long memoryThreshold = (long) (maxMemory * threshold); + + if (usedMemory > memoryThreshold) { + if (!taskExecutor.isLowMemory()) { + log.debug("Enabling Low 
Memory: Used: %s Max: %s Threshold: %s", usedMemory, maxMemory, memoryThreshold); + taskExecutor.setLowMemory(true); + } + } + else { + if (taskExecutor.isLowMemory()) { + log.debug("Disabling Low Memory: Used: %s Max: %s Threshold: %s", usedMemory, maxMemory, memoryThreshold); + taskExecutor.setLowMemory(false); + } + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java b/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java index 08b104bc86d20..6e0521aba1aa4 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java @@ -18,6 +18,7 @@ import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; import com.facebook.presto.failureDetector.FailureDetector; import com.facebook.presto.failureDetector.NoOpFailureDetector; +import com.facebook.presto.memory.LowMemoryMonitor; import com.facebook.presto.transaction.NoOpTransactionManager; import com.facebook.presto.transaction.TransactionManager; import com.google.inject.Binder; @@ -55,6 +56,8 @@ public void configure(Binder binder) binder.bind(NodeResourceStatusProvider.class).toInstance(newProxy(NodeResourceStatusProvider.class, (proxy, method, args) -> { return true; })); + + binder.bind(LowMemoryMonitor.class).in(Scopes.SINGLETON); } @Provides diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java index 6b5dccb9bc737..4c5bd8d93719a 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java @@ -70,7 +70,8 @@ public void testDefaults() .setStatisticsCpuTimerEnabled(true) .setLegacyLifespanCompletionCondition(false) .setTaskPriorityTracking(TASK_FAIR) - .setInterruptRunawaySplitsTimeout(new Duration(600, 
SECONDS))); + .setInterruptRunawaySplitsTimeout(new Duration(600, SECONDS)) + .setMemoryBasedSlowDownThreshold(1.0)); } @Test @@ -112,6 +113,7 @@ public void testExplicitPropertyMappings() .put("task.legacy-lifespan-completion-condition", "true") .put("task.task-priority-tracking", "QUERY_FAIR") .put("task.interrupt-runaway-splits-timeout", "599s") + .put("task.memory-based-slowdown-threshold", "0.9") .build(); TaskManagerConfig expected = new TaskManagerConfig() @@ -149,7 +151,8 @@ public void testExplicitPropertyMappings() .setStatisticsCpuTimerEnabled(false) .setLegacyLifespanCompletionCondition(true) .setTaskPriorityTracking(QUERY_FAIR) - .setInterruptRunawaySplitsTimeout(new Duration(599, SECONDS)); + .setInterruptRunawaySplitsTimeout(new Duration(599, SECONDS)) + .setMemoryBasedSlowDownThreshold(0.9); assertFullMapping(properties, expected); }