From b4ea2ee3ec256df37fd34ae34656d526935ae256 Mon Sep 17 00:00:00 2001 From: Carpenter Lee Date: Thu, 1 Nov 2018 17:10:13 +0800 Subject: [PATCH] Add "warmup with rate limiting" implementation for traffic shaping (#220) - Add a new kind of control behavior `warm up + rate limiter`, behaving as both warm up and pace control --- .../sentinel/slots/block/RuleConstant.java | 1 + .../sentinel/slots/block/flow/FlowRule.java | 7 +- .../slots/block/flow/FlowRuleManager.java | 11 +- ...roller.java => RateLimiterController.java} | 4 +- .../flow/controller/WarmUpController.java | 20 +- .../WarmUpRateLimiterController.java | 85 +++++++ ...st.java => RateLimiterControllerTest.java} | 8 +- .../flow/WarmUpRateLimiterControllerTest.java | 49 ++++ .../demo/flow/WarmUpRateLimiterFlowDemo.java | 214 ++++++++++++++++++ 9 files changed, 378 insertions(+), 21 deletions(-) rename sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/{PaceController.java => RateLimiterController.java} (95%) create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java rename sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/{PaceControllerTest.java => RateLimiterControllerTest.java} (89%) create mode 100644 sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/WarmUpRateLimiterControllerTest.java create mode 100644 sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/flow/WarmUpRateLimiterFlowDemo.java diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/RuleConstant.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/RuleConstant.java index 51af887128..60fe15d350 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/RuleConstant.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/RuleConstant.java @@ -46,6 +46,7 @@ public final class RuleConstant { public static final int CONTROL_BEHAVIOR_DEFAULT = 0; public static final int CONTROL_BEHAVIOR_WARM_UP = 1; public static final int CONTROL_BEHAVIOR_RATE_LIMITER = 2; + public static final int CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER = 3; public static final String LIMIT_APP_DEFAULT = "default"; public static final String LIMIT_APP_OTHER = "other"; diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java index 5ed7cf15ce..8c47036c80 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java @@ -40,7 +40,7 @@ */ public class FlowRule extends AbstractRule { - public FlowRule(){ + public FlowRule() { super(); setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); } @@ -71,7 +71,7 @@ public FlowRule(){ /** * Rate limiter control behavior. - * 0. default, 1. warm up, 2. rate limiter + * 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter */ private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; @@ -213,7 +213,8 @@ private Node selectNodeByRequesterAndStrategy(String origin, Context context, De return node; } - } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, getResource())) { + } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) + && FlowRuleManager.isOtherOrigin(origin, getResource())) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java index da62422841..b4d59297cd 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java @@ -36,8 +36,9 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; -import com.alibaba.csp.sentinel.slots.block.flow.controller.PaceController; +import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; +import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; /** *

@@ -126,8 +127,14 @@ private static Map> loadFlowConf(List list) { } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS && rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER && rule.getMaxQueueingTimeMs() > 0) { - rater = new PaceController(rule.getMaxQueueingTimeMs(), rule.getCount()); + rater = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); + } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS + && rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER + && rule.getMaxQueueingTimeMs() > 0 && rule.getWarmUpPeriodSec() > 0) { + rater = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), + rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); } + rule.setRater(rater); String identity = rule.getResource(); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/PaceController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java similarity index 95% rename from sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/PaceController.java rename to sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java index da56d4398d..b7dc7cd9ea 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/PaceController.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java @@ -25,13 +25,13 @@ /** * @author jialiang.linjl */ -public class PaceController implements Controller { +public class RateLimiterController implements Controller { private final int maxQueueingTimeMs; private final double count; private final AtomicLong latestPassedTime = new AtomicLong(-1); - public PaceController(int timeOut, double count) { + public RateLimiterController(int timeOut, double count) { this.maxQueueingTimeMs = timeOut; this.count = count; } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java index 0f97409d81..3cba3a762f 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java @@ -63,21 +63,21 @@ */ public class WarmUpController implements Controller { - private double count; + protected double count; private int coldFactor; - private int warningToken = 0; + protected int warningToken = 0; private int maxToken; - private double slope; + protected double slope; - private AtomicLong storedTokens = new AtomicLong(0); - private AtomicLong lastFilledTime = new AtomicLong(0); + protected AtomicLong storedTokens = new AtomicLong(0); + protected AtomicLong lastFilledTime = new AtomicLong(0); - public WarmUpController(double count, int warmupPeriodInSec, int coldFactor) { - construct(count, warmupPeriodInSec, coldFactor); + public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { + construct(count, warmUpPeriodInSec, coldFactor); } - public WarmUpController(double count, int warmUpPeriodInMic) { - construct(count, warmUpPeriodInMic, 3); + public WarmUpController(double count, int warmUpPeriodInSec) { + construct(count, warmUpPeriodInSec, 3); } private void construct(double count, int warmUpPeriodInSec, int coldFactor) { @@ -132,7 +132,7 @@ public boolean canPass(Node node, int acquireCount) { return false; } - private void syncToken(long passQps) { + protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java new file mode 100644 index 0000000000..8c2c20dd41 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java @@ -0,0 +1,85 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.flow.controller; + +import java.util.concurrent.atomic.AtomicLong; + +import com.alibaba.csp.sentinel.node.Node; +import com.alibaba.csp.sentinel.util.TimeUtil; + +/** + * @author jialiang.linjl + */ +public class WarmUpRateLimiterController extends WarmUpController { + + final int timeOutInMs; + final AtomicLong latestPassedTime = new AtomicLong(-1); + + /** + * @param count + * @param warmUpPeriodSec + */ + public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) { + super(count, warmUpPeriodSec, coldFactor); + this.timeOutInMs = timeOutMs; + } + + @Override + public boolean canPass(Node node, int acquireCount) { + long previousQps = node.previousPassQps(); + syncToken(previousQps); + + long currentTime = TimeUtil.currentTimeMillis(); + + long restToken = storedTokens.get(); + long costTime = 0; + long expectedTime = 0; + if (restToken >= warningToken) { + long aboveToken = restToken - warningToken; + + // current interval = restToken*slope+1/count + double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); + costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000); + } else { + costTime = Math.round(1.0 * (acquireCount) / count * 1000); + } + expectedTime = costTime + latestPassedTime.get(); + + if (expectedTime <= currentTime) { + latestPassedTime.set(currentTime); + return true; + } else { + long waitTime = costTime + latestPassedTime.get() - currentTime; + if (waitTime >= timeOutInMs) { + return false; + } else { + long oldTime = latestPassedTime.addAndGet(costTime); + try { + waitTime = oldTime - TimeUtil.currentTimeMillis(); + if (waitTime >= timeOutInMs) { + latestPassedTime.addAndGet(-costTime); + return false; + } + Thread.sleep(waitTime); + return true; + } catch (InterruptedException e) { + } + } + } + return false; + } +} + diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/PaceControllerTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/RateLimiterControllerTest.java similarity index 89% rename from sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/PaceControllerTest.java rename to sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/RateLimiterControllerTest.java index 0fb4aaf067..a255e44bac 100755 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/PaceControllerTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/RateLimiterControllerTest.java @@ -25,16 +25,16 @@ import com.alibaba.csp.sentinel.util.TimeUtil; import com.alibaba.csp.sentinel.node.Node; -import com.alibaba.csp.sentinel.slots.block.flow.controller.PaceController; +import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; /** * @author jialiang.linjl */ -public class PaceControllerTest { +public class RateLimiterControllerTest { @Test public void testPaceController_normal() throws InterruptedException { - PaceController paceController = new PaceController(500, 10d); + RateLimiterController paceController = new RateLimiterController(500, 10d); Node node = mock(Node.class); long start = TimeUtil.currentTimeMillis(); @@ -47,7 +47,7 @@ public void testPaceController_normal() throws InterruptedException { @Test public void testPaceController_timeout() throws InterruptedException { - final PaceController paceController = new PaceController(500, 10d); + final RateLimiterController paceController = new RateLimiterController(500, 10d); final Node node = mock(Node.class); final AtomicInteger passcount = new AtomicInteger(); diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/WarmUpRateLimiterControllerTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/WarmUpRateLimiterControllerTest.java new file mode 100644 index 0000000000..4222a1483b --- /dev/null +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/WarmUpRateLimiterControllerTest.java @@ -0,0 +1,49 @@ +package com.alibaba.csp.sentinel.slots.block.flow; + +import com.alibaba.csp.sentinel.node.Node; +import com.alibaba.csp.sentinel.node.StatisticNode; +import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * @author CarpenterLee + */ +public class WarmUpRateLimiterControllerTest { + + @Test + public void testPace() throws InterruptedException { + WarmUpRateLimiterController controller = new WarmUpRateLimiterController(10, 10, 1000, 3); + + Node node = mock(Node.class); + + when(node.passQps()).thenReturn(100L); + when(node.previousPassQps()).thenReturn(100L); + + assertTrue(controller.canPass(node, 1)); + + long start = System.currentTimeMillis(); + assertTrue(controller.canPass(node, 1)); + long cost = System.currentTimeMillis() - start; + assertTrue(cost >= 100 && cost <= 110); + } + + @Test + public void testPaceCanNotPass() throws InterruptedException { + WarmUpRateLimiterController controller = new WarmUpRateLimiterController(10, 10, 10, 3); + + Node node = mock(Node.class); + + when(node.passQps()).thenReturn(100L); + when(node.previousPassQps()).thenReturn(100L); + + assertTrue(controller.canPass(node, 1)); + + assertFalse(controller.canPass(node, 1)); + } +} \ No newline at end of file diff --git a/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/flow/WarmUpRateLimiterFlowDemo.java b/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/flow/WarmUpRateLimiterFlowDemo.java new file mode 100644 index 0000000000..4705f01f7f --- /dev/null +++ b/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/flow/WarmUpRateLimiterFlowDemo.java @@ -0,0 +1,214 @@ +package com.alibaba.csp.sentinel.demo.flow; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.util.TimeUtil; + +/** + * When {@link FlowRule#controlBehavior} set to {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER}, real passed + * qps will gradually increase to {@link FlowRule#count}, other than burst increasing, and after the passed qps reaches + * the threshold, the request will pass at a constant interval. + *

+ * In short, {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER} behaves like + * {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP} + {@link RuleConstant#CONTROL_BEHAVIOR_RATE_LIMITER}. + *

+ * + *

+ * Run this demo, results are as follows: + *

+ * ...
+ * 1541035848056, total:5, pass:5, block:0 // run in slow qps
+ * 1541035849061, total:0, pass:0, block:0
+ * 1541035850066, total:6, pass:6, block:0
+ * 1541035851068, total:2, pass:2, block:0
+ * 1541035852073, total:3, pass:3, block:0
+ * 1541035853078, total:3361, pass:7, block:3354 // request qps burst increase, warm up behavior triggered.
+ * 1541035854083, total:3414, pass:7, block:3407
+ * 1541035855087, total:3377, pass:7, block:3370
+ * 1541035856091, total:3366, pass:8, block:3358
+ * 1541035857096, total:3259, pass:8, block:3251
+ * 1541035858101, total:3066, pass:13, block:3054
+ * 1541035859105, total:3042, pass:15, block:3026
+ * 1541035860109, total:2946, pass:17, block:2929
+ * 1541035861113, total:2909, pass:20, block:2889 // warm up process end, pass qps increased to {@link FlowRule#count}
+ * 1541035862117, total:2970, pass:20, block:2950
+ * 1541035863122, total:2919, pass:20, block:2899
+ * 1541035864127, total:2903, pass:21, block:2882
+ * 1541035865133, total:2930, pass:20, block:2910
+ * ...
+ * 
+ * + * @author CarpenterLee + * @see WarmUpFlowDemo + * @see PaceFlowDemo + */ +public class WarmUpRateLimiterFlowDemo { + private static final String KEY = "abc"; + + private static AtomicInteger pass = new AtomicInteger(); + private static AtomicInteger block = new AtomicInteger(); + private static AtomicInteger total = new AtomicInteger(); + + private static volatile boolean stop = false; + + private static final int threadCount = 100; + private static int seconds = 100; + + public static void main(String[] args) throws Exception { + initFlowRule(); + // trigger Sentinel internal init + Entry entry = null; + try { + entry = SphU.entry(KEY); + } catch (Exception e) { + } finally { + if (entry != null) { + entry.exit(); + } + } + + Thread timer = new Thread(new TimerTask()); + timer.setName("sentinel-timer-task"); + timer.start(); + + //first make the system run on a very low condition + for (int i = 0; i < 3; i++) { + Thread t = new Thread(new SlowTask()); + t.setName("sentinel-slow-task"); + t.start(); + } + Thread.sleep(5000); + + // request qps burst increase, warm up behavior triggered. + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new RunTask()); + t.setName("sentinel-run-task"); + t.start(); + } + } + + private static void initFlowRule() { + List rules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource(KEY); + rule1.setCount(20); + rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); + rule1.setLimitApp("default"); + rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER); + rule1.setWarmUpPeriodSec(10); + rule1.setMaxQueueingTimeMs(100); + + rules.add(rule1); + FlowRuleManager.loadRules(rules); + } + + static class SlowTask implements Runnable { + @Override + public void run() { + while (!stop) { + Entry entry = null; + try { + entry = SphU.entry(KEY); + // token acquired, means pass + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + Random random2 = new Random(); + try { + TimeUnit.MILLISECONDS.sleep(random2.nextInt(2000)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + static class RunTask implements Runnable { + @Override + public void run() { + while (!stop) { + Entry entry = null; + try { + entry = SphU.entry(KEY); + pass.addAndGet(1); + } catch (BlockException e1) { + block.incrementAndGet(); + } catch (Exception e2) { + // biz exception + } finally { + total.incrementAndGet(); + if (entry != null) { + entry.exit(); + } + } + Random random2 = new Random(); + try { + TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + static class TimerTask implements Runnable { + + @Override + public void run() { + long start = System.currentTimeMillis(); + System.out.println("begin to statistic!!!"); + long oldTotal = 0; + long oldPass = 0; + long oldBlock = 0; + while (!stop) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + + long globalTotal = total.get(); + long oneSecondTotal = globalTotal - oldTotal; + oldTotal = globalTotal; + + long globalPass = pass.get(); + long oneSecondPass = globalPass - oldPass; + oldPass = globalPass; + + long globalBlock = block.get(); + long oneSecondBlock = globalBlock - oldBlock; + oldBlock = globalBlock; + + System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + + ", pass:" + oneSecondPass + + ", block:" + oneSecondBlock); + if (seconds-- <= 0) { + stop = true; + } + } + + long cost = System.currentTimeMillis() - start; + System.out.println("time cost: " + cost + " ms"); + System.out.println("total:" + total.get() + ", pass:" + pass.get() + + ", block:" + block.get()); + System.exit(0); + } + } +}