Skip to content

Commit

Permalink
[improve][core] Improved JSON structure for job content and connectio…
Browse files Browse the repository at this point in the history
…n parameters (#1125)

- Changed job.content from a list of maps to a map for better readability and access.
- Changed job.content.reader.parameter.connection from a list of maps to a map to simplify the structure.
  • Loading branch information
wgzhao authored Sep 17, 2024
1 parent e44d1cf commit 696f0e4
Show file tree
Hide file tree
Showing 41 changed files with 590 additions and 833 deletions.
4 changes: 2 additions & 2 deletions core/src/main/bin/addax.sh
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ if [ "$1" = "gen" ]; then
"channel": 1
}
},
"content": [{
"content": {
"reader":
$reader_content ,
"writer":
$writer_content
}]
}
}
}
EOF
Expand Down
156 changes: 75 additions & 81 deletions core/src/main/java/com/wgzhao/addax/core/util/ConfigParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.core.util.container.CoreConstant;

import java.io.File;
import java.io.IOException;
import java.net.URL;
Expand All @@ -31,38 +31,54 @@
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public final class ConfigParser
{
import static com.wgzhao.addax.common.base.Key.CONNECTION;
import static com.wgzhao.addax.core.util.container.CoreConstant.CONF_PATH;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_SERVER_TIMEOUT_SEC;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER_NAME;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER_PARAMETER;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER_PARAMETER_CONNECTION;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_WRITER;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_WRITER_NAME;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_WRITER_PARAMETER;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_POST_HANDLER_PLUGIN_NAME;
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_PRE_HANDLER_PLUGIN_NAME;
import static com.wgzhao.addax.core.util.container.CoreConstant.PLUGIN_READER_HOME;
import static com.wgzhao.addax.core.util.container.CoreConstant.PLUGIN_WRITER_HOME;


public final class ConfigParser {
private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class);

private ConfigParser() {}
private ConfigParser() {
}

/*
* 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回
*/
public static Configuration parse(String jobPath)
{
public static Configuration parse(String jobPath) {
Configuration configuration = ConfigParser.parseJobConfig(jobPath);

// Upgrade the new job format to the old one
configuration = upgradeJobConfig(configuration);
upgradeJobConfig(configuration);
//validate job json
validateJob(configuration);

configuration.merge(ConfigParser.parseCoreConfig(CoreConstant.CONF_PATH), false);
String readerPluginName = configuration.getString(CoreConstant.JOB_CONTENT_READER_NAME);
String writerPluginName = configuration.getString(CoreConstant.JOB_CONTENT_WRITER_NAME);
configuration.merge(ConfigParser.parseCoreConfig(), false);
String readerPluginName = configuration.getString(JOB_CONTENT_READER_NAME);
String writerPluginName = configuration.getString(JOB_CONTENT_WRITER_NAME);

String preHandlerName = configuration.getString(CoreConstant.JOB_PRE_HANDLER_PLUGIN_NAME);
String preHandlerName = configuration.getString(JOB_PRE_HANDLER_PLUGIN_NAME);

String postHandlerName = configuration.getString(CoreConstant.JOB_POST_HANDLER_PLUGIN_NAME);
String postHandlerName = configuration.getString(JOB_POST_HANDLER_PLUGIN_NAME);

Set<String> pluginList = new HashSet<>();
pluginList.add(readerPluginName);
Expand All @@ -76,14 +92,12 @@ public static Configuration parse(String jobPath)
}
try {
configuration.merge(parsePluginConfig(new ArrayList<>(pluginList)), false);
}
catch (Exception e) {
} catch (Exception e) {
//吞掉异常,保持log干净。这里message足够。
LOG.warn(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
LOG.warn("Failed to load plugin(s) [{},{}]: {}, try after 1 second.", readerPluginName, writerPluginName, e.getMessage());
try {
Thread.sleep(1000);
}
catch (InterruptedException e1) {
} catch (InterruptedException e1) {
//
}
configuration.merge(parsePluginConfig(new ArrayList<>(pluginList)), false);
Expand All @@ -95,44 +109,50 @@ public static Configuration parse(String jobPath)
/**
* Upgrade the new job format to the old one
* 1. the content of job.json is a map instead of list of map
*
* @param configuration {@link Configuration}
* @return {@link Configuration}
*/
private static Configuration upgradeJobConfig(Configuration configuration)
{
String content = configuration.getString("job.content");
if (content.startsWith("[")) {
private static void upgradeJobConfig(Configuration configuration) {
if (configuration.getString(JOB_CONTENT).startsWith("[")) {
// get the first element
List<Map> contentList = configuration.getList(CoreConstant.JOB_CONTENT, Map.class);
if (contentList!= null && contentList.size() > 0) {
List<Map> contentList = configuration.getList(JOB_CONTENT, Map.class);
if (contentList != null && !contentList.isEmpty()) {
configuration.set("job.content", contentList.get(0));
return configuration;
}
}
return configuration;
Configuration reader = configuration.getConfiguration(JOB_CONTENT_READER_PARAMETER);
if (reader != null) {
if (reader.getString(CONNECTION).startsWith("[")) {
List<Map> connectionList = configuration.getList(JOB_CONTENT_READER_PARAMETER_CONNECTION, Map.class);
if (connectionList != null && !connectionList.isEmpty()) {
reader.set(CONNECTION, connectionList.get(0));
}
}
if (reader.getString("connection.jdbcUrl", "").startsWith("[")) {
reader.set("connection.jdbcUrl", reader.getList("connection.jdbcUrl", String.class).get(0));
}
configuration.set(JOB_CONTENT_READER_PARAMETER, reader);
}
}

private static Configuration parseCoreConfig(String path)
{
return Configuration.from(new File(path));
private static Configuration parseCoreConfig() {
return Configuration.from(new File(CONF_PATH));
}

public static Configuration parseJobConfig(String path)
{
public static Configuration parseJobConfig(String path) {
String jobContent = getJobContent(path);
return Configuration.from(jobContent);
}

private static String getJobContent(String jobResource)
{
private static String getJobContent(String jobResource) {
String jobContent;

boolean isJobResourceFromHttp = jobResource.trim().toLowerCase().startsWith("http");

if (isJobResourceFromHttp) {
//设置httpclient的 HTTP_TIMEOUT_IN_MILLION_SECONDS
Configuration coreConfig = ConfigParser.parseCoreConfig(CoreConstant.CONF_PATH);
int httpTimeOutInMillionSeconds = coreConfig.getInt(CoreConstant.CORE_SERVER_TIMEOUT_SEC, 5) * 1000;
Configuration coreConfig = ConfigParser.parseCoreConfig();
int httpTimeOutInMillionSeconds = coreConfig.getInt(CORE_SERVER_TIMEOUT_SEC, 5) * 1000;
HttpClientUtil.setHttpTimeoutInMillionSeconds(httpTimeOutInMillionSeconds);

HttpClientUtil httpClientUtil = new HttpClientUtil();
Expand All @@ -142,17 +162,14 @@ private static String getJobContent(String jobResource)
httpGet.setURI(url.toURI());

jobContent = httpClientUtil.executeAndGetWithFailedRetry(httpGet, 1, 1000L);
}
catch (Exception e) {
} catch (Exception e) {
throw AddaxException.asAddaxException(FrameworkErrorCode.CONFIG_ERROR, "Failed to obtain job configuration:" + jobResource, e);
}
}
else {
} else {
// jobResource 是本地文件绝对路径
try {
jobContent = FileUtils.readFileToString(new File(jobResource), StandardCharsets.UTF_8);
}
catch (IOException e) {
} catch (IOException e) {
throw AddaxException.asAddaxException(FrameworkErrorCode.CONFIG_ERROR, "Failed to obtain job configuration:" + jobResource, e);
}
}
Expand All @@ -163,20 +180,19 @@ private static String getJobContent(String jobResource)
return jobContent;
}

public static Configuration parsePluginConfig(List<String> wantPluginNames)
{
public static Configuration parsePluginConfig(List<String> wantPluginNames) {
Configuration configuration = Configuration.newDefault();

int complete = 0;
String pluginType;
String pluginPath;
for(String plugin: wantPluginNames) {
for (String plugin : wantPluginNames) {
if (plugin.endsWith("reader")) {
pluginType = "reader";
pluginPath = CoreConstant.PLUGIN_READER_HOME + File.separator + plugin;
pluginPath = PLUGIN_READER_HOME + File.separator + plugin;
} else {
pluginType = "writer";
pluginPath = CoreConstant.PLUGIN_WRITER_HOME + File.separator + plugin;
pluginPath = PLUGIN_WRITER_HOME + File.separator + plugin;
}

String filePath = pluginPath + File.separator + "plugin.json";
Expand All @@ -190,50 +206,28 @@ public static Configuration parsePluginConfig(List<String> wantPluginNames)
complete += 1;
}

if ( !wantPluginNames.isEmpty() && wantPluginNames.size() != complete) {
if (!wantPluginNames.isEmpty() && wantPluginNames.size() != complete) {
throw AddaxException.asAddaxException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "Plugin loading failed. The specified plugin was not loaded: " + wantPluginNames);
}

return configuration;
}

private static void validateJob(Configuration conf)
{
final Map content = conf.getMap(CoreConstant.JOB_CONTENT);

if (content== null || content.isEmpty()) {
throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR,
"The configuration item '" + CoreConstant.JOB_CONTENT + "' is required");
}

if (null == conf.get(CoreConstant.JOB_CONTENT_READER) ) {
throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR,
"The configuration item '" + CoreConstant.JOB_CONTENT_READER + "' is required");
}

if (null == conf.get(CoreConstant.JOB_CONTENT_WRITER)) {
throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR,
"The configuration item '" + CoreConstant.JOB_CONTENT_WRITER + "' is required");
}
private static void validateJob(Configuration conf) {
final Map content = conf.getMap(JOB_CONTENT);
String[] validPaths = new String[]{JOB_CONTENT_READER, JOB_CONTENT_WRITER, JOB_CONTENT_READER_NAME,
JOB_CONTENT_READER_PARAMETER, JOB_CONTENT_WRITER_NAME, JOB_CONTENT_WRITER_PARAMETER};

if ( null == conf.get(CoreConstant.JOB_CONTENT_READER_NAME)) {
if (content == null || content.isEmpty()) {
throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR,
"The configuration item '" + CoreConstant.JOB_CONTENT_READER_NAME + "' is required");
"The configuration item '" + JOB_CONTENT + "' is required");
}

if (null == conf.get(CoreConstant.JOB_CONTENT_READER_PARAMETER)) {
throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR,
"The configuration item '" + CoreConstant.JOB_CONTENT_READER_PARAMETER + "' is required");
}

if ( null == conf.get(CoreConstant.JOB_CONTENT_WRITER_NAME)) {
throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR,
"The configuration item '" + CoreConstant.JOB_CONTENT_READER_NAME + "' is required");
}

if (null == conf.get(CoreConstant.JOB_CONTENT_WRITER_PARAMETER)) {
throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR,
"The configuration item '" + CoreConstant.JOB_CONTENT_READER_PARAMETER + "' is required");
for (String path : validPaths) {
if (content.get(path) == null) {
throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR,
"The configuration item '" + path + "' is required");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class CoreConstant

public static final String JOB_CONTENT_READER_PARAMETER = "job.content.reader.parameter";

public static final String JOB_CONTENT_READER_PARAMETER_CONNECTION = "job.content.reader.parameter.connection";

public static final String JOB_CONTENT_WRITER_NAME = "job.content.writer.name";

public static final String JOB_CONTENT_WRITER = "job.content.writer";
Expand Down
46 changes: 20 additions & 26 deletions core/src/main/job/clickhouse2stream.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,29 @@
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "clickhousereader",
"parameter": {
"username": "default",
"password": "",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": [
"jdbc:clickhouse://127.0.0.1:8123/tpch"
],
"querySql": [
"select * from orders limit 10"
]
}
"content": {
"reader": {
"name": "clickhousereader",
"parameter": {
"username": "default",
"password": "",
"column": [
"*"
],
"connection": {
"jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/tpch",
"querySql": [
"select * from orders limit 10"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
]
}
}
}
Loading

0 comments on commit 696f0e4

Please sign in to comment.