From 1ce90ab63e0134d87ae820eb9c470495ce9e8489 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 12 Aug 2024 13:56:54 +0200 Subject: [PATCH] fix(sdk): Remove `SlidingSyncInner::past_positions`. The patch https://github.com/matrix-org/matrix-rust-sdk/pull/2395 has introduced `SlidingSyncInner::past_positions` as a mechanism to filter duplicated responses. It was a problem because the sliding sync `ops` could easily create corrupted states if they were applied more than once. Since https://github.com/matrix-org/matrix-rust-sdk/pull/3664/, `ops` are ignored. Now, `past_positions` create a problem with the sliding sync native implementation inside Synapse because `pos` can stay the same between multiple responses. While `past_positions` was helpful to fix bugs in the past, it's no longer necessary today. Moreover, it breaks an invariant about `pos`: we must consider it as a blackbox. It means we must ignore if a `pos` value has been received in the past or not. This invariant has been broken for good reasons, but it now creates new issues. This patch removes `past_positions`, along with the associated code (like `Error::ResponseAlreadyReceived` for example). --- crates/matrix-sdk/src/sliding_sync/builder.rs | 5 +- crates/matrix-sdk/src/sliding_sync/error.rs | 8 -- crates/matrix-sdk/src/sliding_sync/mod.rs | 82 ++----------------- 3 files changed, 8 insertions(+), 87 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index abff2b17cd5..a3377a6d64d 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -1,13 +1,12 @@ use std::{ collections::BTreeMap, fmt::Debug, - num::NonZeroUsize, sync::{Arc, RwLock as StdRwLock}, time::Duration, }; use matrix_sdk_base::sliding_sync::http; -use matrix_sdk_common::{ring_buffer::RingBuffer, timer}; +use matrix_sdk_common::timer; use ruma::OwnedRoomId; use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock}; use url::Url; @@ -285,8 +284,6 @@ impl SlidingSyncBuilder { rooms, position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })), - // SAFETY: `unwrap` is safe because 20 is not zero. - past_positions: StdRwLock::new(RingBuffer::new(NonZeroUsize::new(20).unwrap())), sticky: StdRwLock::new(SlidingSyncStickyManager::new( SlidingSyncStickyParameters::new( diff --git a/crates/matrix-sdk/src/sliding_sync/error.rs b/crates/matrix-sdk/src/sliding_sync/error.rs index 34431f21fdc..15ebaf9e911 100644 --- a/crates/matrix-sdk/src/sliding_sync/error.rs +++ b/crates/matrix-sdk/src/sliding_sync/error.rs @@ -13,14 +13,6 @@ pub enum Error { #[error("The sliding sync response could not be handled: {0}")] BadResponse(String), - /// The response we've received from the server has already been received in - /// the past because it has a `pos` that we have recently seen. - #[error("The sliding sync response has already been received: `pos={pos:?}`")] - ResponseAlreadyReceived { - /// The `pos`ition that has been received. - pos: Option, - }, - /// A `SlidingSyncListRequestGenerator` has been used without having been /// initialized. It happens when a response is handled before a request has /// been sent. It usually happens when testing. diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index abeba273fd2..5ee776a0935 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -35,7 +35,7 @@ use std::{ use async_stream::stream; use futures_core::stream::Stream; pub use matrix_sdk_base::sliding_sync::http; -use matrix_sdk_common::{ring_buffer::RingBuffer, timer}; +use matrix_sdk_common::timer; use ruma::{ api::{client::error::ErrorKind, OutgoingRequest}, assign, OwnedEventId, OwnedRoomId, RoomId, @@ -112,9 +112,6 @@ pub(super) struct SlidingSyncInner { /// `position` being updated, before sending a new request. position: Arc>, - /// Past position markers. - past_positions: StdRwLock>, - /// The lists of this Sliding Sync instance. lists: AsyncRwLock>, @@ -258,23 +255,6 @@ impl SlidingSync { ) -> Result { let pos = Some(sliding_sync_response.pos.clone()); - { - debug!("Update position markers"); - - // Look up for this new `pos` in the past position markers. - let past_positions = self.inner.past_positions.read().unwrap(); - - // The `pos` received by the server has already been received in the past! - if past_positions.iter().any(|position| position.pos == pos) { - error!( - ?sliding_sync_response, - "Sliding Sync response has ALREADY been handled by the client in the past" - ); - - return Err(Error::ResponseAlreadyReceived { pos }.into()); - } - } - let must_process_rooms_response = self.must_process_rooms_response().await; trace!(yes = must_process_rooms_response, "Must process rooms response?"); @@ -425,10 +405,6 @@ impl SlidingSync { // Save the new position markers. position.pos = pos; - // Keep this position markers in memory, in case it pops from the server. - let mut past_positions = self.inner.past_positions.write().unwrap(); - past_positions.push(position.clone()); - Ok(update_summary) } @@ -734,11 +710,6 @@ impl SlidingSync { yield Ok(updates); } - // Here, errors we can safely ignore. - Err(crate::Error::SlidingSync(Error::ResponseAlreadyReceived { .. })) => { - continue; - } - // Here, errors we **cannot** ignore, and that must stop the sync loop. Err(error) => { if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { @@ -774,8 +745,8 @@ impl SlidingSync { /// Expire the current Sliding Sync session. /// - /// Expiring a Sliding Sync session means: resetting `pos`. It also cleans - /// up the `past_positions`, and resets sticky parameters. + /// Expiring a Sliding Sync session means: resetting `pos`. It also resets + /// sticky parameters. /// /// This should only be used when it's clear that this session was about to /// expire anyways, and should be used only in very specific cases (e.g. @@ -796,9 +767,6 @@ impl SlidingSync { "couldn't invalidate sliding sync frozen state when expiring session: {err}" ); } - - let mut past_positions = self.inner.past_positions.write().unwrap(); - past_positions.clear(); } // Force invalidation of all the sticky parameters. @@ -1471,11 +1439,6 @@ mod tests { // `pos` has been updated. assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned())); - - // `past_positions` has been updated. - let past_positions = sliding_sync.inner.past_positions.read().unwrap(); - assert_eq!(past_positions.len(), 1); - assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned())); } // Next request doesn't ask to enable the extension. @@ -1503,19 +1466,12 @@ mod tests { // `pos` has been updated. assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned())); - - // `past_positions` has been updated. - let past_positions = sliding_sync.inner.past_positions.read().unwrap(); - assert_eq!(past_positions.len(), 2); - assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned())); - assert_eq!(past_positions.get(1).unwrap().pos, Some("1".to_owned())); } - // Next request isn't successful because it receives an already + // Next request is successful despite it receives an already // received `pos` from the server. { - // First response with an already seen `pos`. - let _mock_guard1 = Mock::given(SlidingSyncMatcher) + let _mock_guard = Mock::given(SlidingSyncMatcher) .respond_with(|request: &Request| { // Repeat the txn_id in the response, if set. let request: PartialRequest = request.body_json().unwrap(); @@ -1529,37 +1485,16 @@ mod tests { .mount_as_scoped(&server) .await; - // Second response with a new `pos`. - let _mock_guard2 = Mock::given(SlidingSyncMatcher) - .respond_with(|request: &Request| { - // Repeat the txn_id in the response, if set. - let request: PartialRequest = request.body_json().unwrap(); - - ResponseTemplate::new(200).set_body_json(json!({ - "txn_id": request.txn_id, - "pos": "2", // <- new! - })) - }) - .mount_as_scoped(&server) - .await; - let next = sync.next().await; assert_matches!(next, Some(Ok(_update_summary))); // `pos` has been updated. - assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("2".to_owned())); - - // `past_positions` has been updated. - let past_positions = sliding_sync.inner.past_positions.read().unwrap(); - assert_eq!(past_positions.len(), 3); - assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned())); - assert_eq!(past_positions.get(1).unwrap().pos, Some("1".to_owned())); - assert_eq!(past_positions.get(2).unwrap().pos, Some("2".to_owned())); + assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned())); } // Stop responding with successful requests! // - // When responding with M_UNKNOWN_POS, that regenerates the sticky parameters, + // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters, // so they're reset. It also resets the `pos`. { let _mock_guard = Mock::given(SlidingSyncMatcher) @@ -1578,9 +1513,6 @@ mod tests { // `pos` has been reset. assert!(sliding_sync.inner.position.lock().await.pos.is_none()); - // `past_positions` has been reset. - assert!(sliding_sync.inner.past_positions.read().unwrap().is_empty()); - // Next request asks to enable the extension again. let (request, _, _) = sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;