Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the bug that numeric overflow might occur when refilling tokens in ParamFlowChecker #838

Merged
merged 1 commit into from
Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRu
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicInteger> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);

if (tokenCounters == null || timeCounters == null) {
Expand All @@ -130,7 +130,7 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR

// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
int tokenCount = (int)rule.getCount();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
Expand All @@ -139,7 +139,7 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR
return false;
}

int maxCount = tokenCount + rule.getBurstCount();
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
Expand All @@ -150,23 +150,23 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount));
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}

// Calculate the time duration since last token was added.
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
AtomicInteger oldQps = tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount));
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
int restQps = oldQps.get();
int toAddCount = (int)((passTime * tokenCount) / (rule.getDurationInSec() * 1000));
int newQps = (restQps + toAddCount) > maxCount ? (maxCount - acquireCount)
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);

if (newQps < 0) {
Expand All @@ -179,9 +179,9 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR
Thread.yield();
}
} else {
AtomicInteger oldQps = tokenCounters.get(value);
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
int oldQpsValue = oldQps.get();
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class ParameterMetric {
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicInteger>> ruleTokenCounter = new HashMap<>();
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();

/**
Expand All @@ -61,7 +61,7 @@ public class ParameterMetric {
* @return the associated token counter
* @since 1.6.0
*/
public CacheMap<Object, AtomicInteger> getRuleTokenCounter(ParamFlowRule rule) {
public CacheMap<Object, AtomicLong> getRuleTokenCounter(ParamFlowRule rule) {
return ruleTokenCounter.get(rule);
}

Expand Down Expand Up @@ -98,7 +98,7 @@ public void initialize(ParamFlowRule rule) {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(size));
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
Expand Down Expand Up @@ -245,7 +245,7 @@ public long getThreadCount(int index, Object value) {
*
* @return the token counter map
*/
Map<ParamFlowRule, CacheMap<Object, AtomicInteger>> getRuleTokenCounterMap() {
Map<ParamFlowRule, CacheMap<Object, AtomicLong>> getRuleTokenCounterMap() {
return ruleTokenCounter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testPassLocalCheckForCollection() throws InterruptedException {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,49 @@

/**
* @author jialiang.linjl
* @author Eric Zhao
*/
public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {

@Test
public void testCheckQpsWithLongIntervalAndHighThreshold() {
// This test case is intended to avoid number overflow.
final String resourceName = "testCheckQpsWithLongIntervalAndHighThreshold";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;

// Set a large threshold.
long threshold = 25000L;

ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(threshold)
.setParamIdx(paramIdx);

String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());

assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));

// 24 hours passed.
// This can make `toAddCount` larger that Integer.MAX_VALUE.
sleep(1000 * 60 * 60 * 24);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));

// 48 hours passed.
sleep(1000 * 60 * 60 * 48);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}

@Test
public void testParamFlowDefaultCheckSingleQps() {
final String resourceName = "testParamFlowDefaultCheckSingleQps";
Expand All @@ -59,7 +99,7 @@ public void testParamFlowDefaultCheckSingleQps() {
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
Expand Down Expand Up @@ -99,7 +139,7 @@ public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedExce
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
Expand Down Expand Up @@ -169,7 +209,7 @@ public void testParamFlowDefaultCheckQpsInDifferentDuration() throws Interrupted
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
Expand Down Expand Up @@ -222,7 +262,7 @@ public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
int threadCount = 40;

final CountDownLatch waitLatch = new CountDownLatch(threadCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ public void testEntryWhenParamFlowExists() throws Throwable {

ParameterMetric metric = mock(ParameterMetric.class);

CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000);
CacheMap<Object, AtomicInteger> map2 = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000);
CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<>(4000);
CacheMap<Object, AtomicLong> map2 = new ConcurrentLinkedHashMapWrapper<>(4000);
when(metric.getRuleTimeCounter(rule)).thenReturn(map);
when(metric.getRuleTokenCounter(rule)).thenReturn(map2);
map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis()));
Expand Down