diff --git a/common/src/main/java/com/wgzhao/addax/common/base/Constant.java b/common/src/main/java/com/wgzhao/addax/common/base/Constant.java index c64ca12b5..24c9d3f7c 100644 --- a/common/src/main/java/com/wgzhao/addax/common/base/Constant.java +++ b/common/src/main/java/com/wgzhao/addax/common/base/Constant.java @@ -54,6 +54,7 @@ public class Constant public static final Object PK_TYPE_MONTE_CARLO = "pkTypeMonteCarlo"; // The data type of primary key is string. public static final Object PK_TYPE_STRING = "pkTypeString"; + public static final Object PK_TYPE_FLOAT = "pkTypeFloat"; public static final String INSERT_OR_REPLACE_TEMPLATE_MARK = "insertOrReplaceTemplate"; public static final String QUERY_SQL_TEMPLATE = "SELECT %s FROM %s WHERE (%s)"; diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java index 0790de272..bd77b6da7 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java @@ -83,10 +83,10 @@ public Configuration init(Configuration originalConfig) OriginalConfPretreatmentUtil.doPretreatment(originalConfig); if (originalConfig.getString(Key.SPLIT_PK) == null && originalConfig.getBool(Key.AUTO_PK, false)) { - LOG.info("The primary key used for splitting is not configured, try to guess the primary key that can be split."); + LOG.info("The split key is not configured, try to guess the split key."); String splitPK = GetPrimaryKeyUtil.getPrimaryKey(originalConfig); if (splitPK != null) { - LOG.info("Try to use [{}] as primary key to split", splitPK); + LOG.info("Try to use [{}] as split key", splitPK); originalConfig.set(Key.SPLIT_PK, splitPK); if (originalConfig.getInt(Key.EACH_TABLE_SPLIT_SIZE, -1) == -1) { originalConfig.set(Key.EACH_TABLE_SPLIT_SIZE, Constant.DEFAULT_EACH_TABLE_SPLIT_SIZE); @@ -103,42 +103,10 @@ public void preCheck(Configuration originalConfig, DataBaseType dataBaseType) /* 检查每个表是否有读权限,以及querySql跟split Key是否正确 */ Configuration queryConf = ReaderSplitUtil.doPreCheckSplit(originalConfig); String splitPK = queryConf.getString(Key.SPLIT_PK); - List connList = queryConf.getList(Key.CONNECTION, Object.class); + Configuration connConf = queryConf.getConfiguration(Key.CONNECTION); String username = queryConf.getString(Key.USERNAME); String password = queryConf.getString(Key.PASSWORD); - ExecutorService exec; - if (connList.size() < 10) { - exec = Executors.newFixedThreadPool(connList.size()); - } - else { - exec = Executors.newFixedThreadPool(10); - } - Collection taskList = new ArrayList<>(); - for (Object o : connList) { - Configuration connConf = Configuration.from(o.toString()); - PreCheckTask t = new PreCheckTask(username, password, connConf, dataBaseType, splitPK); - taskList.add(t); - } - List> results = new ArrayList<>(); - try { - results = exec.invokeAll(taskList); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - for (Future result : results) { - try { - result.get(); - } - catch (ExecutionException e) { - throw (AddaxException) e.getCause(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - exec.shutdownNow(); + new PreCheckTask(username, password, connConf, dataBaseType, splitPK).call(); } public List split(Configuration originalConfig, int adviceNumber) diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/GetPrimaryKeyUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/GetPrimaryKeyUtil.java index e10da5b1c..af612a046 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/GetPrimaryKeyUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/GetPrimaryKeyUtil.java @@ -20,7 +20,6 @@ */ package com.wgzhao.addax.rdbms.reader.util; -import com.wgzhao.addax.common.base.Key; import com.wgzhao.addax.common.util.Configuration; import com.wgzhao.addax.rdbms.util.DBUtil; import com.wgzhao.addax.rdbms.util.DataBaseType; @@ -33,9 +32,14 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static com.wgzhao.addax.common.base.Key.CONNECTION; +import static com.wgzhao.addax.common.base.Key.JDBC_URL; +import static com.wgzhao.addax.common.base.Key.PASSWORD; +import static com.wgzhao.addax.common.base.Key.TABLE; +import static com.wgzhao.addax.common.base.Key.USERNAME; public class GetPrimaryKeyUtil { @@ -48,19 +52,23 @@ private GetPrimaryKeyUtil() } /** - * 尝试自动获取指定表的主键,如果有多个,则取第一个 + * Try to get a primary key or unique key on single column to split the data + * if no primary key or unique key, return null + * Give priority to selecting the primary key, followed by a unique index of numeric type, + * and lastly, other divisible unique indexes. * - * @param readConf 读配置项 - * @return 主键 + * @param readConf {@link Configuration} + * @return column name if has primary key or unique key, else null */ public static String getPrimaryKey(Configuration readConf) { String sql; + List columns = new ArrayList<>(); Configuration connConf = readConf.getConfiguration(CONNECTION); - String table = connConf.getList("table").get(0).toString(); - String jdbc_url = connConf.getString(Key.JDBC_URL); - String username = readConf.getString(Key.USERNAME, null); - String password = readConf.getString(Key.PASSWORD, null); + String table = connConf.getList(TABLE).get(0).toString(); + String jdbc_url = connConf.getString(JDBC_URL); + String username = readConf.getString(USERNAME, null); + String password = readConf.getString(PASSWORD, null); String schema = null; if (table.contains(".")) { schema = table.split("\\.")[0]; @@ -76,50 +84,56 @@ public static String getPrimaryKey(Configuration readConf) LOG.debug("query primary sql: [{}]", sql); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); - List columns = new ArrayList<>(); + while (resultSet.next()) { // column_name, data_type columns.add(new String[] {resultSet.getString(1), resultSet.getString(2)}); } + } + catch (SQLException e) { + LOG.debug(e.getMessage()); + return null; + } - if (columns.isEmpty()) { - // Table has no primary key - LOG.debug("The table {} has no primary key", table); - return null; - } + if (columns.isEmpty()) { + // Table has no primary key + LOG.debug("The table {} has no primary key", table); + return null; + } - String selectColumn = columns.get(0)[0]; + if (columns.size() == 1) { + return columns.get(0)[0]; + } - if (columns.size() > 1) { - // The primary key is multi-column primary key. - // select the appropriate column instead of the first column based - // on the datatype - giving preference to numerics over other types. - LOG.warn("The table {} contains a multi-column primary key. try to select numeric type primary key if present", table); - selectColumn = columns.get(0)[0]; - JDBCType jdbcType; - for (String[] column : columns) { - try { - jdbcType = JDBCType.valueOf(column[1]); - if (jdbcType == JDBCType.NUMERIC || jdbcType == JDBCType.INTEGER - || jdbcType == JDBCType.BIGINT || jdbcType == JDBCType.DECIMAL - || jdbcType == JDBCType.FLOAT) { - // better choice - selectColumn = column[0]; - break; - } - } - catch (IllegalArgumentException ignored) { - // ignore - } - } - return selectColumn; + LOG.warn("The table {} contains a multiply candidate keys. try to choose numeric type key if present", table); + JDBCType jdbcType; + List numericTypes = Arrays.asList( + JDBCType.NUMERIC, + JDBCType.INTEGER, + JDBCType.BIGINT, + JDBCType.DECIMAL, + JDBCType.FLOAT + ); + for (String[] column : columns) { + // JDBCType not support INT type, it exists in MySQL + if ("INT".equals(column[1])) { + // better choice + return column[0]; + } + try { + jdbcType = JDBCType.valueOf(column[1]); + } + catch (IllegalArgumentException e) { + LOG.warn("The column type {} does not map to JDBCType", column[1]); + continue; + } + if (numericTypes.contains(jdbcType)) { + // better choice + return column[0]; } - return selectColumn; - } - catch (SQLException e) { - LOG.debug(e.getMessage()); } - return null; + // last choice + return columns.get(0)[0]; } /** @@ -135,73 +149,73 @@ public static String getPrimaryKeyQuery(String schema, String tableName, String String sql = null; switch (dataBaseType) { case MySql: - sql = "select c.COLUMN_NAME, c.DATA_TYPE " + - "from INFORMATION_SCHEMA.`COLUMNS` c , INFORMATION_SCHEMA.STATISTICS s " + - "where c.TABLE_SCHEMA = s.TABLE_SCHEMA " + - " AND c.TABLE_NAME = s.TABLE_NAME " + - " AND c.COLUMN_NAME = s.COLUMN_NAME " + - " AND s.TABLE_SCHEMA = (" + getSchema(schema) + ") " + - " AND s.TABLE_NAME = '" + tableName + "' " + - " AND NON_UNIQUE = 0 " + - "ORDER BY SEQ_IN_INDEX ASC"; + sql = "select " + + " c.COLUMN_NAME, upper(c.DATA_TYPE) AS COLUMN_TYPE, c.COLUMN_KEY AS KEY_TYPE " + + " from INFORMATION_SCHEMA.`COLUMNS` c , INFORMATION_SCHEMA.STATISTICS s " + + " where c.TABLE_SCHEMA = s.TABLE_SCHEMA " + + " AND c.TABLE_NAME = s.TABLE_NAME " + + " AND c.COLUMN_NAME = s.COLUMN_NAME " + + " AND s.TABLE_SCHEMA = (SELECT SCHEMA()) " + + " AND s.TABLE_NAME = '" + tableName + "' " + + " AND NON_UNIQUE = 0 " + + " AND COLUMN_KEY <> 'MUL' and COLUMN_KEY <> '' " + + " ORDER BY c.COLUMN_KEY ASC, c.DATA_TYPE ASC"; break; case PostgreSQL: - sql = "SELECT col.ATTNAME, PG_CATALOG.FORMAT_TYPE(col.ATTTYPID, col.ATTTYPMOD) AS DTYPE " - + " FROM PG_CATALOG.PG_NAMESPACE sch, " - + " PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col, " - + " PG_CATALOG.PG_INDEX ind " - + " WHERE sch.OID = tab.RELNAMESPACE " - + " AND tab.OID = col.ATTRELID " - + " AND tab.OID = ind.INDRELID " - + " AND col.ATTNUM > 0 " - + " AND sch.NSPNAME = (" + getSchema(schema) + ") " - + " AND tab.RELNAME = '" + tableName + "' " - + " AND col.ATTNUM = ANY(ind.INDKEY) " - + " AND (ind.INDISPRIMARY OR ind.INDISUNIQUE)"; + sql = "SELECT a.attname AS COLUMN_NAME, " + + " upper(format_type(a.atttypid, a.atttypmod)) AS COLUMN_TYPE, " + + " CASE WHEN con.contype = 'p' THEN 'PRI' ELSE 'UNI' END AS KEY_TYPE " + + " FROM pg_constraint con " + + " JOIN pg_class rel ON rel.oid = con.conrelid " + + " JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace " + + " LEFT JOIN pg_attribute a ON a.attnum = ANY(con.conkey) AND a.attrelid = con.conrelid " + + " WHERE nsp.nspname = (SELECT CURRENT_SCHEMA()) " + + " AND rel.relname = '" + tableName + "'" + + " AND con.contype IN ('p', 'u') AND array_length(con.conkey, 1) = 1" + + " ORDER BY con.contype ASC, a.atttypid ASC"; break; case SQLServer: - sql = "SELECT COL_NAME(ic.OBJECT_ID, ic.column_id) AS ColumnName, t.name AS DataType " + - "FROM " + - " sys.indexes AS i " + - " INNER JOIN sys.index_columns AS ic ON i.OBJECT_ID = ic.OBJECT_ID " + - " AND i.index_id = ic.index_id " + - " INNER JOIN sys.columns AS c ON ic.OBJECT_ID = c.OBJECT_ID " + - " AND ic.column_id = c.column_id " + - " INNER JOIN sys.types AS t ON c.system_type_id = t.system_type_id " + - " AND c.user_type_id = t.user_type_id " + - "WHERE " + - " OBJECT_NAME(ic.OBJECT_ID) = '" + tableName + "' " + - " AND i.is_unique = 1 " + - " AND (SELECT COUNT(*) FROM sys.index_columns " + - " WHERE OBJECT_ID = i.OBJECT_ID AND index_id = i.index_id ) = 1"; + sql = "SELECT " + + " kc.COLUMN_NAME, " + + " upper(c.DATA_TYPE) AS COLUMN_TYPE, " + + " CASE WHEN tc.CONSTRAINT_TYPE = 'PRIMARY KEY' THEN 'PRI' ELSE 'UNI' END AS KEY_TYPE " + + " FROM " + + " INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc " + + " JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kc ON tc.CONSTRAINT_NAME = kc.CONSTRAINT_NAME " + + " JOIN INFORMATION_SCHEMA.COLUMNS c ON kc.TABLE_NAME = c.TABLE_NAME AND kc.COLUMN_NAME = c.COLUMN_NAME " + + " WHERE " + + " tc.CONSTRAINT_TYPE IN ('PRIMARY KEY', 'UNIQUE') " + + " AND kc.TABLE_SCHEMA = (select schema_name()) " + + " AND kc.TABLE_NAME = '" + tableName + "' " + + " AND (SELECT COUNT(*) FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE CONSTRAINT_NAME = kc.CONSTRAINT_NAME) = 1" + + " ORDER BY tc.CONSTRAINT_TYPE ASC, c.DATA_TYPE ASC"; break; case ClickHouse: - sql = "SELECT name, type FROM system.columns " - + " WHERE database = (" + getSchema(schema) + ") " + sql = "SELECT name as column_name, type as column_type, 'PRI' as key_type" + + " FROM system.columns " + + " WHERE database = (SELECT currentDatabase()) " + " AND table = '" + tableName + "'" - + " AND is_in_primary_key = 1"; + + " AND is_in_primary_key = 1" + + " ORDER BY type ASC"; break; case Oracle: - if (schema == null) { - schema = username.toUpperCase(); - } - else { - schema = schema.toUpperCase(); - } + schema = schema == null ? username.toUpperCase() : schema.toUpperCase(); // 表明如果没有强制原始大小写,则一律转为大写 if (!tableName.startsWith("\"")) { tableName = tableName.toUpperCase(); } - sql = "SELECT AC.COLUMN_NAME, AC.DATA_TYPE " - + "FROM ALL_INDEXES AI, ALL_IND_COLUMNS AIC, ALL_TAB_COLUMNS AC " - + "WHERE AI.TABLE_NAME = AC.TABLE_NAME " - + "AND AI.OWNER = AC.OWNER " - + "AND AI.TABLE_NAME = AIC.TABLE_NAME " - + "AND AI.INDEX_NAME = AIC.INDEX_NAME" - + "AND AC.COLUMN_NAME = AIC.COLUMN_NAME" - + "AND AI.OWNER = '" + schema + "' " - + "AND AI.UNIQUENESS = 'UNIQUE' " - + "AND AI.TABLE_NAME = '" + tableName + "'"; + sql = "SELECT acc.column_name, upper(cc.data_type) AS COLUMN_TYPE, " + + " CASE WHEN ac.constraint_type = 'P' THEN 'PRI' ELSE 'UNI' END AS KEY_TYPE " + + "FROM " + + " all_constraints ac " + + " JOIN all_cons_columns acc ON ac.constraint_name = acc.constraint_name " + + " JOIN all_tab_columns cc ON acc.table_name = cc.table_name AND acc.column_name = cc.column_name " + + "WHERE " + + " ac.constraint_type IN ('P', 'U') " + + " AND ac.owner = '" + schema + "' " + + " AND acc.table_name = '" + tableName + "' " + + " AND (SELECT COUNT(*) FROM all_cons_columns WHERE constraint_name = ac.constraint_name) = 1" + + " ORDER BY ac.constraint_type ASC, cc.data_type ASC"; break; default: break; diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/MinMaxPackage.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/MinMaxPackage.java new file mode 100644 index 000000000..60d02d5b4 --- /dev/null +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/MinMaxPackage.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.wgzhao.addax.rdbms.reader.util; + +import com.wgzhao.addax.common.base.Constant; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class MinMaxPackage +{ + private Object min; + private Object max; + private Object type; + + public MinMaxPackage() + { + this.min = null; + this.max = null; + this.type = null; + } + + public Object getMin() + { + return min; + } + + public void setMin(Object min) + { + this.min = min; + } + + public Object getMax() + { + return max; + } + + public void setMax(Object max) + { + this.max = max; + } + + public Object getType() + { + return type; + } + + public void setType(Object type) + { + this.type = type; + } + + public boolean isLong() + { + return type == Constant.PK_TYPE_LONG; + } + + public boolean isFloat() + { + return type == Constant.PK_TYPE_FLOAT; + } + + public boolean isNumeric() { + return isLong() || isFloat(); + } + + public boolean isString() + { + return type == Constant.PK_TYPE_STRING; + } + + public List genSplitPoint(int splitNum) { + if (splitNum < 2) { + return Collections.emptyList(); + } + List result = new java.util.ArrayList<>(); + if (isLong()) { + long min = Long.parseLong(this.min.toString()); + long max = Long.parseLong(this.max.toString()); + long step = (max - min) / splitNum; + // exclude min and max + for (long i = 1; i < splitNum; i++) { + result.add(min + i * step); + } + return result; + } + else if (isFloat()) { + return genFloatSplitPoint(splitNum); + } + return result; + } + + public List genFloatSplitPoint(int splitNum) { + if (splitNum < 2) { + return Collections.emptyList(); + } + List result = new java.util.ArrayList<>(); + double min = Double.parseDouble(this.min.toString()); + double max = Double.parseDouble(this.max.toString()); + if ((max - min) <= splitNum) { + // the difference between min and max is less than splitNum + return result; + } + double step = Math.round((max - min) / splitNum); + // exclude min and max + for (long i = 1; i < splitNum; i++) { + result.add(min + i * step); + } + return result; + } + + public boolean isSameValue() { + return Objects.equals(min, max); + } +} \ No newline at end of file diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/PreCheckTask.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/PreCheckTask.java index a24ddb215..89e727d1e 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/PreCheckTask.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/PreCheckTask.java @@ -101,9 +101,6 @@ public Boolean call() if (splitPkSqls != null && !splitPkSqls.isEmpty()) { splitPkSql = splitPkSqls.get(i).toString(); DBUtil.sqlValid(splitPkSql, dataBaseType); - if (i == 0) { - SingleTableSplitUtil.preCheckSplitPk(conn, splitPkSql, fetchSize, table, userName); - } } } catch (ParserException e) { diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/SingleTableSplitUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/SingleTableSplitUtil.java index 5b865a6be..12ea84cd8 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/SingleTableSplitUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/SingleTableSplitUtil.java @@ -28,26 +28,21 @@ import com.wgzhao.addax.common.util.Configuration; import com.wgzhao.addax.rdbms.util.DBUtil; import com.wgzhao.addax.rdbms.util.DataBaseType; -import com.wgzhao.addax.rdbms.util.RdbmsException; -import com.wgzhao.addax.rdbms.util.RdbmsRangeSplitWrap; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigInteger; import java.sql.Connection; +import java.sql.JDBCType; import java.sql.ResultSet; import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.StringJoiner; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; -import static com.wgzhao.addax.common.spi.ErrorCode.NOT_SUPPORT_TYPE; public class SingleTableSplitUtil { @@ -68,77 +63,49 @@ public static List splitSingleTable(Configuration configuration, String table = configuration.getString(Key.TABLE); String where = configuration.getString(Key.WHERE, null); boolean hasWhere = StringUtils.isNotBlank(where); - if (dataBaseType == DataBaseType.Oracle) { - rangeList = genSplitSqlForOracle(splitPkName, table, where, configuration, adviceNum); - // warn: mysql etc to be added... + if (adviceNum < 1) { + throw new IllegalArgumentException("The number of split should be greater than or equal 1, but it got " + adviceNum); } - else { - Pair minMaxPK = getPkRange(configuration); - if (null == minMaxPK) { - throw AddaxException.asAddaxException(CONFIG_ERROR, - "Primary key-based table splitting failed. The key type ONLY supports integer and string."); - } - + if (adviceNum == 1) { + LOG.warn("The adviceNumber is 1, so we only have one slice."); configuration.set(Key.QUERY_SQL, buildQuerySql(column, table, where)); - if (null == minMaxPK.getLeft() || null == minMaxPK.getRight()) { - // 切分后获取到的start/end 有 Null 的情况 - pluginParams.add(configuration); - return pluginParams; - } + pluginParams.add(configuration); + return pluginParams; + } - boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration.getString(Constant.PK_TYPE)); - boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration.getString(Constant.PK_TYPE)); + rangeList = genPkRangeSQLForGeneric(splitPkName, table, where, configuration, adviceNum); - if (isStringType) { - rangeList = splitStringPk(configuration, table, where, minMaxPK.getLeft().toString(), minMaxPK.getRight().toString(), - adviceNum, splitPkName); - } - else if (isLongType) { - rangeList = RdbmsRangeSplitWrap.splitAndWrap(new BigInteger(minMaxPK.getLeft().toString()), - new BigInteger(minMaxPK.getRight().toString()), adviceNum, splitPkName); - } - else { - throw AddaxException.asAddaxException(NOT_SUPPORT_TYPE, - "the splitPk[" + splitPkName + "] type is unsupported, it only support int and string"); - } + if (rangeList.isEmpty()) { + //mean the split key only has null value + LOG.warn("The min value is equal to the max value, or the split key has only null value to table {}. so we only have one slice.", table); + configuration.set(Key.QUERY_SQL, buildQuerySql(column, table, where)); + pluginParams.add(configuration); + return pluginParams; } + String tempQuerySql; - List allQuerySql = new ArrayList<>(); + StringJoiner allQuerySql = new StringJoiner("\n"); - if (!rangeList.isEmpty()) { - for (String range : rangeList) { - Configuration tempConfig = configuration.clone(); + for (String range : rangeList) { + Configuration tempConfig = configuration.clone(); - tempQuerySql = buildQuerySql(column, table, where) + (hasWhere ? " AND " : " WHERE ") + range; + tempQuerySql = buildQuerySql(column, table, where) + (hasWhere ? " AND " : " WHERE ") + range; - allQuerySql.add(tempQuerySql); - tempConfig.set(Key.QUERY_SQL, tempQuerySql); - pluginParams.add(tempConfig); - } - } - else { - //pluginParams.add(configuration); // this is wrong for new & old split - Configuration tempConfig = configuration.clone(); - tempQuerySql = buildQuerySql(column, table, where) - + (hasWhere ? " AND " : " WHERE ") - + String.format(" %s IS NOT NULL", splitPkName); allQuerySql.add(tempQuerySql); tempConfig.set(Key.QUERY_SQL, tempQuerySql); pluginParams.add(tempConfig); } - // if the `where` clause contains splitPkName, it means that the splitPkName is not null - // deal pk is null - if (where == null || ! where.contains(splitPkName)) { + if (configuration.getBool("pkExistsNull", false)) { Configuration tempConfig = configuration.clone(); tempQuerySql = buildQuerySql(column, table, where) - + (hasWhere? " AND ": " WHERE ") + + (hasWhere ? " AND " : " WHERE ") + splitPkName + " IS NULL"; allQuerySql.add(tempQuerySql); tempConfig.set(Key.QUERY_SQL, tempQuerySql); pluginParams.add(tempConfig); } - LOG.info("After splitting, all query sql = [\n{}\n].", StringUtils.join(allQuerySql, "\n")); + LOG.info("After splitting for table {}, all query sql = [\n{}\n].", table, allQuerySql); return pluginParams; } @@ -157,90 +124,57 @@ public static String buildQuerySql(String column, String table, String where) return querySql; } - @SuppressWarnings("resource") - private static Pair getPkRange(Configuration configuration) + /** + * get the min and max value of the split key + * + * @param configuration {@link Configuration} + * @return {@link MinMaxPackage} + * e.g {"min": 1, "max": 100, "type": "long"} `type` is one of `long`, `float`, `string` + */ + private static MinMaxPackage getPkMinAndMaxValue(Configuration configuration) { - String pkRangeSQL = genPKRangeSQL(configuration); + String splitPK = configuration.getString(Key.SPLIT_PK).trim(); + String table = configuration.getString(Key.TABLE).trim(); + String where = configuration.getString(Key.WHERE, null); + String pkRangeSQL = genPKSql(splitPK, table, where); - int fetchSize = configuration.getInt(Key.FETCH_SIZE); String jdbcURL = configuration.getString(Key.JDBC_URL); String username = configuration.getString(Key.USERNAME); String password = configuration.getString(Key.PASSWORD); - String table = configuration.getString(Key.TABLE); Connection conn = DBUtil.getConnection(dataBaseType, jdbcURL, username, password); - Pair minMaxPK = checkSplitPk(conn, pkRangeSQL, fetchSize, table, username, configuration); - DBUtil.closeDBResources(null, conn); - return minMaxPK; - } - - public static void preCheckSplitPk(Connection conn, String pkRangeSQL, int fetchSize, - String table, String username) - { - Pair minMaxPK = checkSplitPk(conn, pkRangeSQL, fetchSize, table, username, null); - if (null == minMaxPK) { - throw AddaxException.asAddaxException(CONFIG_ERROR, - "The split key should be single column, and the type is either integer or string."); - } - } - - /** - * 检测splitPk的配置是否正确。 - * configuration为null, 是preCheck的逻辑,不需要回写PK_TYPE到configuration中 - * - * @param conn database connection - * @param pkRangeSQL query sql for getting the primary key range - * @param fetchSize fetch size - * @param table the table name - * @param username database connect username - * @param configuration connect configuration - * @return primary key range pair - */ - private static Pair checkSplitPk(Connection conn, String pkRangeSQL, int fetchSize, String table, - String username, Configuration configuration) - { + MinMaxPackage minMaxPackage = new MinMaxPackage(); ResultSet rs = null; - Pair minMaxPK = null; try { - String errorMsg = "the splitPk type is unsupported, it only support int and string"; - try { - rs = DBUtil.query(conn, pkRangeSQL, fetchSize); + rs = DBUtil.query(conn, pkRangeSQL, 1); + ResultSetMetaData rsMetaData = rs.getMetaData(); + if (isLongType(rsMetaData.getColumnType(1))) { + minMaxPackage.setType(Constant.PK_TYPE_LONG); } - catch (Exception e) { - throw RdbmsException.asQueryException(e, pkRangeSQL); + else if (isFloatType(rsMetaData.getColumnType(1))) { + minMaxPackage.setType(Constant.PK_TYPE_FLOAT); } - ResultSetMetaData rsMetaData = rs.getMetaData(); - if (isPKTypeValid(rsMetaData)) { - if (isStringType(rsMetaData.getColumnType(1))) { - if (configuration != null) { - configuration.set(Constant.PK_TYPE, Constant.PK_TYPE_STRING); - } - while (DBUtil.asyncResultSetNext(rs)) { - minMaxPK = new ImmutablePair<>(rs.getString(1), rs.getString(2)); - } - } - else if (isLongType(rsMetaData.getColumnType(1))) { - if (configuration != null) { - configuration.set(Constant.PK_TYPE, Constant.PK_TYPE_LONG); - } - - while (DBUtil.asyncResultSetNext(rs)) { - minMaxPK = new ImmutablePair<>(rs.getString(1), rs.getString(2)); - - // check: string shouldn't contain '.', for oracle - String minMax = rs.getString(1) + rs.getString(2); - if (StringUtils.contains(minMax, '.')) { - throw AddaxException.asAddaxException(CONFIG_ERROR, errorMsg); - } - } - } - else { - throw AddaxException.asAddaxException(CONFIG_ERROR, errorMsg); - } + else { + minMaxPackage.setType(Constant.PK_TYPE_STRING); + } + while (DBUtil.asyncResultSetNext(rs)) { + minMaxPackage.setMin(rs.getObject(1)); + minMaxPackage.setMax(rs.getObject(2)); + } + rs.close(); + // the pk exists null at the current condition + if (StringUtils.isBlank(where)) { + rs = DBUtil.query(conn, "SELECT count(*) FROM " + table + " WHERE " + splitPK + " IS NULL", 1); } else { - throw AddaxException.asAddaxException(CONFIG_ERROR, errorMsg); + rs = DBUtil.query(conn, "SELECT count(*) FROM " + table + " WHERE " + where + " AND " + splitPK + " IS NULL", 1); + } + if (rs.next() && rs.getInt(1) > 0) { + LOG.info("the split key has null value."); + //minMaxPackage.setPkExistsNull(true); + configuration.set("pkExistsNull", true); } + rs.close(); } catch (AddaxException e) { throw e; @@ -250,67 +184,9 @@ else if (isLongType(rsMetaData.getColumnType(1))) { } finally { DBUtil.closeDBResources(rs, null, null); + DBUtil.closeDBResources(null, conn); } - - return minMaxPK; - } - - private static boolean isPKTypeValid(ResultSetMetaData rsMetaData) - { - boolean ret = false; - try { - int minType = rsMetaData.getColumnType(1); - int maxType = rsMetaData.getColumnType(2); - boolean isNumberType = isLongType(minType); - boolean isStringType = isStringType(minType); - - if (minType == maxType && (isNumberType || isStringType)) { - ret = true; - } - } - catch (Exception e) { - throw AddaxException.asAddaxException(CONFIG_ERROR, - "Failed to obtain the type of split key."); - } - return ret; - } - - // warn: Types.NUMERIC is used for oracle! because oracle use NUMBER to - // store INT, SMALLINT, INTEGER, and only oracle need to concern - // Types.NUMERIC - private static boolean isLongType(int type) - { - boolean isValidLongType = type == Types.BIGINT || type == Types.INTEGER || type == Types.SMALLINT || type == Types.TINYINT; - - if (SingleTableSplitUtil.dataBaseType == DataBaseType.Oracle) { - isValidLongType |= type == Types.NUMERIC; - } - return isValidLongType; - } - - private static boolean isStringType(int type) - { - return type == Types.CHAR || type == Types.NCHAR || type == Types.VARCHAR || type == Types.LONGVARCHAR || type == Types.NVARCHAR; - } - - private static String genPKRangeSQL(Configuration configuration) - { - - String splitPK = configuration.getString(Key.SPLIT_PK).trim(); - String table = configuration.getString(Key.TABLE).trim(); - String where = configuration.getString(Key.WHERE, null); - return genPKSql(splitPK, table, where); - } - - public static String genPKSql(String splitPK, String table, String where) - { - - String minMaxTemplate = "SELECT MIN(%s), MAX(%s) FROM %s"; - String pkRangeSQL = String.format(minMaxTemplate, splitPK, splitPK, table); - if (StringUtils.isNotBlank(where)) { - pkRangeSQL = String.format("%s WHERE (%s AND %s IS NOT NULL)", pkRangeSQL, where, splitPK); - } - return pkRangeSQL; + return minMaxPackage; } /** @@ -321,42 +197,40 @@ public static String genPKSql(String splitPK, String table, String where) * @param where where clause * @param configuration configuration * @param adviceNum the number of split - * @return list of string + * @return 1. empty list of the min value is equal to max value, or the split key has only null value; + * 2. {@link List} of where clause */ - public static List genSplitSqlForOracle(String splitPK, String table, String where, Configuration configuration, int adviceNum) + public static List genPkRangeSQLForGeneric(String splitPK, String table, String where, Configuration configuration, int adviceNum) { - if (adviceNum < 1) { - throw new IllegalArgumentException(String.format("The number of split should be greater than or equal 1, but it got %d.", adviceNum)); - } - else if (adviceNum == 1) { + if (adviceNum == 1) { return new ArrayList<>(); } - String whereSql = String.format("%s IS NOT NULL", splitPK); - if (StringUtils.isNotBlank(where)) { - whereSql = String.format(" WHERE (%s) AND (%s) ", whereSql, where); + + List rangeValue = new ArrayList<>(); + MinMaxPackage pkMinAndMaxValue = getPkMinAndMaxValue(configuration); + if (pkMinAndMaxValue.getMin() == null || pkMinAndMaxValue.getMax() == null || pkMinAndMaxValue.isSameValue()) { + // mean the split key has only null value, it can not split + return Collections.emptyList(); } - else { - whereSql = String.format(" WHERE (%s) ", whereSql); + + if (pkMinAndMaxValue.isNumeric()) { + LOG.info("The type of split key is numeric, so we use the math algorithm to split the table."); + rangeValue = pkMinAndMaxValue.genSplitPoint(adviceNum); + return genAllTypePkRangeWhereClause(splitPK, pkMinAndMaxValue, rangeValue); } - Double percentage = configuration.getDouble(Key.SAMPLE_PERCENTAGE, 0.1); - String sampleSqlTemplate = "SELECT * FROM ( SELECT %s FROM %s SAMPLE (%s) %s ORDER BY DBMS_RANDOM.VALUE) WHERE ROWNUM <= %s ORDER by %s ASC"; - String splitSql = String.format(sampleSqlTemplate, splitPK, table, percentage, whereSql, adviceNum, splitPK); - int fetchSize = configuration.getInt(Key.FETCH_SIZE, 32); + String splitSql = genSplitPointSql(splitPK, table, where, adviceNum, dataBaseType, pkMinAndMaxValue); String jdbcURL = configuration.getString(Key.JDBC_URL); String username = configuration.getString(Key.USERNAME); String password = configuration.getString(Key.PASSWORD); Connection conn = DBUtil.getConnection(dataBaseType, jdbcURL, username, password); LOG.info("split pk [sql={}] is running... ", splitSql); ResultSet rs = null; - List> splitRange = new ArrayList<>(); + try { - rs = DBUtil.query(conn, splitSql, fetchSize); - configuration.set(Constant.PK_TYPE, Constant.PK_TYPE_MONTE_CARLO); - ResultSetMetaData rsMetaData = rs.getMetaData(); + rs = DBUtil.query(conn, splitSql, adviceNum); while (DBUtil.asyncResultSetNext(rs)) { - ImmutablePair eachPoint = new ImmutablePair<>(rs.getObject(1), rsMetaData.getColumnType(1)); - splitRange.add(eachPoint); + rangeValue.add(rs.getObject(1)); } } catch (AddaxException e) { @@ -369,97 +243,128 @@ else if (adviceNum == 1) { DBUtil.closeDBResources(rs, null, null); } - LOG.debug(JSON.toJSONString(splitRange)); - List rangeSql = new ArrayList<>(); - int splitRangeSize = splitRange.size(); - // warn: splitRangeSize may be 0 or 1,切分规则为IS NULL以及 IS NOT NULL - // demo: Parameter rangeResult can not be null and its length can not <2. detail:rangeResult=[24999930]. - if (splitRangeSize >= 2) { - // warn: oracle Number is long type here - if (isLongType(splitRange.get(0).getRight())) { - BigInteger[] integerPoints = new BigInteger[splitRange.size()]; - for (int i = 0; i < splitRangeSize; i++) { - integerPoints[i] = new BigInteger(splitRange.get(i).getLeft().toString()); - } - rangeSql.addAll(RdbmsRangeSplitWrap.wrapRange(integerPoints, splitPK)); - // it's ok if splitRangeSize is 1 - rangeSql.add(RdbmsRangeSplitWrap.wrapFirstLastPoint(integerPoints[0], integerPoints[splitRangeSize - 1], splitPK)); - } - else if (isStringType(splitRange.get(0).getRight())) { - // warn: treated as string type - String[] stringPoints = new String[splitRange.size()]; - for (int i = 0; i < splitRangeSize; i++) { - stringPoints[i] = splitRange.get(i).getLeft().toString(); - } - rangeSql.addAll(RdbmsRangeSplitWrap.wrapRange(stringPoints, splitPK, "'", dataBaseType)); - // it's ok if splitRangeSize is 1 - rangeSql.add(RdbmsRangeSplitWrap.wrapFirstLastPoint(stringPoints[0], stringPoints[splitRangeSize - 1], - splitPK, "'", dataBaseType)); - } - else { - throw AddaxException.asAddaxException(CONFIG_ERROR, - "the data type of split key is unsupported. it ONLY supports integer and string."); - } - } - return rangeSql; + LOG.debug(JSON.toJSONString(rangeValue)); + return genAllTypePkRangeWhereClause(splitPK, pkMinAndMaxValue, rangeValue); } /** - * common String split method + * generate sql that get the split points, whose points is the boundary of the split + * we can use math algorithm to get the split points, but it causes the data skew when the + * split key is not uniform distributed. So we use the random algorithm to get the split points * - * @param configuration configuration - * @param table the table which be queried - * @param where where clause - * @param minVal minimal value - * @param maxVal maximal value - * @param splitNum expected split number - * @param pkName the column which split by - * @return list of string + * @param splitPK the split key + * @param table the table name + * @param whereSql the where clause + * @param adviceNum the number of split + * @param dataBaseType the database type + * @param minMaxPack {@link MinMaxPackage} + * @return the sql string that get the split points */ - private static List splitStringPk(Configuration configuration, String table, String where, String minVal, String maxVal, - int splitNum, String pkName) + private static String genSplitPointSql(String splitPK, String table, String whereSql, int adviceNum, DataBaseType dataBaseType, MinMaxPackage minMaxPack) { - List rangeList = new ArrayList<>(); - String splitSql; - if (splitNum < 2) { - rangeList.add(String.format("%s >= '%s' AND %s <= '%s'", pkName, minVal, pkName, maxVal)); - return rangeList; + String sql; + if (StringUtils.isBlank(whereSql)) { + whereSql = " WHERE 1=1 "; } - if (StringUtils.isBlank(where)) { - where = "1=1"; + if (minMaxPack.getType() == Constant.PK_TYPE_STRING) { + whereSql = whereSql + " AND " + splitPK + " > '" + minMaxPack.getMin().toString() + "' AND " + splitPK + "< '" + minMaxPack.getMax().toString() + "'"; } - if (dataBaseType == DataBaseType.MySql) { - splitSql = String.format("SELECT %1$s from (SELECT %1$s FROM %2$s WHERE %3$s ORDER BY RAND() LIMIT %4$d) T ORDER BY %1$s ASC", - pkName, table, where, splitNum - 1); + else { + whereSql = whereSql + " AND " + splitPK + " > " + minMaxPack.getMin() + " AND " + splitPK + "<" + minMaxPack.getMax(); } - else if (dataBaseType == DataBaseType.PostgreSQL) { - splitSql = String.format("SELECT %s FROM %s TABLESAMPLE SYSTEM(10) REPEATABLE(200) ORDER BY %s LIMIT %d", - pkName, table, pkName, splitNum - 1); + + if (dataBaseType == DataBaseType.Oracle) { + sql = String.format("select %1$s from (select %1$s from %2$s %3$s order by DBMS_RANDOM.VALUE) where rownum < %4$d order by %1$s", + splitPK, table, whereSql, adviceNum); + } + else if (dataBaseType == DataBaseType.SQLServer || dataBaseType == DataBaseType.Sybase) { + sql = String.format("select %1$s from (select top %4$d %1$s from %2$s %3$s order by newid()) t order by %1$s", + splitPK, table, whereSql, adviceNum - 1); + } + else if (dataBaseType == DataBaseType.PostgreSQL || dataBaseType == DataBaseType.SQLite) { + sql = String.format("select %1s from (select %1$s from %2$s %3$s order by random() limit %4$d) t order by %1$s", + splitPK, table, whereSql, adviceNum - 1); } else { - return RdbmsRangeSplitWrap.splitAndWrap(minVal, maxVal, splitNum, pkName, "'", dataBaseType); + // include mysql, mariadb, clickhouse, hive, trinodb, presto, doris, phoenix with hbase + sql = String.format("select %1$s from (select %1$s from %2$s %3$s order by rand() limit %4$d) t order by %1$s", + splitPK, table, whereSql, adviceNum - 1); } - String jdbcURL = configuration.getString(Key.JDBC_URL); - String username = configuration.getString(Key.USERNAME); - String password = configuration.getString(Key.PASSWORD); - try (Connection conn = DBUtil.getConnection(dataBaseType, jdbcURL, username, password)) { - Statement statement = conn.createStatement(); - ResultSet resultSet = statement.executeQuery(splitSql); - List values = new ArrayList<>(); - while (resultSet.next()) { - values.add(resultSet.getString(1)); - } - String preVal = minVal; - for (String val : values) { - rangeList.add(String.format("%1$s >='%2$s' AND %1$s <'%3$s' ", pkName, preVal, val)); - preVal = val; - } - rangeList.add(String.format("%1$s >='%2$s' AND %1$s <='%3$s' ", pkName, preVal, maxVal)); - return rangeList; + return sql; + } + + /** + * generate all split query where clause like the following: + *

+ * pk ≥ min and pk < splitPoint1 + * pk ≥ splitPoint1 and pk < splitPoint2 + * .... + * pk ≥ splitPointN and pk ≤ max + *

+ * + * @param pkName the split key name + * @param minMaxPackage {@link MinMaxPackage} + * @param rangeValues the list of split points + * @return the list of where clause + */ + public static List genAllTypePkRangeWhereClause(String pkName, MinMaxPackage minMaxPackage, List rangeValues) + { + List rangeSql = new ArrayList<>(); + String singleSqlTemplate; + String middleSqlTemplate; + String lastSqlTemplate; + + boolean isString = minMaxPackage.getType() == Constant.PK_TYPE_STRING; + if (isString) { + singleSqlTemplate = "%s >= '%s' AND %s <= '%s'"; + middleSqlTemplate = "%s >= '%s' AND %s < '%s'"; + lastSqlTemplate = "%s >= '%s' AND %s <= '%s'"; + } + else { + singleSqlTemplate = "%s >= %s AND %s <= %s"; + middleSqlTemplate = "%s >= %s AND %s < %s"; + lastSqlTemplate = "%s >= %s AND %s <= %s"; } - catch (SQLException e) { - LOG.error("Failed to split the table by split key[{}]", splitSql, e); - return rangeList; + Object min = minMaxPackage.getMin(); + Object max = minMaxPackage.getMax(); + if (rangeValues.isEmpty()) { + rangeSql.add(String.format(singleSqlTemplate, pkName, min, pkName, max)); + return rangeSql; + } + rangeValues.add(0, min); + for (int i = 0; i < rangeValues.size() - 1; i++) { + rangeSql.add(String.format(middleSqlTemplate, pkName, rangeValues.get(i), pkName, rangeValues.get(i + 1))); + } + + rangeSql.add(String.format(lastSqlTemplate, pkName, rangeValues.get(rangeValues.size() - 1), pkName, max)); + + return rangeSql; + } + + private static boolean isLongType(int type) + { + List longTypeList = Arrays.asList( + JDBCType.BIGINT, JDBCType.INTEGER, JDBCType.SMALLINT, JDBCType.TINYINT + ); + return longTypeList.contains(JDBCType.valueOf(type)); + } + + private static boolean isFloatType(int type) + { + List floatTypeList = Arrays.asList(JDBCType.DECIMAL, JDBCType.NUMERIC, + JDBCType.DOUBLE, JDBCType.FLOAT, JDBCType.REAL + ); + return floatTypeList.contains(JDBCType.valueOf(type)); + } + + public static String genPKSql(String splitPK, String table, String where) + { + + String minMaxTemplate = "SELECT MIN(%s), MAX(%s) FROM %s"; + String pkRangeSQL = String.format(minMaxTemplate, splitPK, splitPK, table); + if (StringUtils.isNotBlank(where)) { + pkRangeSQL = String.format("%s WHERE (%s AND %s IS NOT NULL)", pkRangeSQL, where, splitPK); } + return pkRangeSQL; } } diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/RdbmsRangeSplitWrap.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/RdbmsRangeSplitWrap.java index b057bc6ff..cc219b3af 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/RdbmsRangeSplitWrap.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/RdbmsRangeSplitWrap.java @@ -32,93 +32,4 @@ public final class RdbmsRangeSplitWrap { private RdbmsRangeSplitWrap() {} - - public static List splitAndWrap(String left, String right, int expectSliceNumber, - String columnName, String quote, DataBaseType dataBaseType) - { - String[] tempResult = RangeSplitUtil.doAsciiStringSplit(left, right, expectSliceNumber); - return RdbmsRangeSplitWrap.wrapRange(tempResult, columnName, quote, dataBaseType); - } - - public static List splitAndWrap(BigInteger left, BigInteger right, int expectSliceNumber, String columnName) - { - BigInteger[] tempResult = RangeSplitUtil.doBigIntegerSplit(left, right, expectSliceNumber); - return RdbmsRangeSplitWrap.wrapRange(tempResult, columnName); - } - - public static List wrapRange(long[] rangeResult, String columnName) - { - String[] rangeStr = new String[rangeResult.length]; - for (int i = 0, len = rangeResult.length; i < len; i++) { - rangeStr[i] = String.valueOf(rangeResult[i]); - } - return wrapRange(rangeStr, columnName, "", null); - } - - public static List wrapRange(BigInteger[] rangeResult, String columnName) - { - String[] rangeStr = new String[rangeResult.length]; - for (int i = 0, len = rangeResult.length; i < len; i++) { - rangeStr[i] = rangeResult[i].toString(); - } - return wrapRange(rangeStr, columnName, "", null); - } - - public static List wrapRange(String[] rangeResult, String columnName, - String quote, DataBaseType dataBaseType) - { - if (null == rangeResult || rangeResult.length < 2) { - throw new IllegalArgumentException(String.format( - "Parameter rangeResult can not be null and its length can not <2. detail:rangeResult=[%s].", - StringUtils.join(rangeResult, ","))); - } - - List result = new ArrayList<>(); - - //TODO change to stringBuilder.append(..) - if (2 == rangeResult.length) { - result.add(String.format(" (%s%s%s <= %s AND %s <= %s%s%s) ", quote, quoteConstantValue(rangeResult[0], dataBaseType), - quote, columnName, columnName, quote, quoteConstantValue(rangeResult[1], dataBaseType), quote)); - } - else { - for (int i = 0, len = rangeResult.length - 2; i < len; i++) { - result.add(String.format(" (%s%s%s <= %s AND %s < %s%s%s) ", quote, quoteConstantValue(rangeResult[i], dataBaseType), - quote, columnName, columnName, quote, quoteConstantValue(rangeResult[i + 1], dataBaseType), quote)); - } - - result.add(String.format(" (%s%s%s <= %s AND %s <= %s%s%s) ", quote, quoteConstantValue(rangeResult[rangeResult.length - 2], dataBaseType), - quote, columnName, columnName, quote, quoteConstantValue(rangeResult[rangeResult.length - 1], dataBaseType), quote)); - } - return result; - } - - public static String wrapFirstLastPoint(String firstPoint, String lastPoint, String columnName, - String quote, DataBaseType dataBaseType) - { - return String.format(" ((%s < %s%s%s) OR (%s%s%s < %s)) ", columnName, quote, quoteConstantValue(firstPoint, dataBaseType), - quote, quote, quoteConstantValue(lastPoint, dataBaseType), quote, columnName); - } - - public static String wrapFirstLastPoint(BigInteger firstPoint, BigInteger lastPoint, String columnName) - { - return wrapFirstLastPoint(firstPoint.toString(), lastPoint.toString(), columnName, "", null); - } - - private static String quoteConstantValue(String aString, DataBaseType dataBaseType) - { - if (null == dataBaseType) { - return aString; - } - - if (dataBaseType == DataBaseType.MySql) { - return aString.replace("'", "''").replace("\\", "\\\\"); - } - else if (dataBaseType == DataBaseType.Oracle || dataBaseType == DataBaseType.SQLServer) { - return aString.replace("'", "''"); - } - else { - //TODO other type supported - return aString; - } - } } diff --git a/plugin/reader/hbase20xsqlreader/pom.xml b/plugin/reader/hbase20xsqlreader/pom.xml index bdad0d78e..7c42cb497 100644 --- a/plugin/reader/hbase20xsqlreader/pom.xml +++ b/plugin/reader/hbase20xsqlreader/pom.xml @@ -26,7 +26,6 @@ com.wgzhao.addax addax-rdbms ${project.version} - compile diff --git a/plugin/reader/hbase20xsqlreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase20xsqlreader/HBase20SQLReaderHelper.java b/plugin/reader/hbase20xsqlreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase20xsqlreader/HBase20SQLReaderHelper.java index c009f9c29..fbbc5941f 100644 --- a/plugin/reader/hbase20xsqlreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase20xsqlreader/HBase20SQLReaderHelper.java +++ b/plugin/reader/hbase20xsqlreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase20xsqlreader/HBase20SQLReaderHelper.java @@ -23,6 +23,7 @@ import com.wgzhao.addax.common.base.HBaseKey; import com.wgzhao.addax.common.exception.AddaxException; import com.wgzhao.addax.common.util.Configuration; +import com.wgzhao.addax.rdbms.reader.util.ReaderSplitUtil; import com.wgzhao.addax.rdbms.util.RdbmsRangeSplitWrap; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -30,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Connection; import java.sql.DriverManager; @@ -55,72 +57,14 @@ public class HBase20SQLReaderHelper private final Configuration configuration; private Connection connection; - private List querySql; - private String fullTableName; private List columnNames; private String splitKey; - private List splitPoints; public HBase20SQLReaderHelper(Configuration configuration) { this.configuration = configuration; } - public static String buildQuerySql(List columnNames, String table, - String where) - { - String querySql; - StringBuilder columnBuilder = new StringBuilder(); - for (String columnName : columnNames) { - columnBuilder.append("\"").append(columnName).append("\","); - } - columnBuilder.setLength(columnBuilder.length() - 1); - if (StringUtils.isBlank(where)) { - querySql = String.format(HBaseConstant.QUERY_SQL_TEMPLATE_WITHOUT_WHERE, - columnBuilder.toString(), table); - } - else { - querySql = String.format(HBaseConstant.QUERY_SQL_TEMPLATE, columnBuilder.toString(), - table, where); - } - return querySql; - } - - private static boolean isPKTypeValid(ResultSetMetaData rsMetaData) - { - boolean ret = false; - try { - int minType = rsMetaData.getColumnType(1); - int maxType = rsMetaData.getColumnType(2); - - boolean isNumberType = isLongType(minType); - - boolean isStringType = isStringType(minType); - - if (minType == maxType && (isNumberType || isStringType)) { - ret = true; - } - } - catch (Exception e) { - throw AddaxException.asAddaxException(EXECUTE_FAIL, - "Addax 获取切分主键(splitPk)字段类型失败. 该错误通常是系统底层异常导致."); - } - return ret; - } - - private static boolean isLongType(int type) - { - return type == Types.BIGINT || type == Types.INTEGER - || type == Types.SMALLINT || type == Types.TINYINT; - } - - private static boolean isStringType(int type) - { - return type == Types.CHAR || type == Types.NCHAR - || type == Types.VARCHAR || type == Types.LONGVARCHAR - || type == Types.NVARCHAR; - } - /** * 校验配置参数是否正确 */ @@ -133,22 +77,23 @@ public void validateParameter() connection = getConnection(queryServerAddress, serialization); //判断querySql是否配置,如果配置则table配置可为空,否则table必须配置 - querySql = configuration.getList(HBaseKey.QUERY_SQL, String.class); + List querySql = configuration.getList(HBaseKey.QUERY_SQL, String.class); if (querySql == null || querySql.isEmpty()) { LOG.info("Split according to splitKey or split points."); String schema = configuration.getString(HBaseKey.SCHEMA, null); String tableName = configuration.getNecessaryValue(HBaseKey.TABLE, REQUIRED_VALUE); + String fullTableName; if (schema != null && !schema.isEmpty()) { fullTableName = "\"" + schema + "\".\"" + tableName + "\""; } else { fullTableName = "\"" + tableName + "\""; } + configuration.set(HBaseKey.TABLE, fullTableName); // 如果列名未配置,默认读取全部列* columnNames = configuration.getList(HBaseKey.COLUMN, String.class); splitKey = configuration.getString(HBaseKey.SPLIT_KEY, null); - splitPoints = configuration.getList(HBaseKey.SPLIT_POINT); checkTable(schema, tableName); dealWhere(); } @@ -224,7 +169,7 @@ public void checkTable(String schema, String tableName) if (splitKey != null && !primaryColumnNames.contains(splitKey)) { // 切分列必须是主键列,否则会严重影响读取性能 throw AddaxException.asAddaxException(ILLEGAL_VALUE, - "您配置的切分列" + splitKey + "不是表" + tableName + "的主键,请检查您的配置或者联系HBase管理员."); + "您配置的切分列" + splitKey + "不是表" + tableName + "的主键,请检查您的配置或者联系HBase管理员."); } } catch (SQLException e) { @@ -274,199 +219,6 @@ public void dealWhere() */ public List doSplit(int adviceNumber) { - List pluginParams = new ArrayList<>(); - List rangeList; - String where = configuration.getString(HBaseKey.WHERE); - boolean hasWhere = StringUtils.isNotBlank(where); - if (querySql == null || querySql.isEmpty()) { - // 如果splitPoints为空,则根据splitKey自动切分,不过这种切分方式无法保证数据均分,且只支持整形和字符型列 - if (splitPoints == null || splitPoints.isEmpty()) { - LOG.info("Split according to the min and max value of splitColumn..."); - Pair minMaxPK = getPkRange(configuration); - if (null == minMaxPK) { - throw AddaxException.asAddaxException(CONFIG_ERROR, - "根据切分主键切分表失败. 仅支持切分主键为一个,并且类型为整数或者字符串类型. " + - "请尝试使用其他的切分主键或者联系 HBase管理员 进行处理."); - } - if (null == minMaxPK.getLeft() || null == minMaxPK.getRight()) { - // 切分后获取到的start/end 有 Null 的情况 - pluginParams.add(configuration); - return pluginParams; - } - boolean isStringType = HBaseConstant.PK_TYPE_STRING.equals(configuration - .getString(HBaseConstant.PK_TYPE)); - boolean isLongType = HBaseConstant.PK_TYPE_LONG.equals(configuration - .getString(HBaseConstant.PK_TYPE)); - if (isStringType) { - rangeList = RdbmsRangeSplitWrap.splitAndWrap( - String.valueOf(minMaxPK.getLeft()), - String.valueOf(minMaxPK.getRight()), adviceNumber, - splitKey, "'", null); - } - else if (isLongType) { - rangeList = RdbmsRangeSplitWrap.splitAndWrap( - new BigInteger(minMaxPK.getLeft().toString()), - new BigInteger(minMaxPK.getRight().toString()), - adviceNumber, splitKey); - } - else { - throw AddaxException.asAddaxException(NOT_SUPPORT_TYPE, - "您配置的切分主键(splitPk) 类型不支持. 仅支持切分主键为一个,并且类型为整数或者字符串类型. " + - "请尝试使用其他的切分主键或者联系HBase管理员进行处理."); - } - } - else { - LOG.info("Split according to splitPoints..."); - // 根据指定splitPoints进行切分 - rangeList = buildSplitRange(); - } - String tempQuerySql; - if (null != rangeList && !rangeList.isEmpty()) { - for (String range : rangeList) { - Configuration tempConfig = configuration.clone(); - - tempQuerySql = buildQuerySql(columnNames, fullTableName, where) - + (hasWhere ? " and " : " where ") + range; - LOG.info("Query SQL: {}", tempQuerySql); - tempConfig.set(HBaseConstant.QUERY_SQL_PER_SPLIT, tempQuerySql); - pluginParams.add(tempConfig); - } - } - else { - Configuration tempConfig = configuration.clone(); - tempQuerySql = buildQuerySql(columnNames, fullTableName, where) - + (hasWhere ? " and " : " where ") - + String.format(" %s IS NOT NULL", splitKey); - LOG.info("Query SQL: {}", tempQuerySql); - tempConfig.set(HBaseConstant.QUERY_SQL_PER_SPLIT, tempQuerySql); - pluginParams.add(tempConfig); - } - } - else { - // 指定querySql不需要切分 - for (String sql : querySql) { - Configuration tempConfig = configuration.clone(); - tempConfig.set(HBaseConstant.QUERY_SQL_PER_SPLIT, sql); - pluginParams.add(tempConfig); - } - } - return pluginParams; - } - - private List buildSplitRange() - { - String getSplitKeyTypeSQL = String.format(HBaseConstant.QUERY_COLUMN_TYPE_TEMPLATE, splitKey, fullTableName); - Statement statement = null; - ResultSet resultSet = null; - List splitConditions = new ArrayList<>(); - - try { - statement = connection.createStatement(); - resultSet = statement.executeQuery(getSplitKeyTypeSQL); - ResultSetMetaData rsMetaData = resultSet.getMetaData(); - int type = rsMetaData.getColumnType(1); - String symbol = "%s"; - switch (type) { - case Types.CHAR: - case Types.VARCHAR: - symbol = "'%s'"; - break; - case Types.DATE: - symbol = "TO_DATE('%s')"; - break; - case Types.TIME: - symbol = "TO_TIME('%s')"; - break; - case Types.TIMESTAMP: - symbol = "TO_TIMESTAMP('%s')"; - break; - case Types.BINARY: - case Types.VARBINARY: - case Types.ARRAY: - throw AddaxException.asAddaxException(NOT_SUPPORT_TYPE, - "切分列类型为" + rsMetaData.getColumnTypeName(1) + ",暂不支持该类型字段作为切分列。"); - default: - break; - } - String splitCondition; - for (int i = 0; i <= splitPoints.size(); i++) { - if (i == 0) { - splitCondition = splitKey + " <= " + String.format(symbol, splitPoints.get(i)); - } - else if (i == splitPoints.size()) { - splitCondition = splitKey + " > " + String.format(symbol, splitPoints.get(i - 1)); - } - else { - splitCondition = splitKey + " > " + String.format(symbol, splitPoints.get(i - 1)) + - " AND " + splitKey + " <= " + String.format(symbol, splitPoints.get(i)); - } - splitConditions.add(splitCondition); - } - - return splitConditions; - } - catch (SQLException e) { - throw AddaxException.asAddaxException(EXECUTE_FAIL, - "获取切分列类型失败,请检查服务或给定表和切分列是否正常,或者联系HBase管理员进行处理。", e); - } - finally { - closeJdbc(null, statement, resultSet); - } - } - - private Pair getPkRange(Configuration configuration) - { - String pkRangeSQL = String.format(HBaseConstant.QUERY_MIN_MAX_TEMPLATE, splitKey, splitKey, fullTableName); - String where = configuration.getString(HBaseKey.WHERE); - if (StringUtils.isNotBlank(where)) { - pkRangeSQL = String.format("%s WHERE (%s AND %s IS NOT NULL)", - pkRangeSQL, where, splitKey); - } - Statement statement = null; - ResultSet resultSet = null; - Pair minMaxPK = null; - - try { - statement = connection.createStatement(); - resultSet = statement.executeQuery(pkRangeSQL); - ResultSetMetaData rsMetaData = resultSet.getMetaData(); - - if (isPKTypeValid(rsMetaData)) { - if (isStringType(rsMetaData.getColumnType(1))) { - configuration.set(HBaseConstant.PK_TYPE, HBaseConstant.PK_TYPE_STRING); - - if (resultSet.next()) { - minMaxPK = new ImmutablePair<>( - resultSet.getString(1), resultSet.getString(2)); - } - } - else if (isLongType(rsMetaData.getColumnType(1))) { - configuration.set(HBaseConstant.PK_TYPE, HBaseConstant.PK_TYPE_LONG); - - if (resultSet.next()) { - minMaxPK = new ImmutablePair<>( - resultSet.getLong(1), resultSet.getLong(2)); - } - } - else { - throw AddaxException.asAddaxException(CONFIG_ERROR, - "您配置的切分主键(splitPk)有误. 因为您配置的切分主键(splitPk) 类型不支持. " + - "仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系HBASE管理员进行处理."); - } - } - else { - throw AddaxException.asAddaxException(CONFIG_ERROR, - "您配置的切分主键(splitPk)有误. 因为您配置的切分主键(splitPk) 类型不支持. " + - "仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系HBASE管理员进行处理."); - } - } - catch (SQLException e) { - throw AddaxException.asAddaxException(CONFIG_ERROR, e); - } - finally { - closeJdbc(null, statement, resultSet); - } - - return minMaxPK; + return ReaderSplitUtil.doSplit(configuration, adviceNumber); } }