Skip to content

Commit

Permalink
[update][common] Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao committed Oct 17, 2024
1 parent 8f0eb49 commit 37852c2
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 100 deletions.
13 changes: 3 additions & 10 deletions common/src/main/java/com/wgzhao/addax/common/base/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,10 @@ public class Constant
public static final int DEFAULT_FETCH_SIZE = 2048;
public static final int DEFAULT_DECIMAL_MAX_PRECISION = 38;
public static final int DEFAULT_DECIMAL_MAX_SCALE = 18;

public static final String PK_TYPE = "pkType";
// The data type of primary key is long.
public static final Object PK_TYPE_LONG = "pkTypeLong";
public static final Object PK_TYPE_MONTE_CARLO = "pkTypeMonteCarlo";
// The data type of primary key is string.
public static final Object PK_TYPE_STRING = "pkTypeString";
public static final Object PK_TYPE_FLOAT = "pkTypeFloat";

public static final String INSERT_OR_REPLACE_TEMPLATE_MARK = "insertOrReplaceTemplate";
public static final String QUERY_SQL_TEMPLATE = "SELECT %s FROM %s WHERE (%s)";
public static final String QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "SELECT %s FROM %s ";
public static final String TABLE_NAME_PLACEHOLDER = "@table";
public static final String TABLE_NUMBER_MARK = "tableNumber";

public static final String ENC_PASSWORD_PREFIX = "${enc:";

Expand All @@ -85,4 +75,7 @@ public class Constant
"SYSTEM_USER", "TABLE", "TABLESAMPLE", "TEXTSIZE", "THEN", "TO", "TOP", "TRAN", "TRANSACTION", "TRIGGER", "TRUNCATE", "TRY_CONVERT",
"TSEQUAL", "UNION", "UNIQUE", "UNPIVOT", "UPDATE", "UPDATETEXT", "USE", "USER", "VALUES", "VARYING", "VIEW", "WAITFOR", "WHEN",
"WHERE", "WHILE", "WITH", "WRITETEXT", "XACT_ABORT"));

//用于插件对自身 split 的每个 task 标识其使用的资源,以告知core 对 reader/writer split 之后的 task 进行拼接时需要根据资源标签进行更有意义的 shuffle 操作
public static final String LOAD_BALANCE_RESOURCE_MARK = "loadBalanceResourceMark";
}
3 changes: 2 additions & 1 deletion common/src/main/java/com/wgzhao/addax/common/base/Key.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public class Key
// For char type
public static final String LENGTH = "length";


// For Oracle reader ONLY
public static final String HINT = "hint";
public static final String SAMPLE_PERCENTAGE = "samplePercentage";
Expand All @@ -148,6 +147,8 @@ public class Key
public static final String HEADER = "header";
public static final String IS_TABLE_MODE = "isTableMode";
public static final String EXTENDED_INSERT = "extendedInsert";
public static final String TABLE_NUMBER= "tableNumber";

public Key()
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected int readHeader()
throws IOException
{
short lzoLibraryVersion = 0x2060;
Logger log = LoggerFactory.getLogger(LzopInputStream.class);
Logger log = LoggerFactory.getLogger(ExpandLzopInputStream.class);
byte[] lzopMagic = {-119, 'L', 'Z', 'O', 0, '\r', '\n', '\032', '\n'};
byte[] buf = new byte[9];
readBytes(buf, 0, 9);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,23 @@

import java.util.Date;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_DATE_FORMAT;

/**
* Created by liqiang on 15/8/23.
*/
@SuppressWarnings("NullableProblems")
public class PerfRecord
implements Comparable<PerfRecord>
{
private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
private final int taskGroupId;
private final int taskId;
private final PHASE phase;
private volatile Date startTime; //NOSONAR
private final long elapsedTimeInNs = -1;
private volatile long count = 0;
private volatile long size = 0;
private volatile Date startTime;
private final AtomicLong count = new AtomicLong(0);
// private volatile long size = 0;
private final AtomicLong size = new AtomicLong(0);

public PerfRecord(int taskGroupId, int taskId, PHASE phase)
{
Expand All @@ -54,12 +55,12 @@ public void start()

public void addCount(long count)
{
this.count += count;
this.count.addAndGet(count);
}

public void addSize(long size)
{
this.size += size;
this.size.addAndGet(size);
}

public void end()
Expand All @@ -73,18 +74,10 @@ public void end(long elapsedTimeInNs)
@Override
public String toString()
{
long elapsedTimeInNs = -1;
return String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s"
, getInstId(), taskGroupId, taskId, phase,
DateFormatUtils.format(startTime, DATETIME_FORMAT), elapsedTimeInNs, count, size, getHostIP());
}

@Override
public int compareTo(PerfRecord o)
{
if (o == null) {
return 1;
}
return Long.compare(this.elapsedTimeInNs, o.elapsedTimeInNs);
DateFormatUtils.format(startTime, DEFAULT_DATE_FORMAT), elapsedTimeInNs, count, size, getHostIP());
}

@Override
Expand Down Expand Up @@ -142,12 +135,12 @@ public PHASE getPhase()

public long getCount()
{
return count;
return count.get();
}

public long getSize()
{
return size;
return size.get();
}

public long getInstId()
Expand All @@ -160,6 +153,14 @@ public String getHostIP()
return HostUtils.IP;
}

@Override
public int compareTo(PerfRecord o)
{
if (o == null) {
return 1;
}
return 0;
}

public enum PHASE
{
Expand Down Expand Up @@ -189,11 +190,6 @@ public enum PHASE
*/
RESULT_NEXT_ALL(101),

/**
* only odps block close
*/
ODPS_BLOCK_CLOSE(102),

WAIT_READ_TIME(103),

WAIT_WRITE_TIME(104),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
Expand Down Expand Up @@ -68,10 +69,10 @@ public class VMInfo
private VMInfo()
{
//初始化静态信息
osMXBean = java.lang.management.ManagementFactory.getOperatingSystemMXBean();
runtimeMXBean = java.lang.management.ManagementFactory.getRuntimeMXBean();
garbageCollectorMXBeanList = java.lang.management.ManagementFactory.getGarbageCollectorMXBeans();
memoryPoolMXBeanList = java.lang.management.ManagementFactory.getMemoryPoolMXBeans();
osMXBean = ManagementFactory.getOperatingSystemMXBean();
runtimeMXBean = ManagementFactory.getRuntimeMXBean();
garbageCollectorMXBeanList = ManagementFactory.getGarbageCollectorMXBeans();
memoryPoolMXBeanList = ManagementFactory.getMemoryPoolMXBeans();

jvmInfo = runtimeMXBean.getVmVendor() + " " + runtimeMXBean.getSpecVersion() + " " + runtimeMXBean.getVmVersion();
osInfo = osMXBean.getName() + " " + osMXBean.getArch() + " " + osMXBean.getVersion();
Expand Down Expand Up @@ -135,15 +136,15 @@ public static long getLongFromOperatingSystem(OperatingSystemMXBean operatingSys
return (Long) method.invoke(operatingSystem, (Object[]) null);
}
catch (final Exception e) {
LOG.info(String.format("OperatingSystemMXBean %s failed, Exception = %s ", methodName, e.getMessage()));
LOG.info("OperatingSystemMXBean {} failed, Exception = {} ", methodName, e.getMessage());
}

return -1;
}

public String toString()
{
return "the machine info => \n\n"
return "The machine info => \n\n"
+ "\tosInfo: \t" + osInfo + "\n"
+ "\tjvmInfo:\t" + jvmInfo + "\n"
+ "\tcpu num:\t" + totalProcessorCount + "\n\n"
Expand Down Expand Up @@ -208,7 +209,7 @@ public synchronized void getDelta(boolean print)
}

if (print) {
LOG.info(processCpuStatus.getDeltaString() + processMemoryStatus.getDeltaString() + processGCStatus.getDeltaString());
LOG.info("{}{}{}", processCpuStatus.getDeltaString(), processMemoryStatus.getDeltaString(), processGCStatus.getDeltaString());
}
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ public String getUnnecessaryValue(String key, String defaultValue)

/**
* 根据用户提供的json path,寻址具体的对象。
*
* NOTE: 目前仅支持Map以及List下标寻址, 例如:
*
* 对于如下JSON
* <p>
* {"a": {"b": {"c": [0,1,2,3]}}}
Expand Down Expand Up @@ -682,7 +680,6 @@ public String beautify()

/**
* 根据用户提供的json path,插入指定对象,并返回之前存在的对象(如果存在)
*
* 目前仅支持.以及数组下标寻址, 例如:
* <p>
* config.set("a.b.c[3]", object);
Expand Down Expand Up @@ -1060,7 +1057,7 @@ private Object findObjectInMap(Object target, String index)
if (!isMap) {
throw new IllegalArgumentException(String.format(
"The item [%s] requires a Map object in json format, but the actual type is [%s].",
index, target.getClass().toString()));
index, target.getClass()));
}

Object result = ((Map<String, Object>) target).get(index);
Expand All @@ -1071,14 +1068,13 @@ private Object findObjectInMap(Object target, String index)
return result;
}

@SuppressWarnings({"unchecked"})
private Object findObjectInList(Object target, String each)
{
boolean isList = (target instanceof List);
if (!isList) {
throw new IllegalArgumentException(String.format(
"The item [%s] requires a Map object in json format, but the actual type is [%s].",
each, target.getClass().toString()));
each, target.getClass()));
}

String index = each.replace("[", "").replace("]", "");
Expand Down Expand Up @@ -1144,16 +1140,4 @@ private void checkPath(String path)
}
}
}

public Set<String> getSecretKeyPathSet()
{
return secretKeyPathSet;
}

public void setSecretKeyPathSet(Set<String> keyPathSet)
{
if (keyPathSet != null) {
this.secretKeyPathSet = keyPathSet;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ public <T> T doRetry(Callable<T> callable, int retryTimes, long sleepTimeInMilli
}
catch (Exception e) {
saveException = e;
// if (i == 0) {
// LOG.error(String.format("Exception occurred while calling callable: %s", saveException.getMessage()), saveException);
// }

if (null != retryExceptionClass && !retryExceptionClass.isEmpty()) {
boolean needRetry = false;
for (Class<?> eachExceptionClass : retryExceptionClass) {
Expand Down Expand Up @@ -232,9 +228,6 @@ protected <T> T call(Callable<T> callable)
try {
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
throw e;
}
finally {
if (!future.isDone()) {
future.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;

import java.nio.charset.Charset;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -43,7 +44,7 @@ public class StrUtil

private static final Pattern VARIABLE_PATTERN = Pattern.compile("(\\$)\\{?(\\w+)\\}?");

private static String systemEncoding = System.getProperty("file.encoding");
private static String systemEncoding = Charset.defaultCharset().displayName();

private StrUtil()
{
Expand Down

0 comments on commit 37852c2

Please sign in to comment.