Skip to content

Commit

Permalink
Some refactor of data source
Browse files Browse the repository at this point in the history
- Add `close` method in WritableDataSource (to extend AutoCloseable in JDK 1.7 later)
- Separate the writable file data source from original class
- Add a sample to show how to register data sources via Sentinel init mechanism
- Separate a writable data source registry from original handler to make it clear

Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 committed Sep 4, 2018
1 parent f3eb285 commit 007cd9d
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 54 deletions.
4 changes: 4 additions & 0 deletions sentinel-demo/sentinel-demo-dynamic-file-rule/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-extension</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,19 @@ private void listenRules() throws Exception {

// Data source for FlowRule
FileRefreshableDataSource<List<FlowRule>> flowRuleDataSource = new FileRefreshableDataSource<>(
flowRulePath, flowRuleListParser, this::encodeJson);
flowRulePath, flowRuleListParser);
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());

// Data source for DegradeRule
FileRefreshableDataSource<List<DegradeRule>> degradeRuleDataSource
= new FileRefreshableDataSource<>(
degradeRulePath, degradeRuleListParser, this::encodeJson);
degradeRulePath, degradeRuleListParser);
DegradeRuleManager.register2Property(degradeRuleDataSource.getProperty());

// Data source for SystemRule
FileRefreshableDataSource<List<SystemRule>> systemRuleDataSource
= new FileRefreshableDataSource<>(
systemRulePath, systemRuleListParser, this::encodeJson);
systemRulePath, systemRuleListParser);
SystemRuleManager.register2Property(systemRuleDataSource.getProperty());
}

Expand All @@ -103,8 +103,4 @@ private void listenRules() throws Exception {
new TypeReference<List<DegradeRule>>() {});
private Converter<String, List<SystemRule>> systemRuleListParser = source -> JSON.parseObject(source,
new TypeReference<List<SystemRule>>() {});

private <T> String encodeJson(T t) {
return JSON.toJSONString(t);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.demo.file.rule;

import java.util.List;

import com.alibaba.csp.sentinel.datasource.FileRefreshableDataSource;
import com.alibaba.csp.sentinel.datasource.FileWritableDataSource;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

/**
* <p>
* A sample showing how to register readable and writable data source via Sentinel init SPI mechanism.
* </p>
* <p>
* To activate this, you can add the class name to `com.alibaba.csp.sentinel.init.InitFunc` file
* in `META-INF/services/` directory of the resource directory. Then the data source will be automatically
* registered during the initialization of Sentinel.
* </p>
*
* @author Eric Zhao
*/
public class FileDataSourceInit implements InitFunc {

@Override
public void init() throws Exception {
// A fake path.
String flowRuleDir = System.getProperty("user.home") + "/sentinel/rules";
String flowRuleFile = "flowRule.json";
String flowRulePath = flowRuleDir + "/" + flowRuleFile;

ReadableDataSource<String, List<FlowRule>> ds = new FileRefreshableDataSource<>(
flowRulePath, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})
);
// Register to flow rule manager.
FlowRuleManager.register2Property(ds.getProperty());

WritableDataSource<List<FlowRule>> wds = new FileWritableDataSource<>(flowRulePath, this::encodeJson);
// Register to writable data source registry so that rules can be updated to file
// when there are rules pushed from the Sentinel Dashboard.
WritableDataSourceRegistry.registerFlowDataSource(wds);
}

private <T> String encodeJson(T t) {
return JSON.toJSONString(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

/**
* <p>
* A {@link WritableDataSource} based on file. This class will automatically fetches the backend file every refresh period.
* A {@link ReadableDataSource} based on file. This class will automatically fetches the backend file every refresh period.
* </p>
* <p>
* Limitations: Default read buffer size is 1 MB. If file size is greater than buffer size, exceeding bytes will
Expand All @@ -35,7 +35,7 @@
* @author Carpenter Lee
* @author Eric Zhao
*/
public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> implements WritableDataSource<T> {
public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> {

private static final int MAX_SIZE = 1024 * 1024 * 4;
private static final long DEFAULT_REFRESH_MS = 3000;
Expand All @@ -46,35 +46,33 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String,
private final Charset charset;
private final File file;

private final Converter<T, String> configEncoder;

/**
* Create a file based {@link ReadableDataSource} whose read buffer size is 1MB, charset is UTF8,
* and read interval is 3 seconds.
*
* @param file the file to read
* @param configParser the config decoder (parser)
*/
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder) throws FileNotFoundException {
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
public FileRefreshableDataSource(File file, Converter<String, T> configParser) throws FileNotFoundException {
this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
}

public FileRefreshableDataSource(String fileName, Converter<String, T> configParser, Converter<T, String> configEncoder)
public FileRefreshableDataSource(String fileName, Converter<String, T> configParser)
throws FileNotFoundException {
this(new File(fileName), configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
}

public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, int bufSize)
public FileRefreshableDataSource(File file, Converter<String, T> configParser, int bufSize)
throws FileNotFoundException {
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET);
this(file, configParser, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET);
}

public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, Charset charset)
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Charset charset)
throws FileNotFoundException {
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset);
this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset);
}

public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, long recommendRefreshMs,
public FileRefreshableDataSource(File file, Converter<String, T> configParser, long recommendRefreshMs,
int bufSize, Charset charset) throws FileNotFoundException {
super(configParser, recommendRefreshMs);
if (bufSize <= 0 || bufSize > MAX_SIZE) {
Expand All @@ -86,13 +84,9 @@ public FileRefreshableDataSource(File file, Converter<String, T> configParser, C
if (charset == null) {
throw new IllegalArgumentException("charset can't be null");
}
if (configEncoder == null) {
throw new IllegalArgumentException("Config encoder cannot be null");
}
this.buf = new byte[bufSize];
this.file = file;
this.charset = charset;
this.configEncoder = configEncoder;
firstLoad();
}

Expand Down Expand Up @@ -132,9 +126,4 @@ public void close() throws Exception {
super.close();
buf = null;
}

@Override
public void write(T value) throws Exception {
throw new UnsupportedOperationException("Not implemented");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.datasource;

import java.io.File;

/**
* A {@link WritableDataSource} based on file.
*
* @param <T> data type
* @author Eric Zhao
* @since 0.2.0
*/
public class FileWritableDataSource<T> implements WritableDataSource<T> {

private final Converter<T, String> configEncoder;
private File file;

public FileWritableDataSource(String filePath, Converter<T, String> configEncoder) {
this(new File(filePath), configEncoder);
}

public FileWritableDataSource(File file, Converter<T, String> configEncoder) {
if (file == null || file.isDirectory()) {
throw new IllegalArgumentException("Bad file");
}
if (configEncoder == null) {
throw new IllegalArgumentException("Config encoder cannot be null");
}
this.configEncoder = configEncoder;
this.file = file;
}

@Override
public void write(T value) throws Exception {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public void close() throws Exception {
// Nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ public interface WritableDataSource<T> {
* @throws Exception IO or other error occurs
*/
void write(T value) throws Exception;

/**
* Close the data source.
*
* @throws Exception IO or other error occurs
*/
void close() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,34 +35,15 @@
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.fastjson.JSONArray;

import static com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry.*;

/**
* @author jialiang.linjl
* @author Eric Zhao
*/
@CommandMapping(name = "setRules")
public class ModifyRulesCommandHandler implements CommandHandler<String> {

private static WritableDataSource<List<FlowRule>> flowDataSource = null;
private static WritableDataSource<List<AuthorityRule>> authorityDataSource = null;
private static WritableDataSource<List<DegradeRule>> degradeDataSource = null;
private static WritableDataSource<List<SystemRule>> systemSource = null;

public static synchronized void registerFlowDataSource(WritableDataSource<List<FlowRule>> datasource) {
flowDataSource = datasource;
}

public static synchronized void registerAuthorityDataSource(WritableDataSource<List<AuthorityRule>> dataSource) {
authorityDataSource = dataSource;
}

public static synchronized void registerDegradeDataSource(WritableDataSource<List<DegradeRule>> dataSource) {
degradeDataSource = dataSource;
}

public static synchronized void registerSystemDataSource(WritableDataSource<List<SystemRule>> dataSource) {
systemSource = dataSource;
}

@Override
public CommandResponse<String> handle(CommandRequest request) {
String type = request.getParam("type");
Expand All @@ -84,28 +65,28 @@ public CommandResponse<String> handle(CommandRequest request) {
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
FlowRuleManager.loadRules(flowRules);
if (!writeToDataSource(flowDataSource, flowRules)) {
if (!writeToDataSource(getFlowDataSource(), flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
AuthorityRuleManager.loadRules(rules);
if (!writeToDataSource(authorityDataSource, rules)) {
if (!writeToDataSource(getAuthorityDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
DegradeRuleManager.loadRules(rules);
if (!writeToDataSource(degradeDataSource, rules)) {
if (!writeToDataSource(getDegradeDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
SystemRuleManager.loadRules(rules);
if (!writeToDataSource(systemSource, rules)) {
if (!writeToDataSource(getSystemSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.alibaba.csp.sentinel.transport.util;

import java.util.List;

import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.system.SystemRule;

/**
* Writable data source registry for modifying rules via HTTP API.
*
* @author Eric Zhao
*/
public final class WritableDataSourceRegistry {

private static WritableDataSource<List<FlowRule>> flowDataSource = null;
private static WritableDataSource<List<AuthorityRule>> authorityDataSource = null;
private static WritableDataSource<List<DegradeRule>> degradeDataSource = null;
private static WritableDataSource<List<SystemRule>> systemSource = null;

public static synchronized void registerFlowDataSource(WritableDataSource<List<FlowRule>> datasource) {
flowDataSource = datasource;
}

public static synchronized void registerAuthorityDataSource(WritableDataSource<List<AuthorityRule>> dataSource) {
authorityDataSource = dataSource;
}

public static synchronized void registerDegradeDataSource(WritableDataSource<List<DegradeRule>> dataSource) {
degradeDataSource = dataSource;
}

public static synchronized void registerSystemDataSource(WritableDataSource<List<SystemRule>> dataSource) {
systemSource = dataSource;
}

public static WritableDataSource<List<FlowRule>> getFlowDataSource() {
return flowDataSource;
}

public static WritableDataSource<List<AuthorityRule>> getAuthorityDataSource() {
return authorityDataSource;
}

public static WritableDataSource<List<DegradeRule>> getDegradeDataSource() {
return degradeDataSource;
}

public static WritableDataSource<List<SystemRule>> getSystemSource() {
return systemSource;
}

private WritableDataSourceRegistry() {}
}

0 comments on commit 007cd9d

Please sign in to comment.