Skip to content

Commit

Permalink
Merge branch 'fetch-pack'
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Oct 3, 2022
2 parents 3c49400 + d7f62b4 commit f47c891
Show file tree
Hide file tree
Showing 32 changed files with 707 additions and 212 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Please see _'Development Status'_ for a listing of all crates and their capabili
* **remote**
* [x] **refs** - list all references available on the remote based on the current remote configuration.
* [x] **ref-map** - show how remote references relate to their local tracking branches as mapped by refspecs.
* **fetch** - fetch the current remote or the given one, optionally just as dry-run.
* **credential**
* [x] **fill/approve/reject** - The same as `git credential`, but implemented in Rust, calling helpers only when from trusted configuration.
* **free** - no git repository necessary
Expand Down
33 changes: 21 additions & 12 deletions git-pack/src/bundle/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use std::io::Write;
use std::{
io,
path::{Path, PathBuf},
sync::{atomic::AtomicBool, Arc},
};

use git_features::{interrupt, progress, progress::Progress};
use git_tempfile::{handle::Writable, AutoRemove, ContainingDirectory};
use git_tempfile::{AutoRemove, ContainingDirectory};

use crate::data;

mod error;
pub use error::Error;

mod types;
use crate::bundle::write::types::SharedTempFile;
use types::{LockWriter, PassThrough};
pub use types::{Options, Outcome};

Expand Down Expand Up @@ -55,10 +57,13 @@ impl crate::Bundle {
};

let object_hash = options.object_hash;
let data_file = Arc::new(parking_lot::Mutex::new(match directory.as_ref() {
Some(directory) => git_tempfile::new(directory, ContainingDirectory::Exists, AutoRemove::Tempfile)?,
None => git_tempfile::new(std::env::temp_dir(), ContainingDirectory::Exists, AutoRemove::Tempfile)?,
}));
let data_file = Arc::new(parking_lot::Mutex::new(io::BufWriter::with_capacity(
64 * 1024,
match directory.as_ref() {
Some(directory) => git_tempfile::new(directory, ContainingDirectory::Exists, AutoRemove::Tempfile)?,
None => git_tempfile::new(std::env::temp_dir(), ContainingDirectory::Exists, AutoRemove::Tempfile)?,
},
)));
let (pack_entries_iter, pack_kind): (
Box<dyn Iterator<Item = Result<data::input::Entry, data::input::Error>>>,
_,
Expand Down Expand Up @@ -97,7 +102,7 @@ impl crate::Bundle {
},
writer: Some(data_file.clone()),
};
// This buff-reader is required to assure we call 'read()' in order to fill the (extra) buffer. Otherwise all the counting
// This buf-reader is required to assure we call 'read()' in order to fill the (extra) buffer. Otherwise all the counting
// we do with the wrapped pack reader doesn't work as it does not expect anyone to call BufRead functions directly.
// However, this is exactly what's happening in the ZipReader implementation that is eventually used.
// The performance impact of this is probably negligible, compared to all the other work that is done anyway :D.
Expand Down Expand Up @@ -153,10 +158,10 @@ impl crate::Bundle {
progress: progress::ThroughputOnDrop::new(read_progress),
};

let data_file = Arc::new(parking_lot::Mutex::new(match directory.as_ref() {
let data_file = Arc::new(parking_lot::Mutex::new(io::BufWriter::new(match directory.as_ref() {
Some(directory) => git_tempfile::new(directory, ContainingDirectory::Exists, AutoRemove::Tempfile)?,
None => git_tempfile::new(std::env::temp_dir(), ContainingDirectory::Exists, AutoRemove::Tempfile)?,
}));
})));
let object_hash = options.object_hash;
let eight_pages = 4096 * 8;
let (pack_entries_iter, pack_kind): (
Expand Down Expand Up @@ -231,7 +236,7 @@ impl crate::Bundle {
index_version: index_kind,
object_hash,
}: Options,
data_file: Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>,
data_file: SharedTempFile,
pack_entries_iter: impl Iterator<Item = Result<data::input::Entry, data::input::Error>>,
should_interrupt: &AtomicBool,
) -> Result<(crate::index::write::Outcome, Option<PathBuf>, Option<PathBuf>), Error> {
Expand All @@ -255,12 +260,14 @@ impl crate::Bundle {
object_hash,
)?;

let data_path = directory.join(format!("{}.pack", outcome.data_hash.to_hex()));
let data_path = directory.join(format!("pack-{}.pack", outcome.data_hash.to_hex()));
let index_path = data_path.with_extension("idx");

Arc::try_unwrap(data_file)
.expect("only one handle left after pack was consumed")
.into_inner()
.into_inner()
.map_err(|err| Error::from(err.into_error()))?
.persist(&data_path)?;
index_file
.persist(&index_path)
Expand Down Expand Up @@ -292,10 +299,12 @@ impl crate::Bundle {
}

fn new_pack_file_resolver(
data_file: Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>,
data_file: SharedTempFile,
) -> io::Result<impl Fn(data::EntryRange, &mut Vec<u8>) -> Option<()> + Send + Clone> {
let mut guard = data_file.lock();
guard.flush()?;
let mapped_file = Arc::new(crate::mmap::read_only(
&data_file.lock().with_mut(|f| f.path().to_owned())?,
&guard.get_mut().with_mut(|f| f.path().to_owned())?,
)?);
let pack_data_lookup = move |range: std::ops::Range<u64>, out: &mut Vec<u8>| -> Option<()> {
mapped_file
Expand Down
8 changes: 5 additions & 3 deletions git-pack/src/bundle/write/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ impl Outcome {
}
}

pub(crate) type SharedTempFile = Arc<parking_lot::Mutex<std::io::BufWriter<git_tempfile::Handle<Writable>>>>;

pub(crate) struct PassThrough<R> {
pub reader: R,
pub writer: Option<Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>>,
pub writer: Option<SharedTempFile>,
}

impl<R> io::Read for PassThrough<R>
Expand Down Expand Up @@ -87,7 +89,7 @@ where
}

pub(crate) struct LockWriter {
pub writer: Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>,
pub writer: SharedTempFile,
}

impl io::Write for LockWriter {
Expand All @@ -102,7 +104,7 @@ impl io::Write for LockWriter {

impl io::Read for LockWriter {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.writer.lock().read(buf)
self.writer.lock().get_mut().read(buf)
}
}

Expand Down
4 changes: 2 additions & 2 deletions git-pack/tests/pack/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ mod write_to_directory {
assert_eq!(sorted_entries.len(), 2, "we want a pack and the corresponding index");

let pack_hash = res.index.data_hash.to_hex();
assert_eq!(file_name(&sorted_entries[0]), format!("{}.idx", pack_hash));
assert_eq!(file_name(&sorted_entries[0]), format!("pack-{}.idx", pack_hash));
assert_eq!(Some(sorted_entries[0].path()), index_path);

assert_eq!(file_name(&sorted_entries[1]), format!("{}.pack", pack_hash));
assert_eq!(file_name(&sorted_entries[1]), format!("pack-{}.pack", pack_hash));
assert_eq!(Some(sorted_entries[1].path()), data_path);

res.index_path = index_path;
Expand Down
7 changes: 6 additions & 1 deletion git-packetline/src/read/sidebands/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,12 @@ where
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
const ENCODED_BAND: usize = 1;
match band {
BandRef::Data(d) => break (U16_HEX_BYTES + ENCODED_BAND, d.len()),
BandRef::Data(d) => {
if d.is_empty() {
continue;
}
break (U16_HEX_BYTES + ENCODED_BAND, d.len());
}
BandRef::Progress(d) => {
let text = TextRef::from(d).0;
handle_progress(false, text);
Expand Down
7 changes: 6 additions & 1 deletion git-packetline/src/read/sidebands/blocking_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ where
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
const ENCODED_BAND: usize = 1;
match band {
BandRef::Data(d) => break (U16_HEX_BYTES + ENCODED_BAND, d.len()),
BandRef::Data(d) => {
if d.is_empty() {
continue;
}
break (U16_HEX_BYTES + ENCODED_BAND, d.len());
}
BandRef::Progress(d) => {
let text = TextRef::from(d).0;
handle_progress(false, text);
Expand Down
4 changes: 2 additions & 2 deletions git-protocol/src/fetch/arguments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;

use bstr::{BStr, BString, ByteVec};
use bstr::{BStr, BString, ByteSlice, ByteVec};

/// The arguments passed to a server command.
#[derive(Debug)]
Expand Down Expand Up @@ -30,7 +30,7 @@ impl Arguments {
/// This can happen if callers assure that they won't add 'wants' if their 'have' is the same, i.e. if the remote has nothing
/// new for them.
pub fn is_empty(&self) -> bool {
self.args.is_empty()
self.haves.is_empty() && !self.args.iter().rev().any(|arg| arg.starts_with_str("want "))
}
/// Return true if ref filters is supported.
pub fn can_use_filter(&self) -> bool {
Expand Down
3 changes: 2 additions & 1 deletion git-protocol/src/fetch/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::fetch::command::Feature;
#[allow(missing_docs)]
pub enum Error {
#[error("Failed to read from line reader")]
Io(std::io::Error),
Io(#[source] std::io::Error),
#[error(transparent)]
UploadPack(#[from] git_transport::packetline::read::Error),
#[error(transparent)]
Expand Down Expand Up @@ -139,6 +139,7 @@ impl WantedRef {
}

/// A representation of a complete fetch response
#[derive(Debug)]
pub struct Response {
acks: Vec<Acknowledgement>,
shallows: Vec<ShallowUpdate>,
Expand Down
23 changes: 15 additions & 8 deletions git-protocol/tests/fetch/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,21 @@ mod v2 {

#[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))]
async fn clone() -> crate::Result {
let mut provider = mock_reader("v2/clone-only.response");
let mut reader = provider.as_read_without_sidebands();
let r = fetch::Response::from_line_reader(Protocol::V2, &mut reader).await?;
assert!(r.acknowledgements().is_empty(), "it should go straight to the packfile");
assert!(r.has_pack());
let mut buf = Vec::new();
let bytes_read = reader.read_to_end(&mut buf).await?;
assert_eq!(bytes_read, 1089, "should be able to read the whole pack");
for keepalive in [false, true] {
let fixture = format!(
"v2/clone-only{}.response",
keepalive.then(|| "-with-keepalive").unwrap_or_default()
);
let mut provider = mock_reader(&fixture);
let mut reader = provider.as_read_without_sidebands();
let r = fetch::Response::from_line_reader(Protocol::V2, &mut reader).await?;
assert!(r.acknowledgements().is_empty(), "it should go straight to the packfile");
assert!(r.has_pack());
reader.set_progress_handler(Some(Box::new(|_is_err, _text| ())));
let mut buf = Vec::new();
let bytes_read = reader.read_to_end(&mut buf).await?;
assert_eq!(bytes_read, 876, "should be able to read the whole pack");
}
Ok(())
}

Expand Down
38 changes: 22 additions & 16 deletions git-protocol/tests/fetch/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,28 @@ use crate::fetch::{helper_unused, oid, transport, CloneDelegate, LsRemoteDelegat

#[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))]
async fn clone() -> crate::Result {
let out = Vec::new();
let mut dlg = CloneDelegate::default();
git_protocol::fetch(
transport(
out,
"v1/clone.response",
Protocol::V1,
git_transport::client::git::ConnectMode::Daemon,
),
&mut dlg,
helper_unused,
progress::Discard,
FetchConnection::TerminateOnSuccessfulCompletion,
)
.await?;
assert_eq!(dlg.pack_bytes, 876, "It be able to read pack bytes");
for with_keepalive in [false, true] {
let out = Vec::new();
let mut dlg = CloneDelegate::default();
let fixture = format!(
"v1/clone{}.response",
with_keepalive.then(|| "-with-keepalive").unwrap_or_default()
);
git_protocol::fetch(
transport(
out,
&fixture,
Protocol::V1,
git_transport::client::git::ConnectMode::Daemon,
),
&mut dlg,
helper_unused,
progress::Discard,
FetchConnection::TerminateOnSuccessfulCompletion,
)
.await?;
assert_eq!(dlg.pack_bytes, 876, "{}: It be able to read pack bytes", fixture);
}
Ok(())
}

Expand Down
Binary file not shown.
Binary file not shown.
27 changes: 20 additions & 7 deletions git-repository/src/remote/connection/fetch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::remote::fetch::{DryRun, RefMap};
use crate::remote::{fetch, ref_map, Connection};
use crate::Progress;
use crate::{remote, Progress};
use git_odb::FindExt;
use git_protocol::transport::client::Transport;
use std::sync::atomic::AtomicBool;
Expand All @@ -16,7 +16,7 @@ mod error {
desired: Option<i64>,
source: Option<git_config::value::Error>,
},
#[error(transparent)]
#[error("Could not decode server reply")]
FetchResponse(#[from] git_protocol::fetch::response::Error),
#[error(transparent)]
Negotiate(#[from] super::negotiate::Error),
Expand Down Expand Up @@ -62,6 +62,16 @@ pub struct Outcome<'spec> {
///
pub mod negotiate;

pub mod prepare {
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Cannot perform a meaningful fetch operation without any configured ref-specs")]
MissingRefSpecs,
#[error(transparent)]
RefMap(#[from] crate::remote::ref_map::Error),
}
}

impl<'remote, 'repo, T, P> Connection<'remote, 'repo, T, P>
where
T: Transport,
Expand All @@ -78,12 +88,15 @@ where
/// Note that this implementation is currently limited to blocking mode as it relies on Drop semantics to close the connection
/// should the fetch not be performed. Furthermore, there the code doing the fetch is inherently blocking so there is no benefit.
/// It's best to unblock it by placing it into its own thread or offload it should usage in an async context be required.
pub fn prepare_fetch(mut self, options: ref_map::Options) -> Result<Prepare<'remote, 'repo, T, P>, ref_map::Error> {
pub fn prepare_fetch(mut self, options: ref_map::Options) -> Result<Prepare<'remote, 'repo, T, P>, prepare::Error> {
if self.remote.refspecs(remote::Direction::Fetch).is_empty() {
return Err(prepare::Error::MissingRefSpecs);
}
let ref_map = self.ref_map_inner(options)?;
Ok(Prepare {
con: Some(self),
ref_map,
dry_run: fetch::DryRun::No,
dry_run: DryRun::No,
})
}
}
Expand Down Expand Up @@ -192,7 +205,7 @@ where
repo,
"fetch",
&self.ref_map.mappings,
con.remote.refspecs(crate::remote::Direction::Fetch),
con.remote.refspecs(remote::Direction::Fetch),
self.dry_run,
)?;

Expand Down Expand Up @@ -234,7 +247,7 @@ where
{
con: Option<Connection<'remote, 'repo, T, P>>,
ref_map: RefMap<'remote>,
dry_run: fetch::DryRun,
dry_run: DryRun,
}

/// Builder
Expand All @@ -246,7 +259,7 @@ where
///
/// This works by not actually fetching the pack after negotiating it, nor will refs be updated.
pub fn with_dry_run(mut self, enabled: bool) -> Self {
self.dry_run = enabled.then(|| fetch::DryRun::Yes).unwrap_or(DryRun::No);
self.dry_run = enabled.then(|| DryRun::Yes).unwrap_or(DryRun::No);
self
}
}
Expand Down
Loading

0 comments on commit f47c891

Please sign in to comment.