Skip to content

Commit

Permalink
working on new version
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Jul 17, 2024
1 parent a0637ae commit 154d89c
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 104 deletions.
109 changes: 11 additions & 98 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ mod ops;
mod plan;

use crate::ops::{generate_random_file, generate_zero_file, truncate_file};
use crate::plan::{explain_plan, plan_chunks, plan_into_realization};
use crate::plan::{commit_plan, explain_plan, plan_chunks, plan_into_realization};
use clap::Parser;
use std::env;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Write};

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
Expand All @@ -16,12 +14,8 @@ struct Args {
file: String,

/// Size in bytes of a single chunk in which the file is processed
#[clap(short, long, default_value = "50")]
truncate: f64,

/// Time to wait before truncating the file
#[clap(short, long, default_value = "30")]
safety_time: u64,
#[clap(short, long, default_value = "50000")]
chunk_size: u64,

#[clap(long)]
test_create_zero_file_size: Option<u64>,
Expand All @@ -42,96 +36,15 @@ struct Args {
test_random: bool,
}

fn cat_file(file_path: &str, drop_percent: f64, safety_time: u64) -> anyhow::Result<()> {
let mut buffer = Vec::new();
let mut stdout = io::stdout();

let mut is_first = true;
// Read the file
'outer: loop {
let drop_bytes =
(drop_percent / 100.0) * std::fs::metadata(file_path).unwrap().len() as f64;
let drop_bytes = drop_bytes as usize + 10000000;
log::info!(
"Reading file {}, drop at {}% - limit bytes {}",
file_path,
drop_percent,
drop_bytes
);
//open file and check if you have write permission at the same time
let mut file = OpenOptions::new()
.read(true)
.truncate(false)
.open(file_path)
.unwrap();
let file_path_copy = format!("{file_path}.part");
let mut local_bytes_read = 0;
loop {
buffer.resize(1000 * 1000, 0);
let bytes_read = file.read(buffer.as_mut_slice()).unwrap();
if bytes_read == 0 {
break;
}
if bytes_read < buffer.len() {
buffer.resize(bytes_read, 0);
}
local_bytes_read += bytes_read;

stdout.write_all(&buffer)?;

stdout.flush()?;
if local_bytes_read > drop_bytes {
break;
}
}
let mut bytes_written = 0;
let mut file_copy = File::create(&file_path_copy).unwrap();
log::info!(
"Writing rest of the file to {}. {}/{}",
file_path_copy,
local_bytes_read,
std::fs::metadata(file_path).unwrap().len()
);
loop {
buffer.resize(1000 * 1000, 0);
let bytes_read = file.read(buffer.as_mut_slice()).unwrap();
if bytes_read == 0 {
break;
}
if bytes_read < buffer.len() {
buffer.resize(bytes_read, 0);
}
file_copy.write_all(buffer.as_slice()).unwrap();
bytes_written += bytes_read;
}
log::info!(
"Finished reading and copying file, bytes written: {}",
bytes_written
);
//remove the file

file_copy.flush().unwrap();
drop(file);
drop(file_copy);

if is_first {
log::warn!("At this point file {file_path} will be removed without possibility to recover, you have {safety_time} seconds to stop the script and prevent this.");
std::thread::sleep(std::time::Duration::from_secs(safety_time));
log::warn!("Time passed, removing file {file_path}. Do not stop script the script after this point");
is_first = false;
}

std::fs::remove_file(file_path).unwrap();
if bytes_written == 0 {
log::info!("Removing empty file {}", file_path_copy);
std::fs::remove_file(&file_path_copy).unwrap();
break 'outer;
}
std::fs::rename(&file_path_copy, file_path).unwrap();
}
Ok(())
/// Plans and executes the chunk-by-chunk processing of `file_path`.
///
/// The file length is read from its metadata, `chunk_size` is capped at that
/// length, a chunk plan is built and turned into a list of operations, and the
/// operations are then handed to `commit_plan`.
///
/// # Errors
/// Returns an error if the file metadata cannot be read, if planning fails, or
/// if `commit_plan` fails.
fn cat_file(file_path: &str, chunk_size: u64) -> anyhow::Result<()> {
    let file_size = std::fs::metadata(file_path)?.len();
    // Never plan with a chunk larger than the file itself.
    let chunk_size = std::cmp::min(file_size, chunk_size);
    // NOTE(review): assumes plan_chunks returns anyhow::Result like
    // plan_into_realization does — confirm against plan.rs.
    let plan = plan_chunks(chunk_size, file_size)?;
    let operations = plan_into_realization(plan)?;
    commit_plan(file_path, &operations)
}


fn main() -> anyhow::Result<()> {
env::set_var(
"RUST_LOG",
Expand Down Expand Up @@ -167,7 +80,7 @@ fn main() -> anyhow::Result<()> {
}

if !test_run {
return cat_file(&args.file, args.truncate, args.safety_time);
return cat_file(&args.file, args.chunk_size);
}
Ok(())
}
128 changes: 126 additions & 2 deletions src/ops.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::bail;
use anyhow::{bail};
use rand::distributions::{Alphanumeric, DistString};
use rand::{thread_rng, Rng};
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use std::io::{Read, Seek, SeekFrom, Write};

fn truncate_file_int(file_path: &str, target_size: u64) -> anyhow::Result<()> {
//1 open file
Expand Down Expand Up @@ -111,3 +111,127 @@ pub fn generate_random_file(file_path: &str, len: u64, is_ascii: bool) -> anyhow
}
}
}

/// Reports whether two half-open ranges `(start, end)` share at least one position.
///
/// Ranges that merely touch (one ends exactly where the other starts) do not overlap.
fn ranges_overlap(src: (u64, u64), dst: (u64, u64)) -> bool {
    let (src_start, src_end) = src;
    let (dst_start, dst_end) = dst;
    // Each range must begin before the other one ends.
    src_end > dst_start && src_start < dst_end
}

#[test]
fn test_ranges_overlap_no_overlap() {
    // Pairs that are disjoint or only touch at a boundary.
    let disjoint: [((u64, u64), (u64, u64)); 8] = [
        ((10, 20), (20, 30)),
        ((30, 40), (10, 20)),
        ((0, 10), (10, 20)),
        ((10, 20), (0, 10)),
        ((10, 20), (20, 30)),
        ((20, 30), (10, 20)),
        ((0, 1), (1, 2)),
        ((1, 2), (0, 1)),
    ];
    for &(first, second) in disjoint.iter() {
        assert!(!ranges_overlap(first, second));
    }

    // Pairs that partially overlap, are identical, or where one contains the other.
    let overlapping: [((u64, u64), (u64, u64)); 7] = [
        ((10, 20), (15, 25)),
        ((15, 25), (10, 20)),
        ((10, 20), (10, 20)),
        ((10, 30), (15, 25)),
        ((15, 25), (10, 30)),
        ((10, 30), (15, 20)),
        ((15, 20), (10, 30)),
    ];
    for &(first, second) in overlapping.iter() {
        assert!(ranges_overlap(first, second));
    }
}

/// Copies the bytes of range `src` onto range `dst` inside the same file.
///
/// Both ranges are half-open `(start, end)` byte offsets. The copy is done
/// through an in-memory buffer of at most 1,000,000 bytes.
///
/// # Errors
/// Fails if either range is empty or inverted, if the ranges differ in size,
/// if they overlap, if either one extends past the end of the file, or if any
/// I/O operation fails.
fn copy_chunk_int(file_path: &str, src: (u64, u64), dst: (u64, u64)) -> anyhow::Result<()> {
    if src.1 <= src.0 {
        bail!("Source range is invalid {}-{}", src.0, src.1);
    }
    if dst.1 <= dst.0 {
        bail!("Destination range is invalid {}-{}", dst.0, dst.1);
    }
    if src.1 - src.0 != dst.1 - dst.0 {
        bail!("Source and destination ranges are not the same size {}-{} {}-{}", src.0, src.1, dst.0, dst.1);
    }

    // Overlapping ranges would let a write clobber bytes that are still to be read.
    if ranges_overlap(src, dst) {
        bail!("Source and destination ranges overlap {}-{} {}-{}", src.0, src.1, dst.0, dst.1);
    }
    let file_size = std::fs::metadata(file_path)?.len();
    if src.1 > file_size {
        bail!("Source range is out of bounds {}-{} file size {}", src.0, src.1, file_size);
    }
    if dst.1 > file_size {
        bail!("Destination range is out of bounds {}-{} file size {}", dst.0, dst.1, file_size);
    }

    // Open the file for both reading and writing without truncating it.
    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .truncate(false)
        .open(file_path)?;

    let total = src.1 - src.0;
    // The buffer length is capped at 1,000,000, so the cast cannot truncate.
    let mut buffer = vec![0u8; std::cmp::min(1000 * 1000, total) as usize];
    let mut offset: u64 = 0;
    while offset < total {
        let step = std::cmp::min(buffer.len() as u64, total - offset);
        let piece = &mut buffer[..step as usize];

        // The file has a single cursor shared by reads and writes, so it must
        // be repositioned before every read and every write; otherwise the
        // second and later pieces are read from and written to the wrong place.
        file.seek(SeekFrom::Start(src.0 + offset))?;
        file.read_exact(piece)?;

        file.seek(SeekFrom::Start(dst.0 + offset))?;
        file.write_all(piece)?;

        offset += step;
    }

    Ok(())
}



pub fn copy_chunk(file_path: &str, src: (u64, u64), dst: (u64, u64)) -> anyhow::Result<()> {
match copy_chunk_int(file_path, src, dst) {
Ok(_) => Ok(()),
Err(e) => {
log::error!("Error copying chunk {}: {}", file_path, e);
Err(e)
}
}

}

/// Writes the bytes of the half-open range `data` of `file_path` to stdout.
///
/// The range is streamed through a buffer of at most 1,000,000 bytes and
/// stdout is flushed before returning, so the bytes have left the process
/// buffer by the time the caller goes on to modify the file.
///
/// # Errors
/// Fails if the range is empty or inverted, if it extends past the end of the
/// file, or if reading the file or writing to stdout fails.
pub fn output_chunk_int(file_path: &str, data: (u64, u64)) -> anyhow::Result<()> {
    if data.1 <= data.0 {
        bail!("Data range is invalid {}-{}", data.0, data.1);
    }
    let file_size = std::fs::metadata(file_path)?.len();
    if data.1 > file_size {
        bail!("Data range is out of bounds {}-{} file size {}", data.0, data.1, file_size);
    }

    // Read-only access is sufficient here; the file is not modified.
    let mut file = OpenOptions::new()
        .read(true)
        .truncate(false)
        .open(file_path)?;

    file.seek(SeekFrom::Start(data.0))?;

    let mut bytes_left = data.1 - data.0;
    // The buffer length is capped at 1,000,000, so the cast cannot truncate.
    let mut buffer = vec![0u8; std::cmp::min(1000 * 1000, bytes_left) as usize];

    // Lock stdout once instead of re-acquiring the lock for every write.
    let stdout = std::io::stdout();
    let mut out = stdout.lock();
    while bytes_left > 0 {
        let step = std::cmp::min(buffer.len() as u64, bytes_left);
        let piece = &mut buffer[..step as usize];
        file.read_exact(piece)?;
        out.write_all(piece)?;
        bytes_left -= step;
    }
    // Push everything out before returning: stdout is buffered and the caller
    // may truncate the file right after this call.
    out.flush()?;
    Ok(())
}
pub fn output_chunk(file_path: &str, data: (u64, u64)) -> anyhow::Result<()> {
match output_chunk_int(file_path, data) {
Ok(_) => Ok(()),
Err(e) => {
log::error!("Error outputting chunk {}: {}", file_path, e);
Err(e)
}
}
}
26 changes: 22 additions & 4 deletions src/plan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::bail;
use crate::ops::{copy_chunk, output_chunk, truncate_file};

pub struct ChunkPlan {
chunk_size: u64,
Expand Down Expand Up @@ -36,6 +37,25 @@ pub struct Operation {
pub is_middle: bool
}

/// Executes the given operations against `file_path`, in order.
///
/// For each operation:
/// 1. the range `op.data_chunk` is passed to `output_chunk`,
/// 2. if `op.src_chunk` is set, that range is copied onto `op.data_chunk`
///    with `copy_chunk`,
/// 3. the file is truncated to `op.truncate_to` bytes with `truncate_file`.
///
/// # Errors
/// Stops at the first failing step and returns its error. Steps that have
/// already run are not undone.
pub fn commit_plan(file_path: &str, operations: &[Operation]) -> anyhow::Result<()> {
    let mut step_no = 0;
    for op in operations {
        let middle_msg = if op.is_middle { "(middle) " } else { "" };
        log::info!("{} - {}Output chunk {}-{}", step_no, middle_msg, op.data_chunk.0, op.data_chunk.1);
        output_chunk(file_path, op.data_chunk)?;
        step_no += 1;
        if let Some((src_start, src_end)) = op.src_chunk {
            log::info!("{} - Copy {} bytes from {}-{} to {}-{}", step_no, src_end - src_start, src_start, src_end, op.data_chunk.0, op.data_chunk.1);
            copy_chunk(file_path, (src_start, src_end), op.data_chunk)?;
        }
        // The step counter advances even when there is nothing to copy.
        step_no += 1;
        log::info!("{} - Truncate file to {} bytes", step_no, op.truncate_to);
        // NOTE(review): assumes truncate_file returns an anyhow-compatible
        // Result like the other ops wrappers — confirm against ops.rs.
        truncate_file(file_path, op.truncate_to)?;
        step_no += 1;
    }
    Ok(())
}

pub fn explain_plan(operations: &[Operation]) {
let mut step_no = 0;
for op in operations {
Expand All @@ -44,8 +64,6 @@ pub fn explain_plan(operations: &[Operation]) {
step_no += 1;
if let Some((src_start, src_end)) = op.src_chunk {
log::info!("{} - Copy {} bytes from {}-{} to {}-{}", step_no, src_end - src_start, src_start, src_end, op.data_chunk.0, op.data_chunk.1);
} else {
log::info!("{} - Output chunk {} - pos {}-{}", step_no, op.chunk_no, op.data_chunk.0, op.data_chunk.1);
}
step_no += 1;
log::info!("{} - Truncate file to {} bytes", step_no, op.truncate_to);
Expand Down Expand Up @@ -108,14 +126,14 @@ pub fn plan_into_realization(plan: ChunkPlan) -> anyhow::Result<Vec<Operation>>
for i in 0..plan.start_chunks {
let chunk_no = plan.start_chunks - i - 1;
let dst_chunk_start = chunk_no * plan.chunk_size;
let dst_chunk_end = dst_chunk_start + plan.middle_right_size;
let dst_chunk_end = dst_chunk_start + plan.chunk_size;

operations.push(Operation {
chunk_no: operation_no,
src_chunk: None,
data_chunk: (dst_chunk_start, dst_chunk_end),
truncate_to: plan.chunk_size * chunk_no,
is_middle: true,
is_middle: false,
});
operation_no += 1;
if operation_no > operation_limit {
Expand Down

0 comments on commit 154d89c

Please sign in to comment.