Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lifecycle tweaks for Singularity #2190

Merged
merged 7 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.hubspot.singularity.config.SingularityConfiguration;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -22,7 +22,9 @@ public class SingularityManagedScheduledExecutorServiceFactory {
);

private final AtomicBoolean stopped = new AtomicBoolean();
private final List<ScheduledExecutorService> executorPools = new ArrayList<>();
private final AtomicBoolean leaderStopped = new AtomicBoolean();
private final Map<String, ScheduledExecutorService> executorPools = new HashMap<>();
private final Map<String, ScheduledExecutorService> leaderPollerPools = new HashMap<>();

private final long timeoutInMillis;

Expand Down Expand Up @@ -57,29 +59,66 @@ public synchronized ScheduledExecutorService get(
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(true).build()
);
if (isLeaderOnlyPoller) {
executorPools.add(0, service);
leaderPollerPools.put(name, service);
} else {
executorPools.add(service);
executorPools.put(name, service);
}
return service;
}

public void stop() throws Exception {
if (!stopped.getAndSet(true)) {
executorPools.forEach(ScheduledExecutorService::shutdown);

public void stopLeaderPollers() throws Exception {
if (!leaderStopped.getAndSet(true)) {
long timeoutLeftInMillis = timeoutInMillis;
for (Map.Entry<String, ScheduledExecutorService> entry : leaderPollerPools.entrySet()) {
final long start = System.currentTimeMillis();
closeExecutor(entry.getValue(), timeoutLeftInMillis, entry.getKey());
timeoutLeftInMillis -= (System.currentTimeMillis() - start);
}
}
}

for (ScheduledExecutorService service : executorPools) {
public void stopOtherPollers() throws Exception {
if (!stopped.getAndSet(true)) {
long timeoutLeftInMillis = timeoutInMillis;
for (Map.Entry<String, ScheduledExecutorService> entry : executorPools.entrySet()) {
final long start = System.currentTimeMillis();
closeExecutor(entry.getValue(), timeoutLeftInMillis, entry.getKey());
timeoutLeftInMillis -= (System.currentTimeMillis() - start);
}
}
}

if (!service.awaitTermination(timeoutLeftInMillis, TimeUnit.MILLISECONDS)) {
LOG.warn("Scheduled executor service task did not exit cleanly");
service.shutdownNow();
continue;
public void closeExecutor(
ScheduledExecutorService scheduledExecutorService,
long timeoutInMillis,
String name
) {
// This is basically stolen from the ExecutorService javadoc with some slight modifications
if (!scheduledExecutorService.isTerminated()) {
scheduledExecutorService.shutdown();
try {
if (
!scheduledExecutorService.awaitTermination(
timeoutInMillis,
TimeUnit.MILLISECONDS
)
) {
scheduledExecutorService.shutdownNow();
if (
!scheduledExecutorService.awaitTermination(
timeoutInMillis,
TimeUnit.MILLISECONDS
)
) {
LOG.error(
"{}: Tasks in executor failed to terminate in time, continuing with shutdown regardless.",
name
);
}
}

timeoutLeftInMillis -= (System.currentTimeMillis() - start);
} catch (InterruptedException ie) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ public void markHealthchecksFinished(SingularityTaskId taskId) {
public SingularityCreateResult savePendingTask(SingularityPendingTask task) {
final String pendingPath = getPendingPath(task.getPendingTaskId());

leaderCache.savePendingTask(task);
if (leaderCache.active()) {
leaderCache.savePendingTask(task);
}

return save(pendingPath, task, pendingTaskTranscoder);
}
Expand Down Expand Up @@ -675,18 +677,24 @@ public Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> getTaskHistory
}
return updates;
} else {
Map<String, SingularityTaskId> pathsMap = Maps.newHashMap();
for (SingularityTaskId taskId : taskIds) {
pathsMap.put(getHistoryPath(taskId), taskId);
}
return fetchTaskHistoryUpdates(taskIds);
}
}

return getAsyncNestedChildDataAsMap(
"getTaskHistoryUpdates",
pathsMap,
UPDATES_PATH,
taskHistoryUpdateTranscoder
);
private Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> fetchTaskHistoryUpdates(
Collection<SingularityTaskId> taskIds
) {
Map<String, SingularityTaskId> pathsMap = Maps.newHashMap();
for (SingularityTaskId taskId : taskIds) {
pathsMap.put(getHistoryPath(taskId), taskId);
}

return getAsyncNestedChildDataAsMap(
"getTaskHistoryUpdates",
pathsMap,
UPDATES_PATH,
taskHistoryUpdateTranscoder
);
}

public Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> getAllActiveTaskHistoryUpdates() {
Expand Down Expand Up @@ -1198,11 +1206,13 @@ public boolean taskExistsInZk(SingularityTaskId taskId) {

public void activateLeaderCache() {
leaderCache.cachePendingTasks(fetchPendingTasks());
leaderCache.cachePendingTasksToDelete(getPendingTasksMarkedForDeletion());
leaderCache.cacheActiveTaskIds(getActiveTaskIds(false));
leaderCache.cachePendingTasksToDelete(fetchPendingTasksMarkedForDeletion());
leaderCache.cacheActiveTaskIds(getActiveTaskIdsUncached());
leaderCache.cacheCleanupTasks(fetchCleanupTasks());
leaderCache.cacheKilledTasks(fetchKilledTaskIdRecords());
leaderCache.cacheTaskHistoryUpdates(getAllActiveTaskHistoryUpdates());
leaderCache.cacheTaskHistoryUpdates(
fetchTaskHistoryUpdates(getActiveTaskIdsUncached())
);
}

private List<SingularityPendingTask> fetchPendingTasks() {
Expand Down Expand Up @@ -1384,6 +1394,7 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task)
.and()
.commit();

// Not checking isActive here, already called within offer check flow
leaderCache.putActiveTask(task.getTaskId());
taskCache.set(path, task);
} catch (KeeperException.NodeExistsException nee) {
Expand Down Expand Up @@ -1451,7 +1462,9 @@ public SingularityDeleteResult deleteKilledRecord(SingularityTaskId taskId) {

@Timed
public SingularityDeleteResult deleteLastActiveTaskStatus(SingularityTaskId taskId) {
leaderCache.deleteActiveTaskId(taskId);
if (leaderCache.active()) {
leaderCache.deleteActiveTaskId(taskId);
}
return delete(getLastActiveTaskStatusPath(taskId));
}

Expand Down Expand Up @@ -1547,7 +1560,9 @@ public SingularityDeleteResult deleteTaskShellCommandRequestFromQueue(

public SingularityCreateResult saveTaskCleanup(SingularityTaskCleanup cleanup) {
saveTaskHistoryUpdate(cleanup);
leaderCache.saveTaskCleanup(cleanup);
if (leaderCache.active()) {
leaderCache.saveTaskCleanup(cleanup);
}
return save(
getCleanupPath(cleanup.getTaskId().getId()),
cleanup,
Expand Down Expand Up @@ -1581,7 +1596,9 @@ private void saveTaskHistoryUpdate(SingularityTaskCleanup cleanup) {
}

public SingularityCreateResult createTaskCleanup(SingularityTaskCleanup cleanup) {
leaderCache.createTaskCleanupIfNotExists(cleanup);
if (leaderCache.active()) {
leaderCache.createTaskCleanupIfNotExists(cleanup);
}
final SingularityCreateResult result = create(
getCleanupPath(cleanup.getTaskId().getId()),
cleanup,
Expand All @@ -1596,13 +1613,17 @@ public SingularityCreateResult createTaskCleanup(SingularityTaskCleanup cleanup)
}

public void deletePendingTask(SingularityPendingTaskId pendingTaskId) {
leaderCache.deletePendingTask(pendingTaskId);
if (leaderCache.active()) {
leaderCache.deletePendingTask(pendingTaskId);
}
delete(getPendingPath(pendingTaskId));
delete(getPendingTasksToDeletePath(pendingTaskId));
}

public void markPendingTaskForDeletion(SingularityPendingTaskId pendingTaskId) {
leaderCache.markPendingTaskForDeletion(pendingTaskId);
if (leaderCache.active()) {
leaderCache.markPendingTaskForDeletion(pendingTaskId);
}
create(getPendingTasksToDeletePath(pendingTaskId));
}

Expand All @@ -1611,11 +1632,17 @@ public List<SingularityPendingTaskId> getPendingTasksMarkedForDeletion() {
return leaderCache.getPendingTaskIdsToDelete();
}

return fetchPendingTasksMarkedForDeletion();
}

private List<SingularityPendingTaskId> fetchPendingTasksMarkedForDeletion() {
return getChildrenAsIds(PENDING_TASKS_TO_DELETE_PATH_ROOT, pendingTaskIdTranscoder);
}

public void deleteCleanupTask(String taskId) {
leaderCache.deleteTaskCleanup(SingularityTaskId.valueOf(taskId));
if (leaderCache.active()) {
leaderCache.deleteTaskCleanup(SingularityTaskId.valueOf(taskId));
}
delete(getCleanupPath(taskId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public UsageManager(
}

public void activateLeaderCache() {
leaderCache.cacheRequestUtilizations(getRequestUtilizations(false));
leaderCache.cacheAgentUsages(getAllCurrentAgentUsage());
leaderCache.cacheRequestUtilizations(fetchRequestUtilizations());
leaderCache.cacheAgentUsages(fetchAllCurrentAgentUsage());
}

public SingularityCreateResult saveClusterUtilization(
Expand Down Expand Up @@ -91,18 +91,19 @@ public Map<String, RequestUtilization> getRequestUtilizations(boolean useWebCach
if (useWebCache && webCache.useCachedRequestUtilizations()) {
return webCache.getRequestUtilizations();
}
Map<String, RequestUtilization> requestUtilizations = getAsyncChildren(
REQUESTS_PATH,
requestUtilizationTranscoder
)
.stream()
.collect(Collectors.toMap(RequestUtilization::getRequestId, Function.identity()));
Map<String, RequestUtilization> requestUtilizations = fetchRequestUtilizations();
if (useWebCache) {
webCache.cacheRequestUtilizations(requestUtilizations);
}
return requestUtilizations;
}

private Map<String, RequestUtilization> fetchRequestUtilizations() {
return getAsyncChildren(REQUESTS_PATH, requestUtilizationTranscoder)
.stream()
.collect(Collectors.toMap(RequestUtilization::getRequestId, Function.identity()));
}

public Optional<RequestUtilization> getRequestUtilization(
String requestId,
boolean useWebCache
Expand Down Expand Up @@ -166,6 +167,10 @@ public Map<String, SingularityAgentUsageWithId> getAllCurrentAgentUsage() {
if (leaderCache.active()) {
return leaderCache.getAgentUsages();
}
return fetchAllCurrentAgentUsage();
}

private Map<String, SingularityAgentUsageWithId> fetchAllCurrentAgentUsage() {
return getAsyncChildren(AGENTS_PATH, agentUsageTranscoder)
.stream()
.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private void preJettyStop() {
@Override
public void stop() throws Exception {
if (!stopped.getAndSet(true)) {
stopOtherExecutors();
stopCurator(); // disconnect from zk
stopGraphiteReporter();
} else {
Expand Down Expand Up @@ -156,9 +157,19 @@ private void stopNewPolls() {

private void stopExecutors() {
try {
LOG.info("Stopping pollers and executors");
LOG.info("Stopping leader pollers and executors");
cachedThreadPoolFactory.stop();
scheduledExecutorServiceFactory.stop();
scheduledExecutorServiceFactory.stopLeaderPollers();
} catch (Throwable t) {
LOG.warn("Could not stop scheduled executors ({})}", t.getMessage());
}
}

// Post jetty stop, these can be hit on request paths still
private void stopOtherExecutors() {
try {
LOG.info("Stopping pollers and executors");
scheduledExecutorServiceFactory.stopOtherPollers();
} catch (Throwable t) {
LOG.warn("Could not stop scheduled executors ({})}", t.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,14 +527,13 @@ private void drainRequestCleanupQueue() {
CompletableFuture.runAsync(
() ->
lock.runWithRequestLock(
() -> {
() ->
processRequestCleanup(
start,
numTasksKilled,
numScheduledTasksRemoved,
requestCleanup
);
},
),
requestCleanup.getRequestId(),
String.format(
"%s#%s",
Expand Down
Loading