HDDS-13009. Background snapshot defrag service #9324
Conversation
// Binary search smallest existing version and delete the older versions starting from the smallest version.
// This is to ensure efficient crash recovery.
int smallestExistingVersion = 0;
int largestExistingVersion = previousVersion;
while (smallestExistingVersion <= largestExistingVersion) {
  int midVersion = smallestExistingVersion + (largestExistingVersion - smallestExistingVersion) / 2;
  Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, midVersion);
  if (path.toFile().exists()) {
    largestExistingVersion = midVersion - 1;
  } else {
    smallestExistingVersion = midVersion + 1;
  }
}

// Acquire Snapshot DBHandle lock before removing the older versions to ensure all readers are done with the
// snapshot db use.
try (UncheckedAutoCloseableSupplier<OMLockDetails> lock =
    ozoneManager.getOmSnapshotManager().getSnapshotCache().lock(snapshotId)) {
  if (!lock.get().isLockAcquired()) {
    throw new IOException("Failed to acquire dbHandle lock on snapshot: " + snapshotId);
  }
  // Delete the older version directories. Always start deletes from the smallest version to the largest
  // version so that the binary search above works correctly on a later run.
  for (int version = smallestExistingVersion; version <= previousVersion; version++) {
    Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, version);
    deleteDirectory(path);
  }
}
I would extract these lines into a method cleanupAndReleaseOlderVersions()
yeah working on it
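For illustration, a minimal sketch of what the extracted helper could look like. Only the names quoted in the snippet above are taken from the PR; the exact signature and the deleteDirectory helper are assumptions:

```java
// Sketch only: hypothetical extraction of the quoted block; the signature is an assumption.
private void cleanupAndReleaseOlderVersions(UUID snapshotId, int previousVersion) throws IOException {
  // Binary search for the smallest version directory that still exists on disk.
  int smallestExistingVersion = 0;
  int largestExistingVersion = previousVersion;
  while (smallestExistingVersion <= largestExistingVersion) {
    int midVersion = smallestExistingVersion + (largestExistingVersion - smallestExistingVersion) / 2;
    Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, midVersion);
    if (path.toFile().exists()) {
      largestExistingVersion = midVersion - 1;
    } else {
      smallestExistingVersion = midVersion + 1;
    }
  }
  // Take the snapshot DB handle lock so no reader is still using the old version directories.
  try (UncheckedAutoCloseableSupplier<OMLockDetails> lock =
      ozoneManager.getOmSnapshotManager().getSnapshotCache().lock(snapshotId)) {
    if (!lock.get().isLockAcquired()) {
      throw new IOException("Failed to acquire dbHandle lock on snapshot: " + snapshotId);
    }
    // Delete from the smallest version upwards so that, after a crash, the remaining versions form a
    // contiguous range and the binary search above still finds the right starting point.
    for (int version = smallestExistingVersion; version <= previousVersion; version++) {
      deleteDirectory(OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, version));
    }
  }
}
```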
  // Switch the snapshot DB location to the new version.
  atomicSwitchSnapshotDB(snapshotId, checkpointLocation);
} finally {
  snapshotContentLocks.releaseLock();
It may hold the snapshot content lock for a long time if the defrag service waits for the previous snapshot's cache lock. What if we delay the clean-up of the older versions until after the snapshot content lock is released?
yeah makes sense
Consolidating both OMSnapshotPurgeResponse's deleteCheckpoint and this logic in OmSnapshotManager.
#9380 — We should have done the delete subsequently under the same content lock, to ensure we don't have handles still hanging around in the snapshot cache that could be used by a different thread after the defrag thread releases the snapshot cache lock.
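A minimal sketch of that ordering, reusing the names visible in the quoted snippet (atomicSwitchSnapshotDB, snapshotContentLocks); cleanupOlderVersions is a placeholder for the version cleanup discussed above, not the PR's actual method:

```java
// Sketch only, not the PR's final code: keep the old-version cleanup inside the same content-lock
// scope as the DB switch, so no other thread can pick up a stale snapshot DB handle in between.
try {
  // Switch the snapshot DB location to the new version.
  atomicSwitchSnapshotDB(snapshotId, checkpointLocation);
  // Delete the older version directories before the content lock is released (see #9380).
  cleanupOlderVersions(snapshotId, previousVersion);
} finally {
  snapshotContentLocks.releaseLock();
}
```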
Table checkpointTable = checkpointStore.getTable(table);
checkpointTable.loadFromFile(fileToBeIngested.toFile());
Not sure about the use of this. Does loadFromFile() have a side effect? It looks unused. Or do we just want to make sure the file is not corrupt and can be loaded?
If loadFromFile() fails it would throw a RocksDatabaseException, and the entire defrag would fail.
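In other words (a sketch with explanatory comments only; it assumes loadFromFile() ingests the external SST file into the table rather than merely validating it):

```java
// Assumption: loadFromFile() ingests the freshly written delta SST file into the checkpoint table,
// so this call is the actual ingest step. Per the reply above, a corrupt or unreadable file makes it
// throw a RocksDatabaseException, which aborts the whole defrag pass before any switch-over happens.
Table checkpointTable = checkpointStore.getTable(table);
checkpointTable.loadFromFile(fileToBeIngested.toFile());
```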
Table<byte[], byte[]> checkpointTable = checkpointDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
    ByteArrayCodec.get());
checkpointTable.loadFromFile(tmpSstFile.toFile());
ditto
jojochuang left a comment:
Ok the rest looks mostly good.
performIncrementalDefragmentation is too long and needs to be simplified, but that can be split into a later task. I'd like to merge this one to unblock other PRs.
String tableLowestValue = tableBounds.getLeft();
String tableHighestValue = tableBounds.getRight();

// If lowest value is not null and if the bucket prefix corresponding to the table is greater than lower then
The purpose is to filter out any keys that do not belong to the same bucket.
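For example, a simplified stand-alone illustration of that bound-based filter (the PR itself uses getTablePrefix and getLexicographicallyHigherString; the helpers below are hypothetical):

```java
// Keys of a bucket share the bucket's table prefix, so restricting reads to the key range
// [bucketPrefix, nextPrefix(bucketPrefix)) drops every key that belongs to another bucket.
static boolean belongsToBucket(String key, String bucketPrefix) {
  return key.startsWith(bucketPrefix);
}

// Simplified exclusive upper bound: bump the last character of the prefix.
// (The real code uses getLexicographicallyHigherString; this stand-in ignores overflow edge cases.)
static String nextPrefix(String bucketPrefix) {
  char[] chars = bucketPrefix.toCharArray();
  chars[chars.length - 1]++;
  return new String(chars);
}
```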
for (Map.Entry<String, List<Pair<Path, SstFileInfo>>> entry : tableGroupedDeltaFiles.entrySet()) {
  String table = entry.getKey();
  List<Pair<Path, SstFileInfo>> deltaFiles = entry.getValue();
  Path fileToBeIngested;
  if (deltaFiles.size() == 1 && snapshotVersion > 0) {
    // If there is only one delta file for the table and the snapshot version is also not 0, then the same
    // delta file can be re-ingested into the checkpointStore.
    fileToBeIngested = deltaFiles.get(0).getKey();
  } else {
    Table<String, byte[]> snapshotTable = snapshot.get().getMetadataManager().getStore()
        .getTable(table, StringCodec.get(), ByteArrayCodec.get());
    Table<String, byte[]> previousSnapshotTable = previousSnapshot.get().getMetadataManager().getStore()
        .getTable(table, StringCodec.get(), ByteArrayCodec.get());

    String tableBucketPrefix = bucketPrefixInfo.getTablePrefix(table);
    String sstFileReaderLowerBound = bucketPrefixInfo.getTablePrefix(entry.getKey());
    String sstFileReaderUpperBound = null;
    if (Strings.isNotEmpty(sstFileReaderLowerBound)) {
      sstFileReaderUpperBound = getLexicographicallyHigherString(sstFileReaderLowerBound);
    }
    List<Path> deltaFilePaths = deltaFiles.stream().map(Pair::getKey).collect(Collectors.toList());
    SstFileSetReader sstFileSetReader = new SstFileSetReader(deltaFilePaths);
    fileToBeIngested = differTmpDir.resolve(table + "-" + UUID.randomUUID() + SST_FILE_EXTENSION);
    // Delete all delta files after re-ingesting into the checkpointStore.
    filesToBeDeleted.add(fileToBeIngested);
    int deltaEntriesCount = 0;
    try (ClosableIterator<String> keysToCheck =
             sstFileSetReader.getKeyStreamWithTombstone(sstFileReaderLowerBound, sstFileReaderUpperBound);
         TableMergeIterator<String, byte[]> tableMergeIterator = new TableMergeIterator<>(keysToCheck,
             tableBucketPrefix, snapshotTable, previousSnapshotTable);
         RDBSstFileWriter rdbSstFileWriter = new RDBSstFileWriter(fileToBeIngested.toFile())) {
      while (tableMergeIterator.hasNext()) {
        Table.KeyValue<String, List<byte[]>> kvs = tableMergeIterator.next();
        // Check whether the values are equal; if they are not, the value should be written to the
        // delta sstFile.
        if (!Arrays.equals(kvs.getValue().get(0), kvs.getValue().get(1))) {
          try (CodecBuffer key = StringCodec.get().toHeapCodecBuffer(kvs.getKey())) {
            byte[] keyArray = key.asReadOnlyByteBuffer().array();
            byte[] val = kvs.getValue().get(0);
            // If the value is null then add a tombstone to the delta sstFile.
            if (val == null) {
              rdbSstFileWriter.delete(keyArray);
            } else {
              rdbSstFileWriter.put(keyArray, val);
            }
          }
          deltaEntriesCount++;
        }
      }
    } catch (RocksDBException e) {
      throw new RocksDatabaseException("Error while reading sst files.", e);
    }
    if (deltaEntriesCount == 0) {
      // If there are no delta entries then delete the delta file. No need to ingest the file as a diff.
      fileToBeIngested = null;
    }
  }
  if (fileToBeIngested != null) {
    if (!fileToBeIngested.toFile().exists()) {
      throw new IOException("Delta file does not exist: " + fileToBeIngested);
    }
    Table checkpointTable = checkpointStore.getTable(table);
    checkpointTable.loadFromFile(fileToBeIngested.toFile());
  }
}
Too long. Please consider extracting this block into its own method.
done
I am still working on the unit test cases (a few of them are already there). I will create a follow-up jira for this just to unblock the next PRs.
defragService.pause();
assertNotNull(defragService);

defragService.resume();
assertNotNull(defragService);
Should check the running state here, not just that the service is non-null.
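Something along these lines, assuming the service exposes a pause-state accessor; isPaused() is a hypothetical name and the real getter may differ:

```java
// Hypothetical assertions: verify the state actually flips, rather than only asserting non-null.
defragService.pause();
assertTrue(defragService.isPaused());

defragService.resume();
assertFalse(defragService.isPaused());
```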
These are unused blank lines; please remove them.
 public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout,
-    OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    OzoneManager ozoneManager, OzoneConfiguration configuration) throws IOException {
   super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
| super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, | |
| super(SnapshotDefragService.class.getSimpleName(), interval, unit, DEFRAG_CORE_POOL_SIZE, |
this.snapshotIdLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, true, 1);
this.snapshotContentLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_DB_CONTENT_LOCK, true, 1);
Path tmpDefragDirPath = ozoneManager.getMetadataManager().getSnapshotParentDir().toAbsolutePath()
    .resolve("tmp_defrag");
nit: dir naming consistency
Suggested change:
-    .resolve("tmp_defrag");
+    .resolve("defragTemp");
Also, extract this to a const in OzoneConsts instead.
Let me do it as part of the unit test PR
}
createDirectories(tmpDefragDirPath);
this.tmpDefragDir = tmpDefragDirPath.toString();
this.differTmpDir = tmpDefragDirPath.resolve("differSstFiles");
Same here
 * @throws IOException if an I/O error occurs during processing
 */
@VisibleForTesting
Pair<Path, Boolean> spillTableDiffIntoSstFile(List<Path> deltaFilePaths, Table<String, byte[]> snapshotTable,
We could move this and other helper methods to a separate static helper class to reduce the length of the SDS class.
I might not want to do that, specifically because this is very specific to SnapshotDefragService and doesn't have a use case outside of this scope. I would rather keep it here as a private method.
What changes were proposed in this pull request?
Implement SnapshotDefragService to defrag a snapshot RocksDB while holding the required locks.
What is the link to the Apache JIRA?
https://issues.apache.org/jira/browse/HDDS-13009
How was this patch tested?
Adding unit tests (WIP).