Skip to content

Commit

Permalink
Fix the bug that numeric overflow might occur when refilling tokens i…
Browse files Browse the repository at this point in the history
…n ParamFlowChecker (#838)

- use AtomicLong to replace AtomicInteger

Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 authored Jun 18, 2019
1 parent eb7508c commit d59beae
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRu
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicInteger> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);

if (tokenCounters == null || timeCounters == null) {
Expand All @@ -130,7 +130,7 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR

// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
int tokenCount = (int)rule.getCount();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
Expand All @@ -139,7 +139,7 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR
return false;
}

int maxCount = tokenCount + rule.getBurstCount();
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
Expand All @@ -150,23 +150,23 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount));
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}

// Calculate the time duration since last token was added.
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
AtomicInteger oldQps = tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount));
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
int restQps = oldQps.get();
int toAddCount = (int)((passTime * tokenCount) / (rule.getDurationInSec() * 1000));
int newQps = (restQps + toAddCount) > maxCount ? (maxCount - acquireCount)
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);

if (newQps < 0) {
Expand All @@ -179,9 +179,9 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR
Thread.yield();
}
} else {
AtomicInteger oldQps = tokenCounters.get(value);
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
int oldQpsValue = oldQps.get();
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class ParameterMetric {
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicInteger>> ruleTokenCounter = new HashMap<>();
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();

/**
Expand All @@ -61,7 +61,7 @@ public class ParameterMetric {
* @return the associated token counter
* @since 1.6.0
*/
public CacheMap<Object, AtomicInteger> getRuleTokenCounter(ParamFlowRule rule) {
public CacheMap<Object, AtomicLong> getRuleTokenCounter(ParamFlowRule rule) {
return ruleTokenCounter.get(rule);
}

Expand Down Expand Up @@ -98,7 +98,7 @@ public void initialize(ParamFlowRule rule) {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(size));
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
Expand Down Expand Up @@ -245,7 +245,7 @@ public long getThreadCount(int index, Object value) {
*
* @return the token counter map
*/
Map<ParamFlowRule, CacheMap<Object, AtomicInteger>> getRuleTokenCounterMap() {
Map<ParamFlowRule, CacheMap<Object, AtomicLong>> getRuleTokenCounterMap() {
return ruleTokenCounter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testPassLocalCheckForCollection() throws InterruptedException {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,49 @@

/**
* @author jialiang.linjl
* @author Eric Zhao
*/
public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {

@Test
public void testCheckQpsWithLongIntervalAndHighThreshold() {
// This test case is intended to avoid number overflow.
final String resourceName = "testCheckQpsWithLongIntervalAndHighThreshold";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;

// Set a large threshold.
long threshold = 25000L;

ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(threshold)
.setParamIdx(paramIdx);

String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());

assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));

// 24 hours passed.
// This can make `toAddCount` larger that Integer.MAX_VALUE.
sleep(1000 * 60 * 60 * 24);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));

// 48 hours passed.
sleep(1000 * 60 * 60 * 48);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}

@Test
public void testParamFlowDefaultCheckSingleQps() {
final String resourceName = "testParamFlowDefaultCheckSingleQps";
Expand All @@ -59,7 +99,7 @@ public void testParamFlowDefaultCheckSingleQps() {
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
Expand Down Expand Up @@ -99,7 +139,7 @@ public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedExce
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
Expand Down Expand Up @@ -169,7 +209,7 @@ public void testParamFlowDefaultCheckQpsInDifferentDuration() throws Interrupted
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
Expand Down Expand Up @@ -222,7 +262,7 @@ public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
int threadCount = 40;

final CountDownLatch waitLatch = new CountDownLatch(threadCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ public void testEntryWhenParamFlowExists() throws Throwable {

ParameterMetric metric = mock(ParameterMetric.class);

CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000);
CacheMap<Object, AtomicInteger> map2 = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000);
CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<>(4000);
CacheMap<Object, AtomicLong> map2 = new ConcurrentLinkedHashMapWrapper<>(4000);
when(metric.getRuleTimeCounter(rule)).thenReturn(map);
when(metric.getRuleTokenCounter(rule)).thenReturn(map2);
map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis()));
Expand Down

0 comments on commit d59beae

Please sign in to comment.