Merged
Changes from 3 commits
10 changes: 10 additions & 0 deletions docs/en/connector-v2/source/Hbase.md
@@ -31,6 +31,8 @@ Reads data from Apache Hbase.
| is_binary_rowkey | boolean | No | false |
| start_rowkey | string | No | - |
| end_rowkey | string | No | - |
| start_row_inclusive | boolean | No | true |
| end_row_inclusive | boolean | No | false |
| common-options | | No | - |

### zookeeper_quorum [string]
@@ -73,6 +75,14 @@ The start row of the scan

The stop row of the scan

### start_row_inclusive [boolean]

Whether the scan range includes the start row. When true (the default), the start row is included in the scan results.

### end_row_inclusive [boolean]

Whether the scan range includes the end row. When false (the default), the end row is excluded from the scan results, following the left-closed, right-open convention [start, end).
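Together, the two flags select one of four interval types over the row-key range. A minimal sketch of the resulting membership test (plain Java with String keys for illustration; the class and method names are illustrative only, and HBase itself compares row keys as unsigned byte arrays rather than Strings):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ScanRangeDemo {
    // Mimics how start_row_inclusive / end_row_inclusive bound a scan range.
    public static List<String> scan(List<String> sortedRows, String start, String end,
                                    boolean startInclusive, boolean endInclusive) {
        return sortedRows.stream()
                .filter(r -> startInclusive ? r.compareTo(start) >= 0 : r.compareTo(start) > 0)
                .filter(r -> endInclusive ? r.compareTo(end) <= 0 : r.compareTo(end) < 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> rows = List.of("A", "B", "C");
        // Defaults: [start, end), left-closed and right-open.
        System.out.println(scan(rows, "A", "C", true, false)); // prints [A, B]
        // end_row_inclusive = true widens the range to [start, end].
        System.out.println(scan(rows, "A", "C", true, true));  // prints [A, B, C]
    }
}
```

With the defaults, scanning from "A" to "C" returns rows A and B only, which matches the e2e assertions later in this PR.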

### common-options

Common parameters for Source plugins, refer to [Common Source Options](../source-common-options.md).
10 changes: 10 additions & 0 deletions docs/zh/connector-v2/source/Hbase.md
@@ -31,6 +31,8 @@ import ChangeLog from '../changelog/connector-hbase.md';
| is_binary_rowkey | boolean | 否 | false |
| start_rowkey | string | 否 | - |
| end_rowkey | string | 否 | - |
| start_row_inclusive | boolean | 否 | true |
| end_row_inclusive | boolean | 否 | false |
| common-options | | 否 | - |

### zookeeper_quorum [string]
@@ -73,6 +75,14 @@ HBase 的行键既可以是文本字符串,也可以是二进制数据。在 S

扫描结束行

### start_row_inclusive [boolean]

设置扫描范围是否包含起始行。当设置为 true 时,扫描结果将包含起始行。默认值: true (包含)。

### end_row_inclusive [boolean]

设置扫描范围是否包含结束行。当设置为 false 时,扫描结果将不包含结束行,遵循左闭右开的区间约定 [start, end)。默认值: false (不包含)。

### 常用选项

Source 插件常用参数,具体请参考 [Source 常用选项](../source-common-options.md)
@@ -368,8 +368,8 @@ public ResultScanner scan(
HbaseSourceSplit split, HbaseParameters hbaseParameters, List<String> columnNames)
throws IOException {
Scan scan = new Scan();
-        scan.withStartRow(split.getStartRow(), true);
-        scan.withStopRow(split.getEndRow(), true);
+        scan.withStartRow(split.getStartRow(), hbaseParameters.isStartRowInclusive());
+        scan.withStopRow(split.getEndRow(), hbaseParameters.isEndRowInclusive());
scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
scan.setCaching(hbaseParameters.getCaching());
scan.setBatch(hbaseParameters.getBatch());
@@ -75,6 +75,12 @@ public class HbaseParameters implements Serializable {
@Builder.Default
private HbaseSinkOptions.EnCoding enCoding = HbaseSinkOptions.ENCODING.defaultValue();

@Builder.Default
private boolean startRowInclusive = HbaseSourceOptions.START_ROW_INCLUSIVE.defaultValue();

@Builder.Default
private boolean endRowInclusive = HbaseSourceOptions.END_ROW_INCLUSIVE.defaultValue();

public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
HbaseParametersBuilder builder = HbaseParameters.builder();
String table = config.get(HbaseBaseOptions.TABLE);
@@ -143,6 +149,12 @@ public static HbaseParameters buildWithSourceConfig(ReadonlyConfig pluginConfig)
if (pluginConfig.getOptional(HbaseSourceOptions.END_ROW_KEY).isPresent()) {
builder.endRowkey(pluginConfig.get(HbaseSourceOptions.END_ROW_KEY));
}
if (pluginConfig.getOptional(HbaseSourceOptions.START_ROW_INCLUSIVE).isPresent()) {
builder.startRowInclusive(pluginConfig.get(HbaseSourceOptions.START_ROW_INCLUSIVE));
}
if (pluginConfig.getOptional(HbaseSourceOptions.END_ROW_INCLUSIVE).isPresent()) {
builder.endRowInclusive(pluginConfig.get(HbaseSourceOptions.END_ROW_INCLUSIVE));
}
return builder.build();
}
}
@@ -34,6 +34,20 @@ public class HbaseSourceOptions extends HbaseBaseOptions {
.noDefaultValue()
.withDescription("Hbase scan end rowkey");

public static final Option<Boolean> START_ROW_INCLUSIVE =
Options.key("start_row_inclusive")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to include the start row in the scan. Default is true (inclusive).");

public static final Option<Boolean> END_ROW_INCLUSIVE =
> **Contributor:** In order to maintain consistency with the previous logic, I think the default value of end_row_inclusive should also be true.
>
> **Member (author):** Thanks @xiaochen-zhou for the suggestion. To ensure that data can be read normally and without duplication, we need to include only one side of each boundary.
>
> **Member:** Hi, will the data be duplicated when both END_ROW_INCLUSIVE and START_ROW_INCLUSIVE are true?
>
> **Member:** The intended result of the configuration should be to control whether the boundaries are included. However, when both sides are set to false, it produces splits like (start, 1), (1, 3), (3, end), which loses the boundary rows; conversely, both-inclusive splits read the boundary rows twice. Instead of adjusting the boundaries of each split, we should only modify the inclusion settings of the overall start and end points.
Options.key("end_row_inclusive")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to include the end row in the scan. Default is false (exclusive), following the left-closed-right-open convention.");
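As the review thread above argues, these flags should govern only the user-supplied endpoints of the whole scan; the interior boundaries between parallel splits must stay half-open, otherwise boundary rows are either lost (both sides exclusive) or read twice (both sides inclusive). A hypothetical sketch of that invariant (class, record, and method names are illustrative, not SeaTunnel's actual split code):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitBoundaryDemo {
    public record Split(String lo, String hi, boolean loInc, boolean hiInc) {
        public boolean contains(String row) {
            boolean aboveLo = loInc ? row.compareTo(lo) >= 0 : row.compareTo(lo) > 0;
            boolean belowHi = hiInc ? row.compareTo(hi) <= 0 : row.compareTo(hi) < 0;
            return aboveLo && belowHi;
        }
    }

    // Interior boundaries stay half-open [lo, hi); only the outermost
    // endpoints honor the user's start/end inclusivity flags.
    public static List<Split> splits(List<String> bounds, boolean startInc, boolean endInc) {
        List<Split> out = new ArrayList<>();
        for (int i = 0; i + 1 < bounds.size(); i++) {
            boolean first = i == 0;
            boolean last = i + 2 == bounds.size();
            out.add(new Split(bounds.get(i), bounds.get(i + 1),
                    first ? startInc : true, last ? endInc : false));
        }
        return out;
    }

    public static void main(String[] args) {
        // Even with a fully inclusive user range, every row lands in exactly one split.
        List<Split> splits = splits(List.of("1", "3", "4"), true, true);
        for (String row : List.of("1", "2", "3", "4")) {
            long owners = splits.stream().filter(s -> s.contains(row)).count();
            System.out.println(row + " read by " + owners + " split(s)");
        }
    }
}
```

Under this scheme the user's flags change only which endpoint rows of the whole range are read, never how rows are shared between splits.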

public static final Option<Boolean> IS_BINARY_ROW_KEY =
Options.key("is_binary_rowkey")
.booleanType()
@@ -321,6 +321,24 @@ public void testHbaseSourceWithBatchQuery(TestContainer container)
Assertions.assertEquals(0, sourceExecResult.getExitCode());
}

@TestTemplate
public void testHbaseSourceWithStartEndInclusive(TestContainer container)
throws IOException, InterruptedException {
fakeToHbaseArray(container);
Container.ExecResult sourceExecResult =
container.executeJob("/hbase-source-with-start-end-inclusive.conf");
Assertions.assertEquals(0, sourceExecResult.getExitCode());
}

@TestTemplate
public void testHbaseSourceWithDefaultInclusive(TestContainer container)
throws IOException, InterruptedException {
fakeToHbaseArray(container);
Container.ExecResult sourceExecResult =
container.executeJob("/hbase-source-with-default-inclusive.conf");
Assertions.assertEquals(0, sourceExecResult.getExitCode());
}

@TestTemplate
public void testCatalog(TestContainer container) {
// create existing table
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Hbase {
zookeeper_quorum = "hbase_e2e:2181"
table = "seatunnel_test"
query_columns = ["rowkey", "info:name", "info:score"]
caching = 1000
batch = 100
cache_blocks = false
is_binary_rowkey = false
start_rowkey = "A"
end_rowkey = "C"
# Test default values: start_row_inclusive = true (default), end_row_inclusive = false (default)
# This should scan [A, C), which includes A and B, but excludes C
schema = {
columns = [
{
name = rowkey
type = string
},
{
name = "info:name"
type = string
},
{
name = "info:score"
type = int
}
]
}
}
}

sink {
Assert {
rules = {
row_rules = [
{
rule_type = "MIN_ROW"
rule_value = 2
},
{
rule_type = "MAX_ROW"
rule_value = 2
}
]
}
}
}

@@ -31,6 +31,7 @@ source {
cache_blocks = false
is_binary_rowkey = false
end_rowkey = "A"
end_row_inclusive = true
schema = {
columns = [
{
@@ -32,6 +32,7 @@ source {
is_binary_rowkey = false
start_rowkey = "B"
end_rowkey = "C"
end_row_inclusive = true
schema = {
columns = [
{
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Hbase {
zookeeper_quorum = "hbase_e2e:2181"
table = "seatunnel_test"
query_columns = ["rowkey", "info:name", "info:score"]
caching = 1000
batch = 100
cache_blocks = false
is_binary_rowkey = false
start_rowkey = "A"
end_rowkey = "C"
start_row_inclusive = true
end_row_inclusive = true
schema = {
columns = [
{
name = rowkey
type = string
},
{
name = "info:name"
type = string
},
{
name = "info:score"
type = int
}
]
}
}
}

sink {
Assert {
rules = {
row_rules = [
{
rule_type = "MIN_ROW"
rule_value = 3
},
{
rule_type = "MAX_ROW"
rule_value = 3
}
]
}
}
}