Skip to content

Commit

Permalink
integration tests: rewrite test_toggling_reaction so it syncs in th…
Browse files Browse the repository at this point in the history
…e background
  • Loading branch information
bnjbvr committed Mar 25, 2024
1 parent 9480450 commit ce7143b
Showing 1 changed file with 70 additions and 64 deletions.
134 changes: 70 additions & 64 deletions testing/matrix-sdk-integration-testing/src/tests/reactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,37 @@ use anyhow::Result;
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use assign::assign;
use eyeball_im::VectorDiff;
use eyeball_im::{Vector, VectorDiff};
use futures_core::Stream;
use futures_util::{future::join_all, FutureExt, StreamExt};
use matrix_sdk::{
config::SyncSettings,
ruma::{
api::client::room::create_room::v3::Request as CreateRoomRequest,
events::{relation::Annotation, room::message::RoomMessageEventContent},
EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId,
},
Client, LoopCtrl,
use matrix_sdk::ruma::{
api::client::room::create_room::v3::Request as CreateRoomRequest,
events::{relation::Annotation, room::message::RoomMessageEventContent},
EventId, MilliSecondsSinceUnixEpoch, UserId,
};
use matrix_sdk_ui::timeline::{EventTimelineItem, RoomExt, TimelineItem};
use tokio::{spawn, task::JoinHandle, time::timeout};
use tracing::debug;
use tokio::{
spawn,
task::JoinHandle,
time::{sleep, timeout},
};
use tracing::{debug, warn};

use crate::helpers::TestClientBuilder;

/// Sync until we receive an update for a room.
///
/// Beware: it may sync forever, if it doesn't receive such an update!
async fn sync_until_update_for_room(client: &Client, room_id: &RoomId) -> Result<()> {
client
.sync_with_callback(SyncSettings::default(), |response| async move {
if response.rooms.join.iter().any(|(id, _)| id == room_id) {
LoopCtrl::Break
} else {
LoopCtrl::Continue
}
})
.await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_toggling_reaction() -> Result<()> {
// Set up sync for user Alice, and create a room.
let alice = TestClientBuilder::new("alice".to_owned()).use_sqlite().build().await?;
let alice = TestClientBuilder::new("alice".to_owned())
.randomize_username()
.use_sqlite()
.build()
.await?;

let alice_clone = alice.clone();
let alice_sync = spawn(async move {
alice_clone.sync(Default::default()).await.expect("sync failed!");
});

debug!("Creating room…");
let user_id = alice.user_id().unwrap().to_owned();
Expand All @@ -71,60 +64,71 @@ async fn test_toggling_reaction() -> Result<()> {
// message. Otherwise, we might get the remote echo before we started
// waiting for it.

let room_clone = room.clone();
let event_id_task: JoinHandle<Result<_>> = spawn(async move {
let mut num_attempts = 0;
let room = room_clone;
let timeline = room.timeline().await.unwrap();
let (_items, mut stream) = timeline.subscribe().await;

loop {
debug!(attempt = num_attempts + 1, "Syncing until the remote echo arrives…");

// We expect a quick update from sync, so timeout after 10 seconds, and ignore
// timeouts; they might just indicate we received the necessary
// information on the previous attempt, but racily tried to read the
// timeline items.
if let Ok(sync_result) =
timeout(Duration::from_secs(10), sync_until_update_for_room(&alice, room.room_id()))
.await
{
sync_result?;
}
let timeline = room.timeline().await.unwrap();
let (mut items, mut stream) = timeline.subscribe().await;

// Let time to the timeline for processing the update, at most 1 second.
let _ = timeout(Duration::from_secs(1), stream.next()).await;
let event_id_task: JoinHandle<Result<_>> = spawn(async move {
let find_event_id = |items: &Vector<Arc<TimelineItem>>| {
items.iter().find_map(|item| {
let event = item.as_event()?;
if event.content().as_message()?.body().trim() == "hi!" {
event.event_id().map(|event_id| event_id.to_owned())
} else {
None
}
})
};

if let Some(event_id) = find_event_id(&items) {
return Ok(event_id);
}

let items = timeline.items().await;
let last_item = items.last().unwrap().as_event().unwrap();
if !last_item.is_local_echo() {
return Ok(last_item.event_id().unwrap().to_owned());
}
warn!(?items, "Waiting for updates…");

if num_attempts == 2 {
panic!("had 3 sync responses and no echo of our own event");
while let Some(diff) = stream.next().await {
warn!(?diff, "received a diff");
diff.apply(&mut items);
if let Some(event_id) = find_event_id(&items) {
return Ok(event_id);
}
num_attempts += 1;
}

unreachable!();
});

// Create a timeline for this room.
debug!("Creating timeline…");
let timeline = room.timeline().await.unwrap();
let (_items, mut stream) = timeline.subscribe().await;

// Send message.
debug!("Sending initial message…");
timeline.send(RoomMessageEventContent::text_plain("hi!").into()).await;

let event_id =
event_id_task.await.expect("failed to join").expect("Waiting for the event id failed");
let event_id = timeout(Duration::from_secs(10), event_id_task)
.await
.expect("timeout")
.expect("failed to join tokio task")
.expect("waiting for the event id failed");

alice_sync.abort();
let _ = alice_sync.await;

// Give a bit of time for the timeline to process all sync updates.
sleep(Duration::from_secs(1)).await;

let (mut items, mut stream) = timeline.subscribe().await;

// Skip all stream updates that have happened so far.
debug!("Skipping all other stream updates…");
while stream.next().now_or_never().is_some() {}
while let Some(Some(diff)) = stream.next().now_or_never() {
diff.apply(&mut items);
}

let message_position = timeline.items().await.len() - 1;
let message_position = items
.iter()
.enumerate()
.find_map(|(i, item)| (item.as_event()?.event_id()? == event_id).then_some(i))
.expect("couldn't find the final position for the event id");

let reaction_key = "👍";
let reaction = Annotation::new(event_id.clone(), reaction_key.into());
Expand Down Expand Up @@ -221,8 +225,10 @@ async fn assert_remote_added(
message_position: usize,
) {
let event = assert_event_is_updated(stream, event_id, message_position).await;

let reactions = event.reactions().get(&reaction.key).unwrap();
assert_eq!(reactions.senders().count(), 1);

let reaction = reactions.by_sender(user_id).next().unwrap();
let (reaction_tx_id, reaction_event_id) = reaction;
assert_matches!(reaction_tx_id, None);
Expand All @@ -241,7 +247,7 @@ async fn assert_event_is_updated(
index: usize,
) -> EventTimelineItem {
assert_let!(Some(VectorDiff::Set { index: i, value: event }) = stream.next().await);
assert_eq!(i, index);
assert_eq!(i, index, "unexpected position for event update, value = {event:?}");

let event = event.as_event().unwrap();
assert_eq!(event.event_id().unwrap(), event_id);
Expand Down

0 comments on commit ce7143b

Please sign in to comment.