Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lifecycle tweaks for Singularity #2190

Merged
merged 7 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ public void markHealthchecksFinished(SingularityTaskId taskId) {
public SingularityCreateResult savePendingTask(SingularityPendingTask task) {
final String pendingPath = getPendingPath(task.getPendingTaskId());

leaderCache.savePendingTask(task);
if (leaderCache.active()) {
leaderCache.savePendingTask(task);
}

return save(pendingPath, task, pendingTaskTranscoder);
}
Expand Down Expand Up @@ -1384,6 +1386,7 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task)
.and()
.commit();

// Not checking isActive here, already called within offer check flow
leaderCache.putActiveTask(task.getTaskId());
taskCache.set(path, task);
} catch (KeeperException.NodeExistsException nee) {
Expand Down Expand Up @@ -1451,7 +1454,9 @@ public SingularityDeleteResult deleteKilledRecord(SingularityTaskId taskId) {

@Timed
public SingularityDeleteResult deleteLastActiveTaskStatus(SingularityTaskId taskId) {
leaderCache.deleteActiveTaskId(taskId);
if (leaderCache.active()) {
leaderCache.deleteActiveTaskId(taskId);
}
return delete(getLastActiveTaskStatusPath(taskId));
}

Expand Down Expand Up @@ -1547,7 +1552,9 @@ public SingularityDeleteResult deleteTaskShellCommandRequestFromQueue(

public SingularityCreateResult saveTaskCleanup(SingularityTaskCleanup cleanup) {
saveTaskHistoryUpdate(cleanup);
leaderCache.saveTaskCleanup(cleanup);
if (leaderCache.active()) {
leaderCache.saveTaskCleanup(cleanup);
}
return save(
getCleanupPath(cleanup.getTaskId().getId()),
cleanup,
Expand Down Expand Up @@ -1581,7 +1588,9 @@ private void saveTaskHistoryUpdate(SingularityTaskCleanup cleanup) {
}

public SingularityCreateResult createTaskCleanup(SingularityTaskCleanup cleanup) {
leaderCache.createTaskCleanupIfNotExists(cleanup);
if (leaderCache.active()) {
leaderCache.createTaskCleanupIfNotExists(cleanup);
}
final SingularityCreateResult result = create(
getCleanupPath(cleanup.getTaskId().getId()),
cleanup,
Expand All @@ -1596,13 +1605,17 @@ public SingularityCreateResult createTaskCleanup(SingularityTaskCleanup cleanup)
}

public void deletePendingTask(SingularityPendingTaskId pendingTaskId) {
leaderCache.deletePendingTask(pendingTaskId);
if (leaderCache.active()) {
leaderCache.deletePendingTask(pendingTaskId);
}
delete(getPendingPath(pendingTaskId));
delete(getPendingTasksToDeletePath(pendingTaskId));
}

public void markPendingTaskForDeletion(SingularityPendingTaskId pendingTaskId) {
leaderCache.markPendingTaskForDeletion(pendingTaskId);
if (leaderCache.active()) {
leaderCache.markPendingTaskForDeletion(pendingTaskId);
}
create(getPendingTasksToDeletePath(pendingTaskId));
}

Expand All @@ -1615,7 +1628,9 @@ public List<SingularityPendingTaskId> getPendingTasksMarkedForDeletion() {
}

public void deleteCleanupTask(String taskId) {
leaderCache.deleteTaskCleanup(SingularityTaskId.valueOf(taskId));
if (leaderCache.active()) {
leaderCache.deleteTaskCleanup(SingularityTaskId.valueOf(taskId));
}
delete(getCleanupPath(taskId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,14 +527,13 @@ private void drainRequestCleanupQueue() {
CompletableFuture.runAsync(
() ->
lock.runWithRequestLock(
() -> {
() ->
processRequestCleanup(
start,
numTasksKilled,
numScheduledTasksRemoved,
requestCleanup
);
},
),
requestCleanup.getRequestId(),
String.format(
"%s#%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -47,15 +48,43 @@ public class SingularityLeaderCache {
private Map<String, RequestUtilization> requestUtilizations;
private Map<String, SingularityAgentUsageWithId> agentUsages;

private final Object syncObject = new Object();

private volatile boolean active;
private volatile boolean bootstrapping;

@Inject
public SingularityLeaderCache() {
this.active = false;
this.bootstrapping = false;
}

public synchronized void startBootstrap() {
bootstrapping = true;
}

public void activate() {
public synchronized void activate() {
active = true;
bootstrapping = false;
synchronized (syncObject) {
syncObject.notify();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would this do?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit of background on wait + notify in https://www.baeldung.com/java-wait-notify. Though realizing I probably need notifyAll here actually. Leaving as a TODO for later

}

public boolean active() {
if (bootstrapping) {
try {
synchronized (syncObject) {
while (bootstrapping) {
syncObject.wait();
}
return active;
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
return active;
}

// Only for unit testing
Expand Down Expand Up @@ -109,9 +138,8 @@ public void cachePendingTasksToDelete(List<SingularityPendingTaskId> pendingTask
}

public void cacheActiveTaskIds(List<SingularityTaskId> activeTaskIds) {
this.activeTaskIds =
Collections.synchronizedSet(new HashSet<SingularityTaskId>(activeTaskIds.size()));
activeTaskIds.forEach(this.activeTaskIds::add);
this.activeTaskIds = Collections.synchronizedSet(new HashSet<>(activeTaskIds.size()));
this.activeTaskIds.addAll(activeTaskIds);
}

public void cacheRequests(List<SingularityRequestWithState> requestsWithState) {
Expand Down Expand Up @@ -140,19 +168,17 @@ public void cacheTaskHistoryUpdates(
Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> historyUpdates
) {
this.historyUpdates = new ConcurrentHashMap<>(historyUpdates.size());
historyUpdates
.entrySet()
.stream()
.forEach(
e ->
this.historyUpdates.put(
e.getKey(),
e
.getValue()
.stream()
.collect(Collectors.toMap(u -> u.getTaskState(), u -> u))
)
);
historyUpdates.forEach(
(key, value) ->
this.historyUpdates.put(
key,
value
.stream()
.collect(
Collectors.toMap(SingularityTaskHistoryUpdate::getTaskState, u -> u)
)
)
);
}

public void cacheAgents(List<SingularityAgent> slaves) {
Expand Down Expand Up @@ -185,10 +211,6 @@ public void cacheAgentUsages(Map<String, SingularityAgentUsageWithId> slaveUsage
this.agentUsages = new ConcurrentHashMap<>(slaveUsages);
}

public boolean active() {
return active;
}

public List<SingularityPendingTask> getPendingTasks() {
return new ArrayList<>(pendingTaskIdToPendingTask.values());
}
Expand Down Expand Up @@ -222,9 +244,7 @@ public void deletePendingTask(SingularityPendingTaskId pendingTaskId) {
LOG.warn("deletePendingTask {}, but not active", pendingTaskId);
return;
}
if (pendingTaskIdsToDelete.contains(pendingTaskId)) {
pendingTaskIdsToDelete.remove(pendingTaskId);
}
pendingTaskIdsToDelete.remove(pendingTaskId);
pendingTaskIdToPendingTask.remove(pendingTaskId);
}

Expand Down Expand Up @@ -423,7 +443,7 @@ public Map<String, SingularityRequestDeployState> getRequestDeployStateByRequest
.entrySet()
.stream()
.filter(e -> requestIds.contains(e.getKey()))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
}

public void deleteRequestDeployState(String requestId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void activateLeaderCache() {
6,
new ThreadFactoryBuilder().setNameFormat("leader-cache-%d").build()
);
leaderCache.startBootstrap();
CompletableFutures
.allOf(
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -58,6 +59,8 @@ public class SingularityTaskReconciliation {
private final StateManager stateManager;
private final SingularityMesosStatusUpdateHandler statusUpdateHandler;

private Future<?> nextCheckFuture = null;

@Inject
public SingularityTaskReconciliation(
SingularityManagedScheduledExecutorServiceFactory executorServiceFactory,
Expand Down Expand Up @@ -97,6 +100,12 @@ boolean isReconciliationRunning() {
return isRunningReconciliation.get();
}

public void cancelReconciliation() {
if (nextCheckFuture != null) {
nextCheckFuture.cancel(true);
}
}

public ReconciliationState startReconciliation() {
if (
statusUpdateHandler.getQueueFullness() >= configuration.getStatusQueueNearlyFull()
Expand Down Expand Up @@ -156,11 +165,9 @@ private void scheduleReconciliationCheck(
)
);

executorService.schedule(
new Runnable() {

@Override
public void run() {
nextCheckFuture =
executorService.schedule(
() -> {
try {
checkReconciliation(
reconciliationStart,
Expand All @@ -179,11 +186,10 @@ public void run() {
);
abort.abort(AbortReason.UNRECOVERABLE_ERROR, Optional.of(t));
}
}
},
configuration.getCheckReconcileWhenRunningEveryMillis(),
TimeUnit.MILLISECONDS
);
},
configuration.getCheckReconcileWhenRunningEveryMillis(),
TimeUnit.MILLISECONDS
);
}

private void checkReconciliation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,10 @@ public void runActionOnPoll() {
taskReconciliation.startReconciliation();
}
}

@Override
public void stop() {
taskReconciliation.cancelReconciliation();
super.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.hubspot.singularity.scheduler;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SingularityLeaderCacheTest {

@Test
public void testBlockWhileBootstrapping() throws Exception {
SingularityLeaderCache leaderCache = new SingularityLeaderCache();
AtomicBoolean reachedTheEnd = new AtomicBoolean(false);
ExecutorService executorService = Executors.newSingleThreadExecutor();
Runnable testRun = () -> {
if (leaderCache.active()) {
reachedTheEnd.set(true);
}
};

// Should now block anything calling leaderCache.active() until bootstrap done
leaderCache.startBootstrap();
CompletableFuture.runAsync(testRun, executorService);
Assertions.assertFalse(reachedTheEnd.get());
Thread.sleep(200); // just in case
Assertions.assertFalse(reachedTheEnd.get());

// should notify any waiting and unblock
leaderCache.activate();
Thread.sleep(200);
Assertions.assertTrue(reachedTheEnd.get());
}
}