Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.hadoop.ozone.om.snapshot;

import java.util.Map;
import java.util.Optional;

import org.apache.hadoop.util.ClosableIterator;

/**
Expand All @@ -32,5 +34,10 @@ public interface PersistentMap<K, V> {

void remove(K key);

ClosableIterator<Map.Entry<K, V>> iterator();
default ClosableIterator<Map.Entry<K, V>> iterator() {
return this.iterator(Optional.empty(), Optional.empty());
}

ClosableIterator<Map.Entry<K, V>> iterator(Optional<K> lowerBoundKey,
Optional<K> upperBoundKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;

import java.util.NoSuchElementException;

import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice;
import org.apache.hadoop.util.ClosableIterator;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
Expand All @@ -39,11 +44,11 @@ public class RocksDbPersistentMap<K, V> implements PersistentMap<K, V> {
private final Class<K> keyType;
private final Class<V> valueType;

public RocksDbPersistentMap(ManagedRocksDB db,
ColumnFamilyHandle columnFamilyHandle,
CodecRegistry codecRegistry,
Class<K> keyType,
Class<V> valueType) {
public RocksDbPersistentMap(@Nonnull ManagedRocksDB db,
@Nonnull ColumnFamilyHandle columnFamilyHandle,
@Nonnull CodecRegistry codecRegistry,
@Nonnull Class<K> keyType,
@Nonnull Class<V> valueType) {
this.db = db;
this.columnFamilyHandle = columnFamilyHandle;
this.codecRegistry = codecRegistry;
Expand Down Expand Up @@ -87,9 +92,36 @@ public void remove(K key) {
}

@Override
public ClosableIterator<Map.Entry<K, V>> iterator() {
ManagedRocksIterator iterator =
new ManagedRocksIterator(db.get().newIterator(columnFamilyHandle));
public ClosableIterator<Map.Entry<K, V>> iterator(Optional<K> lowerBound,
Optional<K> upperBound) {
Comment on lines +95 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final ManagedReadOptions readOptions = new ManagedReadOptions();
ManagedRocksIterator iterator;
final ManagedSlice lowerBoundSlice;
final ManagedSlice upperBoundSlice;
try {
if (lowerBound.isPresent()) {
lowerBoundSlice = new ManagedSlice(
codecRegistry.asRawData(lowerBound.get()));
readOptions.setIterateLowerBound(lowerBoundSlice);
} else {
lowerBoundSlice = null;
}

if (upperBound.isPresent()) {
upperBoundSlice = new ManagedSlice(
codecRegistry.asRawData(upperBound.get()));
readOptions.setIterateUpperBound(upperBoundSlice);
} else {
upperBoundSlice = null;
}
} catch (IOException exception) {
// TODO: [SNAPSHOT] Fail gracefully.
throw new RuntimeException(exception);
}

iterator = ManagedRocksIterator.managed(
db.get().newIterator(columnFamilyHandle, readOptions));

iterator.get().seekToFirst();

return new ClosableIterator<Map.Entry<K, V>>() {
Expand Down Expand Up @@ -138,6 +170,13 @@ public V setValue(V value) {
@Override
public void close() {
iterator.close();
readOptions.close();
if (upperBoundSlice != null) {
upperBoundSlice.close();
}
if (lowerBoundSlice != null) {
lowerBoundSlice.close();
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,20 @@ private Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
.getPath(), tablesToLookUp);
}

/**
* Gets the report key for a particular index of snapshot diff job.
* @param jobId Snapshot diff jobId
* @param index
* @return report Key of the snapshot diff job
*/

static String getReportKeyForIndex(String jobId, long index) {
return new StringBuilder(jobId.length() + 21)
.append(jobId).append(DELIMITER)
.append(org.apache.commons.lang3.StringUtils.leftPad(
String.valueOf(index), 20, '0')).toString();
}

public SnapshotDiffResponse cancelSnapshotDiff(
final String volumeName,
final String bucketName,
Expand Down Expand Up @@ -553,24 +567,34 @@ SnapshotDiffReportOzone createPageResponse(final SnapshotDiffJob snapDiffJob,

boolean hasMoreEntries = true;

int idx;
for (idx = index; idx - index < pageSize; idx++) {
byte[] rawKey =
codecRegistry.asRawData(snapDiffJob.getJobId() + DELIMITER + idx);
byte[] bytes = snapDiffReportTable.get(rawKey);
if (bytes == null) {
byte[] lowerIndex = codecRegistry.asRawData(getReportKeyForIndex(
snapDiffJob.getJobId(), index));
byte[] upperIndex = codecRegistry.asRawData(getReportKeyForIndex(
snapDiffJob.getJobId(), index + pageSize));
int idx = index;
try (ClosableIterator<Map.Entry<byte[], byte[]>> iterator =
snapDiffReportTable.iterator(Optional.of(lowerIndex),
Optional.of(upperIndex))) {
int itemsFetched = 0;
while (iterator.hasNext() && itemsFetched < pageSize) {
Map.Entry<byte[], byte[]> entry = iterator.next();
byte[] bytes = entry.getValue();
diffReportList.add(codecRegistry.asObject(bytes,
DiffReportEntry.class));
idx += 1;
itemsFetched += 1;
}
if (diffReportList.size() < pageSize) {
hasMoreEntries = false;
break;
}
diffReportList.add(codecRegistry.asObject(bytes, DiffReportEntry.class));
}

String tokenString = hasMoreEntries ? String.valueOf(idx) : null;
String nextTokenString = hasMoreEntries ? String.valueOf(idx) : null;

checkReportsIntegrity(snapDiffJob, index, diffReportList.size());

return new SnapshotDiffReportOzone(path.toString(), volumeName, bucketName,
fromSnapshotName, toSnapshotName, diffReportList, tokenString);
fromSnapshotName, toSnapshotName, diffReportList, nextTokenString);
}

/**
Expand Down Expand Up @@ -1349,10 +1373,8 @@ private long addToReport(String jobId, long index,
try (ClosableIterator<byte[]>
diffReportIterator = diffReportEntries.iterator()) {
while (diffReportIterator.hasNext()) {

snapDiffReportTable.put(
codecRegistry.asRawData(jobId + DELIMITER + index),
diffReportIterator.next());
snapDiffReportTable.put(codecRegistry.asRawData(
getReportKeyForIndex(jobId, index)), diffReportIterator.next());
index++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
Expand Down Expand Up @@ -88,7 +89,7 @@ public T next() {
public static class StubbedPersistentMap<K, V> implements
PersistentMap<K, V> {

private final Map<K, V> map;
private final TreeMap<K, V> map;

public StubbedPersistentMap(Map<K, V> map) {
this();
Expand Down Expand Up @@ -117,9 +118,16 @@ public void remove(K key) {
}

@Override
public ClosableIterator<Map.Entry<K, V>> iterator() {
return new StubbedCloseableIterator<>(
this.map.entrySet().stream().iterator());
public ClosableIterator<Map.Entry<K, V>> iterator(
Optional<K> lowerBoundKey, Optional<K> upperBoundKey) {
return new StubbedCloseableIterator<>(this.map.entrySet().stream().filter(
kvEntry ->
lowerBoundKey.map(k -> this.map.comparator()
.compare(kvEntry.getKey(), k) >= 0).orElse(Boolean.TRUE)
&&
upperBoundKey.map(k -> this.map.comparator()
.compare(kvEntry.getKey(), k) < 0).orElse(true))
.iterator());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,27 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.util.ClosableIterator;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
Expand All @@ -50,6 +62,8 @@ public class TestRocksDbPersistentMap {
private static ManagedDBOptions dbOptions;
private static ManagedColumnFamilyOptions columnFamilyOptions;

private static AtomicInteger id;

@BeforeAll
public static void staticInit() throws RocksDBException {
dbOptions = new ManagedDBOptions();
Expand All @@ -74,10 +88,11 @@ public static void staticInit() throws RocksDBException {

db = ManagedRocksDB.open(dbOptions, absolutePath, columnFamilyDescriptors,
columnFamilyHandles);
id = new AtomicInteger(0);
}

@AfterEach
public void teardown() throws RocksDBException {
@AfterAll
public static void teardown() throws RocksDBException {
if (dbOptions != null) {
dbOptions.close();
}
Expand All @@ -97,7 +112,8 @@ public void testRocksDBPersistentMap() throws IOException, RocksDBException {
try {
final CodecRegistry codecRegistry = CodecRegistry.newBuilder().build();
columnFamily = db.get().createColumnFamily(new ColumnFamilyDescriptor(
codecRegistry.asRawData("testMap"), columnFamilyOptions));
codecRegistry.asRawData("testMap" + id.incrementAndGet()),
columnFamilyOptions));

PersistentMap<String, String> persistentMap = new RocksDbPersistentMap<>(
db,
Expand Down Expand Up @@ -131,4 +147,93 @@ public void testRocksDBPersistentMap() throws IOException, RocksDBException {
}
}
}

/**
* Test cases for testRocksDBPersistentMapIterator.
*/
private static Stream<Arguments> rocksDBPersistentMapIteratorCases() {
return Stream.of(
Arguments.of(
Optional.empty(),
Optional.of("key202"),
Stream.concat(IntStream.range(0, 100).boxed(),
IntStream.range(200, 300).boxed())
.map(i -> Pair.of(String.format("key%03d", i),
String.format("value%03d", i)))
.collect(Collectors.toList()),
Stream.concat(IntStream.range(0, 100).boxed(),
IntStream.range(200, 202).boxed())
.map(i -> Pair.of(String.format("key%03d", i),
String.format("value%03d", i)))
.collect(Collectors.toList())),
Arguments.of(Optional.of("key050"),
Optional.empty(), Stream.concat(IntStream.range(50, 100).boxed(),
IntStream.range(200, 300).boxed())
.map(i -> Pair.of(String.format("key%03d", i),
String.format("value%03d", i)))
.collect(Collectors.toList()),
Stream.concat(IntStream.range(50, 100).boxed(),
IntStream.range(200, 300).boxed())
.map(i -> Pair.of(String.format("key%03d", i),
String.format("value%03d", i)))
.collect(Collectors.toList())
),
Arguments.of(Optional.of("key050"),
Optional.of("key210"),
Stream.concat(IntStream.range(50, 100).boxed(),
IntStream.range(200, 300).boxed())
.map(i -> Pair.of(String.format("key%03d", i),
String.format("value%03d", i)))
.collect(Collectors.toList()),
Stream.concat(IntStream.range(50, 100).boxed(),
IntStream.range(200, 210).boxed())
.map(i -> Pair.of(String.format("key%03d", i),
String.format("value%03d", i)))
.collect(Collectors.toList())
));
}

@ParameterizedTest
@MethodSource("rocksDBPersistentMapIteratorCases")
public void testRocksDBPersistentMapIterator(Optional<String> lowerBound,
Optional<String> upperBound, List<Pair<String, String>> keys,
List<Pair<String, String>> expectedKeys)
throws IOException, RocksDBException {
ColumnFamilyHandle columnFamily = null;
try {
final CodecRegistry codecRegistry = CodecRegistry.newBuilder().build();
columnFamily = db.get().createColumnFamily(new ColumnFamilyDescriptor(
codecRegistry.asRawData("testMap" + id.incrementAndGet()),
columnFamilyOptions));

PersistentMap<String, String> persistentMap = new RocksDbPersistentMap<>(
db,
columnFamily,
codecRegistry,
String.class,
String.class
);

for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i).getKey();
String value = keys.get(i).getValue();
persistentMap.put(key, value);
}
ClosableIterator<Map.Entry<String, String>> iterator =
persistentMap.iterator(lowerBound, upperBound);
int idx = 0;
while (iterator.hasNext()) {
Map.Entry<String, String> e = iterator.next();
Assertions.assertEquals(Pair.of(e.getKey(), e.getValue()),
expectedKeys.get(idx));
idx += 1;
}

} finally {
if (columnFamily != null) {
db.get().dropColumnFamily(columnFamily);
columnFamily.close();
}
}
}
}
Loading