Skip to content

Commit

Permalink
Merge pull request #2190 from HubSpot/lifecycle_tweaks
Browse files Browse the repository at this point in the history
Lifecycle tweaks for Singularity
  • Loading branch information
ssalinas authored Mar 19, 2021
2 parents 6394dbe + 9e314cb commit fa151c3
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.hubspot.singularity.config.SingularityConfiguration;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -22,7 +22,9 @@ public class SingularityManagedScheduledExecutorServiceFactory {
);

private final AtomicBoolean stopped = new AtomicBoolean();
private final List<ScheduledExecutorService> executorPools = new ArrayList<>();
private final AtomicBoolean leaderStopped = new AtomicBoolean();
private final Map<String, ScheduledExecutorService> executorPools = new HashMap<>();
private final Map<String, ScheduledExecutorService> leaderPollerPools = new HashMap<>();

private final long timeoutInMillis;

Expand Down Expand Up @@ -57,29 +59,66 @@ public synchronized ScheduledExecutorService get(
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(true).build()
);
if (isLeaderOnlyPoller) {
executorPools.add(0, service);
leaderPollerPools.put(name, service);
} else {
executorPools.add(service);
executorPools.put(name, service);
}
return service;
}

public void stop() throws Exception {
if (!stopped.getAndSet(true)) {
executorPools.forEach(ScheduledExecutorService::shutdown);

/**
 * Shuts down every executor that was registered as a leader-only poller.
 *
 * <p>Only the first call does any work; later calls return immediately. A single timeout budget
 * (the factory's configured timeout) is shared by all executors: the time spent closing one is
 * subtracted from what the next one is given. No lower bound is applied to the remaining budget.
 *
 * @throws Exception declared for callers; nothing in this method throws a checked exception
 */
public void stopLeaderPollers() throws Exception {
  if (leaderStopped.getAndSet(true)) {
    // Already stopped by an earlier call.
    return;
  }

  long remainingMillis = timeoutInMillis;
  for (Map.Entry<String, ScheduledExecutorService> pool : leaderPollerPools.entrySet()) {
    final long startedAt = System.currentTimeMillis();
    closeExecutor(pool.getValue(), remainingMillis, pool.getKey());
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    remainingMillis = remainingMillis - elapsedMillis;
  }
}

for (ScheduledExecutorService service : executorPools) {
/**
 * Shuts down every executor that was registered without the leader-only flag.
 *
 * <p>Only the first call does any work; later calls return immediately. A single timeout budget
 * (the factory's configured timeout) is shared by all executors: the time spent closing one is
 * subtracted from what the next one is given. No lower bound is applied to the remaining budget.
 *
 * @throws Exception declared for callers; nothing in this method throws a checked exception
 */
public void stopOtherPollers() throws Exception {
  if (stopped.getAndSet(true)) {
    // Already stopped by an earlier call.
    return;
  }

  long remainingMillis = timeoutInMillis;
  for (Map.Entry<String, ScheduledExecutorService> pool : executorPools.entrySet()) {
    final long startedAt = System.currentTimeMillis();
    closeExecutor(pool.getValue(), remainingMillis, pool.getKey());
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    remainingMillis = remainingMillis - elapsedMillis;
  }
}

if (!service.awaitTermination(timeoutLeftInMillis, TimeUnit.MILLISECONDS)) {
LOG.warn("Scheduled executor service task did not exit cleanly");
service.shutdownNow();
continue;
/**
 * Shuts down a single executor and waits for its tasks to finish.
 *
 * <p>Does nothing if the executor has already terminated. Otherwise it requests an orderly
 * shutdown and waits up to {@code timeoutInMillis}. If tasks are still running it calls
 * {@code shutdownNow()} and waits up to {@code timeoutInMillis} a second time, logging an error
 * if the executor still has not terminated. The total wait can therefore be up to twice the
 * supplied timeout.
 *
 * <p>If the calling thread is interrupted while waiting, the executor is told to
 * {@code shutdownNow()} and the thread's interrupt flag is restored.
 *
 * @param scheduledExecutorService executor to shut down
 * @param timeoutInMillis maximum wait for each of the two wait phases, in milliseconds
 * @param name label identifying the executor in the error log message
 */
public void closeExecutor(
  ScheduledExecutorService scheduledExecutorService,
  long timeoutInMillis,
  String name
) {
  // This is basically stolen from the ExecutorService javadoc with some slight modifications
  if (!scheduledExecutorService.isTerminated()) {
    scheduledExecutorService.shutdown();
    try {
      if (
        !scheduledExecutorService.awaitTermination(timeoutInMillis, TimeUnit.MILLISECONDS)
      ) {
        // Orderly shutdown did not finish in time: cancel running tasks and wait once more.
        scheduledExecutorService.shutdownNow();
        if (
          !scheduledExecutorService.awaitTermination(timeoutInMillis, TimeUnit.MILLISECONDS)
        ) {
          LOG.error(
            "{}: Tasks in executor failed to terminate in time, continuing with shutdown regardless.",
            name
          );
        }
      }
    } catch (InterruptedException ie) {
      // Stop waiting, cancel outstanding tasks, and preserve the interrupt for the caller.
      scheduledExecutorService.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ public void markHealthchecksFinished(SingularityTaskId taskId) {
/**
 * Saves a pending task.
 *
 * <p>The task is written to the leader cache only while that cache is active, and is then saved
 * at its pending path using the pending-task transcoder.
 *
 * @param task pending task to save
 * @return the result of the save at the task's pending path
 */
public SingularityCreateResult savePendingTask(SingularityPendingTask task) {
  final String pendingPath = getPendingPath(task.getPendingTaskId());

  // Only touch the leader cache while it is active; an unconditional write defeats the guard.
  if (leaderCache.active()) {
    leaderCache.savePendingTask(task);
  }

  return save(pendingPath, task, pendingTaskTranscoder);
}
Expand Down Expand Up @@ -675,18 +677,24 @@ public Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> getTaskHistory
}
return updates;
} else {
Map<String, SingularityTaskId> pathsMap = Maps.newHashMap();
for (SingularityTaskId taskId : taskIds) {
pathsMap.put(getHistoryPath(taskId), taskId);
}
return fetchTaskHistoryUpdates(taskIds);
}
}

return getAsyncNestedChildDataAsMap(
"getTaskHistoryUpdates",
pathsMap,
UPDATES_PATH,
taskHistoryUpdateTranscoder
);
/**
 * Loads the history updates for the given task ids without consulting the leader cache.
 *
 * <p>Builds a map from each task's history path to its id and hands it to
 * {@code getAsyncNestedChildDataAsMap}, reading the children under {@code UPDATES_PATH}.
 *
 * @param taskIds ids of the tasks whose history updates should be loaded
 * @return history updates keyed by task id
 */
private Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> fetchTaskHistoryUpdates(
  Collection<SingularityTaskId> taskIds
) {
  final Map<String, SingularityTaskId> taskIdsByHistoryPath = Maps.newHashMap();
  taskIds.forEach(
    id -> taskIdsByHistoryPath.put(getHistoryPath(id), id)
  );

  return getAsyncNestedChildDataAsMap(
    "getTaskHistoryUpdates",
    taskIdsByHistoryPath,
    UPDATES_PATH,
    taskHistoryUpdateTranscoder
  );
}

public Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> getAllActiveTaskHistoryUpdates() {
Expand Down Expand Up @@ -1198,11 +1206,13 @@ public boolean taskExistsInZk(SingularityTaskId taskId) {

/**
 * Populates the task-related sections of the leader cache.
 *
 * <p>Each section is loaded exactly once, and only from fetch/uncached sources, so the cache is
 * never seeded from a getter that may itself answer from the leader cache.
 */
public void activateLeaderCache() {
  leaderCache.cachePendingTasks(fetchPendingTasks());
  leaderCache.cachePendingTasksToDelete(fetchPendingTasksMarkedForDeletion());
  leaderCache.cacheActiveTaskIds(getActiveTaskIdsUncached());
  leaderCache.cacheCleanupTasks(fetchCleanupTasks());
  leaderCache.cacheKilledTasks(fetchKilledTaskIdRecords());
  // History updates are loaded for the uncached set of active task ids.
  leaderCache.cacheTaskHistoryUpdates(
    fetchTaskHistoryUpdates(getActiveTaskIdsUncached())
  );
}

private List<SingularityPendingTask> fetchPendingTasks() {
Expand Down Expand Up @@ -1384,6 +1394,7 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task)
.and()
.commit();

// Not checking isActive here, already called within offer check flow
leaderCache.putActiveTask(task.getTaskId());
taskCache.set(path, task);
} catch (KeeperException.NodeExistsException nee) {
Expand Down Expand Up @@ -1451,7 +1462,9 @@ public SingularityDeleteResult deleteKilledRecord(SingularityTaskId taskId) {

/**
 * Deletes the last-active status record for a task.
 *
 * <p>The task id is removed from the leader cache only while that cache is active; the node at
 * the task's last-active-status path is then deleted.
 *
 * @param taskId id of the task whose last-active status should be removed
 * @return the result of deleting the last-active-status path
 */
@Timed
public SingularityDeleteResult deleteLastActiveTaskStatus(SingularityTaskId taskId) {
  // Only touch the leader cache while it is active; an unconditional delete defeats the guard.
  if (leaderCache.active()) {
    leaderCache.deleteActiveTaskId(taskId);
  }
  return delete(getLastActiveTaskStatusPath(taskId));
}

Expand Down Expand Up @@ -1547,7 +1560,9 @@ public SingularityDeleteResult deleteTaskShellCommandRequestFromQueue(

public SingularityCreateResult saveTaskCleanup(SingularityTaskCleanup cleanup) {
saveTaskHistoryUpdate(cleanup);
leaderCache.saveTaskCleanup(cleanup);
if (leaderCache.active()) {
leaderCache.saveTaskCleanup(cleanup);
}
return save(
getCleanupPath(cleanup.getTaskId().getId()),
cleanup,
Expand Down Expand Up @@ -1581,7 +1596,9 @@ private void saveTaskHistoryUpdate(SingularityTaskCleanup cleanup) {
}

public SingularityCreateResult createTaskCleanup(SingularityTaskCleanup cleanup) {
leaderCache.createTaskCleanupIfNotExists(cleanup);
if (leaderCache.active()) {
leaderCache.createTaskCleanupIfNotExists(cleanup);
}
final SingularityCreateResult result = create(
getCleanupPath(cleanup.getTaskId().getId()),
cleanup,
Expand All @@ -1596,13 +1613,17 @@ public SingularityCreateResult createTaskCleanup(SingularityTaskCleanup cleanup)
}

/**
 * Deletes a pending task and its marked-for-deletion entry.
 *
 * <p>The task is removed from the leader cache only while that cache is active. Both the pending
 * path and the pending-tasks-to-delete path for the id are then deleted.
 *
 * @param pendingTaskId id of the pending task to delete
 */
public void deletePendingTask(SingularityPendingTaskId pendingTaskId) {
  // Only touch the leader cache while it is active; an unconditional delete defeats the guard.
  if (leaderCache.active()) {
    leaderCache.deletePendingTask(pendingTaskId);
  }
  delete(getPendingPath(pendingTaskId));
  delete(getPendingTasksToDeletePath(pendingTaskId));
}

/**
 * Marks a pending task for deletion.
 *
 * <p>The mark is recorded in the leader cache only while that cache is active; a node is then
 * created at the id's pending-tasks-to-delete path.
 *
 * @param pendingTaskId id of the pending task to mark
 */
public void markPendingTaskForDeletion(SingularityPendingTaskId pendingTaskId) {
  // Only touch the leader cache while it is active; an unconditional write defeats the guard.
  if (leaderCache.active()) {
    leaderCache.markPendingTaskForDeletion(pendingTaskId);
  }
  create(getPendingTasksToDeletePath(pendingTaskId));
}

Expand All @@ -1611,11 +1632,17 @@ public List<SingularityPendingTaskId> getPendingTasksMarkedForDeletion() {
return leaderCache.getPendingTaskIdsToDelete();
}

return fetchPendingTasksMarkedForDeletion();
}

/**
 * Reads the ids of pending tasks marked for deletion without consulting the leader cache.
 *
 * @return the children of {@code PENDING_TASKS_TO_DELETE_PATH_ROOT}, decoded as pending task ids
 */
private List<SingularityPendingTaskId> fetchPendingTasksMarkedForDeletion() {
  final List<SingularityPendingTaskId> markedForDeletion = getChildrenAsIds(
    PENDING_TASKS_TO_DELETE_PATH_ROOT,
    pendingTaskIdTranscoder
  );
  return markedForDeletion;
}

/**
 * Deletes the cleanup record for a task.
 *
 * <p>The cleanup is removed from the leader cache only while that cache is active (the string id
 * is parsed into a {@code SingularityTaskId} only in that case); the node at the task's cleanup
 * path is then deleted.
 *
 * @param taskId string form of the task id whose cleanup should be removed
 */
public void deleteCleanupTask(String taskId) {
  // Only touch the leader cache while it is active; an unconditional delete defeats the guard.
  if (leaderCache.active()) {
    leaderCache.deleteTaskCleanup(SingularityTaskId.valueOf(taskId));
  }
  delete(getCleanupPath(taskId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public UsageManager(
}

/**
 * Populates the usage-related sections of the leader cache.
 *
 * <p>Each section is loaded exactly once, from the fetch methods, so the cache is never seeded
 * from a getter that may itself answer from the leader cache.
 */
public void activateLeaderCache() {
  leaderCache.cacheRequestUtilizations(fetchRequestUtilizations());
  leaderCache.cacheAgentUsages(fetchAllCurrentAgentUsage());
}

public SingularityCreateResult saveClusterUtilization(
Expand Down Expand Up @@ -91,18 +91,19 @@ public Map<String, RequestUtilization> getRequestUtilizations(boolean useWebCach
if (useWebCache && webCache.useCachedRequestUtilizations()) {
return webCache.getRequestUtilizations();
}
Map<String, RequestUtilization> requestUtilizations = getAsyncChildren(
REQUESTS_PATH,
requestUtilizationTranscoder
)
.stream()
.collect(Collectors.toMap(RequestUtilization::getRequestId, Function.identity()));
Map<String, RequestUtilization> requestUtilizations = fetchRequestUtilizations();
if (useWebCache) {
webCache.cacheRequestUtilizations(requestUtilizations);
}
return requestUtilizations;
}

/**
 * Loads all request utilizations found under {@code REQUESTS_PATH}, bypassing any cache.
 *
 * @return the utilizations keyed by request id
 */
private Map<String, RequestUtilization> fetchRequestUtilizations() {
  return getAsyncChildren(REQUESTS_PATH, requestUtilizationTranscoder)
    .stream()
    .collect(
      Collectors.toMap(
        utilization -> utilization.getRequestId(),
        Function.identity()
      )
    );
}

public Optional<RequestUtilization> getRequestUtilization(
String requestId,
boolean useWebCache
Expand Down Expand Up @@ -166,6 +167,10 @@ public Map<String, SingularityAgentUsageWithId> getAllCurrentAgentUsage() {
if (leaderCache.active()) {
return leaderCache.getAgentUsages();
}
return fetchAllCurrentAgentUsage();
}

private Map<String, SingularityAgentUsageWithId> fetchAllCurrentAgentUsage() {
return getAsyncChildren(AGENTS_PATH, agentUsageTranscoder)
.stream()
.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private void preJettyStop() {
@Override
public void stop() throws Exception {
if (!stopped.getAndSet(true)) {
stopOtherExecutors();
stopCurator(); // disconnect from zk
stopGraphiteReporter();
} else {
Expand Down Expand Up @@ -156,9 +157,19 @@ private void stopNewPolls() {

/**
 * Stops the cached thread pools and the leader-only pollers.
 *
 * <p>Best effort: any failure is logged as a warning and not propagated. The remaining scheduled
 * executors are not stopped here; they are handled by the separate
 * {@code stopOtherPollers()} step on the factory.
 */
private void stopExecutors() {
  try {
    LOG.info("Stopping leader pollers and executors");
    cachedThreadPoolFactory.stop();
    scheduledExecutorServiceFactory.stopLeaderPollers();
  } catch (Throwable t) {
    // Pass the throwable as the final argument so the stack trace is logged as well.
    LOG.warn("Could not stop scheduled executors ({})", t.getMessage(), t);
  }
}

// Post jetty stop, these can be hit on request paths still
private void stopOtherExecutors() {
try {
LOG.info("Stopping pollers and executors");
scheduledExecutorServiceFactory.stopOtherPollers();
} catch (Throwable t) {
LOG.warn("Could not stop scheduled executors ({})}", t.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,14 +527,13 @@ private void drainRequestCleanupQueue() {
CompletableFuture.runAsync(
() ->
lock.runWithRequestLock(
() -> {
() ->
processRequestCleanup(
start,
numTasksKilled,
numScheduledTasksRemoved,
requestCleanup
);
},
),
requestCleanup.getRequestId(),
String.format(
"%s#%s",
Expand Down
Loading

0 comments on commit fa151c3

Please sign in to comment.