Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions trust-quorum/src/node_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ impl NodeCtx {
alarms: BTreeSet::new(),
}
}

/// Reset the in-memory bookkeeping held by this context.
///
/// Empties `alarms`, `connected`, and `outgoing`, and lowers the
/// `persistent_state_changed` flag. No other field is modified.
#[cfg(any(test, feature = "testing"))]
pub fn clear_mutable_state(&mut self) {
    self.alarms.clear();
    self.connected.clear();
    self.outgoing.clear();
    self.persistent_state_changed = false;
}
}

impl NodeCommonCtx for NodeCtx {
Expand Down
2 changes: 2 additions & 0 deletions trust-quorum/src/rack_secret_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,14 @@ pub struct ShareCollector {
shares: BTreeMap<PlatformId, Share>,
}

/// Equality for `ShareCollector`, compiled only when the
/// `danger_partial_eq_ct_wrapper` feature is enabled.
#[cfg(feature = "danger_partial_eq_ct_wrapper")]
impl PartialEq for ShareCollector {
    /// Two collectors are equal when both `config` and `shares` match.
    fn eq(&self, other: &Self) -> bool {
        let Self { config, shares, .. } = self;
        // `config` is compared first; `shares` is only compared if it matched.
        *config == other.config && *shares == other.shares
    }
}

// Marker impl: gated behind the same feature as the `PartialEq` impl that
// `Eq` requires.
#[cfg(feature = "danger_partial_eq_ct_wrapper")]
impl Eq for ShareCollector {}

impl<'daft> ShareCollectorDiff<'daft> {
Expand Down
11 changes: 11 additions & 0 deletions trust-quorum/test-utils/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ pub enum Event {
DeliverNexusReply(NexusReply),
CommitConfiguration(PlatformId),
Reconfigure(NexusConfig),
CrashNode(PlatformId),
RestartNode {
id: PlatformId,
connection_order: Vec<PlatformId>,
},
}

impl Event {
Expand All @@ -50,6 +55,12 @@ impl Event {
Self::ClearSecrets(id) => vec![id.clone()],
Self::CommitConfiguration(id) => vec![id.clone()],
Self::Reconfigure(_) => vec![],
Self::CrashNode(id) => vec![id.clone()],
Self::RestartNode { id, connection_order } => {
let mut nodes = connection_order.clone();
nodes.push(id.clone());
nodes
}
}
}
}
7 changes: 0 additions & 7 deletions trust-quorum/test-utils/src/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,6 @@ impl NexusState {
(&config.coordinator, config.to_reconfigure_msg(self.rack_id))
}

/// Abort the latest reconfiguration attempt.
///
/// Currently this only checks the precondition; it does not modify any
/// state.
///
/// # Panics
///
/// Panics if there are no configs, or if the latest config's `op` is not
/// `NexusOp::Preparing`.
pub fn abort_reconfiguration(&mut self) {
    let latest = self.configs.iter().last();
    let config = latest.expect("at least one config");
    // Aborting is only valid while the configuration is still preparing.
    assert_eq!(config.op, NexusOp::Preparing);
}

/// Return the last config yielded by iterating `self.configs`.
///
/// # Panics
///
/// Panics with "at least one config" if `self.configs` is empty.
pub fn latest_config(&self) -> &NexusConfig {
    let newest = self.configs.iter().last();
    newest.expect("at least one config")
}
Expand Down
91 changes: 84 additions & 7 deletions trust-quorum/test-utils/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ impl TqState {
pub fn send_reconfigure_msg(&mut self) {
let (coordinator, msg) = self.nexus.reconfigure_msg_for_latest_config();
let epoch_to_config = msg.epoch;
if self.faults.crashed_nodes.contains(coordinator) {
// We must abort the configuration. This mimics a timeout.
self.nexus.abort_reconfiguration();
} else {
if !self.faults.crashed_nodes.contains(coordinator) {
let (node, ctx) = self
.sut
.nodes
Expand Down Expand Up @@ -176,7 +173,11 @@ impl TqState {

pub fn send_envelopes_from(&mut self, id: &PlatformId) {
let (_, ctx) = self.sut.nodes.get_mut(id).expect("node exists");
for envelope in ctx.drain_envelopes() {
// Only send envelopes to alive nodes
for envelope in ctx
.drain_envelopes()
.filter(|e| !self.faults.crashed_nodes.contains(&e.to))
{
let msgs =
self.bootstrap_network.entry(envelope.to.clone()).or_default();
msgs.push(envelope);
Expand Down Expand Up @@ -220,6 +221,12 @@ impl TqState {
Event::Reconfigure(nexus_config) => {
self.apply_event_reconfigure(nexus_config)
}
Event::CrashNode(id) => {
self.apply_event_crash_node(id);
}
Event::RestartNode { id, connection_order } => {
self.apply_event_restart_node(id, connection_order);
}
}
}

Expand Down Expand Up @@ -327,6 +334,73 @@ impl TqState {
self.underlay_network.push(reply);
}

/// Apply a `CrashNode` event for the node `id`.
///
/// Discards the messages queued for delivery to `id`, records `id` as
/// crashed, and calls `on_disconnect` for `id` at every node that is not
/// currently crashed.
fn apply_event_crash_node(&mut self, id: PlatformId) {
    // Anything still queued for delivery to the crashed node is lost.
    self.bootstrap_network.remove(&id);

    // Remember that this node is down.
    self.faults.crashed_nodes.insert(id.clone());

    // We get to define what the network does with an in-flight message
    // that was sent by a node which has since crashed. It could either be
    // dropped, or be delivered eventually to its destination provided the
    // destination does not itself crash first. We pick eventual delivery,
    // mostly for efficiency: it avoids walking every destination in the
    // bootstrap network to filter out messages.
    //
    // Connected nodes must still observe the disconnect, though. For
    // simplicity, every alive node is notified within this same step.
    for (peer_id, (node, ctx)) in self.sut.nodes.iter_mut() {
        // Crashed nodes (including the one that just crashed) are skipped.
        if self.faults.crashed_nodes.contains(peer_id) {
            continue;
        }
        node.on_disconnect(ctx, id.clone());
    }
}

/// Apply a `RestartNode` event for the node `id`.
///
/// Removes `id` from the set of crashed nodes, resets its in-memory state
/// by building a fresh `Node` on top of its existing context, and then
/// connects it to each peer in `connection_order`, in that order. For every
/// peer, `on_connect` is called first at the peer and then at the restarted
/// node, and the envelopes produced by each call are sent right away.
///
/// # Panics
///
/// Panics with "node exists" if `id` or any peer in `connection_order` is
/// not present in `self.sut.nodes`.
fn apply_event_restart_node(
    &mut self,
    id: PlatformId,
    connection_order: Vec<PlatformId>,
) {
    // The node is no longer crashed.
    self.faults.crashed_nodes.remove(&id);

    // We need to clear the mutable state of the `Node`. We do this by
    // creating a new `Node` and passing in the existing context which
    // contains the persistent state.
    {
        let (node, ctx) = self.sut.nodes.get_mut(&id).expect("node exists");
        ctx.clear_mutable_state();
        *node = Node::new(&self.log, ctx);
    }

    // We now need to connect to each node in the order given in
    // `connection_order`. We do this by calling `on_connect` at the
    // restarted node and the node in `connection_order`.
    for peer in connection_order {
        let (peer_node, peer_ctx) =
            self.sut.nodes.get_mut(&peer).expect("node exists");
        // Inform the peer of the connection
        peer_node.on_connect(peer_ctx, id.clone());
        // Send any messages output as a result of the connection.
        // `send_envelopes` only reads the fault state, so a shared borrow
        // is all that is needed here.
        send_envelopes(peer_ctx, &mut self.bootstrap_network, &self.faults);

        // The map is looked up again on each iteration because the peer's
        // mutable borrow of `self.sut.nodes` has to end first.
        let (node, ctx) = self.sut.nodes.get_mut(&id).expect("node exists");
        // Inform the restarted node of the connection
        node.on_connect(ctx, peer);
        // Send any messages output as a result of the connection
        send_envelopes(ctx, &mut self.bootstrap_network, &self.faults);
    }
}

fn apply_event_deliver_nexus_reply(&mut self, recorded_reply: NexusReply) {
let mut latest_config = self.nexus.latest_config_mut();
let reply = self.underlay_network.pop().expect("reply exists");
Expand Down Expand Up @@ -404,7 +478,7 @@ impl TqState {
}

// Send any messages as a result of handling this message
send_envelopes(ctx, &mut self.bootstrap_network);
send_envelopes(ctx, &mut self.bootstrap_network, &self.faults);

// Remove any destinations with zero messages in-flight
self.bootstrap_network.retain(|_, msgs| !msgs.is_empty());
Expand Down Expand Up @@ -462,8 +536,11 @@ impl TqState {
fn send_envelopes(
ctx: &mut NodeCtx,
bootstrap_network: &mut BTreeMap<PlatformId, Vec<Envelope>>,
faults: &Faults,
) {
for envelope in ctx.drain_envelopes() {
for envelope in
ctx.drain_envelopes().filter(|e| !faults.crashed_nodes.contains(&e.to))
{
let envelopes =
bootstrap_network.entry(envelope.to.clone()).or_default();
envelopes.push(envelope);
Expand Down
Loading
Loading