Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.configuration.DefunctConfig;
import com.facebook.airlift.configuration.LegacyConfig;
import com.facebook.presto.memory.HighMemoryTaskKillerStrategy;
import com.facebook.presto.util.PowerOfTwo;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
Expand Down Expand Up @@ -92,6 +93,13 @@ public class TaskManagerConfig

// Threshold for memory-based slow down; defaults to 1.0.
private double memoryBasedSlowDownThreshold = 1.0;

// Strategy the high-memory task killer uses to decide when tasks should be killed.
private HighMemoryTaskKillerStrategy highMemoryTaskKillerStrategy = HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FULL_GC;

// Whether the high-memory task killer is enabled; has no initializer, so it starts as false.
private boolean highMemoryTaskKillerEnabled;
// Full GC reclaim threshold, expressed as a fraction of the maximum heap (-Xmx).
private double highMemoryTaskKillerGCReclaimMemoryThreshold = 0.01;
// Full GCs that occur within this duration of each other are treated as "frequent".
private Duration highMemoryTaskKillerFrequentFullGCDurationThreshold = new Duration(1, SECONDS);
// Heap usage threshold, expressed as a fraction of the maximum heap.
private double highMemoryTaskKillerHeapMemoryThreshold = 0.9;

@MinDuration("1ms")
@MaxDuration("10s")
@NotNull
Expand Down Expand Up @@ -595,4 +603,67 @@ public TaskManagerConfig setMemoryBasedSlowDownThreshold(double memoryBasedSlowD
this.memoryBasedSlowDownThreshold = memoryBasedSlowDownThreshold;
return this;
}

/**
 * Reports whether the high-memory task killer is switched on.
 *
 * @return the value of {@code experimental.task.high-memory-task-killer-enabled}
 */
public boolean isHighMemoryTaskKillerEnabled()
{
    return this.highMemoryTaskKillerEnabled;
}

/**
 * Enables or disables the high-memory task killer.
 *
 * @param highMemoryTaskKillerEnabled {@code true} to enable the killer
 * @return this config instance, so setters can be chained
 */
@Config("experimental.task.high-memory-task-killer-enabled")
public TaskManagerConfig setHighMemoryTaskKillerEnabled(boolean highMemoryTaskKillerEnabled)
{
this.highMemoryTaskKillerEnabled = highMemoryTaskKillerEnabled;
return this;
}

/**
 * Returns the heap usage threshold used by the high-memory task killer.
 *
 * @return the configured threshold, boxed from the primitive backing field
 */
public Double getHighMemoryTaskKillerHeapMemoryThreshold()
{
    return this.highMemoryTaskKillerHeapMemoryThreshold;
}

/**
 * Sets the heap usage threshold used to decide whether the worker is running with high heap usage.
 * HighMemoryTaskKiller multiplies this value by the JVM's maximum heap size, so it is a fraction
 * of the maximum heap.
 *
 * NOTE(review): the parameter is a boxed {@code Double} while the backing field is a primitive
 * {@code double}; a {@code null} argument would fail on unboxing in the assignment below.
 *
 * @param highMemoryTaskKillerHeapMemoryThreshold the new threshold
 * @return this config instance, so setters can be chained
 */
@Config("experimental.task.high-memory-task-killer-heap-memory-threshold")
@ConfigDescription("Heap memory threshold to help high task memory killer to identify if workers is running with high heap usage")
public TaskManagerConfig setHighMemoryTaskKillerHeapMemoryThreshold(Double highMemoryTaskKillerHeapMemoryThreshold)
{
this.highMemoryTaskKillerHeapMemoryThreshold = highMemoryTaskKillerHeapMemoryThreshold;
return this;
}

/**
 * Returns the full GC reclaim threshold used by the high-memory task killer.
 *
 * @return the configured threshold, boxed from the primitive backing field
 */
public Double getHighMemoryTaskKillerGCReclaimMemoryThreshold()
{
    return this.highMemoryTaskKillerGCReclaimMemoryThreshold;
}

/**
 * Sets the full GC reclaim threshold. HighMemoryTaskKiller multiplies this value by the JVM's
 * maximum heap size to obtain the number of bytes a full GC is expected to reclaim.
 *
 * NOTE(review): the parameter is a boxed {@code Double} while the backing field is a primitive
 * {@code double}; a {@code null} argument would fail on unboxing in the assignment below.
 *
 * @param highMemoryTaskKillerGCReclaimMemoryThreshold the new threshold
 * @return this config instance, so setters can be chained
 */
@Config("experimental.task.high-memory-task-killer-reclaim-memory-threshold")
@ConfigDescription("Full GC Reclaim memory threshold (based on -Xmx) to help high task memory killer to identify if enough memory is reclaimed or not.")
public TaskManagerConfig setHighMemoryTaskKillerGCReclaimMemoryThreshold(Double highMemoryTaskKillerGCReclaimMemoryThreshold)
{
this.highMemoryTaskKillerGCReclaimMemoryThreshold = highMemoryTaskKillerGCReclaimMemoryThreshold;
return this;
}

/**
 * Returns the interval below which two consecutive full GCs count as "frequent".
 *
 * @return the configured duration threshold
 */
public Duration getHighMemoryTaskKillerFrequentFullGCDurationThreshold()
{
    return this.highMemoryTaskKillerFrequentFullGCDurationThreshold;
}

/**
 * Sets the interval used to decide whether full GCs are happening frequently. HighMemoryTaskKiller
 * compares the time between two consecutive full GCs against this value.
 *
 * @param highMemoryTaskKillerFrequentFullGCDurationThreshold the new duration threshold
 * @return this config instance, so setters can be chained
 */
@Config("experimental.task.high-memory-task-killer-frequent-full-gc-duration-threshold")
@ConfigDescription("Threshold to identify if full GCs happening frequently and considered for the task killer to trigger")
public TaskManagerConfig setHighMemoryTaskKillerFrequentFullGCDurationThreshold(Duration highMemoryTaskKillerFrequentFullGCDurationThreshold)
{
this.highMemoryTaskKillerFrequentFullGCDurationThreshold = highMemoryTaskKillerFrequentFullGCDurationThreshold;
return this;
}

/**
 * Returns the strategy the high-memory task killer uses to decide when to kill tasks.
 *
 * @return the configured strategy
 */
public HighMemoryTaskKillerStrategy getHighMemoryTaskKillerStrategy()
{
    return this.highMemoryTaskKillerStrategy;
}

/**
 * Sets the strategy the high-memory task killer uses to decide when to kill tasks.
 *
 * The property was originally published under the misspelled prefix {@code experiemental.};
 * the corrected key is now the primary name and the old spelling is still accepted through
 * {@code @LegacyConfig}, so existing deployments keep working.
 *
 * @param highMemoryTaskKillerStrategy the strategy to use
 * @return this config instance, so setters can be chained
 */
@Config("experimental.task.high-memory-task-killer-strategy")
@LegacyConfig("experiemental.task.high-memory-task-killer-strategy")
public TaskManagerConfig setHighMemoryTaskKillerStrategy(HighMemoryTaskKillerStrategy highMemoryTaskKillerStrategy)
{
    this.highMemoryTaskKillerStrategy = highMemoryTaskKillerStrategy;
    return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.memory;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.GarbageCollectionNotificationInfo;
import com.facebook.presto.execution.SqlTask;
import com.facebook.presto.execution.SqlTaskManager;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.google.common.base.Ticker;
import com.google.common.collect.ListMultimap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.jheaps.annotations.VisibleForTesting;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.management.JMException;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.AbstractMap;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.facebook.presto.memory.HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FREQUENT_FULL_GC;
import static com.facebook.presto.memory.HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FULL_GC;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_HEAP_MEMORY_LIMIT;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
 * Listens for JVM garbage-collection notifications and, after a major GC that satisfies the
 * configured {@link HighMemoryTaskKillerStrategy}, fails every active task of the query chosen
 * by {@code getMaxMemoryConsumingQuery}.
 */
public class HighMemoryTaskKiller
{
private static final Logger log = Logger.get(HighMemoryTaskKiller.class);
// Notification type string that onGCNotification matches against; other types are ignored.
private static final String GC_NOTIFICATION_TYPE = "com.sun.management.gc.notification";
private static final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
// Single listener instance, registered in start() and removed in stop().
private final NotificationListener gcNotificationListener = (notification, ignored) -> onGCNotification(notification);
private final SqlTaskManager sqlTaskManager;
private final HighMemoryTaskKillerStrategy taskKillerStrategy;
private final boolean taskKillerEnabled;
private final Duration taskKillerFrequentFullGCDurationThreshold;
// Ticker reading taken at the most recent full GC under FREE_MEMORY_ON_FREQUENT_FULL_GC;
// not assigned in the constructor, so it is null until the first such GC is processed.
private Duration lastFullGCTimestamp;
// Bytes reclaimed by the most recent full GC under FREE_MEMORY_ON_FREQUENT_FULL_GC; 0 until then.
private long lastFullGCCollectedBytes;
// Byte thresholds: maximum heap size multiplied by the configured fractions.
private final long reclaimMemoryThreshold;
private final long heapMemoryThreshold;
// Package-private and non-final — presumably so tests can substitute a fake ticker; confirm.
Ticker ticker;

@Inject
public HighMemoryTaskKiller(SqlTaskManager sqlTaskManager, TaskManagerConfig taskManagerConfig)
{
requireNonNull(taskManagerConfig, "taskManagerConfig is null");

this.sqlTaskManager = requireNonNull(sqlTaskManager, "sqlTaskManager must not be null");

this.taskKillerStrategy = taskManagerConfig.getHighMemoryTaskKillerStrategy();
this.taskKillerEnabled = taskManagerConfig.isHighMemoryTaskKillerEnabled();

this.taskKillerFrequentFullGCDurationThreshold = taskManagerConfig.getHighMemoryTaskKillerFrequentFullGCDurationThreshold();
this.reclaimMemoryThreshold = (long) (memoryMXBean.getHeapMemoryUsage().getMax() * taskManagerConfig.getHighMemoryTaskKillerGCReclaimMemoryThreshold());
Comment thread
swapsmagic marked this conversation as resolved.
Outdated

this.heapMemoryThreshold = (long) (memoryMXBean.getHeapMemoryUsage().getMax() * taskManagerConfig.getHighMemoryTaskKillerHeapMemoryThreshold());
this.ticker = Ticker.systemTicker();
}

/**
 * Registers the GC notification listener on the platform MBean server for each garbage
 * collector MXBean, except one named "TestingMBeanServer". Does nothing when the killer is
 * disabled.
 *
 * @throws RuntimeException if a listener cannot be added
 */
@PostConstruct
public void start()
{
    if (taskKillerEnabled) {
        for (GarbageCollectorMXBean collectorBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            boolean isTestingBean = collectorBean.getName().equals("TestingMBeanServer");
            if (!isTestingBean) {
                ObjectName collectorName = collectorBean.getObjectName();
                try {
                    ManagementFactory.getPlatformMBeanServer()
                            .addNotificationListener(collectorName, gcNotificationListener, null, null);
                }
                catch (JMException e) {
                    throw new RuntimeException("Unable to add listener", e);
                }
            }
        }
    }
}

@PreDestroy
public void stop()
{
if (!taskKillerEnabled) {
return;
}

for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
ObjectName objectName = mbean.getObjectName();
try {
ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, gcNotificationListener);
}
catch (JMException ignored) {
log.error("Error removing notification: " + ignored);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.error(ignored, msg) to log the stack trace maybe

}
}
}

/**
 * Handles a JMX notification. For a GC notification describing a major GC that satisfies
 * {@code shouldTriggerTaskKiller}, groups the active tasks by query, asks
 * {@code getMaxMemoryConsumingQuery} for a query, and fails every active task of that query
 * with {@code EXCEEDED_HEAP_MEMORY_LIMIT}.
 *
 * @param notification the notification delivered by the MBean server
 */
private void onGCNotification(Notification notification)
{
    if (!GC_NOTIFICATION_TYPE.equals(notification.getType())) {
        return;
    }

    GarbageCollectionNotificationInfo gcInfo = new GarbageCollectionNotificationInfo((CompositeData) notification.getUserData());
    // shouldTriggerTaskKiller is only consulted for major GCs.
    if (!gcInfo.isMajorGc() || !shouldTriggerTaskKiller(gcInfo)) {
        return;
    }

    ListMultimap<QueryId, SqlTask> tasksByQuery = getActiveTasks().stream()
            .collect(toImmutableListMultimap(task -> task.getQueryContext().getQueryId(), Function.identity()));

    Optional<QueryId> selectedQuery = getMaxMemoryConsumingQuery(tasksByQuery);
    if (!selectedQuery.isPresent()) {
        return;
    }

    for (SqlTask sqlTask : tasksByQuery.get(selectedQuery.get())) {
        TaskStats taskStats = sqlTask.getTaskInfo().getStats();
        String failureMessage = format(
                "Worker heap memory limit exceeded: User Memory: %d, System Memory: %d, Revocable Memory: %d",
                taskStats.getUserMemoryReservationInBytes(),
                taskStats.getSystemMemoryReservationInBytes(),
                taskStats.getRevocableMemoryReservationInBytes());
        sqlTask.failed(new PrestoException(EXCEEDED_HEAP_MEMORY_LIMIT, failureMessage));
    }
}

private boolean shouldTriggerTaskKiller(GarbageCollectionNotificationInfo info)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the check for info.isMajorGc be done inside here ? Otherwise this method is only called for full gcs.

{
boolean triggerTaskKiller = false;
DataSize beforeGcDataSize = info.getBeforeGcTotal();
DataSize afterGcDataSize = info.getAfterGcTotal();

if (taskKillerStrategy == FREE_MEMORY_ON_FREQUENT_FULL_GC) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be called FREE_MEMORY_... or just TRIGGER_ON_ or even KILL_ON_... ? It doesn't free any memory explicitly other than failing queries.

long currentGarbageCollectedBytes = beforeGcDataSize.toBytes() - afterGcDataSize.toBytes();
Duration currentFullGCTimestamp = new Duration(ticker.read(), TimeUnit.NANOSECONDS);

if (isFrequentFullGC(lastFullGCTimestamp, currentFullGCTimestamp) && !hasFullGCFreedEnoughBytes(currentGarbageCollectedBytes)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is the initial case of lastFullGCTimestamp = 0 handled ?

triggerTaskKiller = true;
}

lastFullGCTimestamp = currentFullGCTimestamp;
lastFullGCCollectedBytes = currentGarbageCollectedBytes;
}
else if (taskKillerStrategy == FREE_MEMORY_ON_FULL_GC) {
if (isLowMemory() && beforeGcDataSize.toBytes() - afterGcDataSize.toBytes() < reclaimMemoryThreshold) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use currentGarbageCollectedBytes here ... ?

Which strategy are we going to roll out with initially ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the check for isLowMemory be done common to both the approaches ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This second approach of FREE_MEM_ON_FULL_GC is very similar to the first one (even with the reclaimMemoryThreshhold check -- its just time invariant in that it does not look at the prev full gc.

Not a big deal, but can we commonify some code ?

triggerTaskKiller = true;
}
}

return triggerTaskKiller;
}

/**
 * Collects the tasks known to the task manager whose state is not yet done.
 *
 * @return an immutable list of the tasks that are still active
 */
private List<SqlTask> getActiveTasks()
{
    List<SqlTask> stillRunning = sqlTaskManager.getAllTasks().stream()
            .filter(HighMemoryTaskKiller::isNotDone)
            .collect(toImmutableList());
    return stillRunning;
}

// True when the task's state has not reached a done state.
private static boolean isNotDone(SqlTask task)
{
    return !task.getTaskState().isDone();
}

@VisibleForTesting
public static Optional<QueryId> getMaxMemoryConsumingQuery(ListMultimap<QueryId, SqlTask> queryIDToSqlTaskMap)
{
if (queryIDToSqlTaskMap.isEmpty()) {
return Optional.empty();
}

Comparator<Map.Entry<QueryId, Long>> comparator = Comparator.comparingLong(Map.Entry::getValue);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not put the .reversed here ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

due to having static vs non static method, this is not straight moving the reversed at the top so going to address in followup PR.


Optional<QueryId> maxMemoryConsumpingQueryId = queryIDToSqlTaskMap.asMap().entrySet().stream()
//Convert to Entry<QueryId, Long>, QueryId -> Total Memory Reservation
.map(entry ->
new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue().stream()
.map(SqlTask::getTaskInfo)
.map(TaskInfo::getStats)
.mapToLong(stats -> stats.getUserMemoryReservationInBytes() + stats.getSystemMemoryReservationInBytes() + stats.getRevocableMemoryReservationInBytes())
.sum())
).max(comparator.reversed()).map(Map.Entry::getKey);

return maxMemoryConsumpingQueryId;
}

private boolean isFrequentFullGC(Duration lastFullGCTime, Duration currentFullGCTime)
{
long diffBetweenFullGCMilis = currentFullGCTime.toMillis() - lastFullGCTime.toMillis();
log.debug("Time difference between last 2 full GC in miliseconds: " + diffBetweenFullGCMilis);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you intend for this debug logging to go out ?

if (diffBetweenFullGCMilis > taskKillerFrequentFullGCDurationThreshold.getValue(TimeUnit.MILLISECONDS)) {
log.debug("Skip killing tasks Due to full GCs were not happening frequently.");
return false;
}
return true;
}

private boolean hasFullGCFreedEnoughBytes(long currentGarbageCollectedBytes)
{
if (currentGarbageCollectedBytes < reclaimMemoryThreshold && lastFullGCCollectedBytes < reclaimMemoryThreshold) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to skip the initial case of lastFullGcCollectedBytes being 0 ?

log.debug("Full GC not able to free enough memory. Current freed bytes: " + currentGarbageCollectedBytes + " previously freed bytes: " + lastFullGCCollectedBytes);
return false;
}
log.debug("Full GC able to free enough memory. Current freed bytes: " + currentGarbageCollectedBytes + " previously freed bytes: " + lastFullGCCollectedBytes);
return true;
}

private boolean isLowMemory()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this helper method have the name ? isLowFreeMemory or isHighMemoryUsed ?

{
MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();

if (memoryUsage.getUsed() > heapMemoryThreshold) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this helper method ? If so, how about return (memoryUsage.getUsed() > heapMemThresh) ?

return true;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.memory;

/**
 * Conditions under which HighMemoryTaskKiller kills tasks after a full GC.
 */
public enum HighMemoryTaskKillerStrategy
{
// Kill tasks when heap usage is above the heap memory threshold and the full GC reclaimed
// fewer bytes than the reclaim threshold.
FREE_MEMORY_ON_FULL_GC,
// Kill tasks when full GCs are occurring within the frequent-full-GC duration threshold of
// each other and are not reclaiming enough memory.
FREE_MEMORY_ON_FREQUENT_FULL_GC
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.failureDetector.NoOpFailureDetector;
import com.facebook.presto.memory.HighMemoryTaskKiller;
import com.facebook.presto.memory.LowMemoryMonitor;
import com.facebook.presto.transaction.NoOpTransactionManager;
import com.facebook.presto.transaction.TransactionManager;
Expand Down Expand Up @@ -58,6 +59,8 @@ public void configure(Binder binder)
}));

binder.bind(LowMemoryMonitor.class).in(Scopes.SINGLETON);

binder.bind(HighMemoryTaskKiller.class).in(Scopes.SINGLETON);
}

@Provides
Expand Down
Loading