Skip to content

Commit

Permalink
user mnemonic, marshal logging
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 23, 2024
1 parent cf719dc commit bb85b5b
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ prometheus = { version = "0.13.3" }
lazy_static = "1.4.0"
derive_builder = "0.13.1"
async-std = { version = "1.12.0", features = ["tokio1", "attributes"] }
rkyv = { version = "0.7.44", features = ["validation"] }
rkyv = { version = "0.7.44", features = ["validation"] }
3 changes: 1 addition & 2 deletions broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ paste = "1.0.14"
prometheus = { workspace = true }
lazy_static = { workspace = true }
derive_builder.workspace = true
mnemonic = "1.0.1"
rkyv.workspace = true
derivative = "2.2.0"
dashmap = "5.5.3"
dashmap = "5.5.3"
20 changes: 6 additions & 14 deletions broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ use std::{collections::HashSet, sync::Arc};
use dashmap::DashMap;
pub use direct::DirectMap;
use proto::{
connection::{protocols::{Protocol, Sender}, Bytes},
connection::{
protocols::{Protocol, Sender},
Bytes,
},
discovery::BrokerIdentifier,
message::Topic,
BrokerProtocol, UserProtocol,
mnemonic, BrokerProtocol, UserProtocol,
};
use tokio::{spawn, sync::RwLock};
use tracing::{error, warn};
Expand Down Expand Up @@ -39,17 +42,6 @@ pub struct Connections {
broadcast_map: BroadcastMap,
}

/// A macro for generating a cute little user mnemonic from a hash
#[macro_export]
macro_rules! mnemonic {
($item: expr) => {{
use std::hash::{Hash, Hasher};
let mut state = std::hash::DefaultHasher::new();
$item.hash(&mut state);
mnemonic::to_string(&state.finish().to_le_bytes())
}};
}

impl Connections {
/// Create a new `Connections`. Requires an identity for
/// version vector conflict resolution.
Expand Down Expand Up @@ -326,7 +318,7 @@ impl Connections {
} else {
// Warn if the recipient user did not exist.
// TODO: remove this
warn!("user {} did not exist in map", mnemonic!(user_public_key));
warn!("user {} did not exist in map", mnemonic(&user_public_key));
}
}

Expand Down
3 changes: 2 additions & 1 deletion broker/src/handlers/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use proto::{
authenticate_with_broker, bail,
connection::{
auth::broker::BrokerAuth,
protocols::{Protocol, Receiver}, Bytes,
protocols::{Protocol, Receiver},
Bytes,
},
crypto::signature::SignatureScheme,
discovery::BrokerIdentifier,
Expand Down
6 changes: 3 additions & 3 deletions broker/src/handlers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use proto::{
},
crypto::signature::SignatureScheme,
message::Message,
UserProtocol,
mnemonic, UserProtocol,
};
use tracing::{error, info};

use crate::{metrics, mnemonic, Inner};
use crate::{metrics, Inner};

impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> Inner<BrokerScheme, UserScheme> {
/// This function handles a user (public) connection.
Expand All @@ -40,7 +40,7 @@ impl<BrokerScheme: SignatureScheme, UserScheme: SignatureScheme> Inner<BrokerSch

// Create a human-readable user identifier (by public key)
let public_key = Bytes::from(public_key);
let user_identifier = mnemonic!(public_key);
let user_identifier = mnemonic(&public_key);
info!("{user_identifier} connected");

// Create new batch sender
Expand Down
9 changes: 7 additions & 2 deletions marshal/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use proto::{
connection::{auth::marshal::MarshalAuth, protocols::Protocol},
crypto::signature::SignatureScheme,
DiscoveryClientType, UserProtocol,
mnemonic, DiscoveryClientType, UserProtocol,
};
use tracing::info;

use crate::Marshal;

Expand All @@ -16,6 +17,10 @@ impl<Scheme: SignatureScheme> Marshal<Scheme> {
mut discovery_client: DiscoveryClientType,
) {
// Verify (authenticate) the connection
let _ = MarshalAuth::<Scheme>::verify_user(&connection, &mut discovery_client).await;
if let Ok(user_public_key) =
MarshalAuth::<Scheme>::verify_user(&connection, &mut discovery_client).await
{
info!("user {} authenticated", mnemonic(&user_public_key));
}
}
}
1 change: 1 addition & 0 deletions proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ warp = "0.3.6"
anyhow = "1.0.79"
kanal = "0.1.0-pre8"
rkyv.workspace = true
mnemonic = "1.0.1"

[dev-dependencies]
portpicker = "0.1.1"
14 changes: 10 additions & 4 deletions proto/src/connection/auth/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{

use tracing::error;

use crate::crypto::signature::{KeyPair, Serializable};
use crate::{
bail,
connection::protocols::{Protocol, Receiver, Sender},
Expand All @@ -18,6 +17,10 @@ use crate::{
message::{AuthenticateResponse, AuthenticateWithKey, Message, Topic},
BrokerProtocol, DiscoveryClientType, UserProtocol,
};
use crate::{
connection::Bytes,
crypto::signature::{KeyPair, Serializable},
};

/// This is the `BrokerAuth` struct that we define methods to for authentication purposes.
pub struct BrokerAuth<Scheme: SignatureScheme> {
Expand Down Expand Up @@ -81,7 +84,7 @@ impl<Scheme: SignatureScheme> BrokerAuth<Scheme> {
),
broker_identifier: &BrokerIdentifier,
discovery_client: &mut DiscoveryClientType,
) -> Result<(Vec<u8>, Vec<Topic>)> {
) -> Result<(Bytes, Vec<Topic>)> {
// Receive the permit
let auth_message = bail!(
connection.1.recv_message().await,
Expand Down Expand Up @@ -143,8 +146,11 @@ impl<Scheme: SignatureScheme> BrokerAuth<Scheme> {
fail_verification_with_message!(connection, "wrong message type");
};

// Return the public key
Ok((serialized_public_key, subscribed_topics_message))
// Return the public key and the initially subscribed topics
Ok((
Bytes::from(serialized_public_key),
subscribed_topics_message,
))
}

/// Authenticate with a broker (as a broker).
Expand Down
14 changes: 11 additions & 3 deletions proto/src/connection/auth/marshal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{

use tracing::error;

use crate::crypto::signature::{Serializable, SignatureScheme};
use crate::{
bail,
connection::protocols::{Protocol, Receiver, Sender},
Expand All @@ -17,6 +16,10 @@ use crate::{
message::{AuthenticateResponse, Message},
DiscoveryClientType, UserProtocol,
};
use crate::{
connection::Bytes,
crypto::signature::{Serializable, SignatureScheme},
};

/// This is the `BrokerAuth` struct that we define methods to for authentication purposes.
pub struct MarshalAuth<Scheme: SignatureScheme> {
Expand All @@ -40,7 +43,7 @@ impl<Scheme: SignatureScheme> MarshalAuth<Scheme> {
<UserProtocol as Protocol>::Receiver,
),
discovery_client: &mut DiscoveryClientType,
) -> Result<()> {
) -> Result<Bytes> {
// Receive the signed message from the user
let auth_message = bail!(
connection.1.recv_message().await,
Expand Down Expand Up @@ -78,6 +81,11 @@ impl<Scheme: SignatureScheme> MarshalAuth<Scheme> {
fail_verification_with_message!(connection, "timestamp is too old");
}

// Serialize the public key so we can get its mnemonic
let Ok(public_key) = public_key.serialize() else {
fail_verification_with_message!(connection, "failed to serialize public key");
};

// Get the broker with the least amount of connections
// TODO: do a macro for this
let broker_with_least_connections = match discovery_client
Expand Down Expand Up @@ -118,6 +126,6 @@ impl<Scheme: SignatureScheme> MarshalAuth<Scheme> {
// Send the permit to the user, along with the public broker advertise address
let _ = connection.0.send_message(response_message).await;

Ok(())
Ok(Bytes::from(public_key))
}
}
5 changes: 4 additions & 1 deletion proto/src/connection/protocols/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use kanal::{unbounded_async, AsyncReceiver, AsyncSender};
use tokio::{sync::RwLock, task::spawn_blocking};

use crate::{
bail, connection::Bytes, error::{Error, Result}, message::Message
bail,
connection::Bytes,
error::{Error, Result},
message::Message,
};
use std::{
collections::HashMap,
Expand Down
7 changes: 6 additions & 1 deletion proto/src/connection/protocols/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ use quinn::{ClientConfig, Connecting, Endpoint, ServerConfig, VarInt};
#[cfg(feature = "insecure")]
use crate::crypto::tls::SkipServerVerification;
use crate::{
bail, bail_option, connection::Bytes, crypto, error::{Error, Result}, message::Message, parse_socket_address, MAX_MESSAGE_SIZE
bail, bail_option,
connection::Bytes,
crypto,
error::{Error, Result},
message::Message,
parse_socket_address, MAX_MESSAGE_SIZE,
};
use std::{
net::{SocketAddr, ToSocketAddrs},
Expand Down
6 changes: 5 additions & 1 deletion proto/src/connection/protocols/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ use tokio::{
use crate::connection::metrics;

use crate::{
bail, bail_option, connection::Bytes, error::{Error, Result}, message::Message, parse_socket_address, read_length_delimited, write_length_delimited, MAX_MESSAGE_SIZE
bail, bail_option,
connection::Bytes,
error::{Error, Result},
message::Message,
parse_socket_address, read_length_delimited, write_length_delimited, MAX_MESSAGE_SIZE,
};
use std::{net::ToSocketAddrs, sync::Arc};

Expand Down
14 changes: 13 additions & 1 deletion proto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
//! This crate defines the common code structures and constants used by both the
//! broker client and server.

use connection::protocols::{quic::Quic, tcp::Tcp};
use std::hash::{Hash, Hasher};

use connection::{
protocols::{quic::Quic, tcp::Tcp},
Bytes,
};

pub mod connection;
pub mod crypto;
Expand Down Expand Up @@ -41,3 +46,10 @@ pub const QUIC_MAX_CONCURRENT_STREAMS: u64 = 10;
pub mod messages_capnp {
include!(concat!(env!("OUT_DIR"), "/messages_capnp.rs"));
}

/// A function for generating a cute little user mnemonic from a hash
pub fn mnemonic(bytes: &Bytes) -> String {
let mut state = std::hash::DefaultHasher::new();
bytes.hash(&mut state);
mnemonic::to_string(state.finish().to_le_bytes())
}

0 comments on commit bb85b5b

Please sign in to comment.