Skip to content

[Enhancement] Support to specify warehouse for sr flink source connector#423

Merged
banmoy merged 2 commits into
StarRocks:mainfrom
zhangbutao:fix_flink_source_warehouse
Jun 13, 2025
Merged

[Enhancement] Support to specify warehouse for sr flink source connector#423
banmoy merged 2 commits into
StarRocks:mainfrom
zhangbutao:fix_flink_source_warehouse

Conversation

@zhangbutao
Copy link
Copy Markdown
Contributor

@zhangbutao zhangbutao commented Apr 11, 2025

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Which issues of this PR fixes :

This PR is used for StarRocks/starrocks#57930
Support to specify warehouse for sr flink source connector using param `'scan.params.warehouse'='warehosue_name', and then pass the warehouse to fe through http header.

CREATE TABLE flink_test
(
    `id` INT,
    `name` STRING,
    `score` INT
)
WITH
(
    'connector'='starrocks',
    'scan-url'='192.168.xxx.xxx:8030',
    'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
    'username'='xxxxxx',
    'password'='xxxxxx',
    'database-name'='test',
    'table-name'='score_board',
    'scan.params.warehouse'='warehosue_name'
);

`
Fixes #

Problem Summary(Required) :

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr will affect users' behaviors
  • This pr needs user documentation (for new or modified features or behaviors)
  • I have added documentation for my new feature or new function

Signed-off-by: zhangbutao <zhangbutao@cmss.chinamobile.com>
HttpPost post = new HttpPost(url);
post.setHeader("Content-Type", "application/json;charset=UTF-8");
post.setHeader("Authorization", getBasicAuthHeader(sourceOptions.getUsername(), sourceOptions.getPassword()));
post.setHeader("warehouse", sourceOptions.getWarehouseName());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the default value if the specific key is not configured?

Signed-off-by: zhangbutao <zhangbutao@cmss.chinamobile.com>
post.setHeader("Content-Type", "application/json;charset=UTF-8");
post.setHeader("Authorization", getBasicAuthHeader(sourceOptions.getUsername(), sourceOptions.getPassword()));
Optional.ofNullable(sourceOptions.getWarehouseName())
.ifPresent(warehouse -> post.setHeader("warehouse", warehouse));
Copy link
Copy Markdown
Contributor Author

@zhangbutao zhangbutao Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevincai I refine this code to avoid setting warehouse header(null value) in case of flink sink table not having 'scan.params.warehouse'='warehosue_name'. And then fe will use the default_warehouse if not getting the warehouse header.

@zhangbutao
Copy link
Copy Markdown
Contributor Author

Can we merge this change? @kevincai
Thanks.

@banmoy banmoy merged commit f6de994 into StarRocks:main Jun 13, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants