From 448637bde295b2969adcb57bdc5dd57af0158571 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Thu, 11 Mar 2021 16:45:08 +0800 Subject: [PATCH 1/3] feat: add index pipe --- Cargo.lock | 20 +---- Cargo.toml | 2 +- src/index_pipe.rs | 180 +++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/stream_pipe.rs | 8 +- src/traits.rs | 8 +- 6 files changed, 192 insertions(+), 27 deletions(-) create mode 100644 src/index_pipe.rs diff --git a/Cargo.lock b/Cargo.lock index 0c19066..76ffb40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,22 +173,6 @@ dependencies = [ "vec_map", ] -[[package]] -name = "console" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50aab2529019abfabfa93f1e6c41ef392f91fbf179b347a7e96abb524884a08" -dependencies = [ - "encode_unicode", - "lazy_static", - "libc", - "regex", - "terminal_size", - "unicode-width", - "winapi", - "winapi-util", -] - [[package]] name = "console" version = "0.14.0" @@ -670,7 +654,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7baab56125e25686df467fe470785512329883aab42696d661247aca2a2896e4" dependencies = [ - "console 0.14.0", + "console", "lazy_static", "number_prefix", "regex", @@ -831,7 +815,7 @@ dependencies = [ "async-trait", "bytes", "chrono", - "console 0.13.0", + "console", "flate2", "futures-core", "futures-retry", diff --git a/Cargo.toml b/Cargo.toml index 5464b9c..907f9b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ version = "0.2.0" async-trait = "0.1" bytes = "1.0" chrono = "0.4" -console = "0.13" +console = "0.14" flate2 = "1.0" futures-core = "0.3" futures-retry = "0.6" diff --git a/src/index_pipe.rs b/src/index_pipe.rs new file mode 100644 index 0000000..697623a --- /dev/null +++ b/src/index_pipe.rs @@ -0,0 +1,180 @@ +//! IndexPipe adds Index to every directory of source. + +use crate::common::{Mission, SnapshotConfig, SnapshotPath, TransferURL}; +use crate::error::Result; +use crate::metadata::SnapshotMeta; +use crate::stream_pipe::ByteStream; +use crate::traits::{Key, SnapshotStorage, SourceStorage}; +use async_trait::async_trait; +use std::collections::{BTreeMap, BTreeSet}; + +static LIST_URL: &'static str = "mirror_intel_list.html"; +pub struct IndexPipe { + source: Source, + index: Index, +} + +pub struct Index { + prefixes: BTreeMap, + objects: BTreeSet, +} + +impl Index { + fn new() -> Self { + Self { + prefixes: BTreeMap::new(), + objects: BTreeSet::new(), + } + } + + fn insert(&mut self, path: &str) { + match path.split_once('/') { + Some((parent, rest)) => { + self.prefixes + .entry(parent.to_string()) + .or_insert(Index::new()) + .insert(rest); + } + None => { + self.objects.insert(path.to_string()); + } + } + } + + fn snapshot(&self, prefix: &str, list_key: &str) -> Vec { + let mut result = vec![]; + result.push(format!("{}{}", prefix, list_key)); + for (key, index) in &self.prefixes { + let new_prefix = format!("{}{}/", prefix, key); + result.extend(index.snapshot(&new_prefix, list_key)); + } + result + } +} + +fn generate_index(objects: &[String]) -> Index { + let mut index = Index::new(); + for object in objects { + index.insert(object); + } + index +} + +impl IndexPipe { + fn snapshot_index_keys(&mut self, mut snapshot: Vec) -> Vec { + snapshot.sort(); + // If duplicated keys are found, there should be a warning. + // This warning will be handled on transfer. + snapshot.dedup(); + self.index = generate_index(&snapshot); + self.index.snapshot("", LIST_URL) + } +} + +#[async_trait] +impl SnapshotStorage for IndexPipe +where + Source: SnapshotStorage + std::fmt::Debug, +{ + async fn snapshot( + &mut self, + mission: Mission, + config: &SnapshotConfig, + ) -> Result> { + let mut snapshot = self.source.snapshot(mission, config).await?; + let index_keys = + self.snapshot_index_keys(snapshot.iter().map(|x| x.key().to_owned()).collect()); + snapshot.extend(index_keys.into_iter().map(|x| SnapshotPath(x))); + Ok(snapshot) + } + + fn info(&self) -> String { + format!("IndexPipe (path) <{:?}>", self.source) + } +} + +#[async_trait] +impl SnapshotStorage for IndexPipe +where + Source: SnapshotStorage + std::fmt::Debug, +{ + async fn snapshot( + &mut self, + mission: Mission, + config: &SnapshotConfig, + ) -> Result> { + let mut snapshot = self.source.snapshot(mission, config).await?; + let index_keys = + self.snapshot_index_keys(snapshot.iter().map(|x| x.key().to_owned()).collect()); + snapshot.extend(index_keys.into_iter().map(|x| SnapshotMeta::force(x))); + Ok(snapshot) + } + + fn info(&self) -> String { + format!("IndexPipe (meta) <{:?}>", self.source) + } +} +#[async_trait] +impl SourceStorage for IndexPipe +where + Snapshot: Key, + Source: SourceStorage + std::fmt::Debug, +{ + async fn get_object(&self, snapshot: &Snapshot, mission: &Mission) -> Result { + if snapshot.key().ends_with(LIST_URL) { + todo!(); + } else { + self.source.get_object(snapshot, mission).await + } + } +} + +#[cfg(test)] +mod tests { + use itertools::Itertools; + + use super::*; + + #[test] + fn test_simple() { + let mut source = ["a", "b", "c"].iter().map(|x| x.to_string()).collect_vec(); + source.sort(); + assert_eq!( + generate_index(&source).snapshot("", "list.html"), + vec!["list.html"] + ); + } + + #[test] + fn test_dir() { + let mut source = ["a", "b", "c/a", "c/b", "c/c", "d"] + .iter() + .map(|x| x.to_string()) + .collect_vec(); + source.sort(); + assert_eq!( + generate_index(&source).snapshot("", "list.html"), + vec!["list.html", "c/list.html"] + ); + } + + #[test] + fn test_dir_more() { + let mut source = ["a", "b", "c/a/b/c/d/e"] + .iter() + .map(|x| x.to_string()) + .collect_vec(); + source.sort(); + assert_eq!( + generate_index(&source).snapshot("", "list.html"), + vec![ + "list.html", + "c/list.html", + "c/a/list.html", + "c/a/b/list.html", + "c/a/b/c/list.html", + "c/a/b/c/d/list.html" + ] + ); + } +} diff --git a/src/main.rs b/src/main.rs index 26871b1..9035e24 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ mod github_release; mod gradle; mod homebrew; mod html_scanner; +mod index_pipe; mod metadata; mod mirror_intel; mod opts; diff --git a/src/stream_pipe.rs b/src/stream_pipe.rs index 4b4c0b6..fdf166d 100644 --- a/src/stream_pipe.rs +++ b/src/stream_pipe.rs @@ -1,4 +1,4 @@ -//! ByteStreamPipe pipes TransferURL to ByteObject. +//! ByteStreamPipe pipes TransferURL to ByteObject //! //! A `ByteStreamPipe` is a wrapper on sources which yields `TransferURL`. //! After piping a source through `ByteStreamPipe`, it will become a source @@ -93,8 +93,8 @@ where fn info(&self) -> String { format!( - "pipe <{:?}> to bytestream, buffered to {}", - self.source, self.buffer_path + "StreamPipe buffered to {} <{:?}>", + self.buffer_path, self.source ) } } @@ -118,7 +118,7 @@ fn hash_string(key: &str) -> String { impl SourceStorage for ByteStreamPipe where Snapshot: Send + Sync + 'static, - Source: SourceStorage + std::fmt::Debug + Send + Sync, + Source: SourceStorage + std::fmt::Debug, { async fn get_object(&self, snapshot: &Snapshot, mission: &Mission) -> Result { let transfer_url = self.source.get_object(snapshot, mission).await?; diff --git a/src/traits.rs b/src/traits.rs index d60bb56..cbcc163 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -3,7 +3,7 @@ use crate::error::Result; use async_trait::async_trait; #[async_trait] -pub trait SnapshotStorage { +pub trait SnapshotStorage: Send + Sync + 'static { async fn snapshot( &mut self, mission: Mission, @@ -13,12 +13,12 @@ pub trait SnapshotStorage { } #[async_trait] -pub trait SourceStorage { +pub trait SourceStorage: Send + Sync + 'static { async fn get_object(&self, snapshot: &SnapshotItem, mission: &Mission) -> Result; } #[async_trait] -pub trait TargetStorage { +pub trait TargetStorage: Send + Sync + 'static { async fn put_object( &self, snapshot: &SnapshotItem, @@ -31,7 +31,7 @@ pub trait TargetStorage { #[async_trait] impl SourceStorage for Source where - Source: SnapshotStorage + Send + Sync, + Source: SnapshotStorage, Snapshot: Key, { async fn get_object(&self, snapshot: &Snapshot, _mission: &Mission) -> Result { From 262a1763b9b088cf5f63655d50c89c103bcda86e Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Thu, 11 Mar 2021 23:11:09 +0800 Subject: [PATCH 2/3] index: generate html --- Cargo.lock | 23 ++++++++++ Cargo.toml | 2 + src/index_pipe.rs | 107 ++++++++++++++++++++++++++++++++++++++++----- src/main.rs | 8 +++- src/stream_pipe.rs | 27 +++--------- src/utils.rs | 15 +++++++ 6 files changed, 150 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4ed173..c02f0df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -557,6 +557,15 @@ dependencies = [ "digest", ] +[[package]] +name = "html-escape" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d64794b2265e97e459334ed47a7b7369ce8e8ee3d3450c0c363a0b563fc92233" +dependencies = [ + "utf8-width", +] + [[package]] name = "http" version = "0.2.3" @@ -821,6 +830,7 @@ dependencies = [ "futures-core", "futures-retry", "futures-util", + "html-escape", "indicatif", "iter-set", "itertools", @@ -846,6 +856,7 @@ dependencies = [ "tokio-stream", "tokio-util", "url", + "urlencoding", "walkdir", "zip", ] @@ -1974,6 +1985,18 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9232eb53352b4442e40d7900465dfc534e8cb2dc8f18656fcb2ac16112b5593" + +[[package]] +name = "utf8-width" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9071ac216321a4470a69fb2b28cfc68dcd1a39acd877c8be8e014df6772d8efa" + [[package]] name = "vcpkg" version = "0.2.11" diff --git a/Cargo.toml b/Cargo.toml index b109932..0e88399 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,3 +43,5 @@ tokio-util = "0.6" url = "2.2" walkdir = "2" zip = "0.5" +urlencoding = "1.1" +html-escape = "0.2" diff --git a/src/index_pipe.rs b/src/index_pipe.rs index 697623a..e2d6dfa 100644 --- a/src/index_pipe.rs +++ b/src/index_pipe.rs @@ -1,19 +1,26 @@ //! IndexPipe adds Index to every directory of source. -use crate::common::{Mission, SnapshotConfig, SnapshotPath, TransferURL}; +use crate::common::{Mission, SnapshotConfig, SnapshotPath}; use crate::error::Result; use crate::metadata::SnapshotMeta; -use crate::stream_pipe::ByteStream; +use crate::stream_pipe::{ByteObject, ByteStream}; use crate::traits::{Key, SnapshotStorage, SourceStorage}; +use crate::utils::{hash_string, unix_time}; + use async_trait::async_trait; +use itertools::Itertools; use std::collections::{BTreeMap, BTreeSet}; +use std::path::Path; +use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter}; -static LIST_URL: &'static str = "mirror_intel_list.html"; +static LIST_URL: &'static str = "mirror_clone_list.html"; pub struct IndexPipe { source: Source, index: Index, + buffer_path: String, } +#[derive(Debug)] pub struct Index { prefixes: BTreeMap, objects: BTreeSet, @@ -50,6 +57,50 @@ impl Index { } result } + + fn index_for(&self, prefix: &str, current_directory: &str, list_key: &str) -> String { + if prefix == "" { + let mut data = String::new(); + // TODO: need escape HTML and URL + data += &format!("

{}

", html_escape::encode_text(current_directory)); + data += &self + .prefixes + .iter() + .map(|(key, _)| { + format!( + r#"{}/"#, + urlencoding::encode(key), + list_key, + html_escape::encode_text(key) + ) + }) + .collect_vec() + .join("\n
\n"); + data += "\n
\n"; + data += &self + .objects + .iter() + .map(|key| { + format!( + r#"{}"#, + urlencoding::encode(key), + html_escape::encode_text(key) + ) + }) + .collect_vec() + .join("\n
\n"); + data + } else { + if let Some((parent, rest)) = prefix.split_once('/') { + self.prefixes + .get(parent) + .unwrap() + .index_for(rest, parent, list_key) + } else { + panic!("unsupported prefix {}", prefix); + } + } + } } fn generate_index(objects: &[String]) -> Index { @@ -61,6 +112,14 @@ fn generate_index(objects: &[String]) -> Index { } impl IndexPipe { + pub fn new(source: Source, buffer_path: String) -> Self { + Self { + source, + index: Index::new(), + buffer_path, + } + } + fn snapshot_index_keys(&mut self, mut snapshot: Vec) -> Vec { snapshot.sort(); // If duplicated keys are found, there should be a warning. @@ -74,7 +133,7 @@ impl IndexPipe { #[async_trait] impl SnapshotStorage for IndexPipe where - Source: SnapshotStorage + std::fmt::Debug, + Source: SnapshotStorage, { async fn snapshot( &mut self, @@ -89,14 +148,14 @@ where } fn info(&self) -> String { - format!("IndexPipe (path) <{:?}>", self.source) + format!("IndexPipe (path) <{}>", self.source.info()) } } #[async_trait] impl SnapshotStorage for IndexPipe where - Source: SnapshotStorage + std::fmt::Debug, + Source: SnapshotStorage, { async fn snapshot( &mut self, @@ -111,18 +170,46 @@ where } fn info(&self) -> String { - format!("IndexPipe (meta) <{:?}>", self.source) + format!("IndexPipe (meta) <{}>", self.source.info()) } } + #[async_trait] impl SourceStorage for IndexPipe where Snapshot: Key, - Source: SourceStorage + std::fmt::Debug, + Source: SourceStorage, { async fn get_object(&self, snapshot: &Snapshot, mission: &Mission) -> Result { - if snapshot.key().ends_with(LIST_URL) { - todo!(); + let key = snapshot.key(); + if key.ends_with(LIST_URL) { + let content = self + .index + .index_for(&key[..key.len() - LIST_URL.len()], "", LIST_URL) + .into_bytes(); + let pipe_file = format!("{}.{}.buffer", hash_string(key), unix_time()); + let path = Path::new(&self.buffer_path).join(pipe_file); + let mut f = BufWriter::new( + tokio::fs::OpenOptions::default() + .create(true) + .truncate(true) + .write(true) + .read(true) + .open(&path) + .await?, + ); + f.write_all(&content).await?; + f.flush().await?; + let mut f = f.into_inner(); + f.seek(std::io::SeekFrom::Start(0)).await?; + Ok(ByteStream { + object: ByteObject::LocalFile { + file: Some(f), + path: Some(path.into()), + }, + length: content.len() as u64, + modified_at: unix_time(), + }) } else { self.source.get_object(snapshot, mission).await } diff --git a/src/main.rs b/src/main.rs index 9035e24..462dfea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,19 +39,23 @@ macro_rules! transfer { transfer.transfer().await.unwrap(); } Target::S3 => { + let buffer_path = $opts.s3_config.s3_buffer_path.clone().unwrap(); let source = stream_pipe::ByteStreamPipe { source: $source, - buffer_path: $opts.s3_config.s3_buffer_path.clone().unwrap(), + buffer_path: buffer_path.clone(), }; + let source = index_pipe::IndexPipe::new(source, buffer_path); let target: S3Backend = $opts.s3_config.into(); let transfer = SimpleDiffTransfer::new(source, target, $transfer_config); transfer.transfer().await.unwrap(); } Target::File => { + let buffer_path = $opts.file_config.file_buffer_path.clone().unwrap(); let source = stream_pipe::ByteStreamPipe { source: $source, - buffer_path: $opts.file_config.file_buffer_path.clone().unwrap(), + buffer_path: buffer_path.clone(), }; + let source = index_pipe::IndexPipe::new(source, buffer_path); let target: FileBackend = $opts.file_config.into(); let transfer = SimpleDiffTransfer::new(source, target, $transfer_config); transfer.transfer().await.unwrap(); diff --git a/src/stream_pipe.rs b/src/stream_pipe.rs index fdf166d..87449cf 100644 --- a/src/stream_pipe.rs +++ b/src/stream_pipe.rs @@ -14,6 +14,7 @@ use chrono::DateTime; use crate::common::{Mission, SnapshotConfig, TransferURL}; use crate::error::{Error, Result}; use crate::traits::{SnapshotStorage, SourceStorage}; +use crate::utils::{hash_string, unix_time}; use futures_core::Stream; use futures_util::{StreamExt, TryStreamExt}; use slog::debug; @@ -72,7 +73,7 @@ pub struct ByteStream { pub modified_at: u64, } -pub struct ByteStreamPipe { +pub struct ByteStreamPipe { pub source: Source, pub buffer_path: String, } @@ -81,7 +82,7 @@ pub struct ByteStreamPipe { impl SnapshotStorage for ByteStreamPipe where Snapshot: Send + 'static, - Source: SnapshotStorage + std::fmt::Debug + Send, + Source: SnapshotStorage + Send, { async fn snapshot( &mut self, @@ -93,32 +94,18 @@ where fn info(&self) -> String { format!( - "StreamPipe buffered to {} <{:?}>", - self.buffer_path, self.source + "StreamPipe buffered to {} <{}>", + self.buffer_path, + self.source.info() ) } } -fn unix_time() -> u64 { - let start = std::time::SystemTime::now(); - start - .duration_since(std::time::UNIX_EPOCH) - .expect("Time went backwards") - .as_secs() -} - -fn hash_string(key: &str) -> String { - use std::hash::{Hash, Hasher}; - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - key.hash(&mut hasher); - format!("{:x}", hasher.finish()) -} - #[async_trait] impl SourceStorage for ByteStreamPipe where Snapshot: Send + Sync + 'static, - Source: SourceStorage + std::fmt::Debug, + Source: SourceStorage, { async fn get_object(&self, snapshot: &Snapshot, mission: &Mission) -> Result { let transfer_url = self.source.get_object(snapshot, mission).await?; diff --git a/src/utils.rs b/src/utils.rs index ddc8a98..ed50506 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -82,3 +82,18 @@ pub fn rewrite_snapshot(target_snapshot: &mut [SnapshotPath]) { path.0 = rewrite_url_string(&gen_map, &path.0); } } + +pub fn hash_string(key: &str) -> String { + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + key.hash(&mut hasher); + format!("{:x}", hasher.finish()) +} + +pub fn unix_time() -> u64 { + let start = std::time::SystemTime::now(); + start + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() +} From 5cbeaedc752a2842c311a743654baec28d552345 Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Thu, 11 Mar 2021 23:12:44 +0800 Subject: [PATCH 3/3] update Cargo.toml --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0e88399..bfb60ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ flate2 = "1.0" futures-core = "0.3" futures-retry = "0.6" futures-util = "0.3" +html-escape = "0.2" indicatif = "0.15" iter-set = "2.0" itertools = "0.9" @@ -41,7 +42,6 @@ tokio = {version = "1.0", features = ["full"]} tokio-stream = "0.1" tokio-util = "0.6" url = "2.2" +urlencoding = "1.1" walkdir = "2" zip = "0.5" -urlencoding = "1.1" -html-escape = "0.2"