From a6050c7fcc472b6224c9a46675827575def11c15 Mon Sep 17 00:00:00 2001 From: wgzhao Date: Wed, 30 Oct 2024 19:02:50 +0800 Subject: [PATCH] [improve][modules] code cleanup --- .../addax/common/element/BytesColumn.java | 2 +- .../addax/common/util/Configuration.java | 2 +- .../wgzhao/addax/common/util/MathUtil.java | 6 +++--- .../wgzhao/addax/core/job/JobContainer.java | 2 +- .../CassandraReaderHelper.java | 9 ++++---- .../plugin/reader/datareader/DataReader.java | 3 ++- .../reader/hbase11xreader/NormalTask.java | 2 +- .../hbase20xreader/MultiVersionTask.java | 4 ++-- .../plugin/reader/httpreader/HttpReader.java | 2 +- .../reader/streamreader/StreamReader.java | 3 ++- .../cassandrawriter/CassandraWriter.java | 2 +- .../plugin/writer/doriswriter/DorisKey.java | 2 +- .../writer/elasticsearchwriter/ESWriter.java | 6 +++--- .../plugin/writer/hdfswriter/HdfsWriter.java | 4 ++-- .../manager/StarRocksStreamLoadVisitor.java | 21 +++++++++---------- .../tdenginewriter/DefaultDataHandler.java | 16 +++++++------- .../tdenginewriter/OpentsdbDataHandler.java | 8 +++---- 17 files changed, 48 insertions(+), 46 deletions(-) diff --git a/common/src/main/java/com/wgzhao/addax/common/element/BytesColumn.java b/common/src/main/java/com/wgzhao/addax/common/element/BytesColumn.java index f5edf87ce..54dd2ca88 100644 --- a/common/src/main/java/com/wgzhao/addax/common/element/BytesColumn.java +++ b/common/src/main/java/com/wgzhao/addax/common/element/BytesColumn.java @@ -69,7 +69,7 @@ public String asString() catch (Exception e) { throw AddaxException.asAddaxException( ErrorCode.CONVERT_NOT_SUPPORT, - String.format("Bytes[%s] cannot be converted to String .", this.toString())); + String.format("Bytes[%s] cannot be converted to String .", this)); } } diff --git a/common/src/main/java/com/wgzhao/addax/common/util/Configuration.java b/common/src/main/java/com/wgzhao/addax/common/util/Configuration.java index 3616b2b5b..327eda19a 100644 --- a/common/src/main/java/com/wgzhao/addax/common/util/Configuration.java +++ b/common/src/main/java/com/wgzhao/addax/common/util/Configuration.java @@ -72,7 +72,7 @@ public class Configuration * 对于加密的keyPath,需要记录下来 * 为的是后面分布式情况下将该值加密后抛到 AddaxServer中 */ - private Set secretKeyPathSet = + private final Set secretKeyPathSet = new HashSet<>(); private Object root; diff --git a/common/src/main/java/com/wgzhao/addax/common/util/MathUtil.java b/common/src/main/java/com/wgzhao/addax/common/util/MathUtil.java index 130cf135e..8a5deadba 100644 --- a/common/src/main/java/com/wgzhao/addax/common/util/MathUtil.java +++ b/common/src/main/java/com/wgzhao/addax/common/util/MathUtil.java @@ -313,7 +313,7 @@ public static BigDecimal sum(BigDecimal v1, BigDecimal... valList) if (null == v1) { v1 = BigDecimal.ZERO; } - if (null == valList || valList.length == 0) { + if (null == valList) { return v1; } for (BigDecimal val : valList) { @@ -386,7 +386,7 @@ public static String avg(String... valList) public static BigDecimal max(BigDecimal v1, BigDecimal... valList) { BigDecimal max = v1; - if (null == valList || valList.length == 0) { + if (null == valList) { return max; } for (BigDecimal val : valList) { @@ -422,7 +422,7 @@ public static BigDecimal maxArr(BigDecimal... valList) public static BigDecimal min(BigDecimal v1, BigDecimal... valList) { BigDecimal min = v1; - if (null == valList || valList.length == 0) { + if (null == valList) { return min; } for (BigDecimal val : valList) { diff --git a/core/src/main/java/com/wgzhao/addax/core/job/JobContainer.java b/core/src/main/java/com/wgzhao/addax/core/job/JobContainer.java index 9af359535..b735f4c25 100644 --- a/core/src/main/java/com/wgzhao/addax/core/job/JobContainer.java +++ b/core/src/main/java/com/wgzhao/addax/core/job/JobContainer.java @@ -765,7 +765,7 @@ private void postJobRunStatistic(String url, int timeoutMills, String jsonStr) LOG.info("The job results were uploaded successfully"); } else { - LOG.warn("Uploading the job results failed: {}", httpResponse.toString()); + LOG.warn("Uploading the job results failed: {}", httpResponse); } } catch (IOException e) { diff --git a/plugin/reader/cassandrareader/src/main/java/com/wgzhao/addax/plugin/reader/cassandrareader/CassandraReaderHelper.java b/plugin/reader/cassandrareader/src/main/java/com/wgzhao/addax/plugin/reader/cassandrareader/CassandraReaderHelper.java index 07a64da8d..719624ce5 100644 --- a/plugin/reader/cassandrareader/src/main/java/com/wgzhao/addax/plugin/reader/cassandrareader/CassandraReaderHelper.java +++ b/plugin/reader/cassandrareader/src/main/java/com/wgzhao/addax/plugin/reader/cassandrareader/CassandraReaderHelper.java @@ -50,6 +50,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.math.RoundingMode; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -468,7 +469,7 @@ public static List splitJob(int adviceNumber, Configuration jobCo BigDecimal minToken = BigDecimal.valueOf(-1); BigDecimal maxToken = new BigDecimal(new BigInteger("2").pow(127)); BigDecimal step = maxToken.subtract(minToken) - .divide(BigDecimal.valueOf(adviceNumber), 2, BigDecimal.ROUND_HALF_EVEN); + .divide(BigDecimal.valueOf(adviceNumber), 2, RoundingMode.HALF_EVEN); for (int i = 0; i < adviceNumber; i++) { BigInteger l = minToken.add(step.multiply(BigDecimal.valueOf(i))).toBigInteger(); BigInteger r = minToken.add(step.multiply(BigDecimal.valueOf(i + 1))).toBigInteger(); @@ -485,7 +486,7 @@ else if (partitioner.endsWith("Murmur3Partitioner")) { BigDecimal minToken = BigDecimal.valueOf(Long.MIN_VALUE); BigDecimal maxToken = BigDecimal.valueOf(Long.MAX_VALUE); BigDecimal step = maxToken.subtract(minToken) - .divide(BigDecimal.valueOf(adviceNumber), 2, BigDecimal.ROUND_HALF_EVEN); + .divide(BigDecimal.valueOf(adviceNumber), 2, RoundingMode.HALF_EVEN); for (int i = 0; i < adviceNumber; i++) { long l = minToken.add(step.multiply(BigDecimal.valueOf(i))).longValue(); long r = minToken.add(step.multiply(BigDecimal.valueOf(i + 1))).longValue(); @@ -553,9 +554,9 @@ public static String getQueryString(Configuration taskConfig, Cluster cluster) boolean allowFiltering = taskConfig.getBool(MyKey.ALLOW_FILTERING, false); StringBuilder select = new StringBuilder(); - select.append("SELECT ").append(columns.toString()).append(" FROM ").append(table); + select.append("SELECT ").append(columns).append(" FROM ").append(table); if (where.length() > 0) { - select.append(" where ").append(where.toString()); + select.append(" where ").append(where); } if (allowFiltering) { select.append(" ALLOW FILTERING"); diff --git a/plugin/reader/datareader/src/main/java/com/wgzhao/addax/plugin/reader/datareader/DataReader.java b/plugin/reader/datareader/src/main/java/com/wgzhao/addax/plugin/reader/datareader/DataReader.java index c1db5f8a3..cd3523f77 100644 --- a/plugin/reader/datareader/src/main/java/com/wgzhao/addax/plugin/reader/datareader/DataReader.java +++ b/plugin/reader/datareader/src/main/java/com/wgzhao/addax/plugin/reader/datareader/DataReader.java @@ -51,6 +51,7 @@ import org.apache.commons.rng.simple.RandomSource; import java.math.BigDecimal; +import java.math.RoundingMode; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -472,7 +473,7 @@ private Column buildOneColumn(Configuration eachColumnConfig, int columnIndex) // specify fixed scale or not ? if (scale > 0) { BigDecimal b = BigDecimal.valueOf(rng.nextDouble(param1Int, param2Int + 1)) - .setScale(scale, BigDecimal.ROUND_HALF_UP); + .setScale(scale, RoundingMode.HALF_UP); return new DoubleColumn(b.doubleValue()); } else { diff --git a/plugin/reader/hbase11xreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase11xreader/NormalTask.java b/plugin/reader/hbase11xreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase11xreader/NormalTask.java index b891c722e..eb5716209 100644 --- a/plugin/reader/hbase11xreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase11xreader/NormalTask.java +++ b/plugin/reader/hbase11xreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase11xreader/NormalTask.java @@ -35,7 +35,7 @@ public class NormalTask extends HbaseAbstractTask { private final List hbaseColumnCells; - private List column; + private final List column; public NormalTask(Configuration configuration) { diff --git a/plugin/reader/hbase20xreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase20xreader/MultiVersionTask.java b/plugin/reader/hbase20xreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase20xreader/MultiVersionTask.java index 120ad9f56..37f4dadaf 100644 --- a/plugin/reader/hbase20xreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase20xreader/MultiVersionTask.java +++ b/plugin/reader/hbase20xreader/src/main/java/com/wgzhao/addax/plugin/reader/hbase20xreader/MultiVersionTask.java @@ -51,8 +51,8 @@ protected MultiVersionTask(Configuration configuration) super(configuration); this.maxVersion = configuration.getInt(HBaseKey.MAX_VERSION); - this.column = configuration.getList(HBaseKey.COLUMN, Map.class); - this.familyQualifierMap = Hbase20xHelper.parseColumnOfMultiversionMode(this.column); + column = configuration.getList(HBaseKey.COLUMN, Map.class); + this.familyQualifierMap = Hbase20xHelper.parseColumnOfMultiversionMode(column); } @Override diff --git a/plugin/reader/httpreader/src/main/java/com/wgzhao/addax/plugin/reader/httpreader/HttpReader.java b/plugin/reader/httpreader/src/main/java/com/wgzhao/addax/plugin/reader/httpreader/HttpReader.java index 2111d2eea..80718ecb1 100644 --- a/plugin/reader/httpreader/src/main/java/com/wgzhao/addax/plugin/reader/httpreader/HttpReader.java +++ b/plugin/reader/httpreader/src/main/java/com/wgzhao/addax/plugin/reader/httpreader/HttpReader.java @@ -105,7 +105,7 @@ public static class Task private URIBuilder uriBuilder; private String username; private String password; - private BasicCredentialsProvider credsProvider = new BasicCredentialsProvider(); + private final BasicCredentialsProvider credsProvider = new BasicCredentialsProvider(); private HttpHost proxy = null; private Request request; private String method; diff --git a/plugin/reader/streamreader/src/main/java/com/wgzhao/addax/plugin/reader/streamreader/StreamReader.java b/plugin/reader/streamreader/src/main/java/com/wgzhao/addax/plugin/reader/streamreader/StreamReader.java index 55cd74f32..7c6a50b66 100644 --- a/plugin/reader/streamreader/src/main/java/com/wgzhao/addax/plugin/reader/streamreader/StreamReader.java +++ b/plugin/reader/streamreader/src/main/java/com/wgzhao/addax/plugin/reader/streamreader/StreamReader.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import java.math.BigDecimal; +import java.math.RoundingMode; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -449,7 +450,7 @@ private Column buildOneColumn(Configuration eachColumnConfig, int columnIndex) // specify fixed scale or not ? if (scale > 0) { BigDecimal b = BigDecimal.valueOf(rng.nextDouble(param1Int, param2Int + 1)) - .setScale(scale, BigDecimal.ROUND_HALF_UP); + .setScale(scale, RoundingMode.HALF_UP); return new DoubleColumn(b.doubleValue()); } else { return new DoubleColumn(rng.nextDouble(param1Int, param2Int + 1)); diff --git a/plugin/writer/cassandrawriter/src/main/java/com/wgzhao/addax/plugin/writer/cassandrawriter/CassandraWriter.java b/plugin/writer/cassandrawriter/src/main/java/com/wgzhao/addax/plugin/writer/cassandrawriter/CassandraWriter.java index 2b9c94924..8325514f2 100644 --- a/plugin/writer/cassandrawriter/src/main/java/com/wgzhao/addax/plugin/writer/cassandrawriter/CassandraWriter.java +++ b/plugin/writer/cassandrawriter/src/main/java/com/wgzhao/addax/plugin/writer/cassandrawriter/CassandraWriter.java @@ -227,7 +227,7 @@ public void init() Insert insertStmt = QueryBuilder.insertInto(table); for (String colunmnName : columnMeta) { - if (colunmnName.toLowerCase().equals(CassandraKey.WRITE_TIME)) { + if (colunmnName.equalsIgnoreCase(CassandraKey.WRITE_TIME)) { if (writeTimeCol != -1) { throw AddaxException .asAddaxException( diff --git a/plugin/writer/doriswriter/src/main/java/com/wgzhao/addax/plugin/writer/doriswriter/DorisKey.java b/plugin/writer/doriswriter/src/main/java/com/wgzhao/addax/plugin/writer/doriswriter/DorisKey.java index e03aca33c..5f7fe791e 100644 --- a/plugin/writer/doriswriter/src/main/java/com/wgzhao/addax/plugin/writer/doriswriter/DorisKey.java +++ b/plugin/writer/doriswriter/src/main/java/com/wgzhao/addax/plugin/writer/doriswriter/DorisKey.java @@ -51,7 +51,7 @@ public class DorisKey public enum StreamLoadFormat { - CSV, JSON; + CSV, JSON } private static final String FLUSH_INTERVAL = "flushInterval"; diff --git a/plugin/writer/elasticsearchwriter/src/main/java/com/wgzhao/addax/plugin/writer/elasticsearchwriter/ESWriter.java b/plugin/writer/elasticsearchwriter/src/main/java/com/wgzhao/addax/plugin/writer/elasticsearchwriter/ESWriter.java index 098f759c7..5e62109ed 100644 --- a/plugin/writer/elasticsearchwriter/src/main/java/com/wgzhao/addax/plugin/writer/elasticsearchwriter/ESWriter.java +++ b/plugin/writer/elasticsearchwriter/src/main/java/com/wgzhao/addax/plugin/writer/elasticsearchwriter/ESWriter.java @@ -124,11 +124,11 @@ private String genMappings(String typeName) String colName = jo.getString("name"); String colTypeStr = jo.getString("type"); if (colTypeStr == null) { - throw AddaxException.asAddaxException(CONFIG_ERROR, col.toString() + " column must have type"); + throw AddaxException.asAddaxException(CONFIG_ERROR, col + " column must have type"); } ESFieldType colType = ESFieldType.getESFieldType(colTypeStr); if (colType == null) { - throw AddaxException.asAddaxException(NOT_SUPPORT_TYPE, col.toString() + " unsupported type"); + throw AddaxException.asAddaxException(NOT_SUPPORT_TYPE, col + " unsupported type"); } ESColumn columnItem = new ESColumn(); @@ -387,7 +387,7 @@ private long doBatchInsert(final List writerBuffer) data.put(columnName, dateStr); } catch (Exception e) { - getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e.toString())); + getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e)); } break; case KEYWORD: diff --git a/plugin/writer/hdfswriter/src/main/java/com/wgzhao/addax/plugin/writer/hdfswriter/HdfsWriter.java b/plugin/writer/hdfswriter/src/main/java/com/wgzhao/addax/plugin/writer/hdfswriter/HdfsWriter.java index 0a4e47275..2a81d6683 100644 --- a/plugin/writer/hdfswriter/src/main/java/com/wgzhao/addax/plugin/writer/hdfswriter/HdfsWriter.java +++ b/plugin/writer/hdfswriter/src/main/java/com/wgzhao/addax/plugin/writer/hdfswriter/HdfsWriter.java @@ -128,8 +128,8 @@ private void validateParameter() if (eachColumnConf.getString(Key.TYPE).toUpperCase().startsWith("DECIMAL")) { String type = eachColumnConf.getString(Key.TYPE); eachColumnConf.set(Key.TYPE, "decimal"); - eachColumnConf.set(Key.PRECISION, (Object) getDecimalPrecision(type)); - eachColumnConf.set(Key.SCALE, (Object) getDecimalScale(type)); + eachColumnConf.set(Key.PRECISION, getDecimalPrecision(type)); + eachColumnConf.set(Key.SCALE, getDecimalScale(type)); columns.set(i, eachColumnConf); rewriteFlag = true; } diff --git a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java index 1fb9aca5d..0aafe234b 100644 --- a/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java +++ b/plugin/writer/starrockswriter/src/main/java/com/wgzhao/addax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java @@ -57,27 +57,26 @@ public void doStreamLoad(StarRocksFlushTuple flushData) if (null == host) { throw new IOException("None of the host in `load_url` could be connected."); } - String loadUrl = new StringBuilder(host) - .append("/api/") - .append(writerOptions.getDatabase()) - .append("/") - .append(writerOptions.getTable()) - .append("/_stream_load") - .toString(); + String loadUrl = host + + "/api/" + + writerOptions.getDatabase() + + "/" + + writerOptions.getTable() + + "/_stream_load"; LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); Map loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue())); final String keyStatus = "Status"; if (null == loadResult || !loadResult.containsKey(keyStatus)) { throw new IOException("Unable to flush data to StarRocks: unknown result status."); } - LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); + LOG.debug("StreamLoad response:\n" + JSON.toJSONString(loadResult)); if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { throw new IOException( - new StringBuilder("Failed to flush data to StarRocks.\n").append(JSON.toJSONString(loadResult)).toString() + "Failed to flush data to StarRocks.\n" + JSON.toJSONString(loadResult) ); } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { - LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); + LOG.debug("StreamLoad response:\n" + JSON.toJSONString(loadResult)); // has to block-checking the state to get the final result checkLabelState(host, flushData.getLabel()); } @@ -88,7 +87,7 @@ private String getAvailableHost() List hostList = writerOptions.getLoadUrlList(); long tmp = pos + hostList.size(); for (; pos < tmp; pos++) { - String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString(); + String host = "http://" + hostList.get((int) (pos % hostList.size())); if (tryHttpConnection(host)) { return host; } diff --git a/plugin/writer/tdenginewriter/src/main/java/com/wgzhao/addax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/plugin/writer/tdenginewriter/src/main/java/com/wgzhao/addax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 2d0a70962..90e267f8a 100644 --- a/plugin/writer/tdenginewriter/src/main/java/com/wgzhao/addax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/plugin/writer/tdenginewriter/src/main/java/com/wgzhao/addax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -51,14 +51,14 @@ public class DefaultDataHandler private static final String DEFAULT_USERNAME = "root"; private static final String DEFAULT_PASSWORD = "taosdata"; private static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false; - private String username; - private String password; - private String jdbcUrl; - private int batchSize; - private boolean ignoreTagsUnmatched; - - private List tables; - private List columns; + private final String username; + private final String password; + private final String jdbcUrl; + private final int batchSize; + private final boolean ignoreTagsUnmatched; + + private final List tables; + private final List columns; private Map tableMetas; private SchemaManager schemaManager; diff --git a/plugin/writer/tdenginewriter/src/main/java/com/wgzhao/addax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java b/plugin/writer/tdenginewriter/src/main/java/com/wgzhao/addax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java index 146b5171f..0399ff887 100644 --- a/plugin/writer/tdenginewriter/src/main/java/com/wgzhao/addax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java +++ b/plugin/writer/tdenginewriter/src/main/java/com/wgzhao/addax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java @@ -41,9 +41,9 @@ public class OpentsdbDataHandler private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class); private SchemalessWriter writer; - private String jdbcUrl; - private String user; - private String password; + private final String jdbcUrl; + private final String user; + private final String password; int batchSize; public OpentsdbDataHandler(Configuration config) @@ -59,7 +59,7 @@ public OpentsdbDataHandler(Configuration config) public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { int count; - try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);) { + try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) { LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + user + "] established."); writer = new SchemalessWriter(conn); count = write(lineReceiver, batchSize);