diff --git a/Cargo.lock b/Cargo.lock index 35a589979..cfab037a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,6 +184,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi 0.3.9", +] + [[package]] name = "autocfg" version = "1.0.0" @@ -314,6 +325,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "476e9cd489f9e121e02ffa6014a8ef220ecb15c05ed23fc34cca13925dc283fb" +[[package]] +name = "bstr" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31accafdb70df7871592c058eca3985b71104e15ac32f64706022c58867da931" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + [[package]] name = "bumpalo" version = "3.4.0" @@ -366,6 +389,15 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" +[[package]] +name = "cast" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0" +dependencies = [ + "rustc_version", +] + [[package]] name = "cc" version = "1.0.58" @@ -441,6 +473,79 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" +[[package]] +name = "criterion" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70daa7ceec6cf143990669a04c7df13391d55fb27bd4079d252fca774ba244d8" +dependencies = [ + "atty", + "cast", + "clap", + 
"criterion-plot", + "csv", + "itertools 0.9.0", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_cbor", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e022feadec601fba1649cfa83586381a4ad31c6bf3a9ab7d408118b05dd9889d" +dependencies = [ + "cast", + "itertools 0.9.0", +] + +[[package]] +name = "crossbeam-deque" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "lazy_static", + "maybe-uninit", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-queue" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "maybe-uninit", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -468,6 +573,28 @@ dependencies = [ "subtle 1.0.0", ] +[[package]] +name = "csv" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279" +dependencies = [ + "bstr", + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + 
[[package]] name = "ctr" version = "0.3.2" @@ -972,6 +1099,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d36fab90f82edc3c747f9d438e06cf0a491055896f2a279638bb5beed6c40177" + [[package]] name = "hashbrown" version = "0.8.1" @@ -1183,6 +1316,7 @@ dependencies = [ name = "ipfs-http" version = "0.1.0" dependencies = [ + "anyhow", "async-stream", "bytes 0.5.6", "cid", @@ -1220,6 +1354,7 @@ name = "ipfs-unixfs" version = "0.0.1" dependencies = [ "cid", + "criterion", "either", "filetime", "hex-literal", @@ -1228,6 +1363,7 @@ dependencies = [ "multihash", "quick-protobuf", "sha2 0.9.1", + "tar", ] [[package]] @@ -1245,6 +1381,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.6" @@ -1659,6 +1804,15 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +[[package]] +name = "memoffset" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c198b026e1bbf08a937e94c6c60f9ec4a2267f5b0d2eec9c1b21b061ce2be55f" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.16" @@ -1854,6 +2008,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +[[package]] +name = "oorandom" +version = "11.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a170cebd8021a008ea92e4db85a72f80b35df514ec664b296fdcbb654eac0b2c" + [[package]] name = "opaque-debug" version = "0.2.3" @@ -2027,6 +2187,18 @@ version = "0.3.18" source 
= "registry+https://github.com/rust-lang/crates.io-index" checksum = "d36492546b6af1463394d46f0c834346f31548646f6ba10849802c9c9a27ac33" +[[package]] +name = "plotters" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d1685fbe7beba33de0330629da9d955ac75bd54f33d7b79f9a895590124f6bb" +dependencies = [ + "js-sys", + "num-traits", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "ppv-lite86" version = "0.2.8" @@ -2096,7 +2268,7 @@ checksum = "02b10678c913ecbd69350e8535c3aef91a8676c0773fc1d7b95cdd196d7f2f26" dependencies = [ "bytes 0.5.6", "heck", - "itertools", + "itertools 0.8.2", "log", "multimap", "petgraph", @@ -2113,7 +2285,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "537aa19b95acde10a12fec4301466386f757403de4cd4e5b4fa78fb5ecb18f72" dependencies = [ "anyhow", - "itertools", + "itertools 0.8.2", "proc-macro2", "quote", "syn", @@ -2243,6 +2415,31 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rayon" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f02856753d04e03e26929f820d0a0a337ebe71f849801eea335d464b349080" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280" +dependencies = [ + "crossbeam-deque", + "crossbeam-queue", + "crossbeam-utils", + "lazy_static", + "num_cpus", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -2364,6 +2561,15 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scoped-tls" version = "1.0.0" @@ -2406,6 +2612,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_cbor" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.114" @@ -2722,6 +2938,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "tinytemplate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d3dc76004a03cec1c5932bca4cdc2e39aaa798e3f82363dd94f9adf6098c12f" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "0.3.3" @@ -3021,6 +3247,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9571542c2ce85ce642e6b58b3364da2fb53526360dfb7c211add4f5c23105ff7" +[[package]] +name = "walkdir" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "777182bc735b6424e1a57516d35ed72cb8019d85c8c9bf536dccb3445c1a2f7d" +dependencies = [ + "same-file", + "winapi 0.3.9", + "winapi-util", +] + [[package]] name = "want" version = "0.3.0" @@ -3201,6 +3438,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/conformance/rust.sh b/conformance/rust.sh index 95e9a5689..8e2a590ec 100755 --- a/conformance/rust.sh +++ b/conformance/rust.sh @@ 
-31,6 +31,29 @@ on_killed () { echo ">>>> new execution $$ with args: $@" | tee -a /tmp/rust.log >&2 killed=true + +# +# testing around the time of PR #284 +# +# binutils | lld-9 | +# 2.33 | 9.0.0 | notes +# ---------+-------+-------------------------------------- +# 256 | | crashes at id, unlikely inits? +# | 256 | crashes at p2p swarm init +# | 300 | crashes at behaviour building +# | 350 | crashes but built the dns threadpool +# | 375 | crashes at p2p init +# | 387 | crashes at kad init +# | 390 | ok +# | 393 | ok +# | 400 | ok +# | 450 | ok +# 512 | | crashes at id, unlikely inits? +# 1024 | | crashes right away unlikely inits +# 4096 | | still the same +# 8192 | | works without -c unlimited? +# +# ulimit -s 8192 -c unlimited ./http "$@" 2>&1 | tee -a /tmp/rust.log || retval=$? killed=false echo "<<<< exiting $$ with $retval" | tee -a /tmp/rust.log >&2 diff --git a/conformance/test/index.js b/conformance/test/index.js index c0f6d50ec..d884bed89 100644 --- a/conformance/test/index.js +++ b/conformance/test/index.js @@ -76,13 +76,6 @@ tests.root.get(factory); tests.root.add(factory, { skip: [ // ordered in the order of most likely implementation - // progress: - "should add a BIG Buffer with progress enabled", - // directories: - "should add a nested directory as array of tupples", - "should add a nested directory as array of tupples with progress", - "should add files to a directory non sequentially", - "should wrap content in a directory", // unixfsv1.5 metadata "should add with mode as string", "should add with mode as number", @@ -90,12 +83,6 @@ tests.root.add(factory, { "should add with mtime as { nsecs, secs }", "should add with mtime as timespec", "should add with mtime as hrtime", - // filesystem (maybe) - "should add a directory from the file system", - "should add a directory from the file system with an odd name", - "should ignore a directory from the file system", - "should add a file from the file system", - "should add a hidden file in a directory 
from the file system", // raw leaves "should respect raw leaves when file is smaller than one block and no metadata is present", "should override raw leaves when file is smaller than one block and metadata is present", diff --git a/http/Cargo.toml b/http/Cargo.toml index d93dde55f..bccbcc8b5 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -10,6 +10,7 @@ prost-build = { default-features = false, version = "0.6" } vergen = { default-features = false, version = "3.1" } [dependencies] +anyhow = "*" # temporarily needed until the next release of mpart-async async-stream = { default-features = false, version = "0.3" } bytes = { default-features = false, version = "0.5" } cid = { default-features = false, version = "0.5" } diff --git a/http/src/v0.rs b/http/src/v0.rs index 96b5eddaa..102b278f9 100644 --- a/http/src/v0.rs +++ b/http/src/v0.rs @@ -1,5 +1,4 @@ use ipfs::{Ipfs, IpfsTypes}; -use std::convert::Infallible; use warp::{query, Filter}; pub mod bitswap; @@ -16,84 +15,130 @@ pub mod support; pub use support::recover_as_message_response; pub(crate) use support::{with_ipfs, InvalidPeerId, NotImplemented, StringError}; -/// Helper to combine the multiple filters together with Filter::or, possibly boxing the types in +/// Helper to combine multiple filters together with Filter::or, possibly boxing the types in /// the process. This greatly helps the build times for `ipfs-http`. macro_rules! combine { - ($x:expr, $($y:expr),+) => { + ($x:expr, $($y:expr),+ $(,)?) => { { - let filter = boxed_on_debug!($x); + let filter = $x; $( - let filter = boxed_on_debug!(filter.or($y)); + let filter = filter.or($y); )+ filter } } } -#[cfg(debug_assertions)] +/// Helper to combine multiple filters together with Filter::or. The filters are never boxed but +/// the output is assumed to be equal and so the output is unified. +macro_rules! combine_unify { + ($x:expr, $($y:expr),+ $(,)?) 
=> { + { + let filter = $x; + $( + let filter = filter.or($y).unify(); + )+ + filter + } + } +} + +/// Boxes the filter on debug builds only. It might be a good idea to explore how much always boxing +/// would speed up builds. +#[cfg(not(debug_assertions))] macro_rules! boxed_on_debug { ($x:expr) => { - $x.boxed() + $x }; } -#[cfg(not(debug_assertions))] +#[cfg(debug_assertions)] macro_rules! boxed_on_debug { ($x:expr) => { - $x + $x.boxed() }; } +/// Like `Filter::and` but the next filter is boxed on debug builds. This can be used to combine +/// path matching with the route implementation while maintaining a healthy balance between +/// compilation time and optimization. +macro_rules! and_boxed { + ($x:expr, $y:expr) => { + ($x).and(boxed_on_debug!($y)) + }; +} + +/// Supported routes of the crate. pub fn routes( ipfs: &Ipfs, shutdown_tx: tokio::sync::mpsc::Sender<()>, -) -> impl warp::Filter + Clone { - let mount = warp::path("api").and(warp::path("v0")); - - let shutdown = warp::post() - .and(warp::path!("shutdown")) - .and(warp::any().map(move || shutdown_tx.clone())) - .and_then(handle_shutdown); +) -> impl warp::Filter + Clone { + let mount = warp::post().and(warp::path!("api" / "v0" / ..)); let api = mount.and(combine!( - shutdown, - id::identity(ipfs), - root_files::add(ipfs), - bitswap::wantlist(ipfs), - bitswap::stat(ipfs), - block::get(ipfs), - block::put(ipfs), - block::rm(ipfs), - block::stat(ipfs), - warp::path!("bootstrap" / ..).and_then(not_implemented), - warp::path!("config" / ..).and_then(not_implemented), - dag::put(ipfs), - dag::resolve(ipfs), - warp::path!("dht" / ..).and_then(not_implemented), - root_files::cat(ipfs), - root_files::get(ipfs), - warp::path!("key" / ..).and_then(not_implemented), - warp::path!("name" / ..).and_then(not_implemented), - warp::path!("object" / ..).and_then(not_implemented), - warp::path!("pin" / ..).and_then(not_implemented), - warp::path!("ping" / ..).and_then(not_implemented), - pubsub::routes(ipfs), - refs::local(ipfs), -
refs::refs(ipfs), - warp::path!("repo" / ..).and_then(not_implemented), - warp::path!("stats" / ..).and_then(not_implemented), - swarm::connect(ipfs), - swarm::peers(ipfs), - swarm::addrs(ipfs), - swarm::addrs_local(ipfs), - swarm::disconnect(ipfs), + and_boxed!( + warp::path!("shutdown"), + warp::any() + .map(move || shutdown_tx.clone()) + .and_then(handle_shutdown) + ), + and_boxed!(warp::path!("id"), id::identity(ipfs)), + and_boxed!(warp::path!("add"), root_files::add(ipfs)), + and_boxed!(warp::path!("cat"), root_files::cat(ipfs)), + and_boxed!(warp::path!("get"), root_files::get(ipfs)), + and_boxed!(warp::path!("refs" / "local"), refs::local(ipfs)), + and_boxed!(warp::path!("refs"), refs::refs(ipfs)), warp::path!("version") .and(query::()) - .and_then(version::version) + .and_then(version::version), + warp::path("bitswap").and(combine!( + and_boxed!(warp::path!("wantlist"), bitswap::wantlist(ipfs)), + and_boxed!(warp::path!("stat"), bitswap::stat(ipfs)) + )), + warp::path("block").and(combine!( + and_boxed!(warp::path!("get"), block::get(ipfs)), + and_boxed!(warp::path!("put"), block::put(ipfs)), + and_boxed!(warp::path!("rm"), block::rm(ipfs)), + and_boxed!(warp::path!("stat"), block::stat(ipfs)), + )), + warp::path("dag").and(combine!( + and_boxed!(warp::path!("put"), dag::put(ipfs)), + and_boxed!(warp::path!("resolve"), dag::resolve(ipfs)), + )), + warp::path("pubsub").and(combine!( + and_boxed!(warp::path!("peers"), pubsub::peers(ipfs)), + and_boxed!(warp::path!("ls"), pubsub::list_subscriptions(ipfs)), + and_boxed!(warp::path!("pub"), pubsub::publish(ipfs)), + and_boxed!( + warp::path!("sub"), + pubsub::subscribe(ipfs, Default::default()) + ), + )), + warp::path("swarm").and(combine!( + and_boxed!(warp::path!("addrs" / "local"), swarm::addrs_local(ipfs)), + and_boxed!(warp::path!("addrs"), swarm::addrs(ipfs)), + and_boxed!(warp::path!("connect"), swarm::connect(ipfs)), + and_boxed!(warp::path!("disconnect"), swarm::disconnect(ipfs)), + 
and_boxed!(warp::path!("peers"), swarm::peers(ipfs)), + )), + combine_unify!( + warp::path!("bootstrap" / ..), + warp::path!("config" / ..), + warp::path!("dht" / ..), + warp::path!("key" / ..), + warp::path!("name" / ..), + warp::path!("object" / ..), + warp::path!("pin" / ..), + warp::path!("ping" / ..), + warp::path!("repo" / ..), + warp::path!("stats" / ..), + ) + .and_then(not_implemented), )); // have a common handler turn the rejections into 400 or 500 with json body - api.recover(recover_as_message_response) + // boxing this might save up to 15s. + boxed_on_debug!(api.recover(recover_as_message_response)) } pub(crate) async fn handle_shutdown( @@ -105,17 +150,15 @@ pub(crate) async fn handle_shutdown( }) } -async fn not_implemented() -> Result { - Ok(warp::http::StatusCode::NOT_IMPLEMENTED) +async fn not_implemented() -> Result<(impl warp::Reply,), std::convert::Infallible> { + Ok((warp::http::StatusCode::NOT_IMPLEMENTED,)) } #[cfg(test)] mod tests { - use std::convert::Infallible; - /// Creates routes for tests, the ipfs will not work as no background task is being spawned. 
async fn testing_routes( - ) -> impl warp::Filter + Clone { + ) -> impl warp::Filter + Clone { use super::routes; use ipfs::{IpfsOptions, UninitializedIpfs}; @@ -149,7 +192,7 @@ mod tests { async fn invalid_peer_id_as_messageresponse() { let routes = testing_routes().await; let resp = warp::test::request() - .method("GET") + .method("POST") .path("/api/v0/id?arg=foobar") .reply(&routes) .await; diff --git a/http/src/v0/bitswap.rs b/http/src/v0/bitswap.rs index 2ed139c96..b950ea060 100644 --- a/http/src/v0/bitswap.rs +++ b/http/src/v0/bitswap.rs @@ -2,7 +2,7 @@ use crate::v0::support::{with_ipfs, InvalidPeerId, StringError}; use ipfs::{BitswapStats, Ipfs, IpfsTypes}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use warp::{path, query, reply, Filter, Rejection, Reply}; +use warp::{query, reply, Filter, Rejection, Reply}; #[derive(Debug, Deserialize)] pub struct WantlistQuery { @@ -39,9 +39,8 @@ async fn wantlist_query( pub fn wantlist( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("bitswap" / "wantlist") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and_then(wantlist_query) } @@ -99,8 +98,6 @@ async fn stat_query(ipfs: Ipfs) -> Result( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("bitswap" / "stat") - .and(with_ipfs(ipfs)) - .and_then(stat_query) +) -> impl Filter + Clone { + with_ipfs(ipfs).and_then(stat_query) } diff --git a/http/src/v0/block.rs b/http/src/v0/block.rs index 3ce392831..ceae05232 100644 --- a/http/src/v0/block.rs +++ b/http/src/v0/block.rs @@ -12,7 +12,7 @@ use mime::Mime; use multihash::Multihash; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; -use warp::{http::Response, path, query, reply, Filter, Rejection, Reply}; +use warp::{http::Response, query, reply, Filter, Rejection, Reply}; mod options; use options::RmOptions; @@ -44,9 +44,8 @@ async fn get_query( pub fn get( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("block" / "get") - .and(with_ipfs(ipfs)) +) 
-> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and_then(get_query) } @@ -88,9 +87,8 @@ impl PutQuery { pub fn put( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("block" / "put") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and(warp::header::("content-type")) // TODO: rejects if missing .and(warp::body::stream()) @@ -151,11 +149,8 @@ pub struct EmptyResponse; pub fn rm( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("block" / "rm") - .and(with_ipfs(ipfs)) - .and(rm_options()) - .and_then(rm_query) +) -> impl Filter + Clone { + with_ipfs(ipfs).and(rm_options()).and_then(rm_query) } fn rm_options() -> impl Filter + Clone { @@ -221,6 +216,14 @@ async fn rm_query( Ok(StreamResponse(st)) } +pub fn stat( + ipfs: &Ipfs, +) -> impl Filter + Clone { + with_ipfs(ipfs) + .and(query::()) + .and_then(stat_query) +} + async fn stat_query( ipfs: Ipfs, query: GetStatOptions, @@ -238,12 +241,3 @@ async fn stat_query( "Size": block.data().len(), }))) } - -pub fn stat( - ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("block" / "stat") - .and(with_ipfs(ipfs)) - .and(query::()) - .and_then(stat_query) -} diff --git a/http/src/v0/dag.rs b/http/src/v0/dag.rs index efc8a7525..55cafaef8 100644 --- a/http/src/v0/dag.rs +++ b/http/src/v0/dag.rs @@ -9,7 +9,7 @@ use mime::Mime; use serde::Deserialize; use serde_json::json; -use warp::{path, query, reply, Buf, Filter, Rejection, Reply}; +use warp::{query, reply, Buf, Filter, Rejection, Reply}; #[derive(Debug, Deserialize)] pub struct PutQuery { @@ -35,9 +35,8 @@ impl Default for InputEncoding { pub fn put( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("dag" / "put") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and(warp::header::("content-type")) // TODO: rejects if missing .and(warp::body::stream()) @@ -106,9 +105,8 @@ async fn put_query( /// (rempath). 
pub fn resolve( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("dag" / "resolve") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and_then(inner_resolve) } diff --git a/http/src/v0/id.rs b/http/src/v0/id.rs index bd79635a3..33a15c3fc 100644 --- a/http/src/v0/id.rs +++ b/http/src/v0/id.rs @@ -6,9 +6,8 @@ use warp::{query, Filter}; pub fn identity( ipfs: &Ipfs, -) -> impl Filter + Clone { - warp::path!("id") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(optional_peer_id()) .and_then(identity_query) } diff --git a/http/src/v0/pubsub.rs b/http/src/v0/pubsub.rs index feb17101e..0696ee71e 100644 --- a/http/src/v0/pubsub.rs +++ b/http/src/v0/pubsub.rs @@ -40,26 +40,11 @@ pub struct Pubsub { Mutex>>>, } -/// Creates a filter composing pubsub/{peers,ls,pub,sub}. -pub fn routes( - ipfs: &Ipfs, -) -> impl warp::Filter + Clone { - warp::path("pubsub").and( - peers(ipfs) - .or(list_subscriptions(ipfs)) - .or(publish(ipfs)) - .or(subscribe(ipfs, Default::default())), - ) -} - /// Handling of https://docs-beta.ipfs.io/reference/http/api/#api-v0-pubsub-peers pub fn peers( ipfs: &Ipfs, -) -> impl warp::Filter + Clone { - warp::path!("peers") - .and(warp::get().or(warp::post())) - .unify() - .and(with_ipfs(ipfs)) +) -> impl warp::Filter + Clone { + with_ipfs(ipfs) .and(warp::query::().map(|tp: OptionalTopicParameter| tp.topic)) .and_then(inner_peers) } @@ -67,44 +52,38 @@ pub fn peers( async fn inner_peers( ipfs: Ipfs, topic: Option, -) -> Result { +) -> Result<(impl warp::Reply,), warp::Rejection> { let peers = ipfs .pubsub_peers(topic) .await .map_err(|e| warp::reject::custom(StringError::from(e)))?; - Ok(warp::reply::json(&StringListResponse { + Ok((warp::reply::json(&StringListResponse { strings: peers.into_iter().map(|id| id.to_string()).collect(), - })) + }),)) } /// Handling of https://docs-beta.ipfs.io/reference/http/api/#api-v0-pubsub-ls pub fn list_subscriptions( ipfs: &Ipfs, -) -> impl warp::Filter + 
Clone { - warp::path!("ls") - .and(warp::get().or(warp::post())) - .unify() - .and(with_ipfs(ipfs)) - .and_then(inner_ls) +) -> impl warp::Filter + Clone { + with_ipfs(ipfs).and_then(inner_ls) } -async fn inner_ls(ipfs: Ipfs) -> Result { +async fn inner_ls(ipfs: Ipfs) -> Result<(impl warp::Reply,), warp::Rejection> { let topics = ipfs .pubsub_subscribed() .await .map_err(|e| warp::reject::custom(StringError::from(e)))?; - Ok(warp::reply::json(&StringListResponse { strings: topics })) + Ok((warp::reply::json(&StringListResponse { strings: topics }),)) } /// Handling of https://docs-beta.ipfs.io/reference/http/api/#api-v0-pubsub-pub pub fn publish( ipfs: &Ipfs, -) -> impl warp::Filter + Clone { - warp::path!("pub") - .and(warp::post()) - .and(with_ipfs(ipfs)) +) -> impl warp::Filter + Clone { + with_ipfs(ipfs) .and(publish_args("arg")) .and_then(inner_publish) } @@ -112,11 +91,11 @@ pub fn publish( async fn inner_publish( ipfs: Ipfs, PublishArgs { topic, message }: PublishArgs, -) -> Result { +) -> Result<(impl warp::Reply,), warp::Rejection> { ipfs.pubsub_publish(topic, message.into_inner()) .await .map_err(|e| warp::reject::custom(StringError::from(e)))?; - Ok(warp::reply::reply()) + Ok((warp::reply::reply(),)) } /// Handling of https://docs-beta.ipfs.io/reference/http/api/#api-v0-pubsub-sub @@ -127,11 +106,8 @@ async fn inner_publish( pub fn subscribe( ipfs: &Ipfs, pubsub: Arc, -) -> impl warp::Filter + Clone { - warp::path!("sub") - .and(warp::get().or(warp::post())) - .unify() - .and(with_ipfs(ipfs)) +) -> impl warp::Filter + Clone { + with_ipfs(ipfs) .and(warp::any().map(move || pubsub.clone())) .and(warp::query::()) .and_then(|ipfs, pubsub, TopicParameter { topic }| async move { diff --git a/http/src/v0/refs.rs b/http/src/v0/refs.rs index 6f3695d93..3f7d998c0 100644 --- a/http/src/v0/refs.rs +++ b/http/src/v0/refs.rs @@ -26,11 +26,8 @@ use crate::v0::support::{HandledErr, StreamResponse}; /// https://docs-beta.ipfs.io/reference/http/api/#api-v0-refs pub fn 
refs( ipfs: &Ipfs, -) -> impl Filter + Clone { - warp::path!("refs") - .and(with_ipfs(ipfs)) - .and(refs_options()) - .and_then(refs_inner) +) -> impl Filter + Clone { + with_ipfs(ipfs).and(refs_options()).and_then(refs_inner) } async fn refs_inner( @@ -588,10 +585,8 @@ fn dagpb_links(ipld: Ipld) -> Vec<(Option, Cid)> { /// Handling of https://docs-beta.ipfs.io/reference/http/api/#api-v0-refs-local pub fn local( ipfs: &Ipfs, -) -> impl Filter + Clone { - warp::path!("refs" / "local") - .and(with_ipfs(ipfs)) - .and_then(inner_local) +) -> impl Filter + Clone { + with_ipfs(ipfs).and_then(inner_local) } async fn inner_local(ipfs: Ipfs) -> Result { diff --git a/http/src/v0/root_files.rs b/http/src/v0/root_files.rs index 5bbadf99d..773e3dac0 100644 --- a/http/src/v0/root_files.rs +++ b/http/src/v0/root_files.rs @@ -14,7 +14,7 @@ use serde::Deserialize; use std::convert::TryFrom; use std::fmt; use std::path::Path; -use warp::{path, query, Filter, Rejection, Reply}; +use warp::{query, Filter, Rejection, Reply}; mod tar_helper; use tar_helper::TarHelper; @@ -23,19 +23,21 @@ mod add; #[derive(Debug, Deserialize)] pub struct AddArgs { - // probably never interesting - #[serde(default)] + // unknown meaning; ignoring it doesn't fail any tests + #[serde(default, rename = "stream-channels")] stream_channels: bool, - // unsure what this does + // progress reports totaling to the input file size #[serde(default)] progress: bool, + /// When true, a new directory is created to hold more than one root-level directory.
+ #[serde(default, rename = "wrap-with-directory")] + wrap_with_directory: bool, } pub fn add( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("add") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and(warp::header::("content-type")) // TODO: rejects if missing .and(warp::body::stream()) @@ -53,11 +55,8 @@ pub struct CatArgs { pub fn cat( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("cat") - .and(with_ipfs(ipfs)) - .and(query::()) - .and_then(cat_inner) +) -> impl Filter + Clone { + with_ipfs(ipfs).and(query::()).and_then(cat_inner) } async fn cat_inner(ipfs: Ipfs, args: CatArgs) -> Result { @@ -112,11 +111,8 @@ struct GetArgs { pub fn get( ipfs: &Ipfs, -) -> impl Filter + Clone { - path!("get") - .and(with_ipfs(ipfs)) - .and(query::()) - .and_then(get_inner) +) -> impl Filter + Clone { + with_ipfs(ipfs).and(query::()).and_then(get_inner) } async fn get_inner(ipfs: Ipfs, args: GetArgs) -> Result { diff --git a/http/src/v0/root_files/add.rs b/http/src/v0/root_files/add.rs index 402873ffe..71c731360 100644 --- a/http/src/v0/root_files/add.rs +++ b/http/src/v0/root_files/add.rs @@ -1,11 +1,20 @@ use super::AddArgs; use crate::v0::support::StringError; -use bytes::{Buf, Bytes}; +use bytes::{ + buf::{BufExt, BufMutExt}, + Buf, BufMut, Bytes, BytesMut, +}; use cid::Cid; -use futures::stream::{Stream, TryStreamExt}; -use ipfs::{Ipfs, IpfsTypes}; +use futures::stream::{Stream, StreamExt, TryStreamExt}; +use ipfs::unixfs::ll::{ + dir::builder::{ + BufferingTreeBuilder, TreeBuildingFailed, TreeConstructionFailed, TreeNode, TreeOptions, + }, + file::adder::FileAdder, +}; +use ipfs::{Block, Ipfs, IpfsTypes}; use mime::Mime; -use mpart_async::server::MultipartStream; +use mpart_async::server::{MultipartError, MultipartStream}; use serde::Serialize; use std::borrow::Cow; use std::fmt; @@ -13,99 +22,300 @@ use warp::{Rejection, Reply}; pub(super) async fn add_inner( ipfs: Ipfs, - _opts: AddArgs, + opts: AddArgs, content_type: Mime, - 
body: impl Stream> + Unpin, + body: impl Stream> + Send + Unpin + 'static, ) -> Result { - // FIXME: this should be without adder at least - use ipfs::unixfs::ll::file::adder::FileAdder; - let boundary = content_type .get_param("boundary") .map(|v| v.to_string()) .ok_or_else(|| StringError::from("missing 'boundary' on content-type"))?; - let mut stream = - MultipartStream::new(Bytes::from(boundary), body.map_ok(|mut buf| buf.to_bytes())); + let st = MultipartStream::new(Bytes::from(boundary), body.map_ok(|mut buf| buf.to_bytes())); - // this should be a while loop but clippy will warn if this is a while loop which will only get - // executed once. - if let Some(mut field) = stream - .try_next() - .await - .map_err(|e| StringError::from(format!("IO error: {}", e)))? - { - let field_name = field - .name() - .map_err(|e| StringError::from(format!("unparseable headers: {}", e)))?; + let st = add_stream(ipfs, st, opts); - if field_name != "file" { - return Err(StringError::from(format!("unsupported field: {}", field_name)).into()); + // map the errors into json objects; as we can't return them as trailers yet + + let st = st.map(|res| match res { + passthrough @ Ok(_) | passthrough @ Err(AddError::ResponseSerialization(_)) => { + // there is nothing we should do or could do for these; the assumption is that hyper + // will send the bytes and stop on serialization error and log it. the response + // *should* be closed on the error. + passthrough + } + Err(something_else) => { + let msg = crate::v0::support::MessageResponseBuilder::default() + .with_message(something_else.to_string()); + let bytes: Bytes = serde_json::to_vec(&msg) + .expect("serializing here should not have failed") + .into(); + let crlf = Bytes::from(&b"\r\n"[..]); + // note that here we are assuming that the stream ends on error + Ok(bytes.chain(crlf).to_bytes()) } + }); - let filename = field - .filename() - .map_err(|e| StringError::from(format!("unparseable filename: {}", e)))? 
- .to_string(); + let body = crate::v0::support::StreamResponse(st); + + Ok(body) +} + +#[derive(Debug)] +enum AddError { + Parsing(MultipartError), + Header(MultipartError), + InvalidFilename(std::str::Utf8Error), + UnsupportedField(String), + UnsupportedContentType(String), + ResponseSerialization(serde_json::Error), + Persisting(ipfs::Error), + TreeGathering(TreeBuildingFailed), + TreeBuilding(TreeConstructionFailed), +} + +impl From for AddError { + fn from(e: MultipartError) -> AddError { + AddError::Parsing(e) + } +} + +impl fmt::Display for AddError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + use AddError::*; + match self { + Parsing(me) => write!(fmt, "invalid request body: {}", me), + Header(me) => write!(fmt, "invalid multipart header(s): {}", me), + InvalidFilename(e) => write!(fmt, "invalid multipart filename: {:?}", e), + UnsupportedField(name) => write!(fmt, "unsupported field name: {:?}", name), + UnsupportedContentType(t) => write!(fmt, "unsupported content-type: {:?} (supported: application/{{octet-stream,x-directory}})", t), + ResponseSerialization(e) => write!(fmt, "progress serialization failed: {}", e), + Persisting(e) => write!(fmt, "put_block failed: {}", e), + TreeGathering(g) => write!(fmt, "invalid directory tree: {}", g), + TreeBuilding(b) => write!(fmt, "constructed invalid directory tree: {}", b), + } + } +} - let mut adder = FileAdder::default(); - let mut total = 0u64; +impl std::error::Error for AddError {} + +fn add_stream( + ipfs: Ipfs, + mut fields: MultipartStream, + opts: AddArgs, +) -> impl Stream> + Send + 'static +where + St: Stream> + Send + Unpin + 'static, + E: Into + Send + 'static, +{ + async_stream::try_stream! 
{ + + let mut tree_opts = TreeOptions::default(); + if opts.wrap_with_directory { + tree_opts.wrap_with_directory(); + } - loop { - let next = field - .try_next() - .await - .map_err(|e| StringError::from(format!("IO error: {}", e)))?; + let mut tree = BufferingTreeBuilder::new(tree_opts); + let mut buffer = BytesMut::new(); + + while let Some(mut field) = fields + .try_next() + .await? + { + + let field_name = field.name().map_err(AddError::Header)?; + let filename = field.filename().map_err(AddError::Header)?; + let filename = percent_encoding::percent_decode_str(filename) + .decode_utf8() + .map_err(AddError::InvalidFilename)?; + + let filename = if filename.starts_with('/') { + // normalize single first slash; seems similar to what js-ipfs does: filesystem + // test cases post with paths '/some-directory/...' and others post with + // 'some-directory/...'. + + // since slash is a single code point, we can just do + filename[1..].to_owned() + } else { + filename.into_owned() + }; + + let content_type = field.content_type().map_err(AddError::Header)?; + + let next = match content_type { + "application/octet-stream" => { + + // files are of the form "file-{1,2,3,..}" + let _ = if field_name != "file" && !field_name.starts_with("file-") { + Err(AddError::UnsupportedField(field_name.to_string())) + } else { + Ok(()) + }?; + + let mut adder = FileAdder::default(); + // how many bytes we have stored as blocks + let mut total_written = 0u64; + // how many bytes of input we have read + let mut total_read = 0u64; + + loop { + let next = field + .try_next() + .await + .map_err(AddError::Parsing)?; + + match next { + Some(next) => { + let (read, saved_any, written) = push_all(&ipfs, &mut adder, next).await?; + total_written += written; + total_read += read; + + if saved_any && opts.progress { + // technically we could just send messages but that'd + // require us to stop using Cow's and use Arc or + // similar. not especially fond of either. 
+ serde_json::to_writer((&mut buffer).writer(), &Response::Progress { + name: Cow::Borrowed(&filename), + bytes: total_read, + }).map_err(AddError::ResponseSerialization)?; + + buffer.put(&b"\r\n"[..]); + yield buffer.split().freeze(); + } + } + None => break, + } + } - match next { - Some(next) => { - let mut read = 0usize; - while read < next.len() { - let (iter, used) = adder.push(&next.slice(read..)); - read += used; + if opts.progress { + // in the interface-http-core tests the subtotal is expected to be full + // size, ordering w.r.t. to the "added" is not specified + serde_json::to_writer((&mut buffer).writer(), &Response::Progress { + name: Cow::Borrowed(&filename), + bytes: total_read, + }).map_err(AddError::ResponseSerialization)?; - let maybe_tuple = import_all(&ipfs, iter).await.map_err(|e| { - StringError::from(format!("Failed to save blocks: {}", e)) - })?; + buffer.put(&b"\r\n"[..]); - total += maybe_tuple.map(|t| t.1).unwrap_or(0); + // it is not required to yield here so perhaps we just accumulate the next + // response in as well } + + let (root, subtotal) = import_all(&ipfs, adder.finish()) + .await + .map_err(AddError::Persisting)? + // there was a bug in ipfs-unixfs however in general the "push" operation + // should flush so that the final finish would still have work to do. + .expect("there should always be something from finish"); + + total_written += subtotal; + + tracing::trace!("completed processing file of {} bytes: {:?}", total_read, filename); + + // using the filename as the path since we can tolerate a single empty named file + // however the second one will cause issues + tree.put_link(&filename, root.clone(), total_written) + .map_err(AddError::TreeGathering)?; + + let filename: Cow<'_, str> = if filename.is_empty() { + // cid needs to be repeated if no filename was given; in which case there + // should not be anything to build as tree either. 
note that intentionally + // no such Cid repeating happens when building the tree and a new wrapping + // root will have empty filename in the progress report. + Cow::Owned(root.to_string()) + } else { + Cow::Owned(filename) + }; + + serde_json::to_writer((&mut buffer).writer(), &Response::Added { + name: filename, + hash: Quoted(&root), + size: Quoted(total_written), + }).map_err(AddError::ResponseSerialization)?; + + buffer.put(&b"\r\n"[..]); + + Ok(buffer.split().freeze()) + }, + "application/x-directory" => { + // dirs are of the form "dir-{1,2,3,..}" + let _ = if field_name != "dir" && !field_name.starts_with("dir-") { + Err(AddError::UnsupportedField(field_name.to_string())) + } else { + Ok(()) + }?; + + // we need to fully consume this part, even though there shouldn't be anything + // except for the already parsed *but* ignored headers + while field.try_next().await.map_err(AddError::Parsing)?.is_some() {} + + // while at the moment we don't parse the mtime, mtime-nsec headers and mode + // those should be reflected in the metadata. this will still add an empty + // directory which is a good thing. + tree.set_metadata(&filename, ipfs::unixfs::ll::Metadata::default()) + .map_err(AddError::TreeGathering)?; + continue; } - None => break, - } + unsupported => { + Err(AddError::UnsupportedContentType(unsupported.to_string())) + } + }?; + + yield next; } - let (root, subtotal) = import_all(&ipfs, adder.finish()) - .await - .map_err(|e| StringError::from(format!("Failed to save blocks: {}", e)))? - .expect("I think there should always be something from finish -- except if the link block has just been compressed?"); + let mut iter = tree.build(); - total += subtotal; + while let Some(res) = iter.next_borrowed() { + let TreeNode { path, cid, total_size, block } = res.map_err(AddError::TreeBuilding)?; - let root = root.to_string(); + // shame we need to allocate once again here.. 
+ ipfs.put_block(Block { cid: cid.to_owned(), data: block.into() }).await.map_err(AddError::Persisting)?; - let filename: Cow<'_, str> = if filename.is_empty() { - // cid needs to be repeated if no filename was given - Cow::Borrowed(&root) - } else { - Cow::Owned(filename) - }; + serde_json::to_writer((&mut buffer).writer(), &Response::Added { + name: Cow::Borrowed(path), + hash: Quoted(cid), + size: Quoted(total_size), + }).map_err(AddError::ResponseSerialization)?; + + buffer.put(&b"\r\n"[..]); + + yield buffer.split().freeze(); + } + } +} + +async fn push_all( + ipfs: &Ipfs, + adder: &mut FileAdder, + next: Bytes, +) -> Result<(u64, bool, u64), AddError> { + let mut read = 0usize; + let mut saved_any = false; + let mut total_written = 0; + + while read < next.len() { + let (iter, used) = adder.push(&next.slice(read..)); + read += used; + + let maybe_tuple = import_all(&ipfs, iter) + .await + .map_err(AddError::Persisting)?; + + let subtotal = maybe_tuple.map(|t| t.1); + + total_written += subtotal.unwrap_or(0); - return Ok(warp::reply::json(&Response::Added { - name: filename, - hash: Cow::Borrowed(&root), - size: Quoted(total), - })); + saved_any |= subtotal.is_some(); } - Err(StringError::from("not implemented").into()) + Ok((read as u64, saved_any, total_written)) } async fn import_all( ipfs: &Ipfs, iter: impl Iterator)>, ) -> Result, ipfs::Error> { - use ipfs::Block; // TODO: use FuturesUnordered let mut last: Option = None; let mut total = 0u64; @@ -131,22 +341,22 @@ async fn import_all( enum Response<'a> { /// When progress=true query parameter has been given, this will be output every N bytes, or /// perhaps every chunk. - #[allow(unused)] // unused == not implemented yet Progress { /// Probably the name of the file being added or empty if none was provided. name: Cow<'a, str>, /// Bytes processed since last progress; for a file, all progress reports must add up to - /// the total file size. + /// the total file size. 
Interestingly this should not be stringified with `Quoted`, + /// whereas the `Added::size` needs to be `Quoted`. bytes: u64, }, /// Output for every input item. #[serde(rename_all = "PascalCase")] Added { /// The resulting Cid as a string. - hash: Cow<'a, str>, + hash: Quoted<&'a Cid>, /// Name of the file added from filename or the resulting Cid. name: Cow<'a, str>, - /// Stringified version of the total size in bytes. + /// Stringified version of the total cumulative size in bytes. size: Quoted, }, } @@ -194,7 +404,7 @@ mod tests { assert_eq!( body, - r#"{"Hash":"Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP","Name":"testfile.txt","Size":"20"}"# + "{\"Hash\":\"Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP\",\"Name\":\"testfile.txt\",\"Size\":\"20\"}\r\n" ); } diff --git a/http/src/v0/support.rs b/http/src/v0/support.rs index 064dd2055..2004095b6 100644 --- a/http/src/v0/support.rs +++ b/http/src/v0/support.rs @@ -1,7 +1,6 @@ use ipfs::{Ipfs, IpfsTypes}; use serde::Serialize; use std::borrow::Cow; -use std::convert::Infallible; use std::error::Error as StdError; use std::fmt; @@ -53,6 +52,12 @@ impl MessageKind { #[derive(Debug, Clone)] pub struct MessageResponseBuilder(MessageKind, usize); +impl Default for MessageResponseBuilder { + fn default() -> Self { + MessageResponseBuilder(MessageKind::Error, 0) + } +} + impl MessageResponseBuilder { pub fn with_message>>(self, message: S) -> MessageResponse { let Self(kind, code) = self; @@ -141,7 +146,7 @@ impl StringError { /// Common rejection handling strategy for ipfs http api compatible error responses pub async fn recover_as_message_response( err: warp::reject::Rejection, -) -> Result { +) -> Result { use warp::http::StatusCode; use warp::reject::{InvalidQuery, LengthRequired, MethodNotAllowed}; diff --git a/http/src/v0/swarm.rs b/http/src/v0/swarm.rs index 9303be84f..6d245c51f 100644 --- a/http/src/v0/swarm.rs +++ b/http/src/v0/swarm.rs @@ -23,9 +23,8 @@ async fn connect_query( pub fn connect( ipfs: &Ipfs, -) 
-> impl Filter + Clone { - warp::path!("swarm" / "connect") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and_then(connect_query) } @@ -89,9 +88,8 @@ async fn peers_query( pub fn peers( ipfs: &Ipfs, -) -> impl Filter + Clone { - warp::path!("swarm" / "peers") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and_then(peers_query) } @@ -120,10 +118,8 @@ async fn addrs_query(ipfs: Ipfs) -> Result( ipfs: &Ipfs, -) -> impl Filter + Clone { - warp::path!("swarm" / "addrs") - .and(with_ipfs(ipfs)) - .and_then(addrs_query) +) -> impl Filter + Clone { + with_ipfs(ipfs).and_then(addrs_query) } #[derive(Debug, Deserialize)] @@ -154,9 +150,8 @@ async fn addrs_local_query( pub fn addrs_local( ipfs: &Ipfs, -) -> impl Filter + Clone { - warp::path!("swarm" / "addrs" / "local") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and_then(addrs_local_query) } @@ -179,9 +174,8 @@ async fn disconnect_query( pub fn disconnect( ipfs: &Ipfs, -) -> impl Filter + Clone { - warp::path!("swarm" / "disconnect") - .and(with_ipfs(ipfs)) +) -> impl Filter + Clone { + with_ipfs(ipfs) .and(query::()) .and_then(disconnect_query) } diff --git a/http/src/v0/version.rs b/http/src/v0/version.rs index 1cccec7e0..bd291e750 100644 --- a/http/src/v0/version.rs +++ b/http/src/v0/version.rs @@ -24,12 +24,14 @@ pub struct Response { // https://docs-beta.ipfs.io/reference/http/api/#api-v0-version // Note: the parameter formatting is only verified, feature looks to be unimplemented for `go-ipfs // 0.4.23` and handled by cli. This is not compatible with `rust-ipfs-api`. -pub async fn version(_query: Query) -> Result { +pub fn version( + _query: Query, +) -> impl std::future::Future> { let response = Response { version: env!("CARGO_PKG_VERSION"), // TODO: move over to rust-ipfs not to worry about syncing version numbers? 
commit: env!("VERGEN_SHA_SHORT"), repo: "", }; - Ok(warp::reply::json(&response)) + futures::future::ready(Ok((warp::reply::json(&response),))) } diff --git a/unixfs/Cargo.toml b/unixfs/Cargo.toml index 5ffcc4e64..465490b66 100644 --- a/unixfs/Cargo.toml +++ b/unixfs/Cargo.toml @@ -23,3 +23,9 @@ sha2 = { default-features = false, version = "0.9" } hex-literal = { default-features = false, version = "0.3" } libc = { default-features = false, version = "0.2.71" } multibase = { default-features = false, version = "0.8.0" } +tar = { default-features = false, version = "0.4" } +criterion = "0.3" + +[[bench]] +name = "ingest-tar" +harness = false diff --git a/unixfs/benches/ingest-tar.rs b/unixfs/benches/ingest-tar.rs new file mode 100644 index 000000000..a24d8403d --- /dev/null +++ b/unixfs/benches/ingest-tar.rs @@ -0,0 +1,127 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; + +pub fn criterion_benchmark(c: &mut Criterion) { + let file = "benchmark.tar"; + + match std::fs::read(file) { + Ok(tar_bytes) => { + // warmup should take care of right sizing these + let mut buffer = Vec::new(); + let mut path = String::new(); + + c.bench_function("ingest-tar", |b| { + b.iter(|| ingest_tar(&tar_bytes, &mut buffer, &mut path)) + }); + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + eprintln!("could not find {:?}:", file); + eprintln!("please download a linux kernel and unpack it to enable benchmark. 
specific version doesn't matter."); + } + Err(e) => panic!("failed to read the {:?}: {}", file, e), + } +} + +fn ingest_tar(bytes: &[u8], buffer: &mut Vec, path: &mut String) { + use cid::Cid; + use ipfs_unixfs::dir::builder::{BufferingTreeBuilder, TreeOptions}; + use ipfs_unixfs::file::adder::FileAdder; + use std::io::Read; + + let mut archive = tar::Archive::new(std::io::Cursor::new(bytes)); + let entries = archive.entries().unwrap(); + + let mut opts = TreeOptions::default(); + opts.wrap_with_directory(); + let mut tree = BufferingTreeBuilder::new(opts); + + for entry in entries { + let mut entry = entry.expect("assuming good tar"); + + if let Some(_link_name) = entry.link_name_bytes() { + // TODO: symlinks + continue; + } + + let path_bytes = entry.path_bytes(); + let tmp_path = std::str::from_utf8(&*path_bytes).unwrap(); + path.clear(); + path.push_str(tmp_path); + + if !path.ends_with('/') { + let mut adder = FileAdder::default(); + + // with the std::io::Read it'd be good to read into the fileadder, or read into ... + // something. trying to acccess the buffer from inside FileAdder does not seem the be the + // way to go. 
+ // + // reusing the buffers between files would make a lot of sense as well + + if let Some(needed) = adder.size_hint().checked_sub(buffer.capacity()) { + buffer.reserve(needed); + } + + if let Some(mut needed) = adder.size_hint().checked_sub(buffer.len()) { + let zeros = [0u8; 8]; + + while needed > zeros.len() { + buffer.extend_from_slice(&zeros[..]); + needed -= zeros.len(); + } + + buffer.extend(std::iter::repeat(0).take(needed)); + } + + let mut total_written = 0usize; + + loop { + match entry.read(&mut buffer[0..]).unwrap() { + 0 => { + let blocks = adder.finish(); + let (cid, subtotal) = blocks + .fold( + None, + |acc: Option<(Cid, usize)>, (cid, bytes): (Cid, Vec)| match acc + { + Some((_, total)) => Some((cid, total + bytes.len())), + None => Some((cid, bytes.len())), + }, + ) + .expect("this is probably always present"); + + total_written += subtotal; + + tree.put_link(&path, cid, total_written as u64).unwrap(); + break; + } + n => { + let mut read = 0; + while read < n { + let (blocks, consumed) = adder.push(&buffer[read..n]); + read += consumed; + total_written += blocks.map(|(_, bytes)| bytes.len()).sum::(); + } + } + } + } + } else { + tree.set_metadata(&path[..path.len() - 1], ipfs_unixfs::Metadata::default()) + .unwrap(); + } + } + + let mut iter = tree.build(); + + let mut last: Option<(Cid, u64, usize)> = None; + + while let Some(res) = iter.next_borrowed() { + let res = res.unwrap(); + last = Some((res.cid.to_owned(), res.total_size, res.block.len())); + } + + let last = last.unwrap(); + + black_box(last); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/unixfs/src/dir.rs b/unixfs/src/dir.rs index 07af0ea99..caf69b344 100644 --- a/unixfs/src/dir.rs +++ b/unixfs/src/dir.rs @@ -10,6 +10,9 @@ pub use sharded_lookup::{Cache, LookupError, ShardError, ShardedLookup}; mod directory; pub(crate) use directory::{check_directory_supported, UnexpectedDirectoryProperties}; +/// Directory tree builder. 
+pub mod builder; + pub(crate) fn check_hamtshard_supported( mut flat: FlatUnixFs<'_>, ) -> Result, ShardError> { diff --git a/unixfs/src/dir/builder.rs b/unixfs/src/dir/builder.rs new file mode 100644 index 000000000..16004c135 --- /dev/null +++ b/unixfs/src/dir/builder.rs @@ -0,0 +1,151 @@ +use cid::Cid; +use std::fmt; + +mod dir_builder; +use dir_builder::DirBuilder; + +mod iter; +pub use iter::{OwnedTreeNode, PostOrderIterator, TreeNode}; + +mod buffered; +pub use buffered::BufferingTreeBuilder; + +mod custom_pb; +use custom_pb::CustomFlatUnixFs; + +enum Entry { + Leaf(Leaf), + Directory(DirBuilder), +} + +impl fmt::Debug for Entry { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + use Entry::*; + + match self { + Leaf(leaf) => write!(fmt, "Leaf {{ {:?} }}", leaf), + Directory(_) => write!(fmt, "DirBuilder {{ .. }}"), + } + } +} + +impl Entry { + fn as_dir_builder(&mut self) -> Result<&mut DirBuilder, ()> { + use Entry::*; + match self { + Directory(ref mut d) => Ok(d), + _ => Err(()), + } + } +} + +struct Leaf { + link: Cid, + total_size: u64, +} + +impl fmt::Debug for Leaf { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "{}, {}", self.link, self.total_size) + } +} + +/// Configuration for customizing how the tree is built. +#[derive(Debug, Clone)] +pub struct TreeOptions { + block_size_limit: Option, + wrap_with_directory: bool, +} + +impl Default for TreeOptions { + fn default() -> Self { + TreeOptions { + // this is just a guess; our bitswap message limit is a bit more + block_size_limit: Some(512 * 1024), + wrap_with_directory: false, + } + } +} + +impl TreeOptions { + /// Overrides the default directory block size limit. If the size limit is set to `None`, no + /// directory will be too large. + pub fn block_size_limit(&mut self, limit: Option) { + self.block_size_limit = limit; + } + + /// When true, allow multiple top level entries, otherwise error on the second entry. + /// Defaults to false. 
+ pub fn wrap_with_directory(&mut self) { + self.wrap_with_directory = true; + } +} + +/// Tree building failure cases. +#[derive(Debug)] +pub enum TreeBuildingFailed { + /// The given full path started with a slash; paths in the `/add` convention are not rooted. + RootedPath(String), + /// The given full path contained an empty segment. + RepeatSlashesInPath(String), + /// The given full path ends in slash. + PathEndsInSlash(String), + /// If the `BufferingTreeBuilder` was created without `TreeOptions` with the option + /// `wrap_with_directory` enabled, then there can be only a single element at the root. + TooManyRootLevelEntries, + /// The given full path had already been added. + DuplicatePath(String), + /// The given full path had already been added as a link to an opaque entry. + LeafAsDirectory(String), +} + +impl fmt::Display for TreeBuildingFailed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + use TreeBuildingFailed::*; + + match self { + RootedPath(s) => write!(fmt, "path is rooted: {:?}", s), + RepeatSlashesInPath(s) => write!(fmt, "path contains repeat slashes: {:?}", s), + PathEndsInSlash(s) => write!(fmt, "path ends in a slash: {:?}", s), + TooManyRootLevelEntries => write!( + fmt, + "multiple root level entries while configured wrap_with_directory = false" + ), + // TODO: perhaps we should allow adding two leafs with the same Cid? + DuplicatePath(s) => write!(fmt, "path exists already: {:?}", s), + LeafAsDirectory(s) => write!( + fmt, + "attempted to use already added leaf as a subdirectory: {:?}", + s + ), + } + } +} + +impl std::error::Error for TreeBuildingFailed {} + +/// Failure cases for `PostOrderIterator` creating the tree dag-pb nodes. +#[derive(Debug)] +pub enum TreeConstructionFailed { + /// Failed to serialize the protobuf node for the directory + Protobuf(quick_protobuf::Error), + /// The resulting directory would be too large and HAMT sharding is yet to be implemented or + /// denied. 
+ TooLargeBlock(u64), +} + +impl fmt::Display for TreeConstructionFailed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + use TreeConstructionFailed::*; + + match self { + Protobuf(e) => write!(fmt, "serialization failed: {}", e), + TooLargeBlock(size) => write!(fmt, "attempted to create block of {} bytes", size), + } + } +} + +impl std::error::Error for TreeConstructionFailed {} + +#[derive(Debug)] +struct NamedLeaf(String, Cid, u64); diff --git a/unixfs/src/dir/builder/buffered.rs b/unixfs/src/dir/builder/buffered.rs new file mode 100644 index 000000000..995fd9607 --- /dev/null +++ b/unixfs/src/dir/builder/buffered.rs @@ -0,0 +1,445 @@ +use super::{DirBuilder, Entry, Leaf, PostOrderIterator, TreeBuildingFailed, TreeOptions}; +use crate::Metadata; +use cid::Cid; +use std::collections::btree_map::Entry::*; + +/// UnixFs directory tree builder which buffers entries until `build()` is called. +#[derive(Debug)] +pub struct BufferingTreeBuilder { + /// At the root there can be only one element, unless an option was given to create a new + /// directory surrounding the root elements. + root_builder: DirBuilder, + longest_path: usize, + // used to generate a unique id for each node; it is used when doing the post order traversal to + // recover all children's rendered Cids + counter: u64, + opts: TreeOptions, +} + +impl Default for BufferingTreeBuilder { + fn default() -> Self { + Self::new(TreeOptions::default()) + } +} + +impl BufferingTreeBuilder { + /// Construct a new tree builder with the given configuration. + pub fn new(opts: TreeOptions) -> Self { + BufferingTreeBuilder { + root_builder: DirBuilder::root(0), + longest_path: 0, + counter: 1, + opts, + } + } + + /// Registers the given path to be a link to the cid that follows. The target leaf should be + /// either a file, directory or symlink but could of course be anything. It will be treated as + /// an opaque link. 
+ pub fn put_link( + &mut self, + full_path: &str, + target: Cid, + total_size: u64, + ) -> Result<(), TreeBuildingFailed> { + let leaf = Leaf { + link: target, + total_size, + }; + + self.modify_with(full_path, |parent, basename, _| { + parent + .put_leaf(basename, leaf) + .map_err(|_| TreeBuildingFailed::DuplicatePath(full_path.to_string())) + }) + } + + /// Directories get "put" implicitly through the put files, and directories need to be adjusted + /// only when wanting them to have metadata. + pub fn set_metadata( + &mut self, + full_path: &str, + metadata: Metadata, + ) -> Result<(), TreeBuildingFailed> { + // create all paths along the way + // + // set if not set, error otherwise? FIXME: doesn't error atm + self.modify_with(full_path, |parent, basename, id| { + parent + .add_or_get_node(basename, id) + .map_err(|_| TreeBuildingFailed::LeafAsDirectory(full_path.to_string()))? + .set_metadata(metadata); + Ok(()) + }) + } + + fn modify_with(&mut self, full_path: &str, f: F) -> Result<(), TreeBuildingFailed> + where + F: FnOnce(&mut DirBuilder, String, &mut Option) -> Result<(), TreeBuildingFailed>, + { + // create all paths along the way + // + // assuming it's ok to split at '/' since that cannot be escaped in linux at least + + self.longest_path = full_path.len().max(self.longest_path); + let mut remaining = full_path.split('/').enumerate().peekable(); + let mut dir_builder = &mut self.root_builder; + + // check these before to avoid creation of bogus nodes in the tree or having to clean up. 
+ + if full_path.ends_with('/') { + return Err(TreeBuildingFailed::PathEndsInSlash(full_path.to_string())); + } + + if full_path.contains("//") { + return Err(TreeBuildingFailed::RepeatSlashesInPath( + full_path.to_string(), + )); + } + + // needed to avoid borrowing into the DirBuilder::new calling closure + let counter = &mut self.counter; + + while let Some((depth, next)) = remaining.next() { + let last = remaining.peek().is_none(); + + match (depth, next, last) { + // this might need to be accepted in case there is just a single file + (0, "", true) => { + // accepted: allows unconditional tree building in ipfs-http + // but the resulting tree will have at most single node, which doesn't prompt + // creation of new directories and should be fine. + } + (0, "", false) => { + // ok to keep this inside the loop; we are yet to create any nodes. + // note the ipfs-http (and for example js-ipfs) normalizes the path by + // removing the slash from the start. + return Err(TreeBuildingFailed::RootedPath(full_path.to_string())); + } + (_, "", false) => unreachable!("already validated: no repeat slashes"), + (_, "", true) => unreachable!("already validated: path does not end in slash"), + _ => {} + } + + // our first level can be full, depending on the options given + let full = depth == 0 && !self.opts.wrap_with_directory && !dir_builder.is_empty(); + + if last { + let mut next_id = Some(*counter); + + let ret = if full { + Err(TreeBuildingFailed::TooManyRootLevelEntries) + } else { + f(dir_builder, next.to_string(), &mut next_id) + }; + + if next_id.is_none() { + *counter += 1; + } + + if ret.is_err() { + // FIXME: there might be a case where we have now stale nodes in our tree but + // cannot figure out an example for that. 
+ } + + return ret; + } + + let parent_id = dir_builder.id; + + dir_builder = match (full, dir_builder.nodes.entry(next.to_string())) { + (_, Occupied(oe)) => oe + .into_mut() + .as_dir_builder() + .map_err(|_| TreeBuildingFailed::LeafAsDirectory(full_path.to_string()))?, + (false, Vacant(ve)) => { + let next_id = *counter; + *counter += 1; + ve.insert(Entry::Directory(DirBuilder::new(parent_id, next_id))) + .as_dir_builder() + .expect("safe: we just inserted a DirBuilder") + } + (true, Vacant(_)) => return Err(TreeBuildingFailed::TooManyRootLevelEntries), + }; + } + + // as the str::split will always return a single element this should not ever be hit + unreachable!( + "walked the full_path but failed to add anything: {:?}", + full_path + ); + } + + /// Called to build the tree. The built tree will have the added files and their implied + /// directory structure, along with the directory entries which were created using + /// `set_metadata`. To build the whole hierarchy, one must iterate the returned iterator to + /// completion while storing the created blocks. + /// + /// Returned `PostOrderIterator` will use the given `full_path` and `block_buffer` to store + /// its data during the walk. `PostOrderIterator` implements `Iterator` while also allowing + /// borrowed access via `next_borrowed`. 
+ pub fn build(self) -> PostOrderIterator { + PostOrderIterator::new(self.root_builder, self.opts, self.longest_path) + } +} + +#[cfg(test)] +mod tests { + use super::{ + super::OwnedTreeNode, BufferingTreeBuilder, Metadata, TreeBuildingFailed, TreeOptions, + }; + use cid::Cid; + use std::convert::TryFrom; + + #[test] + fn some_directories() { + let mut builder = BufferingTreeBuilder::default(); + + // foobar\n + let five_block_foobar = + Cid::try_from("QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6").unwrap(); + + builder + .put_link("a/b/c/d/e/f/g.txt", five_block_foobar.clone(), 221) + .unwrap(); + builder + .put_link("a/b/c/d/e/h.txt", five_block_foobar.clone(), 221) + .unwrap(); + builder + .put_link("a/b/c/d/e/i.txt", five_block_foobar, 221) + .unwrap(); + + let actual = builder + .build() + .map(|res| res.map(|n| (n.path, n.cid, n.block))) + .collect::, _>>() + .unwrap(); + + let expected = vec![ + ( + "a/b/c/d/e/f", + "Qmbgf44ztW9wLcGNRNYGinGQB6SQDQtbHVbkM5MrWms698", + ), + ( + "a/b/c/d/e", + "Qma1hCr3CuPRAq2Gw4DCNMqsi42Bjs4Bt1MGSS57kNh144", + ), + ("a/b/c/d", "QmUqaYatcJqiSFdykHXGh4Nog1eMSfDJBeYzcG67KV5Ri4"), + ("a/b/c", "QmYwaNBaGpDCNN9XpHmjxVPHmEXZMw9KDY3uikE2UU5fVB"), + ("a/b", "QmeAzCPig4o4gBLh2LvP96Sr8MUBrsu2Scw9MTq1EvTDhY"), + ("a", "QmSTUFaPwJW8xD4KNRLLQRqVTYtYC29xuhYTJoYPWdzvKp"), + ]; + + verify_results(expected, actual); + } + + #[test] + fn empty_path() { + let mut builder = BufferingTreeBuilder::default(); + builder.put_link("", some_cid(0), 1).unwrap(); + + let actual = builder + .build() + .map(|res| res.map(|OwnedTreeNode { path, .. 
}| path)) + .collect::, _>>() + .unwrap(); + + assert!( + actual.is_empty(), + "wrapping in directory was not asked, single element" + ); + } + + #[test] + #[should_panic] + fn rooted_path() { + let mut builder = BufferingTreeBuilder::default(); + builder.put_link("/a", some_cid(0), 1).unwrap(); + } + + #[test] + #[should_panic] + fn successive_slashes() { + let mut builder = BufferingTreeBuilder::default(); + builder.put_link("a//b", some_cid(0), 1).unwrap(); + } + + #[test] + fn multiple_roots() { + // foobar\n + let five_block_foobar = + Cid::try_from("QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6").unwrap(); + + let mut opts = TreeOptions::default(); + opts.wrap_with_directory(); + let mut builder = BufferingTreeBuilder::new(opts); + builder + .put_link("a", five_block_foobar.clone(), 221) + .unwrap(); + builder.put_link("b", five_block_foobar, 221).unwrap(); + + let actual = builder + .build() + .map(|res| res.map(|OwnedTreeNode { path, cid, .. }| (path, cid.to_string()))) + .collect::, _>>() + .unwrap(); + + assert_eq!( + actual, + &[( + "".to_string(), + "QmdbWuhpVCX9weVMMqvVTMeGwKMqCNJDbx7ZK1zG36sea7".to_string() + )] + ); + } + + #[test] + fn single_wrapped_root() { + // foobar\n + let five_block_foobar = + Cid::try_from("QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6").unwrap(); + + let mut opts = TreeOptions::default(); + opts.wrap_with_directory(); + let mut builder = BufferingTreeBuilder::new(opts); + builder.put_link("a", five_block_foobar, 221).unwrap(); + + let actual = builder + .build() + .map(|res| res.map(|OwnedTreeNode { path, cid, .. 
}| (path, cid.to_string()))) + .collect::, _>>() + .unwrap(); + + assert_eq!( + actual, + &[( + "".to_string(), + "QmQBseoi3b2FBrYhjM2E4mCF4Q7C8MgCUbzAbGNfyVwgNk".to_string() + )] + ); + } + + #[test] + #[should_panic] + fn denied_multiple_root_dirs() { + let mut builder = BufferingTreeBuilder::default(); + builder.put_link("a/c.txt", some_cid(0), 1).unwrap(); + builder.put_link("b/d.txt", some_cid(1), 1).unwrap(); + } + + #[test] + #[should_panic] + fn denied_multiple_root_files() { + let mut builder = BufferingTreeBuilder::default(); + builder.put_link("a.txt", some_cid(0), 1).unwrap(); + builder.put_link("b.txt", some_cid(1), 1).unwrap(); + } + + #[test] + #[should_panic] + fn using_leaf_as_node() { + let mut builder = BufferingTreeBuilder::default(); + builder.put_link("a.txt", some_cid(0), 1).unwrap(); + builder.put_link("a.txt/b.txt", some_cid(1), 1).unwrap(); + } + + #[test] + fn set_metadata_before_files() { + let mut builder = BufferingTreeBuilder::default(); + builder + .set_metadata("a/b/c/d", Metadata::default()) + .unwrap(); + builder.put_link("a/b/c/d/e.txt", some_cid(1), 1).unwrap(); + builder.put_link("a/b/c/d/f.txt", some_cid(2), 1).unwrap(); + + let actual = builder + .build() + .map(|res| res.map(|OwnedTreeNode { path, .. 
}| path)) + .collect::, _>>() + .unwrap(); + + assert_eq!(actual, &["a/b/c/d", "a/b/c", "a/b", "a",]) + } + + #[test] + fn set_metadata_on_file() { + let mut builder = BufferingTreeBuilder::default(); + builder.put_link("a/a.txt", some_cid(0), 1).unwrap(); + let err = builder + .set_metadata("a/a.txt", Metadata::default()) + .unwrap_err(); + + assert!( + matches!(err, TreeBuildingFailed::LeafAsDirectory(_)), + "{:?}", + err + ); + } + + #[test] + fn dir_with_cidv1_link() { + // this is `echo '{ "name": "hello" }` | ./ipfs dag put` + let target = + Cid::try_from("bafyreihakpd7te5nbmlhdk5ntvcvhf2hmfgrvcwna2sddq5zz5342mcbli").unwrap(); + + let mut builder = BufferingTreeBuilder::default(); + builder.put_link("a/b", target, 12).unwrap(); + + let actual = builder + .build() + .map(|res| res.map(|n| (n.path, n.cid, n.block))) + .collect::, _>>() + .unwrap(); + + let expected = vec![("a", "QmPMDMPG8dbHDC9GuvqWr9pfruLnp4GZCAWrskwCmenVQa")]; + + verify_results(expected, actual); + } + + fn verify_results( + mut expected: Vec<( + impl AsRef + std::fmt::Debug, + impl AsRef + std::fmt::Debug, + )>, + mut actual: Vec<(String, Cid, Box<[u8]>)>, + ) { + use std::fmt; + + struct Hex<'a>(&'a [u8]); + + impl<'a> fmt::Debug for Hex<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + for b in self.0 { + write!(fmt, "{:02x}", b)?; + } + Ok(()) + } + } + + // hopefully this way the errors will be easier to hunt down + + actual.reverse(); + expected.reverse(); + + while let Some(actual) = actual.pop() { + let expected = expected.pop().expect("size mismatch"); + assert_eq!(actual.0, expected.0.as_ref()); + assert_eq!( + actual.1.to_string(), + expected.1.as_ref(), + "{:?}: {:?}", + actual.0, + Hex(&actual.2) + ); + } + + assert_eq!(expected.len(), 0, "size mismatch: {:?}", actual); + } + + /// Returns a quick and dirty sha2-256 of the given number as a Cidv0 + fn some_cid(number: usize) -> Cid { + use multihash::Sha2_256; + let mh = 
Sha2_256::digest(&number.to_le_bytes()); + Cid::new_v0(mh).unwrap() + } +} diff --git a/unixfs/src/dir/builder/custom_pb.rs b/unixfs/src/dir/builder/custom_pb.rs new file mode 100644 index 000000000..ee0a0ad95 --- /dev/null +++ b/unixfs/src/dir/builder/custom_pb.rs @@ -0,0 +1,108 @@ +//! Custom protobuf types which are used in encoding directorys. + +use super::NamedLeaf; +use crate::pb::UnixFs; +use cid::Cid; +use quick_protobuf::{MessageWrite, Writer, WriterBackend}; + +/// Newtype which uses the &[Option<(NamedLeaf)>] as Vec. +pub(super) struct CustomFlatUnixFs<'a> { + pub(super) links: &'a [Option], + pub(super) data: UnixFs<'a>, +} + +impl<'a> CustomFlatUnixFs<'a> { + fn mapped(&self) -> impl Iterator> + '_ { + self.links + .iter() + .map(|triple| triple.as_ref().map(|l| NamedLeafAsPBLink(l)).unwrap()) + } +} + +impl<'a> MessageWrite for CustomFlatUnixFs<'a> { + fn get_size(&self) -> usize { + use quick_protobuf::sizeofs::*; + + let links = self + .mapped() + .map(|link| 1 + sizeof_len(link.get_size())) + .sum::(); + + links + 1 + sizeof_len(self.data.get_size()) + } + + fn write_message(&self, w: &mut Writer) -> quick_protobuf::Result<()> { + self.mapped() + .try_for_each(|l| w.write_with_tag(18, |w| w.write_message(&l)))?; + w.write_with_tag(10, |w| w.write_message(&self.data)) + } +} + +/// Custom NamedLeaf as PBLink "adapter." 
+struct NamedLeafAsPBLink<'a>(&'a NamedLeaf); + +impl<'a> MessageWrite for NamedLeafAsPBLink<'a> { + fn get_size(&self) -> usize { + use quick_protobuf::sizeofs::*; + + // ones are the tags + 1 + sizeof_len((self.0).0.len()) + + 1 + + sizeof_len(WriteableCid(&(self.0).1).get_size()) + //+ sizeof_len(self.1.link.to_bytes().len()) + + 1 + + sizeof_varint((self.0).2) + } + + fn write_message(&self, w: &mut Writer) -> quick_protobuf::Result<()> { + w.write_with_tag(10, |w| w.write_message(&WriteableCid(&(self.0).1)))?; + //w.write_with_tag(10, |w| w.write_bytes(&self.1.link.to_bytes()))?; + w.write_with_tag(18, |w| w.write_string((self.0).0.as_str()))?; + w.write_with_tag(24, |w| w.write_uint64((self.0).2))?; + Ok(()) + } +} + +/// Newtype around Cid to allow embedding it as PBLink::Hash without allocating a vector. +struct WriteableCid<'a>(&'a Cid); + +impl<'a> MessageWrite for WriteableCid<'a> { + fn get_size(&self) -> usize { + use cid::Version::*; + use quick_protobuf::sizeofs::*; + + let hash_len = self.0.hash().as_bytes().len(); + + match self.0.version() { + V0 => hash_len, + V1 => { + let version_len = 1; + let codec_len = sizeof_varint(u64::from(self.0.codec())); + version_len + codec_len + hash_len + } + } + } + + fn write_message(&self, w: &mut Writer) -> quick_protobuf::Result<()> { + use cid::Version::*; + + match self.0.version() { + V0 => { /* cidv0 has only the _multi_hash */ } + V1 => { + // it is possible that CidV1 should not be linked to from a unixfs + // directory; at least go-ipfs 0.5 `ipfs files` denies making a cbor link + // but happily accepts and does refs over one. 
+ w.write_u8(1)?; + w.write_varint(u64::from(self.0.codec()))?; + } + } + + self.0 + .hash() + .as_bytes() + .iter() + // while this looks bad it cannot be measured; note we cannot use the + // write_bytes because that is length prefixed bytes write + .try_for_each(|b| w.write_u8(*b)) + } +} diff --git a/unixfs/src/dir/builder/dir_builder.rs b/unixfs/src/dir/builder/dir_builder.rs new file mode 100644 index 000000000..486deecec --- /dev/null +++ b/unixfs/src/dir/builder/dir_builder.rs @@ -0,0 +1,78 @@ +use super::{Entry, Leaf}; +use crate::Metadata; +use std::collections::btree_map::Entry::*; +use std::collections::BTreeMap; + +pub(super) struct DuplicateName; +pub(super) struct FoundLeaf; + +/// Node in a directory tree. +#[derive(Debug)] +pub(super) struct DirBuilder { + /// Immediate files, symlinks or directories in this directory + pub nodes: BTreeMap, + /// Metadata for this directory + metadata: Metadata, + /// Id of the parent; None for the root node + pub parent_id: Option, + /// Internal id, used for propagating Cids back from children during post order visit. 
+ pub id: u64, +} + +impl DirBuilder { + pub fn new(parent_id: u64, id: u64) -> Self { + assert_ne!(parent_id, id); + DirBuilder { + nodes: Default::default(), + metadata: Default::default(), + parent_id: Some(parent_id), + id, + } + } + + pub fn root(id: u64) -> Self { + DirBuilder { + nodes: Default::default(), + metadata: Default::default(), + parent_id: None, + id, + } + } + + pub fn put_leaf(&mut self, key: String, leaf: Leaf) -> Result<(), DuplicateName> { + match self.nodes.entry(key) { + Occupied(_) => Err(DuplicateName), + Vacant(ve) => { + ve.insert(Entry::Leaf(leaf)); + Ok(()) + } + } + } + + pub fn add_or_get_node( + &mut self, + key: String, + id: &mut Option, + ) -> Result<&mut DirBuilder, FoundLeaf> { + match self.nodes.entry(key) { + Occupied(oe) => oe.into_mut().as_dir_builder().map_err(|_| FoundLeaf), + Vacant(ve) => { + let id = id.take().unwrap(); + let entry = ve.insert(Entry::Directory(Self::new(self.id, id))); + Ok(entry.as_dir_builder().expect("just inserted")) + } + } + } + + pub fn len(&self) -> usize { + self.nodes.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn set_metadata(&mut self, metadata: Metadata) { + self.metadata = metadata; + } +} diff --git a/unixfs/src/dir/builder/iter.rs b/unixfs/src/dir/builder/iter.rs new file mode 100644 index 000000000..208e4f519 --- /dev/null +++ b/unixfs/src/dir/builder/iter.rs @@ -0,0 +1,465 @@ +use super::{ + CustomFlatUnixFs, DirBuilder, Entry, Leaf, NamedLeaf, TreeConstructionFailed, TreeOptions, +}; +use cid::Cid; +use std::collections::HashMap; +use std::fmt; + +/// Constructs the directory nodes required for a tree. +/// +/// Implements the Iterator interface for owned values and the borrowed version, `next_borrowed`. +/// The tree is fully constructed once this has been exhausted. 
+pub struct PostOrderIterator { + full_path: String, + old_depth: usize, + block_buffer: Vec, + // our stack of pending work + pending: Vec, + // "communication channel" from nested entries back to their parents; this hashmap is only used + // in the event of mixed child nodes (leaves and nodes). + persisted_cids: HashMap>>, + reused_children: Vec, + cid: Option, + total_size: u64, + // from TreeOptions + opts: TreeOptions, +} + +/// The link list used to create the directory node. This list is created from a the BTreeMap +/// inside DirBuilder, and initially it will have `Some` values only for the initial leaves and +/// `None` values for subnodes which are not yet ready. At the time of use, this list is expected +/// to have only `Some` values. +type Leaves = Vec>; + +/// The nodes in the visit. We need to do a post-order visit, which starts from a single +/// `DescentRoot`, followed by N `Descents` where N is the deepest directory in the tree. On each +/// descent, we'll need to first schedule a `Post` (or `PostRoot`) followed the immediate children +/// of the node. Directories are rendered when all of their direct and indirect descendants have +/// been serialized into NamedLeafs. +#[derive(Debug)] +enum Visited { + // handle root differently not to infect with the Option and Option + DescentRoot(DirBuilder), + Descent { + node: DirBuilder, + name: String, + depth: usize, + /// The index in the parents `Leaves` accessible through `PostOrderIterator::persisted_cids`. + index: usize, + }, + Post { + parent_id: u64, + depth: usize, + name: String, + index: usize, + /// Leaves will be stored directly in this field when there are no DirBuilder descendants, + /// in the `PostOrderIterator::persisted_cids` otherwise. 
+ leaves: LeafStorage, + }, + PostRoot { + leaves: LeafStorage, + }, +} + +impl PostOrderIterator { + pub(super) fn new(root: DirBuilder, opts: TreeOptions, longest_path: usize) -> Self { + let root = Visited::DescentRoot(root); + PostOrderIterator { + full_path: String::with_capacity(longest_path), + old_depth: 0, + block_buffer: Default::default(), + pending: vec![root], + persisted_cids: Default::default(), + reused_children: Vec::new(), + cid: None, + total_size: 0, + opts, + } + } + + fn render_directory( + links: &[Option], + buffer: &mut Vec, + block_size_limit: &Option, + ) -> Result { + use crate::pb::{UnixFs, UnixFsType}; + use quick_protobuf::{BytesWriter, MessageWrite, Writer}; + use sha2::{Digest, Sha256}; + + // FIXME: ideas on how to turn this into a HAMT sharding on some heuristic. we probably + // need to introduce states in to the "iterator": + // + // 1. bucketization + // 2. another post order visit of the buckets? + // + // the nested post order visit should probably re-use the existing infra ("message + // passing") and new ids can be generated by giving this iterator the counter from + // BufferedTreeBuilder. + // + // could also be that the HAMT shard building should start earlier, since the same + // heuristic can be detected *at* bufferedtreewriter. there the split would be easier, and + // this would "just" be a single node rendering, and not need any additional states.. 
+ + let node = CustomFlatUnixFs { + links, + data: UnixFs { + Type: UnixFsType::Directory, + ..Default::default() + }, + }; + + let size = node.get_size(); + + if let Some(limit) = block_size_limit { + let size = size as u64; + if *limit < size { + // FIXME: this could probably be detected at builder + return Err(TreeConstructionFailed::TooLargeBlock(size)); + } + } + + let cap = buffer.capacity(); + + if let Some(additional) = size.checked_sub(cap) { + buffer.reserve(additional); + } + + if let Some(mut needed_zeroes) = size.checked_sub(buffer.len()) { + let zeroes = [0; 8]; + + while needed_zeroes > 8 { + buffer.extend_from_slice(&zeroes[..]); + needed_zeroes -= zeroes.len(); + } + + buffer.extend(std::iter::repeat(0).take(needed_zeroes)); + } + + let mut writer = Writer::new(BytesWriter::new(&mut buffer[..])); + node.write_message(&mut writer) + .map_err(TreeConstructionFailed::Protobuf)?; + + buffer.truncate(size); + + let mh = multihash::wrap(multihash::Code::Sha2_256, &Sha256::digest(&buffer)); + let cid = Cid::new_v0(mh).expect("sha2_256 is the correct multihash for cidv0"); + + let combined_from_links = links + .iter() + .map(|opt| { + opt.as_ref() + .map(|NamedLeaf(_, _, total_size)| total_size) + .unwrap() + }) + .sum::(); + + Ok(Leaf { + link: cid, + total_size: buffer.len() as u64 + combined_from_links, + }) + } + + /// Construct the next dag-pb node, if any. + /// + /// Returns a `TreeNode` of the latest constructed tree node. + pub fn next_borrowed(&mut self) -> Option, TreeConstructionFailed>> { + while let Some(visited) = self.pending.pop() { + let (name, depth) = match &visited { + Visited::DescentRoot(_) => (None, 0), + Visited::Descent { name, depth, .. } => (Some(name.as_ref()), *depth), + Visited::Post { name, depth, .. } => (Some(name.as_ref()), *depth), + Visited::PostRoot { .. 
} => (None, 0), + }; + + update_full_path((&mut self.full_path, &mut self.old_depth), name, depth); + + match visited { + Visited::DescentRoot(node) => { + let children = &mut self.reused_children; + let leaves = partition_children_leaves(depth, node.nodes.into_iter(), children); + let any_children = !children.is_empty(); + + let leaves = if any_children { + self.persisted_cids.insert(node.id, leaves); + LeafStorage::from(node.id) + } else { + leaves.into() + }; + + self.pending.push(Visited::PostRoot { leaves }); + self.pending.extend(children.drain(..)); + } + Visited::Descent { + node, + name, + depth, + index, + } => { + let children = &mut self.reused_children; + let leaves = partition_children_leaves(depth, node.nodes.into_iter(), children); + let any_children = !children.is_empty(); + let parent_id = node.parent_id.expect("only roots parent_id is None"); + + let leaves = if any_children { + self.persisted_cids.insert(node.id, leaves); + node.id.into() + } else { + leaves.into() + }; + + self.pending.push(Visited::Post { + parent_id, + name, + depth, + leaves, + index, + }); + + self.pending.extend(children.drain(..)); + } + Visited::Post { + parent_id, + name, + leaves, + index, + .. 
+ } => { + let leaves = leaves.into_inner(&mut self.persisted_cids); + let buffer = &mut self.block_buffer; + + let leaf = match Self::render_directory( + &leaves, + buffer, + &self.opts.block_size_limit, + ) { + Ok(leaf) => leaf, + Err(e) => return Some(Err(e)), + }; + + self.cid = Some(leaf.link.clone()); + self.total_size = leaf.total_size; + + { + // name is None only for wrap_with_directory, which cannot really be + // propagated up but still the parent_id is allowed to be None + let parent_leaves = self.persisted_cids.get_mut(&parent_id); + + match (parent_id, parent_leaves, index) { + (pid, None, index) => panic!( + "leaves not found for parent_id = {} and index = {}", + pid, index + ), + (_, Some(vec), index) => { + let cell = &mut vec[index]; + // all + assert!(cell.is_none()); + *cell = Some(NamedLeaf(name, leaf.link, leaf.total_size)); + } + } + } + + return Some(Ok(TreeNode { + path: self.full_path.as_str(), + cid: self.cid.as_ref().unwrap(), + total_size: self.total_size, + block: &self.block_buffer, + })); + } + Visited::PostRoot { leaves } => { + let leaves = leaves.into_inner(&mut self.persisted_cids); + + if !self.opts.wrap_with_directory { + break; + } + + let buffer = &mut self.block_buffer; + + let leaf = match Self::render_directory( + &leaves, + buffer, + &self.opts.block_size_limit, + ) { + Ok(leaf) => leaf, + Err(e) => return Some(Err(e)), + }; + + self.cid = Some(leaf.link.clone()); + self.total_size = leaf.total_size; + + return Some(Ok(TreeNode { + path: self.full_path.as_str(), + cid: self.cid.as_ref().unwrap(), + total_size: self.total_size, + block: &self.block_buffer, + })); + } + } + } + None + } +} + +impl Iterator for PostOrderIterator { + type Item = Result; + + fn next(&mut self) -> Option { + self.next_borrowed() + .map(|res| res.map(TreeNode::into_owned)) + } +} + +/// Borrowed representation of a node in the tree. +pub struct TreeNode<'a> { + /// Full path to the node. + pub path: &'a str, + /// The Cid of the document. 
+ pub cid: &'a Cid, + /// Cumulative total size of the subtree in bytes. + pub total_size: u64, + /// Raw dag-pb document. + pub block: &'a [u8], +} + +impl<'a> fmt::Debug for TreeNode<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("TreeNode") + .field("path", &format_args!("{:?}", self.path)) + .field("cid", &format_args!("{}", self.cid)) + .field("total_size", &self.total_size) + .field("size", &self.block.len()) + .finish() + } +} + +impl TreeNode<'_> { + /// Convert to an owned and detached representation. + pub fn into_owned(self) -> OwnedTreeNode { + OwnedTreeNode { + path: self.path.to_owned(), + cid: self.cid.to_owned(), + total_size: self.total_size, + block: self.block.into(), + } + } +} + +/// Owned representation of a node in the tree. +pub struct OwnedTreeNode { + /// Full path to the node. + pub path: String, + /// The Cid of the document. + pub cid: Cid, + /// Cumulative total size of the subtree in bytes. + pub total_size: u64, + /// Raw dag-pb document. + pub block: Box<[u8]>, +} + +fn update_full_path( + (full_path, old_depth): (&mut String, &mut usize), + name: Option<&str>, + depth: usize, +) { + if depth < 2 { + // initially thought it might be a good idea to add a slash to all components; removing it made + // it impossible to get back down to empty string, so fixing this for depths 0 and 1. 
+ full_path.clear(); + *old_depth = 0; + } else { + while *old_depth >= depth && *old_depth > 0 { + // we now want to pop the last segment + // this would be easier with PathBuf + let slash_at = full_path.bytes().rposition(|ch| ch == b'/'); + if let Some(slash_at) = slash_at { + if *old_depth == depth && Some(&full_path[(slash_at + 1)..]) == name { + // minor unmeasurable perf optimization: + // going from a/b/foo/zz => a/b/foo does not need to go through the a/b + return; + } + full_path.truncate(slash_at); + *old_depth -= 1; + } else { + todo!( + "no last slash_at in {:?} yet {} >= {}", + full_path, + old_depth, + depth + ); + } + } + } + + debug_assert!(*old_depth <= depth); + + if let Some(name) = name { + if !full_path.is_empty() { + full_path.push_str("/"); + } + full_path.push_str(name); + *old_depth += 1; + } + + assert_eq!(*old_depth, depth); +} + +/// Returns a Vec of the links in order with only the leaves, the given `children` will contain yet +/// incomplete nodes of the tree. 
+fn partition_children_leaves( + depth: usize, + it: impl Iterator, + children: &mut Vec, +) -> Leaves { + let mut leaves = Vec::new(); + + for (i, (k, v)) in it.enumerate() { + match v { + Entry::Directory(node) => { + children.push(Visited::Descent { + node, + // this needs to be pushed down to update the full_path + name: k, + depth: depth + 1, + index: i, + }); + + // this will be overwritten later, but the order is fixed + leaves.push(None); + } + Entry::Leaf(leaf) => leaves.push(Some(NamedLeaf(k, leaf.link, leaf.total_size))), + } + } + + leaves +} + +#[derive(Debug)] +enum LeafStorage { + Direct(Leaves), + Stashed(u64), +} + +impl LeafStorage { + fn into_inner(self, stash: &mut HashMap) -> Leaves { + use LeafStorage::*; + + match self { + Direct(leaves) => leaves, + Stashed(id) => stash + .remove(&id) + .ok_or(id) + .expect("leaves are either stashed or direct, must able to find with id"), + } + } +} + +impl From for LeafStorage { + fn from(key: u64) -> LeafStorage { + LeafStorage::Stashed(key) + } +} + +impl From for LeafStorage { + fn from(leaves: Leaves) -> LeafStorage { + LeafStorage::Direct(leaves) + } +}