Skip to content

Commit

Permalink
[improve][lib][rdbms] Refactor table split strategy (#1145)
Browse files Browse the repository at this point in the history
1. Implement math algorithms for numeric split keys, including float types.
2. Use database-supported `ORDER BY RAND()` or similar functions for string split keys.
3. Refactor code for improved simplicity and clarity.
  • Loading branch information
wgzhao authored Sep 28, 2024
1 parent 291fd69 commit 7942831
Show file tree
Hide file tree
Showing 9 changed files with 447 additions and 769 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class Constant
public static final Object PK_TYPE_MONTE_CARLO = "pkTypeMonteCarlo";
// The data type of primary key is string.
public static final Object PK_TYPE_STRING = "pkTypeString";
public static final Object PK_TYPE_FLOAT = "pkTypeFloat";

public static final String INSERT_OR_REPLACE_TEMPLATE_MARK = "insertOrReplaceTemplate";
public static final String QUERY_SQL_TEMPLATE = "SELECT %s FROM %s WHERE (%s)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public Configuration init(Configuration originalConfig)

OriginalConfPretreatmentUtil.doPretreatment(originalConfig);
if (originalConfig.getString(Key.SPLIT_PK) == null && originalConfig.getBool(Key.AUTO_PK, false)) {
LOG.info("The primary key used for splitting is not configured, try to guess the primary key that can be split.");
LOG.info("The split key is not configured, try to guess the split key.");
String splitPK = GetPrimaryKeyUtil.getPrimaryKey(originalConfig);
if (splitPK != null) {
LOG.info("Try to use [{}] as primary key to split", splitPK);
LOG.info("Try to use [{}] as split key", splitPK);
originalConfig.set(Key.SPLIT_PK, splitPK);
if (originalConfig.getInt(Key.EACH_TABLE_SPLIT_SIZE, -1) == -1) {
originalConfig.set(Key.EACH_TABLE_SPLIT_SIZE, Constant.DEFAULT_EACH_TABLE_SPLIT_SIZE);
Expand All @@ -103,42 +103,10 @@ public void preCheck(Configuration originalConfig, DataBaseType dataBaseType)
/* 检查每个表是否有读权限,以及querySql跟split Key是否正确 */
Configuration queryConf = ReaderSplitUtil.doPreCheckSplit(originalConfig);
String splitPK = queryConf.getString(Key.SPLIT_PK);
List<Object> connList = queryConf.getList(Key.CONNECTION, Object.class);
Configuration connConf = queryConf.getConfiguration(Key.CONNECTION);
String username = queryConf.getString(Key.USERNAME);
String password = queryConf.getString(Key.PASSWORD);
ExecutorService exec;
if (connList.size() < 10) {
exec = Executors.newFixedThreadPool(connList.size());
}
else {
exec = Executors.newFixedThreadPool(10);
}
Collection<PreCheckTask> taskList = new ArrayList<>();
for (Object o : connList) {
Configuration connConf = Configuration.from(o.toString());
PreCheckTask t = new PreCheckTask(username, password, connConf, dataBaseType, splitPK);
taskList.add(t);
}
List<Future<Boolean>> results = new ArrayList<>();
try {
results = exec.invokeAll(taskList);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

for (Future<Boolean> result : results) {
try {
result.get();
}
catch (ExecutionException e) {
throw (AddaxException) e.getCause();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
exec.shutdownNow();
new PreCheckTask(username, password, connConf, dataBaseType, splitPK).call();
}

public List<Configuration> split(Configuration originalConfig, int adviceNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
*/
package com.wgzhao.addax.rdbms.reader.util;

import com.wgzhao.addax.common.base.Key;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.rdbms.util.DBUtil;
import com.wgzhao.addax.rdbms.util.DataBaseType;
Expand All @@ -33,9 +32,14 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.wgzhao.addax.common.base.Key.CONNECTION;
import static com.wgzhao.addax.common.base.Key.JDBC_URL;
import static com.wgzhao.addax.common.base.Key.PASSWORD;
import static com.wgzhao.addax.common.base.Key.TABLE;
import static com.wgzhao.addax.common.base.Key.USERNAME;

public class GetPrimaryKeyUtil
{
Expand All @@ -48,19 +52,23 @@ private GetPrimaryKeyUtil()
}

/**
* 尝试自动获取指定表的主键,如果有多个,则取第一个
* Try to get a primary key or unique key on single column to split the data
* if no primary key or unique key, return null
* Give priority to selecting the primary key, followed by a unique index of numeric type,
* and lastly, other divisible unique indexes.
*
* @param readConf 读配置项
* @return 主键
* @param readConf {@link Configuration}
* @return column name if has primary key or unique key, else null
*/
public static String getPrimaryKey(Configuration readConf)
{
String sql;
List<String[]> columns = new ArrayList<>();
Configuration connConf = readConf.getConfiguration(CONNECTION);
String table = connConf.getList("table").get(0).toString();
String jdbc_url = connConf.getString(Key.JDBC_URL);
String username = readConf.getString(Key.USERNAME, null);
String password = readConf.getString(Key.PASSWORD, null);
String table = connConf.getList(TABLE).get(0).toString();
String jdbc_url = connConf.getString(JDBC_URL);
String username = readConf.getString(USERNAME, null);
String password = readConf.getString(PASSWORD, null);
String schema = null;
if (table.contains(".")) {
schema = table.split("\\.")[0];
Expand All @@ -76,50 +84,56 @@ public static String getPrimaryKey(Configuration readConf)
LOG.debug("query primary sql: [{}]", sql);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
List<String[]> columns = new ArrayList<>();

while (resultSet.next()) {
// column_name, data_type
columns.add(new String[] {resultSet.getString(1), resultSet.getString(2)});
}
}
catch (SQLException e) {
LOG.debug(e.getMessage());
return null;
}

if (columns.isEmpty()) {
// Table has no primary key
LOG.debug("The table {} has no primary key", table);
return null;
}
if (columns.isEmpty()) {
// Table has no primary key
LOG.debug("The table {} has no primary key", table);
return null;
}

String selectColumn = columns.get(0)[0];
if (columns.size() == 1) {
return columns.get(0)[0];
}

if (columns.size() > 1) {
// The primary key is multi-column primary key.
// select the appropriate column instead of the first column based
// on the datatype - giving preference to numerics over other types.
LOG.warn("The table {} contains a multi-column primary key. try to select numeric type primary key if present", table);
selectColumn = columns.get(0)[0];
JDBCType jdbcType;
for (String[] column : columns) {
try {
jdbcType = JDBCType.valueOf(column[1]);
if (jdbcType == JDBCType.NUMERIC || jdbcType == JDBCType.INTEGER
|| jdbcType == JDBCType.BIGINT || jdbcType == JDBCType.DECIMAL
|| jdbcType == JDBCType.FLOAT) {
// better choice
selectColumn = column[0];
break;
}
}
catch (IllegalArgumentException ignored) {
// ignore
}
}
return selectColumn;
LOG.warn("The table {} contains a multiply candidate keys. try to choose numeric type key if present", table);
JDBCType jdbcType;
List<JDBCType> numericTypes = Arrays.asList(
JDBCType.NUMERIC,
JDBCType.INTEGER,
JDBCType.BIGINT,
JDBCType.DECIMAL,
JDBCType.FLOAT
);
for (String[] column : columns) {
// JDBCType not support INT type, it exists in MySQL
if ("INT".equals(column[1])) {
// better choice
return column[0];
}
try {
jdbcType = JDBCType.valueOf(column[1]);
}
catch (IllegalArgumentException e) {
LOG.warn("The column type {} does not map to JDBCType", column[1]);
continue;
}
if (numericTypes.contains(jdbcType)) {
// better choice
return column[0];
}
return selectColumn;
}
catch (SQLException e) {
LOG.debug(e.getMessage());
}
return null;
// last choice
return columns.get(0)[0];
}

/**
Expand All @@ -135,73 +149,73 @@ public static String getPrimaryKeyQuery(String schema, String tableName, String
String sql = null;
switch (dataBaseType) {
case MySql:
sql = "select c.COLUMN_NAME, c.DATA_TYPE " +
"from INFORMATION_SCHEMA.`COLUMNS` c , INFORMATION_SCHEMA.STATISTICS s " +
"where c.TABLE_SCHEMA = s.TABLE_SCHEMA " +
" AND c.TABLE_NAME = s.TABLE_NAME " +
" AND c.COLUMN_NAME = s.COLUMN_NAME " +
" AND s.TABLE_SCHEMA = (" + getSchema(schema) + ") " +
" AND s.TABLE_NAME = '" + tableName + "' " +
" AND NON_UNIQUE = 0 " +
"ORDER BY SEQ_IN_INDEX ASC";
sql = "select "
+ " c.COLUMN_NAME, upper(c.DATA_TYPE) AS COLUMN_TYPE, c.COLUMN_KEY AS KEY_TYPE "
+ " from INFORMATION_SCHEMA.`COLUMNS` c , INFORMATION_SCHEMA.STATISTICS s "
+ " where c.TABLE_SCHEMA = s.TABLE_SCHEMA "
+ " AND c.TABLE_NAME = s.TABLE_NAME "
+ " AND c.COLUMN_NAME = s.COLUMN_NAME "
+ " AND s.TABLE_SCHEMA = (SELECT SCHEMA()) "
+ " AND s.TABLE_NAME = '" + tableName + "' "
+ " AND NON_UNIQUE = 0 "
+ " AND COLUMN_KEY <> 'MUL' and COLUMN_KEY <> '' "
+ " ORDER BY c.COLUMN_KEY ASC, c.DATA_TYPE ASC";
break;
case PostgreSQL:
sql = "SELECT col.ATTNAME, PG_CATALOG.FORMAT_TYPE(col.ATTTYPID, col.ATTTYPMOD) AS DTYPE "
+ " FROM PG_CATALOG.PG_NAMESPACE sch, "
+ " PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col, "
+ " PG_CATALOG.PG_INDEX ind "
+ " WHERE sch.OID = tab.RELNAMESPACE "
+ " AND tab.OID = col.ATTRELID "
+ " AND tab.OID = ind.INDRELID "
+ " AND col.ATTNUM > 0 "
+ " AND sch.NSPNAME = (" + getSchema(schema) + ") "
+ " AND tab.RELNAME = '" + tableName + "' "
+ " AND col.ATTNUM = ANY(ind.INDKEY) "
+ " AND (ind.INDISPRIMARY OR ind.INDISUNIQUE)";
sql = "SELECT a.attname AS COLUMN_NAME, "
+ " upper(format_type(a.atttypid, a.atttypmod)) AS COLUMN_TYPE, "
+ " CASE WHEN con.contype = 'p' THEN 'PRI' ELSE 'UNI' END AS KEY_TYPE "
+ " FROM pg_constraint con "
+ " JOIN pg_class rel ON rel.oid = con.conrelid "
+ " JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace "
+ " LEFT JOIN pg_attribute a ON a.attnum = ANY(con.conkey) AND a.attrelid = con.conrelid "
+ " WHERE nsp.nspname = (SELECT CURRENT_SCHEMA()) "
+ " AND rel.relname = '" + tableName + "'"
+ " AND con.contype IN ('p', 'u') AND array_length(con.conkey, 1) = 1"
+ " ORDER BY con.contype ASC, a.atttypid ASC";
break;
case SQLServer:
sql = "SELECT COL_NAME(ic.OBJECT_ID, ic.column_id) AS ColumnName, t.name AS DataType " +
"FROM " +
" sys.indexes AS i " +
" INNER JOIN sys.index_columns AS ic ON i.OBJECT_ID = ic.OBJECT_ID " +
" AND i.index_id = ic.index_id " +
" INNER JOIN sys.columns AS c ON ic.OBJECT_ID = c.OBJECT_ID " +
" AND ic.column_id = c.column_id " +
" INNER JOIN sys.types AS t ON c.system_type_id = t.system_type_id " +
" AND c.user_type_id = t.user_type_id " +
"WHERE " +
" OBJECT_NAME(ic.OBJECT_ID) = '" + tableName + "' " +
" AND i.is_unique = 1 " +
" AND (SELECT COUNT(*) FROM sys.index_columns " +
" WHERE OBJECT_ID = i.OBJECT_ID AND index_id = i.index_id ) = 1";
sql = "SELECT "
+ " kc.COLUMN_NAME, "
+ " upper(c.DATA_TYPE) AS COLUMN_TYPE, "
+ " CASE WHEN tc.CONSTRAINT_TYPE = 'PRIMARY KEY' THEN 'PRI' ELSE 'UNI' END AS KEY_TYPE "
+ " FROM "
+ " INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc "
+ " JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kc ON tc.CONSTRAINT_NAME = kc.CONSTRAINT_NAME "
+ " JOIN INFORMATION_SCHEMA.COLUMNS c ON kc.TABLE_NAME = c.TABLE_NAME AND kc.COLUMN_NAME = c.COLUMN_NAME "
+ " WHERE "
+ " tc.CONSTRAINT_TYPE IN ('PRIMARY KEY', 'UNIQUE') "
+ " AND kc.TABLE_SCHEMA = (select schema_name()) "
+ " AND kc.TABLE_NAME = '" + tableName + "' "
+ " AND (SELECT COUNT(*) FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE CONSTRAINT_NAME = kc.CONSTRAINT_NAME) = 1"
+ " ORDER BY tc.CONSTRAINT_TYPE ASC, c.DATA_TYPE ASC";
break;
case ClickHouse:
sql = "SELECT name, type FROM system.columns "
+ " WHERE database = (" + getSchema(schema) + ") "
sql = "SELECT name as column_name, type as column_type, 'PRI' as key_type"
+ " FROM system.columns "
+ " WHERE database = (SELECT currentDatabase()) "
+ " AND table = '" + tableName + "'"
+ " AND is_in_primary_key = 1";
+ " AND is_in_primary_key = 1"
+ " ORDER BY type ASC";
break;
case Oracle:
if (schema == null) {
schema = username.toUpperCase();
}
else {
schema = schema.toUpperCase();
}
schema = schema == null ? username.toUpperCase() : schema.toUpperCase();
// 表明如果没有强制原始大小写,则一律转为大写
if (!tableName.startsWith("\"")) {
tableName = tableName.toUpperCase();
}
sql = "SELECT AC.COLUMN_NAME, AC.DATA_TYPE "
+ "FROM ALL_INDEXES AI, ALL_IND_COLUMNS AIC, ALL_TAB_COLUMNS AC "
+ "WHERE AI.TABLE_NAME = AC.TABLE_NAME "
+ "AND AI.OWNER = AC.OWNER "
+ "AND AI.TABLE_NAME = AIC.TABLE_NAME "
+ "AND AI.INDEX_NAME = AIC.INDEX_NAME"
+ "AND AC.COLUMN_NAME = AIC.COLUMN_NAME"
+ "AND AI.OWNER = '" + schema + "' "
+ "AND AI.UNIQUENESS = 'UNIQUE' "
+ "AND AI.TABLE_NAME = '" + tableName + "'";
sql = "SELECT acc.column_name, upper(cc.data_type) AS COLUMN_TYPE, "
+ " CASE WHEN ac.constraint_type = 'P' THEN 'PRI' ELSE 'UNI' END AS KEY_TYPE "
+ "FROM "
+ " all_constraints ac "
+ " JOIN all_cons_columns acc ON ac.constraint_name = acc.constraint_name "
+ " JOIN all_tab_columns cc ON acc.table_name = cc.table_name AND acc.column_name = cc.column_name "
+ "WHERE "
+ " ac.constraint_type IN ('P', 'U') "
+ " AND ac.owner = '" + schema + "' "
+ " AND acc.table_name = '" + tableName + "' "
+ " AND (SELECT COUNT(*) FROM all_cons_columns WHERE constraint_name = ac.constraint_name) = 1"
+ " ORDER BY ac.constraint_type ASC, cc.data_type ASC";
break;
default:
break;
Expand Down
Loading

0 comments on commit 7942831

Please sign in to comment.