Skip to content

Commit

Permalink
feat(sdk): Remove the experimental-not-simplified-sliding-sync feat…
Browse files Browse the repository at this point in the history
…ure flag.

This patch is the second over two patches to change the support
of Simplified MSC3575 from a compiler feature flag to a runtime
configuration (see previous patch for the first one).

First off, this patch removes all uses of the
`experimental-not-simplified-sliding-sync` feature flag.

Second, this patch uses the `Client::is_simplified_sliding_sync_enabled`
method to know whether Simplified MSC3575 `Request`s must be transformed
into MSC3575 `Request`s and vice versa for the `Response`s.

To achieve that, the `SlidingSync::sync_once` method moves most of
its body into a new inner private `send` method that takes a generic
for the `Request` because now it can be `http::msc3575::Request`
or a `http::simplified_msc3575::Request`. This `send` function
has a specific bound for the `Request::IncomingResponse`:
`http::Into::<http::simplified_msc3575::Response>`, so that we are sure
we can get back a Simplified MSC3575 `Response` in all cases. To make
it work in both cases, this patch adds a `From<Response> for Response`
implementation in `http::simplified_msc3575`.
  • Loading branch information
Hywan committed Jul 12, 2024
1 parent bfee923 commit a8a1315
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 113 deletions.
2 changes: 0 additions & 2 deletions bindings/matrix-sdk-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ features = [
"e2e-encryption",
"experimental-oidc",
"experimental-sliding-sync",
"experimental-not-simplified-sliding-sync", # we don't want simplified MSC3575 yet!
"experimental-widgets",
"markdown",
"rustls-tls", # note: differ from block below
Expand All @@ -73,7 +72,6 @@ features = [
"e2e-encryption",
"experimental-oidc",
"experimental-sliding-sync",
"experimental-not-simplified-sliding-sync", # we don't want simplified MSC3575 yet!
"experimental-widgets",
"markdown",
"native-tls", # note: differ from block above
Expand Down
6 changes: 6 additions & 0 deletions crates/matrix-sdk-base/src/sliding_sync/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ pub mod simplified_msc3575 {

use super::{msc3575 as ss, From};

impl From<Response> for Response {
fn from(value: Response) -> Self {
value
}
}

impl From<ss::Response> for Response {
fn from(value: ss::Response) -> Self {
let mut s = Self::new(value.pos);
Expand Down
1 change: 0 additions & 1 deletion crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ experimental-sliding-sync = [
"matrix-sdk-base/experimental-sliding-sync",
"reqwest/gzip",
]
experimental-not-simplified-sliding-sync = ["experimental-sliding-sync"]
experimental-widgets = ["dep:language-tags", "dep:uuid"]

docsrs = ["e2e-encryption", "sqlite", "indexeddb", "sso-login", "qrcode", "image-proc"]
Expand Down
236 changes: 132 additions & 104 deletions crates/matrix-sdk/src/sliding_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use async_stream::stream;
use futures_core::stream::Stream;
pub use matrix_sdk_base::sliding_sync::http;
use matrix_sdk_common::{ring_buffer::RingBuffer, timer};
use ruma::{api::client::error::ErrorKind, assign, OwnedEventId, OwnedRoomId, RoomId};
use ruma::{
api::{client::error::ErrorKind, OutgoingRequest},
assign, OwnedEventId, OwnedRoomId, RoomId,
};
use serde::{Deserialize, Serialize};
use tokio::{
select, spawn,
Expand Down Expand Up @@ -536,127 +539,152 @@ impl SlidingSync {

#[instrument(skip_all, fields(pos))]
async fn sync_once(&self) -> Result<UpdateSummary> {
let (request, request_config, mut position_guard) =
let (request, request_config, position_guard) =
self.generate_sync_request(&mut LazyTransactionId::new()).await?;

// The code manipulates `Request` and `Response` from Simplified MSC3575
// because it's simpler. If the `experimental-not-simplified-sliding-sync`
// feature flag is turned on, we transform the Simplified MSC3575 `Request`
// into a MSC3575 `Request`.
#[cfg(feature = "experimental-not-simplified-sliding-sync")]
let request: http::msc3575::Request = http::From::from(request);

debug!("Sending request");

// Prepare the request.
let request =
self.inner.client.send(request, Some(request_config)).with_homeserver_override(
self.inner.sliding_sync_proxy.as_ref().map(ToString::to_string),
);
// because it's the future standard. If
// `Client::is_simplified_sliding_sync_enabled` is turned off, the
// Simplified MSC3575 `Request` must be transformed into a MSC3575 `Request`.
return if !self.inner.client.is_simplified_sliding_sync_enabled() {
send(
self,
http::Into::<http::msc3575::Request>::into(request),
request_config,
position_guard,
)
.await
} else {
send(self, request, request_config, position_guard).await
};

// Send the request and get a response with end-to-end encryption support.
//
// Sending the `/sync` request out when end-to-end encryption is enabled means
// that we need to also send out any outgoing e2ee related request out
// coming from the `OlmMachine::outgoing_requests()` method.
// The sending logic. It takes a generic `Request` because it can be a
// Simplified MSC3575 or a MSC3575 `Request`.
async fn send<Request>(
this: &SlidingSync,
request: Request,
request_config: RequestConfig,
mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
) -> Result<UpdateSummary>
where
Request: OutgoingRequest + Clone + Debug + Send + Sync + 'static,
Request::IncomingResponse: Send
+ Sync
+
// This is required to get back a Simplified MSC3575 `Response` whatever the
// `Request` type.
http::Into<http::Response>,
crate::HttpError: From<ruma::api::error::FromHttpResponseError<Request::EndpointError>>,
{
debug!("Sending request");

#[cfg(feature = "e2e-encryption")]
let response = {
if self.is_e2ee_enabled() {
// Here, we need to run 2 things:
//
// 1. Send the sliding sync request and get a response,
// 2. Send the E2EE requests.
//
// We don't want to use a `join` or `try_join` because we want to fail if and
// only if sending the sliding sync request fails. Failing to send the E2EE
// requests should just result in a log.
//
// We also want to give the priority to sliding sync request. E2EE requests are
// sent concurrently to the sliding sync request, but the priority is on waiting
// a sliding sync response.
//
// If sending sliding sync request fails, the sending of E2EE requests must be
// aborted as soon as possible.
// Prepare the request.
let request =
this.inner.client.send(request, Some(request_config)).with_homeserver_override(
this.inner.sliding_sync_proxy.as_ref().map(ToString::to_string),
);

let client = self.inner.client.clone();
let e2ee_uploads = spawn(async move {
if let Err(error) = client.send_outgoing_requests().await {
error!(?error, "Error while sending outgoing E2EE requests");
}
})
// Ensure that the task is not running in detached mode. It is aborted when it's
// dropped.
.abort_on_drop();

// Wait on the sliding sync request success or failure early.
let response = request.await?;

// At this point, if `request` has been resolved successfully, we wait on
// `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
// long. Otherwise —if `request` has failed— `e2ee_uploads` has
// been dropped, so aborted.
e2ee_uploads.await.map_err(|error| Error::JoinError {
task_description: "e2ee_uploads".to_owned(),
error,
})?;

response
} else {
request.await?
}
};
// Send the request and get a response with end-to-end encryption support.
//
// Sending the `/sync` request out when end-to-end encryption is enabled means
// that we need to also send out any outgoing e2ee related request out
// coming from the `OlmMachine::outgoing_requests()` method.

// Send the request and get a response _without_ end-to-end encryption support.
#[cfg(not(feature = "e2e-encryption"))]
let response = request.await?;
#[cfg(feature = "e2e-encryption")]
let response = {
if this.is_e2ee_enabled() {
// Here, we need to run 2 things:
//
// 1. Send the sliding sync request and get a response,
// 2. Send the E2EE requests.
//
// We don't want to use a `join` or `try_join` because we want to fail if and
// only if sending the sliding sync request fails. Failing to send the E2EE
// requests should just result in a log.
//
// We also want to give the priority to sliding sync request. E2EE requests are
// sent concurrently to the sliding sync request, but the priority is on waiting
// a sliding sync response.
//
// If sending sliding sync request fails, the sending of E2EE requests must be
// aborted as soon as possible.

let client = this.inner.client.clone();
let e2ee_uploads = spawn(async move {
if let Err(error) = client.send_outgoing_requests().await {
error!(?error, "Error while sending outgoing E2EE requests");
}
})
// Ensure that the task is not running in detached mode. It is aborted when it's
// dropped.
.abort_on_drop();

// Wait on the sliding sync request success or failure early.
let response = request.await?;

// At this point, if `request` has been resolved successfully, we wait on
// `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
// long. Otherwise —if `request` has failed— `e2ee_uploads` has
// been dropped, so aborted.
e2ee_uploads.await.map_err(|error| Error::JoinError {
task_description: "e2ee_uploads".to_owned(),
error,
})?;

response
} else {
request.await?
}
};

// The code manipulates `Request` and `Response` from Simplified MSC3575
// because it's simpler. If the `experimental-not-simplified-sliding-sync`
// feature flag is turned on, the Simplified MSC3575 `Request` has been
// transformed into a MSC3575 `Request` a couple of lines above. Now it's
// time to do the opposite: transform the MSC3575 `Response` into a
// Simplified MSC3575 `Response`.
#[cfg(feature = "experimental-not-simplified-sliding-sync")]
let response: http::simplified_msc3575::Response = http::From::from(response);
// Send the request and get a response _without_ end-to-end encryption support.
#[cfg(not(feature = "e2e-encryption"))]
let response = request.await?;

debug!("Received response");
// The code manipulates `Request` and `Response` from Simplified MSC3575 because
// it's the future standard. But this function may have received a `Request`
// from Simplified MSC3575 or MSC3575. We need to get back a
// Simplified MSC3575 `Response`.
let response = http::Into::<http::simplified_msc3575::Response>::into(response);

// At this point, the request has been sent, and a response has been received.
//
// We must ensure the handling of the response cannot be stopped/
// cancelled. It must be done entirely, otherwise we can have
// corrupted/incomplete states for Sliding Sync and other parts of
// the code.
//
// That's why we are running the handling of the response in a spawned
// future that cannot be cancelled by anything.
let this = self.clone();
debug!("Received response");

// At this point, the request has been sent, and a response has been received.
//
// We must ensure the handling of the response cannot be stopped/
// cancelled. It must be done entirely, otherwise we can have
// corrupted/incomplete states for Sliding Sync and other parts of
// the code.
//
// That's why we are running the handling of the response in a spawned
// future that cannot be cancelled by anything.
let this = this.clone();

// Spawn a new future to ensure that the code inside this future cannot be
// cancelled if this method is cancelled.
let future = async move {
debug!("Start handling response");
// Spawn a new future to ensure that the code inside this future cannot be
// cancelled if this method is cancelled.
let future = async move {
debug!("Start handling response");

// In case the task running this future is detached, we must
// ensure responses are handled one at a time. At this point we still own
// `position_guard`, so we're fine.
// In case the task running this future is detached, we must
// ensure responses are handled one at a time. At this point we still own
// `position_guard`, so we're fine.

// Handle the response.
let updates = this.handle_response(response, &mut position_guard).await?;
// Handle the response.
let updates = this.handle_response(response, &mut position_guard).await?;

this.cache_to_storage(&position_guard).await?;
this.cache_to_storage(&position_guard).await?;

// Release the position guard lock.
// It means that other responses can be generated and then handled later.
drop(position_guard);
// Release the position guard lock.
// It means that other responses can be generated and then handled later.
drop(position_guard);

debug!("Done handling response");
debug!("Done handling response");

Ok(updates)
};
Ok(updates)
};

spawn(future.instrument(Span::current())).await.unwrap()
spawn(future.instrument(Span::current())).await.unwrap()
}
}

/// Create a _new_ Sliding Sync sync loop.
Expand Down
2 changes: 1 addition & 1 deletion labs/multiverse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ color-eyre = "0.6.2"
crossterm = "0.27.0"
futures-util = { workspace = true }
imbl = { workspace = true }
matrix-sdk = { path = "../../crates/matrix-sdk", features = ["sso-login", "experimental-not-simplified-sliding-sync"] }
matrix-sdk = { path = "../../crates/matrix-sdk", features = ["sso-login"] }
matrix-sdk-ui = { path = "../../crates/matrix-sdk-ui" }
ratatui = "0.26.1"
rpassword = "7.3.1"
Expand Down
6 changes: 1 addition & 5 deletions testing/matrix-sdk-integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@ futures-core = { workspace = true }
futures-util = { workspace = true }
http = { workspace = true }
matrix-sdk-base = { workspace = true, default-features = true, features = ["testing", "qrcode"] }
matrix-sdk = {workspace = true, default-features = true, features = [
"testing",
"qrcode",
"experimental-not-simplified-sliding-sync",
] }
matrix-sdk = {workspace = true, default-features = true, features = ["testing", "qrcode"] }
matrix-sdk-ui = { workspace = true, default-features = true }
matrix-sdk-test = { workspace = true }
once_cell = { workspace = true }
Expand Down

0 comments on commit a8a1315

Please sign in to comment.