Skip to content

Commit

Permalink
fix: Avoid duplicate temp file name for install-snapshot requests
Browse files Browse the repository at this point in the history
This commit resolves an issue where duplicate temporary file names were
generated when two install-snapshot requests were received in close
temporal proximity. This could potentially lead to conflicts and
erroneous behavior during snapshot installation.

Changes:

- Added logic to generate unique temporary file names for each
  install-snapshot request to prevent naming conflicts.

- Additionally, this commit enhances error handling by including context
  information in the `io::Error` returned from `SnapshotStoreV002`. This
  improvement aids in better understanding the source and nature of
  errors when they occur.
  • Loading branch information
drmingdrmer committed May 17, 2024
1 parent 8d6ebaf commit 43c3d84
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 31 deletions.
51 changes: 49 additions & 2 deletions src/meta/raft-store/src/sm_v002/snapshot_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub struct SnapshotStoreV002 {
}

impl SnapshotStoreV002 {
const TEMP_PREFIX: &'static str = "0.snap";

pub fn new(data_version: DataVersion, config: RaftConfig) -> Self {
SnapshotStoreV002 {
data_version,
Expand Down Expand Up @@ -130,12 +132,15 @@ impl SnapshotStoreV002 {
}

pub fn snapshot_temp_path(&self) -> String {
// Sleep to avoid timestamp collision when this function is called twice in a short time.
std::thread::sleep(std::time::Duration::from_millis(2));

let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();

format!("{}/0.snap-{}", self.snapshot_dir(), ts)
format!("{}/{}-{}", self.snapshot_dir(), Self::TEMP_PREFIX, ts)
}

/// Return a list of valid snapshot ids found in the snapshot directory.
Expand Down Expand Up @@ -164,7 +169,19 @@ impl SnapshotStoreV002 {

info!("cleaning old snapshots in {}", dir);

let (snapshot_ids, invalid_files) = self.load_snapshot_ids().await?;
let (snapshot_ids, mut invalid_files) = self.load_snapshot_ids().await?;

// The last several temp files may be in use by snapshot transmitting.
// And do not delete them at once.
{
let l = invalid_files.len();
if l > 2 {
invalid_files = invalid_files.into_iter().take(l - 2).collect();
} else {
invalid_files = vec![];
}
}

for invalid_file in invalid_files {
let path = format!("{}/{}", dir, invalid_file);

Expand Down Expand Up @@ -240,7 +257,10 @@ impl SnapshotStoreV002 {
}

snapshot_ids.sort();
invalid_files.sort();

info!("dir: {}; loaded snapshots: {:?}", dir, snapshot_ids);
info!("dir: {}; invalid files: {:?}", dir, invalid_files);

Ok((snapshot_ids, invalid_files))
}
Expand Down Expand Up @@ -329,3 +349,30 @@ impl SnapshotStoreV002 {
SnapshotStoreError::read(e).with_context(context)
}
}

#[cfg(test)]
mod tests {
use crate::config::RaftConfig;
use crate::ondisk::DATA_VERSION;

#[test]
fn test_temp_path_no_dup() -> anyhow::Result<()> {
let temp = tempfile::tempdir()?;
let p = temp.path();
let raft_config = RaftConfig {
raft_dir: p.to_str().unwrap().to_string(),
..Default::default()
};

let store = super::SnapshotStoreV002::new(DATA_VERSION, raft_config);

let mut prev = None;
for _i in 0..10 {
let path = store.snapshot_temp_path();
assert_ne!(prev, Some(path.clone()), "dup: {}", path);
prev = Some(path);
}

Ok(())
}
}
64 changes: 35 additions & 29 deletions src/meta/types/src/raft_snapshot_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::task::Poll;
use tokio::fs;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeek;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
Expand All @@ -41,7 +40,10 @@ impl SnapshotData {
.create(false)
.create_new(false)
.read(true)
.open(&path)?;
.open(&path)
.map_err(|e| {
io::Error::new(e.kind(), format!("{}: while open(); path: {}", e, path))
})?;

Ok(SnapshotData {
is_temp: false,
Expand All @@ -56,7 +58,10 @@ impl SnapshotData {
.write(true)
.read(true)
.open(&path)
.await?;
.await
.map_err(|e| {
io::Error::new(e.kind(), format!("{}: while new_temp(); path: {}", e, path))
})?;

Ok(SnapshotData {
is_temp: true,
Expand All @@ -66,7 +71,12 @@ impl SnapshotData {
}

pub async fn data_size(&self) -> Result<u64, io::Error> {
self.f.metadata().await.map(|m| m.len())
self.f.metadata().await.map(|m| m.len()).map_err(|e| {
io::Error::new(
e.kind(),
format!("{}: while data_size(); path: {}", e, self.path),
)
})
}

pub fn is_temp(&self) -> bool {
Expand All @@ -82,44 +92,40 @@ impl SnapshotData {
}

pub async fn sync_all(&mut self) -> Result<(), io::Error> {
self.f.flush().await?;
self.f.sync_all().await
}

pub async fn read_lines<T>(self: Box<SnapshotData>) -> Result<Vec<T>, io::Error>
where T: serde::de::DeserializeOwned {
let mut res = vec![];

let b = BufReader::new(self);
let mut lines = AsyncBufReadExt::lines(b);

while let Some(l) = lines.next_line().await? {
let ent: T = serde_json::from_str(&l)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
res.push(ent)
}

Ok(res)
self.f.flush().await.map_err(|e| {
io::Error::new(
e.kind(),
format!("{}: while flush(); path: {}", e, self.path),
)
})?;

self.f.sync_all().await.map_err(|e| {
io::Error::new(
e.kind(),
format!("{}: while sync_all(); path: {}", e, self.path),
)
})
}

pub async fn read_to_lines(self: Box<SnapshotData>) -> Result<Vec<String>, io::Error> {
let mut res = vec![];

let path = self.path.clone();

let b = BufReader::new(self);
let mut lines = AsyncBufReadExt::lines(b);

while let Some(l) = lines.next_line().await? {
while let Some(l) = lines.next_line().await.map_err(|e| {
io::Error::new(
e.kind(),
format!("{}: while read_to_lines(); path: {}", e, path),
)
})? {
res.push(l)
}

Ok(res)
}

pub async fn read_to_string(self: &mut Box<SnapshotData>) -> Result<String, io::Error> {
let mut res = String::new();
self.f.read_to_string(&mut res).await?;
Ok(res)
}
}

impl AsyncRead for SnapshotData {
Expand Down

0 comments on commit 43c3d84

Please sign in to comment.