diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 83b7483f6af..e1d286f17d6 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -22,7 +22,7 @@ use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ crypto::store::types::RoomKeyInfo, encryption::backups::BackupState, - event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheListener, RoomEventCacheUpdate}, + event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate}, executor::spawn, send_queue::RoomSendQueueUpdate, Room, @@ -356,7 +356,7 @@ where async fn room_event_cache_updates_task( room_event_cache: RoomEventCache, timeline_controller: TimelineController, - mut event_subscriber: RoomEventCacheListener, + mut room_event_cache_subscriber: RoomEventCacheSubscriber, timeline_focus: TimelineFocus, ) { trace!("Spawned the event subscriber task."); @@ -364,7 +364,7 @@ async fn room_event_cache_updates_task( loop { trace!("Waiting for an event."); - let update = match event_subscriber.recv().await { + let update = match room_event_cache_subscriber.recv().await { Ok(up) => up, Err(RecvError::Closed) => break, Err(RecvError::Lagged(num_skipped)) => { diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 947b78ea86d..f62ebf7d286 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -6,6 +6,8 @@ All notable changes to this project will be documented in this file. ## [Unreleased] - ReleaseDate +### Features + - Add logging to `Room::join`. ([#5260](https://github.com/matrix-org/matrix-rust-sdk/pull/5260)) - `ClientServerCapabilities` has been renamed to `ClientServerInfo`. Alongside this, @@ -16,6 +18,11 @@ All notable changes to this project will be documented in this file. If the to-device message was encrypted, the `EncryptionInfo` will be set. If it is `None` the message was sent in clear. 
([#5099](https://github.com/matrix-org/matrix-rust-sdk/pull/5099)) +### Refactor + +- `RoomEventCacheListener` is renamed `RoomEventCacheSubscriber` + ([#5269](https://github.com/matrix-org/matrix-rust-sdk/pull/5269)) + ## [0.12.0] - 2025-06-10 ### Features diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 8a6a21eb0c8..8f7a45ef9a7 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -63,7 +63,7 @@ mod room; pub mod paginator; pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus}; -pub use room::{RoomEventCache, RoomEventCacheListener}; +pub use room::{RoomEventCache, RoomEventCacheSubscriber}; /// An error observed in the [`EventCache`]. #[derive(thiserror::Error, Debug)] @@ -188,18 +188,20 @@ impl EventCache { client.subscribe_to_ignore_user_list_changes(), )); - let (tx, rx) = mpsc::channel(32); + let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32); // Force-initialize the sender in the [`RoomEventCacheInner`]. - self.inner.auto_shrink_sender.get_or_init(|| tx); + self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender); - let auto_shrink_linked_chunk_tasks = - spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx)); + let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task( + self.inner.clone(), + auto_shrink_receiver, + )); Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task, - auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_tasks, + auto_shrink_linked_chunk_task, }) }); @@ -274,14 +276,14 @@ impl EventCache { /// The auto-shrink mechanism works this way: /// /// - Each time there's a new subscriber to a [`RoomEventCache`], it will - /// increment the active number of listeners to that room, aka - /// [`RoomEventCacheState::listener_count`]. 
+ /// increment the active number of subscribers to that room, aka + /// [`RoomEventCacheState::subscriber_count`]. /// - When that subscriber is dropped, it will decrement that count; and /// notify the task below if it reached 0. /// - The task spawned here, owned by the [`EventCacheInner`], will listen /// to such notifications that a room may be shrunk. It will attempt an /// auto-shrink, by letting the inner state decide whether this is a good - /// time to do so (new listeners might have spawned in the meanwhile). + /// time to do so (new subscribers might have spawned in the meanwhile). #[instrument(skip_all)] async fn auto_shrink_linked_chunk_task( inner: Arc, @@ -301,11 +303,11 @@ impl EventCache { trace!("waiting for state lock…"); let mut state = room.inner.state.write().await; - match state.auto_shrink_if_no_listeners().await { + match state.auto_shrink_if_no_subscribers().await { Ok(diffs) => { if let Some(diffs) = diffs { // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any - // listeners, right? RIGHT? Especially because the state is guarded behind + // subscribers, right? RIGHT? Especially because the state is guarded behind // a lock. // // However, better safe than sorry, and it's cheap to send an update here, diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 3d7b474e09d..717076fb731 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -66,10 +66,15 @@ impl fmt::Debug for RoomEventCache { } } -/// Thin wrapper for a room event cache listener, so as to trigger side-effects -/// when all listeners are gone. +/// Thin wrapper for a room event cache subscriber, so as to trigger +/// side-effects when all subscribers are gone. +/// +/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no +/// more subscribers are active. 
This is an optimisation to reduce the number of +/// data held in memory by a [`RoomEventCache`]: when no more subscribers are +/// active, all data are reduced to the minimum. #[allow(missing_debug_implementations)] -pub struct RoomEventCacheListener { +pub struct RoomEventCacheSubscriber { /// Underlying receiver of the room event cache's updates. recv: Receiver, @@ -80,17 +85,19 @@ pub struct RoomEventCacheListener { auto_shrink_sender: mpsc::Sender, /// Shared instance of the auto-shrinker. - listener_count: Arc, + subscriber_count: Arc, } -impl Drop for RoomEventCacheListener { +impl Drop for RoomEventCacheSubscriber { fn drop(&mut self) { - let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst); + let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst); - trace!("dropping a room event cache listener; previous count: {previous_listener_count}"); + trace!( + "dropping a room event cache subscriber; previous count: {previous_subscriber_count}" + ); - if previous_listener_count == 1 { - // We were the last instance of the listener; let the auto-shrinker know by + if previous_subscriber_count == 1 { + // We were the last instance of the subscriber; let the auto-shrinker know by // notifying it of our room id. let mut room_id = self.room_id.clone(); @@ -120,12 +127,12 @@ impl Drop for RoomEventCacheListener { } } - trace!("sent notification to the parent channel that we were the last listener"); + trace!("sent notification to the parent channel that we were the last subscriber"); } } } -impl Deref for RoomEventCacheListener { +impl Deref for RoomEventCacheSubscriber { type Target = Receiver; fn deref(&self) -> &Self::Target { @@ -133,7 +140,7 @@ impl Deref for RoomEventCacheListener { } } -impl DerefMut for RoomEventCacheListener { +impl DerefMut for RoomEventCacheSubscriber { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.recv } @@ -162,7 +169,7 @@ impl RoomEventCache { /// Read all current events. 
/// /// Use [`RoomEventCache::subscribe`] to get all current events, plus a - /// listener/subscriber. + /// subscriber. pub async fn events(&self) -> Vec { let state = self.inner.state.read().await; @@ -173,24 +180,24 @@ /// events. /// /// Use [`RoomEventCache::events`] to get all current events without the - /// listener/subscriber. Creating, and especially dropping, a - /// [`RoomEventCacheListener`] isn't free. - pub async fn subscribe(&self) -> (Vec, RoomEventCacheListener) { + /// subscriber. Creating, and especially dropping, a - /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects. + pub async fn subscribe(&self) -> (Vec, RoomEventCacheSubscriber) { let state = self.inner.state.read().await; let events = state.events().events().map(|(_position, item)| item.clone()).collect(); - let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst); - trace!("added a room event cache listener; new count: {}", previous_listener_count + 1); + let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst); + trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1); let recv = self.inner.sender.subscribe(); - let listener = RoomEventCacheListener { + let subscriber = RoomEventCacheSubscriber { recv, room_id: self.inner.room_id.clone(), auto_shrink_sender: self.inner.auto_shrink_sender.clone(), - listener_count: state.listener_count.clone(), + subscriber_count: state.subscriber_count.clone(), }; - (events, listener) + (events, subscriber) } /// Return a [`RoomPagination`] API object useful for running @@ -506,9 +513,9 @@ mod private { pagination_status: SharedObservable, - /// An atomic count of the current number of listeners of the + /// An atomic count of the current number of subscribers of the /// [`super::RoomEventCache`].
- pub(super) listener_count: Arc, + pub(super) subscriber_count: Arc, } impl RoomEventCacheState { @@ -561,7 +568,7 @@ mod private { store, events, waited_for_initial_prev_token: false, - listener_count: Default::default(), + subscriber_count: Default::default(), pagination_status, }) } @@ -772,17 +779,17 @@ mod private { Ok(()) } - /// Automatically shrink the room if there are no listeners, as - /// indicated by the atomic number of active listeners. + /// Automatically shrink the room if there are no more subscribers, as + /// indicated by the atomic number of active subscribers. #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] - pub(crate) async fn auto_shrink_if_no_listeners( + pub(crate) async fn auto_shrink_if_no_subscribers( &mut self, ) -> Result>>, EventCacheError> { - let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst); + let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst); - trace!(listener_count, "received request to auto-shrink"); + trace!(subscriber_count, "received request to auto-shrink"); - if listener_count == 0 { + if subscriber_count == 0 { // If we are the last strong reference to the auto-shrinker, we can shrink the // events data structure to its last chunk. self.shrink_to_last_chunk().await?; @@ -2574,9 +2581,9 @@ mod timed_tests { assert!(stream1.is_empty()); - // Have another listener subscribe to the event cache. + // Have another subscriber. // Since it's not the first one, and the previous one loaded some more events, - // the second listener seems them all. + // the second subscriber sees them all. let (events2, stream2) = room_event_cache.subscribe().await; assert_eq!(events2.len(), 2); assert_eq!(events2[0].event_id().as_deref(), Some(evid1)); @@ -2599,7 +2606,7 @@ mod timed_tests { { // Check the inner state: there's no more shared auto-shrinker.
let state = room_event_cache.inner.state.read().await; - assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0); + assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0); } // Getting the events will only give us the latest chunk.