Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sdk): Remove SlidingSyncInner::past_positions #3833

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions crates/matrix-sdk/src/sliding_sync/builder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::{
collections::BTreeMap,
fmt::Debug,
num::NonZeroUsize,
sync::{Arc, RwLock as StdRwLock},
time::Duration,
};

use matrix_sdk_base::sliding_sync::http;
use matrix_sdk_common::{ring_buffer::RingBuffer, timer};
use matrix_sdk_common::timer;
use ruma::OwnedRoomId;
use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
use url::Url;
Expand Down Expand Up @@ -285,8 +284,6 @@ impl SlidingSyncBuilder {
rooms,

position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
// SAFETY: `unwrap` is safe because 20 is not zero.
past_positions: StdRwLock::new(RingBuffer::new(NonZeroUsize::new(20).unwrap())),

sticky: StdRwLock::new(SlidingSyncStickyManager::new(
SlidingSyncStickyParameters::new(
Expand Down
8 changes: 0 additions & 8 deletions crates/matrix-sdk/src/sliding_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ pub enum Error {
#[error("The sliding sync response could not be handled: {0}")]
BadResponse(String),

/// The response we've received from the server has already been received in
/// the past because it has a `pos` that we have recently seen.
#[error("The sliding sync response has already been received: `pos={pos:?}`")]
ResponseAlreadyReceived {
/// The `pos`ition that has been received.
pos: Option<String>,
},

/// A `SlidingSyncListRequestGenerator` has been used without having been
/// initialized. It happens when a response is handled before a request has
/// been sent. It usually happens when testing.
Expand Down
82 changes: 7 additions & 75 deletions crates/matrix-sdk/src/sliding_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::{
use async_stream::stream;
use futures_core::stream::Stream;
pub use matrix_sdk_base::sliding_sync::http;
use matrix_sdk_common::{ring_buffer::RingBuffer, timer};
use matrix_sdk_common::timer;
use ruma::{
api::{client::error::ErrorKind, OutgoingRequest},
assign, OwnedEventId, OwnedRoomId, RoomId,
Expand Down Expand Up @@ -112,9 +112,6 @@ pub(super) struct SlidingSyncInner {
/// `position` being updated, before sending a new request.
position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

/// Past position markers.
past_positions: StdRwLock<RingBuffer<SlidingSyncPositionMarkers>>,

/// The lists of this Sliding Sync instance.
lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

Expand Down Expand Up @@ -258,23 +255,6 @@ impl SlidingSync {
) -> Result<UpdateSummary, crate::Error> {
let pos = Some(sliding_sync_response.pos.clone());

{
debug!("Update position markers");

// Look up for this new `pos` in the past position markers.
let past_positions = self.inner.past_positions.read().unwrap();

// The `pos` received by the server has already been received in the past!
if past_positions.iter().any(|position| position.pos == pos) {
error!(
?sliding_sync_response,
"Sliding Sync response has ALREADY been handled by the client in the past"
);

return Err(Error::ResponseAlreadyReceived { pos }.into());
}
}

let must_process_rooms_response = self.must_process_rooms_response().await;

trace!(yes = must_process_rooms_response, "Must process rooms response?");
Expand Down Expand Up @@ -425,10 +405,6 @@ impl SlidingSync {
// Save the new position markers.
position.pos = pos;

// Keep this position markers in memory, in case it pops from the server.
let mut past_positions = self.inner.past_positions.write().unwrap();
past_positions.push(position.clone());

Ok(update_summary)
}

Expand Down Expand Up @@ -734,11 +710,6 @@ impl SlidingSync {
yield Ok(updates);
}

// Here, errors we can safely ignore.
Err(crate::Error::SlidingSync(Error::ResponseAlreadyReceived { .. })) => {
continue;
}

// Here, errors we **cannot** ignore, and that must stop the sync loop.
Err(error) => {
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
Expand Down Expand Up @@ -774,8 +745,8 @@ impl SlidingSync {

/// Expire the current Sliding Sync session.
///
/// Expiring a Sliding Sync session means: resetting `pos`. It also cleans
/// up the `past_positions`, and resets sticky parameters.
/// Expiring a Sliding Sync session means: resetting `pos`. It also resets
/// sticky parameters.
///
/// This should only be used when it's clear that this session was about to
/// expire anyways, and should be used only in very specific cases (e.g.
Expand All @@ -796,9 +767,6 @@ impl SlidingSync {
"couldn't invalidate sliding sync frozen state when expiring session: {err}"
);
}

let mut past_positions = self.inner.past_positions.write().unwrap();
past_positions.clear();
}

// Force invalidation of all the sticky parameters.
Expand Down Expand Up @@ -1471,11 +1439,6 @@ mod tests {

// `pos` has been updated.
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));

// `past_positions` has been updated.
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
assert_eq!(past_positions.len(), 1);
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
}

// Next request doesn't ask to enable the extension.
Expand Down Expand Up @@ -1503,19 +1466,12 @@ mod tests {

// `pos` has been updated.
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));

// `past_positions` has been updated.
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
assert_eq!(past_positions.len(), 2);
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
assert_eq!(past_positions.get(1).unwrap().pos, Some("1".to_owned()));
}

// Next request isn't successful because it receives an already
// Next request is successful despite it receives an already
// received `pos` from the server.
{
// First response with an already seen `pos`.
let _mock_guard1 = Mock::given(SlidingSyncMatcher)
let _mock_guard = Mock::given(SlidingSyncMatcher)
.respond_with(|request: &Request| {
// Repeat the txn_id in the response, if set.
let request: PartialRequest = request.body_json().unwrap();
Expand All @@ -1529,37 +1485,16 @@ mod tests {
.mount_as_scoped(&server)
.await;

// Second response with a new `pos`.
let _mock_guard2 = Mock::given(SlidingSyncMatcher)
.respond_with(|request: &Request| {
// Repeat the txn_id in the response, if set.
let request: PartialRequest = request.body_json().unwrap();

ResponseTemplate::new(200).set_body_json(json!({
"txn_id": request.txn_id,
"pos": "2", // <- new!
}))
})
.mount_as_scoped(&server)
.await;

let next = sync.next().await;
assert_matches!(next, Some(Ok(_update_summary)));

// `pos` has been updated.
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("2".to_owned()));

// `past_positions` has been updated.
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
assert_eq!(past_positions.len(), 3);
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
assert_eq!(past_positions.get(1).unwrap().pos, Some("1".to_owned()));
assert_eq!(past_positions.get(2).unwrap().pos, Some("2".to_owned()));
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
}

// Stop responding with successful requests!
//
// When responding with M_UNKNOWN_POS, that regenerates the sticky parameters,
// When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
// so they're reset. It also resets the `pos`.
{
let _mock_guard = Mock::given(SlidingSyncMatcher)
Expand All @@ -1578,9 +1513,6 @@ mod tests {
// `pos` has been reset.
assert!(sliding_sync.inner.position.lock().await.pos.is_none());

// `past_positions` has been reset.
assert!(sliding_sync.inner.past_positions.read().unwrap().is_empty());

// Next request asks to enable the extension again.
let (request, _, _) =
sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
Expand Down
Loading