From 42e4b399dbc0fd4b7e568aa2295282d5d564d2f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 17 May 2024 21:40:07 +0800 Subject: [PATCH] refactor: Improve logging and snapshot management This commit introduces several enhancements to the snapshot management system: - Improved Progress Logging for Snapshot Building - **Introduction of `SMEntry`:** - A new structure, `SMEntry`, has been introduced to represent all variants of a state-machine record. `RaftStoreEntry` is a superset of `SMEntry` that includes both state-machine records and log records. - **Asynchronous Snapshot Writing:** - Added the `spawn_writer_thread()` function to facilitate the writing of snapshots in a separate thread, thereby improving performance. --- src/binaries/metactl/snapshot.rs | 27 ++-- src/meta/raft-store/src/key_spaces.rs | 144 ++++++++++++++---- src/meta/raft-store/src/ondisk/mod.rs | 57 ++++--- src/meta/raft-store/src/sm_v002/mod.rs | 2 + .../raft-store/src/sm_v002/snapshot_stat.rs | 44 ++++++ .../raft-store/src/sm_v002/snapshot_store.rs | 48 ++++++ .../src/sm_v002/snapshot_view_v002.rs | 22 +-- .../raft-store/src/sm_v002/writer_v002.rs | 120 ++++----------- src/meta/service/src/metrics/meta_metrics.rs | 4 +- src/meta/service/src/store/store_inner.rs | 101 +++++------- 10 files changed, 329 insertions(+), 240 deletions(-) create mode 100644 src/meta/raft-store/src/sm_v002/snapshot_stat.rs diff --git a/src/binaries/metactl/snapshot.rs b/src/binaries/metactl/snapshot.rs index fa5d60758b66d..5999ceb89ae49 100644 --- a/src/binaries/metactl/snapshot.rs +++ b/src/binaries/metactl/snapshot.rs @@ -35,6 +35,7 @@ use databend_common_meta_raft_store::ondisk::DataVersion; use databend_common_meta_raft_store::ondisk::OnDisk; use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use databend_common_meta_raft_store::sm_v002::SnapshotStoreV002; +use databend_common_meta_raft_store::sm_v002::WriteEntry; use 
databend_common_meta_raft_store::state::RaftState; use databend_common_meta_sled_store::get_sled_db; use databend_common_meta_sled_store::init_sled_db; @@ -172,8 +173,9 @@ async fn import_v002( let mut max_log_id: Option = None; let mut trees = BTreeMap::new(); - let mut snapshot_store = SnapshotStoreV002::new(DataVersion::V002, raft_config); - let mut writer = snapshot_store.new_writer()?; + let snapshot_store = SnapshotStoreV002::new(DataVersion::V002, raft_config); + + let (tx, join_handle) = snapshot_store.spawn_writer_thread("import_v002"); for line in lines { let l = line?; @@ -181,9 +183,10 @@ async fn import_v002( if tree_name.starts_with("state_machine/") { // Write to snapshot - writer - .write_entry_results::(futures::stream::iter([Ok(kv_entry)])) - .await?; + let sm_entry = kv_entry.try_into().map_err(|err_str| { + anyhow::anyhow!("Failed to convert RaftStoreEntry to SMEntry: {}", err_str) + })?; + tx.send(WriteEntry::Data(sm_entry)).await?; } else { // Write to sled tree if !trees.contains_key(&tree_name) { @@ -208,14 +211,12 @@ async fn import_v002( for tree in trees.values() { tree.flush()?; } - let (snapshot_id, snapshot_size) = writer.commit(None)?; - - eprintln!( - "Imported {} records, snapshot id: {}; snapshot size: {}", - n, - snapshot_id.to_string(), - snapshot_size - ); + + tx.send(WriteEntry::Finish).await?; + + let (_snapshot_store, snapshot_stat) = join_handle.await??; + + eprintln!("Imported {} records, snapshot: {}", n, snapshot_stat,); Ok(max_log_id) } diff --git a/src/meta/raft-store/src/key_spaces.rs b/src/meta/raft-store/src/key_spaces.rs index b9801e53c4055..0c392c3b2f03d 100644 --- a/src/meta/raft-store/src/key_spaces.rs +++ b/src/meta/raft-store/src/key_spaces.rs @@ -131,6 +131,86 @@ impl SledKeySpace for DataHeader { type V = Header; } +/// Serialize SledKeySpace key value pair +macro_rules! 
serialize_for_sled { + ($ks:tt, $key:expr, $value:expr) => { + Ok(($ks::serialize_key($key)?, $ks::serialize_value($value)?)) + }; +} + +/// Convert (sub_tree_prefix, key, value, key_space1, key_space2...) into a [`RaftStoreEntry`]. +/// +/// It compares the sub_tree_prefix with prefix defined by every key space to determine which key space it belongs to. +macro_rules! deserialize_by_prefix { + ($prefix: expr, $vec_key: expr, $vec_value: expr, $($key_space: tt),+ ) => { + $( + + if <$key_space as SledKeySpace>::PREFIX == $prefix { + + let key = SledOrderedSerde::de($vec_key)?; + let value = SledSerde::de($vec_value)?; + + // Self reference the enum that use this macro + return Ok(Self::$key_space { key, value, }); + } + )+ + }; +} + +/// Enum of key-value pairs that are used in the raft state machine. +/// +/// It is a sub set of [`RaftStoreEntry`] and contains only the types used by state-machine. +#[rustfmt::skip] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum SMEntry { + DataHeader { key: ::K, value: ::V, }, + Nodes { key: ::K, value: ::V, }, + StateMachineMeta { key: ::K, value: ::V, }, + Expire { key: ::K, value: ::V, }, + GenericKV { key: ::K, value: ::V, }, + Sequences { key: ::K, value: ::V, }, +} + +impl SMEntry { + /// Serialize a key-value entry into a two elt vec of vec: `[key, value]`. 
+ #[rustfmt::skip] + pub fn serialize(kv: &SMEntry) -> Result<(sled::IVec, sled::IVec), MetaStorageError> { + + match kv { + Self::DataHeader { key, value } => serialize_for_sled!(DataHeader, key, value), + Self::Nodes { key, value } => serialize_for_sled!(Nodes, key, value), + Self::StateMachineMeta { key, value } => serialize_for_sled!(StateMachineMeta, key, value), + Self::Expire { key, value } => serialize_for_sled!(Expire, key, value), + Self::GenericKV { key, value } => serialize_for_sled!(GenericKV, key, value), + Self::Sequences { key, value } => serialize_for_sled!(Sequences, key, value), + } + } + + /// Deserialize a serialized key-value entry `[key, value]`. + /// + /// It is able to deserialize openraft-v7 or openraft-v8 key-value pairs. + /// The compatibility is provided by [`SledSerde`] implementation for value types. + pub fn deserialize(prefix_key: &[u8], vec_value: &[u8]) -> Result { + let prefix = prefix_key[0]; + let vec_key = &prefix_key[1..]; + + deserialize_by_prefix!( + prefix, + vec_key, + vec_value, + // Available key spaces: + DataHeader, + Nodes, + StateMachineMeta, + Expire, + GenericKV, + Sequences + ); + + unreachable!("unknown prefix: {}", prefix); + } +} + /// Enum of key-value pairs that are used in the raft storage impl for meta-service. #[rustfmt::skip] #[derive(Debug, Clone, Serialize, Deserialize)] @@ -151,23 +231,18 @@ impl RaftStoreEntry { /// Serialize a key-value entry into a two elt vec of vec: `[key, value]`. #[rustfmt::skip] pub fn serialize(kv: &RaftStoreEntry) -> Result<(sled::IVec, sled::IVec), MetaStorageError> { - macro_rules! 
ser { - ($ks:tt, $key:expr, $value:expr) => { - Ok(($ks::serialize_key($key)?, $ks::serialize_value($value)?)) - }; - } match kv { - Self::DataHeader { key, value } => ser!(DataHeader, key, value), - Self::Logs { key, value } => ser!(Logs, key, value), - Self::Nodes { key, value } => ser!(Nodes, key, value), - Self::StateMachineMeta { key, value } => ser!(StateMachineMeta, key, value), - Self::RaftStateKV { key, value } => ser!(RaftStateKV, key, value), - Self::Expire { key, value } => ser!(Expire, key, value), - Self::GenericKV { key, value } => ser!(GenericKV, key, value), - Self::Sequences { key, value } => ser!(Sequences, key, value), - Self::ClientLastResps { key, value } => ser!(ClientLastResps, key, value), - Self::LogMeta { key, value } => ser!(LogMeta, key, value), + Self::DataHeader { key, value } => serialize_for_sled!(DataHeader, key, value), + Self::Logs { key, value } => serialize_for_sled!(Logs, key, value), + Self::Nodes { key, value } => serialize_for_sled!(Nodes, key, value), + Self::StateMachineMeta { key, value } => serialize_for_sled!(StateMachineMeta, key, value), + Self::RaftStateKV { key, value } => serialize_for_sled!(RaftStateKV, key, value), + Self::Expire { key, value } => serialize_for_sled!(Expire, key, value), + Self::GenericKV { key, value } => serialize_for_sled!(GenericKV, key, value), + Self::Sequences { key, value } => serialize_for_sled!(Sequences, key, value), + Self::ClientLastResps { key, value } => serialize_for_sled!(ClientLastResps, key, value), + Self::LogMeta { key, value } => serialize_for_sled!(LogMeta, key, value), } } @@ -179,24 +254,6 @@ impl RaftStoreEntry { let prefix = prefix_key[0]; let vec_key = &prefix_key[1..]; - // Convert (sub_tree_prefix, key, value, key_space1, key_space2...) into a [`RaftStoreEntry`]. - // - // It compares the sub_tree_prefix with prefix defined by every key space to determine which key space it belongs to. - macro_rules! 
deserialize_by_prefix { - ($prefix: expr, $vec_key: expr, $vec_value: expr, $($key_space: tt),+ ) => { - $( - - if <$key_space as SledKeySpace>::PREFIX == $prefix { - - let key = SledOrderedSerde::de($vec_key)?; - let value = SledSerde::de($vec_value)?; - - return Ok(RaftStoreEntry::$key_space { key, value, }); - } - )+ - }; - } - deserialize_by_prefix!( prefix, vec_key, @@ -217,3 +274,24 @@ impl RaftStoreEntry { unreachable!("unknown prefix: {}", prefix); } } + +impl TryInto for RaftStoreEntry { + type Error = String; + + #[rustfmt::skip] + fn try_into(self) -> Result { + match self { + Self::DataHeader { key, value } => Ok(SMEntry::DataHeader { key, value }), + Self::Nodes { key, value } => Ok(SMEntry::Nodes { key, value }), + Self::StateMachineMeta { key, value } => Ok(SMEntry::StateMachineMeta { key, value }), + Self::Expire { key, value } => Ok(SMEntry::Expire { key, value }), + Self::GenericKV { key, value } => Ok(SMEntry::GenericKV { key, value }), + Self::Sequences { key, value } => Ok(SMEntry::Sequences { key, value }), + + Self::Logs { .. } => {Err("SMEntry does not contain Logs".to_string())}, + Self::RaftStateKV { .. } => {Err("SMEntry does not contain RaftStateKV".to_string())} + Self::ClientLastResps { .. } => {Err("SMEntry does not contain ClientLastResps".to_string())} + Self::LogMeta { .. 
} => {Err("SMEntry does not contain LogMeta".to_string())} + } + } +} diff --git a/src/meta/raft-store/src/ondisk/mod.rs b/src/meta/raft-store/src/ondisk/mod.rs index 23ba1c0c5916c..aeb5a6d613b5e 100644 --- a/src/meta/raft-store/src/ondisk/mod.rs +++ b/src/meta/raft-store/src/ondisk/mod.rs @@ -30,13 +30,13 @@ pub use header::Header; use log::debug; use log::info; use openraft::AnyError; -use tokio::io; use crate::config::RaftConfig; use crate::key_spaces::DataHeader; -use crate::key_spaces::RaftStoreEntry; +use crate::key_spaces::SMEntry; use crate::log::TREE_RAFT_LOG; use crate::sm_v002::SnapshotStoreV002; +use crate::sm_v002::WriteEntry; use crate::state::TREE_RAFT_STATE; use crate::state_machine::StateMachineMetaKey; @@ -276,31 +276,33 @@ impl OnDisk { &mut self, sm_tree_name: &str, ) -> Result<(), MetaStorageError> { - let mut cnt = 0; + // Helper function to create a snapshot error. + fn snap_err(e: impl std::error::Error + 'static, context: &str) -> MetaStorageError { + let ae = AnyError::new(&e).add_context(|| context); + MetaStorageError::SnapshotError(ae) + } + let tree = self.db.open_tree(sm_tree_name)?; - let mut snapshot_store = SnapshotStoreV002::new(DataVersion::V002, self.config.clone()); + let snapshot_store = SnapshotStoreV002::new(DataVersion::V002, self.config.clone()); - let mut writer = snapshot_store.new_writer().map_err(|e| { - let ae = AnyError::new(&e).add_context(|| "new snapshot writer"); - MetaStorageError::SnapshotError(ae) - })?; + let (tx, join_handle) = snapshot_store.spawn_writer_thread("upgrade-v001-to-v002-snapshot"); for ivec_pair_res in tree.iter() { - let kv_entry = { + let sm_entry = { let (k_ivec, v_ivec) = ivec_pair_res?; - RaftStoreEntry::deserialize(&k_ivec, &v_ivec)? + SMEntry::deserialize(&k_ivec, &v_ivec)? }; debug!( - kv_entry :? =(&kv_entry); + kv_entry :? 
=(&sm_entry); "upgrade kv from {:?}", self.header.version ); - if let RaftStoreEntry::StateMachineMeta { + if let SMEntry::StateMachineMeta { key: StateMachineMetaKey::Initialized, .. - } = kv_entry + } = sm_entry { self.progress(format_args!( "Skip no longer used state machine key: {}", @@ -309,27 +311,24 @@ impl OnDisk { continue; } - writer - .write_entry_results::(futures::stream::iter([Ok(kv_entry)])) + tx.send(WriteEntry::Data(sm_entry)) .await - .map_err(|e| { - let ae = AnyError::new(&e).add_context(|| "write snapshot entry"); - MetaStorageError::SnapshotError(ae) - })?; - - cnt += 1; + .map_err(|e| snap_err(e, "send SMEntry"))?; } - let (snapshot_id, file_size) = writer.commit(None).map_err(|e| { - let ae = AnyError::new(&e).add_context(|| "commit snapshot"); - MetaStorageError::SnapshotError(ae) - })?; + tx.send(WriteEntry::Finish) + .await + .map_err(|e| snap_err(e, "send Commit"))?; + + let (snapshot_store, snapshot_stat) = join_handle + .await + .map_err(|e| snap_err(e, "join snapshot writer thread"))? 
+ .map_err(|e| snap_err(e, "writer error"))?; self.progress(format_args!( - "Written {} records to snapshot, filesize: {}, path: {}", - cnt, - file_size, - snapshot_store.snapshot_path(&snapshot_id.to_string()) + "Written to snapshot: {}, path: {}", + snapshot_stat, + snapshot_store.snapshot_path(&snapshot_stat.snapshot_id.to_string()) )); Ok(()) diff --git a/src/meta/raft-store/src/sm_v002/mod.rs b/src/meta/raft-store/src/sm_v002/mod.rs index 30a47eb27e096..1abf25638bd6e 100644 --- a/src/meta/raft-store/src/sm_v002/mod.rs +++ b/src/meta/raft-store/src/sm_v002/mod.rs @@ -16,6 +16,7 @@ pub mod leveled_store; pub(in crate::sm_v002) mod marked; #[allow(clippy::module_inception)] mod sm_v002; +mod snapshot_stat; mod snapshot_store; mod snapshot_view_v002; mod writer_v002; @@ -29,6 +30,7 @@ mod snapshot_view_v002_test; pub use importer::Importer; pub use sm_v002::SMV002; +pub use snapshot_stat::SnapshotStat; pub use snapshot_store::SnapshotStoreError; pub use snapshot_store::SnapshotStoreV002; pub use snapshot_view_v002::SnapshotViewV002; diff --git a/src/meta/raft-store/src/sm_v002/snapshot_stat.rs b/src/meta/raft-store/src/sm_v002/snapshot_stat.rs new file mode 100644 index 0000000000000..c5db14d1fc80b --- /dev/null +++ b/src/meta/raft-store/src/sm_v002/snapshot_stat.rs @@ -0,0 +1,44 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::fmt; +use std::fmt::Formatter; + +use crate::state_machine::MetaSnapshotId; + +/// Snapshot stat. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct SnapshotStat { + pub snapshot_id: MetaSnapshotId, + + /// Total number of entries in the snapshot. + /// + /// Including meta entries, such as seq, nodes, generic kv, and expire index + pub entry_cnt: u64, + + /// Size in bytes of the snapshot file + pub size: u64, +} + +impl fmt::Display for SnapshotStat { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "{{ snapshot_id: {}, entry_cnt: {}, size: {} }}", + self.snapshot_id.to_string(), + self.entry_cnt, + self.size + ) + } +} diff --git a/src/meta/raft-store/src/sm_v002/snapshot_store.rs b/src/meta/raft-store/src/sm_v002/snapshot_store.rs index 9990b951623ca..ab40a3d335a4d 100644 --- a/src/meta/raft-store/src/sm_v002/snapshot_store.rs +++ b/src/meta/raft-store/src/sm_v002/snapshot_store.rs @@ -30,9 +30,14 @@ use log::warn; use openraft::AnyError; use openraft::ErrorVerb; use openraft::SnapshotId; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; use crate::config::RaftConfig; +use crate::key_spaces::SMEntry; use crate::ondisk::DataVersion; +use crate::sm_v002::SnapshotStat; +use crate::sm_v002::WriteEntry; use crate::sm_v002::WriterV002; use crate::state_machine::MetaSnapshotId; @@ -265,6 +270,49 @@ impl SnapshotStoreV002 { Ok((snapshot_ids, invalid_files)) } + /// Spawn a thread to receive snapshot data and write them to a snapshot file. + /// + /// It returns a sender to send entries and a handle to wait for the thread to finish. + /// Internally it calls tokio::spawn_blocking. + #[allow(clippy::type_complexity)] + pub fn spawn_writer_thread( + mut self, + context: impl Display + Send + Sync + 'static, + ) -> ( + mpsc::Sender>, + JoinHandle>, + ) { + // Add context information to io::Error + + let (tx, rx) = mpsc::channel(64 * 1024); + + // Spawn another thread to write entries to disk. 
+ let join_handle = databend_common_base::runtime::spawn_blocking(move || { + let with_context = + |e: io::Error| io::Error::new(e.kind(), format!("{} while {}", e, context)); + + let mut writer = self.new_writer().map_err(|e| { + io::Error::new(e.source.kind(), format!("creating snapshot writer: {}", e)) + })?; + + info!("snapshot_writer_thread start writing: {}", context); + let cnt = writer.write_entries_sync(rx).map_err(with_context)?; + + info!("snapshot_writer_thread committing...: {}", context); + let (snapshot_id, size) = writer.commit(None).map_err(with_context)?; + + info!("snapshot_writer_thread commit done: {}", context); + + Ok::<(Self, SnapshotStat), io::Error>((self, SnapshotStat { + snapshot_id, + size, + entry_cnt: cnt as u64, + })) + }); + + (tx, join_handle) + } + pub fn new_writer(&mut self) -> Result { self.ensure_snapshot_dir()?; diff --git a/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs b/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs index b09a62d7171b7..d1ca760ce4fed 100644 --- a/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs +++ b/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs @@ -22,7 +22,7 @@ use databend_common_meta_types::SnapshotMeta; use futures_util::StreamExt; use futures_util::TryStreamExt; -use crate::key_spaces::RaftStoreEntry; +use crate::key_spaces::SMEntry; use crate::ondisk::Header; use crate::ondisk::OnDisk; use crate::sm_v002::leveled_store::map_api::AsMap; @@ -114,16 +114,16 @@ impl SnapshotViewV002 { Ok(()) } - /// Export all its data in RaftStoreEntry format. - // pub async fn export(&self) -> Result + '_, io::Error> { - pub async fn export(&self) -> Result, io::Error> { + /// Export all its data in SMEntry format. 
+ // pub async fn export(&self) -> Result + '_, io::Error> { + pub async fn export(&self) -> Result, io::Error> { let d = self.compacted.newest().unwrap(); let mut sm_meta = vec![]; // Data header to identify snapshot version - sm_meta.push(RaftStoreEntry::DataHeader { + sm_meta.push(SMEntry::DataHeader { key: OnDisk::KEY_HEADER.to_string(), value: Header::this_version(), }); @@ -131,7 +131,7 @@ impl SnapshotViewV002 { // Last applied if let Some(last_applied) = d.last_applied_ref() { - sm_meta.push(RaftStoreEntry::StateMachineMeta { + sm_meta.push(SMEntry::StateMachineMeta { key: StateMachineMetaKey::LastApplied, value: StateMachineMetaValue::LogId(*last_applied), }) @@ -141,7 +141,7 @@ impl SnapshotViewV002 { { let last_membership = d.last_membership_ref(); - sm_meta.push(RaftStoreEntry::StateMachineMeta { + sm_meta.push(SMEntry::StateMachineMeta { key: StateMachineMetaKey::LastMembership, value: StateMachineMetaValue::Membership(last_membership.clone()), }) @@ -149,7 +149,7 @@ impl SnapshotViewV002 { // Sequence - sm_meta.push(RaftStoreEntry::Sequences { + sm_meta.push(SMEntry::Sequences { // Use this fixed key `generic-kv` for back compatibility: // Only this key is used. 
key: s("generic-kv"), @@ -159,7 +159,7 @@ impl SnapshotViewV002 { // Nodes for (node_id, node) in d.nodes_ref().iter() { - sm_meta.push(RaftStoreEntry::Nodes { + sm_meta.push(SMEntry::Nodes { key: *node_id, value: node.clone(), }) @@ -170,7 +170,7 @@ impl SnapshotViewV002 { let strm = self.compacted.str_map().range(..).await?; let kv_iter = strm.try_filter_map(|(k, v)| { let seqv: Option> = v.into(); - let ent = seqv.map(|value| RaftStoreEntry::GenericKV { key: k, value }); + let ent = seqv.map(|value| SMEntry::GenericKV { key: k, value }); future::ready(Ok(ent)) }); @@ -179,7 +179,7 @@ impl SnapshotViewV002 { let strm = self.compacted.expire_map().range(..).await?; let expire_iter = strm.try_filter_map(|(k, v)| { let exp_val: Option = v.into(); - let ent = exp_val.map(|value| RaftStoreEntry::Expire { key: k, value }); + let ent = exp_val.map(|value| SMEntry::Expire { key: k, value }); future::ready(Ok(ent)) }); diff --git a/src/meta/raft-store/src/sm_v002/writer_v002.rs b/src/meta/raft-store/src/sm_v002/writer_v002.rs index 39d5c74f9ea3d..2f2bb7daa2173 100644 --- a/src/meta/raft-store/src/sm_v002/writer_v002.rs +++ b/src/meta/raft-store/src/sm_v002/writer_v002.rs @@ -17,26 +17,23 @@ use std::io; use std::io::BufWriter; use std::io::Seek; use std::io::Write; -use std::time::Duration; use databend_common_meta_types::LogId; -use futures::Stream; -use futures_util::StreamExt; use log::debug; use log::info; -use crate::key_spaces::RaftStoreEntry; +use crate::key_spaces::SMEntry; use crate::sm_v002::SnapshotStoreV002; use crate::state_machine::MetaSnapshotId; use crate::state_machine::StateMachineMetaKey; /// A write entry sent to snapshot writer. /// -/// A `Commit` entry will flush the writer. +/// A `Finish` entry indicates the end of the data. #[derive(Debug, Clone, PartialEq, Eq)] pub enum WriteEntry { Data(T), - Commit, + Finish, } /// Write json lines snapshot data to [`SnapshotStoreV002`]. 
@@ -56,16 +53,6 @@ pub struct WriterV002<'a> { snapshot_store: &'a mut SnapshotStoreV002, } -impl<'a> io::Write for WriterV002<'a> { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } -} - impl<'a> WriterV002<'a> { /// Create a singleton writer for the snapshot. pub fn new(snapshot_store: &'a mut SnapshotStoreV002) -> Result { @@ -89,77 +76,35 @@ impl<'a> WriterV002<'a> { Ok(writer) } - /// Write `Result` of entries to the snapshot, without flushing. - /// - /// Returns the count of entries - pub async fn write_entry_results( - &mut self, - entry_results: impl Stream>, - ) -> Result - where - E: std::error::Error + From + 'static, - { - let mut cnt = 0; - let data_version = self.snapshot_store.data_version(); - - let mut entry_results = std::pin::pin!(entry_results); - - while let Some(ent) = entry_results.next().await { - let ent = ent?; - - debug!(entry :? =(&ent); "write {} entry", data_version); - - if let RaftStoreEntry::StateMachineMeta { - key: StateMachineMetaKey::LastApplied, - ref value, - } = ent - { - let last: LogId = value.clone().try_into().unwrap(); - info!(last_applied :? =(last); "write last applied to snapshot"); - - assert!( - self.last_applied.is_none(), - "already seen a last_applied: {:?}", - self.last_applied - ); - self.last_applied = Some(last); - } - - serde_json::to_writer(&mut *self, &ent) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - - self.write(b"\n") - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - - cnt += 1; - - if cnt % 10_000 == 0 { - info!("Snapshot Writer has written {} kilo entries", cnt / 1000) - } - } - - Ok(cnt) - } - /// Write entries to the snapshot, without flushing. 
/// /// Returns the count of entries pub fn write_entries_sync( &mut self, - mut entries_rx: tokio::sync::mpsc::Receiver>, + mut entries_rx: tokio::sync::mpsc::Receiver>, ) -> Result { - fn log_progress(c: usize) { + fn log_progress(start: std::time::Instant, c: usize) { + let avg = c / (start.elapsed().as_secs() as usize + 1); + if c >= 10_000_000 { info!( - "Snapshot Writer has written {} million entries", - c / 1_000_000 + "Snapshot Writer has written {} million entries; avg: {} kilo entries/s", + c / 1_000_000, + avg / 1_000, ) } else { - info!("Snapshot Writer has written {} kilo entries", c / 1_000) + info!( + "Snapshot Writer has written {} kilo entries; avg: {} kilo entries/s", + c / 1_000, + avg / 1_000, + ) } } + let now = std::time::Instant::now(); let mut cnt = 0; + let mut next_progress_cnt = 1000; + let data_version = self.snapshot_store.data_version(); while let Some(ent) = entries_rx.blocking_recv() { @@ -167,7 +112,7 @@ impl<'a> WriterV002<'a> { let ent = match ent { WriteEntry::Data(ent) => ent, - WriteEntry::Commit => { + WriteEntry::Finish => { info!( "received Commit entry, written {} entries, quit and about to commit", cnt @@ -176,7 +121,7 @@ impl<'a> WriterV002<'a> { } }; - if let RaftStoreEntry::StateMachineMeta { + if let SMEntry::StateMachineMeta { key: StateMachineMetaKey::LastApplied, ref value, } = ent @@ -192,28 +137,23 @@ impl<'a> WriterV002<'a> { self.last_applied = Some(last); } - serde_json::to_writer(&mut *self, &ent) + serde_json::to_writer(&mut self.inner, &ent) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - self.write(b"\n") + self.inner + .write(b"\n") .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; cnt += 1; - // Yield to give up the CPU to avoid starving other tasks. 
- if cnt % 1000 == 0 { - std::thread::sleep(Duration::from_millis(1)); - } + if cnt == next_progress_cnt { + log_progress(now, cnt); - #[allow(clippy::collapsible_else_if)] - if cnt < 1_000_000 { - if cnt % 10_000 == 0 { - log_progress(cnt); - } - } else { - if cnt % 100_000 == 0 { - log_progress(cnt); - } + // increase by 5%, but at least 50k, at most 800k + let step = std::cmp::min(next_progress_cnt / 20, 800_000); + let step = std::cmp::max(step, 50_000); + + next_progress_cnt += step; } } diff --git a/src/meta/service/src/metrics/meta_metrics.rs b/src/meta/service/src/metrics/meta_metrics.rs index 2bb464929a4e5..86ee98d76d06d 100644 --- a/src/meta/service/src/metrics/meta_metrics.rs +++ b/src/meta/service/src/metrics/meta_metrics.rs @@ -170,8 +170,8 @@ pub mod server_metrics { SERVER_METRICS.applying_snapshot.inc_by(cnt); } - pub fn set_snapshot_key_num(snapshot_key_num: usize) { - SERVER_METRICS.snapshot_key_num.set(snapshot_key_num as i64); + pub fn set_snapshot_key_num(snapshot_key_num: i64) { + SERVER_METRICS.snapshot_key_num.set(snapshot_key_num); } pub fn set_proposals_applied(proposals_applied: u64) { diff --git a/src/meta/service/src/store/store_inner.rs b/src/meta/service/src/store/store_inner.rs index 230bfb86eb7df..16c778835badc 100644 --- a/src/meta/service/src/store/store_inner.rs +++ b/src/meta/service/src/store/store_inner.rs @@ -41,8 +41,6 @@ use databend_common_meta_raft_store::state::RaftStateValue; use databend_common_meta_raft_store::state_machine::MetaSnapshotId; use databend_common_meta_raft_store::state_machine::StoredSnapshot; use databend_common_meta_sled_store::get_sled_db; -use databend_common_meta_sled_store::openraft::ErrorSubject; -use databend_common_meta_sled_store::openraft::ErrorVerb; use databend_common_meta_sled_store::SledTree; use databend_common_meta_stoerr::MetaStorageError; use databend_common_meta_types::Endpoint; @@ -170,7 +168,7 @@ impl StoreInner { (Default::default(), None) }; let key_num = 
Self::calculate_key_num(&stored_snapshot, config.clone()).await; - metrics::server_metrics::set_snapshot_key_num(key_num.unwrap_or_default()); + metrics::server_metrics::set_snapshot_key_num(key_num.unwrap_or_default() as i64); Ok(Self { id: raft_state.id, @@ -279,69 +277,54 @@ impl StoreInner { .map_err(|e| StorageIOError::read_snapshot(None, &e))?; let mut snapshot_meta = snapshot_view.build_snapshot_meta(); + let meta = snapshot_meta.clone(); + let signature = snapshot_meta.signature(); info!("do_build_snapshot writing snapshot start"); - let mut ss_store = self.snapshot_store(); - let mut strm = snapshot_view.export().await.map_err(|e| { SnapshotStoreError::read(e).with_meta("export state machine", &snapshot_meta) })?; - let meta = snapshot_meta.clone(); - - let (tx, rx) = tokio::sync::mpsc::channel(64 * 1024); - - // Spawn another thread to write entries to disk. - let th = databend_common_base::runtime::spawn_blocking(move || { - let mut writer = ss_store.new_writer()?; - - info!("do_build_snapshot writer start"); - - writer - .write_entries_sync(rx) - .map_err(|e| SnapshotStoreError::write(e).with_meta("serialize entries", &meta))?; - - info!("do_build_snapshot commit start"); - - let (snapshot_id, snapshot_size) = writer - .commit(None) - .map_err(|e| SnapshotStoreError::write(e).with_meta("writer.commit", &meta))?; - - Ok::<(SnapshotStoreV002, MetaSnapshotId, u64), SnapshotStoreError>(( - ss_store, - snapshot_id, - snapshot_size, - )) - }); + let context = format!("build snapshot: {}", meta.snapshot_id); + let ss_store = self.snapshot_store(); + let (tx, th) = ss_store.spawn_writer_thread(context); // Pipe entries to the writer. - while let Some(ent) = strm - .try_next() - .await - .map_err(|e| StorageIOError::write_snapshot(Some(snapshot_meta.signature()), &e))? 
{ - tx.send(WriteEntry::Data(ent)).await.map_err(|e| { - let e = StorageIOError::write_snapshot(Some(snapshot_meta.signature()), &e); - StorageError::from(e) - })?; + while let Some(ent) = strm + .try_next() + .await + .map_err(|e| StorageIOError::read_snapshot(Some(signature.clone()), &e))? + { + tx.send(WriteEntry::Data(ent)) + .await + .map_err(|e| StorageIOError::write_snapshot(Some(signature.clone()), &e))?; + } + tx.send(WriteEntry::Finish) + .await + .map_err(|e| StorageIOError::write_snapshot(Some(signature.clone()), &e))?; } - { tx }.send(WriteEntry::Commit).await.map_err(|e| { - let e = StorageIOError::write_snapshot(Some(snapshot_meta.signature()), &e); - StorageError::from(e) - })?; - - let (ss_store, snapshot_id, snapshot_size) = th + // Get snapshot write result + let (ss_store, snapshot_stat) = th .await - .map_err(|e| StorageIOError::write_snapshot(None, &e))??; + .map_err(|e| { + error!(error :% = e; "snapshot writer thread error"); + StorageIOError::write_snapshot(Some(signature.clone()), &e) + })? 
+ .map_err(|e| { + error!(error :% = e; "snapshot writer thread error"); + StorageIOError::write_snapshot(Some(signature.clone()), &e) + })?; - info!(snapshot_size :% =(snapshot_size); "do_build_snapshot complete"); + info!(snapshot_stat :% = snapshot_stat; "do_build_snapshot complete"); + // Clean old snapshot ss_store.clean_old_snapshots().await?; - info!("do_build_snapshot clean_old_snapshots complete"); + let snapshot_id = &snapshot_stat.snapshot_id; assert_eq!( snapshot_id.last_applied, snapshot_meta.last_log_id, "snapshot_id.last_applied: {:?} must equal snapshot_meta.last_log_id: {:?}", @@ -349,17 +332,18 @@ impl StoreInner { ); snapshot_meta.snapshot_id = snapshot_id.to_string(); + { + let mut key_num = self.key_num.write().await; + *key_num = Some(snapshot_stat.entry_cnt as usize); + } + + metrics::server_metrics::set_snapshot_key_num(snapshot_stat.entry_cnt as i64); + { let snapshot = Some(StoredSnapshot { meta: snapshot_meta.clone(), }); - if let Some(num) = Self::calculate_key_num(&snapshot, self.config.clone()).await { - let mut key_num = self.key_num.write().await; - *key_num = Some(num); - metrics::server_metrics::set_snapshot_key_num(num); - } - let mut current_snapshot = self.current_snapshot.write().await; *current_snapshot = snapshot; } @@ -367,14 +351,7 @@ impl StoreInner { let r = ss_store .load_snapshot(&snapshot_meta.snapshot_id) .await - .map_err(|e| { - let e = StorageIOError::new( - ErrorSubject::Snapshot(Some(snapshot_meta.signature())), - ErrorVerb::Read, - &e, - ); - StorageError::from(e) - })?; + .map_err(|e| StorageIOError::read_snapshot(Some(signature), &e))?; Ok(Snapshot { meta: snapshot_meta,