-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-12134. Implement Snapshot Cache lock for OM Bootstrap #7745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Change-Id: I97e0e0e28736cc51bfb23c45d3750cf721a9689e
Change-Id: Ia5ecb2b863ba2ea2482ac2824aa7b6df98939bb5
| this.readLock = lock.readLock(); | ||
| this.writeLock = lock.writeLock(); | ||
| this.snapshotRefThreadIds = new ConcurrentHashMap<>(); | ||
| this.lockTimeout = lockTimeout; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's print an INFO level log here that the snapshot cache lock timeout is set to X ms.
| if (v == null) { | ||
| LOG.warn("SnapshotId: '{}' does not exist in snapshot cache.", k); | ||
| } else { | ||
| readLock.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is in invalidate(). Should it use writeLock instead?
| while (lockCnt > 0) { | ||
| try { | ||
| lockReleased.await(lockTimeout, TimeUnit.MILLISECONDS); | ||
| } catch (InterruptedException e) { | ||
| throw new IOException("Error while waiting for locks to be released.", e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Condition await() throw InterruptedException upon timeout? I don't think so. You should check the return value instead if that is the intention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add more test cases to verify that the states of those new internal structures are behaving as expected upon cache operations? e.g. snapshotRefThreadIds
jojochuang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm new to Snapshot code so have a few questions please bear with me
| } | ||
|
|
||
| public Map<Long, Long> getThreadCntMap() { | ||
| return ImmutableMap.copyOf(threadMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this be expensive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be very expensive as the reference to a snapshot shouldn't be held by a lot of threads.
|
|
||
| public SnapshotCache(CacheLoader<UUID, OmSnapshot> cacheLoader, int cacheSizeLimit, OMMetrics omMetrics, | ||
| long cleanupInterval) { | ||
| long cleanupInterval, long lockTimeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lockTimeout is in milliseconds
| dbMap.values().stream() | ||
| .flatMap(referenceCounted -> | ||
| referenceCounted.getThreadCntMap().entrySet().stream().map(entry -> Pair.of(entry, referenceCounted))) | ||
| .forEach(entry -> updateThreadCnt(entry.getKey().getKey(), entry.getValue().get().getSnapshotID(), | ||
| entry.getKey().getValue())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you put this in a helper method to make it self explanatory, like initializeThreadSnapshotReferences()
| readLock.lock(); | ||
| try { | ||
| if (lockCnt > 0) { | ||
| for (Long tid : snapshotRefThreadIds.keySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clean up references to snapshots when the lockCnt is greater than 0.
To ensure that threads no longer hold references to specific snapshots (key) when their associated locks are being released.
| return false; | ||
| } | ||
|
|
||
| private void updateThreadCnt(long threadId, UUID key, long cnt) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for thread (threadId), increment the reference count of snapshot (key) by (cnt)
| /** | ||
| * Decreases the lock count. When the count reaches zero all new threads would be able to get a handle of snapshot. | ||
| */ | ||
| private Runnable decrementLockCount() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please assert that write lock is held to avoid race condition.
| * @param threadId | ||
| * @throws InterruptedException | ||
| */ | ||
| private void waitForLock(long threadId) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the Javadoc is correct, the method needs a more descriptive name.
| * @throws InterruptedException | ||
| */ | ||
| private void waitForLock(long threadId) throws IOException { | ||
| if (snapshotRefThreadIds.computeIfPresent(threadId, (k, v) -> v) != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if computeIfPresent() returns null, that implies the thread is reading its first snapshot, and it would skip waiting?
| private final Lock writeLock; | ||
| private int lockCnt; | ||
| private final Condition lockReleased; | ||
| private final Condition dbClosed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is it named dbClosed?
|
Closing this PR in favour of #8474 |
What changes were proposed in this pull request?
Implementing a lock for this snapshot cache would prevent any newer threads from requesting a snapshot rocksdb handle from the snapshot cache. Thus any operation under this lock will have a consistent view of the entire snapshot.
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-12134
How was this patch tested?
Adding unit tests.