From d812053b4fcf911638d04fafdab3fbb471207d2b Mon Sep 17 00:00:00 2001 From: zhangbutao Date: Fri, 11 Apr 2025 17:22:42 +0800 Subject: [PATCH 1/2] [Enhancement] Support to specify warehouse for sr flink source connector Signed-off-by: zhangbutao --- .../flink/manager/StarRocksQueryPlanVisitor.java | 1 + .../flink/table/source/StarRocksSourceOptions.java | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java index 576f5cc05..ea19635b9 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java @@ -112,6 +112,7 @@ private static QueryPlan getQueryPlan(String querySQL, String httpNode, StarRock HttpPost post = new HttpPost(url); post.setHeader("Content-Type", "application/json;charset=UTF-8"); post.setHeader("Authorization", getBasicAuthHeader(sourceOptions.getUsername(), sourceOptions.getPassword())); + post.setHeader("warehouse", sourceOptions.getWarehouseName()); post.setEntity(new ByteArrayEntity(body.getBytes())); try (CloseableHttpResponse response = httpClient.execute(post)) { requsetCode = response.getStatusLine().getStatusCode(); diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java index bbdc10a05..f5833f984 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java @@ -33,6 +33,7 @@ public class StarRocksSourceOptions implements Serializable { private final ReadableConfig tableOptions; private final Map tableOptionsMap; private final Map beScanProps = new HashMap<>(); + public static final String SOURCE_PROPERTIES_PREFIX = "scan.params."; // required Options @@ -53,6 +54,9 @@ public class StarRocksSourceOptions implements Serializable { public static final ConfigOption TABLE_NAME = ConfigOptions.key("table-name") .stringType().noDefaultValue().withDescription("Table name"); + + public static final ConfigOption WAREHOUSE_NAME = ConfigOptions.key(SOURCE_PROPERTIES_PREFIX + "warehouse") + .stringType().noDefaultValue().withDescription("Warehouse name"); // optional Options @@ -103,7 +107,6 @@ public class StarRocksSourceOptions implements Serializable { .intType().defaultValue(1).withDescription("the max retry times if lookup database failed."); - public static final String SOURCE_PROPERTIES_PREFIX = "scan.params."; public StarRocksSourceOptions(ReadableConfig options, Map optionsMap) { this.tableOptions = options; @@ -167,6 +170,10 @@ public String getTableName() { return tableOptions.get(TABLE_NAME); } + public String getWarehouseName() { + return tableOptions.get(WAREHOUSE_NAME); + } + // optional Options public int getConnectTimeoutMs() { From dff3263f6435be93a23ab4c427884bbf89c0e2d8 Mon Sep 17 00:00:00 2001 From: zhangbutao Date: Wed, 16 Apr 2025 18:02:50 +0800 Subject: [PATCH 2/2] Do not set warehouse header if not set scan.params.warehouse Signed-off-by: zhangbutao --- .../connector/flink/manager/StarRocksQueryPlanVisitor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java index ea19635b9..d27486262 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java @@ -39,6 +39,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; @@ -112,7 +113,8 @@ private static QueryPlan getQueryPlan(String querySQL, String httpNode, StarRock HttpPost post = new HttpPost(url); post.setHeader("Content-Type", "application/json;charset=UTF-8"); post.setHeader("Authorization", getBasicAuthHeader(sourceOptions.getUsername(), sourceOptions.getPassword())); - post.setHeader("warehouse", sourceOptions.getWarehouseName()); + Optional.ofNullable(sourceOptions.getWarehouseName()) + .ifPresent(warehouse -> post.setHeader("warehouse", warehouse)); post.setEntity(new ByteArrayEntity(body.getBytes())); try (CloseableHttpResponse response = httpClient.execute(post)) { requsetCode = response.getStatusLine().getStatusCode();