Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1462,10 +1462,6 @@ public static boolean isKeyDeleted(String key, Table keyTable) {
&& omKeyInfoCacheValue.getCacheValue() == null;
}

public static boolean isKeyInCache(String key, Table keyTable) {
return keyTable.getCacheValue(new CacheKey(key)) != null;
}

/**
* Helper function for listStatus to find key in TableCache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.function.Predicate;

import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;

Expand Down Expand Up @@ -117,14 +118,17 @@ public static class DbTableIter<Value> implements
? extends Table.KeyValue<String, Value>> tableIterator;

private final Table<String, Value> table;
private HeapEntry currentKey;
private HeapEntry currentEntry;
private Predicate<String> doesKeyExistInCache;

DbTableIter(int entryIteratorId, Table<String, Value> table,
String prefixKey, String startKey) throws IOException {
String prefixKey, String startKey,
Predicate<String> doesKeyExistInCache) throws IOException {
this.entryIteratorId = entryIteratorId;
this.table = table;
this.tableIterator = table.iterator(prefixKey);
this.currentKey = null;
this.currentEntry = null;
this.doesKeyExistInCache = doesKeyExistInCache;

// only seek for the start key if the start key is lexicographically
// after the prefix key. For example
Expand All @@ -141,11 +145,11 @@ public static class DbTableIter<Value> implements
}

private void getNextKey() throws IOException {
while (tableIterator.hasNext() && currentKey == null) {
while (tableIterator.hasNext() && currentEntry == null) {
Table.KeyValue<String, Value> entry = tableIterator.next();
String entryKey = entry.getKey();
if (!KeyManagerImpl.isKeyInCache(entryKey, table)) {
currentKey = new HeapEntry(entryIteratorId,
if (!doesKeyExistInCache.test(entryKey)) {
currentEntry = new HeapEntry(entryIteratorId,
table.getName(), entryKey, entry.getValue());
}
}
Expand All @@ -157,13 +161,13 @@ public boolean hasNext() {
} catch (IOException t) {
throw new UncheckedIOException(t);
}
return currentKey != null;
return currentEntry != null;
}

public HeapEntry next() {
if (hasNext()) {
HeapEntry ret = currentKey;
currentKey = null;
HeapEntry ret = currentEntry;
currentEntry = null;
return ret;
}
throw new NoSuchElementException();
Expand All @@ -186,23 +190,21 @@ public static class CacheIter<Value>
private final String prefixKey;
private final String startKey;
private final String tableName;

private final int entryIteratorId;

CacheIter(int entryIteratorId, String tableName,
Iterator<Map.Entry<CacheKey<String>,
CacheValue<Value>>> cacheIter, String startKey,
String prefixKey) {
this.cacheKeyMap = new TreeMap<>();

this.startKey = startKey;
this.prefixKey = prefixKey;
this.tableName = tableName;
this.entryIteratorId = entryIteratorId;

populateCacheMap(cacheIter);

cacheCreatedKeyIter = cacheKeyMap.entrySet().iterator();
cacheCreatedKeyIter = cacheKeyMap.entrySet().stream().filter(e -> e.getValue() != null).iterator();
}

private void populateCacheMap(Iterator<Map.Entry<CacheKey<String>,
Expand Down Expand Up @@ -236,6 +238,10 @@ private void populateCacheMap(Iterator<Map.Entry<CacheKey<String>,
}
}

public boolean doesKeyExistInCache(String key) {
return cacheKeyMap.containsKey(key);
}

public boolean hasNext() {
return cacheCreatedKeyIter.hasNext();
}
Expand Down Expand Up @@ -292,11 +298,13 @@ public static class MinHeapIterator implements ClosableIterator {
try {
int iteratorId = 0;
for (Table table : tables) {
iterators.add(new CacheIter<>(iteratorId, table.getName(),
table.cacheIterator(), startKey, prefixKey));
CacheIter cacheIter = new CacheIter<>(iteratorId, table.getName(),
table.cacheIterator(), startKey, prefixKey);
Predicate<String> doesKeyExistInCache = cacheIter::doesKeyExistInCache;
iterators.add(cacheIter);
iteratorId++;
iterators.add(new DbTableIter<>(iteratorId, table, prefixKey,
startKey));
startKey, doesKeyExistInCache));
iteratorId++;
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1372,8 +1372,8 @@ public List<SnapshotInfo> listSnapshot(
bucketName, snapshotInfoTable)) {
try {
while (snapshotIterator.hasNext() && maxListResult > 0) {
SnapshotInfo snapshotInfo = (SnapshotInfo) snapshotIterator.next()
.getValue();
SnapshotInfo snapshotInfo =
(SnapshotInfo) snapshotIterator.next().getValue();
if (!snapshotInfo.getName().equals(prevSnapshot)) {
snapshotInfos.add(snapshotInfo);
maxListResult--;
Expand Down