@@ -19,16 +19,20 @@

package org.apache.hadoop.hdds.utils.db.cache;

+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -54,7 +58,8 @@ public class FullTableCache<KEY, VALUE> implements TableCache<KEY, VALUE> {

   private final Map<CacheKey<KEY>, CacheValue<VALUE>> cache;
   private final NavigableMap<Long, Set<CacheKey<KEY>>> epochEntries;
-  private final ExecutorService executorService;
+  private final ScheduledExecutorService executorService;
+  private final Queue<Long> epochCleanupQueue = new ConcurrentLinkedQueue<>();
 
   private final ReadWriteLock lock;
 
@@ -82,8 +87,8 @@ public FullTableCache(String threadNamePrefix) {
         .setDaemon(true)
         .setNameFormat(threadNamePrefix + "FullTableCache-Cleanup-%d")
         .build();
-    executorService = Executors.newSingleThreadExecutor(threadFactory);
-
+    executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    executorService.scheduleWithFixedDelay(() -> cleanupTask(), 0, 1000L, TimeUnit.MILLISECONDS);
     statsRecorder = new CacheStatsRecorder();
   }

@@ -111,18 +116,31 @@ public void loadInitial(CacheKey<KEY> key, CacheValue<VALUE> value) {
   @Override
   public void put(CacheKey<KEY> cacheKey, CacheValue<VALUE> value) {
     try {
-      lock.writeLock().lock();
+      lock.readLock().lock();
Contributor: Currently, there are no parallel calls to the TableCache#put method. What will happen if someone makes parallel put calls with the same key? Will we end up in an inconsistent state here with the read lock?

Contributor (author): Currently there is no such use case, but if someone does make parallel calls, the last one wins because the backing collection is a ConcurrentSkipListMap, so there is no corruption. The same logic exists in PartialTableCache, which has no lock at all. The lock was added for the eviction race reported in HDDS-4583, which describes the target problem, so a write lock is not required here.
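To make the last-writer-wins argument concrete, here is a minimal, self-contained sketch; the class name, key, and values are illustrative, not from this PR. Two threads race to put the same key into a ConcurrentSkipListMap; each put is atomic, so the map ends with exactly one uncorrupted entry, whichever write landed last.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the PR.
public class LastWriterWinsDemo {
  public static void main(String[] args) throws InterruptedException {
    ConcurrentSkipListMap<String, Long> cache = new ConcurrentSkipListMap<>();
    CountDownLatch start = new CountDownLatch(1);

    // Two writers race on the same key with different values.
    Thread t1 = new Thread(() -> { awaitQuietly(start); cache.put("volume1", 10L); });
    Thread t2 = new Thread(() -> { awaitQuietly(start); cache.put("volume1", 11L); });
    t1.start();
    t2.start();
    start.countDown();
    t1.join();
    t2.join();

    // Exactly one entry survives: whichever put ran last.
    System.out.println("entries=" + cache.size() + ", value=" + cache.get("volume1"));
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

This is also why put() can drop to the read lock: it only needs to exclude the eviction pass, which takes the write lock, not other writers.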

       cache.put(cacheKey, value);
-      epochEntries.computeIfAbsent(value.getEpoch(),
-          v -> new CopyOnWriteArraySet<>()).add(cacheKey);
+      // add in case of null value for cleanup purpose only when key is deleted
+      if (value.getCacheValue() == null) {
+        epochEntries.computeIfAbsent(value.getEpoch(),
+            v -> new CopyOnWriteArraySet<>()).add(cacheKey);
+      }
     } finally {
-      lock.writeLock().unlock();
+      lock.readLock().unlock();
     }
   }

   @Override
   public void cleanup(List<Long> epochs) {
-    executorService.execute(() -> evictCache(epochs));
+    epochCleanupQueue.clear();
Contributor: What will happen to the epochs that were in the epochCleanupQueue but were not picked up by the cleanup task?

Contributor (author): We give preference to the latest epoch: since the latest epoch has been received, all previous epochs have already been notified. Intermediate epochs are therefore not required; in the eviction logic, only the latest epoch is used to cover cleanup for all previous epochs. The check for sequential epochs is not required, and the logic has been modified accordingly.
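As a sketch of the supersede-then-drain behavior described above; class and method names are simplified stand-ins, and it assumes cleanup() is always invoked with ascending epochs, as the author states:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the queue handoff, not the actual Ozone class.
public class EpochQueueSketch {
  private final Queue<Long> epochCleanupQueue = new ConcurrentLinkedQueue<>();

  public void cleanup(List<Long> epochs) {
    epochCleanupQueue.clear();         // drop older, already-covered epochs
    epochCleanupQueue.addAll(epochs);  // keep only the newest batch
  }

  public void cleanupTask() {
    if (epochCleanupQueue.isEmpty()) {
      return;
    }
    List<Long> epochList = new ArrayList<>(epochCleanupQueue);
    epochCleanupQueue.removeAll(epochList);
    // Eviction only needs the largest epoch: everything at or below it
    // is eligible for cleanup, so superseded batches are harmless.
    long lastEpoch = epochList.get(epochList.size() - 1);
    System.out.println("evicting all epochs <= " + lastEpoch);
  }

  public static void main(String[] args) {
    EpochQueueSketch sketch = new EpochQueueSketch();
    sketch.cleanup(Arrays.asList(1L, 2L));
    sketch.cleanup(Arrays.asList(3L, 4L)); // supersedes 1 and 2 before any drain
    sketch.cleanupTask();                  // prints: evicting all epochs <= 4
  }
}
```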

+    epochCleanupQueue.addAll(epochs);
   }
 
+  private void cleanupTask() {
+    if (epochCleanupQueue.isEmpty()) {
+      return;
+    }
+    ArrayList<Long> epochList = new ArrayList<>(epochCleanupQueue);
+    epochCleanupQueue.removeAll(epochList);
+    evictCache(epochList);
+  }

@Override
@@ -139,45 +157,48 @@ public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> iterator() {
   @VisibleForTesting
   @Override
   public void evictCache(List<Long> epochs) {
+    // when no delete entries, can exit immediately
+    if (epochEntries.isEmpty()) {
+      return;
+    }
+
     Set<CacheKey<KEY>> currentCacheKeys;
     CacheKey<KEY> cachekey;
     long lastEpoch = epochs.get(epochs.size() - 1);
-    for (long currentEpoch : epochEntries.keySet()) {
-      currentCacheKeys = epochEntries.get(currentEpoch);
-
-      // If currentEntry epoch is greater than last epoch provided, we have
-      // deleted all entries less than specified epoch. So, we can break.
-      if (currentEpoch > lastEpoch) {
-        break;
-      }
-
-      // Acquire lock to avoid race between cleanup and add to cache entry by
-      // client requests.
-      try {
-        lock.writeLock().lock();
-        if (epochs.contains(currentEpoch)) {
-          for (Iterator<CacheKey<KEY>> iterator = currentCacheKeys.iterator();
-              iterator.hasNext();) {
-            cachekey = iterator.next();
-            cache.computeIfPresent(cachekey, ((k, v) -> {
-              // If cache epoch entry matches with current Epoch, remove entry
-              // from cache.
-              if (v.getCacheValue() == null && v.getEpoch() == currentEpoch) {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("CacheKey {} with epoch {} is removed from cache",
-                      k.getCacheKey(), currentEpoch);
-                }
-                return null;
-              }
-              return v;
-            }));
-          }
-          // Remove epoch entry, as the entry is there in epoch list.
-          epochEntries.remove(currentEpoch);
-        }
-      } finally {
-        lock.writeLock().unlock();
-      }
-    }
+    // Acquire lock to avoid race between cleanup and add to cache entry by
+    // client requests.
+    try {
+      lock.writeLock().lock();
+      for (long currentEpoch : epochEntries.keySet()) {
+        currentCacheKeys = epochEntries.get(currentEpoch);
+
+        // If currentEntry epoch is greater than last epoch provided, we have
+        // deleted all entries less than specified epoch. So, we can break.
+        if (currentEpoch > lastEpoch) {
+          break;
+        }
+
+        for (Iterator<CacheKey<KEY>> iterator = currentCacheKeys.iterator();
+            iterator.hasNext();) {
+          cachekey = iterator.next();
+          cache.computeIfPresent(cachekey, ((k, v) -> {
+            // If cache epoch entry matches with current Epoch, remove entry
+            // from cache.
+            if (v.getCacheValue() == null && v.getEpoch() == currentEpoch) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("CacheKey {} with epoch {} is removed from cache",
+                    k.getCacheKey(), currentEpoch);
+              }
+              return null;
+            }
+            return v;
+          }));
+        }
+        // Remove epoch entry, as the entry is there in epoch list.
+        epochEntries.remove(currentEpoch);
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }

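Stepping back, the shape this file now has — callers enqueue epochs while one scheduled daemon thread drains the queue on a fixed delay and evicts — can be shown as a standalone sketch. Names here are simplified stand-ins, not the actual Ozone classes:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical producer/consumer sketch of the new cleanup wiring.
public class ScheduledDrainSketch {
  private final Queue<Long> queue = new ConcurrentLinkedQueue<>();
  private final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(1);

  public ScheduledDrainSketch() {
    // At most one drain runs at a time; the next starts a full second
    // after the previous one finishes (fixed delay, not fixed rate).
    scheduler.scheduleWithFixedDelay(this::drain, 0, 1000L, TimeUnit.MILLISECONDS);
  }

  public void submit(long epoch) {
    queue.add(epoch); // producers never block on eviction work
  }

  private void drain() {
    Long epoch;
    while ((epoch = queue.poll()) != null) {
      System.out.println("cleaning epoch " + epoch);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ScheduledDrainSketch sketch = new ScheduledDrainSketch();
    sketch.submit(1L);
    sketch.submit(2L);
    Thread.sleep(1500L); // leave time for at least one scheduled drain
    sketch.scheduler.shutdown();
  }
}
```

One design note: scheduleWithFixedDelay, unlike scheduleAtFixedRate, measures the delay from the end of the previous run, so a slow eviction pass can never produce overlapping or backed-up drains.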
@@ -115,7 +115,12 @@ public void testTableCacheWithRenameKey(TableCache.CacheType cacheType) {
     // Epoch entries should be like (long, (key1, key2, ...))
     // (0, (0A, 0B)) (1, (1A, 1B)) (2, (2A, 1B))
     assertEquals(3, tableCache.getEpochEntries().size());
-    assertEquals(2, tableCache.getEpochEntries().get(0L).size());
+    if (cacheType == TableCache.CacheType.FULL_CACHE) {
+      // first time cache value is null for xA cases and non-null value for xB cases, so have 1 entry
+      assertEquals(1, tableCache.getEpochEntries().get(0L).size());
+    } else {
+      assertEquals(2, tableCache.getEpochEntries().get(0L).size());
+    }

// Cache should be like (key, (cacheValue, long))
// (0A, (null, 0)) (0B, (0, 0))
@@ -227,9 +232,13 @@ public void testPartialTableCacheWithOverrideEntries(


     assertEquals(3, tableCache.size());
-    // It will have 2 additional entries because we have 2 override entries.
-    assertEquals(3 + 2,
-        tableCache.getEpochEntries().size());
+    if (cacheType == TableCache.CacheType.FULL_CACHE) {
+      // full table cache keep only deleted entry which is 0
+      assertEquals(0, tableCache.getEpochEntries().size());
+    } else {
+      // It will have 2 additional entries because we have 2 override entries.
+      assertEquals(3 + 2, tableCache.getEpochEntries().size());
+    }

// Now remove

@@ -301,9 +310,13 @@ public void testPartialTableCacheWithOverrideAndDelete(


     assertEquals(3, tableCache.size());
-    // It will have 4 additional entries because we have 4 override entries.
-    assertEquals(3 + 4,
-        tableCache.getEpochEntries().size());
+    if (cacheType == TableCache.CacheType.FULL_CACHE) {
+      // It will have 2 deleted entries
+      assertEquals(2, tableCache.getEpochEntries().size());
+    } else {
+      // It will have 4 additional entries because we have 4 override entries.
+      assertEquals(3 + 4, tableCache.getEpochEntries().size());
+    }

// Now remove

@@ -506,7 +519,12 @@ public void testTableCacheWithNonConsecutiveEpochList(
     tableCache.evictCache(epochs);
 
     assertEquals(2, tableCache.size());
-    assertEquals(2, tableCache.getEpochEntries().size());
+    if (cacheType == TableCache.CacheType.FULL_CACHE) {
+      // no deleted entries
+      assertEquals(0, tableCache.getEpochEntries().size());
+    } else {
+      assertEquals(2, tableCache.getEpochEntries().size());
+    }

assertNotNull(tableCache.get(new CacheKey<>(Long.toString(0))));
assertEquals(2,
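The FULL_CACHE expectations in these tests all follow from the put() change above: a full-table cache never evicts live entries, so epochEntries tracks only delete markers (null cache values). A hypothetical minimal model of that rule, using Optional in place of a nullable CacheValue:

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Illustrative model only; the real cache stores CacheValue objects.
public class FullCacheEpochSketch {
  private final NavigableMap<Long, Set<String>> epochEntries =
      new ConcurrentSkipListMap<>();

  public void put(String key, Optional<String> value, long epoch) {
    if (!value.isPresent()) {
      // Only delete markers need later cleanup, so only they are tracked.
      epochEntries.computeIfAbsent(epoch, e -> new CopyOnWriteArraySet<>())
          .add(key);
    }
  }

  public static void main(String[] args) {
    FullCacheEpochSketch sketch = new FullCacheEpochSketch();
    sketch.put("0A", Optional.empty(), 0L);  // delete marker: tracked
    sketch.put("0B", Optional.of("v"), 0L);  // live entry: not tracked
    // Mirrors the rename-key test: epoch 0 holds 1 entry, not 2.
    System.out.println(sketch.epochEntries.get(0L).size()); // prints 1
  }
}
```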