Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failover when we miss Mesos Master heartbeats. #1853

Merged
merged 6 commits into from
Sep 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class SingularityState {
private final Optional<Double> minimumPriorityLevel;

private final long avgStatusUpdateDelayMs;
private final long lastHeartbeatAt;

@JsonCreator
public SingularityState(@JsonProperty("activeTasks") int activeTasks,
Expand Down Expand Up @@ -96,7 +97,8 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks,
@JsonProperty("unknownSlaves") int unknownSlaves,
@JsonProperty("authDatastoreHealthy") Optional<Boolean> authDatastoreHealthy,
@JsonProperty("minimumPriorityLevel") Optional<Double> minimumPriorityLevel,
@JsonProperty("avgStatusUpdateDelayMs") long avgStatusUpdateDelayMs) {
@JsonProperty("avgStatusUpdateDelayMs") long avgStatusUpdateDelayMs,
@JsonProperty("lastHeartbeatAt") long lastHeartbeatAt) {
this.activeTasks = activeTasks;
this.launchingTasks = launchingTasks;
this.activeRequests = activeRequests;
Expand Down Expand Up @@ -133,6 +135,7 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks,
this.authDatastoreHealthy = authDatastoreHealthy;
this.minimumPriorityLevel = minimumPriorityLevel;
this.avgStatusUpdateDelayMs = avgStatusUpdateDelayMs;
this.lastHeartbeatAt = lastHeartbeatAt;
}

@Schema(description = "Count of requests in finished state")
Expand Down Expand Up @@ -338,6 +341,11 @@ public long getAvgStatusUpdateDelayMs() {
return avgStatusUpdateDelayMs;
}

@Schema(description = "Time in UTC millis at which Singularity received the most recent HEARTBEAT event from the Mesos Master")
public long getLastHeartbeatAt() {
return lastHeartbeatAt;
}

@Override
public String toString() {
return "SingularityState{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class SingularityMainModule implements Module {

public static final String STATUS_UPDATE_DELTA_30S_AVERAGE = "singularity.status.update.delta.minute.average";
public static final String STATUS_UPDATE_DELTAS = "singularity.status.update.deltas";
public static final String LAST_MESOS_MASTER_HEARTBEAT_TIME = "singularity.last.mesos.master.heartbeat.time";

private final SingularityConfiguration configuration;

Expand Down Expand Up @@ -408,4 +409,11 @@ public AtomicLong provideDeltasMap() {
public ConcurrentHashMap<Long, Long> provideUpdateDeltasMap() {
return new ConcurrentHashMap<>();
}

@Provides
@Singleton
@Named(LAST_MESOS_MASTER_HEARTBEAT_TIME)
public AtomicLong provideLastHeartbeatTime() {
return new AtomicLong(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public class SingularityConfiguration extends Configuration {

private long checkUsageEveryMillis = TimeUnit.MINUTES.toMillis(1);

private long checkMesosMasterHeartbeatEverySeconds = 20;

private long maxMissedMesosMasterHeartbeats = 3;

private int maxConcurrentUsageCollections = 15;

private boolean shuffleTasksForOverloadedSlaves = false; // recommended 'true' when oversubscribing cpu for larger clusters
Expand Down Expand Up @@ -1409,6 +1413,22 @@ public void setCheckUsageEveryMillis(long checkUsageEveryMillis) {
this.checkUsageEveryMillis = checkUsageEveryMillis;
}

public long getCheckMesosMasterHeartbeatEverySeconds() {
return checkMesosMasterHeartbeatEverySeconds;
}

public void setCheckMesosMasterHeartbeatEverySeconds(long checkMesosMasterHeartbeatEverySeconds) {
this.checkMesosMasterHeartbeatEverySeconds = checkMesosMasterHeartbeatEverySeconds;
}

public long getMaxMissedMesosMasterHeartbeats() {
return maxMissedMesosMasterHeartbeats;
}

public void setMaxMissedMesosMasterHeartbeats(long maxMissedMesosMasterHeartbeats) {
this.maxMissedMesosMasterHeartbeats = maxMissedMesosMasterHeartbeats;
}

public int getMaxConcurrentUsageCollections() {
return maxConcurrentUsageCollections;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.jetbrains.annotations.NotNull;

import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -49,7 +48,7 @@ public List<String> getBlacklist() {
return cache.getUnchecked(BLACKLIST_ROOT);
}

@NotNull private String getEmailPath(String email) {
private String getEmailPath(String email) {
return ZKPaths.makePath(BLACKLIST_ROOT, email);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class StateManager extends CuratorManager {
private final Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder;
private final PriorityManager priorityManager;
private final AtomicLong statusUpdateDeltaAvg;
private final AtomicLong lastHeartbeatTime;

@Inject
public StateManager(CuratorFramework curatorFramework,
Expand All @@ -82,7 +83,8 @@ public StateManager(CuratorFramework curatorFramework,
SingularityAuthDatastore authDatastore,
PriorityManager priorityManager,
Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder,
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg) {
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg,
@Named(SingularityMainModule.LAST_MESOS_MASTER_HEARTBEAT_TIME) AtomicLong lastHeartbeatTime) {
super(curatorFramework, configuration, metricRegistry);

this.requestManager = requestManager;
Expand All @@ -97,6 +99,7 @@ public StateManager(CuratorFramework curatorFramework,
this.priorityManager = priorityManager;
this.taskReconciliationStatisticsTranscoder = taskReconciliationStatisticsTranscoder;
this.statusUpdateDeltaAvg = statusUpdateDeltaAvg;
this.lastHeartbeatTime = lastHeartbeatTime;
}

public SingularityCreateResult saveTaskReconciliationStatistics(SingularityTaskReconciliationStatistics taskReconciliationStatistics) {
Expand Down Expand Up @@ -293,7 +296,7 @@ public SingularityState generateState(boolean includeRequestIds) {
deadSlaves, decommissioningSlaves, activeRacks, deadRacks, decommissioningRacks, cleaningTasks, states, oldestDeploy, numDeploys, oldestDeployStep, activeDeploys, scheduledTasksInfo.getNumLateTasks(),
scheduledTasksInfo.getNumFutureTasks(), scheduledTasksInfo.getMaxTaskLag(), System.currentTimeMillis(), includeRequestIds ? overProvisionedRequestIds : null,
includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy, minimumPriorityLevel,
statusUpdateDeltaAvg.get());
statusUpdateDeltaAvg.get(), lastHeartbeatTime.get());
}

private Map<String, Long> getNumTasks(List<SingularityRequestWithState> requests) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public void killAndRecord(SingularityTaskId taskId, TaskCleanupType taskCleanupT

public abstract Optional<Long> getLastOfferTimestamp();

public abstract Optional<Double> getHeartbeatIntervalSeconds();

/*
* for testing only
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
private final boolean delayWhenStatusUpdateDeltaTooLarge;
private final long delayWhenDeltaOverMs;
private final AtomicLong statusUpdateDeltaAvg;
private final AtomicLong lastHeartbeatTime;
private final SingularityConfiguration configuration;
private final TaskManager taskManager;
private final Transcoder<SingularityTaskDestroyFrameworkMessage> transcoder;
private final SingularitySchedulerLock lock;

private volatile SchedulerState state;
private Optional<Long> lastOfferTimestamp = Optional.absent();
private Optional<Double> heartbeatIntervalSeconds = Optional.absent();

private final AtomicReference<MasterInfo> masterInfo = new AtomicReference<>();
private final List<TaskStatus> queuedUpdates;
Expand All @@ -114,7 +116,8 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
SingularityConfiguration configuration,
TaskManager taskManager,
Transcoder<SingularityTaskDestroyFrameworkMessage> transcoder,
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg) {
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg,
@Named(SingularityMainModule.LAST_MESOS_MASTER_HEARTBEAT_TIME) AtomicLong lastHeartbeatTime) {
this.exceptionNotifier = exceptionNotifier;
this.startup = startup;
this.abort = abort;
Expand All @@ -129,6 +132,8 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
this.delayWhenStatusUpdateDeltaTooLarge = configuration.isDelayOfferProcessingForLargeStatusUpdateDelta();
this.delayWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs();
this.statusUpdateDeltaAvg = statusUpdateDeltaAvg;

this.lastHeartbeatTime = lastHeartbeatTime;
this.taskManager = taskManager;
this.transcoder = transcoder;
this.leaderCacheCoordinator = leaderCacheCoordinator;
Expand All @@ -143,6 +148,11 @@ public void subscribed(Subscribed subscribed) {
callWithStateLock(() -> {
Preconditions.checkState(state == SchedulerState.NOT_STARTED, "Asked to startup - but in invalid state: %s", state.name());

double advertisedHeartbeatIntervalSeconds = subscribed.getHeartbeatIntervalSeconds();
if (advertisedHeartbeatIntervalSeconds > 0) {
heartbeatIntervalSeconds = Optional.of(advertisedHeartbeatIntervalSeconds);
}

leaderCacheCoordinator.activateLeaderCache();
MasterInfo newMasterInfo = subscribed.getMasterInfo();
masterInfo.set(newMasterInfo);
Expand Down Expand Up @@ -329,7 +339,9 @@ public void error(String message) {

@Override
public void heartbeat(Event event) {
LOG.debug("Heartbeat from mesos");
long now = System.currentTimeMillis();
long delta = (now - lastHeartbeatTime.getAndSet(now));
LOG.debug("Heartbeat from mesos. Delta since last heartbeat is {}ms", delta);
}

@Override
Expand Down Expand Up @@ -451,6 +463,10 @@ public Optional<Long> getLastOfferTimestamp() {
return lastOfferTimestamp;
}

public Optional<Double> getHeartbeatIntervalSeconds() {
return heartbeatIntervalSeconds;
}

public void killAndRecord(SingularityTaskId taskId, Optional<RequestCleanupType> requestCleanupType, Optional<TaskCleanupType> taskCleanupType, Optional<Long> originalTimestamp, Optional<Integer> retries, Optional<String> user) {
Preconditions.checkState(isRunning());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.hubspot.singularity.scheduler;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.singularity.SingularityAbort;
import com.hubspot.singularity.SingularityAbort.AbortReason;
import com.hubspot.singularity.SingularityMainModule;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.mesos.SingularityMesosScheduler;

public class SingularityMesosHeartbeatChecker extends SingularityLeaderOnlyPoller {
private static final Logger LOG = LoggerFactory.getLogger(SingularityMesosHeartbeatChecker.class);

private final SingularityConfiguration configuration;
private final SingularityMesosScheduler mesosScheduler;
private final SingularityAbort abort;
private final AtomicLong lastHeartbeatTime;

@Inject
public SingularityMesosHeartbeatChecker(SingularityConfiguration configuration,
SingularityMesosScheduler mesosScheduler,
SingularityAbort abort,
@Named(SingularityMainModule.LAST_MESOS_MASTER_HEARTBEAT_TIME) AtomicLong lastHeartbeatTime) {
super(configuration.getCheckMesosMasterHeartbeatEverySeconds(), TimeUnit.SECONDS);
this.configuration = configuration;
this.mesosScheduler = mesosScheduler;
this.abort = abort;
this.lastHeartbeatTime = lastHeartbeatTime;
}

@Override
public void runActionOnPoll() {
if (!mesosScheduler.getHeartbeatIntervalSeconds().isPresent()) {
if (mesosScheduler.isRunning()) {
LOG.debug("Not checking for a Mesos heartbeat because the Mesos Master didn't advertise a heartbeat interval.");
} else {
LOG.debug("Not checking for a Mesos heartbeat because we haven't subscribed with the Mesos Master yet.");
}

return;
}

long millisSinceLastHeartbeat = System.currentTimeMillis() - lastHeartbeatTime.get();
double missedHeartbeats = millisSinceLastHeartbeat / (mesosScheduler.getHeartbeatIntervalSeconds().get() * 1000);

if (missedHeartbeats > configuration.getMaxMissedMesosMasterHeartbeats()) {
LOG.error("I haven't received a Mesos heartbeat in {}ms! Aborting Singularity...", millisSinceLastHeartbeat);
mesosScheduler.notifyStopping();
abort.abort(AbortReason.LOST_MESOS_CONNECTION, Optional.of(new RuntimeException(String.format("Didn't receive a heartbeat from the Mesos Master for %dms", millisSinceLastHeartbeat))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ protected void configure() {
bind(SingularityLeaderCache.class).in(Scopes.SINGLETON);
bind(SingularityLeaderCacheCoordinator.class).in(Scopes.SINGLETON);
bind(SingularityAutoScaleSpreadAllPoller.class).in(Scopes.SINGLETON);
bind(SingularityMesosHeartbeatChecker.class).in(Scopes.SINGLETON);
}

}