Skip to content

Commit

Permalink
[improve][plugin][clickhousewriter] Optimize Codebase: Update JDBC Dr…
Browse files Browse the repository at this point in the history
…iver and Enhance HTTP Connections

1. Upgrade JDBC driver from version 0.3.2 to 0.6.5 and update package name from `ru.yandax` to `com.clickhouse`.
2. Remove overridden `doBatchInsert` method as the new JDBC driver now supports compliant conversion behavior.
3. Integrate Apache HttpComponents to manage HTTP connections more efficiently.
  • Loading branch information
wgzhao committed Sep 24, 2024
1 parent 0750c8b commit e7a0774
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 42 deletions.
31 changes: 20 additions & 11 deletions plugin/writer/clickhousewriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,29 @@
</dependency>

<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse.jdbc.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>5.2.4</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3.1</version>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@
package com.wgzhao.addax.plugin.writer.clickhousewriter;

import com.wgzhao.addax.common.element.Column;
import com.wgzhao.addax.common.element.Record;
import com.wgzhao.addax.common.plugin.RecordReceiver;
import com.wgzhao.addax.common.spi.Writer;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.rdbms.util.DataBaseType;
import com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_DATE_FORMAT;

public class ClickHouseWriter
extends Writer
Expand Down Expand Up @@ -111,39 +110,19 @@ protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement pr
else if (columnTypeName.startsWith("DateTime(")) {
preparedStatement.setObject(columnIndex, column.asTimestamp());
}
else if (columnTypeName.equals("DateTime")) {
// no precision specified, use default
SimpleDateFormat sdf = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
preparedStatement.setString(columnIndex, sdf.format(column.asDate()));
}
else {
preparedStatement.setString(columnIndex, column.asString());
}
return preparedStatement;
}

return super.fillPreparedStatementColumnType(preparedStatement, columnIndex, columnSqlType, column);
}

@Override
protected void doBatchInsert(Connection connection, List<Record> buffer)
throws SQLException
{
// references https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc
String insertSql = "insert into " + this.table + " select ";
StringJoiner selectCols = new StringJoiner(",");
StringJoiner selectColWithType = new StringJoiner(",");
for (int i = 1; i < this.resultSetMetaData.size(); i++) {
final Map<String, Object> md = this.resultSetMetaData.get(i);
selectCols.add(md.get("name").toString());
selectColWithType.add(md.get("name").toString() + " " + md.get("typeName"));
}
insertSql += selectCols + " from input('" + selectColWithType + "')";
LOG.info("insert sql: {}", insertSql);
PreparedStatement ps = connection.prepareStatement(insertSql);
for (Record record : buffer) {
ps = this.fillPreparedStatement(ps, record);
ps.addBatch();
}
ps.executeBatch(); // stream everything on-hand into ClickHouse
}
}
;
};

this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@

<!-- jdbc driver version -->
<cassandra.jdbc.version>3.11.0</cassandra.jdbc.version>
<clickhouse.jdbc.version>0.3.2</clickhouse.jdbc.version>
<clickhouse.jdbc.version>0.6.5</clickhouse.jdbc.version>
<databend.jdbc.version>0.3.2</databend.jdbc.version>
<dbcp2.version>2.11.0</dbcp2.version>
<hive.jdbc.version>3.1.3</hive.jdbc.version>
Expand Down

0 comments on commit e7a0774

Please sign in to comment.