Prevent new host overloading #1822

Merged · 29 commits · Aug 16, 2018
Commits
3a7a125
WIP: Prevent new host overloading
pschoenfelder Jul 17, 2018
e610cf1
Rework async stuff per PR
pschoenfelder Jul 19, 2018
339dd44
PR changes
pschoenfelder Jul 24, 2018
d814a9d
Move usage collection loop
pschoenfelder Jul 31, 2018
50ba080
Use semaphore
pschoenfelder Jul 31, 2018
39b3535
Fix tests
pschoenfelder Jul 31, 2018
bee2269
Resolve dependencies
pschoenfelder Jul 31, 2018
6c02f60
Fix tests
pschoenfelder Jul 31, 2018
fe45d81
Add timeout for slave usage checking
pschoenfelder Aug 1, 2018
1414edf
Build fix
pschoenfelder Aug 1, 2018
4650639
Build fix
pschoenfelder Aug 1, 2018
890504b
Fix failed data refresh
pschoenfelder Aug 1, 2018
5d7de80
Make fewer zk calls for usage fetching
ssalinas Aug 2, 2018
572407e
Add new method for slave usage
pschoenfelder Aug 6, 2018
3612d54
Merge branch 'new-host-overloading' of https://github.com/HubSpot/Sin…
pschoenfelder Aug 6, 2018
d527799
rm comment
pschoenfelder Aug 6, 2018
d055ff0
longs to doubles
pschoenfelder Aug 6, 2018
0127517
Add test tolerances
pschoenfelder Aug 6, 2018
f4de9ee
Remove more zk calls
pschoenfelder Aug 7, 2018
df9c3a0
Condense duplicate variables
pschoenfelder Aug 7, 2018
b1fbeda
Fix typo
pschoenfelder Aug 7, 2018
5ec231b
Skip hosts which do not have valid metrics during offer processing
ssalinas Aug 8, 2018
9ed693f
Merge pull request #1828 from HubSpot/skip_host
pschoenfelder Aug 8, 2018
b80e8c9
Add leader/web cache for request utilizations
ssalinas Aug 8, 2018
a13440a
merge request utilization caching
ssalinas Aug 8, 2018
8cae538
new strategy for new host overload check
ssalinas Aug 9, 2018
4ea2166
fix the test client as well
ssalinas Aug 9, 2018
5b822c3
Add logging
ssalinas Aug 9, 2018
e2e6794
add try/catch here
ssalinas Aug 9, 2018
@@ -376,6 +376,8 @@ public class SingularityConfiguration extends Configuration {

private long preemptibleTaskMaxExpectedRuntimeMs = 900000; // 15 minutes

private long maxSlaveUsageMetricAgeMs = 30000;

public long getAskDriverToKillTasksAgainAfterMillis() {
return askDriverToKillTasksAgainAfterMillis;
}
@@ -1593,4 +1595,12 @@ public long getPreemptibleTaskMaxExpectedRuntimeMs() {
public void setPreemptibleTaskMaxExpectedRuntimeMs(long preemptibleTaskMaxExpectedRuntimeMs) {
this.preemptibleTaskMaxExpectedRuntimeMs = preemptibleTaskMaxExpectedRuntimeMs;
}

public long getMaxSlaveUsageMetricAgeMs() {
return maxSlaveUsageMetricAgeMs;
}

public void setMaxSlaveUsageMetricAgeMs(long maxSlaveUsageMetricAgeMs) {
this.maxSlaveUsageMetricAgeMs = maxSlaveUsageMetricAgeMs;
}
}
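
For context on how this new setting is intended to be used, a commented-out block later in this diff gates offer scoring on metric age. A minimal sketch of that staleness check, assuming SingularitySlaveUsage exposes an epoch-millisecond getTimestamp() (the helper name below is illustrative, not part of this PR):

```java
// Illustrative helper only: true when a slave's usage snapshot is older than
// the configured maxSlaveUsageMetricAgeMs and should be refreshed or skipped.
private boolean isSlaveUsageStale(SingularitySlaveUsage usage, SingularityConfiguration configuration) {
  long ageMs = System.currentTimeMillis() - usage.getTimestamp();
  return ageMs > configuration.getMaxSlaveUsageMetricAgeMs();
}
```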
@@ -29,6 +29,7 @@
import com.hubspot.singularity.RequestUtilization;
import com.hubspot.singularity.SingularityDeployStatistics;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.SingularitySlave;
import com.hubspot.singularity.SingularitySlaveUsage;
import com.hubspot.singularity.SingularitySlaveUsageWithId;
import com.hubspot.singularity.SingularityTask;
@@ -41,13 +42,15 @@
import com.hubspot.singularity.config.MesosConfiguration;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.SlaveManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.UsageManager;
import com.hubspot.singularity.helpers.MesosUtils;
import com.hubspot.singularity.helpers.SingularityMesosTaskHolder;
import com.hubspot.singularity.mesos.SingularitySlaveUsageWithCalculatedScores.MaxProbableUsage;
import com.hubspot.singularity.scheduler.SingularityLeaderCache;
import com.hubspot.singularity.scheduler.SingularityScheduler;
import com.hubspot.singularity.scheduler.SingularityUsagePoller;

@Singleton
public class SingularityMesosOfferScheduler {
@@ -65,6 +68,8 @@ public class SingularityMesosOfferScheduler {
private final SingularitySlaveAndRackManager slaveAndRackManager;
private final SingularitySlaveAndRackHelper slaveAndRackHelper;
private final SingularityTaskSizeOptimizer taskSizeOptimizer;
private final SingularityUsagePoller usagePoller;
private final SlaveManager slaveManager;
private final UsageManager usageManager;
private final DeployManager deployManager;
private final SingularitySchedulerLock lock;
@@ -89,6 +94,8 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
SingularityTaskSizeOptimizer taskSizeOptimizer,
SingularitySlaveAndRackHelper slaveAndRackHelper,
SingularityLeaderCache leaderCache,
SingularityUsagePoller usagePoller,
SlaveManager slaveManager,
UsageManager usageManager,
DeployManager deployManager,
SingularitySchedulerLock lock) {
@@ -102,6 +109,8 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
this.slaveAndRackManager = slaveAndRackManager;
this.taskSizeOptimizer = taskSizeOptimizer;
this.leaderCache = leaderCache;
this.usagePoller = usagePoller;
this.slaveManager = slaveManager;
this.slaveAndRackHelper = slaveAndRackHelper;
this.taskPrioritizer = taskPrioritizer;
this.usageManager = usageManager;
@@ -180,7 +189,8 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
mesosConfiguration.getScoreUsingSystemLoad(),
getMaxProbableUsageForSlave(activeTaskIds, requestUtilizations, offerHolders.get(usageWithId.getSlaveId()).getSanitizedHost()),
mesosConfiguration.getLoad5OverloadedThreshold(),
mesosConfiguration.getLoad1OverloadedThreshold()
mesosConfiguration.getLoad1OverloadedThreshold(),
usageWithId.getTimestamp()
Review comment:
Second thoughts about the setup: would it make sense to collect the additional usages in this block instead? I'm realizing that the loop below would be called for each pending task, so if collecting a particular slave's usage is throwing exceptions or timing out, we will keep re-checking it for every pending task. If we check in this block instead, we can omit that slave up front and leave the block below as it was previously.

If we move the usage collection here, we'll likely want to convert this from parallelStream to a list of CompletableFutures like below to have better control over the concurrency
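
A minimal, self-contained sketch of that parallelStream-to-CompletableFuture conversion, using only standard java.util.concurrent types. The plain Semaphore and fixed thread pool stand in for Singularity's AsyncSemaphore and offer-scoring executor, and fetchUsage is a placeholder for the real usage-collection call, so all names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class UsageCollectionSketch {
  // Stand-ins for Singularity's AsyncSemaphore and offer-scoring executor.
  private final Semaphore concurrencyLimit = new Semaphore(10);
  private final ExecutorService executor = Executors.newFixedThreadPool(10);

  // Collect usage for every slave once, up front, with bounded concurrency,
  // instead of re-walking a parallelStream for each pending task.
  public Map<String, Double> collectUsages(List<String> slaveIds) {
    Map<String, Double> usageBySlaveId = new ConcurrentHashMap<>();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (String slaveId : slaveIds) {
      futures.add(CompletableFuture.runAsync(() -> {
        try {
          concurrencyLimit.acquire(); // bound the number of concurrent usage fetches
          try {
            usageBySlaveId.put(slaveId, fetchUsage(slaveId));
          } finally {
            concurrencyLimit.release();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, executor));
    }
    // Wait for all usage collections before moving on to scoring.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    return usageBySlaveId;
  }

  private double fetchUsage(String slaveId) {
    return 0.0; // placeholder for the real per-slave usage call
  }
}
```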

)
));

@@ -196,23 +206,43 @@ public Collection<SingularityOfferHolder> checkOffers(final Collection<Offer> of
List<CompletableFuture<Void>> scoringFutures = new ArrayList<>();
AtomicReference<Throwable> scoringException = new AtomicReference<>(null);
for (SingularityOfferHolder offerHolder : offerHolders.values()) {
if (!isOfferFull(offerHolder)) {
scoringFutures.add(
offerScoringSemaphore.call(
() -> CompletableFuture.runAsync(() -> {
try {
double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest, requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId()));
if (score != 0) {
scorePerOffer.put(offerHolder.getSlaveId(), score);
}
} catch (Throwable t) {
LOG.error("Uncaught exception while scoring offers", t);
scoringException.set(t);
}
},
offerScoringExecutor
)));
if (isOfferFull(offerHolder)) {
continue;
}
Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage = Optional.fromNullable(currentSlaveUsagesBySlaveId.get(offerHolder.getSlaveId()));

if (taskManager.getActiveTasks().stream()
.anyMatch(t -> t.getTaskRequest().getDeploy().getTimestamp().or(System.currentTimeMillis()) > maybeSlaveUsage.get().getTimestamp()
&& t.getMesosTask().getSlaveId().getValue().equals(offerHolder.getSlaveId()))) {
Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(offerHolder.getSlaveId());
if (maybeSlave.isPresent()) {
usagePoller.getSlaveUsage(maybeSlave.get());
Review comment:
will probably want to put this in currentSlaveUsagesBySlaveId after it's calculated. Calling this alone won't update the underlying values we pass to the scoring functions

Review comment:
Another note, this would be a good thing to stick inside the completable future below. It's a good candidate to make async since we are now io bound on an api call and cpu bound on the calculations
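
Putting the two notes together, one possible shape is to refresh the slave's usage inside the scoring future (so the blocking API call stays async under the same concurrency control) and to write the refreshed value back into currentSlaveUsagesBySlaveId before scoring. This is only a sketch against the names visible in this diff; toCalculatedScores is a hypothetical helper for rebuilding the calculated-scores wrapper from the fresh usage, not an existing method:

```java
scoringFutures.add(
    offerScoringSemaphore.call(
        () -> CompletableFuture.runAsync(() -> {
          try {
            // Refresh stale metrics first, then make them visible to the scoring code.
            Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(offerHolder.getSlaveId());
            if (maybeSlave.isPresent()) {
              usagePoller.getSlaveUsage(maybeSlave.get()).join();
              // Hypothetical helper: rebuild the calculated-scores wrapper from the fresh usage.
              currentSlaveUsagesBySlaveId.put(offerHolder.getSlaveId(), toCalculatedScores(maybeSlave.get()));
            }
            double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost,
                taskRequestHolder, activeTaskIdsForRequest,
                requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId()));
            if (score != 0) {
              scorePerOffer.put(offerHolder.getSlaveId(), score);
            }
          } catch (Throwable t) {
            LOG.error("Uncaught exception while scoring offers", t);
            scoringException.set(t);
          }
        },
        offerScoringExecutor
    )));
```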

}
continue;
}

// if (maybeSlaveUsage.isPresent() && System.currentTimeMillis() - maybeSlaveUsage.get().getTimestamp() > configuration.getMaxSlaveUsageMetricAgeMs()) {
// Optional<SingularitySlave> maybeSlave = slaveManager.getSlave(offerHolder.getSlaveId());
// if (maybeSlave.isPresent()) {
// usagePoller.getSlaveUsage(maybeSlave.get());
// }
// continue;
// }
scoringFutures.add(
offerScoringSemaphore.call(
() -> CompletableFuture.runAsync(() -> {
try {
double score = calculateScore(offerHolder, currentSlaveUsagesBySlaveId, tasksPerOfferHost, taskRequestHolder, activeTaskIdsForRequest, requestUtilizations.get(taskRequestHolder.getTaskRequest().getRequest().getId()));
if (score != 0) {
scorePerOffer.put(offerHolder.getSlaveId(), score);
}
} catch (Throwable t) {
LOG.error("Uncaught exception while scoring offers", t);
scoringException.set(t);
}
},
offerScoringExecutor
)));
}

CompletableFutures.allOf(scoringFutures).join();
@@ -26,7 +26,14 @@ class SingularitySlaveUsageWithCalculatedScores {
private final double load5Threshold;
private final double load1Threshold;

SingularitySlaveUsageWithCalculatedScores(SingularitySlaveUsage slaveUsage, MachineLoadMetric systemLoadMetric, MaxProbableUsage maxProbableTaskUsage, double load5Threshold, double load1Threshold) {
private final long timestamp;

SingularitySlaveUsageWithCalculatedScores(SingularitySlaveUsage slaveUsage,
MachineLoadMetric systemLoadMetric,
MaxProbableUsage maxProbableTaskUsage,
double load5Threshold,
double load1Threshold,
long timestamp) {
this.slaveUsage = slaveUsage;
this.systemLoadMetric = systemLoadMetric;
this.maxProbableTaskUsage = maxProbableTaskUsage;
@@ -39,6 +46,7 @@
}
this.load5Threshold = load5Threshold;
this.load1Threshold = load1Threshold;
this.timestamp = timestamp;
}

boolean isCpuOverloaded(double estimatedNumCpusToAdd) {
@@ -121,6 +129,10 @@ SingularitySlaveUsage getSlaveUsage() {
return diskInUseScore;
}

long getTimestamp() {
Review comment:
you should be able to just do getSlaveUsage().getTimestamp() instead of having to store it in two places
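
A one-line sketch of that simplification, assuming SingularitySlaveUsage already exposes getTimestamp():

```java
long getTimestamp() {
  // Delegate to the wrapped usage instead of duplicating the timestamp field.
  return getSlaveUsage().getTimestamp();
}
```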

return timestamp;
}

void addEstimatedCpuUsage(double estimatedAddedCpus) {
this.estimatedAddedCpusUsage += estimatedAddedCpus;
}
@@ -117,7 +117,7 @@ public void runActionOnPoll() {
usageHelper.getSlavesToTrackUsageFor().forEach((slave) -> {
usageFutures.add(usageCollectionSemaphore.call(() ->
CompletableFuture.runAsync(() -> {
collectSlaveUage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable,
collectSlaveUsage(slave, now, utilizationPerRequestId, previousUtilizations, overLoadedHosts, totalMemBytesUsed, totalMemBytesAvailable,
totalCpuUsed, totalCpuAvailable, totalDiskBytesUsed, totalDiskBytesAvailable);
}, usageExecutor)
));
@@ -126,15 +126,35 @@
CompletableFutures.allOf(usageFutures).join();

usageManager.saveClusterUtilization(
getClusterUtilization(utilizationPerRequestId, totalMemBytesUsed.get(), totalMemBytesAvailable.get(), totalCpuUsed.get(), totalCpuAvailable.get(), totalDiskBytesUsed.get(), totalDiskBytesAvailable
.get(), now));
getClusterUtilization(
utilizationPerRequestId, totalMemBytesUsed.get(), totalMemBytesAvailable.get(),
totalCpuUsed.get(), totalCpuAvailable.get(), totalDiskBytesUsed.get(), totalDiskBytesAvailable.get(), now));
utilizationPerRequestId.values().forEach(usageManager::saveRequestUtilization);

if (configuration.isShuffleTasksForOverloadedSlaves()) {
shuffleTasksOnOverloadedHosts(overLoadedHosts);
}
}

public CompletableFuture<Void> getSlaveUsage(SingularitySlave slave) {
return usageCollectionSemaphore.call(() ->
Review comment:
For the individual method, not sure we want to make it completely async like the larger ones. This method will likely not be called from the same context as the poller itself, so it should probably fall under a different semaphore (e.g. the offer scoring one) if we want it to be async
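
As a sketch of the non-async alternative, the method could simply run the collection inline on the caller's thread and leave any semaphore/executor choice to the caller. The method name below is hypothetical; the arguments mirror the call already shown in this diff:

```java
// Hypothetical synchronous variant: collect one slave's usage on the calling
// thread, without going through the poller's usageCollectionSemaphore/usageExecutor.
public void getSlaveUsageBlocking(SingularitySlave slave) {
  collectSlaveUsage(
      slave,
      System.currentTimeMillis(),
      new ConcurrentHashMap<>(),
      usageManager.getRequestUtilizations(),
      new ConcurrentHashMap<>(),
      new AtomicLong(),
      new AtomicLong(),
      new AtomicDouble(),
      new AtomicDouble(),
      new AtomicLong(),
      new AtomicLong());
}
```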

CompletableFuture.runAsync(() -> {
collectSlaveUsage(
slave,
System.currentTimeMillis(),
new ConcurrentHashMap<>(),
usageManager.getRequestUtilizations(),
new ConcurrentHashMap<>(),
new AtomicLong(),
new AtomicLong(),
new AtomicDouble(),
new AtomicDouble(),
new AtomicLong(),
new AtomicLong());
}, usageExecutor)
);
}

public void runWithRequestLock(Runnable function, String requestId) {
ReentrantLock lock = requestLocks.computeIfAbsent(requestId, (r) -> new ReentrantLock());
lock.lock();
@@ -145,17 +165,17 @@ public void runWithRequestLock(Runnable function, String requestId) {
}
}

private void collectSlaveUage(SingularitySlave slave,
long now,
Map<String, RequestUtilization> utilizationPerRequestId,
Map<String, RequestUtilization> previousUtilizations,
Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts,
AtomicLong totalMemBytesUsed,
AtomicLong totalMemBytesAvailable,
AtomicDouble totalCpuUsed,
AtomicDouble totalCpuAvailable,
AtomicLong totalDiskBytesUsed,
AtomicLong totalDiskBytesAvailable) {
private void collectSlaveUsage(SingularitySlave slave,
long now,
Map<String, RequestUtilization> utilizationPerRequestId,
Map<String, RequestUtilization> previousUtilizations,
Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overLoadedHosts,
AtomicLong totalMemBytesUsed,
AtomicLong totalMemBytesAvailable,
AtomicDouble totalCpuUsed,
AtomicDouble totalCpuAvailable,
AtomicLong totalDiskBytesUsed,
AtomicLong totalDiskBytesAvailable) {
Optional<Long> memoryMbTotal = Optional.absent();
Optional<Double> cpusTotal = Optional.absent();
Optional<Long> diskMbTotal = Optional.absent();