Skip to content

Commit

Permalink
Get rid of multiaddr dependency
Browse files Browse the repository at this point in the history
Allow the crate user to store whatever data they like alongside each
mixnode.
  • Loading branch information
zdave-parity committed Sep 14, 2023
1 parent f1dab87 commit 2f25cd8
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 107 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ either = "1.5.3"
hashlink = "0.8.2"
lioness = "0.1.2"
log = "0.4.17"
multiaddr = "0.17.1"
parking_lot = "0.12.1"
rand = "0.8.5"
rand_chacha = "0.3.1"
Expand Down
4 changes: 2 additions & 2 deletions src/core/cover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ pub enum CoverKind {
Loop,
}

pub fn gen_cover_packet(
pub fn gen_cover_packet<X>(
rng: &mut (impl Rng + CryptoRng),
topology: &Topology,
topology: &Topology<X>,
ns: &dyn NetworkStatus,
kind: CoverKind,
num_hops: usize,
Expand Down
36 changes: 14 additions & 22 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ use arrayvec::ArrayVec;
use bitflags::bitflags;
use either::Either;
use log::{debug, info, trace};
use multiaddr::Multiaddr;
use rand::Rng;
use std::{
cmp::{max, min},
collections::HashSet,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -147,11 +145,11 @@ pub enum PostErr {
BadSurb,
}

fn post_session(
sessions: &mut Sessions,
fn post_session<X>(
sessions: &mut Sessions<X>,
status: SessionStatus,
index: SessionIndex,
) -> Result<&mut Session, PostErr> {
) -> Result<&mut Session<X>, PostErr> {
let Some(rel_index) = RelSessionIndex::from_session_index(index, status.current_index) else {
return Err(if index < status.current_index {
PostErr::SessionNoLongerActive(index)
Expand Down Expand Up @@ -186,7 +184,7 @@ impl From<CheckSpaceErr> for PostErr {
/// Returns a conservative estimate of the time taken for the last packet in the authored packet
/// queue to get dispatched plus the time taken for all reply packets to get through the authored
/// packet queue at the far end.
fn estimate_authored_packet_queue_delay(config: &Config, session: &Session) -> Duration {
fn estimate_authored_packet_queue_delay<X>(config: &Config, session: &Session<X>) -> Duration {
let rate_mul =
// When transitioning between sessions, the rate is halved
0.5 *
Expand Down Expand Up @@ -261,7 +259,7 @@ impl RequestMetrics {
bitflags! {
/// Flags to indicate events that have occurred. Note that these may be set spuriously.
pub struct Events: u32 {
/// The reserved peers returned by [`Mixnet::reserved_peer_addresses`] have changed.
/// The reserved peers returned by [`Mixnet::reserved_peers`] have changed.
const RESERVED_PEERS_CHANGED = 0b1;
/// The deadline returned by [`Mixnet::next_forward_packet_deadline`] has changed.
const NEXT_FORWARD_PACKET_DEADLINE_CHANGED = 0b10;
Expand All @@ -276,14 +274,15 @@ bitflags! {
}
}

/// Mixnet core state.
pub struct Mixnet {
/// Mixnet core state. `X` is the type of the extra data stored for each mixnode
/// ([`Mixnode::extra`]).
pub struct Mixnet<X> {
config: Config,

/// Index and phase of current session.
session_status: SessionStatus,
/// Current and previous sessions.
sessions: Sessions,
sessions: Sessions<X>,
/// Key-exchange key pair for the next session.
next_kx_pair: Option<KxPair>,

Expand All @@ -300,7 +299,7 @@ pub struct Mixnet {
events: Events,
}

impl Mixnet {
impl<X> Mixnet<X> {
/// Create a new `Mixnet`.
pub fn new(config: Config) -> Self {
let sessions = Sessions {
Expand Down Expand Up @@ -398,13 +397,10 @@ impl Mixnet {
///
/// - Checking for connectivity (they are passed to [`NetworkStatus::is_connected`]).
/// - Sending packets (they are put in [`AddressedPacket::peer_id`]).
///
/// The mixnode external addresses are merely collated and returned by
/// [`reserved_peer_addresses`](Self::reserved_peer_addresses).
pub fn maybe_set_mixnodes(
&mut self,
rel_session_index: RelSessionIndex,
mixnodes: &mut dyn FnMut() -> Result<Vec<Mixnode>, MixnodesErr>,
mixnodes: &mut dyn FnMut() -> Result<Vec<Mixnode<X>>, MixnodesErr>,
) {
let session = &mut self.sessions[rel_session_index];
if !matches!(session, SessionSlot::Empty | SessionSlot::KxPair(_)) {
Expand Down Expand Up @@ -483,13 +479,9 @@ impl Mixnet {
.public()
}

/// Returns the addresses of the peers we should try to maintain connections to.
pub fn reserved_peer_addresses(&self) -> HashSet<Multiaddr> {
self.sessions
.iter()
.flat_map(|session| session.topology.reserved_peer_addresses())
.cloned()
.collect()
/// Returns the mixnodes we should try to maintain connections to.
pub fn reserved_peers(&self) -> impl Iterator<Item = &Mixnode<X>> {
self.sessions.iter().flat_map(|session| session.topology.reserved_peers())
}

/// Handle an incoming packet. If the packet completes a message, the message is returned.
Expand Down
8 changes: 4 additions & 4 deletions src/core/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ pub struct RouteMetrics {
pub forwarding_delay: Delay,
}

pub struct RequestBuilder<'topology> {
route_generator: RouteGenerator<'topology>,
pub struct RequestBuilder<'topology, X> {
route_generator: RouteGenerator<'topology, X>,
destination_index: MixnodeIndex,
}

impl<'topology> RequestBuilder<'topology> {
impl<'topology, X> RequestBuilder<'topology, X> {
pub fn new(
rng: &mut (impl Rng + CryptoRng),
topology: &'topology Topology,
topology: &'topology Topology<X>,
ns: &dyn NetworkStatus,
destination_index: Option<MixnodeIndex>,
) -> Result<Self, TopologyErr> {
Expand Down
32 changes: 16 additions & 16 deletions src/core/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use std::{
time::Duration,
};

pub struct Session {
pub struct Session<X> {
/// Key-exchange key pair.
pub kx_pair: KxPair,
/// Mixnode topology.
pub topology: Topology,
pub topology: Topology<X>,
/// Queue of packets authored by us, to be dispatched in place of drop cover traffic.
pub authored_packet_queue: AuthoredPacketQueue,
/// See [`SessionConfig`](super::config::SessionConfig::mean_authored_packet_period).
Expand Down Expand Up @@ -84,60 +84,60 @@ impl Add<SessionIndex> for RelSessionIndex {
}
}

pub enum SessionSlot {
pub enum SessionSlot<X> {
Empty,
KxPair(KxPair),
/// Like [`Empty`](Self::Empty), but we should not try to create a [`Session`] struct.
Disabled,
Full(Session),
Full(Session<X>),
}

impl SessionSlot {
impl<X> SessionSlot<X> {
pub fn is_empty(&self) -> bool {
matches!(self, Self::Empty)
}

pub fn as_option(&self) -> Option<&Session> {
pub fn as_option(&self) -> Option<&Session<X>> {
match self {
Self::Full(session) => Some(session),
_ => None,
}
}

pub fn as_mut_option(&mut self) -> Option<&mut Session> {
pub fn as_mut_option(&mut self) -> Option<&mut Session<X>> {
match self {
Self::Full(session) => Some(session),
_ => None,
}
}
}

pub struct Sessions {
pub current: SessionSlot,
pub prev: SessionSlot,
pub struct Sessions<X> {
pub current: SessionSlot<X>,
pub prev: SessionSlot<X>,
}

impl Sessions {
impl<X> Sessions<X> {
pub fn is_empty(&self) -> bool {
self.current.is_empty() && self.prev.is_empty()
}

pub fn iter(&self) -> impl Iterator<Item = &Session> {
pub fn iter(&self) -> impl Iterator<Item = &Session<X>> {
[&self.current, &self.prev]
.into_iter()
.filter_map(|session| session.as_option())
}

/// This is guaranteed to return the current session first, if it exists.
pub fn enumerate_mut(&mut self) -> impl Iterator<Item = (RelSessionIndex, &mut Session)> {
pub fn enumerate_mut(&mut self) -> impl Iterator<Item = (RelSessionIndex, &mut Session<X>)> {
[(RelSessionIndex::Current, &mut self.current), (RelSessionIndex::Prev, &mut self.prev)]
.into_iter()
.filter_map(|(index, session)| session.as_mut_option().map(|session| (index, session)))
}
}

impl Index<RelSessionIndex> for Sessions {
type Output = SessionSlot;
impl<X> Index<RelSessionIndex> for Sessions<X> {
type Output = SessionSlot<X>;

fn index(&self, index: RelSessionIndex) -> &Self::Output {
match index {
Expand All @@ -147,7 +147,7 @@ impl Index<RelSessionIndex> for Sessions {
}
}

impl IndexMut<RelSessionIndex> for Sessions {
impl<X> IndexMut<RelSessionIndex> for Sessions<X> {
fn index_mut(&mut self, index: RelSessionIndex) -> &mut Self::Output {
match index {
RelSessionIndex::Current => &mut self.current,
Expand Down
33 changes: 16 additions & 17 deletions src/core/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,21 @@ use super::sphinx::{
};
use arrayvec::ArrayVec;
use either::Either;
use multiaddr::Multiaddr;
use rand::{seq::SliceRandom, CryptoRng, Rng};
use std::{
cmp::{max, min},
fmt,
};

/// Key-exchange public key, peer ID, and external addresses for a mixnode.
/// Per-mixnode data.
#[derive(Clone)]
pub struct Mixnode {
pub struct Mixnode<X> {
/// Key-exchange public key for the mixnode.
pub kx_public: KxPublic,
/// Peer ID for the mixnode.
pub peer_id: PeerId,
/// External addresses for the mixnode.
pub external_addresses: Vec<Multiaddr>,
/// Extra data; for use by the crate user.
pub extra: X,
}

enum LocalNode {
Expand All @@ -65,17 +64,17 @@ pub enum TopologyErr {
NoConnectedGatewayMixnodes,
}

pub struct Topology {
mixnodes: Vec<Mixnode>,
pub struct Topology<X> {
mixnodes: Vec<Mixnode<X>>,
local_kx_public: KxPublic,
local_node: LocalNode,
}

impl Topology {
impl<X> Topology<X> {
/// `mixnodes` must be no longer than [`MAX_MIXNODE_INDEX + 1`](MAX_MIXNODE_INDEX).
pub fn new(
rng: &mut impl Rng,
mixnodes: Vec<Mixnode>,
mixnodes: Vec<Mixnode<X>>,
local_kx_public: &KxPublic,
num_gateway_mixnodes: u32,
) -> Self {
Expand Down Expand Up @@ -129,7 +128,7 @@ impl Topology {
matches!(self.local_node, LocalNode::Mixnode(_))
}

pub fn reserved_peer_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
pub fn reserved_peers(&self) -> impl Iterator<Item = &Mixnode<X>> {
let indices = match &self.local_node {
LocalNode::Mixnode(local_index) => Either::Left({
// Connect to all other mixnodes (ie exclude the local node)
Expand All @@ -139,7 +138,7 @@ impl Topology {
LocalNode::NonMixnode(gateway_indices) =>
Either::Right(gateway_indices.iter().map(|index| index.get())),
};
indices.flat_map(|index| self.mixnodes[index as usize].external_addresses.iter())
indices.map(|index| &self.mixnodes[index as usize])
}

pub fn mixnode_index_to_peer_id(&self, index: MixnodeIndex) -> Result<PeerId, TopologyErr> {
Expand All @@ -157,7 +156,7 @@ impl Topology {
}
}

impl fmt::Display for Topology {
impl<X> fmt::Display for Topology<X> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match &self.local_node {
LocalNode::Mixnode(local_index) => write!(fmt, "Local node is mixnode {local_index}"),
Expand Down Expand Up @@ -222,16 +221,16 @@ impl UsedIndices {
}
}

pub struct RouteGenerator<'topology> {
topology: &'topology Topology,
pub struct RouteGenerator<'topology, X> {
topology: &'topology Topology<X>,
local_peer_id: PeerId,
/// Always empty if the local node is a mixnode. Otherwise, the subset of the gateway mixnodes
/// from the topology that are currently connected.
connected_gateway_indices: ArrayVec<MixnodeIndex, MAX_CONNECTED_GATEWAY_INDICES>,
}

impl<'topology> RouteGenerator<'topology> {
pub fn new(topology: &'topology Topology, ns: &dyn NetworkStatus) -> Self {
impl<'topology, X> RouteGenerator<'topology, X> {
pub fn new(topology: &'topology Topology<X>, ns: &dyn NetworkStatus) -> Self {
let connected_gateway_indices = match &topology.local_node {
LocalNode::Mixnode(_) => ArrayVec::new(),
// If we're not a mixnode, we should have attempted to connect to a number of "gateway"
Expand All @@ -251,7 +250,7 @@ impl<'topology> RouteGenerator<'topology> {
Self { topology, local_peer_id: ns.local_peer_id(), connected_gateway_indices }
}

pub fn topology(&self) -> &'topology Topology {
pub fn topology(&self) -> &'topology Topology<X> {
self.topology
}

Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
//!
//! This crate is mostly network agnostic. While it determines which nodes should be connected and
//! which packets should be sent where, it does not care _how_ this is done. It's not entirely
//! agnostic; it uses multiaddrs for peer addresses and assumes that peers have 32-byte
//! globally-unique identifiers.
//! agnostic; it assumes that peers have 32-byte globally-unique identifiers.
#![warn(missing_docs)]
#![forbid(unsafe_code)]
Expand Down
10 changes: 5 additions & 5 deletions src/reply_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ReplyContext {
&self.message_id
}

fn post_reply(&mut self, reply: &Reply, mixnet: &mut Mixnet, config: &Config) {
fn post_reply<X>(&mut self, reply: &Reply, mixnet: &mut Mixnet<X>, config: &Config) {
for _ in 0..config.max_posts {
if let Err(err) = mixnet.post_reply(
&mut self.surbs,
Expand Down Expand Up @@ -140,10 +140,10 @@ impl ReplyManager {
/// If `Some` is returned, the caller should handle the request and then call either
/// [`abandon`](Self::abandon) or [`complete`](Self::complete) with the [`ReplyContext`]. The
/// `Vec<u8>` contains the request message data.
pub fn insert(
pub fn insert<X>(
&mut self,
message: RequestMessage,
mixnet: &mut Mixnet,
mixnet: &mut Mixnet<X>,
) -> Option<(ReplyContext, Vec<u8>)> {
let mut reply_context = ReplyContext {
session_index: message.session_index,
Expand Down Expand Up @@ -198,11 +198,11 @@ impl ReplyManager {
}

/// Complete a request. This will post the reply and cache it for repeat requests.
pub fn complete(
pub fn complete<X>(
&mut self,
mut reply_context: ReplyContext,
data: Vec<u8>,
mixnet: &mut Mixnet,
mixnet: &mut Mixnet<X>,
) {
let state = match self.states.entry(reply_context.message_id) {
Entry::Occupied(entry) => match entry.into_mut() {
Expand Down
Loading

0 comments on commit 2f25cd8

Please sign in to comment.