Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stability improvements for sql/zk usage #2158

Merged
merged 8 commits into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,39 @@ public SingularityRequestBatch getRequestsBatch(Set<String> requestIds) {
}
}

/**
* Check if a certain request exists
* @return the hash code of the current SingularityRequest object, empty otherwise
* returns -1 if the ETag header is missing on the response but the request is present
*/
public Optional<Integer> headRequest(String requestId) {
final Function<String, String> requestUri = host ->
String.format(REQUEST_GET_FORMAT, getApiBase(host), requestId);
HttpResponse response = executeRequest(
requestUri,
Method.HEAD,
Optional.empty(),
Collections.emptyMap()
);
if (response.isError()) {
throw new SingularityClientException(
String.format("Could not head request: %s", response.getAsString())
);
}
if (response.getStatusCode() == 404) {
return Optional.empty();
}
return Optional.of(
response
.getHeaders()
.get("ETag")
.stream()
.findFirst()
.map(Integer::parseInt)
.orElse(-1)
);
}

/**
* Get all requests that their state is ACTIVE
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@ public class SingularityConfiguration extends Configuration {
// Stops persisters + purgers from running when the DB is enabled. Forces binding of zk-based usage manager
private boolean sqlReadOnlyMode = false;

// Instructs this instance to not contend for leadership. It will only serve api calls
private boolean readOnlyInstance = false;

public long getAskDriverToKillTasksAgainAfterMillis() {
return askDriverToKillTasksAgainAfterMillis;
}
Expand Down Expand Up @@ -2025,4 +2028,12 @@ public boolean isSqlReadOnlyMode() {
public void setSqlReadOnlyMode(boolean sqlReadOnlyMode) {
this.sqlReadOnlyMode = sqlReadOnlyMode;
}

public boolean isReadOnlyInstance() {
return readOnlyInstance;
}

public void setReadOnlyInstance(boolean readOnlyInstance) {
this.readOnlyInstance = readOnlyInstance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ public List<String> getRequestIdsInTaskHistory() {
return getChildren(HISTORY_PATH_ROOT);
}

public int getTaskCountForRequest(String requestId) {
return getNumChildren(getRequestPath(requestId));
}

public List<SingularityTaskId> getAllTaskIds() {
final List<String> requestIds = getChildren(HISTORY_PATH_ROOT);
final List<String> paths = Lists.newArrayListWithCapacity(requestIds.size());
Expand Down Expand Up @@ -995,13 +999,18 @@ private List<SingularityTaskId> getTaskIdsForRequest(
String requestId,
TaskFilter taskFilter
) {
final List<SingularityTaskId> requestTaskIds = getTaskIdsForRequest(requestId);
final List<SingularityTaskId> activeTaskIds = filterActiveTaskIds(requestTaskIds);

if (taskFilter == TaskFilter.ACTIVE) {
return activeTaskIds;
if (leaderCache.active()) {
return leaderCache.getActiveTaskIdsForRequest(requestId);
} else {
return getActiveTaskIds()
.stream()
.filter(t -> t.getRequestId().equals(requestId))
.collect(Collectors.toList());
}
}

final List<SingularityTaskId> requestTaskIds = getTaskIdsForRequest(requestId);
final List<SingularityTaskId> activeTaskIds = filterActiveTaskIds(requestTaskIds);
Iterables.removeAll(requestTaskIds, activeTaskIds);

return requestTaskIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.TaskManager;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -58,7 +59,11 @@ public void runActionOnPoll() {
LOG.info("Checking inactive task ids for task history persistence");

final long start = System.currentTimeMillis();
for (String requestId : taskManager.getRequestIdsInTaskHistory()) {
List<String> requestIds = taskManager.getRequestIdsInTaskHistory();
requestIds.sort(
Comparator.comparingLong(taskManager::getTaskCountForRequest).reversed()
);
for (String requestId : requestIds) {
try {
LOG.info("Checking request {}", requestId);
List<SingularityTaskId> taskIds = taskManager.getTaskIdsForRequest(requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.hubspot.singularity.SingularityLeaderController;
import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory;
import com.hubspot.singularity.SingularityManagedThreadPoolFactory;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.ExecutorIdGenerator;
import com.hubspot.singularity.mesos.SingularityMesosExecutorInfoSupport;
import com.hubspot.singularity.metrics.SingularityGraphiteReporter;
Expand Down Expand Up @@ -41,6 +42,7 @@ public class SingularityLifecycleManaged implements Managed {
private final SingularityGraphiteReporter graphiteReporter;
private final ExecutorIdGenerator executorIdGenerator;
private final Set<SingularityLeaderOnlyPoller> leaderOnlyPollers;
private final boolean readOnly;

private final CuratorFramework curatorFramework;
private final AtomicBoolean started = new AtomicBoolean(false);
Expand All @@ -57,7 +59,8 @@ public SingularityLifecycleManaged(
SingularityMesosExecutorInfoSupport executorInfoSupport,
SingularityGraphiteReporter graphiteReporter,
ExecutorIdGenerator executorIdGenerator,
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers,
SingularityConfiguration configuration
) {
this.cachedThreadPoolFactory = cachedThreadPoolFactory;
this.scheduledExecutorServiceFactory = scheduledExecutorServiceFactory;
Expand All @@ -69,13 +72,18 @@ public SingularityLifecycleManaged(
this.graphiteReporter = graphiteReporter;
this.executorIdGenerator = executorIdGenerator;
this.leaderOnlyPollers = leaderOnlyPollers;
this.readOnly = configuration.isReadOnlyInstance();
}

@Override
public void start() throws Exception {
if (!started.getAndSet(true)) {
startCurator();
leaderLatch.start();
if (!readOnly) {
leaderLatch.start();
} else {
LOG.info("Registered as read only, will not attempt to become the leader");
}
leaderController.start(); // start the state poller
graphiteReporter.start();
executorIdGenerator.start();
Expand Down Expand Up @@ -107,7 +115,7 @@ public void stop() throws Exception {

// to override in unit testing
protected boolean startLeaderPollers() {
return true;
return !readOnly;
}

private void stopDirectoryFetcher() {
Expand Down Expand Up @@ -163,8 +171,10 @@ private void stopGraphiteReporter() {

private void stopLeaderLatch() {
try {
LOG.info("Stopping leader latch");
leaderLatch.close();
if (!readOnly) {
LOG.info("Stopping leader latch");
leaderLatch.close();
}
} catch (Throwable t) {
LOG.warn("Could not stop leader latch ({})}", t.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -2075,4 +2076,27 @@ public List<CrashLoopInfo> getCrashLoopsForRequest(
);
return requestManager.getCrashLoopsForRequest(requestId);
}

@HEAD
@Path("/request/{requestId}")
@Operation(
summary = "Check if a request of specific ID exists and get the hash code of the SingularityRequest object"
)
public Response headRequest(
@Parameter(hidden = true) @Auth SingularityUser user,
@Parameter(required = true, description = "The Request ID to check") @PathParam(
"requestId"
) String requestId,
@Parameter(
description = "Fetched a cached version of this data to limit expensive operations"
) @QueryParam("useWebCache") Boolean useWebCache
) {
Optional<SingularityRequestWithState> requestWithState = requestManager.getRequest(
requestId,
useWebCache
);
return requestWithState
.map(r -> Response.ok().header("ETag", r.getRequest().hashCode()).build())
.orElseGet(() -> Response.status(404).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.hubspot.singularity.SingularityLeaderController;
import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory;
import com.hubspot.singularity.SingularityManagedThreadPoolFactory;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.ExecutorIdGenerator;
import com.hubspot.singularity.mesos.SingularityMesosExecutorInfoSupport;
import com.hubspot.singularity.metrics.SingularityGraphiteReporter;
Expand All @@ -26,7 +27,8 @@ public SingularityLifecycleManagedTest(
SingularityMesosExecutorInfoSupport executorInfoSupport,
SingularityGraphiteReporter graphiteReporter,
ExecutorIdGenerator executorIdGenerator,
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers,
SingularityConfiguration configuration
) {
super(
cachedThreadPoolFactory,
Expand All @@ -38,7 +40,8 @@ public SingularityLifecycleManagedTest(
executorInfoSupport,
graphiteReporter,
executorIdGenerator,
leaderOnlyPollers
leaderOnlyPollers,
configuration
);
}

Expand Down
2 changes: 1 addition & 1 deletion SingularityUI/app/thirdPartyConfigurations.es6
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const loadThirdParty = () => {

// Messenger options
window.Messenger.options = {
extraClasses: 'messenger-fixed messenger-on-top',
extraClasses: 'messenger-fixed messenger-on-bottom messenger-on-right',
theme: 'air',
hideOnNavigate: true,
maxMessages: 1,
Expand Down