Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The internal implementation of StarRocks sink connector is cached and imported b
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side. |
| data_save_mode | Enum | no | APPEND_DATA | Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side. |
| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. |
| warehouse_name | String | no | - | Name of the Warehouse to which CN belongs |

### save_mode_create_template

Expand Down Expand Up @@ -304,6 +305,30 @@ sink {
}
```

### Use warehouse_name

```
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
username = root
password = ""
database = "test"
table = "test_${schema_name}_${table_name}"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
batch_max_rows = 10
starrocks.config = {
format = "CSV"
column_separator = "\\x01"
row_delimiter = "\\x02"
}
warehouse_name = "default_warehouse"
}
}
```

### Multiple table

#### example1
Expand Down
25 changes: 25 additions & 0 deletions docs/zh/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ StarRocks数据接收器内部实现采用了缓存,通过stream load将数据
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | 在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法 |
| data_save_mode | Enum | no | APPEND_DATA | 在同步任务打开之前,针对目标端已存在的数据选择不同的处理方法 |
| custom_sql | String | no | - | 当data_save_mode设置为CUSTOM_PROCESSING时,必须同时设置CUSTOM_SQL参数。CUSTOM_SQL的值为可执行的SQL语句,在同步任务开启前SQL将会被执行 |
| warehouse_name | String | no | - | CN 所属的 Warehouse 名 |

### save_mode_create_template

Expand Down Expand Up @@ -300,6 +301,30 @@ sink {
}
```

### 使用warehouse_name

```
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
username = root
password = ""
database = "test"
table = "test_${schema_name}_${table_name}"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
batch_max_rows = 10
starrocks.config = {
format = "CSV"
column_separator = "\\x01"
row_delimiter = "\\x02"
}
warehouse_name = "default_warehouse"
}
}
```

## 变更日志

<ChangeLog />
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -293,6 +294,8 @@ private Map<String, String> getStreamLoadHttpHeader(String label) {
headerMap.put(
"Authorization",
getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword()));
Optional.ofNullable(sinkConfig.getWarehouseName())
.ifPresent(warehouse -> headerMap.put("warehouse", warehouse));
return headerMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public enum StreamLoadFormat {

private int httpSocketTimeout;

private String warehouseName;

@Getter private final Map<String, Object> streamLoadProps = new HashMap<>();

public static SinkConfig of(ReadonlyConfig config) {
Expand Down Expand Up @@ -98,6 +100,7 @@ public static SinkConfig of(ReadonlyConfig config) {
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.DATA_SAVE_MODE));
sinkConfig.setCustomSql(config.get(StarRocksSinkOptions.CUSTOM_SQL));
sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS));
sinkConfig.setWarehouseName(config.get(StarRocksSinkOptions.WAREHOUSE_NAME));
return sinkConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,10 @@ public class StarRocksSinkOptions extends StarRocksBaseOptions {
.stringType()
.noDefaultValue()
.withDescription("when data_save_mode selects CUSTOM_PROCESSING custom SQL");

public static final Option<String> WAREHOUSE_NAME =
Options.key("warehouse_name")
.stringType()
.noDefaultValue()
.withDescription("Warehouse name");
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,6 @@ sink {
format = "JSON"
strip_outer_array = true
}
warehouse_name = "default_warehouse"
}
}