Skip to content

Commit

Permalink
Merge pull request #2158 from HubSpot/stability
Browse files Browse the repository at this point in the history
Stability improvements for sql/zk usage
  • Loading branch information
ssalinas authored Jan 4, 2021
2 parents 16f1a09 + c5f742b commit 43cb189
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,39 @@ public SingularityRequestBatch getRequestsBatch(Set<String> requestIds) {
}
}

/**
* Check if a certain request exists
* @return the hash code of the current SingularityRequest object, empty otherwise
* returns -1 if the ETag header is missing on the response but the request is present
*/
public Optional<Integer> headRequest(String requestId) {
final Function<String, String> requestUri = host ->
String.format(REQUEST_GET_FORMAT, getApiBase(host), requestId);
HttpResponse response = executeRequest(
requestUri,
Method.HEAD,
Optional.empty(),
Collections.emptyMap()
);
if (response.isError()) {
throw new SingularityClientException(
String.format("Could not head request: %s", response.getAsString())
);
}
if (response.getStatusCode() == 404) {
return Optional.empty();
}
return Optional.of(
response
.getHeaders()
.get("ETag")
.stream()
.findFirst()
.map(Integer::parseInt)
.orElse(-1)
);
}

/**
* Get all requests that their state is ACTIVE
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@ public class SingularityConfiguration extends Configuration {
// Stops persisters + purgers from running when the DB is enabled. Forces binding of zk-based usage manager
private boolean sqlReadOnlyMode = false;

// Instructs this instance to not contend for leadership. It will only serve api calls
private boolean readOnlyInstance = false;

public long getAskDriverToKillTasksAgainAfterMillis() {
return askDriverToKillTasksAgainAfterMillis;
}
Expand Down Expand Up @@ -2025,4 +2028,12 @@ public boolean isSqlReadOnlyMode() {
public void setSqlReadOnlyMode(boolean sqlReadOnlyMode) {
this.sqlReadOnlyMode = sqlReadOnlyMode;
}

public boolean isReadOnlyInstance() {
return readOnlyInstance;
}

public void setReadOnlyInstance(boolean readOnlyInstance) {
this.readOnlyInstance = readOnlyInstance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ public List<String> getRequestIdsInTaskHistory() {
return getChildren(HISTORY_PATH_ROOT);
}

public int getTaskCountForRequest(String requestId) {
return getNumChildren(getRequestPath(requestId));
}

public List<SingularityTaskId> getAllTaskIds() {
final List<String> requestIds = getChildren(HISTORY_PATH_ROOT);
final List<String> paths = Lists.newArrayListWithCapacity(requestIds.size());
Expand Down Expand Up @@ -995,13 +999,18 @@ private List<SingularityTaskId> getTaskIdsForRequest(
String requestId,
TaskFilter taskFilter
) {
final List<SingularityTaskId> requestTaskIds = getTaskIdsForRequest(requestId);
final List<SingularityTaskId> activeTaskIds = filterActiveTaskIds(requestTaskIds);

if (taskFilter == TaskFilter.ACTIVE) {
return activeTaskIds;
if (leaderCache.active()) {
return leaderCache.getActiveTaskIdsForRequest(requestId);
} else {
return getActiveTaskIds()
.stream()
.filter(t -> t.getRequestId().equals(requestId))
.collect(Collectors.toList());
}
}

final List<SingularityTaskId> requestTaskIds = getTaskIdsForRequest(requestId);
final List<SingularityTaskId> activeTaskIds = filterActiveTaskIds(requestTaskIds);
Iterables.removeAll(requestTaskIds, activeTaskIds);

return requestTaskIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.TaskManager;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -58,7 +59,11 @@ public void runActionOnPoll() {
LOG.info("Checking inactive task ids for task history persistence");

final long start = System.currentTimeMillis();
for (String requestId : taskManager.getRequestIdsInTaskHistory()) {
List<String> requestIds = taskManager.getRequestIdsInTaskHistory();
requestIds.sort(
Comparator.comparingLong(taskManager::getTaskCountForRequest).reversed()
);
for (String requestId : requestIds) {
try {
LOG.info("Checking request {}", requestId);
List<SingularityTaskId> taskIds = taskManager.getTaskIdsForRequest(requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.hubspot.singularity.SingularityLeaderController;
import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory;
import com.hubspot.singularity.SingularityManagedThreadPoolFactory;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.ExecutorIdGenerator;
import com.hubspot.singularity.mesos.SingularityMesosExecutorInfoSupport;
import com.hubspot.singularity.metrics.SingularityGraphiteReporter;
Expand Down Expand Up @@ -41,6 +42,7 @@ public class SingularityLifecycleManaged implements Managed {
private final SingularityGraphiteReporter graphiteReporter;
private final ExecutorIdGenerator executorIdGenerator;
private final Set<SingularityLeaderOnlyPoller> leaderOnlyPollers;
private final boolean readOnly;

private final CuratorFramework curatorFramework;
private final AtomicBoolean started = new AtomicBoolean(false);
Expand All @@ -57,7 +59,8 @@ public SingularityLifecycleManaged(
SingularityMesosExecutorInfoSupport executorInfoSupport,
SingularityGraphiteReporter graphiteReporter,
ExecutorIdGenerator executorIdGenerator,
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers,
SingularityConfiguration configuration
) {
this.cachedThreadPoolFactory = cachedThreadPoolFactory;
this.scheduledExecutorServiceFactory = scheduledExecutorServiceFactory;
Expand All @@ -69,13 +72,18 @@ public SingularityLifecycleManaged(
this.graphiteReporter = graphiteReporter;
this.executorIdGenerator = executorIdGenerator;
this.leaderOnlyPollers = leaderOnlyPollers;
this.readOnly = configuration.isReadOnlyInstance();
}

@Override
public void start() throws Exception {
if (!started.getAndSet(true)) {
startCurator();
leaderLatch.start();
if (!readOnly) {
leaderLatch.start();
} else {
LOG.info("Registered as read only, will not attempt to become the leader");
}
leaderController.start(); // start the state poller
graphiteReporter.start();
executorIdGenerator.start();
Expand Down Expand Up @@ -107,7 +115,7 @@ public void stop() throws Exception {

// to override in unit testing
protected boolean startLeaderPollers() {
return true;
return !readOnly;
}

private void stopDirectoryFetcher() {
Expand Down Expand Up @@ -163,8 +171,10 @@ private void stopGraphiteReporter() {

private void stopLeaderLatch() {
try {
LOG.info("Stopping leader latch");
leaderLatch.close();
if (!readOnly) {
LOG.info("Stopping leader latch");
leaderLatch.close();
}
} catch (Throwable t) {
LOG.warn("Could not stop leader latch ({})}", t.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -2075,4 +2076,27 @@ public List<CrashLoopInfo> getCrashLoopsForRequest(
);
return requestManager.getCrashLoopsForRequest(requestId);
}

@HEAD
@Path("/request/{requestId}")
@Operation(
summary = "Check if a request of specific ID exists and get the hash code of the SingularityRequest object"
)
public Response headRequest(
@Parameter(hidden = true) @Auth SingularityUser user,
@Parameter(required = true, description = "The Request ID to check") @PathParam(
"requestId"
) String requestId,
@Parameter(
description = "Fetched a cached version of this data to limit expensive operations"
) @QueryParam("useWebCache") Boolean useWebCache
) {
Optional<SingularityRequestWithState> requestWithState = requestManager.getRequest(
requestId,
useWebCache
);
return requestWithState
.map(r -> Response.ok().header("ETag", r.getRequest().hashCode()).build())
.orElseGet(() -> Response.status(404).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.hubspot.singularity.SingularityLeaderController;
import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory;
import com.hubspot.singularity.SingularityManagedThreadPoolFactory;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.ExecutorIdGenerator;
import com.hubspot.singularity.mesos.SingularityMesosExecutorInfoSupport;
import com.hubspot.singularity.metrics.SingularityGraphiteReporter;
Expand All @@ -26,7 +27,8 @@ public SingularityLifecycleManagedTest(
SingularityMesosExecutorInfoSupport executorInfoSupport,
SingularityGraphiteReporter graphiteReporter,
ExecutorIdGenerator executorIdGenerator,
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers,
SingularityConfiguration configuration
) {
super(
cachedThreadPoolFactory,
Expand All @@ -38,7 +40,8 @@ public SingularityLifecycleManagedTest(
executorInfoSupport,
graphiteReporter,
executorIdGenerator,
leaderOnlyPollers
leaderOnlyPollers,
configuration
);
}

Expand Down
2 changes: 1 addition & 1 deletion SingularityUI/app/thirdPartyConfigurations.es6
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const loadThirdParty = () => {

// Messenger options
window.Messenger.options = {
extraClasses: 'messenger-fixed messenger-on-top',
extraClasses: 'messenger-fixed messenger-on-bottom messenger-on-right',
theme: 'air',
hideOnNavigate: true,
maxMessages: 1,
Expand Down

0 comments on commit 43cb189

Please sign in to comment.