refactor: remove backoff config #5808

Merged: 4 commits, Apr 7, 2025
12 changes: 0 additions & 12 deletions config/config.md
@@ -86,10 +86,6 @@
 | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
 | `metadata_store` | -- | -- | Metadata storage options. |
 | `metadata_store.file_size` | String | `64MB` | The size of the metadata store log file. |
@@ -349,10 +345,6 @@
 | `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
 | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
 | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
-| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |
-| `wal.backoff_max` | String | `10s` | The maximum backoff for kafka clients. |
-| `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. |
-| `wal.backoff_deadline` | String | `5mins` | Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. |
 | `logging` | -- | -- | The logging options. |
 | `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
 | `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
@@ -436,10 +428,6 @@
 | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
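For context on what was removed: these settings drove a plain exponential backoff (next backoff = base * current backoff), capped at `backoff_max` and abandoned once the total wait passed `backoff_deadline`. A minimal standalone sketch of the retry cadence the old datanode defaults produced; illustrative only, not GreptimeDB code:

```rust
use std::time::Duration;

// Old datanode defaults from the table above:
// backoff_init = 500ms, backoff_max = 10s, backoff_base = 2, backoff_deadline = 5mins.
fn main() {
    let (mut delay, mut waited) = (Duration::from_millis(500), Duration::ZERO);
    let (max, base, deadline) = (Duration::from_secs(10), 2u32, Duration::from_secs(5 * 60));
    while waited < deadline {
        println!("retry after {delay:?} (total waited: {waited:?})");
        waited += delay;
        delay = (delay * base).min(max); // next backoff = base * current backoff
    }
}
```

With those defaults the delays run 500ms, 1s, 2s, 4s, 8s, then 10s per retry until the 5-minute deadline.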
16 changes: 0 additions & 16 deletions config/datanode.example.toml
@@ -169,22 +169,6 @@ max_batch_bytes = "1MB"
 ## **It's only used when the provider is `kafka`**.
 consumer_wait_timeout = "100ms"
 
-## The initial backoff delay.
-## **It's only used when the provider is `kafka`**.
-backoff_init = "500ms"
-
-## The maximum backoff delay.
-## **It's only used when the provider is `kafka`**.
-backoff_max = "10s"
-
-## The exponential backoff rate, i.e. next backoff = base * current backoff.
-## **It's only used when the provider is `kafka`**.
-backoff_base = 2
-
-## The deadline of retries.
-## **It's only used when the provider is `kafka`**.
-backoff_deadline = "5mins"
-
 ## Whether to enable WAL index creation.
 ## **It's only used when the provider is `kafka`**.
 create_index = true
11 changes: 0 additions & 11 deletions config/metasrv.example.toml
@@ -149,17 +149,6 @@ replication_factor = 1
 
 ## Above which a topic creation operation will be cancelled.
 create_topic_timeout = "30s"
-## The initial backoff for kafka clients.
-backoff_init = "500ms"
-
-## The maximum backoff for kafka clients.
-backoff_max = "10s"
-
-## Exponential backoff rate, i.e. next backoff = base * current backoff.
-backoff_base = 2
-
-## Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
-backoff_deadline = "5mins"
 
 # The Kafka SASL configuration.
 # **It's only used when the provider is `kafka`**.
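Worth noting: the removed metasrv doc line said a missing `backoff_deadline` meant reconnecting never terminated. After this PR that escape hatch is gone; the hardcoded `DEFAULT_BACKOFF_CONFIG` introduced below always sets a deadline (40s).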
16 changes: 0 additions & 16 deletions config/standalone.example.toml
@@ -242,22 +242,6 @@ max_batch_bytes = "1MB"
 ## **It's only used when the provider is `kafka`**.
 consumer_wait_timeout = "100ms"
 
-## The initial backoff delay.
-## **It's only used when the provider is `kafka`**.
-backoff_init = "500ms"
-
-## The maximum backoff delay.
-## **It's only used when the provider is `kafka`**.
-backoff_max = "10s"
-
-## The exponential backoff rate, i.e. next backoff = base * current backoff.
-## **It's only used when the provider is `kafka`**.
-backoff_base = 2
-
-## The deadline of retries.
-## **It's only used when the provider is `kafka`**.
-backoff_deadline = "5mins"
-
 ## Ignore missing entries during read WAL.
 ## **It's only used when the provider is `kafka`**.
 ##
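With the four config files above updated, no shipped example exposes `backoff_*` keys anymore. The replacement is the compile-time `DEFAULT_BACKOFF_CONFIG` constant introduced in `src/common/wal/src/config/kafka/common.rs` below.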
10 changes: 2 additions & 8 deletions src/common/meta/src/wal_options_allocator/topic_creator.rs
@@ -13,13 +13,13 @@
 // limitations under the License.
 
 use common_telemetry::{error, info};
+use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
 use common_wal::config::kafka::MetasrvKafkaConfig;
 use rskafka::client::error::Error as RsKafkaError;
 use rskafka::client::error::ProtocolError::TopicAlreadyExists;
 use rskafka::client::partition::{Compression, UnknownTopicHandling};
 use rskafka::client::{Client, ClientBuilder};
 use rskafka::record::Record;
-use rskafka::BackoffConfig;
 use snafu::ResultExt;
 
 use crate::error::{
@@ -127,16 +127,10 @@ impl KafkaTopicCreator {
 
 pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
     // Builds an kafka controller client for creating topics.
-    let backoff_config = BackoffConfig {
-        init_backoff: config.backoff.init,
-        max_backoff: config.backoff.max,
-        base: config.backoff.base as f64,
-        deadline: config.backoff.deadline,
-    };
     let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
         .await
         .context(ResolveKafkaEndpointSnafu)?;
-    let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
+    let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
     if let Some(sasl) = &config.connection.sasl {
         builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
     };
15 changes: 0 additions & 15 deletions src/common/wal/src/config.rs
@@ -51,7 +51,6 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
             DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
             DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
                 connection: config.connection,
-                backoff: config.backoff,
                 kafka_topic: config.kafka_topic,
                 auto_create_topics: config.auto_create_topics,
             }),
@@ -65,7 +64,6 @@
             MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
             MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
                 connection: config.connection,
-                backoff: config.backoff,
                 kafka_topic: config.kafka_topic,
                 ..Default::default()
             }),
@@ -84,7 +82,6 @@ mod tests {
     use tests::kafka::common::KafkaTopicConfig;
 
     use super::*;
-    use crate::config::kafka::common::BackoffConfig;
     use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
     use crate::TopicSelectorType;
@@ -175,12 +172,6 @@
                     client_key_path: None,
                 }),
             },
-            backoff: BackoffConfig {
-                init: Duration::from_millis(500),
-                max: Duration::from_secs(10),
-                base: 2,
-                deadline: Some(Duration::from_secs(60 * 5)),
-            },
             kafka_topic: KafkaTopicConfig {
                 num_topics: 32,
                 selector_type: TopicSelectorType::RoundRobin,
@@ -212,12 +203,6 @@
             },
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
-            backoff: BackoffConfig {
-                init: Duration::from_millis(500),
-                max: Duration::from_secs(10),
-                base: 2,
-                deadline: Some(Duration::from_secs(60 * 5)),
-            },
             kafka_topic: KafkaTopicConfig {
                 num_topics: 32,
                 selector_type: TopicSelectorType::RoundRobin,
39 changes: 8 additions & 31 deletions src/common/wal/src/config/kafka/common.rs
@@ -17,44 +17,21 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use rskafka::client::{Credentials, SaslConfig};
+use rskafka::BackoffConfig;
 use rustls::{ClientConfig, RootCertStore};
 use serde::{Deserialize, Serialize};
-use serde_with::with_prefix;
 use snafu::{OptionExt, ResultExt};
 
+pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
+    init_backoff: Duration::from_millis(100),
+    max_backoff: Duration::from_secs(10),
+    base: 2.0,
+    deadline: Some(Duration::from_secs(40)),
+};
+
 use crate::error::{self, Result};
 use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
-
-with_prefix!(pub backoff_prefix "backoff_");
-
-/// Backoff configurations for kafka client.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(default)]
-pub struct BackoffConfig {
-    /// The initial backoff delay.
-    #[serde(with = "humantime_serde")]
-    pub init: Duration,
-    /// The maximum backoff delay.
-    #[serde(with = "humantime_serde")]
-    pub max: Duration,
-    /// The exponential backoff rate, i.e. next backoff = base * current backoff.
-    pub base: u32,
-    /// The deadline of retries. `None` stands for no deadline.
-    #[serde(with = "humantime_serde")]
-    pub deadline: Option<Duration>,
-}
-
-impl Default for BackoffConfig {
-    fn default() -> Self {
-        Self {
-            init: Duration::from_millis(500),
-            max: Duration::from_secs(10),
-            base: 2,
-            deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
-        }
-    }
-}
 
 /// The SASL configurations for kafka client.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct KafkaClientSasl {
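To summarize this file's change: the serde-backed, operator-tunable `BackoffConfig` struct is deleted, and a `pub const DEFAULT_BACKOFF_CONFIG` (rskafka's own type) takes its place. A condensed sketch of how callers now build a Kafka client with it, mirroring the two call sites in this PR; `connect` is a hypothetical helper and SASL/TLS wiring is omitted:

```rust
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use rskafka::client::{Client, ClientBuilder};

// Hypothetical helper: every client now shares the one hardcoded backoff policy
// (init 100ms, max 10s, base 2.0, deadline 40s) instead of reading it from config.
async fn connect(broker_endpoints: Vec<String>) -> Result<Client, rskafka::client::error::Error> {
    ClientBuilder::new(broker_endpoints)
        .backoff_config(DEFAULT_BACKOFF_CONFIG)
        .build()
        .await
}
```

Because the constant is `pub const`, the metasrv topic creator and the datanode client manager (hunks below) compile against identical retry timing.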
6 changes: 1 addition & 5 deletions src/common/wal/src/config/kafka/datanode.rs
@@ -18,7 +18,7 @@ use common_base::readable_size::ReadableSize;
 use serde::{Deserialize, Serialize};
 
 use super::common::KafkaConnectionConfig;
-use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
+use crate::config::kafka::common::KafkaTopicConfig;
 
 /// Kafka wal configurations for datanode.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -34,9 +34,6 @@ pub struct DatanodeKafkaConfig {
     /// The consumer wait timeout.
     #[serde(with = "humantime_serde")]
     pub consumer_wait_timeout: Duration,
-    /// The backoff config.
-    #[serde(flatten, with = "backoff_prefix")]
-    pub backoff: BackoffConfig,
     /// The kafka topic config.
     #[serde(flatten)]
     pub kafka_topic: KafkaTopicConfig,
@@ -57,7 +54,6 @@ impl Default for DatanodeKafkaConfig {
             // Warning: Kafka has a default limit of 1MB per message in a topic.
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
-            backoff: BackoffConfig::default(),
             kafka_topic: KafkaTopicConfig::default(),
             auto_create_topics: true,
             create_index: true,
6 changes: 1 addition & 5 deletions src/common/wal/src/config/kafka/metasrv.rs
@@ -15,7 +15,7 @@
 use serde::{Deserialize, Serialize};
 
 use super::common::KafkaConnectionConfig;
-use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
+use crate::config::kafka::common::KafkaTopicConfig;
 
 /// Kafka wal configurations for metasrv.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -24,9 +24,6 @@ pub struct MetasrvKafkaConfig {
     /// The kafka connection config.
     #[serde(flatten)]
     pub connection: KafkaConnectionConfig,
-    /// The backoff config.
-    #[serde(flatten, with = "backoff_prefix")]
-    pub backoff: BackoffConfig,
     /// The kafka config.
     #[serde(flatten)]
     pub kafka_topic: KafkaTopicConfig,
@@ -38,7 +35,6 @@ impl Default for MetasrvKafkaConfig {
     fn default() -> Self {
         Self {
             connection: Default::default(),
-            backoff: Default::default(),
             kafka_topic: Default::default(),
             auto_create_topics: true,
         }
12 changes: 3 additions & 9 deletions src/log-store/src/kafka/client_manager.rs
@@ -15,10 +15,10 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
 use common_wal::config::kafka::DatanodeKafkaConfig;
 use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
 use rskafka::client::ClientBuilder;
-use rskafka::BackoffConfig;
 use snafu::ResultExt;
 use store_api::logstore::provider::KafkaProvider;
 use tokio::sync::{Mutex, RwLock};
@@ -73,16 +73,10 @@ impl ClientManager {
         global_index_collector: Option<GlobalIndexCollector>,
     ) -> Result<Self> {
-        // Sets backoff config for the top-level kafka client and all clients constructed by it.
-        let backoff_config = BackoffConfig {
-            init_backoff: config.backoff.init,
-            max_backoff: config.backoff.max,
-            base: config.backoff.base as f64,
-            deadline: config.backoff.deadline,
-        };
         let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
             .await
             .context(ResolveKafkaEndpointSnafu)?;
-        let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
+        let mut builder =
+            ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
         if let Some(sasl) = &config.connection.sasl {
             builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
         };
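Net effect of the PR: the metasrv topic creator and the datanode log-store client manager now share one compile-time backoff policy, so Kafka retry behavior is uniform across components and no longer operator-tunable.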