diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 4883ab9dd205..7d7b28999fb3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -851,7 +851,7 @@ private void removeStateMachineDataIfNeeded(long index) { .getFollowerNextIndices()).min().getAsLong(); LOG.debug("Removing data corresponding to log index {} min index {} " + "from cache", index, minIndex); - stateMachineDataCache.removeIf(k -> k >= (Math.min(minIndex, index))); + removeCacheDataUpTo(Math.min(minIndex, index)); } } catch (Exception e) { throw new RuntimeException(e); @@ -873,9 +873,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { removeStateMachineDataIfNeeded(index); // if waitOnBothFollower is false, remove the entry from the cache // as soon as its applied and such entry exists in the cache. - if (!waitOnBothFollowers) { - stateMachineDataCache.removeIf(k -> k >= index); - } + removeStateMachineDataIfMajorityFollowSync(index); DispatcherContext.Builder builder = new DispatcherContext.Builder().setTerm(trx.getLogEntry().getTerm()) .setLogIndex(index); @@ -982,6 +980,18 @@ public CompletableFuture applyTransaction(TransactionContext trx) { } } + private void removeStateMachineDataIfMajorityFollowSync(long index) { + if (!waitOnBothFollowers) { + // if majority follow in sync, remove all cache previous to current index + // including current index + removeCacheDataUpTo(index); + } + } + + private void removeCacheDataUpTo(long index) { + stateMachineDataCache.removeIf(k -> k <= index); + } + private static CompletableFuture completeExceptionally(Exception e) { final CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(e); @@ -996,7 +1006,7 @@ public void notifyNotLeader(Collection pendingEntries) { @Override public CompletableFuture truncate(long index) { - stateMachineDataCache.removeIf(k -> k >= index); + stateMachineDataCache.removeIf(k -> k > index); return CompletableFuture.completedFuture(null); }