Prevent new host overloading #1822
Changes from 2 commits
File: SingularityMesosOfferScheduler.java
@@ -29,6 +29,7 @@
 import com.hubspot.singularity.RequestUtilization;
 import com.hubspot.singularity.SingularityDeployStatistics;
 import com.hubspot.singularity.SingularityPendingTaskId;
+import com.hubspot.singularity.SingularitySlave;
 import com.hubspot.singularity.SingularitySlaveUsage;
 import com.hubspot.singularity.SingularitySlaveUsageWithId;
 import com.hubspot.singularity.SingularityTask;

@@ -41,13 +42,15 @@
 import com.hubspot.singularity.config.MesosConfiguration;
 import com.hubspot.singularity.config.SingularityConfiguration;
 import com.hubspot.singularity.data.DeployManager;
+import com.hubspot.singularity.data.SlaveManager;
 import com.hubspot.singularity.data.TaskManager;
 import com.hubspot.singularity.data.UsageManager;
 import com.hubspot.singularity.helpers.MesosUtils;
 import com.hubspot.singularity.helpers.SingularityMesosTaskHolder;
 import com.hubspot.singularity.mesos.SingularitySlaveUsageWithCalculatedScores.MaxProbableUsage;
 import com.hubspot.singularity.scheduler.SingularityLeaderCache;
 import com.hubspot.singularity.scheduler.SingularityScheduler;
+import com.hubspot.singularity.scheduler.SingularityUsagePoller;

 @Singleton
 public class SingularityMesosOfferScheduler {

@@ -65,6 +68,8 @@ public class SingularityMesosOfferScheduler {
   private final SingularitySlaveAndRackManager slaveAndRackManager;
   private final SingularitySlaveAndRackHelper slaveAndRackHelper;
   private final SingularityTaskSizeOptimizer taskSizeOptimizer;
+  private final SingularityUsagePoller usagePoller;
+  private final SlaveManager slaveManager;
   private final UsageManager usageManager;
   private final DeployManager deployManager;
   private final SingularitySchedulerLock lock;

@@ -89,6 +94,8 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
                                         SingularityTaskSizeOptimizer taskSizeOptimizer,
                                         SingularitySlaveAndRackHelper slaveAndRackHelper,
                                         SingularityLeaderCache leaderCache,
+                                        SingularityUsagePoller usagePoller,
+                                        SlaveManager slaveManager,
                                         UsageManager usageManager,
                                         DeployManager deployManager,
                                         SingularitySchedulerLock lock) {

@@ -102,6 +109,8 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
     this.slaveAndRackManager = slaveAndRackManager;
     this.taskSizeOptimizer = taskSizeOptimizer;
     this.leaderCache = leaderCache;
+    this.usagePoller = usagePoller;
+    this.slaveManager = slaveManager;
     this.slaveAndRackHelper = slaveAndRackHelper;
     this.taskPrioritizer = taskPrioritizer;
     this.usageManager = usageManager;

@@ -180,7 +189,8 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
             mesosConfiguration.getScoreUsingSystemLoad(),
             getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(usageWithId.getSlaveId()).getSanitizedHost()),
             mesosConfiguration.getLoad5OverloadedThreshold(),
-            mesosConfiguration.getLoad1OverloadedThreshold()
+            mesosConfiguration.getLoad1OverloadedThreshold(),
+            usageWithId.getTimestamp()
         )
     ));

@@ -196,23 +206,11 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
       List<CompletableFuture<Void>> scoringFutures = new ArrayList<>();
       AtomicReference<Throwable> scoringException = new AtomicReference<>(null);
       for (SingularityOfferHolder offerHolder : offerHolders.values()) {
-        if (!isOfferFull(offerHolder)) {
-          scoringFutures.add(
-              offerScoringSemaphore.call(
-                  () -> CompletableFuture.runAsync(() -> {
-                    try {
-                      double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest, requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId()));
-                      if (score != 0) {
-                        scorePerOffer.put(offerHolder.getSlaveId(), score);
-                      }
-                    } catch (Throwable t) {
-                      LOG.error("Uncaught exception while scoring offers", t);
-                      scoringException.set(t);
-                    }
-                  },
-                  offerScoringExecutor
-              )));
-        }
+        scoringFutures.add(offerScoringSemaphore.call(() ->
+            CompletableFuture.supplyAsync(() -> {
+              return buildScoringFuture(offerHolders, requestUtilizations, activeTaskIds, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, scorePerOffer, activeTaskIdsForRequest, scoringException, offerHolder);
+            },
+            offerScoringExecutor)));
       }

       CompletableFutures.allOf(scoringFutures).join();

@@ -240,6 +238,59 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
     return offerHolders.values();
   }

+  private Void buildScoringFuture(
+      Map<String, SingularityOfferHolder> offerHolders,
+      Map<String, RequestUtilization> requestUtilizations,
+      List<SingularityTaskId> activeTaskIds,
+      Map<String, SingularitySlaveUsageWithCalculatedScores> currentSlaveUsagesBySlaveId,
+      Map<String, Integer> tasksPerOfferHost,
+      SingularityTaskRequestHolder taskRequestHolder,
+      Map<String, Double> scorePerOffer,
+      List<SingularityTaskId> activeTaskIdsForRequest,
+      AtomicReference<Throwable> scoringException,
+      SingularityOfferHolder offerHolder) {

Review comment: Nit on naming: this method isn't actually building the future; it's a synchronous method that does the actual scoring. I'd either rename it to calculateScore (or something like that), or move the supplyAsync inside this method and have it return the actual future.

+    if (isOfferFull(offerHolder)) {
+      return null;
+    }
+    String slaveId = offerHolder.getSlaveId();
+    Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(slaveId));
+
+    if (taskManager.getActiveTasks().stream()
+        .anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp()
+            && t.getMesosTask().getSlaveId().getValue().equals(slaveId))) {
+      Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(slaveId);
+      if (maybeSlave.isPresent()) {
+        usagePoller.getSlaveUsage(maybeSlave.get())
+            .whenComplete((usage, throwable) -> {
+              if (throwable == null) {
+                currentSlaveUsagesBySlaveId.put(slaveId, new SingularitySlaveUsageWithCalculatedScores(
+                    usage,
+                    mesosConfiguration.getScoreUsingSystemLoad(),
+                    getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(slaveId).getSanitizedHost()),
+                    mesosConfiguration.getLoad5OverloadedThreshold(),
+                    mesosConfiguration.getLoad1OverloadedThreshold(),
+                    usage.getTimestamp()
+                ));
+              } else {
+                throw new RuntimeException(throwable);

Review comment: Where is the handling for this runtime exception? We currently aren't calling a get or join for the future created here, which causes two issues for us:

+              }
+            });
+      }
+      return null;

Review comment: So, I want to mention the choice here; it could go either way. Currently it looks like the newly updated slave metrics will not be taken into account, because we are returning here. Wouldn't we want to continue on to the scoring, since we've gathered new metrics and put them in the map that is fed to calculateScore?

Reply: Yes, I'll go with the latter option. For some reason I was thinking only the slave usage is necessary, but it's probably safer to update the score too.

+    }
+
+    try {
+      double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest, requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId()));
+      if (score != 0) {
+        scorePerOffer.put(slaveId, score);
+      }
+    } catch (Throwable t) {
+      LOG.error("Uncaught exception while scoring offers", t);
+      scoringException.set(t);
+    }
+    return null;
+  }

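Pulling the three review comments above together, here is a minimal sketch (not the PR's final code) of what this helper could look like if it is renamed to reflect that it runs synchronously, joins the refreshed usage so a collection failure lands in the existing catch block rather than inside an unobserved future, and then falls through to scoring so the refreshed metrics actually affect the score. The name scoreOfferForSlave and the usageIsStale/refreshed locals are illustrative assumptions; the fields and helpers it uses (usagePoller, slaveManager, mesosConfiguration, taskManager, calculateScore, getMaxProbableUsageForSlave) come from the diff above.

```java
// Illustrative only: a synchronous scoring helper for SingularityMesosOfferScheduler.
// The caller still wraps it in offerScoringSemaphore.call(() -> CompletableFuture.supplyAsync(...)).
private Void scoreOfferForSlave(
    Map<String, SingularityOfferHolder> offerHolders,
    Map<String, RequestUtilization> requestUtilizations,
    List<SingularityTaskId> activeTaskIds,
    Map<String, SingularitySlaveUsageWithCalculatedScores> currentSlaveUsagesBySlaveId,
    Map<String, Integer> tasksPerOfferHost,
    SingularityTaskRequestHolder taskRequestHolder,
    Map<String, Double> scorePerOffer,
    List<SingularityTaskId> activeTaskIdsForRequest,
    AtomicReference<Throwable> scoringException,
    SingularityOfferHolder offerHolder) {
  if (isOfferFull(offerHolder)) {
    return null;
  }
  String slaveId = offerHolder.getSlaveId();
  try {
    Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(slaveId));

    // Guarding on isPresent() avoids the bare maybeSlaveUsage.get() in the current diff
    boolean usageIsStale = maybeSlaveUsage.isPresent() && taskManager.getActiveTasks().stream()
        .anyMatch((t) -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp()
            && t.getMesosTask().getSlaveId().getValue().equals(slaveId));

    if (usageIsStale) {
      Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(slaveId);
      if (maybeSlave.isPresent()) {
        // join() so an exceptionally completed usage future surfaces in the catch block below
        SingularitySlaveUsage refreshed = usagePoller.getSlaveUsage(maybeSlave.get()).join();
        if (refreshed != null) { // collectSlaveUsage currently returns null on failure
          currentSlaveUsagesBySlaveId.put(slaveId, new SingularitySlaveUsageWithCalculatedScores(
              refreshed,
              mesosConfiguration.getScoreUsingSystemLoad(),
              getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(slaveId).getSanitizedHost()),
              mesosConfiguration.getLoad5OverloadedThreshold(),
              mesosConfiguration.getLoad1OverloadedThreshold(),
              refreshed.getTimestamp()
          ));
        }
      }
    }

    // Fall through to scoring so the refreshed usage is actually used
    double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest,
        requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId()));
    if (score != 0) {
      scorePerOffer.put(slaveId, score);
    }
  } catch (Throwable t) {
    LOG.error("Uncaught exception while scoring offers", t);
    scoringException.set(t);
  }
  return null;
}
```

The alternative mentioned in the first comment, returning the CompletableFuture from this method, would also work; per the thread, the important part is that whichever shape is chosen, the refreshed usage is both observed for errors and fed into calculateScore.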
   private MaxProbableUsage getMaxProbableUsageForSlave(List<SingularityTaskId> activeTaskIds, Map<String, RequestUtilization> requestUtilizations, String sanitizedHostname) {
     double cpu = 0;
     double memBytes = 0;

File: SingularitySlaveUsageWithCalculatedScores.java
@@ -26,7 +26,14 @@ class SingularitySlaveUsageWithCalculatedScores {
   private final double load5Threshold;
   private final double load1Threshold;

-  SingularitySlaveUsageWithCalculatedScores(SingularitySlaveUsage slaveUsage, MachineLoadMetric systemLoadMetric, MaxProbableUsage maxProbableTaskUsage, double load5Threshold, double load1Threshold) {
+  private final long timestamp;
+
+  SingularitySlaveUsageWithCalculatedScores(SingularitySlaveUsage slaveUsage,
+                                            MachineLoadMetric systemLoadMetric,
+                                            MaxProbableUsage maxProbableTaskUsage,
+                                            double load5Threshold,
+                                            double load1Threshold,
+                                            long timestamp) {
     this.slaveUsage = slaveUsage;
     this.systemLoadMetric = systemLoadMetric;
     this.maxProbableTaskUsage = maxProbableTaskUsage;

@@ -39,6 +46,7 @@ class SingularitySlaveUsageWithCalculatedScores {
     }
     this.load5Threshold = load5Threshold;
     this.load1Threshold = load1Threshold;
+    this.timestamp = timestamp;
   }

   boolean isCpuOverloaded(double estimatedNumCpusToAdd) {

@@ -121,6 +129,10 @@ SingularitySlaveUsage getSlaveUsage() {
     return diskInUseScore;
   }

+  long getTimestamp() {
+    return timestamp;
+  }
+

Review comment: You should be able to just do getSlaveUsage().getTimestamp() instead of having to store it in two places.

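A sketch of that suggestion, assuming SingularitySlaveUsage itself carries the timestamp (the scheduler change above passes usageWithId.getTimestamp() and usage.getTimestamp() into this constructor, which suggests it does): the getter delegates, and the extra field plus the new constructor parameter can go away.

```java
// Sketch: read the timestamp off the wrapped usage instead of duplicating it in a second field.
long getTimestamp() {
  return slaveUsage.getTimestamp();
}
```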
   void addEstimatedCpuUsage(double estimatedAddedCpus) {
     this.estimatedAddedCpusUsage += estimatedAddedCpus;
   }

File: SingularityUsagePoller.java
@@ -68,7 +68,7 @@ public class SingularityUsagePoller extends SingularityLeaderOnlyPoller {
   private final DeployManager deployManager;
   private final TaskManager taskManager;

-  private final AsyncSemaphore<Void> usageCollectionSemaphore;
+  private final AsyncSemaphore<SingularitySlaveUsage> usageCollectionSemaphore;
   private final ExecutorService usageExecutor;
   private final ConcurrentHashMap<String, ReentrantLock> requestLocks;

@@ -112,12 +112,12 @@ public void runActionOnPoll() {

     Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts = new ConcurrentHashMap<>();

-    List<CompletableFuture<Void>> usageFutures = new ArrayList<>();
+    List<CompletableFuture<SingularitySlaveUsage>> usageFutures = new ArrayList<>();

     usageHelper.getSlavesToTrackUsageFor().forEach((slave) -> {
       usageFutures.add(usageCollectionSemaphore.call(() ->
-          CompletableFuture.runAsync(() -> {
-            collectSlaveUage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable,
+          CompletableFuture.supplyAsync(() -> {
+            return collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable,
                 totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable);
           }, usageExecutor)
       ));

@@ -126,15 +126,35 @@ public void runActionOnPoll() {
     CompletableFutures.allOf(usageFutures).join();

     usageManager.saveClusterUtilization(
-        getClusterUtilization(utilizationPerRequestId, totalMemBytesUsed.get(), totalMemBytesAvailable.get(), totalCpuUsed.get(), totalCpuAvailable.get(), totalDiskBytesUsed.get(), totalDiskBytesAvailable
-            .get(), now));
+        getClusterUtilization(
+            utilizationPerRequestId, totalMemBytesUsed.get(), totalMemBytesAvailable.get(),
+            totalCpuUsed.get(), totalCpuAvailable.get(), totalDiskBytesUsed.get(), totalDiskBytesAvailable.get(), now));
     utilizationPerRequestId.values().forEach(usageManager::saveRequestUtilization);

     if (configuration.isShuffleTasksForOverloadedSlaves()) {
       shuffleTasksOnOverloadedHosts(overLoadedHosts);
     }
   }

+  public CompletableFuture<SingularitySlaveUsage> getSlaveUsage(SingularitySlave slave) {

Review comment: The only thing I find as a code smell here is that now the offer scoring flow will rely on the usage semaphore and executor having enough permits/threads. Since within the offer scoring we are already in a block that is executed async, it may be worth calling …

+    return usageCollectionSemaphore.call(() ->

Review comment: For the individual method, I'm not sure we want to make it completely async like the larger ones. This method will likely not be called from the same context as the poller itself, so it should probably fall under a different semaphore (e.g. the offer scoring one) if we want it to be async.

+        CompletableFuture.supplyAsync(() -> {
+          return collectSlaveUsage(
+              slave,
+              System.currentTimeMillis(),
+              new ConcurrentHashMap<>(),
+              usageManager.getRequestUtilizations(),
+              new ConcurrentHashMap<>(),
+              new AtomicLong(),
+              new AtomicLong(),
+              new AtomicDouble(),
+              new AtomicDouble(),
+              new AtomicLong(),
+              new AtomicLong());
+        }, usageExecutor)
+    );
+  }
+
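One way to address both comments above, sketched under the assumption that the caller should choose the semaphore and executor: expose the single-slave collection synchronously, so the offer-scoring path (which is already inside an async block guarded by the offer-scoring semaphore) never competes for usageCollectionSemaphore permits or usageExecutor threads. The method name getSlaveUsageSync is hypothetical, not from the PR.

```java
// Hypothetical synchronous variant on SingularityUsagePoller: no semaphore, no executor;
// callers decide how (and under which semaphore) to schedule it.
public SingularitySlaveUsage getSlaveUsageSync(SingularitySlave slave) {
  return collectSlaveUsage(
      slave,
      System.currentTimeMillis(),
      new ConcurrentHashMap<>(),
      usageManager.getRequestUtilizations(),
      new ConcurrentHashMap<>(),
      new AtomicLong(),
      new AtomicLong(),
      new AtomicDouble(),
      new AtomicDouble(),
      new AtomicLong(),
      new AtomicLong());
}
```

The offer scheduler could then call usagePoller.getSlaveUsageSync(maybeSlave.get()) directly from inside its scoring lambda, which already runs on offerScoringExecutor.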
   public void runWithRequestLock(Runnable function, String requestId) {
     ReentrantLock lock = requestLocks.computeIfAbsent(requestId, (r) -> new ReentrantLock());
     lock.lock();

@@ -145,17 +165,17 @@ public void runWithRequestLock(Runnable function, String requestId) {
     }
   }

-  private void collectSlaveUage(SingularitySlave slave,
-                                long now,
-                                Map<String, RequestUtilization> utilizationPerRequestId,
-                                Map<String, RequestUtilization> previousUtilizations,
-                                Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts,
-                                AtomicLong totalMemBytesUsed,
-                                AtomicLong totalMemBytesAvailable,
-                                AtomicDouble totalCpuUsed,
-                                AtomicDouble totalCpuAvailable,
-                                AtomicLong totalDiskBytesUsed,
-                                AtomicLong totalDiskBytesAvailable) {
+  private SingularitySlaveUsage collectSlaveUsage(SingularitySlave slave,
+                                                  long now,
+                                                  Map<String, RequestUtilization> utilizationPerRequestId,
+                                                  Map<String, RequestUtilization> previousUtilizations,
+                                                  Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts,
+                                                  AtomicLong totalMemBytesUsed,
+                                                  AtomicLong totalMemBytesAvailable,
+                                                  AtomicDouble totalCpuUsed,
+                                                  AtomicDouble totalCpuAvailable,
+                                                  AtomicLong totalDiskBytesUsed,
+                                                  AtomicLong totalDiskBytesAvailable) {
     Optional<Long> memoryMbTotal = Optional.absent();
     Optional<Double> cpusTotal = Optional.absent();
     Optional<Long> diskMbTotal = Optional.absent();

@@ -314,11 +334,13 @@ private void collectSlaveUage(SingularitySlave slave,

       LOG.debug("Saving slave {} usage {}", slave.getHost(), slaveUsage);
       usageManager.saveSpecificSlaveUsageAndSetCurrent(slave.getId(), slaveUsage);
+      return slaveUsage;
     } catch (Throwable t) {
       String message = String.format("Could not get slave usage for host %s", slave.getHost());
       LOG.error(message, t);
       exceptionNotifier.notify(message, t);
     }
+    return null; // TODO: is this really okay?

Review comment: I don't think this method is called anywhere else that expects a return value. You could always wrap the result in an Optional to make it more explicit that it might not be there.

   }

   private boolean isEligibleForShuffle(SingularityTaskId task) {

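Following that comment, one way to make the missing-result case explicit is sketched below. Only the signature change and the two exit points are shown; the collection logic and parameter list are unchanged, and callers such as getSlaveUsage (and the AsyncSemaphore type parameter) would need the same Optional treatment.

```java
// Sketch: collectSlaveUsage returns Guava Optional instead of null on failure.
private Optional<SingularitySlaveUsage> collectSlaveUsage(SingularitySlave slave /* , ... unchanged params */) {
  try {
    // ... existing collection logic that builds slaveUsage ...
    LOG.debug("Saving slave {} usage {}", slave.getHost(), slaveUsage);
    usageManager.saveSpecificSlaveUsageAndSetCurrent(slave.getId(), slaveUsage);
    return Optional.of(slaveUsage);
  } catch (Throwable t) {
    String message = String.format("Could not get slave usage for host %s", slave.getHost());
    LOG.error(message, t);
    exceptionNotifier.notify(message, t);
    return Optional.absent();
  }
}
```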
Review comment: Second thoughts about the setup: would it make sense to collect the additional usages in this block instead? I'm realizing that the loop below would be called for each pending task. If we hit a case where collecting a particular slave's usage is throwing exceptions or timing out, we will continue to re-check it for each pending task. Whereas, if we check in this block instead, we can just omit it up front and leave the block below as it was previously.

If we move the usage collection here, we'll likely want to convert this from a parallelStream to a list of CompletableFutures (like below) to have better control over the concurrency.
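The parallelStream block this comment refers to is not part of the excerpt above, so the following is only a guess at the shape being proposed: refresh any stale slave usages once per offer cycle, up front, with a bounded list of futures (mirroring the usageFutures pattern in the poller), and leave the per-pending-task scoring loop untouched. All local names here are illustrative.

```java
// Hypothetical up-front refresh in checkOffers, before the per-pending-task scoring loop.
// A slave whose collection fails is skipped for this cycle instead of being retried once per pending task.
List<CompletableFuture<Void>> usageRefreshFutures = new ArrayList<>();
for (SingularityOfferHolder offerHolder : offerHolders.values()) {
  String slaveId = offerHolder.getSlaveId();
  SingularitySlaveUsageWithCalculatedScores current = currentSlaveUsagesBySlaveId.get(slaveId);
  // Same staleness test as in the diff: a deploy on this slave is newer than the cached usage
  boolean stale = current != null && taskManager.getActiveTasks().stream()
      .anyMatch((t) -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > current.getTimestamp()
          && t.getMesosTask().getSlaveId().getValue().equals(slaveId));
  Optional<SingularitySlave> maybeSlave = stale ? slaveManager.getSlave(slaveId) : Optional.<SingularitySlave>absent();
  if (!maybeSlave.isPresent()) {
    continue;
  }
  usageRefreshFutures.add(offerScoringSemaphore.call(() ->
      CompletableFuture.runAsync(() -> {
        SingularitySlaveUsage refreshed = usagePoller.getSlaveUsage(maybeSlave.get()).join();
        if (refreshed != null) {
          currentSlaveUsagesBySlaveId.put(slaveId, new SingularitySlaveUsageWithCalculatedScores(
              refreshed,
              mesosConfiguration.getScoreUsingSystemLoad(),
              getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolder.getSanitizedHost()),
              mesosConfiguration.getLoad5OverloadedThreshold(),
              mesosConfiguration.getLoad1OverloadedThreshold(),
              refreshed.getTimestamp()
          ));
        }
      }, offerScoringExecutor)));
}
CompletableFutures.allOf(usageRefreshFutures).join();
```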