@@ -91,10 +91,7 @@ impl TqState {
9191 pub fn send_reconfigure_msg ( & mut self ) {
9292 let ( coordinator, msg) = self . nexus . reconfigure_msg_for_latest_config ( ) ;
9393 let epoch_to_config = msg. epoch ;
94- if self . faults . crashed_nodes . contains ( coordinator) {
95- // We must abort the configuration. This mimics a timeout.
96- self . nexus . abort_reconfiguration ( ) ;
97- } else {
94+ if !self . faults . crashed_nodes . contains ( coordinator) {
9895 let ( node, ctx) = self
9996 . sut
10097 . nodes
@@ -176,7 +173,11 @@ impl TqState {
176173
177174 pub fn send_envelopes_from ( & mut self , id : & PlatformId ) {
178175 let ( _, ctx) = self . sut . nodes . get_mut ( id) . expect ( "node exists" ) ;
179- for envelope in ctx. drain_envelopes ( ) {
176+ // Only send envelopes to alive nodes
177+ for envelope in ctx
178+ . drain_envelopes ( )
179+ . filter ( |e| !self . faults . crashed_nodes . contains ( & e. to ) )
180+ {
180181 let msgs =
181182 self . bootstrap_network . entry ( envelope. to . clone ( ) ) . or_default ( ) ;
182183 msgs. push ( envelope) ;
@@ -220,6 +221,12 @@ impl TqState {
220221 Event :: Reconfigure ( nexus_config) => {
221222 self . apply_event_reconfigure ( nexus_config)
222223 }
224+ Event :: CrashNode ( id) => {
225+ self . apply_event_crash_node ( id) ;
226+ }
227+ Event :: RestartNode { id, connection_order } => {
228+ self . apply_event_restart_node ( id, connection_order) ;
229+ }
223230 }
224231 }
225232
@@ -327,6 +334,73 @@ impl TqState {
327334 self . underlay_network . push ( reply) ;
328335 }
329336
337+ fn apply_event_crash_node ( & mut self , id : PlatformId ) {
338+ // We clear all the crashed node's destination messages
339+ self . bootstrap_network . remove ( & id) ;
340+
341+ // Keep track of the crashed node
342+ self . faults . crashed_nodes . insert ( id. clone ( ) ) ;
343+
344+ // We get to define the semantics of the network with regards to an
345+ // inflight message sourced from a crashed node. We have two choices:
346+ // drop the message or let it be eventually delivered to the desination
347+ // if the destination node doesn't crash before delivery. We choose
348+ // the latter mostly for efficiency: we don't want to have to loop over
349+ // every destination in the bootstrap network and filter messages.
350+ //
351+ // However, we do still have to call `node.on_disconnect()` at all
352+ // connected nodes, so do that now. For simplicity, we do this at every
353+ // alive node in the same step.
354+ for ( _, ( node, ctx) ) in self
355+ . sut
356+ . nodes
357+ . iter_mut ( )
358+ . filter ( |( id, _) | !self . faults . crashed_nodes . contains ( id) )
359+ {
360+ node. on_disconnect ( ctx, id. clone ( ) ) ;
361+ }
362+ }
363+
364+ fn apply_event_restart_node (
365+ & mut self ,
366+ id : PlatformId ,
367+ connection_order : Vec < PlatformId > ,
368+ ) {
369+ // The node is no longer crashed.
370+ self . faults . crashed_nodes . remove ( & id) ;
371+
372+ // We need to clear the mutable state of the `Node`. We do this by
373+ // creating a new `Node` and passing in the existing context which
374+ // contains the persistent state.
375+ {
376+ let ( node, ctx) = self . sut . nodes . get_mut ( & id) . expect ( "node exists" ) ;
377+ ctx. clear_mutable_state ( ) ;
378+ * node = Node :: new ( & self . log , ctx) ;
379+ }
380+
381+ // We now need to connect to each node in the order given in
382+ // `connection_order`. We do this by calling `on_connect` at the
383+ // restarted node and the node in `connection_order`;
384+ for peer in connection_order {
385+ let ( peer_node, peer_ctx) =
386+ self . sut . nodes . get_mut ( & peer) . expect ( "node exists" ) ;
387+ // Inform the peer of the connection
388+ peer_node. on_connect ( peer_ctx, id. clone ( ) ) ;
389+ // Send any messages output as a result of the connection
390+ send_envelopes (
391+ peer_ctx,
392+ & mut self . bootstrap_network ,
393+ & mut self . faults ,
394+ ) ;
395+
396+ let ( node, ctx) = self . sut . nodes . get_mut ( & id) . expect ( "node exists" ) ;
397+ // Inform the restarted node of the connection
398+ node. on_connect ( ctx, peer) ;
399+ // Send any messages output as a result of the connection
400+ send_envelopes ( ctx, & mut self . bootstrap_network , & self . faults ) ;
401+ }
402+ }
403+
330404 fn apply_event_deliver_nexus_reply ( & mut self , recorded_reply : NexusReply ) {
331405 let mut latest_config = self . nexus . latest_config_mut ( ) ;
332406 let reply = self . underlay_network . pop ( ) . expect ( "reply exists" ) ;
@@ -404,7 +478,7 @@ impl TqState {
404478 }
405479
406480 // Send any messages as a result of handling this message
407- send_envelopes ( ctx, & mut self . bootstrap_network ) ;
481+ send_envelopes ( ctx, & mut self . bootstrap_network , & self . faults ) ;
408482
409483 // Remove any destinations with zero messages in-flight
410484 self . bootstrap_network . retain ( |_, msgs| !msgs. is_empty ( ) ) ;
@@ -462,8 +536,11 @@ impl TqState {
462536fn send_envelopes (
463537 ctx : & mut NodeCtx ,
464538 bootstrap_network : & mut BTreeMap < PlatformId , Vec < Envelope > > ,
539+ faults : & Faults ,
465540) {
466- for envelope in ctx. drain_envelopes ( ) {
541+ for envelope in
542+ ctx. drain_envelopes ( ) . filter ( |e| !faults. crashed_nodes . contains ( & e. to ) )
543+ {
467544 let envelopes =
468545 bootstrap_network. entry ( envelope. to . clone ( ) ) . or_default ( ) ;
469546 envelopes. push ( envelope) ;
0 commit comments