diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java index bb6c0a2967f0e..77c38d1e5f354 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java @@ -17,6 +17,7 @@ import com.facebook.airlift.configuration.ConfigDescription; import com.facebook.airlift.configuration.DefunctConfig; import com.facebook.airlift.configuration.LegacyConfig; +import com.facebook.presto.memory.HighMemoryTaskKillerStrategy; import com.facebook.presto.util.PowerOfTwo; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; @@ -92,6 +93,13 @@ public class TaskManagerConfig private double memoryBasedSlowDownThreshold = 1.0; + private HighMemoryTaskKillerStrategy highMemoryTaskKillerStrategy = HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FULL_GC; + + private boolean highMemoryTaskKillerEnabled; + private double highMemoryTaskKillerGCReclaimMemoryThreshold = 0.01; + private Duration highMemoryTaskKillerFrequentFullGCDurationThreshold = new Duration(1, SECONDS); + private double highMemoryTaskKillerHeapMemoryThreshold = 0.9; + @MinDuration("1ms") @MaxDuration("10s") @NotNull @@ -595,4 +603,67 @@ public TaskManagerConfig setMemoryBasedSlowDownThreshold(double memoryBasedSlowD this.memoryBasedSlowDownThreshold = memoryBasedSlowDownThreshold; return this; } + + public boolean isHighMemoryTaskKillerEnabled() + { + return highMemoryTaskKillerEnabled; + } + + @Config("experimental.task.high-memory-task-killer-enabled") + public TaskManagerConfig setHighMemoryTaskKillerEnabled(boolean highMemoryTaskKillerEnabled) + { + this.highMemoryTaskKillerEnabled = highMemoryTaskKillerEnabled; + return this; + } + + public Double getHighMemoryTaskKillerHeapMemoryThreshold() + { + return highMemoryTaskKillerHeapMemoryThreshold; + } + + @Config("experimental.task.high-memory-task-killer-heap-memory-threshold") + @ConfigDescription("Heap memory threshold to help high task memory killer to identify if workers is running with high heap usage") + public TaskManagerConfig setHighMemoryTaskKillerHeapMemoryThreshold(Double highMemoryTaskKillerHeapMemoryThreshold) + { + this.highMemoryTaskKillerHeapMemoryThreshold = highMemoryTaskKillerHeapMemoryThreshold; + return this; + } + + public Double getHighMemoryTaskKillerGCReclaimMemoryThreshold() + { + return highMemoryTaskKillerGCReclaimMemoryThreshold; + } + + @Config("experimental.task.high-memory-task-killer-reclaim-memory-threshold") + @ConfigDescription("Full GC Reclaim memory threshold (based on -Xmx) to help high task memory killer to identify if enough memory is reclaimed or not.") + public TaskManagerConfig setHighMemoryTaskKillerGCReclaimMemoryThreshold(Double highMemoryTaskKillerGCReclaimMemoryThreshold) + { + this.highMemoryTaskKillerGCReclaimMemoryThreshold = highMemoryTaskKillerGCReclaimMemoryThreshold; + return this; + } + + public Duration getHighMemoryTaskKillerFrequentFullGCDurationThreshold() + { + return highMemoryTaskKillerFrequentFullGCDurationThreshold; + } + + @Config("experimental.task.high-memory-task-killer-frequent-full-gc-duration-threshold") + @ConfigDescription("Threshold to identify if full GCs happening frequently and considered for the task killer to trigger") + public TaskManagerConfig setHighMemoryTaskKillerFrequentFullGCDurationThreshold(Duration highMemoryTaskKillerFrequentFullGCDurationThreshold) + { + this.highMemoryTaskKillerFrequentFullGCDurationThreshold = highMemoryTaskKillerFrequentFullGCDurationThreshold; + return this; + } + + public HighMemoryTaskKillerStrategy getHighMemoryTaskKillerStrategy() + { + return highMemoryTaskKillerStrategy; + } + + @Config("experiemental.task.high-memory-task-killer-strategy") + public TaskManagerConfig setHighMemoryTaskKillerStrategy(HighMemoryTaskKillerStrategy highMemoryTaskKillerStrategy) + { + this.highMemoryTaskKillerStrategy = highMemoryTaskKillerStrategy; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/HighMemoryTaskKiller.java b/presto-main/src/main/java/com/facebook/presto/memory/HighMemoryTaskKiller.java new file mode 100644 index 0000000000000..66a99bbd5415c --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/memory/HighMemoryTaskKiller.java @@ -0,0 +1,248 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.memory; + +import com.facebook.airlift.log.Logger; +import com.facebook.airlift.stats.GarbageCollectionNotificationInfo; +import com.facebook.presto.execution.SqlTask; +import com.facebook.presto.execution.SqlTaskManager; +import com.facebook.presto.execution.TaskInfo; +import com.facebook.presto.execution.TaskManagerConfig; +import com.facebook.presto.operator.TaskStats; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.QueryId; +import com.google.common.base.Ticker; +import com.google.common.collect.ListMultimap; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import org.jheaps.annotations.VisibleForTesting; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.management.JMException; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.AbstractMap; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static com.facebook.presto.memory.HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FREQUENT_FULL_GC; +import static com.facebook.presto.memory.HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FULL_GC; +import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_HEAP_MEMORY_LIMIT; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class HighMemoryTaskKiller +{ + private static final Logger log = Logger.get(HighMemoryTaskKiller.class); + private static final String GC_NOTIFICATION_TYPE = "com.sun.management.gc.notification"; + private static final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + private final NotificationListener gcNotificationListener = (notification, ignored) -> onGCNotification(notification); + private final SqlTaskManager sqlTaskManager; + private final HighMemoryTaskKillerStrategy taskKillerStrategy; + private final boolean taskKillerEnabled; + private final Duration taskKillerFrequentFullGCDurationThreshold; + private Duration lastFullGCTimestamp; + private long lastFullGCCollectedBytes; + private final long reclaimMemoryThreshold; + private final long heapMemoryThreshold; + Ticker ticker; + + @Inject + public HighMemoryTaskKiller(SqlTaskManager sqlTaskManager, TaskManagerConfig taskManagerConfig) + { + requireNonNull(taskManagerConfig, "taskManagerConfig is null"); + + this.sqlTaskManager = requireNonNull(sqlTaskManager, "sqlTaskManager must not be null"); + + this.taskKillerStrategy = taskManagerConfig.getHighMemoryTaskKillerStrategy(); + this.taskKillerEnabled = taskManagerConfig.isHighMemoryTaskKillerEnabled(); + + this.taskKillerFrequentFullGCDurationThreshold = taskManagerConfig.getHighMemoryTaskKillerFrequentFullGCDurationThreshold(); + this.reclaimMemoryThreshold = (long) (memoryMXBean.getHeapMemoryUsage().getMax() * taskManagerConfig.getHighMemoryTaskKillerGCReclaimMemoryThreshold()); + + this.heapMemoryThreshold = (long) (memoryMXBean.getHeapMemoryUsage().getMax() * taskManagerConfig.getHighMemoryTaskKillerHeapMemoryThreshold()); + this.ticker = Ticker.systemTicker(); + } + + @PostConstruct + public void start() + { + if (!taskKillerEnabled) { + return; + } + + for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) { + if (mbean.getName().equals("TestingMBeanServer")) { + continue; + } + + ObjectName objectName = mbean.getObjectName(); + try { + ManagementFactory.getPlatformMBeanServer().addNotificationListener( + objectName, + gcNotificationListener, + null, + null); + } + catch (JMException e) { + throw new RuntimeException("Unable to add listener", e); + } + } + } + + @PreDestroy + public void stop() + { + if (!taskKillerEnabled) { + return; + } + + for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) { + ObjectName objectName = mbean.getObjectName(); + try { + ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, gcNotificationListener); + } + catch (JMException ignored) { + log.error("Error removing notification: " + ignored); + } + } + } + + private void onGCNotification(Notification notification) + { + if (GC_NOTIFICATION_TYPE.equals(notification.getType())) { + GarbageCollectionNotificationInfo info = new GarbageCollectionNotificationInfo((CompositeData) notification.getUserData()); + if (info.isMajorGc()) { + if (shouldTriggerTaskKiller(info)) { + //Kill task consuming most memory + List activeTasks = getActiveTasks(); + ListMultimap activeQueriesToTasksMap = activeTasks.stream() + .collect(toImmutableListMultimap(task -> task.getQueryContext().getQueryId(), Function.identity())); + + Optional queryId = getMaxMemoryConsumingQuery(activeQueriesToTasksMap); + + if (queryId.isPresent()) { + List activeTasksToKill = activeQueriesToTasksMap.get(queryId.get()); + for (SqlTask sqlTask : activeTasksToKill) { + TaskStats taskStats = sqlTask.getTaskInfo().getStats(); + sqlTask.failed(new PrestoException(EXCEEDED_HEAP_MEMORY_LIMIT, format("Worker heap memory limit exceeded: User Memory: %d, System Memory: %d, Revocable Memory: %d", taskStats.getUserMemoryReservationInBytes(), taskStats.getSystemMemoryReservationInBytes(), taskStats.getRevocableMemoryReservationInBytes()))); + } + } + } + } + } + } + + private boolean shouldTriggerTaskKiller(GarbageCollectionNotificationInfo info) + { + boolean triggerTaskKiller = false; + DataSize beforeGcDataSize = info.getBeforeGcTotal(); + DataSize afterGcDataSize = info.getAfterGcTotal(); + + if (taskKillerStrategy == FREE_MEMORY_ON_FREQUENT_FULL_GC) { + long currentGarbageCollectedBytes = beforeGcDataSize.toBytes() - afterGcDataSize.toBytes(); + Duration currentFullGCTimestamp = new Duration(ticker.read(), TimeUnit.NANOSECONDS); + + if (isFrequentFullGC(lastFullGCTimestamp, currentFullGCTimestamp) && !hasFullGCFreedEnoughBytes(currentGarbageCollectedBytes)) { + triggerTaskKiller = true; + } + + lastFullGCTimestamp = currentFullGCTimestamp; + lastFullGCCollectedBytes = currentGarbageCollectedBytes; + } + else if (taskKillerStrategy == FREE_MEMORY_ON_FULL_GC) { + if (isLowMemory() && beforeGcDataSize.toBytes() - afterGcDataSize.toBytes() < reclaimMemoryThreshold) { + triggerTaskKiller = true; + } + } + + return triggerTaskKiller; + } + + private List getActiveTasks() + { + return sqlTaskManager.getAllTasks().stream() + .filter(task -> !task.getTaskState().isDone()) + .collect(toImmutableList()); + } + + @VisibleForTesting + public static Optional getMaxMemoryConsumingQuery(ListMultimap queryIDToSqlTaskMap) + { + if (queryIDToSqlTaskMap.isEmpty()) { + return Optional.empty(); + } + + Comparator> comparator = Comparator.comparingLong(Map.Entry::getValue); + + Optional maxMemoryConsumpingQueryId = queryIDToSqlTaskMap.asMap().entrySet().stream() + //Convert to Entry, QueryId -> Total Memory Reservation + .map(entry -> + new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue().stream() + .map(SqlTask::getTaskInfo) + .map(TaskInfo::getStats) + .mapToLong(stats -> stats.getUserMemoryReservationInBytes() + stats.getSystemMemoryReservationInBytes() + stats.getRevocableMemoryReservationInBytes()) + .sum()) + ).max(comparator.reversed()).map(Map.Entry::getKey); + + return maxMemoryConsumpingQueryId; + } + + private boolean isFrequentFullGC(Duration lastFullGCTime, Duration currentFullGCTime) + { + long diffBetweenFullGCMilis = currentFullGCTime.toMillis() - lastFullGCTime.toMillis(); + log.debug("Time difference between last 2 full GC in miliseconds: " + diffBetweenFullGCMilis); + if (diffBetweenFullGCMilis > taskKillerFrequentFullGCDurationThreshold.getValue(TimeUnit.MILLISECONDS)) { + log.debug("Skip killing tasks Due to full GCs were not happening frequently."); + return false; + } + return true; + } + + private boolean hasFullGCFreedEnoughBytes(long currentGarbageCollectedBytes) + { + if (currentGarbageCollectedBytes < reclaimMemoryThreshold && lastFullGCCollectedBytes < reclaimMemoryThreshold) { + log.debug("Full GC not able to free enough memory. Current freed bytes: " + currentGarbageCollectedBytes + " previously freed bytes: " + lastFullGCCollectedBytes); + return false; + } + log.debug("Full GC able to free enough memory. Current freed bytes: " + currentGarbageCollectedBytes + " previously freed bytes: " + lastFullGCCollectedBytes); + return true; + } + + private boolean isLowMemory() + { + MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage(); + + if (memoryUsage.getUsed() > heapMemoryThreshold) { + return true; + } + + return false; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/memory/HighMemoryTaskKillerStrategy.java b/presto-main/src/main/java/com/facebook/presto/memory/HighMemoryTaskKillerStrategy.java new file mode 100644 index 0000000000000..1893d9eb1d0b0 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/memory/HighMemoryTaskKillerStrategy.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.memory; + +public enum HighMemoryTaskKillerStrategy +{ + FREE_MEMORY_ON_FULL_GC, //Kills high memory tasks if worker is running low memory and full GC is not able to reclaim enough memory + FREE_MEMORY_ON_FREQUENT_FULL_GC //Kills high memory tasks if worker if frequent full GC not able to reclaim enough memory +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java b/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java index 6e0521aba1aa4..1227ebe411251 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/WorkerModule.java @@ -18,6 +18,7 @@ import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; import com.facebook.presto.failureDetector.FailureDetector; import com.facebook.presto.failureDetector.NoOpFailureDetector; +import com.facebook.presto.memory.HighMemoryTaskKiller; import com.facebook.presto.memory.LowMemoryMonitor; import com.facebook.presto.transaction.NoOpTransactionManager; import com.facebook.presto.transaction.TransactionManager; @@ -58,6 +59,8 @@ public void configure(Binder binder) })); binder.bind(LowMemoryMonitor.class).in(Scopes.SINGLETON); + + binder.bind(HighMemoryTaskKiller.class).in(Scopes.SINGLETON); } @Provides diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java index 0197773bb2a67..98d1ecf6cca2b 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.execution; +import com.facebook.presto.memory.HighMemoryTaskKillerStrategy; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -71,7 +72,12 @@ public void testDefaults() .setLegacyLifespanCompletionCondition(false) .setTaskPriorityTracking(TASK_FAIR) .setInterruptRunawaySplitsTimeout(new Duration(600, SECONDS)) - .setMemoryBasedSlowDownThreshold(1.0)); + .setMemoryBasedSlowDownThreshold(1.0) + .setHighMemoryTaskKillerEnabled(false) + .setHighMemoryTaskKillerStrategy(HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FULL_GC) + .setHighMemoryTaskKillerGCReclaimMemoryThreshold(0.01) + .setHighMemoryTaskKillerFrequentFullGCDurationThreshold(new Duration(1, SECONDS)) + .setHighMemoryTaskKillerHeapMemoryThreshold(0.9)); } @Test @@ -114,6 +120,11 @@ public void testExplicitPropertyMappings() .put("task.task-priority-tracking", "QUERY_FAIR") .put("task.interrupt-runaway-splits-timeout", "599s") .put("experimental.task.memory-based-slowdown-threshold", "0.9") + .put("experimental.task.high-memory-task-killer-enabled", "true") + .put("experiemental.task.high-memory-task-killer-strategy", "FREE_MEMORY_ON_FREQUENT_FULL_GC") + .put("experimental.task.high-memory-task-killer-reclaim-memory-threshold", "0.8") + .put("experimental.task.high-memory-task-killer-frequent-full-gc-duration-threshold", "2s") + .put("experimental.task.high-memory-task-killer-heap-memory-threshold", "0.8") .build(); TaskManagerConfig expected = new TaskManagerConfig() @@ -152,7 +163,12 @@ public void testExplicitPropertyMappings() .setLegacyLifespanCompletionCondition(true) .setTaskPriorityTracking(QUERY_FAIR) .setInterruptRunawaySplitsTimeout(new Duration(599, SECONDS)) - .setMemoryBasedSlowDownThreshold(0.9); + .setMemoryBasedSlowDownThreshold(0.9) + .setHighMemoryTaskKillerEnabled(true) + .setHighMemoryTaskKillerStrategy(HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FREQUENT_FULL_GC) + .setHighMemoryTaskKillerGCReclaimMemoryThreshold(0.8) + .setHighMemoryTaskKillerFrequentFullGCDurationThreshold(new Duration(2, SECONDS)) + .setHighMemoryTaskKillerHeapMemoryThreshold(0.8); assertFullMapping(properties, expected); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java index a172f4d9235a2..34b1c21aad079 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java @@ -131,6 +131,7 @@ public enum StandardErrorCode NATIVE_EXECUTION_BINARY_NOT_EXIST(0x0002_000E, USER_ERROR), NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR(0x0002_000F, INTERNAL_ERROR), MISSING_RESOURCE_GROUP_SELECTOR(0x0002_0010, INTERNAL_ERROR), + EXCEEDED_HEAP_MEMORY_LIMIT(0x0002_0011, INSUFFICIENT_RESOURCES), /**/; // Error code range 0x0003 is reserved for Presto-on-Spark