diff --git a/Cargo.lock b/Cargo.lock index 0b51dff5..8dda832d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1239,9 +1239,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7282d924be3275cec7f6756ff4121987bc6481325397dde6ba3e7802b1a8b1c" +checksum = "265d751d31d6780a3f956bb5b8022feba2d94eeee5a84ba64f4212eedca42213" [[package]] name = "linked-hash-map" @@ -1425,9 +1425,9 @@ checksum = "a9a7ab5d64814df0fe4a4b5ead45ed6c5f181ee3ff04ba344313a6c80446c5d4" [[package]] name = "once_cell" -version = "1.7.0" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10acf907b94fc1b1a152d08ef97e7759650268cf986bf127f387e602b02c7e5a" +checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" [[package]] name = "oncemutex" @@ -1470,9 +1470,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pin-project-lite" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827" +checksum = "0cf491442e4b033ed1c722cb9f0df5fcfcf4de682466c46469c36bc47dc5548a" [[package]] name = "pin-utils" @@ -1904,9 +1904,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43535db9747a4ba938c0ce0a98cc631a46ebf943c9e1d604e091df6007620bf6" +checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79" dependencies = [ "itoa", "ryu", @@ -2148,9 +2148,9 @@ dependencies = [ [[package]] name = "tinytemplate" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2ada8616fad06a2d0c455adc530de4ef57605a8120cc65da9653e0e9623ca74" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" dependencies = [ "serde", "serde_json", diff --git a/core/src/dispatch/mod.rs b/core/src/dispatch/mod.rs index 0c926d09..bcabddd6 100644 --- a/core/src/dispatch/mod.rs +++ b/core/src/dispatch/mod.rs @@ -47,6 +47,7 @@ pub mod queue_manager; // Default values for network config. 
const RETRY_CONNECTIONS_INTERVAL: u64 = 5000; +const BOOT_TIMEOUT: u64 = 5000; const MAX_RETRY_ATTEMPTS: u8 = 10; type NetHashMap = FxHashMap; @@ -74,6 +75,7 @@ pub struct NetworkConfig { tcp_nodelay: bool, max_connection_retry_attempts: u8, connection_retry_interval: u64, + boot_timeout: u64, } impl NetworkConfig { @@ -88,6 +90,7 @@ impl NetworkConfig { tcp_nodelay: true, max_connection_retry_attempts: MAX_RETRY_ATTEMPTS, connection_retry_interval: RETRY_CONNECTIONS_INTERVAL, + boot_timeout: BOOT_TIMEOUT, } } @@ -95,15 +98,9 @@ impl NetworkConfig { /// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self { buffer_config.validate(); - NetworkConfig { - addr, - transport: Transport::Tcp, - buffer_config, - custom_allocator: None, - tcp_nodelay: true, - max_connection_retry_attempts: MAX_RETRY_ATTEMPTS, - connection_retry_interval: RETRY_CONNECTIONS_INTERVAL, - } + let mut cfg = NetworkConfig::new(addr); + cfg.set_buffer_config(buffer_config); + cfg } /// Create a new config with `addr` and protocol [TCP](Transport::Tcp) @@ -122,6 +119,7 @@ impl NetworkConfig { tcp_nodelay: true, max_connection_retry_attempts: MAX_RETRY_ATTEMPTS, connection_retry_interval: RETRY_CONNECTIONS_INTERVAL, + boot_timeout: BOOT_TIMEOUT, } } @@ -192,6 +190,18 @@ impl NetworkConfig { pub fn get_connection_retry_interval(&self) -> u64 { self.connection_retry_interval } + + /// Configures how long the system will wait (in ms) for the network layer to set-up + /// + /// Default value is 5000 ms. + pub fn set_boot_timeout(&mut self, milliseconds: u64) { + self.boot_timeout = milliseconds; + } + + /// How long (in ms) the system will wait (in ms) for the network layer to set-up + pub fn get_boot_timeout(&self) -> u64 { + self.boot_timeout + } } /// Socket defaults to `127.0.0.1:0` (i.e. a random local port) and protocol is [TCP](Transport::Tcp) @@ -205,6 +215,7 @@ impl Default for NetworkConfig { tcp_nodelay: true, max_connection_retry_attempts: MAX_RETRY_ATTEMPTS, connection_retry_interval: RETRY_CONNECTIONS_INTERVAL, + boot_timeout: BOOT_TIMEOUT, } } } @@ -653,15 +664,12 @@ impl NetworkDispatcher { } } ConnectionState::Initializing => { - //debug!(self.ctx.log(), "Connection is initializing; queuing frame"); self.queue_manager.enqueue_data(data, addr); None } ConnectionState::Closed => { - // Enqueue the Frame and request a connection to the destination. 
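// Illustrative usage sketch for the NetworkConfig additions above (not taken from the patch):
// it shows the new boot_timeout knob next to the existing buffer configuration. The address,
// the 10_000 ms value, the function name, and BufferConfig::default() are assumptions for the
// example; set_boot_timeout/get_boot_timeout/set_buffer_config are the setters shown above.
use kompact::prelude::{BufferConfig, NetworkConfig};

fn example_network_config() -> NetworkConfig {
    // Bind to a random local port; the protocol defaults to TCP.
    let mut cfg = NetworkConfig::new("127.0.0.1:0".parse().expect("valid socket address"));
    // Give the NetworkThread up to 10 s to bind and signal readiness before the Bridge
    // gives up (the default BOOT_TIMEOUT introduced above is 5000 ms).
    cfg.set_boot_timeout(10_000);
    assert_eq!(cfg.get_boot_timeout(), 10_000);
    // Buffer settings are validated and applied through the same config object.
    cfg.set_buffer_config(BufferConfig::default());
    cfg
}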
self.queue_manager.enqueue_data(data, addr); if let Some(bridge) = &self.net_bridge { - // Request a new connection bridge.connect(Tcp, addr)?; } Some(ConnectionState::Initializing) @@ -829,9 +837,10 @@ impl NetworkDispatcher { if let Some(state) = self.connections.get_mut(&addr) { match state { ConnectionState::Connected => { - debug!( + trace!( self.ctx.log(), - "Closing channel to connected system {}", addr + "Closing channel to connected system {}", + addr ); if let Some(bridge) = &self.net_bridge { while self.queue_manager.has_data(&addr) { @@ -953,10 +962,9 @@ impl ComponentLifecycle for NetworkDispatcher { impl Provide for NetworkDispatcher { fn handle(&mut self, event: ::Request) -> Handled { - trace!( + debug!( self.ctx.log(), - "Received NetworkStatusPort Request {:?}", - event + "Received NetworkStatusPort Request {:?}", event ); match event { NetworkStatusRequest::DisconnectSystem(system_path) => { diff --git a/core/src/net/buffers/decode_buffer.rs b/core/src/net/buffers/decode_buffer.rs index acbcd067..fca09c5f 100644 --- a/core/src/net/buffers/decode_buffer.rs +++ b/core/src/net/buffers/decode_buffer.rs @@ -65,11 +65,29 @@ impl DecodeBuffer { // If the readable portion of the buffer would have less than `encode_buf_min_free_space` // Or if we would return less than 8 readable bytes we don't allow further writing into // the current buffer, caller must swap. - // TODO: Define what is a sensible amount of minimum bytes to be read at any given moment. - if self.writeable_len() < 8 { - return None; + if self.is_writeable() { + unsafe { Some(self.buffer.get_slice(self.write_offset, self.buffer.len())) } + } else { + None } - unsafe { Some(self.buffer.get_slice(self.write_offset, self.buffer.len())) } + } + + /// True if there is sufficient amount of writeable bytes + pub(crate) fn is_writeable(&mut self) -> bool { + self.writeable_len() > 8 + } + + /// Returns true if there is data to be decoded, else false + pub(crate) fn has_frame(&mut self) -> io::Result { + if self.decode_frame_head().is_err() { + return Err(io::Error::new(io::ErrorKind::InvalidData, "framing error")); + } + if let Some(head) = &self.next_frame_head { + if self.readable_len() >= head.content_length() { + return Ok(true); + } + } + Ok(false) } /// Swaps the underlying buffer in place with other @@ -86,7 +104,10 @@ impl DecodeBuffer { if let Some(mut overflow_chunk) = overflow { // TODO: change the config parameter to a separate value? let overflow_len = overflow_chunk.remaining(); - if self.writeable_len() - overflow_len > self.buffer_config.encode_buf_min_free_space { + if overflow_len < self.writeable_len() + && self.writeable_len() - overflow_len + > self.buffer_config.encode_buf_min_free_space + { // Just copy the overflow_chunk bytes, no need to chain. // the overflow must not exceed the new buffers capacity unsafe { @@ -145,47 +166,41 @@ impl DecodeBuffer { /// Tries to decode one frame from the readable part of the buffer pub fn get_frame(&mut self) -> Result { + self.decode_frame_head()?; if let Some(head) = &self.next_frame_head { if self.readable_len() >= head.content_length() { let head = self.next_frame_head.take().unwrap(); - let chunk_lease = self.read_chunk_lease(head.content_length()); - match head.frame_type() { + return match head.frame_type() { // Frames with empty bodies should be handled in frame-head decoding below. 
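// Note on the zero-payload frames handled in this match: Ack and Bye are now encoded as a
// bare FrameHead with content_length == 0, so they are produced directly from the frame type
// here rather than being dispatched to a FrameExt::decode_from body decoder.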
FrameType::Data => { - Data::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame) + Data::decode_from(self.read_chunk_lease(head.content_length())) + .map_err(|_| FramingError::InvalidFrame) } - FrameType::StreamRequest => StreamRequest::decode_from(chunk_lease) - .map_err(|_| FramingError::InvalidFrame), FrameType::Hello => { - Hello::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame) + Hello::decode_from(self.read_chunk_lease(head.content_length())) + .map_err(|_| FramingError::InvalidFrame) } FrameType::Start => { - Start::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame) - } - FrameType::Ack => { - Ack::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame) + Start::decode_from(self.read_chunk_lease(head.content_length())) + .map_err(|_| FramingError::InvalidFrame) } + // Frames without content match here for expediency, Decoder doesn't allow 0 length. + FrameType::Bye => Ok(Frame::Bye()), + FrameType::Ack => Ok(Frame::Ack()), _ => Err(FramingError::UnsupportedFrameType), - } - } else { - Err(FramingError::NoData) + }; } - } else if self.readable_len() >= FRAME_HEAD_LEN as usize { + } + Err(FramingError::NoData) + } + + fn decode_frame_head(&mut self) -> Result<(), FramingError> { + if self.next_frame_head.is_none() && self.readable_len() >= FRAME_HEAD_LEN as usize { let mut chunk_lease = self.read_chunk_lease(FRAME_HEAD_LEN as usize); let head = FrameHead::decode_from(&mut chunk_lease)?; - if head.content_length() == 0 { - match head.frame_type() { - // Frames without content match here for expediency, Decoder doesn't allow 0 length. - FrameType::Bye => Ok(Frame::Bye()), - _ => Err(FramingError::NoData), - } - } else { - self.next_frame_head = Some(head); - self.get_frame() - } - } else { - Err(FramingError::NoData) + self.next_frame_head = Some(head); } + Ok(()) } /// Extracts the readable portion (if any) from the active buffer as a ChunkLease @@ -198,11 +213,6 @@ impl DecodeBuffer { } None } - - /// Destroys the DecodeBuffer and returns the BufferChunk - pub(crate) fn destroy(self) -> BufferChunk { - self.buffer - } } #[cfg(test)] diff --git a/core/src/net/frames.rs b/core/src/net/frames.rs index f0c9e72c..04c4476b 100644 --- a/core/src/net/frames.rs +++ b/core/src/net/frames.rs @@ -47,10 +47,6 @@ impl From for FramingError { /// Core network frame definition #[derive(Debug)] pub enum Frame { - /// Request Credits for credit-based flow-control - StreamRequest(StreamRequest), - /// Give Credits for credit-based flow-control - CreditUpdate(CreditUpdate), /// Frame of Data Data(Data), /// Hello, used to initiate network channels @@ -58,7 +54,7 @@ pub enum Frame { /// Start, used to initiate network channels Start(Start), /// Ack to acknowledge that the connection is started. - Ack(Ack), + Ack(), /// Bye to signal that a channel is closing. Bye(), } @@ -67,38 +63,23 @@ impl Frame { /// Returns which FrameType a Frame is. 
pub fn frame_type(&self) -> FrameType { match *self { - Frame::StreamRequest(_) => FrameType::StreamRequest, - Frame::CreditUpdate(_) => FrameType::CreditUpdate, Frame::Data(_) => FrameType::Data, Frame::Hello(_) => FrameType::Hello, Frame::Start(_) => FrameType::Start, - Frame::Ack(_) => FrameType::Ack, + Frame::Ack() => FrameType::Ack, Frame::Bye() => FrameType::Bye, } } - /* - pub fn decode_from(buf: &mut ChunkLease) -> Result { - //let mut buf = buf.into_buf(); - let head = FrameHead::decode_from( buf)?; - match head.frame_type { - FrameType::StreamRequest => StreamRequest::decode_from( buf), - FrameType::Data => Data::decode_from( buf), - FrameType::CreditUpdate => CreditUpdate::decode_from( buf), - _ => unimplemented!(), - } - }*/ /// Encode a frame into a BufMut pub fn encode_into(&mut self, dst: &mut B) -> Result<(), FramingError> { let mut head = FrameHead::new(self.frame_type(), self.encoded_len()); head.encode_into(dst); match self { - Frame::StreamRequest(frame) => frame.encode_into(dst), - Frame::CreditUpdate(frame) => frame.encode_into(dst), Frame::Data(frame) => frame.encode_into(dst), Frame::Hello(frame) => frame.encode_into(dst), Frame::Start(frame) => frame.encode_into(dst), - Frame::Ack(frame) => frame.encode_into(dst), + Frame::Ack() => Ok(()), Frame::Bye() => Ok(()), } } @@ -106,12 +87,9 @@ impl Frame { /// Returns the number of bytes required to serialize this frame pub fn encoded_len(&self) -> usize { match *self { - Frame::StreamRequest(ref frame) => frame.encoded_len(), - Frame::CreditUpdate(ref frame) => frame.encoded_len(), Frame::Data(ref frame) => frame.encoded_len(), Frame::Hello(ref frame) => frame.encoded_len(), Frame::Start(ref frame) => frame.encoded_len(), - Frame::Ack(ref frame) => frame.encoded_len(), _ => 0, } } @@ -146,7 +124,7 @@ pub struct Data { } /// Hello, used to initiate network channels -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Hello { /// The Cannonical Address of the host saying Hello pub addr: SocketAddr, @@ -161,23 +139,12 @@ pub struct Start { pub id: Uuid, } -/// Hello, used to initiate network channels -#[derive(Debug)] -pub struct Ack { - /// Ack where we're ready to start receiving from. 
- pub offset: u128, -} - /// Byte-mappings for frame types #[repr(u8)] #[derive(Debug, Copy, Clone, PartialEq, Ord, PartialOrd, Eq)] pub enum FrameType { - /// Request Credits for credit-based flow-control - StreamRequest = 0x01, /// Frame of Data Data = 0x02, - /// Give Credits for credit-based flow-control - CreditUpdate = 0x03, /// Hello, used to initiate network channels Hello = 0x04, /// Start, used to initiate network channels @@ -193,9 +160,7 @@ pub enum FrameType { impl From for FrameType { fn from(byte: u8) -> Self { match byte { - 0x01 => FrameType::StreamRequest, 0x02 => FrameType::Data, - 0x03 => FrameType::CreditUpdate, 0x04 => FrameType::Hello, 0x05 => FrameType::Start, 0x06 => FrameType::Ack, @@ -262,13 +227,6 @@ impl FrameHead { } } -impl StreamRequest { - #[allow(dead_code)] - pub(crate) fn new(credit_capacity: u32) -> Self { - StreamRequest { credit_capacity } - } -} - impl Hello { /// Create a new hello message pub fn new(addr: SocketAddr) -> Self { @@ -316,10 +274,6 @@ impl Data { impl FrameExt for Data { fn decode_from(payload: ChunkLease) -> Result { - /*if src.remaining() < 12 { - return Err(FramingError::InvalidFrame); - } */ - //let payload: Bytes = src.to_bytes(); let data_frame = Data { payload }; Ok(Frame::Data(data_frame)) } @@ -339,31 +293,6 @@ impl FrameExt for Data { } } -impl FrameExt for StreamRequest { - fn decode_from(mut src: ChunkLease) -> Result { - if src.remaining() < 8 { - return Err(FramingError::InvalidFrame); - } - //let stream_id: StreamId = src.get_u32_be().into(); - let credit = src.get_u32(); - let stream_req = StreamRequest { - credit_capacity: credit, - }; - Ok(Frame::StreamRequest(stream_req)) - } - - fn encode_into(&mut self, dst: &mut B) -> Result<(), FramingError> { - assert!(dst.remaining_mut() >= self.encoded_len()); - //dst.put_u32_be(self.stream_id.into()); - dst.put_u32(self.credit_capacity); - Ok(()) - } - - fn encoded_len(&self) -> usize { - 4 //stream_id + credit_capacity - } -} - impl FrameExt for Hello { fn decode_from(mut src: ChunkLease) -> Result { match src.get_u8() { @@ -467,34 +396,3 @@ impl FrameExt for Start { } } } - -impl FrameExt for Ack { - fn decode_from(mut src: ChunkLease) -> Result { - Ok(Frame::Ack(Ack { - offset: src.get_u128(), - })) - } - - fn encode_into(&mut self, dst: &mut B) -> Result<(), FramingError> { - dst.put_u128(self.offset); - Ok(()) - } - - fn encoded_len(&self) -> usize { - 16 - } -} - -impl FrameExt for CreditUpdate { - fn decode_from(_src: ChunkLease) -> Result { - unimplemented!() - } - - fn encode_into(&mut self, _dst: &mut B) -> Result<(), FramingError> { - unimplemented!() - } - - fn encoded_len(&self) -> usize { - 4 // stream_id + credit - } -} diff --git a/core/src/net/mod.rs b/core/src/net/mod.rs index 9a8f7cf5..17d2648f 100644 --- a/core/src/net/mod.rs +++ b/core/src/net/mod.rs @@ -6,13 +6,13 @@ use net::events::NetworkEvent; use crate::{ messaging::DispatchData, - net::{events::DispatchEvent, frames::*, network_thread::NetworkThread}, + net::{events::DispatchEvent, frames::*, network_thread::NetworkThreadBuilder}, prelude::NetworkConfig, }; use crossbeam_channel::{unbounded as channel, RecvError, SendError, Sender}; use mio::{Interest, Waker}; pub use std::net::SocketAddr; -use std::{io, sync::Arc, thread}; +use std::{io, panic, sync::Arc, thread, time::Duration}; #[allow(missing_docs)] pub mod buffers; @@ -158,7 +158,7 @@ pub struct Bridge { /// Reference back to the Kompact dispatcher dispatcher: Option, /// Socket the network actually bound on - bound_addr: Option, + 
bound_address: Option, shutdown_future: KFuture<()>, } @@ -178,7 +178,7 @@ impl Bridge { ) -> (Self, SocketAddr) { let (sender, receiver) = channel(); let (shutdown_p, shutdown_f) = promise(); - let (mut network_thread, waker) = NetworkThread::new( + match NetworkThreadBuilder::new( network_thread_log, addr, lookup, @@ -186,27 +186,37 @@ impl Bridge { shutdown_p, dispatcher_ref.clone(), network_config.clone(), - ); - let bound_addr = network_thread.addr; - let bridge = Bridge { - // cfg: BridgeConfig::default(), - log: bridge_log, - // lookup, - network_input_queue: sender, - waker, - dispatcher: Some(dispatcher_ref), - bound_addr: Some(bound_addr), - shutdown_future: shutdown_f, - }; - if let Err(e) = thread::Builder::new() - .name("network_thread".to_string()) - .spawn(move || { - network_thread.run(); - }) - { - panic!("Failed to start a Network Thread, error: {:?}", e); - } - (bridge, bound_addr) + ) { + Ok(mut network_thread_builder) => { + let bound_address = network_thread_builder.address; + let waker = network_thread_builder + .take_waker() + .expect("NetworkThread poll error"); + + let (started_p, started_f) = promise(); + run_network_thread(network_thread_builder, bridge_log.clone(), started_p) + .expect("Failed to spawn NetworkThread"); + started_f + .wait_timeout(Duration::from_millis(network_config.get_boot_timeout())) + .expect("NetworkThread time-out during boot sequence"); + + let bridge = Bridge { + // cfg: BridgeConfig::default(), + log: bridge_log, + // lookup, + network_input_queue: sender, + waker, + dispatcher: Some(dispatcher_ref), + bound_address: Some(bound_address), + shutdown_future: shutdown_f, + }; + + (bridge, bound_address) + } + Err(e) => { + panic!("Failed to build a Network Thread, error: {:?}", e); + } + } } /// Sets the dispatcher reference, returning the previously stored one @@ -236,7 +246,7 @@ impl Bridge { /// Returns the local address if already bound pub fn local_addr(&self) -> &Option { - &self.bound_addr + &self.bound_address } /// Forwards `serialized` to the NetworkThread and makes sure that it will wake up. @@ -300,6 +310,37 @@ impl Bridge { } } +fn run_network_thread( + builder: NetworkThreadBuilder, + logger: KompactLogger, + started_promise: KPromise<()>, +) -> async_std::io::Result<()> { + thread::Builder::new() + .name("network_thread".to_string()) + .spawn(move || { + if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| { + let network_thread = builder.build(); + let _ = started_promise + .fulfil(()) + .expect("NetworkThread started but failed to fulfil promise"); + network_thread.run() + })) { + if let Some(error_msg) = e.downcast_ref::<&str>() { + error!(logger, "NetworkThread panicked with: {:?}", error_msg); + } else if let Some(error_msg) = e.downcast_ref::() { + error!(logger, "NetworkThread panicked with: {:?}", error_msg); + } else { + error!( + logger, + "NetworkThread panicked with a non-string message with type id={:?}", + e.type_id() + ); + }; + }; + }) + .map(|_| ()) +} + /// Errors which the NetworkBridge might return, not used for now. 
#[derive(Debug)] pub enum NetworkBridgeErr { @@ -347,7 +388,7 @@ pub(crate) fn interrupted(err: &io::Error) -> bool { } pub(crate) fn no_buffer_space(err: &io::Error) -> bool { - err.kind() == io::ErrorKind::InvalidData + err.kind() == io::ErrorKind::InvalidInput } pub(crate) fn connection_reset(err: &io::Error) -> bool { @@ -358,6 +399,10 @@ pub(crate) fn broken_pipe(err: &io::Error) -> bool { err.kind() == io::ErrorKind::BrokenPipe } +pub(crate) fn out_of_buffers(err: &SerError) -> bool { + matches!(err, SerError::NoBuffersAvailable(_)) +} + /// A module with helper functions for testing network configurations/implementations pub mod net_test_helpers { use crate::prelude::*; diff --git a/core/src/net/network_channel.rs b/core/src/net/network_channel.rs index 19a1732e..5c34339e 100644 --- a/core/src/net/network_channel.rs +++ b/core/src/net/network_channel.rs @@ -2,14 +2,15 @@ use super::*; use crate::{ messaging::SerialisedFrame, net::{ - buffers::{BufferChunk, DecodeBuffer}, - frames::{Ack, Frame, FramingError, Hello, Start, FRAME_HEAD_LEN}, + buffers::{BufferChunk, BufferPool, DecodeBuffer}, + frames::{Frame, FramingError, Hello, Start, FRAME_HEAD_LEN}, }, }; use bytes::{Buf, BytesMut}; use mio::{net::TcpStream, Token}; use network_thread::*; use std::{ + cell::RefCell, cmp::Ordering, collections::VecDeque, fmt::Formatter, @@ -42,6 +43,7 @@ pub(crate) struct TcpChannel { stream: TcpStream, outbound_queue: VecDeque, pub token: Token, + address: SocketAddr, input_buffer: DecodeBuffer, pub state: ChannelState, pub messages: u32, @@ -53,6 +55,7 @@ impl TcpChannel { pub fn new( stream: TcpStream, token: Token, + address: SocketAddr, buffer_chunk: BufferChunk, state: ChannelState, own_addr: SocketAddr, @@ -63,6 +66,7 @@ impl TcpChannel { stream, outbound_queue: VecDeque::new(), token, + address, input_buffer, state, messages: 0, @@ -85,10 +89,6 @@ impl TcpChannel { matches!(self.state, ChannelState::Connected(_, _)) } - pub fn closed(&self) -> bool { - matches!(self.state, ChannelState::Closed(_, _)) - } - /// Internal helper function for special frames fn send_frame(&mut self, mut frame: Frame) -> () { let len = frame.encoded_len() + FRAME_HEAD_LEN as usize; @@ -122,42 +122,42 @@ impl TcpChannel { } /// Must be called when a Hello frame is received on the channel. - pub fn handle_hello(&mut self, hello: Hello) -> () { + pub fn handle_hello(&mut self, hello: &Hello) -> () { if let ChannelState::Requested(_, id) = self.state { // Has now received Hello(addr), must send Start(addr, uuid) and await ack let start = Frame::Start(Start::new(self.own_addr, id)); self.send_frame(start); self.state = ChannelState::Initialised(hello.addr, id); + self.address = hello.addr; } } /// Must be called when we Ack the channel. This means that the sender can start using the channel /// The receiver of the Ack must accept the Ack and use the channel. - pub fn handle_start(&mut self, addr: &SocketAddr, id: Uuid) -> () { + pub fn handle_start(&mut self, start: &Start) -> () { if let ChannelState::Initialising = self.state { // Method called because we received Start and want to send Ack. - let ack = Frame::Ack(Ack { offset: 0 }); // we don't use offsets yet. + let ack = Frame::Ack(); self.stream .set_nodelay(self.nodelay) .expect("set nodelay failed"); self.send_frame(ack); - self.state = ChannelState::Connected(*addr, id); + self.state = ChannelState::Connected(start.addr, start.id); + self.address = start.addr; } } - /// Returns true if it transitioned, false if it's not starting. 
- pub fn handle_ack(&mut self) -> bool { + pub fn address(&self) -> SocketAddr { + self.address + } + + pub fn handle_ack(&mut self) -> () { if let ChannelState::Initialised(addr, id) = self.state { // An Ack was received. Transition the channel. self.stream .set_nodelay(self.nodelay) .expect("set nodelay failed"); self.state = ChannelState::Connected(addr, id); - true - } else { - eprintln!("Bad state reached during channel initialisation (handle_ack). Handshake went wrong.\ - Connection will likely fail and a re-connect will occur. Non fatal"); - false } } @@ -173,43 +173,70 @@ impl TcpChannel { ret } + /// Performs receive and decode, should be called repeatedly + /// May return `Ok(Frame::Data)`, `Ok(Frame::Start)`, `Ok(Frame::Bye)`, or an Error. + pub fn read_frame(&mut self, buffer_pool: &RefCell) -> io::Result> { + if !self.input_buffer.has_frame()? { + match self.receive() { + Ok(_) => {} + Err(err) if no_buffer_space(&err) => { + if !&self.input_buffer.has_frame()? { + let mut pool = buffer_pool.borrow_mut(); + let mut buffer_chunk = pool.get_buffer().ok_or(err)?; + self.swap_buffer(&mut buffer_chunk); + pool.return_buffer(buffer_chunk); + drop(pool); + return self.read_frame(buffer_pool); + } + } + Err(err) => { + return Err(err); + } + }; + } + match self.decode() { + Ok(Frame::Hello(hello)) => Ok(Some(Frame::Hello(hello))), + Ok(Frame::Ack()) => { + self.handle_ack(); + Ok(Some(Frame::Ack())) + } + Ok(Frame::Bye()) => { + self.handle_bye(); + Ok(Some(Frame::Bye())) + } + Ok(frame) => Ok(Some(frame)), + Err(FramingError::NoData) => Ok(None), + Err(_) => Err(Error::new(ErrorKind::InvalidData, "Framing Error")), + } + } + /// This tries to read from the Tcp buffer into the DecodeBuffer, nothing else. - pub fn receive(&mut self) -> io::Result { - let mut read_bytes = 0; - let mut sum_read_bytes = 0; + fn receive(&mut self) -> io::Result<()> { let mut interrupts = 0; loop { - // Keep all the read bytes in the buffer without overwriting - if read_bytes > 0 { - self.input_buffer.advance_writeable(read_bytes); - read_bytes = 0; - } - if let Some(mut buf) = self.input_buffer.get_writeable() { - match self.stream.read(&mut buf) { - Ok(0) => { - return Ok(sum_read_bytes); - } - Ok(n) => { - sum_read_bytes += n; - read_bytes = n; - // continue looping and reading - } - Err(err) if would_block(&err) => { - return Ok(sum_read_bytes); - } - Err(err) if interrupted(&err) => { - // We should continue trying until no interruption - interrupts += 1; - if interrupts >= network_thread::MAX_INTERRUPTS { - return Err(err); - } - } - Err(err) => { + let mut buf = self + .input_buffer + .get_writeable() + .ok_or_else(|| io::Error::new(ErrorKind::InvalidInput, "No Buffer Space"))?; + match self.stream.read(&mut buf) { + Ok(0) => { + return Ok(()); + } + Ok(n) => { + self.input_buffer.advance_writeable(n); + } + Err(err) if would_block(&err) => { + return Ok(()); + } + Err(err) if interrupted(&err) => { + interrupts += 1; + if interrupts >= network_thread::MAX_INTERRUPTS { return Err(err); } } - } else { - return Err(Error::new(ErrorKind::InvalidData, "No space in Buffer")); + Err(err) => { + return Err(err); + } } } } @@ -236,19 +263,16 @@ impl TcpChannel { } /// Handles a Bye message. If the method returns Ok it is safe to shutdown. 
- pub fn handle_bye(&mut self) -> io::Result<()> { + pub fn handle_bye(&mut self) -> () { match self.state { ChannelState::Connected(addr, id) => { self.state = ChannelState::CloseReceived(addr, id); - self.send_bye()?; - // Bye has been sent and received, channel is now closed - self.state = ChannelState::Closed(addr, id); - Ok(()) - } - _ => { - // Any other state means we just shut it down. - io::Result::Ok(()) + if self.send_bye().is_ok() { + self.state = ChannelState::Closed(addr, id); + } } + ChannelState::CloseRequested(addr, id) => self.state = ChannelState::Closed(addr, id), + _ => {} } } @@ -265,16 +289,11 @@ impl TcpChannel { } } - /// Returns `true` if this channel was not in [ChannelState::Connected](ChannelState::Connected) - /// `true` means that it can safely be dropped. - pub fn shutdown(&mut self) -> bool { + /// Shuts down the channel stream + pub fn shutdown(&mut self) -> () { let _ = self.stream.shutdown(Both); // Discard errors while closing channels for now... - match self.state { - ChannelState::Connected(addr, id) => { - self.state = ChannelState::Closed(addr, id); - false - } - _ => true, + if let ChannelState::Connected(addr, id) = self.state { + self.state = ChannelState::Closed(addr, id); } } @@ -329,19 +348,16 @@ impl TcpChannel { // Would block "errors" are the OS's way of saying that the // connection is not actually ready to perform this I/O operation. Err(ref err) if would_block(err) => { - // re-insert the data at the front of the buffer and return self.outbound_queue.push_front(serialized_frame); return Ok(sent_bytes); } Err(err) if interrupted(&err) => { - // re-insert the data at the front of the buffer self.outbound_queue.push_front(serialized_frame); interrupts += 1; if interrupts >= MAX_INTERRUPTS { return Err(err); } } - // Other errors we'll consider fatal. 
Err(err) => { self.outbound_queue.push_front(serialized_frame); return Err(err); @@ -360,12 +376,7 @@ impl TcpChannel { } } - /// Destroys the channel and returns the Buffer - pub(crate) fn destroy(self) -> BufferChunk { - self.input_buffer.destroy() - } - - pub(crate) fn kill(self) -> () { + pub(crate) fn kill(&mut self) -> () { let _ = self.stream.shutdown(Both); } } diff --git a/core/src/net/network_thread.rs b/core/src/net/network_thread.rs index 4c6056f8..d0290e99 100644 --- a/core/src/net/network_thread.rs +++ b/core/src/net/network_thread.rs @@ -1,16 +1,22 @@ use super::*; use crate::{ - dispatch::NetworkConfig, - messaging::{DispatchEnvelope, EventEnvelope}, + dispatch::{ + lookup::{ActorLookup, LookupResult}, + NetworkConfig, + }, + messaging::{DispatchEnvelope, EventEnvelope, NetMessage, SerialisedFrame}, net::{ buffers::{BufferChunk, BufferPool, EncodeBuffer}, network_channel::{ChannelState, TcpChannel}, udp_state::UdpState, ConnectionState, + ConnectionState::Connected, }, + serialisation::ser_helpers::deserialise_chunk_lease, }; use crossbeam_channel::Receiver as Recv; use mio::{ + event::Event, net::{TcpListener, TcpStream, UdpSocket}, Events, Poll, @@ -18,9 +24,12 @@ use mio::{ }; use rustc_hash::FxHashMap; use std::{ + cell::{RefCell, RefMut}, collections::VecDeque, io, net::{Shutdown, SocketAddr}, + ops::DerefMut, + rc::Rc, sync::Arc, time::Duration, usize, @@ -40,748 +49,635 @@ pub const MAX_INTERRUPTS: i32 = 9; const MAX_BIND_RETRIES: usize = 5; const BIND_RETRY_INTERVAL: u64 = 1000; -/// Thread structure responsible for driving the Network IO -pub struct NetworkThread { +/// Builder struct, can be sent to a thread safely to launch a NetworkThread +pub struct NetworkThreadBuilder { + poll: Poll, + waker: Option, log: KompactLogger, - pub addr: SocketAddr, + pub address: SocketAddr, lookup: Arc>, - tcp_listener: Option, - udp_state: Option, - poll: Poll, - channel_map: FxHashMap, - token_map: FxHashMap, - token: Token, input_queue: Recv, + shutdown_promise: KPromise<()>, dispatcher_ref: DispatcherRef, - buffer_pool: BufferPool, - sent_bytes: u64, - received_bytes: u64, - sent_msgs: u64, - stopped: bool, - shutdown_promise: Option>, network_config: NetworkConfig, - retry_queue: VecDeque<(Token, bool, bool, usize)>, - out_of_buffers: bool, - encode_buffer: EncodeBuffer, + tcp_listener: TcpListener, } -/// Return values for IO Operations on the [NetworkChannel](net::network_channel::NetworkChannel) abstraction -#[derive(Debug, PartialEq, Eq)] -pub(super) enum IoReturn { - SwapBuffer, - LostConnection, - CloseConnection, - None, - Start(SocketAddr, Uuid), - Ack, -} - -impl NetworkThread { - /// Creates a struct for the NetworkThread and binds to a socket without actually spawning a thread. - /// The `input_queue` is used to send DispatchEvents to the thread but they won't be read unless - /// the `dispatcher_registration` is activated to wake up the thread. - /// `network_thread_sender` is used to confirm shutdown of the thread. 
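// Sketch of the intended two-phase start-up for NetworkThreadBuilder (it mirrors how
// Bridge::new in net/mod.rs drives it; the variable names are illustrative): the builder is
// constructed on the caller's thread so binding errors surface early and the bound address is
// known synchronously, the Waker is handed to the dispatcher side, and only then is the
// builder moved into the spawned thread where build() finishes the poll registrations and
// run() takes over.
//
// let mut builder = NetworkThreadBuilder::new(
//     log, addr, lookup, input_queue, shutdown_promise, dispatcher_ref, network_config,
// )?;
// let bound_address = builder.address;
// let waker = builder.take_waker().expect("waker only taken once");
// std::thread::spawn(move || builder.build().run());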
+impl NetworkThreadBuilder { pub(crate) fn new( log: KompactLogger, - addr: SocketAddr, + address: SocketAddr, lookup: Arc>, input_queue: Recv, shutdown_promise: KPromise<()>, dispatcher_ref: DispatcherRef, network_config: NetworkConfig, - ) -> (NetworkThread, Waker) { - // Set-up the Listener - debug!( + ) -> Result { + let poll = Poll::new().expect("failed to create Poll instance in NetworkThread"); + let waker = + Waker::new(poll.registry(), DISPATCHER).expect("failed to create Waker for DISPATCHER"); + let tcp_listener = bind_with_retries(&address, MAX_BIND_RETRIES, &log)?; + let actual_address = tcp_listener.local_addr()?; + Ok(NetworkThreadBuilder { + poll, + tcp_listener, + waker: Some(waker), log, - "NetworkThread starting, trying to bind listener to address {}", &addr - ); - match bind_with_retries(&addr, MAX_BIND_RETRIES, &log) { - Ok(mut tcp_listener) => { - let actual_addr = tcp_listener.local_addr().expect("could not get real addr"); - let logger = log.new(o!("addr" => format!("{}", actual_addr))); - let mut udp_socket = - UdpSocket::bind(actual_addr).expect("could not bind UDP on TCP port"); - - // Set up polling for the Dispatcher and the listener. - let poll = Poll::new().expect("failed to create Poll instance in NetworkThread"); - - // Register Listener - let registry = poll.registry(); - registry - .register(&mut tcp_listener, TCP_SERVER, Interest::READABLE) - .expect("failed to register TCP SERVER"); - registry - .register( - &mut udp_socket, - UDP_SOCKET, - Interest::READABLE | Interest::WRITABLE, - ) - .expect("failed to register UDP SOCKET"); - - // Create waker for Dispatch - let waker = Waker::new(poll.registry(), DISPATCHER) - .expect("failed to create Waker for DISPATCHER"); - - let mut buffer_pool = BufferPool::with_config( - &network_config.get_buffer_config(), - &network_config.get_custom_allocator(), - ); + address: actual_address, + lookup, + input_queue, + shutdown_promise, + dispatcher_ref, + network_config, + }) + } - let encode_buffer = EncodeBuffer::with_config( - &network_config.get_buffer_config(), - &network_config.get_custom_allocator(), - ); + pub fn take_waker(&mut self) -> Option { + self.waker.take() + } - let udp_buffer = buffer_pool - .get_buffer() - .expect("Could not get buffer for setting up UDP"); - - let udp_state = - UdpState::new(udp_socket, udp_buffer, logger.clone(), &network_config); - let channel_map: FxHashMap = FxHashMap::default(); - let token_map: FxHashMap = FxHashMap::default(); - - ( - NetworkThread { - log: logger, - addr: actual_addr, - lookup, - tcp_listener: Some(tcp_listener), - udp_state: Some(udp_state), - poll, - channel_map, - token_map, - token: START_TOKEN, - input_queue, - buffer_pool, - sent_bytes: 0, - received_bytes: 0, - sent_msgs: 0, - stopped: false, - shutdown_promise: Some(shutdown_promise), - dispatcher_ref, - network_config, - retry_queue: VecDeque::new(), - out_of_buffers: false, - encode_buffer, - }, - waker, - ) - } - Err(e) => { - panic!( - "NetworkThread failed to bind to address: {:?}, addr {:?}", - e, &addr - ); - } + pub fn build(mut self) -> NetworkThread { + let actual_addr = self + .tcp_listener + .local_addr() + .expect("could not get real addr"); + let logger = self.log.new(o!("addr" => format!("{}", actual_addr))); + let mut udp_socket = UdpSocket::bind(actual_addr).expect("could not bind UDP on TCP port"); + + // Register Listeners + self.poll + .registry() + .register(&mut self.tcp_listener, TCP_SERVER, Interest::READABLE) + .expect("failed to register TCP SERVER"); + self.poll + 
.registry() + .register( + &mut udp_socket, + UDP_SOCKET, + Interest::READABLE | Interest::WRITABLE, + ) + .expect("failed to register UDP SOCKET"); + + let mut buffer_pool = BufferPool::with_config( + &self.network_config.get_buffer_config(), + &self.network_config.get_custom_allocator(), + ); + let encode_buffer = EncodeBuffer::with_config( + &self.network_config.get_buffer_config(), + &self.network_config.get_custom_allocator(), + ); + let udp_buffer = buffer_pool + .get_buffer() + .expect("Could not get buffer for setting up UDP"); + let udp_state = UdpState::new(udp_socket, udp_buffer, logger.clone(), &self.network_config); + + NetworkThread { + log: logger, + addr: actual_addr, + lookup: self.lookup, + tcp_listener: self.tcp_listener, + udp_state: Some(udp_state), + poll: self.poll, + address_map: FxHashMap::default(), + token_map: FxHashMap::default(), + token: START_TOKEN, + input_queue: self.input_queue, + buffer_pool: RefCell::new(buffer_pool), + stopped: false, + shutdown_promise: self.shutdown_promise, + dispatcher_ref: self.dispatcher_ref, + network_config: self.network_config, + retry_queue: VecDeque::new(), + out_of_buffers: false, + encode_buffer, } } +} +/// Thread structure responsible for driving the Network IO +pub struct NetworkThread { + log: KompactLogger, + pub addr: SocketAddr, + lookup: Arc>, + tcp_listener: TcpListener, + udp_state: Option, + poll: Poll, + address_map: FxHashMap>>, + token_map: FxHashMap>>, + token: Token, + input_queue: Recv, + dispatcher_ref: DispatcherRef, + buffer_pool: RefCell, + stopped: bool, + shutdown_promise: KPromise<()>, + network_config: NetworkConfig, + retry_queue: VecDeque, + out_of_buffers: bool, + encode_buffer: EncodeBuffer, +} - /// Event loop, spawn a thread calling this method start the thread. 
- pub fn run(&mut self) -> () { +impl NetworkThread { + pub fn run(mut self) -> () { + trace!(self.log, "NetworkThread starting"); let mut events = Events::with_capacity(MAX_POLL_EVENTS); - debug!(self.log, "Entering main EventLoop"); - let mut retry_queue = VecDeque::<(Token, bool, bool, usize)>::new(); - let mut timeout; loop { - // Retries happen for connection interrupts and buffer-swapping - // Performed in the main loop to avoid recursion - retry_queue.append(&mut self.retry_queue); - timeout = if self.out_of_buffers { - // Avoid looping retries when there are no Buffers - Some(Duration::from_millis( - self.network_config.get_connection_retry_interval(), - )) - } else if retry_queue.is_empty() { - None - } - // No need to timeout if there are no retries - else { - Some(Duration::from_secs(0)) - }; // Timeout immediately if retries are waiting - self.poll - .poll(&mut events, timeout) + .poll(&mut events, self.get_poll_timeout()) .expect("Error when calling Poll"); - for (token, readable, writeable, retries) in events + for event in events .iter() - .map(|e| (e.token(), e.is_readable(), e.is_writable(), 0_usize)) - .chain(retry_queue.drain(0..retry_queue.len())) + .map(|e| (EventWithRetries::from(e))) + .chain(self.retry_queue.split_off(0)) { - self.handle_event(token, readable, writeable, retries); + self.handle_event(event); if self.stopped { - let promise = self.shutdown_promise.take().expect("shutdown promise"); - if let Err(e) = promise.fulfil(()) { + if let Err(e) = self.shutdown_promise.fulfil(()) { error!(self.log, "Error, shutting down sender: {:?}", e); }; - debug!(self.log, "Stopped"); + trace!(self.log, "Stopped"); return; }; } } } - fn handle_event(&mut self, token: Token, readable: bool, writeable: bool, retries: usize) { - match token { + fn get_poll_timeout(&self) -> Option { + if self.out_of_buffers { + Some(Duration::from_millis( + self.network_config.get_connection_retry_interval(), + )) + } else if self.retry_queue.is_empty() { + None + } else { + Some(Duration::from_secs(0)) + } + } + + fn handle_event(&mut self, event: EventWithRetries) { + match event.token { TCP_SERVER => { - // Received an event for the TCP server socket.Accept the connection. 
- if let Err(e) = self.accept_stream() { - debug!(self.log, "Error while accepting stream {:?}", e); + if let Err(e) = self.receive_stream() { + error!(self.log, "Error while accepting stream {:?}", e); } } UDP_SOCKET => { - if let Some(ref mut udp_state) = self.udp_state { - if writeable { - match udp_state.try_write() { - Ok(n) => { - self.sent_bytes += n as u64; - } - Err(e) => { - warn!(self.log, "Error during UDP sending: {}", e); - } - } + if let Some(mut udp_state) = self.udp_state.take() { + if event.writeable { + self.write_udp(&mut udp_state); } - if readable { - match udp_state.try_read() { - Ok((n, ioret)) => { - if n > 0 { - self.received_bytes += n as u64; - } - if IoReturn::SwapBuffer == ioret { - if let Some(mut new_buffer) = self.buffer_pool.get_buffer() { - udp_state.swap_buffer(&mut new_buffer); - self.buffer_pool.return_buffer(new_buffer); - debug!(self.log, "Swapped UDP buffer"); - self.out_of_buffers = false; - // We do not count successful swaps in the retries, stay at the same count - self.retry_queue - .push_back((token, readable, writeable, retries)); - } else { - error!( - self.log, - "Could not get UDP buffer, retries: {}", retries - ); - self.out_of_buffers = true; - self.retry_queue.push_back(( - token, - readable, - writeable, - retries + 1, - )); - } - } - } - Err(e) => { - warn!(self.log, "Error during UDP reading: {}", e); - } - } - use dispatch::lookup::{ActorLookup, LookupResult}; - - // Forward the data frame to the correct actor - let lease_lookup = self.lookup.load(); - for envelope in udp_state.incoming_messages.drain(..) { - match lease_lookup.get_by_actor_path(&envelope.receiver) { - LookupResult::Ref(actor) => { - actor.enqueue(envelope); - } - LookupResult::Group(group) => { - group.route(envelope, &self.log); - } - LookupResult::None => { - debug!(self.log, "Could not find actor reference for destination: {:?}, dropping message", envelope.receiver); - } - LookupResult::Err(e) => { - error!( - self.log, - "An error occurred during local actor lookup for destination: {:?}, dropping message. 
The error was: {}", - envelope.receiver, - e - ); - } - } - } + if event.readable { + self.read_udp(&mut udp_state, event); } - } else { - debug!(self.log, "Poll triggered for removed UDP socket"); + self.udp_state = Some(udp_state); } } DISPATCHER => { - // Message available from Dispatcher, clear the poll readiness before receiving self.receive_dispatch(); } - token => { - // lookup its corresponding addr - let addr = { - if let Some(addr) = self.token_map.get(&token) { - *addr - } else { - debug!( - self.log, - "Poll triggered for removed channel, Token({})", token.0 - ); + _ => { + if event.writeable { + self.write_tcp(&event.token); + } + if event.readable { + self.read_tcp(&event); + } + } + } + } + + fn retry_event(&mut self, event: &EventWithRetries) -> () { + if event.retries <= self.network_config.get_max_connection_retry_attempts() { + self.retry_queue.push_back(event.get_retry_event()); + } else if let Some(channel) = self.get_channel_by_token(&event.token) { + self.lost_connection(channel.borrow_mut()); + } + } + + fn enqueue_writeable_event(&mut self, token: &Token) -> () { + self.retry_queue + .push_back(EventWithRetries::writeable_with_token(token)); + } + + fn get_buffer(&self) -> Option { + self.buffer_pool.borrow_mut().get_buffer() + } + + fn return_buffer(&self, buffer: BufferChunk) -> () { + self.buffer_pool.borrow_mut().return_buffer(buffer) + } + + fn receive_dispatch(&mut self) { + while let Ok(event) = self.input_queue.try_recv() { + self.handle_dispatch_event(event); + } + } + + fn handle_dispatch_event(&mut self, event: DispatchEvent) { + match event { + DispatchEvent::SendTcp(address, data) => { + self.send_tcp_message(address, data); + } + DispatchEvent::SendUdp(address, data) => { + self.send_udp_message(address, data); + } + DispatchEvent::Stop => { + self.stop(); + } + DispatchEvent::Kill => { + self.kill(); + } + DispatchEvent::Connect(addr) => { + self.request_stream(addr); + } + DispatchEvent::ClosedAck(addr) => { + self.handle_closed_ack(addr); + } + DispatchEvent::Close(addr) => { + self.close_connection(addr); + } + } + } + + fn get_channel_by_token(&self, token: &Token) -> Option>> { + self.token_map.get(token).cloned() + } + + fn get_channel_by_address(&self, address: &SocketAddr) -> Option>> { + self.address_map.get(address).cloned() + } + + fn reregister_channel_address( + &mut self, + old_address: SocketAddr, + new_address: SocketAddr, + ) -> () { + if let Some(channel_rc) = self.address_map.remove(&old_address) { + self.address_map.insert(new_address, channel_rc); + } + } + + fn read_tcp(&mut self, event: &EventWithRetries) -> () { + if let Some(channel_rc) = self.get_channel_by_token(&event.token) { + let mut channel = channel_rc.borrow_mut(); + loop { + match channel.read_frame(&self.buffer_pool) { + Ok(None) => { return; } - }; - let mut swap_buffer = false; - let mut lost_connection = false; - if writeable { - match self.try_write(&addr) { - IoReturn::LostConnection => { - // Remove and deregister - lost_connection = true; - } - IoReturn::CloseConnection => { - // A bye has been sent and received - debug!( - self.log, - "Connection shutdown gracefully, awaiting dispatcher Ack" - ); - self.dispatcher_ref.tell(DispatchEnvelope::Event( - EventEnvelope::Network(NetworkEvent::Connection( - addr, - ConnectionState::Closed, - )), - )); - } - _ => {} + Ok(Some(Frame::Data(data))) => { + self.handle_data_frame(data); } - } - if readable { - match self.try_read(&addr) { - IoReturn::LostConnection => { - // Remove and deregister - lost_connection = 
true; - } - IoReturn::SwapBuffer => { - swap_buffer = true; - } - _ => {} + Ok(Some(Frame::Start(start))) => { + self.handle_start(event, &mut channel, &start); + return; } - - match self.decode(&addr) { - IoReturn::Start(remote_addr, id) => { - self.handle_start(token, remote_addr, id); - } - IoReturn::LostConnection => { - // Remove and deregister - lost_connection = true; - } - IoReturn::CloseConnection => { - // A bye was received - self.handle_bye(&addr); - } - _ => (), + Ok(Some(Frame::Hello(hello))) => { + self.handle_hello(&mut *channel, &hello); } - if swap_buffer { - // Buffer full, we swap it and register for poll again - if let Some(channel) = self.channel_map.get_mut(&addr) { - if let Some(mut new_buffer) = self.buffer_pool.get_buffer() { - self.out_of_buffers = false; - channel.swap_buffer(&mut new_buffer); - self.buffer_pool.return_buffer(new_buffer); - debug!( - self.log, - "Swapped buffer for {:?}, \nbuffer_pool: {:?}", - &channel, - &self.buffer_pool - ); - // We do not count successful swaps in the retries, stay at the same count - self.retry_queue - .push_back((token, readable, writeable, retries)); - } else if retries - <= self.network_config.get_max_connection_retry_attempts() as usize - { - error!( - self.log, - "Could not get buffer for channel {}, retries {}", - &addr, - retries - ); - self.out_of_buffers = true; - self.retry_queue.push_back(( - token, - readable, - writeable, - retries + 1, - )); - } else { - error!( - self.log, - "Giving up on reading channel {}, retries {},\ - waiting for buffers timed out", - &addr, - retries - ); - lost_connection = true; - } - } - // We retry the event such that the read is performed with the new buffer. + Ok(Some(Frame::Ack())) => { + self.notify_connection_state(channel.address(), Connected); } - if lost_connection { - self.lost_connection(addr); + Ok(Some(Frame::Bye())) => { + self.handle_bye(&mut channel); + return; + } + Err(e) if no_buffer_space(&e) => { + self.out_of_buffers = true; + warn!(self.log, "Out of Buffers"); + drop(channel); + self.retry_event(event); + return; + } + Err(e) if connection_reset(&e) => { + warn!( + self.log, + "Connection lost, reset by peer {}", + channel.address() + ); + self.lost_connection(channel); + return; + } + Err(e) => { + warn!( + self.log, + "Error reading from channel {}: {}", + channel.address(), + &e + ); + return; } } } } } - /// During channel initialization the threeway handshake to establish connections culminates with this function - /// The Start(remote_addr, id) is received by the host on the receiving end of the channel initialisation. - /// The decision is made here and now. - /// If no other connection is registered for the remote host the decision is easy, we start the channel and send the ack. - /// If there are other connection attempts underway there are multiple possibilities: - /// The other connection has not started and does not have a known UUID: it will be killed, this channel will start. - /// The connection has already started, in which case this channel must be killed. - /// The connection has a known UUID but is not connected: Use the UUID as a tie breaker for which to kill and which to keep. - fn handle_start(&mut self, token: Token, remote_addr: SocketAddr, id: Uuid) -> () { - if let Some(registered_addr) = self.token_map.remove(&token) { - if remote_addr == registered_addr { - // The channel we received the start on was already registered with the appropriate address. - // There is no need to change anything, we can simply transition the channel. 
- debug!( + fn read_udp(&mut self, udp_state: &mut UdpState, event: EventWithRetries) -> () { + match udp_state.try_read(&self.buffer_pool) { + Ok(_) => {} + Err(e) if no_buffer_space(&e) => { + warn!( self.log, - "Got Start({}, ...) from {}, already registered with correct addr", - &remote_addr, - ®istered_addr + "Could not get UDP buffer, retries: {}", event.retries ); - } else { - // Make sure we only have one channel and that it's registered with the remote_addr - if let Some(mut channel) = self.channel_map.remove(®istered_addr) { - // There's a channel registered with the remote_addr - - if let Some(mut other_channel) = self.channel_map.remove(&remote_addr) { - // There's another channel for the same host, only one can survive. - // If we don't knw the Uuid yet the channel can safely be killed. The remote host must obey our Ack. - // It can not discard the other channel without receiving a Start(...) or Ack(...) for the other channel. - if let Some(other_id) = other_channel.get_id() { - // The other channel has a known id, if it doesn't there is no reason to keep it. - - if other_channel.connected() || other_id > id || other_channel.closed() - { - // The other channel should be kept and this one should be discarded. - debug!( - self.log, - "Got Start({}, ...) from {}, already connected", - &remote_addr, - ®istered_addr - ); - let _ = self.poll.registry().deregister(channel.stream_mut()); - let _ = channel.initiate_graceful_shutdown(); - self.channel_map.insert(remote_addr, other_channel); - // It will be driven to completion on its own. - return; - } - } - // We will keep this channel, not the other channel - info!( + self.out_of_buffers = true; + self.retry_event(&event); + } + Err(e) => { + warn!(self.log, "Error during UDP reading: {}", e); + } + } + while let Some(net_message) = udp_state.incoming_messages.pop_front() { + self.deliver_net_message(net_message); + } + } + + fn write_tcp(&mut self, token: &Token) -> () { + if let Some(channel_rc) = self.get_channel_by_token(token) { + let mut channel = channel_rc.borrow_mut(); + match channel.try_drain() { + Err(ref err) if broken_pipe(err) => { + self.lost_connection(channel); + } + Ok(_) => { + if let ChannelState::CloseReceived(addr, id) = channel.state { + channel.state = ChannelState::Closed(addr, id); + debug!( self.log, - "Dropping other_channel while starting channel {}", &remote_addr + "Connection shutdown gracefully, awaiting dispatcher Ack" ); - let _ = self - .poll - .registry() - .deregister(other_channel.stream_mut()) - .ok(); - other_channel.shutdown(); - drop(other_channel); - // Continue with `channel` + self.notify_connection_state(channel.address(), ConnectionState::Closed); } - // Re-insert the channel and continue starting it. - self.channel_map.insert(remote_addr, channel); - } else if let Some(channel) = self.channel_map.remove(&remote_addr) { - // Only one channel, re-insert the channel with the correct key - debug!( - self.log, - "Got Start({}, ...) from {}, changing name of channel.", - &remote_addr, - ®istered_addr - ); - self.channel_map.insert(remote_addr, channel); } - } - - // Make sure that the channel is registered correctly and in Connected State. 
- if let Some(channel) = self.channel_map.get_mut(&remote_addr) { - debug!( - self.log, - "Sending ack for {}, {}", &remote_addr, &channel.token.0 - ); - channel.handle_start(&remote_addr, id); - channel.token = token; - self.token_map.insert(token, remote_addr); - if let Err(e) = self.poll.registry().reregister( - channel.stream_mut(), - token, - Interest::WRITABLE | Interest::READABLE, - ) { + Err(e) => { warn!( self.log, - "Error when reregistering Poll for channel in handle_hello: {:?}", e + "Unhandled error while writing to {}\n{:?}", + channel.address(), + e ); - }; - - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::Connection(remote_addr, ConnectionState::Connected), - ))); + } } - } else { - panic!( - "No address registered for a token which yielded a hello msg, \ - should not be possible" - ); } } - fn handle_ack(&mut self, addr: &SocketAddr) -> () { - if let Some(channel) = self.channel_map.get_mut(addr) { - debug!(self.log, "Handling ack for {}", addr); - channel.handle_ack(); - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::Connection(*addr, ConnectionState::Connected), - ))); + fn write_udp(&mut self, udp_state: &mut UdpState) -> () { + match udp_state.try_write() { + Ok(_) => {} + Err(e) => { + warn!(self.log, "Error during UDP sending: {}", e); + } } } - fn handle_bye(&mut self, addr: &SocketAddr) -> () { - if let Some(channel) = self.channel_map.get_mut(addr) { - debug!(self.log, "Handling Bye for {}", addr); - if channel.handle_bye().is_ok() { - // Channel has been closed entirely - debug!( + fn send_tcp_message(&mut self, address: SocketAddr, data: DispatchData) { + if let Some(channel_rc) = self.get_channel_by_address(&address) { + let mut channel = channel_rc.borrow_mut(); + if channel.connected() { + match self.serialise_dispatch_data(data) { + Ok(frame) => { + channel.enqueue_serialised(frame); + self.enqueue_writeable_event(&channel.token); + } + Err(e) if out_of_buffers(&e) => { + self.out_of_buffers = true; + warn!( + self.log, + "No network buffers available, dropping outbound message.\ + slow down message rate or increase buffer limits." 
+ ); + } + Err(e) => { + error!(self.log, "Error serialising message {}", e); + } + } + } else { + trace!( self.log, - "Connection shutdown gracefully, awaiting dispatcher Ack" + "Dispatch trying to route to non connected channel {:?}, rejecting the message", + channel ); - - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::Connection(*addr, ConnectionState::Closed), - ))); - // Wait for ClosedAck + self.reject_dispatch_data(address, data); } + } else { + trace!( + self.log, + "Dispatch trying to route to unrecognized address {}, rejecting the message", + address + ); + self.reject_dispatch_data(address, data); } } - fn try_write(&mut self, addr: &SocketAddr) -> IoReturn { - if let Some(channel) = self.channel_map.get_mut(&addr) { - match channel.try_drain() { - Err(ref err) if broken_pipe(err) => { - return IoReturn::LostConnection; - } - Ok(n) => { - self.sent_bytes += n as u64; - if let ChannelState::CloseReceived(addr, id) = channel.state { - channel.state = ChannelState::Closed(addr, id); - return IoReturn::CloseConnection; + fn send_udp_message(&mut self, address: SocketAddr, data: DispatchData) { + if let Some(mut udp_state) = self.udp_state.take() { + match self.serialise_dispatch_data(data) { + Ok(frame) => { + udp_state.enqueue_serialised(address, frame); + match udp_state.try_write() { + Ok(_) => {} + Err(e) => { + warn!(self.log, "Error during UDP sending: {}", e); + debug!(self.log, "UDP error debug info: {:?}", e); + } } } - Err(e) => { - error!( + Err(e) if out_of_buffers(&e) => { + self.out_of_buffers = true; + warn!( self.log, - "Unhandled error while writing to {}\n{:?}", addr, e + "No network buffers available, dropping outbound message.\ + slow down message rate or increase buffer limits." 
); } + Err(e) => { + error!(self.log, "Error serialising message {}", e); + } } + self.udp_state = Some(udp_state); + } else { + self.reject_dispatch_data(address, data); + trace!( + self.log, + "Rejecting UDP message to {} as socket is already shut down.", + address + ); } - IoReturn::None } - fn try_read(&mut self, addr: &SocketAddr) -> IoReturn { - let mut ret = IoReturn::None; - if let Some(channel) = self.channel_map.get_mut(&addr) { - match channel.receive() { - Ok(n) => { - self.received_bytes += n as u64; - } - Err(ref err) if no_buffer_space(err) => { - debug!(self.log, "no_buffer_space for channel {:?}", channel); - ret = IoReturn::SwapBuffer - } - Err(err) if interrupted(&err) || would_block(&err) => { - // Just retry later - } - Err(err) if connection_reset(&err) || broken_pipe(&err) => { - debug!( - self.log, - "Connection_reset to peer {}, shutting down the channel", &addr - ); - ret = IoReturn::LostConnection - } - Err(err) => { - // Fatal error don't try to read again - error!( - self.log, - "Error while reading from peer {}:\n{:?}", &addr, &err - ); - } + fn handle_data_frame(&self, data: Data) -> () { + let buf = data.payload(); + let envelope = deserialise_chunk_lease(buf).expect("s11n errors"); + self.deliver_net_message(envelope); + } + + fn deliver_net_message(&self, envelope: NetMessage) -> () { + let lease_lookup = self.lookup.load(); + match lease_lookup.get_by_actor_path(&envelope.receiver) { + LookupResult::Ref(actor) => { + actor.enqueue(envelope); + } + LookupResult::Group(group) => { + group.route(envelope, &self.log); + } + LookupResult::None => { + warn!( + self.log, + "Could not find actor reference for destination: {:?}, dropping message", + envelope.receiver + ); + } + LookupResult::Err(e) => { + error!( + self.log, + "An error occurred during local actor lookup for destination: {:?}, dropping message. The error was: {}", + envelope.receiver, + e + ); } } - ret } - fn decode(&mut self, addr: &SocketAddr) -> IoReturn { - let mut ret = IoReturn::None; - // ret is used as return place-holder and internal flow-control. - if let Some(channel) = self.channel_map.get_mut(addr) { - loop { - match channel.decode() { - Err(FramingError::NoData) => { - // Done - return ret; - } - Ok(Frame::Data(fr)) => { - use dispatch::lookup::{ActorLookup, LookupResult}; - use serialisation::ser_helpers::deserialise_chunk_lease; - - // Forward the data frame to the correct actor - let lease_lookup = self.lookup.load(); - let buf = fr.payload(); - let envelope = deserialise_chunk_lease(buf).expect("s11n errors"); - match lease_lookup.get_by_actor_path(&envelope.receiver) { - LookupResult::Ref(actor) => { - actor.enqueue(envelope); - } - LookupResult::Group(group) => { - group.route(envelope, &self.log); - } - LookupResult::None => { - warn!(self.log, "Could not find actor reference for destination: {:?}, dropping message", envelope.receiver); - } - LookupResult::Err(e) => { - error!( - self.log, - "An error occurred during local actor lookup for destination: {:?}, dropping message. The error was: {}", - envelope.receiver, - e - ); - } - } - } - Ok(Frame::Hello(hello)) => { - // Channel handles hello internally. We can continue decoding. - debug!(self.log, "Handling Hello({}) from {}", &hello.addr, &addr); - channel.handle_hello(hello); - } - Ok(Frame::Start(start)) => { - // Channel handles hello internally. 
NetworkThread decides in next state transition - return IoReturn::Start(start.addr, start.id); - } - Ok(Frame::Ack(_)) => { - // We need to handle Acks immediately outside of the loop, then continue the loop - ret = IoReturn::Ack; - break; - } - Ok(Frame::Bye()) => { - debug!(self.log, "Received Bye from {}", &addr); - return IoReturn::CloseConnection; - } - Err(FramingError::InvalidMagicNum((check, slice))) => { - // There is no way to recover from this error right now. Would need resending mechanism - // or accept data loss and close the channel. - panic!("NetworkThread {} Unaligned buffer error for {}. {:?}, Magic_num: {:X}, Slice:{:?}", - self.addr, &addr, channel, check, slice); - } - Err(FramingError::InvalidFrame) => { - // Bad but not fatal error - error!(self.log, "Invalid Frame received on channel {:?}", channel); - } - Err(e) => { - error!(self.log, "Unhandled error {:?} from {:?}", &e, &addr); - } - Ok(other_frame) => error!( - self.log, - "Received unexpected frame type {:?} from {:?}", - other_frame.frame_type(), - channel - ), + fn handle_hello(&mut self, channel: &mut TcpChannel, hello: &Hello) { + self.reregister_channel_address(channel.address(), hello.addr); + channel.handle_hello(hello); + } + + /// During channel initialization the threeway handshake to establish connections culminates with this function + /// The Start(remote_addr, id) is received by the host on the receiving end of the channel initialisation. + /// The decision is made here and now. + /// If no other connection is registered for the remote host the decision is easy, we start the channel and send the ack. + /// If there are other connection attempts underway there are multiple possibilities: + /// The other connection has not started and does not have a known UUID: it will be killed, this channel will start. + /// The connection has already started, in which case this channel must be killed. + /// The connection has a known UUID but is not connected: Use the UUID as a tie breaker for which to kill and which to keep. + fn handle_start(&mut self, event: &EventWithRetries, channel: &mut TcpChannel, start: &Start) { + if let Some(other_channel_rc) = self.get_channel_by_address(&start.addr) { + debug!( + self.log, + "Merging channels for remote system {}", &start.addr + ); + let mut other_channel = other_channel_rc.borrow_mut(); + if let Some(other_id) = other_channel.get_id() { + if other_channel.connected() || other_id > start.id { + // The other channel should be kept and this one should be discarded. + let _ = channel.send_bye(); + self.drop_channel(channel); + return; + } else { + self.drop_channel(other_channel.deref_mut()); + self.reregister_channel_address(channel.address(), start.addr); } } } - match ret { - IoReturn::Ack => { - self.handle_ack(addr); - // We must continue decoding after. 
- self.decode(addr) + self.reregister_channel_address(channel.address(), start.addr); + channel.handle_start(&start); + self.retry_event(event); + self.notify_connection_state(start.addr, ConnectionState::Connected); + } + + fn handle_bye(&mut self, channel: &mut TcpChannel) -> () { + match channel.state { + ChannelState::Closed(_, _) => { + trace!(self.log, "Connection shutdown gracefully"); + self.notify_connection_state(channel.address(), ConnectionState::Closed); + self.drop_channel(channel); } - _ => ret, + ChannelState::CloseReceived(_, _) => {} + _ => { + self.drop_channel(channel); + } + } + } + + fn handle_closed_ack(&mut self, address: SocketAddr) -> () { + if let Some(channel_rc) = self.get_channel_by_address(&address) { + let mut channel = channel_rc.borrow_mut(); + if let ChannelState::Connected(_, _) = channel.state { + error!(self.log, "ClosedAck for connected Channel: {:#?}", &channel); + } else { + self.drop_channel(&mut channel) + } + } else { + error!( + self.log, + "ClosedAck for unrecognized address: {:#?}", &address + ); } } - fn request_stream(&mut self, addr: SocketAddr) { - // Make sure we never request request a stream to someone we already have a connection to - // Async communication with the dispatcher can lead to this - if let Some(channel) = self.channel_map.remove(&addr) { - // We already have a connection set-up - // the connection request must have been sent before the channel was initialized + fn drop_channel(&mut self, channel: &mut TcpChannel) { + let _ = self.poll.registry().deregister(channel.stream_mut()); + self.token_map.remove(&channel.token); + self.address_map.remove(&channel.address()); + channel.shutdown(); + let mut buffer = BufferChunk::new(0); + channel.swap_buffer(&mut buffer); + self.return_buffer(buffer); + } + + fn request_stream(&mut self, address: SocketAddr) { + if let Some(channel_rc) = self.get_channel_by_address(&address) { + let mut channel = channel_rc.borrow_mut(); match channel.state { ChannelState::Connected(_, _) => { - // log and inform Dispatcher to make sure it knows we're connected. 
debug!( self.log, - "Asked to request connection to already connected host {}", &addr + "Asked to request connection to already connected host {}", &address ); - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::Connection(addr, ConnectionState::Connected), - ))); - self.channel_map.insert(addr, channel); + self.notify_connection_state(address, ConnectionState::Connected); return; } ChannelState::Closed(_, _) => { - // We're waiting for the ClosedAck from the NetworkDispatcher - // This shouldn't happen but the system will likely recover from it eventually debug!( self.log, - "Requested connection to host before receiving ClosedAck {}", &addr + "Requested connection to host before receiving ClosedAck {}", &address ); - self.channel_map.insert(addr, channel); return; } _ => { - // It was an old attempt, remove it and continue with the new request - drop(channel); + self.drop_channel(&mut channel); } } } - // Fetch a buffer before we make the request - if let Some(buffer) = self.buffer_pool.get_buffer() { - debug!(self.log, "Requesting connection to {}", &addr); - match TcpStream::connect(addr) { + if let Some(buffer) = self.get_buffer() { + trace!(self.log, "Requesting connection to {}", &address); + match TcpStream::connect(address) { Ok(stream) => { self.store_stream( stream, - &addr, - ChannelState::Requested(addr, Uuid::new_v4()), + address, + ChannelState::Requested(address, Uuid::new_v4()), buffer, ); } Err(e) => { - error!( + // Connection will be re-requested + trace!( self.log, - "Failed to connect to remote host {}, error: {:?}", &addr, e + "Failed to connect to remote host {}, error: {:?}", + &address, + e ); + self.return_buffer(buffer); } } } else { - // No buffers available, we reject the connection - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::Connection(addr, ConnectionState::Closed), - ))); + self.out_of_buffers = true; + trace!( + self.log, + "No Buffers available when attempting to connect to remote host {}", + &address + ); } } - #[allow(irrefutable_let_patterns)] - fn accept_stream(&mut self) -> io::Result<()> { - while let (stream, addr) = (self.tcp_listener.as_ref().unwrap()).accept()? { - if let Some(buffer) = self.buffer_pool.get_buffer() { - debug!(self.log, "Accepting connection from {}", &addr); - self.store_stream(stream, &addr, ChannelState::Initialising, buffer); + fn receive_stream(&mut self) -> io::Result<()> { + while let Ok((stream, address)) = self.tcp_listener.accept() { + if let Some(buffer) = self.get_buffer() { + trace!(self.log, "Accepting connection from {}", &address); + self.store_stream(stream, address, ChannelState::Initialising, buffer); } else { - // If we can't get a buffer we reject the channel immediately stream.shutdown(Shutdown::Both)?; } } @@ -791,205 +687,66 @@ impl NetworkThread { fn store_stream( &mut self, stream: TcpStream, - addr: &SocketAddr, + address: SocketAddr, state: ChannelState, buffer: BufferChunk, ) { - self.token_map.insert(self.token, *addr); let mut channel = TcpChannel::new( stream, self.token, + address, buffer, state, self.addr, &self.network_config, ); - debug!(self.log, "Saying Hello to {}", addr); - // Whatever error is thrown here will be re-triggered and handled later. 
channel.initialise(&self.addr); if let Err(e) = self.poll.registry().register( channel.stream_mut(), self.token, Interest::READABLE | Interest::WRITABLE, ) { - error!(self.log, "Failed to register polling for {}\n{:?}", addr, e); + error!( + self.log, + "Failed to register polling for {}\n{:?}", address, e + ); } - self.channel_map.insert(*addr, channel); + let rc = Rc::new(RefCell::new(channel)); + self.address_map.insert(address, rc.clone()); + self.token_map.insert(self.token, rc); self.next_token(); } - fn receive_dispatch(&mut self) { - while let Ok(event) = self.input_queue.try_recv() { - match event { - DispatchEvent::SendTcp(addr, data) => { - self.sent_msgs += 1; - // Get the token corresponding to the connection - if let Some(channel) = self.channel_map.get_mut(&addr) { - // The stream is already set-up, buffer the package and wait for writable event - if channel.connected() { - match data { - DispatchData::Serialised(frame) => { - channel.enqueue_serialised(frame); - } - _ => { - if let Err(e) = self - .encode_buffer - .get_buffer_encoder() - .and_then(|mut buf| { - channel.enqueue_serialised( - data.into_serialised(&mut buf)?, - ); - Ok(()) - }) - { - warn!(self.log, "Error serialising message: {}", e); - } - } - } - } else { - debug!(self.log, "Dispatch trying to route to non connected channel {:?}, rejecting the message", channel); - self.dispatcher_ref.tell(DispatchEnvelope::Event( - EventEnvelope::Network(NetworkEvent::RejectedData(addr, data)), - )); - break; - } - } else { - // The stream isn't set-up, request connection, set-it up and try to send the message - debug!(self.log, "Dispatch trying to route to unrecognized address {}, rejecting the message", addr); - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::RejectedData(addr, data), - ))); - break; - } - if let IoReturn::LostConnection = self.try_write(&addr) { - self.lost_connection(addr); - } - } - DispatchEvent::SendUdp(addr, data) => { - self.sent_msgs += 1; - // Get the token corresponding to the connection - if let Some(ref mut udp_state) = self.udp_state { - match data { - DispatchData::Serialised(frame) => { - udp_state.enqueue_serialised(addr, frame); - } - _ => { - if let Err(e) = - self.encode_buffer.get_buffer_encoder().and_then(|mut buf| { - udp_state.enqueue_serialised( - addr, - data.into_serialised(&mut buf)?, - ); - Ok(()) - }) - { - warn!(self.log, "Error serialising message: {}", e); - } - } - } - match udp_state.try_write() { - Ok(n) => { - self.sent_bytes += n as u64; - } - Err(e) => { - warn!(self.log, "Error during UDP sending: {}", e); - debug!(self.log, "UDP erro debug info: {:?}", e); - } - } - } else { - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::RejectedData(addr, data), - ))); - warn!( - self.log, - "Rejecting UDP message to {} as socket is already shut down.", addr - ); - } - } - DispatchEvent::Stop => { - self.stop(); - } - DispatchEvent::Kill => { - self.kill(); - } - DispatchEvent::Connect(addr) => { - debug!(self.log, "Got DispatchEvent::Connect({})", addr); - self.request_stream(addr); - } - DispatchEvent::ClosedAck(addr) => { - debug!(self.log, "Got DispatchEvent::ClosedAck({})", addr); - self.handle_closed_ack(addr); - } - DispatchEvent::Close(addr) => { - self.close_connection(addr); - } - } - } - } - - /// Handles all logic necessary to shutdown a channel for which the connection has been lost. 
- fn lost_connection(&mut self, addr: SocketAddr) -> () { - // We will only drop the Channel once we get the CloseAck from the NetworkDispatcher - if let Some(channel) = self.channel_map.get_mut(&addr) { - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::Connection(addr, ConnectionState::Lost), - ))); - for rejected_frame in channel.take_outbound() { - self.dispatcher_ref - .tell(DispatchEnvelope::Event(EventEnvelope::Network( - NetworkEvent::RejectedData(addr, DispatchData::Serialised(rejected_frame)), - ))); - } - channel.shutdown(); - } - } - /// Initiates a graceful closing sequence fn close_connection(&mut self, addr: SocketAddr) -> () { - if let Some(channel) = self.channel_map.get_mut(&addr) { - // The channel may fail to perform its graceful shutdown if the Network - // is unable to send the closing message now, in that case the channel will remain - // open until it has been sent - let _ = channel.initiate_graceful_shutdown(); + if let Some(channel) = self.get_channel_by_address(&addr) { + let _ = channel.borrow_mut().initiate_graceful_shutdown(); } } - fn handle_closed_ack(&mut self, addr: SocketAddr) -> () { - if let Some(mut channel) = self.channel_map.remove(&addr) { - match channel.state { - ChannelState::Connected(_, _) => { - error!(self.log, "ClosedAck for connected Channel: {:#?}", channel); - self.channel_map.insert(addr, channel); - } - _ => { - channel.shutdown(); - let buffer = channel.destroy(); - self.buffer_pool.return_buffer(buffer); - } - } + /// Handles all logic necessary to shutdown a channel for which the connection has been lost. + fn lost_connection(&mut self, mut channel: RefMut) -> () { + trace!(self.log, "Lost connection to address {}", channel.address()); + self.notify_connection_state(channel.address(), ConnectionState::Lost); + channel.shutdown(); + for rejected_frame in channel.take_outbound() { + self.reject_dispatch_data(channel.address(), DispatchData::Serialised(rejected_frame)); } } fn stop(&mut self) -> () { - let tokens = self.token_map.clone(); - for (_, addr) in tokens { - self.try_read(&addr); - } - for (_, mut channel) in self.channel_map.drain() { + for (_, channel_rc) in self.address_map.drain() { + let mut channel = channel_rc.borrow_mut(); debug!( self.log, "Stopping channel with message count {}", channel.messages ); let _ = channel.initiate_graceful_shutdown(); } - if let Some(mut listener) = self.tcp_listener.take() { - self.poll.registry().deregister(&mut listener).ok(); - drop(listener); - debug!(self.log, "Dropped its TCP server"); - } + self.poll + .registry() + .deregister(&mut self.tcp_listener) + .expect("Deregistering listener while stopping network should work"); if let Some(mut udp_state) = self.udp_state.take() { self.poll.registry().deregister(&mut udp_state.socket).ok(); let count = udp_state.pending_messages(); @@ -1000,21 +757,41 @@ impl NetworkThread { ); } self.stopped = true; - debug!(self.log, "Stopped"); } fn kill(&mut self) -> () { - debug!(self.log, "Killing channels"); - for (_, channel) in self.channel_map.drain() { - channel.kill(); + trace!(self.log, "Killing the NetworkThread"); + for (_, channel_rc) in self.address_map.drain() { + channel_rc.borrow_mut().kill(); } self.stop(); } + fn notify_connection_state(&self, address: SocketAddr, state: ConnectionState) { + self.dispatcher_ref + .tell(DispatchEnvelope::Event(EventEnvelope::Network( + NetworkEvent::Connection(address, state), + ))); + } + + fn reject_dispatch_data(&self, address: SocketAddr, data: 
DispatchData) { + self.dispatcher_ref + .tell(DispatchEnvelope::Event(EventEnvelope::Network( + NetworkEvent::RejectedData(address, data), + ))); + } + fn next_token(&mut self) -> () { let next = self.token.0 + 1; self.token = Token(next); } + + fn serialise_dispatch_data(&mut self, data: DispatchData) -> Result { + match data { + DispatchData::Serialised(frame) => Ok(frame), + _ => data.into_serialised(&mut self.encode_buffer.get_buffer_encoder()?), + } + } } fn bind_with_retries( @@ -1043,6 +820,41 @@ fn bind_with_retries( } } +struct EventWithRetries { + token: Token, + readable: bool, + writeable: bool, + retries: u8, +} +impl EventWithRetries { + fn from(event: &Event) -> EventWithRetries { + EventWithRetries { + token: event.token(), + readable: event.is_readable(), + writeable: event.is_writable(), + retries: 0, + } + } + + fn writeable_with_token(token: &Token) -> EventWithRetries { + EventWithRetries { + token: *token, + readable: false, + writeable: true, + retries: 0, + } + } + + fn get_retry_event(&self) -> EventWithRetries { + EventWithRetries { + token: self.token, + readable: self.readable, + writeable: self.writeable, + retries: self.retries + 1, + } + } +} + #[cfg(test)] #[allow(unused_must_use)] mod tests { @@ -1056,7 +868,7 @@ mod tests { .poll .poll(&mut events, Some(Duration::from_millis(100))); for event in events.iter() { - thread.handle_event(event.token(), event.is_readable(), event.is_writable(), 0); + thread.handle_event(EventWithRetries::from(event)); } } @@ -1082,7 +894,7 @@ mod tests { let dispatcher_ref = system.dispatcher_ref(); // Set up the two network threads - let (network_thread1, _) = NetworkThread::new( + let network_thread1 = NetworkThreadBuilder::new( logger.clone(), "127.0.0.1:0".parse().expect("Address should work"), lookup.clone(), @@ -1090,9 +902,11 @@ mod tests { dispatch_shutdown_sender1, dispatcher_ref.clone(), NetworkConfig::default(), - ); + ) + .expect("Should work") + .build(); - let (network_thread2, _) = NetworkThread::new( + let network_thread2 = NetworkThreadBuilder::new( logger, "127.0.0.1:0".parse().expect("Address should work"), lookup, @@ -1100,7 +914,9 @@ mod tests { dispatch_shutdown_sender2, dispatcher_ref, NetworkConfig::default(), - ); + ) + .expect("Should work") + .build(); ( network_thread1, input_queue_1_sender, @@ -1128,8 +944,8 @@ mod tests { thread::sleep(Duration::from_millis(100)); // Accept requested streams - thread1.accept_stream(); - thread2.accept_stream(); + thread1.receive_stream(); + thread2.receive_stream(); // Wait for Hello to reach destination: thread::sleep(Duration::from_millis(100)); @@ -1152,26 +968,28 @@ mod tests { poll_and_handle(&mut thread2); thread::sleep(Duration::from_millis(100)); // Now we can inspect the Network channels, both only have one channel: - assert_eq!(thread1.channel_map.len(), 1); - assert_eq!(thread2.channel_map.len(), 1); + assert_eq!(thread1.address_map.len(), 1); + assert_eq!(thread2.address_map.len(), 1); // Now assert that they've kept the same channel: assert_eq!( thread1 - .channel_map + .address_map .drain() .next() .unwrap() .1 + .borrow_mut() .stream() .local_addr() .unwrap(), thread2 - .channel_map + .address_map .drain() .next() .unwrap() .1 + .borrow_mut() .stream() .peer_addr() .unwrap() @@ -1192,7 +1010,7 @@ mod tests { thread::sleep(Duration::from_millis(100)); // 1 accepts the connection and sends hello back - thread1.accept_stream(); + thread1.receive_stream(); thread::sleep(Duration::from_millis(100)); // 2 receives the Hello poll_and_handle(&mut thread2); @@ 
-1207,7 +1025,7 @@ mod tests { thread::sleep(Duration::from_millis(100)); // 2 accepts the connection and replies with hello - thread2.accept_stream(); + thread2.receive_stream(); thread::sleep(Duration::from_millis(100)); // 2 receives the Hello on the new channel and merges @@ -1222,27 +1040,32 @@ mod tests { poll_and_handle(&mut thread2); thread::sleep(Duration::from_millis(100)); + poll_and_handle(&mut thread1); + thread::sleep(Duration::from_millis(100)); + // Now we can inspect the Network channels, both only have one channel: - assert_eq!(thread1.channel_map.len(), 1); - assert_eq!(thread2.channel_map.len(), 1); + assert_eq!(thread1.address_map.len(), 1); + assert_eq!(thread2.address_map.len(), 1); // Now assert that they've kept the same channel: assert_eq!( thread1 - .channel_map + .address_map .drain() .next() .unwrap() .1 + .borrow_mut() .stream() .local_addr() .unwrap(), thread2 - .channel_map + .address_map .drain() .next() .unwrap() .1 + .borrow_mut() .stream() .peer_addr() .unwrap() @@ -1271,8 +1094,8 @@ mod tests { let logger = system.logger().clone(); let dispatcher_ref = system.dispatcher_ref(); - // Set up the two network threads - let (mut network_thread, _) = NetworkThread::new( + // Set up the network threads + let mut network_thread = NetworkThreadBuilder::new( logger, addr, lookup, @@ -1280,34 +1103,21 @@ mod tests { dispatch_shutdown_sender1, dispatcher_ref, network_config, - ); + ) + .expect("Should work") + .build(); // Assert that the buffer_pool is created correctly - let (pool_size, _) = network_thread.buffer_pool.get_pool_sizes(); + let (pool_size, _) = network_thread.buffer_pool.borrow_mut().get_pool_sizes(); assert_eq!(pool_size, 13); // initial_pool_size - assert_eq!(network_thread.buffer_pool.get_buffer().unwrap().len(), 128); + assert_eq!( + network_thread + .buffer_pool + .borrow_mut() + .get_buffer() + .unwrap() + .len(), + 128 + ); network_thread.stop(); } - - /* - #[test] - fn graceful_network_shutdown() -> () { - // Sets up two NetworkThreads and connects them to eachother, then shuts it down - - let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8878); - let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8880); - - let (mut network_thread1, input_queue_1_sender, mut network_thread2, input_queue_2_sender) = setup_two_threads(addr1.clone(), addr2.clone()); - - // 2 Requests connection to 1 and sends Hello - input_queue_2_sender.send(DispatchEvent::Connect(addr1.clone())); - network_thread2.receive_dispatch(); - thread::sleep(Duration::from_millis(100)); - network_thread1.accept_stream(); - // Hello is sent to network_thread2, let it read: - network_thread2.poll.poll(&mut events, None); - for event in events.iter() { - network_thread2.handle_event(event); - } - events.clear(); - }*/ } diff --git a/core/src/net/udp_state.rs b/core/src/net/udp_state.rs index e318f27e..2e4d9eeb 100644 --- a/core/src/net/udp_state.rs +++ b/core/src/net/udp_state.rs @@ -1,11 +1,11 @@ use super::*; use crate::{ messaging::{NetMessage, SerialisedFrame}, - net::buffers::{BufferChunk, DecodeBuffer}, + net::buffers::{BufferChunk, BufferPool, DecodeBuffer}, }; use mio::net::UdpSocket; use network_thread::*; -use std::{cmp::min, collections::VecDeque, io, net::SocketAddr}; +use std::{cell::RefCell, cmp::min, collections::VecDeque, io, io::Error, net::SocketAddr}; // Note that this is a theoretical IPv4 limit. // This may be violated with IPv6 jumbograms. 
@@ -79,31 +79,40 @@ impl UdpState { Ok(sent_bytes) } - pub(super) fn try_read(&mut self) -> io::Result<(usize, IoReturn)> { - let mut received_bytes: usize = 0; + fn swap_buffer(&mut self, buffer_pool: &RefCell) -> () { + let mut pool = buffer_pool.borrow_mut(); + if let Some(mut buffer) = pool.get_buffer() { + debug!( + self.logger, + "Swapping UDP buffer as only {} bytes remain.", + self.input_buffer.writeable_len() + ); + self.input_buffer.swap_buffer(&mut buffer); + pool.return_buffer(buffer); + } + } + + pub(super) fn try_read(&mut self, buffer_pool: &RefCell) -> io::Result<()> { let mut interrupts = 0; loop { - if let Some(mut buf) = self.input_buffer.get_writeable() { - if buf.len() < self.max_packet_size { - debug!( - self.logger, - "Swapping UDP buffer as only {} bytes remain.", - buf.len() - ); - return Ok((received_bytes, IoReturn::SwapBuffer)); + if self.input_buffer.writeable_len() < self.max_packet_size { + self.swap_buffer(buffer_pool); + if self.input_buffer.writeable_len() < self.max_packet_size { + return Err(Error::new(io::ErrorKind::InvalidInput, "Out of Buffers")); } + } + if let Some(mut buf) = self.input_buffer.get_writeable() { match self.socket.recv_from(&mut buf) { Ok((0, addr)) => { debug!(self.logger, "Got empty UDP datagram from {}", addr); - return Ok((received_bytes, IoReturn::None)); + return Ok(()); } Ok((n, addr)) => { - received_bytes += n; self.input_buffer.advance_writeable(n); self.decode_message(addr); } Err(err) if would_block(&err) => { - return Ok((received_bytes, IoReturn::None)); + return Ok(()); } Err(err) if interrupted(&err) => { // We should continue trying until no interruption @@ -117,7 +126,7 @@ impl UdpState { } } } else { - return Ok((received_bytes, IoReturn::SwapBuffer)); + return Err(Error::new(io::ErrorKind::InvalidInput, "Out of Buffers")); } } } @@ -155,8 +164,4 @@ impl UdpState { pub(super) fn enqueue_serialised(&mut self, addr: SocketAddr, frame: SerialisedFrame) -> () { self.outbound_queue.push_back((addr, frame)); } - - pub(super) fn swap_buffer(&mut self, new_buffer: &mut BufferChunk) -> () { - self.input_buffer.swap_buffer(new_buffer); - } } diff --git a/core/tests/dispatch_integration_tests.rs b/core/tests/dispatch_integration_tests.rs index 8504f7b1..d7de02af 100644 --- a/core/tests/dispatch_integration_tests.rs +++ b/core/tests/dispatch_integration_tests.rs @@ -1116,12 +1116,13 @@ fn network_status_port_established_lost_dropped_connection() { local_system.create_and_register(move || PingerAct::new_lazy(pinger_path)); local_system.start(&failing_pinger2); - thread::sleep(Duration::from_millis(12000)); // let failure and drop happen - // Assert connection lost and dropped + // Wait for connection to be dropped + thread::sleep(Duration::from_millis(12000)); + status_counter.on_definition(|sc| { - assert_eq!(sc.connection_established, 1); - assert_eq!(sc.connection_lost, 1); - assert_eq!(sc.connection_dropped, 1); + assert_eq!(sc.connection_established, 1, "Connection established count"); + assert_eq!(sc.connection_lost, 1, "Connection lost count"); + assert_eq!(sc.connection_dropped, 1, "Connection dropped count"); }); } diff --git a/docs/src/distributed/networkbuffers.md b/docs/src/distributed/networkbuffers.md index 1c8fc224..99cd0226 100644 --- a/docs/src/distributed/networkbuffers.md +++ b/docs/src/distributed/networkbuffers.md @@ -9,20 +9,21 @@ Before we begin describing the Network Buffers we remind the reader that there a With lazy serialisation the Actor moves the data to the heap, and transfers it unserialised to the 
`NetworkDispatcher`, which later serialises the message into its (the `NetworkDispatcher`'s) own buffers. Eager serialisation serialises the data immediately into the Actor's buffers, and then transfers ownership of the serialised data to the `NetworkDispatcher`.
+Lazy serialisation may fail for two reasons: a serialisation error, or a lack of available buffers. Both failures are
+invisible to the actor that initiated the send, and both lead to the message being lost.
+
 ## How the Buffer Pools work
 
 ### Buffer Pool locations
-In a Kompact system where many actors use eager serialisation there will be many `BufferPool` instances. Ìf the actors in the system only use lazy serialisation there will be two pools, one used by the `NetworkDispatcher` for serialising outbound data, and one used by the `NetworkThread` for receiving incoming data.
+In a Kompact system where many actors use eager serialisation there will be many `BufferPool` instances. If the actors in the system only use lazy serialisation there will be a single pool, owned by the `NetworkThread`, for both serialising outbound data and receiving incoming data.
 
 ### BufferPool, BufferChunk, and ChunkLease
 Each `BufferPool` (pool) consists of more than one `BufferChunk` (chunks). A chunk is the concrete memory area used for serialising data into. There may be many messages serialised into a single chunk, and discrete slices of the chunks (i.e. individual messages) can be extracted and sent to other threads/actors through the smart-pointer `ChunkLease` (lease). When a chunk runs out of space it will be locked and returned to the pool. If and only if all outstanding leases created from a chunk have been dropped may the chunk be unlocked and reused, or deallocated. When a pool is created it will pre-allocate a configurable number of chunks and will attempt to reuse those as long as possible; only when it needs to will it allocate more chunks, up to a configurable maximum number of chunks.
 
-**Note: In the current version, behaviour is unstable when a pool runs out of chunks and is unable to allocate more and will often cause a panic, similar to out-of-memory exception.**
-
-### The BufferPool interface
-Actors access their pool through the `EncodeBuffer` wrapper which maintains a single active chunk at a time, and automatically swaps the active buffer with the local `BufferPool` when necessary.
+### BufferPool interface
+Actors access their pool through the `EncodeBuffer` wrapper, which maintains a single active chunk at a time and automatically swaps the active buffer with the local `BufferPool` when necessary. The method `tell_serialised(msg, &self)` uses the `EncodeBuffer` automatically, so users of Kompact do not need to use the pool interfaces directly (which is why the method requires a `self` reference).
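+
+The sketch below illustrates how these pieces fit together when building a system with a custom buffer configuration.
+It is an illustration rather than a normative example: `BufferConfig::default`, `NetworkConfig::with_buffer_config`,
+`KompactConfig::system_components`, and `ActorPath::tell_serialised` are taken from the Kompact API, but the exact
+names and signatures may differ between versions, and any further `BufferConfig` tuning setters are assumed.
+
+```rust
+use kompact::prelude::*;
+
+fn build_networked_system() -> KompactSystem {
+    // Start from the default buffer settings; the config is validated
+    // when it is handed to the NetworkConfig.
+    let buffer_config = BufferConfig::default();
+
+    // Bind to a random local port and attach the buffer configuration.
+    let net_config = NetworkConfig::with_buffer_config(
+        "127.0.0.1:0".parse().expect("valid socket address"),
+        buffer_config,
+    );
+
+    let mut cfg = KompactConfig::default();
+    // Use the networking dispatcher so that remote actor paths can be resolved.
+    cfg.system_components(DeadletterBox::new, net_config.build());
+    cfg.build().expect("KompactSystem should start")
+}
+```
+
+Inside a component, `destination.tell_serialised(msg, self)` returns a `Result`, so an eager-serialisation failure
+(for example, running out of buffers) is visible to the sender, whereas a lazy `tell` drops the message silently as
+described above.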