diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs index 76904fc731f..43d1a921cd4 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs @@ -14,7 +14,7 @@ //! Trait and macro of integration tests for `EventCacheStore` implementations. -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; use assert_matches::assert_matches; use matrix_sdk_common::{ @@ -891,8 +891,8 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { .await .unwrap(); - let duplicated_events = self - .filter_duplicated_events( + let duplicated_events = BTreeMap::from_iter( + self.filter_duplicated_events( linked_chunk_id, vec![ event_comte.event_id().unwrap().to_owned(), @@ -904,20 +904,22 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { ], ) .await - .unwrap(); + .unwrap(), + ); assert_eq!(duplicated_events.len(), 3); + assert_eq!( - duplicated_events[0], - (event_comte.event_id().unwrap(), Position::new(CId::new(0), 0)) + *duplicated_events.get(&event_comte.event_id().unwrap()).unwrap(), + Position::new(CId::new(0), 0) ); assert_eq!( - duplicated_events[1], - (event_morbier.event_id().unwrap(), Position::new(CId::new(2), 0)) + *duplicated_events.get(&event_morbier.event_id().unwrap()).unwrap(), + Position::new(CId::new(2), 0) ); assert_eq!( - duplicated_events[2], - (event_mont_dor.event_id().unwrap(), Position::new(CId::new(2), 1)) + *duplicated_events.get(&event_mont_dor.event_id().unwrap()).unwrap(), + Position::new(CId::new(2), 1) ); } @@ -1018,7 +1020,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { // Save All The Things! self.save_event(room_id, e1).await.unwrap(); self.save_event(room_id, edit_e1).await.unwrap(); - self.save_event(room_id, reaction_e1).await.unwrap(); + self.save_event(room_id, reaction_e1.clone()).await.unwrap(); self.save_event(room_id, e2).await.unwrap(); self.save_event(another_room_id, e3).await.unwrap(); self.save_event(another_room_id, reaction_e3).await.unwrap(); @@ -1026,8 +1028,13 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { // Finding relations without a filter returns all of them. let relations = self.find_event_relations(room_id, eid1, None).await.unwrap(); assert_eq!(relations.len(), 2); - assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(edit_eid1))); - assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(reaction_eid1))); + // The position is `None` for items outside the linked chunk. + assert!(relations + .iter() + .any(|(ev, pos)| ev.event_id().as_deref() == Some(edit_eid1) && pos.is_none())); + assert!(relations + .iter() + .any(|(ev, pos)| ev.event_id().as_deref() == Some(reaction_eid1) && pos.is_none())); // Finding relations with a filter only returns a subset. let relations = self @@ -1035,7 +1042,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { .await .unwrap(); assert_eq!(relations.len(), 1); - assert_eq!(relations[0].event_id().as_deref(), Some(edit_eid1)); + assert_eq!(relations[0].0.event_id().as_deref(), Some(edit_eid1)); let relations = self .find_event_relations( @@ -1046,8 +1053,8 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { .await .unwrap(); assert_eq!(relations.len(), 2); - assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(edit_eid1))); - assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(reaction_eid1))); + assert!(relations.iter().any(|r| r.0.event_id().as_deref() == Some(edit_eid1))); + assert!(relations.iter().any(|r| r.0.event_id().as_deref() == Some(reaction_eid1))); // We can't find relations using the wrong room. let relations = self @@ -1055,6 +1062,35 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore { .await .unwrap(); assert!(relations.is_empty()); + + // But if an event exists in the linked chunk, we may have its position when + // it's found as a relationship. + + // Add reaction_e1 to the room's linked chunk. + self.handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + Update::NewItemsChunk { previous: None, new: CId::new(0), next: None }, + Update::PushItems { at: Position::new(CId::new(0), 0), items: vec![reaction_e1] }, + ], + ) + .await + .unwrap(); + + // When looking for aggregations to e1, we should have the position for + // reaction_e1. + let relations = self.find_event_relations(room_id, eid1, None).await.unwrap(); + + // The position is set for `reaction_eid1` now. + assert!(relations.iter().any(|(ev, pos)| { + ev.event_id().as_deref() == Some(reaction_eid1) + && *pos == Some(Position::new(CId::new(0), 0)) + })); + + // But it's still not set for the other related events. + assert!(relations + .iter() + .any(|(ev, pos)| ev.event_id().as_deref() == Some(edit_eid1) && pos.is_none())); } async fn test_save_event(&self) { diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs index dfec281b11c..eb7d70381d3 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use matrix_sdk_common::{ linked_chunk::{ relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator, - ChunkMetadata, LinkedChunkId, Position, RawChunk, Update, + ChunkMetadata, LinkedChunkId, OwnedLinkedChunkId, Position, RawChunk, Update, }, ring_buffer::RingBuffer, store_locks::memory_store_helper::try_take_leased_lock, @@ -192,17 +192,17 @@ impl EventCacheStore for MemoryStore { linked_chunk_id: LinkedChunkId<'_>, mut events: Vec, ) -> Result, Self::Error> { - // Collect all duplicated events. + if events.is_empty() { + return Ok(Vec::new()); + } + let inner = self.inner.read().unwrap(); let mut duplicated_events = Vec::new(); - for (event, position) in inner.events.unordered_linked_chunk_items(linked_chunk_id) { - // If `events` is empty, we can short-circuit. - if events.is_empty() { - break; - } - + for (event, position) in + inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned()) + { if let Some(known_event_id) = event.event_id() { // This event is a duplicate! if let Some(index) = @@ -223,10 +223,12 @@ impl EventCacheStore for MemoryStore { ) -> Result, Self::Error> { let inner = self.inner.read().unwrap(); - let event = inner.events.items().find_map(|(event, this_linked_chunk_id)| { - (room_id == this_linked_chunk_id.room_id() && event.event_id()? == event_id) - .then_some(event.clone()) - }); + let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned()); + + let event = inner + .events + .items(&target_linked_chunk_id) + .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone())); Ok(event) } @@ -236,20 +238,17 @@ impl EventCacheStore for MemoryStore { room_id: &RoomId, event_id: &EventId, filters: Option<&[RelationType]>, - ) -> Result, Self::Error> { + ) -> Result)>, Self::Error> { let inner = self.inner.read().unwrap(); + let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned()); + let filters = compute_filters_string(filters); let related_events = inner .events - .items() - .filter_map(|(event, this_linked_chunk_id)| { - // Must be in the same room. - if room_id != this_linked_chunk_id.room_id() { - return None; - } - + .items(&target_linked_chunk_id) + .filter_map(|(event, pos)| { // Must have a relation. let (related_to, rel_type) = extract_event_relation(event.raw())?; @@ -260,9 +259,9 @@ impl EventCacheStore for MemoryStore { // Must not be filtered out. if let Some(filters) = &filters { - filters.contains(&rel_type).then_some(event.clone()) + filters.contains(&rel_type).then_some((event.clone(), pos)) } else { - Some(event.clone()) + Some((event.clone(), pos)) } }) .collect(); diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs index 1a5f9f32150..e48a46c9205 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs @@ -134,7 +134,11 @@ pub trait EventCacheStore: AsyncTraitDeps { event_id: &EventId, ) -> Result, Self::Error>; - /// Find all the events that relate to a given event. + /// Find all the events (alongside their position in the room's linked + /// chunk, if available) that relate to a given event. + /// + /// The only events which don't have a position are those which have been + /// saved out-of-band using [`Self::save_event`]. /// /// Note: it doesn't process relations recursively: for instance, if /// requesting only thread events, it will NOT return the aggregated @@ -148,7 +152,7 @@ pub trait EventCacheStore: AsyncTraitDeps { room_id: &RoomId, event_id: &EventId, filter: Option<&[RelationType]>, - ) -> Result, Self::Error>; + ) -> Result)>, Self::Error>; /// Save an event, that might or might not be part of an existing linked /// chunk. @@ -373,7 +377,7 @@ impl EventCacheStore for EraseEventCacheStoreError { room_id: &RoomId, event_id: &EventId, filter: Option<&[RelationType]>, - ) -> Result, Self::Error> { + ) -> Result)>, Self::Error> { self.0.find_event_relations(room_id, event_id, filter).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-common/src/linked_chunk/mod.rs b/crates/matrix-sdk-common/src/linked_chunk/mod.rs index 87699cc4707..e2b3df16023 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/mod.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/mod.rs @@ -129,12 +129,6 @@ impl LinkedChunkId<'_> { LinkedChunkId::Room(room_id) => OwnedLinkedChunkId::Room((*room_id).to_owned()), } } - - pub fn room_id(&self) -> &RoomId { - match self { - LinkedChunkId::Room(room_id) => room_id, - } - } } impl PartialEq<&OwnedLinkedChunkId> for LinkedChunkId<'_> { @@ -168,6 +162,7 @@ impl Display for OwnedLinkedChunkId { } impl OwnedLinkedChunkId { + #[cfg(test)] fn as_ref(&self) -> LinkedChunkId<'_> { match self { OwnedLinkedChunkId::Room(room_id) => LinkedChunkId::Room(room_id.as_ref()), diff --git a/crates/matrix-sdk-common/src/linked_chunk/relational.rs b/crates/matrix-sdk-common/src/linked_chunk/relational.rs index 7b1ecec4ce1..20714c46cee 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/relational.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/relational.rs @@ -15,7 +15,10 @@ //! Implementation for a _relational linked chunk_, see //! [`RelationalLinkedChunk`]. -use std::{collections::HashMap, hash::Hash}; +use std::{ + collections::{BTreeMap, HashMap}, + hash::Hash, +}; use ruma::{OwnedEventId, OwnedRoomId}; @@ -81,7 +84,7 @@ pub struct RelationalLinkedChunk { items_chunks: Vec>, /// The items' content themselves. - items: HashMap>, + items: HashMap)>>, } /// The [`IndexableItem`] trait is used to mark items that can be indexed into a @@ -105,7 +108,7 @@ impl IndexableItem for TimelineEvent { impl RelationalLinkedChunk where Item: IndexableItem, - ItemId: Hash + PartialEq + Eq + Clone, + ItemId: Hash + PartialEq + Eq + Clone + Ord, { /// Create a new relational linked chunk. pub fn new() -> Self { @@ -129,11 +132,11 @@ where for update in updates { match update { Update::NewItemsChunk { previous, new, next } => { - insert_chunk(&mut self.chunks, linked_chunk_id, previous, new, next); + Self::insert_chunk(&mut self.chunks, linked_chunk_id, previous, new, next); } Update::NewGapChunk { previous, new, next, gap } => { - insert_chunk(&mut self.chunks, linked_chunk_id, previous, new, next); + Self::insert_chunk(&mut self.chunks, linked_chunk_id, previous, new, next); self.items_chunks.push(ItemRow { linked_chunk_id: linked_chunk_id.to_owned(), position: Position::new(new, 0), @@ -142,7 +145,7 @@ where } Update::RemoveChunk(chunk_identifier) => { - remove_chunk(&mut self.chunks, linked_chunk_id, chunk_identifier); + Self::remove_chunk(&mut self.chunks, linked_chunk_id, chunk_identifier); let indices_to_remove = self .items_chunks @@ -175,7 +178,7 @@ where self.items .entry(linked_chunk_id.to_owned()) .or_default() - .insert(item_id.clone(), item); + .insert(item_id.clone(), (item, Some(at))); self.items_chunks.push(ItemRow { linked_chunk_id: linked_chunk_id.to_owned(), position: at, @@ -199,7 +202,7 @@ where self.items .entry(linked_chunk_id.to_owned()) .or_default() - .insert(item_id.clone(), item); + .insert(item_id.clone(), (item, Some(at))); existing.item = Either::Item(item_id); } @@ -271,101 +274,93 @@ where } } } + } - fn insert_chunk( - chunks: &mut Vec, - linked_chunk_id: LinkedChunkId<'_>, - previous: Option, - new: ChunkIdentifier, - next: Option, - ) { - // Find the previous chunk, and update its next chunk. - if let Some(previous) = previous { - let entry_for_previous_chunk = chunks - .iter_mut() - .find( - |ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. }| { - linked_chunk_id == linked_chunk_id_candidate && *chunk == previous - }, - ) - .expect("Previous chunk should be present"); - - // Link the chunk. - entry_for_previous_chunk.next_chunk = Some(new); - } + fn insert_chunk( + chunks: &mut Vec, + linked_chunk_id: LinkedChunkId<'_>, + previous: Option, + new: ChunkIdentifier, + next: Option, + ) { + // Find the previous chunk, and update its next chunk. + if let Some(previous) = previous { + let entry_for_previous_chunk = chunks + .iter_mut() + .find(|ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. }| { + linked_chunk_id == linked_chunk_id_candidate && *chunk == previous + }) + .expect("Previous chunk should be present"); + + // Link the chunk. + entry_for_previous_chunk.next_chunk = Some(new); + } - // Find the next chunk, and update its previous chunk. - if let Some(next) = next { - let entry_for_next_chunk = chunks - .iter_mut() - .find( - |ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. }| { - linked_chunk_id == linked_chunk_id_candidate && *chunk == next - }, - ) - .expect("Next chunk should be present"); - - // Link the chunk. - entry_for_next_chunk.previous_chunk = Some(new); - } + // Find the next chunk, and update its previous chunk. + if let Some(next) = next { + let entry_for_next_chunk = chunks + .iter_mut() + .find(|ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. }| { + linked_chunk_id == linked_chunk_id_candidate && *chunk == next + }) + .expect("Next chunk should be present"); + + // Link the chunk. + entry_for_next_chunk.previous_chunk = Some(new); + } + + // Insert the chunk. + chunks.push(ChunkRow { + linked_chunk_id: linked_chunk_id.to_owned(), + previous_chunk: previous, + chunk: new, + next_chunk: next, + }); + } + + fn remove_chunk( + chunks: &mut Vec, + linked_chunk_id: LinkedChunkId<'_>, + chunk_to_remove: ChunkIdentifier, + ) { + let entry_nth_to_remove = chunks + .iter() + .enumerate() + .find_map( + |(nth, ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. })| { + (linked_chunk_id == linked_chunk_id_candidate && *chunk == chunk_to_remove) + .then_some(nth) + }, + ) + .expect("Remove an unknown chunk"); + + let ChunkRow { linked_chunk_id, previous_chunk: previous, next_chunk: next, .. } = + chunks.remove(entry_nth_to_remove); + + // Find the previous chunk, and update its next chunk. + if let Some(previous) = previous { + let entry_for_previous_chunk = chunks + .iter_mut() + .find(|ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. }| { + &linked_chunk_id == linked_chunk_id_candidate && *chunk == previous + }) + .expect("Previous chunk should be present"); // Insert the chunk. - chunks.push(ChunkRow { - linked_chunk_id: linked_chunk_id.to_owned(), - previous_chunk: previous, - chunk: new, - next_chunk: next, - }); + entry_for_previous_chunk.next_chunk = next; } - fn remove_chunk( - chunks: &mut Vec, - linked_chunk_id: LinkedChunkId<'_>, - chunk_to_remove: ChunkIdentifier, - ) { - let entry_nth_to_remove = chunks - .iter() - .enumerate() - .find_map( - |(nth, ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. })| { - (linked_chunk_id == linked_chunk_id_candidate && *chunk == chunk_to_remove) - .then_some(nth) - }, - ) - .expect("Remove an unknown chunk"); - - let ChunkRow { linked_chunk_id, previous_chunk: previous, next_chunk: next, .. } = - chunks.remove(entry_nth_to_remove); - - // Find the previous chunk, and update its next chunk. - if let Some(previous) = previous { - let entry_for_previous_chunk = chunks - .iter_mut() - .find( - |ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. }| { - &linked_chunk_id == linked_chunk_id_candidate && *chunk == previous - }, - ) - .expect("Previous chunk should be present"); - - // Insert the chunk. - entry_for_previous_chunk.next_chunk = next; - } + // Find the next chunk, and update its previous chunk. + if let Some(next) = next { + let entry_for_next_chunk = chunks + .iter_mut() + .find(|ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. }| { + &linked_chunk_id == linked_chunk_id_candidate && *chunk == next + }) + .expect("Next chunk should be present"); - // Find the next chunk, and update its previous chunk. - if let Some(next) = next { - let entry_for_next_chunk = chunks - .iter_mut() - .find( - |ChunkRow { linked_chunk_id: linked_chunk_id_candidate, chunk, .. }| { - &linked_chunk_id == linked_chunk_id_candidate && *chunk == next - }, - ) - .expect("Next chunk should be present"); - - // Insert the chunk. - entry_for_next_chunk.previous_chunk = previous; - } + // Insert the chunk. + entry_for_next_chunk.previous_chunk = previous; } } @@ -373,37 +368,43 @@ where /// particular order. pub fn unordered_linked_chunk_items<'a>( &'a self, - target: LinkedChunkId<'a>, - ) -> impl Iterator { - self.items_chunks.iter().filter_map(move |item_row| { - if item_row.linked_chunk_id == target { - match &item_row.item { - Either::Item(item_id) => { - Some((self.items.get(&target.to_owned())?.get(item_id)?, item_row.position)) - } - Either::Gap(..) => None, - } - } else { - None - } + target: &OwnedLinkedChunkId, + ) -> impl 'a + Iterator { + self.items.get(target).into_iter().flat_map(|items| { + // Only keep items which have a position. + items.values().filter_map(|(item, pos)| pos.map(|pos| (item, pos))) }) } - /// Return an iterator over all items of all room linked chunks, without - /// their actual positions. + /// Return an iterator over all items of a given linked chunk, along with + /// their positions, if available. + /// + /// The only items which will NOT have a position are those saved with + /// [`Self::save_item`]. /// /// This will include out-of-band items. - pub fn items(&self) -> impl Iterator)> { - self.items.iter().flat_map(|(linked_chunk_id, items)| { - items.values().map(|item| (item, linked_chunk_id.as_ref())) - }) + pub fn items( + &self, + target: &OwnedLinkedChunkId, + ) -> impl Iterator)> { + self.items + .get(target) + .into_iter() + .flat_map(|items| items.values().map(|(item, pos)| (item, *pos))) } /// Save a single item "out-of-band" in the relational linked chunk. pub fn save_item(&mut self, room_id: OwnedRoomId, item: Item) { let id = item.id(); let linked_chunk_id = OwnedLinkedChunkId::Room(room_id); - self.items.entry(linked_chunk_id).or_default().insert(id, item); + + let map = self.items.entry(linked_chunk_id).or_default(); + if let Some(prev_value) = map.get_mut(&id) { + // If the item already exists, we keep the position. + prev_value.0 = item; + } else { + map.insert(id, (item, None)); + } } } @@ -411,7 +412,7 @@ impl RelationalLinkedChunk where Gap: Clone, Item: Clone, - ItemId: Hash + PartialEq + Eq, + ItemId: Hash + PartialEq + Eq + Ord, { /// Loads all the chunks. /// @@ -529,7 +530,7 @@ where impl Default for RelationalLinkedChunk where Item: IndexableItem, - ItemId: Hash + PartialEq + Eq + Clone, + ItemId: Hash + PartialEq + Eq + Clone + Ord, { fn default() -> Self { Self::new() @@ -548,7 +549,7 @@ fn load_raw_chunk( where Item: Clone, Gap: Clone, - ItemId: Hash + PartialEq + Eq, + ItemId: Hash + PartialEq + Eq + Ord, { // Find all items that correspond to the chunk. let mut items = relational_linked_chunk @@ -599,11 +600,14 @@ where collected_items .into_iter() .filter_map(|(item_id, _index)| { - relational_linked_chunk - .items - .get(&linked_chunk_id.to_owned())? - .get(item_id) - .cloned() + Some( + relational_linked_chunk + .items + .get(&linked_chunk_id.to_owned())? + .get(item_id)? + .0 + .clone(), + ) }) .collect(), ), @@ -719,6 +723,8 @@ where #[cfg(test)] mod tests { + use std::collections::BTreeMap; + use assert_matches::assert_matches; use ruma::room_id; @@ -1393,16 +1399,17 @@ mod tests { ], ); - let mut events = - relational_linked_chunk.unordered_linked_chunk_items(linked_chunk_id.as_ref()); + let events = BTreeMap::from_iter( + relational_linked_chunk.unordered_linked_chunk_items(&linked_chunk_id), + ); - assert_eq!(events.next().unwrap(), (&'a', Position::new(CId::new(0), 0))); - assert_eq!(events.next().unwrap(), (&'b', Position::new(CId::new(0), 1))); - assert_eq!(events.next().unwrap(), (&'c', Position::new(CId::new(0), 2))); - assert_eq!(events.next().unwrap(), (&'d', Position::new(CId::new(1), 0))); - assert_eq!(events.next().unwrap(), (&'e', Position::new(CId::new(1), 1))); - assert_eq!(events.next().unwrap(), (&'f', Position::new(CId::new(1), 2))); - assert!(events.next().is_none()); + assert_eq!(events.len(), 6); + assert_eq!(*events.get(&'a').unwrap(), Position::new(CId::new(0), 0)); + assert_eq!(*events.get(&'b').unwrap(), Position::new(CId::new(0), 1)); + assert_eq!(*events.get(&'c').unwrap(), Position::new(CId::new(0), 2)); + assert_eq!(*events.get(&'d').unwrap(), Position::new(CId::new(1), 0)); + assert_eq!(*events.get(&'e').unwrap(), Position::new(CId::new(1), 1)); + assert_eq!(*events.get(&'f').unwrap(), Position::new(CId::new(1), 2)); } #[test] diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index f3f0f62980e..fc7c5fe15a9 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -1165,9 +1165,12 @@ impl EventCacheStore for SqliteEventCacheStore { room_id: &RoomId, event_id: &EventId, filters: Option<&[RelationType]>, - ) -> Result, Self::Error> { + ) -> Result)>, Self::Error> { let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); + let hashed_linked_chunk_id = + self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key()); + let event_id = event_id.to_owned(); let filters = filters.map(ToOwned::to_owned); let this = self.clone(); @@ -1190,19 +1193,34 @@ impl EventCacheStore for SqliteEventCacheStore { }; let query = format!( - "SELECT content FROM events WHERE relates_to = ? AND room_id = ? {filter_query}" + "SELECT events.content, event_chunks.chunk_id, event_chunks.position + FROM events + LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ? + WHERE relates_to = ? AND room_id = ? {filter_query}" ); // Collect related events. let mut related = Vec::new(); - for ev in - txn.prepare(&query)?.query_map((event_id.as_str(), hashed_room_id), |row| { - row.get::<_, Vec>(0) + for result in + txn.prepare(&query)?.query_map((hashed_linked_chunk_id, event_id.as_str(), hashed_room_id), |row| { + Ok(( + row.get::<_, Vec>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, Option>(2)?, + )) })? { - let ev = ev?; - let ev = serde_json::from_slice(&this.decode_value(&ev)?)?; - related.push(ev); + let (event_blob, chunk_id, index) = result?; + + let event: Event = serde_json::from_slice(&this.decode_value(&event_blob)?)?; + + // Only build the position if both the chunk_id and position were present; in + // theory, they should either be present at the same time, or not at all. + let pos = chunk_id.zip(index).map(|(chunk_id, index)| { + Position::new(ChunkIdentifier::new(chunk_id), index) + }); + + related.push((event, pos)); } Ok(related) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 04edee51fb7..fcf4b5946dc 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -230,6 +230,13 @@ impl RoomEventCache { /// /// You can filter which types of related events to retrieve using /// `filter`. `None` will retrieve related events of any type. + /// + /// The related events are sorted like this: + /// - events saved out-of-band with `super::RoomEventCache::save_events` + /// will be located at the beginning of the array. + /// - events present in the linked chunk (be it in memory or in the + /// database) will be sorted according to their ordering in the linked + /// chunk. pub async fn event_with_relations( &self, event_id: &EventId, @@ -927,7 +934,6 @@ mod private { Ok(self.events.updates_as_vector_diffs()) } - #[cfg(test)] pub(crate) fn room_event_order(&self, event_pos: Position) -> Option { self.events.event_order(event_pos) } @@ -1157,6 +1163,14 @@ mod private { /// This goes straight to the database, as a simplification; we don't /// expect to need to have to look up in memory events, or that /// all the related events are actually loaded. + /// + /// The related events are sorted like this: + /// - events saved out-of-band with + /// [`super::RoomEventCache::save_events`] will be located at the + /// beginning of the array. + /// - events present in the linked chunk (be it in memory or in the + /// database) will be sorted according to their ordering in the linked + /// chunk. pub async fn find_event_with_relations( &self, event_id: &EventId, @@ -1176,7 +1190,8 @@ mod private { // transitive closure of all the related events. let mut related = store.find_event_relations(&self.room, event_id, filters.as_deref()).await?; - let mut stack = related.iter().filter_map(|event| event.event_id()).collect::>(); + let mut stack = + related.iter().filter_map(|(event, _pos)| event.event_id()).collect::>(); // Also keep track of already seen events, in case there's a loop in the // relation graph. @@ -1195,7 +1210,7 @@ mod private { let other_related = store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?; - stack.extend(other_related.iter().filter_map(|event| event.event_id())); + stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id())); related.extend(other_related); num_iters += 1; @@ -1203,6 +1218,35 @@ mod private { trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events"); + // Sort the results by their positions in the linked chunk, if available. + // + // If an event doesn't have a known position, it goes to the start of the array. + related.sort_by(|(_, lhs), (_, rhs)| { + use std::cmp::Ordering; + match (lhs, rhs) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => { + let lhs = self.room_event_order(*lhs); + let rhs = self.room_event_order(*rhs); + + // The events should have a definite position, but in the case they don't, + // still consider that not having a position means you'll end at the start + // of the array. + match (lhs, rhs) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => lhs.cmp(&rhs), + } + } + } + }); + + // Keep only the events, not their positions. + let related = related.into_iter().map(|(event, _pos)| event).collect(); + Ok(Some((target, related))) } diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 8d317008f5c..2721146ed2b 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -28,7 +28,10 @@ use matrix_sdk_test::{ }; use ruma::{ event_id, - events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, TimelineEventType}, + events::{ + room::message::RoomMessageEventContentWithoutRelation, AnySyncMessageLikeEvent, + AnySyncTimelineEvent, TimelineEventType, + }, room_id, user_id, EventId, RoomVersionId, }; use serde_json::json; @@ -2520,3 +2523,162 @@ async fn test_sync_while_back_paginate() { assert!(subscriber.is_empty()); } + +#[async_test] +async fn test_relations_ordering() { + let server = MatrixMockServer::new().await; + + let room_id = room_id!("!galette:saucisse.bzh"); + let f = EventFactory::new().room(room_id).sender(*ALICE); + + let target_event_id = event_id!("$1"); + + // Start with a prefilled event cache store that includes the target event. + let ev1 = f.text_msg("bonjour monde").event_id(target_event_id).into_event(); + + let event_cache_store = Arc::new(MemoryStore::new()); + event_cache_store + .handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + // An empty items chunk. + Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![ev1.clone()], + }, + ], + ) + .await + .unwrap(); + + let client = server + .client_builder() + .store_config( + StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()), + ) + .build() + .await; + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let room = server.sync_joined_room(&client, room_id).await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (initial_events, mut listener) = room_event_cache.subscribe().await; + assert_eq!(initial_events.len(), 1); + assert!(listener.recv().now_or_never().is_none()); + + // Sanity check: there are no relations for the target event yet. + let (_, relations) = + room_event_cache.event_with_relations(target_event_id, None).await.unwrap(); + assert!(relations.is_empty()); + + let edit2 = event_id!("$edit2"); + let ev2 = f + .text_msg("* hola mundo") + .edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hola mundo")) + .event_id(edit2) + .into_raw(); + + let edit3 = event_id!("$edit3"); + let ev3 = f + .text_msg("* ciao mondo") + .edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("ciao mondo")) + .event_id(edit3) + .into_raw(); + + let edit4 = event_id!("$edit4"); + let ev4 = f + .text_msg("* hello world") + .edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hello world")) + .event_id(edit4) + .into_raw(); + + // We receive two edit events via sync, as well as a gap; this will shrink the + // linked chunk. + server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(ev3.clone()) + .add_timeline_event(ev4.clone()) + .set_timeline_limited() + .set_timeline_prev_batch("prev_batch"), + ) + .await; + + // Wait for the listener to tell us we've received something. + loop { + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = listener.recv() + ); + // We've received the shrink. + if diffs.iter().any(|diff| matches!(diff, VectorDiff::Clear)) { + break; + } + } + + // At this point, relations are known for the target event. + let (_, relations) = + room_event_cache.event_with_relations(target_event_id, None).await.unwrap(); + assert_eq!(relations.len(), 2); + // And the edit events are correctly ordered according to their position in the + // linked chunk. + assert_eq!(relations[0].event_id().unwrap(), edit3); + assert_eq!(relations[1].event_id().unwrap(), edit4); + + // Now, we resolve the gap; this returns ev2, another edit. + server + .mock_room_messages() + .match_from("prev_batch") + .ok(RoomMessagesResponseTemplate::default().events(vec![ev2.clone()])) + .named("room/messages") + .mock_once() + .mount() + .await; + + // Run the pagination. + let outcome = room_event_cache.pagination().run_backwards_once(1).await.unwrap(); + assert!(outcome.reached_start.not()); + assert_eq!(outcome.events.len(), 1); + + { + // Sanity check: we load the first chunk with the first event, from disk, and + // reach the start of the timeline. + let outcome = room_event_cache.pagination().run_backwards_once(1).await.unwrap(); + assert!(outcome.reached_start); + } + + // Relations are returned accordingly. + let (_, relations) = + room_event_cache.event_with_relations(target_event_id, None).await.unwrap(); + assert_eq!(relations.len(), 3); + assert_eq!(relations[0].event_id().unwrap(), edit2); + assert_eq!(relations[1].event_id().unwrap(), edit3); + assert_eq!(relations[2].event_id().unwrap(), edit4); + + // If I save an additional event without storing it in the linked chunk, it will + // be present at the start of the relations list. + let edit5 = event_id!("$edit5"); + let ev5 = f + .text_msg("* hallo Welt") + .edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hallo Welt")) + .event_id(edit5) + .into_event(); + + server.mock_room_event().ok(ev5).mock_once().mount().await; + + // This saves the event, but without a position. + room.event(edit5, None).await.unwrap(); + + let (_, relations) = + room_event_cache.event_with_relations(target_event_id, None).await.unwrap(); + assert_eq!(relations.len(), 4); + assert_eq!(relations[0].event_id().unwrap(), edit5); + assert_eq!(relations[1].event_id().unwrap(), edit2); + assert_eq!(relations[2].event_id().unwrap(), edit3); + assert_eq!(relations[3].event_id().unwrap(), edit4); +}