diff --git a/broker/src/connections/mod.rs b/broker/src/connections/mod.rs index 865cd49..4bccf5d 100644 --- a/broker/src/connections/mod.rs +++ b/broker/src/connections/mod.rs @@ -63,19 +63,19 @@ impl Connections broker. /// We send this to other brokers so they can merge it. - pub async fn get_full_user_sync(self: &Arc) -> DirectMap { + pub fn get_full_user_sync(self: &Arc) -> DirectMap { self.direct_map.read().get_full() } /// Get the differences in the versioned vector map of user -> broker /// We send this to other brokers so they can merge it. - pub async fn get_partial_user_sync(self: &Arc) -> DirectMap { + pub fn get_partial_user_sync(self: &Arc) -> DirectMap { self.direct_map.write().diff() } /// Apply a received user sync map. Overwrites our values if they are old. /// Kicks off users that are now connected elsewhere. - pub async fn apply_user_sync(self: &Arc, map: DirectMap) { + pub fn apply_user_sync(self: &Arc, map: DirectMap) { // Merge the maps, returning the difference let users_to_remove = self.direct_map.write().merge(map); @@ -135,7 +135,7 @@ impl Connections, user_public_key: Bytes, connection: ::Sender, @@ -183,7 +183,7 @@ impl Connections, @@ -195,7 +195,7 @@ impl Connections) { + pub fn subscribe_user_to(&self, user_public_key: &Bytes, topics: Vec) { self.broadcast_map .users .write() @@ -203,10 +203,10 @@ impl Connections, + topics: &[Topic], ) { self.broadcast_map .brokers @@ -215,7 +215,7 @@ impl Connections) { + pub fn unsubscribe_user_from(&self, user_public_key: &Bytes, topics: Vec) { self.broadcast_map .users .write() @@ -289,7 +289,7 @@ impl Connections, user_public_key: UserPublicKey, message: Bytes, @@ -318,7 +318,7 @@ impl Connections, mut topics: Vec, message: Bytes, diff --git a/broker/src/handlers/broker.rs b/broker/src/handlers/broker.rs index ab0c9d6..6067cd9 100644 --- a/broker/src/handlers/broker.rs +++ b/broker/src/handlers/broker.rs @@ -53,7 +53,7 @@ impl< .add_broker(broker_identifier.clone(), sender); // Send a full user sync - if let Err(err) = self.full_user_sync(&broker_identifier).await { + if let Err(err) = self.full_user_sync(&broker_identifier) { error!("failed to perform full user sync: {err}"); self.connections.remove_broker(&broker_identifier); return; @@ -61,7 +61,7 @@ impl< // Send a full topic sync // TODO: macro removals or something - if let Err(err) = self.full_topic_sync(&broker_identifier).await { + if let Err(err) = self.full_topic_sync(&broker_identifier) { error!("failed to perform full topic sync: {err}"); self.connections.remove_broker(&broker_identifier); return; @@ -69,12 +69,12 @@ impl< // If we have `strong_consistency` enabled, send partials #[cfg(feature = "strong_consistency")] - if let Err(err) = self.partial_topic_sync().await { + if let Err(err) = self.partial_topic_sync() { error!("failed to perform partial topic sync: {err}"); } #[cfg(feature = "strong_consistency")] - if let Err(err) = self.partial_user_sync().await { + if let Err(err) = self.partial_user_sync() { error!("failed to perform partial user sync: {err}"); } @@ -111,8 +111,7 @@ impl< let user_public_key = Bytes::from(direct.recipient.clone()); self.connections - .send_direct(user_public_key, message, true) - .await; + .send_direct(user_public_key, message, true); } // If we receive a broadcast message from a broker, we want to send it to all interested users @@ -120,21 +119,19 @@ impl< let message = Bytes::from(message.serialize().expect("serialization failed")); let topics = broadcast.topics.clone(); - self.connections.send_broadcast(topics, message, true).await; + self.connections.send_broadcast(topics, message, true); } // If we receive a subscribe message from a broker, we add them as "interested" locally. Message::Subscribe(subscribe) => { self.connections - .subscribe_broker_to(broker_identifier, subscribe) - .await; + .subscribe_broker_to(broker_identifier, subscribe); } // If we receive a subscribe message from a broker, we remove them as "interested" locally. Message::Unsubscribe(unsubscribe) => { self.connections - .unsubscribe_broker_from(broker_identifier, unsubscribe) - .await; + .unsubscribe_broker_from(broker_identifier, &unsubscribe); } // If we receive a `UserSync` message, we want to sync with our map @@ -146,7 +143,7 @@ impl< "failed to deserialize user sync message" ); - self.connections.apply_user_sync(user_sync).await; + self.connections.apply_user_sync(user_sync); } // Do nothing if we receive an unexpected message diff --git a/broker/src/handlers/user.rs b/broker/src/handlers/user.rs index 9c5989b..98a348e 100644 --- a/broker/src/handlers/user.rs +++ b/broker/src/handlers/user.rs @@ -50,21 +50,19 @@ impl< let (sender, receiver) = connection; // Add our user - self.connections.add_user(public_key.clone(), sender).await; + self.connections.add_user(public_key.clone(), sender); // Subscribe our user to their connections - self.connections - .subscribe_user_to(&public_key, topics) - .await; + self.connections.subscribe_user_to(&public_key, topics); // If we have `strong_consistency` enabled, send partials #[cfg(feature = "strong_consistency")] - if let Err(err) = self.partial_topic_sync().await { + if let Err(err) = self.partial_topic_sync() { error!("failed to perform partial topic sync: {err}"); } #[cfg(feature = "strong_consistency")] - if let Err(err) = self.partial_user_sync().await { + if let Err(err) = self.partial_user_sync() { error!("failed to perform partial user sync: {err}"); } @@ -107,8 +105,7 @@ impl< let user_public_key = Bytes::from(direct.recipient.clone()); self.connections - .send_direct(user_public_key, message, false) - .await; + .send_direct(user_public_key, message, false); } // If we get a broadcast message from a user, send it to both brokers and users. @@ -116,23 +113,18 @@ impl< let message = Bytes::from(message.serialize().expect("serialization failed")); let topics = broadcast.topics.clone(); - self.connections - .send_broadcast(topics, message, false) - .await; + self.connections.send_broadcast(topics, message, false); } // Subscribe messages from users will just update the state locally Message::Subscribe(subscribe) => { - self.connections - .subscribe_user_to(public_key, subscribe) - .await; + self.connections.subscribe_user_to(public_key, subscribe); } // Unsubscribe messages from users will just update the state locally Message::Unsubscribe(unsubscribe) => { self.connections - .unsubscribe_user_from(public_key, unsubscribe) - .await; + .unsubscribe_user_from(public_key, unsubscribe); } _ => return, diff --git a/broker/src/tasks/sync.rs b/broker/src/tasks/sync.rs index 4fb33dc..c5f8e7d 100644 --- a/broker/src/tasks/sync.rs +++ b/broker/src/tasks/sync.rs @@ -45,9 +45,9 @@ impl< /// /// # Errors /// - If we fail to serialize the message - pub async fn full_user_sync(self: &Arc, broker: &BrokerIdentifier) -> Result<()> { + pub fn full_user_sync(self: &Arc, broker: &BrokerIdentifier) -> Result<()> { // Get full user sync map - let full_sync_map = self.connections.get_full_user_sync().await; + let full_sync_map = self.connections.get_full_user_sync(); // Serialize the message let raw_message = prepare_sync_message!(full_sync_map); @@ -63,9 +63,9 @@ impl< /// /// # Errors /// - If we fail to serialize the message - pub async fn partial_user_sync(self: &Arc) -> Result<()> { + pub fn partial_user_sync(self: &Arc) -> Result<()> { // Get full user sync map - let partial_sync_map = self.connections.get_partial_user_sync().await; + let partial_sync_map = self.connections.get_partial_user_sync(); // Return if we haven't had any changes if partial_sync_map.underlying_map.is_empty() { @@ -86,7 +86,7 @@ impl< /// /// # Errors /// - if we fail to serialize the message - pub async fn full_topic_sync(self: &Arc, broker: &BrokerIdentifier) -> Result<()> { + pub fn full_topic_sync(self: &Arc, broker: &BrokerIdentifier) -> Result<()> { // Get full list of topics let topics = self.connections.get_full_topic_sync(); @@ -108,7 +108,7 @@ impl< /// /// # Errors /// - If we fail to serialize the message - pub async fn partial_topic_sync(self: &Arc) -> Result<()> { + pub fn partial_topic_sync(self: &Arc) -> Result<()> { // Get partial list of topics let (additions, removals) = self.connections.get_partial_topic_sync(); @@ -144,12 +144,12 @@ impl< pub async fn run_sync_task(self: Arc) { loop { // Perform user sync - if let Err(err) = self.partial_user_sync().await { + if let Err(err) = self.partial_user_sync() { error!("failed to perform partial user sync: {err}"); } // Perform topic sync - if let Err(err) = self.partial_topic_sync().await { + if let Err(err) = self.partial_topic_sync() { error!("failed to perform partial user sync: {err}"); }; diff --git a/broker/src/test.rs b/broker/src/test.rs index 3ebb8ce..254e93c 100644 --- a/broker/src/test.rs +++ b/broker/src/test.rs @@ -118,8 +118,7 @@ impl RunDefinition { broker_under_test .inner .connections - .add_user(identifier.clone(), to_tester) - .await; + .add_user(identifier.clone(), to_tester); // Spawn our user receiver in the broker under test let inner = broker_under_test.inner.clone();