diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/RuleConstant.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/RuleConstant.java
index 51af887128..60fe15d350 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/RuleConstant.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/RuleConstant.java
@@ -46,6 +46,7 @@ public final class RuleConstant {
public static final int CONTROL_BEHAVIOR_DEFAULT = 0;
public static final int CONTROL_BEHAVIOR_WARM_UP = 1;
public static final int CONTROL_BEHAVIOR_RATE_LIMITER = 2;
+ public static final int CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER = 3;
public static final String LIMIT_APP_DEFAULT = "default";
public static final String LIMIT_APP_OTHER = "other";
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java
index 5ed7cf15ce..8c47036c80 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java
@@ -40,7 +40,7 @@
*/
public class FlowRule extends AbstractRule {
- public FlowRule(){
+ public FlowRule() {
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
@@ -71,7 +71,7 @@ public FlowRule(){
/**
* Rate limiter control behavior.
- * 0. default, 1. warm up, 2. rate limiter
+ * 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
@@ -213,7 +213,8 @@ private Node selectNodeByRequesterAndStrategy(String origin, Context context, De
return node;
}
- } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, getResource())) {
+ } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
+ && FlowRuleManager.isOtherOrigin(origin, getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java
index da62422841..b4d59297cd 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java
@@ -36,8 +36,9 @@
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController;
-import com.alibaba.csp.sentinel.slots.block.flow.controller.PaceController;
+import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController;
+import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController;
/**
*
@@ -126,8 +127,14 @@ private static Map> loadFlowConf(List list) {
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER
&& rule.getMaxQueueingTimeMs() > 0) {
- rater = new PaceController(rule.getMaxQueueingTimeMs(), rule.getCount());
+ rater = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
+ } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS
+ && rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER
+ && rule.getMaxQueueingTimeMs() > 0 && rule.getWarmUpPeriodSec() > 0) {
+ rater = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
+ rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
}
+
rule.setRater(rater);
String identity = rule.getResource();
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/PaceController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java
similarity index 95%
rename from sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/PaceController.java
rename to sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java
index da56d4398d..b7dc7cd9ea 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/PaceController.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java
@@ -25,13 +25,13 @@
/**
* @author jialiang.linjl
*/
-public class PaceController implements Controller {
+public class RateLimiterController implements Controller {
private final int maxQueueingTimeMs;
private final double count;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
- public PaceController(int timeOut, double count) {
+ public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java
index 0f97409d81..3cba3a762f 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java
@@ -63,21 +63,21 @@
*/
public class WarmUpController implements Controller {
- private double count;
+ protected double count;
private int coldFactor;
- private int warningToken = 0;
+ protected int warningToken = 0;
private int maxToken;
- private double slope;
+ protected double slope;
- private AtomicLong storedTokens = new AtomicLong(0);
- private AtomicLong lastFilledTime = new AtomicLong(0);
+ protected AtomicLong storedTokens = new AtomicLong(0);
+ protected AtomicLong lastFilledTime = new AtomicLong(0);
- public WarmUpController(double count, int warmupPeriodInSec, int coldFactor) {
- construct(count, warmupPeriodInSec, coldFactor);
+ public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
+ construct(count, warmUpPeriodInSec, coldFactor);
}
- public WarmUpController(double count, int warmUpPeriodInMic) {
- construct(count, warmUpPeriodInMic, 3);
+ public WarmUpController(double count, int warmUpPeriodInSec) {
+ construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
@@ -132,7 +132,7 @@ public boolean canPass(Node node, int acquireCount) {
return false;
}
- private void syncToken(long passQps) {
+ protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java
new file mode 100644
index 0000000000..8c2c20dd41
--- /dev/null
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.slots.block.flow.controller;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.alibaba.csp.sentinel.node.Node;
+import com.alibaba.csp.sentinel.util.TimeUtil;
+
+/**
+ * @author jialiang.linjl
+ */
+public class WarmUpRateLimiterController extends WarmUpController {
+
+ final int timeOutInMs;
+ final AtomicLong latestPassedTime = new AtomicLong(-1);
+
+ /**
+ * @param count
+ * @param warmUpPeriodSec
+ */
+ public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
+ super(count, warmUpPeriodSec, coldFactor);
+ this.timeOutInMs = timeOutMs;
+ }
+
+ @Override
+ public boolean canPass(Node node, int acquireCount) {
+ long previousQps = node.previousPassQps();
+ syncToken(previousQps);
+
+ long currentTime = TimeUtil.currentTimeMillis();
+
+ long restToken = storedTokens.get();
+ long costTime = 0;
+ long expectedTime = 0;
+ if (restToken >= warningToken) {
+ long aboveToken = restToken - warningToken;
+
+ // current interval = restToken*slope+1/count
+ double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
+ costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
+ } else {
+ costTime = Math.round(1.0 * (acquireCount) / count * 1000);
+ }
+ expectedTime = costTime + latestPassedTime.get();
+
+ if (expectedTime <= currentTime) {
+ latestPassedTime.set(currentTime);
+ return true;
+ } else {
+ long waitTime = costTime + latestPassedTime.get() - currentTime;
+ if (waitTime >= timeOutInMs) {
+ return false;
+ } else {
+ long oldTime = latestPassedTime.addAndGet(costTime);
+ try {
+ waitTime = oldTime - TimeUtil.currentTimeMillis();
+ if (waitTime >= timeOutInMs) {
+ latestPassedTime.addAndGet(-costTime);
+ return false;
+ }
+ Thread.sleep(waitTime);
+ return true;
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ return false;
+ }
+}
+
diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/PaceControllerTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/RateLimiterControllerTest.java
similarity index 89%
rename from sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/PaceControllerTest.java
rename to sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/RateLimiterControllerTest.java
index 0fb4aaf067..a255e44bac 100755
--- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/PaceControllerTest.java
+++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/RateLimiterControllerTest.java
@@ -25,16 +25,16 @@
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.node.Node;
-import com.alibaba.csp.sentinel.slots.block.flow.controller.PaceController;
+import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController;
/**
* @author jialiang.linjl
*/
-public class PaceControllerTest {
+public class RateLimiterControllerTest {
@Test
public void testPaceController_normal() throws InterruptedException {
- PaceController paceController = new PaceController(500, 10d);
+ RateLimiterController paceController = new RateLimiterController(500, 10d);
Node node = mock(Node.class);
long start = TimeUtil.currentTimeMillis();
@@ -47,7 +47,7 @@ public void testPaceController_normal() throws InterruptedException {
@Test
public void testPaceController_timeout() throws InterruptedException {
- final PaceController paceController = new PaceController(500, 10d);
+ final RateLimiterController paceController = new RateLimiterController(500, 10d);
final Node node = mock(Node.class);
final AtomicInteger passcount = new AtomicInteger();
diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/WarmUpRateLimiterControllerTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/WarmUpRateLimiterControllerTest.java
new file mode 100644
index 0000000000..4222a1483b
--- /dev/null
+++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/WarmUpRateLimiterControllerTest.java
@@ -0,0 +1,49 @@
+package com.alibaba.csp.sentinel.slots.block.flow;
+
+import com.alibaba.csp.sentinel.node.Node;
+import com.alibaba.csp.sentinel.node.StatisticNode;
+import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author CarpenterLee
+ */
+public class WarmUpRateLimiterControllerTest {
+
+ @Test
+ public void testPace() throws InterruptedException {
+ WarmUpRateLimiterController controller = new WarmUpRateLimiterController(10, 10, 1000, 3);
+
+ Node node = mock(Node.class);
+
+ when(node.passQps()).thenReturn(100L);
+ when(node.previousPassQps()).thenReturn(100L);
+
+ assertTrue(controller.canPass(node, 1));
+
+ long start = System.currentTimeMillis();
+ assertTrue(controller.canPass(node, 1));
+ long cost = System.currentTimeMillis() - start;
+ assertTrue(cost >= 100 && cost <= 110);
+ }
+
+ @Test
+ public void testPaceCanNotPass() throws InterruptedException {
+ WarmUpRateLimiterController controller = new WarmUpRateLimiterController(10, 10, 10, 3);
+
+ Node node = mock(Node.class);
+
+ when(node.passQps()).thenReturn(100L);
+ when(node.previousPassQps()).thenReturn(100L);
+
+ assertTrue(controller.canPass(node, 1));
+
+ assertFalse(controller.canPass(node, 1));
+ }
+}
\ No newline at end of file
diff --git a/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/flow/WarmUpRateLimiterFlowDemo.java b/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/flow/WarmUpRateLimiterFlowDemo.java
new file mode 100644
index 0000000000..4705f01f7f
--- /dev/null
+++ b/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/flow/WarmUpRateLimiterFlowDemo.java
@@ -0,0 +1,214 @@
+package com.alibaba.csp.sentinel.demo.flow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.alibaba.csp.sentinel.Entry;
+import com.alibaba.csp.sentinel.SphU;
+import com.alibaba.csp.sentinel.slots.block.BlockException;
+import com.alibaba.csp.sentinel.slots.block.RuleConstant;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
+import com.alibaba.csp.sentinel.util.TimeUtil;
+
+/**
+ * When {@link FlowRule#controlBehavior} set to {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER}, real passed
+ * qps will gradually increase to {@link FlowRule#count}, other than burst increasing, and after the passed qps reaches
+ * the threshold, the request will pass at a constant interval.
+ *
+ * In short, {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER} behaves like
+ * {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP} + {@link RuleConstant#CONTROL_BEHAVIOR_RATE_LIMITER}.
+ *
+ *
+ *
+ * Run this demo, results are as follows:
+ *
+ * ...
+ * 1541035848056, total:5, pass:5, block:0 // run in slow qps
+ * 1541035849061, total:0, pass:0, block:0
+ * 1541035850066, total:6, pass:6, block:0
+ * 1541035851068, total:2, pass:2, block:0
+ * 1541035852073, total:3, pass:3, block:0
+ * 1541035853078, total:3361, pass:7, block:3354 // request qps burst increase, warm up behavior triggered.
+ * 1541035854083, total:3414, pass:7, block:3407
+ * 1541035855087, total:3377, pass:7, block:3370
+ * 1541035856091, total:3366, pass:8, block:3358
+ * 1541035857096, total:3259, pass:8, block:3251
+ * 1541035858101, total:3066, pass:13, block:3054
+ * 1541035859105, total:3042, pass:15, block:3026
+ * 1541035860109, total:2946, pass:17, block:2929
+ * 1541035861113, total:2909, pass:20, block:2889 // warm up process end, pass qps increased to {@link FlowRule#count}
+ * 1541035862117, total:2970, pass:20, block:2950
+ * 1541035863122, total:2919, pass:20, block:2899
+ * 1541035864127, total:2903, pass:21, block:2882
+ * 1541035865133, total:2930, pass:20, block:2910
+ * ...
+ *
+ *
+ * @author CarpenterLee
+ * @see WarmUpFlowDemo
+ * @see PaceFlowDemo
+ */
+public class WarmUpRateLimiterFlowDemo {
+ private static final String KEY = "abc";
+
+ private static AtomicInteger pass = new AtomicInteger();
+ private static AtomicInteger block = new AtomicInteger();
+ private static AtomicInteger total = new AtomicInteger();
+
+ private static volatile boolean stop = false;
+
+ private static final int threadCount = 100;
+ private static int seconds = 100;
+
+ public static void main(String[] args) throws Exception {
+ initFlowRule();
+ // trigger Sentinel internal init
+ Entry entry = null;
+ try {
+ entry = SphU.entry(KEY);
+ } catch (Exception e) {
+ } finally {
+ if (entry != null) {
+ entry.exit();
+ }
+ }
+
+ Thread timer = new Thread(new TimerTask());
+ timer.setName("sentinel-timer-task");
+ timer.start();
+
+ //first make the system run on a very low condition
+ for (int i = 0; i < 3; i++) {
+ Thread t = new Thread(new SlowTask());
+ t.setName("sentinel-slow-task");
+ t.start();
+ }
+ Thread.sleep(5000);
+
+ // request qps burst increase, warm up behavior triggered.
+ for (int i = 0; i < threadCount; i++) {
+ Thread t = new Thread(new RunTask());
+ t.setName("sentinel-run-task");
+ t.start();
+ }
+ }
+
+ private static void initFlowRule() {
+ List rules = new ArrayList();
+ FlowRule rule1 = new FlowRule();
+ rule1.setResource(KEY);
+ rule1.setCount(20);
+ rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
+ rule1.setLimitApp("default");
+ rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER);
+ rule1.setWarmUpPeriodSec(10);
+ rule1.setMaxQueueingTimeMs(100);
+
+ rules.add(rule1);
+ FlowRuleManager.loadRules(rules);
+ }
+
+ static class SlowTask implements Runnable {
+ @Override
+ public void run() {
+ while (!stop) {
+ Entry entry = null;
+ try {
+ entry = SphU.entry(KEY);
+ // token acquired, means pass
+ pass.addAndGet(1);
+ } catch (BlockException e1) {
+ block.incrementAndGet();
+ } catch (Exception e2) {
+ // biz exception
+ } finally {
+ total.incrementAndGet();
+ if (entry != null) {
+ entry.exit();
+ }
+ }
+ Random random2 = new Random();
+ try {
+ TimeUnit.MILLISECONDS.sleep(random2.nextInt(2000));
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ static class RunTask implements Runnable {
+ @Override
+ public void run() {
+ while (!stop) {
+ Entry entry = null;
+ try {
+ entry = SphU.entry(KEY);
+ pass.addAndGet(1);
+ } catch (BlockException e1) {
+ block.incrementAndGet();
+ } catch (Exception e2) {
+ // biz exception
+ } finally {
+ total.incrementAndGet();
+ if (entry != null) {
+ entry.exit();
+ }
+ }
+ Random random2 = new Random();
+ try {
+ TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ static class TimerTask implements Runnable {
+
+ @Override
+ public void run() {
+ long start = System.currentTimeMillis();
+ System.out.println("begin to statistic!!!");
+ long oldTotal = 0;
+ long oldPass = 0;
+ long oldBlock = 0;
+ while (!stop) {
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ }
+
+ long globalTotal = total.get();
+ long oneSecondTotal = globalTotal - oldTotal;
+ oldTotal = globalTotal;
+
+ long globalPass = pass.get();
+ long oneSecondPass = globalPass - oldPass;
+ oldPass = globalPass;
+
+ long globalBlock = block.get();
+ long oneSecondBlock = globalBlock - oldBlock;
+ oldBlock = globalBlock;
+
+ System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ + ", pass:" + oneSecondPass
+ + ", block:" + oneSecondBlock);
+ if (seconds-- <= 0) {
+ stop = true;
+ }
+ }
+
+ long cost = System.currentTimeMillis() - start;
+ System.out.println("time cost: " + cost + " ms");
+ System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ + ", block:" + block.get());
+ System.exit(0);
+ }
+ }
+}