From 0ca00f9a20d53b1acd8f7de917f0d1c0ce00c23c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 5 Mar 2024 16:47:14 +0800 Subject: [PATCH] refactor: enable feature flag openraft:storage-v2 (#14838) * refactor: enable feature flag openraft:storage-v2 Separate the implementations of RaftLogStorage and RaftStateMachine. * chore: tolerate time diff in expiration test --- Cargo.toml | 1 + src/binaries/metactl/snapshot.rs | 6 +- src/meta/raft-store/src/sm_v002/sm_v002.rs | 7 +- .../service/src/meta_service/meta_node.rs | 8 +- src/meta/service/src/store/mod.rs | 2 + .../src/store/raft_log_storage_impl.rs | 272 ++++++++++++ .../src/store/raft_state_machine_impl.rs | 182 ++++++++ src/meta/service/src/store/store.rs | 402 ------------------ .../it/meta_node/meta_node_kv_api_expire.rs | 4 +- src/meta/service/tests/it/store.rs | 21 +- 10 files changed, 481 insertions(+), 424 deletions(-) create mode 100644 src/meta/service/src/store/raft_log_storage_impl.rs create mode 100644 src/meta/service/src/store/raft_state_machine_impl.rs diff --git a/Cargo.toml b/Cargo.toml index 14aee5f590710..9238856b42df3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,6 +123,7 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.9.0-alpha.8", features = [ "serde", "tracing-log", + "storage-v2", "loosen-follower-log-revert", # allows removing all data from a follower and restoring from the leader. ] } diff --git a/src/binaries/metactl/snapshot.rs b/src/binaries/metactl/snapshot.rs index 2577b2325fd5f..fa5d60758b66d 100644 --- a/src/binaries/metactl/snapshot.rs +++ b/src/binaries/metactl/snapshot.rs @@ -38,8 +38,8 @@ use databend_common_meta_raft_store::sm_v002::SnapshotStoreV002; use databend_common_meta_raft_store::state::RaftState; use databend_common_meta_sled_store::get_sled_db; use databend_common_meta_sled_store::init_sled_db; +use databend_common_meta_sled_store::openraft::storage::RaftLogStorageExt; use databend_common_meta_sled_store::openraft::RaftSnapshotBuilder; -use databend_common_meta_sled_store::openraft::RaftStorage; use databend_common_meta_types::Cmd; use databend_common_meta_types::CommittedLeaderId; use databend_common_meta_types::Endpoint; @@ -360,7 +360,7 @@ async fn init_new_cluster( payload: EntryPayload::Membership(membership), }; - sto.append_to_log([entry]).await?; + sto.blocking_append([entry]).await?; // insert AddNodes logs for (node_id, node) in nodes { @@ -381,7 +381,7 @@ async fn init_new_cluster( }), }; - sto.append_to_log([entry]).await?; + sto.blocking_append([entry]).await?; } } diff --git a/src/meta/raft-store/src/sm_v002/sm_v002.rs b/src/meta/raft-store/src/sm_v002/sm_v002.rs index 19dafaa5ff99a..ed24e0d180fd7 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002.rs @@ -232,18 +232,19 @@ impl SMV002 { Applier::new(self) } - pub async fn apply_entries<'a>( + pub async fn apply_entries( &mut self, - entries: impl IntoIterator, + entries: impl IntoIterator, ) -> Result, StorageIOError> { let mut applier = Applier::new(self); let mut res = vec![]; for ent in entries.into_iter() { + info!("apply: {}", *ent.get_log_id()); let log_id = *ent.get_log_id(); let r = applier - .apply(ent) + .apply(&ent) .await .map_err(|e| StorageIOError::apply(log_id, &e))?; res.push(r); diff --git a/src/meta/service/src/meta_service/meta_node.rs b/src/meta/service/src/meta_service/meta_node.rs index bb2785a403c04..25bdfdbaf41b4 100644 --- a/src/meta/service/src/meta_service/meta_node.rs +++ b/src/meta/service/src/meta_service/meta_node.rs @@ -36,7 +36,6 @@ use databend_common_meta_raft_store::ondisk::DataVersion; use databend_common_meta_raft_store::ondisk::DATA_VERSION; use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use databend_common_meta_sled_store::openraft; -use databend_common_meta_sled_store::openraft::storage::Adaptor; use databend_common_meta_sled_store::openraft::ChangeMembers; use databend_common_meta_stoerr::MetaStorageError; use databend_common_meta_types::protobuf::raft_service_client::RaftServiceClient; @@ -156,8 +155,8 @@ pub struct MetaNodeStatus { pub last_seq: u64, } -pub type LogStore = Adaptor; -pub type SMStore = Adaptor; +pub type LogStore = RaftStore; +pub type SMStore = RaftStore; /// MetaRaft is a implementation of the generic Raft handling meta data R/W. pub type MetaRaft = Raft; @@ -205,7 +204,8 @@ impl MetaNodeBuilder { let net = Network::new(sto.clone()); - let (log_store, sm_store) = Adaptor::new(sto.clone()); + let log_store = sto.clone(); + let sm_store = sto.clone(); let raft = MetaRaft::new(node_id, Arc::new(config), net, log_store, sm_store) .await diff --git a/src/meta/service/src/store/mod.rs b/src/meta/service/src/store/mod.rs index 4c917cf234802..19fcbb03c3356 100644 --- a/src/meta/service/src/store/mod.rs +++ b/src/meta/service/src/store/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod raft_log_storage_impl; +mod raft_state_machine_impl; #[allow(clippy::module_inception)] mod store; mod store_inner; diff --git a/src/meta/service/src/store/raft_log_storage_impl.rs b/src/meta/service/src/store/raft_log_storage_impl.rs new file mode 100644 index 0000000000000..7558058e1bf4f --- /dev/null +++ b/src/meta/service/src/store/raft_log_storage_impl.rs @@ -0,0 +1,272 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::io::ErrorKind; +use std::ops::RangeBounds; + +use databend_common_base::base::tokio::io; +use databend_common_meta_sled_store::openraft::storage::LogFlushed; +use databend_common_meta_sled_store::openraft::storage::RaftLogStorage; +use databend_common_meta_sled_store::openraft::ErrorSubject; +use databend_common_meta_sled_store::openraft::ErrorVerb; +use databend_common_meta_sled_store::openraft::LogState; +use databend_common_meta_sled_store::openraft::OptionalSend; +use databend_common_meta_sled_store::openraft::RaftLogReader; +use databend_common_meta_types::Entry; +use databend_common_meta_types::LogId; +use databend_common_meta_types::StorageError; +use databend_common_meta_types::TypeConfig; +use databend_common_meta_types::Vote; +use log::debug; +use log::error; +use log::info; + +use crate::metrics::raft_metrics; +use crate::store::RaftStore; +use crate::store::ToStorageError; + +impl RaftLogReader for RaftStore { + #[minitrace::trace] + async fn try_get_log_entries + Clone + Debug + Send>( + &mut self, + range: RB, + ) -> Result, StorageError> { + debug!( + "try_get_log_entries: self.id={}, range: {:?}", + self.id, range + ); + + match self + .log + .read() + .await + .range_values(range) + .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) + { + Ok(entries) => Ok(entries), + Err(err) => { + raft_metrics::storage::incr_raft_storage_fail("try_get_log_entries", false); + Err(err) + } + } + } +} + +impl RaftLogStorage for RaftStore { + type LogReader = RaftStore; + + async fn get_log_state(&mut self) -> Result, StorageError> { + let last_purged_log_id = match self + .log + .read() + .await + .get_last_purged() + .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) + { + Err(err) => { + raft_metrics::storage::incr_raft_storage_fail("get_log_state", false); + return Err(err); + } + Ok(r) => r, + }; + + let last = match self + .log + .read() + .await + .logs() + .last() + .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) + { + Err(err) => { + raft_metrics::storage::incr_raft_storage_fail("get_log_state", false); + return Err(err); + } + Ok(r) => r, + }; + + let last_log_id = match last { + None => last_purged_log_id, + Some(x) => Some(x.1.log_id), + }; + + debug!( + "get_log_state: ({:?},{:?}]", + last_purged_log_id, last_log_id + ); + + Ok(LogState { + last_purged_log_id, + last_log_id, + }) + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } + + async fn save_committed(&mut self, committed: Option) -> Result<(), StorageError> { + self.raft_state + .write() + .await + .save_committed(committed) + .await + .map_to_sto_err(ErrorSubject::Store, ErrorVerb::Write) + } + + async fn read_committed(&mut self) -> Result, StorageError> { + self.raft_state + .read() + .await + .read_committed() + .map_to_sto_err(ErrorSubject::Store, ErrorVerb::Read) + } + + #[minitrace::trace] + async fn save_vote(&mut self, hs: &Vote) -> Result<(), StorageError> { + info!(id = self.id, hs :? =(hs); "save_vote"); + + match self + .raft_state + .write() + .await + .save_vote(hs) + .await + .map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Write) + { + Err(err) => { + raft_metrics::storage::incr_raft_storage_fail("save_vote", true); + Err(err) + } + Ok(_) => Ok(()), + } + } + + #[minitrace::trace] + async fn read_vote(&mut self) -> Result, StorageError> { + match self + .raft_state + .read() + .await + .read_vote() + .map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Read) + { + Err(err) => { + raft_metrics::storage::incr_raft_storage_fail("read_vote", false); + Err(err) + } + Ok(vote) => Ok(vote), + } + } + + #[minitrace::trace] + async fn append( + &mut self, + entries: I, + callback: LogFlushed, + ) -> Result<(), StorageError> + where + I: IntoIterator + OptionalSend, + I::IntoIter: OptionalSend, + { + // TODO: it is bad: allocates a new vec. + let entries = entries + .into_iter() + .map(|x| { + info!("append_to_log: {}", x.log_id); + x + }) + .collect::>(); + + let res = match self.log.write().await.append(entries).await { + Err(err) => { + raft_metrics::storage::incr_raft_storage_fail("append_to_log", true); + Err(err) + } + Ok(_) => Ok(()), + }; + + callback.log_io_completed(res.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))); + + Ok(()) + } + + #[minitrace::trace] + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + info!(id = self.id; "truncate: {}", log_id); + + match self + .log + .write() + .await + .range_remove(log_id.index..) + .await + .map_to_sto_err(ErrorSubject::Log(log_id), ErrorVerb::Delete) + { + Ok(_) => Ok(()), + Err(err) => { + raft_metrics::storage::incr_raft_storage_fail("delete_conflict_logs_since", true); + Err(err) + } + } + } + + #[minitrace::trace] + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + info!(id = self.id, log_id :? =(&log_id); "purge upto: start"); + + if let Err(err) = self + .log + .write() + .await + .set_last_purged(log_id) + .await + .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write) + { + raft_metrics::storage::incr_raft_storage_fail("purge_logs_upto", true); + return Err(err); + }; + + info!(id = self.id, log_id :? =(&log_id); "purge_logs_upto: Done: set_last_purged()"); + + let log = self.log.write().await.clone(); + + // Purge can be done in another task safely, because: + // + // - Next time when raft starts, it will read last_purged_log_id without examining the actual first log. + // And junk can be removed next time purge_logs_upto() is called. + // + // - Purging operates the start of the logs, and only committed logs are purged; + // while append and truncate operates on the end of the logs, + // it is safe to run purge && (append || truncate) concurrently. + databend_common_base::runtime::spawn({ + let id = self.id; + async move { + info!(id = id, log_id :? =(&log_id); "purge_logs_upto: Start: asynchronous range_remove()"); + + let res = log.range_remove(..=log_id.index).await; + + if let Err(err) = res { + error!(id = id, log_id :? =(&log_id); "purge_logs_upto: in asynchronous error: {}", err); + raft_metrics::storage::incr_raft_storage_fail("purge_logs_upto", true); + } + + info!(id = id, log_id :? =(&log_id); "purge_logs_upto: Done: asynchronous range_remove()"); + } + }); + + Ok(()) + } +} diff --git a/src/meta/service/src/store/raft_state_machine_impl.rs b/src/meta/service/src/store/raft_state_machine_impl.rs new file mode 100644 index 0000000000000..bc6d9b52e88ba --- /dev/null +++ b/src/meta/service/src/store/raft_state_machine_impl.rs @@ -0,0 +1,182 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_raft_store::ondisk::DATA_VERSION; +use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO; +use databend_common_meta_raft_store::sm_v002::SnapshotStoreV002; +use databend_common_meta_raft_store::state_machine::StoredSnapshot; +use databend_common_meta_sled_store::openraft::storage::RaftStateMachine; +use databend_common_meta_sled_store::openraft::ErrorVerb; +use databend_common_meta_sled_store::openraft::OptionalSend; +use databend_common_meta_sled_store::openraft::RaftSnapshotBuilder; +use databend_common_meta_types::AppliedState; +use databend_common_meta_types::Entry; +use databend_common_meta_types::ErrorSubject; +use databend_common_meta_types::LogId; +use databend_common_meta_types::Snapshot; +use databend_common_meta_types::SnapshotData; +use databend_common_meta_types::SnapshotMeta; +use databend_common_meta_types::StorageError; +use databend_common_meta_types::StoredMembership; +use databend_common_meta_types::TypeConfig; +use log::debug; +use log::error; +use log::info; + +use crate::metrics::raft_metrics; +use crate::metrics::server_metrics; +use crate::store::RaftStore; + +impl RaftSnapshotBuilder for RaftStore { + #[minitrace::trace] + async fn build_snapshot(&mut self) -> Result { + self.do_build_snapshot().await + } +} + +impl RaftStateMachine for RaftStore { + type SnapshotBuilder = RaftStore; + + async fn applied_state(&mut self) -> Result<(Option, StoredMembership), StorageError> { + let sm = self.state_machine.read().await; + let last_applied = *sm.sys_data_ref().last_applied_ref(); + let last_membership = sm.sys_data_ref().last_membership_ref().clone(); + + debug!( + "applied_state: applied: {:?}, membership: {:?}", + last_applied, last_membership + ); + + Ok((last_applied, last_membership)) + } + + #[minitrace::trace] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where + I: IntoIterator + OptionalSend, + I::IntoIter: OptionalSend, + { + let mut sm = self.state_machine.write().await; + let res = sm.apply_entries(entries).await?; + + Ok(res) + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } + + #[minitrace::trace] + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + server_metrics::incr_applying_snapshot(1); + + let snapshot_store = SnapshotStoreV002::new(DATA_VERSION, self.inner.config.clone()); + + let temp = snapshot_store.new_temp().await.map_err(|e| { + StorageError::from_io_error(ErrorSubject::Snapshot(None), ErrorVerb::Write, e) + })?; + + Ok(Box::new(temp)) + } + + #[minitrace::trace] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box, + ) -> Result<(), StorageError> { + let data_size = snapshot.data_size().await.map_err(|e| { + StorageError::from_io_error( + ErrorSubject::Snapshot(Some(meta.signature())), + ErrorVerb::Read, + e, + ) + })?; + + info!( + id = self.id, + snapshot_size = data_size; + "decoding snapshot for installation" + ); + server_metrics::incr_applying_snapshot(-1); + + assert!(snapshot.is_temp()); + + let snapshot_store = SnapshotStoreV002::new(DATA_VERSION, self.inner.config.clone()); + + let d = snapshot_store + .commit_received(snapshot, meta) + .await + .map_err(|e| { + e.with_context(format_args!( + "commit received snapshot: {:?}", + meta.signature() + )) + })?; + + let d = Box::new(d); + + info!("snapshot meta: {:?}", meta); + + // Replace state machine with the new one + let res = self.do_install_snapshot(d).await; + match res { + Ok(_) => {} + Err(e) => { + raft_metrics::storage::incr_raft_storage_fail("install_snapshot", true); + error!("error: {:?} when install_snapshot", e); + } + }; + + // Update current snapshot. + let new_snapshot = StoredSnapshot { meta: meta.clone() }; + { + let mut current_snapshot = self.current_snapshot.write().await; + *current_snapshot = Some(new_snapshot); + } + Ok(()) + } + + #[minitrace::trace] + async fn get_current_snapshot( + &mut self, + ) -> Result>, StorageError> + { + info!(id = self.id; "get snapshot start"); + let p = self.current_snapshot.read().await; + + let snap = match &*p { + Some(snapshot) => { + let meta = &snapshot.meta; + + let snapshot_store = + SnapshotStoreV002::new(DATA_VERSION, self.inner.config.clone()); + let d = snapshot_store + .load_snapshot(&meta.snapshot_id) + .await + .map_err(|e| e.with_meta("get snapshot", meta))?; + + Ok(Some(Snapshot { + meta: meta.clone(), + snapshot: Box::new(d), + })) + } + None => Ok(None), + }; + + info!("get snapshot complete"); + + snap + } +} diff --git a/src/meta/service/src/store/store.rs b/src/meta/service/src/store/store.rs index 01faf0cf7f9c3..8394212d06d0e 100644 --- a/src/meta/service/src/store/store.rs +++ b/src/meta/service/src/store/store.rs @@ -12,41 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Debug; use std::ops::Deref; -use std::ops::RangeBounds; use std::sync::Arc; use databend_common_meta_raft_store::config::RaftConfig; -use databend_common_meta_raft_store::ondisk::DATA_VERSION; -use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO; -use databend_common_meta_raft_store::sm_v002::SnapshotStoreV002; -use databend_common_meta_raft_store::state_machine::StoredSnapshot; -use databend_common_meta_sled_store::openraft::ErrorSubject; -use databend_common_meta_sled_store::openraft::ErrorVerb; -use databend_common_meta_sled_store::openraft::LogState; -use databend_common_meta_sled_store::openraft::RaftLogReader; -use databend_common_meta_sled_store::openraft::RaftSnapshotBuilder; -use databend_common_meta_sled_store::openraft::RaftStorage; -use databend_common_meta_types::AppliedState; -use databend_common_meta_types::Entry; -use databend_common_meta_types::LogId; use databend_common_meta_types::MetaStartupError; -use databend_common_meta_types::Snapshot; -use databend_common_meta_types::SnapshotData; -use databend_common_meta_types::SnapshotMeta; -use databend_common_meta_types::StorageError; -use databend_common_meta_types::StoredMembership; -use databend_common_meta_types::TypeConfig; -use databend_common_meta_types::Vote; -use log::debug; -use log::error; -use log::info; -use crate::metrics::raft_metrics; -use crate::metrics::server_metrics; use crate::store::StoreInner; -use crate::store::ToStorageError; /// A store that implements `RaftStorage` trait and provides full functions. /// @@ -85,377 +57,3 @@ impl Deref for RaftStore { &self.inner } } - -impl RaftLogReader for RaftStore { - #[minitrace::trace] - async fn try_get_log_entries + Clone + Debug + Send>( - &mut self, - range: RB, - ) -> Result, StorageError> { - debug!( - "try_get_log_entries: self.id={}, range: {:?}", - self.id, range - ); - - match self - .log - .read() - .await - .range_values(range) - .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) - { - Ok(entries) => Ok(entries), - Err(err) => { - raft_metrics::storage::incr_raft_storage_fail("try_get_log_entries", false); - Err(err) - } - } - } -} - -impl RaftSnapshotBuilder for RaftStore { - #[minitrace::trace] - async fn build_snapshot(&mut self) -> Result { - self.do_build_snapshot().await - } -} - -impl RaftStorage for RaftStore { - type LogReader = RaftStore; - type SnapshotBuilder = RaftStore; - - async fn get_log_reader(&mut self) -> Self::LogReader { - self.clone() - } - - async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { - self.clone() - } - - async fn save_committed(&mut self, committed: Option) -> Result<(), StorageError> { - self.raft_state - .write() - .await - .save_committed(committed) - .await - .map_to_sto_err(ErrorSubject::Store, ErrorVerb::Write) - } - - async fn read_committed(&mut self) -> Result, StorageError> { - self.raft_state - .read() - .await - .read_committed() - .map_to_sto_err(ErrorSubject::Store, ErrorVerb::Read) - } - - async fn get_log_state(&mut self) -> Result, StorageError> { - let last_purged_log_id = match self - .log - .read() - .await - .get_last_purged() - .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) - { - Err(err) => { - raft_metrics::storage::incr_raft_storage_fail("get_log_state", false); - return Err(err); - } - Ok(r) => r, - }; - - let last = match self - .log - .read() - .await - .logs() - .last() - .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) - { - Err(err) => { - raft_metrics::storage::incr_raft_storage_fail("get_log_state", false); - return Err(err); - } - Ok(r) => r, - }; - - let last_log_id = match last { - None => last_purged_log_id, - Some(x) => Some(x.1.log_id), - }; - - debug!( - "get_log_state: ({:?},{:?}]", - last_purged_log_id, last_log_id - ); - - Ok(LogState { - last_purged_log_id, - last_log_id, - }) - } - - #[minitrace::trace] - async fn save_vote(&mut self, hs: &Vote) -> Result<(), StorageError> { - info!(id = self.id, hs :? =(hs); "save_vote"); - - match self - .raft_state - .write() - .await - .save_vote(hs) - .await - .map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Write) - { - Err(err) => { - raft_metrics::storage::incr_raft_storage_fail("save_vote", true); - Err(err) - } - Ok(_) => Ok(()), - } - } - - #[minitrace::trace] - async fn delete_conflict_logs_since(&mut self, log_id: LogId) -> Result<(), StorageError> { - info!(id = self.id; "delete_conflict_logs_since: {}", log_id); - - match self - .log - .write() - .await - .range_remove(log_id.index..) - .await - .map_to_sto_err(ErrorSubject::Log(log_id), ErrorVerb::Delete) - { - Ok(_) => Ok(()), - Err(err) => { - raft_metrics::storage::incr_raft_storage_fail("delete_conflict_logs_since", true); - Err(err) - } - } - } - - #[minitrace::trace] - async fn purge_logs_upto(&mut self, log_id: LogId) -> Result<(), StorageError> { - info!(id = self.id, log_id :? =(&log_id); "purge_logs_upto: start"); - - if let Err(err) = self - .log - .write() - .await - .set_last_purged(log_id) - .await - .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write) - { - raft_metrics::storage::incr_raft_storage_fail("purge_logs_upto", true); - return Err(err); - }; - - info!(id = self.id, log_id :? =(&log_id); "purge_logs_upto: Done: set_last_purged()"); - - let log = self.log.write().await.clone(); - - // Purge can be done in another task safely, because: - // - // - Next time when raft starts, it will read last_purged_log_id without examining the actual first log. - // And junk can be removed next time purge_logs_upto() is called. - // - // - Purging operates the start of the logs, and only committed logs are purged; - // while append and truncate operates on the end of the logs, - // it is safe to run purge && (append || truncate) concurrently. - databend_common_base::runtime::spawn({ - let id = self.id; - async move { - info!(id = id, log_id :? =(&log_id); "purge_logs_upto: Start: asynchronous range_remove()"); - - let res = log.range_remove(..=log_id.index).await; - - if let Err(err) = res { - error!(id = id, log_id :? =(&log_id); "purge_logs_upto: in asynchronous error: {}", err); - raft_metrics::storage::incr_raft_storage_fail("purge_logs_upto", true); - } - - info!(id = id, log_id :? =(&log_id); "purge_logs_upto: Done: asynchronous range_remove()"); - } - }); - - Ok(()) - } - - #[minitrace::trace] - async fn append_to_log + Send>( - &mut self, - entries: I, - ) -> Result<(), StorageError> { - // TODO: it is bad: allocates a new vec. - let entries = entries - .into_iter() - .map(|x| { - info!("append_to_log: {}", x.log_id); - x - }) - .collect::>(); - - match self - .log - .write() - .await - .append(entries) - .await - .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write) - { - Err(err) => { - raft_metrics::storage::incr_raft_storage_fail("append_to_log", true); - Err(err) - } - Ok(_) => Ok(()), - } - } - - #[minitrace::trace] - async fn apply_to_state_machine( - &mut self, - entries: &[Entry], - ) -> Result, StorageError> { - for ent in entries { - info!("apply_to_state_machine: {}", ent.log_id); - } - - let mut sm = self.state_machine.write().await; - let res = sm.apply_entries(entries).await?; - - Ok(res) - } - - #[minitrace::trace] - async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { - server_metrics::incr_applying_snapshot(1); - - let snapshot_store = SnapshotStoreV002::new(DATA_VERSION, self.inner.config.clone()); - - let temp = snapshot_store.new_temp().await.map_err(|e| { - StorageError::from_io_error(ErrorSubject::Snapshot(None), ErrorVerb::Write, e) - })?; - - Ok(Box::new(temp)) - } - - #[minitrace::trace] - async fn install_snapshot( - &mut self, - meta: &SnapshotMeta, - snapshot: Box, - ) -> Result<(), StorageError> { - let data_size = snapshot.data_size().await.map_err(|e| { - StorageError::from_io_error( - ErrorSubject::Snapshot(Some(meta.signature())), - ErrorVerb::Read, - e, - ) - })?; - - info!( - id = self.id, - snapshot_size = data_size; - "decoding snapshot for installation" - ); - server_metrics::incr_applying_snapshot(-1); - - assert!(snapshot.is_temp()); - - let snapshot_store = SnapshotStoreV002::new(DATA_VERSION, self.inner.config.clone()); - - let d = snapshot_store - .commit_received(snapshot, meta) - .await - .map_err(|e| { - e.with_context(format_args!( - "commit received snapshot: {:?}", - meta.signature() - )) - })?; - - let d = Box::new(d); - - info!("snapshot meta: {:?}", meta); - - // Replace state machine with the new one - let res = self.do_install_snapshot(d).await; - match res { - Ok(_) => {} - Err(e) => { - raft_metrics::storage::incr_raft_storage_fail("install_snapshot", true); - error!("error: {:?} when install_snapshot", e); - } - }; - - // Update current snapshot. - let new_snapshot = StoredSnapshot { meta: meta.clone() }; - { - let mut current_snapshot = self.current_snapshot.write().await; - *current_snapshot = Some(new_snapshot); - } - Ok(()) - } - - #[minitrace::trace] - async fn get_current_snapshot(&mut self) -> Result, StorageError> { - info!(id = self.id; "get snapshot start"); - let p = self.current_snapshot.read().await; - - let snap = match &*p { - Some(snapshot) => { - let meta = &snapshot.meta; - - let snapshot_store = - SnapshotStoreV002::new(DATA_VERSION, self.inner.config.clone()); - let d = snapshot_store - .load_snapshot(&meta.snapshot_id) - .await - .map_err(|e| e.with_meta("get snapshot", meta))?; - - Ok(Some(Snapshot { - meta: meta.clone(), - snapshot: Box::new(d), - })) - } - None => Ok(None), - }; - - info!("get snapshot complete"); - - snap - } - - #[minitrace::trace] - async fn read_vote(&mut self) -> Result, StorageError> { - match self - .raft_state - .read() - .await - .read_vote() - .map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Read) - { - Err(err) => { - raft_metrics::storage::incr_raft_storage_fail("read_vote", false); - Err(err) - } - Ok(vote) => Ok(vote), - } - } - - async fn last_applied_state( - &mut self, - ) -> Result<(Option, StoredMembership), StorageError> { - let sm = self.state_machine.read().await; - let last_applied = *sm.sys_data_ref().last_applied_ref(); - let last_membership = sm.sys_data_ref().last_membership_ref().clone(); - - debug!( - "last_applied_state: applied: {:?}, membership: {:?}", - last_applied, last_membership - ); - - Ok((last_applied, last_membership)) - } -} diff --git a/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs b/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs index ae579d8ed0d23..4de5f4fd9b451 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs @@ -87,7 +87,9 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> { { let resp = leader.get_kv(key).await?; let seq_v = resp.unwrap(); - assert_eq!(Some(KVMeta::new_expire(now_sec + 1000)), seq_v.meta); + let want = (now_sec + 1000) * 1000; + let expire_ms = seq_v.meta.unwrap().get_expire_at_ms().unwrap(); + assert!((want..want + 800).contains(&expire_ms)); assert_eq!(value2.to_string().into_bytes(), seq_v.data); } diff --git a/src/meta/service/tests/it/store.rs b/src/meta/service/tests/it/store.rs index 990418b8de374..8f793658d0a8a 100644 --- a/src/meta/service/tests/it/store.rs +++ b/src/meta/service/tests/it/store.rs @@ -15,12 +15,13 @@ use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use databend_common_meta_raft_store::state_machine::testing::snapshot_logs; use databend_common_meta_sled_store::openraft::entry::RaftEntry; -use databend_common_meta_sled_store::openraft::storage::Adaptor; use databend_common_meta_sled_store::openraft::storage::RaftLogReaderExt; +use databend_common_meta_sled_store::openraft::storage::RaftLogStorage; +use databend_common_meta_sled_store::openraft::storage::RaftLogStorageExt; +use databend_common_meta_sled_store::openraft::storage::RaftStateMachine; use databend_common_meta_sled_store::openraft::testing::log_id; use databend_common_meta_sled_store::openraft::testing::StoreBuilder; use databend_common_meta_sled_store::openraft::RaftSnapshotBuilder; -use databend_common_meta_sled_store::openraft::RaftStorage; use databend_common_meta_types::new_log_id; use databend_common_meta_types::Entry; use databend_common_meta_types::Membership; @@ -52,8 +53,7 @@ impl StoreBuilder for MetaSto let sto = RaftStore::open_create(&tc.config.raft_config, None, Some(())) .await .expect("fail to create store"); - let (log_store, sm_store) = Adaptor::new(sto); - Ok((tc, log_store, sm_store)) + Ok((tc, sto.clone(), sto)) } } @@ -90,13 +90,12 @@ async fn test_meta_store_restart() -> anyhow::Result<()> { sto.save_vote(&Vote::new(10, 5)).await?; - sto.append_to_log([Entry::new_blank(log_id(1, 2, 1))]) + sto.blocking_append([Entry::new_blank(log_id(1, 2, 1))]) .await?; sto.save_committed(Some(log_id(1, 2, 2))).await?; - sto.apply_to_state_machine(&[Entry::new_blank(log_id(1, 2, 2))]) - .await?; + sto.apply([Entry::new_blank(log_id(1, 2, 2))]).await?; } info!("--- reopen meta store"); @@ -110,7 +109,7 @@ async fn test_meta_store_restart() -> anyhow::Result<()> { assert_eq!(Some(log_id(1, 2, 2)), sto.read_committed().await?); assert_eq!( None, - sto.last_applied_state().await?.0, + sto.applied_state().await?.0, "state machine is not persisted" ); } @@ -134,7 +133,7 @@ async fn test_meta_store_build_snapshot() -> anyhow::Result<()> { let (logs, want) = snapshot_logs(); sto.log.write().await.append(logs.clone()).await?; - sto.state_machine.write().await.apply_entries(&logs).await?; + sto.state_machine.write().await.apply_entries(logs).await?; let curr_snap = sto.build_snapshot().await?; assert_eq!(Some(new_log_id(1, 0, 9)), curr_snap.meta.last_log_id); @@ -183,7 +182,7 @@ async fn test_meta_store_current_snapshot() -> anyhow::Result<()> { sto.log.write().await.append(logs.clone()).await?; { let mut sm = sto.state_machine.write().await; - sm.apply_entries(&logs).await?; + sm.apply_entries(logs).await?; } sto.build_snapshot().await?; @@ -226,7 +225,7 @@ async fn test_meta_store_install_snapshot() -> anyhow::Result<()> { info!("--- feed logs and state machine"); sto.log.write().await.append(logs.clone()).await?; - sto.state_machine.write().await.apply_entries(&logs).await?; + sto.state_machine.write().await.apply_entries(logs).await?; snap = sto.build_snapshot().await?; }