diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index ef8724be79d..6026f2c0a06 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -1504,7 +1504,7 @@ impl Client { policy: MediaRetentionPolicy, ) -> Result<(), ClientError> { let closure = async || -> Result<_, EventCacheError> { - let store = self.inner.event_cache_store().lock().await?; + let store = self.inner.media_store().lock().await?; Ok(store.set_media_retention_policy(policy).await?) }; @@ -1552,7 +1552,7 @@ impl Client { // Clean up the media cache according to the current media retention policy. self.inner - .event_cache_store() + .media_store() .lock() .await .map_err(EventCacheError::from)? diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 250ed4e589e..6f6a3898101 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -56,7 +56,7 @@ use crate::{ InviteAcceptanceDetails, RoomStateFilter, SessionMeta, deserialized_responses::DisplayName, error::{Error, Result}, - event_cache::store::EventCacheStoreLock, + event_cache::store::{EventCacheStoreLock, media::MediaStoreLock}, response_processors::{self as processors, Context}, room::{ Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState, @@ -91,6 +91,9 @@ pub struct BaseClient { /// The store used by the event cache. event_cache_store: EventCacheStoreLock, + /// The store used by the media cache. + media_store: MediaStoreLock, + /// The store used for encryption. /// /// This field is only meant to be used for `OlmMachine` initialization. @@ -189,6 +192,7 @@ impl BaseClient { BaseClient { state_store: store, event_cache_store: config.event_cache_store, + media_store: config.media_store, #[cfg(feature = "e2e-encryption")] crypto_store: config.crypto_store, #[cfg(feature = "e2e-encryption")] @@ -222,6 +226,7 @@ impl BaseClient { let copy = Self { state_store: BaseStateStore::new(config.state_store), event_cache_store: config.event_cache_store, + media_store: config.media_store, // We copy the crypto store as well as the `OlmMachine` for two reasons: // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`. // 2. We need to ensure that the parent and child use the same data and caches inside @@ -306,6 +311,11 @@ impl BaseClient { &self.event_cache_store } + /// Get a reference to the media store. + pub fn media_store(&self) -> &MediaStoreLock { + &self.media_store + } + /// Check whether the client has been activated. /// /// See [`BaseClient::activate`] to know what it means. diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs index 72a6b3ed030..fde13e7d4b1 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs @@ -29,24 +29,17 @@ use matrix_sdk_common::{ }; use matrix_sdk_test::{ALICE, DEFAULT_TEST_ROOM_ID, event_factory::EventFactory}; use ruma::{ - EventId, RoomId, - api::client::media::get_content_thumbnail::v3::Method, - event_id, + EventId, RoomId, event_id, events::{ - AnyMessageLikeEvent, AnyTimelineEvent, - relation::RelationType, - room::{MediaSource, message::RoomMessageEventContentWithoutRelation}, + AnyMessageLikeEvent, AnyTimelineEvent, relation::RelationType, + room::message::RoomMessageEventContentWithoutRelation, }, - mxc_uri, push::Action, - room_id, uint, + room_id, }; -use super::{DynEventCacheStore, media::IgnoreMediaRetentionPolicy}; -use crate::{ - event_cache::{Gap, store::DEFAULT_CHUNK_CAPACITY}, - media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings}, -}; +use super::DynEventCacheStore; +use crate::event_cache::{Gap, store::DEFAULT_CHUNK_CAPACITY}; /// Create a test event with all data filled, for testing that linked chunk /// correctly stores event data. @@ -118,12 +111,6 @@ pub fn check_test_event(event: &TimelineEvent, text: &str) { /// `event_cache_store_integration_tests!` macro. #[allow(async_fn_in_trait)] pub trait EventCacheStoreIntegrationTests { - /// Test media content storage. - async fn test_media_content(&self); - - /// Test replacing a MXID. - async fn test_replace_media_key(&self); - /// Test handling updates to a linked chunk and reloading these updates from /// the store. async fn test_handle_updates_and_rebuild_linked_chunk(&self); @@ -163,190 +150,6 @@ pub trait EventCacheStoreIntegrationTests { } impl EventCacheStoreIntegrationTests for DynEventCacheStore { - async fn test_media_content(&self) { - let uri = mxc_uri!("mxc://localhost/media"); - let request_file = MediaRequestParameters { - source: MediaSource::Plain(uri.to_owned()), - format: MediaFormat::File, - }; - let request_thumbnail = MediaRequestParameters { - source: MediaSource::Plain(uri.to_owned()), - format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method( - Method::Crop, - uint!(100), - uint!(100), - )), - }; - - let other_uri = mxc_uri!("mxc://localhost/media-other"); - let request_other_file = MediaRequestParameters { - source: MediaSource::Plain(other_uri.to_owned()), - format: MediaFormat::File, - }; - - let content: Vec = "hello".into(); - let thumbnail_content: Vec = "world".into(); - let other_content: Vec = "foo".into(); - - // Media isn't present in the cache. - assert!( - self.get_media_content(&request_file).await.unwrap().is_none(), - "unexpected media found" - ); - assert!( - self.get_media_content(&request_thumbnail).await.unwrap().is_none(), - "media not found" - ); - - // Let's add the media. - self.add_media_content(&request_file, content.clone(), IgnoreMediaRetentionPolicy::No) - .await - .expect("adding media failed"); - - // Media is present in the cache. - assert_eq!( - self.get_media_content(&request_file).await.unwrap().as_ref(), - Some(&content), - "media not found though added" - ); - assert_eq!( - self.get_media_content_for_uri(uri).await.unwrap().as_ref(), - Some(&content), - "media not found by URI though added" - ); - - // Let's remove the media. - self.remove_media_content(&request_file).await.expect("removing media failed"); - - // Media isn't present in the cache. - assert!( - self.get_media_content(&request_file).await.unwrap().is_none(), - "media still there after removing" - ); - assert!( - self.get_media_content_for_uri(uri).await.unwrap().is_none(), - "media still found by URI after removing" - ); - - // Let's add the media again. - self.add_media_content(&request_file, content.clone(), IgnoreMediaRetentionPolicy::No) - .await - .expect("adding media again failed"); - - assert_eq!( - self.get_media_content(&request_file).await.unwrap().as_ref(), - Some(&content), - "media not found after adding again" - ); - - // Let's add the thumbnail media. - self.add_media_content( - &request_thumbnail, - thumbnail_content.clone(), - IgnoreMediaRetentionPolicy::No, - ) - .await - .expect("adding thumbnail failed"); - - // Media's thumbnail is present. - assert_eq!( - self.get_media_content(&request_thumbnail).await.unwrap().as_ref(), - Some(&thumbnail_content), - "thumbnail not found" - ); - - // We get a file with the URI, we don't know which one. - assert!( - self.get_media_content_for_uri(uri).await.unwrap().is_some(), - "media not found by URI though two where added" - ); - - // Let's add another media with a different URI. - self.add_media_content( - &request_other_file, - other_content.clone(), - IgnoreMediaRetentionPolicy::No, - ) - .await - .expect("adding other media failed"); - - // Other file is present. - assert_eq!( - self.get_media_content(&request_other_file).await.unwrap().as_ref(), - Some(&other_content), - "other file not found" - ); - assert_eq!( - self.get_media_content_for_uri(other_uri).await.unwrap().as_ref(), - Some(&other_content), - "other file not found by URI" - ); - - // Let's remove media based on URI. - self.remove_media_content_for_uri(uri).await.expect("removing all media for uri failed"); - - assert!( - self.get_media_content(&request_file).await.unwrap().is_none(), - "media wasn't removed" - ); - assert!( - self.get_media_content(&request_thumbnail).await.unwrap().is_none(), - "thumbnail wasn't removed" - ); - assert!( - self.get_media_content(&request_other_file).await.unwrap().is_some(), - "other media was removed" - ); - assert!( - self.get_media_content_for_uri(uri).await.unwrap().is_none(), - "media found by URI wasn't removed" - ); - assert!( - self.get_media_content_for_uri(other_uri).await.unwrap().is_some(), - "other media found by URI was removed" - ); - } - - async fn test_replace_media_key(&self) { - let uri = mxc_uri!("mxc://sendqueue.local/tr4n-s4ct-10n1-d"); - let req = MediaRequestParameters { - source: MediaSource::Plain(uri.to_owned()), - format: MediaFormat::File, - }; - - let content = "hello".as_bytes().to_owned(); - - // Media isn't present in the cache. - assert!(self.get_media_content(&req).await.unwrap().is_none(), "unexpected media found"); - - // Add the media. - self.add_media_content(&req, content.clone(), IgnoreMediaRetentionPolicy::No) - .await - .expect("adding media failed"); - - // Sanity-check: media is found after adding it. - assert_eq!(self.get_media_content(&req).await.unwrap().unwrap(), b"hello"); - - // Replacing a media request works. - let new_uri = mxc_uri!("mxc://matrix.org/tr4n-s4ct-10n1-d"); - let new_req = MediaRequestParameters { - source: MediaSource::Plain(new_uri.to_owned()), - format: MediaFormat::File, - }; - self.replace_media_key(&req, &new_req) - .await - .expect("replacing the media request key failed"); - - // Finding with the previous request doesn't work anymore. - assert!( - self.get_media_content(&req).await.unwrap().is_none(), - "unexpected media found with the old key" - ); - - // Finding with the new request does work. - assert_eq!(self.get_media_content(&new_req).await.unwrap().unwrap(), b"hello"); - } - async fn test_handle_updates_and_rebuild_linked_chunk(&self) { let room_id = room_id!("!r0:matrix.org"); let linked_chunk_id = LinkedChunkId::Room(room_id); @@ -1333,20 +1136,6 @@ macro_rules! event_cache_store_integration_tests { use super::get_event_cache_store; - #[async_test] - async fn test_media_content() { - let event_cache_store = - get_event_cache_store().await.unwrap().into_event_cache_store(); - event_cache_store.test_media_content().await; - } - - #[async_test] - async fn test_replace_media_key() { - let event_cache_store = - get_event_cache_store().await.unwrap().into_event_cache_store(); - event_cache_store.test_replace_media_key().await; - } - #[async_test] async fn test_handle_updates_and_rebuild_linked_chunk() { let event_cache_store = diff --git a/crates/matrix-sdk-base/src/event_cache/store/media/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/media/integration_tests.rs index 3ed5c89aeed..9cf885cadd3 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/media/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/media/integration_tests.rs @@ -12,26 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Trait and macro of integration tests for `EventCacheStoreMedia` +//! Trait and macro of integration tests for `MediaStoreInner` //! implementations. use ruma::{ events::room::MediaSource, + media::Method, mxc_uri, owned_mxc_uri, time::{Duration, SystemTime}, + uint, }; -use super::{ - EventCacheStoreMedia, MediaRetentionPolicy, media_service::IgnoreMediaRetentionPolicy, +use super::{MediaRetentionPolicy, MediaStoreInner, media_service::IgnoreMediaRetentionPolicy}; +use crate::{ + event_cache::store::media::MediaStore, + media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings}, }; -use crate::media::{MediaFormat, MediaRequestParameters}; -/// [`EventCacheStoreMedia`] integration tests. +/// [`MediaStoreInner`] integration tests. /// /// This trait is not meant to be used directly, but will be used with the -/// `event_cache_store_media_integration_tests!` macro. +/// `media_store_inner_integration_tests!` macro. #[allow(async_fn_in_trait)] -pub trait EventCacheStoreMediaIntegrationTests { +pub trait MediaStoreInnerIntegrationTests { /// Test media retention policy storage. async fn test_store_media_retention_policy(&self); @@ -56,9 +59,9 @@ pub trait EventCacheStoreMediaIntegrationTests { async fn test_store_last_media_cleanup_time(&self); } -impl EventCacheStoreMediaIntegrationTests for Store +impl MediaStoreInnerIntegrationTests for Store where - Store: EventCacheStoreMedia + std::fmt::Debug, + Store: MediaStoreInner + std::fmt::Debug, { async fn test_store_media_retention_policy(&self) { let stored = self.media_retention_policy_inner().await.unwrap(); @@ -961,7 +964,7 @@ where } } -/// Macro building to allow your [`EventCacheStoreMedia`] implementation to run +/// Macro building to allow your [`MediaStoreInner`] implementation to run /// the entire tests suite locally. /// /// Can be run with the `with_media_size_tests` argument to include more tests @@ -969,91 +972,424 @@ where /// recommended to run those in encrypted stores because the size of the /// encrypted content may vary compared to what the tests expect. /// -/// You need to provide an `async fn get_event_cache_store() -> -/// event_cache::store::Result` that provides a fresh event cache store -/// that implements `EventCacheStoreMedia` on the same level you invoke the +/// You need to provide an `async fn get_media_store() -> +/// event_cache::store::media::Result` that provides a fresh media +/// store that implements `MediaStoreInner` on the same level you invoke the /// macro. /// /// ## Usage Example: /// ```no_run -/// # use matrix_sdk_base::event_cache::store::{ -/// # EventCacheStore, +/// # use matrix_sdk_base::event_cache::store::media{ +/// # MediaStoreInner, /// # MemoryStore as MyStore, -/// # Result as EventCacheStoreResult, +/// # Result as MediaStoreResult, /// # }; /// /// #[cfg(test)] /// mod tests { -/// use super::{EventCacheStoreResult, MyStore}; +/// use super::{MediaStoreResult, MyStore}; /// -/// async fn get_event_cache_store() -> EventCacheStoreResult { +/// async fn get_media_store() -> MediaStoreResult { /// Ok(MyStore::new()) /// } /// -/// event_cache_store_media_integration_tests!(); +/// media_store_inner_integration_tests!(); /// } /// ``` #[allow(unused_macros, unused_extern_crates)] #[macro_export] -macro_rules! event_cache_store_media_integration_tests { +macro_rules! media_store_inner_integration_tests { (with_media_size_tests) => { - mod event_cache_store_media_integration_tests { - $crate::event_cache_store_media_integration_tests!(@inner); + mod media_store_inner_integration_tests { + $crate::media_store_inner_integration_tests!(@inner); #[async_test] async fn test_media_max_file_size() { - let event_cache_store_media = get_event_cache_store().await.unwrap(); - event_cache_store_media.test_media_max_file_size().await; + let media_store_inner = get_media_store().await.unwrap(); + media_store_inner.test_media_max_file_size().await; } #[async_test] async fn test_media_max_cache_size() { - let event_cache_store_media = get_event_cache_store().await.unwrap(); - event_cache_store_media.test_media_max_cache_size().await; + let media_store_inner = get_media_store().await.unwrap(); + media_store_inner.test_media_max_cache_size().await; } #[async_test] async fn test_media_ignore_max_size() { - let event_cache_store_media = get_event_cache_store().await.unwrap(); - event_cache_store_media.test_media_ignore_max_size().await; + let media_store_inner = get_media_store().await.unwrap(); + media_store_inner.test_media_ignore_max_size().await; } } }; () => { - mod event_cache_store_media_integration_tests { - $crate::event_cache_store_media_integration_tests!(@inner); + mod media_store_inner_integration_tests { + $crate::media_store_inner_integration_tests!(@inner); } }; (@inner) => { use matrix_sdk_test::async_test; - use $crate::event_cache::store::media::EventCacheStoreMediaIntegrationTests; + use $crate::event_cache::store::media::MediaStoreInnerIntegrationTests; - use super::get_event_cache_store; + use super::get_media_store; #[async_test] async fn test_store_media_retention_policy() { - let event_cache_store_media = get_event_cache_store().await.unwrap(); - event_cache_store_media.test_store_media_retention_policy().await; + let media_store_inner = get_media_store().await.unwrap(); + media_store_inner.test_store_media_retention_policy().await; } #[async_test] async fn test_media_expiry() { - let event_cache_store_media = get_event_cache_store().await.unwrap(); - event_cache_store_media.test_media_expiry().await; + let media_store_inner = get_media_store().await.unwrap(); + media_store_inner.test_media_expiry().await; } #[async_test] async fn test_media_ignore_expiry() { - let event_cache_store_media = get_event_cache_store().await.unwrap(); - event_cache_store_media.test_media_ignore_expiry().await; + let media_store_inner = get_media_store().await.unwrap(); + media_store_inner.test_media_ignore_expiry().await; } #[async_test] async fn test_store_last_media_cleanup_time() { - let event_cache_store_media = get_event_cache_store().await.unwrap(); - event_cache_store_media.test_store_last_media_cleanup_time().await; + let media_store_inner = get_media_store().await.unwrap(); + media_store_inner.test_store_last_media_cleanup_time().await; + } + }; +} + +/// [`MediaStore`] integration tests. +/// +/// This trait is not meant to be used directly, but will be used with the +/// `media_store_inner_integration_tests!` macro. +#[allow(async_fn_in_trait)] +pub trait MediaStoreIntegrationTests { + /// Test media content storage. + async fn test_media_content(&self); + + /// Test replacing a MXID. + async fn test_replace_media_key(&self); +} + +impl MediaStoreIntegrationTests for Store +where + Store: MediaStore + std::fmt::Debug, +{ + async fn test_media_content(&self) { + let uri = mxc_uri!("mxc://localhost/media"); + let request_file = MediaRequestParameters { + source: MediaSource::Plain(uri.to_owned()), + format: MediaFormat::File, + }; + let request_thumbnail = MediaRequestParameters { + source: MediaSource::Plain(uri.to_owned()), + format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method( + Method::Crop, + uint!(100), + uint!(100), + )), + }; + + let other_uri = mxc_uri!("mxc://localhost/media-other"); + let request_other_file = MediaRequestParameters { + source: MediaSource::Plain(other_uri.to_owned()), + format: MediaFormat::File, + }; + + let content: Vec = "hello".into(); + let thumbnail_content: Vec = "world".into(); + let other_content: Vec = "foo".into(); + + // Media isn't present in the cache. + assert!( + self.get_media_content(&request_file).await.unwrap().is_none(), + "unexpected media found" + ); + assert!( + self.get_media_content(&request_thumbnail).await.unwrap().is_none(), + "media not found" + ); + + // Let's add the media. + self.add_media_content(&request_file, content.clone(), IgnoreMediaRetentionPolicy::No) + .await + .expect("adding media failed"); + + // Media is present in the cache. + assert_eq!( + self.get_media_content(&request_file).await.unwrap().as_ref(), + Some(&content), + "media not found though added" + ); + assert_eq!( + self.get_media_content_for_uri(uri).await.unwrap().as_ref(), + Some(&content), + "media not found by URI though added" + ); + + // Let's remove the media. + self.remove_media_content(&request_file).await.expect("removing media failed"); + + // Media isn't present in the cache. + assert!( + self.get_media_content(&request_file).await.unwrap().is_none(), + "media still there after removing" + ); + assert!( + self.get_media_content_for_uri(uri).await.unwrap().is_none(), + "media still found by URI after removing" + ); + + // Let's add the media again. + self.add_media_content(&request_file, content.clone(), IgnoreMediaRetentionPolicy::No) + .await + .expect("adding media again failed"); + + assert_eq!( + self.get_media_content(&request_file).await.unwrap().as_ref(), + Some(&content), + "media not found after adding again" + ); + + // Let's add the thumbnail media. + self.add_media_content( + &request_thumbnail, + thumbnail_content.clone(), + IgnoreMediaRetentionPolicy::No, + ) + .await + .expect("adding thumbnail failed"); + + // Media's thumbnail is present. + assert_eq!( + self.get_media_content(&request_thumbnail).await.unwrap().as_ref(), + Some(&thumbnail_content), + "thumbnail not found" + ); + + // We get a file with the URI, we don't know which one. + assert!( + self.get_media_content_for_uri(uri).await.unwrap().is_some(), + "media not found by URI though two where added" + ); + + // Let's add another media with a different URI. + self.add_media_content( + &request_other_file, + other_content.clone(), + IgnoreMediaRetentionPolicy::No, + ) + .await + .expect("adding other media failed"); + + // Other file is present. + assert_eq!( + self.get_media_content(&request_other_file).await.unwrap().as_ref(), + Some(&other_content), + "other file not found" + ); + assert_eq!( + self.get_media_content_for_uri(other_uri).await.unwrap().as_ref(), + Some(&other_content), + "other file not found by URI" + ); + + // Let's remove media based on URI. + self.remove_media_content_for_uri(uri).await.expect("removing all media for uri failed"); + + assert!( + self.get_media_content(&request_file).await.unwrap().is_none(), + "media wasn't removed" + ); + assert!( + self.get_media_content(&request_thumbnail).await.unwrap().is_none(), + "thumbnail wasn't removed" + ); + assert!( + self.get_media_content(&request_other_file).await.unwrap().is_some(), + "other media was removed" + ); + assert!( + self.get_media_content_for_uri(uri).await.unwrap().is_none(), + "media found by URI wasn't removed" + ); + assert!( + self.get_media_content_for_uri(other_uri).await.unwrap().is_some(), + "other media found by URI was removed" + ); + } + + async fn test_replace_media_key(&self) { + let uri = mxc_uri!("mxc://sendqueue.local/tr4n-s4ct-10n1-d"); + let req = MediaRequestParameters { + source: MediaSource::Plain(uri.to_owned()), + format: MediaFormat::File, + }; + + let content = "hello".as_bytes().to_owned(); + + // Media isn't present in the cache. + assert!(self.get_media_content(&req).await.unwrap().is_none(), "unexpected media found"); + + // Add the media. + self.add_media_content(&req, content.clone(), IgnoreMediaRetentionPolicy::No) + .await + .expect("adding media failed"); + + // Sanity-check: media is found after adding it. + assert_eq!(self.get_media_content(&req).await.unwrap().unwrap(), b"hello"); + + // Replacing a media request works. + let new_uri = mxc_uri!("mxc://matrix.org/tr4n-s4ct-10n1-d"); + let new_req = MediaRequestParameters { + source: MediaSource::Plain(new_uri.to_owned()), + format: MediaFormat::File, + }; + self.replace_media_key(&req, &new_req) + .await + .expect("replacing the media request key failed"); + + // Finding with the previous request doesn't work anymore. + assert!( + self.get_media_content(&req).await.unwrap().is_none(), + "unexpected media found with the old key" + ); + + // Finding with the new request does work. + assert_eq!(self.get_media_content(&new_req).await.unwrap().unwrap(), b"hello"); + } +} + +/// Macro building to allow your [`MediaStore`] implementation to run +/// the entire tests suite locally. +/// +/// You need to provide an `async fn get_media_store() -> +/// event_cache::store::media::Result` that provides a fresh media store +/// that implements `MediaStoreInner` on the same level you invoke the +/// macro. +/// +/// ## Usage Example: +/// ```no_run +/// # use matrix_sdk_base::event_cache::store::media{ +/// # MediaStore, +/// # MemoryStore as MyStore, +/// # Result as MediaStoreResult, +/// # }; +/// +/// #[cfg(test)] +/// mod tests { +/// use super::{MediaStoreResult, MyStore}; +/// +/// async fn get_media_store() -> MediaStoreResult { +/// Ok(MyStore::new()) +/// } +/// +/// media_store_integration_tests!(); +/// } +/// ``` +#[allow(unused_macros, unused_extern_crates)] +#[macro_export] +macro_rules! media_store_integration_tests { + () => { + mod media_store_integration_tests { + use matrix_sdk_test::async_test; + use $crate::event_cache::store::media::MediaStoreIntegrationTests; + + use super::get_media_store; + + #[async_test] + async fn test_media_content() { + let media_store = get_media_store().await.unwrap(); + media_store.test_media_content().await; + } + + #[async_test] + async fn test_replace_media_key() { + let media_store = get_media_store().await.unwrap(); + media_store.test_replace_media_key().await; + } + } + }; +} + +/// Macro generating tests for the media store, related to time (mostly +/// for the cross-process lock). +#[allow(unused_macros)] +#[macro_export] +macro_rules! media_store_integration_tests_time { + () => { + mod media_store_integration_tests_time { + use std::time::Duration; + + #[cfg(all(target_family = "wasm", target_os = "unknown"))] + use gloo_timers::future::sleep; + use matrix_sdk_test::async_test; + #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] + use tokio::time::sleep; + use $crate::event_cache::store::media::MediaStore; + + use super::get_media_store; + + #[async_test] + async fn test_lease_locks() { + let store = get_media_store().await.unwrap(); + + let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); + assert!(acquired0); + + // Should extend the lease automatically (same holder). + let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); + assert!(acquired2); + + // Should extend the lease automatically (same holder + time is ok). + let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); + assert!(acquired3); + + // Another attempt at taking the lock should fail, because it's taken. + let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(!acquired4); + + // Even if we insist. + let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(!acquired5); + + // That's a nice test we got here, go take a little nap. + sleep(Duration::from_millis(50)).await; + + // Still too early. + let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(!acquired55); + + // Ok you can take another nap then. + sleep(Duration::from_millis(250)).await; + + // At some point, we do get the lock. + let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap(); + assert!(acquired6); + + sleep(Duration::from_millis(1)).await; + + // The other gets it almost immediately too. + let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); + assert!(acquired7); + + sleep(Duration::from_millis(1)).await; + + // But when we take a longer lease... + let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(acquired8); + + // It blocks the other user. + let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); + assert!(!acquired9); + + // We can hold onto our lease. + let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(acquired10); + } } }; } diff --git a/crates/matrix-sdk-base/src/event_cache/store/media/media_service.rs b/crates/matrix-sdk-base/src/event_cache/store/media/media_service.rs index a9ff1fbaf38..32f160ffea1 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/media/media_service.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/media/media_service.rs @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, sync::Arc}; +use std::sync::Arc; -use async_trait::async_trait; use matrix_sdk_common::{ - AsyncTraitDeps, SendOutsideWasm, SyncOutsideWasm, + SendOutsideWasm, SyncOutsideWasm, executor::{JoinHandle, spawn}, locks::Mutex, }; @@ -25,10 +24,10 @@ use tokio::sync::Mutex as AsyncMutex; use tracing::error; use super::MediaRetentionPolicy; -use crate::{event_cache::store::EventCacheStoreError, media::MediaRequestParameters}; +use crate::{event_cache::store::media::traits::MediaStoreInner, media::MediaRequestParameters}; /// API for implementors of [`EventCacheStore`] to manage their media through -/// their implementation of [`EventCacheStoreMedia`]. +/// their implementation of [`MediaStoreInner`]. /// /// [`EventCacheStore`]: crate::event_cache::store::EventCacheStore #[derive(Debug)] @@ -122,10 +121,10 @@ where /// /// # Arguments /// - /// * `store` - The `EventCacheStoreMedia`. + /// * `store` - The `MediaStoreInner`. /// /// * `policy` - The `MediaRetentionPolicy` to use. - pub async fn set_media_retention_policy( + pub async fn set_media_retention_policy( &self, store: &Store, policy: MediaRetentionPolicy, @@ -148,7 +147,7 @@ where /// /// # Arguments /// - /// * `store` - The `EventCacheStoreMedia`. + /// * `store` - The `MediaStoreInner`. /// /// * `request` - The `MediaRequestParameters` of the file. /// @@ -156,7 +155,7 @@ where /// /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be /// ignored. - pub async fn add_media_content( + pub async fn add_media_content( &self, store: &Store, request: &MediaRequestParameters, @@ -189,13 +188,13 @@ where /// /// # Arguments /// - /// * `store` - The `EventCacheStoreMedia`. + /// * `store` - The `MediaStoreInner`. /// /// * `request` - The `MediaRequestParameters` of the file. /// /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be /// ignored. - pub async fn set_ignore_media_retention_policy( + pub async fn set_ignore_media_retention_policy( &self, store: &Store, request: &MediaRequestParameters, @@ -208,10 +207,10 @@ where /// /// # Arguments /// - /// * `store` - The `EventCacheStoreMedia`. + /// * `store` - The `MediaStoreInner`. /// /// * `request` - The `MediaRequestParameters` of the file. - pub async fn get_media_content( + pub async fn get_media_content( &self, store: &Store, request: &MediaRequestParameters, @@ -229,10 +228,10 @@ where /// /// # Arguments /// - /// * `store` - The `EventCacheStoreMedia`. + /// * `store` - The `MediaStoreInner`. /// /// * `uri` - The `MxcUri` of the media file. - pub async fn get_media_content_for_uri( + pub async fn get_media_content_for_uri( &self, store: &Store, uri: &MxcUri, @@ -251,15 +250,15 @@ where /// /// # Arguments /// - /// * `store` - The `EventCacheStoreMedia`. - pub async fn clean_up_media_cache( + /// * `store` - The `MediaStoreInner`. + pub async fn clean_up_media_cache( &self, store: &Store, ) -> Result<(), Store::Error> { self.clean_up_media_cache_inner(store, self.now()).await } - async fn clean_up_media_cache_inner( + async fn clean_up_media_cache_inner( &self, store: &Store, current_time: SystemTime, @@ -290,7 +289,7 @@ where /// * The media retention policy's `cleanup_frequency` is set and enough /// time has passed since the last cleanup. /// * No other cleanup is running, - fn maybe_spawn_automatic_media_cache_cleanup( + fn maybe_spawn_automatic_media_cache_cleanup( &self, store: &Store, current_time: SystemTime, @@ -349,132 +348,6 @@ where } } -/// An abstract trait that can be used to implement different store backends -/// for the media cache of the SDK. -/// -/// The main purposes of this trait are to be able to centralize where we handle -/// [`MediaRetentionPolicy`] by wrapping this in a [`MediaService`], and to -/// simplify the implementation of tests by being able to have complete control -/// over the `SystemTime`s provided to the store. -#[cfg_attr(target_family = "wasm", async_trait(?Send))] -#[cfg_attr(not(target_family = "wasm"), async_trait)] -pub trait EventCacheStoreMedia: AsyncTraitDeps + Clone { - /// The error type used by this media cache store. - type Error: fmt::Debug + fmt::Display + Into; - - /// The persisted media retention policy in the media cache. - async fn media_retention_policy_inner( - &self, - ) -> Result, Self::Error>; - - /// Persist the media retention policy in the media cache. - /// - /// # Arguments - /// - /// * `policy` - The `MediaRetentionPolicy` to persist. - async fn set_media_retention_policy_inner( - &self, - policy: MediaRetentionPolicy, - ) -> Result<(), Self::Error>; - - /// Add a media file's content in the media cache. - /// - /// # Arguments - /// - /// * `request` - The `MediaRequestParameters` of the file. - /// - /// * `content` - The content of the file. - /// - /// * `current_time` - The current time, to set the last access time of the - /// media. - /// - /// * `policy` - The media retention policy, to check whether the media is - /// too big to be cached. - /// - /// * `ignore_policy` - Whether the `MediaRetentionPolicy` should be ignored - /// for this media. This setting should be persisted alongside the media - /// and taken into account whenever the policy is used. - async fn add_media_content_inner( - &self, - request: &MediaRequestParameters, - content: Vec, - current_time: SystemTime, - policy: MediaRetentionPolicy, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error>; - - /// Set whether the current [`MediaRetentionPolicy`] should be ignored for - /// the media. - /// - /// If the media of the given request is not found, this should be a noop. - /// - /// The change will be taken into account in the next cleanup. - /// - /// # Arguments - /// - /// * `request` - The `MediaRequestParameters` of the file. - /// - /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be - /// ignored. - async fn set_ignore_media_retention_policy_inner( - &self, - request: &MediaRequestParameters, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error>; - - /// Get a media file's content out of the media cache. - /// - /// # Arguments - /// - /// * `request` - The `MediaRequestParameters` of the file. - /// - /// * `current_time` - The current time, to update the last access time of - /// the media. - async fn get_media_content_inner( - &self, - request: &MediaRequestParameters, - current_time: SystemTime, - ) -> Result>, Self::Error>; - - /// Get a media file's content associated to an `MxcUri` from the - /// media store. - /// - /// # Arguments - /// - /// * `uri` - The `MxcUri` of the media file. - /// - /// * `current_time` - The current time, to update the last access time of - /// the media. - async fn get_media_content_for_uri_inner( - &self, - uri: &MxcUri, - current_time: SystemTime, - ) -> Result>, Self::Error>; - - /// Clean up the media cache with the given policy. - /// - /// For the integration tests, it is expected that content that does not - /// pass the last access expiry and max file size criteria will be - /// removed first. After that, the remaining cache size should be - /// computed to compare against the max cache size criteria. - /// - /// # Arguments - /// - /// * `policy` - The media retention policy to use for the cleanup. The - /// `cleanup_frequency` will be ignored. - /// - /// * `current_time` - The current time, to be used to check for expired - /// content and to be stored as the time of the last media cache cleanup. - async fn clean_up_media_cache_inner( - &self, - policy: MediaRetentionPolicy, - current_time: SystemTime, - ) -> Result<(), Self::Error>; - - /// The time of the last media cache cleanup. - async fn last_media_cleanup_time_inner(&self) -> Result, Self::Error>; -} - /// Whether the [`MediaRetentionPolicy`] should be ignored for the current /// content. /// @@ -544,18 +417,18 @@ mod tests { time::{Duration, SystemTime}, }; - use super::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaService, TimeProvider}; + use super::{IgnoreMediaRetentionPolicy, MediaService, MediaStoreInner, TimeProvider}; use crate::{ - event_cache::store::{EventCacheStoreError, media::MediaRetentionPolicy}, + event_cache::store::media::{MediaRetentionPolicy, MediaStoreError}, media::{MediaFormat, MediaRequestParameters, UniqueKey}, }; #[derive(Debug, Default, Clone)] - struct MockEventCacheStoreMedia { - inner: Arc>, + struct MockMediaStoreInner { + inner: Arc>, } - impl MockEventCacheStoreMedia { + impl MockMediaStoreInner { /// Whether the store was accessed. fn accessed(&self) -> bool { self.inner.lock().accessed @@ -570,7 +443,7 @@ mod tests { /// /// Should be called for every access to the inner store as it also sets /// the `accessed` boolean. - fn inner(&self) -> MutexGuard<'_, MockEventCacheStoreMediaInner> { + fn inner(&self) -> MutexGuard<'_, MockMediaStoreInnerInner> { let mut inner = self.inner.lock(); inner.accessed = true; inner @@ -578,7 +451,7 @@ mod tests { } #[derive(Debug, Default)] - struct MockEventCacheStoreMediaInner { + struct MockMediaStoreInnerInner { /// Whether this store was accessed. /// /// Must be set to `true` for any operation that unlocks the store. @@ -614,26 +487,26 @@ mod tests { } #[derive(Debug)] - struct MockEventCacheStoreMediaError; + struct MockMediaStoreInnerError; - impl fmt::Display for MockEventCacheStoreMediaError { + impl fmt::Display for MockMediaStoreInnerError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "MockEventCacheStoreMediaError") + write!(f, "MockMediaStoreInnerError") } } - impl std::error::Error for MockEventCacheStoreMediaError {} + impl std::error::Error for MockMediaStoreInnerError {} - impl From for EventCacheStoreError { - fn from(value: MockEventCacheStoreMediaError) -> Self { + impl From for MediaStoreError { + fn from(value: MockMediaStoreInnerError) -> Self { Self::backend(value) } } #[cfg_attr(target_family = "wasm", async_trait(?Send))] #[cfg_attr(not(target_family = "wasm"), async_trait)] - impl EventCacheStoreMedia for MockEventCacheStoreMedia { - type Error = MockEventCacheStoreMediaError; + impl MediaStoreInner for MockMediaStoreInner { + type Error = MockMediaStoreInnerError; async fn media_retention_policy_inner( &self, @@ -787,7 +660,7 @@ mod tests { let now = SystemTime::UNIX_EPOCH; - let store = MockEventCacheStoreMedia::default(); + let store = MockMediaStoreInner::default(); let service = MediaService::with_time_provider(MockTimeProvider::new(now)); // By default an empty policy is used. @@ -877,7 +750,7 @@ mod tests { let now = SystemTime::UNIX_EPOCH; - let store = MockEventCacheStoreMedia::default(); + let store = MockMediaStoreInner::default(); let service = MediaService::with_time_provider(MockTimeProvider::new(now)); // Check that restoring the policy works. @@ -1034,7 +907,7 @@ mod tests { let now = SystemTime::UNIX_EPOCH; - let store = MockEventCacheStoreMedia::default(); + let store = MockMediaStoreInner::default(); let service = MediaService::with_time_provider(MockTimeProvider::new(now)); // Set an empty policy. diff --git a/crates/matrix-sdk-base/src/event_cache/store/media/mod.rs b/crates/matrix-sdk-base/src/event_cache/store/media/mod.rs index 00a5ce1bb70..b96bdf2a06e 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/media/mod.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/media/mod.rs @@ -16,13 +16,145 @@ mod media_retention_policy; mod media_service; +mod traits; #[cfg(any(test, feature = "testing"))] #[macro_use] pub mod integration_tests; +#[cfg(not(tarpaulin_include))] +use std::fmt; +use std::{ops::Deref, sync::Arc}; + +use matrix_sdk_common::store_locks::{ + BackingStore, CrossProcessStoreLock, CrossProcessStoreLockGuard, LockStoreError, +}; +use matrix_sdk_store_encryption::Error as StoreEncryptionError; +pub use traits::{DynMediaStore, IntoMediaStore, MediaStore, MediaStoreInner}; + #[cfg(any(test, feature = "testing"))] -pub use self::integration_tests::EventCacheStoreMediaIntegrationTests; +pub use self::integration_tests::{MediaStoreInnerIntegrationTests, MediaStoreIntegrationTests}; pub use self::{ media_retention_policy::MediaRetentionPolicy, - media_service::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaService}, + media_service::{IgnoreMediaRetentionPolicy, MediaService}, }; + +/// Media store specific error type. +#[derive(Debug, thiserror::Error)] +pub enum MediaStoreError { + /// An error happened in the underlying database backend. + #[error(transparent)] + Backend(Box), + + /// The store failed to encrypt or decrypt some data. + #[error("Error encrypting or decrypting data from the event cache store: {0}")] + Encryption(#[from] StoreEncryptionError), +} + +impl MediaStoreError { + /// Create a new [`Backend`][Self::Backend] error. + /// + /// Shorthand for `MediaStoreError::Backend(Box::new(error))`. + #[inline] + pub fn backend(error: E) -> Self + where + E: std::error::Error + Send + Sync + 'static, + { + Self::Backend(Box::new(error)) + } +} + +/// An `MediaStore` specific result type. +pub type Result = std::result::Result; + +/// The high-level public type to represent an `MediaStore` lock. +#[derive(Clone)] +pub struct MediaStoreLock { + /// The inner cross process lock that is used to lock the `MediaStore`. + cross_process_lock: Arc>, + + /// The store itself. + /// + /// That's the only place where the store exists. + store: Arc, +} + +#[cfg(not(tarpaulin_include))] +impl fmt::Debug for MediaStoreLock { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.debug_struct("MediaStoreLock").finish_non_exhaustive() + } +} + +impl MediaStoreLock { + /// Create a new lock around the [`MediaStore`]. + /// + /// The `holder` argument represents the holder inside the + /// [`CrossProcessStoreLock::new`]. + pub fn new(store: S, holder: String) -> Self + where + S: IntoMediaStore, + { + let store = store.into_event_cache_store(); + + Self { + cross_process_lock: Arc::new(CrossProcessStoreLock::new( + LockableMediaStore(store.clone()), + "default".to_owned(), + holder, + )), + store, + } + } + + /// Acquire a spin lock (see [`CrossProcessStoreLock::spin_lock`]). + pub async fn lock(&self) -> Result, LockStoreError> { + let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?; + + Ok(MediaStoreLockGuard { cross_process_lock_guard, store: self.store.deref() }) + } +} + +/// An RAII implementation of a “scoped lock” of an [`MediaStoreLock`]. +/// When this structure is dropped (falls out of scope), the lock will be +/// unlocked. +pub struct MediaStoreLockGuard<'a> { + /// The cross process lock guard. + #[allow(unused)] + cross_process_lock_guard: CrossProcessStoreLockGuard, + + /// A reference to the store. + store: &'a DynMediaStore, +} + +#[cfg(not(tarpaulin_include))] +impl fmt::Debug for MediaStoreLockGuard<'_> { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.debug_struct("MediaStoreLockGuard").finish_non_exhaustive() + } +} + +impl Deref for MediaStoreLockGuard<'_> { + type Target = DynMediaStore; + + fn deref(&self) -> &Self::Target { + self.store + } +} + +/// A type that wraps the [`MediaStore`] but implements [`BackingStore`] to +/// make it usable inside the cross process lock. +#[derive(Clone, Debug)] +struct LockableMediaStore(Arc); + +impl BackingStore for LockableMediaStore { + type LockError = MediaStoreError; + + async fn try_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> std::result::Result { + self.0.try_take_leased_lock(lease_duration_ms, key, holder).await + } +} diff --git a/crates/matrix-sdk-base/src/event_cache/store/media/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/media/traits.rs new file mode 100644 index 00000000000..d25d53061e4 --- /dev/null +++ b/crates/matrix-sdk-base/src/event_cache/store/media/traits.rs @@ -0,0 +1,429 @@ +// Copyright 2025 Kévin Commaille +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Types and traits regarding media caching of the event cache store. + +use std::{fmt, sync::Arc, time::SystemTime}; + +use async_trait::async_trait; +use matrix_sdk_common::AsyncTraitDeps; +use ruma::MxcUri; + +#[cfg(doc)] +use crate::event_cache::store::media::MediaService; +use crate::{ + event_cache::store::media::{ + IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaStoreError, + }, + media::MediaRequestParameters, +}; + +/// An abstract trait that can be used to implement different store backends +/// for the event cache of the SDK. +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] +pub trait MediaStore: AsyncTraitDeps { + /// The error type used by this event cache store. + type Error: fmt::Debug + Into; + + /// Try to take a lock using the given store. + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result; + + /// Add a media file's content in the media store. + /// + /// # Arguments + /// + /// * `request` - The `MediaRequest` of the file. + /// + /// * `content` - The content of the file. + async fn add_media_content( + &self, + request: &MediaRequestParameters, + content: Vec, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error>; + + /// Replaces the given media's content key with another one. + /// + /// This should be used whenever a temporary (local) MXID has been used, and + /// it must now be replaced with its actual remote counterpart (after + /// uploading some content, or creating an empty MXC URI). + /// + /// ⚠ No check is performed to ensure that the media formats are consistent, + /// i.e. it's possible to update with a thumbnail key a media that was + /// keyed as a file before. The caller is responsible of ensuring that + /// the replacement makes sense, according to their use case. + /// + /// This should not raise an error when the `from` parameter points to an + /// unknown media, and it should silently continue in this case. + /// + /// # Arguments + /// + /// * `from` - The previous `MediaRequest` of the file. + /// + /// * `to` - The new `MediaRequest` of the file. + async fn replace_media_key( + &self, + from: &MediaRequestParameters, + to: &MediaRequestParameters, + ) -> Result<(), Self::Error>; + + /// Get a media file's content out of the media store. + /// + /// # Arguments + /// + /// * `request` - The `MediaRequest` of the file. + async fn get_media_content( + &self, + request: &MediaRequestParameters, + ) -> Result>, Self::Error>; + + /// Remove a media file's content from the media store. + /// + /// # Arguments + /// + /// * `request` - The `MediaRequest` of the file. + async fn remove_media_content( + &self, + request: &MediaRequestParameters, + ) -> Result<(), Self::Error>; + + /// Get a media file's content associated to an `MxcUri` from the + /// media store. + /// + /// In theory, there could be several files stored using the same URI and a + /// different `MediaFormat`. This API is meant to be used with a media file + /// that has only been stored with a single format. + /// + /// If there are several media files for a given URI in different formats, + /// this API will only return one of them. Which one is left as an + /// implementation detail. + /// + /// # Arguments + /// + /// * `uri` - The `MxcUri` of the media file. + async fn get_media_content_for_uri(&self, uri: &MxcUri) + -> Result>, Self::Error>; + + /// Remove all the media files' content associated to an `MxcUri` from the + /// media store. + /// + /// This should not raise an error when the `uri` parameter points to an + /// unknown media, and it should return an Ok result in this case. + /// + /// # Arguments + /// + /// * `uri` - The `MxcUri` of the media files. + async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error>; + + /// Set the `MediaRetentionPolicy` to use for deciding whether to store or + /// keep media content. + /// + /// # Arguments + /// + /// * `policy` - The `MediaRetentionPolicy` to use. + async fn set_media_retention_policy( + &self, + policy: MediaRetentionPolicy, + ) -> Result<(), Self::Error>; + + /// Get the current `MediaRetentionPolicy`. + fn media_retention_policy(&self) -> MediaRetentionPolicy; + + /// Set whether the current [`MediaRetentionPolicy`] should be ignored for + /// the media. + /// + /// The change will be taken into account in the next cleanup. + /// + /// # Arguments + /// + /// * `request` - The `MediaRequestParameters` of the file. + /// + /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be + /// ignored. + async fn set_ignore_media_retention_policy( + &self, + request: &MediaRequestParameters, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error>; + + /// Clean up the media cache with the current `MediaRetentionPolicy`. + /// + /// If there is already an ongoing cleanup, this is a noop. + async fn clean_up_media_cache(&self) -> Result<(), Self::Error>; +} + +/// An abstract trait that can be used to implement different store backends +/// for the media cache of the SDK. +/// +/// The main purposes of this trait are to be able to centralize where we handle +/// [`MediaRetentionPolicy`] by wrapping this in a [`MediaService`], and to +/// simplify the implementation of tests by being able to have complete control +/// over the `SystemTime`s provided to the store. +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] +pub trait MediaStoreInner: AsyncTraitDeps + Clone { + /// The error type used by this media cache store. + type Error: fmt::Debug + fmt::Display + Into; + + /// The persisted media retention policy in the media cache. + async fn media_retention_policy_inner( + &self, + ) -> Result, Self::Error>; + + /// Persist the media retention policy in the media cache. + /// + /// # Arguments + /// + /// * `policy` - The `MediaRetentionPolicy` to persist. + async fn set_media_retention_policy_inner( + &self, + policy: MediaRetentionPolicy, + ) -> Result<(), Self::Error>; + + /// Add a media file's content in the media cache. + /// + /// # Arguments + /// + /// * `request` - The `MediaRequestParameters` of the file. + /// + /// * `content` - The content of the file. + /// + /// * `current_time` - The current time, to set the last access time of the + /// media. + /// + /// * `policy` - The media retention policy, to check whether the media is + /// too big to be cached. + /// + /// * `ignore_policy` - Whether the `MediaRetentionPolicy` should be ignored + /// for this media. This setting should be persisted alongside the media + /// and taken into account whenever the policy is used. + async fn add_media_content_inner( + &self, + request: &MediaRequestParameters, + content: Vec, + current_time: SystemTime, + policy: MediaRetentionPolicy, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error>; + + /// Set whether the current [`MediaRetentionPolicy`] should be ignored for + /// the media. + /// + /// If the media of the given request is not found, this should be a noop. + /// + /// The change will be taken into account in the next cleanup. + /// + /// # Arguments + /// + /// * `request` - The `MediaRequestParameters` of the file. + /// + /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be + /// ignored. + async fn set_ignore_media_retention_policy_inner( + &self, + request: &MediaRequestParameters, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error>; + + /// Get a media file's content out of the media cache. + /// + /// # Arguments + /// + /// * `request` - The `MediaRequestParameters` of the file. + /// + /// * `current_time` - The current time, to update the last access time of + /// the media. + async fn get_media_content_inner( + &self, + request: &MediaRequestParameters, + current_time: SystemTime, + ) -> Result>, Self::Error>; + + /// Get a media file's content associated to an `MxcUri` from the + /// media store. + /// + /// # Arguments + /// + /// * `uri` - The `MxcUri` of the media file. + /// + /// * `current_time` - The current time, to update the last access time of + /// the media. + async fn get_media_content_for_uri_inner( + &self, + uri: &MxcUri, + current_time: SystemTime, + ) -> Result>, Self::Error>; + + /// Clean up the media cache with the given policy. + /// + /// For the integration tests, it is expected that content that does not + /// pass the last access expiry and max file size criteria will be + /// removed first. After that, the remaining cache size should be + /// computed to compare against the max cache size criteria. + /// + /// # Arguments + /// + /// * `policy` - The media retention policy to use for the cleanup. The + /// `cleanup_frequency` will be ignored. + /// + /// * `current_time` - The current time, to be used to check for expired + /// content and to be stored as the time of the last media cache cleanup. + async fn clean_up_media_cache_inner( + &self, + policy: MediaRetentionPolicy, + current_time: SystemTime, + ) -> Result<(), Self::Error>; + + /// The time of the last media cache cleanup. + async fn last_media_cleanup_time_inner(&self) -> Result, Self::Error>; +} + +#[repr(transparent)] +struct EraseMediaStoreError(T); + +#[cfg(not(tarpaulin_include))] +impl fmt::Debug for EraseMediaStoreError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] +impl MediaStore for EraseMediaStoreError { + type Error = MediaStoreError; + + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into) + } + + async fn add_media_content( + &self, + request: &MediaRequestParameters, + content: Vec, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error> { + self.0.add_media_content(request, content, ignore_policy).await.map_err(Into::into) + } + + async fn replace_media_key( + &self, + from: &MediaRequestParameters, + to: &MediaRequestParameters, + ) -> Result<(), Self::Error> { + self.0.replace_media_key(from, to).await.map_err(Into::into) + } + + async fn get_media_content( + &self, + request: &MediaRequestParameters, + ) -> Result>, Self::Error> { + self.0.get_media_content(request).await.map_err(Into::into) + } + + async fn remove_media_content( + &self, + request: &MediaRequestParameters, + ) -> Result<(), Self::Error> { + self.0.remove_media_content(request).await.map_err(Into::into) + } + + async fn get_media_content_for_uri( + &self, + uri: &MxcUri, + ) -> Result>, Self::Error> { + self.0.get_media_content_for_uri(uri).await.map_err(Into::into) + } + + async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> { + self.0.remove_media_content_for_uri(uri).await.map_err(Into::into) + } + + async fn set_media_retention_policy( + &self, + policy: MediaRetentionPolicy, + ) -> Result<(), Self::Error> { + self.0.set_media_retention_policy(policy).await.map_err(Into::into) + } + + fn media_retention_policy(&self) -> MediaRetentionPolicy { + self.0.media_retention_policy() + } + + async fn set_ignore_media_retention_policy( + &self, + request: &MediaRequestParameters, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error> { + self.0.set_ignore_media_retention_policy(request, ignore_policy).await.map_err(Into::into) + } + + async fn clean_up_media_cache(&self) -> Result<(), Self::Error> { + self.0.clean_up_media_cache().await.map_err(Into::into) + } +} + +/// A type-erased [`MediaStore`]. +pub type DynMediaStore = dyn MediaStore; + +/// A type that can be type-erased into `Arc`. +/// +/// This trait is not meant to be implemented directly outside +/// `matrix-sdk-base`, but it is automatically implemented for everything that +/// implements `MediaStore`. +pub trait IntoMediaStore { + #[doc(hidden)] + fn into_event_cache_store(self) -> Arc; +} + +impl IntoMediaStore for Arc { + fn into_event_cache_store(self) -> Arc { + self + } +} + +impl IntoMediaStore for T +where + T: MediaStore + Sized + 'static, +{ + fn into_event_cache_store(self) -> Arc { + Arc::new(EraseMediaStoreError(self)) + } +} + +// Turns a given `Arc` into `Arc` by attaching the +// `MediaStore` impl vtable of `EraseMediaStoreError`. +impl IntoMediaStore for Arc +where + T: MediaStore + 'static, +{ + fn into_event_cache_store(self) -> Arc { + let ptr: *const T = Arc::into_raw(self); + let ptr_erased = ptr as *const EraseMediaStoreError; + // SAFETY: EraseMediaStoreError is repr(transparent) so T and + // EraseMediaStoreError have the same layout and ABI + unsafe { Arc::from_raw(ptr_erased) } + } +} diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs index cfbb44cd371..4c5ada743fe 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs @@ -16,6 +16,7 @@ use std::{ collections::HashMap, num::NonZeroUsize, sync::{Arc, RwLock as StdRwLock}, + time::SystemTime, }; use async_trait::async_trait; @@ -28,18 +29,20 @@ use matrix_sdk_common::{ store_locks::memory_store_helper::try_take_leased_lock, }; use ruma::{ - EventId, MxcUri, OwnedEventId, OwnedMxcUri, RoomId, - events::relation::RelationType, - time::{Instant, SystemTime}, + EventId, MxcUri, OwnedEventId, OwnedMxcUri, RoomId, events::relation::RelationType, + time::Instant, }; use tracing::error; use super::{ EventCacheStore, EventCacheStoreError, Result, compute_filters_string, extract_event_relation, - media::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService}, + media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStoreInner}, }; use crate::{ - event_cache::{Event, Gap}, + event_cache::{ + Event, Gap, + store::media::{MediaStore, MediaStoreError}, + }, media::{MediaRequestParameters, UniqueKey as _}, }; @@ -49,14 +52,27 @@ use crate::{ #[derive(Debug, Clone)] pub struct MemoryStore { inner: Arc>, - media_service: MediaService, } #[derive(Debug)] struct MemoryStoreInner { - media: RingBuffer, leases: HashMap, events: RelationalLinkedChunk, +} + +/// In-memory, non-persistent implementation of the `MediaStore`. +/// +/// Default if no other is configured at startup. +#[derive(Debug, Clone)] +pub struct MemoryMediaStore { + inner: Arc>, + media_service: MediaService, +} + +#[derive(Debug)] +struct MemoryMediaStoreInner { + media: RingBuffer, + leases: HashMap, media_retention_policy: Option, last_media_cleanup_time: SystemTime, } @@ -83,6 +99,17 @@ struct MediaContent { const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap(); impl Default for MemoryStore { + fn default() -> Self { + Self { + inner: Arc::new(StdRwLock::new(MemoryStoreInner { + leases: Default::default(), + events: RelationalLinkedChunk::new(), + })), + } + } +} + +impl Default for MemoryMediaStore { fn default() -> Self { // Given that the store is empty, we won't need to clean it up right away. let last_media_cleanup_time = SystemTime::now(); @@ -90,10 +117,9 @@ impl Default for MemoryStore { media_service.restore(None, Some(last_media_cleanup_time)); Self { - inner: Arc::new(StdRwLock::new(MemoryStoreInner { + inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner { media: RingBuffer::new(NUMBER_OF_MEDIAS), leases: Default::default(), - events: RelationalLinkedChunk::new(), media_retention_policy: None, last_media_cleanup_time, })), @@ -109,6 +135,13 @@ impl MemoryStore { } } +impl MemoryMediaStore { + /// Create a new empty MemoryMediaStore + pub fn new() -> Self { + Self::default() + } +} + #[cfg_attr(target_family = "wasm", async_trait(?Send))] #[cfg_attr(not(target_family = "wasm"), async_trait)] impl EventCacheStore for MemoryStore { @@ -272,13 +305,30 @@ impl EventCacheStore for MemoryStore { self.inner.write().unwrap().events.save_item(room_id.to_owned(), event); Ok(()) } +} + +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] +impl MediaStore for MemoryMediaStore { + type Error = MediaStoreError; + + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + let mut inner = self.inner.write().unwrap(); + + Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder)) + } async fn add_media_content( &self, request: &MediaRequestParameters, data: Vec, ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<()> { + ) -> Result<(), Self::Error> { self.media_service.add_media_content(self, request, data, ignore_policy).await } @@ -301,11 +351,17 @@ impl EventCacheStore for MemoryStore { Ok(()) } - async fn get_media_content(&self, request: &MediaRequestParameters) -> Result>> { + async fn get_media_content( + &self, + request: &MediaRequestParameters, + ) -> Result>, Self::Error> { self.media_service.get_media_content(self, request).await } - async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> { + async fn remove_media_content( + &self, + request: &MediaRequestParameters, + ) -> Result<(), Self::Error> { let expected_key = request.unique_key(); let mut inner = self.inner.write().unwrap(); @@ -328,7 +384,7 @@ impl EventCacheStore for MemoryStore { self.media_service.get_media_content_for_uri(self, uri).await } - async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { + async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> { let mut inner = self.inner.write().unwrap(); let positions = inner @@ -372,8 +428,8 @@ impl EventCacheStore for MemoryStore { #[cfg_attr(target_family = "wasm", async_trait(?Send))] #[cfg_attr(not(target_family = "wasm"), async_trait)] -impl EventCacheStoreMedia for MemoryStore { - type Error = EventCacheStoreError; +impl MediaStoreInner for MemoryMediaStore { + type Error = MediaStoreError; async fn media_retention_policy_inner( &self, @@ -578,13 +634,23 @@ impl EventCacheStoreMedia for MemoryStore { #[cfg(test)] mod tests { use super::{MemoryStore, Result}; - use crate::event_cache_store_media_integration_tests; + use crate::{ + event_cache::store::memory_store::MemoryMediaStore, event_cache_store_integration_tests, + event_cache_store_integration_tests_time, media_store_inner_integration_tests, + media_store_integration_tests, media_store_integration_tests_time, + }; async fn get_event_cache_store() -> Result { Ok(MemoryStore::new()) } + async fn get_media_store() -> Result { + Ok(MemoryMediaStore::new()) + } + event_cache_store_integration_tests!(); event_cache_store_integration_tests_time!(); - event_cache_store_media_integration_tests!(with_media_size_tests); + media_store_inner_integration_tests!(); + media_store_integration_tests!(); + media_store_integration_tests_time!(); } diff --git a/crates/matrix-sdk-base/src/event_cache/store/mod.rs b/crates/matrix-sdk-base/src/event_cache/store/mod.rs index 85a582b4d1a..707707b6f92 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/mod.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/mod.rs @@ -42,7 +42,7 @@ use tracing::trace; #[cfg(any(test, feature = "testing"))] pub use self::integration_tests::EventCacheStoreIntegrationTests; pub use self::{ - memory_store::MemoryStore, + memory_store::{MemoryMediaStore, MemoryStore}, traits::{DEFAULT_CHUNK_CAPACITY, DynEventCacheStore, EventCacheStore, IntoEventCacheStore}, }; diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs index 84f46ac7134..70c5cee6fc3 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs @@ -22,16 +22,10 @@ use matrix_sdk_common::{ RawChunk, Update, }, }; -use ruma::{EventId, MxcUri, OwnedEventId, RoomId, events::relation::RelationType}; +use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType}; -use super::{ - EventCacheStoreError, - media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy}, -}; -use crate::{ - event_cache::{Event, Gap}, - media::MediaRequestParameters, -}; +use super::EventCacheStoreError; +use crate::event_cache::{Event, Gap}; /// A default capacity for linked chunks, when manipulating in conjunction with /// an `EventCacheStore` implementation. @@ -169,129 +163,6 @@ pub trait EventCacheStore: AsyncTraitDeps { /// If the event was already stored with the same id, it must be replaced, /// without causing an error. async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error>; - - /// Add a media file's content in the media store. - /// - /// # Arguments - /// - /// * `request` - The `MediaRequest` of the file. - /// - /// * `content` - The content of the file. - async fn add_media_content( - &self, - request: &MediaRequestParameters, - content: Vec, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error>; - - /// Replaces the given media's content key with another one. - /// - /// This should be used whenever a temporary (local) MXID has been used, and - /// it must now be replaced with its actual remote counterpart (after - /// uploading some content, or creating an empty MXC URI). - /// - /// ⚠ No check is performed to ensure that the media formats are consistent, - /// i.e. it's possible to update with a thumbnail key a media that was - /// keyed as a file before. The caller is responsible of ensuring that - /// the replacement makes sense, according to their use case. - /// - /// This should not raise an error when the `from` parameter points to an - /// unknown media, and it should silently continue in this case. - /// - /// # Arguments - /// - /// * `from` - The previous `MediaRequest` of the file. - /// - /// * `to` - The new `MediaRequest` of the file. - async fn replace_media_key( - &self, - from: &MediaRequestParameters, - to: &MediaRequestParameters, - ) -> Result<(), Self::Error>; - - /// Get a media file's content out of the media store. - /// - /// # Arguments - /// - /// * `request` - The `MediaRequest` of the file. - async fn get_media_content( - &self, - request: &MediaRequestParameters, - ) -> Result>, Self::Error>; - - /// Remove a media file's content from the media store. - /// - /// # Arguments - /// - /// * `request` - The `MediaRequest` of the file. - async fn remove_media_content( - &self, - request: &MediaRequestParameters, - ) -> Result<(), Self::Error>; - - /// Get a media file's content associated to an `MxcUri` from the - /// media store. - /// - /// In theory, there could be several files stored using the same URI and a - /// different `MediaFormat`. This API is meant to be used with a media file - /// that has only been stored with a single format. - /// - /// If there are several media files for a given URI in different formats, - /// this API will only return one of them. Which one is left as an - /// implementation detail. - /// - /// # Arguments - /// - /// * `uri` - The `MxcUri` of the media file. - async fn get_media_content_for_uri(&self, uri: &MxcUri) - -> Result>, Self::Error>; - - /// Remove all the media files' content associated to an `MxcUri` from the - /// media store. - /// - /// This should not raise an error when the `uri` parameter points to an - /// unknown media, and it should return an Ok result in this case. - /// - /// # Arguments - /// - /// * `uri` - The `MxcUri` of the media files. - async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error>; - - /// Set the `MediaRetentionPolicy` to use for deciding whether to store or - /// keep media content. - /// - /// # Arguments - /// - /// * `policy` - The `MediaRetentionPolicy` to use. - async fn set_media_retention_policy( - &self, - policy: MediaRetentionPolicy, - ) -> Result<(), Self::Error>; - - /// Get the current `MediaRetentionPolicy`. - fn media_retention_policy(&self) -> MediaRetentionPolicy; - - /// Set whether the current [`MediaRetentionPolicy`] should be ignored for - /// the media. - /// - /// The change will be taken into account in the next cleanup. - /// - /// # Arguments - /// - /// * `request` - The `MediaRequestParameters` of the file. - /// - /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be - /// ignored. - async fn set_ignore_media_retention_policy( - &self, - request: &MediaRequestParameters, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error>; - - /// Clean up the media cache with the current `MediaRetentionPolicy`. - /// - /// If there is already an ongoing cleanup, this is a noop. - async fn clean_up_media_cache(&self) -> Result<(), Self::Error>; } #[repr(transparent)] @@ -390,71 +261,6 @@ impl EventCacheStore for EraseEventCacheStoreError { async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> { self.0.save_event(room_id, event).await.map_err(Into::into) } - - async fn add_media_content( - &self, - request: &MediaRequestParameters, - content: Vec, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error> { - self.0.add_media_content(request, content, ignore_policy).await.map_err(Into::into) - } - - async fn replace_media_key( - &self, - from: &MediaRequestParameters, - to: &MediaRequestParameters, - ) -> Result<(), Self::Error> { - self.0.replace_media_key(from, to).await.map_err(Into::into) - } - - async fn get_media_content( - &self, - request: &MediaRequestParameters, - ) -> Result>, Self::Error> { - self.0.get_media_content(request).await.map_err(Into::into) - } - - async fn remove_media_content( - &self, - request: &MediaRequestParameters, - ) -> Result<(), Self::Error> { - self.0.remove_media_content(request).await.map_err(Into::into) - } - - async fn get_media_content_for_uri( - &self, - uri: &MxcUri, - ) -> Result>, Self::Error> { - self.0.get_media_content_for_uri(uri).await.map_err(Into::into) - } - - async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> { - self.0.remove_media_content_for_uri(uri).await.map_err(Into::into) - } - - async fn set_media_retention_policy( - &self, - policy: MediaRetentionPolicy, - ) -> Result<(), Self::Error> { - self.0.set_media_retention_policy(policy).await.map_err(Into::into) - } - - fn media_retention_policy(&self) -> MediaRetentionPolicy { - self.0.media_retention_policy() - } - - async fn set_ignore_media_retention_policy( - &self, - request: &MediaRequestParameters, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error> { - self.0.set_ignore_media_retention_policy(request, ignore_policy).await.map_err(Into::into) - } - - async fn clean_up_media_cache(&self) -> Result<(), Self::Error> { - self.0.clean_up_media_cache().await.map_err(Into::into) - } } /// A type-erased [`EventCacheStore`]. diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 73c234a6848..81749527ab2 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -730,6 +730,7 @@ pub struct StoreConfig { pub(crate) crypto_store: Arc, pub(crate) state_store: Arc, pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock, + pub(crate) media_store: event_cache_store::media::MediaStoreLock, cross_process_store_locks_holder_name: String, } @@ -755,6 +756,10 @@ impl StoreConfig { event_cache_store::MemoryStore::new(), cross_process_store_locks_holder_name.clone(), ), + media_store: event_cache_store::media::MediaStoreLock::new( + event_cache_store::MemoryMediaStore::new(), + cross_process_store_locks_holder_name.clone(), + ), cross_process_store_locks_holder_name, } } @@ -785,6 +790,18 @@ impl StoreConfig { ); self } + + /// Set a custom implementation of an `MediaStore`. + pub fn media_store(mut self, media_store: S) -> Self + where + S: event_cache_store::media::IntoMediaStore, + { + self.media_store = event_cache_store::media::MediaStoreLock::new( + media_store, + self.cross_process_store_locks_holder_name.clone(), + ); + self + } } #[cfg(test)] diff --git a/crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs b/crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs index b8fb67dfdc6..d5aa3e193a4 100644 --- a/crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs +++ b/crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs @@ -13,7 +13,9 @@ // limitations under the License use matrix_sdk_base::{ - event_cache::store::{EventCacheStore, EventCacheStoreError, MemoryStore}, + event_cache::store::{ + media::MediaStore, EventCacheStore, EventCacheStoreError, MemoryMediaStore, MemoryStore, + }, SendOutsideWasm, SyncOutsideWasm, }; use serde::de::Error; diff --git a/crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs b/crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs index 72376cbcc82..8d0e9fde7a3 100644 --- a/crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs @@ -20,8 +20,8 @@ use indexed_db_futures::IdbDatabase; use matrix_sdk_base::{ event_cache::{ store::{ - media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy}, - EventCacheStore, MemoryStore, + media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaStore}, + EventCacheStore, MemoryMediaStore, MemoryStore, }, Event, Gap, }, @@ -534,121 +534,6 @@ impl_event_cache_store! { transaction.commit().await?; Ok(()) } - - #[instrument(skip_all)] - async fn add_media_content( - &self, - request: &MediaRequestParameters, - content: Vec, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .add_media_content(request, content, ignore_policy) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } - - #[instrument(skip_all)] - async fn replace_media_key( - &self, - from: &MediaRequestParameters, - to: &MediaRequestParameters, - ) -> Result<(), IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .replace_media_key(from, to) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } - - #[instrument(skip_all)] - async fn get_media_content( - &self, - request: &MediaRequestParameters, - ) -> Result>, IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .get_media_content(request) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } - - #[instrument(skip_all)] - async fn remove_media_content( - &self, - request: &MediaRequestParameters, - ) -> Result<(), IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .remove_media_content(request) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } - - #[instrument(skip(self))] - async fn get_media_content_for_uri( - &self, - uri: &MxcUri, - ) -> Result>, IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .get_media_content_for_uri(uri) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } - - #[instrument(skip(self))] - async fn remove_media_content_for_uri( - &self, - uri: &MxcUri, - ) -> Result<(), IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .remove_media_content_for_uri(uri) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } - - #[instrument(skip_all)] - async fn set_media_retention_policy( - &self, - policy: MediaRetentionPolicy, - ) -> Result<(), IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .set_media_retention_policy(policy) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } - - #[instrument(skip_all)] - fn media_retention_policy(&self) -> MediaRetentionPolicy { - let _timer = timer!("method"); - self.memory_store.media_retention_policy() - } - - #[instrument(skip_all)] - async fn set_ignore_media_retention_policy( - &self, - request: &MediaRequestParameters, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .set_ignore_media_retention_policy(request, ignore_policy) - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } - - #[instrument(skip_all)] - async fn clean_up_media_cache(&self) -> Result<(), IndexeddbEventCacheStoreError> { - let _timer = timer!("method"); - self.memory_store - .clean_up_media_cache() - .await - .map_err(IndexeddbEventCacheStoreError::MemoryStore) - } } #[cfg(test)] diff --git a/crates/matrix-sdk-indexeddb/src/lib.rs b/crates/matrix-sdk-indexeddb/src/lib.rs index df4ee66e49b..9cf630e5fb0 100644 --- a/crates/matrix-sdk-indexeddb/src/lib.rs +++ b/crates/matrix-sdk-indexeddb/src/lib.rs @@ -8,6 +8,8 @@ use thiserror::Error; mod crypto_store; #[cfg(feature = "event-cache-store")] mod event_cache_store; +#[cfg(feature = "event-cache-store")] +mod media_store; mod safe_encode; #[cfg(feature = "e2e-encryption")] mod serialize_bool_for_indexeddb; diff --git a/crates/matrix-sdk-indexeddb/src/media_store/builder.rs b/crates/matrix-sdk-indexeddb/src/media_store/builder.rs new file mode 100644 index 00000000000..6fb1f73f17d --- /dev/null +++ b/crates/matrix-sdk-indexeddb/src/media_store/builder.rs @@ -0,0 +1,37 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +use std::sync::Arc; + +use matrix_sdk_base::event_cache::store::{MemoryMediaStore, MemoryStore}; +use matrix_sdk_store_encryption::StoreCipher; +use web_sys::DomException; + +use crate::{ + media_store::{error::IndexeddbMediaStoreError, IndexeddbMediaStore}, + serializer::IndexeddbSerializer, +}; + +/// A type for conveniently building an [`IndexeddbMediaStore`] +#[derive(Default)] +pub struct IndexeddbMediaStoreBuilder {} + +impl IndexeddbMediaStoreBuilder { + /// Opens the IndexedDB database with the provided name. If successfully + /// opened, builds the [`IndexeddbMediaStore`] with that database + /// and the provided store cipher. + pub fn build(self) -> Result { + Ok(IndexeddbMediaStore { memory_store: MemoryMediaStore::new() }) + } +} diff --git a/crates/matrix-sdk-indexeddb/src/media_store/error.rs b/crates/matrix-sdk-indexeddb/src/media_store/error.rs new file mode 100644 index 00000000000..89b17a19bc1 --- /dev/null +++ b/crates/matrix-sdk-indexeddb/src/media_store/error.rs @@ -0,0 +1,22 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +use matrix_sdk_base::event_cache::store::{media::MediaStore, MemoryMediaStore}; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum IndexeddbMediaStoreError { + #[error("media store: {0}")] + MemoryStore(::Error), +} diff --git a/crates/matrix-sdk-indexeddb/src/media_store/mod.rs b/crates/matrix-sdk-indexeddb/src/media_store/mod.rs new file mode 100644 index 00000000000..9fe72d1d304 --- /dev/null +++ b/crates/matrix-sdk-indexeddb/src/media_store/mod.rs @@ -0,0 +1,265 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +mod builder; +mod error; + +use matrix_sdk_base::{ + event_cache::store::{ + media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaStore}, + MemoryMediaStore, + }, + media::MediaRequestParameters, + timer, +}; +use ruma::MxcUri; +use tracing::instrument; + +use crate::media_store::{builder::IndexeddbMediaStoreBuilder, error::IndexeddbMediaStoreError}; + +/// A type for providing an IndexedDB implementation of [`MediaStore`][1]. +/// This is meant to be used as a backend to [`MediaStore`][1] in browser +/// contexts. +/// +/// [1]: matrix_sdk_base::event_cache::store::MediaStore +#[derive(Debug)] +pub struct IndexeddbMediaStore { + // An in-memory store for providing temporary implementations for + // functions of `MediaStore`. + // + // NOTE: This will be removed once we have IndexedDB-backed implementations for all + // functions in `MediaStore`. + memory_store: MemoryMediaStore, +} + +impl IndexeddbMediaStore { + /// Provides a type with which to conveniently build an + /// [`IndexeddbEventCacheStore`] + pub fn builder() -> IndexeddbMediaStoreBuilder { + IndexeddbMediaStoreBuilder::default() + } +} + +// Small hack to have the following macro invocation act as the appropriate +// trait impl block on wasm, but still be compiled on non-wasm as a regular +// impl block otherwise. +// +// The trait impl doesn't compile on non-wasm due to unfulfilled trait bounds, +// this hack allows us to still have most of rust-analyzer's IDE functionality +// within the impl block without having to set it up to check things against +// the wasm target (which would disable many other parts of the codebase). +#[cfg(target_arch = "wasm32")] +macro_rules! impl_media_store { + ( $($body:tt)* ) => { + #[async_trait::async_trait(?Send)] + impl MediaStore for IndexeddbMediaStore { + type Error = IndexeddbMediaStoreError; + + $($body)* + } + }; +} + +#[cfg(not(target_arch = "wasm32"))] +macro_rules! impl_media_store { + ( $($body:tt)* ) => { + impl IndexeddbMediaStore { + $($body)* + } + }; +} + +impl_media_store! { + #[instrument(skip(self))] + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + let _timer = timer!("method"); + self.memory_store + .try_take_leased_lock(lease_duration_ms, key, holder) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip_all)] + async fn add_media_content( + &self, + request: &MediaRequestParameters, + content: Vec, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .add_media_content(request, content, ignore_policy) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip_all)] + async fn replace_media_key( + &self, + from: &MediaRequestParameters, + to: &MediaRequestParameters, + ) -> Result<(), IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .replace_media_key(from, to) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip_all)] + async fn get_media_content( + &self, + request: &MediaRequestParameters, + ) -> Result>, IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .get_media_content(request) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip_all)] + async fn remove_media_content( + &self, + request: &MediaRequestParameters, + ) -> Result<(), IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .remove_media_content(request) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip(self))] + async fn get_media_content_for_uri( + &self, + uri: &MxcUri, + ) -> Result>, IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .get_media_content_for_uri(uri) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip(self))] + async fn remove_media_content_for_uri( + &self, + uri: &MxcUri, + ) -> Result<(), IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .remove_media_content_for_uri(uri) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip_all)] + async fn set_media_retention_policy( + &self, + policy: MediaRetentionPolicy, + ) -> Result<(), IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .set_media_retention_policy(policy) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip_all)] + fn media_retention_policy(&self) -> MediaRetentionPolicy { + let _timer = timer!("method"); + self.memory_store.media_retention_policy() + } + + #[instrument(skip_all)] + async fn set_ignore_media_retention_policy( + &self, + request: &MediaRequestParameters, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .set_ignore_media_retention_policy(request, ignore_policy) + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } + + #[instrument(skip_all)] + async fn clean_up_media_cache(&self) -> Result<(), IndexeddbMediaStoreError> { + let _timer = timer!("method"); + self.memory_store + .clean_up_media_cache() + .await + .map_err(IndexeddbMediaStoreError::MemoryStore) + } +} + +#[cfg(test)] +mod tests { + use matrix_sdk_base::{ + event_cache::store::media::{MediaStore, MediaStoreError}, + media_store_integration_tests, media_store_integration_tests_time, + }; + use matrix_sdk_test::async_test; + use uuid::Uuid; + + use crate::media_store::{error::IndexeddbMediaStoreError, IndexeddbMediaStore}; + + impl From for MediaStoreError { + fn from(value: IndexeddbMediaStoreError) -> Self { + Self::Backend(Box::new(value)) + } + } + + mod unencrypted { + use super::*; + + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + #[allow(clippy::unused_async)] + async fn get_media_store() -> Result { + let name = format!("test-media-store-{}", Uuid::new_v4().as_hyphenated()); + Ok(IndexeddbMediaStore::builder().build()?) + } + + #[cfg(target_family = "wasm")] + media_store_integration_tests!(); + + #[cfg(target_family = "wasm")] + media_store_integration_tests_time!(); + } + + mod encrypted { + use super::*; + + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + async fn get_media_store() -> Result { + let name = format!("test-media-store-{}", Uuid::new_v4().as_hyphenated()); + Ok(IndexeddbMediaStore::builder().build()?) + } + + #[cfg(target_family = "wasm")] + media_store_integration_tests!(); + + #[cfg(target_family = "wasm")] + media_store_integration_tests_time!(); + } +} diff --git a/crates/matrix-sdk-sqlite/migrations/media_store/001_init.sql b/crates/matrix-sdk-sqlite/migrations/media_store/001_init.sql new file mode 100644 index 00000000000..21830c491b1 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/media_store/001_init.sql @@ -0,0 +1,21 @@ +-- basic kv metadata like the database version and store cipher +CREATE TABLE "kv" ( + "key" TEXT PRIMARY KEY NOT NULL, + "value" BLOB NOT NULL +); + +CREATE TABLE "media" ( + "uri" BLOB NOT NULL, + "format" BLOB NOT NULL, + "data" BLOB NOT NULL, + "last_access" INTEGER NOT NULL, + "ignore_policy" BOOLEAN NOT NULL DEFAULT FALSE, + + PRIMARY KEY ("uri", "format") +); + +CREATE TABLE "lease_locks" ( + "key" TEXT PRIMARY KEY NOT NULL, + "holder" TEXT NOT NULL, + "expiration" REAL NOT NULL +); \ No newline at end of file diff --git a/crates/matrix-sdk-sqlite/src/error.rs b/crates/matrix-sdk-sqlite/src/error.rs index 877031f7f7a..10208e0b084 100644 --- a/crates/matrix-sdk-sqlite/src/error.rs +++ b/crates/matrix-sdk-sqlite/src/error.rs @@ -14,7 +14,7 @@ use deadpool_sqlite::{CreatePoolError, PoolError}; #[cfg(feature = "event-cache")] -use matrix_sdk_base::event_cache::store::EventCacheStoreError; +use matrix_sdk_base::event_cache::store::{media::MediaStoreError, EventCacheStoreError}; #[cfg(feature = "state-store")] use matrix_sdk_base::store::StoreError as StateStoreError; #[cfg(feature = "crypto-store")] @@ -169,4 +169,14 @@ impl From for EventCacheStoreError { } } +#[cfg(feature = "event-cache")] +impl From for MediaStoreError { + fn from(e: Error) -> Self { + match e { + Error::Encryption(e) => MediaStoreError::Encryption(e), + e => MediaStoreError::backend(e), + } + } +} + pub(crate) type Result = std::result::Result; diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 65ba9a73fe0..e902f767dad 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -21,27 +21,18 @@ use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; use matrix_sdk_base::{ deserialized_responses::TimelineEvent, event_cache::{ - store::{ - compute_filters_string, extract_event_relation, - media::{ - EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy, - MediaService, - }, - EventCacheStore, - }, + store::{compute_filters_string, extract_event_relation, EventCacheStore}, Event, Gap, }, linked_chunk::{ ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, RawChunk, Update, }, - media::{MediaRequestParameters, UniqueKey}, timer, }; use matrix_sdk_store_encryption::StoreCipher; use ruma::{ - events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, - OwnedEventId, RoomId, + events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, }; use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior}; use tokio::{ @@ -53,20 +44,15 @@ use tracing::{debug, error, instrument, trace}; use crate::{ error::{Error, Result}, utils::{ - repeat_vars, time_to_timestamp, EncryptableStore, Key, SqliteAsyncConnExt, - SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt, SqliteTransactionExt, + repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt, + SqliteKeyValueStoreConnExt, SqliteTransactionExt, }, OpenStoreError, SqliteStoreConfig, }; mod keys { - // Entries in Key-value store - pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy"; - pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time"; - // Tables pub const LINKED_CHUNKS: &str = "linked_chunks"; - pub const MEDIA: &str = "media"; } /// The database name. @@ -99,8 +85,6 @@ pub struct SqliteEventCacheStore { /// operations. All other connections are used for read operations. The /// lock is used to ensure there is one owner at a time. write_connection: Arc>, - - media_service: MediaService, } #[cfg(not(tarpaulin_include))] @@ -164,17 +148,11 @@ impl SqliteEventCacheStore { None => None, }; - let media_service = MediaService::new(); - let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?; - let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?; - media_service.restore(media_retention_policy, last_media_cleanup_time); - Ok(Self { store_cipher, pool, // Use `conn` as our selected write connections. write_connection: Arc::new(Mutex::new(conn)), - media_service, }) } @@ -1304,382 +1282,6 @@ impl EventCacheStore for SqliteEventCacheStore { }) .await } - - #[instrument(skip_all)] - async fn add_media_content( - &self, - request: &MediaRequestParameters, - content: Vec, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<()> { - let _timer = timer!("method"); - - self.media_service.add_media_content(self, request, content, ignore_policy).await - } - - #[instrument(skip_all)] - async fn replace_media_key( - &self, - from: &MediaRequestParameters, - to: &MediaRequestParameters, - ) -> Result<(), Self::Error> { - let _timer = timer!("method"); - - let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key()); - let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key()); - - let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key()); - let new_format = self.encode_key(keys::MEDIA, to.format.unique_key()); - - let conn = self.write().await?; - conn.execute( - r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#, - (new_uri, new_format, prev_uri, prev_format), - ) - .await?; - - Ok(()) - } - - #[instrument(skip_all)] - async fn get_media_content(&self, request: &MediaRequestParameters) -> Result>> { - let _timer = timer!("method"); - - self.media_service.get_media_content(self, request).await - } - - #[instrument(skip_all)] - async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> { - let _timer = timer!("method"); - - let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); - let format = self.encode_key(keys::MEDIA, request.format.unique_key()); - - let conn = self.write().await?; - conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?; - - Ok(()) - } - - #[instrument(skip(self))] - async fn get_media_content_for_uri( - &self, - uri: &MxcUri, - ) -> Result>, Self::Error> { - let _timer = timer!("method"); - - self.media_service.get_media_content_for_uri(self, uri).await - } - - #[instrument(skip(self))] - async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { - let _timer = timer!("method"); - - let uri = self.encode_key(keys::MEDIA, uri); - - let conn = self.write().await?; - conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?; - - Ok(()) - } - - #[instrument(skip_all)] - async fn set_media_retention_policy( - &self, - policy: MediaRetentionPolicy, - ) -> Result<(), Self::Error> { - let _timer = timer!("method"); - - self.media_service.set_media_retention_policy(self, policy).await - } - - #[instrument(skip_all)] - fn media_retention_policy(&self) -> MediaRetentionPolicy { - let _timer = timer!("method"); - - self.media_service.media_retention_policy() - } - - #[instrument(skip_all)] - async fn set_ignore_media_retention_policy( - &self, - request: &MediaRequestParameters, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error> { - let _timer = timer!("method"); - - self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await - } - - #[instrument(skip_all)] - async fn clean_up_media_cache(&self) -> Result<(), Self::Error> { - let _timer = timer!("method"); - - self.media_service.clean_up_media_cache(self).await - } -} - -#[cfg_attr(target_family = "wasm", async_trait(?Send))] -#[cfg_attr(not(target_family = "wasm"), async_trait)] -impl EventCacheStoreMedia for SqliteEventCacheStore { - type Error = Error; - - async fn media_retention_policy_inner( - &self, - ) -> Result, Self::Error> { - let conn = self.read().await?; - conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await - } - - async fn set_media_retention_policy_inner( - &self, - policy: MediaRetentionPolicy, - ) -> Result<(), Self::Error> { - let conn = self.write().await?; - conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?; - Ok(()) - } - - async fn add_media_content_inner( - &self, - request: &MediaRequestParameters, - data: Vec, - last_access: SystemTime, - policy: MediaRetentionPolicy, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error> { - let ignore_policy = ignore_policy.is_yes(); - let data = self.encode_value(data)?; - - if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) { - return Ok(()); - } - - let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); - let format = self.encode_key(keys::MEDIA, request.format.unique_key()); - let timestamp = time_to_timestamp(last_access); - - let conn = self.write().await?; - conn.execute( - "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)", - (uri, format, data, timestamp, ignore_policy), - ) - .await?; - - Ok(()) - } - - async fn set_ignore_media_retention_policy_inner( - &self, - request: &MediaRequestParameters, - ignore_policy: IgnoreMediaRetentionPolicy, - ) -> Result<(), Self::Error> { - let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); - let format = self.encode_key(keys::MEDIA, request.format.unique_key()); - let ignore_policy = ignore_policy.is_yes(); - - let conn = self.write().await?; - conn.execute( - r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#, - (ignore_policy, uri, format), - ) - .await?; - - Ok(()) - } - - async fn get_media_content_inner( - &self, - request: &MediaRequestParameters, - current_time: SystemTime, - ) -> Result>, Self::Error> { - let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); - let format = self.encode_key(keys::MEDIA, request.format.unique_key()); - let timestamp = time_to_timestamp(current_time); - - let conn = self.write().await?; - let data = conn - .with_transaction::<_, rusqlite::Error, _>(move |txn| { - // Update the last access. - // We need to do this first so the transaction is in write mode right away. - // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions - txn.execute( - "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?", - (timestamp, &uri, &format), - )?; - - txn.query_row::, _, _>( - "SELECT data FROM media WHERE uri = ? AND format = ?", - (&uri, &format), - |row| row.get(0), - ) - .optional() - }) - .await?; - - data.map(|v| self.decode_value(&v).map(Into::into)).transpose() - } - - async fn get_media_content_for_uri_inner( - &self, - uri: &MxcUri, - current_time: SystemTime, - ) -> Result>, Self::Error> { - let uri = self.encode_key(keys::MEDIA, uri); - let timestamp = time_to_timestamp(current_time); - - let conn = self.write().await?; - let data = conn - .with_transaction::<_, rusqlite::Error, _>(move |txn| { - // Update the last access. - // We need to do this first so the transaction is in write mode right away. - // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions - txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?; - - txn.query_row::, _, _>( - "SELECT data FROM media WHERE uri = ?", - (&uri,), - |row| row.get(0), - ) - .optional() - }) - .await?; - - data.map(|v| self.decode_value(&v).map(Into::into)).transpose() - } - - async fn clean_up_media_cache_inner( - &self, - policy: MediaRetentionPolicy, - current_time: SystemTime, - ) -> Result<(), Self::Error> { - if !policy.has_limitations() { - // We can safely skip all the checks. - return Ok(()); - } - - let conn = self.write().await?; - let removed = conn - .with_transaction::<_, Error, _>(move |txn| { - let mut removed = false; - - // First, check media content that exceed the max filesize. - if let Some(max_file_size) = policy.computed_max_file_size() { - let count = txn.execute( - "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?", - (max_file_size,), - )?; - - if count > 0 { - removed = true; - } - } - - // Then, clean up expired media content. - if let Some(last_access_expiry) = policy.last_access_expiry { - let current_timestamp = time_to_timestamp(current_time); - let expiry_secs = last_access_expiry.as_secs(); - let count = txn.execute( - "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?", - (current_timestamp, expiry_secs), - )?; - - if count > 0 { - removed = true; - } - } - - // Finally, if the cache size is too big, remove old items until it fits. - if let Some(max_cache_size) = policy.max_cache_size { - // i64 is the integer type used by SQLite, use it here to avoid usize overflow - // during the conversion of the result. - let cache_size = txn - .query_row( - "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE", - (), - |row| { - // `sum()` returns `NULL` if there are no rows. - row.get::<_, Option>(0) - }, - )? - .unwrap_or_default(); - - // If the cache size is overflowing or bigger than max cache size, clean up. - if cache_size > max_cache_size { - // Get the sizes of the media contents ordered by last access. - let mut cached_stmt = txn.prepare_cached( - "SELECT rowid, length(data) FROM media \ - WHERE ignore_policy IS FALSE ORDER BY last_access DESC", - )?; - let content_sizes = cached_stmt - .query(())? - .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?))); - - let mut accumulated_items_size = 0u64; - let mut limit_reached = false; - let mut rows_to_remove = Vec::new(); - - for result in content_sizes { - let (row_id, size) = match result { - Ok(content_size) => content_size, - Err(error) => { - return Err(error.into()); - } - }; - - if limit_reached { - rows_to_remove.push(row_id); - continue; - } - - match accumulated_items_size.checked_add(size) { - Some(acc) if acc > max_cache_size => { - // We can stop accumulating. - limit_reached = true; - rows_to_remove.push(row_id); - } - Some(acc) => accumulated_items_size = acc, - None => { - // The accumulated size is overflowing but the setting cannot be - // bigger than usize::MAX, we can stop accumulating. - limit_reached = true; - rows_to_remove.push(row_id); - } - } - } - - if !rows_to_remove.is_empty() { - removed = true; - } - - txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| { - let sql_params = repeat_vars(row_ids.len()); - let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})"); - txn.prepare(&query)?.execute(params_from_iter(row_ids))?; - Ok(Vec::<()>::new()) - })?; - } - } - - txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?; - - Ok(removed) - }) - .await?; - - // If we removed media, defragment the database and free space on the - // filesystem. - if removed { - conn.vacuum().await?; - } - - Ok(()) - } - - async fn last_media_cleanup_time_inner(&self) -> Result, Self::Error> { - let conn = self.read().await?; - conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await - } } fn find_event_relations_transaction( @@ -1855,7 +1457,6 @@ mod tests { use std::{ path::PathBuf, sync::atomic::{AtomicU32, Ordering::SeqCst}, - time::Duration, }; use assert_matches::assert_matches; @@ -1865,19 +1466,16 @@ mod tests { integration_tests::{ check_test_event, make_test_event, make_test_event_with_event_id, }, - media::IgnoreMediaRetentionPolicy, EventCacheStore, EventCacheStoreError, }, Gap, }, event_cache_store_integration_tests, event_cache_store_integration_tests_time, - event_cache_store_media_integration_tests, linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update}, - media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings}, }; use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID}; use once_cell::sync::Lazy; - use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint}; + use ruma::{event_id, room_id}; use tempfile::{tempdir, TempDir}; use super::SqliteEventCacheStore; @@ -1905,19 +1503,6 @@ mod tests { event_cache_store_integration_tests!(); event_cache_store_integration_tests_time!(); - event_cache_store_media_integration_tests!(with_media_size_tests); - - async fn get_event_cache_store_content_sorted_by_last_access( - event_cache_store: &SqliteEventCacheStore, - ) -> Vec> { - let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed"); - sqlite_db - .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| { - stmt.query(())?.mapped(|row| row.get(0)).collect() - }) - .await - .expect("querying media cache content by last access failed") - } #[async_test] async fn test_pool_size() { @@ -1929,73 +1514,6 @@ mod tests { assert_eq!(store.pool.status().max_size, 42); } - #[async_test] - async fn test_last_access() { - let event_cache_store = get_event_cache_store().await.expect("creating media cache failed"); - let uri = mxc_uri!("mxc://localhost/media"); - let file_request = MediaRequestParameters { - source: MediaSource::Plain(uri.to_owned()), - format: MediaFormat::File, - }; - let thumbnail_request = MediaRequestParameters { - source: MediaSource::Plain(uri.to_owned()), - format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method( - Method::Crop, - uint!(100), - uint!(100), - )), - }; - - let content: Vec = "hello world".into(); - let thumbnail_content: Vec = "hello…".into(); - - // Add the media. - event_cache_store - .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No) - .await - .expect("adding file failed"); - - // Since the precision of the timestamp is in seconds, wait so the timestamps - // differ. - tokio::time::sleep(Duration::from_secs(3)).await; - - event_cache_store - .add_media_content( - &thumbnail_request, - thumbnail_content.clone(), - IgnoreMediaRetentionPolicy::No, - ) - .await - .expect("adding thumbnail failed"); - - // File's last access is older than thumbnail. - let contents = - get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await; - - assert_eq!(contents.len(), 2, "media cache contents length is wrong"); - assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access"); - assert_eq!(contents[1], content, "file is not second-to-last access"); - - // Since the precision of the timestamp is in seconds, wait so the timestamps - // differ. - tokio::time::sleep(Duration::from_secs(3)).await; - - // Access the file so its last access is more recent. - let _ = event_cache_store - .get_media_content(&file_request) - .await - .expect("getting file failed") - .expect("file is missing"); - - // File's last access is more recent than thumbnail. - let contents = - get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await; - - assert_eq!(contents.len(), 2, "media cache contents length is wrong"); - assert_eq!(contents[0], content, "file is not last access"); - assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access"); - } - #[async_test] async fn test_linked_chunk_new_items_chunk() { let store = get_event_cache_store().await.expect("creating cache store failed"); @@ -2842,7 +2360,6 @@ mod encrypted_tests { use matrix_sdk_base::{ event_cache::store::{EventCacheStore, EventCacheStoreError}, event_cache_store_integration_tests, event_cache_store_integration_tests_time, - event_cache_store_media_integration_tests, }; use matrix_sdk_test::{async_test, event_factory::EventFactory}; use once_cell::sync::Lazy; @@ -2874,7 +2391,6 @@ mod encrypted_tests { event_cache_store_integration_tests!(); event_cache_store_integration_tests_time!(); - event_cache_store_media_integration_tests!(); #[async_test] async fn test_no_sqlite_injection_in_find_event_relations() { diff --git a/crates/matrix-sdk-sqlite/src/lib.rs b/crates/matrix-sdk-sqlite/src/lib.rs index 91d931271d4..4ab6b51820e 100644 --- a/crates/matrix-sdk-sqlite/src/lib.rs +++ b/crates/matrix-sdk-sqlite/src/lib.rs @@ -21,6 +21,8 @@ mod crypto_store; mod error; #[cfg(feature = "event-cache")] mod event_cache_store; +#[cfg(feature = "event-cache")] +mod media_store; #[cfg(feature = "state-store")] mod state_store; mod utils; @@ -37,6 +39,8 @@ pub use self::crypto_store::SqliteCryptoStore; pub use self::error::OpenStoreError; #[cfg(feature = "event-cache")] pub use self::event_cache_store::SqliteEventCacheStore; +#[cfg(feature = "event-cache")] +pub use self::media_store::SqliteMediaStore; #[cfg(feature = "state-store")] pub use self::state_store::{SqliteStateStore, DATABASE_NAME as STATE_STORE_DATABASE_NAME}; diff --git a/crates/matrix-sdk-sqlite/src/media_store.rs b/crates/matrix-sdk-sqlite/src/media_store.rs new file mode 100644 index 00000000000..688082ee4e4 --- /dev/null +++ b/crates/matrix-sdk-sqlite/src/media_store.rs @@ -0,0 +1,797 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! An SQLite-based backend for the [`EventCacheStore`]. + +use std::{fmt, path::Path, sync::Arc}; + +use async_trait::async_trait; +use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; +use matrix_sdk_base::{ + event_cache::store::media::{ + IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore, MediaStoreInner, + }, + media::{MediaRequestParameters, UniqueKey}, + timer, +}; +use matrix_sdk_store_encryption::StoreCipher; +use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri}; +use rusqlite::{params_from_iter, OptionalExtension}; +use tokio::{ + fs, + sync::{Mutex, OwnedMutexGuard}, +}; +use tracing::{debug, instrument, trace}; + +use crate::{ + error::{Error, Result}, + utils::{ + repeat_vars, time_to_timestamp, EncryptableStore, SqliteAsyncConnExt, + SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt, SqliteTransactionExt, + }, + OpenStoreError, SqliteStoreConfig, +}; + +mod keys { + // Entries in Key-value store + pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy"; + pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time"; + + // Tables + pub const MEDIA: &str = "media"; +} + +/// The database name. +const DATABASE_NAME: &str = "matrix-sdk-media.sqlite3"; + +/// Identifier of the latest database version. +/// +/// This is used to figure whether the SQLite database requires a migration. +/// Every new SQL migration should imply a bump of this number, and changes in +/// the [`run_migrations`] function. +const DATABASE_VERSION: u8 = 1; + +/// An SQLite-based event cache store. +#[derive(Clone)] +pub struct SqliteMediaStore { + store_cipher: Option>, + + /// The pool of connections. + pool: SqlitePool, + + /// We make the difference between connections for read operations, and for + /// write operations. We keep a single connection apart from write + /// operations. All other connections are used for read operations. The + /// lock is used to ensure there is one owner at a time. + write_connection: Arc>, + + media_service: MediaService, +} + +#[cfg(not(tarpaulin_include))] +impl fmt::Debug for SqliteMediaStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SqliteMediaStore").finish_non_exhaustive() + } +} + +impl EncryptableStore for SqliteMediaStore { + fn get_cypher(&self) -> Option<&StoreCipher> { + self.store_cipher.as_deref() + } +} + +impl SqliteMediaStore { + /// Open the SQLite-based event cache store at the given path using the + /// given passphrase to encrypt private data. + pub async fn open( + path: impl AsRef, + passphrase: Option<&str>, + ) -> Result { + Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await + } + + /// Open the SQLite-based event cache store with the config open config. + #[instrument(skip(config), fields(path = ?config.path))] + pub async fn open_with_config(config: SqliteStoreConfig) -> Result { + debug!(?config); + + let _timer = timer!("open_with_config"); + + let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config; + + fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?; + + let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME)); + config.pool = Some(pool_config); + + let pool = config.create_pool(Runtime::Tokio1)?; + + let this = Self::open_with_pool(pool, passphrase.as_deref()).await?; + this.write().await?.apply_runtime_config(runtime_config).await?; + + Ok(this) + } + + /// Open an SQLite-based event cache store using the given SQLite database + /// pool. The given passphrase will be used to encrypt private data. + async fn open_with_pool( + pool: SqlitePool, + passphrase: Option<&str>, + ) -> Result { + let conn = pool.get().await?; + + let version = conn.db_version().await?; + run_migrations(&conn, version).await?; + + let store_cipher = match passphrase { + Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)), + None => None, + }; + + let media_service = MediaService::new(); + let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?; + let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?; + media_service.restore(media_retention_policy, last_media_cleanup_time); + + Ok(Self { + store_cipher, + pool, + // Use `conn` as our selected write connections. + write_connection: Arc::new(Mutex::new(conn)), + media_service, + }) + } + + // Acquire a connection for executing read operations. + #[instrument(skip_all)] + async fn read(&self) -> Result { + trace!("Taking a `read` connection"); + let _timer = timer!("connection"); + + let connection = self.pool.get().await?; + + // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key + // support must be enabled on a per-connection basis. Execute it every + // time we try to get a connection, since we can't guarantee a previous + // connection did enable it before. + connection.execute_batch("PRAGMA foreign_keys = ON;").await?; + + Ok(connection) + } + + // Acquire a connection for executing write operations. + #[instrument(skip_all)] + async fn write(&self) -> Result> { + trace!("Taking a `write` connection"); + let _timer = timer!("connection"); + + let connection = self.write_connection.clone().lock_owned().await; + + // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key + // support must be enabled on a per-connection basis. Execute it every + // time we try to get a connection, since we can't guarantee a previous + // connection did enable it before. + connection.execute_batch("PRAGMA foreign_keys = ON;").await?; + + Ok(connection) + } +} + +/// Run migrations for the given version of the database. +async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { + if version == 0 { + debug!("Creating database"); + } else if version < DATABASE_VERSION { + debug!(version, new_version = DATABASE_VERSION, "Upgrading database"); + } else { + return Ok(()); + } + + // Always enable foreign keys for the current connection. + conn.execute_batch("PRAGMA foreign_keys = ON;").await?; + + if version < 1 { + // First turn on WAL mode, this can't be done in the transaction, it fails with + // the error message: "cannot change into wal mode from within a transaction". + conn.execute_batch("PRAGMA journal_mode = wal;").await?; + conn.with_transaction(|txn| { + txn.execute_batch(include_str!("../migrations/media_store/001_init.sql"))?; + txn.set_db_version(1) + }) + .await?; + } + + Ok(()) +} + +#[async_trait] +impl MediaStore for SqliteMediaStore { + type Error = Error; + + #[instrument(skip(self))] + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + let _timer = timer!("method"); + + let key = key.to_owned(); + let holder = holder.to_owned(); + + let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); + let expiration = now + lease_duration_ms as u64; + + let num_touched = self + .write() + .await? + .with_transaction(move |txn| { + txn.execute( + "INSERT INTO lease_locks (key, holder, expiration) + VALUES (?1, ?2, ?3) + ON CONFLICT (key) + DO + UPDATE SET holder = ?2, expiration = ?3 + WHERE holder = ?2 + OR expiration < ?4 + ", + (key, holder, expiration, now), + ) + }) + .await?; + + Ok(num_touched == 1) + } + + async fn add_media_content( + &self, + request: &MediaRequestParameters, + content: Vec, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<()> { + let _timer = timer!("method"); + + self.media_service.add_media_content(self, request, content, ignore_policy).await + } + + #[instrument(skip_all)] + async fn replace_media_key( + &self, + from: &MediaRequestParameters, + to: &MediaRequestParameters, + ) -> Result<(), Self::Error> { + let _timer = timer!("method"); + + let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key()); + let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key()); + + let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key()); + let new_format = self.encode_key(keys::MEDIA, to.format.unique_key()); + + let conn = self.write().await?; + conn.execute( + r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#, + (new_uri, new_format, prev_uri, prev_format), + ) + .await?; + + Ok(()) + } + + #[instrument(skip_all)] + async fn get_media_content(&self, request: &MediaRequestParameters) -> Result>> { + let _timer = timer!("method"); + + self.media_service.get_media_content(self, request).await + } + + #[instrument(skip_all)] + async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> { + let _timer = timer!("method"); + + let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); + let format = self.encode_key(keys::MEDIA, request.format.unique_key()); + + let conn = self.write().await?; + conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?; + + Ok(()) + } + + #[instrument(skip(self))] + async fn get_media_content_for_uri( + &self, + uri: &MxcUri, + ) -> Result>, Self::Error> { + let _timer = timer!("method"); + + self.media_service.get_media_content_for_uri(self, uri).await + } + + #[instrument(skip(self))] + async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { + let _timer = timer!("method"); + + let uri = self.encode_key(keys::MEDIA, uri); + + let conn = self.write().await?; + conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?; + + Ok(()) + } + + #[instrument(skip_all)] + async fn set_media_retention_policy( + &self, + policy: MediaRetentionPolicy, + ) -> Result<(), Self::Error> { + let _timer = timer!("method"); + + self.media_service.set_media_retention_policy(self, policy).await + } + + #[instrument(skip_all)] + fn media_retention_policy(&self) -> MediaRetentionPolicy { + let _timer = timer!("method"); + + self.media_service.media_retention_policy() + } + + #[instrument(skip_all)] + async fn set_ignore_media_retention_policy( + &self, + request: &MediaRequestParameters, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error> { + let _timer = timer!("method"); + + self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await + } + + #[instrument(skip_all)] + async fn clean_up_media_cache(&self) -> Result<(), Self::Error> { + let _timer = timer!("method"); + + self.media_service.clean_up_media_cache(self).await + } +} + +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] +impl MediaStoreInner for SqliteMediaStore { + type Error = Error; + + async fn media_retention_policy_inner( + &self, + ) -> Result, Self::Error> { + let conn = self.read().await?; + conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await + } + + async fn set_media_retention_policy_inner( + &self, + policy: MediaRetentionPolicy, + ) -> Result<(), Self::Error> { + let conn = self.write().await?; + conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?; + Ok(()) + } + + async fn add_media_content_inner( + &self, + request: &MediaRequestParameters, + data: Vec, + last_access: SystemTime, + policy: MediaRetentionPolicy, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error> { + let ignore_policy = ignore_policy.is_yes(); + let data = self.encode_value(data)?; + + if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) { + return Ok(()); + } + + let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); + let format = self.encode_key(keys::MEDIA, request.format.unique_key()); + let timestamp = time_to_timestamp(last_access); + + let conn = self.write().await?; + conn.execute( + "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)", + (uri, format, data, timestamp, ignore_policy), + ) + .await?; + + Ok(()) + } + + async fn set_ignore_media_retention_policy_inner( + &self, + request: &MediaRequestParameters, + ignore_policy: IgnoreMediaRetentionPolicy, + ) -> Result<(), Self::Error> { + let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); + let format = self.encode_key(keys::MEDIA, request.format.unique_key()); + let ignore_policy = ignore_policy.is_yes(); + + let conn = self.write().await?; + conn.execute( + r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#, + (ignore_policy, uri, format), + ) + .await?; + + Ok(()) + } + + async fn get_media_content_inner( + &self, + request: &MediaRequestParameters, + current_time: SystemTime, + ) -> Result>, Self::Error> { + let uri = self.encode_key(keys::MEDIA, request.source.unique_key()); + let format = self.encode_key(keys::MEDIA, request.format.unique_key()); + let timestamp = time_to_timestamp(current_time); + + let conn = self.write().await?; + let data = conn + .with_transaction::<_, rusqlite::Error, _>(move |txn| { + // Update the last access. + // We need to do this first so the transaction is in write mode right away. + // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions + txn.execute( + "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?", + (timestamp, &uri, &format), + )?; + + txn.query_row::, _, _>( + "SELECT data FROM media WHERE uri = ? AND format = ?", + (&uri, &format), + |row| row.get(0), + ) + .optional() + }) + .await?; + + data.map(|v| self.decode_value(&v).map(Into::into)).transpose() + } + + async fn get_media_content_for_uri_inner( + &self, + uri: &MxcUri, + current_time: SystemTime, + ) -> Result>, Self::Error> { + let uri = self.encode_key(keys::MEDIA, uri); + let timestamp = time_to_timestamp(current_time); + + let conn = self.write().await?; + let data = conn + .with_transaction::<_, rusqlite::Error, _>(move |txn| { + // Update the last access. + // We need to do this first so the transaction is in write mode right away. + // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions + txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?; + + txn.query_row::, _, _>( + "SELECT data FROM media WHERE uri = ?", + (&uri,), + |row| row.get(0), + ) + .optional() + }) + .await?; + + data.map(|v| self.decode_value(&v).map(Into::into)).transpose() + } + + async fn clean_up_media_cache_inner( + &self, + policy: MediaRetentionPolicy, + current_time: SystemTime, + ) -> Result<(), Self::Error> { + if !policy.has_limitations() { + // We can safely skip all the checks. + return Ok(()); + } + + let conn = self.write().await?; + let removed = conn + .with_transaction::<_, Error, _>(move |txn| { + let mut removed = false; + + // First, check media content that exceed the max filesize. + if let Some(max_file_size) = policy.computed_max_file_size() { + let count = txn.execute( + "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?", + (max_file_size,), + )?; + + if count > 0 { + removed = true; + } + } + + // Then, clean up expired media content. + if let Some(last_access_expiry) = policy.last_access_expiry { + let current_timestamp = time_to_timestamp(current_time); + let expiry_secs = last_access_expiry.as_secs(); + let count = txn.execute( + "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?", + (current_timestamp, expiry_secs), + )?; + + if count > 0 { + removed = true; + } + } + + // Finally, if the cache size is too big, remove old items until it fits. + if let Some(max_cache_size) = policy.max_cache_size { + // i64 is the integer type used by SQLite, use it here to avoid usize overflow + // during the conversion of the result. + let cache_size = txn + .query_row( + "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE", + (), + |row| { + // `sum()` returns `NULL` if there are no rows. + row.get::<_, Option>(0) + }, + )? + .unwrap_or_default(); + + // If the cache size is overflowing or bigger than max cache size, clean up. + if cache_size > max_cache_size { + // Get the sizes of the media contents ordered by last access. + let mut cached_stmt = txn.prepare_cached( + "SELECT rowid, length(data) FROM media \ + WHERE ignore_policy IS FALSE ORDER BY last_access DESC", + )?; + let content_sizes = cached_stmt + .query(())? + .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?))); + + let mut accumulated_items_size = 0u64; + let mut limit_reached = false; + let mut rows_to_remove = Vec::new(); + + for result in content_sizes { + let (row_id, size) = match result { + Ok(content_size) => content_size, + Err(error) => { + return Err(error.into()); + } + }; + + if limit_reached { + rows_to_remove.push(row_id); + continue; + } + + match accumulated_items_size.checked_add(size) { + Some(acc) if acc > max_cache_size => { + // We can stop accumulating. + limit_reached = true; + rows_to_remove.push(row_id); + } + Some(acc) => accumulated_items_size = acc, + None => { + // The accumulated size is overflowing but the setting cannot be + // bigger than usize::MAX, we can stop accumulating. + limit_reached = true; + rows_to_remove.push(row_id); + } + } + } + + if !rows_to_remove.is_empty() { + removed = true; + } + + txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| { + let sql_params = repeat_vars(row_ids.len()); + let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})"); + txn.prepare(&query)?.execute(params_from_iter(row_ids))?; + Ok(Vec::<()>::new()) + })?; + } + } + + txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?; + + Ok(removed) + }) + .await?; + + // If we removed media, defragment the database and free space on the + // filesystem. + if removed { + conn.vacuum().await?; + } + + Ok(()) + } + + async fn last_media_cleanup_time_inner(&self) -> Result, Self::Error> { + let conn = self.read().await?; + conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await + } +} + +#[cfg(test)] +mod tests { + use std::{ + path::PathBuf, + sync::atomic::{AtomicU32, Ordering::SeqCst}, + time::Duration, + }; + + use matrix_sdk_base::{ + event_cache::store::media::{ + IgnoreMediaRetentionPolicy, MediaStore, MediaStoreError, + }, + media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings}, + media_store_inner_integration_tests, media_store_integration_tests, + media_store_integration_tests_time, + }; + use matrix_sdk_test::async_test; + use once_cell::sync::Lazy; + use ruma::{events::room::MediaSource, media::Method, mxc_uri, uint}; + use tempfile::{tempdir, TempDir}; + + use super::SqliteMediaStore; + use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig}; + + static TMP_DIR: Lazy = Lazy::new(|| tempdir().unwrap()); + static NUM: AtomicU32 = AtomicU32::new(0); + + fn new_media_store_workspace() -> PathBuf { + let name = NUM.fetch_add(1, SeqCst).to_string(); + TMP_DIR.path().join(name) + } + + async fn get_media_store() -> Result { + let tmpdir_path = new_media_store_workspace(); + + tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap()); + + Ok(SqliteMediaStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap()) + } + + media_store_integration_tests!(); + media_store_integration_tests_time!(); + media_store_inner_integration_tests!(); + + async fn get_media_store_content_sorted_by_last_access( + media_store: &SqliteMediaStore, + ) -> Vec> { + let sqlite_db = media_store.read().await.expect("accessing sqlite db failed"); + sqlite_db + .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| { + stmt.query(())?.mapped(|row| row.get(0)).collect() + }) + .await + .expect("querying media cache content by last access failed") + } + + #[async_test] + async fn test_pool_size() { + let tmpdir_path = new_media_store_workspace(); + let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42); + + let store = SqliteMediaStore::open_with_config(store_open_config).await.unwrap(); + + assert_eq!(store.pool.status().max_size, 42); + } + + #[async_test] + async fn test_last_access() { + let media_store = get_media_store().await.expect("creating media cache failed"); + let uri = mxc_uri!("mxc://localhost/media"); + let file_request = MediaRequestParameters { + source: MediaSource::Plain(uri.to_owned()), + format: MediaFormat::File, + }; + let thumbnail_request = MediaRequestParameters { + source: MediaSource::Plain(uri.to_owned()), + format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method( + Method::Crop, + uint!(100), + uint!(100), + )), + }; + + let content: Vec = "hello world".into(); + let thumbnail_content: Vec = "hello…".into(); + + // Add the media. + media_store + .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No) + .await + .expect("adding file failed"); + + // Since the precision of the timestamp is in seconds, wait so the timestamps + // differ. + tokio::time::sleep(Duration::from_secs(3)).await; + + media_store + .add_media_content( + &thumbnail_request, + thumbnail_content.clone(), + IgnoreMediaRetentionPolicy::No, + ) + .await + .expect("adding thumbnail failed"); + + // File's last access is older than thumbnail. + let contents = get_media_store_content_sorted_by_last_access(&media_store).await; + + assert_eq!(contents.len(), 2, "media cache contents length is wrong"); + assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access"); + assert_eq!(contents[1], content, "file is not second-to-last access"); + + // Since the precision of the timestamp is in seconds, wait so the timestamps + // differ. + tokio::time::sleep(Duration::from_secs(3)).await; + + // Access the file so its last access is more recent. + let _ = media_store + .get_media_content(&file_request) + .await + .expect("getting file failed") + .expect("file is missing"); + + // File's last access is more recent than thumbnail. + let contents = get_media_store_content_sorted_by_last_access(&media_store).await; + + assert_eq!(contents.len(), 2, "media cache contents length is wrong"); + assert_eq!(contents[0], content, "file is not last access"); + assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access"); + } +} + +#[cfg(test)] +mod encrypted_tests { + use std::sync::atomic::{AtomicU32, Ordering::SeqCst}; + + use matrix_sdk_base::{ + event_cache::store::media::MediaStoreError, media_store_inner_integration_tests, + media_store_integration_tests, media_store_integration_tests_time, + }; + use once_cell::sync::Lazy; + use tempfile::{tempdir, TempDir}; + + use super::SqliteMediaStore; + + static TMP_DIR: Lazy = Lazy::new(|| tempdir().unwrap()); + static NUM: AtomicU32 = AtomicU32::new(0); + + async fn get_media_store() -> Result { + let name = NUM.fetch_add(1, SeqCst).to_string(); + let tmpdir_path = TMP_DIR.path().join(name); + + tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap()); + + Ok(SqliteMediaStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password")) + .await + .unwrap()) + } + + media_store_integration_tests!(); + media_store_integration_tests_time!(); + media_store_inner_integration_tests!(); +} diff --git a/crates/matrix-sdk/src/client/builder/mod.rs b/crates/matrix-sdk/src/client/builder/mod.rs index af4b8d23d28..48c78d5a1af 100644 --- a/crates/matrix-sdk/src/client/builder/mod.rs +++ b/crates/matrix-sdk/src/client/builder/mod.rs @@ -666,11 +666,20 @@ async fn build_store_config( .event_cache_store({ let mut config = config.clone(); - if let Some(cache_path) = cache_path { + if let Some(ref cache_path) = cache_path { config = config.path(cache_path); } matrix_sdk_sqlite::SqliteEventCacheStore::open_with_config(config).await? + }) + .media_store({ + let mut config = config.clone(); + + if let Some(ref cache_path) = cache_path { + config = config.path(cache_path); + } + + matrix_sdk_sqlite::SqliteMediaStore::open_with_config(config).await? }); #[cfg(feature = "e2e-encryption")] diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 0337ea01b68..79e23d5352b 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -31,7 +31,7 @@ use futures_util::StreamExt; #[cfg(feature = "e2e-encryption")] use matrix_sdk_base::crypto::{store::LockableCryptoStore, DecryptionSettings}; use matrix_sdk_base::{ - event_cache::store::EventCacheStoreLock, + event_cache::store::{media::MediaStoreLock, EventCacheStoreLock}, store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse}, sync::{Notification, RoomUpdates}, BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta, @@ -726,6 +726,11 @@ impl Client { self.base_client().event_cache_store() } + /// Get a reference to the media store. + pub fn media_store(&self) -> &MediaStoreLock { + self.base_client().media_store() + } + /// Access the native Matrix authentication API with this client. pub fn matrix_auth(&self) -> MatrixAuth { MatrixAuth::new(self.clone()) diff --git a/crates/matrix-sdk/src/error.rs b/crates/matrix-sdk/src/error.rs index 34a05236bfc..f360710ab33 100644 --- a/crates/matrix-sdk/src/error.rs +++ b/crates/matrix-sdk/src/error.rs @@ -25,8 +25,8 @@ use matrix_sdk_base::crypto::{ CryptoStoreError, DecryptorError, KeyExportError, MegolmError, OlmError, }; use matrix_sdk_base::{ - event_cache::store::EventCacheStoreError, Error as SdkBaseError, QueueWedgeError, RoomState, - StoreError, + event_cache::store::{media::MediaStoreError, EventCacheStoreError}, + Error as SdkBaseError, QueueWedgeError, RoomState, StoreError, }; use reqwest::Error as ReqwestError; use ruma::{ @@ -340,6 +340,10 @@ pub enum Error { #[error(transparent)] EventCacheStore(Box), + /// An error occurred in the media store. + #[error(transparent)] + MediaStore(Box), + /// An error encountered when trying to parse an identifier. #[error(transparent)] Identifier(#[from] IdParseError), @@ -507,6 +511,12 @@ impl From for Error { } } +impl From for Error { + fn from(error: MediaStoreError) -> Self { + Error::MediaStore(Box::new(error)) + } +} + #[cfg(feature = "qrcode")] impl From for Error { fn from(error: ScanError) -> Self { diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index 65dc912defd..aa01fdbc534 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -82,7 +82,8 @@ pub use http_client::TransmissionProgress; pub use matrix_sdk_sqlite::SqliteCryptoStore; #[cfg(feature = "sqlite")] pub use matrix_sdk_sqlite::{ - SqliteEventCacheStore, SqliteStateStore, SqliteStoreConfig, STATE_STORE_DATABASE_NAME, + SqliteEventCacheStore, SqliteMediaStore, SqliteStateStore, SqliteStoreConfig, + STATE_STORE_DATABASE_NAME, }; pub use media::Media; pub use pusher::Pusher; diff --git a/crates/matrix-sdk/src/media.rs b/crates/matrix-sdk/src/media.rs index 7a5e0f18e30..90a88b40b95 100644 --- a/crates/matrix-sdk/src/media.rs +++ b/crates/matrix-sdk/src/media.rs @@ -428,7 +428,7 @@ impl Media { // Read from the cache. if use_cache { if let Some(content) = - self.client.event_cache_store().lock().await?.get_media_content(request).await? + self.client.media_store().lock().await?.get_media_content(request).await? { return Ok(content); } @@ -520,7 +520,7 @@ impl Media { if use_cache { self.client - .event_cache_store() + .media_store() .lock() .await? .add_media_content(request, content.clone(), IgnoreMediaRetentionPolicy::No) @@ -538,7 +538,7 @@ impl Media { async fn get_local_media_content(&self, uri: &MxcUri) -> Result> { // Read from the cache. self.client - .event_cache_store() + .media_store() .lock() .await? .get_media_content_for_uri(uri) @@ -552,7 +552,7 @@ impl Media { /// /// * `request` - The `MediaRequest` of the content. pub async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> { - Ok(self.client.event_cache_store().lock().await?.remove_media_content(request).await?) + Ok(self.client.media_store().lock().await?.remove_media_content(request).await?) } /// Delete all the media content corresponding to the given @@ -562,7 +562,7 @@ impl Media { /// /// * `uri` - The `MxcUri` of the files. pub async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { - Ok(self.client.event_cache_store().lock().await?.remove_media_content_for_uri(uri).await?) + Ok(self.client.media_store().lock().await?.remove_media_content_for_uri(uri).await?) } /// Get the file of the given media event content. @@ -697,20 +697,20 @@ impl Media { /// /// * `policy` - The `MediaRetentionPolicy` to use. pub async fn set_media_retention_policy(&self, policy: MediaRetentionPolicy) -> Result<()> { - self.client.event_cache_store().lock().await?.set_media_retention_policy(policy).await?; + self.client.media_store().lock().await?.set_media_retention_policy(policy).await?; Ok(()) } /// Get the current `MediaRetentionPolicy`. pub async fn media_retention_policy(&self) -> Result { - Ok(self.client.event_cache_store().lock().await?.media_retention_policy()) + Ok(self.client.media_store().lock().await?.media_retention_policy()) } /// Clean up the media cache with the current [`MediaRetentionPolicy`]. /// /// If there is already an ongoing cleanup, this is a noop. pub async fn clean_up_media_cache(&self) -> Result<()> { - self.client.event_cache_store().lock().await?.clean_up_media_cache().await?; + self.client.media_store().lock().await?.clean_up_media_cache().await?; Ok(()) } diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 022a7f14926..36a48b9226d 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -2426,7 +2426,7 @@ impl Room { .await?; if store_in_cache { - let cache_store_lock_guard = self.client.event_cache_store().lock().await?; + let media_store_lock_guard = self.client.media_store().lock().await?; // A failure to cache shouldn't prevent the whole upload from finishing // properly, so only log errors during caching. @@ -2435,7 +2435,7 @@ impl Room { let request = MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File }; - if let Err(err) = cache_store_lock_guard + if let Err(err) = media_store_lock_guard .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No) .await { @@ -2452,7 +2452,7 @@ impl Room { format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)), }; - if let Err(err) = cache_store_lock_guard + if let Err(err) = media_store_lock_guard .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No) .await { diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index d7146031041..71a745d98bd 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -141,7 +141,7 @@ use eyeball::SharedObservable; #[cfg(feature = "unstable-msc4274")] use matrix_sdk_base::store::FinishGalleryItemInfo; use matrix_sdk_base::{ - event_cache::store::EventCacheStoreError, + event_cache::store::{media::MediaStoreError, EventCacheStoreError}, media::MediaRequestParameters, store::{ ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore, @@ -842,7 +842,7 @@ impl RoomSendQueue { let fut = async move { let data = room .client() - .event_cache_store() + .media_store() .lock() .await? .get_media_content(&cache_key) @@ -2348,6 +2348,10 @@ pub enum RoomSendQueueStorageError { #[error(transparent)] EventCacheStoreError(#[from] EventCacheStoreError), + /// Error caused by the event cache store. + #[error(transparent)] + MediaStoreError(#[from] MediaStoreError), + /// Error caused when attempting to get a handle on the event cache store. #[error(transparent)] LockError(#[from] LockStoreError), diff --git a/crates/matrix-sdk/src/send_queue/progress.rs b/crates/matrix-sdk/src/send_queue/progress.rs index 72fa1b900bc..11a0de2fab0 100644 --- a/crates/matrix-sdk/src/send_queue/progress.rs +++ b/crates/matrix-sdk/src/send_queue/progress.rs @@ -94,17 +94,17 @@ impl RoomSendQueue { }; // Get the size of the file being uploaded from the event cache. - let bytes = match room.client().event_cache_store().lock().await { + let bytes = match room.client().media_store().lock().await { Ok(cache) => match cache.get_media_content(cache_key).await { Ok(Some(content)) => content.len(), Ok(None) => 0, Err(err) => { - warn!("error when reading media content from cache store: {err}"); + warn!("error when reading media content from media store: {err}"); 0 } }, Err(err) => { - warn!("couldn't acquire cache store lock: {err}"); + warn!("couldn't acquire media store lock: {err}"); 0 } }; @@ -195,9 +195,9 @@ impl RoomSendQueue { return Ok(None); } - let cache_store_guard = client.event_cache_store().lock().await?; + let media_store_guard = client.media_store().lock().await?; - let maybe_content = cache_store_guard.get_media_content(&cache_key).await?; + let maybe_content = media_store_guard.get_media_content(&cache_key).await?; Ok(maybe_content.map(|c| c.len())) } diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index c12b5d496bb..8d5e2a97a4b 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -420,14 +420,11 @@ impl RoomSendQueue { file_media_request: &MediaRequestParameters, ) -> Result { let client = room.client(); - let cache_store = client - .event_cache_store() - .lock() - .await - .map_err(RoomSendQueueStorageError::LockError)?; + let media_store = + client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?; // Cache the file itself in the cache store. - cache_store + media_store .add_media_content( file_media_request, data, @@ -435,7 +432,7 @@ impl RoomSendQueue { IgnoreMediaRetentionPolicy::Yes, ) .await - .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + .map_err(RoomSendQueueStorageError::MediaStoreError)?; // Process the thumbnail, if it's been provided. if let Some(thumbnail) = thumbnail { @@ -449,7 +446,7 @@ impl RoomSendQueue { // Cache thumbnail in the cache store. let thumbnail_media_request = Media::make_local_file_media_request(&txn); - cache_store + media_store .add_media_content( &thumbnail_media_request, data, @@ -457,7 +454,7 @@ impl RoomSendQueue { IgnoreMediaRetentionPolicy::Yes, ) .await - .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + .map_err(RoomSendQueueStorageError::MediaStoreError)?; Ok(MediaCacheResult { upload_thumbnail_txn: Some(txn.clone()), @@ -793,12 +790,12 @@ impl QueueStorage { // At this point, all the requests and dependent requests have been cleaned up. // Perform the final step: empty the cache from the local items. { - let event_cache = client.event_cache_store().lock().await?; - event_cache + let media_store = client.media_store().lock().await?; + media_store .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn)) .await?; if let Some(txn) = &handles.upload_thumbnail_txn { - event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?; + media_store.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?; } } @@ -938,22 +935,22 @@ async fn update_media_cache_keys_after_upload( let from_req = Media::make_local_file_media_request(file_upload_txn); trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store"); - let cache_store = - client.event_cache_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?; + let media_store = + client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?; // The media can now be removed during cleanups. - cache_store + media_store .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No) .await - .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + .map_err(RoomSendQueueStorageError::MediaStoreError)?; - cache_store + media_store .replace_media_key( &from_req, &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File }, ) .await - .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + .map_err(RoomSendQueueStorageError::MediaStoreError)?; // Rename the thumbnail too, if needs be. if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) { @@ -968,18 +965,18 @@ async fn update_media_cache_keys_after_upload( trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store"); // The media can now be removed during cleanups. - cache_store + media_store .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No) .await - .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + .map_err(RoomSendQueueStorageError::MediaStoreError)?; - cache_store + media_store .replace_media_key( &from_req, &MediaRequestParameters { source: new_source, format: MediaFormat::File }, ) .await - .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + .map_err(RoomSendQueueStorageError::MediaStoreError)?; } Ok(())