Skip to content

Commit

Permalink
[update][plugin][influxdbreader] Migrate Apache HttpComponents from 4…
Browse files Browse the repository at this point in the history
….x to 5.x
  • Loading branch information
wgzhao committed Oct 9, 2024
1 parent 77a1fd7 commit b9faed6
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 144 deletions.
84 changes: 39 additions & 45 deletions core/src/main/job/influxdb2pg.json
Original file line number Diff line number Diff line change
@@ -1,49 +1,43 @@
{
"job": {
"content": [
{
"reader": {
"name": "influxdbreader",
"parameter": {
"column": [
"*"
],
"connection": [
{
"endpoint": "http://localhost:8086",
"database": "NOAA_water_database",
"table": "h2o_feet"
}
],
"username": "influx",
"password": "influx123"
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "wgzhao",
"password": "wgzhao",
"column": [
"*"
],
"connection": [
{
"table": [
"influx_tbl"
],
"jdbcUrl": "jdbc:postgresql://localhost:5432/wgzhao"
}
]
}
}
}
],
"setting": {
"speed": {
"bytes": -1,
"channel": 1
}
"job": {
"content": {
"reader": {
"name": "influxdbreader",
"parameter": {
"column": [
"*"
],
"connection": {
"endpoint": "http://localhost:8086",
"database": "NOAA_water_database",
"table": "h2o_feet"
},
"username": "influx",
"password": "influx123"
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "wgzhao",
"password": "wgzhao",
"column": [
"*"
],
"connection": {
"table": [
"influx_tbl"
],
"jdbcUrl": "jdbc:postgresql://localhost:5432/wgzhao"
}
}
}
},
"setting": {
"speed": {
"bytes": -1,
"channel": 1
}
}
}
}
46 changes: 21 additions & 25 deletions core/src/main/job/influxdb2stream.json
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
{
"job": {
"content": [
{
"reader": {
"name": "influxdbreader",
"parameter": {
"column": [
"*"
],
"connection": [
{
"endpoint": "http://localhost:8086",
"database": "NOAA_water_database",
"table": "h2o_feet"
}
],
"username": "influx",
"password": "influx123"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": "true"
}
"content": {
"reader": {
"name": "influxdbreader",
"parameter": {
"column": [
"*"
],
"connection": {
"endpoint": "http://localhost:8086",
"database": "NOAA_water_database",
"table": "h2o_feet"
},
"username": "influx",
"password": "influx123"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": "true"
}
}
],
},
"setting": {
"speed": {
"bytes": -1,
Expand Down
14 changes: 6 additions & 8 deletions docs/assets/jobs/influxdbreader.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
"column": [
"*"
],
"connection": [
{
"endpoint": "http://localhost:8086",
"database": "NOAA_water_database",
"table": "h2o_feet",
"where": "1=1"
}
],
"where": "1=1",
"connection": {
"endpoint": "http://localhost:8086",
"database": "NOAA_water_database",
"table": "h2o_feet"
},
"connTimeout": 15,
"readTimeout": 20,
"writeTimeout": 20,
Expand Down
6 changes: 3 additions & 3 deletions plugin/reader/influxdbreader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>${fluent.hc.version}</version>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5-fluent</artifactId>
<version>${httpclient.version}</version>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,10 @@

import com.wgzhao.addax.common.base.Key;

public final class InfluxDBKey extends Key
public final class InfluxDBKey
extends Key
{
public static final String ENDPOINT = "endpoint";
public static final String COLUMN = "column";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String DATABASE = "database";
public static final String QUERY_SQL = "querySql";
public static final String CONNECTION = "connection";
public static final String CONNECT_TIMEOUT_SECONDS = "connTimeout";
public static final String SOCKET_TIMEOUT_SECONDS = "socketTimeout";
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ public void preCheck()
{
init();
originalConfig.getNecessaryValue(InfluxDBKey.ENDPOINT, REQUIRED_VALUE);
originalConfig.getNecessaryValue(InfluxDBKey.DATABASE, REQUIRED_VALUE);
List<String> columns = originalConfig.getList(InfluxDBKey.COLUMN, String.class);
String querySql = originalConfig.getNecessaryValue(InfluxDBKey.QUERY_SQL, null);
String database = originalConfig.getString(InfluxDBKey.DATABASE, null);
if (StringUtils.isAllBlank(querySql,database)) {
String querySql = originalConfig.getString(InfluxDBKey.QUERY_SQL, null);
String table = originalConfig.getString(InfluxDBKey.TABLE, null);
if (StringUtils.isAllBlank(querySql, table)) {
throw AddaxException.asAddaxException(
REQUIRED_VALUE,
"One of database or querySql must be specified"
"One of table or querySql must be specified"
);
}
if (columns == null || columns.isEmpty()) {
Expand All @@ -76,12 +77,6 @@ public List<Configuration> split(int adviceNumber)
return splitConfigs;
}

@Override
public void post()
{
//
}

@Override
public void destroy()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@
import com.wgzhao.addax.common.plugin.TaskPluginCollector;
import com.wgzhao.addax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;

import static com.wgzhao.addax.common.spi.ErrorCode.ILLEGAL_VALUE;

Expand All @@ -55,19 +54,22 @@ public class InfluxDBReaderTask
private final String password;

private final int connTimeout;
private final int socketTimeout;

public InfluxDBReaderTask(Configuration configuration)
{
List<Object> connList = configuration.getList(InfluxDBKey.CONNECTION);
Configuration conn = Configuration.from(connList.get(0).toString());
this.querySql = configuration.getString(InfluxDBKey.QUERY_SQL, "");
Configuration conn = configuration.getConfiguration(InfluxDBKey.CONNECTION);
this.querySql = configuration.getString(InfluxDBKey.QUERY_SQL, null);
this.database = conn.getString(InfluxDBKey.DATABASE);
this.endpoint = conn.getString(InfluxDBKey.ENDPOINT);
this.username = configuration.getString(InfluxDBKey.USERNAME);
this.password = configuration.getString(InfluxDBKey.PASSWORD, null);
this.connTimeout = configuration.getInt(InfluxDBKey.CONNECT_TIMEOUT_SECONDS, CONNECT_TIMEOUT_SECONDS_DEFAULT) * 1000;
this.socketTimeout = configuration.getInt(InfluxDBKey.SOCKET_TIMEOUT_SECONDS, SOCKET_TIMEOUT_SECONDS_DEFAULT) * 1000;
if (this.querySql == null) {
this.querySql = "select * from " + conn.getString(InfluxDBKey.TABLE);
}
if (!"".equals(configuration.getString(InfluxDBKey.WHERE, ""))) {
this.querySql += " where " + configuration.getString(InfluxDBKey.WHERE);
}
this.username = configuration.getString(InfluxDBKey.USERNAME, "");
this.password = configuration.getString(InfluxDBKey.PASSWORD, "");
this.connTimeout = configuration.getInt(InfluxDBKey.CONNECT_TIMEOUT_SECONDS, CONNECT_TIMEOUT_SECONDS_DEFAULT);
}

public void post()
Expand All @@ -83,29 +85,16 @@ public void destroy()
public void startRead(RecordSender recordSender, TaskPluginCollector taskPluginCollector)
{
LOG.info("connect influxdb: {} with username: {}", endpoint, username);

String tail = "/query";
String enc = "utf-8";
String result;
try {
String url = endpoint + tail + "?db=" + URLEncoder.encode(database, enc);
if (!"".equals(username)) {
url += "&u=" + URLEncoder.encode(username, enc);
}
if (!"".equals(password)) {
url += "&p=" + URLEncoder.encode(password, enc);
}
if (querySql.contains("#lastMinute#")) {
this.querySql = querySql.replace("#lastMinute#", getLastMinute());
}
url += "&q=" + URLEncoder.encode(querySql, enc);
result = get(url);
result = Request.get(combineUrl())
.connectTimeout(Timeout.ofSeconds(connTimeout))
.execute()
.returnContent().asString();
}
catch (Exception e) {
throw AddaxException.asAddaxException(
ILLEGAL_VALUE, "Failed to get data point!", e);
catch (IOException e) {
throw new RuntimeException(e);
}

if (StringUtils.isBlank(result)) {
throw AddaxException.asAddaxException(
ILLEGAL_VALUE, "Get nothing!", null);
Expand Down Expand Up @@ -145,18 +134,27 @@ else if (resultsMap.containsKey("error")) {
}
}

public String get(String url)
throws Exception
private String combineUrl()
{
Content content = Request.Get(url)
.connectTimeout(this.connTimeout)
.socketTimeout(this.socketTimeout)
.execute()
.returnContent();
if (content == null) {
return null;
String enc = "utf-8";
try {
String url = endpoint + "/query?db=" + URLEncoder.encode(database, enc);
if (!"".equals(username)) {
url += "&u=" + URLEncoder.encode(username, enc);
}
if (!"".equals(password)) {
url += "&p=" + URLEncoder.encode(password, enc);
}
if (querySql.contains("#lastMinute#")) {
this.querySql = querySql.replace("#lastMinute#", getLastMinute());
}
url += "&q=" + URLEncoder.encode(querySql, enc);
return url;
}
catch (Exception e) {
throw AddaxException.asAddaxException(
ILLEGAL_VALUE, "Failed to get data point!", e);
}
return content.asString(StandardCharsets.UTF_8);
}

@SuppressWarnings("JavaTimeDefaultTimeZone")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
"column": [
"*"
],
"connection": [
"where": "1=1",
"connection":
{
"endpoint": "http://localhost:8086",
"database": "NOAA_water_database",
"table": "h2o_feet",
"where": "1=1"
}
],
"table": "h2o_feet"
},
"connTimeout": 15,
"readTimeout": 20,
"writeTimeout": 20,
Expand Down

0 comments on commit b9faed6

Please sign in to comment.