Skip to content

Commit

Permalink
refactor: improve install-snapshot performance (#15345)
Browse files Browse the repository at this point in the history
Move the IO into a separate thread and use `std::io` instead of `tokio::io`.
drmingdrmer authored Apr 26, 2024

Verified

This commit was signed with the committer’s verified signature.
yuyichao Yichao Yu
1 parent f2a49de commit 0b23dcb
Showing 3 changed files with 39 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/meta/raft-store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ test = true
io-uring = ["databend-common-meta-sled-store/io-uring"]

[dependencies]
databend-common-base = { path = "../../common/base" }
databend-common-exception = { path = "../../common/exception" }
databend-common-grpc = { path = "../../common/grpc" }
databend-common-meta-api = { path = "../api" }
44 changes: 34 additions & 10 deletions src/meta/raft-store/src/sm_v002/sm_v002.rs
Original file line number Diff line number Diff line change
@@ -42,8 +42,6 @@ use log::debug;
use log::info;
use log::warn;
use openraft::RaftLogId;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::sync::RwLock;

use crate::applier::Applier;
@@ -151,17 +149,43 @@ impl SMV002 {

let mut importer = sm_v002::SMV002::new_importer();

let br = BufReader::new(data);
let mut lines = AsyncBufReadExt::lines(br);
// AsyncBufReadExt::lines() is a bit slow.
//
// let br = BufReader::with_capacity(16 * 1024 * 1024, data);
// let mut lines = AsyncBufReadExt::lines(br);
// while let Some(l) = lines.next_line().await? {
// let ent: RaftStoreEntry = serde_json::from_str(&l)
// .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
// importer.import(ent)?;
// }

let f = data.into_std().await;

let h = databend_common_base::runtime::spawn_blocking(move || {
let mut br = std::io::BufReader::with_capacity(16 * 1024 * 1024, f);
let mut line_buf = String::with_capacity(4 * 1024);

loop {
line_buf.clear();
let n_read = std::io::BufRead::read_line(&mut br, &mut line_buf)?;
if n_read == 0 {
break;
}

let ent: RaftStoreEntry = serde_json::from_str(&line_buf)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

importer.import(ent)?;
}

while let Some(l) = lines.next_line().await? {
let ent: RaftStoreEntry = serde_json::from_str(&l)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let level_data = importer.commit();
Ok::<_, io::Error>(level_data)
});

importer.import(ent)?;
}
let level_data = h
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))??;

let level_data = importer.commit();
let new_last_applied = *level_data.last_applied_ref();

{
4 changes: 4 additions & 0 deletions src/meta/types/src/raft_snapshot_data.rs
Original file line number Diff line number Diff line change
@@ -77,6 +77,10 @@ impl SnapshotData {
&self.path
}

pub async fn into_std(self) -> std::fs::File {
self.f.into_std().await
}

pub async fn sync_all(&mut self) -> Result<(), io::Error> {
self.f.flush().await?;
self.f.sync_all().await

0 comments on commit 0b23dcb

Please sign in to comment.