Skip to content

Commit

Permalink
GH-678: Create a Message Scheduler (#249)
Browse files Browse the repository at this point in the history
* Actor StreamHandlerPool and friends renamed to NeighborStreamHandlerPool

* More cleanup renaming

* One more...

* code formatting

* Big method is broken into three; needs more

* Big match statement is broken into three methods. Possibly more to come.

* Doesn't work; StreamStarter needs to be split and renamed

* Closer, but still doesn't work

* Make success and failure handler inside the open_new_stream_and_recycle_message

* Modify the way we return results

* Rename function name and variable

* Changed an error message

* Appeased clippy

* Appeased formatter

* master-commented: Corrected TODO comment

* GH-677: add the comment over stream_handler_pool, to make it easier to differentiate from the other

* GH-677: revert the rename to StreamHandlerPool

* GH-677: revert the rename to StreamHandlerPoolSubs

* GH-677: revert the rename to StreamHandlerPoolCluster

* GH-677: revert the name from NeighborPublicKey to Key inside the enum Endpoint

* GH-677: rename the filename from neighbor_stream_handler_pool.rs to stream_handler_pool.rs

* GH-677: rename NeighborStreamHandlerPool to StreamHandlerPool at other places

* GH-677: revert rename in the reamining places

* GH-677: rename in crash_command.rs

* GH-677: change the debug to an error

* GH-677: remove the comment 'way to big'

* GH-678: initialize ScheduleMessage

* GH-678: send a message succusfully

* GH-678: rename the name to MessageScheduler

* GH-678: introduce the schedule_msg as a field

* GH-678: reorder imports

* GH-678: fix the Send build error with schedule msg

* GH-678: use the generic Message Type for the MessageScheduler

* GH-679: add the ability to send the schedule msg

* GH-678: integrate the message scheduler with the old code

* GH-678: remove unnecessary bounds from the MessageScheduler

* GH-678: code cleanup

* GH-678: remove the NotifyHandler field from StreamHandlerPool

* GH-678: make the handler for the MessageScheduler generic

* GH-678: use ctx for notify later instead of NotifyHandlerReal

* GH-678: formatting fixes

* GH-678: migrate MessageScheduler to the messages.rs

* GH-678: Review 1 Changes (#253)

* feat: migrate MessageScheduler to the sub_lib/utils.rs

* feat: rename to scheduled_msg

* GH-678: rename duration to delay

* GH-678: rename the name of the sub

* GH-678: add a todo for the procedural macro card

* GH-678: rename to scheduled_node_query_response_sub

* GH-678: migrate the MessageScheduler handler for Recorder after the recorder_message_handler implementaions

---------

Co-authored-by: Dan Wiebe <[email protected]>
  • Loading branch information
utkarshg6 and dnwiebe authored Mar 31, 2023
1 parent 90048cb commit 7699dc2
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
5 changes: 5 additions & 0 deletions node/src/node_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::sub_lib::framer::FramedChunk;
use crate::sub_lib::framer::Framer;
use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse;
use crate::sub_lib::stream_handler_pool::TransmitDataMsg;
use crate::sub_lib::utils::MessageScheduler;
use crate::test_utils::recorder::Recorder;
use actix::Actor;
use actix::Addr;
Expand Down Expand Up @@ -307,6 +308,10 @@ pub fn make_stream_handler_pool_subs_from_recorder(addr: &Addr<Recorder>) -> Str
bind: recipient!(addr, PoolBindMessage),
node_query_response: recipient!(addr, DispatcherNodeQueryResponse),
node_from_ui_sub: recipient!(addr, NodeFromUiMessage),
scheduled_node_query_response_sub: recipient!(
addr,
MessageScheduler<DispatcherNodeQueryResponse>
),
}
}

Expand Down
42 changes: 30 additions & 12 deletions node/src/stream_handler_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,19 @@ use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse;
use crate::sub_lib::stream_handler_pool::TransmitDataMsg;
use crate::sub_lib::tokio_wrappers::ReadHalfWrapper;
use crate::sub_lib::tokio_wrappers::WriteHalfWrapper;
use crate::sub_lib::utils::{handle_ui_crash_request, NODE_MAILBOX_CAPACITY};
use actix::Actor;
use crate::sub_lib::utils::{handle_ui_crash_request, MessageScheduler, NODE_MAILBOX_CAPACITY};
use actix::Addr;
use actix::Context;
use actix::Handler;
use actix::Recipient;
use actix::{Actor, AsyncContext};
use masq_lib::logger::Logger;
use masq_lib::ui_gateway::NodeFromUiMessage;
use masq_lib::utils::localhost;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::io;
use std::net::SocketAddr;
use std::thread;
use std::time::Duration;
use tokio::prelude::Future;

Expand All @@ -62,6 +61,7 @@ pub struct StreamHandlerPoolSubs {
pub bind: Recipient<PoolBindMessage>,
pub node_query_response: Recipient<DispatcherNodeQueryResponse>,
pub node_from_ui_sub: Recipient<NodeFromUiMessage>,
pub scheduled_node_query_response_sub: Recipient<MessageScheduler<DispatcherNodeQueryResponse>>,
}

impl Clone for StreamHandlerPoolSubs {
Expand All @@ -73,6 +73,7 @@ impl Clone for StreamHandlerPoolSubs {
bind: self.bind.clone(),
node_query_response: self.node_query_response.clone(),
node_from_ui_sub: self.node_from_ui_sub.clone(),
scheduled_node_query_response_sub: self.scheduled_node_query_response_sub.clone(),
}
}
}
Expand Down Expand Up @@ -155,6 +156,18 @@ impl Handler<DispatcherNodeQueryResponse> for StreamHandlerPool {
}
}

// TODO: GH-686 - This handler can be implemented using a Procedural Macro
impl<M: actix::Message + 'static> Handler<MessageScheduler<M>> for StreamHandlerPool
where
StreamHandlerPool: Handler<M>,
{
type Result = ();

fn handle(&mut self, msg: MessageScheduler<M>, ctx: &mut Self::Context) -> Self::Result {
ctx.notify_later(msg.scheduled_msg, msg.delay);
}
}

impl Handler<PoolBindMessage> for StreamHandlerPool {
type Result = ();

Expand Down Expand Up @@ -205,6 +218,10 @@ impl StreamHandlerPool {
bind: recipient!(pool_addr, PoolBindMessage),
node_query_response: recipient!(pool_addr, DispatcherNodeQueryResponse),
node_from_ui_sub: recipient!(pool_addr, NodeFromUiMessage),
scheduled_node_query_response_sub: recipient!(
pool_addr,
MessageScheduler<DispatcherNodeQueryResponse>
),
}
}

Expand Down Expand Up @@ -533,18 +550,19 @@ impl StreamHandlerPool {
peer_addr,
msg.context.data.len()
);
let recipient = self
let scheduled_node_query_response_sub = self
.self_subs_opt
.as_ref()
.expect("StreamHandlerPool is unbound.")
.node_query_response
.expect("StreamHandlerPool is unbound")
.scheduled_node_query_response_sub
.clone();
// TODO FIXME revisit once SC-358/GH-96 is done (idea: use notify_later() to delay messages)
thread::spawn(move || {
// to avoid getting into too-tight a resubmit loop, add a delay; in a separate thread, to avoid delaying other traffic
thread::sleep(Duration::from_millis(100));
recipient.try_send(msg).expect("StreamHandlerPool is dead");
});

scheduled_node_query_response_sub
.try_send(MessageScheduler {
scheduled_msg: msg,
delay: Duration::from_millis(100),
})
.expect("StreamHandlerPool is dead");
}

fn open_new_stream_and_recycle_message(
Expand Down
6 changes: 6 additions & 0 deletions node/src/sub_lib/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ where
implement_as_any!();
}

#[derive(Message, Clone, PartialEq, Eq)]
pub struct MessageScheduler<M: Message> {
pub scheduled_msg: M,
pub delay: Duration,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
12 changes: 12 additions & 0 deletions node/src/test_utils/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::sub_lib::set_consuming_wallet_message::SetConsumingWalletMessage;
use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse;
use crate::sub_lib::stream_handler_pool::TransmitDataMsg;
use crate::sub_lib::ui_gateway::UiGatewaySubs;
use crate::sub_lib::utils::MessageScheduler;
use crate::test_utils::recorder_stop_conditions::StopConditions;
use crate::test_utils::to_millis;
use crate::test_utils::unshared_test_utils::system_killer_actor::SystemKillerActor;
Expand Down Expand Up @@ -139,6 +140,17 @@ recorder_message_handler!(ScanForPayables);
recorder_message_handler!(ConnectionProgressMessage);
recorder_message_handler!(ScanForPendingPayables);

impl<M> Handler<MessageScheduler<M>> for Recorder
where
M: Message + PartialEq + Send + 'static,
{
type Result = ();

fn handle(&mut self, msg: MessageScheduler<M>, _ctx: &mut Self::Context) {
self.handle_msg(msg)
}
}

impl Handler<NodeQueryMessage> for Recorder {
type Result = MessageResult<NodeQueryMessage>;

Expand Down

0 comments on commit 7699dc2

Please sign in to comment.