diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java index b9a2f87a03da..9b9719386b39 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.hdds.fs; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.annotation.InterfaceStability; +import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.AutoCloseableReadWriteLock; +import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +33,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -46,12 +47,16 @@ public class CachingSpaceUsageSource implements SpaceUsageSource { LoggerFactory.getLogger(CachingSpaceUsageSource.class); private final ScheduledExecutorService executor; - private final AtomicLong cachedValue = new AtomicLong(); + private final AutoCloseableReadWriteLock lock; + private long cachedUsedSpace; + private long cachedAvailable; + private long cachedCapacity; private final Duration refresh; private final SpaceUsageSource source; private final SpaceUsagePersistence persistence; private boolean running; - private ScheduledFuture scheduledFuture; + private ScheduledFuture updateUsedSpaceFuture; + private ScheduledFuture updateAvailableFuture; private final AtomicBoolean isRefreshRunning; public CachingSpaceUsageSource(SpaceUsageCheckParams params) { @@ -60,15 +65,16 @@ public CachingSpaceUsageSource(SpaceUsageCheckParams params) { CachingSpaceUsageSource(SpaceUsageCheckParams params, ScheduledExecutorService executor) { - Preconditions.checkArgument(params != null, "params == null"); + Preconditions.assertNotNull(params, "params == null"); refresh = params.getRefresh(); source = params.getSource(); + lock = new AutoCloseableReadWriteLock(source.toString()); persistence = params.getPersistence(); this.executor = executor; isRefreshRunning = new AtomicBoolean(); - Preconditions.checkArgument(refresh.isZero() == (executor == null), + Preconditions.assertTrue(refresh.isZero() == (executor == null), "executor should be provided if and only if refresh is requested"); loadInitialValue(); @@ -76,45 +82,81 @@ public CachingSpaceUsageSource(SpaceUsageCheckParams params) { @Override public long getCapacity() { - return source.getCapacity(); + try (AutoCloseableLock ignored = lock.readLock(null, null)) { + return cachedCapacity; + } } @Override public long getAvailable() { - return source.getAvailable(); + try (AutoCloseableLock ignored = lock.readLock(null, null)) { + return cachedAvailable; + } } @Override public long getUsedSpace() { - return cachedValue.get(); + try (AutoCloseableLock ignored = lock.readLock(null, null)) { + return cachedUsedSpace; + } + } + + @Override + public SpaceUsageSource snapshot() { + try (AutoCloseableLock ignored = lock.readLock(null, null)) { + return new Fixed(cachedCapacity, cachedAvailable, cachedUsedSpace); + } } public void incrementUsedSpace(long usedSpace) { - cachedValue.addAndGet(usedSpace); + if (usedSpace == 0) { + return; + } + Preconditions.assertTrue(usedSpace > 0, () -> usedSpace + " < 0"); + final long current, change; + try (AutoCloseableLock ignored = lock.writeLock(null, null)) { + current = cachedAvailable; + change = Math.min(current, usedSpace); + cachedAvailable -= change; + cachedUsedSpace += change; + } + + if (change != usedSpace) { + LOG.warn("Attempted to decrement available space to a negative value. Current: {}, Decrement: {}, Source: {}", + current, usedSpace, source); + } } public void decrementUsedSpace(long reclaimedSpace) { - cachedValue.updateAndGet(current -> { - long newValue = current - reclaimedSpace; - if (newValue < 0) { - if (current > 0) { - LOG.warn("Attempted to decrement used space to a negative value. " + - "Current: {}, Decrement: {}, Source: {}", - current, reclaimedSpace, source); - } - return 0; - } else { - return newValue; - } - }); + if (reclaimedSpace == 0) { + return; + } + Preconditions.assertTrue(reclaimedSpace > 0, () -> reclaimedSpace + " < 0"); + final long current, change; + try (AutoCloseableLock ignored = lock.writeLock(null, null)) { + current = cachedUsedSpace; + change = Math.min(current, reclaimedSpace); + cachedUsedSpace -= change; + cachedAvailable += change; + } + + if (change != reclaimedSpace) { + LOG.warn("Attempted to decrement used space to a negative value. Current: {}, Decrement: {}, Source: {}", + current, reclaimedSpace, source); + } } public void start() { if (executor != null) { - long initialDelay = cachedValue.get() > 0 ? refresh.toMillis() : 0; + long initialDelay = getUsedSpace() > 0 ? refresh.toMillis() : 0; if (!running) { - scheduledFuture = executor.scheduleWithFixedDelay( + updateUsedSpaceFuture = executor.scheduleWithFixedDelay( this::refresh, initialDelay, refresh.toMillis(), MILLISECONDS); + + long availableUpdateDelay = Math.min(refresh.toMillis(), Duration.ofMinutes(1).toMillis()); + updateAvailableFuture = executor.scheduleWithFixedDelay( + this::updateAvailable, availableUpdateDelay, availableUpdateDelay, MILLISECONDS); + running = true; } } else { @@ -126,8 +168,13 @@ public void shutdown() { persistence.save(this); // save cached value if (executor != null) { - if (running && scheduledFuture != null) { - scheduledFuture.cancel(true); + if (running) { + if (updateUsedSpaceFuture != null) { + updateUsedSpaceFuture.cancel(true); + } + if (updateAvailableFuture != null) { + updateAvailableFuture.cancel(true); + } } running = false; @@ -135,21 +182,48 @@ public void shutdown() { } } + /** Schedule immediate refresh. */ public void refreshNow() { - //refresh immediately executor.schedule(this::refresh, 0, MILLISECONDS); } + /** Loads {@code usedSpace} value from persistent source, if present. + * Also updates {@code available} and {@code capacity} from the {@code source}. */ private void loadInitialValue() { final OptionalLong initialValue = persistence.load(); - initialValue.ifPresent(cachedValue::set); + updateCachedValues(initialValue.orElse(0)); + } + + /** Updates {@code available} and {@code capacity} from the {@code source}. */ + private void updateAvailable() { + final long capacity = source.getCapacity(); + final long available = source.getAvailable(); + + try (AutoCloseableLock ignored = lock.writeLock(null, null)) { + cachedAvailable = available; + cachedCapacity = capacity; + } + } + + /** Updates {@code available} and {@code capacity} from the {@code source}, + * sets {@code usedSpace} to the specified {@code used} value. */ + private void updateCachedValues(long used) { + final long capacity = source.getCapacity(); + final long available = source.getAvailable(); + + try (AutoCloseableLock ignored = lock.writeLock(null, null)) { + cachedAvailable = available; + cachedCapacity = capacity; + cachedUsedSpace = used; + } } + /** Refreshes all 3 values. */ private void refresh() { //only one `refresh` can be running at a certain moment if (isRefreshRunning.compareAndSet(false, true)) { try { - cachedValue.set(source.getUsedSpace()); + updateCachedValues(source.getUsedSpace()); } catch (RuntimeException e) { LOG.warn("Error refreshing space usage for {}", source, e); } finally { diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java index 8523861000e7..b84ca130f27c 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java @@ -36,19 +36,20 @@ import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** * Tests for {@link CachingSpaceUsageSource}. */ -public class TestCachingSpaceUsageSource { +class TestCachingSpaceUsageSource { @TempDir private static File dir; @Test - public void providesInitialValueUntilStarted() { + void providesInitialValueUntilStarted() { final long initialValue = validInitialValue(); SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(initialValue)) .withRefresh(Duration.ZERO) @@ -57,10 +58,11 @@ public void providesInitialValueUntilStarted() { SpaceUsageSource subject = new CachingSpaceUsageSource(params); assertEquals(initialValue, subject.getUsedSpace()); + assertAvailableWasUpdated(params.getSource(), subject); } @Test - public void ignoresMissingInitialValue() { + void ignoresMissingInitialValue() { SpaceUsageCheckParams params = paramsBuilder() .withRefresh(Duration.ZERO) .build(); @@ -68,10 +70,11 @@ public void ignoresMissingInitialValue() { SpaceUsageSource subject = new CachingSpaceUsageSource(params); assertEquals(0, subject.getUsedSpace()); + assertAvailableWasUpdated(params.getSource(), subject); } @Test - public void updatesValueFromSourceUponStartIfPeriodicRefreshNotConfigured() { + void updatesValueFromSourceUponStartIfPeriodicRefreshNotConfigured() { AtomicLong savedValue = new AtomicLong(validInitialValue()); SpaceUsageCheckParams params = paramsBuilder(savedValue) .withRefresh(Duration.ZERO).build(); @@ -79,11 +82,11 @@ public void updatesValueFromSourceUponStartIfPeriodicRefreshNotConfigured() { CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params); subject.start(); - assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject); + assertSubjectWasRefreshed(params.getSource(), subject); } @Test - public void schedulesRefreshWithDelayIfConfigured() { + void schedulesRefreshWithDelayIfConfigured() { long initialValue = validInitialValue(); AtomicLong savedValue = new AtomicLong(initialValue); SpaceUsageCheckParams params = paramsBuilder(savedValue) @@ -96,13 +99,13 @@ public void schedulesRefreshWithDelayIfConfigured() { subject.start(); verifyRefreshWasScheduled(executor, refresh.toMillis(), refresh); - assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject); + assertSubjectWasRefreshed(params.getSource(), subject); assertEquals(initialValue, savedValue.get(), "value should not have been saved to file yet"); } @Test - public void schedulesImmediateRefreshIfInitialValueMissing() { + void schedulesImmediateRefreshIfInitialValueMissing() { final long initialValue = missingInitialValue(); AtomicLong savedValue = new AtomicLong(initialValue); SpaceUsageCheckParams params = paramsBuilder(savedValue).build(); @@ -113,13 +116,13 @@ public void schedulesImmediateRefreshIfInitialValueMissing() { subject.start(); verifyRefreshWasScheduled(executor, 0L, params.getRefresh()); - assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject); + assertSubjectWasRefreshed(params.getSource(), subject); assertEquals(initialValue, savedValue.get(), "value should not have been saved to file yet"); } @Test - public void savesValueOnShutdown() { + void savesValueOnShutdown() { AtomicLong savedValue = new AtomicLong(validInitialValue()); SpaceUsageSource source = mock(SpaceUsageSource.class); final long usedSpace = 4L; @@ -138,22 +141,44 @@ public void savesValueOnShutdown() { "value should have been saved to file"); assertEquals(usedSpace, subject.getUsedSpace(), "no further updates from source expected"); - verify(future).cancel(true); + verify(future, times(2)).cancel(true); verify(executor).shutdown(); } @Test - public void testDecrementDoesNotGoNegative() { + void decrementUsedSpaceMoreThanCurrent() { SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(50)) .withRefresh(Duration.ZERO) .build(); CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params); + SpaceUsageSource original = subject.snapshot(); // Try to decrement more than the current value - subject.decrementUsedSpace(100); + final long change = original.getUsedSpace() * 2; + subject.decrementUsedSpace(change); - // Check that the value has been set to 0 + // should not drop below 0 assertEquals(0, subject.getUsedSpace()); + // available and used change by same amount (in opposite directions) + assertEquals(original.getAvailable() + original.getUsedSpace(), subject.getAvailable()); + } + + @Test + void decrementAvailableSpaceMoreThanCurrent() { + SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(50)) + .withRefresh(Duration.ZERO) + .build(); + CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params); + SpaceUsageSource original = subject.snapshot(); + + // Try to decrement more than the current value + final long change = original.getAvailable() * 2; + subject.incrementUsedSpace(change); + + // should not drop below 0 + assertEquals(0, subject.getAvailable()); + // available and used change by same amount (in opposite directions) + assertEquals(original.getUsedSpace() + original.getAvailable(), subject.getUsedSpace()); } private static long missingInitialValue() { @@ -197,14 +222,29 @@ private static void verifyRefreshWasScheduled( ScheduledExecutorService executor, long expectedInitialDelay, Duration refresh) { + // refresh usedSpace verify(executor).scheduleWithFixedDelay(any(), eq(expectedInitialDelay), eq(refresh.toMillis()), eq(TimeUnit.MILLISECONDS)); + + // update available/capacity + final long oneMinute = Duration.ofMinutes(1).toMillis(); + final long delay = Math.min(refresh.toMillis(), oneMinute); + verify(executor).scheduleWithFixedDelay(any(), eq(delay), + eq(delay), eq(TimeUnit.MILLISECONDS)); + } + + private static void assertAvailableWasUpdated(SpaceUsageSource source, + SpaceUsageSource subject) { + + assertEquals(source.getCapacity(), subject.getCapacity()); + assertEquals(source.getAvailable(), subject.getAvailable()); } - private static void assertSubjectWasRefreshed(long expected, + private static void assertSubjectWasRefreshed(SpaceUsageSource source, SpaceUsageSource subject) { - assertEquals(expected, subject.getUsedSpace(), + assertAvailableWasUpdated(source, subject); + assertEquals(source.getUsedSpace(), subject.getUsedSpace(), "subject should have been refreshed"); }