diff --git a/core/src/main/bin/addax.sh b/core/src/main/bin/addax.sh index 12c0f2c6c..b033e2edd 100755 --- a/core/src/main/bin/addax.sh +++ b/core/src/main/bin/addax.sh @@ -206,12 +206,12 @@ if [ "$1" = "gen" ]; then "channel": 1 } }, - "content": [{ + "content": { "reader": $reader_content , "writer": $writer_content - }] + } } } EOF diff --git a/core/src/main/java/com/wgzhao/addax/core/util/ConfigParser.java b/core/src/main/java/com/wgzhao/addax/core/util/ConfigParser.java index 8d08bc211..0c6c17e62 100644 --- a/core/src/main/java/com/wgzhao/addax/core/util/ConfigParser.java +++ b/core/src/main/java/com/wgzhao/addax/core/util/ConfigParser.java @@ -21,7 +21,7 @@ import com.wgzhao.addax.common.exception.AddaxException; import com.wgzhao.addax.common.util.Configuration; -import com.wgzhao.addax.core.util.container.CoreConstant; + import java.io.File; import java.io.IOException; import java.net.URL; @@ -31,38 +31,54 @@ import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.HttpGet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -public final class ConfigParser -{ +import static com.wgzhao.addax.common.base.Key.CONNECTION; +import static com.wgzhao.addax.core.util.container.CoreConstant.CONF_PATH; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_SERVER_TIMEOUT_SEC; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER_NAME; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER_PARAMETER; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER_PARAMETER_CONNECTION; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_WRITER; +import 
static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_WRITER_NAME; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_WRITER_PARAMETER; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_POST_HANDLER_PLUGIN_NAME; +import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_PRE_HANDLER_PLUGIN_NAME; +import static com.wgzhao.addax.core.util.container.CoreConstant.PLUGIN_READER_HOME; +import static com.wgzhao.addax.core.util.container.CoreConstant.PLUGIN_WRITER_HOME; + + +public final class ConfigParser { private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class); - private ConfigParser() {} + private ConfigParser() { + } /* * 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回 */ - public static Configuration parse(String jobPath) - { + public static Configuration parse(String jobPath) { Configuration configuration = ConfigParser.parseJobConfig(jobPath); // Upgrade the new job format to the old one - configuration = upgradeJobConfig(configuration); + upgradeJobConfig(configuration); //validate job json validateJob(configuration); - configuration.merge(ConfigParser.parseCoreConfig(CoreConstant.CONF_PATH), false); - String readerPluginName = configuration.getString(CoreConstant.JOB_CONTENT_READER_NAME); - String writerPluginName = configuration.getString(CoreConstant.JOB_CONTENT_WRITER_NAME); + configuration.merge(ConfigParser.parseCoreConfig(), false); + String readerPluginName = configuration.getString(JOB_CONTENT_READER_NAME); + String writerPluginName = configuration.getString(JOB_CONTENT_WRITER_NAME); - String preHandlerName = configuration.getString(CoreConstant.JOB_PRE_HANDLER_PLUGIN_NAME); + String preHandlerName = configuration.getString(JOB_PRE_HANDLER_PLUGIN_NAME); - String postHandlerName = configuration.getString(CoreConstant.JOB_POST_HANDLER_PLUGIN_NAME); + String postHandlerName = configuration.getString(JOB_POST_HANDLER_PLUGIN_NAME); Set pluginList = new 
HashSet<>(); pluginList.add(readerPluginName); @@ -76,14 +92,12 @@ public static Configuration parse(String jobPath) } try { configuration.merge(parsePluginConfig(new ArrayList<>(pluginList)), false); - } - catch (Exception e) { + } catch (Exception e) { //吞掉异常,保持log干净。这里message足够。 - LOG.warn(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage())); + LOG.warn("Failed to load plugin(s) [{},{}]: {}, try after 1 second.", readerPluginName, writerPluginName, e.getMessage()); try { Thread.sleep(1000); - } - catch (InterruptedException e1) { + } catch (InterruptedException e1) { // } configuration.merge(parsePluginConfig(new ArrayList<>(pluginList)), false); @@ -95,44 +109,50 @@ public static Configuration parse(String jobPath) /** * Upgrade the new job format to the old one * 1. the content of job.json is a map instead of list of map + * * @param configuration {@link Configuration} - * @return {@link Configuration} */ - private static Configuration upgradeJobConfig(Configuration configuration) - { - String content = configuration.getString("job.content"); - if (content.startsWith("[")) { + private static void upgradeJobConfig(Configuration configuration) { + if (configuration.getString(JOB_CONTENT, "").startsWith("[")) { // get the first element - List contentList = configuration.getList(CoreConstant.JOB_CONTENT, Map.class); - if (contentList!= null && contentList.size() > 0) { + List contentList = configuration.getList(JOB_CONTENT, Map.class); + if (contentList != null && !contentList.isEmpty()) { configuration.set("job.content", contentList.get(0)); - return configuration; } } - return configuration; + Configuration reader = configuration.getConfiguration(JOB_CONTENT_READER_PARAMETER); + if (reader != null) { + if (reader.getString(CONNECTION, "").startsWith("[")) { + List connectionList = configuration.getList(JOB_CONTENT_READER_PARAMETER_CONNECTION, Map.class); + if (connectionList != null && !connectionList.isEmpty()) { +
reader.set(CONNECTION, connectionList.get(0)); + } + } + if (reader.getString("connection.jdbcUrl", "").startsWith("[")) { + reader.set("connection.jdbcUrl", reader.getList("connection.jdbcUrl", String.class).get(0)); + } + configuration.set(JOB_CONTENT_READER_PARAMETER, reader); + } } - private static Configuration parseCoreConfig(String path) - { - return Configuration.from(new File(path)); + private static Configuration parseCoreConfig() { + return Configuration.from(new File(CONF_PATH)); } - public static Configuration parseJobConfig(String path) - { + public static Configuration parseJobConfig(String path) { String jobContent = getJobContent(path); return Configuration.from(jobContent); } - private static String getJobContent(String jobResource) - { + private static String getJobContent(String jobResource) { String jobContent; boolean isJobResourceFromHttp = jobResource.trim().toLowerCase().startsWith("http"); if (isJobResourceFromHttp) { //设置httpclient的 HTTP_TIMEOUT_IN_MILLION_SECONDS - Configuration coreConfig = ConfigParser.parseCoreConfig(CoreConstant.CONF_PATH); - int httpTimeOutInMillionSeconds = coreConfig.getInt(CoreConstant.CORE_SERVER_TIMEOUT_SEC, 5) * 1000; + Configuration coreConfig = ConfigParser.parseCoreConfig(); + int httpTimeOutInMillionSeconds = coreConfig.getInt(CORE_SERVER_TIMEOUT_SEC, 5) * 1000; HttpClientUtil.setHttpTimeoutInMillionSeconds(httpTimeOutInMillionSeconds); HttpClientUtil httpClientUtil = new HttpClientUtil(); @@ -142,17 +162,14 @@ private static String getJobContent(String jobResource) httpGet.setURI(url.toURI()); jobContent = httpClientUtil.executeAndGetWithFailedRetry(httpGet, 1, 1000L); - } - catch (Exception e) { + } catch (Exception e) { throw AddaxException.asAddaxException(FrameworkErrorCode.CONFIG_ERROR, "Failed to obtain job configuration:" + jobResource, e); } - } - else { + } else { // jobResource 是本地文件绝对路径 try { jobContent = FileUtils.readFileToString(new File(jobResource), StandardCharsets.UTF_8); - } - catch 
(IOException e) { + } catch (IOException e) { throw AddaxException.asAddaxException(FrameworkErrorCode.CONFIG_ERROR, "Failed to obtain job configuration:" + jobResource, e); } } @@ -163,20 +180,19 @@ private static String getJobContent(String jobResource) return jobContent; } - public static Configuration parsePluginConfig(List wantPluginNames) - { + public static Configuration parsePluginConfig(List wantPluginNames) { Configuration configuration = Configuration.newDefault(); int complete = 0; String pluginType; String pluginPath; - for(String plugin: wantPluginNames) { + for (String plugin : wantPluginNames) { if (plugin.endsWith("reader")) { pluginType = "reader"; - pluginPath = CoreConstant.PLUGIN_READER_HOME + File.separator + plugin; + pluginPath = PLUGIN_READER_HOME + File.separator + plugin; } else { pluginType = "writer"; - pluginPath = CoreConstant.PLUGIN_WRITER_HOME + File.separator + plugin; + pluginPath = PLUGIN_WRITER_HOME + File.separator + plugin; } String filePath = pluginPath + File.separator + "plugin.json"; @@ -190,50 +206,28 @@ public static Configuration parsePluginConfig(List wantPluginNames) complete += 1; } - if ( !wantPluginNames.isEmpty() && wantPluginNames.size() != complete) { + if (!wantPluginNames.isEmpty() && wantPluginNames.size() != complete) { throw AddaxException.asAddaxException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "Plugin loading failed. 
The specified plugin was not loaded: " + wantPluginNames); } return configuration; } - private static void validateJob(Configuration conf) - { - final Map content = conf.getMap(CoreConstant.JOB_CONTENT); - - if (content== null || content.isEmpty()) { - throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR, - "The configuration item '" + CoreConstant.JOB_CONTENT + "' is required"); - } - - if (null == conf.get(CoreConstant.JOB_CONTENT_READER) ) { - throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR, - "The configuration item '" + CoreConstant.JOB_CONTENT_READER + "' is required"); - } - - if (null == conf.get(CoreConstant.JOB_CONTENT_WRITER)) { - throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR, - "The configuration item '" + CoreConstant.JOB_CONTENT_WRITER + "' is required"); - } + private static void validateJob(Configuration conf) { + final Map content = conf.getMap(JOB_CONTENT); + String[] validPaths = new String[]{JOB_CONTENT_READER, JOB_CONTENT_WRITER, JOB_CONTENT_READER_NAME, + JOB_CONTENT_READER_PARAMETER, JOB_CONTENT_WRITER_NAME, JOB_CONTENT_WRITER_PARAMETER}; - if ( null == conf.get(CoreConstant.JOB_CONTENT_READER_NAME)) { + if (content == null || content.isEmpty()) { throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR, - "The configuration item '" + CoreConstant.JOB_CONTENT_READER_NAME + "' is required"); + "The configuration item '" + JOB_CONTENT + "' is required"); } - if (null == conf.get(CoreConstant.JOB_CONTENT_READER_PARAMETER)) { - throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR, - "The configuration item '" + CoreConstant.JOB_CONTENT_READER_PARAMETER + "' is required"); - } - - if ( null == conf.get(CoreConstant.JOB_CONTENT_WRITER_NAME)) { - throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR, - "The configuration item '" + CoreConstant.JOB_CONTENT_READER_NAME + "' is required"); - } - - if (null == conf.get(CoreConstant.JOB_CONTENT_WRITER_PARAMETER)) { 
- throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR, - "The configuration item '" + CoreConstant.JOB_CONTENT_READER_PARAMETER + "' is required"); + for (String path : validPaths) { + if (conf.get(path) == null) { + throw AddaxException.asAddaxException(FrameworkErrorCode.JOB_ERROR, + "The configuration item '" + path + "' is required"); + } } } } diff --git a/core/src/main/java/com/wgzhao/addax/core/util/container/CoreConstant.java b/core/src/main/java/com/wgzhao/addax/core/util/container/CoreConstant.java index f3f432f3a..8c5841f7f 100644 --- a/core/src/main/java/com/wgzhao/addax/core/util/container/CoreConstant.java +++ b/core/src/main/java/com/wgzhao/addax/core/util/container/CoreConstant.java @@ -82,6 +82,8 @@ public class CoreConstant public static final String JOB_CONTENT_READER_PARAMETER = "job.content.reader.parameter"; + public static final String JOB_CONTENT_READER_PARAMETER_CONNECTION = "job.content.reader.parameter.connection"; + public static final String JOB_CONTENT_WRITER_NAME = "job.content.writer.name"; public static final String JOB_CONTENT_WRITER = "job.content.writer"; diff --git a/core/src/main/job/clickhouse2stream.json b/core/src/main/job/clickhouse2stream.json index f73c5364e..80bd10505 100644 --- a/core/src/main/job/clickhouse2stream.json +++ b/core/src/main/job/clickhouse2stream.json @@ -9,35 +9,29 @@ "percentage": 0.02 } }, - "content": [ - { - "reader": { - "name": "clickhousereader", - "parameter": { - "username": "default", - "password": "", - "column": [ - "*" - ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:clickhouse://127.0.0.1:8123/tpch" - ], - "querySql": [ - "select * from orders limit 10" - ] - } + "content": { + "reader": { + "name": "clickhousereader", + "parameter": { + "username": "default", + "password": "", + "column": [ + "*" + ], + "connection": { + "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/tpch", + "querySql": [ + "select * from orders limit 10" ] } - }, - "writer": { - "name": "streamwriter", - 
"parameter": { - "print": true - } + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print": true } } - ] + } } } \ No newline at end of file diff --git a/core/src/main/job/job.json b/core/src/main/job/job.json index ebbd5fe4b..c17ac84b2 100755 --- a/core/src/main/job/job.json +++ b/core/src/main/job/job.json @@ -10,47 +10,45 @@ "percentage": 0.02 } }, - "content": [ - { - "reader": { - "name": "streamreader", - "parameter": { - "column": [ - { - "value": "addax", - "type": "string" - }, - { - "value": 19890604, - "type": "long" - }, - { - "value": "1989-06-04 00:00:00", - "type": "date" - }, - { - "value": true, - "type": "bool" - }, - { - "value": "test", - "type": "bytes" - } - ], - "sliceRecordCount": 10 - } - }, - "writer": { - "name": "streamwriter", - "parameter": { - "print": true, - "column": [ - "col1" - ], - "encoding": "UTF-8" - } + "content": { + "reader": { + "name": "streamreader", + "parameter": { + "column": [ + { + "value": "addax", + "type": "string" + }, + { + "value": 19890604, + "type": "long" + }, + { + "value": "1989-06-04 00:00:00", + "type": "date" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "test", + "type": "bytes" + } + ], + "sliceRecordCount": 10 + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print": true, + "column": [ + "col1" + ], + "encoding": "UTF-8" } } - ] + } } } \ No newline at end of file diff --git a/core/src/main/job/oracle2dbf.json b/core/src/main/job/oracle2dbf.json index fb33782d4..583349f48 100644 --- a/core/src/main/job/oracle2dbf.json +++ b/core/src/main/job/oracle2dbf.json @@ -11,16 +11,12 @@ "BRANCH_NAME", "DW_CLT_DATE" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:oracle:thin:@127.0.0.1:1521/stage" - ], - "table": [ - "STGUF.ALLBRANCH" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1:1521/stage", + "table": [ + "STGUF.ALLBRANCH" + ] + }, "password": "password", "username": "oracle", "where": "branch_no = '1211'", diff --git 
a/core/src/main/job/oracle2hdfs.json b/core/src/main/job/oracle2hdfs.json index ce3456cbb..22395fc93 100755 --- a/core/src/main/job/oracle2hdfs.json +++ b/core/src/main/job/oracle2hdfs.json @@ -8,16 +8,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:oracle:thin:@127.0.0.1/stage" - ], - "table": [ - "${sdb}.${stable}" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1/stage", + "table": [ + "${sdb}.${stable}" + ] + }, "password": "password", "username": "oracle" } diff --git a/core/src/main/job/oracle2hive.json b/core/src/main/job/oracle2hive.json index cdbf61f16..1423a0e72 100644 --- a/core/src/main/job/oracle2hive.json +++ b/core/src/main/job/oracle2hive.json @@ -8,16 +8,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:oracle:thin:@127.0.0.1:1521/stage" - ], - "table": [ - "STGUF.CSDCHOLDER" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1:1521/stage", + "table": [ + "STGUF.CSDCHOLDER" + ] + }, "username": "oracle", "password": "password" } diff --git a/core/src/main/job/oracle2oracle.json b/core/src/main/job/oracle2oracle.json index f62130443..40bf04b9b 100644 --- a/core/src/main/job/oracle2oracle.json +++ b/core/src/main/job/oracle2oracle.json @@ -10,14 +10,12 @@ "column": [ "*" ], - "connection": [ - { - "table": [ - "${ddb}.${dtable}" - ], - "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1/stage" - } - ], + "connection": { + "table": [ + "${ddb}.${dtable}" + ], + "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1/stage" + }, "preSql": [ "truncate table @table" ] diff --git a/core/src/main/job/oracle2stream.json b/core/src/main/job/oracle2stream.json index 6680831b0..aa412c18f 100644 --- a/core/src/main/job/oracle2stream.json +++ b/core/src/main/job/oracle2stream.json @@ -8,16 +8,12 @@ "column": [ "COL1" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:oracle:thin:@127.0.0.1:1521/stage" - ], - "table": [ - "FINEBI.T" - ] - } - ], + "connection": { + "jdbcUrl": 
"jdbc:oracle:thin:@127.0.0.1:1521/stage", + "table": [ + "FINEBI.T" + ] + }, "username": "oracle", "password": "password" } diff --git a/core/src/main/job/oracle2txt.json b/core/src/main/job/oracle2txt.json index 5e8ccbe42..7ed15fd5e 100644 --- a/core/src/main/job/oracle2txt.json +++ b/core/src/main/job/oracle2txt.json @@ -12,16 +12,12 @@ "BRANCH_NO", "BRANCH_NAME" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:oracle:thin:@127.0.0.1:1521/stage" - ], - "table": [ - "STGUF.ALLBRANCH" - ] - } - ] + "connection": { + "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1:1521/stage", + "table": [ + "STGUF.ALLBRANCH" + ] + } } }, "writer": { diff --git a/docs/assets/jobs/accessreader.json b/docs/assets/jobs/accessreader.json index 7c51c7d40..d640e52e3 100644 --- a/docs/assets/jobs/accessreader.json +++ b/docs/assets/jobs/accessreader.json @@ -1,42 +1,38 @@ - { - "job": { - "setting": { - "speed": { - "byte": -1, - "channel": 1 +{ + "job": { + "setting": { + "speed": { + "byte": -1, + "channel": 1 + } + }, + "content": [ + { + "reader": { + "name": "accessreader", + "parameter": { + "username": "root", + "password": "", + "column": [ + "*" + ], + "connection": { + "table": [ + "tbl_Users" + ], + "jdbcUrl": "jdbc:ucanaccess:///Users/wgzhao/Downloads/AccessThemeDemo.mdb" + }, + "where": "" } }, - "content": [ - { - "reader": { - "name": "accessreader", - "parameter": { - "username": "root", - "password": "", - "column": [ - "*" - ], - "connection": [ - { - "table": [ - "tbl_Users" - ], - "jdbcUrl": [ - "jdbc:ucanaccess:///Users/wgzhao/Downloads/AccessThemeDemo.mdb" - ] - } - ], - "where": "" - } - }, - "writer": { - "name": "streamwriter", - "parameter": { - "encoding": "utf-8", - "print": true - } - } + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "utf-8", + "print": true } - ] + } } - } + ] + } +} diff --git a/docs/assets/jobs/clickhousereader.json b/docs/assets/jobs/clickhousereader.json index 3cbc952d4..cacf327f8 100644 --- 
a/docs/assets/jobs/clickhousereader.json +++ b/docs/assets/jobs/clickhousereader.json @@ -19,16 +19,12 @@ "column": [ "*" ], - "connection": [ - { - "table": [ - "ck_addax" - ], - "jdbcUrl": [ - "jdbc:clickhouse://127.0.0.1:8123/default" - ] - } - ] + "connection": { + "table": [ + "ck_addax" + ], + "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/default" + } } }, "writer": { diff --git a/docs/assets/jobs/databend2stream.json b/docs/assets/jobs/databend2stream.json index 644376bdb..5fc072f70 100644 --- a/docs/assets/jobs/databend2stream.json +++ b/docs/assets/jobs/databend2stream.json @@ -8,16 +8,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:databend://127.0.0.1:8000/default" - ], - "table": [ - "addax_reader" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:databend://127.0.0.1:8000/default", + "table": [ + "addax_reader" + ] + }, "username": "databend", "password": "databend" } diff --git a/docs/assets/jobs/hanareader.json b/docs/assets/jobs/hanareader.json index 4cc8fc10e..ffc6aec19 100644 --- a/docs/assets/jobs/hanareader.json +++ b/docs/assets/jobs/hanareader.json @@ -8,16 +8,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:sap://wgzhao-pc:39017/system" - ], - "table": [ - "addax_tbl" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:sap://wgzhao-pc:39017/system", + "table": [ + "addax_tbl" + ] + }, "username": "system", "password": "HXEHana1" } diff --git a/docs/assets/jobs/hivereader.json b/docs/assets/jobs/hivereader.json index f6d1a6727..1eb2ced6c 100644 --- a/docs/assets/jobs/hivereader.json +++ b/docs/assets/jobs/hivereader.json @@ -19,16 +19,12 @@ ], "username": "hive", "password": "", - "connection": [ - { - "jdbcUrl": [ - "jdbc:hive2://localhost:10000/default;principal=hive/_HOST@EXAMPLE.COM" - ], - "table": [ - "hive_reader" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:hive2://localhost:10000/default;principal=hive/_HOST@EXAMPLE.COM", + "table": [ + "hive_reader" + ] + }, "where": "logdate='20211013'", 
"haveKerberos": true, "kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab", diff --git a/docs/assets/jobs/mysqlreader.json b/docs/assets/jobs/mysqlreader.json index 10ee01244..11e4e8f6f 100644 --- a/docs/assets/jobs/mysqlreader.json +++ b/docs/assets/jobs/mysqlreader.json @@ -15,17 +15,13 @@ "column": [ "*" ], - "connection": [ - { - "table": [ - "addax_reader" - ], - "jdbcUrl": [ - "jdbc:mysql://127.0.0.1:3306/test" - ], - "driver": "com.mysql.jdbc.Driver" - } - ] + "connection": { + "table": [ + "addax_reader" + ], + "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test", + "driver": "com.mysql.jdbc.Driver" + } } }, "writer": { diff --git a/docs/assets/jobs/oraclereader.json b/docs/assets/jobs/oraclereader.json index 1b01d1539..692ce9469 100644 --- a/docs/assets/jobs/oraclereader.json +++ b/docs/assets/jobs/oraclereader.json @@ -17,16 +17,12 @@ "name" ], "splitPk": "db_id", - "connection": [ - { - "table": [ - "table" - ], - "jdbcUrl": [ - "jdbc:oracle:thin:@:PORT:" - ] - } - ] + "connection": { + "table": [ + "table" + ], + "jdbcUrl": "jdbc:oracle:thin:@:PORT:" + } } }, "writer": { diff --git a/docs/assets/jobs/pgreader.json b/docs/assets/jobs/pgreader.json index 7e45576e3..76ed78297 100644 --- a/docs/assets/jobs/pgreader.json +++ b/docs/assets/jobs/pgreader.json @@ -15,16 +15,12 @@ "column": [ "*" ], - "connection": [ - { - "table": [ - "addax_tbl" - ], - "jdbcUrl": [ - "jdbc:postgresql://127.0.0.1:5432/pgtest" - ] - } - ] + "connection": { + "table": [ + "addax_tbl" + ], + "jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/pgtest" + } } }, "writer": { diff --git a/docs/assets/jobs/rdbmsreader.json b/docs/assets/jobs/rdbmsreader.json index a62b93625..0058c88e5 100644 --- a/docs/assets/jobs/rdbmsreader.json +++ b/docs/assets/jobs/rdbmsreader.json @@ -20,16 +20,12 @@ "*" ], "driver": "io.prestosql.jdbc.PrestoDriver", - "connection": [ - { - "table": [ - "default.table" - ], - "jdbcUrl": [ - "jdbc:presto://127.0.0.1:8080/hive" - ] - } - ], + "connection": { + 
"table": [ + "default.table" + ], + "jdbcUrl": "jdbc:presto://127.0.0.1:8080/hive" + }, "fetchSize": 1024, "where": "1 = 1" } diff --git a/docs/assets/jobs/sqlitereader.json b/docs/assets/jobs/sqlitereader.json index eda4e9d9c..49a45c146 100644 --- a/docs/assets/jobs/sqlitereader.json +++ b/docs/assets/jobs/sqlitereader.json @@ -18,16 +18,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:sqlite:/tmp/test.sqlite3" - ], - "table": [ - "test" - ] - } - ] + "connection": { + "jdbcUrl": "jdbc:sqlite:/tmp/test.sqlite3", + "table": [ + "test" + ] + } } }, "writer": { diff --git a/docs/assets/jobs/sqlserverreader.json b/docs/assets/jobs/sqlserverreader.json index 9179125cd..6b0fee4e1 100644 --- a/docs/assets/jobs/sqlserverreader.json +++ b/docs/assets/jobs/sqlserverreader.json @@ -16,16 +16,12 @@ "*" ], "splitPk": "db_id", - "connection": [ - { - "table": [ - "table" - ], - "jdbcUrl": [ - "jdbc:sqlserver://localhost:3433;DatabaseName=dbname" - ] - } - ] + "connection": { + "table": [ + "table" + ], + "jdbcUrl": "jdbc:sqlserver://localhost:3433;DatabaseName=dbname" + } } }, "writer": { diff --git a/docs/assets/jobs/sybasereader.json b/docs/assets/jobs/sybasereader.json index 664fb5f27..d88281f23 100644 --- a/docs/assets/jobs/sybasereader.json +++ b/docs/assets/jobs/sybasereader.json @@ -8,16 +8,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:sybase:Tds:127.0.0.1:5000/master" - ], - "table": [ - "dbo.ijdbc_function_escapes" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:sybase:Tds:127.0.0.1:5000/master", + "table": [ + "dbo.ijdbc_function_escapes" + ] + }, "username": "sa", "password": "password" } diff --git a/docs/assets/jobs/tdenginereader.json b/docs/assets/jobs/tdenginereader.json index 1ee35df52..9123c114a 100644 --- a/docs/assets/jobs/tdenginereader.json +++ b/docs/assets/jobs/tdenginereader.json @@ -18,16 +18,12 @@ "beginDateTime": "2017-07-14 10:40:00", "endDateTime": "2017-08-14 10:40:00", "splitInterval": "1d", - 
"connection": [ - { - "jdbcUrl": [ - "jdbc:TAOS://127.0.0.1:6030/test" - ], - "querySql": [ - "select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 10" - ] - } - ] + "connection": { + "jdbcUrl": "jdbc:TAOS://127.0.0.1:6030/test", + "querySql": [ + "select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 10" + ] + } } }, "writer": { diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/GetPrimaryKeyUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/GetPrimaryKeyUtil.java index b2107b718..e10da5b1c 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/GetPrimaryKeyUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/GetPrimaryKeyUtil.java @@ -35,6 +35,8 @@ import java.util.ArrayList; import java.util.List; +import static com.wgzhao.addax.common.base.Key.CONNECTION; + public class GetPrimaryKeyUtil { private static final Logger LOG = LoggerFactory.getLogger(GetPrimaryKeyUtil.class); @@ -54,7 +56,7 @@ private GetPrimaryKeyUtil() public static String getPrimaryKey(Configuration readConf) { String sql; - Configuration connConf = Configuration.from(readConf.getList("connection").get(0).toString()); + Configuration connConf = readConf.getConfiguration(CONNECTION); String table = connConf.getList("table").get(0).toString(); String jdbc_url = connConf.getString(Key.JDBC_URL); String username = readConf.getString(Key.USERNAME, null); diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java index 3f06bc46f..3ec239b1f 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java @@ -106,54 +106,52 @@ private static 
void dealJdbcAndTable(Configuration originalConfig) boolean isTableMode = originalConfig.getBool(Key.IS_TABLE_MODE); boolean isPreCheck = originalConfig.getBool(Key.DRY_RUN, false); - List conns = originalConfig.getList(Key.CONNECTION, Object.class); + Configuration connConf = originalConfig.getConfiguration(Key.CONNECTION); List preSql = originalConfig.getList(Key.PRE_SQL, String.class); int tableNum = 0; - for (int i = 0, len = conns.size(); i < len; i++) { - Configuration connConf = Configuration.from(conns.get(i).toString()); - // 是否配置的定制的驱动名称 - String driverClass = connConf.getString(Key.JDBC_DRIVER, null); - if (driverClass != null && !driverClass.isEmpty()) { - LOG.warn("use specified driver class: {}", driverClass); - dataBaseType.setDriverClassName(driverClass); - } - connConf.getNecessaryValue(Key.JDBC_URL, DBUtilErrorCode.REQUIRED_VALUE); + // 是否配置的定制的驱动名称 + String driverClass = connConf.getString(Key.JDBC_DRIVER, null); + if (driverClass != null && !driverClass.isEmpty()) { + LOG.warn("use specified driver class: {}", driverClass); + dataBaseType.setDriverClassName(driverClass); + } + connConf.getNecessaryValue(Key.JDBC_URL, DBUtilErrorCode.REQUIRED_VALUE); - List jdbcUrls = connConf.getList(Key.JDBC_URL, String.class); + String jdbcUrl = connConf.getString(Key.JDBC_URL); - String jdbcUrl; - if (isPreCheck) { - jdbcUrl = DBUtil.chooseJdbcUrlWithoutRetry(dataBaseType, jdbcUrls, username, password, preSql); - } - else { - jdbcUrl = DBUtil.chooseJdbcUrl(dataBaseType, jdbcUrls, username, password, preSql); - } + if (StringUtils.isBlank(jdbcUrl)) { + throw AddaxException.asAddaxException(DBUtilErrorCode.REQUIRED_VALUE, "The parameter [connection.jdbcUrl] is not set."); + } - jdbcUrl = dataBaseType.appendJDBCSuffixForReader(jdbcUrl); + if (isPreCheck) { + DBUtil.validJdbcUrlWithoutRetry(dataBaseType, jdbcUrl, username, password, preSql); + } + else { + DBUtil.validJdbcUrl(dataBaseType, jdbcUrl, username, password, preSql); + } - // 回写到connection[i].jdbcUrl - 
originalConfig.set(String.format("%s[%d].%s", Key.CONNECTION, i, Key.JDBC_URL), jdbcUrl); + jdbcUrl = dataBaseType.appendJDBCSuffixForReader(jdbcUrl); - LOG.info("Available jdbcUrl [{}].", jdbcUrl); + // 回写到connection.jdbcUrl + originalConfig.set(String.format("%s.%s", Key.CONNECTION, Key.JDBC_URL), jdbcUrl); - if (isTableMode) { - // table 方式 - // 对每一个connection 上配置的table 项进行解析(已对表名称进行了 ` 处理的) - List tables = connConf.getList(Key.TABLE, String.class); + if (isTableMode) { + // table 方式 + // 对每一个connection 上配置的table 项进行解析(已对表名称进行了 ` 处理的) + List tables = connConf.getList(Key.TABLE, String.class); - List expandedTables = TableExpandUtil.expandTableConf(dataBaseType, tables); + List expandedTables = TableExpandUtil.expandTableConf(dataBaseType, tables); - if (expandedTables.isEmpty()) { - throw AddaxException.asAddaxException( - DBUtilErrorCode.ILLEGAL_VALUE, String.format("Failed to obtain the table [%s].", StringUtils.join(tables, ","))); - } + if (expandedTables.isEmpty()) { + throw AddaxException.asAddaxException( + DBUtilErrorCode.ILLEGAL_VALUE, String.format("Failed to obtain the table [%s].", StringUtils.join(tables, ","))); + } - tableNum += expandedTables.size(); + tableNum += expandedTables.size(); - originalConfig.set(String.format("%s[%d].%s", Key.CONNECTION, i, Key.TABLE), expandedTables); - } + originalConfig.set(String.format("%s.%s", Key.CONNECTION, Key.TABLE), expandedTables); } originalConfig.set(Constant.TABLE_NUMBER_MARK, tableNum); @@ -237,48 +235,34 @@ private static void dealColumnConf(Configuration originalConfig) } } - private static boolean recognizeTableOrQuerySqlMode( - Configuration originalConfig) + private static boolean recognizeTableOrQuerySqlMode(Configuration originalConfig) { - List conns = originalConfig.getList(Key.CONNECTION, Object.class); - - List tableModeFlags = new ArrayList<>(); - List querySqlModeFlags = new ArrayList<>(); + Configuration connConf = originalConfig.getConfiguration(Key.CONNECTION); String table; String 
querySql; boolean isTableMode; boolean isQuerySqlMode; - for (Object conn : conns) { - Configuration connConf = Configuration.from(conn.toString()); - table = connConf.getString(Key.TABLE, null); - querySql = connConf.getString(Key.QUERY_SQL, null); - isTableMode = StringUtils.isNotBlank(table); - tableModeFlags.add(isTableMode); + table = connConf.getString(Key.TABLE, null); + querySql = connConf.getString(Key.QUERY_SQL, null); - isQuerySqlMode = StringUtils.isNotBlank(querySql); - querySqlModeFlags.add(isQuerySqlMode); + isTableMode = StringUtils.isNotBlank(table); - if (!isTableMode && !isQuerySqlMode) { - // table 和 querySql 二者均未配置 - throw AddaxException.asAddaxException( - DBUtilErrorCode.TABLE_QUERY_SQL_MISSING, "You must configure either table or querySql."); - } - else if (isTableMode && isQuerySqlMode) { - // table 和 querySql 二者均配置 - throw AddaxException.asAddaxException(DBUtilErrorCode.TABLE_QUERY_SQL_MIXED, - "You ca not configure both table and querySql at the same time."); - } - } + isQuerySqlMode = StringUtils.isNotBlank(querySql); - // 混合配制 table 和 querySql - if (!ListUtil.checkIfValueSame(tableModeFlags) || !ListUtil.checkIfValueSame(querySqlModeFlags)) { + if (!isTableMode && !isQuerySqlMode) { + // table 和 querySql 二者均未配置 + throw AddaxException.asAddaxException( + DBUtilErrorCode.TABLE_QUERY_SQL_MISSING, "You must configure either table or querySql."); + } + else if (isTableMode && isQuerySqlMode) { + // table 和 querySql 二者均配置 throw AddaxException.asAddaxException(DBUtilErrorCode.TABLE_QUERY_SQL_MIXED, "You ca not configure both table and querySql at the same time."); } - return tableModeFlags.get(0); + return isTableMode; } } diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/ReaderSplitUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/ReaderSplitUtil.java index 5bf1d41a7..678e92da5 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/ReaderSplitUtil.java +++ 
b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/ReaderSplitUtil.java @@ -36,7 +36,7 @@ public final class ReaderSplitUtil { private ReaderSplitUtil() {} - + public static List doSplit(Configuration originalSliceConfig, int adviceNumber) { boolean isTableMode = originalSliceConfig.getBool(Key.IS_TABLE_MODE); @@ -59,74 +59,71 @@ public static List doSplit(Configuration originalSliceConfig, int String column = originalSliceConfig.getString(Key.COLUMN); String where = originalSliceConfig.getString(Key.WHERE, null); - List conns = originalSliceConfig.getList(Key.CONNECTION, Object.class); + Configuration connConf = originalSliceConfig.getConfiguration(Key.CONNECTION); List splitConfigs = new ArrayList<>(); - for (Object conn : conns) { - Configuration sliceConfig = originalSliceConfig.clone(); - - Configuration connConf = Configuration.from(conn.toString()); - String jdbcUrl = connConf.getString(Key.JDBC_URL); - sliceConfig.set(Key.JDBC_URL, jdbcUrl); - - // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作 - sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl)); + Configuration sliceConfig = originalSliceConfig.clone(); - sliceConfig.remove(Key.CONNECTION); - int tableSplitNumber = eachTableShouldSplitNumber; - Configuration tempSlice; + String jdbcUrl = connConf.getString(Key.JDBC_URL); + sliceConfig.set(Key.JDBC_URL, jdbcUrl); - // 说明是配置的 table 方式 - if (isTableMode) { - // 已在之前进行了扩展和`处理,可以直接使用 - List tables = connConf.getList(Key.TABLE, String.class); + // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作 + sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl)); - Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误."); + sliceConfig.remove(Key.CONNECTION); + int tableSplitNumber = eachTableShouldSplitNumber; + Configuration tempSlice; - String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null); - //最终切分份数不一定等于 
eachTableShouldSplitNumber - boolean needSplitTable = tableSplitNumber > 1 && StringUtils.isNotBlank(splitPk); - if (needSplitTable) { - if (tables.size() == 1 && !isUserSpecifyEachTableSplitSize) { - //原来:如果是单表的,主键切分num=num*2+1 - // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑 - //eachTableShouldSplitNumber = eachTableShouldSplitNumber * 2 + 1;// 不应该加1导致长尾 - - //考虑其他比率数字?(splitPk is null, 忽略此长尾) - tableSplitNumber = tableSplitNumber * 5; - } - // 尝试对每个表,切分为eachTableShouldSplitNumber 份 - for (String table : tables) { - tempSlice = sliceConfig.clone(); - tempSlice.set(Key.TABLE, table); + // 说明是配置的 table 方式 + if (isTableMode) { + // 已在之前进行了扩展和`处理,可以直接使用 + List tables = connConf.getList(Key.TABLE, String.class); + + Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误."); + + String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null); + //最终切分份数不一定等于 eachTableShouldSplitNumber + boolean needSplitTable = tableSplitNumber > 1 && StringUtils.isNotBlank(splitPk); + if (needSplitTable) { + if (tables.size() == 1 && !isUserSpecifyEachTableSplitSize) { + //原来:如果是单表的,主键切分num=num*2+1 + // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑 + //eachTableShouldSplitNumber = eachTableShouldSplitNumber * 2 + 1;// 不应该加1导致长尾 + + //考虑其他比率数字?(splitPk is null, 忽略此长尾) + tableSplitNumber = tableSplitNumber * 5; + } + // 尝试对每个表,切分为eachTableShouldSplitNumber 份 + for (String table : tables) { + tempSlice = sliceConfig.clone(); + tempSlice.set(Key.TABLE, table); - List splitSlices = SingleTableSplitUtil.splitSingleTable(tempSlice, tableSplitNumber); + List splitSlices = SingleTableSplitUtil.splitSingleTable(tempSlice, tableSplitNumber); - splitConfigs.addAll(splitSlices); - } - } - else { - for (String table : tables) { - tempSlice = sliceConfig.clone(); - tempSlice.set(Key.TABLE, table); - String queryColumn = HintUtil.buildQueryColumn(table, column); - tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where)); - 
splitConfigs.add(tempSlice); - } + splitConfigs.addAll(splitSlices); } } else { - // 说明是配置的 querySql 方式 - List sqls = connConf.getList(Key.QUERY_SQL, String.class); - - for (String querySql : sqls) { + for (String table : tables) { tempSlice = sliceConfig.clone(); - tempSlice.set(Key.QUERY_SQL, querySql); + tempSlice.set(Key.TABLE, table); + String queryColumn = HintUtil.buildQueryColumn(table, column); + tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where)); splitConfigs.add(tempSlice); } } } + else { + // 说明是配置的 querySql 方式 + List sqls = connConf.getList(Key.QUERY_SQL, String.class); + + for (String querySql : sqls) { + tempSlice = sliceConfig.clone(); + tempSlice.set(Key.QUERY_SQL, querySql); + splitConfigs.add(tempSlice); + } + } return splitConfigs; } @@ -140,38 +137,35 @@ public static Configuration doPreCheckSplit(Configuration originalSliceConfig) String column = originalSliceConfig.getString(Key.COLUMN); String where = originalSliceConfig.getString(Key.WHERE, null); - List conns = queryConfig.getList(Key.CONNECTION, Object.class); - - for (int i = 0, len = conns.size(); i < len; i++) { - Configuration connConf = Configuration.from(conns.get(i).toString()); - List queries = new ArrayList<>(); - List splitPkQueries = new ArrayList<>(); - String connPath = String.format("connection[%d]", i); - // 说明是配置的 table 方式 - if (isTableMode) { - // 已在之前进行了扩展和`处理,可以直接使用 - List tables = connConf.getList(Key.TABLE, String.class); - Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误."); - for (String table : tables) { - queries.add(SingleTableSplitUtil.buildQuerySql(column, table, where)); - if (splitPK != null && !splitPK.isEmpty()) { - splitPkQueries.add(SingleTableSplitUtil.genPKSql(splitPK.trim(), table, where)); - } - } - if (!splitPkQueries.isEmpty()) { - connConf.set(Key.SPLIT_PK_SQL, splitPkQueries); + Configuration connConf = queryConfig.getConfiguration(Key.CONNECTION); + + List queries = new ArrayList<>(); + 
List splitPkQueries = new ArrayList<>(); + // 说明是配置的 table 方式 + if (isTableMode) { + // 已在之前进行了扩展和`处理,可以直接使用 + List tables = connConf.getList(Key.TABLE, String.class); + Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误."); + for (String table : tables) { + queries.add(SingleTableSplitUtil.buildQuerySql(column, table, where)); + if (splitPK != null && !splitPK.isEmpty()) { + splitPkQueries.add(SingleTableSplitUtil.genPKSql(splitPK.trim(), table, where)); } - connConf.set(Key.QUERY_SQL, queries); - queryConfig.set(connPath, connConf); } - else { - // 说明是配置的 querySql 方式 - List sqls = connConf.getList(Key.QUERY_SQL, String.class); - queries.addAll(sqls); - connConf.set(Key.QUERY_SQL, queries); - queryConfig.set(connPath, connConf); + if (!splitPkQueries.isEmpty()) { + connConf.set(Key.SPLIT_PK_SQL, splitPkQueries); } + connConf.set(Key.QUERY_SQL, queries); + queryConfig.set(Key.CONNECTION, connConf); } + else { + // 说明是配置的 querySql 方式 + List sqls = connConf.getList(Key.QUERY_SQL, String.class); + queries.addAll(sqls); + connConf.set(Key.QUERY_SQL, queries); + queryConfig.set(Key.CONNECTION, connConf); + } + return queryConfig; } diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DBUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DBUtil.java index 3e94f0932..a7bb72547 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DBUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DBUtil.java @@ -49,8 +49,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -public final class DBUtil -{ +public final class DBUtil { private static final Logger LOG = LoggerFactory.getLogger(DBUtil.class); private static final int DEFAULT_SOCKET_TIMEOUT_SEC = 20_000; @@ -59,86 +58,39 @@ public final class DBUtil .setDaemon(true) .build())); - private DBUtil() - { + private DBUtil() { } - public static String chooseJdbcUrl(DataBaseType dataBaseType, List jdbcUrls, String 
username, String password, List preSql) - { - if (null == jdbcUrls || jdbcUrls.isEmpty()) { - throw AddaxException.asAddaxException(DBUtilErrorCode.JDBC_NULL, - String.format("The configure item jdbcUrl [%s] cannot be empty.", - StringUtils.join(jdbcUrls, ","))); - } + public static void validJdbcUrl(DataBaseType dataBaseType, String jdbcUrl, String username, String password, List preSql) { try { - return RetryUtil.executeWithRetry(() -> { - boolean connOK; - for (String url : jdbcUrls) { - url = url.trim(); - if (StringUtils.isNotBlank(url)) { - if (null == preSql || preSql.isEmpty()) { - connOK = testConnWithoutRetry(dataBaseType, url, username, password); - } - else { - connOK = testConnWithoutRetry(dataBaseType, url, username, password, preSql); - } - - if (connOK) { - return url; - } - } + RetryUtil.executeWithRetry(() -> { + if (null == preSql || preSql.isEmpty()) { + testConnWithoutRetry(dataBaseType, jdbcUrl, username, password); + } else { + testConnWithoutRetry(dataBaseType, jdbcUrl, username, password, preSql); } - throw new Exception("Unable to connect the database."); + return null; }, 3, 1000L, true); - } - catch (Exception e) { + } catch (Exception e) { throw AddaxException.asAddaxException(DBUtilErrorCode.CONN_DB_ERROR, - String.format("Failed to connect the database, no connectable jdbcUrl can be found in [%s].", - StringUtils.join(jdbcUrls, ",")), e); + "Failed to connect the database server using " + jdbcUrl, e); } } - public static String chooseJdbcUrlWithoutRetry(DataBaseType dataBaseType, List jdbcUrls, String username, - String password, List preSql) - { - if (null == jdbcUrls || jdbcUrls.isEmpty()) { - throw AddaxException.asAddaxException( - DBUtilErrorCode.CONF_ERROR, - String.format("The configure item jdbcUrl [%s] cannot be empty.", - StringUtils.join(jdbcUrls, ","))); - } - - boolean connOK; - for (String url : jdbcUrls) { - url = url.trim(); - if (StringUtils.isNotBlank(url)) { - if (null != preSql && !preSql.isEmpty()) { - connOK = 
testConnWithoutRetry(dataBaseType, url, username, password, preSql); - } - else { - try { - connOK = testConnWithoutRetry(dataBaseType, url, username, password); - } - catch (Exception e) { - throw AddaxException.asAddaxException( - DBUtilErrorCode.CONN_DB_ERROR, - String.format("Failed to connect the database, no connectable jdbcUrl can be found in [%s].", - StringUtils.join(jdbcUrls, ",")), e); - } - } - if (connOK) { - return url; - } + public static void validJdbcUrlWithoutRetry(DataBaseType dataBaseType, String jdbcUrl, String username, String password, List preSql) { + if (null != preSql && !preSql.isEmpty()) { + testConnWithoutRetry(dataBaseType, jdbcUrl, username, password, preSql); + } else { + try { + testConnWithoutRetry(dataBaseType, jdbcUrl, username, password); + } catch (Exception e) { + throw AddaxException.asAddaxException( + DBUtilErrorCode.CONN_DB_ERROR, "Failed to connect the server using jdbcUrl " + jdbcUrl, e); } } - throw AddaxException.asAddaxException( - DBUtilErrorCode.CONN_DB_ERROR, - String.format("Failed to connect the database, no connectable jdbcUrl can be found in [%s].", - StringUtils.join(jdbcUrls, ","))); } - public static boolean checkInsertPrivilege(DataBaseType dataBaseType, String jdbcURL, String userName, String password, List tableList) - { + public static boolean checkInsertPrivilege(DataBaseType dataBaseType, String jdbcURL, String userName, String password, List tableList) { Connection connection = connect(dataBaseType, jdbcURL, userName, password); String insertTemplate = "INSERT INTO %s (SELECT * FROM %s WHERE 1 = 2)"; @@ -149,8 +101,7 @@ public static boolean checkInsertPrivilege(DataBaseType dataBaseType, String jdb try { insertStmt = connection.createStatement(); insertStmt.execute(checkInsertPrivilegeSql); - } - catch (Exception e) { + } catch (Exception e) { hasInsertPrivilege = false; LOG.warn("Failed to insert into table [{}] with user [{}]: {}.", userName, tableName, e.getMessage()); } @@ -160,8 +111,7 @@ public 
static boolean checkInsertPrivilege(DataBaseType dataBaseType, String jdb return hasInsertPrivilege; } - public static boolean checkDeletePrivilege(DataBaseType dataBaseType, String jdbcURL, String userName, String password, List tableList) - { + public static boolean checkDeletePrivilege(DataBaseType dataBaseType, String jdbcURL, String userName, String password, List tableList) { Connection connection = connect(dataBaseType, jdbcURL, userName, password); String deleteTemplate = "DELETE FROM %s WHERE 1 = 2"; @@ -172,8 +122,7 @@ public static boolean checkDeletePrivilege(DataBaseType dataBaseType, String jdb try { deleteStmt = connection.createStatement(); deleteStmt.execute(checkDeletePrivilegeSQL); - } - catch (Exception e) { + } catch (Exception e) { hasInsertPrivilege = false; LOG.warn("Failed to delete from table [{}] with user [{}]: {}.", userName, tableName, e.getMessage()); } @@ -183,8 +132,7 @@ public static boolean checkDeletePrivilege(DataBaseType dataBaseType, String jdb return hasInsertPrivilege; } - public static boolean needCheckDeletePrivilege(Configuration originalConfig) - { + public static boolean needCheckDeletePrivilege(Configuration originalConfig) { List allSqls = new ArrayList<>(); List preSQLs = originalConfig.getList(Key.PRE_SQL, String.class); List postSQLs = originalConfig.getList(Key.POST_SQL, String.class); @@ -209,25 +157,22 @@ public static boolean needCheckDeletePrivilege(Configuration originalConfig) *

* * @param dataBaseType database type. - * @param jdbcUrl java jdbc url. - * @param username User for login. - * @param password Password to use when connecting to server. + * @param jdbcUrl java jdbc url. + * @param username User for login. + * @param password Password to use when connecting to server. * @return Connection class {@link Connection} */ - public static Connection getConnection(DataBaseType dataBaseType, String jdbcUrl, String username, String password) - { + public static Connection getConnection(DataBaseType dataBaseType, String jdbcUrl, String username, String password) { return getConnection(dataBaseType, jdbcUrl, username, password, DEFAULT_SOCKET_TIMEOUT_SEC); } - public static Connection getConnection(DataBaseType dataBaseType, String jdbcUrl, String username, String password, int socketTimeout) - { + public static Connection getConnection(DataBaseType dataBaseType, String jdbcUrl, String username, String password, int socketTimeout) { try { return RetryUtil.executeWithRetry(() -> DBUtil.connect(dataBaseType, jdbcUrl, username, password, socketTimeout), 3, 1000L, true); - } - catch (Exception e) { + } catch (Exception e) { throw AddaxException.asAddaxException(DBUtilErrorCode.CONN_DB_ERROR, String.format("Failed to connect the database with [%s].", jdbcUrl), e); } @@ -240,55 +185,50 @@ public static Connection getConnection(DataBaseType dataBaseType, String jdbcUrl *

* * @param dataBaseType The database's type - * @param jdbcUrl jdbc url - * @param username User for login - * @param password Password to use when connecting to server + * @param jdbcUrl jdbc url + * @param username User for login + * @param password Password to use when connecting to server * @return Connection class {@link Connection} */ - public static Connection getConnectionWithoutRetry(DataBaseType dataBaseType, String jdbcUrl, String username, String password) - { + public static Connection getConnectionWithoutRetry(DataBaseType dataBaseType, String jdbcUrl, String username, String password) { return getConnectionWithoutRetry(dataBaseType, jdbcUrl, username, password, DEFAULT_SOCKET_TIMEOUT_SEC); } - public static Connection getConnectionWithoutRetry(DataBaseType dataBaseType, String jdbcUrl, String username, String password, int socketTimeout) - { + public static Connection getConnectionWithoutRetry(DataBaseType dataBaseType, String jdbcUrl, String username, String password, int socketTimeout) { return DBUtil.connect(dataBaseType, jdbcUrl, username, password, socketTimeout); } - private static synchronized Connection connect(DataBaseType dataBaseType, String url, String user, String pass) - { + private static synchronized Connection connect(DataBaseType dataBaseType, String url, String user, String pass) { return connect(dataBaseType, url, user, pass, DEFAULT_SOCKET_TIMEOUT_SEC); } - private static synchronized Connection connect(DataBaseType dataBaseType, String url, String user, String pass, int socketTimeout) - { - BasicDataSource bds = new BasicDataSource(); - bds.setUrl(url); - bds.setUsername(user); - bds.setPassword(pass); - - if (dataBaseType == DataBaseType.Oracle) { - //oracle.net.READ_TIMEOUT for jdbc versions < 10.1.0.5 oracle.jdbc.ReadTimeout for jdbc versions >=10.1.0.5 - // unit ms - bds.addConnectionProperty("oracle.jdbc.ReadTimeout", String.valueOf(socketTimeout * 1000)); - } - if (url.contains("inceptor2")) { - LOG.warn("inceptor2 must be 
process specially"); - url = url.replace("inceptor2", "hive2"); + private static synchronized Connection connect(DataBaseType dataBaseType, String url, String user, String pass, int socketTimeout) { + + try (BasicDataSource bds = new BasicDataSource()) { bds.setUrl(url); - bds.setDriverClassName("org.apache.hive.jdbc.HiveDriver"); - } - else { - LOG.debug("Connecting to database with driver {}", dataBaseType.getDriverClassName()); - bds.setDriverClassName(dataBaseType.getDriverClassName()); - } - try { + bds.setUsername(user); + bds.setPassword(pass); + + if (dataBaseType == DataBaseType.Oracle) { + //oracle.net.READ_TIMEOUT for jdbc versions < 10.1.0.5 oracle.jdbc.ReadTimeout for jdbc versions >=10.1.0.5 + // unit ms + bds.addConnectionProperty("oracle.jdbc.ReadTimeout", String.valueOf(socketTimeout * 1000)); + } + if (url.contains("inceptor2")) { + LOG.warn("inceptor2 must be process specially"); + url = url.replace("inceptor2", "hive2"); + bds.setUrl(url); + bds.setDriverClassName("org.apache.hive.jdbc.HiveDriver"); + } else { + LOG.debug("Connecting to database with driver {}", dataBaseType.getDriverClassName()); + bds.setDriverClassName(dataBaseType.getDriverClassName()); + } bds.setMinIdle(2); bds.setMaxIdle(5); bds.setMaxOpenPreparedStatements(200); return bds.getConnection(); - } - catch (Exception e) { + } catch (Exception e) { + LOG.error("An exception occurred while attempting to connect to the database using jdbcUrl '{}': {}'", url, e.toString()); throw RdbmsException.asConnException(e); } } @@ -296,15 +236,14 @@ private static synchronized Connection connect(DataBaseType dataBaseType, String /** * a wrapped method to execute select-like sql statement . * - * @param conn Database connection . - * @param sql sql statement to be executed + * @param conn Database connection . + * @param sql sql statement to be executed * @param fetchSize fetch size * @return a {@link ResultSet} * @throws SQLException if occurs SQLException. 
*/ public static ResultSet query(Connection conn, String sql, int fetchSize) - throws SQLException - { + throws SQLException { // 默认3600 s 的query Timeout return query(conn, sql, fetchSize, DEFAULT_SOCKET_TIMEOUT_SEC); } @@ -312,30 +251,26 @@ public static ResultSet query(Connection conn, String sql, int fetchSize) /** * a wrapped method to execute select-like sql statement . * - * @param conn Database connection . - * @param sql sql statement to be executed - * @param fetchSize fetch size each batch + * @param conn Database connection . + * @param sql sql statement to be executed + * @param fetchSize fetch size each batch * @param queryTimeout unit:second * @return A {@link ResultSet} * @throws SQLException if failed to execute sql statement */ public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout) - throws SQLException - { + throws SQLException { Statement stmt; try { // make sure autocommit is off conn.setAutoCommit(false); - } - catch (SQLFeatureNotSupportedException ignore) { + } catch (SQLFeatureNotSupportedException ignore) { LOG.warn("The current database does not support AUTO_COMMIT property"); } try { stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); //NOSONAR - } - - catch (SQLException ignore) { + } catch (SQLException ignore) { // some database does not support TYPE_FORWARD_ONLY/CONCUR_READ_ONLY LOG.warn("The current database does not support TYPE_FORWARD_ONLY/CONCUR_READ_ONLY"); stmt = conn.createStatement(); //NOSONAR @@ -345,13 +280,11 @@ public static ResultSet query(Connection conn, String sql, int fetchSize, int qu return stmt.executeQuery(sql); } - public static void closeDBResources(ResultSet rs, Statement stmt, Connection conn) - { + public static void closeDBResources(ResultSet rs, Statement stmt, Connection conn) { if (null != rs) { try { rs.close(); - } - catch (SQLException ignored) { + } catch (SQLException ignored) { // } } @@ -359,8 +292,7 @@ public static void 
closeDBResources(ResultSet rs, Statement stmt, Connection con if (null != stmt) { try { stmt.close(); - } - catch (SQLException ignored) { + } catch (SQLException ignored) { // } } @@ -368,26 +300,22 @@ public static void closeDBResources(ResultSet rs, Statement stmt, Connection con if (null != conn) { try { conn.close(); - } - catch (SQLException ignored) { + } catch (SQLException ignored) { // } } } - public static void closeDBResources(Statement stmt, Connection conn) - { + public static void closeDBResources(Statement stmt, Connection conn) { closeDBResources(null, stmt, conn); } - public static List getTableColumns(DataBaseType dataBaseType, String jdbcUrl, String user, String pass, String tableName) - { + public static List getTableColumns(DataBaseType dataBaseType, String jdbcUrl, String user, String pass, String tableName) { Connection conn = getConnection(dataBaseType, jdbcUrl, user, pass); return getTableColumnsByConn(conn, tableName); } - public static List getTableColumnsByConn(Connection conn, String tableName) - { + public static List getTableColumnsByConn(Connection conn, String tableName) { List columns = new ArrayList<>(); List> rsMetaData = getColumnMetaData(conn, tableName, "*"); @@ -400,13 +328,12 @@ public static List getTableColumnsByConn(Connection conn, String tableNa /** * get column description * - * @param conn database connection + * @param conn database connection * @param tableName The table name - * @param column table column + * @param column table column * @return {@link List} */ - public static List> getColumnMetaData(Connection conn, String tableName, String column) - { + public static List> getColumnMetaData(Connection conn, String tableName, String column) { List> result = new ArrayList<>(); // skip index 0, compliant with jdbc resultSet and resultMetaData result.add(null); @@ -416,8 +343,7 @@ public static List> getColumnMetaData(Connection conn, Strin if 
(DataBaseType.TDengine.getDriverClassName().equals(conn.getMetaData().getDriverName())) { // TDengine does not support 1=2 clause queryColumnSql = "SELECT " + column + " FROM " + tableName + " LIMIT 0"; - } - else { + } else { queryColumnSql = "SELECT " + column + " FROM " + tableName + " WHERE 1 = 2"; } ResultSetMetaData metaData = statement.executeQuery(queryColumnSql).getMetaData(); @@ -433,52 +359,51 @@ public static List> getColumnMetaData(Connection conn, Strin } statement.close(); return result; - } - catch (SQLException e) { + } catch (SQLException e) { throw AddaxException.asAddaxException(DBUtilErrorCode.GET_COLUMN_INFO_FAILED, String.format("Failed to obtain the fields of table [%s].", tableName), e); } } - public static boolean testConnWithoutRetry(DataBaseType dataBaseType, String url, String user, String pass) - { - try (Connection ignored = connect(dataBaseType, url, user, pass)) { - return true; - } - catch (Exception e) { - LOG.error("Failed to connection the database with [{}]: {}.", url, e.getMessage()); + public static void testConnWithoutRetry(DataBaseType dataBaseType, String url, String user, String pass) { + Connection ignored = null; + try { + ignored = connect(dataBaseType, url, user, pass); + } catch (Exception e) { + throw AddaxException.asAddaxException(DBUtilErrorCode.CONN_DB_ERROR, + String.format("Failed to connect the database using '%s': %s.", url, e.getMessage()), e); + } finally { + if (null != ignored) { + try { + ignored.close(); + } catch (SQLException e) { + LOG.warn("Failed to close the connection."); + } + } } - return false; } - public static boolean testConnWithoutRetry(DataBaseType dataBaseType, String url, String user, String pass, List preSql) - { + public static void testConnWithoutRetry(DataBaseType dataBaseType, String url, String user, String pass, List preSql) { try (Connection connection = connect(dataBaseType, url, user, pass)) { for (String pre : preSql) { if (!doPreCheck(connection, pre)) { LOG.warn("Failed to 
doPreCheck."); - return false; } } - return true; + } catch (Exception e) { + LOG.warn("Failed to connect the database using '{}': {}.", url, e.getMessage()); } - catch (Exception e) { - LOG.warn("Failed to connection the database with [{}]: {}.", url, e.getMessage()); - } - return false; } public static ResultSet query(Connection conn, String sql) - throws SQLException - { + throws SQLException { try (Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { stmt.setQueryTimeout(DEFAULT_SOCKET_TIMEOUT_SEC); return stmt.executeQuery(sql); } } - private static boolean doPreCheck(Connection conn, String pre) - { + private static boolean doPreCheck(Connection conn, String pre) { try (ResultSet rs = query(conn, pre)) { int checkResult = -1; if (DBUtil.asyncResultSetNext(rs)) { @@ -492,16 +417,14 @@ private static boolean doPreCheck(Connection conn, String pre) return true; } LOG.warn("Failed to pre-check with [{}]. It should return 0.", pre); - } - catch (Exception e) { + } catch (Exception e) { LOG.warn("Failed to pre-check with [{}], errorMessage: [{}].", pre, e.getMessage()); } return false; } // warn:until now, only oracle need to handle session config. 
- public static void dealWithSessionConfig(Connection conn, Configuration config, DataBaseType databaseType, String message) - { + public static void dealWithSessionConfig(Connection conn, Configuration config, DataBaseType databaseType, String message) { List sessionConfig; switch (databaseType) { case Oracle: @@ -515,8 +438,7 @@ public static void dealWithSessionConfig(Connection conn, Configuration config, } } - private static void doDealWithSessionConfig(Connection conn, List sessions, String message) - { + private static void doDealWithSessionConfig(Connection conn, List sessions, String message) { if (null == sessions || sessions.isEmpty()) { return; } @@ -524,8 +446,7 @@ private static void doDealWithSessionConfig(Connection conn, List sessio Statement stmt; try { stmt = conn.createStatement(); - } - catch (SQLException e) { + } catch (SQLException e) { throw AddaxException.asAddaxException(DBUtilErrorCode.SET_SESSION_ERROR, String.format("Failed to set session with [%s]", message), e); } @@ -534,8 +455,7 @@ private static void doDealWithSessionConfig(Connection conn, List sessio LOG.info("Executing SQL:[{}]", sessionSql); try { stmt.execute(sessionSql); - } - catch (SQLException e) { + } catch (SQLException e) { throw AddaxException.asAddaxException(DBUtilErrorCode.SET_SESSION_ERROR, String.format("Failed to set session with [%s].", message), e); } @@ -543,8 +463,7 @@ private static void doDealWithSessionConfig(Connection conn, List sessio DBUtil.closeDBResources(stmt, null); } - public static void sqlValid(String sql, DataBaseType dataBaseType) - { + public static void sqlValid(String sql, DataBaseType dataBaseType) { SQLStatementParser statementParser = SQLParserUtils.createSQLStatementParser(sql, dataBaseType.getTypeName()); statementParser.parseStatementList(); } @@ -555,27 +474,23 @@ public static void sqlValid(String sql, DataBaseType dataBaseType) * @param resultSet result set * @return boolean */ - public static boolean asyncResultSetNext(ResultSet 
resultSet) - { + public static boolean asyncResultSetNext(ResultSet resultSet) { return asyncResultSetNext(resultSet, 3600); } - public static boolean asyncResultSetNext(ResultSet resultSet, int timeout) - { + public static boolean asyncResultSetNext(ResultSet resultSet, int timeout) { Future future = rsExecutors.get().submit(resultSet::next); try { return future.get(timeout, TimeUnit.SECONDS); - } - catch (Exception e) { + } catch (Exception e) { throw AddaxException.asAddaxException(DBUtilErrorCode.RS_ASYNC_ERROR, "Asynchronous retrieval of ResultSet failed.", e); } } - public static void loadDriverClass(String pluginType, String pluginName) - { + public static void loadDriverClass(String pluginType, String pluginName) { try { String pluginJsonPath = StringUtils.join( - new String[] { + new String[]{ System.getProperty("addax.home"), "plugin", pluginType, @@ -586,8 +501,7 @@ public static void loadDriverClass(String pluginType, String pluginName) for (String driver : drivers) { Class.forName(driver); } - } - catch (ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw AddaxException.asAddaxException(DBUtilErrorCode.CONF_ERROR, "Error loading database driver. 
Please confirm that the libs directory has the driver jar package " + "and the drivers configuration in plugin.json is correct.", e); diff --git a/plugin/reader/accessreader/src/main/resources/plugin_job_template.json b/plugin/reader/accessreader/src/main/resources/plugin_job_template.json index 89acbe886..1f189e641 100644 --- a/plugin/reader/accessreader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/accessreader/src/main/resources/plugin_job_template.json @@ -6,16 +6,13 @@ "column": [ "*" ], - "connection": [ + "connection": { "table": [ "addax_reader" ], - "jdbcUrl": [ - "jdbc:ucanaccess://" - ] - } - ], + "jdbcUrl": "jdbc:ucanaccess://" + }, "where": "" } } \ No newline at end of file diff --git a/plugin/reader/clickhousereader/src/main/resources/plugin_job_template.json b/plugin/reader/clickhousereader/src/main/resources/plugin_job_template.json index aeffb7f79..c01684b9f 100644 --- a/plugin/reader/clickhousereader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/clickhousereader/src/main/resources/plugin_job_template.json @@ -6,16 +6,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:clickhouse://127.0.0.1:8123/default" - ], + "connection": { + "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/default", "table": [ "yourtable" ] - } - ], + }, "where": "" } } diff --git a/plugin/reader/databendreader/src/main/resources/plugin_job_template.json b/plugin/reader/databendreader/src/main/resources/plugin_job_template.json index 087e7fc9d..c98201037 100644 --- a/plugin/reader/databendreader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/databendreader/src/main/resources/plugin_job_template.json @@ -6,16 +6,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:databend://127.0.0.1:8000/test" - ], - "table": [ - "addax_reader" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:databend://127.0.0.1:8000/test", + "table": [ + "addax_reader" + ] + }, "where": "" } } diff --git 
a/plugin/reader/hanareader/src/main/resources/plugin_job_template.json b/plugin/reader/hanareader/src/main/resources/plugin_job_template.json index 96cef4c94..762205891 100644 --- a/plugin/reader/hanareader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/hanareader/src/main/resources/plugin_job_template.json @@ -6,16 +6,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:sap://localhost:30015/?currentschema=SYSTEM" - ], - "table": [ - "addax_table" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:sap://localhost:30015/?currentschema=SYSTEM", + "table": [ + "addax_table" + ] + }, "where": "1=1", "autoPk": false, "fetchSize": 2048, diff --git a/plugin/reader/hivereader/src/main/resources/plugin_job_template.json b/plugin/reader/hivereader/src/main/resources/plugin_job_template.json index c6ff9e58f..eba9f6257 100644 --- a/plugin/reader/hivereader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/hivereader/src/main/resources/plugin_job_template.json @@ -6,16 +6,12 @@ ], "username": "hive", "password": "", - "connection": [ - { - "jdbcUrl": [ - "jdbc:hive2://localhost:10000/default;principal=hive/_HOST@COMPANY.COM" - ], - "table": [ - "hive_reader" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:hive2://localhost:10000/default;principal=hive/_HOST@COMPANY.COM", + "table": [ + "hive_reader" + ] + }, "where": "logdate='20211013'", "haveKerberos": true, "kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab", diff --git a/plugin/reader/mysqlreader/src/main/resources/plugin_job_template.json b/plugin/reader/mysqlreader/src/main/resources/plugin_job_template.json index 2d5048f22..93564aba1 100644 --- a/plugin/reader/mysqlreader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/mysqlreader/src/main/resources/plugin_job_template.json @@ -8,18 +8,14 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:mysql://127.0.0.1:3306/test" - ], - "table": [ - "addax_reader" - ], - "driver": 
"com.mysql.jdbc.Driver", - "fetchSize": 2048 - } - ], + "connection": { + "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test", + "table": [ + "addax_reader" + ], + "driver": "com.mysql.jdbc.Driver", + "fetchSize": 2048 + }, "where": "" } } diff --git a/plugin/reader/oraclereader/src/main/resources/plugin_job_template.json b/plugin/reader/oraclereader/src/main/resources/plugin_job_template.json index f2bf2d0df..0c660f2a5 100644 --- a/plugin/reader/oraclereader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/oraclereader/src/main/resources/plugin_job_template.json @@ -9,16 +9,12 @@ ], "splitPk": "db_id", "autoPk": false, - "connection": [ - { - "jdbcUrl": [ - "jdbc:oracle:thin:@:PORT:" - ], - "table": [ - "table" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:oracle:thin:@:PORT:", + "table": [ + "table" + ] + }, "where": "", "session": [ "alter session set NLS_DATE_FORMAT='yyyy-mm-dd hh24:mi:ss'", diff --git a/plugin/reader/postgresqlreader/src/main/resources/plugin_job_template.json b/plugin/reader/postgresqlreader/src/main/resources/plugin_job_template.json index fff1ab5a5..8b243597e 100644 --- a/plugin/reader/postgresqlreader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/postgresqlreader/src/main/resources/plugin_job_template.json @@ -8,16 +8,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:postgresql://127.0.0.1:5432/pgtest" - ], - "table": [ - "addax_tbl" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/pgtest", + "table": [ + "addax_tbl" + ] + }, "where": "", "fetchSize": 2048 } diff --git a/plugin/reader/rdbmsreader/src/main/resources/plugin_job_template.json b/plugin/reader/rdbmsreader/src/main/resources/plugin_job_template.json index a9d93e425..da38d0735 100644 --- a/plugin/reader/rdbmsreader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/rdbmsreader/src/main/resources/plugin_job_template.json @@ -6,17 +6,13 @@ "column": [ "*" ], - "connection": [ - { - 
"jdbcUrl": [ - "jdbc:://:/" - ], - "table": [ - "adax_table" - ], - "driver": "" - } - ], + "connection": { + "jdbcUrl": "jdbc:://:/", + "table": [ + "adax_table" + ], + "driver": "" + }, "where": "1=1", "autoPk": false, "fetchSize": 2048, diff --git a/plugin/reader/sqlitereader/src/main/resources/plugin_job_template.json b/plugin/reader/sqlitereader/src/main/resources/plugin_job_template.json index c1b7aa592..717f5bdee 100644 --- a/plugin/reader/sqlitereader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/sqlitereader/src/main/resources/plugin_job_template.json @@ -6,16 +6,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - "jdbc:sqlite:/tmp/test.sqlite3" - ], - "table": [ - "test" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:sqlite:/tmp/test.sqlite3", + "table": [ + "test" + ] + }, "where": "" } } diff --git a/plugin/reader/sqlserverreader/src/main/resources/plugin_job_template.json b/plugin/reader/sqlserverreader/src/main/resources/plugin_job_template.json index f61f41587..8e479e985 100644 --- a/plugin/reader/sqlserverreader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/sqlserverreader/src/main/resources/plugin_job_template.json @@ -3,16 +3,12 @@ "parameter": { "username": "", "password": "", - "connection": [ - { - "jdbcUrl": [ - "jdbc:sqlserver://localhost:3433;DatabaseName=dbname" - ], - "table": [ - "table" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:sqlserver://localhost:3433;DatabaseName=dbname", + "table": [ + "table" + ] + }, "where": "1=1", "autoPk": false, "fetchSize": 2048, diff --git a/plugin/reader/sybasereader/src/main/resources/plugin_job_template.json b/plugin/reader/sybasereader/src/main/resources/plugin_job_template.json index a2738fffe..7f8ee739d 100644 --- a/plugin/reader/sybasereader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/sybasereader/src/main/resources/plugin_job_template.json @@ -6,16 +6,12 @@ "column": [ "*" ], - "connection": [ - { - "jdbcUrl": [ - 
"jdbc:sybase://127.0.0.1:3306/test" - ], - "table": [ - "addax_reader" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:sybase://127.0.0.1:3306/test", + "table": [ + "addax_reader" + ] + }, "where": "" } } diff --git a/plugin/reader/tdenginereader/src/main/resources/plugin_job_template.json b/plugin/reader/tdenginereader/src/main/resources/plugin_job_template.json index 9ea459b1a..65dc3fc8f 100644 --- a/plugin/reader/tdenginereader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/tdenginereader/src/main/resources/plugin_job_template.json @@ -3,16 +3,12 @@ "parameter": { "username": "root", "password": "taosdata", - "connection": [ - { - "jdbcUrl": [ - "jdbc:TAOS://127.0.0.1:6030/test" - ], - "querySql": [ - "select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 10" - ] - } - ], + "connection": { + "jdbcUrl": "jdbc:TAOS://127.0.0.1:6030/test", + "querySql": [ + "select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 10" + ] + }, "where": "1=1", "autoPk": false, "fetchSize": 2048,