Validate the racks #2056

Merged (12 commits) on Jan 31, 2020
@@ -400,6 +400,8 @@ public class SingularityConfiguration extends Configuration {

private boolean proxyRunNowToLeader = true;

+ private Optional<Integer> expectedRacksCount = Optional.empty();

private double preferredSlaveScaleFactor = 1.5;

// high cpu slave, based on cpu to memory ratio
@@ -1726,6 +1728,14 @@ public void setHighMemorySlaveCutOff(double highMemorySlaveCutOff) {
this.highMemorySlaveCutOff = highMemorySlaveCutOff;
}

+ public Optional<Integer> getExpectedRacksCount() {
+ return expectedRacksCount;
+ }
+
+ public void setExpectedRacksCount(Optional<Integer> expectedRacksCount) {
+ this.expectedRacksCount = expectedRacksCount;
+ }

public CrashLoopConfiguration getCrashLoopConfiguration() {
return crashLoopConfiguration;
}
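Note: the new option is read through getExpectedRacksCount() in the changes below. Since SingularityConfiguration extends Dropwizard's Configuration, the setting would normally be supplied as a top-level key in Singularity's YAML config file. A hypothetical sketch only (the key name is inferred from the new Java field, and the value is borrowed from the test added later in this PR):

    # hypothetical excerpt from a Singularity configuration file (not part of this PR)
    # assume 2 racks with capacity, even if more racks are registered as active
    expectedRacksCount: 2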
@@ -22,37 +22,35 @@
import com.hubspot.singularity.SingularityTaskCleanup;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.TaskCleanupType;
- import com.hubspot.singularity.data.RackManager;
- import com.hubspot.singularity.data.RequestManager;
import com.hubspot.singularity.data.SlaveManager;
import com.hubspot.singularity.data.TaskManager;
+ import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager;

@Singleton
public class RebalancingHelper {
private static final Logger LOG = LoggerFactory.getLogger(RebalancingHelper.class);

private final TaskManager taskManager;
private final SlaveManager slaveManager;
- private final RackManager rackManager;
+ private final SingularitySlaveAndRackManager slaveAndRackManager;

@Inject
- public RebalancingHelper(TaskManager taskManager, RequestManager requestManager, SlaveManager slaveManager,
- RackManager rackManager) {
+ public RebalancingHelper(TaskManager taskManager, SlaveManager slaveManager, SingularitySlaveAndRackManager slaveAndRackManager) {
this.taskManager = taskManager;
this.slaveManager = slaveManager;
- this.rackManager = rackManager;
+ this.slaveAndRackManager = slaveAndRackManager;
}

public List<SingularityTaskId> rebalanceRacks(SingularityRequest request, List<SingularityTaskId> remainingActiveTasks, Optional<String> user) {
List<SingularityTaskId> extraCleanedTasks = new ArrayList<>();
- int numActiveRacks = rackManager.getNumActive();
- double perRack = request.getInstancesSafe() / (double) numActiveRacks;
+ int activeRacksWithCapacityCount = slaveAndRackManager.getActiveRacksWithCapacityCount();
+ double perRack = request.getInstancesSafe() / (double) activeRacksWithCapacityCount;

Multiset<String> countPerRack = HashMultiset.create();
for (SingularityTaskId taskId : remainingActiveTasks) {
countPerRack.add(taskId.getRackId());
LOG.info("{} - {} - {} - {}", countPerRack, perRack, extraCleanedTasks.size(), taskId);
- if (countPerRack.count(taskId.getRackId()) > perRack && extraCleanedTasks.size() < numActiveRacks / 2 && taskId.getInstanceNo() > 1) {
+ if (countPerRack.count(taskId.getRackId()) > perRack && extraCleanedTasks.size() < activeRacksWithCapacityCount / 2 && taskId.getInstanceNo() > 1) {
extraCleanedTasks.add(taskId);
LOG.info("Cleaning up task {} to evenly distribute tasks among racks", taskId);
taskManager.createTaskCleanup(new SingularityTaskCleanup(user, TaskCleanupType.REBALANCE_RACKS, System.currentTimeMillis(),
@@ -137,7 +137,8 @@ SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTa

final int numDesiredInstances = taskRequest.getRequest().getInstancesSafe();
boolean allowBounceToSameHost = isAllowBounceToSameHost(taskRequest.getRequest());
- Multiset<String> countPerRack = HashMultiset.create(slaveManager.getNumActive());
+ int activeRacksWithCapacityCount = getActiveRacksWithCapacityCount();
+ Multiset<String> countPerRack = HashMultiset.create(activeRacksWithCapacityCount);
double numOnSlave = 0;
double numCleaningOnSlave = 0;
double numFromSameBounceOnSlave = 0;
@@ -388,8 +389,9 @@ private boolean isAllowBounceToSameHost(SingularityRequest request) {

private boolean isRackOk(Multiset<String> countPerRack, String sanitizedRackId, int numDesiredInstances, String requestId, String slaveId, String host, double numCleaningOnSlave) {
int racksAccountedFor = countPerRack.elementSet().size();
- double numPerRack = numDesiredInstances / (double) rackManager.getNumActive();
- if (racksAccountedFor < rackManager.getNumActive()) {
+ int activeRacksWithCapacityCount = getActiveRacksWithCapacityCount();
+ double numPerRack = numDesiredInstances / (double) activeRacksWithCapacityCount;
+ if (racksAccountedFor < activeRacksWithCapacityCount) {
if (countPerRack.count(sanitizedRackId) < Math.max(numPerRack, 1)) {
return true;
}
@@ -624,4 +626,8 @@ private boolean hasTaskLeftOnSlave(SingularityTaskId taskId, String slaveId, Sin
return false;
}

+ public int getActiveRacksWithCapacityCount () {
+ return configuration.getExpectedRacksCount().isPresent() ? configuration.getExpectedRacksCount().get() : rackManager.getNumActive();
+ }

}
@@ -82,7 +82,6 @@
import com.hubspot.singularity.config.ApiPaths;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
- import com.hubspot.singularity.data.RackManager;
import com.hubspot.singularity.data.RequestManager;
import com.hubspot.singularity.data.SingularityValidator;
import com.hubspot.singularity.data.SlaveManager;
@@ -94,6 +93,7 @@
import com.hubspot.singularity.expiring.SingularityExpiringSkipHealthchecks;
import com.hubspot.singularity.helpers.RebalancingHelper;
import com.hubspot.singularity.helpers.RequestHelper;
+ import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
import com.hubspot.singularity.smtp.SingularityMailer;
import com.ning.http.client.AsyncHttpClient;
@@ -120,25 +120,35 @@ public class RequestResource extends AbstractRequestResource {
private final RebalancingHelper rebalancingHelper;
private final RequestHelper requestHelper;
private final SlaveManager slaveManager;
- private final RackManager rackManager;
private final SingularityConfiguration configuration;
private final SingularityExceptionNotifier exceptionNotifier;
+ private final SingularitySlaveAndRackManager slaveAndRackManager;

@Inject
- public RequestResource(SingularityValidator validator, DeployManager deployManager, TaskManager taskManager, RebalancingHelper rebalancingHelper,
- RequestManager requestManager, SingularityMailer mailer,
- SingularityAuthorizationHelper authorizationHelper, RequestHelper requestHelper, LeaderLatch leaderLatch,
- SlaveManager slaveManager, AsyncHttpClient httpClient, @Singularity ObjectMapper objectMapper,
- RackManager rackManager, SingularityConfiguration configuration, SingularityExceptionNotifier exceptionNotifier) {
+ public RequestResource(SingularityValidator validator,
+ DeployManager deployManager,
+ TaskManager taskManager,
+ RebalancingHelper rebalancingHelper,
+ RequestManager requestManager,
+ SingularityMailer mailer,
+ SingularityAuthorizationHelper authorizationHelper,
+ RequestHelper requestHelper,
+ LeaderLatch leaderLatch,
+ SlaveManager slaveManager,
+ AsyncHttpClient httpClient,
+ @Singularity ObjectMapper objectMapper,
+ SingularityConfiguration configuration,
+ SingularityExceptionNotifier exceptionNotifier,
+ SingularitySlaveAndRackManager slaveAndRackManager) {
super(requestManager, deployManager, validator, authorizationHelper, httpClient, leaderLatch, objectMapper, requestHelper);
this.mailer = mailer;
this.taskManager = taskManager;
this.rebalancingHelper = rebalancingHelper;
this.requestHelper = requestHelper;
this.slaveManager = slaveManager;
- this.rackManager = rackManager;
this.configuration = configuration;
this.exceptionNotifier = exceptionNotifier;
+ this.slaveAndRackManager = slaveAndRackManager;
}

private void submitRequest(SingularityRequest request, Optional<SingularityRequestWithState> oldRequestWithState, Optional<RequestHistoryType> historyType,
@@ -199,7 +209,8 @@ private void submitRequest(SingularityRequest request, Optional<SingularityReque
}
});

- if (oldRequest.get().getInstancesSafe() > rackManager.getNumActive()) {
+ int activeRacksWithCapacityCount = slaveAndRackManager.getActiveRacksWithCapacityCount();
+ if (oldRequest.get().getInstancesSafe() > activeRacksWithCapacityCount) {
if (request.isRackSensitive() && configuration.isRebalanceRacksOnScaleDown()) {
rebalancingHelper.rebalanceRacks(request, remainingActiveTasks, user.getEmail());
}
@@ -879,6 +879,43 @@ public void testRackPlacementOnScaleDown() {
}
}

+ @Test
+ public void testItRespectsExpectedRackConfiguration() {
+ Optional<Integer> original = configuration.getExpectedRacksCount();
+
+ try {
+ // Tell Singularity to expect 2 racks
+ configuration.setExpectedRacksCount(Optional.of(2));
+
+ // Set up 4 active racks
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave1", "host1", Optional.of("rack1")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave2", "host2", Optional.of("rack2")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave3", "host3", Optional.of("rack3")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave4", "host4", Optional.of("rack4")))).join();
+
+ initRequest();
+ initFirstDeploy();
+ saveAndSchedule(request.toBuilder()
+ .setInstances(Optional.of(7))
+ .setRackSensitive(Optional.of(true))
+ );
+
+ // tasks per rack = ceil(7 / 2), not ceil(7 / 4)
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave1", "host1", Optional.of("rack1")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave2", "host2", Optional.of("rack2")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave1", "host1", Optional.of("rack1")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave2", "host2", Optional.of("rack2")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave1", "host1", Optional.of("rack1")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave2", "host2", Optional.of("rack2")))).join();
+ sms.resourceOffers(Arrays.asList(createOffer(1, 128, 1024, "slave3", "host3", Optional.of("rack3")))).join();
+
+ // everything should be scheduled
+ Assertions.assertEquals(7, taskManager.getActiveTaskIds().size());
+ } finally {
+ configuration.setExpectedRacksCount(original);
+ }
+ }

@Test
public void testPlacementOfBounceTasks() {
// Set up 1 active rack
@@ -252,6 +252,7 @@ protected Offer createOffer(double cpus, double memory, double disk, String slav
}

protected Offer createOffer(double cpus, double memory, double disk, String slave, String host, Optional<String> rack, Map<String, String> attributes, String[] portRanges, Optional<String> role) {
+
AgentID slaveId = AgentID.newBuilder().setValue(slave).build();
FrameworkID frameworkId = FrameworkID.newBuilder().setValue("framework1").build();
