From 352bd7b6fd70f2919243833d4d08ab810ca58d7f Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Fri, 8 Mar 2024 10:34:40 +0800 Subject: [PATCH] feat: max-txn-ops option (#3458) * feat: max-txn-ops limit * chore: by comment --- src/cmd/src/cli/bench.rs | 4 ++- src/cmd/src/cli/upgrade.rs | 2 +- src/cmd/src/metasrv.rs | 8 +++++- src/common/meta/src/error.rs | 9 +++++++ src/common/meta/src/key.rs | 33 ++++++++++++++++++++++- src/common/meta/src/key/test_utils.rs | 11 ++++++-- src/common/meta/src/kv_backend/etcd.rs | 36 ++++++++++++++------------ src/common/meta/src/kv_backend/txn.rs | 10 ++++++- src/meta-srv/examples/kv_store.rs | 4 ++- src/meta-srv/src/bootstrap.rs | 3 ++- src/meta-srv/src/metasrv.rs | 12 +++++++++ src/meta-srv/src/mocks.rs | 2 +- src/operator/src/insert.rs | 2 -- tests-integration/src/cluster.rs | 2 +- 14 files changed, 109 insertions(+), 29 deletions(-) diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index 60405e812819..e441e7643d98 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -62,7 +62,9 @@ pub struct BenchTableMetadataCommand { impl BenchTableMetadataCommand { pub async fn build(&self) -> Result { - let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr]).await.unwrap(); + let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128) + .await + .unwrap(); let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store)); diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 6936b13fd7b4..0aa787fbe965 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -70,7 +70,7 @@ impl UpgradeCommand { etcd_addr: &self.etcd_addr, })?; let tool = MigrateTableMetadata { - etcd_store: EtcdStore::with_etcd_client(client), + etcd_store: EtcdStore::with_etcd_client(client, 128), dryrun: self.dryrun, skip_catalog_keys: self.skip_catalog_keys, skip_table_global_keys: self.skip_table_global_keys, diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 97c26af0e1ed..0e45ffa461a8 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -117,10 +117,12 @@ struct StartCommand { /// The working home directory of this metasrv instance. #[clap(long)] data_home: Option, - /// If it's not empty, the metasrv will store all data with this key prefix. #[clap(long, default_value = "")] store_key_prefix: String, + /// The max operations per txn + #[clap(long)] + max_txn_ops: Option, } impl StartCommand { @@ -181,6 +183,10 @@ impl StartCommand { opts.store_key_prefix = self.store_key_prefix.clone() } + if let Some(max_txn_ops) = self.max_txn_ops { + opts.max_txn_ops = max_txn_ops; + } + // Disable dashboard in metasrv. opts.http.disable_dashboard = true; diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index cfe1cbf65602..56823fd2e9ab 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -67,6 +67,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))] + EtcdTxnFailed { + max_operations: usize, + #[snafu(source)] + error: etcd_client::Error, + location: Location, + }, + #[snafu(display("Failed to get sequence: {}", err_msg))] NextSequence { err_msg: String, location: Location }, @@ -400,6 +408,7 @@ impl ErrorExt for Error { IllegalServerState { .. } | EtcdTxnOpResponse { .. } | EtcdFailed { .. } + | EtcdTxnFailed { .. } | ConnectEtcd { .. } => StatusCode::Internal, SerdeJson { .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 1d4ee73f9e4c..ea3f4c1eb571 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -464,7 +464,7 @@ impl TableMetadataManager { pub fn max_logical_tables_per_batch(&self) -> usize { // The batch size is max_txn_size / 3 because the size of the `tables_data` // is 3 times the size of the `tables_data`. - self.kv_backend.max_txn_size() / 3 + self.kv_backend.max_txn_ops() / 3 } /// Creates metadata for multiple logical tables and return an error if different metadata exists. @@ -860,6 +860,7 @@ mod tests { use bytes::Bytes; use common_time::util::current_time_millis; use futures::TryStreamExt; + use store_api::storage::RegionId; use table::metadata::{RawTableInfo, TableInfo}; use super::datanode_table::DatanodeTableKey; @@ -1056,6 +1057,36 @@ mod tests { ); } + #[tokio::test] + async fn test_create_many_logical_tables_metadata() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(kv_backend); + + let mut tables_data = vec![]; + for i in 0..128 { + let table_id = i + 1; + let regin_number = table_id * 3; + let region_id = RegionId::new(table_id, regin_number); + let region_route = new_region_route(region_id.as_u64(), 2); + let region_routes = vec![region_route.clone()]; + let table_info: RawTableInfo = test_utils::new_test_table_info_with_name( + table_id, + &format!("my_table_{}", table_id), + region_routes.iter().map(|r| r.region.id.region_number()), + ) + .into(); + let table_route_value = TableRouteValue::physical(region_routes.clone()); + + tables_data.push((table_info, table_route_value)); + } + + // creates metadata. + table_metadata_manager + .create_logical_tables_metadata(tables_data) + .await + .unwrap(); + } + #[tokio::test] async fn test_delete_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); diff --git a/src/common/meta/src/key/test_utils.rs b/src/common/meta/src/key/test_utils.rs index f321091842ff..2d17ebabdb43 100644 --- a/src/common/meta/src/key/test_utils.rs +++ b/src/common/meta/src/key/test_utils.rs @@ -19,8 +19,9 @@ use datatypes::schema::{ColumnSchema, SchemaBuilder}; use store_api::storage::TableId; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; -pub fn new_test_table_info>( +pub fn new_test_table_info_with_name>( table_id: TableId, + table_name: &str, region_numbers: I, ) -> TableInfo { let column_schemas = vec![ @@ -50,8 +51,14 @@ pub fn new_test_table_info>( TableInfoBuilder::default() .table_id(table_id) .table_version(5) - .name("mytable") + .name(table_name) .meta(meta) .build() .unwrap() } +pub fn new_test_table_info>( + table_id: TableId, + region_numbers: I, +) -> TableInfo { + new_test_table_info_with_name(table_id, "mytable", region_numbers) +} diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 1b14bc6994b0..56323d12f8c3 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -33,12 +33,6 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; -// Maximum number of operations permitted in a transaction. -// The etcd default configuration's `--max-txn-ops` is 128. -// -// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/ -const MAX_TXN_SIZE: usize = 128; - fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue { let (key, value) = kv.into_key_value(); KeyValue { key, value } @@ -46,10 +40,15 @@ fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue { pub struct EtcdStore { client: Client, + // Maximum number of operations permitted in a transaction. + // The etcd default configuration's `--max-txn-ops` is 128. + // + // For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/ + max_txn_ops: usize, } impl EtcdStore { - pub async fn with_endpoints(endpoints: S) -> Result + pub async fn with_endpoints(endpoints: S, max_txn_ops: usize) -> Result where E: AsRef, S: AsRef<[E]>, @@ -58,16 +57,19 @@ impl EtcdStore { .await .context(error::ConnectEtcdSnafu)?; - Ok(Self::with_etcd_client(client)) + Ok(Self::with_etcd_client(client, max_txn_ops)) } - pub fn with_etcd_client(client: Client) -> KvBackendRef { - Arc::new(Self { client }) + pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef { + Arc::new(Self { + client, + max_txn_ops, + }) } async fn do_multi_txn(&self, txn_ops: Vec) -> Result> { - let max_txn_size = self.max_txn_size(); - if txn_ops.len() < max_txn_size { + let max_txn_ops = self.max_txn_ops(); + if txn_ops.len() < max_txn_ops { // fast path let _timer = METRIC_META_TXN_REQUEST .with_label_values(&["etcd", "txn"]) @@ -83,7 +85,7 @@ impl EtcdStore { } let txns = txn_ops - .chunks(max_txn_size) + .chunks(max_txn_ops) .map(|part| async move { let _timer = METRIC_META_TXN_REQUEST .with_label_values(&["etcd", "txn"]) @@ -311,18 +313,20 @@ impl TxnService for EtcdStore { .with_label_values(&["etcd", "txn"]) .start_timer(); + let max_operations = txn.max_operations(); + let etcd_txn: Txn = txn.into(); let txn_res = self .client .kv_client() .txn(etcd_txn) .await - .context(error::EtcdFailedSnafu)?; + .context(error::EtcdTxnFailedSnafu { max_operations })?; txn_res.try_into() } - fn max_txn_size(&self) -> usize { - MAX_TXN_SIZE + fn max_txn_ops(&self) -> usize { + self.max_txn_ops } } diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index ac9dc6cd6b22..f5db5a5cdb24 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::max; + use common_error::ext::ErrorExt; use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse}; @@ -27,7 +29,7 @@ pub trait TxnService: Sync + Send { } /// Maximum number of operations permitted in a transaction. - fn max_txn_size(&self) -> usize { + fn max_txn_ops(&self) -> usize { usize::MAX } } @@ -192,6 +194,12 @@ impl Txn { self.req.failure = operations.into(); self } + + #[inline] + pub fn max_operations(&self) -> usize { + let opc = max(self.req.compare.len(), self.req.success.len()); + max(opc, self.req.failure.len()) + } } impl From for TxnRequest { diff --git a/src/meta-srv/examples/kv_store.rs b/src/meta-srv/examples/kv_store.rs index c11bc0508efb..c0cd41b6ad7c 100644 --- a/src/meta-srv/examples/kv_store.rs +++ b/src/meta-srv/examples/kv_store.rs @@ -24,7 +24,9 @@ fn main() { #[tokio::main] async fn run() { - let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"]).await.unwrap(); + let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"], 128) + .await + .unwrap(); // put let put_req = PutRequest { diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 7ed7df942263..858b62a8d213 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -193,7 +193,8 @@ pub async fn metasrv_builder( (None, false) => { let etcd_client = create_etcd_client(opts).await?; let kv_backend = { - let etcd_backend = EtcdStore::with_etcd_client(etcd_client.clone()); + let etcd_backend = + EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops); if !opts.store_key_prefix.is_empty() { Arc::new(ChrootKvBackend::new( opts.store_key_prefix.clone().into_bytes(), diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 2aafe519bff0..be126d218f0d 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -79,6 +79,17 @@ pub struct MetaSrvOptions { pub wal: MetaSrvWalConfig, pub export_metrics: ExportMetricsOption, pub store_key_prefix: String, + /// The max operations per txn + /// + /// This value is usually limited by which store is used for the `KvBackend`. + /// For example, if using etcd, this value should ensure that it is less than + /// or equal to the `--max-txn-ops` option value of etcd. + /// + /// TODO(jeremy): Currently, this option only affects the etcd store, but it may + /// also affect other stores in the future. In other words, each store needs to + /// limit the number of operations in a txn because an infinitely large txn could + /// potentially block other operations. + pub max_txn_ops: usize, } impl MetaSrvOptions { @@ -112,6 +123,7 @@ impl Default for MetaSrvOptions { wal: MetaSrvWalConfig::default(), export_metrics: ExportMetricsOption::default(), store_key_prefix: String::new(), + max_txn_ops: 128, } } } diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index c042e379243e..2228a2dc5a4b 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -42,7 +42,7 @@ pub async fn mock_with_memstore() -> MockInfo { } pub async fn mock_with_etcdstore(addr: &str) -> MockInfo { - let kv_backend = EtcdStore::with_endpoints([addr]).await.unwrap(); + let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap(); mock(Default::default(), kv_backend, None, None).await } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 7aae77df9593..6102b8354e96 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -468,8 +468,6 @@ impl Inserter { &req.table_name, ); - info!("Logical table `{table_ref}` does not exist, try creating table"); - let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 026261551204..197d3dead15a 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -94,7 +94,7 @@ impl GreptimeDbClusterBuilder { .split(',') .map(|s| s.to_string()) .collect::>(); - let backend = EtcdStore::with_endpoints(endpoints) + let backend = EtcdStore::with_endpoints(endpoints, 128) .await .expect("malformed endpoints"); // Each retry requires a new isolation namespace.