-
Notifications
You must be signed in to change notification settings - Fork 11.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RIP-70-3]Extract adaptive lock mechanism #8663
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## develop #8663 +/- ##
=============================================
- Coverage 47.39% 47.37% -0.03%
- Complexity 11628 11645 +17
=============================================
Files 1290 1294 +4
Lines 90293 90424 +131
Branches 11609 11627 +18
=============================================
+ Hits 42792 42834 +42
- Misses 42233 42306 +73
- Partials 5268 5284 +16 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CR need
# Conflicts: # store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveLockImpl.java # store/src/main/java/org/apache/rocketmq/store/lock/CollisionRetreatLock.java
if (this.adaptiveLock instanceof CollisionRetreatLock) { | ||
CollisionRetreatLock lock = (CollisionRetreatLock) this.adaptiveLock; | ||
int base = Math.min(200 + tps / 200, 500); | ||
if (lock.getNumberOfRetreat(slot) * base >= tps) { | ||
if (lock.isAdapt()) { | ||
lock.adapt(true); | ||
} else { | ||
this.tpsSwapCriticalPoint = tps; | ||
needSwap = true; | ||
} | ||
} else if (lock.getNumberOfRetreat(slot) * base * 3 / 2 <= tps) { | ||
lock.adapt(false); | ||
} | ||
lock.setNumberOfRetreat(slot, 0); | ||
} else { | ||
if (tps <= this.tpsSwapCriticalPoint * 4 / 5) { | ||
needSwap = true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Too many magic numbers, it should be optimized.
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
public class CollisionRetreatLock implements AdaptiveBackOffLock { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is RetreatLock? Is it a misspelling of reentrant lock?
private final List<AtomicInteger> numberOfRetreat; | ||
|
||
public CollisionRetreatLock() { | ||
this.initOptimalDegree = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd better make these numbers as constants, like OPTIMAL_DEGREE = 1000, MAX_OPTIMAL_DEGREE = 10000
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done~
|
||
public interface AdaptiveBackOffSpinLock extends PutMessageLock { | ||
|
||
void lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PutMessageLock class includes methods for locking and unlocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done~
@@ -130,7 +132,11 @@ protected PutMessageThreadLocal initialValue() { | |||
return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig()); | |||
} | |||
}; | |||
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); | |||
|
|||
AdaptiveBackOffSpinLock adaptiveBackOffSpinLock = new AdaptiveBackOffSpinLockImpl(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PutMessageLock adaptiveBackOffSpinLock = new AdaptiveBackOffSpinLockImpl() may be better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done~
} | ||
} | ||
|
||
if (needSwap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comments to explain the principles and the rationale behind these numbers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done~
|
||
public AdaptiveBackOffSpinLockImpl() { | ||
this.locks = new HashMap<>(); | ||
this.locks.put("Reentrant", new BackOffReentrantLock()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These strings use class constants.
… adaptive_lock # Conflicts: # store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
# Conflicts: # store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
Which Issue(s) This PR Fixes
Fixes #8442
Brief Description
How Did You Test This Change?
Test document :https://shimo.im/docs/ZzkLMQ4RwwUa87AQ/