From f0d3da52e1e744287dd081359341411c83642d7a Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Wed, 10 May 2023 10:17:51 +0200 Subject: [PATCH] dev: fix remaning clippy warnings 2 --- project-words.txt | 1 + src/cache/image/manager.rs | 57 +++-- src/cache/mod.rs | 222 +++++++++++++++++- src/routes/proxy.rs | 5 + .../torrent_transferrer_tester.rs | 4 +- 5 files changed, 262 insertions(+), 27 deletions(-) diff --git a/project-words.txt b/project-words.txt index a76aa985..289364ad 100644 --- a/project-words.txt +++ b/project-words.txt @@ -21,6 +21,7 @@ hexlify httpseeds imagoodboy imdl +indexmap infohash jsonwebtoken leechers diff --git a/src/cache/image/manager.rs b/src/cache/image/manager.rs index bfef1589..9e8d814c 100644 --- a/src/cache/image/manager.rs +++ b/src/cache/image/manager.rs @@ -5,7 +5,7 @@ use std::time::{Duration, SystemTime}; use bytes::Bytes; use tokio::sync::RwLock; -use crate::cache::cache::BytesCache; +use crate::cache::BytesCache; use crate::config::Configuration; use crate::models::user::UserCompact; @@ -21,10 +21,10 @@ type UserQuotas = HashMap; #[must_use] pub fn now_in_secs() -> u64 { - match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { - Ok(n) => n.as_secs(), - Err(_) => panic!("SystemTime before UNIX EPOCH!"), - } + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH!") + .as_secs() } #[derive(Clone)] @@ -48,14 +48,19 @@ impl ImageCacheQuota { } } - pub fn add_usage(&mut self, amount: usize) -> Result<(), ()> { + /// Add Usage Quota + /// + /// # Errors + /// + /// This function will return a `Error::UserQuotaMet` if user quota has been met. + pub fn add_usage(&mut self, amount: usize) -> Result<(), Error> { // Check if quota needs to be reset. if now_in_secs() - self.date_start_secs > self.period_secs { self.reset(); } if self.is_reached() { - return Err(()); + return Err(Error::UserQuotaMet); } self.usage = self.usage.saturating_add(amount); @@ -92,7 +97,7 @@ impl ImageCacheService { let reqwest_client = reqwest::Client::builder() .timeout(Duration::from_millis(settings.image_cache.max_request_timeout_ms)) .build() - .unwrap(); + .expect("unable to build client request"); drop(settings); @@ -106,33 +111,37 @@ impl ImageCacheService { /// Get an image from the url and insert it into the cache if it isn't cached already. /// Unauthenticated users can only get already cached images. + /// + /// # Errors + /// + /// Return a `Error::Unauthenticated` if the user has not been authenticated. pub async fn get_image_by_url(&self, url: &str, opt_user: Option) -> Result { if let Some(entry) = self.image_cache.read().await.get(url).await { return Ok(entry.bytes); } - if opt_user.is_none() { - return Err(Error::Unauthenticated); - } - - let user = opt_user.unwrap(); + match opt_user { + None => Err(Error::Unauthenticated), - self.check_user_quota(&user).await?; + Some(user) => { + self.check_user_quota(&user).await?; - let image_bytes = self.get_image_from_url_as_bytes(url).await?; + let image_bytes = self.get_image_from_url_as_bytes(url).await?; - self.check_image_size(&image_bytes).await?; + self.check_image_size(&image_bytes).await?; - // These two functions could be executed after returning the image to the client, - // but than we would need a dedicated task or thread that executes these functions. - // This can be problematic if a task is spawned after every user request. - // Since these functions execute very fast, I don't see a reason to further optimize this. - // For now. - self.update_image_cache(url, &image_bytes).await?; + // These two functions could be executed after returning the image to the client, + // but than we would need a dedicated task or thread that executes these functions. + // This can be problematic if a task is spawned after every user request. + // Since these functions execute very fast, I don't see a reason to further optimize this. + // For now. + self.update_image_cache(url, &image_bytes).await?; - self.update_user_quota(&user, image_bytes.len()).await?; + self.update_user_quota(&user, image_bytes.len()).await?; - Ok(image_bytes) + Ok(image_bytes) + } + } } async fn get_image_from_url_as_bytes(&self, url: &str) -> Result { diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 3afdefbc..1696cdb8 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -1,2 +1,222 @@ -pub mod cache; pub mod image; + +use bytes::Bytes; +use indexmap::IndexMap; + +#[derive(Debug)] +pub enum Error { + EntrySizeLimitExceedsTotalCapacity, + BytesExceedEntrySizeLimit, + CacheCapacityIsTooSmall, +} + +#[derive(Debug, Clone)] +pub struct BytesCacheEntry { + pub bytes: Bytes, +} + +// Individual entry destined for the byte cache. +impl BytesCacheEntry { + pub fn new(bytes: Bytes) -> Self { + Self { bytes } + } +} +#[allow(clippy::module_name_repetitions)] +pub struct BytesCache { + bytes_table: IndexMap, + total_capacity: usize, + entry_size_limit: usize, +} + +impl BytesCache { + #[must_use] + pub fn new() -> Self { + Self { + bytes_table: IndexMap::new(), + total_capacity: 0, + entry_size_limit: 0, + } + } + + // With a total capacity in bytes. + #[must_use] + pub fn with_capacity(capacity: usize) -> Self { + let mut new = Self::new(); + + new.total_capacity = capacity; + + new + } + + // With a limit for individual entry sizes. + #[must_use] + pub fn with_entry_size_limit(entry_size_limit: usize) -> Self { + let mut new = Self::new(); + + new.entry_size_limit = entry_size_limit; + + new + } + + /// Helper to create a new bytes cache with both an individual entry and size limit. + /// + /// # Errors + /// + /// This function will return `Error::EntrySizeLimitExceedsTotalCapacity` if the specified size is too large. + /// + pub fn with_capacity_and_entry_size_limit(capacity: usize, entry_size_limit: usize) -> Result { + if entry_size_limit > capacity { + return Err(Error::EntrySizeLimitExceedsTotalCapacity); + } + + let mut new = Self::new(); + + new.total_capacity = capacity; + new.entry_size_limit = entry_size_limit; + + Ok(new) + } + + #[allow(clippy::unused_async)] + pub async fn get(&self, key: &str) -> Option { + self.bytes_table.get(key).cloned() + } + + // Return the amount of entries in the map. + #[allow(clippy::unused_async)] + pub async fn len(&self) -> usize { + self.bytes_table.len() + } + + #[allow(clippy::unused_async)] + pub async fn is_empty(&self) -> bool { + self.bytes_table.is_empty() + } + + // Size of all the entry bytes combined. + #[must_use] + pub fn total_size(&self) -> usize { + let mut size: usize = 0; + + for (_, entry) in self.bytes_table.iter() { + size += entry.bytes.len(); + } + + size + } + + /// Adds a image to the cache. + /// + /// # Errors + /// + /// This function will return an error if there is not enough free size. + /// + // Insert bytes using key. + // TODO: Freed space might need to be reserved. Hold and pass write lock between functions? + // For TO DO above: semaphore: Arc, might be a solution. + #[allow(clippy::unused_async)] + pub async fn set(&mut self, key: String, bytes: Bytes) -> Result, Error> { + if bytes.len() > self.entry_size_limit { + return Err(Error::BytesExceedEntrySizeLimit); + } + + // Remove the old entry so that a new entry will be added as last in the queue. + let _ = self.bytes_table.shift_remove(&key); + + let bytes_cache_entry = BytesCacheEntry::new(bytes); + + self.free_size(bytes_cache_entry.bytes.len())?; + + Ok(self.bytes_table.insert(key, bytes_cache_entry)) + } + + // Free space. Size amount in bytes. + fn free_size(&mut self, size: usize) -> Result<(), Error> { + // Size may not exceed the total capacity of the bytes cache. + if size > self.total_capacity { + return Err(Error::CacheCapacityIsTooSmall); + } + + let cache_size = self.total_size(); + let size_to_be_freed = size.saturating_sub(self.total_capacity - cache_size); + let mut size_freed: usize = 0; + + while size_freed < size_to_be_freed { + let oldest_entry = self + .pop() + .expect("bytes cache has no more entries, yet there isn't enough space."); + + size_freed += oldest_entry.bytes.len(); + } + + Ok(()) + } + + // Remove and return the oldest entry. + pub fn pop(&mut self) -> Option { + self.bytes_table.shift_remove_index(0).map(|(_, entry)| entry) + } +} + +impl Default for BytesCache { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + + use crate::cache::BytesCache; + + #[tokio::test] + async fn set_bytes_cache_with_capacity_and_entry_size_limit_should_succeed() { + let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(6, 6).unwrap(); + let bytes: Bytes = Bytes::from("abcdef"); + + assert!(bytes_cache.set("1".to_string(), bytes).await.is_ok()); + } + + #[tokio::test] + async fn given_a_bytes_cache_with_a_capacity_and_entry_size_limit_it_should_allow_adding_new_entries_if_the_limit_is_not_exceeded( + ) { + let bytes: Bytes = Bytes::from("abcdef"); + + let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(bytes.len() * 2, bytes.len()).unwrap(); + + // Add first entry (6 bytes) + assert!(bytes_cache.set("key1".to_string(), bytes.clone()).await.is_ok()); + + // Add second entry (6 bytes) + assert!(bytes_cache.set("key2".to_string(), bytes).await.is_ok()); + + // Both entries were added because we did not reach the limit + assert_eq!(bytes_cache.len().await, 2); + } + + #[tokio::test] + async fn given_a_bytes_cache_with_a_capacity_and_entry_size_limit_it_should_not_allow_adding_new_entries_if_the_capacity_is_exceeded( + ) { + let bytes: Bytes = Bytes::from("abcdef"); + + let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(bytes.len() * 2 - 1, bytes.len()).unwrap(); + + // Add first entry (6 bytes) + assert!(bytes_cache.set("key1".to_string(), bytes.clone()).await.is_ok()); + + // Add second entry (6 bytes) + assert!(bytes_cache.set("key2".to_string(), bytes).await.is_ok()); + + // Only one entry is in the cache, because otherwise the total capacity would have been exceeded + assert_eq!(bytes_cache.len().await, 1); + } + + #[tokio::test] + async fn set_bytes_cache_with_capacity_and_entry_size_limit_should_fail() { + let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(6, 5).unwrap(); + let bytes: Bytes = Bytes::from("abcdef"); + + assert!(bytes_cache.set("1".to_string(), bytes).await.is_err()); + } +} diff --git a/src/routes/proxy.rs b/src/routes/proxy.rs index 0863a08c..c61b9326 100644 --- a/src/routes/proxy.rs +++ b/src/routes/proxy.rs @@ -53,6 +53,11 @@ fn load_error_images() { }); } +/// Get the proxy image. +/// +/// # Errors +/// +/// This function will return `Ok` only for now. pub async fn get_proxy_image(req: HttpRequest, app_data: WebAppData, path: web::Path) -> ServiceResult { // Check for optional user. let opt_user = app_data.auth.get_user_compact_from_request(&req).await.ok(); diff --git a/tests/upgrades/from_v1_0_0_to_v2_0_0/transferrer_testers/torrent_transferrer_tester.rs b/tests/upgrades/from_v1_0_0_to_v2_0_0/transferrer_testers/torrent_transferrer_tester.rs index 0ee1e123..ecc3511c 100644 --- a/tests/upgrades/from_v1_0_0_to_v2_0_0/transferrer_testers/torrent_transferrer_tester.rs +++ b/tests/upgrades/from_v1_0_0_to_v2_0_0/transferrer_testers/torrent_transferrer_tester.rs @@ -87,7 +87,7 @@ impl TorrentTester { #[allow(clippy::missing_panics_doc)] pub async fn assert_data_in_target_db(&self, upload_path: &str) { for torrent in &self.test_data.torrents { - let filepath = self.torrent_file_path(upload_path, torrent.torrent_id); + let filepath = Self::torrent_file_path(upload_path, torrent.torrent_id); let torrent_file = read_torrent_from_file(&filepath).unwrap(); @@ -98,7 +98,7 @@ impl TorrentTester { } } - pub fn torrent_file_path(&self, upload_path: &str, torrent_id: i64) -> String { + pub fn torrent_file_path(upload_path: &str, torrent_id: i64) -> String { format!("{}/{}.torrent", &upload_path, &torrent_id) }