diff --git a/Cargo.toml b/Cargo.toml index c153a23..10e7585 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ either = "1.5.3" hashlink = "0.8.2" lioness = "0.1.2" log = "0.4.17" -multiaddr = "0.17.1" parking_lot = "0.12.1" rand = "0.8.5" rand_chacha = "0.3.1" diff --git a/src/core/cover.rs b/src/core/cover.rs index cb51cc3..63e64d8 100644 --- a/src/core/cover.rs +++ b/src/core/cover.rs @@ -35,9 +35,9 @@ pub enum CoverKind { Loop, } -pub fn gen_cover_packet( +pub fn gen_cover_packet( rng: &mut (impl Rng + CryptoRng), - topology: &Topology, + topology: &Topology, ns: &dyn NetworkStatus, kind: CoverKind, num_hops: usize, diff --git a/src/core/mod.rs b/src/core/mod.rs index 5ea1456..4f70b56 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -70,11 +70,9 @@ use arrayvec::ArrayVec; use bitflags::bitflags; use either::Either; use log::{debug, info, trace}; -use multiaddr::Multiaddr; use rand::Rng; use std::{ cmp::{max, min}, - collections::HashSet, time::{Duration, Instant}, }; @@ -147,11 +145,11 @@ pub enum PostErr { BadSurb, } -fn post_session( - sessions: &mut Sessions, +fn post_session( + sessions: &mut Sessions, status: SessionStatus, index: SessionIndex, -) -> Result<&mut Session, PostErr> { +) -> Result<&mut Session, PostErr> { let Some(rel_index) = RelSessionIndex::from_session_index(index, status.current_index) else { return Err(if index < status.current_index { PostErr::SessionNoLongerActive(index) @@ -186,7 +184,7 @@ impl From for PostErr { /// Returns a conservative estimate of the time taken for the last packet in the authored packet /// queue to get dispatched plus the time taken for all reply packets to get through the authored /// packet queue at the far end. -fn estimate_authored_packet_queue_delay(config: &Config, session: &Session) -> Duration { +fn estimate_authored_packet_queue_delay(config: &Config, session: &Session) -> Duration { let rate_mul = // When transitioning between sessions, the rate is halved 0.5 * @@ -261,7 +259,7 @@ impl RequestMetrics { bitflags! { /// Flags to indicate events that have occurred. Note that these may be set spuriously. pub struct Events: u32 { - /// The reserved peers returned by [`Mixnet::reserved_peer_addresses`] have changed. + /// The reserved peers returned by [`Mixnet::reserved_peers`] have changed. const RESERVED_PEERS_CHANGED = 0b1; /// The deadline returned by [`Mixnet::next_forward_packet_deadline`] has changed. const NEXT_FORWARD_PACKET_DEADLINE_CHANGED = 0b10; @@ -276,14 +274,15 @@ bitflags! { } } -/// Mixnet core state. -pub struct Mixnet { +/// Mixnet core state. `X` is the type of the extra data stored for each mixnode +/// ([`Mixnode::extra`]). +pub struct Mixnet { config: Config, /// Index and phase of current session. session_status: SessionStatus, /// Current and previous sessions. - sessions: Sessions, + sessions: Sessions, /// Key-exchange key pair for the next session. next_kx_pair: Option, @@ -300,7 +299,7 @@ pub struct Mixnet { events: Events, } -impl Mixnet { +impl Mixnet { /// Create a new `Mixnet`. pub fn new(config: Config) -> Self { let sessions = Sessions { @@ -398,13 +397,10 @@ impl Mixnet { /// /// - Checking for connectivity (they are passed to [`NetworkStatus::is_connected`]). /// - Sending packets (they are put in [`AddressedPacket::peer_id`]). - /// - /// The mixnode external addresses are merely collated and returned by - /// [`reserved_peer_addresses`](Self::reserved_peer_addresses). pub fn maybe_set_mixnodes( &mut self, rel_session_index: RelSessionIndex, - mixnodes: &mut dyn FnMut() -> Result, MixnodesErr>, + mixnodes: &mut dyn FnMut() -> Result>, MixnodesErr>, ) { let session = &mut self.sessions[rel_session_index]; if !matches!(session, SessionSlot::Empty | SessionSlot::KxPair(_)) { @@ -483,13 +479,9 @@ impl Mixnet { .public() } - /// Returns the addresses of the peers we should try to maintain connections to. - pub fn reserved_peer_addresses(&self) -> HashSet { - self.sessions - .iter() - .flat_map(|session| session.topology.reserved_peer_addresses()) - .cloned() - .collect() + /// Returns the mixnodes we should try to maintain connections to. + pub fn reserved_peers(&self) -> impl Iterator> { + self.sessions.iter().flat_map(|session| session.topology.reserved_peers()) } /// Handle an incoming packet. If the packet completes a message, the message is returned. diff --git a/src/core/request_builder.rs b/src/core/request_builder.rs index 9142f84..5f2c864 100644 --- a/src/core/request_builder.rs +++ b/src/core/request_builder.rs @@ -37,15 +37,15 @@ pub struct RouteMetrics { pub forwarding_delay: Delay, } -pub struct RequestBuilder<'topology> { - route_generator: RouteGenerator<'topology>, +pub struct RequestBuilder<'topology, X> { + route_generator: RouteGenerator<'topology, X>, destination_index: MixnodeIndex, } -impl<'topology> RequestBuilder<'topology> { +impl<'topology, X> RequestBuilder<'topology, X> { pub fn new( rng: &mut (impl Rng + CryptoRng), - topology: &'topology Topology, + topology: &'topology Topology, ns: &dyn NetworkStatus, destination_index: Option, ) -> Result { diff --git a/src/core/sessions.rs b/src/core/sessions.rs index 4834f27..a08377a 100644 --- a/src/core/sessions.rs +++ b/src/core/sessions.rs @@ -30,11 +30,11 @@ use std::{ time::Duration, }; -pub struct Session { +pub struct Session { /// Key-exchange key pair. pub kx_pair: KxPair, /// Mixnode topology. - pub topology: Topology, + pub topology: Topology, /// Queue of packets authored by us, to be dispatched in place of drop cover traffic. pub authored_packet_queue: AuthoredPacketQueue, /// See [`SessionConfig`](super::config::SessionConfig::mean_authored_packet_period). @@ -84,27 +84,27 @@ impl Add for RelSessionIndex { } } -pub enum SessionSlot { +pub enum SessionSlot { Empty, KxPair(KxPair), /// Like [`Empty`](Self::Empty), but we should not try to create a [`Session`] struct. Disabled, - Full(Session), + Full(Session), } -impl SessionSlot { +impl SessionSlot { pub fn is_empty(&self) -> bool { matches!(self, Self::Empty) } - pub fn as_option(&self) -> Option<&Session> { + pub fn as_option(&self) -> Option<&Session> { match self { Self::Full(session) => Some(session), _ => None, } } - pub fn as_mut_option(&mut self) -> Option<&mut Session> { + pub fn as_mut_option(&mut self) -> Option<&mut Session> { match self { Self::Full(session) => Some(session), _ => None, @@ -112,32 +112,32 @@ impl SessionSlot { } } -pub struct Sessions { - pub current: SessionSlot, - pub prev: SessionSlot, +pub struct Sessions { + pub current: SessionSlot, + pub prev: SessionSlot, } -impl Sessions { +impl Sessions { pub fn is_empty(&self) -> bool { self.current.is_empty() && self.prev.is_empty() } - pub fn iter(&self) -> impl Iterator { + pub fn iter(&self) -> impl Iterator> { [&self.current, &self.prev] .into_iter() .filter_map(|session| session.as_option()) } /// This is guaranteed to return the current session first, if it exists. - pub fn enumerate_mut(&mut self) -> impl Iterator { + pub fn enumerate_mut(&mut self) -> impl Iterator)> { [(RelSessionIndex::Current, &mut self.current), (RelSessionIndex::Prev, &mut self.prev)] .into_iter() .filter_map(|(index, session)| session.as_mut_option().map(|session| (index, session))) } } -impl Index for Sessions { - type Output = SessionSlot; +impl Index for Sessions { + type Output = SessionSlot; fn index(&self, index: RelSessionIndex) -> &Self::Output { match index { @@ -147,7 +147,7 @@ impl Index for Sessions { } } -impl IndexMut for Sessions { +impl IndexMut for Sessions { fn index_mut(&mut self, index: RelSessionIndex) -> &mut Self::Output { match index { RelSessionIndex::Current => &mut self.current, diff --git a/src/core/topology.rs b/src/core/topology.rs index 21fd75d..643f009 100644 --- a/src/core/topology.rs +++ b/src/core/topology.rs @@ -25,22 +25,21 @@ use super::sphinx::{ }; use arrayvec::ArrayVec; use either::Either; -use multiaddr::Multiaddr; use rand::{seq::SliceRandom, CryptoRng, Rng}; use std::{ cmp::{max, min}, fmt, }; -/// Key-exchange public key, peer ID, and external addresses for a mixnode. +/// Per-mixnode data. #[derive(Clone)] -pub struct Mixnode { +pub struct Mixnode { /// Key-exchange public key for the mixnode. pub kx_public: KxPublic, /// Peer ID for the mixnode. pub peer_id: PeerId, - /// External addresses for the mixnode. - pub external_addresses: Vec, + /// Extra data; for use by the crate user. + pub extra: X, } enum LocalNode { @@ -65,17 +64,17 @@ pub enum TopologyErr { NoConnectedGatewayMixnodes, } -pub struct Topology { - mixnodes: Vec, +pub struct Topology { + mixnodes: Vec>, local_kx_public: KxPublic, local_node: LocalNode, } -impl Topology { +impl Topology { /// `mixnodes` must be no longer than [`MAX_MIXNODE_INDEX + 1`](MAX_MIXNODE_INDEX). pub fn new( rng: &mut impl Rng, - mixnodes: Vec, + mixnodes: Vec>, local_kx_public: &KxPublic, num_gateway_mixnodes: u32, ) -> Self { @@ -129,7 +128,7 @@ impl Topology { matches!(self.local_node, LocalNode::Mixnode(_)) } - pub fn reserved_peer_addresses(&self) -> impl Iterator { + pub fn reserved_peers(&self) -> impl Iterator> { let indices = match &self.local_node { LocalNode::Mixnode(local_index) => Either::Left({ // Connect to all other mixnodes (ie exclude the local node) @@ -139,7 +138,7 @@ impl Topology { LocalNode::NonMixnode(gateway_indices) => Either::Right(gateway_indices.iter().map(|index| index.get())), }; - indices.flat_map(|index| self.mixnodes[index as usize].external_addresses.iter()) + indices.map(|index| &self.mixnodes[index as usize]) } pub fn mixnode_index_to_peer_id(&self, index: MixnodeIndex) -> Result { @@ -157,7 +156,7 @@ impl Topology { } } -impl fmt::Display for Topology { +impl fmt::Display for Topology { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match &self.local_node { LocalNode::Mixnode(local_index) => write!(fmt, "Local node is mixnode {local_index}"), @@ -222,16 +221,16 @@ impl UsedIndices { } } -pub struct RouteGenerator<'topology> { - topology: &'topology Topology, +pub struct RouteGenerator<'topology, X> { + topology: &'topology Topology, local_peer_id: PeerId, /// Always empty if the local node is a mixnode. Otherwise, the subset of the gateway mixnodes /// from the topology that are currently connected. connected_gateway_indices: ArrayVec, } -impl<'topology> RouteGenerator<'topology> { - pub fn new(topology: &'topology Topology, ns: &dyn NetworkStatus) -> Self { +impl<'topology, X> RouteGenerator<'topology, X> { + pub fn new(topology: &'topology Topology, ns: &dyn NetworkStatus) -> Self { let connected_gateway_indices = match &topology.local_node { LocalNode::Mixnode(_) => ArrayVec::new(), // If we're not a mixnode, we should have attempted to connect to a number of "gateway" @@ -251,7 +250,7 @@ impl<'topology> RouteGenerator<'topology> { Self { topology, local_peer_id: ns.local_peer_id(), connected_gateway_indices } } - pub fn topology(&self) -> &'topology Topology { + pub fn topology(&self) -> &'topology Topology { self.topology } diff --git a/src/lib.rs b/src/lib.rs index 6c850f9..31055b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,8 +23,7 @@ //! //! This crate is mostly network agnostic. While it determines which nodes should be connected and //! which packets should be sent where, it does not care _how_ this is done. It's not entirely -//! agnostic; it uses multiaddrs for peer addresses and assumes that peers have 32-byte -//! globally-unique identifiers. +//! agnostic; it assumes that peers have 32-byte globally-unique identifiers. #![warn(missing_docs)] #![forbid(unsafe_code)] diff --git a/src/reply_manager.rs b/src/reply_manager.rs index 5342368..668e43c 100644 --- a/src/reply_manager.rs +++ b/src/reply_manager.rs @@ -85,7 +85,7 @@ impl ReplyContext { &self.message_id } - fn post_reply(&mut self, reply: &Reply, mixnet: &mut Mixnet, config: &Config) { + fn post_reply(&mut self, reply: &Reply, mixnet: &mut Mixnet, config: &Config) { for _ in 0..config.max_posts { if let Err(err) = mixnet.post_reply( &mut self.surbs, @@ -140,10 +140,10 @@ impl ReplyManager { /// If `Some` is returned, the caller should handle the request and then call either /// [`abandon`](Self::abandon) or [`complete`](Self::complete) with the [`ReplyContext`]. The /// `Vec` contains the request message data. - pub fn insert( + pub fn insert( &mut self, message: RequestMessage, - mixnet: &mut Mixnet, + mixnet: &mut Mixnet, ) -> Option<(ReplyContext, Vec)> { let mut reply_context = ReplyContext { session_index: message.session_index, @@ -198,11 +198,11 @@ impl ReplyManager { } /// Complete a request. This will post the reply and cache it for repeat requests. - pub fn complete( + pub fn complete( &mut self, mut reply_context: ReplyContext, data: Vec, - mixnet: &mut Mixnet, + mixnet: &mut Mixnet, ) { let state = match self.states.entry(reply_context.message_id) { Entry::Occupied(entry) => match entry.into_mut() { diff --git a/src/request_manager/mod.rs b/src/request_manager/mod.rs index 19ad753..7051aa3 100644 --- a/src/request_manager/mod.rs +++ b/src/request_manager/mod.rs @@ -127,9 +127,9 @@ impl> RequestManager { /// Update the current session index and phase. This should be called after /// [`Mixnet::set_session_status`]. This may post messages to `mixnet`. - pub fn update_session_status( + pub fn update_session_status( &mut self, - mixnet: &mut Mixnet, + mixnet: &mut Mixnet, ns: &dyn NetworkStatus, context: &C, ) { @@ -187,7 +187,13 @@ impl> RequestManager { /// [`Request::handle_post_err`] is called. /// - The retry limit is reached. In this case, [`Request::handle_retry_limit_reached`] is /// called. - pub fn insert(&mut self, request: R, mixnet: &mut Mixnet, ns: &dyn NetworkStatus, context: &C) { + pub fn insert( + &mut self, + request: R, + mixnet: &mut Mixnet, + ns: &dyn NetworkStatus, + context: &C, + ) { debug_assert!(self.has_space()); let state = RequestState { request, @@ -224,10 +230,10 @@ impl> RequestManager { None } - fn process_post_queue( + fn process_post_queue( &mut self, rel_session_index: Option, - mixnet: &mut Mixnet, + mixnet: &mut Mixnet, ns: &dyn NetworkStatus, context: &C, ) { @@ -305,9 +311,9 @@ impl> RequestManager { /// when the /// [`SPACE_IN_AUTHORED_PACKET_QUEUE`](super::core::Events::SPACE_IN_AUTHORED_PACKET_QUEUE) /// event fires. - pub fn process_post_queues( + pub fn process_post_queues( &mut self, - mixnet: &mut Mixnet, + mixnet: &mut Mixnet, ns: &dyn NetworkStatus, context: &C, ) { @@ -331,10 +337,10 @@ impl> RequestManager { } } - fn retry( + fn retry( &mut self, mut state: RequestState, - mixnet: &mut Mixnet, + mixnet: &mut Mixnet, ns: &dyn NetworkStatus, context: &C, ) { @@ -395,9 +401,9 @@ impl> RequestManager { /// Pop the next request from the internal retry queue. This should be called whenever the /// deadline returned by [`next_retry_deadline`](Self::next_retry_deadline) is reached. This /// may post messages to `mixnet`. Returns `false` if the internal retry queue is empty. - pub fn pop_next_retry( + pub fn pop_next_retry( &mut self, - mixnet: &mut Mixnet, + mixnet: &mut Mixnet, ns: &dyn NetworkStatus, context: &C, ) -> bool { diff --git a/tests/core.rs b/tests/core.rs index 9089a60..cb851b0 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -24,7 +24,6 @@ use mixnet::core::{ Config, Events, Message, MessageId, Mixnet, Mixnode, NetworkStatus, PeerId, RelSessionIndex, SessionIndex, SessionPhase, SessionStatus, MESSAGE_ID_SIZE, }; -use multiaddr::{multiaddr, multihash::Multihash, Multiaddr}; use parking_lot::Mutex; use rand::{Rng, RngCore}; use std::{ @@ -41,22 +40,9 @@ fn log_target(peer_index: usize) -> &'static str { .or_insert_with(|| Box::leak(format!("mixnet({peer_index})").into_boxed_str())) } -fn multiaddr_from_peer_id(id: &PeerId) -> Multiaddr { - // Just need to be able to get id back out in peer_id_from_multiaddr. Abuse Certhash to store. - multiaddr!(Certhash(Multihash::wrap(0, id).unwrap())) -} - -fn peer_id_from_multiaddr(multiaddr: &Multiaddr) -> PeerId { - let mut protocols = multiaddr.into_iter(); - let multiaddr::Protocol::Certhash(hash) = protocols.next().unwrap() else { unreachable!() }; - assert!(protocols.next().is_none()); - assert_eq!(hash.code(), 0); - hash.digest().try_into().unwrap() -} - struct Peer { id: PeerId, - mixnet: Mixnet, + mixnet: Mixnet<()>, } struct PeerNetworkStatus<'id, 'connections> { @@ -95,22 +81,18 @@ impl Network { } } - fn maybe_set_mixnodes(&mut self, rel_session_index: RelSessionIndex, mixnodes: &[Mixnode]) { + fn maybe_set_mixnodes(&mut self, rel_session_index: RelSessionIndex, mixnodes: &[Mixnode<()>]) { for peer in &mut self.peers { peer.mixnet .maybe_set_mixnodes(rel_session_index, &mut || Ok(mixnodes.to_owned())); } } - fn next_mixnodes(&mut self, peer_indices: impl Iterator) -> Vec { + fn next_mixnodes(&mut self, peer_indices: impl Iterator) -> Vec> { peer_indices .map(|index| { let peer = &mut self.peers[index]; - Mixnode { - kx_public: *peer.mixnet.next_kx_public(), - peer_id: peer.id, - external_addresses: vec![multiaddr_from_peer_id(&peer.id)], - } + Mixnode { kx_public: *peer.mixnet.next_kx_public(), peer_id: peer.id, extra: () } }) .collect() } @@ -122,11 +104,7 @@ impl Network { if events.contains(Events::RESERVED_PEERS_CHANGED) { self.connections.insert( peer.id, - peer.mixnet - .reserved_peer_addresses() - .iter() - .map(peer_id_from_multiaddr) - .collect(), + peer.mixnet.reserved_peers().map(|mixnode| mixnode.peer_id).collect(), ); } let ns = PeerNetworkStatus { id: &peer.id, connections: &self.connections };