From 0f54cb09cd1d50e8400bdf177d20062e882e783c Mon Sep 17 00:00:00 2001 From: southwest <1403572259@qq.com> Date: Mon, 6 Jan 2025 15:39:50 +0800 Subject: [PATCH 01/20] feat(storage): add cache configuration support for preheat tasks Signed-off-by: southwest <1403572259@qq.com> --- Cargo.lock | 2 + dragonfly-client-config/src/dfdaemon.rs | 39 ++++++++++++ dragonfly-client-storage/Cargo.toml | 2 + dragonfly-client-storage/src/cache.rs | 81 +++++++++++++++++++++++++ dragonfly-client-storage/src/lib.rs | 6 ++ dragonfly-client/src/resource/piece.rs | 25 ++++++++ 6 files changed, 155 insertions(+) create mode 100644 dragonfly-client-storage/src/cache.rs diff --git a/Cargo.lock b/Cargo.lock index 8bd3dc3c..6f054ed0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1094,6 +1094,7 @@ version = "0.1.125" dependencies = [ "base16ct", "bincode", + "bytes", "chrono", "crc32c", "crc32fast", @@ -1101,6 +1102,7 @@ dependencies = [ "dragonfly-client-config", "dragonfly-client-core", "dragonfly-client-util", + "lru", "num_cpus", "prost-wkt-types", "rayon", diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 019482b4..5ab48dd1 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -176,6 +176,20 @@ fn default_storage_read_buffer_size() -> usize { 128 * 1024 } +/// default_storage_cache_enable is the default value for the cache enable flag. +/// The cache is disabled by default. +#[inline] +pub fn default_storage_cache_enable() -> bool { + false +} + +/// default_storage_cache_capacity is the default cache capacity for the preheat task, default is +/// 100. +#[inline] +pub fn default_storage_cache_capacity() -> usize { + 100 +} + /// default_seed_peer_cluster_id is the default cluster id of seed peer. #[inline] fn default_seed_peer_cluster_id() -> u64 { @@ -820,6 +834,27 @@ impl Default for Dynconfig { } } +/// CacheConfig represents the configuration settings for the cache. +#[derive(Debug, Clone, Validate, Deserialize)] +#[serde(default, rename_all = "camelCase")] +pub struct CacheConfig { + /// enable determines whether the cache is enabled. + pub enable: bool, + + /// capacity specifies the maximum number of entries the cache can hold. + pub capacity: usize, +} + +/// Default implementation for CacheConfig. +impl Default for CacheConfig { + fn default() -> Self { + CacheConfig { + enable: default_storage_cache_enable(), + capacity: default_storage_cache_capacity(), + } + } +} + /// Storage is the storage configuration for dfdaemon. #[derive(Debug, Clone, Validate, Deserialize)] #[serde(default, rename_all = "camelCase")] @@ -839,6 +874,9 @@ pub struct Storage { /// read_buffer_size is the buffer size for reading piece from disk, default is 128KB. #[serde(default = "default_storage_read_buffer_size")] pub read_buffer_size: usize, + + /// cache is the configuration for the cache. + pub cache: CacheConfig, } /// Storage implements Default. 
@@ -849,6 +887,7 @@ impl Default for Storage {
             keep: default_storage_keep(),
             write_buffer_size: default_storage_write_buffer_size(),
             read_buffer_size: default_storage_read_buffer_size(),
+            cache: CacheConfig::default(),
         }
     }
 }
diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml index 6dc7972d..fb7c1e87 100644
--- a/dragonfly-client-storage/Cargo.toml
+++ b/dragonfly-client-storage/Cargo.toml
@@ -26,9 +26,11 @@ sha2.workspace = true
 crc32fast.workspace = true
 crc32c.workspace = true
 base16ct.workspace = true
+lru.workspace = true
 num_cpus = "1.0"
 bincode = "1.3.3"
 rayon = "1.10.0"
+bytes = "1.8"
 
 [dev-dependencies]
 tempdir = "0.3"
diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs new file mode 100644 index 00000000..8f600809
--- /dev/null
+++ b/dragonfly-client-storage/src/cache.rs
@@ -0,0 +1,81 @@
+/*
+ *     Copyright 2024 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use dragonfly_client_core::{Error, Result};
+use lru::LruCache;
+use std::cmp::{max, min};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// Cache is the cache for storing piece content by LRU algorithm.
+#[derive(Clone)]
+pub struct Cache {
+    /// pieces stores the piece cache data with piece id and value.
+    pieces: Arc<Mutex<LruCache<String, bytes::Bytes>>>,
+}
+
+/// Cache implements the cache for storing http response by LRU algorithm.
+impl Cache {
+    /// new creates a new cache with the specified capacity.
+    pub fn new(capacity: usize) -> Result<Self> {
+        let capacity = NonZeroUsize::new(capacity).ok_or(Error::InvalidParameter)?;
+        let pieces = Arc::new(Mutex::new(LruCache::new(capacity)));
+        Ok(Cache { pieces })
+    }
+
+    pub async fn upload_piece_from_cache(
+        &self,
+        piece_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range<u64>>,
+    ) -> Result<Option<Vec<u8>>> {
+        // Try to get the piece content from the cache
+        let Some(piece_content) = self.get_piece(piece_id) else {
+            return Ok(None);
+        };
+
+        // Calculate the range of bytes to return based on the `range` provided
+        let (target_offset, target_length) = if let Some(range) = range {
+            // If `range` is specified, calculate the target offset and length within the range
+            let target_offset = max(offset, range.start);
+            let target_length = min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
+            (target_offset, target_length)
+        } else {
+            // Otherwise, just use the given offset and length
+            (offset, length)
+        };
+
+        // Slice the content to match the required range and return it as a Vec
+        let content_slice = &piece_content[target_offset as usize..(target_offset + target_length) as usize];
+        Ok(Some(content_slice.to_vec())) // Convert the slice to a Vec and wrap it in Some
+    }
+
+    /// get gets the piece content from the cache.
+    pub fn get_piece(&self, id: &str) -> Option<bytes::Bytes> {
+        let mut pieces = self.pieces.lock().unwrap();
+        pieces.get(id).cloned()
+    }
+
+    /// add create the piece content into the cache, if the key already exists, no operation will
+    /// be performed.
+    pub fn add_piece(&self, id: &str, content: bytes::Bytes) {
+        let mut pieces = self.pieces.lock().unwrap();
+        if !pieces.contains(id) {
+            pieces.put(id.to_string(), content);
+        }
+    }
+}
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index e73c5ac3..06f09b32 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -32,6 +32,7 @@ use tracing::{debug, error, instrument};
 pub mod content;
 pub mod metadata;
 pub mod storage_engine;
+pub mod cache;
 
 /// DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL is the default interval for waiting for the piece to be finished.
 pub const DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL: Duration = Duration::from_millis(100);
@@ -46,6 +47,9 @@ pub struct Storage {
 
     /// content implements the content storage.
     content: content::Content,
+
+    /// cache implements the cache for preheat task.
+    cache: cache::Cache,
 }
 
 /// Storage implements the storage.
@@ -55,10 +59,12 @@ impl Storage {
     pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
         let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
+        let cache = cache::Cache::new(config.storage.cache_capacity)?;
 
         Ok(Storage {
             config,
             metadata,
             content,
+            cache,
         })
     }
 
diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 6235041c..a775dca4 100644
--- a/dragonfly-client/src/resource/piece.rs
+++ b/dragonfly-client/src/resource/piece.rs
@@ -349,6 +349,31 @@ impl Piece {
         })
     }
 
+    /// upload_piece_from_cache uploads a single piece from the cache of a local peer.
+    #[instrument(skip_all, fields(piece_id))]
+    pub async fn upload_piece_from_cache(
+        &self,
+        piece_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range<u64>>,
+    ) -> Result<Option<Vec<u8>>> {
+        let Some(piece_content) = self.get_piece(piece_id) else {
+            return Ok(None);
+        };
+
+        let (target_offset, target_length) = if let Some(range) = range {
+            let target_offset = max(offset, range.start);
+            let target_length = min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
+            (target_offset, target_length)
+        } else {
+            (offset, length)
+        };
+
+        let content_slice = &piece_content[target_offset as usize..(target_offset + target_length) as usize];
+        Ok(Some(content_slice.to_vec()))
+    }
+
     /// download_from_local_peer_into_async_read downloads a single piece from a local peer.
     #[instrument(skip_all, fields(piece_id))]
     pub async fn download_from_local_peer_into_async_read(
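A quick sanity check of the range math introduced in `upload_piece_from_cache` above: `target_offset` is computed in task-absolute coordinates, so indexing `piece_content` with it is only correct when the piece starts at offset zero; a later patch in this series rebases it onto the piece before slicing. The intersection logic itself, reproduced as a standalone sketch (not part of the patch; the `(start, length)` pair mimics dragonfly-api's `Range`):

    use std::cmp::{max, min};

    // Standalone sketch of the intersection in `upload_piece_from_cache`.
    // `start`/`range_length` mimic dragonfly-api's `Range { start, length }`.
    fn intersect(offset: u64, length: u64, start: u64, range_length: u64) -> (u64, u64) {
        let target_offset = max(offset, start);
        let target_length =
            min(offset + length - 1, start + range_length - 1) - target_offset + 1;
        (target_offset, target_length)
    }

    fn main() {
        // Piece covers [100, 150), requested range covers [120, 220):
        // the overlap is [120, 150), i.e. offset 120 and length 30.
        assert_eq!(intersect(100, 50, 120, 100), (120, 30));
        // A range fully inside the piece comes back unchanged.
        assert_eq!(intersect(100, 50, 110, 10), (110, 10));
    }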
From d319dc47e2157329c8d11b9744b7ea405c06b57c Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Mon, 6 Jan 2025 16:15:05 +0800
Subject: [PATCH 02/20] feat(storage): add cache initialization according to
 the configuration

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-storage/src/cache.rs  |  6 ++++--
 dragonfly-client-storage/src/lib.rs    |  9 +++++++--
 dragonfly-client/src/resource/piece.rs | 25 -------------------------
 3 files changed, 11 insertions(+), 29 deletions(-)

diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs index 8f600809..bda5bb12 100644
--- a/dragonfly-client-storage/src/cache.rs
+++ b/dragonfly-client-storage/src/cache.rs
@@ -15,6 +15,7 @@
  */
 
 use dragonfly_client_core::{Error, Result};
+use dragonfly_api::common::v2::Range;
 use lru::LruCache;
 use std::cmp::{max, min};
 use std::num::NonZeroUsize;
@@ -36,12 +37,13 @@ impl Cache {
         Ok(Cache { pieces })
     }
 
-    pub async fn upload_piece_from_cache(
+    /// read_piece reads the piece from the cache.
+    pub async fn read_piece(
         &self,
         piece_id: &str,
         offset: u64,
         length: u64,
-        range: Option<Range<u64>>,
+        range: Option<Range>,
     ) -> Result<Option<Vec<u8>>> {
         // Try to get the piece content from the cache
         let Some(piece_content) = self.get_piece(piece_id) else {
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 06f09b32..1bd204b8 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -49,7 +49,7 @@ pub struct Storage {
     content: content::Content,
 
     /// cache implements the cache for preheat task.
-    cache: cache::Cache,
+    cache: Option<cache::Cache>,
 }
 
 /// Storage implements the storage.
@@ -59,7 +59,12 @@ impl Storage {
     pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
         let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
-        let cache = cache::Cache::new(config.storage.cache_capacity)?;
+        let cache = if config.storage.cache.enable {
+            Some(cache::Cache::new(config.storage.cache.capacity)?)
+        } else {
+            None
+        };
+
         Ok(Storage {
             config,
             metadata,
diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index a775dca4..6235041c 100644
--- a/dragonfly-client/src/resource/piece.rs
+++ b/dragonfly-client/src/resource/piece.rs
@@ -349,31 +349,6 @@ impl Piece {
         })
     }
 
-    /// upload_piece_from_cache uploads a single piece from the cache of a local peer.
-    #[instrument(skip_all, fields(piece_id))]
-    pub async fn upload_piece_from_cache(
-        &self,
-        piece_id: &str,
-        offset: u64,
-        length: u64,
-        range: Option<Range<u64>>,
-    ) -> Result<Option<Vec<u8>>> {
-        let Some(piece_content) = self.get_piece(piece_id) else {
-            return Ok(None);
-        };
-
-        let (target_offset, target_length) = if let Some(range) = range {
-            let target_offset = max(offset, range.start);
-            let target_length = min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
-            (target_offset, target_length)
-        } else {
-            (offset, length)
-        };
-
-        let content_slice = &piece_content[target_offset as usize..(target_offset + target_length) as usize];
-        Ok(Some(content_slice.to_vec()))
-    }
-
     /// download_from_local_peer_into_async_read downloads a single piece from a local peer.
     #[instrument(skip_all, fields(piece_id))]
     pub async fn download_from_local_peer_into_async_read(
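Given the `CacheConfig` from PATCH 01 and the `enable` gate wired up above, the new section of the storage config should round-trip through serde roughly as follows. This is a sketch only, assuming serde_yaml (which dfdaemon already uses for its YAML config) and the fallbacks provided by `#[serde(default)]`:

    use dragonfly_client_config::dfdaemon::Storage;

    fn main() {
        // Explicit settings; `rename_all = "camelCase"` is a no-op for
        // single-word fields like `enable` and `capacity`.
        let yaml = "cache:\n  enable: true\n  capacity: 200\n";
        let storage: Storage = serde_yaml::from_str(yaml).unwrap();
        assert!(storage.cache.enable);
        assert_eq!(storage.cache.capacity, 200);

        // Omitting the section falls back to the defaults defined above:
        // disabled, with capacity 100.
        let storage: Storage = serde_yaml::from_str("{}").unwrap();
        assert!(!storage.cache.enable);
        assert_eq!(storage.cache.capacity, 100);
    }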
From af216633ad6ee6eefd34521cf14f39dd4b615249 Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Mon, 6 Jan 2025 16:27:26 +0800
Subject: [PATCH 03/20] feat(storage): add cache configuration support for
 preheat tasks

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-config/src/dfdaemon.rs | 36 +++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 46794756..a16c8f9a 100644
--- a/dragonfly-client-config/src/dfdaemon.rs
+++ b/dragonfly-client-config/src/dfdaemon.rs
@@ -188,6 +188,19 @@ fn default_storage_read_buffer_size() -> usize {
     128 * 1024
 }
 
+/// default_storage_cache_enable is the default value for the cache enable flag.
+/// The cache is disabled by default.
+#[inline]
+fn default_storage_cache_enable() -> bool {
+    false
+}
+/// default_storage_cache_capacity is the default cache capacity for the preheat task, default is
+/// 100.
+#[inline]
+fn default_storage_cache_capacity() -> usize {
+    100
+}
+
 /// default_seed_peer_cluster_id is the default cluster id of seed peer.
 #[inline]
 fn default_seed_peer_cluster_id() -> u64 {
@@ -898,6 +911,25 @@ impl Default for StorageServer {
     }
 }
 
+/// CacheConfig represents the configuration settings for the cache.
+#[derive(Debug, Clone, Validate, Deserialize)]
+#[serde(default, rename_all = "camelCase")]
+pub struct CacheConfig {
+    /// enable determines whether the cache is enabled.
+    pub enable: bool,
+    /// capacity specifies the maximum number of entries the cache can hold.
+    pub capacity: usize,
+}
+/// Default implementation for CacheConfig.
+impl Default for CacheConfig {
+    fn default() -> Self {
+        CacheConfig {
+            enable: default_storage_cache_enable(),
+            capacity: default_storage_cache_capacity(),
+        }
+    }
+}
+
 /// Storage is the storage configuration for dfdaemon.
 #[derive(Debug, Clone, Validate, Deserialize)]
 #[serde(default, rename_all = "camelCase")]
@@ -905,6 +937,9 @@ pub struct Storage {
     /// server is the storage server configuration for dfdaemon.
     pub server: StorageServer,
 
+    /// cache is the configuration for the cache.
+    pub cache: CacheConfig,
+
     /// dir is the directory to store task's metadata and content.
     #[serde(default = "crate::default_storage_dir")]
     pub dir: PathBuf,
@@ -927,6 +962,7 @@ impl Default for Storage {
     fn default() -> Self {
         Storage {
             server: StorageServer::default(),
+            cache: CacheConfig::default(),
             dir: crate::default_storage_dir(),
             keep: default_storage_keep(),
             write_buffer_size: default_storage_write_buffer_size(),

From 04a4a2abb39c30a757c68be64b458f0913eaef22 Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Tue, 7 Jan 2025 12:30:05 +0800
Subject: [PATCH 04/20] feat: 'cache' definition and implementation

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-storage/src/cache.rs | 12 ++++++------
 dragonfly-client-storage/src/lib.rs   | 10 ++++++++++
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs index bda5bb12..b5ed6f60 100644
--- a/dragonfly-client-storage/src/cache.rs
+++ b/dragonfly-client-storage/src/cache.rs
@@ -20,6 +20,8 @@ use lru::LruCache;
 use std::cmp::{max, min};
 use std::num::NonZeroUsize;
 use std::sync::{Arc, Mutex};
+use tokio::io::AsyncRead;
+use std::io::Cursor;
 
 /// Cache is the cache for storing piece content by LRU algorithm.
 #[derive(Clone)]
@@ -44,26 +46,24 @@ impl Cache {
         offset: u64,
         length: u64,
         range: Option<Range>,
-    ) -> Result<Option<Vec<u8>>> {
+    ) -> Result<impl AsyncRead> {
         // Try to get the piece content from the cache
         let Some(piece_content) = self.get_piece(piece_id) else {
-            return Ok(None);
+            return Err(Error::PieceNotFound(piece_id.to_string()));
         };
 
-        // Calculate the range of bytes to return based on the `range` provided
+        // Calculate the range of bytes to return based on the range provided
         let (target_offset, target_length) = if let Some(range) = range {
-            // If `range` is specified, calculate the target offset and length within the range
             let target_offset = max(offset, range.start);
             let target_length = min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
             (target_offset, target_length)
         } else {
-            // Otherwise, just use the given offset and length
             (offset, length)
         };
 
         // Slice the content to match the required range and return it as a Vec
         let content_slice = &piece_content[target_offset as usize..(target_offset + target_length) as usize];
-        Ok(Some(content_slice.to_vec())) // Convert the slice to a Vec and wrap it in Some
+        Ok(Cursor::new(content_slice.to_vec()))
     }
 
     /// get gets the piece content from the cache.
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 17b71f23..808c8628 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -404,6 +404,16 @@ impl Storage {
         // Get the piece metadata and return the content of the piece.
         match self.metadata.get_piece(piece_id) {
             Ok(Some(piece)) => {
+                if let Some(cache) = &self.cache {
+                    // Try to read the piece content from cache
+                    if let Ok(cache_reader) = cache
+                        .read_piece(piece_id,piece.offset, piece.length, range)
+                        .await
+                    {
+                        self.metadata.upload_task_finished(task_id)?;
+                        return Ok(cache_reader);
+                    }
+                }
                 match self
                     .content
                     .read_piece(task_id, piece.offset, piece.length, range)

From 373072d95b5b729ae7848ec80cd9d92517b277ee Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Tue, 7 Jan 2025 12:40:03 +0800
Subject: [PATCH 05/20] refactor: change 'Cache' configuration and default
 value setting

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-config/src/dfdaemon.rs | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)

diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index a16c8f9a..2c2e2d7d 100644
--- a/dragonfly-client-config/src/dfdaemon.rs
+++ b/dragonfly-client-config/src/dfdaemon.rs
@@ -188,12 +188,6 @@ fn default_storage_read_buffer_size() -> usize {
     128 * 1024
 }
 
-/// default_storage_cache_enable is the default value for the cache enable flag.
-/// The cache is disabled by default.
-#[inline]
-fn default_storage_cache_enable() -> bool {
-    false
-}
 /// default_storage_cache_capacity is the default cache capacity for the preheat task, default is
 /// 100.
 #[inline]
@@ -911,20 +905,20 @@ impl Default for StorageServer {
     }
 }
 
-/// CacheConfig represents the configuration settings for the cache.
+/// Cache represents the configuration settings for the cache.
 #[derive(Debug, Clone, Validate, Deserialize)]
 #[serde(default, rename_all = "camelCase")]
-pub struct CacheConfig {
+pub struct Cache {
     /// enable determines whether the cache is enabled.
     pub enable: bool,
     /// capacity specifies the maximum number of entries the cache can hold.
     pub capacity: usize,
 }
 /// Default implementation for CacheConfig.
-impl Default for CacheConfig { +impl Default for Cache { fn default() -> Self { - CacheConfig { - enable: default_storage_cache_enable(), + Cache { + enable: false, capacity: default_storage_cache_capacity(), } } @@ -938,7 +932,7 @@ pub struct Storage { pub server: StorageServer, /// cache is the configuration for the cache. - pub cache: CacheConfig, + pub cache: Cache, /// dir is the directory to store task's metadata and content. #[serde(default = "crate::default_storage_dir")] @@ -962,7 +956,7 @@ impl Default for Storage { fn default() -> Self { Storage { server: StorageServer::default(), - cache: CacheConfig::default(), + cache: Cache::default(), dir: crate::default_storage_dir(), keep: default_storage_keep(), write_buffer_size: default_storage_write_buffer_size(), From aa81cc83e5c8daeefb439c94972c8dd173aaa587 Mon Sep 17 00:00:00 2001 From: southwest <1403572259@qq.com> Date: Tue, 7 Jan 2025 13:40:12 +0800 Subject: [PATCH 06/20] chore: modify comments of Cache Signed-off-by: southwest <1403572259@qq.com> --- dragonfly-client-config/src/dfdaemon.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 2c2e2d7d..e3b0dd95 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -914,7 +914,8 @@ pub struct Cache { /// capacity specifies the maximum number of entries the cache can hold. pub capacity: usize, } -/// Default implementation for CacheConfig. + +/// Default implementation for Cache. impl Default for Cache { fn default() -> Self { Cache { From 8b45619a0d9ba80e6dac621682c2110e8de7d281 Mon Sep 17 00:00:00 2001 From: southwest <1403572259@qq.com> Date: Tue, 7 Jan 2025 18:08:47 +0800 Subject: [PATCH 07/20] chore: add workflow diagram of Cache Signed-off-by: southwest <1403572259@qq.com> --- dragonfly-client-config/src/dfdaemon.rs | 31 ++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index e3b0dd95..a44db01a 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -905,11 +905,36 @@ impl Default for StorageServer { } } -/// Cache represents the configuration settings for the cache. +/// Cache represents the configuration settings for the cache. +/// When a user triggers a preheat request, and if the peer has the cache storage feature enabled, +/// the downloaded piece content will be stored in the cache. Subsequently, when other peers request +/// downloading the same content, the preheated peer can upload the piece content directly from memory, +/// bypassing disk I/O. +/// +/// The workflow diagram is as follows: +/// | +/// preheat +/// | +/// v +/// +--------------------------------+ +/// | Peer | +/// | +-----------+ | +/// +---------------+ | |--->| cache | | +/// | | | | +-----------+ | +/// | source | | | | | | +-------------+ +/// | |<-- download -->|----| miss --- hit ----->|<-- download -->| Peer | +/// | local | | | | | | +-------------+ +/// | | | | v | | +/// | other peers | | | +-----------+ | | +/// | | | |--->| disk |-------| | +/// +---------------+ | +-----------+ | +/// | | +/// +--------------------------------+ +/// #[derive(Debug, Clone, Validate, Deserialize)] #[serde(default, rename_all = "camelCase")] pub struct Cache { - /// enable determines whether the cache is enabled. + /// enable determines whether the cache feature is enabled. 
pub enable: bool, /// capacity specifies the maximum number of entries the cache can hold. pub capacity: usize, @@ -932,7 +957,7 @@ pub struct Storage { /// server is the storage server configuration for dfdaemon. pub server: StorageServer, - /// cache is the configuration for the cache. + /// cache is the configuration for the cache storage. pub cache: Cache, /// dir is the directory to store task's metadata and content. From f306041bb0b2ab7e0a8b0e86f03caf02b1595696 Mon Sep 17 00:00:00 2001 From: southwest <1403572259@qq.com> Date: Thu, 9 Jan 2025 13:48:56 +0800 Subject: [PATCH 08/20] feat: cache is enabled by default, and the diagram has been modified with added annotations. Signed-off-by: southwest <1403572259@qq.com> --- dragonfly-client-config/src/dfdaemon.rs | 82 ++++++++++--------------- 1 file changed, 32 insertions(+), 50 deletions(-) diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index a44db01a..801430c9 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -587,7 +587,7 @@ impl UploadClient { /// the host info in real-time from the parents and then select the parents for downloading. /// /// The workflow diagram is as follows: -/// +///``` /// +----------+ /// ----------------| parent |--------------- /// | +----------+ | @@ -607,6 +607,7 @@ impl UploadClient { /// | | download | | /// | +------------+ | /// +-------------------------------------------------------------------+ +/// ``` #[derive(Debug, Clone, Default, Validate, Deserialize)] #[serde(default, rename_all = "camelCase")] pub struct ParentSelector { @@ -905,51 +906,6 @@ impl Default for StorageServer { } } -/// Cache represents the configuration settings for the cache. -/// When a user triggers a preheat request, and if the peer has the cache storage feature enabled, -/// the downloaded piece content will be stored in the cache. Subsequently, when other peers request -/// downloading the same content, the preheated peer can upload the piece content directly from memory, -/// bypassing disk I/O. -/// -/// The workflow diagram is as follows: -/// | -/// preheat -/// | -/// v -/// +--------------------------------+ -/// | Peer | -/// | +-----------+ | -/// +---------------+ | |--->| cache | | -/// | | | | +-----------+ | -/// | source | | | | | | +-------------+ -/// | |<-- download -->|----| miss --- hit ----->|<-- download -->| Peer | -/// | local | | | | | | +-------------+ -/// | | | | v | | -/// | other peers | | | +-----------+ | | -/// | | | |--->| disk |-------| | -/// +---------------+ | +-----------+ | -/// | | -/// +--------------------------------+ -/// -#[derive(Debug, Clone, Validate, Deserialize)] -#[serde(default, rename_all = "camelCase")] -pub struct Cache { - /// enable determines whether the cache feature is enabled. - pub enable: bool, - /// capacity specifies the maximum number of entries the cache can hold. - pub capacity: usize, -} - -/// Default implementation for Cache. -impl Default for Cache { - fn default() -> Self { - Cache { - enable: false, - capacity: default_storage_cache_capacity(), - } - } -} - /// Storage is the storage configuration for dfdaemon. #[derive(Debug, Clone, Validate, Deserialize)] #[serde(default, rename_all = "camelCase")] @@ -957,9 +913,6 @@ pub struct Storage { /// server is the storage server configuration for dfdaemon. pub server: StorageServer, - /// cache is the configuration for the cache storage. 
- pub cache: Cache, - /// dir is the directory to store task's metadata and content. #[serde(default = "crate::default_storage_dir")] pub dir: PathBuf, @@ -975,6 +928,35 @@ pub struct Storage { /// read_buffer_size is the buffer size for reading piece from disk, default is 128KB. #[serde(default = "default_storage_read_buffer_size")] pub read_buffer_size: usize, + + /// cache_capacity is the cache capacity for the preheat task, default is 100. + /// + /// Cache storage: + /// 1. The user initiates a preheat job, where the peer downloads and caches content into memory and disk. + /// 2. Other peers performing the same task can directly access the preheated peer's memory to speed up the download. + /// ``` + /// | + /// 1.preheat + /// | + /// | + /// +--------------------------------------------------+ + /// | | Peer | + /// | | +-----------+ | + /// | | -- partial -->| cache | | + /// | | | +-----------+ | + /// | v | | | | + /// | downloaded | miss | | +-------------+ + /// | content -->| | --- hit ------>|<-- 2.download -->| Peer | + /// | | | ^ | +-------------+ + /// | | v | | + /// | | +-----------+ | | + /// | -- full -->| disk |---------- | + /// | +-----------+ | + /// | | + /// +--------------------------------------------------+ + /// ``` + #[serde(default = "default_storage_cache_capacity")] + pub cache_capacity: usize, } /// Storage implements Default. @@ -982,11 +964,11 @@ impl Default for Storage { fn default() -> Self { Storage { server: StorageServer::default(), - cache: Cache::default(), dir: crate::default_storage_dir(), keep: default_storage_keep(), write_buffer_size: default_storage_write_buffer_size(), read_buffer_size: default_storage_read_buffer_size(), + cache_capacity: default_storage_cache_capacity(), } } } From b80775562938cb00fdb7c5cb24566c79c326f9f9 Mon Sep 17 00:00:00 2001 From: southwest <1403572259@qq.com> Date: Thu, 9 Jan 2025 13:51:15 +0800 Subject: [PATCH 09/20] chore: modify the comment of 'default_storage_cache_capacity()' Signed-off-by: southwest <1403572259@qq.com> --- dragonfly-client-config/src/dfdaemon.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 801430c9..36236ed8 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -188,7 +188,7 @@ fn default_storage_read_buffer_size() -> usize { 128 * 1024 } -/// default_storage_cache_capacity is the default cache capacity for the preheat task, default is +/// default_storage_cache_capacity is the default cache capacity for the preheat job, default is /// 100. 
 #[inline]
 fn default_storage_cache_capacity() -> usize {
     100

From 826c78a5d127cd770c0a2488d7bc3ac24f933585 Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Thu, 9 Jan 2025 15:44:40 +0800
Subject: [PATCH 10/20] feat: add def & impl of Cache and cache access logic

Signed-off-by: southwest <1403572259@qq.com>
---
 Cargo.lock                             |  4 ++--
 Cargo.toml                             |  2 +-
 dragonfly-client-storage/src/cache.rs  | 29 ++++++++++++++++++-------
 dragonfly-client-storage/src/lib.rs    | 30 +++++++++++++++-----------
 dragonfly-client/src/bin/dfget/main.rs |  1 +
 dragonfly-client/src/proxy/mod.rs      |  1 +
 dragonfly-client/src/resource/piece.rs |  5 +++--
 7 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock index f8e159a6..b1260f5c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -908,9 +908,9 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-api"
-version = "2.1.3"
+version = "2.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da4ce0486d3b9c5a8cfa137f9180807d2079aee1b86464d28847e548dc8c3f9c"
+checksum = "b4134303088f6234035dc92efbd218feab8358403ef5a4a917925b9c04dd4f10"
 dependencies = [
  "prost 0.13.4",
  "prost-types",
diff --git a/Cargo.toml b/Cargo.toml index 5318735b..ea64afd6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,7 +30,7 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.2
 dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.2" }
 dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.2" }
 thiserror = "1.0"
-dragonfly-api = "=2.1.3"
+dragonfly-api = "=2.1.4"
 reqwest = { version = "0.12.4", features = [
     "stream",
     "native-tls",
diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs index b5ed6f60..5fa7ac7d 100644
--- a/dragonfly-client-storage/src/cache.rs
+++ b/dragonfly-client-storage/src/cache.rs
@@ -30,7 +30,7 @@ pub struct Cache {
     pieces: Arc<Mutex<LruCache<String, bytes::Bytes>>>,
 }
 
-/// Cache implements the cache for storing http response by LRU algorithm.
+/// Cache implements the cache for storing piece content by LRU algorithm.
 impl Cache {
     /// new creates a new cache with the specified capacity.
     pub fn new(capacity: usize) -> Result<Self> {
@@ -54,25 +54,32 @@ impl Cache {
 
         // Calculate the range of bytes to return based on the range provided
         let (target_offset, target_length) = if let Some(range) = range {
-            let target_offset = max(offset, range.start);
-            let target_length = min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
-            (target_offset, target_length)
+            let target_offset = max(offset, range.start) - offset;
+            let target_length =
+                min(offset + length - 1, range.start + range.length - 1) - target_offset - offset + 1;
+            (target_offset as usize, target_length as usize)
         } else {
-            (offset, length)
+            (0, length as usize)
         };
+
+        let begin = target_offset;
+        let end = target_offset + target_length;
+        if begin >= piece_content.len() || end > piece_content.len() {
+            return Err(Error::InvalidParameter);
+        }
 
         // Slice the content to match the required range and return it as a Vec
-        let content_slice = &piece_content[target_offset as usize..(target_offset + target_length) as usize];
-        Ok(Cursor::new(content_slice.to_vec()))
+        let piece_content = piece_content.slice(begin..end);
+        Ok(Cursor::new(piece_content))
     }
 
-    /// get gets the piece content from the cache.
+    /// get_piece gets the piece content from the cache.
     pub fn get_piece(&self, id: &str) -> Option<bytes::Bytes> {
         let mut pieces = self.pieces.lock().unwrap();
         pieces.get(id).cloned()
     }
 
-    /// add create the piece content into the cache, if the key already exists, no operation will
+    /// add_piece add the piece content into the cache, if the key already exists, no operation will
     /// be performed.
     pub fn add_piece(&self, id: &str, content: bytes::Bytes) {
         let mut pieces = self.pieces.lock().unwrap();
@@ -80,4 +87,10 @@ impl Cache {
             pieces.put(id.to_string(), content);
         }
     }
+
+    /// is_empty checks if the cache is empty.
+    pub fn is_empty(&self) -> bool {
+        let pieces = self.pieces.lock().unwrap();
+        pieces.is_empty()
+    }
 }
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 0a97a7af..82026e47 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -21,6 +21,7 @@ use dragonfly_client_util::digest::{Algorithm, Digest};
 use reqwest::header::HeaderMap;
 use std::path::Path;
 use std::path::PathBuf;
+use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::io::AsyncRead;
@@ -46,7 +47,7 @@ pub struct Storage {
     content: content::Content,
 
     /// cache implements the cache for preheat task.
-    cache: Option<cache::Cache>,
+    cache: cache::Cache,
 }
 
 /// Storage implements the storage.
@@ -56,11 +57,7 @@ impl Storage {
     pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
         let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
-        let cache = if config.storage.cache.enable {
-            Some(cache::Cache::new(config.storage.cache.capacity)?)
-        } else {
-            None
-        };
+        let cache = cache::Cache::new(config.storage.cache_capacity.clone())?;
 
         Ok(Storage {
             config,
@@ -384,7 +381,7 @@ impl Storage {
         piece_id: &str,
         task_id: &str,
         range: Option<Range>,
-    ) -> Result<impl AsyncRead> {
+    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
         // Wait for the piece to be finished.
         self.wait_for_piece_finished(piece_id).await?;
 
@@ -394,14 +391,17 @@ impl Storage {
         // Get the piece metadata and return the content of the piece.
         match self.metadata.get_piece(piece_id) {
             Ok(Some(piece)) => {
-                if let Some(cache) = &self.cache {
-                    // Try to read the piece content from cache
-                    if let Ok(cache_reader) = cache
-                        .read_piece(piece_id,piece.offset, piece.length, range)
-                        .await
-                    {
-                        self.metadata.upload_task_finished(task_id)?;
-                        return Ok(cache_reader);
-                    }
-                }
+                if !self.cache.is_empty() {
+                    match self
+                        .cache
+                        .read_piece(piece_id, piece.offset, piece.length, range)
+                        .await
+                    {
+                        Ok(cache_reader) => {
+                            // Finish uploading the task.
+                            self.metadata.upload_task_finished(task_id)?;
+                            return Ok(Box::pin(cache_reader));
+                        }
+                        Err(_err) => {}
+                    }
+                }
                 match self
                     .content
                     .read_piece(task_id, piece.offset, piece.length, range)
                     Ok(reader) => {
                         // Finish uploading the task.
                         self.metadata.upload_task_finished(task_id)?;
-                        Ok(reader)
+                        Ok(Box::pin(reader))
                     }
                     Err(err) => {
                         // Failed uploading the task.
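The `Pin<Box<dyn AsyncRead + Send>>` return type above is the crux of this patch: the cache path yields an in-memory cursor while the content store yields a file-backed reader, and `impl AsyncRead` cannot name two different concrete types from one function. The same technique in isolation (a sketch only; `tokio::io::empty()` stands in for the disk-backed reader):

    use std::io::Cursor;
    use std::pin::Pin;
    use tokio::io::AsyncRead;

    // Two readers with different concrete types, unified behind one boxed trait object.
    fn pick_reader(hit_cache: bool, cached: Vec<u8>) -> Pin<Box<dyn AsyncRead + Send>> {
        if hit_cache {
            // Memory-backed reader, as returned on a cache hit.
            Box::pin(Cursor::new(cached))
        } else {
            // Stand-in for the file-backed reader used on a cache miss.
            Box::pin(tokio::io::empty())
        }
    }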
diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 126a41fe..f1e8bd6f 100644
--- a/dragonfly-client/src/bin/dfget/main.rs
+++ b/dragonfly-client/src/bin/dfget/main.rs
@@ -757,6 +757,7 @@ async fn download(
                 need_piece_content,
                 object_storage,
                 hdfs,
+                load_to_cache: false,
             }),
         })
         .await
diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 3ebd325c..53272f00 100644
--- a/dragonfly-client/src/proxy/mod.rs
+++ b/dragonfly-client/src/proxy/mod.rs
@@ -1109,6 +1109,7 @@ fn make_download_task_request(
             hdfs: None,
             is_prefetch: false,
             need_piece_content: false,
+            load_to_cache: false,
         }),
     })
 }
diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 56771c5b..de1fe223 100644
--- a/dragonfly-client/src/resource/piece.rs
+++ b/dragonfly-client/src/resource/piece.rs
@@ -30,6 +30,7 @@ use dragonfly_client_util::id_generator::IDGenerator;
 use leaky_bucket::RateLimiter;
 use reqwest::header::{self, HeaderMap};
 use std::collections::HashMap;
+use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::io::{AsyncRead, AsyncReadExt};
@@ -328,7 +329,7 @@ impl Piece {
         length: u64,
         range: Option<Range>,
         disable_rate_limit: bool,
-    ) -> Result<impl AsyncRead> {
+    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
         // Span record the piece_id.
         Span::current().record("piece_id", piece_id);
@@ -390,7 +391,7 @@ impl Piece {
         range: Option<Range>,
         disable_rate_limit: bool,
         is_prefetch: bool,
-    ) -> Result<impl AsyncRead> {
+    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
         // Span record the piece_id.
         Span::current().record("piece_id", piece_id);

From e6f0d82290ca4c347784328de94d80b472508c98 Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Thu, 9 Jan 2025 15:48:46 +0800
Subject: [PATCH 11/20] chore: add diagram and comment for Cache

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-storage/src/cache.rs | 29 ++++++++++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs index 5fa7ac7d..667c0c08 100644
--- a/dragonfly-client-storage/src/cache.rs
+++ b/dragonfly-client-storage/src/cache.rs
@@ -24,9 +24,36 @@ use tokio::io::AsyncRead;
 use std::io::Cursor;
 
 /// Cache is the cache for storing piece content by LRU algorithm.
+///
+/// Cache storage:
+/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
+///    For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
+/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
+///    page cache for the first read.
+/// ```
+///
+///     1.Preheat
+///         |
+///         |
+/// +--------------------------------------------------+
+/// |       |               Peer                        |
+/// |       |                   +-----------+           |
+/// |       | -- Partial -->    |   Cache   |           |
+/// |       |                   +-----------+           |
+/// |       v                   |           |           |
+/// |   Download                Miss        |           |
+/// |     Task -->|             |   --- Hit ------>|<-- 2.Download -->  Peer
+/// |             |             |           ^      |
+/// |             |             v           |      |
+/// |             |       +-----------+     |      |
+/// |             | -- Full -->| Disk |------      |
+/// |                     +-----------+            |
+/// |                                              |
+/// +--------------------------------------------------+
+/// ```
 #[derive(Clone)]
 pub struct Cache {
-    /// pieces stores the piece cache data with piece id and value.
+    /// pieces stores the piece with piece id and content.
     pieces: Arc<Mutex<LruCache<String, bytes::Bytes>>>,
 }
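A side note on the storage type documented above: because the cache holds `bytes::Bytes`, the clone returned by `get_piece` and the `slice(begin..end)` in `read_piece` are O(1) reference-count operations on a shared buffer, not copies. A small sketch of that behavior:

    use bytes::Bytes;

    fn main() {
        let piece = Bytes::from(vec![0u8; 4 * 1024 * 1024]);

        // `clone` and `slice` share the original allocation; no 4 MiB memcpy happens.
        let cached = piece.clone();
        let window = cached.slice(1024..2048);
        assert_eq!(window.len(), 1024);

        // Dropping one handle leaves the others valid.
        drop(piece);
        assert_eq!(cached.len(), 4 * 1024 * 1024);
    }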
From 1b3ade7eee6a1accfc4723a56429a24078184a3a Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Fri, 10 Jan 2025 17:18:23 +0800
Subject: [PATCH 12/20] feat: writing cache

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-storage/src/cache.rs         | 15 ++++++++---
 dragonfly-client-storage/src/lib.rs           |  9 ++++++-
 .../src/resource/persistent_cache_task.rs     |  1 +
 dragonfly-client/src/resource/piece.rs        | 27 ++++++++++++++++++-
 dragonfly-client/src/resource/task.rs         | 18 +++++++++++++
 5 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs index 667c0c08..49a81cb1 100644
--- a/dragonfly-client-storage/src/cache.rs
+++ b/dragonfly-client-storage/src/cache.rs
@@ -95,9 +95,18 @@ impl Cache {
             return Err(Error::InvalidParameter);
         }
 
-        // Slice the content to match the required range and return it as a Vec
-        let content_slice = &piece_content[begin..end];
-        Ok(Cursor::new(content_slice.to_vec()))
+        let piece_content = piece_content.slice(begin..end);
+        Ok(Cursor::new(piece_content))
+    }
+
+    /// write_piece_to_cache write the piece content to the cache.
+    pub fn write_piece_to_cache(
+        &self,
+        piece_id: &str,
+        content: bytes::Bytes
+    ) {
+        // Add the piece content to the cache
+        self.add_piece(piece_id, content);
     }
 
     /// get_piece gets the piece content from the cache.
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 82026e47..4de4664e 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -57,7 +57,7 @@ impl Storage {
     pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
         let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
-        let cache = cache::Cache::new(config.storage.cache_capacity.clone())?;
+        let cache = cache::Cache::new(config.storage.cache_capacity)?;
@@ -482,6 +482,13 @@ impl Storage {
         }
     }
 
+    /// load_piece_to_cache loads the piece content to the cache.
+    #[instrument(skip_all)]
+    pub fn load_piece_to_cache(&self, piece_id: &str, piece_content: bytes::Bytes) {
+        // Load the piece content to the cache.
+        self.cache.write_piece_to_cache(piece_id, piece_content)
+    }
+
     /// get_piece returns the piece metadata.
     #[instrument(skip_all)]
     pub fn get_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index 197c8489..a8138d89 100644
--- a/dragonfly-client/src/resource/persistent_cache_task.rs
+++ b/dragonfly-client/src/resource/persistent_cache_task.rs
@@ -853,6 +853,7 @@ impl PersistentCacheTask {
                     length,
                     parent.clone(),
                     false,
+                    false,
                 )
                 .await
                 .map_err(|err| {
diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index de1fe223..0c29d83a 100644
--- a/dragonfly-client/src/resource/piece.rs
+++ b/dragonfly-client/src/resource/piece.rs
@@ -32,6 +32,7 @@ use reqwest::header::{self, HeaderMap};
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
+use std::io::Cursor;
 use std::time::{Duration, Instant};
 use tokio::io::{AsyncRead, AsyncReadExt};
 use tracing::{error, info, instrument, Span};
@@ -465,6 +466,7 @@ impl Piece {
         length: u64,
         parent: piece_collector::CollectedParent,
         is_prefetch: bool,
+        load_to_cache: bool,
     ) -> Result<metadata::Piece> {
         // Span record the piece_id.
         Span::current().record("piece_id", piece_id);
@@ -515,6 +517,10 @@ impl Piece {
                 error!("set piece metadata failed: {}", err)
             };
         })?;
+
+        if load_to_cache {
+            self.storage.load_piece_to_cache(piece_id, bytes::Bytes::from(content.clone()));
+        }
 
         // Record the finish of downloading piece.
         match self
@@ -562,6 +568,7 @@ impl Piece {
         length: u64,
         request_header: HeaderMap,
         is_prefetch: bool,
+        load_to_cache: bool,
         object_storage: Option<ObjectStorage>,
         hdfs: Option<Hdfs>,
     ) -> Result<metadata::Piece> {
@@ -676,6 +683,24 @@ impl Piece {
             start_time.elapsed(),
         );
 
+        let reader = &mut response.reader;
+        if load_to_cache {
+            let mut content = bytes::BytesMut::new();
+            reader
+                .read_buf(&mut content)
+                .await
+                .inspect_err(|err| {
+                    error!("read response failed: {}", err);
+                    if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
+                        error!("set piece metadata failed: {}", err)
+                    };
+                })?;
+
+            self.storage.load_piece_to_cache(piece_id, content.clone().freeze());
+
+            *reader = Box::new(Cursor::new(content.freeze()));
+        }
+
         // Record the finish of downloading piece.
         match self
             .storage
@@ -684,7 +709,7 @@ impl Piece {
                 task_id,
                 offset,
                 length,
-                &mut response.reader,
+                reader,
             )
             .await
         {
diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 544cd667..5e688a06 100644
--- a/dragonfly-client/src/resource/task.rs
+++ b/dragonfly-client/src/resource/task.rs
@@ -350,6 +350,7 @@ impl Task {
                 host_id,
                 peer_id,
                 request.need_piece_content,
+                request.load_to_cache,
                 interested_pieces.clone(),
                 download_progress_tx.clone(),
             )
@@ -674,6 +675,7 @@ impl Task {
                     remaining_interested_pieces.clone(),
                     request.is_prefetch,
                     request.need_piece_content,
+                    request.load_to_cache,
                     download_progress_tx.clone(),
                     in_stream_tx.clone(),
                 )
@@ -917,6 +919,7 @@ impl Task {
         interested_pieces: Vec<metadata::Piece>,
         is_prefetch: bool,
         need_piece_content: bool,
+        load_to_cache: bool,
         download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
         in_stream_tx: Sender<AnnouncePeerRequest>,
     ) -> ClientResult<Vec<metadata::Piece>> {
@@ -977,6 +980,7 @@ impl Task {
             finished_pieces: Arc<Mutex<Vec<metadata::Piece>>>,
             is_prefetch: bool,
             need_piece_content: bool,
+            load_to_cache: bool,
         ) -> ClientResult<metadata::Piece> {
             // Limit the concurrent piece count.
             let _permit = semaphore.acquire().await.unwrap();
@@ -997,6 +1001,7 @@ impl Task {
                     length,
                     parent.clone(),
                     is_prefetch,
+                    load_to_cache,
                 )
                 .await
                 .map_err(|err| {
@@ -1131,6 +1136,7 @@ impl Task {
                     finished_pieces.clone(),
                     is_prefetch,
                     need_piece_content,
+                    load_to_cache,
                 )
                 .in_current_span(),
             );
@@ -1244,6 +1250,7 @@ impl Task {
         request_header: HeaderMap,
         is_prefetch: bool,
         need_piece_content: bool,
+        load_to_cache: bool,
         piece_manager: Arc<piece::Piece>,
         storage: Arc<Storage>,
         semaphore: Arc<Semaphore>,
@@ -1268,6 +1275,7 @@ impl Task {
                     length,
                     request_header,
                     is_prefetch,
+                    load_to_cache,
                     object_storage,
                     hdfs,
                 )
@@ -1372,6 +1380,7 @@ impl Task {
                 request_header.clone(),
                 request.is_prefetch,
                 request.need_piece_content,
+                request.load_to_cache,
                 self.piece.clone(),
                 self.storage.clone(),
                 semaphore.clone(),
@@ -1494,6 +1503,7 @@ impl Task {
         host_id: &str,
         peer_id: &str,
         need_piece_content: bool,
+        load_to_cache: bool,
         interested_pieces: Vec<metadata::Piece>,
         download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
     ) -> ClientResult<Vec<metadata::Piece>> {
@@ -1558,6 +1568,11 @@ impl Task {
             reader.read_exact(&mut content).await.inspect_err(|err| {
                 error!("read piece {} failed: {:?}", piece_id, err);
             })?;
+
+            // If load_to_cache is true, load the piece content to the cache.
+            if load_to_cache {
+                self.storage.load_piece_to_cache(piece_id.as_str(), bytes::Bytes::from(content.clone()));
+            }
 
             piece.content = Some(content);
         }

         // Send the download progress.
@@ -1634,6 +1649,7 @@ impl Task {
         length: u64,
         request_header: HeaderMap,
         is_prefetch: bool,
+        load_to_cache: bool,
         piece_manager: Arc<piece::Piece>,
         storage: Arc<Storage>,
         semaphore: Arc<Semaphore>,
@@ -1657,6 +1673,7 @@ impl Task {
                     length,
                     request_header,
                     is_prefetch,
+                    load_to_cache,
                     object_storage,
                     hdfs,
                 )
@@ -1715,6 +1732,7 @@ impl Task {
                 interested_piece.length,
                 request_header.clone(),
                 request.is_prefetch,
+                request.load_to_cache,
                 self.piece.clone(),
                 self.storage.clone(),
                 semaphore.clone(),
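The `download_from_source` change above drains the response into memory, caches it, and substitutes an in-memory cursor so the existing write-to-disk path still consumes a reader. One caveat: a single `read_buf` call returns after at most one underlying read, so it is not guaranteed to drain the stream — which is presumably why the next patch replaces it with a read loop. The buffer-then-replay pattern in isolation (a sketch, not the patch's code):

    use std::io::Cursor;
    use tokio::io::{AsyncRead, AsyncReadExt};

    // Buffer a stream fully, then hand back its bytes plus a replayable reader.
    async fn buffer_then_replay<R: AsyncRead + Unpin>(
        mut stream: R,
    ) -> std::io::Result<(bytes::Bytes, Cursor<bytes::Bytes>)> {
        let mut content = Vec::new();
        // Unlike a single `read_buf` call, `read_to_end` loops until EOF.
        stream.read_to_end(&mut content).await?;
        let content = bytes::Bytes::from(content);
        Ok((content.clone(), Cursor::new(content)))
    }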
From 3cdde85c54b56245d000aba78071db04feb4a943 Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Thu, 16 Jan 2025 15:02:09 +0800
Subject: [PATCH 13/20] feat: add unit tests and code optimization

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-storage/src/cache.rs  | 274 +++++++++++++++++++++++--
 dragonfly-client-storage/src/lib.rs    |  37 +++-
 dragonfly-client/src/resource/piece.rs |  39 ++--
 dragonfly-client/src/resource/task.rs  |  22 +-
 4 files changed, 332 insertions(+), 40 deletions(-)

diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs index 49a81cb1..1b39fa91 100644
--- a/dragonfly-client-storage/src/cache.rs
+++ b/dragonfly-client-storage/src/cache.rs
@@ -14,13 +14,14 @@
  * limitations under the License.
  */
 
+use dragonfly_client_config::dfdaemon::Config;
 use dragonfly_client_core::{Error, Result};
 use dragonfly_api::common::v2::Range;
 use lru::LruCache;
 use std::cmp::{max, min};
 use std::num::NonZeroUsize;
 use std::sync::{Arc, Mutex};
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
 use std::io::Cursor;
 
 /// Cache is the cache for storing piece content by LRU algorithm.
@@ -53,17 +54,22 @@ use std::io::Cursor;
 #[derive(Clone)]
 pub struct Cache {
-    /// pieces stores the piece with piece id and content.
+    /// config is the configuration of the dfdaemon.
+    config: Arc<Config>,
+
+    /// pieces stores the pieces with their piece id and content.
     pieces: Arc<Mutex<LruCache<String, bytes::Bytes>>>,
 }
 
 /// Cache implements the cache for storing piece content by LRU algorithm.
 impl Cache {
     /// new creates a new cache with the specified capacity.
-    pub fn new(capacity: usize) -> Result<Self> {
-        let capacity = NonZeroUsize::new(capacity).ok_or(Error::InvalidParameter)?;
+    pub fn new(config: Arc<Config>) -> Result<Self> {
+        let capacity = NonZeroUsize::new(config.storage.cache_capacity)
+            .ok_or(Error::InvalidParameter)?;
         let pieces = Arc::new(Mutex::new(LruCache::new(capacity)));
-        Ok(Cache { pieces })
+
+        Ok(Cache { config, pieces })
     }
 
     /// read_piece reads the piece from the cache.
@@ -75,7 +81,7 @@ impl Cache {
         range: Option<Range>,
     ) -> Result<impl AsyncRead> {
         // Try to get the piece content from the cache
-        let Some(piece_content) = self.get_piece(piece_id) else {
+        let Some(piece_content) = self.get_piece(piece_id).await else {
             return Err(Error::PieceNotFound(piece_id.to_string()));
         };
 
-        let piece_content = piece_content.slice(begin..end);
+        let piece_content = piece_content.slice(begin..end).to_vec();
         Ok(Cursor::new(piece_content))
     }
 
-    /// write_piece_to_cache write the piece content to the cache.
+    /// write_piece writes the piece content to the cache.
+    pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
+        &self,
+        piece_id: &str,
+        reader: &mut R,
+    ) {
+        let content = match self.copy_piece_content(reader).await {
+            Ok(piece_content) => piece_content,
+            Err(_err) => {
+                eprintln!("Failed to write piece content: {}", piece_id);
+                return;
+            }
+        };
+
+        self.add_piece(piece_id, content.clone().into()).await;
+    }
+
+    /// copy_piece_content reads the content from the provided asynchronous reader into a vector.
+    ///
+    /// ### Purpose
+    /// This method is designed to handle scenarios where downloading from source returns
+    /// a stream instead of the piece content directly when downloading from parent peer.
+    pub async fn copy_piece_content<R: AsyncRead + Unpin + ?Sized>(
+        &self,
+        reader: &mut R,
+    ) -> Result<Vec<u8>> {
+        let mut content = Vec::new();
+        let mut buffer = vec![0; self.config.storage.write_buffer_size];
+        let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
+
+        loop {
+            match reader.read(&mut buffer).await {
+                Ok(n) => {
+                    if n == 0 {
+                        break;
+                    }
+                    content.extend_from_slice(&buffer[..n]);
+                }
+                Err(e) => {
+                    eprintln!("Failed to read from reader: {}", e);
+                }
+            }
+        }
+        Ok(content)
+    }
 
     /// get_piece gets the piece content from the cache.
-    pub fn get_piece(&self, id: &str) -> Option<bytes::Bytes> {
+    pub async fn get_piece(&self, id: &str) -> Option<bytes::Bytes> {
         let mut pieces = self.pieces.lock().unwrap();
         pieces.get(id).cloned()
     }
 
     /// add_piece add the piece content into the cache, if the key already exists, no operation will
     /// be performed.
-    pub fn add_piece(&self, id: &str, content: bytes::Bytes) {
+    pub async fn add_piece(&self, id: &str, content: bytes::Bytes) {
         let mut pieces = self.pieces.lock().unwrap();
         if !pieces.contains(id) {
             pieces.put(id.to_string(), content);
         }
     }
 
     /// is_empty checks if the cache is empty.
     pub fn is_empty(&self) -> bool {
         let pieces = self.pieces.lock().unwrap();
         pieces.is_empty()
     }
+
+    /// contains_piece checks whether the piece exists in the cache.
+    pub fn contains_piece(&self, id: &str) -> bool {
+        let pieces = self.pieces.lock().unwrap();
+        pieces.contains(id)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use bytes::Bytes;
+    use dragonfly_client_config::dfdaemon::Config;
+    use std::sync::Arc;
+    use std::io::Cursor;
+
+    fn create_test_config() -> Arc<Config> {
+        Arc::new(Config {
+            storage: dragonfly_client_config::dfdaemon::Storage {
+                cache_capacity: 10,
+                write_buffer_size: 1024,
+                ..Default::default()
+            },
+            ..Default::default()
+        })
+    }
+
+    #[tokio::test]
+    async fn test_cache_creation() {
+        let config = create_test_config();
+        let cache = Cache::new(config.clone()).expect("Failed to create cache");
+        assert!(cache.is_empty(), "Cache should be empty upon creation");
+    }
+
+    #[tokio::test]
+    async fn test_write_and_read_piece() {
+        let config = create_test_config();
+        let cache = Cache::new(config.clone()).expect("Failed to create cache");
+
+        let piece_id = "test_piece";
+        let content = b"Hello, Dragonfly!".to_vec();
+        let mut reader = Cursor::new(content.clone());
+
+        // Write the piece to the cache
+        cache.write_piece(piece_id, &mut reader).await;
+
+        // Read the piece from the cache
+        let result = cache.read_piece(piece_id, 0, content.len() as u64, None).await;
+
+        assert!(
+            result.is_ok(),
+            "Failed to read the piece: {:?}",
+            result.err()
+        );
+
+        let mut read_buffer = Vec::new();
+        result.unwrap().read_to_end(&mut read_buffer).await.unwrap();
+
+        assert_eq!(read_buffer, content, "The read content does not match");
+    }
+
+    #[tokio::test]
+    async fn test_read_piece_with_range() {
+        let config = create_test_config();
+        let cache = Cache::new(config.clone()).expect("Failed to create cache");
+
+        let piece_id = "test_piece";
+        let content = b"0123456789".to_vec();
+        let mut reader = Cursor::new(content.clone());
+
+        // Write the piece to the cache
+        cache.write_piece(piece_id, &mut reader).await;
+
+        // Read a range from the piece
+        let range = Some(Range {
+            start: 2,
+            length: 5,
+        });
+
+        let result = cache.read_piece(piece_id, 0, content.len() as u64, range).await;
+
+        assert!(
+            result.is_ok(),
+            "Failed to read the piece with range: {:?}",
+            result.err()
+        );
+
+        let mut read_buffer = Vec::new();
+        result.unwrap().read_to_end(&mut read_buffer).await.unwrap();
+
+        assert_eq!(read_buffer, b"23456", "The range read content is incorrect");
+    }
+
+    #[tokio::test]
+    async fn test_get_and_add_piece() {
+        let config = create_test_config();
+        let cache = Cache::new(config.clone()).expect("Failed to create cache");
+
+        let piece_id = "test_piece";
+        let content = Bytes::from("Test content");
+
+        // Add a piece directly
+        cache.add_piece(piece_id, content.clone()).await;
+
+        // Retrieve the piece
+        let cached_piece = cache.get_piece(piece_id).await;
+
+        assert!(
+            cached_piece.is_some(),
+            "The piece should exist in the cache"
+        );
+        assert_eq!(
+            cached_piece.unwrap(),
+            content,
+            "The cached content does not match"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_cache_eviction() {
+        let config = Arc::new(Config {
+            storage: dragonfly_client_config::dfdaemon::Storage {
+                cache_capacity: 2,
+                write_buffer_size: 1024,
+                ..Default::default()
+            },
+            ..Default::default()
+        });
+
+        let cache = Cache::new(config.clone()).expect("Failed to create cache");
+
+        cache.add_piece("piece1", Bytes::from("Content 1")).await;
+        cache.add_piece("piece2", Bytes::from("Content 2")).await;
+
+        assert!(
+            cache.get_piece("piece1").await.is_some(),
+            "Piece1 should still exist"
+        );
+        assert!(
+            cache.get_piece("piece2").await.is_some(),
+            "Piece2 should still exist"
+        );
+
+        cache.add_piece("piece3", Bytes::from("Content 3")).await;
+
+        assert!(
+            cache.get_piece("piece1").await.is_none(),
+            "Piece1 should have been evicted"
+        );
+        assert!(
+            cache.get_piece("piece2").await.is_some(),
+            "Piece2 should still exist"
+        );
+        assert!(
+            cache.get_piece("piece3").await.is_some(),
+            "Piece3 should exist in the cache"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_contains_piece() {
+        let config = Arc::new(Config {
+            storage: dragonfly_client_config::dfdaemon::Storage {
+                cache_capacity: 3,
+                write_buffer_size: 1024,
+                ..Default::default()
+            },
+            ..Default::default()
+        });
+
+        let cache = Cache::new(config.clone()).expect("Failed to create cache");
+
+        let piece_id1 = "piece1";
+        let piece_id2 = "piece2";
+
+        assert!(
+            !cache.contains_piece(piece_id1),
+            "Cache should not contain piece1 initially"
+        );
+
+        cache.add_piece(piece_id1, Bytes::from("Content 1")).await;
+        assert!(
+            cache.contains_piece(piece_id1),
+            "Cache should contain piece1 after it is added"
+        );
+
+        assert!(
+            !cache.contains_piece(piece_id2),
+            "Cache should not contain piece2"
+        );
+
+        cache.add_piece(piece_id2, Bytes::from("Content 2")).await;
+        assert!(
+            cache.contains_piece(piece_id1),
+            "Cache should still contain piece1"
+        );
+        assert!(
+            cache.contains_piece(piece_id2),
+            "Cache should contain piece2 after it is added"
+        );
+    }
+}
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 4de4664e..e2bcd0bb 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -19,6 +19,7 @@ use dragonfly_client_config::dfdaemon::Config;
 use dragonfly_client_core::{Error, Result};
 use dragonfly_client_util::digest::{Algorithm, Digest};
 use reqwest::header::HeaderMap;
+use tracing::info;
 use std::path::Path;
 use std::path::PathBuf;
 use std::pin::Pin;
@@ -57,7 +58,7 @@ impl Storage {
     pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
         let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
-        let cache = cache::Cache::new(config.storage.cache_capacity)?;
+        let cache = cache::Cache::new(config.clone())?;
 
         Ok(Storage {
             config,
@@ -391,6 +392,7 @@ impl Storage {
         // Get the piece metadata and return the content of the piece.
         match self.metadata.get_piece(piece_id) {
             Ok(Some(piece)) => {
+                // Try to upload piece content from cache.
                 if !self.cache.is_empty() {
                     match self
                         .cache
                         .read_piece(piece_id, piece.offset, piece.length, range)
                         .await
                     {
                         Ok(cache_reader) => {
                             // Finish uploading the task.
+                            info!("hit cache: {}", piece_id);
                             self.metadata.upload_task_finished(task_id)?;
                             return Ok(Box::pin(cache_reader));
                         }
                         Err(_err) => {}
                     }
                 }
+
+                // Upload piece content from storage when cache entry is not hit or cache is empty.
                 match self
                     .content
                     .read_piece(task_id, piece.offset, piece.length, range)
@@ -482,11 +488,32 @@ impl Storage {
         }
     }
 
+    /// upload_piece_from_cache uploads the piece content by piece id from cache.
+    #[instrument(skip_all)]
+    pub async fn upload_piece_from_cache(
+        &self,
+        piece_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range>,
+    ) -> Result<impl AsyncRead> {
+        self.cache.read_piece(piece_id, offset, length, range).await
+    }
+
+    /// load_piece_to_cache loads the piece content with piece id to the cache.
#[instrument(skip_all)] - pub fn load_piece_to_cache(&self, piece_id: &str, piece_content: bytes::Bytes) { + pub async fn load_piece_to_cache( + &self, + piece_id: &str, + reader: &mut R, + ) { + // If the piece is already in the cache, return. + if self.cache.contains_piece(piece_id) { + return; + } + // Load the piece content to the cache. - self.cache.write_piece_to_cache(piece_id, piece_content) + self.cache.write_piece(piece_id, reader).await; } /// get_piece returns the piece metadata. diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 0c29d83a..e5345633 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -32,7 +32,6 @@ use reqwest::header::{self, HeaderMap}; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use std::io::Cursor; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncReadExt}; use tracing::{error, info, instrument, Span}; @@ -518,8 +517,16 @@ impl Piece { }; })?; + // Load piece content to cache from parent if load_to_cache { - self.storage.load_piece_to_cache(piece_id, bytes::Bytes::from(content.clone())); + self + .storage + .load_piece_to_cache( + piece_id, + &mut content.clone().as_slice(), + ) + .await; + info!("load piece content to cache piece {}", piece_id); } // Record the finish of downloading piece. @@ -683,22 +690,26 @@ impl Piece { start_time.elapsed(), ); - let reader = &mut response.reader; + let mut reader= response.reader; + if load_to_cache { - let mut content = bytes::BytesMut::new(); - reader - .read_buf(&mut content) - .await - .inspect_err(|err| { - error!("read response failed: {}", err); + // Load piece content to cache from source. + self.storage.load_piece_to_cache(piece_id, &mut reader).await; + info!("load piece content to cache: piece_id={}", piece_id); + + let cache_reader = match self.storage.upload_piece_from_cache(piece_id, offset, length, None).await { + Ok(reader) => reader, + Err(err) => { + error!("download piece finished: {}", err); if let Some(err) = self.storage.download_piece_failed(piece_id).err() { error!("set piece metadata failed: {}", err) }; - })?; - - self.storage.load_piece_to_cache(piece_id, content.clone().freeze()); + + return Err(err); + } + }; - *reader = Box::new(Cursor::new(content.freeze())); + reader = Box::new(cache_reader); } // Record the finish of downloading piece. @@ -709,7 +720,7 @@ impl Piece { task_id, offset, length, - reader, + &mut reader, ) .await { diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 5e688a06..520c57d2 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -1547,8 +1547,8 @@ impl Task { created_at: Some(prost_wkt_types::Timestamp::from(piece.created_at)), }; - // If need_piece_content is true, read the piece content from the local. - if need_piece_content { + // If need_piece_content or load_to_cache is true, read the piece content from the local. + if need_piece_content || load_to_cache { let mut reader = self .piece .download_from_local_into_async_read( @@ -1568,13 +1568,21 @@ impl Task { reader.read_exact(&mut content).await.inspect_err(|err| { error!("read piece {} failed: {:?}", piece_id, err); })?; - - // If load_to_cache is true, load the piece content to the cache. 
-            if load_to_cache {
-                self.storage.load_piece_to_cache(piece_id.as_str(), bytes::Bytes::from(content.clone()));
+
+            if need_piece_content {
+                piece.content = Some(content.clone());
             }
-            piece.content = Some(content);
 
+            if load_to_cache {
+                // Load piece content to cache.
+                self
+                    .storage
+                    .load_piece_to_cache(
+                        piece_id.as_str(),
+                        &mut content.clone().as_slice(),
+                    )
+                    .await;
+            }
         }
 
         // Send the download progress.
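
The call above hands `load_piece_to_cache` an in-memory buffer through `&mut content.clone().as_slice()`, which type-checks because tokio implements `AsyncRead` for plain byte slices. A minimal, self-contained sketch of that idea — `cache_from_reader` is an illustrative stand-in, not a function in this client:

```rust
use tokio::io::{AsyncRead, AsyncReadExt};

// Drain any AsyncRead source into an owned buffer, the way a cache write
// consumes the reader it is handed.
async fn cache_from_reader<R: AsyncRead + Unpin>(reader: &mut R) -> Vec<u8> {
    let mut buf = Vec::new();
    reader
        .read_to_end(&mut buf)
        .await
        .expect("reading piece content failed");
    buf
}

#[tokio::main]
async fn main() {
    let content = b"piece content".to_vec();

    // `&mut content.as_slice()` mirrors the call site above: the slice is
    // consumed as a reader while the original Vec stays available.
    let cached = cache_from_reader(&mut content.as_slice()).await;
    assert_eq!(cached, content);
}
```

The clone at the call site follows from the same point: the slice reader works over its own copy, leaving `content` untouched for the other consumers in the same scope.
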
From 436687ea8f8827511a2e9e4550588a15c2eeaae8 Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Thu, 16 Jan 2025 18:00:48 +0800
Subject: [PATCH 14/20] chore: resolve conflicts

Signed-off-by: southwest <1403572259@qq.com>
---
 Cargo.lock                              | 12 ++---
 Cargo.toml                              |  2 +-
 dragonfly-client-config/src/dfdaemon.rs | 66 +++++++++++++------------
 3 files changed, 41 insertions(+), 39 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b1260f5c..ed2b57e5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -908,9 +908,9 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-api"
-version = "2.1.4"
+version = "2.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4134303088f6234035dc92efbd218feab8358403ef5a4a917925b9c04dd4f10"
+checksum = "4cc74bdb827031beb5d5ab3856a7469790e46bf70a978846d1248b7e30d19706"
 dependencies = [
  "prost 0.13.4",
  "prost-types",
@@ -4577,9 +4577,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.42.0"
+version = "1.43.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551"
+checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
 dependencies = [
  "backtrace",
  "bytes",
@@ -4595,9 +4595,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-macros"
-version = "2.4.0"
+version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
+checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/Cargo.toml b/Cargo.toml
index ea64afd6..984f0272 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,7 +30,7 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.2
 dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.2" }
 dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.2" }
 thiserror = "1.0"
-dragonfly-api = "=2.1.4"
+dragonfly-api = "=2.1.6"
 reqwest = { version = "0.12.4", features = [
     "stream",
     "native-tls",
diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs
index 36236ed8..460b3b4a 100644
--- a/dragonfly-client-config/src/dfdaemon.rs
+++ b/dragonfly-client-config/src/dfdaemon.rs
@@ -589,22 +589,22 @@ impl UploadClient {
 /// The workflow diagram is as follows:
 ///```
 /// +----------+
-/// ----------------| parent |---------------
+/// ----------------| Parent |---------------
 /// | +----------+ |
-/// host info piece metadata
+/// Host Info Piece Metadata
 /// +------------|-----------------------------------------|------------+
 /// | | | |
-/// | | peer | |
+/// | | Peer | |
 /// | v v |
 /// | +------------------+ +------------------+ |
-/// | | ParentSelector | ---optimal parent---> | PieceCollector | |
+/// | | ParentSelector | ---Optimal Parent---> | PieceCollector | |
 /// | +------------------+ +------------------+ |
 /// | | |
-/// | piece metadata |
+/// | Piece Metadata |
 /// | | |
 /// | v |
 /// | +------------+ |
-/// | | download | |
+/// | | Download | |
 /// | +------------+ |
 /// +-------------------------------------------------------------------+
 /// ```
@@ -929,32 +929,34 @@ pub struct Storage {
     #[serde(default = "default_storage_read_buffer_size")]
     pub read_buffer_size: usize,
 
-    /// cache_capacity is the cache capacity for the preheat task, default is 100.
-    ///
-    /// Cache storage:
-    /// 1. The user initiates a preheat job, where the peer downloads and caches content into memory and disk.
-    /// 2. Other peers performing the same task can directly access the preheated peer's memory to speed up the download.
-    /// ```
-    /// |
-    /// 1.preheat
-    /// |
-    /// |
-    /// +--------------------------------------------------+
-    /// | | Peer |
-    /// | | +-----------+ |
-    /// | | -- partial -->| cache | |
-    /// | | | +-----------+ |
-    /// | v | | | |
-    /// | downloaded | miss | | +-------------+
-    /// | content -->| | --- hit ------>|<-- 2.download -->| Peer |
-    /// | | | ^ | +-------------+
-    /// | | v | |
-    /// | | +-----------+ | |
-    /// | -- full -->| disk |---------- |
-    /// | +-----------+ |
-    /// | |
-    /// +--------------------------------------------------+
-    /// ```
+/// Cache is the cache for storing piece content by LRU algorithm.
+///
+/// Cache storage:
+/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
+/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
+/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
+/// page cache for the first read.
+/// ```
+///
+/// 1.Preheat
+/// |
+/// |
+/// +--------------------------------------------------+
+/// | | Peer |
+/// | | +-----------+ |
+/// | | -- Partial -->| Cache | |
+/// | | | +-----------+ |
+/// | v | | | |
+/// | Download | Miss | |
+/// | Task -->| | --- Hit ------>|<-- 2.Download
+/// | | | ^ |
+/// | | v | |
+/// | | +-----------+ | |
+/// | -- Full -->| Disk |---------- |
+/// | +-----------+ |
+/// | |
+/// +--------------------------------------------------+
+/// ```
 #[serde(default = "default_storage_cache_capacity")]
 pub cache_capacity: usize,
 }

From 7ef46a71f368626d37d47250095c5d4353037e0a Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Thu, 16 Jan 2025 18:02:20 +0800
Subject: [PATCH 15/20] chore: resolve conflicts

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-config/src/dfdaemon.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs
index 460b3b4a..28c92906 100644
--- a/dragonfly-client-config/src/dfdaemon.rs
+++ b/dragonfly-client-config/src/dfdaemon.rs
@@ -929,7 +929,7 @@ pub struct Storage {
     #[serde(default = "default_storage_read_buffer_size")]
     pub read_buffer_size: usize,
 
-/// Cache is the cache for storing piece content by LRU algorithm.
+/// cache_capacity is the cache capacity for downloading, default is 100.
 ///
 /// Cache storage:
 /// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
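
One point the revised comment leaves implicit: `cache_capacity` counts pieces, not bytes — the value is converted to a `NonZeroUsize` and handed to `LruCache::new`, so each cached piece occupies one slot regardless of its length. A standalone sketch of the resulting eviction behavior with the same `lru` crate (the capacities and contents are arbitrary):

```rust
use lru::LruCache;
use std::num::NonZeroUsize;

fn main() {
    // Capacity of 2 means two pieces, however large each one is.
    let mut cache: LruCache<String, Vec<u8>> =
        LruCache::new(NonZeroUsize::new(2).unwrap());

    cache.put("piece1".to_string(), vec![0u8; 4 * 1024 * 1024]);
    cache.put("piece2".to_string(), vec![0u8; 16]);

    // Touching piece1 marks it most-recently-used...
    let _ = cache.get("piece1");

    // ...so the third insert evicts piece2, the least-recently-used entry.
    cache.put("piece3".to_string(), vec![0u8; 16]);

    assert!(cache.get("piece1").is_some());
    assert!(cache.get("piece2").is_none());
    assert!(cache.get("piece3").is_some());
}
```

The memory bound is therefore indirect: with the default capacity of 100 and pieces of, say, 4 MiB, the cache can grow to roughly 400 MiB, so the capacity should be sized against the expected piece length.
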
From 0a0c5169129a025ddb131f1357203457ab0261a8 Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Thu, 16 Jan 2025 18:03:23 +0800
Subject: [PATCH 16/20] chore: resolve conflicts

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-config/src/dfdaemon.rs | 56 ++++++++++++------------
 1 file changed, 28 insertions(+), 28 deletions(-)

diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs
index 28c92906..5689efea 100644
--- a/dragonfly-client-config/src/dfdaemon.rs
+++ b/dragonfly-client-config/src/dfdaemon.rs
@@ -929,34 +929,34 @@ pub struct Storage {
     #[serde(default = "default_storage_read_buffer_size")]
     pub read_buffer_size: usize,
 
-/// cache_capacity is the cache capacity for downloading, default is 100.
-///
-/// Cache storage:
-/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
-/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
-/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
-/// page cache for the first read.
-/// ```
-///
-/// 1.Preheat
-/// |
-/// |
-/// +--------------------------------------------------+
-/// | | Peer |
-/// | | +-----------+ |
-/// | | -- Partial -->| Cache | |
-/// | | | +-----------+ |
-/// | v | | | |
-/// | Download | Miss | |
-/// | Task -->| | --- Hit ------>|<-- 2.Download
-/// | | | ^ |
-/// | | v | |
-/// | | +-----------+ | |
-/// | -- Full -->| Disk |---------- |
-/// | +-----------+ |
-/// | |
-/// +--------------------------------------------------+
-/// ```
+    /// cache_capacity is the cache capacity for downloading, default is 100.
+    ///
+    /// Cache storage:
+    /// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
+    /// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
+    /// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
+    /// page cache for the first read.
+    /// ```
+    ///
+    /// 1.Preheat
+    /// |
+    /// |
+    /// +--------------------------------------------------+
+    /// | | Peer |
+    /// | | +-----------+ |
+    /// | | -- Partial -->| Cache | |
+    /// | | | +-----------+ |
+    /// | v | | | |
+    /// | Download | Miss | |
+    /// | Task -->| | --- Hit ------>|<-- 2.Download
+    /// | | | ^ |
+    /// | | v | |
+    /// | | +-----------+ | |
+    /// | -- Full -->| Disk |---------- |
+    /// | +-----------+ |
+    /// | |
+    /// +--------------------------------------------------+
+    /// ```
 #[serde(default = "default_storage_cache_capacity")]
 pub cache_capacity: usize,
 }

From 69d01f3fdd808a87437e53c26721b58f59cea830 Mon Sep 17 00:00:00 2001
From: southwest <1403572259@qq.com>
Date: Thu, 16 Jan 2025 18:04:39 +0800
Subject: [PATCH 17/20] chore: resolve conflicts

Signed-off-by: southwest <1403572259@qq.com>
---
 dragonfly-client-config/src/dfdaemon.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs
index 5689efea..5df53ed2 100644
--- a/dragonfly-client-config/src/dfdaemon.rs
+++ b/dragonfly-client-config/src/dfdaemon.rs
@@ -930,7 +930,7 @@ pub struct Storage {
     pub read_buffer_size: usize,
 
     /// cache_capacity is the cache capacity for downloading, default is 100.
-    /// 
+    ///
     /// Cache storage:
    /// 1. 
Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`. /// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443. From 04f07025015d4b87ddcf8241df6abc118b6c110c Mon Sep 17 00:00:00 2001 From: southwest <1403572259@qq.com> Date: Thu, 16 Jan 2025 18:26:26 +0800 Subject: [PATCH 18/20] chore: fixed fmt issues. Signed-off-by: southwest <1403572259@qq.com> --- dragonfly-client-storage/src/cache.rs | 138 ++++++++++++------------- dragonfly-client-storage/src/lib.rs | 10 +- dragonfly-client/src/resource/piece.rs | 34 +++--- dragonfly-client/src/resource/task.rs | 12 +-- 4 files changed, 92 insertions(+), 102 deletions(-) diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs index 1b39fa91..4792dd28 100644 --- a/dragonfly-client-storage/src/cache.rs +++ b/dragonfly-client-storage/src/cache.rs @@ -14,18 +14,18 @@ * limitations under the License. */ +use dragonfly_api::common::v2::Range; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{Error, Result}; -use dragonfly_api::common::v2::Range; use lru::LruCache; use std::cmp::{max, min}; +use std::io::Cursor; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncRead, AsyncReadExt, BufReader}; -use std::io::Cursor; /// Cache is the cache for storing piece content by LRU algorithm. -/// +/// /// Cache storage: /// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`. /// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443. @@ -56,7 +56,7 @@ use std::io::Cursor; pub struct Cache { /// config is the configuration of the dfdaemon. config: Arc, - + /// pieces stores the pieces with their piece id and content. pieces: Arc>>, } @@ -65,10 +65,10 @@ pub struct Cache { impl Cache { /// new creates a new cache with the specified capacity. pub fn new(config: Arc) -> Result { - let capacity = NonZeroUsize::new(config.storage.cache_capacity) - .ok_or(Error::InvalidParameter)?; + let capacity = + NonZeroUsize::new(config.storage.cache_capacity).ok_or(Error::InvalidParameter)?; let pieces = Arc::new(Mutex::new(LruCache::new(capacity))); - + Ok(Cache { config, pieces }) } @@ -84,12 +84,13 @@ impl Cache { let Some(piece_content) = self.get_piece(piece_id).await else { return Err(Error::PieceNotFound(piece_id.to_string())); }; - + // Calculate the range of bytes to return based on the range provided let (target_offset, target_length) = if let Some(range) = range { - let target_offset = max(offset, range.start) - offset; - let target_length = - min(offset + length - 1, range.start + range.length - 1) - target_offset - offset + 1; + let target_offset = max(offset, range.start) - offset; + let target_length = + min(offset + length - 1, range.start + range.length - 1) - target_offset - offset + + 1; (target_offset as usize, target_length as usize) } else { (0, length as usize) @@ -100,17 +101,13 @@ impl Cache { if begin >= piece_content.len() || end > piece_content.len() { return Err(Error::InvalidParameter); } - + let piece_content = piece_content.slice(begin..end).to_vec(); Ok(Cursor::new(piece_content)) } /// write_piece writes the piece content to the cache. 
- pub async fn write_piece( - &self, - piece_id: &str, - reader: &mut R, - ) { + pub async fn write_piece(&self, piece_id: &str, reader: &mut R) { let content = match self.copy_piece_content(reader).await { Ok(piece_content) => piece_content, Err(_err) => { @@ -123,9 +120,9 @@ impl Cache { } /// copy_piece_content reads the content from the provided asynchronous reader into a vector. - /// + /// /// ### Purpose - /// This method is designed to handle scenarios where downloading from source returns + /// This method is designed to handle scenarios where downloading from source returns /// a stream instead of the piece content directly when downloading from parent peer. pub async fn copy_piece_content( &self, @@ -184,8 +181,8 @@ mod tests { use super::*; use bytes::Bytes; use dragonfly_client_config::dfdaemon::Config; - use std::sync::Arc; use std::io::Cursor; + use std::sync::Arc; fn create_test_config() -> Arc { Arc::new(Config { @@ -218,7 +215,9 @@ mod tests { cache.write_piece(piece_id, &mut reader).await; // Read the piece from the cache - let result = cache.read_piece(piece_id, 0, content.len() as u64, None).await; + let result = cache + .read_piece(piece_id, 0, content.len() as u64, None) + .await; assert!( result.is_ok(), @@ -250,7 +249,9 @@ mod tests { length: 5, }); - let result = cache.read_piece(piece_id, 0, content.len() as u64, range).await; + let result = cache + .read_piece(piece_id, 0, content.len() as u64, range) + .await; assert!( result.is_ok(), @@ -299,12 +300,12 @@ mod tests { }, ..Default::default() }); - + let cache = Cache::new(config.clone()).expect("Failed to create cache"); - + cache.add_piece("piece1", Bytes::from("Content 1")).await; cache.add_piece("piece2", Bytes::from("Content 2")).await; - + assert!( cache.get_piece("piece1").await.is_some(), "Piece1 should still exist" @@ -313,9 +314,9 @@ mod tests { cache.get_piece("piece2").await.is_some(), "Piece2 should still exist" ); - + cache.add_piece("piece3", Bytes::from("Content 3")).await; - + assert!( cache.get_piece("piece1").await.is_none(), "Piece1 should have been evicted" @@ -328,51 +329,48 @@ mod tests { cache.get_piece("piece3").await.is_some(), "Piece3 should exist in the cache" ); - } + } #[tokio::test] -async fn test_contains_piece() { - let config = Arc::new(Config { - storage: dragonfly_client_config::dfdaemon::Storage { - cache_capacity: 3, - write_buffer_size: 1024, + async fn test_contains_piece() { + let config = Arc::new(Config { + storage: dragonfly_client_config::dfdaemon::Storage { + cache_capacity: 3, + write_buffer_size: 1024, + ..Default::default() + }, ..Default::default() - }, - ..Default::default() - }); - - let cache = Cache::new(config.clone()).expect("Failed to create cache"); - - let piece_id1 = "piece1"; - let piece_id2 = "piece2"; - - assert!( - !cache.contains_piece(piece_id1), - "Cache should not contain piece1 initially" - ); - - - cache.add_piece(piece_id1, Bytes::from("Content 1")).await; - assert!( - cache.contains_piece(piece_id1), - "Cache should contain piece1 after it is added" - ); - - assert!( - !cache.contains_piece(piece_id2), - "Cache should not contain piece2" - ); - - - cache.add_piece(piece_id2, Bytes::from("Content 2")).await; - assert!( - cache.contains_piece(piece_id1), - "Cache should still contain piece1" - ); - assert!( - cache.contains_piece(piece_id2), - "Cache should contain piece2 after it is added" - ); -} + }); + + let cache = Cache::new(config.clone()).expect("Failed to create cache"); + + let piece_id1 = "piece1"; + let piece_id2 = "piece2"; + + 
assert!( + !cache.contains_piece(piece_id1), + "Cache should not contain piece1 initially" + ); + cache.add_piece(piece_id1, Bytes::from("Content 1")).await; + assert!( + cache.contains_piece(piece_id1), + "Cache should contain piece1 after it is added" + ); + + assert!( + !cache.contains_piece(piece_id2), + "Cache should not contain piece2" + ); + + cache.add_piece(piece_id2, Bytes::from("Content 2")).await; + assert!( + cache.contains_piece(piece_id1), + "Cache should still contain piece1" + ); + assert!( + cache.contains_piece(piece_id2), + "Cache should contain piece2 after it is added" + ); + } } diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index e2bcd0bb..72bf2a32 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -19,19 +19,19 @@ use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{Error, Result}; use dragonfly_client_util::digest::{Algorithm, Digest}; use reqwest::header::HeaderMap; -use tracing::info; use std::path::Path; use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncRead; +use tracing::info; use tracing::{debug, error, instrument}; +pub mod cache; pub mod content; pub mod metadata; pub mod storage_engine; -pub mod cache; /// DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL is the default interval for waiting for the piece to be finished. pub const DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL: Duration = Duration::from_millis(100); @@ -496,15 +496,15 @@ impl Storage { offset: u64, length: u64, range: Option, - ) -> Result{ + ) -> Result { self.cache.read_piece(piece_id, offset, length, range).await } /// load_piece_to_cache loads the piece content with piece id to the cache. #[instrument(skip_all)] pub async fn load_piece_to_cache( - &self, - piece_id: &str, + &self, + piece_id: &str, reader: &mut R, ) { // If the piece is already in the cache, return. diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index e5345633..58701c8f 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -516,16 +516,12 @@ impl Piece { error!("set piece metadata failed: {}", err) }; })?; - + // Load piece content to cache from parent if load_to_cache { - self - .storage - .load_piece_to_cache( - piece_id, - &mut content.clone().as_slice(), - ) - .await; + self.storage + .load_piece_to_cache(piece_id, &mut content.clone().as_slice()) + .await; info!("load piece content to cache piece {}", piece_id); } @@ -690,21 +686,27 @@ impl Piece { start_time.elapsed(), ); - let mut reader= response.reader; + let mut reader = response.reader; if load_to_cache { // Load piece content to cache from source. - self.storage.load_piece_to_cache(piece_id, &mut reader).await; + self.storage + .load_piece_to_cache(piece_id, &mut reader) + .await; info!("load piece content to cache: piece_id={}", piece_id); - let cache_reader = match self.storage.upload_piece_from_cache(piece_id, offset, length, None).await { + let cache_reader = match self + .storage + .upload_piece_from_cache(piece_id, offset, length, None) + .await + { Ok(reader) => reader, Err(err) => { error!("download piece finished: {}", err); if let Some(err) = self.storage.download_piece_failed(piece_id).err() { error!("set piece metadata failed: {}", err) }; - + return Err(err); } }; @@ -715,13 +717,7 @@ impl Piece { // Record the finish of downloading piece. 
match self .storage - .download_piece_from_source_finished( - piece_id, - task_id, - offset, - length, - &mut reader, - ) + .download_piece_from_source_finished(piece_id, task_id, offset, length, &mut reader) .await { Ok(piece) => { diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 520c57d2..3b299cc2 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -1575,14 +1575,10 @@ impl Task { if load_to_cache { // Load piece content to cache. - self - .storage - .load_piece_to_cache( - piece_id.as_str(), - &mut content.clone().as_slice(), - ) - .await; - } + self.storage + .load_piece_to_cache(piece_id.as_str(), &mut content.clone().as_slice()) + .await; + } } // Send the download progress. From 13500b047049f2c63ed796102487df13c11bc680 Mon Sep 17 00:00:00 2001 From: southwest <1403572259@qq.com> Date: Thu, 16 Jan 2025 19:16:02 +0800 Subject: [PATCH 19/20] fix: upload_piece Signed-off-by: southwest <1403572259@qq.com> --- dragonfly-client-storage/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 1b1ebebf..4dffb8ad 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -425,7 +425,7 @@ impl Storage { match self.metadata.get_piece(piece_id) { Ok(Some(piece)) => { // Try to upload piece content form cache. - if !self.cache.is_empty() { + if !self.cache.is_empty() && self.cache.contains_piece(piece_id) { match self .cache .read_piece(piece_id, piece.offset, piece.length, range) From fe27f7e540844bad06d4af886e8239b7095df2be Mon Sep 17 00:00:00 2001 From: southwest miao <1403572259@qq.com> Date: Thu, 23 Jan 2025 21:14:15 +0800 Subject: [PATCH 20/20] fix: fmt Signed-off-by: southwest miao <1403572259@qq.com> --- dragonfly-client-storage/src/cache.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dragonfly-client-storage/src/cache.rs b/dragonfly-client-storage/src/cache.rs index 3ba86ae2..19226699 100644 --- a/dragonfly-client-storage/src/cache.rs +++ b/dragonfly-client-storage/src/cache.rs @@ -360,7 +360,10 @@ mod tests { let key = format!("concurrent-{}", i); let data = vec![i as u8; piece_length as usize]; let mut writer = Cursor::new(data.clone()); - cache.write_piece(&key, &mut writer, piece_length).await.unwrap(); + cache + .write_piece(&key, &mut writer, piece_length) + .await + .unwrap(); let mut reader = cache.read_piece(&key, 0, piece_length, None).await.unwrap(); let mut buffer = Vec::new();
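
Two closing notes on the fixes above. First, the range arithmetic that PATCH 18 reflows in `read_piece` maps a task-level byte range onto a slice of the piece buffer; a worked example with concrete numbers, using the formulas exactly as they appear in the diff:

```rust
use std::cmp::{max, min};

fn main() {
    // A piece holding task bytes [100, 149] ...
    let (offset, length) = (100u64, 50u64);
    // ... intersected with a requested range covering task bytes [120, 129].
    let (range_start, range_length) = (120u64, 10u64);

    // Offset of the requested data inside the piece buffer.
    let target_offset = max(offset, range_start) - offset;

    // Length of the overlap, computed from the inclusive end positions.
    let target_length =
        min(offset + length - 1, range_start + range_length - 1) - target_offset - offset + 1;

    assert_eq!(target_offset, 20);
    assert_eq!(target_length, 10);

    // read_piece then slices piece_content[20..30], i.e. task bytes 120..=129.
    let (begin, end) = (target_offset as usize, (target_offset + target_length) as usize);
    assert_eq!((begin, end), (20, 30));
}
```
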
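Second, the guard added in PATCH 19 (`!self.cache.is_empty() && self.cache.contains_piece(piece_id)`) keeps a cache miss from surfacing as an error on the upload path. A self-contained sketch of that control flow — `PieceCache` and the byte-string fallback are stand-ins for illustration, not the client's actual `Cache` or disk path:

```rust
use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::Mutex;

struct PieceCache {
    pieces: Mutex<LruCache<String, Vec<u8>>>,
}

impl PieceCache {
    fn is_empty(&self) -> bool {
        self.pieces.lock().unwrap().is_empty()
    }

    fn contains_piece(&self, id: &str) -> bool {
        self.pieces.lock().unwrap().contains(id)
    }

    fn read_piece(&self, id: &str) -> Option<Vec<u8>> {
        self.pieces.lock().unwrap().get(id).cloned()
    }
}

// Only attempt the memory cache when the piece is resident; otherwise go
// straight to the fallback (the disk read in the real flow).
fn upload_piece(cache: &PieceCache, id: &str) -> Vec<u8> {
    if !cache.is_empty() && cache.contains_piece(id) {
        if let Some(content) = cache.read_piece(id) {
            return content;
        }
        // A piece evicted between the two calls falls through to the
        // fallback instead of failing the upload.
    }
    b"read from disk".to_vec()
}

fn main() {
    let cache = PieceCache {
        pieces: Mutex::new(LruCache::new(NonZeroUsize::new(2).unwrap())),
    };
    cache
        .pieces
        .lock()
        .unwrap()
        .put("piece1".to_string(), b"cached".to_vec());

    assert_eq!(upload_piece(&cache, "piece1"), b"cached");
    assert_eq!(upload_piece(&cache, "missing"), b"read from disk");
}
```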