From f1e637b2eb955a839b1e570d0a8477f1d8259364 Mon Sep 17 00:00:00 2001 From: wgzhao <wgzhao@gmail.com> Date: Tue, 29 Oct 2024 16:33:10 +0800 Subject: [PATCH] [update][core] Optimize code for better performance and readability --- .../wgzhao/addax/common/element/Record.java | 4 +- .../addax/core/transport/channel/Channel.java | 30 ++++++----- .../channel/memory/MemoryChannel.java | 12 ++--- .../exchanger/BufferedRecordExchanger.java | 20 ++++---- .../transport/exchanger/RecordExchanger.java | 23 +++++---- .../transformer/FilterTransformer.java | 50 ++++++++----------- .../transformer/SubstrTransformer.java | 4 +- .../transformer/TransformerRegistry.java | 5 +- .../addax/core/util/TransformerUtil.java | 2 +- 9 files changed, 80 insertions(+), 70 deletions(-) diff --git a/common/src/main/java/com/wgzhao/addax/common/element/Record.java b/common/src/main/java/com/wgzhao/addax/common/element/Record.java index d6d9b2832..87c85fab9 100644 --- a/common/src/main/java/com/wgzhao/addax/common/element/Record.java +++ b/common/src/main/java/com/wgzhao/addax/common/element/Record.java @@ -42,7 +42,7 @@ public interface Record int getMemorySize(); - public void setMeta(Map<String, String> meta); + void setMeta(Map<String, String> meta); - public Map<String, String> getMeta(); + Map<String, String> getMeta(); } diff --git a/core/src/main/java/com/wgzhao/addax/core/transport/channel/Channel.java b/core/src/main/java/com/wgzhao/addax/core/transport/channel/Channel.java index d98b8175d..584bc0d08 100644 --- a/core/src/main/java/com/wgzhao/addax/core/transport/channel/Channel.java +++ b/core/src/main/java/com/wgzhao/addax/core/transport/channel/Channel.java @@ -24,13 +24,19 @@ import com.wgzhao.addax.core.statistics.communication.Communication; import com.wgzhao.addax.core.statistics.communication.CommunicationTool; import com.wgzhao.addax.core.transport.record.TerminateRecord; -import com.wgzhao.addax.core.util.container.CoreConstant; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_CONTAINER_TASK_GROUP_ID; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_FLOW_CONTROL_INTERVAL; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_SPEED_BYTE; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_SPEED_RECORD; /** * Created by jingxing on 14-8-25. @@ -51,16 +57,16 @@ public abstract class Channel protected long flowControlInterval; protected volatile boolean isClosed = false; protected Configuration configuration; - protected volatile long waitReaderTime = 0; - protected volatile long waitWriterTime = 0; + protected volatile AtomicLong waitReaderTime = new AtomicLong(0); + protected volatile AtomicLong waitWriterTime = new AtomicLong(0); private Communication currentCommunication; public Channel(Configuration configuration) { //channel的queue里默认record为1万条。原来为512条 - int capacity = configuration.getInt(CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY, 2048); - long byteSpeed = configuration.getLong(CoreConstant.CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024L); - long recordSpeed = configuration.getLong(CoreConstant.CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000L); + int capacity = configuration.getInt(CORE_TRANSPORT_CHANNEL_CAPACITY, 2048); + long byteSpeed = configuration.getLong(CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024L); + long recordSpeed = configuration.getLong(CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000L); if (capacity <= 0) { throw new IllegalArgumentException(String.format("The channel capacity [%d] must be greater than 0.", capacity)); @@ -72,13 +78,13 @@ public Channel(Configuration configuration) isFirstPrint = false; } - this.taskGroupId = configuration.getInt(CoreConstant.CORE_CONTAINER_TASK_GROUP_ID); + this.taskGroupId = configuration.getInt(CORE_CONTAINER_TASK_GROUP_ID); this.capacity = capacity; this.byteSpeed = byteSpeed; this.recordSpeed = recordSpeed; - this.flowControlInterval = configuration.getLong(CoreConstant.CORE_TRANSPORT_CHANNEL_FLOW_CONTROL_INTERVAL, 1000); + this.flowControlInterval = configuration.getLong(CORE_TRANSPORT_CHANNEL_FLOW_CONTROL_INTERVAL, 1000); //channel的queue默认大小为8M,原来为64M - this.byteCapacity = configuration.getInt(CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024); + this.byteCapacity = configuration.getInt(CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024); this.configuration = configuration; } @@ -177,8 +183,8 @@ private void statPush(long recordSize, long byteSize) currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES, byteSize); //在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数 - currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime); - currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime); + currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime.get()); + currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime.get()); boolean isChannelByteSpeedLimit = (this.byteSpeed > 0); boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0); diff --git a/core/src/main/java/com/wgzhao/addax/core/transport/channel/memory/MemoryChannel.java b/core/src/main/java/com/wgzhao/addax/core/transport/channel/memory/MemoryChannel.java index c7b522d71..a4a2cd102 100644 --- a/core/src/main/java/com/wgzhao/addax/core/transport/channel/memory/MemoryChannel.java +++ b/core/src/main/java/com/wgzhao/addax/core/transport/channel/memory/MemoryChannel.java @@ -24,7 +24,6 @@ import com.wgzhao.addax.common.util.Configuration; import com.wgzhao.addax.core.transport.channel.Channel; import com.wgzhao.addax.core.transport.record.TerminateRecord; -import com.wgzhao.addax.core.util.container.CoreConstant; import java.util.Collection; import java.util.concurrent.ArrayBlockingQueue; @@ -34,6 +33,7 @@ import java.util.concurrent.locks.ReentrantLock; import static com.wgzhao.addax.common.spi.ErrorCode.RUNTIME_ERROR; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE; /** * 内存Channel的具体实现,底层其实是一个ArrayBlockingQueue @@ -57,7 +57,7 @@ public MemoryChannel(Configuration configuration) { super(configuration); this.queue = new ArrayBlockingQueue<>(this.getCapacity()); - this.bufferSize = configuration.getInt(CoreConstant.CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE, 32); + this.bufferSize = configuration.getInt(CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE, 32); lock = new ReentrantLock(); notInsufficient = lock.newCondition(); @@ -88,7 +88,7 @@ protected void doPush(Record r) try { long startTime = System.nanoTime(); this.queue.put(r); - waitWriterTime += System.nanoTime() - startTime; + waitReaderTime.addAndGet(System.nanoTime() - startTime); memoryBytes.addAndGet(r.getMemorySize()); } catch (InterruptedException ex) { @@ -107,7 +107,7 @@ protected void doPushAll(Collection<Record> rs) notInsufficient.await(200L, TimeUnit.MILLISECONDS); } this.queue.addAll(rs); - waitWriterTime += System.nanoTime() - startTime; + waitReaderTime.addAndGet(System.nanoTime() - startTime); memoryBytes.addAndGet(bytes); notEmpty.signalAll(); } @@ -125,7 +125,7 @@ protected Record doPull() try { long startTime = System.nanoTime(); Record r = this.queue.take(); - waitReaderTime += System.nanoTime() - startTime; + waitReaderTime.addAndGet(System.nanoTime() - startTime); memoryBytes.addAndGet(-r.getMemorySize()); return r; } @@ -146,7 +146,7 @@ protected void doPullAll(Collection<Record> rs) while (this.queue.drainTo(rs, bufferSize) <= 0) { notEmpty.await(200L, TimeUnit.MILLISECONDS); } - waitReaderTime += System.nanoTime() - startTime; + waitReaderTime.addAndGet(System.nanoTime() - startTime); int bytes = getRecordBytes(rs); memoryBytes.addAndGet(-bytes); notInsufficient.signalAll(); diff --git a/core/src/main/java/com/wgzhao/addax/core/transport/exchanger/BufferedRecordExchanger.java b/core/src/main/java/com/wgzhao/addax/core/transport/exchanger/BufferedRecordExchanger.java index f38634461..35ebab344 100644 --- a/core/src/main/java/com/wgzhao/addax/core/transport/exchanger/BufferedRecordExchanger.java +++ b/core/src/main/java/com/wgzhao/addax/core/transport/exchanger/BufferedRecordExchanger.java @@ -20,7 +20,6 @@ package com.wgzhao.addax.core.transport.exchanger; import com.wgzhao.addax.common.element.Record; -import com.wgzhao.addax.common.spi.ErrorCode; import com.wgzhao.addax.common.exception.AddaxException; import com.wgzhao.addax.common.plugin.RecordReceiver; import com.wgzhao.addax.common.plugin.RecordSender; @@ -28,7 +27,6 @@ import com.wgzhao.addax.common.util.Configuration; import com.wgzhao.addax.core.transport.channel.Channel; import com.wgzhao.addax.core.transport.record.TerminateRecord; -import com.wgzhao.addax.core.util.container.CoreConstant; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +36,10 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; +import static com.wgzhao.addax.common.spi.ErrorCode.SHUT_DOWN_TASK; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_RECORD_CLASS; public class BufferedRecordExchanger implements RecordSender, RecordReceiver @@ -65,17 +67,17 @@ public BufferedRecordExchanger(Channel channel, TaskPluginCollector pluginCollec this.pluginCollector = pluginCollector; Configuration configuration = channel.getConfiguration(); - this.bufferSize = configuration.getInt(CoreConstant.CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE, 32); + this.bufferSize = configuration.getInt(CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE, 32); this.buffer = new ArrayList<>(bufferSize); //channel的queue默认大小为8M,原来为64M this.byteCapacity = configuration.getInt( - CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024); + CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024); try { BufferedRecordExchanger.recordClass = ((Class<? extends Record>) Class .forName(configuration.getString( - CoreConstant.CORE_TRANSPORT_RECORD_CLASS, + CORE_TRANSPORT_RECORD_CLASS, "com.wgzhao.addax.core.transport.record.DefaultRecord"))); } catch (Exception e) { @@ -98,7 +100,7 @@ public Record createRecord() public void sendToWriter(Record record) { if (shutdown) { - throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, ""); + throw AddaxException.asAddaxException(SHUT_DOWN_TASK, ""); } Validate.notNull(record, "The record cannot be empty."); @@ -124,7 +126,7 @@ public void sendToWriter(Record record) public void flush() { if (shutdown) { - throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, ""); + throw AddaxException.asAddaxException(SHUT_DOWN_TASK, ""); } this.channel.pushAll(this.buffer); this.buffer.clear(); @@ -136,7 +138,7 @@ public void flush() public void terminate() { if (shutdown) { - throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, ""); + throw AddaxException.asAddaxException(SHUT_DOWN_TASK, ""); } flush(); this.channel.pushTerminate(TerminateRecord.get()); @@ -146,7 +148,7 @@ public void terminate() public Record getFromReader() { if (shutdown) { - throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, ""); + throw AddaxException.asAddaxException(SHUT_DOWN_TASK, ""); } boolean isEmpty = (this.bufferIndex >= this.buffer.size()); if (isEmpty) { diff --git a/core/src/main/java/com/wgzhao/addax/core/transport/exchanger/RecordExchanger.java b/core/src/main/java/com/wgzhao/addax/core/transport/exchanger/RecordExchanger.java index b73863671..45451681b 100644 --- a/core/src/main/java/com/wgzhao/addax/core/transport/exchanger/RecordExchanger.java +++ b/core/src/main/java/com/wgzhao/addax/core/transport/exchanger/RecordExchanger.java @@ -20,7 +20,6 @@ package com.wgzhao.addax.core.transport.exchanger; import com.wgzhao.addax.common.element.Record; -import com.wgzhao.addax.common.spi.ErrorCode; import com.wgzhao.addax.common.exception.AddaxException; import com.wgzhao.addax.common.plugin.RecordReceiver; import com.wgzhao.addax.common.plugin.RecordSender; @@ -28,13 +27,16 @@ import com.wgzhao.addax.common.util.Configuration; import com.wgzhao.addax.core.statistics.communication.Communication; import com.wgzhao.addax.core.transport.channel.Channel; +import com.wgzhao.addax.core.transport.record.DefaultRecord; import com.wgzhao.addax.core.transport.record.TerminateRecord; import com.wgzhao.addax.core.transport.transformer.TransformerExecution; -import com.wgzhao.addax.core.util.container.CoreConstant; +import org.apache.commons.lang3.StringUtils; import java.util.List; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; +import static com.wgzhao.addax.common.spi.ErrorCode.SHUT_DOWN_TASK; +import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_RECORD_CLASS; public class RecordExchanger extends TransformerExchanger @@ -54,10 +56,13 @@ public RecordExchanger(int taskGroupId, int taskId, Channel channel, Communicati this.channel = channel; Configuration configuration = channel.getConfiguration(); try { - RecordExchanger.RECORD_CLASS = (Class<? extends Record>) Class - .forName(configuration.getString( - CoreConstant.CORE_TRANSPORT_RECORD_CLASS, - "com.wgzhao.addax.core.transport.record.DefaultRecord")); + String cls = configuration.getString(CORE_TRANSPORT_RECORD_CLASS, null); + if (!StringUtils.isBlank(cls)) { + RECORD_CLASS = (Class<? extends Record>) Class.forName(cls); + } + else { + RecordExchanger.RECORD_CLASS = DefaultRecord.class; + } } catch (ClassNotFoundException e) { throw AddaxException.asAddaxException(CONFIG_ERROR, e); @@ -68,7 +73,7 @@ public RecordExchanger(int taskGroupId, int taskId, Channel channel, Communicati public Record getFromReader() { if (shutdown) { - throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, ""); + throw AddaxException.asAddaxException(SHUT_DOWN_TASK, ""); } Record record = this.channel.pull(); return (record instanceof TerminateRecord ? null : record); @@ -89,7 +94,7 @@ public Record createRecord() public void sendToWriter(Record record) { if (shutdown) { - throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, ""); + throw AddaxException.asAddaxException(SHUT_DOWN_TASK, ""); } record = doTransformer(record); if (record == null) { @@ -109,7 +114,7 @@ public void flush() public void terminate() { if (shutdown) { - throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, ""); + throw AddaxException.asAddaxException(SHUT_DOWN_TASK, ""); } this.channel.pushTerminate(TerminateRecord.get()); //和channel的统计保持同步 diff --git a/core/src/main/java/com/wgzhao/addax/core/transport/transformer/FilterTransformer.java b/core/src/main/java/com/wgzhao/addax/core/transport/transformer/FilterTransformer.java index 7bfb4c046..d12a71337 100644 --- a/core/src/main/java/com/wgzhao/addax/core/transport/transformer/FilterTransformer.java +++ b/core/src/main/java/com/wgzhao/addax/core/transport/transformer/FilterTransformer.java @@ -62,7 +62,7 @@ public Record evaluate(Record record, Object... paras) } columnIndex = (Integer) paras[0]; - code = (String) paras[1]; + code = paras[1].toString().toLowerCase(); value = (String) paras[2]; if (StringUtils.isEmpty(value)) { @@ -77,33 +77,26 @@ public Record evaluate(Record record, Object... paras) Column column = record.getColumn(columnIndex); try { - - if ("like".equalsIgnoreCase(code)) { - return doLike(record, value, column); - } - else if ("not like".equalsIgnoreCase(code)) { - return doNotLike(record, value, column); - } - else if (">".equalsIgnoreCase(code)) { - return doGreat(record, value, column, false); - } - else if ("<".equalsIgnoreCase(code)) { - return doLess(record, value, column, false); - } - else if ("=".equalsIgnoreCase(code) || "==".equalsIgnoreCase(code)) { - return doEqual(record, value, column); - } - else if ("!=".equalsIgnoreCase(code)) { - return doNotEqual(record, value, column); - } - else if (">=".equalsIgnoreCase(code)) { - return doGreat(record, value, column, true); - } - else if ("<=".equalsIgnoreCase(code)) { - return doLess(record, value, column, true); - } - else { - throw new RuntimeException("dx_filter code:" + code + " is unsupported"); + switch (code) { + case "like": + return doLike(record, value, column); + case "not like": + return doNotLike(record, value, column); + case ">": + return doGreat(record, value, column, false); + case "<": + return doLess(record, value, column, false); + case "=": + case "==": + return doEqual(record, value, column); + case "!=": + return doNotEqual(record, value, column); + case ">=": + return doGreat(record, value, column, true); + case "<=": + return doLess(record, value, column, true); + default: + throw new RuntimeException("dx_filter code:" + code + " is unsupported"); } } catch (Exception e) { @@ -267,6 +260,7 @@ else if (column instanceof StringColumn /** * DateColumn将比较long值,StringColumn,ByteColumn以及BooleanColumn比较其String值 + * * @param record message record * @param value value to compared * @param column the column of record diff --git a/core/src/main/java/com/wgzhao/addax/core/transport/transformer/SubstrTransformer.java b/core/src/main/java/com/wgzhao/addax/core/transport/transformer/SubstrTransformer.java index 9bdd93b67..dd56b5b76 100644 --- a/core/src/main/java/com/wgzhao/addax/core/transport/transformer/SubstrTransformer.java +++ b/core/src/main/java/com/wgzhao/addax/core/transport/transformer/SubstrTransformer.java @@ -72,11 +72,13 @@ public Record evaluate(Record record, Object... paras) if (oriValue == null) { return record; } - String newValue; + if (startIndex > oriValue.length()) { throw new RuntimeException(String.format("The dx_substr startIndex(%s) out of range" + "(%s) of (%s)", startIndex, oriValue.length(), oriValue)); } + + String newValue; if (startIndex + length >= oriValue.length()) { newValue = oriValue.substring(startIndex); } diff --git a/core/src/main/java/com/wgzhao/addax/core/transport/transformer/TransformerRegistry.java b/core/src/main/java/com/wgzhao/addax/core/transport/transformer/TransformerRegistry.java index 45bd67c9e..ae4fc493d 100644 --- a/core/src/main/java/com/wgzhao/addax/core/transport/transformer/TransformerRegistry.java +++ b/core/src/main/java/com/wgzhao/addax/core/transport/transformer/TransformerRegistry.java @@ -37,6 +37,7 @@ import java.util.Map; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; +import static com.wgzhao.addax.core.util.container.CoreConstant.STORAGE_TRANSFORMER_HOME; /** * no comments. @@ -57,7 +58,7 @@ public static void loadTransformerFromLocalStorage() public static void loadTransformerFromLocalStorage(List<String> transformers) { - String[] paths = new File(CoreConstant.STORAGE_TRANSFORMER_HOME).list(); + String[] paths = new File(STORAGE_TRANSFORMER_HOME).list(); if (null == paths) { return; } @@ -77,7 +78,7 @@ public static void loadTransformerFromLocalStorage(List<String> transformers) public static void loadTransformer(String each) { - String transformerPath = CoreConstant.STORAGE_TRANSFORMER_HOME + File.separator + each; + String transformerPath = STORAGE_TRANSFORMER_HOME + File.separator + each; Configuration transformerConfiguration; try { transformerConfiguration = loadTransFormerConfig(transformerPath); diff --git a/core/src/main/java/com/wgzhao/addax/core/util/TransformerUtil.java b/core/src/main/java/com/wgzhao/addax/core/util/TransformerUtil.java index f7a7c0c38..b5c324ce0 100644 --- a/core/src/main/java/com/wgzhao/addax/core/util/TransformerUtil.java +++ b/core/src/main/java/com/wgzhao/addax/core/util/TransformerUtil.java @@ -78,7 +78,7 @@ public static List<TransformerExecution> buildTransformerInfo(Configuration task /* * 延迟load 第三方插件的function,并按需load */ - LOG.info(String.format("Loading the user config transformers [%s] ...", functionNames)); + LOG.info("Loading the user config transformers [{}] ...", functionNames); TransformerRegistry.loadTransformerFromLocalStorage(functionNames); int i = 0;