From 5d45ca6ec22b45300ec2b0077817b1b4e240e850 Mon Sep 17 00:00:00 2001 From: lijie Date: Wed, 7 Jan 2026 20:46:36 +0800 Subject: [PATCH] [Improve][Connector-V2][starrocks-sink] Support to specify warehouse for starrocks sink --- docs/en/connector-v2/sink/StarRocks.md | 25 +++++++++++++++++++ docs/zh/connector-v2/sink/StarRocks.md | 25 +++++++++++++++++++ .../client/StarRocksStreamLoadVisitor.java | 3 +++ .../starrocks/config/SinkConfig.java | 3 +++ .../config/StarRocksSinkOptions.java | 6 +++++ .../src/test/resources/fake-to-starrocks.conf | 1 + 6 files changed, 63 insertions(+) diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index 89dc7117f15d..ad46d68db5b2 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -54,6 +54,7 @@ The internal implementation of StarRocks sink connector is cached and imported b | schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side. | | data_save_mode | Enum | no | APPEND_DATA | Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side. | | custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. 
| +| warehouse_name | String | no | - | Name of the StarRocks warehouse used for Stream Load. The value is sent as the `warehouse` HTTP header of each Stream Load request; if not set, the header is omitted. | ### save_mode_create_template @@ -304,6 +305,30 @@ sink { } ``` +### Use warehouse_name + +``` +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + base-url = "jdbc:mysql://e2e_starRocksdb:9030/" + username = root + password = "" + database = "test" + table = "test_${schema_name}_${table_name}" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode="APPEND_DATA" + batch_max_rows = 10 + starrocks.config = { + format = "CSV" + column_separator = "\\x01" + row_delimiter = "\\x02" + } + warehouse_name = "default_warehouse" + } +} +``` + ### Multiple table #### example1 diff --git a/docs/zh/connector-v2/sink/StarRocks.md b/docs/zh/connector-v2/sink/StarRocks.md index e62a475268e3..c096c0ad76d5 100644 --- a/docs/zh/connector-v2/sink/StarRocks.md +++ b/docs/zh/connector-v2/sink/StarRocks.md @@ -53,6 +53,7 @@ StarRocks数据接收器内部实现采用了缓存,通过stream load将数据 | schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | 在同步任务打开之前,针对目标端已存在的表结构选择不同的处理方法 | | data_save_mode | Enum | no | APPEND_DATA | 在同步任务打开之前,针对目标端已存在的数据选择不同的处理方法 | | custom_sql | String | no | - | 当data_save_mode设置为CUSTOM_PROCESSING时,必须同时设置CUSTOM_SQL参数。CUSTOM_SQL的值为可执行的SQL语句,在同步任务开启前SQL将会被执行 | +| warehouse_name | String | no | - | CN 所属的 Warehouse 名 | ### save_mode_create_template @@ -300,6 +301,30 @@ sink { } ``` +### 使用warehouse_name + +``` +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + base-url = "jdbc:mysql://e2e_starRocksdb:9030/" + username = root + password = "" + database = "test" + table = "test_${schema_name}_${table_name}" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode="APPEND_DATA" + batch_max_rows = 10 + starrocks.config = { + format = "CSV" + column_separator = "\\x01" + row_delimiter = "\\x02" + } + warehouse_name = "default_warehouse" + } +} +``` + ## 变更日志 diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java index 741898cf8959..de05f7ed3c70 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -293,6 +294,8 @@ private Map getStreamLoadHttpHeader(String label) { headerMap.put( "Authorization", getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword())); + Optional.ofNullable(sinkConfig.getWarehouseName()) + .ifPresent(warehouse -> headerMap.put("warehouse", warehouse)); return headerMap; } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java index 4b0ee5e6ebba..c3ab025fec0f 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java @@ -65,6 +65,8 @@ public enum StreamLoadFormat { private int httpSocketTimeout; + private String warehouseName; + @Getter private final Map streamLoadProps = new HashMap<>(); public static SinkConfig of(ReadonlyConfig config) { @@ -98,6 
+100,7 @@ public static SinkConfig of(ReadonlyConfig config) { sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.DATA_SAVE_MODE)); sinkConfig.setCustomSql(config.get(StarRocksSinkOptions.CUSTOM_SQL)); sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)); + sinkConfig.setWarehouseName(config.get(StarRocksSinkOptions.WAREHOUSE_NAME)); return sinkConfig; } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java index 5c20abd44405..76302af2ee60 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java @@ -155,4 +155,10 @@ public class StarRocksSinkOptions extends StarRocksBaseOptions { .stringType() .noDefaultValue() .withDescription("when data_save_mode selects CUSTOM_PROCESSING custom SQL"); + + public static final Option WAREHOUSE_NAME = + Options.key("warehouse_name") + .stringType() + .noDefaultValue() + .withDescription("Warehouse name"); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf index d053a600088d..7240a52ed406 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf @@ -102,5 +102,6 @@ sink { format = "JSON" strip_outer_array = true } + warehouse_name = "default_warehouse" } 
} \ No newline at end of file