Skip to content

Commit

Permalink
refactor: enable feature flag openraft:storage-v2 (#14838)
Browse files Browse the repository at this point in the history
* refactor: enable feature flag openraft:storage-v2

Separate the implementations of RaftLogStorage and RaftStateMachine.

* chore: tolerate time diff in expiration test
  • Loading branch information
drmingdrmer authored Mar 5, 2024
1 parent f22ad96 commit 0ca00f9
Show file tree
Hide file tree
Showing 10 changed files with 481 additions and 424 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus
openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.9.0-alpha.8", features = [
"serde",
"tracing-log",
"storage-v2",
"loosen-follower-log-revert", # allows removing all data from a follower and restoring from the leader.
] }

Expand Down
6 changes: 3 additions & 3 deletions src/binaries/metactl/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use databend_common_meta_raft_store::sm_v002::SnapshotStoreV002;
use databend_common_meta_raft_store::state::RaftState;
use databend_common_meta_sled_store::get_sled_db;
use databend_common_meta_sled_store::init_sled_db;
use databend_common_meta_sled_store::openraft::storage::RaftLogStorageExt;
use databend_common_meta_sled_store::openraft::RaftSnapshotBuilder;
use databend_common_meta_sled_store::openraft::RaftStorage;
use databend_common_meta_types::Cmd;
use databend_common_meta_types::CommittedLeaderId;
use databend_common_meta_types::Endpoint;
Expand Down Expand Up @@ -360,7 +360,7 @@ async fn init_new_cluster(
payload: EntryPayload::Membership(membership),
};

sto.append_to_log([entry]).await?;
sto.blocking_append([entry]).await?;

// insert AddNodes logs
for (node_id, node) in nodes {
Expand All @@ -381,7 +381,7 @@ async fn init_new_cluster(
}),
};

sto.append_to_log([entry]).await?;
sto.blocking_append([entry]).await?;
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/meta/raft-store/src/sm_v002/sm_v002.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,19 @@ impl SMV002 {
Applier::new(self)
}

pub async fn apply_entries<'a>(
pub async fn apply_entries(
&mut self,
entries: impl IntoIterator<Item = &'a Entry>,
entries: impl IntoIterator<Item = Entry>,
) -> Result<Vec<AppliedState>, StorageIOError> {
let mut applier = Applier::new(self);

let mut res = vec![];

for ent in entries.into_iter() {
info!("apply: {}", *ent.get_log_id());
let log_id = *ent.get_log_id();
let r = applier
.apply(ent)
.apply(&ent)
.await
.map_err(|e| StorageIOError::apply(log_id, &e))?;
res.push(r);
Expand Down
8 changes: 4 additions & 4 deletions src/meta/service/src/meta_service/meta_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use databend_common_meta_raft_store::ondisk::DataVersion;
use databend_common_meta_raft_store::ondisk::DATA_VERSION;
use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
use databend_common_meta_sled_store::openraft;
use databend_common_meta_sled_store::openraft::storage::Adaptor;
use databend_common_meta_sled_store::openraft::ChangeMembers;
use databend_common_meta_stoerr::MetaStorageError;
use databend_common_meta_types::protobuf::raft_service_client::RaftServiceClient;
Expand Down Expand Up @@ -156,8 +155,8 @@ pub struct MetaNodeStatus {
pub last_seq: u64,
}

pub type LogStore = Adaptor<TypeConfig, RaftStore>;
pub type SMStore = Adaptor<TypeConfig, RaftStore>;
pub type LogStore = RaftStore;
pub type SMStore = RaftStore;

/// MetaRaft is a implementation of the generic Raft handling meta data R/W.
pub type MetaRaft = Raft<TypeConfig>;
Expand Down Expand Up @@ -205,7 +204,8 @@ impl MetaNodeBuilder {

let net = Network::new(sto.clone());

let (log_store, sm_store) = Adaptor::new(sto.clone());
let log_store = sto.clone();
let sm_store = sto.clone();

let raft = MetaRaft::new(node_id, Arc::new(config), net, log_store, sm_store)
.await
Expand Down
2 changes: 2 additions & 0 deletions src/meta/service/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod raft_log_storage_impl;
mod raft_state_machine_impl;
#[allow(clippy::module_inception)]
mod store;
mod store_inner;
Expand Down
272 changes: 272 additions & 0 deletions src/meta/service/src/store/raft_log_storage_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::io::ErrorKind;
use std::ops::RangeBounds;

use databend_common_base::base::tokio::io;
use databend_common_meta_sled_store::openraft::storage::LogFlushed;
use databend_common_meta_sled_store::openraft::storage::RaftLogStorage;
use databend_common_meta_sled_store::openraft::ErrorSubject;
use databend_common_meta_sled_store::openraft::ErrorVerb;
use databend_common_meta_sled_store::openraft::LogState;
use databend_common_meta_sled_store::openraft::OptionalSend;
use databend_common_meta_sled_store::openraft::RaftLogReader;
use databend_common_meta_types::Entry;
use databend_common_meta_types::LogId;
use databend_common_meta_types::StorageError;
use databend_common_meta_types::TypeConfig;
use databend_common_meta_types::Vote;
use log::debug;
use log::error;
use log::info;

use crate::metrics::raft_metrics;
use crate::store::RaftStore;
use crate::store::ToStorageError;

impl RaftLogReader<TypeConfig> for RaftStore {
#[minitrace::trace]
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send>(
&mut self,
range: RB,
) -> Result<Vec<Entry>, StorageError> {
debug!(
"try_get_log_entries: self.id={}, range: {:?}",
self.id, range
);

match self
.log
.read()
.await
.range_values(range)
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
{
Ok(entries) => Ok(entries),
Err(err) => {
raft_metrics::storage::incr_raft_storage_fail("try_get_log_entries", false);
Err(err)
}
}
}
}

impl RaftLogStorage<TypeConfig> for RaftStore {
type LogReader = RaftStore;

async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError> {
let last_purged_log_id = match self
.log
.read()
.await
.get_last_purged()
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
{
Err(err) => {
raft_metrics::storage::incr_raft_storage_fail("get_log_state", false);
return Err(err);
}
Ok(r) => r,
};

let last = match self
.log
.read()
.await
.logs()
.last()
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
{
Err(err) => {
raft_metrics::storage::incr_raft_storage_fail("get_log_state", false);
return Err(err);
}
Ok(r) => r,
};

let last_log_id = match last {
None => last_purged_log_id,
Some(x) => Some(x.1.log_id),
};

debug!(
"get_log_state: ({:?},{:?}]",
last_purged_log_id, last_log_id
);

Ok(LogState {
last_purged_log_id,
last_log_id,
})
}

async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}

async fn save_committed(&mut self, committed: Option<LogId>) -> Result<(), StorageError> {
self.raft_state
.write()
.await
.save_committed(committed)
.await
.map_to_sto_err(ErrorSubject::Store, ErrorVerb::Write)
}

async fn read_committed(&mut self) -> Result<Option<LogId>, StorageError> {
self.raft_state
.read()
.await
.read_committed()
.map_to_sto_err(ErrorSubject::Store, ErrorVerb::Read)
}

#[minitrace::trace]
async fn save_vote(&mut self, hs: &Vote) -> Result<(), StorageError> {
info!(id = self.id, hs :? =(hs); "save_vote");

match self
.raft_state
.write()
.await
.save_vote(hs)
.await
.map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Write)
{
Err(err) => {
raft_metrics::storage::incr_raft_storage_fail("save_vote", true);
Err(err)
}
Ok(_) => Ok(()),
}
}

#[minitrace::trace]
async fn read_vote(&mut self) -> Result<Option<Vote>, StorageError> {
match self
.raft_state
.read()
.await
.read_vote()
.map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Read)
{
Err(err) => {
raft_metrics::storage::incr_raft_storage_fail("read_vote", false);
Err(err)
}
Ok(vote) => Ok(vote),
}
}

#[minitrace::trace]
async fn append<I>(
&mut self,
entries: I,
callback: LogFlushed<TypeConfig>,
) -> Result<(), StorageError>
where
I: IntoIterator<Item = Entry> + OptionalSend,
I::IntoIter: OptionalSend,
{
// TODO: it is bad: allocates a new vec.
let entries = entries
.into_iter()
.map(|x| {
info!("append_to_log: {}", x.log_id);
x
})
.collect::<Vec<_>>();

let res = match self.log.write().await.append(entries).await {
Err(err) => {
raft_metrics::storage::incr_raft_storage_fail("append_to_log", true);
Err(err)
}
Ok(_) => Ok(()),
};

callback.log_io_completed(res.map_err(|e| io::Error::new(ErrorKind::InvalidData, e)));

Ok(())
}

#[minitrace::trace]
async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> {
info!(id = self.id; "truncate: {}", log_id);

match self
.log
.write()
.await
.range_remove(log_id.index..)
.await
.map_to_sto_err(ErrorSubject::Log(log_id), ErrorVerb::Delete)
{
Ok(_) => Ok(()),
Err(err) => {
raft_metrics::storage::incr_raft_storage_fail("delete_conflict_logs_since", true);
Err(err)
}
}
}

#[minitrace::trace]
async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> {
info!(id = self.id, log_id :? =(&log_id); "purge upto: start");

if let Err(err) = self
.log
.write()
.await
.set_last_purged(log_id)
.await
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write)
{
raft_metrics::storage::incr_raft_storage_fail("purge_logs_upto", true);
return Err(err);
};

info!(id = self.id, log_id :? =(&log_id); "purge_logs_upto: Done: set_last_purged()");

let log = self.log.write().await.clone();

// Purge can be done in another task safely, because:
//
// - Next time when raft starts, it will read last_purged_log_id without examining the actual first log.
// And junk can be removed next time purge_logs_upto() is called.
//
// - Purging operates the start of the logs, and only committed logs are purged;
// while append and truncate operates on the end of the logs,
// it is safe to run purge && (append || truncate) concurrently.
databend_common_base::runtime::spawn({
let id = self.id;
async move {
info!(id = id, log_id :? =(&log_id); "purge_logs_upto: Start: asynchronous range_remove()");

let res = log.range_remove(..=log_id.index).await;

if let Err(err) = res {
error!(id = id, log_id :? =(&log_id); "purge_logs_upto: in asynchronous error: {}", err);
raft_metrics::storage::incr_raft_storage_fail("purge_logs_upto", true);
}

info!(id = id, log_id :? =(&log_id); "purge_logs_upto: Done: asynchronous range_remove()");
}
});

Ok(())
}
}
Loading

0 comments on commit 0ca00f9

Please sign in to comment.