Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sentinel-cluster-client-redis #1226

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sentinel-cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
<module>sentinel-cluster-server-default</module>
<module>sentinel-cluster-common-default</module>
<module>sentinel-cluster-server-envoy-rls</module>
</modules>
jasonjoo2010 marked this conversation as resolved.
Show resolved Hide resolved
<module>sentinel-cluster-client-redis</module>
</modules>

<dependencyManagement>
<dependencies>
Expand Down
46 changes: 46 additions & 0 deletions sentinel-cluster/sentinel-cluster-client-redis/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

<parent>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster</artifactId>
<version>1.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-cluster-client-redis</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-cluster-common-default</artifactId>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.alibaba.csp.sentinel.cluster.redis;

import com.alibaba.csp.sentinel.cluster.redis.request.RequestData;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

import java.util.Set;

public interface RedisClient {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the loading process should not be put in client (or we call it proxy or util) and load when needed. This may cause racing or latency, right?

Better put them in kinds of initializing codes like InitFunc, etc

And the name RedisClient will cause ambiguous with real client of Redis.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redis加载工作确实在RedisClusterClientInitFunc#initJedisClient方法中RedisProcessorFactoryManager.setClientType完成。
RedisClient已改为RedisProcessor。


/**
* request token from redis
* @param luaCode
* @param requestData
* @return
*/
int requestToken(String luaCode, RequestData requestData);

/**
* reset rule(public rule and clear flow metrics)
* @param rule
*/
void resetRedisRuleAndMetrics(FlowRule rule);

/**
* clear redis rule
* @param flowIds
*/
void clearRuleAndMetrics(Set<Long> flowIds);

/**
* close redis client
*/
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.alibaba.csp.sentinel.cluster.redis;

public interface RedisClientFactory {
/**
* create redis client
* @return
*/
RedisClient getClient();

/**
* destroy factory
*/
void destroy();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.alibaba.csp.sentinel.cluster.redis;

import com.alibaba.csp.sentinel.cluster.redis.config.RedisFlowRuleManager;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.function.Function;

import java.util.Collection;
import java.util.List;
import java.util.Map;

public class RedisClusterFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;

public RedisClusterFlowSlot() {
this(new FlowRuleChecker());
}

RedisClusterFlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}

private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = RedisFlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.alibaba.csp.sentinel.cluster.redis;

import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
import com.alibaba.csp.sentinel.slotchain.SlotChainExtender;

public class RedisClusterFlowSlotExtender implements SlotChainExtender {

@Override
public ProcessorSlotChain extend(ProcessorSlotChain slotChain) {
slotChain.addLast(new RedisClusterFlowSlot());
return slotChain;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.alibaba.csp.sentinel.cluster.redis;

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor;
import com.alibaba.csp.sentinel.cluster.client.ClusterTokenClient;
import com.alibaba.csp.sentinel.cluster.redis.config.RedisClientFactoryManager;
import com.alibaba.csp.sentinel.cluster.redis.config.RedisFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.redis.lua.LuaUtil;
import com.alibaba.csp.sentinel.cluster.redis.request.RequestData;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import java.util.Collection;

public class RedisClusterTokenService implements ClusterTokenClient {

@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
if (notValidRequest(flowId, acquireCount)) {
return badRequest();
}

RedisClientFactory redisClientFactory = RedisClientFactoryManager.getFactory();
if(redisClientFactory == null) {
RecordLog.warn(
"[RedisClusterTokenService] cannot get RedisClientFactory, please init redis config");
return clientFail();
}

RedisClient redisClient = redisClientFactory.getClient();

FlowRule rule = getFlowRule(flowId);
if (rule == null) {
RecordLog.warn(
"[RedisClusterTokenService] Ignoring invalid flow rule :" + flowId);
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}

int rs = redisClient.requestToken(LuaUtil.FLOW_CHECKER_LUA, createRequestData(flowId, acquireCount));
redisClient.close();
// todo remaining value
return new TokenResult(rs)
.setRemaining(0)
.setWaitInMs(0);
}

private RequestData createRequestData(Long flowId, int acquireCount) {
RequestData requestData = new RequestData();
requestData.setFlowId(flowId);
requestData.setAcquireCount(acquireCount);
return requestData;
}

@Override
public TokenResult requestParamToken(Long aLong, int i, Collection<Object> collection) {
throw new UnsupportedOperationException("RedisClusterTokenService cannot supported request param token now");
}


private FlowRule getFlowRule(Long flowId) {
return RedisFlowRuleManager.getFlowRule(flowId);
}

private boolean notValidRequest(Long id, int count) {
return id == null || id <= 0 || count <= 0;
}

private TokenResult badRequest() {
return new TokenResult(TokenResultStatus.BAD_REQUEST);
}

private TokenResult clientFail() {
return new TokenResult(TokenResultStatus.FAIL);
}

@Override
public TokenServerDescriptor currentServer() {
return null;
}

@Override
public void start() throws Exception {

}

@Override
public void stop() throws Exception {

}

@Override
public int getState() {
return 0;
}
}
Loading