Skip to content

Commit

Permalink
RATIS-2245. Wait for apply log futures before taking snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
swamirishi committed Feb 8, 2025
1 parent 7626c79 commit e25ba08
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void run() {
CompletableFuture<Void> applyLogFutures = CompletableFuture.completedFuture(null);
for(; state != State.STOP; ) {
try {
waitForCommit();
waitForCommit(applyLogFutures);

if (state == State.RELOAD) {
reload();
Expand All @@ -209,14 +209,14 @@ public void run() {
}
}

private void waitForCommit() throws InterruptedException {
private void waitForCommit(CompletableFuture<Void> applyLogFutures) throws InterruptedException, ExecutionException {
// When a peer starts, the committed is initialized to 0.
// It will be updated only after the leader contacts other peers.
// Thus it is possible to have applied > committed initially.
final long applied = getLastAppliedIndex();
for(; applied >= raftLog.getLastCommittedIndex() && state == State.RUNNING && !shouldStop(); ) {
if (server.getSnapshotRequestHandler().shouldTriggerTakingSnapshot()) {
takeSnapshot();
takeSnapshot(applyLogFutures);
}
if (awaitForSignal.await(100, TimeUnit.MILLISECONDS)) {
return;
Expand Down Expand Up @@ -277,13 +277,13 @@ private void checkAndTakeSnapshot(CompletableFuture<?> futures)
throws ExecutionException, InterruptedException {
// check if need to trigger a snapshot
if (shouldTakeSnapshot()) {
futures.get();
takeSnapshot();
takeSnapshot(futures);
}
}

private void takeSnapshot() {
private void takeSnapshot(CompletableFuture<?> applyLogFutures) throws ExecutionException, InterruptedException {
final long i;
applyLogFutures.get();
try {
try(UncheckedAutoCloseable ignored = Timekeeper.start(stateMachineMetrics.get().getTakeSnapshotTimer())) {
i = stateMachine.takeSnapshot();
Expand Down

0 comments on commit e25ba08

Please sign in to comment.