Skip to content

Commit

Permalink
switch to sync functions
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 25, 2024
1 parent d36b4e6 commit 9f3d499
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 49 deletions.
22 changes: 11 additions & 11 deletions broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco

/// Get the full versioned vector map of user -> broker.
/// We send this to other brokers so they can merge it.
pub async fn get_full_user_sync(self: &Arc<Self>) -> DirectMap {
pub fn get_full_user_sync(self: &Arc<Self>) -> DirectMap {
self.direct_map.read().get_full()
}

/// Get the differences in the versioned vector map of user -> broker
/// We send this to other brokers so they can merge it.
pub async fn get_partial_user_sync(self: &Arc<Self>) -> DirectMap {
pub fn get_partial_user_sync(self: &Arc<Self>) -> DirectMap {
self.direct_map.write().diff()
}

/// Apply a received user sync map. Overwrites our values if they are old.
/// Kicks off users that are now connected elsewhere.
pub async fn apply_user_sync(self: &Arc<Self>, map: DirectMap) {
pub fn apply_user_sync(self: &Arc<Self>, map: DirectMap) {
// Merge the maps, returning the difference
let users_to_remove = self.direct_map.write().merge(map);

Expand Down Expand Up @@ -135,7 +135,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco

/// Insert a user into our map. Updates the versioned vector that
/// keeps track of which users are connected where.
pub async fn add_user(
pub fn add_user(
self: &Arc<Self>,
user_public_key: Bytes,
connection: <UserProtocol as Protocol>::Sender,
Expand Down Expand Up @@ -183,7 +183,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
}

/// Locally subscribe a broker to some topics.
pub async fn subscribe_broker_to(
pub fn subscribe_broker_to(
&self,
broker_identifier: &BrokerIdentifier,
topics: Vec<Topic>,
Expand All @@ -195,18 +195,18 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
}

/// Locally subscribe a user to some topics.
pub async fn subscribe_user_to(&self, user_public_key: &Bytes, topics: Vec<Topic>) {
pub fn subscribe_user_to(&self, user_public_key: &Bytes, topics: Vec<Topic>) {
self.broadcast_map
.users
.write()
.associate_key_with_values(user_public_key, topics);
}

/// Locally unsubscribe a broker from some topics.
pub async fn unsubscribe_broker_from(
pub fn unsubscribe_broker_from(
&self,
broker_identifier: &BrokerIdentifier,
topics: Vec<Topic>,
topics: &[Topic],
) {
self.broadcast_map
.brokers
Expand All @@ -215,7 +215,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
}

/// Locally unsubscribe a broker from some topics.
pub async fn unsubscribe_user_from(&self, user_public_key: &Bytes, topics: Vec<Topic>) {
pub fn unsubscribe_user_from(&self, user_public_key: &Bytes, topics: Vec<Topic>) {
self.broadcast_map
.users
.write()
Expand Down Expand Up @@ -289,7 +289,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
/// Send a direct message to either a user or a broker. First figures out where the message
/// is supposed to go, and then sends it. We have `to_user_only` bounds so we can stop thrashing;
/// if we receive a message from a broker we should only be forwarding it to applicable users.
pub async fn send_direct(
pub fn send_direct(
self: &Arc<Self>,
user_public_key: UserPublicKey,
message: Bytes,
Expand Down Expand Up @@ -318,7 +318,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
/// Send a broadcast message to both users and brokers. First figures out where the message
/// is supposed to go, and then sends it. We have `to_user_only` bounds so we can stop thrashing;
/// if we receive a message from a broker we should only be forwarding it to applicable users.
pub async fn send_broadcast(
pub fn send_broadcast(
self: &Arc<Self>,
mut topics: Vec<Topic>,
message: Bytes,
Expand Down
21 changes: 9 additions & 12 deletions broker/src/handlers/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,28 @@ impl<
.add_broker(broker_identifier.clone(), sender);

// Send a full user sync
if let Err(err) = self.full_user_sync(&broker_identifier).await {
if let Err(err) = self.full_user_sync(&broker_identifier) {
error!("failed to perform full user sync: {err}");
self.connections.remove_broker(&broker_identifier);
return;
};

// Send a full topic sync
// TODO: macro removals or something
if let Err(err) = self.full_topic_sync(&broker_identifier).await {
if let Err(err) = self.full_topic_sync(&broker_identifier) {
error!("failed to perform full topic sync: {err}");
self.connections.remove_broker(&broker_identifier);
return;
};

// If we have `strong_consistency` enabled, send partials
#[cfg(feature = "strong_consistency")]
if let Err(err) = self.partial_topic_sync().await {
if let Err(err) = self.partial_topic_sync() {
error!("failed to perform partial topic sync: {err}");
}

#[cfg(feature = "strong_consistency")]
if let Err(err) = self.partial_user_sync().await {
if let Err(err) = self.partial_user_sync() {
error!("failed to perform partial user sync: {err}");
}

Expand Down Expand Up @@ -111,30 +111,27 @@ impl<
let user_public_key = Bytes::from(direct.recipient.clone());

self.connections
.send_direct(user_public_key, message, true)
.await;
.send_direct(user_public_key, message, true);
}

// If we receive a broadcast message from a broker, we want to send it to all interested users
Message::Broadcast(ref broadcast) => {
let message = Bytes::from(message.serialize().expect("serialization failed"));
let topics = broadcast.topics.clone();

self.connections.send_broadcast(topics, message, true).await;
self.connections.send_broadcast(topics, message, true);
}

// If we receive a subscribe message from a broker, we add them as "interested" locally.
Message::Subscribe(subscribe) => {
self.connections
.subscribe_broker_to(broker_identifier, subscribe)
.await;
.subscribe_broker_to(broker_identifier, subscribe);
}

// If we receive a subscribe message from a broker, we remove them as "interested" locally.
Message::Unsubscribe(unsubscribe) => {
self.connections
.unsubscribe_broker_from(broker_identifier, unsubscribe)
.await;
.unsubscribe_broker_from(broker_identifier, &unsubscribe);
}

// If we receive a `UserSync` message, we want to sync with our map
Expand All @@ -146,7 +143,7 @@ impl<
"failed to deserialize user sync message"
);

self.connections.apply_user_sync(user_sync).await;
self.connections.apply_user_sync(user_sync);
}

// Do nothing if we receive an unexpected message
Expand Down
24 changes: 8 additions & 16 deletions broker/src/handlers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,19 @@ impl<
let (sender, receiver) = connection;

// Add our user
self.connections.add_user(public_key.clone(), sender).await;
self.connections.add_user(public_key.clone(), sender);

// Subscribe our user to their connections
self.connections
.subscribe_user_to(&public_key, topics)
.await;
self.connections.subscribe_user_to(&public_key, topics);

// If we have `strong_consistency` enabled, send partials
#[cfg(feature = "strong_consistency")]
if let Err(err) = self.partial_topic_sync().await {
if let Err(err) = self.partial_topic_sync() {
error!("failed to perform partial topic sync: {err}");
}

#[cfg(feature = "strong_consistency")]
if let Err(err) = self.partial_user_sync().await {
if let Err(err) = self.partial_user_sync() {
error!("failed to perform partial user sync: {err}");
}

Expand Down Expand Up @@ -107,32 +105,26 @@ impl<
let user_public_key = Bytes::from(direct.recipient.clone());

self.connections
.send_direct(user_public_key, message, false)
.await;
.send_direct(user_public_key, message, false);
}

// If we get a broadcast message from a user, send it to both brokers and users.
Message::Broadcast(ref broadcast) => {
let message = Bytes::from(message.serialize().expect("serialization failed"));
let topics = broadcast.topics.clone();

self.connections
.send_broadcast(topics, message, false)
.await;
self.connections.send_broadcast(topics, message, false);
}

// Subscribe messages from users will just update the state locally
Message::Subscribe(subscribe) => {
self.connections
.subscribe_user_to(public_key, subscribe)
.await;
self.connections.subscribe_user_to(public_key, subscribe);
}

// Unsubscribe messages from users will just update the state locally
Message::Unsubscribe(unsubscribe) => {
self.connections
.unsubscribe_user_from(public_key, unsubscribe)
.await;
.unsubscribe_user_from(public_key, unsubscribe);
}

_ => return,
Expand Down
16 changes: 8 additions & 8 deletions broker/src/tasks/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ impl<
///
/// # Errors
/// - If we fail to serialize the message
pub async fn full_user_sync(self: &Arc<Self>, broker: &BrokerIdentifier) -> Result<()> {
pub fn full_user_sync(self: &Arc<Self>, broker: &BrokerIdentifier) -> Result<()> {
// Get full user sync map
let full_sync_map = self.connections.get_full_user_sync().await;
let full_sync_map = self.connections.get_full_user_sync();

// Serialize the message
let raw_message = prepare_sync_message!(full_sync_map);
Expand All @@ -63,9 +63,9 @@ impl<
///
/// # Errors
/// - If we fail to serialize the message
pub async fn partial_user_sync(self: &Arc<Self>) -> Result<()> {
pub fn partial_user_sync(self: &Arc<Self>) -> Result<()> {
// Get full user sync map
let partial_sync_map = self.connections.get_partial_user_sync().await;
let partial_sync_map = self.connections.get_partial_user_sync();

// Return if we haven't had any changes
if partial_sync_map.underlying_map.is_empty() {
Expand All @@ -86,7 +86,7 @@ impl<
///
/// # Errors
/// - if we fail to serialize the message
pub async fn full_topic_sync(self: &Arc<Self>, broker: &BrokerIdentifier) -> Result<()> {
pub fn full_topic_sync(self: &Arc<Self>, broker: &BrokerIdentifier) -> Result<()> {
// Get full list of topics
let topics = self.connections.get_full_topic_sync();

Expand All @@ -108,7 +108,7 @@ impl<
///
/// # Errors
/// - If we fail to serialize the message
pub async fn partial_topic_sync(self: &Arc<Self>) -> Result<()> {
pub fn partial_topic_sync(self: &Arc<Self>) -> Result<()> {
// Get partial list of topics
let (additions, removals) = self.connections.get_partial_topic_sync();

Expand Down Expand Up @@ -144,12 +144,12 @@ impl<
pub async fn run_sync_task(self: Arc<Self>) {
loop {
// Perform user sync
if let Err(err) = self.partial_user_sync().await {
if let Err(err) = self.partial_user_sync() {
error!("failed to perform partial user sync: {err}");
}

// Perform topic sync
if let Err(err) = self.partial_topic_sync().await {
if let Err(err) = self.partial_topic_sync() {
error!("failed to perform partial user sync: {err}");
};

Expand Down
3 changes: 1 addition & 2 deletions broker/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ impl RunDefinition {
broker_under_test
.inner
.connections
.add_user(identifier.clone(), to_tester)
.await;
.add_user(identifier.clone(), to_tester);

// Spawn our user receiver in the broker under test
let inner = broker_under_test.inner.clone();
Expand Down

0 comments on commit 9f3d499

Please sign in to comment.