Skip to content

Commit

Permalink
[update][core] Optimize code for better performance and readability
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao committed Oct 29, 2024
1 parent 3e1e210 commit f1e637b
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface Record

int getMemorySize();

public void setMeta(Map<String, String> meta);
void setMeta(Map<String, String> meta);

public Map<String, String> getMeta();
Map<String, String> getMeta();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@
import com.wgzhao.addax.core.statistics.communication.Communication;
import com.wgzhao.addax.core.statistics.communication.CommunicationTool;
import com.wgzhao.addax.core.transport.record.TerminateRecord;
import com.wgzhao.addax.core.util.container.CoreConstant;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_CONTAINER_TASK_GROUP_ID;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_FLOW_CONTROL_INTERVAL;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_SPEED_BYTE;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_SPEED_RECORD;

/**
* Created by jingxing on 14-8-25.
Expand All @@ -51,16 +57,16 @@ public abstract class Channel
protected long flowControlInterval;
protected volatile boolean isClosed = false;
protected Configuration configuration;
protected volatile long waitReaderTime = 0;
protected volatile long waitWriterTime = 0;
protected volatile AtomicLong waitReaderTime = new AtomicLong(0);
protected volatile AtomicLong waitWriterTime = new AtomicLong(0);
private Communication currentCommunication;

public Channel(Configuration configuration)
{
//channel的queue里默认record为1万条。原来为512条
int capacity = configuration.getInt(CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);
long byteSpeed = configuration.getLong(CoreConstant.CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024L);
long recordSpeed = configuration.getLong(CoreConstant.CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000L);
int capacity = configuration.getInt(CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);
long byteSpeed = configuration.getLong(CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024L);
long recordSpeed = configuration.getLong(CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000L);

if (capacity <= 0) {
throw new IllegalArgumentException(String.format("The channel capacity [%d] must be greater than 0.", capacity));
Expand All @@ -72,13 +78,13 @@ public Channel(Configuration configuration)
isFirstPrint = false;
}

this.taskGroupId = configuration.getInt(CoreConstant.CORE_CONTAINER_TASK_GROUP_ID);
this.taskGroupId = configuration.getInt(CORE_CONTAINER_TASK_GROUP_ID);
this.capacity = capacity;
this.byteSpeed = byteSpeed;
this.recordSpeed = recordSpeed;
this.flowControlInterval = configuration.getLong(CoreConstant.CORE_TRANSPORT_CHANNEL_FLOW_CONTROL_INTERVAL, 1000);
this.flowControlInterval = configuration.getLong(CORE_TRANSPORT_CHANNEL_FLOW_CONTROL_INTERVAL, 1000);
//channel的queue默认大小为8M,原来为64M
this.byteCapacity = configuration.getInt(CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
this.byteCapacity = configuration.getInt(CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
this.configuration = configuration;
}

Expand Down Expand Up @@ -177,8 +183,8 @@ private void statPush(long recordSize, long byteSize)
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES, byteSize);
//在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数

currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime.get());
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime.get());

boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.core.transport.channel.Channel;
import com.wgzhao.addax.core.transport.record.TerminateRecord;
import com.wgzhao.addax.core.util.container.CoreConstant;

import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -34,6 +33,7 @@
import java.util.concurrent.locks.ReentrantLock;

import static com.wgzhao.addax.common.spi.ErrorCode.RUNTIME_ERROR;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE;

/**
* 内存Channel的具体实现,底层其实是一个ArrayBlockingQueue
Expand All @@ -57,7 +57,7 @@ public MemoryChannel(Configuration configuration)
{
super(configuration);
this.queue = new ArrayBlockingQueue<>(this.getCapacity());
this.bufferSize = configuration.getInt(CoreConstant.CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE, 32);
this.bufferSize = configuration.getInt(CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE, 32);

lock = new ReentrantLock();
notInsufficient = lock.newCondition();
Expand Down Expand Up @@ -88,7 +88,7 @@ protected void doPush(Record r)
try {
long startTime = System.nanoTime();
this.queue.put(r);
waitWriterTime += System.nanoTime() - startTime;
waitReaderTime.addAndGet(System.nanoTime() - startTime);
memoryBytes.addAndGet(r.getMemorySize());
}
catch (InterruptedException ex) {
Expand All @@ -107,7 +107,7 @@ protected void doPushAll(Collection<Record> rs)
notInsufficient.await(200L, TimeUnit.MILLISECONDS);
}
this.queue.addAll(rs);
waitWriterTime += System.nanoTime() - startTime;
waitReaderTime.addAndGet(System.nanoTime() - startTime);
memoryBytes.addAndGet(bytes);
notEmpty.signalAll();
}
Expand All @@ -125,7 +125,7 @@ protected Record doPull()
try {
long startTime = System.nanoTime();
Record r = this.queue.take();
waitReaderTime += System.nanoTime() - startTime;
waitReaderTime.addAndGet(System.nanoTime() - startTime);
memoryBytes.addAndGet(-r.getMemorySize());
return r;
}
Expand All @@ -146,7 +146,7 @@ protected void doPullAll(Collection<Record> rs)
while (this.queue.drainTo(rs, bufferSize) <= 0) {
notEmpty.await(200L, TimeUnit.MILLISECONDS);
}
waitReaderTime += System.nanoTime() - startTime;
waitReaderTime.addAndGet(System.nanoTime() - startTime);
int bytes = getRecordBytes(rs);
memoryBytes.addAndGet(-bytes);
notInsufficient.signalAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@
package com.wgzhao.addax.core.transport.exchanger;

import com.wgzhao.addax.common.element.Record;
import com.wgzhao.addax.common.spi.ErrorCode;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.plugin.RecordReceiver;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.plugin.TaskPluginCollector;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.core.transport.channel.Channel;
import com.wgzhao.addax.core.transport.record.TerminateRecord;
import com.wgzhao.addax.core.util.container.CoreConstant;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +36,10 @@
import java.util.concurrent.atomic.AtomicInteger;

import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
import static com.wgzhao.addax.common.spi.ErrorCode.SHUT_DOWN_TASK;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_RECORD_CLASS;

public class BufferedRecordExchanger
implements RecordSender, RecordReceiver
Expand Down Expand Up @@ -65,17 +67,17 @@ public BufferedRecordExchanger(Channel channel, TaskPluginCollector pluginCollec
this.pluginCollector = pluginCollector;
Configuration configuration = channel.getConfiguration();

this.bufferSize = configuration.getInt(CoreConstant.CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE, 32);
this.bufferSize = configuration.getInt(CORE_TRANSPORT_EXCHANGER_BUFFER_SIZE, 32);
this.buffer = new ArrayList<>(bufferSize);

//channel的queue默认大小为8M,原来为64M
this.byteCapacity = configuration.getInt(
CoreConstant.CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);

try {
BufferedRecordExchanger.recordClass = ((Class<? extends Record>) Class
.forName(configuration.getString(
CoreConstant.CORE_TRANSPORT_RECORD_CLASS,
CORE_TRANSPORT_RECORD_CLASS,
"com.wgzhao.addax.core.transport.record.DefaultRecord")));
}
catch (Exception e) {
Expand All @@ -98,7 +100,7 @@ public Record createRecord()
public void sendToWriter(Record record)
{
if (shutdown) {
throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, "");
throw AddaxException.asAddaxException(SHUT_DOWN_TASK, "");
}

Validate.notNull(record, "The record cannot be empty.");
Expand All @@ -124,7 +126,7 @@ public void sendToWriter(Record record)
public void flush()
{
if (shutdown) {
throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, "");
throw AddaxException.asAddaxException(SHUT_DOWN_TASK, "");
}
this.channel.pushAll(this.buffer);
this.buffer.clear();
Expand All @@ -136,7 +138,7 @@ public void flush()
public void terminate()
{
if (shutdown) {
throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, "");
throw AddaxException.asAddaxException(SHUT_DOWN_TASK, "");
}
flush();
this.channel.pushTerminate(TerminateRecord.get());
Expand All @@ -146,7 +148,7 @@ public void terminate()
public Record getFromReader()
{
if (shutdown) {
throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, "");
throw AddaxException.asAddaxException(SHUT_DOWN_TASK, "");
}
boolean isEmpty = (this.bufferIndex >= this.buffer.size());
if (isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@
package com.wgzhao.addax.core.transport.exchanger;

import com.wgzhao.addax.common.element.Record;
import com.wgzhao.addax.common.spi.ErrorCode;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.plugin.RecordReceiver;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.plugin.TaskPluginCollector;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.core.statistics.communication.Communication;
import com.wgzhao.addax.core.transport.channel.Channel;
import com.wgzhao.addax.core.transport.record.DefaultRecord;
import com.wgzhao.addax.core.transport.record.TerminateRecord;
import com.wgzhao.addax.core.transport.transformer.TransformerExecution;
import com.wgzhao.addax.core.util.container.CoreConstant;
import org.apache.commons.lang3.StringUtils;

import java.util.List;

import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
import static com.wgzhao.addax.common.spi.ErrorCode.SHUT_DOWN_TASK;
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_TRANSPORT_RECORD_CLASS;

public class RecordExchanger
extends TransformerExchanger
Expand All @@ -54,10 +56,13 @@ public RecordExchanger(int taskGroupId, int taskId, Channel channel, Communicati
this.channel = channel;
Configuration configuration = channel.getConfiguration();
try {
RecordExchanger.RECORD_CLASS = (Class<? extends Record>) Class
.forName(configuration.getString(
CoreConstant.CORE_TRANSPORT_RECORD_CLASS,
"com.wgzhao.addax.core.transport.record.DefaultRecord"));
String cls = configuration.getString(CORE_TRANSPORT_RECORD_CLASS, null);
if (!StringUtils.isBlank(cls)) {
RECORD_CLASS = (Class<? extends Record>) Class.forName(cls);
}
else {
RecordExchanger.RECORD_CLASS = DefaultRecord.class;
}
}
catch (ClassNotFoundException e) {
throw AddaxException.asAddaxException(CONFIG_ERROR, e);
Expand All @@ -68,7 +73,7 @@ public RecordExchanger(int taskGroupId, int taskId, Channel channel, Communicati
public Record getFromReader()
{
if (shutdown) {
throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, "");
throw AddaxException.asAddaxException(SHUT_DOWN_TASK, "");
}
Record record = this.channel.pull();
return (record instanceof TerminateRecord ? null : record);
Expand All @@ -89,7 +94,7 @@ public Record createRecord()
public void sendToWriter(Record record)
{
if (shutdown) {
throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, "");
throw AddaxException.asAddaxException(SHUT_DOWN_TASK, "");
}
record = doTransformer(record);
if (record == null) {
Expand All @@ -109,7 +114,7 @@ public void flush()
public void terminate()
{
if (shutdown) {
throw AddaxException.asAddaxException(ErrorCode.SHUT_DOWN_TASK, "");
throw AddaxException.asAddaxException(SHUT_DOWN_TASK, "");
}
this.channel.pushTerminate(TerminateRecord.get());
//和channel的统计保持同步
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Record evaluate(Record record, Object... paras)
}

columnIndex = (Integer) paras[0];
code = (String) paras[1];
code = paras[1].toString().toLowerCase();
value = (String) paras[2];

if (StringUtils.isEmpty(value)) {
Expand All @@ -77,33 +77,26 @@ public Record evaluate(Record record, Object... paras)
Column column = record.getColumn(columnIndex);

try {

if ("like".equalsIgnoreCase(code)) {
return doLike(record, value, column);
}
else if ("not like".equalsIgnoreCase(code)) {
return doNotLike(record, value, column);
}
else if (">".equalsIgnoreCase(code)) {
return doGreat(record, value, column, false);
}
else if ("<".equalsIgnoreCase(code)) {
return doLess(record, value, column, false);
}
else if ("=".equalsIgnoreCase(code) || "==".equalsIgnoreCase(code)) {
return doEqual(record, value, column);
}
else if ("!=".equalsIgnoreCase(code)) {
return doNotEqual(record, value, column);
}
else if (">=".equalsIgnoreCase(code)) {
return doGreat(record, value, column, true);
}
else if ("<=".equalsIgnoreCase(code)) {
return doLess(record, value, column, true);
}
else {
throw new RuntimeException("dx_filter code:" + code + " is unsupported");
switch (code) {
case "like":
return doLike(record, value, column);
case "not like":
return doNotLike(record, value, column);
case ">":
return doGreat(record, value, column, false);
case "<":
return doLess(record, value, column, false);
case "=":
case "==":
return doEqual(record, value, column);
case "!=":
return doNotEqual(record, value, column);
case ">=":
return doGreat(record, value, column, true);
case "<=":
return doLess(record, value, column, true);
default:
throw new RuntimeException("dx_filter code:" + code + " is unsupported");
}
}
catch (Exception e) {
Expand Down Expand Up @@ -267,6 +260,7 @@ else if (column instanceof StringColumn

/**
* DateColumn将比较long值,StringColumn,ByteColumn以及BooleanColumn比较其String值
*
* @param record message record
* @param value value to compared
* @param column the column of record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ public Record evaluate(Record record, Object... paras)
if (oriValue == null) {
return record;
}
String newValue;

if (startIndex > oriValue.length()) {
throw new RuntimeException(String.format("The dx_substr startIndex(%s) out of range" +
"(%s) of (%s)", startIndex, oriValue.length(), oriValue));
}

String newValue;
if (startIndex + length >= oriValue.length()) {
newValue = oriValue.substring(startIndex);
}
Expand Down
Loading

0 comments on commit f1e637b

Please sign in to comment.