Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{
crypto::store::types::RoomKeyInfo,
encryption::backups::BackupState,
event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheListener, RoomEventCacheUpdate},
event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate},
executor::spawn,
send_queue::RoomSendQueueUpdate,
Room,
Expand Down Expand Up @@ -356,15 +356,15 @@ where
async fn room_event_cache_updates_task(
room_event_cache: RoomEventCache,
timeline_controller: TimelineController,
mut event_subscriber: RoomEventCacheListener,
mut room_event_cache_subscriber: RoomEventCacheSubscriber,
timeline_focus: TimelineFocus,
) {
trace!("Spawned the event subscriber task.");

loop {
trace!("Waiting for an event.");

let update = match event_subscriber.recv().await {
let update = match room_event_cache_subscriber.recv().await {
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
Expand Down
7 changes: 7 additions & 0 deletions crates/matrix-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ All notable changes to this project will be documented in this file.

## [Unreleased] - ReleaseDate

### Features

- Add logging to `Room::join`.
([#5260](https://github.com/matrix-org/matrix-rust-sdk/pull/5260))
- `ClientServerCapabilities` has been renamed to `ClientServerInfo`. Alongside this,
Expand All @@ -16,6 +18,11 @@ All notable changes to this project will be documented in this file.
If the to-device message was encrypted, the `EncryptionInfo` will be set. If it is `None` the message was sent in clear.
([#5099](https://github.com/matrix-org/matrix-rust-sdk/pull/5099))

### Refactor

- `RoomEventCacheListener` has been renamed to `RoomEventCacheSubscriber`
([#5269](https://github.com/matrix-org/matrix-rust-sdk/pull/5269))

## [0.12.0] - 2025-06-10

### Features
Expand Down
24 changes: 13 additions & 11 deletions crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ mod room;

pub mod paginator;
pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheListener};
pub use room::{RoomEventCache, RoomEventCacheSubscriber};

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -188,18 +188,20 @@ impl EventCache {
client.subscribe_to_ignore_user_list_changes(),
));

let (tx, rx) = mpsc::channel(32);
let (auto_shrink_sender, auto_shrink_receiver) = mpsc::channel(32);

// Force-initialize the sender in the [`RoomEventCacheInner`].
self.inner.auto_shrink_sender.get_or_init(|| tx);
self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);

let auto_shrink_linked_chunk_tasks =
spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));
let auto_shrink_linked_chunk_task = spawn(Self::auto_shrink_linked_chunk_task(
self.inner.clone(),
auto_shrink_receiver,
));

Arc::new(EventCacheDropHandles {
listen_updates_task,
ignore_user_list_update_task,
auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_tasks,
auto_shrink_linked_chunk_task,
})
});

Expand Down Expand Up @@ -274,14 +276,14 @@ impl EventCache {
/// The auto-shrink mechanism works this way:
///
/// - Each time there's a new subscriber to a [`RoomEventCache`], it will
/// increment the active number of listeners to that room, aka
/// [`RoomEventCacheState::listener_count`].
/// increment the active number of subscribers to that room, aka
/// [`RoomEventCacheState::subscriber_count`].
/// - When that subscriber is dropped, it will decrement that count; and
/// notify the task below if it reached 0.
/// - The task spawned here, owned by the [`EventCacheInner`], will listen
/// to such notifications that a room may be shrunk. It will attempt an
/// auto-shrink, by letting the inner state decide whether this is a good
/// time to do so (new listeners might have spawned in the meanwhile).
/// time to do so (new subscribers might have spawned in the meanwhile).
#[instrument(skip_all)]
async fn auto_shrink_linked_chunk_task(
inner: Arc<EventCacheInner>,
Expand All @@ -301,11 +303,11 @@ impl EventCache {
trace!("waiting for state lock…");
let mut state = room.inner.state.write().await;

match state.auto_shrink_if_no_listeners().await {
match state.auto_shrink_if_no_subscribers().await {
Ok(diffs) => {
if let Some(diffs) = diffs {
// Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
// listeners, right? RIGHT? Especially because the state is guarded behind
// subscribers, right? RIGHT? Especially because the state is guarded behind
// a lock.
//
// However, better safe than sorry, and it's cheap to send an update here,
Expand Down
73 changes: 40 additions & 33 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ impl fmt::Debug for RoomEventCache {
}
}

/// Thin wrapper for a room event cache listener, so as to trigger side-effects
/// when all listeners are gone.
/// Thin wrapper for a room event cache subscriber, so as to trigger
/// side-effects when all subscribers are gone.
///
/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
/// more subscribers are active. This is an optimisation to reduce the amount of
/// data held in memory by a [`RoomEventCache`]: when no more subscribers are
/// active, all data is reduced to the minimum.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheListener {
pub struct RoomEventCacheSubscriber {
/// Underlying receiver of the room event cache's updates.
recv: Receiver<RoomEventCacheUpdate>,

Expand All @@ -80,17 +85,19 @@ pub struct RoomEventCacheListener {
auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

/// Shared instance of the auto-shrinker.
listener_count: Arc<AtomicUsize>,
subscriber_count: Arc<AtomicUsize>,
}

impl Drop for RoomEventCacheListener {
impl Drop for RoomEventCacheSubscriber {
fn drop(&mut self) {
let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst);
let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);

trace!("dropping a room event cache listener; previous count: {previous_listener_count}");
trace!(
"dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
);

if previous_listener_count == 1 {
// We were the last instance of the listener; let the auto-shrinker know by
if previous_subscriber_count == 1 {
// We were the last instance of the subscriber; let the auto-shrinker know by
// notifying it of our room id.

let mut room_id = self.room_id.clone();
Expand Down Expand Up @@ -120,20 +127,20 @@ impl Drop for RoomEventCacheListener {
}
}

trace!("sent notification to the parent channel that we were the last listener");
trace!("sent notification to the parent channel that we were the last subscriber");
}
}
}

impl Deref for RoomEventCacheListener {
// Delegate to the wrapped broadcast receiver, so a `RoomEventCacheSubscriber`
// can be used directly wherever a `Receiver<RoomEventCacheUpdate>` is expected
// (e.g. calling `recv()` on it, as the timeline task does).
impl Deref for RoomEventCacheSubscriber {
type Target = Receiver<RoomEventCacheUpdate>;

fn deref(&self) -> &Self::Target {
&self.recv
}
}

impl DerefMut for RoomEventCacheListener {
impl DerefMut for RoomEventCacheSubscriber {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.recv
}
Expand Down Expand Up @@ -162,7 +169,7 @@ impl RoomEventCache {
/// Read all current events.
///
/// Use [`RoomEventCache::subscribe`] to get all current events, plus a
/// listener/subscriber.
/// subscriber.
pub async fn events(&self) -> Vec<Event> {
let state = self.inner.state.read().await;

Expand All @@ -173,24 +180,24 @@ impl RoomEventCache {
/// events.
///
/// Use [`RoomEventCache::events`] to get all current events without the
/// listener/subscriber. Creating, and especially dropping, a
/// [`RoomEventCacheListener`] isn't free.
pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheListener) {
/// subscriber. Creating, and especially dropping, a
/// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheSubscriber) {
let state = self.inner.state.read().await;
let events = state.events().events().map(|(_position, item)| item.clone()).collect();

let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst);
trace!("added a room event cache listener; new count: {}", previous_listener_count + 1);
let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst);
trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);

let recv = self.inner.sender.subscribe();
let listener = RoomEventCacheListener {
let subscriber = RoomEventCacheSubscriber {
recv,
room_id: self.inner.room_id.clone(),
auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
listener_count: state.listener_count.clone(),
subscriber_count: state.subscriber_count.clone(),
};

(events, listener)
(events, subscriber)
}

/// Return a [`RoomPagination`] API object useful for running
Expand Down Expand Up @@ -506,9 +513,9 @@ mod private {

pagination_status: SharedObservable<RoomPaginationStatus>,

/// An atomic count of the current number of listeners of the
/// An atomic count of the current number of subscribers of the
/// [`super::RoomEventCache`].
pub(super) listener_count: Arc<AtomicUsize>,
pub(super) subscriber_count: Arc<AtomicUsize>,
}

impl RoomEventCacheState {
Expand Down Expand Up @@ -561,7 +568,7 @@ mod private {
store,
events,
waited_for_initial_prev_token: false,
listener_count: Default::default(),
subscriber_count: Default::default(),
pagination_status,
})
}
Expand Down Expand Up @@ -772,17 +779,17 @@ mod private {
Ok(())
}

/// Automatically shrink the room if there are no listeners, as
/// indicated by the atomic number of active listeners.
/// Automatically shrink the room if there are no more subscribers, as
/// indicated by the atomic number of active subscribers.
#[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
pub(crate) async fn auto_shrink_if_no_listeners(
pub(crate) async fn auto_shrink_if_no_subscribers(
&mut self,
) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst);
let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst);

trace!(listener_count, "received request to auto-shrink");
trace!(subscriber_count, "received request to auto-shrink");

if listener_count == 0 {
if subscriber_count == 0 {
// If we are the last strong reference to the auto-shrinker, we can shrink the
// events data structure to its last chunk.
self.shrink_to_last_chunk().await?;
Expand Down Expand Up @@ -2574,9 +2581,9 @@ mod timed_tests {

assert!(stream1.is_empty());

// Have another listener subscribe to the event cache.
// Have another subscriber.
// Since it's not the first one, and the previous one loaded some more events,
// the second listener seems them all.
// the second subscriber sees them all.
let (events2, stream2) = room_event_cache.subscribe().await;
assert_eq!(events2.len(), 2);
assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
Expand All @@ -2599,7 +2606,7 @@ mod timed_tests {
{
// Check the inner state: there's no more shared auto-shrinker.
let state = room_event_cache.inner.state.read().await;
assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0);
assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0);
}

// Getting the events will only give us the latest chunk.
Expand Down
Loading