Skip to content

Commit

Permalink
add RedisClusterFlowSlot
Browse files Browse the repository at this point in the history
  • Loading branch information
binecy committed Dec 31, 2019
2 parents a3269de + 62bd406 commit 2a50a54
Show file tree
Hide file tree
Showing 18 changed files with 200 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ public interface RedisClient {
*/
int requestToken(String luaCode, RequestData requestData);

void resetFlowMetrics(Set<Long> flowIds);

/**
* publish rule to redis
* reset rule(public rule and clear flow metrics)
* @param rule
*/
void publishRule(FlowRule rule);
void resetFlowRule(FlowRule rule);

/**
* clear redis rule
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.alibaba.csp.sentinel.cluster.redis;

import com.alibaba.csp.sentinel.cluster.redis.config.RedisFlowRuleManager;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.function.Function;

import java.util.Collection;
import java.util.List;
import java.util.Map;

public class RedisClusterFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;

public RedisClusterFlowSlot() {
this(new FlowRuleChecker());
}

RedisClusterFlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}

private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = RedisFlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.alibaba.csp.sentinel.cluster.redis;

import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
import com.alibaba.csp.sentinel.slotchain.SlotChainExtender;

public class RedisClusterFlowSlotExtender implements SlotChainExtender {

@Override
public ProcessorSlotChain extend(ProcessorSlotChain slotChain) {
slotChain.addLast(new RedisClusterFlowSlot());
return slotChain;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private RequestData createRequestData(Long flowId, int acquireCount) {

@Override
public TokenResult requestParamToken(Long aLong, int i, Collection<Object> collection) {
return null;
throw new UnsupportedOperationException("RedisClusterTokenService cannot supported request param token now");
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import java.util.*;

public class ClusterClientConfig {
public static final int REDIS_DISTRIBUTED_SINGLE = 1;
public static final int REDIS_DISTRIBUTED_SENTINEL = 2;
public static final int REDIS_DISTRIBUTED_CLUSTER = 3;
public static final int REDIS_SINGLE = 1;
public static final int REDIS_SENTINEL = 2;
public static final int REDIS_CLUSTER = 3;
public static final Set<Integer> validDistributedType
= new HashSet<>(Arrays.asList(REDIS_DISTRIBUTED_SINGLE, REDIS_DISTRIBUTED_SENTINEL, REDIS_DISTRIBUTED_CLUSTER));
public int distributedType;
= new HashSet<>(Arrays.asList(REDIS_SINGLE, REDIS_SENTINEL, REDIS_CLUSTER));
public int clusterType;
private String masterName;
public List<HostAndPort> hostAndPorts;

Expand All @@ -27,22 +27,22 @@ public static ClusterClientConfig ofSingle(HostAndPort hostAndPort) {
List<HostAndPort> hostAndPorts = new ArrayList<>();
hostAndPorts.add(hostAndPort);
config.hostAndPorts = hostAndPorts;
config.distributedType = REDIS_DISTRIBUTED_SINGLE;
config.clusterType = REDIS_SINGLE;
return config;
}

public static ClusterClientConfig ofSentinel(List<HostAndPort> hostAndPorts, String masterName) {
ClusterClientConfig config = new ClusterClientConfig();
config.hostAndPorts = hostAndPorts;
config.masterName = masterName;
config.distributedType = REDIS_DISTRIBUTED_SENTINEL;
config.clusterType = REDIS_SENTINEL;
return config;
}

public static ClusterClientConfig ofCluster(List<HostAndPort> hostAndPorts) {
ClusterClientConfig config = new ClusterClientConfig();
config.hostAndPorts = hostAndPorts;
config.distributedType = REDIS_DISTRIBUTED_CLUSTER;
config.clusterType = REDIS_CLUSTER;
return config;
}

Expand Down Expand Up @@ -91,20 +91,20 @@ public ClusterClientConfig setConnectTimeout(Integer connectTimeout) {
return this;
}

public static int getRedisDistributedSingle() {
return REDIS_DISTRIBUTED_SINGLE;
public static int getRedisSingle() {
return REDIS_SINGLE;
}

public static int getRedisDistributedSentinel() {
return REDIS_DISTRIBUTED_SENTINEL;
public static int getRedisSentinel() {
return REDIS_SENTINEL;
}

public static int getRedisDistributedCluster() {
return REDIS_DISTRIBUTED_CLUSTER;
public static int getRedisCluster() {
return REDIS_CLUSTER;
}

public int getDistributedType() {
return distributedType;
public int getClusterType() {
return clusterType;
}

public List<HostAndPort> getHostAndPorts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ClusterClientConfigManager {

/**
Expand Down Expand Up @@ -115,8 +111,8 @@ public static boolean isValidClientConfig(ClusterClientConfig config) {
return false;
}

if(!ClusterClientConfig.validDistributedType.contains(config.getDistributedType())
|| (config.getDistributedType() == ClusterClientConfig.getRedisDistributedSentinel() && config.getMasterName() == null)
if(!ClusterClientConfig.validDistributedType.contains(config.getClusterType())
|| (config.getClusterType() == ClusterClientConfig.getRedisSentinel() && config.getMasterName() == null)
|| config.getHostAndPorts() == null || config.getHostAndPorts().isEmpty()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ private static void rebuildClientFactory() {

LuaUtil.resetLuaSha();
if(clientType == JEDIS_CLIENT) {
if(clientConfig.getDistributedType() == ClusterClientConfig.REDIS_DISTRIBUTED_CLUSTER) {
if(clientConfig.getClusterType() == ClusterClientConfig.REDIS_CLUSTER) {
factory = new JedisClusterClientFactory(clientConfig);
} else if(clientConfig.getDistributedType() == ClusterClientConfig.REDIS_DISTRIBUTED_SINGLE) {
} else if(clientConfig.getClusterType() == ClusterClientConfig.REDIS_SINGLE) {
factory = new JedisClientFactory(clientConfig);
} else if(clientConfig.getDistributedType() == ClusterClientConfig.REDIS_DISTRIBUTED_SENTINEL) {
} else if(clientConfig.getClusterType() == ClusterClientConfig.REDIS_SENTINEL) {
factory = new JedisSentinelFactory(clientConfig);
} else {
throw new IllegalArgumentException("cannot init process redis distributed type:" + clientConfig.getDistributedType());
throw new IllegalArgumentException("cannot init process redis distributed type:" + clientConfig.getClusterType());
}
} else if(clientType == LETTUCE_CLIENT) {
// todo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,41 @@
import com.alibaba.csp.sentinel.cluster.redis.RedisClient;
import com.alibaba.csp.sentinel.cluster.redis.RedisClientFactory;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil;
import com.alibaba.csp.sentinel.util.AssertUtil;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class RedisFlowRuleManager {
public static volatile boolean publishRuleToRedis = true;
private static Map<Long, FlowRule> flowRules = new ConcurrentHashMap();

private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
private static Map<Long, FlowRule> flowIdToRules = new ConcurrentHashMap();
private static final RedisFlowPropertyListener LISTENER = new RedisFlowPropertyListener();
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

public static void addRedisFlowRuleListener() {
FlowRuleManager.addListener(LISTENER);
static {
currentProperty.addListener(LISTENER);
}

public static FlowRule getFlowRule(long flowId) {
return flowRules.get(flowId);
public static void register2Property(SentinelProperty<List<FlowRule>> property) {
AssertUtil.notNull(property, "property cannot be null");
synchronized (LISTENER) {
RecordLog.info("[FlowRuleManager] Registering new property to flow rule manager");
currentProperty.removeListener(LISTENER);
property.addListener(LISTENER);
currentProperty = property;
}
}

public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}

private static void updateRule(List<FlowRule> flowRules) {
Expand All @@ -36,23 +48,23 @@ private static void updateRule(List<FlowRule> flowRules) {
return;
}

Map<Long, FlowRule> oldRules = RedisFlowRuleManager.flowRules;
Map<Long, FlowRule> oldRules = RedisFlowRuleManager.flowIdToRules;
RedisClient redisClient = factory.getClient();
publishFlowRule(flowRules, oldRules, redisClient);
clearFlowRule(oldRules, redisClient);

redisClient.close();
}

private static void publishFlowRule(List<FlowRule> flowRules, Map<Long, FlowRule> oldRules, RedisClient redisClient) {
Map<Long, FlowRule> newRules = new ConcurrentHashMap();
if(flowRules == null) {
RedisFlowRuleManager.flowRules = newRules;
private static void publishFlowRule(List<FlowRule> confRule, Map<Long, FlowRule> oldRules, RedisClient redisClient) {
RedisFlowRuleManager.flowIdToRules = new ConcurrentHashMap<>();
RedisFlowRuleManager.flowRules.clear();

if(confRule == null) {
return;
}

Set<Long> resetFlowMetricsIds = new HashSet<>();
for (FlowRule rule : flowRules) {
for (FlowRule rule : confRule) {
if(!rule.isClusterMode()) {
continue;
}
Expand All @@ -62,26 +74,26 @@ private static void publishFlowRule(List<FlowRule> flowRules, Map<Long, FlowRule
continue;
}

newRules.put(rule.getClusterConfig().getFlowId(), rule);
RedisFlowRuleManager.flowIdToRules.put(rule.getClusterConfig().getFlowId(), rule);
if(RedisFlowRuleManager.publishRuleToRedis) {
FlowRule existRule = oldRules.get(rule.getClusterConfig().getFlowId());
if (existRule != null && !isChangeRule(existRule, rule)) {
RecordLog.warn(
"[RedisFlowRuleManager] not publish to redis on same flow rule: " + rule);
continue;
}
redisClient.publishRule(rule);
resetFlowMetricsIds.add(rule.getClusterConfig().getFlowId());
redisClient.resetFlowRule(rule);
}
}
redisClient.resetFlowMetrics(resetFlowMetricsIds);
RedisFlowRuleManager.flowRules = newRules;

Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(new ArrayList<>(RedisFlowRuleManager.flowIdToRules.values()));
RedisFlowRuleManager.flowRules.putAll(rules);
}

private static void clearFlowRule(Map<Long, FlowRule> oldRules, RedisClient redisClient) {
Set<Long> deleteFlowIds = new HashSet<>();
for (Map.Entry<Long, FlowRule> oldRuleEntry : oldRules.entrySet()) {
if(!RedisFlowRuleManager.flowRules.containsKey(oldRuleEntry.getKey())) {
if(!RedisFlowRuleManager.flowIdToRules.containsKey(oldRuleEntry.getKey())) {
deleteFlowIds.add(oldRuleEntry.getKey());
}
}
Expand All @@ -98,6 +110,14 @@ private static boolean isChangeRule(FlowRule oldRule, FlowRule newRule) {
|| Double.doubleToLongBits(oldRule.getCount()) != Double.doubleToLongBits(newRule.getCount());
}

public static FlowRule getFlowRule(long flowId) {
return flowIdToRules.get(flowId);
}

public static Map<String, List<FlowRule>> getFlowRuleMap() {
return flowRules;
}

private static final class RedisFlowPropertyListener implements PropertyListener<List<FlowRule>> {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.alibaba.csp.sentinel.cluster.redis.init;

import com.alibaba.csp.sentinel.cluster.redis.config.RedisClientFactoryManager;
import com.alibaba.csp.sentinel.cluster.redis.config.RedisFlowRuleManager;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.init.InitOrder;
import com.alibaba.csp.sentinel.log.RecordLog;
Expand All @@ -12,11 +11,6 @@ public class RedisClusterClientInitFunc implements InitFunc {
@Override
public void init() throws Exception {
initJedisClient();
addRedisFlowRuleListener();
}

private void addRedisFlowRuleListener() {
RedisFlowRuleManager.addRedisFlowRuleListener();
}

private void initJedisClient() {
Expand Down
Loading

0 comments on commit 2a50a54

Please sign in to comment.