diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/FullTableCache.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/FullTableCache.java
index cb1a3f16a9c5..01d1a03d4340 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/FullTableCache.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/FullTableCache.java
@@ -19,16 +19,20 @@
 package org.apache.hadoop.hdds.utils.db.cache;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -54,7 +58,8 @@ public class FullTableCache<KEY, VALUE> implements TableCache<KEY, VALUE> {
 
   private final Map<CacheKey<KEY>, CacheValue<VALUE>> cache;
   private final NavigableMap<Long, Set<CacheKey<KEY>>> epochEntries;
-  private final ExecutorService executorService;
+  private final ScheduledExecutorService executorService;
+  private final Queue<Long> epochCleanupQueue = new ConcurrentLinkedQueue<>();
 
   private final ReadWriteLock lock;
 
@@ -82,8 +87,8 @@ public FullTableCache(String threadNamePrefix) {
         .setDaemon(true)
         .setNameFormat(threadNamePrefix + "FullTableCache-Cleanup-%d")
         .build();
-    executorService = Executors.newSingleThreadExecutor(threadFactory);
-
+    executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    executorService.scheduleWithFixedDelay(() -> cleanupTask(), 0, 1000L, TimeUnit.MILLISECONDS);
     statsRecorder = new CacheStatsRecorder();
   }
 
@@ -111,18 +116,31 @@ public void loadInitial(CacheKey<KEY> key, CacheValue<VALUE> value) {
   @Override
   public void put(CacheKey<KEY> cacheKey, CacheValue<VALUE> value) {
     try {
-      lock.writeLock().lock();
+      lock.readLock().lock();
      cache.put(cacheKey, value);
-      epochEntries.computeIfAbsent(value.getEpoch(),
-          v -> new CopyOnWriteArraySet<>()).add(cacheKey);
+      // add to epochEntries only for null values (deleted keys), for cleanup purposes
+      if (value.getCacheValue() == null) {
+        epochEntries.computeIfAbsent(value.getEpoch(),
+            v -> new CopyOnWriteArraySet<>()).add(cacheKey);
+      }
     } finally {
-      lock.writeLock().unlock();
+      lock.readLock().unlock();
     }
   }
 
   @Override
   public void cleanup(List<Long> epochs) {
-    executorService.execute(() -> evictCache(epochs));
+    epochCleanupQueue.clear();
+    epochCleanupQueue.addAll(epochs);
+  }
+
+  private void cleanupTask() {
+    if (epochCleanupQueue.isEmpty()) {
+      return;
+    }
+    ArrayList<Long> epochList = new ArrayList<>(epochCleanupQueue);
+    epochCleanupQueue.removeAll(epochList);
+    evictCache(epochList);
   }
 
   @Override
@@ -139,45 +157,48 @@ public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> iterator() {
   @VisibleForTesting
   @Override
   public void evictCache(List<Long> epochs) {
+    // when there are no deleted entries, we can exit immediately
+    if (epochEntries.isEmpty()) {
+      return;
+    }
+
     Set<CacheKey<KEY>> currentCacheKeys;
     CacheKey<KEY> cachekey;
     long lastEpoch = epochs.get(epochs.size() - 1);
-    for (long currentEpoch : epochEntries.keySet()) {
-      currentCacheKeys = epochEntries.get(currentEpoch);
+    // Acquire lock to avoid race between cleanup and add to cache entry by
+    // client requests.
+    try {
+      lock.writeLock().lock();
+      for (long currentEpoch : epochEntries.keySet()) {
+        currentCacheKeys = epochEntries.get(currentEpoch);
 
-      // If currentEntry epoch is greater than last epoch provided, we have
-      // deleted all entries less than specified epoch. So, we can break.
-      if (currentEpoch > lastEpoch) {
-        break;
-      }
+        // If currentEntry epoch is greater than last epoch provided, we have
+        // deleted all entries less than specified epoch. So, we can break.
+        if (currentEpoch > lastEpoch) {
+          break;
+        }
 
-      // Acquire lock to avoid race between cleanup and add to cache entry by
-      // client requests.
-      try {
-        lock.writeLock().lock();
-        if (epochs.contains(currentEpoch)) {
-          for (Iterator<CacheKey<KEY>> iterator = currentCacheKeys.iterator();
-              iterator.hasNext();) {
-            cachekey = iterator.next();
-            cache.computeIfPresent(cachekey, ((k, v) -> {
-              // If cache epoch entry matches with current Epoch, remove entry
-              // from cache.
-              if (v.getCacheValue() == null && v.getEpoch() == currentEpoch) {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("CacheKey {} with epoch {} is removed from cache",
-                      k.getCacheKey(), currentEpoch);
-                }
-                return null;
+        for (Iterator<CacheKey<KEY>> iterator = currentCacheKeys.iterator();
+            iterator.hasNext();) {
+          cachekey = iterator.next();
+          cache.computeIfPresent(cachekey, ((k, v) -> {
+            // If cache epoch entry matches with current Epoch, remove entry
+            // from cache.
+            if (v.getCacheValue() == null && v.getEpoch() == currentEpoch) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("CacheKey {} with epoch {} is removed from cache",
+                    k.getCacheKey(), currentEpoch);
              }
-              return v;
-            }));
-          }
-        }
+              return null;
+            }
+            return v;
+          }));
+        }
         // Remove epoch entry, as the entry is there in epoch list.
         epochEntries.remove(currentEpoch);
-      } finally {
-        lock.writeLock().unlock();
       }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
index aabc664140c2..3f645f90e700 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCache.java
@@ -115,7 +115,12 @@ public void testTableCacheWithRenameKey(TableCache.CacheType cacheType) {
     // Epoch entries should be like (long, (key1, key2, ...))
     // (0, (0A, 0B)) (1, (1A, 1B)) (2, (2A, 1B))
     assertEquals(3, tableCache.getEpochEntries().size());
-    assertEquals(2, tableCache.getEpochEntries().get(0L).size());
+    if (cacheType == TableCache.CacheType.FULL_CACHE) {
+      // cache value is null for the xA keys and non-null for the xB keys, so there is only 1 entry
+      assertEquals(1, tableCache.getEpochEntries().get(0L).size());
+    } else {
+      assertEquals(2, tableCache.getEpochEntries().get(0L).size());
+    }
 
     // Cache should be like (key, (cacheValue, long))
     // (0A, (null, 0)) (0B, (0, 0))
@@ -227,9 +232,13 @@ public void testPartialTableCacheWithOverrideEntries(
 
     assertEquals(3, tableCache.size());
 
-    // It will have 2 additional entries because we have 2 override entries.
-    assertEquals(3 + 2,
-        tableCache.getEpochEntries().size());
+    if (cacheType == TableCache.CacheType.FULL_CACHE) {
+      // full table cache keeps only deleted entries, of which there are 0 here
+      assertEquals(0, tableCache.getEpochEntries().size());
+    } else {
+      // It will have 2 additional entries because we have 2 override entries.
+      assertEquals(3 + 2, tableCache.getEpochEntries().size());
+    }
 
     // Now remove
 
@@ -301,9 +310,13 @@ public void testPartialTableCacheWithOverrideAndDelete(
 
     assertEquals(3, tableCache.size());
 
-    // It will have 4 additional entries because we have 4 override entries.
-    assertEquals(3 + 4,
-        tableCache.getEpochEntries().size());
+    if (cacheType == TableCache.CacheType.FULL_CACHE) {
+      // It will have 2 deleted entries
+      assertEquals(2, tableCache.getEpochEntries().size());
+    } else {
+      // It will have 4 additional entries because we have 4 override entries.
+      assertEquals(3 + 4, tableCache.getEpochEntries().size());
+    }
 
     // Now remove
 
@@ -506,7 +519,12 @@ public void testTableCacheWithNonConsecutiveEpochList(
     tableCache.evictCache(epochs);
 
     assertEquals(2, tableCache.size());
-    assertEquals(2, tableCache.getEpochEntries().size());
+    if (cacheType == TableCache.CacheType.FULL_CACHE) {
+      // no deleted entries
+      assertEquals(0, tableCache.getEpochEntries().size());
+    } else {
+      assertEquals(2, tableCache.getEpochEntries().size());
+    }
 
     assertNotNull(tableCache.get(new CacheKey<>(Long.toString(0))));
     assertEquals(2,