diff --git a/node/src/node_test_utils.rs b/node/src/node_test_utils.rs index 293dbb464..b82c663a3 100644 --- a/node/src/node_test_utils.rs +++ b/node/src/node_test_utils.rs @@ -15,6 +15,7 @@ use crate::sub_lib::framer::FramedChunk; use crate::sub_lib::framer::Framer; use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; +use crate::sub_lib::utils::MessageScheduler; use crate::test_utils::recorder::Recorder; use actix::Actor; use actix::Addr; @@ -307,6 +308,10 @@ pub fn make_stream_handler_pool_subs_from_recorder(addr: &Addr) -> Str bind: recipient!(addr, PoolBindMessage), node_query_response: recipient!(addr, DispatcherNodeQueryResponse), node_from_ui_sub: recipient!(addr, NodeFromUiMessage), + scheduled_node_query_response_sub: recipient!( + addr, + MessageScheduler + ), } } diff --git a/node/src/stream_handler_pool.rs b/node/src/stream_handler_pool.rs index 46c600e73..a2bac3507 100644 --- a/node/src/stream_handler_pool.rs +++ b/node/src/stream_handler_pool.rs @@ -30,12 +30,12 @@ use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; use crate::sub_lib::tokio_wrappers::ReadHalfWrapper; use crate::sub_lib::tokio_wrappers::WriteHalfWrapper; -use crate::sub_lib::utils::{handle_ui_crash_request, NODE_MAILBOX_CAPACITY}; -use actix::Actor; +use crate::sub_lib::utils::{handle_ui_crash_request, MessageScheduler, NODE_MAILBOX_CAPACITY}; use actix::Addr; use actix::Context; use actix::Handler; use actix::Recipient; +use actix::{Actor, AsyncContext}; use masq_lib::logger::Logger; use masq_lib::ui_gateway::NodeFromUiMessage; use masq_lib::utils::localhost; @@ -43,7 +43,6 @@ use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::io; use std::net::SocketAddr; -use std::thread; use std::time::Duration; use tokio::prelude::Future; @@ -62,6 +61,7 @@ pub struct StreamHandlerPoolSubs { pub bind: Recipient, pub node_query_response: Recipient, pub node_from_ui_sub: Recipient, + pub scheduled_node_query_response_sub: Recipient>, } impl Clone for StreamHandlerPoolSubs { @@ -73,6 +73,7 @@ impl Clone for StreamHandlerPoolSubs { bind: self.bind.clone(), node_query_response: self.node_query_response.clone(), node_from_ui_sub: self.node_from_ui_sub.clone(), + scheduled_node_query_response_sub: self.scheduled_node_query_response_sub.clone(), } } } @@ -155,6 +156,18 @@ impl Handler for StreamHandlerPool { } } +// TODO: GH-686 - This handler can be implemented using a Procedural Macro +impl Handler> for StreamHandlerPool +where + StreamHandlerPool: Handler, +{ + type Result = (); + + fn handle(&mut self, msg: MessageScheduler, ctx: &mut Self::Context) -> Self::Result { + ctx.notify_later(msg.scheduled_msg, msg.delay); + } +} + impl Handler for StreamHandlerPool { type Result = (); @@ -205,6 +218,10 @@ impl StreamHandlerPool { bind: recipient!(pool_addr, PoolBindMessage), node_query_response: recipient!(pool_addr, DispatcherNodeQueryResponse), node_from_ui_sub: recipient!(pool_addr, NodeFromUiMessage), + scheduled_node_query_response_sub: recipient!( + pool_addr, + MessageScheduler + ), } } @@ -533,18 +550,19 @@ impl StreamHandlerPool { peer_addr, msg.context.data.len() ); - let recipient = self + let scheduled_node_query_response_sub = self .self_subs_opt .as_ref() - .expect("StreamHandlerPool is unbound.") - .node_query_response + .expect("StreamHandlerPool is unbound") + .scheduled_node_query_response_sub .clone(); - // TODO FIXME revisit once SC-358/GH-96 is done (idea: use notify_later() to delay messages) - thread::spawn(move || { - // to avoid getting into too-tight a resubmit loop, add a delay; in a separate thread, to avoid delaying other traffic - thread::sleep(Duration::from_millis(100)); - recipient.try_send(msg).expect("StreamHandlerPool is dead"); - }); + + scheduled_node_query_response_sub + .try_send(MessageScheduler { + scheduled_msg: msg, + delay: Duration::from_millis(100), + }) + .expect("StreamHandlerPool is dead"); } fn open_new_stream_and_recycle_message( diff --git a/node/src/sub_lib/utils.rs b/node/src/sub_lib/utils.rs index 74758bd0b..9b9480821 100644 --- a/node/src/sub_lib/utils.rs +++ b/node/src/sub_lib/utils.rs @@ -237,6 +237,12 @@ where implement_as_any!(); } +#[derive(Message, Clone, PartialEq, Eq)] +pub struct MessageScheduler { + pub scheduled_msg: M, + pub delay: Duration, +} + #[cfg(test)] mod tests { use super::*; diff --git a/node/src/test_utils/recorder.rs b/node/src/test_utils/recorder.rs index a3935d27a..f68fc3b9e 100644 --- a/node/src/test_utils/recorder.rs +++ b/node/src/test_utils/recorder.rs @@ -43,6 +43,7 @@ use crate::sub_lib::set_consuming_wallet_message::SetConsumingWalletMessage; use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; use crate::sub_lib::ui_gateway::UiGatewaySubs; +use crate::sub_lib::utils::MessageScheduler; use crate::test_utils::recorder_stop_conditions::StopConditions; use crate::test_utils::to_millis; use crate::test_utils::unshared_test_utils::system_killer_actor::SystemKillerActor; @@ -139,6 +140,17 @@ recorder_message_handler!(ScanForPayables); recorder_message_handler!(ConnectionProgressMessage); recorder_message_handler!(ScanForPendingPayables); +impl Handler> for Recorder +where + M: Message + PartialEq + Send + 'static, +{ + type Result = (); + + fn handle(&mut self, msg: MessageScheduler, _ctx: &mut Self::Context) { + self.handle_msg(msg) + } +} + impl Handler for Recorder { type Result = MessageResult;