Skip to content

Commit

Permalink
Early draft of greedy fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Gjengset committed Nov 24, 2020
1 parent 8f99f76 commit c5b2468
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 31 deletions.
174 changes: 143 additions & 31 deletions src/cargo/sources/registry/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl<'cfg> RegistryIndex<'cfg> {
// has run previously this will parse a Cargo-specific cache file rather
// than the registry itself. In effect this is intended to be a quite
// cheap operation.
let summaries = self.load_summaries(name, load)?;
let summaries = self.load_summaries(name, req, load)?;

// Iterate over our summaries, extract all relevant ones which match our
// version requirement, and then parse all corresponding rows in the
Expand Down Expand Up @@ -307,9 +307,10 @@ impl<'cfg> RegistryIndex<'cfg> {
}))
}

fn load_summaries(
fn load_summaries<'a>(
&mut self,
name: InternedString,
req: &'a VersionReq,
load: &mut dyn RegistryData,
) -> CargoResult<&mut Summaries> {
// If we've previously loaded what versions are present for `name`, just
Expand Down Expand Up @@ -351,6 +352,7 @@ impl<'cfg> RegistryIndex<'cfg> {
root,
&cache_root,
path.as_ref(),
req,
self.source_id,
load,
self.config,
Expand Down Expand Up @@ -479,11 +481,12 @@ impl Summaries {
/// create summaries.
/// * `load` - the actual index implementation which may be very slow to
/// call. We avoid this if we can.
pub fn parse(
pub fn parse<'a>(
index_version: Option<&str>,
root: &Path,
cache_root: &Path,
relative: &Path,
req: &'a VersionReq,
source_id: SourceId,
load: &mut dyn RegistryData,
config: &Config,
Expand Down Expand Up @@ -512,39 +515,148 @@ impl Summaries {
}
}

// This is the fallback path where we actually talk to libgit2 to load
// information. Here we parse every single line in the index (as we need
// to find the versions)
log::debug!("slow path for {:?}", relative);
// We failed to load from cache, and are going to have to talk to the registry backend (be
// it libgit2 or an HTTP endpoint) to get the data of the index file. To amortize this
// cost, we first try to do a pipelined greedy-fetch of all the transitive dependencies,
// and then do the individual loads below.
let mut ret = Summaries::default();
let mut hit_closure = false;
let mut cache_bytes = None;
let err = load.load(root, relative, &mut |contents| {
ret.raw_data = contents.to_vec();
let mut cache = SummariesCache::default();
hit_closure = true;
for line in split(contents, b'\n') {
// Attempt forwards-compatibility on the index by ignoring
// everything that we ourselves don't understand, that should
// allow future cargo implementations to break the
// interpretation of each line here and older cargo will simply
// ignore the new lines.
let summary = match IndexSummary::parse(config, line, source_id) {
Ok(summary) => summary,
Err(e) => {
log::info!("failed to parse {:?} registry package: {}", relative, e);
continue;
let mut next = HashMap::new();
let err;
if load.start_prefetch() {
log::debug!("prefetching transitive dependencies of {:?}", relative);
load.prefetch(root, relative, req.clone())?;
// TODO: something something something hit_closure and errs
while let Some(fetched) = load.next_prefetched()? {
if fetched.path == relative {
hit_closure = true;
}

let mut cache = SummariesCache::default();
for line in split(fetched.bytes, b'\n') {
// Attempt forwards-compatibility on the index by ignoring
// everything that we ourselves don't understand, that should
// allow future cargo implementations to break the
// interpretation of each line here and older cargo will simply
// ignore the new lines.
let summary = match IndexSummary::parse(config, line, source_id) {
Ok(summary) => summary,
Err(e) => {
log::info!("failed to parse {:?} registry package: {}", relative, e);
continue;
}
};
let version = summary.summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
if fetched.path == relative {
ret.versions.insert(version, summary.into());
}
};
let version = summary.summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
ret.versions.insert(version, summary.into());
}
if let Some(index_version) = index_version {
cache_bytes = Some(cache.serialize(index_version));
if req.matches(&version) {
if summary.yanked {
continue;
}

for dep in summary.summary.dependencies() {
if dep.source_id() != source_id {
continue;
}

// NOTE: It'd be nice to just call Summaries::parse recursively here,
// but unfortunately that leads down a road of pain because the
// RegistryData is shared. A recursive call may end up getting the
// response enqueued by a call further up the stack, and then doesn't
// have the correct context to handle that response.

let relative = unimplemented!();
let cache_path = cache_root.join(relative);
let mut have = false;
if let Some(index_version) = index_version {
// NOTE: we're going to read this off disk >1 time.
if let Ok(contents) = fs::read(&cache_path) {
if let Ok(_) = Summaries::parse_cache(contents, index_version) {
have = true;
}
}
}
// NOTE: we're assuming here that if a dependency is present in cache,
// all of its dependencies are in cache too, and don't need to also be
// prefetched.
//
// NOTE: we probably want to make sure that the things we prefetch do
// not get double-checked later on _unless_ there has been an
// update_index.

if !have {
// NOTE: The .clone + Vec here is pretty sad.
next.entry(dep.package_name())
.or_insert_with(Vec::new)
.push(dep.version_req().clone());
}
}
}
}

// We fetched the bytes, so might as well write them out to cache.
if let Some(index_version) = index_version {
let bytes = cache.serialize(index_version);
if fetched.path == relative {
// This will be written out below.
cache_bytes = Some(bytes);
} else {
let cache_path = cache_root.join(fetched.path);
if paths::create_dir_all(cache_path.parent().unwrap()).is_ok() {
let path = Filesystem::new(cache_path.clone());
config.assert_package_cache_locked(&path);
if let Err(e) = fs::write(cache_path, bytes) {
log::info!("failed to write cache: {}", e);
}
}
}
}

for (name, version_reqs) in next.drain() {
let relative = unimplemented!();
// FIXME: What do we do if there are multiple version requirements through
// different dependencies? What if a later dependency adds a version
// requirement that introduces an earlier version that in turn has more
// dependencies we haven't prefetched yet?

load.prefetch(root, relative, version_req);
}
}
Ok(())
});
} else {
// This is the fallback path where we actually talk to libgit2 to load
// information. Here we parse every single line in the index (as we need
// to find the versions)
log::debug!("slow path for {:?}", relative);
err = load.load(root, relative, &mut |contents| {
ret.raw_data = contents.to_vec();
let mut cache = SummariesCache::default();
hit_closure = true;
for line in split(contents, b'\n') {
// Attempt forwards-compatibility on the index by ignoring
// everything that we ourselves don't understand, that should
// allow future cargo implementations to break the
// interpretation of each line here and older cargo will simply
// ignore the new lines.
let summary = match IndexSummary::parse(config, line, source_id) {
Ok(summary) => summary,
Err(e) => {
log::info!("failed to parse {:?} registry package: {}", relative, e);
continue;
}
};
let version = summary.summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
ret.versions.insert(version, summary.into());
}
if let Some(index_version) = index_version {
cache_bytes = Some(cache.serialize(index_version));
}
Ok(())
});
}

// We ignore lookup failures as those are just crates which don't exist
// or we haven't updated the registry yet. If we actually ran the
Expand Down
17 changes: 17 additions & 0 deletions src/cargo/sources/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,26 @@ impl<'a> RegistryDependency<'a> {
}
}

pub struct Fetched<'a> {
path: &'a Path,
req: &'a semver::VersionReq,
bytes: &'a [u8],
}

pub trait RegistryData {
fn prepare(&self) -> CargoResult<()>;
fn index_path(&self) -> &Filesystem;

fn start_prefetch(&mut self) -> bool {
false
}
fn prefetch(&mut self, root: &Path, path: &Path, req: semver::VersionReq) -> CargoResult<()> {
Ok(())
}
fn next_prefetched(&mut self) -> CargoResult<Option<Fetched<'a>>> {
None
}

fn load(
&self,
root: &Path,
Expand Down

0 comments on commit c5b2468

Please sign in to comment.