[improve][writer][doris] Refactor Doris writer plugin (#1007)
wgzhao authored Feb 6, 2024
1 parent 02944a8 commit c41c765
Showing 19 changed files with 1,185 additions and 578 deletions.
10 changes: 6 additions & 4 deletions docs/assets/jobs/doriswriter.json
@@ -9,6 +9,7 @@
"writer": {
"name": "doriswriter",
"parameter": {
"loadUrl": ["127.0.0.1:8030"],
"username": "test",
"password": "123456",
"batchSize": 1024,
@@ -17,12 +18,13 @@
{
"table": "table1",
"database": "example_db",
"endpoint": ["http://127.0.0.1:8030/"]
"jdbcUrl": "jdbc:mysql://localhost:9030/example_db"
}
],
"loadProps": {},
"lineDelimiter": "\n",
"format": "csv"
"loadProps": {
"format": "json",
"strip_outer_array": true
}
}
},
"reader": {
71 changes: 45 additions & 26 deletions docs/writer/doriswriter.md
@@ -1,8 +1,7 @@
# Doris Writer

The DorisWriter plugin writes data to a [Doris](http://doris.incubator.apache.org/master/zh-CN/) database in streaming fashion. Under the hood it connects to the Doris HTTP port (8030) and loads the data via [stream load](http://doris.incubator.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html), which is considerably more efficient than `insert into` and is the officially recommended way to load data in production.

Doris is a database backend compatible with the MySQL protocol, so reading from Doris can be done with [MySQLReader](../../reader/mysqlreader).
@@ -47,38 +46,58 @@ bin/addax.sh job/stream2doris.json

## 参数说明

| Option         | Required | Type   | Default | Description                                                                                          |
| :------------- | :------: | ------ | ------- | ---------------------------------------------------------------------------------------------------- |
| endpoint       |          | string |         | Doris HTTP endpoint; only the host and port are needed, the plugin assembles the full path itself     |
| username       |          | string |         | account for HTTP signature authentication                                                             |
| password       |          | string |         | password for HTTP signature authentication                                                            |
| table          |          | string |         | name of the table to write to                                                                         |
| column         |          | list   |         | columns of the table to sync; see [rdbmswriter](../rdbmswriter) for details                           |
| batchSize      |          | int    | 1024    | number of records per batch; raising it may cause Addax OOM or hang the target database on commit     |
| lineDelimiter  |          | string | `\n`    | row delimiter; multi-byte values such as `\x02\x03` are supported                                     |
| fieldDelimiter |          | string | `\|`    | field delimiter, e.g. `,`                                                                             |
| format         |          | string | `csv`   | import data format, either json or csv                                                                |
| loadProps      |          | map    |         | request parameters for Stream Load; see the [Stream Load introduction][1]                             |
| connectTimeout |          | int    | -1      | timeout of a single Stream Load request, in milliseconds (ms)                                         |
| Option           | Required | Type   | Default | Description                                                                    |
|:-----------------|:--------:|--------|---------|--------------------------------------------------------------------------------|
| loadUrl          |   yes    | string |         | connection target(s) for Stream Load                                            |
| username         |   yes    | string |         | username for the Doris database                                                 |
| password         |          | string |         | password for the Doris database                                                 |
| flushInterval    |          | int    | 3000    | interval between flushes to the target table, in milliseconds                   |
| flushQueueLength |          | int    | 1       | length of the upload queue                                                      |
| table            |   yes    | string |         | name of the table to write to                                                   |
| column           |          | list   |         | columns of the table to sync; see [rdbmswriter](../rdbmswriter) for details     |
| batchSize        |          | int    | 2048    | maximum number of rows per import batch                                         |
| loadProps        |          | map    |         | request parameters for Stream Load; see the [Stream Load introduction][1]       |
| preSql           |          | list   |         | SQL statements to execute before writing to the target table                    |
| postSql          |          | list   |         | SQL statements to execute after the data has been written                       |

[1]: https://github.com/apache/doris-streamloader/tree/master

## endpoint
## loadUrl

`endpoint` is the hostname and `webserver_port` of any BE node. The official documentation says an FE hostname with `http_port` should also work, but in testing that connection was always refused.
The connection target for Stream Load, in `"ip:port"` format, where the IP is an FE node's IP and the port is the FE node's `http_port`. Multiple entries may be given; when there are several, the plugin picks a random healthy FE node for each batch.
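For example, a multi-node `loadUrl` might look like the following sketch (hostnames are placeholders):

```json
{
  "loadUrl": ["fe-1:8030", "fe-2:8030", "fe-3:8030"]
}
```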

### column

`column` is not required in this plugin. If it is omitted, fields are assembled in the order the reader plugin produces them. It may also be set to `["*"]`, in which case the plugin tries to read the table's columns directly from the Doris database and assembles the rows accordingly.

### loadProps

Request parameters for Stream Load; see [Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual) for details.

These include the import data format (`format`, etc.). The default format is csv and JSON is also supported; see the type conversion section below, or the official Stream Load documentation linked above.

## Type conversion

By default all incoming data is converted to strings, with `\t` as the column separator and `\n` as the row separator, forming a CSV file for the Stream Load import.

CSV is the default import format; to change the column separator, simply configure `loadProps` accordingly:

```json
{
  "loadProps": {
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
  }
}
```
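Separator values like `\x01` above are hex escapes for unprintable bytes. As a rough illustration of how such an escape can be decoded into raw bytes (a standalone sketch in the spirit of this commit's `DelimiterParser`; the class and method names here are made up):

```java
public class HexDelimiterDemo {
    // Decode a "\x01" / "\x0102" style separator spec into raw bytes.
    // Plain strings (e.g. ",") pass through unchanged as UTF-8 bytes.
    static byte[] decode(String spec) {
        if (!spec.toLowerCase().startsWith("\\x")) {
            return spec.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
        String hex = spec.substring(2);
        if (hex.isEmpty() || hex.length() % 2 != 0) {
            throw new IllegalArgumentException(
                    "hex delimiter must have an even number of digits: " + spec);
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            // each pair of hex digits becomes one byte
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(decode("\\x0102").length); // 2
        System.out.println(decode(",").length);       // 1
    }
}
```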

To change the import format to JSON, configure `loadProps` accordingly:

```json
{
"loadProps": {
"format": "json",
"strip_outer_array": true
}
}
```
@@ -7,13 +7,14 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.wgzhao.addax.plugin.writer.doriswriter;
@@ -22,11 +23,14 @@

import java.io.StringWriter;

public class DelimiterParser {
public class DelimiterParser
{

private static final String HEX_STRING = "0123456789ABCDEF";

public static String parse(String sp, String dSp) throws RuntimeException {
public static String parse(String sp, String dSp)
throws RuntimeException
{
if (Strings.isNullOrEmpty(sp)) {
return dSp;
}
@@ -42,7 +46,8 @@ public static String parse(String sp, String dSp) throws RuntimeException {
return writer.toString();
}

private static String getHexStr(String sp) {
private static String getHexStr(String sp)
{
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
@@ -59,7 +64,8 @@ private static String getHexStr(String sp) {
return hexStr;
}

private static byte[] hexStrToBytes(String hexStr) {
private static byte[] hexStrToBytes(String hexStr)
{
String upperHexStr = hexStr.toUpperCase();
int length = upperHexStr.length() / 2;
char[] hexChars = upperHexStr.toCharArray();
@@ -71,7 +77,8 @@ private static byte[] hexStrToBytes(String hexStr) {
return bytes;
}

private static byte charToByte(char c) {
private static byte charToByte(char c)
{
return (byte) HEX_STRING.indexOf(c);
}
}
}
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.wgzhao.addax.plugin.writer.doriswriter;

import com.wgzhao.addax.common.element.Column;

public class DorisBaseCodec {
protected String convertField( Column col) {
if (null == col.getRawData() || Column.Type.NULL == col.getType()) {
return null;
}
if ( Column.Type.BOOL == col.getType()) {
return String.valueOf(col.asLong());
}
if ( Column.Type.BYTES == col.getType()) {
byte[] bts = (byte[])col.getRawData();
long value = 0;
for (int i = 0; i < bts.length; i++) {
value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
}
return String.valueOf(value);
}
return col.asString();
}
}
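For context on the `BYTES` branch in `convertField` above: it folds the raw byte array into a `long`, treating it as an unsigned big-endian integer, before rendering it as a string. A standalone sketch of the same arithmetic (the demo class name is illustrative, not part of the plugin):

```java
public class BytesFoldDemo {
    // Fold a byte array into a long as an unsigned big-endian integer:
    // the least significant byte is the last array element, shifted by 0,
    // the next-to-last is shifted by 8, and so on.
    static long fold(byte[] bts) {
        long value = 0;
        for (int i = 0; i < bts.length; i++) {
            value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(fold(new byte[]{0x01, 0x00}));  // 256
        System.out.println(fold(new byte[]{(byte) 0xff})); // 255
    }
}
```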
@@ -7,71 +7,24 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package com.wgzhao.addax.plugin.writer.doriswriter;

import com.wgzhao.addax.common.element.Column;
import com.wgzhao.addax.common.element.DateColumn;
import com.wgzhao.addax.common.element.Record;
import org.apache.commons.lang3.time.DateFormatUtils;

import java.util.List;
import java.util.TimeZone;

public abstract class DorisCodec
{
protected static String timeZone = "GMT+8";
protected static TimeZone timeZoner = TimeZone.getTimeZone(timeZone);
protected final List<String> fieldNames;

public DorisCodec(final List<String> fieldNames) {
this.fieldNames = fieldNames;
}
import java.io.Serializable;

public abstract String serialize(Record row);
public interface DorisCodec extends Serializable {

/**
* convert datax internal data to string
*
* @param col
* @return
*/
protected Object convertColumn(final Column col) {
if (null == col.getRawData()) {
return null;
}
Column.Type type = col.getType();
switch (type) {
case BOOL:
case INT:
case LONG:
return col.asLong();
case DOUBLE:
return col.asDouble();
case STRING:
return col.asString();
case DATE: {
final DateColumn.DateType dateType = ((DateColumn) col).getSubType();
switch (dateType) {
case DATE:
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner);
case DATETIME:
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner);
default:
return col.asString();
}
}
default:
// BAD, NULL, BYTES
return null;
}
}
String codec( Record row);
}
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package com.wgzhao.addax.plugin.writer.doriswriter;

import java.util.Map;

public class DorisCodecFactory {
public DorisCodecFactory (){

}
public static DorisCodec createCodec( DorisKey writerOptions) {
if ( DorisKey.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
Map<String, Object> props = writerOptions.getLoadProps();
return new DorisCsvCodec (null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
}
if ( DorisKey.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
return new DorisJsonCodec (writerOptions.getColumns());
}
throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
}
}
