Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add warm up + rate limiter flow control behavior and test cases #220

Merged
merged 1 commit into from
Nov 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class RuleConstant {
public static final int CONTROL_BEHAVIOR_DEFAULT = 0;
public static final int CONTROL_BEHAVIOR_WARM_UP = 1;
public static final int CONTROL_BEHAVIOR_RATE_LIMITER = 2;
public static final int CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER = 3;

public static final String LIMIT_APP_DEFAULT = "default";
public static final String LIMIT_APP_OTHER = "other";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*/
public class FlowRule extends AbstractRule {

public FlowRule(){
public FlowRule() {
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
Expand Down Expand Up @@ -71,7 +71,7 @@ public FlowRule(){

/**
* Rate limiter control behavior.
* 0. default, 1. warm up, 2. rate limiter
* 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

Expand Down Expand Up @@ -213,7 +213,8 @@ private Node selectNodeByRequesterAndStrategy(String origin, Context context, De
return node;
}

} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, getResource())) {
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.PaceController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController;

/**
* <p>
Expand Down Expand Up @@ -126,8 +127,14 @@ private static Map<String, List<FlowRule>> loadFlowConf(List<FlowRule> list) {
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER
&& rule.getMaxQueueingTimeMs() > 0) {
rater = new PaceController(rule.getMaxQueueingTimeMs(), rule.getCount());
rater = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER
&& rule.getMaxQueueingTimeMs() > 0 && rule.getWarmUpPeriodSec() > 0) {
rater = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
}

rule.setRater(rater);

String identity = rule.getResource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
/**
* @author jialiang.linjl
*/
public class PaceController implements Controller {
public class RateLimiterController implements Controller {

private final int maxQueueingTimeMs;
private final double count;
private final AtomicLong latestPassedTime = new AtomicLong(-1);

public PaceController(int timeOut, double count) {
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,21 @@
*/
public class WarmUpController implements Controller {

private double count;
protected double count;
private int coldFactor;
private int warningToken = 0;
protected int warningToken = 0;
private int maxToken;
private double slope;
protected double slope;

private AtomicLong storedTokens = new AtomicLong(0);
private AtomicLong lastFilledTime = new AtomicLong(0);
protected AtomicLong storedTokens = new AtomicLong(0);
protected AtomicLong lastFilledTime = new AtomicLong(0);

public WarmUpController(double count, int warmupPeriodInSec, int coldFactor) {
construct(count, warmupPeriodInSec, coldFactor);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}

public WarmUpController(double count, int warmUpPeriodInMic) {
construct(count, warmUpPeriodInMic, 3);
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}

private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
Expand Down Expand Up @@ -132,7 +132,7 @@ public boolean canPass(Node node, int acquireCount) {
return false;
}

private void syncToken(long passQps) {
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.controller;

import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* @author jialiang.linjl
*/
public class WarmUpRateLimiterController extends WarmUpController {

final int timeOutInMs;
final AtomicLong latestPassedTime = new AtomicLong(-1);

/**
* @param count
* @param warmUpPeriodSec
*/
public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
super(count, warmUpPeriodSec, coldFactor);
this.timeOutInMs = timeOutMs;
}

@Override
public boolean canPass(Node node, int acquireCount) {
long previousQps = node.previousPassQps();
syncToken(previousQps);

long currentTime = TimeUtil.currentTimeMillis();

long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;

// current interval = restToken*slope+1/count
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
expectedTime = costTime + latestPassedTime.get();

if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime >= timeOutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime >= timeOutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
Thread.sleep(waitTime);
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@

import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slots.block.flow.controller.PaceController;
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController;

/**
* @author jialiang.linjl
*/
public class PaceControllerTest {
public class RateLimiterControllerTest {

@Test
public void testPaceController_normal() throws InterruptedException {
PaceController paceController = new PaceController(500, 10d);
RateLimiterController paceController = new RateLimiterController(500, 10d);
Node node = mock(Node.class);

long start = TimeUtil.currentTimeMillis();
Expand All @@ -47,7 +47,7 @@ public void testPaceController_normal() throws InterruptedException {

@Test
public void testPaceController_timeout() throws InterruptedException {
final PaceController paceController = new PaceController(500, 10d);
final RateLimiterController paceController = new RateLimiterController(500, 10d);
final Node node = mock(Node.class);

final AtomicInteger passcount = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.alibaba.csp.sentinel.slots.block.flow;

import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.node.StatisticNode;
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController;

import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* @author CarpenterLee
*/
public class WarmUpRateLimiterControllerTest {

@Test
public void testPace() throws InterruptedException {
WarmUpRateLimiterController controller = new WarmUpRateLimiterController(10, 10, 1000, 3);

Node node = mock(Node.class);

when(node.passQps()).thenReturn(100L);
when(node.previousPassQps()).thenReturn(100L);

assertTrue(controller.canPass(node, 1));

long start = System.currentTimeMillis();
assertTrue(controller.canPass(node, 1));
long cost = System.currentTimeMillis() - start;
assertTrue(cost >= 100 && cost <= 110);
}

@Test
public void testPaceCanNotPass() throws InterruptedException {
WarmUpRateLimiterController controller = new WarmUpRateLimiterController(10, 10, 10, 3);

Node node = mock(Node.class);

when(node.passQps()).thenReturn(100L);
when(node.previousPassQps()).thenReturn(100L);

assertTrue(controller.canPass(node, 1));

assertFalse(controller.canPass(node, 1));
}
}
Loading