Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Avoid duplicate temp file name for install-snapshot requests #15565

Merged
merged 1 commit into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions src/meta/raft-store/src/sm_v002/snapshot_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub struct SnapshotStoreV002 {
}

impl SnapshotStoreV002 {
const TEMP_PREFIX: &'static str = "0.snap";

pub fn new(data_version: DataVersion, config: RaftConfig) -> Self {
SnapshotStoreV002 {
data_version,
Expand Down Expand Up @@ -130,12 +132,15 @@ impl SnapshotStoreV002 {
}

pub fn snapshot_temp_path(&self) -> String {
// Sleep to avoid timestamp collision when this function is called twice in a short time.
std::thread::sleep(std::time::Duration::from_millis(2));

let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();

format!("{}/0.snap-{}", self.snapshot_dir(), ts)
format!("{}/{}-{}", self.snapshot_dir(), Self::TEMP_PREFIX, ts)
}

/// Return a list of valid snapshot ids found in the snapshot directory.
Expand Down Expand Up @@ -164,7 +169,19 @@ impl SnapshotStoreV002 {

info!("cleaning old snapshots in {}", dir);

let (snapshot_ids, invalid_files) = self.load_snapshot_ids().await?;
let (snapshot_ids, mut invalid_files) = self.load_snapshot_ids().await?;

// The last several temp files may be in use by snapshot transmitting.
// And do not delete them at once.
{
let l = invalid_files.len();
if l > 2 {
invalid_files = invalid_files.into_iter().take(l - 2).collect();
} else {
invalid_files = vec![];
}
}

for invalid_file in invalid_files {
let path = format!("{}/{}", dir, invalid_file);

Expand Down Expand Up @@ -240,7 +257,10 @@ impl SnapshotStoreV002 {
}

snapshot_ids.sort();
invalid_files.sort();

info!("dir: {}; loaded snapshots: {:?}", dir, snapshot_ids);
info!("dir: {}; invalid files: {:?}", dir, invalid_files);

Ok((snapshot_ids, invalid_files))
}
Expand Down Expand Up @@ -329,3 +349,30 @@ impl SnapshotStoreV002 {
SnapshotStoreError::read(e).with_context(context)
}
}

#[cfg(test)]
mod tests {
use crate::config::RaftConfig;
use crate::ondisk::DATA_VERSION;

#[test]
fn test_temp_path_no_dup() -> anyhow::Result<()> {
let temp = tempfile::tempdir()?;
let p = temp.path();
let raft_config = RaftConfig {
raft_dir: p.to_str().unwrap().to_string(),
..Default::default()
};

let store = super::SnapshotStoreV002::new(DATA_VERSION, raft_config);

let mut prev = None;
for _i in 0..10 {
let path = store.snapshot_temp_path();
assert_ne!(prev, Some(path.clone()), "dup: {}", path);
prev = Some(path);
}

Ok(())
}
}
64 changes: 35 additions & 29 deletions src/meta/types/src/raft_snapshot_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::task::Poll;
use tokio::fs;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeek;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
Expand All @@ -41,7 +40,10 @@ impl SnapshotData {
.create(false)
.create_new(false)
.read(true)
.open(&path)?;
.open(&path)
.map_err(|e| {
io::Error::new(e.kind(), format!("{}: while open(); path: {}", e, path))
})?;

Ok(SnapshotData {
is_temp: false,
Expand All @@ -56,7 +58,10 @@ impl SnapshotData {
.write(true)
.read(true)
.open(&path)
.await?;
.await
.map_err(|e| {
io::Error::new(e.kind(), format!("{}: while new_temp(); path: {}", e, path))
})?;

Ok(SnapshotData {
is_temp: true,
Expand All @@ -66,7 +71,12 @@ impl SnapshotData {
}

pub async fn data_size(&self) -> Result<u64, io::Error> {
self.f.metadata().await.map(|m| m.len())
self.f.metadata().await.map(|m| m.len()).map_err(|e| {
io::Error::new(
e.kind(),
format!("{}: while data_size(); path: {}", e, self.path),
)
})
}

pub fn is_temp(&self) -> bool {
Expand All @@ -82,44 +92,40 @@ impl SnapshotData {
}

pub async fn sync_all(&mut self) -> Result<(), io::Error> {
self.f.flush().await?;
self.f.sync_all().await
}

pub async fn read_lines<T>(self: Box<SnapshotData>) -> Result<Vec<T>, io::Error>
where T: serde::de::DeserializeOwned {
let mut res = vec![];

let b = BufReader::new(self);
let mut lines = AsyncBufReadExt::lines(b);

while let Some(l) = lines.next_line().await? {
let ent: T = serde_json::from_str(&l)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
res.push(ent)
}

Ok(res)
self.f.flush().await.map_err(|e| {
io::Error::new(
e.kind(),
format!("{}: while flush(); path: {}", e, self.path),
)
})?;

self.f.sync_all().await.map_err(|e| {
io::Error::new(
e.kind(),
format!("{}: while sync_all(); path: {}", e, self.path),
)
})
}

pub async fn read_to_lines(self: Box<SnapshotData>) -> Result<Vec<String>, io::Error> {
let mut res = vec![];

let path = self.path.clone();

let b = BufReader::new(self);
let mut lines = AsyncBufReadExt::lines(b);

while let Some(l) = lines.next_line().await? {
while let Some(l) = lines.next_line().await.map_err(|e| {
io::Error::new(
e.kind(),
format!("{}: while read_to_lines(); path: {}", e, path),
)
})? {
res.push(l)
}

Ok(res)
}

pub async fn read_to_string(self: &mut Box<SnapshotData>) -> Result<String, io::Error> {
let mut res = String::new();
self.f.read_to_string(&mut res).await?;
Ok(res)
}
}

impl AsyncRead for SnapshotData {
Expand Down
Loading