Skip to content

Commit

Permalink
fix(sdk): Remove SlidingSyncInner::past_positions.
Browse files Browse the repository at this point in the history
The patch matrix-org#2395 has
introduced `SlidingSyncInner::past_positions` as a mechanism to filter
duplicated responses. It was a problem because the sliding sync `ops`
could easily create corrupted states if they were applied more than
once.

Since matrix-org#3664, `ops`
are ignored.

Now, `past_positions` create a problem with the sliding sync native
implementation inside Synapse because `pos` can stay the same between
multiple responses.

While `past_positions` was helpful to fix bugs in the past, it's no
longer necessary today. Moreover, it breaks an invariant about `pos`: we
must consider it as a blackbox. It means we must ignore if a `pos` value
has been received in the past or not. This invariant has been broken for
good reasons, but it now creates new issues.

This patch removes `past_positions`, along with the associated code
(like `Error::ResponseAlreadyReceived` for example).
  • Loading branch information
Hywan committed Aug 12, 2024
1 parent 35b62a1 commit 1ce90ab
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 87 deletions.
5 changes: 1 addition & 4 deletions crates/matrix-sdk/src/sliding_sync/builder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::{
collections::BTreeMap,
fmt::Debug,
num::NonZeroUsize,
sync::{Arc, RwLock as StdRwLock},
time::Duration,
};

use matrix_sdk_base::sliding_sync::http;
use matrix_sdk_common::{ring_buffer::RingBuffer, timer};
use matrix_sdk_common::timer;
use ruma::OwnedRoomId;
use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
use url::Url;
Expand Down Expand Up @@ -285,8 +284,6 @@ impl SlidingSyncBuilder {
rooms,

position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
// SAFETY: `unwrap` is safe because 20 is not zero.
past_positions: StdRwLock::new(RingBuffer::new(NonZeroUsize::new(20).unwrap())),

sticky: StdRwLock::new(SlidingSyncStickyManager::new(
SlidingSyncStickyParameters::new(
Expand Down
8 changes: 0 additions & 8 deletions crates/matrix-sdk/src/sliding_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ pub enum Error {
#[error("The sliding sync response could not be handled: {0}")]
BadResponse(String),

/// The response we've received from the server has already been received in
/// the past because it has a `pos` that we have recently seen.
#[error("The sliding sync response has already been received: `pos={pos:?}`")]
ResponseAlreadyReceived {
/// The `pos`ition that has been received.
pos: Option<String>,
},

/// A `SlidingSyncListRequestGenerator` has been used without having been
/// initialized. It happens when a response is handled before a request has
/// been sent. It usually happens when testing.
Expand Down
82 changes: 7 additions & 75 deletions crates/matrix-sdk/src/sliding_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::{
use async_stream::stream;
use futures_core::stream::Stream;
pub use matrix_sdk_base::sliding_sync::http;
use matrix_sdk_common::{ring_buffer::RingBuffer, timer};
use matrix_sdk_common::timer;
use ruma::{
api::{client::error::ErrorKind, OutgoingRequest},
assign, OwnedEventId, OwnedRoomId, RoomId,
Expand Down Expand Up @@ -112,9 +112,6 @@ pub(super) struct SlidingSyncInner {
/// `position` being updated, before sending a new request.
position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

/// Past position markers.
past_positions: StdRwLock<RingBuffer<SlidingSyncPositionMarkers>>,

/// The lists of this Sliding Sync instance.
lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

Expand Down Expand Up @@ -258,23 +255,6 @@ impl SlidingSync {
) -> Result<UpdateSummary, crate::Error> {
let pos = Some(sliding_sync_response.pos.clone());

{
debug!("Update position markers");

// Look up for this new `pos` in the past position markers.
let past_positions = self.inner.past_positions.read().unwrap();

// The `pos` received by the server has already been received in the past!
if past_positions.iter().any(|position| position.pos == pos) {
error!(
?sliding_sync_response,
"Sliding Sync response has ALREADY been handled by the client in the past"
);

return Err(Error::ResponseAlreadyReceived { pos }.into());
}
}

let must_process_rooms_response = self.must_process_rooms_response().await;

trace!(yes = must_process_rooms_response, "Must process rooms response?");
Expand Down Expand Up @@ -425,10 +405,6 @@ impl SlidingSync {
// Save the new position markers.
position.pos = pos;

// Keep this position markers in memory, in case it pops from the server.
let mut past_positions = self.inner.past_positions.write().unwrap();
past_positions.push(position.clone());

Ok(update_summary)
}

Expand Down Expand Up @@ -734,11 +710,6 @@ impl SlidingSync {
yield Ok(updates);
}

// Here, errors we can safely ignore.
Err(crate::Error::SlidingSync(Error::ResponseAlreadyReceived { .. })) => {
continue;
}

// Here, errors we **cannot** ignore, and that must stop the sync loop.
Err(error) => {
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
Expand Down Expand Up @@ -774,8 +745,8 @@ impl SlidingSync {

/// Expire the current Sliding Sync session.
///
/// Expiring a Sliding Sync session means: resetting `pos`. It also cleans
/// up the `past_positions`, and resets sticky parameters.
/// Expiring a Sliding Sync session means: resetting `pos`. It also resets
/// sticky parameters.
///
/// This should only be used when it's clear that this session was about to
/// expire anyways, and should be used only in very specific cases (e.g.
Expand All @@ -796,9 +767,6 @@ impl SlidingSync {
"couldn't invalidate sliding sync frozen state when expiring session: {err}"
);
}

let mut past_positions = self.inner.past_positions.write().unwrap();
past_positions.clear();
}

// Force invalidation of all the sticky parameters.
Expand Down Expand Up @@ -1471,11 +1439,6 @@ mod tests {

// `pos` has been updated.
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));

// `past_positions` has been updated.
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
assert_eq!(past_positions.len(), 1);
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
}

// Next request doesn't ask to enable the extension.
Expand Down Expand Up @@ -1503,19 +1466,12 @@ mod tests {

// `pos` has been updated.
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));

// `past_positions` has been updated.
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
assert_eq!(past_positions.len(), 2);
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
assert_eq!(past_positions.get(1).unwrap().pos, Some("1".to_owned()));
}

// Next request isn't successful because it receives an already
// Next request is successful despite it receives an already
// received `pos` from the server.
{
// First response with an already seen `pos`.
let _mock_guard1 = Mock::given(SlidingSyncMatcher)
let _mock_guard = Mock::given(SlidingSyncMatcher)
.respond_with(|request: &Request| {
// Repeat the txn_id in the response, if set.
let request: PartialRequest = request.body_json().unwrap();
Expand All @@ -1529,37 +1485,16 @@ mod tests {
.mount_as_scoped(&server)
.await;

// Second response with a new `pos`.
let _mock_guard2 = Mock::given(SlidingSyncMatcher)
.respond_with(|request: &Request| {
// Repeat the txn_id in the response, if set.
let request: PartialRequest = request.body_json().unwrap();

ResponseTemplate::new(200).set_body_json(json!({
"txn_id": request.txn_id,
"pos": "2", // <- new!
}))
})
.mount_as_scoped(&server)
.await;

let next = sync.next().await;
assert_matches!(next, Some(Ok(_update_summary)));

// `pos` has been updated.
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("2".to_owned()));

// `past_positions` has been updated.
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
assert_eq!(past_positions.len(), 3);
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
assert_eq!(past_positions.get(1).unwrap().pos, Some("1".to_owned()));
assert_eq!(past_positions.get(2).unwrap().pos, Some("2".to_owned()));
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
}

// Stop responding with successful requests!
//
// When responding with M_UNKNOWN_POS, that regenerates the sticky parameters,
// When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
// so they're reset. It also resets the `pos`.
{
let _mock_guard = Mock::given(SlidingSyncMatcher)
Expand All @@ -1578,9 +1513,6 @@ mod tests {
// `pos` has been reset.
assert!(sliding_sync.inner.position.lock().await.pos.is_none());

// `past_positions` has been reset.
assert!(sliding_sync.inner.past_positions.read().unwrap().is_empty());

// Next request asks to enable the extension again.
let (request, _, _) =
sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
Expand Down

0 comments on commit 1ce90ab

Please sign in to comment.