diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md
index 89dc7117f15d..ad46d68db5b2 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -54,6 +54,7 @@ The internal implementation of StarRocks sink connector is cached and imported b
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side. |
| data_save_mode | Enum | no | APPEND_DATA | Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side. |
| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. |
+| warehouse_name | String | no | - | Name of the Warehouse to which CN belongs |
### save_mode_create_template
@@ -304,6 +305,30 @@ sink {
}
```
+### Use warehouse_name
+
+```
+sink {
+ StarRocks {
+ nodeUrls = ["e2e_starRocksdb:8030"]
+ base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
+ username = root
+ password = ""
+ database = "test"
+ table = "test_${schema_name}_${table_name}"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
+ batch_max_rows = 10
+ starrocks.config = {
+ format = "CSV"
+ column_separator = "\\x01"
+ row_delimiter = "\\x02"
+ }
+ warehouse_name = "default_warehouse"
+ }
+}
+```
+
### Multiple table
#### example1
diff --git a/docs/zh/connector-v2/sink/StarRocks.md b/docs/zh/connector-v2/sink/StarRocks.md
index e62a475268e3..c096c0ad76d5 100644
--- a/docs/zh/connector-v2/sink/StarRocks.md
+++ b/docs/zh/connector-v2/sink/StarRocks.md
@@ -53,6 +53,7 @@ StarRocks数据接收器内部实现采用了缓存,通过stream load将数据
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | 在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法 |
| data_save_mode | Enum | no | APPEND_DATA | 在同步任务打开之前,针对目标端已存在的数据选择不同的处理方法 |
| custom_sql | String | no | - | 当data_save_mode设置为CUSTOM_PROCESSING时,必须同时设置CUSTOM_SQL参数。CUSTOM_SQL的值为可执行的SQL语句,在同步任务开启前SQL将会被执行 |
+| warehouse_name | String | no | - | CN 所属的 Warehouse 名 |
### save_mode_create_template
@@ -300,6 +301,30 @@ sink {
}
```
+### 使用warehouse_name
+
+```
+sink {
+ StarRocks {
+ nodeUrls = ["e2e_starRocksdb:8030"]
+ base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
+ username = root
+ password = ""
+ database = "test"
+ table = "test_${schema_name}_${table_name}"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
+ batch_max_rows = 10
+ starrocks.config = {
+ format = "CSV"
+ column_separator = "\\x01"
+ row_delimiter = "\\x02"
+ }
+ warehouse_name = "default_warehouse"
+ }
+}
+```
+
## 变更日志
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
index 741898cf8959..de05f7ed3c70 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
@@ -37,6 +37,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -293,6 +294,8 @@ private Map getStreamLoadHttpHeader(String label) {
headerMap.put(
"Authorization",
getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword()));
+ Optional.ofNullable(sinkConfig.getWarehouseName())
+ .ifPresent(warehouse -> headerMap.put("warehouse", warehouse));
return headerMap;
}
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 4b0ee5e6ebba..c3ab025fec0f 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -65,6 +65,8 @@ public enum StreamLoadFormat {
private int httpSocketTimeout;
+ private String warehouseName;
+
@Getter private final Map streamLoadProps = new HashMap<>();
public static SinkConfig of(ReadonlyConfig config) {
@@ -98,6 +100,7 @@ public static SinkConfig of(ReadonlyConfig config) {
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.DATA_SAVE_MODE));
sinkConfig.setCustomSql(config.get(StarRocksSinkOptions.CUSTOM_SQL));
sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS));
+ sinkConfig.setWarehouseName(config.get(StarRocksSinkOptions.WAREHOUSE_NAME));
return sinkConfig;
}
}
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 5c20abd44405..76302af2ee60 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -155,4 +155,10 @@ public class StarRocksSinkOptions extends StarRocksBaseOptions {
.stringType()
.noDefaultValue()
.withDescription("when data_save_mode selects CUSTOM_PROCESSING custom SQL");
+
+ public static final Option WAREHOUSE_NAME =
+ Options.key("warehouse_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Warehouse name");
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf
index d053a600088d..7240a52ed406 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf
@@ -102,5 +102,6 @@ sink {
format = "JSON"
strip_outer_array = true
}
+ warehouse_name = "default_warehouse"
}
}
\ No newline at end of file