Merge pull request #2056 from HubSpot/rack
Validate the racks
WH77 authored Jan 31, 2020
2 parents 69be472 + e9d4e1b · commit 34603ca
Showing 6 changed files with 84 additions and 21 deletions.
SingularityConfiguration.java

@@ -400,6 +400,8 @@ public class SingularityConfiguration extends Configuration {
 
   private boolean proxyRunNowToLeader = true;
 
+  private Optional<Integer> expectedRacksCount = Optional.empty();
+
   private double preferredSlaveScaleFactor = 1.5;
 
   // high cpu slave, based on cpu to memory ratio

@@ -1726,6 +1728,14 @@ public void setHighMemorySlaveCutOff(double highMemorySlaveCutOff) {
     this.highMemorySlaveCutOff = highMemorySlaveCutOff;
   }
 
+  public Optional<Integer> getExpectedRacksCount() {
+    return expectedRacksCount;
+  }
+
+  public void setExpectedRacksCount(Optional<Integer> expectedRacksCount) {
+    this.expectedRacksCount = expectedRacksCount;
+  }
+
   public CrashLoopConfiguration getCrashLoopConfiguration() {
     return crashLoopConfiguration;
   }
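Note: SingularityConfiguration is Singularity's Dropwizard configuration class, so the new field should be settable from the service's YAML config. A minimal sketch, assuming Jackson binds the key by property name (the exact key and placement are inferred from the setter, not stated in this PR):

    # Singularity service config (sketch; key name assumed from the Java property)
    expectedRacksCount: 2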
RebalancingHelper.java

@@ -22,37 +22,35 @@
 import com.hubspot.singularity.SingularityTaskCleanup;
 import com.hubspot.singularity.SingularityTaskId;
 import com.hubspot.singularity.TaskCleanupType;
-import com.hubspot.singularity.data.RackManager;
-import com.hubspot.singularity.data.RequestManager;
 import com.hubspot.singularity.data.SlaveManager;
 import com.hubspot.singularity.data.TaskManager;
+import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager;
 
 @Singleton
 public class RebalancingHelper {
   private static final Logger LOG = LoggerFactory.getLogger(RebalancingHelper.class);
 
   private final TaskManager taskManager;
   private final SlaveManager slaveManager;
-  private final RackManager rackManager;
+  private final SingularitySlaveAndRackManager slaveAndRackManager;
 
   @Inject
-  public RebalancingHelper(TaskManager taskManager, RequestManager requestManager, SlaveManager slaveManager,
-                           RackManager rackManager) {
+  public RebalancingHelper(TaskManager taskManager, SlaveManager slaveManager, SingularitySlaveAndRackManager slaveAndRackManager) {
     this.taskManager = taskManager;
     this.slaveManager = slaveManager;
-    this.rackManager = rackManager;
+    this.slaveAndRackManager = slaveAndRackManager;
   }
 
   public List<SingularityTaskId> rebalanceRacks(SingularityRequest request, List<SingularityTaskId> remainingActiveTasks, Optional<String> user) {
     List<SingularityTaskId> extraCleanedTasks = new ArrayList<>();
-    int numActiveRacks = rackManager.getNumActive();
-    double perRack = request.getInstancesSafe() / (double) numActiveRacks;
+    int activeRacksWithCapacityCount = slaveAndRackManager.getActiveRacksWithCapacityCount();
+    double perRack = request.getInstancesSafe() / (double) activeRacksWithCapacityCount;
 
     Multiset<String> countPerRack = HashMultiset.create();
     for (SingularityTaskId taskId : remainingActiveTasks) {
       countPerRack.add(taskId.getRackId());
       LOG.info("{} - {} - {} - {}", countPerRack, perRack, extraCleanedTasks.size(), taskId);
-      if (countPerRack.count(taskId.getRackId()) > perRack && extraCleanedTasks.size() < numActiveRacks / 2 && taskId.getInstanceNo() > 1) {
+      if (countPerRack.count(taskId.getRackId()) > perRack && extraCleanedTasks.size() < activeRacksWithCapacityCount / 2 && taskId.getInstanceNo() > 1) {
         extraCleanedTasks.add(taskId);
         LOG.info("Cleaning up task {} to evenly distribute tasks among racks", taskId);
         taskManager.createTaskCleanup(new SingularityTaskCleanup(user, TaskCleanupType.REBALANCE_RACKS, System.currentTimeMillis(),
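Note: as a worked example of the rebalance arithmetic above (a self-contained sketch with illustrative data, not code from this PR) — with 7 remaining tasks and an expected rack count of 2, perRack is 3.5, so the fourth task counted on a rack exceeds its share, and at most 2 / 2 = 1 extra task is cleaned:

    import com.google.common.collect.HashMultiset;
    import com.google.common.collect.Multiset;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RackRebalanceSketch {
      public static void main(String[] args) {
        int instances = 7;
        int activeRacksWithCapacityCount = 2; // expectedRacksCount takes precedence when set
        double perRack = instances / (double) activeRacksWithCapacityCount; // 3.5

        // Racks of the remaining active tasks, in iteration order (illustrative)
        List<String> taskRacks = Arrays.asList("rack1", "rack1", "rack1", "rack1", "rack2", "rack2", "rack2");
        Multiset<String> countPerRack = HashMultiset.create();
        List<Integer> cleanedIndexes = new ArrayList<>();

        for (int i = 0; i < taskRacks.size(); i++) {
          String rack = taskRacks.get(i);
          countPerRack.add(rack);
          // over the per-rack share, and fewer than half the racks' worth already cleaned
          // (the real loop also skips instance #1; omitted here for brevity)
          if (countPerRack.count(rack) > perRack && cleanedIndexes.size() < activeRacksWithCapacityCount / 2) {
            cleanedIndexes.add(i);
          }
        }
        System.out.println(cleanedIndexes); // [3] -- only the 4th task on rack1 becomes a cleanup candidate
      }
    }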
SingularitySlaveAndRackManager.java

@@ -137,7 +137,8 @@ SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTa
 
     final int numDesiredInstances = taskRequest.getRequest().getInstancesSafe();
     boolean allowBounceToSameHost = isAllowBounceToSameHost(taskRequest.getRequest());
-    Multiset<String> countPerRack = HashMultiset.create(slaveManager.getNumActive());
+    int activeRacksWithCapacityCount = getActiveRacksWithCapacityCount();
+    Multiset<String> countPerRack = HashMultiset.create(activeRacksWithCapacityCount);
     double numOnSlave = 0;
     double numCleaningOnSlave = 0;
     double numFromSameBounceOnSlave = 0;

@@ -388,8 +389,9 @@ private boolean isAllowBounceToSameHost(SingularityRequest request) {
 
   private boolean isRackOk(Multiset<String> countPerRack, String sanitizedRackId, int numDesiredInstances, String requestId, String slaveId, String host, double numCleaningOnSlave) {
     int racksAccountedFor = countPerRack.elementSet().size();
-    double numPerRack = numDesiredInstances / (double) rackManager.getNumActive();
-    if (racksAccountedFor < rackManager.getNumActive()) {
+    int activeRacksWithCapacityCount = getActiveRacksWithCapacityCount();
+    double numPerRack = numDesiredInstances / (double) activeRacksWithCapacityCount;
+    if (racksAccountedFor < activeRacksWithCapacityCount) {
       if (countPerRack.count(sanitizedRackId) < Math.max(numPerRack, 1)) {
         return true;
       }

@@ -624,4 +626,8 @@ private boolean hasTaskLeftOnSlave(SingularityTaskId taskId, String slaveId, Sin
     return false;
   }
 
+  public int getActiveRacksWithCapacityCount() {
+    return configuration.getExpectedRacksCount().isPresent() ? configuration.getExpectedRacksCount().get() : rackManager.getNumActive();
+  }
+
 }
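Note: the new helper calls getExpectedRacksCount() twice; since the field is a java.util.Optional, an equivalent single-lookup form (a sketch, not what the PR ships) would be:

    public int getActiveRacksWithCapacityCount() {
      return configuration.getExpectedRacksCount().orElseGet(rackManager::getNumActive);
    }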
RequestResource.java

@@ -82,7 +82,6 @@
 import com.hubspot.singularity.config.ApiPaths;
 import com.hubspot.singularity.config.SingularityConfiguration;
 import com.hubspot.singularity.data.DeployManager;
-import com.hubspot.singularity.data.RackManager;
 import com.hubspot.singularity.data.RequestManager;
 import com.hubspot.singularity.data.SingularityValidator;
 import com.hubspot.singularity.data.SlaveManager;

@@ -94,6 +93,7 @@
 import com.hubspot.singularity.expiring.SingularityExpiringSkipHealthchecks;
 import com.hubspot.singularity.helpers.RebalancingHelper;
 import com.hubspot.singularity.helpers.RequestHelper;
+import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager;
 import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
 import com.hubspot.singularity.smtp.SingularityMailer;
 import com.ning.http.client.AsyncHttpClient;

@@ -120,25 +120,35 @@ public class RequestResource extends AbstractRequestResource {
   private final RebalancingHelper rebalancingHelper;
   private final RequestHelper requestHelper;
   private final SlaveManager slaveManager;
-  private final RackManager rackManager;
   private final SingularityConfiguration configuration;
   private final SingularityExceptionNotifier exceptionNotifier;
+  private final SingularitySlaveAndRackManager slaveAndRackManager;
 
   @Inject
-  public RequestResource(SingularityValidator validator, DeployManager deployManager, TaskManager taskManager, RebalancingHelper rebalancingHelper,
-                         RequestManager requestManager, SingularityMailer mailer,
-                         SingularityAuthorizationHelper authorizationHelper, RequestHelper requestHelper, LeaderLatch leaderLatch,
-                         SlaveManager slaveManager, AsyncHttpClient httpClient, @Singularity ObjectMapper objectMapper,
-                         RackManager rackManager, SingularityConfiguration configuration, SingularityExceptionNotifier exceptionNotifier) {
+  public RequestResource(SingularityValidator validator,
+                         DeployManager deployManager,
+                         TaskManager taskManager,
+                         RebalancingHelper rebalancingHelper,
+                         RequestManager requestManager,
+                         SingularityMailer mailer,
+                         SingularityAuthorizationHelper authorizationHelper,
+                         RequestHelper requestHelper,
+                         LeaderLatch leaderLatch,
+                         SlaveManager slaveManager,
+                         AsyncHttpClient httpClient,
+                         @Singularity ObjectMapper objectMapper,
+                         SingularityConfiguration configuration,
+                         SingularityExceptionNotifier exceptionNotifier,
+                         SingularitySlaveAndRackManager slaveAndRackManager) {
     super(requestManager, deployManager, validator, authorizationHelper, httpClient, leaderLatch, objectMapper, requestHelper);
     this.mailer = mailer;
     this.taskManager = taskManager;
     this.rebalancingHelper = rebalancingHelper;
     this.requestHelper = requestHelper;
     this.slaveManager = slaveManager;
-    this.rackManager = rackManager;
     this.configuration = configuration;
     this.exceptionNotifier = exceptionNotifier;
+    this.slaveAndRackManager = slaveAndRackManager;
   }

@@ -199,7 +209,8 @@ private void submitRequest(SingularityRequest request, Optional<SingularityReque
       }
     });
 
-    if (oldRequest.get().getInstancesSafe() > rackManager.getNumActive()) {
+    int activeRacksWithCapacityCount = slaveAndRackManager.getActiveRacksWithCapacityCount();
+    if (oldRequest.get().getInstancesSafe() > activeRacksWithCapacityCount) {
       if (request.isRackSensitive() && configuration.isRebalanceRacksOnScaleDown()) {
         rebalancingHelper.rebalanceRacks(request, remainingActiveTasks, user.getEmail());
       }
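Note: to make the behavior change in the scale-down guard concrete (illustrative numbers, not code from this PR) — a rack-sensitive request scaled down from 3 instances with 4 racks registered previously skipped the rack rebalance, but with expectedRacksCount set to 2 it now runs:

    int oldInstances = 3;            // instance count before the scale-down
    int numActiveRacks = 4;          // what rackManager.getNumActive() reported
    int expectedRacksCount = 2;      // the new override
    boolean before = oldInstances > numActiveRacks;     // false -> rebalance skipped
    boolean after  = oldInstances > expectedRacksCount; // true  -> rebalance runs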
SingularitySchedulerTest.java

@@ -879,6 +879,43 @@ public void testRackPlacementOnScaleDown() {
     }
   }
 
+  @Test
+  public void testItRespectsExpectedRackConfiguration() {
+    Optional<Integer> original = configuration.getExpectedRacksCount();
+
+    try {
+      // Tell Singularity to expect 2 racks
+      configuration.setExpectedRacksCount(Optional.of(2));
+
+      // Set up 4 active racks
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave1", "host1", Optional.of("rack1")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave2", "host2", Optional.of("rack2")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave3", "host3", Optional.of("rack3")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave4", "host4", Optional.of("rack4")))).join();
+
+      initRequest();
+      initFirstDeploy();
+      saveAndSchedule(request.toBuilder()
+          .setInstances(Optional.of(7))
+          .setRackSensitive(Optional.of(true))
+      );
+
+      // tasks per rack = ceil(7 / 2), not ceil(7 / 4)
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave1", "host1", Optional.of("rack1")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave2", "host2", Optional.of("rack2")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave1", "host1", Optional.of("rack1")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave2", "host2", Optional.of("rack2")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave1", "host1", Optional.of("rack1")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave2", "host2", Optional.of("rack2")))).join();
+      sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave3", "host3", Optional.of("rack3")))).join();
+
+      // everything should be scheduled
+      Assertions.assertEquals(7, taskManager.getActiveTaskIds().size());
+    } finally {
+      configuration.setExpectedRacksCount(original);
+    }
+  }
+
   @Test
   public void testPlacementOfBounceTasks() {
     // Set up 1 active rack
SingularitySchedulerTestBase.java

@@ -252,6 +252,7 @@ protected Offer createOffer(double cpus, double memory, double disk, String slav
   }
 
   protected Offer createOffer(double cpus, double memory, double disk, String slave, String host, Optional<String> rack, Map<String, String> attributes, String[] portRanges, Optional<String> role) {
+
     AgentID slaveId = AgentID.newBuilder().setValue(slave).build();
     FrameworkID frameworkId = FrameworkID.newBuilder().setValue("framework1").build();
 
