Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hdds.fs;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AutoCloseableReadWriteLock;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,7 +33,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand All @@ -46,12 +47,16 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
LoggerFactory.getLogger(CachingSpaceUsageSource.class);

private final ScheduledExecutorService executor;
private final AtomicLong cachedValue = new AtomicLong();
private final AutoCloseableReadWriteLock lock;
private long cachedUsedSpace;
private long cachedAvailable;
private long cachedCapacity;
private final Duration refresh;
private final SpaceUsageSource source;
private final SpaceUsagePersistence persistence;
private boolean running;
private ScheduledFuture<?> scheduledFuture;
private ScheduledFuture<?> updateUsedSpaceFuture;
private ScheduledFuture<?> updateAvailableFuture;
private final AtomicBoolean isRefreshRunning;

public CachingSpaceUsageSource(SpaceUsageCheckParams params) {
Expand All @@ -60,61 +65,98 @@ public CachingSpaceUsageSource(SpaceUsageCheckParams params) {

CachingSpaceUsageSource(SpaceUsageCheckParams params,
ScheduledExecutorService executor) {
Preconditions.checkArgument(params != null, "params == null");
Preconditions.assertNotNull(params, "params == null");

refresh = params.getRefresh();
source = params.getSource();
lock = new AutoCloseableReadWriteLock(source.toString());
persistence = params.getPersistence();
this.executor = executor;
isRefreshRunning = new AtomicBoolean();

Preconditions.checkArgument(refresh.isZero() == (executor == null),
Preconditions.assertTrue(refresh.isZero() == (executor == null),
"executor should be provided if and only if refresh is requested");

loadInitialValue();
}

@Override
public long getCapacity() {
return source.getCapacity();
try (AutoCloseableLock ignored = lock.readLock(null, null)) {
return cachedCapacity;
}
}

@Override
public long getAvailable() {
return source.getAvailable();
try (AutoCloseableLock ignored = lock.readLock(null, null)) {
return cachedAvailable;
}
}

@Override
public long getUsedSpace() {
return cachedValue.get();
try (AutoCloseableLock ignored = lock.readLock(null, null)) {
return cachedUsedSpace;
}
}

@Override
public SpaceUsageSource snapshot() {
try (AutoCloseableLock ignored = lock.readLock(null, null)) {
return new Fixed(cachedCapacity, cachedAvailable, cachedUsedSpace);
}
}

public void incrementUsedSpace(long usedSpace) {
cachedValue.addAndGet(usedSpace);
if (usedSpace == 0) {
return;
}
Preconditions.assertTrue(usedSpace > 0, () -> usedSpace + " < 0");
final long current, change;
try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
current = cachedAvailable;
change = Math.min(current, usedSpace);
cachedAvailable -= change;
cachedUsedSpace += change;
}

if (change != usedSpace) {
LOG.warn("Attempted to decrement available space to a negative value. Current: {}, Decrement: {}, Source: {}",
current, usedSpace, source);
}
}

public void decrementUsedSpace(long reclaimedSpace) {
cachedValue.updateAndGet(current -> {
long newValue = current - reclaimedSpace;
if (newValue < 0) {
if (current > 0) {
LOG.warn("Attempted to decrement used space to a negative value. " +
"Current: {}, Decrement: {}, Source: {}",
current, reclaimedSpace, source);
}
return 0;
} else {
return newValue;
}
});
if (reclaimedSpace == 0) {
return;
}
Preconditions.assertTrue(reclaimedSpace > 0, () -> reclaimedSpace + " < 0");
final long current, change;
try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
current = cachedUsedSpace;
change = Math.min(current, reclaimedSpace);
cachedUsedSpace -= change;
cachedAvailable += change;
}

if (change != reclaimedSpace) {
LOG.warn("Attempted to decrement used space to a negative value. Current: {}, Decrement: {}, Source: {}",
current, reclaimedSpace, source);
}
}

public void start() {
if (executor != null) {
long initialDelay = cachedValue.get() > 0 ? refresh.toMillis() : 0;
long initialDelay = getUsedSpace() > 0 ? refresh.toMillis() : 0;
if (!running) {
scheduledFuture = executor.scheduleWithFixedDelay(
updateUsedSpaceFuture = executor.scheduleWithFixedDelay(
this::refresh, initialDelay, refresh.toMillis(), MILLISECONDS);

long availableUpdateDelay = Math.min(refresh.toMillis(), Duration.ofMinutes(1).toMillis());
updateAvailableFuture = executor.scheduleWithFixedDelay(
this::updateAvailable, availableUpdateDelay, availableUpdateDelay, MILLISECONDS);

running = true;
}
} else {
Expand All @@ -126,30 +168,62 @@ public void shutdown() {
persistence.save(this); // save cached value

if (executor != null) {
if (running && scheduledFuture != null) {
scheduledFuture.cancel(true);
if (running) {
if (updateUsedSpaceFuture != null) {
updateUsedSpaceFuture.cancel(true);
}
if (updateAvailableFuture != null) {
updateAvailableFuture.cancel(true);
}
}
running = false;

executor.shutdown();
}
}

/** Schedule immediate refresh. */
public void refreshNow() {
//refresh immediately
executor.schedule(this::refresh, 0, MILLISECONDS);
}

/** Loads {@code usedSpace} value from persistent source, if present.
* Also updates {@code available} and {@code capacity} from the {@code source}. */
private void loadInitialValue() {
final OptionalLong initialValue = persistence.load();
initialValue.ifPresent(cachedValue::set);
updateCachedValues(initialValue.orElse(0));
}

/** Updates {@code available} and {@code capacity} from the {@code source}. */
private void updateAvailable() {
final long capacity = source.getCapacity();
final long available = source.getAvailable();

try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
cachedAvailable = available;
cachedCapacity = capacity;
}
}

/** Updates {@code available} and {@code capacity} from the {@code source},
* sets {@code usedSpace} to the specified {@code used} value. */
private void updateCachedValues(long used) {
final long capacity = source.getCapacity();
final long available = source.getAvailable();

try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
cachedAvailable = available;
cachedCapacity = capacity;
cachedUsedSpace = used;
}
}

/** Refreshes all 3 values. */
private void refresh() {
//only one `refresh` can be running at a certain moment
if (isRefreshRunning.compareAndSet(false, true)) {
try {
cachedValue.set(source.getUsedSpace());
updateCachedValues(source.getUsedSpace());
} catch (RuntimeException e) {
LOG.warn("Error refreshing space usage for {}", source, e);
} finally {
Expand Down
Loading