Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 52 additions & 16 deletions crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Trait and macro of integration tests for `EventCacheStore` implementations.

use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

use assert_matches::assert_matches;
use matrix_sdk_common::{
Expand Down Expand Up @@ -891,8 +891,8 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
.await
.unwrap();

let duplicated_events = self
.filter_duplicated_events(
let duplicated_events = BTreeMap::from_iter(
self.filter_duplicated_events(
linked_chunk_id,
vec![
event_comte.event_id().unwrap().to_owned(),
Expand All @@ -904,20 +904,22 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
],
)
.await
.unwrap();
.unwrap(),
);

assert_eq!(duplicated_events.len(), 3);

assert_eq!(
duplicated_events[0],
(event_comte.event_id().unwrap(), Position::new(CId::new(0), 0))
*duplicated_events.get(&event_comte.event_id().unwrap()).unwrap(),
Position::new(CId::new(0), 0)
);
assert_eq!(
duplicated_events[1],
(event_morbier.event_id().unwrap(), Position::new(CId::new(2), 0))
*duplicated_events.get(&event_morbier.event_id().unwrap()).unwrap(),
Position::new(CId::new(2), 0)
);
assert_eq!(
duplicated_events[2],
(event_mont_dor.event_id().unwrap(), Position::new(CId::new(2), 1))
*duplicated_events.get(&event_mont_dor.event_id().unwrap()).unwrap(),
Position::new(CId::new(2), 1)
);
}

Expand Down Expand Up @@ -1018,24 +1020,29 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
// Save All The Things!
self.save_event(room_id, e1).await.unwrap();
self.save_event(room_id, edit_e1).await.unwrap();
self.save_event(room_id, reaction_e1).await.unwrap();
self.save_event(room_id, reaction_e1.clone()).await.unwrap();
self.save_event(room_id, e2).await.unwrap();
self.save_event(another_room_id, e3).await.unwrap();
self.save_event(another_room_id, reaction_e3).await.unwrap();

// Finding relations without a filter returns all of them.
let relations = self.find_event_relations(room_id, eid1, None).await.unwrap();
assert_eq!(relations.len(), 2);
assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(edit_eid1)));
assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(reaction_eid1)));
// The position is `None` for items outside the linked chunk.
assert!(relations
.iter()
.any(|(ev, pos)| ev.event_id().as_deref() == Some(edit_eid1) && pos.is_none()));
assert!(relations
.iter()
.any(|(ev, pos)| ev.event_id().as_deref() == Some(reaction_eid1) && pos.is_none()));

// Finding relations with a filter only returns a subset.
let relations = self
.find_event_relations(room_id, eid1, Some(&[RelationType::Replacement]))
.await
.unwrap();
assert_eq!(relations.len(), 1);
assert_eq!(relations[0].event_id().as_deref(), Some(edit_eid1));
assert_eq!(relations[0].0.event_id().as_deref(), Some(edit_eid1));

let relations = self
.find_event_relations(
Expand All @@ -1046,15 +1053,44 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
.await
.unwrap();
assert_eq!(relations.len(), 2);
assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(edit_eid1)));
assert!(relations.iter().any(|r| r.event_id().as_deref() == Some(reaction_eid1)));
assert!(relations.iter().any(|r| r.0.event_id().as_deref() == Some(edit_eid1)));
assert!(relations.iter().any(|r| r.0.event_id().as_deref() == Some(reaction_eid1)));

// We can't find relations using the wrong room.
let relations = self
.find_event_relations(another_room_id, eid1, Some(&[RelationType::Replacement]))
.await
.unwrap();
assert!(relations.is_empty());

// But if an event exists in the linked chunk, we may have its position when
// it's found as a relationship.

// Add reaction_e1 to the room's linked chunk.
self.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Update::PushItems { at: Position::new(CId::new(0), 0), items: vec![reaction_e1] },
],
)
.await
.unwrap();

// When looking for aggregations to e1, we should have the position for
// reaction_e1.
let relations = self.find_event_relations(room_id, eid1, None).await.unwrap();

// The position is set for `reaction_eid1` now.
assert!(relations.iter().any(|(ev, pos)| {
ev.event_id().as_deref() == Some(reaction_eid1)
&& *pos == Some(Position::new(CId::new(0), 0))
}));

// But it's still not set for the other related events.
assert!(relations
.iter()
.any(|(ev, pos)| ev.event_id().as_deref() == Some(edit_eid1) && pos.is_none()));
}

async fn test_save_event(&self) {
Expand Down
43 changes: 21 additions & 22 deletions crates/matrix-sdk-base/src/event_cache/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_trait::async_trait;
use matrix_sdk_common::{
linked_chunk::{
relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator,
ChunkMetadata, LinkedChunkId, Position, RawChunk, Update,
ChunkMetadata, LinkedChunkId, OwnedLinkedChunkId, Position, RawChunk, Update,
},
ring_buffer::RingBuffer,
store_locks::memory_store_helper::try_take_leased_lock,
Expand Down Expand Up @@ -192,17 +192,17 @@ impl EventCacheStore for MemoryStore {
linked_chunk_id: LinkedChunkId<'_>,
mut events: Vec<OwnedEventId>,
) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
// Collect all duplicated events.
if events.is_empty() {
return Ok(Vec::new());
}

let inner = self.inner.read().unwrap();

let mut duplicated_events = Vec::new();

for (event, position) in inner.events.unordered_linked_chunk_items(linked_chunk_id) {
// If `events` is empty, we can short-circuit.
if events.is_empty() {
break;
}

for (event, position) in
inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
{
if let Some(known_event_id) = event.event_id() {
// This event is a duplicate!
if let Some(index) =
Expand All @@ -223,10 +223,12 @@ impl EventCacheStore for MemoryStore {
) -> Result<Option<Event>, Self::Error> {
let inner = self.inner.read().unwrap();

let event = inner.events.items().find_map(|(event, this_linked_chunk_id)| {
(room_id == this_linked_chunk_id.room_id() && event.event_id()? == event_id)
.then_some(event.clone())
});
let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned());

let event = inner
.events
.items(&target_linked_chunk_id)
.find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));

Ok(event)
}
Expand All @@ -236,20 +238,17 @@ impl EventCacheStore for MemoryStore {
room_id: &RoomId,
event_id: &EventId,
filters: Option<&[RelationType]>,
) -> Result<Vec<Event>, Self::Error> {
) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
let inner = self.inner.read().unwrap();

let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned());

let filters = compute_filters_string(filters);

let related_events = inner
.events
.items()
.filter_map(|(event, this_linked_chunk_id)| {
// Must be in the same room.
if room_id != this_linked_chunk_id.room_id() {
return None;
}

.items(&target_linked_chunk_id)
.filter_map(|(event, pos)| {
// Must have a relation.
let (related_to, rel_type) = extract_event_relation(event.raw())?;

Expand All @@ -260,9 +259,9 @@ impl EventCacheStore for MemoryStore {

// Must not be filtered out.
if let Some(filters) = &filters {
filters.contains(&rel_type).then_some(event.clone())
filters.contains(&rel_type).then_some((event.clone(), pos))
} else {
Some(event.clone())
Some((event.clone(), pos))
}
})
.collect();
Expand Down
10 changes: 7 additions & 3 deletions crates/matrix-sdk-base/src/event_cache/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ pub trait EventCacheStore: AsyncTraitDeps {
event_id: &EventId,
) -> Result<Option<Event>, Self::Error>;

/// Find all the events that relate to a given event.
/// Find all the events (alongside their position in the room's linked
/// chunk, if available) that relate to a given event.
///
/// The only events which don't have a position are those which have been
/// saved out-of-band using [`Self::save_event`].
///
/// Note: it doesn't process relations recursively: for instance, if
/// requesting only thread events, it will NOT return the aggregated
Expand All @@ -148,7 +152,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
room_id: &RoomId,
event_id: &EventId,
filter: Option<&[RelationType]>,
) -> Result<Vec<Event>, Self::Error>;
) -> Result<Vec<(Event, Option<Position>)>, Self::Error>;

/// Save an event, that might or might not be part of an existing linked
/// chunk.
Expand Down Expand Up @@ -373,7 +377,7 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
room_id: &RoomId,
event_id: &EventId,
filter: Option<&[RelationType]>,
) -> Result<Vec<Event>, Self::Error> {
) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
self.0.find_event_relations(room_id, event_id, filter).await.map_err(Into::into)
}

Expand Down
7 changes: 1 addition & 6 deletions crates/matrix-sdk-common/src/linked_chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,6 @@ impl LinkedChunkId<'_> {
LinkedChunkId::Room(room_id) => OwnedLinkedChunkId::Room((*room_id).to_owned()),
}
}

pub fn room_id(&self) -> &RoomId {
match self {
LinkedChunkId::Room(room_id) => room_id,
}
}
}

impl PartialEq<&OwnedLinkedChunkId> for LinkedChunkId<'_> {
Expand Down Expand Up @@ -168,6 +162,7 @@ impl Display for OwnedLinkedChunkId {
}

impl OwnedLinkedChunkId {
#[cfg(test)]
fn as_ref(&self) -> LinkedChunkId<'_> {
match self {
OwnedLinkedChunkId::Room(room_id) => LinkedChunkId::Room(room_id.as_ref()),
Expand Down
Loading
Loading