Skip to content

Commit

Permalink
Add DataSource integration for Redis (alibaba#102)
Browse files Browse the repository at this point in the history
- This implementation uses Lettuce as the internal client, and leverages Redis pub-sub feature to implement push mode data source. (by @tigerMoon)
  • Loading branch information
tigerMoon authored and sczyh30 committed Sep 14, 2018
1 parent edca25f commit 46cb729
Show file tree
Hide file tree
Showing 10 changed files with 1,433 additions and 0 deletions.
1 change: 1 addition & 0 deletions sentinel-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<module>sentinel-datasource-nacos</module>
<module>sentinel-datasource-zookeeper</module>
<module>sentinel-datasource-apollo</module>
<module>sentinel-datasource-redis</module>
<module>sentinel-annotation-aspectj</module>
</modules>

Expand Down
80 changes: 80 additions & 0 deletions sentinel-extension/sentinel-datasource-redis/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Sentinel DataSource Redis

Sentinel DataSource Redis provides integration with Redis. make Redis
as dynamic rule data source of Sentinel. The data source uses push model (listener) with redis pub/sub feature.

**NOTE**:
we not support redis cluster as a pub/sub dataSource now.

To use Sentinel DataSource Redis, you should add the following dependency:

```xml
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-redis</artifactId>
<version>x.y.z</version>
</dependency>

```

Then you can create an `RedisDataSource` and register to rule managers.
For instance:

```java
ReadableDataSource<String, List<FlowRule>> redisDataSource = new RedisDataSource<List<FlowRule>>(redisConnectionConfig, ruleKey, channel, flowConfigParser);
FlowRuleManager.register2Property(redisDataSource.getProperty());
```

_**redisConnectionConfig**_ : use `RedisConnectionConfig` class to build your connection config.

_**ruleKey**_ : when the json rule data publish. it also should save to the key for init read.

_**channel**_ : the channel to listen.

you can also create multi data source listen for different rule type.

you can see test cases for usage.

## Before start

RedisDataSource init config by read from redis key `ruleKey`, value store the latest rule config data.
so you should first config your redis ruleData in back end.

since update redis rule data. it should simultaneously send data to `channel`.

you may implement like this (using Redis transaction):

```
MULTI
PUBLISH channel value
SET ruleKey value
EXEC
```


## How to build RedisConnectionConfig


### Build with redis standLone mode

```java
RedisConnectionConfig config = RedisConnectionConfig.builder()
.withHost("localhost")
.withPort(6379)
.withPassword("pwd")
.withDataBase(2)
.build();

```


### Build with redis sentinel mode

```java
RedisConnectionConfig config = RedisConnectionConfig.builder()
.withRedisSentinel("redisSentinelServer1",5000)
.withRedisSentinel("redisSentinelServer2",5001)
.withRedisSentinelMasterId("redisSentinelMasterId").build();
```
47 changes: 47 additions & 0 deletions sentinel-extension/sentinel-datasource-redis/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-extension</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-datasource-redis</artifactId>
<packaging>jar</packaging>

<properties>
<lettuce.version>5.0.1.RELEASE</lettuce.version>
<redis.mock.version>0.1.6</redis.mock.version>
</properties>

<dependencies>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>${lettuce.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-extension</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ai.grakn</groupId>
<artifactId>redis-mock</artifactId>
<version>${redis.mock.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.csp.sentinel.datasource.redis;

import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.redis.config.RedisConnectionConfig;
import com.alibaba.csp.sentinel.datasource.redis.util.AssertUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;

import java.util.concurrent.TimeUnit;

/**
* A read-only {@code DataSource} with Redis backend.
* <p>
* When data source init,reads form redis with string k-v,value is string format rule config data.
* This data source subscribe from specific channel and then data published in redis with this channel,data source
* will be notified and then update rule config in time.
* </p>
*
* @author tiger
*/

public class RedisDataSource<T> extends AbstractDataSource<String, T> {

private RedisClient redisClient = null;

private String ruleKey;

/**
* Constructor of {@code RedisDataSource}
*
* @param connectionConfig redis connection config.
* @param ruleKey data save in redis.
* @param channel subscribe from channel.
* @param parser convert <code>ruleKey<code>`s value to {@literal alibaba/Sentinel} rule type
*/
public RedisDataSource(RedisConnectionConfig connectionConfig, String ruleKey, String channel, Converter<String, T> parser) {
super(parser);
AssertUtil.notNull(connectionConfig, "redis connection config can not be null");
AssertUtil.notEmpty(ruleKey, "redis subscribe ruleKey can not be empty");
AssertUtil.notEmpty(channel, "redis subscribe channel can not be empty");
this.redisClient = getRedisClient(connectionConfig);
this.ruleKey = ruleKey;
loadInitialConfig();
subscribeFromChannel(channel);
}

/**
* build redis client form {@code RedisConnectionConfig} with io.lettuce.
*
* @return a new {@link RedisClient}
*/
private RedisClient getRedisClient(RedisConnectionConfig connectionConfig) {
if (connectionConfig.getRedisSentinels().size() == 0) {
RecordLog.info("start standLone mode to connect to redis");
return getRedisStandLoneClient(connectionConfig);
} else {
RecordLog.info("start redis sentinel mode to connect to redis");
return getRedisSentinelClient(connectionConfig);
}
}

private RedisClient getRedisStandLoneClient(RedisConnectionConfig connectionConfig) {
char[] password = connectionConfig.getPassword();
String clientName = connectionConfig.getClientName();
RedisURI.Builder redisUriBuilder = RedisURI.builder();
redisUriBuilder.withHost(connectionConfig.getHost())
.withPort(connectionConfig.getPort())
.withDatabase(connectionConfig.getDatabase())
.withTimeout(connectionConfig.getTimeout(), TimeUnit.MILLISECONDS);
if (password != null) {
redisUriBuilder.withPassword(connectionConfig.getPassword());
}
if (StringUtil.isNotEmpty(connectionConfig.getClientName())) {
redisUriBuilder.withClientName(clientName);
}
return RedisClient.create(redisUriBuilder.build());
}

private RedisClient getRedisSentinelClient(RedisConnectionConfig connectionConfig) {
char[] password = connectionConfig.getPassword();
String clientName = connectionConfig.getClientName();
RedisURI.Builder sentinelRedisUriBuilder = RedisURI.builder();
for (RedisConnectionConfig config : connectionConfig.getRedisSentinels()) {
sentinelRedisUriBuilder.withSentinel(config.getHost(), config.getPort());
}
if (password != null) {
sentinelRedisUriBuilder.withPassword(connectionConfig.getPassword());
}
if (StringUtil.isNotEmpty(connectionConfig.getClientName())) {
sentinelRedisUriBuilder.withClientName(clientName);
}
sentinelRedisUriBuilder.withSentinelMasterId(connectionConfig.getRedisSentinelMasterId())
.withTimeout(connectionConfig.getTimeout(), TimeUnit.MILLISECONDS);
return RedisClient.create(sentinelRedisUriBuilder.build());
}

private void subscribeFromChannel(String channel) {
StatefulRedisPubSubConnection<String, String> pubSubConnection = redisClient.connectPubSub();
RedisPubSubAdapter<String, String> adapterListener = new DelegatingRedisPubSubListener();
pubSubConnection.addListener(adapterListener);
RedisPubSubCommands<String, String> sync = pubSubConnection.sync();
sync.subscribe(channel);
}

private void loadInitialConfig() {
try {
T newValue = loadConfig();
if (newValue == null) {
RecordLog.warn("[RedisDataSource] WARN: initial config is null, you may have to check your data source");
}
getProperty().updateValue(newValue);
} catch (Exception ex) {
RecordLog.warn("[RedisDataSource] Error when loading initial config", ex);
}
}

@Override
public String readSource() throws Exception {
if (this.redisClient == null) {
throw new IllegalStateException("redis client has not been initialized or error occurred");
}
RedisCommands<String, String> stringRedisCommands = redisClient.connect().sync();
return stringRedisCommands.get(ruleKey);
}

@Override
public void close() throws Exception {
redisClient.shutdown();
}

private class DelegatingRedisPubSubListener extends RedisPubSubAdapter<String, String> {

DelegatingRedisPubSubListener() {
}

@Override
public void message(String channel, String message) {
RecordLog.info(String.format("[RedisDataSource] New property value received for channel %s: %s", channel, message));
getProperty().updateValue(parser.convert(message));
}
}

}
Loading

0 comments on commit 46cb729

Please sign in to comment.