Skip to content

Commit

Permalink
Fix breaking change on quic-rpc and update schema
Browse files Browse the repository at this point in the history
  • Loading branch information
fogodev committed Nov 1, 2024
1 parent dcd615d commit 5c6f57d
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 55 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ rust-version = "1.81"

[workspace.dependencies]
# First party dependencies
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "bbc69c5cb2" }
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "99d59e8fab" }

# Third party dependencies used by one or more of our crates
async-channel = "2.3"
Expand Down
2 changes: 1 addition & 1 deletion core/crates/cloud-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ anyhow = "1.0.86"
dashmap = "6.1.0"
iroh-net = { version = "0.27", features = ["discovery-local-network", "iroh-relay"] }
paste = "=1.0.15"
quic-rpc = { version = "0.12.1", features = ["quinn-transport"] }
quic-rpc = { version = "0.13.0", features = ["quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.11" }
# Using whatever version of reqwest that reqwest-middleware uses, just putting here to enable some features
reqwest = { version = "0.12", features = ["json", "native-tls-vendored", "stream"] }
Expand Down
21 changes: 9 additions & 12 deletions core/crates/cloud-services/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::p2p::{NotifyUser, UserResponse};

use sd_cloud_schema::{Client, Service, ServicesALPN};
use sd_cloud_schema::{Client, Request, Response, ServicesALPN};

use std::{net::SocketAddr, sync::Arc, time::Duration};

use futures::Stream;
use iroh_net::relay::RelayUrl;
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient};
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient, RpcMessage};
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
use reqwest::{IntoUrl, Url};
use reqwest_middleware::{reqwest, ClientBuilder, ClientWithMiddleware};
Expand All @@ -19,10 +19,10 @@ use super::{
};

#[derive(Debug, Default, Clone)]
enum ClientState {
enum ClientState<In: RpcMessage, Out: RpcMessage> {
#[default]
NotConnected,
Connected(Client<QuinnConnection<Service>, Service>),
Connected(Client<QuinnConnection<In, Out>>),
}

/// Cloud services are a optional feature that allows you to interact with the cloud services
Expand All @@ -35,7 +35,7 @@ enum ClientState {
/// that core can always operate without the cloud services.
#[derive(Debug)]
pub struct CloudServices {
client_state: Arc<RwLock<ClientState>>,
client_state: Arc<RwLock<ClientState<Response, Request>>>,
get_cloud_api_address: Url,
http_client: ClientWithMiddleware,
domain_name: String,
Expand Down Expand Up @@ -157,7 +157,7 @@ impl CloudServices {
http_client: &ClientWithMiddleware,
get_cloud_api_address: Url,
domain_name: String,
) -> Result<Client<QuinnConnection<Service>, Service>, Error> {
) -> Result<Client<QuinnConnection<Response, Request>>, Error> {
let cloud_api_address = http_client
.get(get_cloud_api_address)
.send()
Expand Down Expand Up @@ -256,9 +256,6 @@ impl CloudServices {
.map_err(Error::FailedToCreateEndpoint)?;
endpoint.set_default_client_config(client_config);

// TODO(@fogodev): It's possible that we can't keep the connection alive all the time,
// and need to use single shot connections. I will only be sure when we have
// actually battle-tested the cloud services in core.
Ok(Client::new(RpcClient::new(QuinnConnection::new(
endpoint,
cloud_api_address,
Expand All @@ -271,9 +268,9 @@ impl CloudServices {
/// If the client is not connected, it will try to connect to the cloud services.
/// Available routes documented in
/// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema).
pub async fn client(&self) -> Result<Client<QuinnConnection<Service>, Service>, Error> {
if let ClientState::Connected(client) = { self.client_state.read().await.clone() } {
return Ok(client);
pub async fn client(&self) -> Result<Client<QuinnConnection<Response, Request>>, Error> {
if let ClientState::Connected(client) = { &*self.client_state.read().await } {
return Ok(client.clone());
}

// If we're not connected, we need to try to connect.
Expand Down
16 changes: 9 additions & 7 deletions core/crates/cloud-services/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sd_cloud_schema::{cloud_p2p, sync::groups, Service};
use sd_cloud_schema::{cloud_p2p, sync::groups, Request, Response};
use sd_utils::error::FileIOError;

use std::{io, net::AddrParseError};
Expand Down Expand Up @@ -68,7 +68,9 @@ pub enum Error {
#[error("Failed to connect to Cloud P2P node: {0}")]
ConnectToCloudP2PNode(anyhow::Error),
#[error("Communication error with Cloud P2P node: {0}")]
CloudP2PRpcCommunication(#[from] rpc::Error<QuinnConnection<cloud_p2p::Service>>),
CloudP2PRpcCommunication(
#[from] rpc::Error<QuinnConnection<cloud_p2p::Response, cloud_p2p::Request>>,
),
#[error("Cloud P2P not initialized")]
CloudP2PNotInitialized,
#[error("Failed to initialize LocalSwarmDiscovery: {0}")]
Expand All @@ -78,15 +80,15 @@ pub enum Error {

// Communication errors
#[error("Failed to communicate with RPC backend: {0}")]
RpcCommunication(#[from] rpc::Error<QuinnConnection<Service>>),
RpcCommunication(#[from] rpc::Error<QuinnConnection<Response, Request>>),
#[error("Failed to communicate with Server Streaming RPC backend: {0}")]
ServerStreamCommunication(#[from] server_streaming::Error<QuinnConnection<Service>>),
ServerStreamCommunication(#[from] server_streaming::Error<QuinnConnection<Response, Request>>),
#[error("Failed to receive next response from Server Streaming RPC backend: {0}")]
ServerStreamRecv(#[from] server_streaming::ItemError<QuinnConnection<Service>>),
ServerStreamRecv(#[from] server_streaming::ItemError<QuinnConnection<Response, Request>>),
#[error("Failed to communicate with Bidi Streaming RPC backend: {0}")]
BidiStreamCommunication(#[from] bidi_streaming::Error<QuinnConnection<Service>>),
BidiStreamCommunication(#[from] bidi_streaming::Error<QuinnConnection<Response, Request>>),
#[error("Failed to receive next response from Bidi Streaming RPC backend: {0}")]
BidiStreamRecv(#[from] bidi_streaming::ItemError<QuinnConnection<Service>>),
BidiStreamRecv(#[from] bidi_streaming::ItemError<QuinnConnection<Response, Request>>),
#[error("Error from backend: {0}")]
Backend(#[from] sd_cloud_schema::Error),
#[error("Failed to get access token from refresher: {0}")]
Expand Down
10 changes: 4 additions & 6 deletions core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{token_refresher::TokenRefresher, Error};

use sd_cloud_schema::{
cloud_p2p::{Client, CloudP2PALPN, Service},
cloud_p2p::{Client, CloudP2PALPN},
devices,
sync::groups,
};
Expand All @@ -24,8 +24,7 @@ pub async fn dispatch_notifier(
devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>,
msgs_tx: flume::Sender<Message>,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Service>,
sd_cloud_schema::Service,
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
token_refresher: TokenRefresher,
endpoint: Endpoint,
Expand Down Expand Up @@ -64,8 +63,7 @@ async fn notify_peers(
device_pub_id: devices::PubId,
devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Service>,
sd_cloud_schema::Service,
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
token_refresher: TokenRefresher,
endpoint: Endpoint,
Expand Down Expand Up @@ -130,7 +128,7 @@ async fn connect_and_send_notification(
connection_id: &NodeId,
endpoint: &Endpoint,
) -> Result<(), Error> {
let client = Client::new(RpcClient::new(QuinnConnection::<Service>::from_connection(
let client = Client::new(RpcClient::new(QuinnConnection::from_connection(
endpoint
.connect(*connection_id, CloudP2PALPN::LATEST)
.await
Expand Down
26 changes: 11 additions & 15 deletions core/crates/cloud-services/src/p2p/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ pub struct Runner {
current_device_pub_id: devices::PubId,
token_refresher: TokenRefresher,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Service>,
sd_cloud_schema::Service,
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
msgs_tx: flume::Sender<Message>,
endpoint: Endpoint,
Expand Down Expand Up @@ -112,12 +111,14 @@ impl Clone for Runner {
}

struct PendingSyncGroupJoin {
channel: RpcChannel<Service, QuinnServerEndpoint<Service>>,
channel: RpcChannel<Service, QuinnServerEndpoint<cloud_p2p::Request, cloud_p2p::Response>>,
request: authorize_new_device_in_sync_group::Request,
this_device: Device,
since: Instant,
}

type P2PServerEndpoint = QuinnServerEndpoint<cloud_p2p::Request, cloud_p2p::Response>;

impl Runner {
pub async fn new(
current_device_pub_id: devices::PubId,
Expand Down Expand Up @@ -152,10 +153,7 @@ impl Runner {
#[allow(clippy::large_enum_variant)]
enum StreamMessage {
AcceptResult(
Result<
Accepting<Service, QuinnServerEndpoint<Service>>,
RpcServerError<QuinnServerEndpoint<Service>>,
>,
Result<Accepting<Service, P2PServerEndpoint>, RpcServerError<P2PServerEndpoint>>,
),
Message(Message),
UserResponse(UserResponse),
Expand Down Expand Up @@ -361,7 +359,7 @@ impl Runner {
async fn handle_request(
&self,
request: cloud_p2p::Request,
channel: RpcChannel<Service, QuinnServerEndpoint<Service>>,
channel: RpcChannel<Service, P2PServerEndpoint>,
) {
match request {
cloud_p2p::Request::AuthorizeNewDeviceInSyncGroup(
Expand Down Expand Up @@ -598,7 +596,7 @@ impl Runner {
async fn connect_to_first_available_client(
endpoint: &Endpoint,
devices_in_group: &[(devices::PubId, NodeId)],
) -> Result<Client<QuinnConnection<Service>, Service>, CloudP2PError> {
) -> Result<Client<QuinnConnection<cloud_p2p::Response, cloud_p2p::Request>>, CloudP2PError> {
for (device_pub_id, device_connection_id) in devices_in_group {
if let Ok(connection) = endpoint
.connect(*device_connection_id, CloudP2PALPN::LATEST)
Expand All @@ -607,8 +605,9 @@ async fn connect_to_first_available_client(
|e| error!(?e, %device_pub_id, "Failed to connect to authorizor device candidate"),
) {
debug!(%device_pub_id, "Connected to authorizor device candidate");

return Ok(Client::new(RpcClient::new(
QuinnConnection::<Service>::from_connection(connection),
QuinnConnection::from_connection(connection),
)));
}
}
Expand All @@ -618,10 +617,7 @@ async fn connect_to_first_available_client(

fn setup_server_endpoint(
endpoint: Endpoint,
) -> (
RpcServer<Service, QuinnServerEndpoint<Service>>,
JoinHandle<()>,
) {
) -> (RpcServer<Service, P2PServerEndpoint>, JoinHandle<()>) {
let local_addr = {
let (ipv4_addr, maybe_ipv6_addr) = endpoint.bound_sockets();
// Trying to give preference to IPv6 addresses because it's 2024
Expand All @@ -631,7 +627,7 @@ fn setup_server_endpoint(
let (connections_tx, connections_rx) = flume::bounded(16);

(
RpcServer::new(QuinnServerEndpoint::<Service>::handle_connections(
RpcServer::new(QuinnServerEndpoint::handle_connections(
connections_rx,
local_addr,
)),
Expand Down
4 changes: 2 additions & 2 deletions core/crates/cloud-services/src/sync/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sd_cloud_schema::{
groups,
messages::{pull, MessagesCollection},
},
Client, Service,
Client, Request, Response,
};
use sd_core_sync::{
cloud_crdt_op_db, CRDTOperation, CompressedCRDTOperationsPerModel, SyncManager,
Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct Receiver {
sync_group_pub_id: groups::PubId,
device_pub_id: devices::PubId,
cloud_services: Arc<CloudServices>,
cloud_client: Client<QuinnConnection<Service>>,
cloud_client: Client<QuinnConnection<Response, Request>>,
key_manager: Arc<KeyManager>,
sync: SyncManager,
notifiers: Arc<ReceiveAndIngestNotifiers>,
Expand Down
4 changes: 2 additions & 2 deletions core/crates/cloud-services/src/sync/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sd_cloud_schema::{
devices,
error::{ClientSideError, NotFoundError},
sync::{groups, messages},
Client, Service,
Client, Request, Response,
};
use sd_crypto::{
cloud::{OneShotEncryption, SecretKey, StreamEncryption},
Expand Down Expand Up @@ -60,7 +60,7 @@ pub struct Sender {
sync_group_pub_id: groups::PubId,
sync: SyncManager,
cloud_services: Arc<CloudServices>,
cloud_client: Client<QuinnConnection<Service>>,
cloud_client: Client<QuinnConnection<Response, Request>>,
key_manager: Arc<KeyManager>,
is_active: Arc<AtomicBool>,
state_notify: Arc<Notify>,
Expand Down
6 changes: 3 additions & 3 deletions core/src/api/cloud/devices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sd_cloud_schema::{
ClientRegistration, ClientRegistrationFinishParameters, ClientRegistrationFinishResult,
ClientRegistrationStartResult,
},
Client, NodeId, Service, SpacedriveCipherSuite,
Client, NodeId, Request, Response, SpacedriveCipherSuite,
};
use sd_crypto::{cloud::secret_key::SecretKey, CryptoRng};

Expand Down Expand Up @@ -149,7 +149,7 @@ pub fn mount() -> AlphaRouter<Ctx> {
}

pub async fn hello(
client: &Client<QuinnConnection<Service>, Service>,
client: &Client<QuinnConnection<Response, Request>>,
access_token: AccessToken,
device_pub_id: PubId,
hashed_pub_id: Hash,
Expand Down Expand Up @@ -270,7 +270,7 @@ pub struct DeviceRegisterData {
}

pub async fn register(
client: &Client<QuinnConnection<Service>, Service>,
client: &Client<QuinnConnection<Response, Request>>,
access_token: AccessToken,
DeviceRegisterData {
pub_id,
Expand Down
12 changes: 9 additions & 3 deletions core/src/api/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sd_cloud_schema::{
auth,
error::{ClientSideError, Error},
sync::groups,
users, Client, SecretKey as IrohSecretKey, Service,
users, Client, Request, Response, SecretKey as IrohSecretKey,
};
use sd_crypto::{CryptoRng, SeedableRng};
use sd_utils::error::report_error;
Expand All @@ -32,7 +32,7 @@ mod sync_groups;

async fn try_get_cloud_services_client(
node: &Node,
) -> Result<Client<QuinnConnection<Service>, Service>, sd_core_cloud_services::Error> {
) -> Result<Client<QuinnConnection<Response, Request>>, sd_core_cloud_services::Error> {
node.cloud_services
.client()
.await
Expand Down Expand Up @@ -302,7 +302,13 @@ async fn initialize_cloud_sync(

async fn get_client_and_access_token(
node: &Node,
) -> Result<(Client<QuinnConnection<Service>, Service>, auth::AccessToken), rspc::Error> {
) -> Result<
(
Client<QuinnConnection<Response, Request>>,
auth::AccessToken,
),
rspc::Error,
> {
(
try_get_cloud_services_client(node),
node.cloud_services
Expand Down

0 comments on commit 5c6f57d

Please sign in to comment.