Skip to content

Commit

Permalink
[update][plugin][doriswriter] code improvements
Browse files Browse the repository at this point in the history
- Add validation for username and password
- Refactor: Move all codec-related classes into a new folder
- Improve log output
- General code improvements
  • Loading branch information
wgzhao committed Oct 16, 2024
1 parent f31dea1 commit 2c6fbf3
Show file tree
Hide file tree
Showing 18 changed files with 427 additions and 389 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -56,10 +58,13 @@ public enum StreamLoadFormat
private static final String LOAD_URL = "loadUrl";
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String LOAD_PROPS = "loadProps";

private static final String COLUMN_SEPARATOR = "column_separator";
private static final String LINE_SEPARATOR = "line_delimiter";
private static final String DEFAULT_LABEL_PREFIX = "addax_doris_writer_";

private final Configuration loadProps;
private final Configuration options;
private final StreamLoadFormat streamLoadFormat;

private List<String> infoSchemaColumns;

Expand All @@ -74,9 +79,17 @@ public DorisKey(Configuration options)
this.database = conn.getNecessaryValue(DATABASE, REQUIRED_VALUE);
this.jdbcUrl = conn.getNecessaryValue(JDBC_URL, REQUIRED_VALUE);
this.table = conn.getList(TABLE, String.class).get(0);

if (options.getString(LOAD_PROPS, null) == null) {
this.loadProps = Configuration.newDefault();
}
else if (options.getString(LOAD_PROPS).startsWith("{")) {
this.loadProps = options.getConfiguration(LOAD_PROPS);
} else {
throw AddaxException.asAddaxException(CONFIG_ERROR, "The format of loadProps should be a map");
}
this.streamLoadFormat = StreamLoadFormat.valueOf(loadProps.getString(LOAD_PROPS_FORMAT, "csv").toUpperCase());
infoSchemaColumns = options.getList(COLUMN, String.class);
// this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());

if (1 == infoSchemaColumns.size() && "*".trim().equals(infoSchemaColumns.get(0))) {
// get columns from database
this.infoSchemaColumns = getDorisTableColumns();
Expand All @@ -85,8 +98,8 @@ public DorisKey(Configuration options)

public List<String> getDorisTableColumns()
{
String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;",
database, table);
String currentSql = "SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '" +
database + "' AND `TABLE_NAME` = '" + table + "' ORDER BY `ORDINAL_POSITION` ASC;";
List<String> columns = new ArrayList<>();
ResultSet rs = null;
try (Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, options.getString(USERNAME), options.getString(PASSWORD))) {
Expand All @@ -108,7 +121,6 @@ public List<String> getDorisTableColumns()

public void doPretreatment()
{
// validateRequired();
validateStreamLoadUrl();
}

Expand Down Expand Up @@ -162,45 +174,39 @@ public List<String> getPostSqlList()
return options.getList(POST_SQL, String.class);
}

public Map<String, Object> getLoadProps()
{
return options.getMap(LOAD_PROPS);
}

public int getMaxRetries()
{
return MAX_RETRIES;
}

public long getBatchSize()
{
Long size = options.getLong(BATCH_SIZE);
return null == size ? DEFAULT_BATCH_SIZE : size;
return options.getLong(BATCH_SIZE, DEFAULT_BATCH_SIZE);
}

public long getFlushInterval()
{
Long interval = options.getLong(FLUSH_INTERVAL);
return null == interval ? DEFAULT_FLUSH_INTERVAL : interval;
return options.getLong(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL);
}

public int getFlushQueueLength()
{
Integer len = options.getInt(FLUSH_QUEUE_LENGTH);
return null == len ? 1 : len;
return options.getInt(FLUSH_QUEUE_LENGTH, 1);
}

public StreamLoadFormat getStreamLoadFormat()
{
Map<String, Object> loadProps = getLoadProps();
if (null == loadProps) {
return StreamLoadFormat.CSV;
}
if (loadProps.containsKey(LOAD_PROPS_FORMAT)
&& StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
return StreamLoadFormat.JSON;
}
return StreamLoadFormat.CSV;
return streamLoadFormat;
}

public boolean isJsonFormat()
{
return StreamLoadFormat.JSON.equals(streamLoadFormat);
}

public boolean isCsvFormat()
{
return StreamLoadFormat.CSV.equals(streamLoadFormat);
}

private void validateStreamLoadUrl()
Expand All @@ -213,4 +219,23 @@ private void validateStreamLoadUrl()
}
}
}

public String getLineDelimiter()
{
return loadProps.getString(LINE_SEPARATOR, "\n");
}

public String getColumnSeparator()
{
return loadProps.getString(COLUMN_SEPARATOR, "\t");
}

public Map<String, String> loadProps2Map()
{
Map<String, String> result = new HashMap<>();
loadProps.getKeys().forEach(key -> {
result.put(key, loadProps.getString(key));
});
return result;
}
}
Loading

0 comments on commit 2c6fbf3

Please sign in to comment.