diff --git a/docs/ecosystem/doris-kafka-connector.md b/docs/ecosystem/doris-kafka-connector.md
index b490566affe0b..e72ca25352b04 100644
--- a/docs/ecosystem/doris-kafka-connector.md
+++ b/docs/ecosystem/doris-kafka-connector.md
@@ -197,9 +197,9 @@ errors.deadletterqueue.topic.replication.factor=1
| doris.password | - | - | Y | Doris password |
| doris.database | - | - | Y | The database to write to. It can be empty when there are multiple libraries. At the same time, the specific library name needs to be configured in topic2table.map. |
| doris.topic2table.map | - | - | N | The corresponding relationship between topic and table table, for example: topic1:tb1,topic2:tb2
The default is empty, indicating that topic and table names correspond one to one.
The format of multiple libraries is topic1:db1.tbl1,topic2:db2.tbl2 |
-| buffer.count.records | - | 10000 | N | The number of records each Kafka partition buffers in memory before flushing to doris. Default 10000 records |
+| buffer.count.records | - | 50000 | N | The number of records each Kafka partition buffers in memory before flushing to doris. Default 50000 records |
| buffer.flush.time | - | 120 | N | Buffer refresh interval, in seconds, default 120 seconds |
-| buffer.size.bytes | - | 5000000(5MB) | N | The cumulative size of records buffered in memory for each Kafka partition, in bytes, default 5MB |
+| buffer.size.bytes | - | 104857600(100MB) | N | The cumulative size of records buffered in memory for each Kafka partition, in bytes, default 100MB |
| jmx | - | true | N | To obtain connector internal monitoring indicators through JMX, please refer to: [Doris-Connector-JMX](https://github.com/apache/doris-kafka-connector/blob/master/docs/en/Doris-Connector-JMX.md) |
| enable.2pc | - | true | N | Whether to enable two-phase commit (TwoPhaseCommit) of Stream Load, the default is true. |
| enable.delete | - | false | N | Whether to delete records synchronously, default false |
@@ -212,7 +212,10 @@ errors.deadletterqueue.topic.replication.factor=1
| database.time_zone | - | UTC | N | When `converter.mode` is not `normal` mode, it provides a way to specify time zone conversion for date data types (such as datetime, date, timestamp, etc.). The default is UTC time zone. |
| avro.topic2schema.filepath | - | - | N | By reading the locally provided Avro Schema file, the Avro file content in the Topic is parsed to achieve decoupling from the Schema registration center provided by Confluent.
This configuration needs to be used with the `key.converter` or `value.converter` prefix. For example, the local Avro Schema file for configuring avro-user and avro-product Topic is as follows: `"value.converter.avro.topic2schema. filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"`
For specific usage, please refer to: [#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | Configure this parameter, data from one kafka topic can flow to multiple doris tables. For configuration details, refer to: [#58](https://github.com/apache/doris-kafka-connector/pull/58) |
-| enable.combine.flush | `true`,
`false` | false | N | Whether to merge data from all partitions together and write them. The default value is false. When enabled, only at_least_once semantics are guaranteed.|
+| enable.combine.flush | `true`,
`false` | false | N | Whether to merge data from all partitions together and write them. The default value is false. When enabled, only at_least_once semantics are guaranteed. |
+| max.retries | - | 10 | N | The maximum number of times to retry on errors before failing the task. |
+| retry.interval.ms | - | 6000 | N | The time in milliseconds to wait following an error before attempting a retry. |
+| behavior.on.null.values | `ignore`,
`fail` | ignore | N | Defines how to handle records with null values. The default is `ignore` (such records are skipped). |
For other Kafka Connect Sink common configuration items, please refer to: [connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)
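Taken together, the buffer, retry, and null-handling options documented above can be combined in a single sink configuration. The following is a minimal sketch in the properties format used elsewhere in this document; the connector name, topic, database, and connection values are illustrative placeholders, not defaults:

```properties
# Hypothetical sink configuration illustrating the options documented above
name=doris-sink-example
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=example_topic
doris.urls=127.0.0.1
doris.http.port=8030
doris.query.port=9030
doris.user=root
doris.password=
doris.database=example_db

# Buffering: flush after 50000 records, 100MB, or 120 seconds, whichever comes first
buffer.count.records=50000
buffer.size.bytes=104857600
buffer.flush.time=120

# Retry behavior: fail the task after 10 failed attempts, waiting 6 seconds between attempts
max.retries=10
retry.interval.ms=6000

# Skip records with null values (e.g. tombstones) instead of failing the task
behavior.on.null.values=ignore
```

With these settings, a flush is triggered by whichever buffer limit is reached first, and a transient load error is retried up to 10 times at 6-second intervals before the task is marked failed.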
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
index 23d3a4f694a04..ec4d082e697f7 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
@@ -198,15 +198,15 @@ errors.deadletterqueue.topic.replication.factor=1
| doris.password | - | - | Y | Doris 密码 |
| doris.database | - | - | Y | 要写入的数据库。多个库时可以为空,同时在 topic2table.map 需要配置具体的库名称 |
| doris.topic2table.map | - | - | N | topic 和 table 表的对应关系,例:topic1:tb1,topic2:tb2
默认为空,表示 topic 和 table 名称一一对应。
多个库的格式为 topic1:db1.tbl1,topic2:db2.tbl2 |
-| buffer.count.records | - | 10000 | N | 在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 10000 条记录 |
-| buffer.flush.time | - | 120 | N | buffer 刷新间隔,单位秒,默认 120 秒 |
-| buffer.size.bytes | - | 5000000(5MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认 5MB |
+| buffer.count.records | - | 50000 | N | 在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 50000 条记录 |
+| buffer.flush.time | - | 120 | N | buffer 刷新间隔,单位秒,默认 120 秒 |
+| buffer.size.bytes | - | 104857600(100MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认 100MB |
| jmx | - | true | N | 通过 JMX 获取 Connector 内部监控指标,请参考:[Doris-Connector-JMX](https://github.com/apache/doris-kafka-connector/blob/master/docs/zh-CN/Doris-Connector-JMX.md) |
| enable.2pc | - | true | N | 是否开启 Stream Load 的两阶段提交 (TwoPhaseCommit),默认为 true。 |
| enable.delete | - | false | N | 是否同步删除记录,默认 false |
| label.prefix | - | ${name} | N | Stream load 导入数据时的 label 前缀。默认为 Connector 应用名称。 |
| auto.redirect | - | true | N | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 重定向到需要写入数据的 BE,并且不再显示获取 BE 信息 |
-| sink.properties.* | - | `'sink.properties.format':'json'`,
`'sink.properties.read_json_by_line':'true'` | N | Stream Load 的导入参数。
例如:定义列分隔符`'sink.properties.column_separator':','`
详细参数参考[这里](../data-operate/import/import-way/stream-load-manual)。
**开启 Group Commit**,例如开启 sync_mode 模式的 group commit:`"sink.properties.group_commit":"sync_mode"`。Group Commit 可以配置 `off_mode`、`sync_mode`、`async_mode` 三种模式,具体使用参考:[Group-Commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual/)
**开启部分列更新**,例如开启更新指定 col2 的部分列:`"sink.properties.partial_columns":"true"`, `"sink.properties.columns": "col2",` |
+| sink.properties.* | - | `'sink.properties.format':'json'`,
`'sink.properties.read_json_by_line':'true'` | N | Stream Load 的导入参数。
例如:定义列分隔符`'sink.properties.column_separator':','`
详细参数参考[这里](../data-operate/import/import-way/stream-load-manual.md)。
**开启 Group Commit**,例如开启 sync_mode 模式的 group commit:`"sink.properties.group_commit":"sync_mode"`。Group Commit 可以配置 `off_mode`、`sync_mode`、`async_mode` 三种模式,具体使用参考:[Group-Commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual/)
**开启部分列更新**,例如开启更新指定 col2 的部分列:`"sink.properties.partial_columns":"true"`, `"sink.properties.columns": "col2",` |
| delivery.guarantee | `at_least_once`,
`exactly_once` | at_least_once | N | 消费 Kafka 数据导入至 doris 时,数据一致性的保障方式。支持 `at_least_once` `exactly_once`,默认为 `at_least_once` 。Doris 需要升级至 2.1.0 以上,才能保障数据的 `exactly_once` |
| converter.mode | `normal`,
`debezium_ingestion` | normal | N | 使用 Connector 消费 Kafka 数据时,上游数据的类型转换模式。
```normal```表示正常消费 Kafka 中的数据,不经过任何类型转换。
```debezium_ingestion```表示当 Kafka 上游的数据通过 Debezium 等 CDC(Changelog Data Capture,变更数据捕获)工具采集时,上游数据需要经过特殊的类型转换才能支持。 |
| debezium.schema.evolution | `none`,
`basic` | none | N | 通过 Debezium 采集上游数据库系统(如 MySQL),发生结构变更时,可以将增加的字段同步到 Doris 中。
`none`表示上游数据库系统发生结构变更时,不同步变更后的结构到 Doris 中。
`basic`表示同步上游数据库的数据变更操作。由于列结构变更是一个危险操作(可能会导致误删 Doris 表结构的列),目前仅支持同步上游增加列的操作。当列被重命名后,则旧列保持原样,Connector 会在目标表中新增一列,将重命名后的新增数据 Sink 到新列中。 |
@@ -214,6 +214,9 @@ errors.deadletterqueue.topic.replication.factor=1
| avro.topic2schema.filepath | - | - | N | 通过读取本地提供的 Avro Schema 文件,来解析 Topic 中的 Avro 文件内容,实现与 Confluent 提供 Schema 注册中心解耦。
此配置需要与 `key.converter` 或 `value.converter` 前缀一起使用,例如配置 avro-user、avro-product Topic 的本地 Avro Schema 文件如下: `"value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"`
具体使用可以参考:[#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | 开启该参数后,可实现一个 Topic 的数据流向多个 Doris 表。配置详情参考:[#58](https://github.com/apache/doris-kafka-connector/pull/58) |
| enable.combine.flush | `true`,
`false`| false | N | 是否将所有分区的数据合并在一起写入。默认值为 false。开启后只能保证 at_least_once 语义。|
+| max.retries | - | 10 | N | 任务失败前重试错误的最大次数。 |
+| retry.interval.ms | - | 6000 | N | 发生错误后,尝试重试之前的等待时间,单位为毫秒,默认 6000 毫秒。 |
+| behavior.on.null.values | `ignore`,
`fail` | ignore | N | 如何处理 null 值的记录,默认跳过不处理。 |
其他 Kafka Connect Sink 通用配置项可参考:[connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
index c2ae80891cdc7..5733e21bbb718 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
@@ -19,7 +19,7 @@ maven 依赖
org.apache.doris
doris-kafka-connector
- 1.0.0
+ 25.0.0
```
@@ -189,22 +189,26 @@ errors.deadletterqueue.topic.replication.factor=1
| doris.password | - | - | Y | Doris 密码 |
| doris.database | - | - | Y | 要写入的数据库。多个库时可以为空,同时在 topic2table.map 需要配置具体的库名称 |
| doris.topic2table.map | - | - | N | topic 和 table 表的对应关系,例:topic1:tb1,topic2:tb2
默认为空,表示 topic 和 table 名称一一对应。
多个库的格式为 topic1:db1.tbl1,topic2:db2.tbl2 |
-| buffer.count.records | - | 10000 | N | 在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 10000 条记录 |
-| buffer.flush.time | - | 120 | N | buffer 刷新间隔,单位秒,默认 120 秒 |
-| buffer.size.bytes | - | 5000000(5MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认 5MB |
+| buffer.count.records | - | 50000 | N | 在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 50000 条记录 |
+| buffer.flush.time | - | 120 | N | buffer 刷新间隔,单位秒,默认 120 秒 |
+| buffer.size.bytes | - | 104857600(100MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认 100MB |
| jmx | - | true | N | 通过 JMX 获取 Connector 内部监控指标,请参考:[Doris-Connector-JMX](https://github.com/apache/doris-kafka-connector/blob/master/docs/zh-CN/Doris-Connector-JMX.md) |
| enable.2pc | - | true | N | 是否开启 Stream Load 的两阶段提交 (TwoPhaseCommit),默认为 true。 |
| enable.delete | - | false | N | 是否同步删除记录,默认 false |
| label.prefix | - | ${name} | N | Stream load 导入数据时的 label 前缀。默认为 Connector 应用名称。 |
| auto.redirect | - | true | N | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 重定向到需要写入数据的 BE,并且不再显示获取 BE 信息 |
| load.model | `stream_load`,
`copy_into` | stream_load | N | 导入数据的方式。支持 `stream_load` 直接数据导入到 Doris 中;同时支持 `copy_into` 的方式导入数据至对象存储中,然后将数据加载至 Doris 中 |
-| sink.properties.* | - | `'sink.properties.format':'json'`,
`'sink.properties.read_json_by_line':'true'` | N | Stream Load 的导入参数。
例如:定义列分隔符`'sink.properties.column_separator':','`
详细参数参考[这里](../data-operate/import/import-way/stream-load-manual)。
**开启 Group Commit**,例如开启 sync_mode 模式的 group commit:`"sink.properties.group_commit":"sync_mode"`。 Group Commit 可以配置 `off_mode`、`sync_mode`、`async_mode` 三种模式 ,具体使用参考:[Group-Commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual/)
**开启部分列更新**,例如开启更新指定 col2 的部分列:`"sink.properties.partial_columns":"true"`, `"sink.properties.columns": "col2",` |
+| sink.properties.* | - | `'sink.properties.format':'json'`,
`'sink.properties.read_json_by_line':'true'` | N | Stream Load 的导入参数。
例如:定义列分隔符`'sink.properties.column_separator':','`
详细参数参考[这里](../data-operate/import/stream-load-manual.md)。
**开启 Group Commit**,例如开启 sync_mode 模式的 group commit:`"sink.properties.group_commit":"sync_mode"`。 Group Commit 可以配置 `off_mode`、`sync_mode`、`async_mode` 三种模式 ,具体使用参考:[Group-Commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual/)
**开启部分列更新**,例如开启更新指定 col2 的部分列:`"sink.properties.partial_columns":"true"`, `"sink.properties.columns": "col2",` |
| delivery.guarantee | `at_least_once`,
`exactly_once` | at_least_once | N | 消费 Kafka 数据导入至 doris 时,数据一致性的保障方式。支持 `at_least_once` `exactly_once`,默认为 `at_least_once` 。Doris 需要升级至 2.1.0 以上,才能保障数据的 `exactly_once` |
| converter.mode | `normal`,
`debezium_ingestion` | normal | N | 使用 Connector 消费 Kafka 数据时,上游数据的类型转换模式。
```normal```表示正常消费 Kafka 中的数据,不经过任何类型转换。
```debezium_ingestion```表示当 Kafka 上游的数据通过 Debezium 等 CDC (Changelog Data Capture,变更数据捕获)工具采集时,上游数据需要经过特殊的类型转换才能支持。 |
| debezium.schema.evolution | `none`,
`basic` | none | N | 通过 Debezium 采集上游数据库系统(如 MySQL),发生结构变更时,可以将增加的字段同步到 Doris 中。
`none`表示上游数据库系统发生结构变更时,不同步变更后的结构到 Doris 中。
`basic`表示同步上游数据库的数据变更操作。由于列结构变更是一个危险操作(可能会导致误删 Doris 表结构的列),目前仅支持同步上游增加列的操作。当列被重命名后,则旧列保持原样,Connector 会在目标表中新增一列,将重命名后的新增数据 Sink 到新列中。 |
| database.time_zone | - | UTC | N | 当 `converter.mode` 为非 `normal` 模式时,对于日期数据类型(如 datetime, date, timestamp 等等)提供指定时区转换的方式,默认为 UTC 时区。 |
| avro.topic2schema.filepath | - | - | N | 通过读取本地提供的 Avro Schema 文件,来解析 Topic 中的 Avro 文件内容,实现与 Confluent 提供 Schema 注册中心解耦。
此配置需要与 `key.converter` 或 `value.converter` 前缀一起使用,例如配置 avro-user、avro-product Topic 的本地 Avro Schema 文件如下: `"value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"`
具体使用可以参考:[#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | 开启该参数后,可实现一个 Topic 的数据流向多个 Doris 表。 配置详情参考: [#58](https://github.com/apache/doris-kafka-connector/pull/58) |
+| enable.combine.flush | `true`,
`false` | false | N | 是否将所有分区的数据合并在一起写入。默认值为 false。仅支持主键模型的表启用此选项(设置为 true)。如果 topic 包含大量分区,启用此功能可以减少资源消耗。 |
+| max.retries | - | 10 | N | 任务失败前重试错误的最大次数。 |
+| retry.interval.ms | - | 6000 | N | 发生错误后,尝试重试之前的等待时间,单位为毫秒,默认 6000 毫秒。 |
+| behavior.on.null.values | `ignore`,
`fail` | ignore | N | 如何处理 null 值的记录,默认跳过不处理。 |
其他Kafka Connect Sink通用配置项可参考:[connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
index c56299cbfd297..1e3adce577ef0 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
@@ -198,15 +198,15 @@ errors.deadletterqueue.topic.replication.factor=1
| doris.password | - | - | Y | Doris 密码 |
| doris.database | - | - | Y | 要写入的数据库。多个库时可以为空,同时在 topic2table.map 需要配置具体的库名称 |
| doris.topic2table.map | - | - | N | topic 和 table 表的对应关系,例:topic1:tb1,topic2:tb2
默认为空,表示 topic 和 table 名称一一对应。
多个库的格式为 topic1:db1.tbl1,topic2:db2.tbl2 |
-| buffer.count.records | - | 10000 | N | 在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 10000 条记录 |
-| buffer.flush.time | - | 120 | N | buffer 刷新间隔,单位秒,默认 120 秒 |
-| buffer.size.bytes | - | 5000000(5MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认 5MB |
+| buffer.count.records | - | 50000 | N | 在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 50000 条记录 |
+| buffer.flush.time | - | 120 | N | buffer 刷新间隔,单位秒,默认 120 秒 |
+| buffer.size.bytes | - | 104857600(100MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认 100MB |
| jmx | - | true | N | 通过 JMX 获取 Connector 内部监控指标,请参考:[Doris-Connector-JMX](https://github.com/apache/doris-kafka-connector/blob/master/docs/zh-CN/Doris-Connector-JMX.md) |
| enable.2pc | - | true | N | 是否开启 Stream Load 的两阶段提交 (TwoPhaseCommit),默认为 true。 |
| enable.delete | - | false | N | 是否同步删除记录,默认 false |
| label.prefix | - | ${name} | N | Stream load 导入数据时的 label 前缀。默认为 Connector 应用名称。 |
| auto.redirect | - | true | N | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 重定向到需要写入数据的 BE,并且不再显示获取 BE 信息 |
-| sink.properties.* | - | `'sink.properties.format':'json'`,
`'sink.properties.read_json_by_line':'true'` | N | Stream Load 的导入参数。
例如:定义列分隔符`'sink.properties.column_separator':','`
详细参数参考[这里](../data-operate/import/import-way/stream-load-manual)。
**开启 Group Commit**,例如开启 sync_mode 模式的 group commit:`"sink.properties.group_commit":"sync_mode"`。Group Commit 可以配置 `off_mode`、`sync_mode`、`async_mode` 三种模式,具体使用参考:[Group-Commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual/)
**开启部分列更新**,例如开启更新指定 col2 的部分列:`"sink.properties.partial_columns":"true"`, `"sink.properties.columns": "col2",` |
+| sink.properties.* | - | `'sink.properties.format':'json'`,
`'sink.properties.read_json_by_line':'true'` | N | Stream Load 的导入参数。
例如:定义列分隔符`'sink.properties.column_separator':','`
详细参数参考[这里](../data-operate/import/import-way/stream-load-manual.md)。
**开启 Group Commit**,例如开启 sync_mode 模式的 group commit:`"sink.properties.group_commit":"sync_mode"`。Group Commit 可以配置 `off_mode`、`sync_mode`、`async_mode` 三种模式,具体使用参考:[Group-Commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual/)
**开启部分列更新**,例如开启更新指定 col2 的部分列:`"sink.properties.partial_columns":"true"`, `"sink.properties.columns": "col2",` |
| delivery.guarantee | `at_least_once`,
`exactly_once` | at_least_once | N | 消费 Kafka 数据导入至 doris 时,数据一致性的保障方式。支持 `at_least_once` `exactly_once`,默认为 `at_least_once` 。Doris 需要升级至 2.1.0 以上,才能保障数据的 `exactly_once` |
| converter.mode | `normal`,
`debezium_ingestion` | normal | N | 使用 Connector 消费 Kafka 数据时,上游数据的类型转换模式。
```normal```表示正常消费 Kafka 中的数据,不经过任何类型转换。
```debezium_ingestion```表示当 Kafka 上游的数据通过 Debezium 等 CDC(Changelog Data Capture,变更数据捕获)工具采集时,上游数据需要经过特殊的类型转换才能支持。 |
| debezium.schema.evolution | `none`,
`basic` | none | N | 通过 Debezium 采集上游数据库系统(如 MySQL),发生结构变更时,可以将增加的字段同步到 Doris 中。
`none`表示上游数据库系统发生结构变更时,不同步变更后的结构到 Doris 中。
`basic`表示同步上游数据库的数据变更操作。由于列结构变更是一个危险操作(可能会导致误删 Doris 表结构的列),目前仅支持同步上游增加列的操作。当列被重命名后,则旧列保持原样,Connector 会在目标表中新增一列,将重命名后的新增数据 Sink 到新列中。 |
@@ -214,6 +214,9 @@ errors.deadletterqueue.topic.replication.factor=1
| avro.topic2schema.filepath | - | - | N | 通过读取本地提供的 Avro Schema 文件,来解析 Topic 中的 Avro 文件内容,实现与 Confluent 提供 Schema 注册中心解耦。
此配置需要与 `key.converter` 或 `value.converter` 前缀一起使用,例如配置 avro-user、avro-product Topic 的本地 Avro Schema 文件如下: `"value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"`
具体使用可以参考:[#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | 开启该参数后,可实现一个 Topic 的数据流向多个 Doris 表。配置详情参考:[#58](https://github.com/apache/doris-kafka-connector/pull/58) |
| enable.combine.flush | `true`,
`false` | false | N | 是否将所有分区的数据合并在一起写入。默认值为 false。开启后只能保证 at_least_once 语义。|
+| max.retries | - | 10 | N | 任务失败前重试错误的最大次数。 |
+| retry.interval.ms | - | 6000 | N | 发生错误后,尝试重试之前的等待时间,单位为毫秒,默认 6000 毫秒。 |
+| behavior.on.null.values | `ignore`,
`fail` | ignore | N | 如何处理 null 值的记录,默认跳过不处理。 |
其他 Kafka Connect Sink 通用配置项可参考:[connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
index 6a75dfb8c6159..81b54c79aaa19 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
@@ -198,15 +198,15 @@ errors.deadletterqueue.topic.replication.factor=1
| doris.password | - | - | Y | Doris 密码 |
| doris.database | - | - | Y | 要写入的数据库。多个库时可以为空,同时在 topic2table.map 需要配置具体的库名称 |
| doris.topic2table.map | - | - | N | topic 和 table 表的对应关系,例:topic1:tb1,topic2:tb2
默认为空,表示 topic 和 table 名称一一对应。
多个库的格式为 topic1:db1.tbl1,topic2:db2.tbl2 |
-| buffer.count.records | - | 10000 | N | 在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 10000 条记录 |
-| buffer.flush.time | - | 120 | N | buffer 刷新间隔,单位秒,默认 120 秒 |
-| buffer.size.bytes | - | 5000000(5MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认 5MB |
+| buffer.count.records | - | 50000 | N | 在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 50000 条记录 |
+| buffer.flush.time | - | 120 | N | buffer 刷新间隔,单位秒,默认 120 秒 |
+| buffer.size.bytes | - | 104857600(100MB) | N | 每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认 100MB |
| jmx | - | true | N | 通过 JMX 获取 Connector 内部监控指标,请参考:[Doris-Connector-JMX](https://github.com/apache/doris-kafka-connector/blob/master/docs/zh-CN/Doris-Connector-JMX.md) |
| enable.2pc | - | true | N | 是否开启 Stream Load 的两阶段提交 (TwoPhaseCommit),默认为 true。 |
| enable.delete | - | false | N | 是否同步删除记录,默认 false |
| label.prefix | - | ${name} | N | Stream load 导入数据时的 label 前缀。默认为 Connector 应用名称。 |
| auto.redirect | - | true | N | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 重定向到需要写入数据的 BE,并且不再显示获取 BE 信息 |
-| sink.properties.* | - | `'sink.properties.format':'json'`,
`'sink.properties.read_json_by_line':'true'` | N | Stream Load 的导入参数。
例如:定义列分隔符`'sink.properties.column_separator':','`
详细参数参考[这里](../data-operate/import/import-way/stream-load-manual)。
**开启 Group Commit**,例如开启 sync_mode 模式的 group commit:`"sink.properties.group_commit":"sync_mode"`。Group Commit 可以配置 `off_mode`、`sync_mode`、`async_mode` 三种模式,具体使用参考:[Group-Commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual/)
**开启部分列更新**,例如开启更新指定 col2 的部分列:`"sink.properties.partial_columns":"true"`, `"sink.properties.columns": "col2",` |
+| sink.properties.* | - | `'sink.properties.format':'json'`,
`'sink.properties.read_json_by_line':'true'` | N | Stream Load 的导入参数。
例如:定义列分隔符`'sink.properties.column_separator':','`
详细参数参考[这里](../data-operate/import/import-way/stream-load-manual.md)。
**开启 Group Commit**,例如开启 sync_mode 模式的 group commit:`"sink.properties.group_commit":"sync_mode"`。Group Commit 可以配置 `off_mode`、`sync_mode`、`async_mode` 三种模式,具体使用参考:[Group-Commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual/)
**开启部分列更新**,例如开启更新指定 col2 的部分列:`"sink.properties.partial_columns":"true"`, `"sink.properties.columns": "col2",` |
| delivery.guarantee | `at_least_once`,
`exactly_once` | at_least_once | N | 消费 Kafka 数据导入至 doris 时,数据一致性的保障方式。支持 `at_least_once` `exactly_once`,默认为 `at_least_once` 。Doris 需要升级至 2.1.0 以上,才能保障数据的 `exactly_once` |
| converter.mode | `normal`,
`debezium_ingestion` | normal | N | 使用 Connector 消费 Kafka 数据时,上游数据的类型转换模式。
```normal```表示正常消费 Kafka 中的数据,不经过任何类型转换。
```debezium_ingestion```表示当 Kafka 上游的数据通过 Debezium 等 CDC(Changelog Data Capture,变更数据捕获)工具采集时,上游数据需要经过特殊的类型转换才能支持。 |
| debezium.schema.evolution | `none`,
`basic` | none | N | 通过 Debezium 采集上游数据库系统(如 MySQL),发生结构变更时,可以将增加的字段同步到 Doris 中。
`none`表示上游数据库系统发生结构变更时,不同步变更后的结构到 Doris 中。
`basic`表示同步上游数据库的数据变更操作。由于列结构变更是一个危险操作(可能会导致误删 Doris 表结构的列),目前仅支持同步上游增加列的操作。当列被重命名后,则旧列保持原样,Connector 会在目标表中新增一列,将重命名后的新增数据 Sink 到新列中。 |
@@ -214,6 +214,9 @@ errors.deadletterqueue.topic.replication.factor=1
| avro.topic2schema.filepath | - | - | N | 通过读取本地提供的 Avro Schema 文件,来解析 Topic 中的 Avro 文件内容,实现与 Confluent 提供 Schema 注册中心解耦。
此配置需要与 `key.converter` 或 `value.converter` 前缀一起使用,例如配置 avro-user、avro-product Topic 的本地 Avro Schema 文件如下: `"value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"`
具体使用可以参考:[#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | 开启该参数后,可实现一个 Topic 的数据流向多个 Doris 表。配置详情参考:[#58](https://github.com/apache/doris-kafka-connector/pull/58) |
| enable.combine.flush | `true`,
`false` | false | N | 是否将所有分区的数据合并在一起写入。默认值为 false。开启后只能保证 at_least_once 语义。|
+| max.retries | - | 10 | N | 任务失败前重试错误的最大次数。 |
+| retry.interval.ms | - | 6000 | N | 发生错误后,尝试重试之前的等待时间,单位为毫秒,默认 6000 毫秒。 |
+| behavior.on.null.values | `ignore`,
`fail` | ignore | N | 如何处理 null 值的记录,默认跳过不处理。 |
其他 Kafka Connect Sink 通用配置项可参考:[connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)
diff --git a/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md b/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
index 5305cacf2822c..32313813528f1 100644
--- a/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
@@ -19,7 +19,7 @@ maven dependencies
org.apache.doris
doris-kafka-connector
- 1.0.0
+ 25.0.0
```
@@ -188,9 +188,9 @@ errors.deadletterqueue.topic.replication.factor=1
| doris.password | - | - | Y | Doris password |
| doris.database | - | - | Y | The database to write to. It can be empty when there are multiple libraries. At the same time, the specific library name needs to be configured in topic2table.map. |
| doris.topic2table.map | - | - | N | The corresponding relationship between topic and table table, for example: topic1:tb1,topic2:tb2
The default is empty, indicating that topic and table names correspond one to one.
The format of multiple libraries is topic1:db1.tbl1,topic2:db2.tbl2 |
-| buffer.count.records | - | 10000 | N | The number of records each Kafka partition buffers in memory before flushing to doris. Default 10000 records |
+| buffer.count.records | - | 50000 | N | The number of records each Kafka partition buffers in memory before flushing to doris. Default 50000 records |
| buffer.flush.time | - | 120 | N | Buffer refresh interval, in seconds, default 120 seconds |
-| buffer.size.bytes | - | 5000000(5MB) | N | The cumulative size of records buffered in memory for each Kafka partition, in bytes, default 5MB |
+| buffer.size.bytes | - | 104857600(100MB) | N | The cumulative size of records buffered in memory for each Kafka partition, in bytes, default 100MB |
| jmx | - | true | N | To obtain connector internal monitoring indicators through JMX, please refer to: [Doris-Connector-JMX](https://github.com/apache/doris-kafka-connector/blob/master/docs/en/Doris-Connector-JMX.md) |
| enable.2pc | - | true | N | Whether to enable two-phase commit (TwoPhaseCommit) of Stream Load, the default is true. |
| enable.delete | - | false | N | Whether to delete records synchronously, default false |
@@ -203,7 +203,11 @@ errors.deadletterqueue.topic.replication.factor=1
| debezium.schema.evolution | `none`,
`basic` | none | N | Use Debezium to collect upstream database systems (such as MySQL), and when structural changes occur, the added fields can be synchronized to Doris.
`none` means that when the structure of the upstream database system changes, the changed structure will not be synchronized to Doris.
`basic` means synchronizing the data change operation of the upstream database. Since changing the column structure is a dangerous operation (it may lead to accidentally deleting columns of the Doris table structure), currently only the operation of adding columns synchronously upstream is supported. When a column is renamed, the old column remains unchanged, and the Connector will add a new column in the target table and sink the renamed new data into the new column. |
| database.time_zone | - | UTC | N | When `converter.mode` is not `normal` mode, it provides a way to specify time zone conversion for date data types (such as datetime, date, timestamp, etc.). The default is UTC time zone. |
| avro.topic2schema.filepath | - | - | N | By reading the locally provided Avro Schema file, the Avro file content in the Topic is parsed to achieve decoupling from the Schema registration center provided by Confluent.
This configuration needs to be used with the `key.converter` or `value.converter` prefix. For example, the local Avro Schema file for configuring avro-user and avro-product Topic is as follows: `"value.converter.avro.topic2schema. filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"`
For specific usage, please refer to: [#32](https://github.com/apache/doris-kafka-connector/pull/32) |
-| record.tablename.field | - | - | N | Configure this parameter, data from one kafka topic can flow to multiple doris tables. For configuration details, refer to: [#58](https://github.com/apache/doris-kafka-connector/pull/58) |
+| record.tablename.field | - | - | N | Configure this parameter, data from one kafka topic can flow to multiple doris tables. For configuration details, refer to: [#58](https://github.com/apache/doris-kafka-connector/pull/58) |
+| enable.combine.flush | `true`,
`false` | false | N | Whether to merge data from all partitions before writing. The default value is false. Enabling this option (setting it to true) is supported only for tables with the Unique Key model. If the topic contains a large number of partitions, enabling this feature can reduce resource consumption. |
+| max.retries | - | 10 | N | The maximum number of times to retry on errors before failing the task. |
+| retry.interval.ms | - | 6000 | N | The time in milliseconds to wait following an error before attempting a retry. |
+| behavior.on.null.values | `ignore`,
`fail` | ignore | N | Defines how to handle records with null values. The default is `ignore` (such records are skipped). |
For other Kafka Connect Sink common configuration items, please refer to: [connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)
diff --git a/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md b/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
index 94f044bdd75db..f768a58bbc9ff 100644
--- a/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
@@ -197,9 +197,9 @@ errors.deadletterqueue.topic.replication.factor=1
| doris.password | - | - | Y | Doris password |
| doris.database | - | - | Y | The database to write to. It can be empty when there are multiple libraries. At the same time, the specific library name needs to be configured in topic2table.map. |
| doris.topic2table.map | - | - | N | The corresponding relationship between topic and table table, for example: topic1:tb1,topic2:tb2
The default is empty, indicating that topic and table names correspond one to one.
The format of multiple libraries is topic1:db1.tbl1,topic2:db2.tbl2 |
-| buffer.count.records | - | 10000 | N | The number of records each Kafka partition buffers in memory before flushing to doris. Default 10000 records |
+| buffer.count.records | - | 50000 | N | The number of records each Kafka partition buffers in memory before flushing to doris. Default 50000 records |
| buffer.flush.time | - | 120 | N | Buffer refresh interval, in seconds, default 120 seconds |
-| buffer.size.bytes | - | 5000000(5MB) | N | The cumulative size of records buffered in memory for each Kafka partition, in bytes, default 5MB |
+| buffer.size.bytes | - | 104857600(100MB) | N | The cumulative size of records buffered in memory for each Kafka partition, in bytes, default 100MB |
| jmx | - | true | N | To obtain connector internal monitoring indicators through JMX, please refer to: [Doris-Connector-JMX](https://github.com/apache/doris-kafka-connector/blob/master/docs/en/Doris-Connector-JMX.md) |
| enable.2pc | - | true | N | Whether to enable two-phase commit (TwoPhaseCommit) of Stream Load, the default is true. |
| enable.delete | - | false | N | Whether to delete records synchronously, default false |
@@ -212,7 +212,10 @@ errors.deadletterqueue.topic.replication.factor=1
| database.time_zone | - | UTC | N | When `converter.mode` is not `normal` mode, it provides a way to specify time zone conversion for date data types (such as datetime, date, timestamp, etc.). The default is UTC time zone. |
| avro.topic2schema.filepath | - | - | N | By reading the locally provided Avro Schema file, the Avro file content in the Topic is parsed to achieve decoupling from the Schema registration center provided by Confluent.
This configuration needs to be used with the `key.converter` or `value.converter` prefix. For example, the local Avro Schema file for configuring avro-user and avro-product Topic is as follows: `"value.converter.avro.topic2schema. filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"`
For specific usage, please refer to: [#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | Configure this parameter, data from one kafka topic can flow to multiple doris tables. For configuration details, refer to: [#58](https://github.com/apache/doris-kafka-connector/pull/58) |
-| enable.combine.flush | `true`,
`false` | false | N | Whether to merge data from all partitions together and write them. The default value is false. When enabled, only at_least_once semantics are guaranteed.|
+| enable.combine.flush | `true`,
`false` | false | N | Whether to merge data from all partitions and write it in a single flush. The default value is false. When enabled, only at_least_once semantics are guaranteed.|
+| max.retries | - | 10 | N | The maximum number of times to retry on errors before failing the task. |
+| retry.interval.ms | - | 6000 | N | The time in milliseconds to wait following an error before attempting a retry. |
+| behavior.on.null.values | `ignore`,
`fail` | ignore | N | Defines how to handle records with null values. |
For other Kafka Connect Sink common configuration items, please refer to: [connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)
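Taken together, the buffering and retry options above could be combined in a sink connector properties file like this minimal sketch (the values shown are the documented defaults, so in practice you would only set the ones you change):

```properties
# Buffering: a flush is triggered when whichever threshold is reached first
buffer.count.records=50000
buffer.flush.time=120
buffer.size.bytes=10485760

# Retry behavior on load errors
max.retries=10
retry.interval.ms=6000

# Skip records whose value is null instead of failing the task
behavior.on.null.values=ignore
```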
diff --git a/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md b/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
index 0329079e85263..167cc5ea5716e 100644
--- a/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
@@ -197,9 +197,9 @@ errors.deadletterqueue.topic.replication.factor=1
| doris.password | - | - | Y | Doris password |
| doris.database | - | - | Y | The database to write to. It can be empty when there are multiple libraries. At the same time, the specific library name needs to be configured in topic2table.map. |
| doris.topic2table.map | - | - | N | The mapping between topics and tables, for example: topic1:tb1,topic2:tb2
The default is empty, indicating that topic and table names correspond one to one.
The format of multiple libraries is topic1:db1.tbl1,topic2:db2.tbl2 |
-| buffer.count.records | - | 10000 | N | The number of records each Kafka partition buffers in memory before flushing to doris. Default 10000 records |
+| buffer.count.records | - | 50000 | N | The number of records each Kafka partition buffers in memory before flushing to Doris. Default 50000 records |
| buffer.flush.time | - | 120 | N | Buffer refresh interval, in seconds, default 120 seconds |
-| buffer.size.bytes | - | 5000000(5MB) | N | The cumulative size of records buffered in memory for each Kafka partition, in bytes, default 5MB |
+| buffer.size.bytes | - | 10485760 (10MB) | N | The cumulative size of records buffered in memory for each Kafka partition, in bytes, default 10MB |
| jmx | - | true | N | To obtain connector internal monitoring indicators through JMX, please refer to: [Doris-Connector-JMX](https://github.com/apache/doris-kafka-connector/blob/master/docs/en/Doris-Connector-JMX.md) |
| enable.2pc | - | true | N | Whether to enable two-phase commit (TwoPhaseCommit) of Stream Load, the default is true. |
| enable.delete | - | false | N | Whether to delete records synchronously, default false |
@@ -213,6 +213,9 @@ errors.deadletterqueue.topic.replication.factor=1
| avro.topic2schema.filepath | - | - | N | Parses the Avro content in the topic against a locally provided Avro Schema file, decoupling the connector from the Schema Registry provided by Confluent.
This configuration needs to be used with the `key.converter` or `value.converter` prefix. For example, to configure local Avro Schema files for the avro-user and avro-product topics: `"value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"`
For specific usage, please refer to: [#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | With this parameter configured, data from one Kafka topic can flow to multiple Doris tables. For configuration details, refer to: [#58](https://github.com/apache/doris-kafka-connector/pull/58) |
| enable.combine.flush | `true`,
`false` | false | N | Whether to merge data from all partitions together and write them. The default value is false. When enabled, only at_least_once semantics are guaranteed.|
+| max.retries | - | 10 | N | The maximum number of times to retry on errors before failing the task. |
+| retry.interval.ms | - | 6000 | N | The time in milliseconds to wait following an error before attempting a retry. |
+| behavior.on.null.values | `ignore`,
`fail` | ignore | N | Defines how to handle records with null values. |
For other Kafka Connect Sink common configuration items, please refer to: [connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)