dev: fix remaining clippy warnings 2
da2ce7 committed May 10, 2023
1 parent da91f97 commit f0d3da5
Showing 5 changed files with 262 additions and 27 deletions.
1 change: 1 addition & 0 deletions project-words.txt
@@ -21,6 +21,7 @@ hexlify
httpseeds
imagoodboy
imdl
indexmap
infohash
jsonwebtoken
leechers
57 changes: 33 additions & 24 deletions src/cache/image/manager.rs
@@ -5,7 +5,7 @@ use std::time::{Duration, SystemTime};
use bytes::Bytes;
use tokio::sync::RwLock;

use crate::cache::cache::BytesCache;
use crate::cache::BytesCache;
use crate::config::Configuration;
use crate::models::user::UserCompact;

@@ -21,10 +21,10 @@ type UserQuotas = HashMap<i64, ImageCacheQuota>;

#[must_use]
pub fn now_in_secs() -> u64 {
match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
}
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_secs()
}

#[derive(Clone)]
@@ -48,14 +48,19 @@ impl ImageCacheQuota {
}
}

pub fn add_usage(&mut self, amount: usize) -> Result<(), ()> {
/// Add Usage Quota
///
/// # Errors
///
/// This function will return an `Error::UserQuotaMet` if the user quota has been met.
pub fn add_usage(&mut self, amount: usize) -> Result<(), Error> {
// Check if quota needs to be reset.
if now_in_secs() - self.date_start_secs > self.period_secs {
self.reset();
}

if self.is_reached() {
return Err(());
return Err(Error::UserQuotaMet);
}

self.usage = self.usage.saturating_add(amount);
@@ -92,7 +97,7 @@ impl ImageCacheService {
let reqwest_client = reqwest::Client::builder()
.timeout(Duration::from_millis(settings.image_cache.max_request_timeout_ms))
.build()
.unwrap();
.expect("unable to build client request");

drop(settings);

@@ -106,33 +111,37 @@ impl ImageCacheService {

/// Get an image from the url and insert it into the cache if it isn't cached already.
/// Unauthenticated users can only get already cached images.
///
/// # Errors
///
/// Returns an `Error::Unauthenticated` if the user has not been authenticated.
pub async fn get_image_by_url(&self, url: &str, opt_user: Option<UserCompact>) -> Result<Bytes, Error> {
if let Some(entry) = self.image_cache.read().await.get(url).await {
return Ok(entry.bytes);
}

if opt_user.is_none() {
return Err(Error::Unauthenticated);
}

let user = opt_user.unwrap();
match opt_user {
None => Err(Error::Unauthenticated),

self.check_user_quota(&user).await?;
Some(user) => {
self.check_user_quota(&user).await?;

let image_bytes = self.get_image_from_url_as_bytes(url).await?;
let image_bytes = self.get_image_from_url_as_bytes(url).await?;

self.check_image_size(&image_bytes).await?;
self.check_image_size(&image_bytes).await?;

// These two functions could be executed after returning the image to the client,
// but then we would need a dedicated task or thread that executes these functions.
// This can be problematic if a task is spawned after every user request.
// Since these functions execute very fast, I don't see a reason to further optimize this.
// For now.
self.update_image_cache(url, &image_bytes).await?;
// These two functions could be executed after returning the image to the client,
// but then we would need a dedicated task or thread that executes these functions.
// This can be problematic if a task is spawned after every user request.
// Since these functions execute very fast, I don't see a reason to further optimize this.
// For now.
self.update_image_cache(url, &image_bytes).await?;

self.update_user_quota(&user, image_bytes.len()).await?;
self.update_user_quota(&user, image_bytes.len()).await?;

Ok(image_bytes)
Ok(image_bytes)
}
}
}

async fn get_image_from_url_as_bytes(&self, url: &str) -> Result<Bytes, Error> {
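For context, here is an illustrative, self-contained sketch of the quota pattern that the reworked `add_usage` follows (reset the window once the period has elapsed, then reject further usage once the limit is met). The `Quota` and `QuotaError` names below are hypothetical and are not the crate's API.

use std::time::{Duration, SystemTime};

#[derive(Debug)]
enum QuotaError {
    QuotaMet,
}

struct Quota {
    usage: usize,
    max_usage: usize,
    period: Duration,
    started_at: SystemTime,
}

impl Quota {
    fn new(max_usage: usize, period: Duration) -> Self {
        Self {
            usage: 0,
            max_usage,
            period,
            started_at: SystemTime::now(),
        }
    }

    fn add_usage(&mut self, amount: usize) -> Result<(), QuotaError> {
        // Reset the usage window once the period has elapsed.
        if self.started_at.elapsed().unwrap_or_default() > self.period {
            self.usage = 0;
            self.started_at = SystemTime::now();
        }

        if self.usage >= self.max_usage {
            return Err(QuotaError::QuotaMet);
        }

        self.usage = self.usage.saturating_add(amount);
        Ok(())
    }
}

fn main() {
    let mut quota = Quota::new(10, Duration::from_secs(3600));

    // The first call fits within the limit.
    assert!(quota.add_usage(10).is_ok());

    // The quota is now met, so the next call is rejected.
    assert!(quota.add_usage(1).is_err());
}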
222 changes: 221 additions & 1 deletion src/cache/mod.rs
@@ -1,2 +1,222 @@
pub mod cache;
pub mod image;

use bytes::Bytes;
use indexmap::IndexMap;

#[derive(Debug)]
pub enum Error {
EntrySizeLimitExceedsTotalCapacity,
BytesExceedEntrySizeLimit,
CacheCapacityIsTooSmall,
}

#[derive(Debug, Clone)]
pub struct BytesCacheEntry {
pub bytes: Bytes,
}

// Individual entry destined for the byte cache.
impl BytesCacheEntry {
pub fn new(bytes: Bytes) -> Self {
Self { bytes }
}
}
#[allow(clippy::module_name_repetitions)]
pub struct BytesCache {
bytes_table: IndexMap<String, BytesCacheEntry>,
total_capacity: usize,
entry_size_limit: usize,
}

impl BytesCache {
#[must_use]
pub fn new() -> Self {
Self {
bytes_table: IndexMap::new(),
total_capacity: 0,
entry_size_limit: 0,
}
}

// With a total capacity in bytes.
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
let mut new = Self::new();

new.total_capacity = capacity;

new
}

// With a limit for individual entry sizes.
#[must_use]
pub fn with_entry_size_limit(entry_size_limit: usize) -> Self {
let mut new = Self::new();

new.entry_size_limit = entry_size_limit;

new
}

/// Helper to create a new bytes cache with both an individual entry and size limit.
///
/// # Errors
///
/// This function will return `Error::EntrySizeLimitExceedsTotalCapacity` if the specified size is too large.
///
pub fn with_capacity_and_entry_size_limit(capacity: usize, entry_size_limit: usize) -> Result<Self, Error> {
if entry_size_limit > capacity {
return Err(Error::EntrySizeLimitExceedsTotalCapacity);
}

let mut new = Self::new();

new.total_capacity = capacity;
new.entry_size_limit = entry_size_limit;

Ok(new)
}

#[allow(clippy::unused_async)]
pub async fn get(&self, key: &str) -> Option<BytesCacheEntry> {
self.bytes_table.get(key).cloned()
}

// Return the amount of entries in the map.
#[allow(clippy::unused_async)]
pub async fn len(&self) -> usize {
self.bytes_table.len()
}

#[allow(clippy::unused_async)]
pub async fn is_empty(&self) -> bool {
self.bytes_table.is_empty()
}

// Size of all the entry bytes combined.
#[must_use]
pub fn total_size(&self) -> usize {
let mut size: usize = 0;

for (_, entry) in self.bytes_table.iter() {
size += entry.bytes.len();
}

size
}

/// Adds an image to the cache.
///
/// # Errors
///
/// This function will return an error if there is not enough free size.
///
// Insert bytes using key.
// TODO: Freed space might need to be reserved. Hold and pass write lock between functions?
// For TO DO above: semaphore: Arc<tokio::sync::Semaphore>, might be a solution.
#[allow(clippy::unused_async)]
pub async fn set(&mut self, key: String, bytes: Bytes) -> Result<Option<BytesCacheEntry>, Error> {
if bytes.len() > self.entry_size_limit {
return Err(Error::BytesExceedEntrySizeLimit);
}

// Remove the old entry so that a new entry will be added as last in the queue.
let _ = self.bytes_table.shift_remove(&key);

let bytes_cache_entry = BytesCacheEntry::new(bytes);

self.free_size(bytes_cache_entry.bytes.len())?;

Ok(self.bytes_table.insert(key, bytes_cache_entry))
}

// Free space. Size amount in bytes.
fn free_size(&mut self, size: usize) -> Result<(), Error> {
// Size may not exceed the total capacity of the bytes cache.
if size > self.total_capacity {
return Err(Error::CacheCapacityIsTooSmall);
}

let cache_size = self.total_size();
let size_to_be_freed = size.saturating_sub(self.total_capacity - cache_size);
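// For example: with a total capacity of 12 bytes, 10 bytes already cached and a 6-byte entry incoming, only 2 bytes are free, so 4 bytes must be evicted.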
let mut size_freed: usize = 0;

while size_freed < size_to_be_freed {
let oldest_entry = self
.pop()
.expect("bytes cache has no more entries, yet there isn't enough space.");

size_freed += oldest_entry.bytes.len();
}

Ok(())
}

// Remove and return the oldest entry.
pub fn pop(&mut self) -> Option<BytesCacheEntry> {
self.bytes_table.shift_remove_index(0).map(|(_, entry)| entry)
}
}

impl Default for BytesCache {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;

use crate::cache::BytesCache;

#[tokio::test]
async fn set_bytes_cache_with_capacity_and_entry_size_limit_should_succeed() {
let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(6, 6).unwrap();
let bytes: Bytes = Bytes::from("abcdef");

assert!(bytes_cache.set("1".to_string(), bytes).await.is_ok());
}

#[tokio::test]
async fn given_a_bytes_cache_with_a_capacity_and_entry_size_limit_it_should_allow_adding_new_entries_if_the_limit_is_not_exceeded(
) {
let bytes: Bytes = Bytes::from("abcdef");

let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(bytes.len() * 2, bytes.len()).unwrap();

// Add first entry (6 bytes)
assert!(bytes_cache.set("key1".to_string(), bytes.clone()).await.is_ok());

// Add second entry (6 bytes)
assert!(bytes_cache.set("key2".to_string(), bytes).await.is_ok());

// Both entries were added because we did not reach the limit
assert_eq!(bytes_cache.len().await, 2);
}

#[tokio::test]
async fn given_a_bytes_cache_with_a_capacity_and_entry_size_limit_it_should_not_allow_adding_new_entries_if_the_capacity_is_exceeded(
) {
let bytes: Bytes = Bytes::from("abcdef");

let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(bytes.len() * 2 - 1, bytes.len()).unwrap();

// Add first entry (6 bytes)
assert!(bytes_cache.set("key1".to_string(), bytes.clone()).await.is_ok());

// Add second entry (6 bytes)
assert!(bytes_cache.set("key2".to_string(), bytes).await.is_ok());

// Only one entry is in the cache, because otherwise the total capacity would have been exceeded
assert_eq!(bytes_cache.len().await, 1);
}

#[tokio::test]
async fn set_bytes_cache_with_capacity_and_entry_size_limit_should_fail() {
let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(6, 5).unwrap();
let bytes: Bytes = Bytes::from("abcdef");

assert!(bytes_cache.set("1".to_string(), bytes).await.is_err());
}
}
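As an aside (not part of this commit), a test sketch in the same style can show that re-setting an existing key moves it to the back of the eviction queue, since `set` removes the old entry before inserting the new one; the keys, sizes, and capacity below are chosen for the example, and the sketch assumes it lives inside the same `tests` module.

#[tokio::test]
async fn resetting_an_existing_key_should_move_it_to_the_back_of_the_eviction_queue() {
    // Room for two 6-byte entries, each entry at most 6 bytes.
    let mut bytes_cache = BytesCache::with_capacity_and_entry_size_limit(12, 6).unwrap();

    let bytes: Bytes = Bytes::from("abcdef");

    bytes_cache.set("a".to_string(), bytes.clone()).await.unwrap();
    bytes_cache.set("b".to_string(), bytes.clone()).await.unwrap();

    // Re-setting "a" removes its old entry and appends a fresh one,
    // so "b" becomes the oldest entry.
    bytes_cache.set("a".to_string(), bytes).await.unwrap();

    // Popping evicts the oldest entry ("b"), leaving only "a".
    assert!(bytes_cache.pop().is_some());
    assert_eq!(bytes_cache.len().await, 1);
    assert!(bytes_cache.get("a").await.is_some());
}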
5 changes: 5 additions & 0 deletions src/routes/proxy.rs
@@ -53,6 +53,11 @@ fn load_error_images() {
});
}

/// Get the proxy image.
///
/// # Errors
///
/// This function will return `Ok` only for now.
pub async fn get_proxy_image(req: HttpRequest, app_data: WebAppData, path: web::Path<String>) -> ServiceResult<impl Responder> {
// Check for optional user.
let opt_user = app_data.auth.get_user_compact_from_request(&req).await.ok();
@@ -87,7 +87,7 @@ impl TorrentTester {
#[allow(clippy::missing_panics_doc)]
pub async fn assert_data_in_target_db(&self, upload_path: &str) {
for torrent in &self.test_data.torrents {
let filepath = self.torrent_file_path(upload_path, torrent.torrent_id);
let filepath = Self::torrent_file_path(upload_path, torrent.torrent_id);

let torrent_file = read_torrent_from_file(&filepath).unwrap();

@@ -98,7 +98,7 @@ }
}
}

pub fn torrent_file_path(&self, upload_path: &str, torrent_id: i64) -> String {
pub fn torrent_file_path(upload_path: &str, torrent_id: i64) -> String {
format!("{}/{}.torrent", &upload_path, &torrent_id)
}

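For illustration, the new associated function can be called without a `TorrentTester` instance; a brief usage sketch follows (the upload path and torrent id are hypothetical):

let filepath = TorrentTester::torrent_file_path("/tmp/uploads", 42);
assert_eq!(filepath, "/tmp/uploads/42.torrent");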
