Skip to content
17 changes: 11 additions & 6 deletions benchmarks/benches/linked_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};

use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use matrix_sdk::{
linked_chunk::{lazy_loader, LinkedChunk, Update},
linked_chunk::{lazy_loader, LinkedChunk, LinkedChunkId, Update},
SqliteEventCacheStore,
};
use matrix_sdk_base::event_cache::{
Expand All @@ -29,6 +29,7 @@ fn writing(c: &mut Criterion) {
.expect("Failed to create an asynchronous runtime");

let room_id = room_id!("!foo:bar.baz");
let linked_chunk_id = LinkedChunkId::Room(room_id);
let event_factory = EventFactory::new().room(room_id).sender(&ALICE);

let mut group = c.benchmark_group("writing");
Expand Down Expand Up @@ -115,9 +116,9 @@ fn writing(c: &mut Criterion) {

if let Some(store) = &store {
let updates = linked_chunk.updates().unwrap().take();
store.handle_linked_chunk_updates(room_id, updates).await.unwrap();
store.handle_linked_chunk_updates(linked_chunk_id, updates).await.unwrap();
// Empty the store.
store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await.unwrap();
store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await.unwrap();
}

},
Expand Down Expand Up @@ -145,6 +146,7 @@ fn reading(c: &mut Criterion) {
.expect("Failed to create an asynchronous runtime");

let room_id = room_id!("!foo:bar.baz");
let linked_chunk_id = LinkedChunkId::Room(room_id);
let event_factory = EventFactory::new().room(room_id).sender(&ALICE);

let mut group = c.benchmark_group("reading");
Expand Down Expand Up @@ -195,7 +197,9 @@ fn reading(c: &mut Criterion) {

// Now persist the updates to recreate this full linked chunk.
let updates = lc.updates().unwrap().take();
runtime.block_on(store.handle_linked_chunk_updates(room_id, updates)).unwrap();
runtime
.block_on(store.handle_linked_chunk_updates(linked_chunk_id, updates))
.unwrap();
}

// Define the throughput.
Expand All @@ -206,7 +210,8 @@ fn reading(c: &mut Criterion) {
// Bench the routine.
bencher.to_async(&runtime).iter(|| async {
// Load the last chunk first,
let (last_chunk, chunk_id_gen) = store.load_last_chunk(room_id).await.unwrap();
let (last_chunk, chunk_id_gen) =
store.load_last_chunk(linked_chunk_id).await.unwrap();

let mut lc =
lazy_loader::from_last_chunk::<128, _, _>(last_chunk, chunk_id_gen)
Expand All @@ -216,7 +221,7 @@ fn reading(c: &mut Criterion) {
// Then load until the start of the linked chunk.
let mut cur_chunk_id = lc.chunks().next().unwrap().identifier();
while let Some(prev) =
store.load_previous_chunk(room_id, cur_chunk_id).await.unwrap()
store.load_previous_chunk(linked_chunk_id, cur_chunk_id).await.unwrap()
{
cur_chunk_id = prev.identifier;
lazy_loader::insert_new_first_chunk(&mut lc, prev)
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/benches/room_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {
.lock()
.await
.unwrap()
.clear_all_rooms_chunks()
.clear_all_linked_chunks()
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ impl Client {
.map_err(EventCacheError::from)?;

// Clear all the room chunks. It's important to *not* call
// `EventCacheStore::clear_all_rooms_chunks` here, because there might be live
// `EventCacheStore::clear_all_linked_chunks` here, because there might be live
// observers of the linked chunks, and that would cause some very bad state
// mismatch.
self.inner.event_cache().clear_all_rooms().await?;
Expand Down
105 changes: 63 additions & 42 deletions crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use matrix_sdk_common::{
AlgorithmInfo, DecryptedRoomEvent, EncryptionInfo, TimelineEvent, TimelineEventKind,
VerificationState,
},
linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifier as CId, Position, Update},
linked_chunk::{
lazy_loader, ChunkContent, ChunkIdentifier as CId, LinkedChunkId, Position, Update,
},
};
use matrix_sdk_test::{event_factory::EventFactory, ALICE, DEFAULT_TEST_ROOM_ID};
use ruma::{
Expand Down Expand Up @@ -132,7 +134,7 @@ pub trait EventCacheStoreIntegrationTests {
async fn test_rebuild_empty_linked_chunk(&self);

/// Test that clear all the rooms' linked chunks works.
async fn test_clear_all_rooms_chunks(&self);
async fn test_clear_all_linked_chunks(&self);

/// Test that removing a room from storage empties all associated data.
async fn test_remove_room(&self);
Expand Down Expand Up @@ -337,9 +339,10 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {

async fn test_handle_updates_and_rebuild_linked_chunk(&self) {
let room_id = room_id!("!r0:matrix.org");
let linked_chunk_id = LinkedChunkId::Room(room_id);

self.handle_linked_chunk_updates(
room_id,
linked_chunk_id,
vec![
// new chunk
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Expand Down Expand Up @@ -371,10 +374,11 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
.unwrap();

// The linked chunk is correctly reloaded.
let lc =
lazy_loader::from_all_chunks::<3, _, _>(self.load_all_chunks(room_id).await.unwrap())
.unwrap()
.unwrap();
let lc = lazy_loader::from_all_chunks::<3, _, _>(
self.load_all_chunks(linked_chunk_id).await.unwrap(),
)
.unwrap()
.unwrap();

let mut chunks = lc.chunks();

Expand Down Expand Up @@ -415,19 +419,20 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {

async fn test_linked_chunk_incremental_loading(&self) {
let room_id = room_id!("!r0:matrix.org");
let linked_chunk_id = LinkedChunkId::Room(room_id);
let event = |msg: &str| make_test_event(room_id, msg);

// Load the last chunk, but none exists yet.
{
let (last_chunk, chunk_identifier_generator) =
self.load_last_chunk(room_id).await.unwrap();
self.load_last_chunk(linked_chunk_id).await.unwrap();

assert!(last_chunk.is_none());
assert_eq!(chunk_identifier_generator.current(), 0);
}

self.handle_linked_chunk_updates(
room_id,
linked_chunk_id,
vec![
// new chunk for items
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Expand Down Expand Up @@ -458,7 +463,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
// Load the last chunk.
let mut linked_chunk = {
let (last_chunk, chunk_identifier_generator) =
self.load_last_chunk(room_id).await.unwrap();
self.load_last_chunk(linked_chunk_id).await.unwrap();

assert_eq!(chunk_identifier_generator.current(), 2);

Expand Down Expand Up @@ -493,7 +498,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
{
let first_chunk = linked_chunk.chunks().next().unwrap().identifier();
let previous_chunk =
self.load_previous_chunk(room_id, first_chunk).await.unwrap().unwrap();
self.load_previous_chunk(linked_chunk_id, first_chunk).await.unwrap().unwrap();

lazy_loader::insert_new_first_chunk(&mut linked_chunk, previous_chunk).unwrap();

Expand Down Expand Up @@ -530,7 +535,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
{
let first_chunk = linked_chunk.chunks().next().unwrap().identifier();
let previous_chunk =
self.load_previous_chunk(room_id, first_chunk).await.unwrap().unwrap();
self.load_previous_chunk(linked_chunk_id, first_chunk).await.unwrap().unwrap();

lazy_loader::insert_new_first_chunk(&mut linked_chunk, previous_chunk).unwrap();

Expand Down Expand Up @@ -579,7 +584,8 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
// Load the previous chunk: there is none.
{
let first_chunk = linked_chunk.chunks().next().unwrap().identifier();
let previous_chunk = self.load_previous_chunk(room_id, first_chunk).await.unwrap();
let previous_chunk =
self.load_previous_chunk(linked_chunk_id, first_chunk).await.unwrap();

assert!(previous_chunk.is_none());
}
Expand Down Expand Up @@ -631,19 +637,21 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
async fn test_rebuild_empty_linked_chunk(&self) {
// When I rebuild a linked chunk from an empty store, it's empty.
let linked_chunk = lazy_loader::from_all_chunks::<3, _, _>(
self.load_all_chunks(&DEFAULT_TEST_ROOM_ID).await.unwrap(),
self.load_all_chunks(LinkedChunkId::Room(&DEFAULT_TEST_ROOM_ID)).await.unwrap(),
)
.unwrap();
assert!(linked_chunk.is_none());
}

async fn test_clear_all_rooms_chunks(&self) {
async fn test_clear_all_linked_chunks(&self) {
let r0 = room_id!("!r0:matrix.org");
let linked_chunk_id0 = LinkedChunkId::Room(r0);
let r1 = room_id!("!r1:matrix.org");
let linked_chunk_id1 = LinkedChunkId::Room(r1);

// Add updates for the first room.
self.handle_linked_chunk_updates(
r0,
linked_chunk_id0,
vec![
// new chunk
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Expand All @@ -659,7 +667,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {

// Add updates for the second room.
self.handle_linked_chunk_updates(
r1,
linked_chunk_id1,
vec![
// Empty items chunk.
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Expand All @@ -683,32 +691,42 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
.unwrap();

// Sanity check: both linked chunks can be reloaded.
assert!(lazy_loader::from_all_chunks::<3, _, _>(self.load_all_chunks(r0).await.unwrap())
.unwrap()
.is_some());
assert!(lazy_loader::from_all_chunks::<3, _, _>(self.load_all_chunks(r1).await.unwrap())
.unwrap()
.is_some());
assert!(lazy_loader::from_all_chunks::<3, _, _>(
self.load_all_chunks(linked_chunk_id0).await.unwrap()
)
.unwrap()
.is_some());
assert!(lazy_loader::from_all_chunks::<3, _, _>(
self.load_all_chunks(linked_chunk_id1).await.unwrap()
)
.unwrap()
.is_some());

// Clear the chunks.
self.clear_all_rooms_chunks().await.unwrap();
self.clear_all_linked_chunks().await.unwrap();

// Both rooms now have no linked chunk.
assert!(lazy_loader::from_all_chunks::<3, _, _>(self.load_all_chunks(r0).await.unwrap())
.unwrap()
.is_none());
assert!(lazy_loader::from_all_chunks::<3, _, _>(self.load_all_chunks(r1).await.unwrap())
.unwrap()
.is_none());
assert!(lazy_loader::from_all_chunks::<3, _, _>(
self.load_all_chunks(linked_chunk_id0).await.unwrap()
)
.unwrap()
.is_none());
assert!(lazy_loader::from_all_chunks::<3, _, _>(
self.load_all_chunks(linked_chunk_id1).await.unwrap()
)
.unwrap()
.is_none());
}

async fn test_remove_room(&self) {
let r0 = room_id!("!r0:matrix.org");
let linked_chunk_id0 = LinkedChunkId::Room(r0);
let r1 = room_id!("!r1:matrix.org");
let linked_chunk_id1 = LinkedChunkId::Room(r1);

// Add updates to the first room.
self.handle_linked_chunk_updates(
r0,
linked_chunk_id0,
vec![
// new chunk
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Expand All @@ -724,7 +742,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {

// Add updates to the second room.
self.handle_linked_chunk_updates(
r1,
linked_chunk_id1,
vec![
// new chunk
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Expand All @@ -742,17 +760,19 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
self.remove_room(r0).await.unwrap();

// Check that r0 doesn't have a linked chunk anymore.
let r0_linked_chunk = self.load_all_chunks(r0).await.unwrap();
let r0_linked_chunk = self.load_all_chunks(linked_chunk_id0).await.unwrap();
assert!(r0_linked_chunk.is_empty());

// Check that r1 is unaffected.
let r1_linked_chunk = self.load_all_chunks(r1).await.unwrap();
let r1_linked_chunk = self.load_all_chunks(linked_chunk_id1).await.unwrap();
assert!(!r1_linked_chunk.is_empty());
}

async fn test_filter_duplicated_events(&self) {
let room_id = room_id!("!r0:matrix.org");
let linked_chunk_id = LinkedChunkId::Room(room_id);
let another_room_id = room_id!("!r1:matrix.org");
let another_linked_chunk_id = LinkedChunkId::Room(another_room_id);
let event = |msg: &str| make_test_event(room_id, msg);

let event_comte = event("comté");
Expand All @@ -764,7 +784,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
let event_mont_dor = event("mont d'or");

self.handle_linked_chunk_updates(
room_id,
linked_chunk_id,
vec![
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Update::PushItems {
Expand All @@ -790,7 +810,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
// Add other events in another room, to ensure filtering take the `room_id` into
// account.
self.handle_linked_chunk_updates(
another_room_id,
another_linked_chunk_id,
vec![
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Update::PushItems {
Expand All @@ -804,7 +824,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {

let duplicated_events = self
.filter_duplicated_events(
room_id,
linked_chunk_id,
vec![
event_comte.event_id().unwrap().to_owned(),
event_raclette.event_id().unwrap().to_owned(),
Expand Down Expand Up @@ -835,14 +855,15 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
async fn test_find_event(&self) {
let room_id = room_id!("!r0:matrix.org");
let another_room_id = room_id!("!r1:matrix.org");
let another_linked_chunk_id = LinkedChunkId::Room(another_room_id);
let event = |msg: &str| make_test_event(room_id, msg);

let event_comte = event("comté");
let event_gruyere = event("gruyère");

// Add one event in one room.
self.handle_linked_chunk_updates(
room_id,
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Update::PushItems {
Expand All @@ -856,7 +877,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {

// Add another event in another room.
self.handle_linked_chunk_updates(
another_room_id,
another_linked_chunk_id,
vec![
Update::NewItemsChunk { previous: None, new: CId::new(0), next: None },
Update::PushItems {
Expand Down Expand Up @@ -885,7 +906,7 @@ impl EventCacheStoreIntegrationTests for DynEventCacheStore {
.is_none());

// Clearing the rooms also clears the event's storage.
self.clear_all_rooms_chunks().await.expect("failed to clear all rooms chunks");
self.clear_all_linked_chunks().await.expect("failed to clear all rooms chunks");
assert!(self
.find_event(room_id, event_comte.event_id().unwrap().as_ref())
.await
Expand Down Expand Up @@ -1085,10 +1106,10 @@ macro_rules! event_cache_store_integration_tests {
}

#[async_test]
async fn test_clear_all_rooms_chunks() {
async fn test_clear_all_linked_chunks() {
let event_cache_store =
get_event_cache_store().await.unwrap().into_event_cache_store();
event_cache_store.test_clear_all_rooms_chunks().await;
event_cache_store.test_clear_all_linked_chunks().await;
}

#[async_test]
Expand Down
Loading
Loading