Skip to content

Commit

Permalink
Merge pull request #1853 from HubSpot/failover-on-no-heartbeat
Browse files Browse the repository at this point in the history
Failover when we miss Mesos Master heartbeats.
  • Loading branch information
ssalinas authored Sep 28, 2018
2 parents 2b7a926 + 84f0410 commit 1c97a8b
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class SingularityState {
private final Optional<Double> minimumPriorityLevel;

private final long avgStatusUpdateDelayMs;
private final long lastHeartbeatAt;

@JsonCreator
public SingularityState(@JsonProperty("activeTasks") int activeTasks,
Expand Down Expand Up @@ -96,7 +97,8 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks,
@JsonProperty("unknownSlaves") int unknownSlaves,
@JsonProperty("authDatastoreHealthy") Optional<Boolean> authDatastoreHealthy,
@JsonProperty("minimumPriorityLevel") Optional<Double> minimumPriorityLevel,
@JsonProperty("avgStatusUpdateDelayMs") long avgStatusUpdateDelayMs) {
@JsonProperty("avgStatusUpdateDelayMs") long avgStatusUpdateDelayMs,
@JsonProperty("lastHeartbeatAt") long lastHeartbeatAt) {
this.activeTasks = activeTasks;
this.launchingTasks = launchingTasks;
this.activeRequests = activeRequests;
Expand Down Expand Up @@ -133,6 +135,7 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks,
this.authDatastoreHealthy = authDatastoreHealthy;
this.minimumPriorityLevel = minimumPriorityLevel;
this.avgStatusUpdateDelayMs = avgStatusUpdateDelayMs;
this.lastHeartbeatAt = lastHeartbeatAt;
}

@Schema(description = "Count of requests in finished state")
Expand Down Expand Up @@ -338,6 +341,11 @@ public long getAvgStatusUpdateDelayMs() {
return avgStatusUpdateDelayMs;
}

/**
 * Returns the last-heartbeat timestamp captured in this state snapshot.
 *
 * @return the {@code lastHeartbeatAt} value this instance was constructed with
 */
@Schema(description = "Time in UTC millis at which Singularity received the most recent HEARTBEAT event from the Mesos Master")
public long getLastHeartbeatAt() {
  return this.lastHeartbeatAt;
}

@Override
public String toString() {
return "SingularityState{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class SingularityMainModule implements Module {

public static final String STATUS_UPDATE_DELTA_30S_AVERAGE = "singularity.status.update.delta.minute.average";
public static final String STATUS_UPDATE_DELTAS = "singularity.status.update.deltas";
public static final String LAST_MESOS_MASTER_HEARTBEAT_TIME = "singularity.last.mesos.master.heartbeat.time";

private final SingularityConfiguration configuration;

Expand Down Expand Up @@ -408,4 +409,11 @@ public AtomicLong provideDeltasMap() {
public ConcurrentHashMap<Long, Long> provideUpdateDeltasMap() {
return new ConcurrentHashMap<>();
}

/**
 * Supplies the singleton holder for the time of the last Mesos Master heartbeat,
 * bound under {@link #LAST_MESOS_MASTER_HEARTBEAT_TIME}.
 *
 * @return a new {@link AtomicLong} initialised to 0
 */
@Provides
@Singleton
@Named(LAST_MESOS_MASTER_HEARTBEAT_TIME)
public AtomicLong provideLastHeartbeatTime() {
  // 0 is the starting value until a writer stores a real timestamp.
  final long initialValue = 0;
  return new AtomicLong(initialValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public class SingularityConfiguration extends Configuration {

private long checkUsageEveryMillis = TimeUnit.MINUTES.toMillis(1);

private long checkMesosMasterHeartbeatEverySeconds = 20;

private long maxMissedMesosMasterHeartbeats = 3;

private int maxConcurrentUsageCollections = 15;

private boolean shuffleTasksForOverloadedSlaves = false; // recommended 'true' when oversubscribing cpu for larger clusters
Expand Down Expand Up @@ -1409,6 +1413,22 @@ public void setCheckUsageEveryMillis(long checkUsageEveryMillis) {
this.checkUsageEveryMillis = checkUsageEveryMillis;
}

/**
 * Returns the configured period, in seconds, between Mesos Master heartbeat checks.
 *
 * @return the check period in seconds (field default is 20)
 */
public long getCheckMesosMasterHeartbeatEverySeconds() {
  return this.checkMesosMasterHeartbeatEverySeconds;
}

/**
 * Sets the period, in seconds, between Mesos Master heartbeat checks.
 * The value is stored as given; no validation is performed here.
 *
 * @param checkMesosMasterHeartbeatEverySeconds the check period in seconds
 */
public void setCheckMesosMasterHeartbeatEverySeconds(long checkMesosMasterHeartbeatEverySeconds) {
this.checkMesosMasterHeartbeatEverySeconds = checkMesosMasterHeartbeatEverySeconds;
}

/**
 * Returns the configured limit on missed Mesos Master heartbeats.
 *
 * @return the maximum number of missed heartbeats (field default is 3)
 */
public long getMaxMissedMesosMasterHeartbeats() {
  return this.maxMissedMesosMasterHeartbeats;
}

/**
 * Sets the limit on missed Mesos Master heartbeats.
 * The value is stored as given; no validation is performed here.
 *
 * @param maxMissedMesosMasterHeartbeats the maximum number of missed heartbeats
 */
public void setMaxMissedMesosMasterHeartbeats(long maxMissedMesosMasterHeartbeats) {
this.maxMissedMesosMasterHeartbeats = maxMissedMesosMasterHeartbeats;
}

public int getMaxConcurrentUsageCollections() {
return maxConcurrentUsageCollections;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class StateManager extends CuratorManager {
private final Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder;
private final PriorityManager priorityManager;
private final AtomicLong statusUpdateDeltaAvg;
private final AtomicLong lastHeartbeatTime;

@Inject
public StateManager(CuratorFramework curatorFramework,
Expand All @@ -82,7 +83,8 @@ public StateManager(CuratorFramework curatorFramework,
SingularityAuthDatastore authDatastore,
PriorityManager priorityManager,
Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder,
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg) {
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg,
@Named(SingularityMainModule.LAST_MESOS_MASTER_HEARTBEAT_TIME) AtomicLong lastHeartbeatTime) {
super(curatorFramework, configuration, metricRegistry);

this.requestManager = requestManager;
Expand All @@ -97,6 +99,7 @@ public StateManager(CuratorFramework curatorFramework,
this.priorityManager = priorityManager;
this.taskReconciliationStatisticsTranscoder = taskReconciliationStatisticsTranscoder;
this.statusUpdateDeltaAvg = statusUpdateDeltaAvg;
this.lastHeartbeatTime = lastHeartbeatTime;
}

public SingularityCreateResult saveTaskReconciliationStatistics(SingularityTaskReconciliationStatistics taskReconciliationStatistics) {
Expand Down Expand Up @@ -293,7 +296,7 @@ public SingularityState generateState(boolean includeRequestIds) {
deadSlaves, decommissioningSlaves, activeRacks, deadRacks, decommissioningRacks, cleaningTasks, states, oldestDeploy, numDeploys, oldestDeployStep, activeDeploys, scheduledTasksInfo.getNumLateTasks(),
scheduledTasksInfo.getNumFutureTasks(), scheduledTasksInfo.getMaxTaskLag(), System.currentTimeMillis(), includeRequestIds ? overProvisionedRequestIds : null,
includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy, minimumPriorityLevel,
statusUpdateDeltaAvg.get());
statusUpdateDeltaAvg.get(), lastHeartbeatTime.get());
}

private Map<String, Long> getNumTasks(List<SingularityRequestWithState> requests) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public void killAndRecord(SingularityTaskId taskId, TaskCleanupType taskCleanupT

public abstract Optional<Long> getLastOfferTimestamp();

/**
 * Returns the heartbeat interval, in seconds, known to this scheduler, if any.
 *
 * @return the heartbeat interval in seconds, or absent when none is known
 */
public abstract Optional<Double> getHeartbeatIntervalSeconds();

/*
* for testing only
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
private final boolean delayWhenStatusUpdateDeltaTooLarge;
private final long delayWhenDeltaOverMs;
private final AtomicLong statusUpdateDeltaAvg;
private final AtomicLong lastHeartbeatTime;
private final SingularityConfiguration configuration;
private final TaskManager taskManager;
private final Transcoder<SingularityTaskDestroyFrameworkMessage> transcoder;
private final SingularitySchedulerLock lock;

private volatile SchedulerState state;
private Optional<Long> lastOfferTimestamp = Optional.absent();
private Optional<Double> heartbeatIntervalSeconds = Optional.absent();

private final AtomicReference<MasterInfo> masterInfo = new AtomicReference<>();
private final List<TaskStatus> queuedUpdates;
Expand All @@ -114,7 +116,8 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
SingularityConfiguration configuration,
TaskManager taskManager,
Transcoder<SingularityTaskDestroyFrameworkMessage> transcoder,
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg) {
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg,
@Named(SingularityMainModule.LAST_MESOS_MASTER_HEARTBEAT_TIME) AtomicLong lastHeartbeatTime) {
this.exceptionNotifier = exceptionNotifier;
this.startup = startup;
this.abort = abort;
Expand All @@ -129,6 +132,8 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
this.delayWhenStatusUpdateDeltaTooLarge = configuration.isDelayOfferProcessingForLargeStatusUpdateDelta();
this.delayWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs();
this.statusUpdateDeltaAvg = statusUpdateDeltaAvg;

this.lastHeartbeatTime = lastHeartbeatTime;
this.taskManager = taskManager;
this.transcoder = transcoder;
this.leaderCacheCoordinator = leaderCacheCoordinator;
Expand All @@ -143,6 +148,11 @@ public void subscribed(Subscribed subscribed) {
callWithStateLock(() -> {
Preconditions.checkState(state == SchedulerState.NOT_STARTED, "Asked to startup - but in invalid state: %s", state.name());

double advertisedHeartbeatIntervalSeconds = subscribed.getHeartbeatIntervalSeconds();
if (advertisedHeartbeatIntervalSeconds > 0) {
heartbeatIntervalSeconds = Optional.of(advertisedHeartbeatIntervalSeconds);
}

leaderCacheCoordinator.activateLeaderCache();
MasterInfo newMasterInfo = subscribed.getMasterInfo();
masterInfo.set(newMasterInfo);
Expand Down Expand Up @@ -329,7 +339,9 @@ public void error(String message) {

/**
 * Records the arrival time of a heartbeat from Mesos and logs the time elapsed
 * since the previously recorded heartbeat.
 *
 * @param event the heartbeat event; its contents are not read here
 */
@Override
public void heartbeat(Event event) {
// NOTE(review): this text comes from a rendered diff; the next line may be the
// pre-change log statement that the lines after it replaced — confirm against the real file.
LOG.debug("Heartbeat from mesos");
long now = System.currentTimeMillis();
// getAndSet stores the new timestamp and returns the previous one in a single atomic step.
// If the previous value is still 0, the logged delta is simply the current epoch time.
long delta = (now - lastHeartbeatTime.getAndSet(now));
LOG.debug("Heartbeat from mesos. Delta since last heartbeat is {}ms", delta);
}

@Override
Expand Down Expand Up @@ -451,6 +463,10 @@ public Optional<Long> getLastOfferTimestamp() {
return lastOfferTimestamp;
}

/**
 * Returns the heartbeat interval recorded by this scheduler.
 *
 * @return the stored heartbeat interval in seconds; absent until one has been recorded
 */
@Override
public Optional<Double> getHeartbeatIntervalSeconds() {
  return heartbeatIntervalSeconds;
}

public void killAndRecord(SingularityTaskId taskId, Optional<RequestCleanupType> requestCleanupType, Optional<TaskCleanupType> taskCleanupType, Optional<Long> originalTimestamp, Optional<Integer> retries, Optional<String> user) {
Preconditions.checkState(isRunning());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.hubspot.singularity.scheduler;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.singularity.SingularityAbort;
import com.hubspot.singularity.SingularityAbort.AbortReason;
import com.hubspot.singularity.SingularityMainModule;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.mesos.SingularityMesosScheduler;

/**
 * Poller that aborts Singularity when heartbeats from the Mesos Master stop arriving.
 *
 * <p>On each poll it compares the time since the last recorded heartbeat with the
 * heartbeat interval reported by the scheduler. When more than
 * {@code maxMissedMesosMasterHeartbeats} intervals have elapsed, it notifies the
 * scheduler that it is stopping and aborts with
 * {@link AbortReason#LOST_MESOS_CONNECTION}.
 */
public class SingularityMesosHeartbeatChecker extends SingularityLeaderOnlyPoller {
  private static final Logger LOG = LoggerFactory.getLogger(SingularityMesosHeartbeatChecker.class);

  private final SingularityConfiguration configuration;
  private final SingularityMesosScheduler mesosScheduler;
  private final SingularityAbort abort;
  private final AtomicLong lastHeartbeatTime;

  // Reference time used while no heartbeat has been recorded yet (shared timestamp still 0).
  // Set once, on the first poll that sees a heartbeat interval but no heartbeat.
  private final AtomicLong firstCheckWithoutHeartbeatTime = new AtomicLong(0);

  /**
   * @param configuration source of the poll period and the missed-heartbeat limit
   * @param mesosScheduler scheduler queried for the heartbeat interval and running state
   * @param abort used to abort Singularity when too many heartbeats are missed
   * @param lastHeartbeatTime shared holder of the last heartbeat time in epoch millis
   */
  @Inject
  public SingularityMesosHeartbeatChecker(SingularityConfiguration configuration,
                                          SingularityMesosScheduler mesosScheduler,
                                          SingularityAbort abort,
                                          @Named(SingularityMainModule.LAST_MESOS_MASTER_HEARTBEAT_TIME) AtomicLong lastHeartbeatTime) {
    super(configuration.getCheckMesosMasterHeartbeatEverySeconds(), TimeUnit.SECONDS);
    this.configuration = configuration;
    this.mesosScheduler = mesosScheduler;
    this.abort = abort;
    this.lastHeartbeatTime = lastHeartbeatTime;
  }

  /**
   * Checks how many heartbeat intervals have elapsed since the last heartbeat and
   * aborts when the configured limit is exceeded. Does nothing when the scheduler
   * reports no heartbeat interval.
   */
  @Override
  public void runActionOnPoll() {
    // Read the interval once so the presence check and the value used below agree.
    Optional<Double> heartbeatIntervalSeconds = mesosScheduler.getHeartbeatIntervalSeconds();

    if (!heartbeatIntervalSeconds.isPresent()) {
      if (mesosScheduler.isRunning()) {
        LOG.debug("Not checking for a Mesos heartbeat because the Mesos Master didn't advertise a heartbeat interval.");
      } else {
        LOG.debug("Not checking for a Mesos heartbeat because we haven't subscribed with the Mesos Master yet.");
      }

      return;
    }

    long now = System.currentTimeMillis();
    long millisSinceLastHeartbeat = now - getReferenceTime(now);
    double missedHeartbeats = millisSinceLastHeartbeat / (heartbeatIntervalSeconds.get() * 1000);

    if (missedHeartbeats > configuration.getMaxMissedMesosMasterHeartbeats()) {
      LOG.error("I haven't received a Mesos heartbeat in {}ms! Aborting Singularity...", millisSinceLastHeartbeat);
      mesosScheduler.notifyStopping();
      abort.abort(AbortReason.LOST_MESOS_CONNECTION, Optional.of(new RuntimeException(String.format("Didn't receive a heartbeat from the Mesos Master for %dms", millisSinceLastHeartbeat))));
    }
  }

  /**
   * Returns the time from which missed heartbeats are measured.
   *
   * <p>This is normally the last recorded heartbeat. When none has been recorded the
   * shared value is still 0, and measuring from it would look like decades of missed
   * heartbeats and abort immediately. In that case the first poll that reached this
   * point is used instead, so a master that never sends a heartbeat is still detected.
   *
   * @param now current time in epoch millis
   * @return the reference time in epoch millis
   */
  private long getReferenceTime(long now) {
    long lastHeartbeat = lastHeartbeatTime.get();
    if (lastHeartbeat > 0) {
      return lastHeartbeat;
    }

    // Only the first caller wins; later polls keep measuring from that same instant.
    firstCheckWithoutHeartbeatTime.compareAndSet(0, now);
    return firstCheckWithoutHeartbeatTime.get();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ protected void configure() {
bind(SingularityLeaderCache.class).in(Scopes.SINGLETON);
bind(SingularityLeaderCacheCoordinator.class).in(Scopes.SINGLETON);
bind(SingularityAutoScaleSpreadAllPoller.class).in(Scopes.SINGLETON);
bind(SingularityMesosHeartbeatChecker.class).in(Scopes.SINGLETON);
}

}

0 comments on commit 1c97a8b

Please sign in to comment.