-
Notifications
You must be signed in to change notification settings - Fork 593
HDDS-13785. Remove orphan versions from SnapshotLocalData Yaml file #9150
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6e241f6
a869500
252d338
4099bc6
e02670c
5a66cfc
79580e9
2a331ef
c4f69e2
afbc592
70ac2c7
b554cc7
49eccfa
51eda04
25f766c
96689fa
0674299
5d9fc49
2d88176
a3c4c69
686d0c7
491a54b
5e69ee9
d36622a
ee213d1
81871b2
a95604e
5a90fcf
20d7d6a
ae655cb
25fa6ae
d419283
8a44308
cb94c36
e26052c
4d272d1
2a38f59
ca098cf
67d4b3d
9838cda
6a19dbb
665f411
2894e40
ea0ab16
915562b
1c0d0ac
24da3eb
8f3774a
503cd4e
6865fad
903ecd1
06d1e99
60a7728
4711517
af8754c
655a724
6386c1b
2bc6134
0de7c62
8e8c534
f148f24
da030c0
6af6498
b281569
1ad24b4
d629911
2aecde4
8eeb44b
d9301b3
06e7d37
efd6c51
fab85ea
c73a355
76b99e2
1d39bee
54f1508
b1a3834
1986bbe
52be3dd
5f50a04
908c47d
278605a
40265e1
ac4719b
cf19dce
34097de
c46ddc2
99afc02
fcc630e
6f144e2
4600c96
48ec0bb
f524cad
cb31b7c
02dd061
ff90af8
8b014dd
57662c6
bcc0fc8
79a46f4
cd24a81
5f0bb91
3de4346
5b55a59
aa6facf
cc35056
9c1689c
3f59895
95341dd
616bef3
b0023d1
36b6fb3
4596386
7af6521
e19dae2
8c1373a
25ee4e2
c6e3914
fd4bfdb
49f4424
613d106
8a29736
cca2dbf
a810cc1
78c1036
a2bbea5
bf4746f
09d955c
cde567d
49c662a
2cf1bce
7afc8f5
5849dac
e58ff09
83b887e
519495a
c125250
408e213
715b2f0
ec59b89
b0b6d6a
a759807
808b174
c829a8b
8e91e47
bfd341c
4ccd3fc
41b7cfb
3a8c8f6
d0422ae
4ff8cea
55c68bd
018571c
6cd54dd
261a669
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,6 +17,8 @@ | |||||||||||||||||
|
|
||||||||||||||||||
| package org.apache.hadoop.ozone.om.snapshot; | ||||||||||||||||||
|
|
||||||||||||||||||
| import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL; | ||||||||||||||||||
| import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT; | ||||||||||||||||||
| import static org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml.YAML_FILE_EXTENSION; | ||||||||||||||||||
|
|
||||||||||||||||||
| import com.google.common.annotations.VisibleForTesting; | ||||||||||||||||||
|
|
@@ -41,17 +43,21 @@ | |||||||||||||||||
| import java.util.Stack; | ||||||||||||||||||
| import java.util.UUID; | ||||||||||||||||||
| import java.util.concurrent.ConcurrentHashMap; | ||||||||||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||||||||||
| import java.util.concurrent.locks.ReadWriteLock; | ||||||||||||||||||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||||||||||||||||||
| import java.util.stream.Collectors; | ||||||||||||||||||
| import org.apache.commons.lang3.tuple.Pair; | ||||||||||||||||||
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; | ||||||||||||||||||
| import org.apache.hadoop.hdds.utils.Scheduler; | ||||||||||||||||||
| import org.apache.hadoop.hdds.utils.TransactionInfo; | ||||||||||||||||||
| import org.apache.hadoop.hdds.utils.db.RDBStore; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.OMMetadataManager; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.OmSnapshotLocalData; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.OmSnapshotLocalData.VersionMeta; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.OmSnapshotManager; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.SnapshotChainManager; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.lock.FlatResource; | ||||||||||||||||||
| import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager; | ||||||||||||||||||
|
|
@@ -73,6 +79,7 @@ | |||||||||||||||||
| public class OmSnapshotLocalDataManager implements AutoCloseable { | ||||||||||||||||||
|
|
||||||||||||||||||
| private static final Logger LOG = LoggerFactory.getLogger(OmSnapshotLocalDataManager.class); | ||||||||||||||||||
| private static final String LOCAL_DATA_MANAGER_SERVICE_NAME = "OmSnapshotLocalDataManagerService"; | ||||||||||||||||||
|
|
||||||||||||||||||
| private final ObjectSerializer<OmSnapshotLocalData> snapshotLocalDataSerializer; | ||||||||||||||||||
| // In-memory DAG of snapshot-version dependencies. Each node represents a | ||||||||||||||||||
|
|
@@ -101,8 +108,13 @@ public class OmSnapshotLocalDataManager implements AutoCloseable { | |||||||||||||||||
| private final ReadWriteLock internalLock; | ||||||||||||||||||
| // Locks should be always acquired by iterating through the snapshot chain to avoid deadlocks. | ||||||||||||||||||
| private HierarchicalResourceLockManager locks; | ||||||||||||||||||
| private Map<UUID, Integer> snapshotToBeCheckedForOrphans; | ||||||||||||||||||
| private Scheduler scheduler; | ||||||||||||||||||
| private volatile boolean closed; | ||||||||||||||||||
|
|
||||||||||||||||||
| public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager) throws IOException { | ||||||||||||||||||
| public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager, | ||||||||||||||||||
| SnapshotChainManager snapshotChainManager, | ||||||||||||||||||
| OzoneConfiguration configuration) throws IOException { | ||||||||||||||||||
| this.localDataGraph = GraphBuilder.directed().build(); | ||||||||||||||||||
| this.omMetadataManager = omMetadataManager; | ||||||||||||||||||
| this.snapshotLocalDataSerializer = new YamlSerializer<OmSnapshotLocalData>( | ||||||||||||||||||
|
|
@@ -116,7 +128,7 @@ public void computeAndSetChecksum(Yaml yaml, OmSnapshotLocalData data) throws IO | |||||||||||||||||
| this.versionNodeMap = new ConcurrentHashMap<>(); | ||||||||||||||||||
| this.fullLock = new ReentrantReadWriteLock(); | ||||||||||||||||||
| this.internalLock = new ReentrantReadWriteLock(); | ||||||||||||||||||
| init(); | ||||||||||||||||||
| init(configuration, snapshotChainManager); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| @VisibleForTesting | ||||||||||||||||||
|
|
@@ -216,7 +228,7 @@ private LocalDataVersionNode getVersionNode(UUID snapshotId, int version) { | |||||||||||||||||
|
|
||||||||||||||||||
| private void addSnapshotVersionMeta(UUID snapshotId, SnapshotVersionsMeta snapshotVersionsMeta) | ||||||||||||||||||
| throws IOException { | ||||||||||||||||||
| if (!versionNodeMap.containsKey(snapshotId)) { | ||||||||||||||||||
| if (!versionNodeMap.containsKey(snapshotId) && !snapshotVersionsMeta.getSnapshotVersions().isEmpty()) { | ||||||||||||||||||
| for (LocalDataVersionNode versionNode : snapshotVersionsMeta.getSnapshotVersions().values()) { | ||||||||||||||||||
| validateVersionAddition(versionNode); | ||||||||||||||||||
| LocalDataVersionNode previousVersionNode = | ||||||||||||||||||
|
|
@@ -261,8 +273,33 @@ void addVersionNodeWithDependents(OmSnapshotLocalData snapshotLocalData) throws | |||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private void init() throws IOException { | ||||||||||||||||||
| private void incrementOrphanCheckCount(UUID snapshotId) { | ||||||||||||||||||
| if (snapshotId != null) { | ||||||||||||||||||
| this.snapshotToBeCheckedForOrphans.compute(snapshotId, (k, v) -> v == null ? 1 : (v + 1)); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private void decrementOrphanCheckCount(UUID snapshotId, int decrementBy) { | ||||||||||||||||||
| this.snapshotToBeCheckedForOrphans.compute(snapshotId, (k, v) -> { | ||||||||||||||||||
| if (v == null) { | ||||||||||||||||||
| return null; | ||||||||||||||||||
| } | ||||||||||||||||||
| int newValue = v - decrementBy; | ||||||||||||||||||
| if (newValue <= 0) { | ||||||||||||||||||
| return null; | ||||||||||||||||||
| } | ||||||||||||||||||
| return newValue; | ||||||||||||||||||
| }); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| @VisibleForTesting | ||||||||||||||||||
| Map<UUID, Integer> getSnapshotToBeCheckedForOrphans() { | ||||||||||||||||||
| return snapshotToBeCheckedForOrphans; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private void init(OzoneConfiguration configuration, SnapshotChainManager chainManager) throws IOException { | ||||||||||||||||||
| this.locks = omMetadataManager.getHierarchicalLockManager(); | ||||||||||||||||||
| this.snapshotToBeCheckedForOrphans = new ConcurrentHashMap<>(); | ||||||||||||||||||
| RDBStore store = (RDBStore) omMetadataManager.getStore(); | ||||||||||||||||||
| String checkpointPrefix = store.getDbLocation().getName(); | ||||||||||||||||||
| File snapshotDir = new File(store.getSnapshotsParentDir()); | ||||||||||||||||||
|
|
@@ -283,6 +320,74 @@ private void init() throws IOException { | |||||||||||||||||
| } | ||||||||||||||||||
| addVersionNodeWithDependents(snapshotLocalData); | ||||||||||||||||||
| } | ||||||||||||||||||
| for (UUID snapshotId : versionNodeMap.keySet()) { | ||||||||||||||||||
| incrementOrphanCheckCount(snapshotId); | ||||||||||||||||||
| } | ||||||||||||||||||
| long snapshotLocalDataManagerServiceInterval = configuration.getTimeDuration( | ||||||||||||||||||
| OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL, | ||||||||||||||||||
| OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT, | ||||||||||||||||||
| TimeUnit.MILLISECONDS); | ||||||||||||||||||
| if (snapshotLocalDataManagerServiceInterval > 0) { | ||||||||||||||||||
| this.scheduler = new Scheduler(LOCAL_DATA_MANAGER_SERVICE_NAME, true, 1); | ||||||||||||||||||
| this.scheduler.scheduleWithFixedDelay( | ||||||||||||||||||
| () -> { | ||||||||||||||||||
| try { | ||||||||||||||||||
| checkOrphanSnapshotVersions(omMetadataManager, chainManager); | ||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||
| LOG.error("Exception while checking orphan snapshot versions", e); | ||||||||||||||||||
| } | ||||||||||||||||||
| }, snapshotLocalDataManagerServiceInterval, snapshotLocalDataManagerServiceInterval, TimeUnit.MILLISECONDS); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChainManager chainManager) | ||||||||||||||||||
jojochuang marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
| throws IOException { | ||||||||||||||||||
| for (Map.Entry<UUID, Integer> entry : snapshotToBeCheckedForOrphans.entrySet()) { | ||||||||||||||||||
| UUID snapshotId = entry.getKey(); | ||||||||||||||||||
| int countBeforeCheck = entry.getValue(); | ||||||||||||||||||
| checkOrphanSnapshotVersions(metadataManager, chainManager, snapshotId); | ||||||||||||||||||
| decrementOrphanCheckCount(snapshotId, countBeforeCheck); | ||||||||||||||||||
|
Comment on lines
+348
to
+350
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the count always decremented to 0?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, If there was another update just after the snapshot versions were updated or while the entries were updated then we don't want to decrement it completely to 0. For instance when a snapshot has been purged but it has not flushed because of double buffer thread we know we have to delete it but only after flush so the commit can just increment the value again meaning the orphan check needs to be performed again in some time. |
||||||||||||||||||
| } | ||||||||||||||||||
jojochuang marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| @VisibleForTesting | ||||||||||||||||||
| void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChainManager chainManager, | ||||||||||||||||||
| UUID snapshotId) throws IOException { | ||||||||||||||||||
jojochuang marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
| LOG.info("Checking orphan snapshot versions for snapshot {}", snapshotId); | ||||||||||||||||||
jojochuang marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
| try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider = new WritableOmSnapshotLocalDataProvider( | ||||||||||||||||||
| snapshotId)) { | ||||||||||||||||||
| OmSnapshotLocalData snapshotLocalData = snapshotLocalDataProvider.getSnapshotLocalData(); | ||||||||||||||||||
| boolean isSnapshotPurged = OmSnapshotManager.isSnapshotPurged(chainManager, metadataManager, snapshotId, | ||||||||||||||||||
| snapshotLocalData.getTransactionInfo()); | ||||||||||||||||||
| for (Map.Entry<Integer, LocalDataVersionNode> integerLocalDataVersionNodeEntry : getVersionNodeMap() | ||||||||||||||||||
| .get(snapshotId).getSnapshotVersions().entrySet()) { | ||||||||||||||||||
|
Comment on lines
+363
to
+364
|
||||||||||||||||||
| for (Map.Entry<Integer, LocalDataVersionNode> integerLocalDataVersionNodeEntry : getVersionNodeMap() | |
| .get(snapshotId).getSnapshotVersions().entrySet()) { | |
| LocalDataVersionNodeMap versionNodeMap = getVersionNodeMap().get(snapshotId); | |
| if (versionNodeMap == null) { | |
| // The snapshotId is no longer present; skip orphan check for this snapshot. | |
| return; | |
| } | |
| for (Map.Entry<Integer, LocalDataVersionNode> integerLocalDataVersionNodeEntry : versionNodeMap.getSnapshotVersions().entrySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This cannot happen since this happens inside a write lock. The inmemory structure would always be same as the file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you sure about this? Looks like write lock is held later inside commit() --> upsert(). Right at here there's no lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
else: clean: no prior snapshot versions and no local meta file. nothing to do here.
Uh oh!
There was an error while loading. Please reload this page.