Skip to content

Commit

Permalink
Merge pull request #2707 from rbtcollins/chunkedio
Browse files Browse the repository at this point in the history
Stream large files during unpacking
  • Loading branch information
kinnison authored Apr 6, 2021
2 parents 84974df + 8ad8b74 commit b132a85
Show file tree
Hide file tree
Showing 17 changed files with 633 additions and 134 deletions.
1 change: 1 addition & 0 deletions src/cli/self_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ pub fn install(

fn rustc_or_cargo_exists_in_path() -> Result<()> {
// Ignore rustc and cargo if present in $HOME/.cargo/bin or a few other directories
#[allow(clippy::ptr_arg)]
fn ignore_paths(path: &PathBuf) -> bool {
!path
.components()
Expand Down
1 change: 1 addition & 0 deletions src/cli/self_update/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ fn _apply_new_path(new_path: Option<Vec<u16>>) -> Result<()> {
}

// Tell other processes to update their environment
#[allow(clippy::unnecessary_cast)]
unsafe {
SendMessageTimeoutA(
HWND_BROADCAST,
Expand Down
2 changes: 1 addition & 1 deletion src/cli/topical_doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn index_html(doc: &DocData<'_>, wpath: &Path) -> Option<PathBuf> {
}
}

fn dir_into_vec(dir: &PathBuf) -> Result<Vec<OsString>> {
fn dir_into_vec(dir: &Path) -> Result<Vec<OsString>> {
let entries = fs::read_dir(dir).chain_err(|| format!("Opening directory {:?}", dir))?;
let mut v = Vec::new();
for entry in entries {
Expand Down
7 changes: 3 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ impl<'a> OverrideCfg<'a> {
|| file.toolchain.components.is_some()
|| file.toolchain.profile.is_some()
{
return Err(ErrorKind::CannotSpecifyPathAndOptions(path.into()).into());
return Err(ErrorKind::CannotSpecifyPathAndOptions(path).into());
}
Some(Toolchain::from_path(cfg, cfg_path, &path)?)
}
(Some(channel), Some(path)) => {
return Err(ErrorKind::CannotSpecifyChannelAndPath(channel, path.into()).into())
return Err(ErrorKind::CannotSpecifyChannelAndPath(channel, path).into())
}
(None, None) => None,
},
Expand Down Expand Up @@ -172,8 +172,7 @@ impl PgpPublicKey {
Ok(ret)
}
use pgp::types::KeyTrait;
let mut ret = Vec::new();
ret.push(format!("from {}", self));
let mut ret = vec![format!("from {}", self)];
let key = self.key();
let keyid = format_hex(&key.key_id().to_vec(), "-", 4)?;
let algo = key.algorithm();
Expand Down
199 changes: 188 additions & 11 deletions src/diskio/immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,204 @@
///
/// Use for diagnosing bugs or working around any unexpected issues with the
/// threaded code paths.
use super::{perform, Executor, Item};
use std::{
fmt::Debug,
fs::{File, OpenOptions},
io::{self, Write},
path::Path,
sync::{Arc, Mutex},
time::Instant,
};

use super::{CompletedIo, Executor, Item};

#[derive(Debug)]
pub struct _IncrementalFileState {
completed_chunks: Vec<usize>,
err: Option<io::Result<()>>,
item: Option<Item>,
finished: bool,
}

pub(super) type IncrementalFileState = Arc<Mutex<Option<_IncrementalFileState>>>;

#[derive(Default, Debug)]
pub struct ImmediateUnpacker {
incremental_state: IncrementalFileState,
}

#[derive(Default)]
pub struct ImmediateUnpacker {}
impl ImmediateUnpacker {
pub fn new() -> Self {
Self {}
Self {
..Default::default()
}
}

fn deque(&self) -> Box<dyn Iterator<Item = CompletedIo>> {
let mut guard = self.incremental_state.lock().unwrap();
// incremental file in progress
if let Some(ref mut state) = *guard {
// Case 1: pending errors
if state.finished {
let mut item = state.item.take().unwrap();
if state.err.is_some() {
let err = state.err.take().unwrap();
item.result = err;
}
item.finish = item
.start
.map(|s| Instant::now().saturating_duration_since(s));
if state.finished {
*guard = None;
}
Box::new(Some(CompletedIo::Item(item)).into_iter())
} else {
// Case 2: pending chunks (which might be empty)
let mut completed_chunks = vec![];
completed_chunks.append(&mut state.completed_chunks);
Box::new(completed_chunks.into_iter().map(CompletedIo::Chunk))
}
} else {
Box::new(None.into_iter())
}
}
}

impl Executor for ImmediateUnpacker {
fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = Item> + '_> {
perform(&mut item);
Box::new(Some(item).into_iter())
fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
item.result = match &mut item.kind {
super::Kind::Directory => super::create_dir(&item.full_path),
super::Kind::File(ref contents) => {
super::write_file(&item.full_path, &contents, item.mode)
}
super::Kind::IncrementalFile(_incremental_file) => {
return {
// If there is a pending error, return it, otherwise stash the
// Item for eventual return when the file is finished.
let mut guard = self.incremental_state.lock().unwrap();
if let Some(ref mut state) = *guard {
if state.err.is_some() {
let err = state.err.take().unwrap();
item.result = err;
item.finish = item
.start
.map(|s| Instant::now().saturating_duration_since(s));
*guard = None;
Box::new(Some(CompletedIo::Item(item)).into_iter())
} else {
state.item = Some(item);
Box::new(None.into_iter())
}
} else {
unreachable!();
}
};
}
};
item.finish = item
.start
.map(|s| Instant::now().saturating_duration_since(s));
Box::new(Some(CompletedIo::Item(item)).into_iter())
}

fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIo>> {
self.deque()
}

fn join(&mut self) -> Box<dyn Iterator<Item = Item>> {
Box::new(None.into_iter())
fn completed(&self) -> Box<dyn Iterator<Item = CompletedIo>> {
self.deque()
}

fn completed(&self) -> Box<dyn Iterator<Item = Item>> {
Box::new(None.into_iter())
fn incremental_file_state(&self) -> super::IncrementalFileState {
let mut state = self.incremental_state.lock().unwrap();
if state.is_some() {
unreachable!();
} else {
*state = Some(_IncrementalFileState {
completed_chunks: vec![],
err: None,
item: None,
finished: false,
});
super::IncrementalFileState::Immediate(self.incremental_state.clone())
}
}
}

/// The non-shared state for writing a file incrementally
#[derive(Debug)]
pub(super) struct IncrementalFileWriter {
state: IncrementalFileState,
file: Option<File>,
path_display: String,
}

impl IncrementalFileWriter {
#[allow(unused_variables)]
pub fn new<P: AsRef<Path>>(
path: P,
mode: u32,
state: IncrementalFileState,
) -> std::result::Result<Self, io::Error> {
let mut opts = OpenOptions::new();
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
opts.mode(mode);
}
let path = path.as_ref();
let path_display = format!("{}", path.display());
let file = Some({
trace_scoped!("creat", "name": path_display);
opts.write(true).create(true).truncate(true).open(path)?
});
Ok(IncrementalFileWriter {
file,
state,
path_display,
})
}

pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
if (self.state.lock().unwrap()).is_none() {
return false;
}
match self.write(chunk) {
Ok(v) => v,
Err(e) => {
let mut state = self.state.lock().unwrap();
if let Some(ref mut state) = *state {
state.err.replace(Err(e));
state.finished = true;
false
} else {
false
}
}
}
}

fn write(&mut self, chunk: Vec<u8>) -> std::result::Result<bool, io::Error> {
let mut state = self.state.lock().unwrap();
if let Some(ref mut state) = *state {
if let Some(ref mut file) = (&mut self.file).as_mut() {
// Length 0 vector is used for clean EOF signalling.
if chunk.is_empty() {
trace_scoped!("close", "name:": self.path_display);
drop(std::mem::take(&mut self.file));
state.finished = true;
} else {
trace_scoped!("write_segment", "name": self.path_display, "len": chunk.len());
file.write_all(&chunk)?;

state.completed_chunks.push(chunk.len());
}
Ok(true)
} else {
Ok(false)
}
} else {
unreachable!();
}
}
}
Loading

0 comments on commit b132a85

Please sign in to comment.