diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentMap.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentMap.java index 12b1cbb3a001..1a0bc76794fd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentMap.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentMap.java @@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.om.snapshot; import java.util.Map; +import java.util.Optional; + import org.apache.hadoop.util.ClosableIterator; /** @@ -32,5 +34,10 @@ public interface PersistentMap { void remove(K key); - ClosableIterator> iterator(); + default ClosableIterator> iterator() { + return this.iterator(Optional.empty(), Optional.empty()); + } + + ClosableIterator> iterator(Optional lowerBoundKey, + Optional upperBoundKey); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentMap.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentMap.java index 9cf8a59eedcd..4387baa521a4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentMap.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentMap.java @@ -20,11 +20,16 @@ import java.io.IOException; import java.util.Map; +import java.util.Optional; +import javax.annotation.Nonnull; + import java.util.NoSuchElementException; import org.apache.hadoop.hdds.utils.db.CodecRegistry; +import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice; import org.apache.hadoop.util.ClosableIterator; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; @@ -39,11 +44,11 @@ public class RocksDbPersistentMap implements PersistentMap { private final Class keyType; private final Class valueType; - public RocksDbPersistentMap(ManagedRocksDB db, - ColumnFamilyHandle columnFamilyHandle, - CodecRegistry codecRegistry, - Class keyType, - Class valueType) { + public RocksDbPersistentMap(@Nonnull ManagedRocksDB db, + @Nonnull ColumnFamilyHandle columnFamilyHandle, + @Nonnull CodecRegistry codecRegistry, + @Nonnull Class keyType, + @Nonnull Class valueType) { this.db = db; this.columnFamilyHandle = columnFamilyHandle; this.codecRegistry = codecRegistry; @@ -87,9 +92,36 @@ public void remove(K key) { } @Override - public ClosableIterator> iterator() { - ManagedRocksIterator iterator = - new ManagedRocksIterator(db.get().newIterator(columnFamilyHandle)); + public ClosableIterator> iterator(Optional lowerBound, + Optional upperBound) { + final ManagedReadOptions readOptions = new ManagedReadOptions(); + ManagedRocksIterator iterator; + final ManagedSlice lowerBoundSlice; + final ManagedSlice upperBoundSlice; + try { + if (lowerBound.isPresent()) { + lowerBoundSlice = new ManagedSlice( + codecRegistry.asRawData(lowerBound.get())); + readOptions.setIterateLowerBound(lowerBoundSlice); + } else { + lowerBoundSlice = null; + } + + if (upperBound.isPresent()) { + upperBoundSlice = new ManagedSlice( + codecRegistry.asRawData(upperBound.get())); + readOptions.setIterateUpperBound(upperBoundSlice); + } else { + upperBoundSlice = null; + } + } catch (IOException exception) { + // TODO: [SNAPSHOT] Fail gracefully. + throw new RuntimeException(exception); + } + + iterator = ManagedRocksIterator.managed( + db.get().newIterator(columnFamilyHandle, readOptions)); + iterator.get().seekToFirst(); return new ClosableIterator>() { @@ -138,6 +170,13 @@ public V setValue(V value) { @Override public void close() { iterator.close(); + readOptions.close(); + if (upperBoundSlice != null) { + upperBoundSlice.close(); + } + if (lowerBoundSlice != null) { + lowerBoundSlice.close(); + } } }; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index da6bf5e458e8..9ad256a01697 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -378,6 +378,20 @@ private Set getSSTFileListForSnapshot(OmSnapshot snapshot, .getPath(), tablesToLookUp); } + /** + * Gets the report key for a particular index of snapshot diff job. + * @param jobId Snapshot diff jobId + * @param index + * @return report Key of the snapshot diff job + */ + + static String getReportKeyForIndex(String jobId, long index) { + return new StringBuilder(jobId.length() + 21) + .append(jobId).append(DELIMITER) + .append(org.apache.commons.lang3.StringUtils.leftPad( + String.valueOf(index), 20, '0')).toString(); + } + public SnapshotDiffResponse cancelSnapshotDiff( final String volumeName, final String bucketName, @@ -553,24 +567,34 @@ SnapshotDiffReportOzone createPageResponse(final SnapshotDiffJob snapDiffJob, boolean hasMoreEntries = true; - int idx; - for (idx = index; idx - index < pageSize; idx++) { - byte[] rawKey = - codecRegistry.asRawData(snapDiffJob.getJobId() + DELIMITER + idx); - byte[] bytes = snapDiffReportTable.get(rawKey); - if (bytes == null) { + byte[] lowerIndex = codecRegistry.asRawData(getReportKeyForIndex( + snapDiffJob.getJobId(), index)); + byte[] upperIndex = codecRegistry.asRawData(getReportKeyForIndex( + snapDiffJob.getJobId(), index + pageSize)); + int idx = index; + try (ClosableIterator> iterator = + snapDiffReportTable.iterator(Optional.of(lowerIndex), + Optional.of(upperIndex))) { + int itemsFetched = 0; + while (iterator.hasNext() && itemsFetched < pageSize) { + Map.Entry entry = iterator.next(); + byte[] bytes = entry.getValue(); + diffReportList.add(codecRegistry.asObject(bytes, + DiffReportEntry.class)); + idx += 1; + itemsFetched += 1; + } + if (diffReportList.size() < pageSize) { hasMoreEntries = false; - break; } - diffReportList.add(codecRegistry.asObject(bytes, DiffReportEntry.class)); } - String tokenString = hasMoreEntries ? String.valueOf(idx) : null; + String nextTokenString = hasMoreEntries ? String.valueOf(idx) : null; checkReportsIntegrity(snapDiffJob, index, diffReportList.size()); return new SnapshotDiffReportOzone(path.toString(), volumeName, bucketName, - fromSnapshotName, toSnapshotName, diffReportList, tokenString); + fromSnapshotName, toSnapshotName, diffReportList, nextTokenString); } /** @@ -1349,10 +1373,8 @@ private long addToReport(String jobId, long index, try (ClosableIterator diffReportIterator = diffReportEntries.iterator()) { while (diffReportIterator.hasNext()) { - - snapDiffReportTable.put( - codecRegistry.asRawData(jobId + DELIMITER + index), - diffReportIterator.next()); + snapDiffReportTable.put(codecRegistry.asRawData( + getReportKeyForIndex(jobId, index)), diffReportIterator.next()); index++; } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java index ed368dc3882a..42b3cdf018fe 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java @@ -24,6 +24,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -88,7 +89,7 @@ public T next() { public static class StubbedPersistentMap implements PersistentMap { - private final Map map; + private final TreeMap map; public StubbedPersistentMap(Map map) { this(); @@ -117,9 +118,16 @@ public void remove(K key) { } @Override - public ClosableIterator> iterator() { - return new StubbedCloseableIterator<>( - this.map.entrySet().stream().iterator()); + public ClosableIterator> iterator( + Optional lowerBoundKey, Optional upperBoundKey) { + return new StubbedCloseableIterator<>(this.map.entrySet().stream().filter( + kvEntry -> + lowerBoundKey.map(k -> this.map.comparator() + .compare(kvEntry.getKey(), k) >= 0).orElse(Boolean.TRUE) + && + upperBoundKey.map(k -> this.map.comparator() + .compare(kvEntry.getKey(), k) < 0).orElse(true)) + .iterator()); } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentMap.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentMap.java index 45f71e1d60f3..6c8b67140578 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentMap.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentMap.java @@ -25,15 +25,27 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.utils.db.CodecRegistry; import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; +import org.apache.hadoop.util.ClosableIterator; import org.apache.ozone.test.GenericTestUtils; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; @@ -50,6 +62,8 @@ public class TestRocksDbPersistentMap { private static ManagedDBOptions dbOptions; private static ManagedColumnFamilyOptions columnFamilyOptions; + private static AtomicInteger id; + @BeforeAll public static void staticInit() throws RocksDBException { dbOptions = new ManagedDBOptions(); @@ -74,10 +88,11 @@ public static void staticInit() throws RocksDBException { db = ManagedRocksDB.open(dbOptions, absolutePath, columnFamilyDescriptors, columnFamilyHandles); + id = new AtomicInteger(0); } - @AfterEach - public void teardown() throws RocksDBException { + @AfterAll + public static void teardown() throws RocksDBException { if (dbOptions != null) { dbOptions.close(); } @@ -97,7 +112,8 @@ public void testRocksDBPersistentMap() throws IOException, RocksDBException { try { final CodecRegistry codecRegistry = CodecRegistry.newBuilder().build(); columnFamily = db.get().createColumnFamily(new ColumnFamilyDescriptor( - codecRegistry.asRawData("testMap"), columnFamilyOptions)); + codecRegistry.asRawData("testMap" + id.incrementAndGet()), + columnFamilyOptions)); PersistentMap persistentMap = new RocksDbPersistentMap<>( db, @@ -131,4 +147,93 @@ public void testRocksDBPersistentMap() throws IOException, RocksDBException { } } } + + /** + * Test cases for testRocksDBPersistentMapIterator. + */ + private static Stream rocksDBPersistentMapIteratorCases() { + return Stream.of( + Arguments.of( + Optional.empty(), + Optional.of("key202"), + Stream.concat(IntStream.range(0, 100).boxed(), + IntStream.range(200, 300).boxed()) + .map(i -> Pair.of(String.format("key%03d", i), + String.format("value%03d", i))) + .collect(Collectors.toList()), + Stream.concat(IntStream.range(0, 100).boxed(), + IntStream.range(200, 202).boxed()) + .map(i -> Pair.of(String.format("key%03d", i), + String.format("value%03d", i))) + .collect(Collectors.toList())), + Arguments.of(Optional.of("key050"), + Optional.empty(), Stream.concat(IntStream.range(50, 100).boxed(), + IntStream.range(200, 300).boxed()) + .map(i -> Pair.of(String.format("key%03d", i), + String.format("value%03d", i))) + .collect(Collectors.toList()), + Stream.concat(IntStream.range(50, 100).boxed(), + IntStream.range(200, 300).boxed()) + .map(i -> Pair.of(String.format("key%03d", i), + String.format("value%03d", i))) + .collect(Collectors.toList()) + ), + Arguments.of(Optional.of("key050"), + Optional.of("key210"), + Stream.concat(IntStream.range(50, 100).boxed(), + IntStream.range(200, 300).boxed()) + .map(i -> Pair.of(String.format("key%03d", i), + String.format("value%03d", i))) + .collect(Collectors.toList()), + Stream.concat(IntStream.range(50, 100).boxed(), + IntStream.range(200, 210).boxed()) + .map(i -> Pair.of(String.format("key%03d", i), + String.format("value%03d", i))) + .collect(Collectors.toList()) + )); + } + + @ParameterizedTest + @MethodSource("rocksDBPersistentMapIteratorCases") + public void testRocksDBPersistentMapIterator(Optional lowerBound, + Optional upperBound, List> keys, + List> expectedKeys) + throws IOException, RocksDBException { + ColumnFamilyHandle columnFamily = null; + try { + final CodecRegistry codecRegistry = CodecRegistry.newBuilder().build(); + columnFamily = db.get().createColumnFamily(new ColumnFamilyDescriptor( + codecRegistry.asRawData("testMap" + id.incrementAndGet()), + columnFamilyOptions)); + + PersistentMap persistentMap = new RocksDbPersistentMap<>( + db, + columnFamily, + codecRegistry, + String.class, + String.class + ); + + for (int i = 0; i < keys.size(); i++) { + String key = keys.get(i).getKey(); + String value = keys.get(i).getValue(); + persistentMap.put(key, value); + } + ClosableIterator> iterator = + persistentMap.iterator(lowerBound, upperBound); + int idx = 0; + while (iterator.hasNext()) { + Map.Entry e = iterator.next(); + Assertions.assertEquals(Pair.of(e.getKey(), e.getValue()), + expectedKeys.get(idx)); + idx += 1; + } + + } finally { + if (columnFamily != null) { + db.get().dropColumnFamily(columnFamily); + columnFamily.close(); + } + } + } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index bc7c00fbe26c..355cce77744c 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -80,6 +80,7 @@ import org.mockito.quality.Strictness; import org.mockito.stubbing.Answer; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; @@ -96,6 +97,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; @@ -104,11 +106,11 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.Stream; - import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; /** * Test class for SnapshotDiffManager Class. @@ -156,7 +158,7 @@ public static void initCodecRegistry() { private DBStore getMockedDBStore(String dbStorePath) { DBStore dbStore = mock(DBStore.class); - Mockito.when(dbStore.getDbLocation()).thenReturn(new File(dbStorePath)); + when(dbStore.getDbLocation()).thenReturn(new File(dbStorePath)); return dbStore; } @@ -172,8 +174,11 @@ private OmSnapshot getMockedOmSnapshot(String snapshot) { private SnapshotDiffManager getMockedSnapshotDiffManager(int cacheSize) throws IOException { - Mockito.when(snapdiffDB.get()).thenReturn(rocksDB); - Mockito.when(rocksDB.newIterator(snapdiffJobCFH)) + when(snapdiffDB.get()).thenReturn(rocksDB); + when(rocksDB.newIterator(snapdiffJobCFH)) + .thenReturn(jobTableIterator); + when(rocksDB.newIterator(Mockito.eq(snapdiffJobCFH), + Mockito.any(ReadOptions.class))) .thenReturn(jobTableIterator); CacheLoader loader = new CacheLoader() { @@ -460,8 +465,12 @@ public void testGenerateDiffReport() throws IOException { (mock, context) -> { PersistentMap obj = new SnapshotTestUtils.StubbedPersistentMap<>(); - Mockito.when(mock.iterator()).thenReturn(obj.iterator()); - Mockito.when(mock.get(Matchers.any())) + when(mock.iterator()).thenReturn(obj.iterator()); + when(mock.iterator(Mockito.any(Optional.class), + Mockito.any(Optional.class))) + .thenAnswer(i -> obj.iterator(i.getArgument(0), + i.getArgument(1))); + when(mock.get(Matchers.any())) .thenAnswer(i -> obj.get(i.getArgument(0))); Mockito.doAnswer((Answer) i -> { obj.put(i.getArgument(0), i.getArgument(1)); @@ -624,7 +633,10 @@ public void testCreatePageResponse(int startIdx, int pageSize, cfHandleRocksDbPersistentMap.put(cf, mock); PersistentMap obj = new SnapshotTestUtils.StubbedPersistentMap<>(); - Mockito.when(mock.iterator()).thenReturn(obj.iterator()); + when(mock.iterator()).thenReturn(obj.iterator()); + when(mock.iterator(any(Optional.class), + any(Optional.class))).thenAnswer(i -> + obj.iterator(i.getArgument(0), i.getArgument(1))); Mockito.when(mock.get(Matchers.any())) .thenAnswer(i -> obj.get(i.getArgument(0))); Mockito.doAnswer((Answer) i -> { @@ -639,10 +651,12 @@ public void testCreatePageResponse(int startIdx, int pageSize, IntStream.range(0, totalNumberOfRecords).boxed().forEach(idx -> { try { cfHandleRocksDbPersistentMap.get(snapdiffReportCFH) - .put(codecRegistry.asRawData(testJobId + DELIMITER + idx), + .put(codecRegistry.asRawData(SnapshotDiffManager + .getReportKeyForIndex(testJobId, idx)), codecRegistry.asRawData(getTestDiffEntry(testJobId, idx))); cfHandleRocksDbPersistentMap.get(snapdiffReportCFH) - .put(codecRegistry.asRawData(testJobId2 + DELIMITER + idx), + .put(codecRegistry.asRawData(SnapshotDiffManager + .getReportKeyForIndex(testJobId2, idx)), codecRegistry.asRawData(getTestDiffEntry(testJobId2, idx))); } catch (IOException e) { throw new RuntimeException(e);