Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 25 additions & 17 deletions core/src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod queue_manager;

// Default values for network config.
const RETRY_CONNECTIONS_INTERVAL: u64 = 5000;
const BOOT_TIMEOUT: u64 = 5000;
const MAX_RETRY_ATTEMPTS: u8 = 10;

type NetHashMap<K, V> = FxHashMap<K, V>;
Expand Down Expand Up @@ -74,6 +75,7 @@ pub struct NetworkConfig {
tcp_nodelay: bool,
max_connection_retry_attempts: u8,
connection_retry_interval: u64,
boot_timeout: u64,
}

impl NetworkConfig {
Expand All @@ -88,22 +90,17 @@ impl NetworkConfig {
tcp_nodelay: true,
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
boot_timeout: BOOT_TIMEOUT,
}
}

/// Create a new config with `addr` and protocol [TCP](Transport::Tcp)
/// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors
pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self {
buffer_config.validate();
NetworkConfig {
addr,
transport: Transport::Tcp,
buffer_config,
custom_allocator: None,
tcp_nodelay: true,
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
}
let mut cfg = NetworkConfig::new(addr);
cfg.set_buffer_config(buffer_config);
cfg
}

/// Create a new config with `addr` and protocol [TCP](Transport::Tcp)
Expand All @@ -122,6 +119,7 @@ impl NetworkConfig {
tcp_nodelay: true,
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
boot_timeout: BOOT_TIMEOUT,
}
}

Expand Down Expand Up @@ -192,6 +190,18 @@ impl NetworkConfig {
pub fn get_connection_retry_interval(&self) -> u64 {
self.connection_retry_interval
}

/// Configures how long the system will wait (in ms) for the network layer to set-up
///
/// Default value is 5000 ms.
pub fn set_boot_timeout(&mut self, milliseconds: u64) {
self.boot_timeout = milliseconds;
}

/// How long (in ms) the system will wait (in ms) for the network layer to set-up
pub fn get_boot_timeout(&self) -> u64 {
self.boot_timeout
}
}

/// Socket defaults to `127.0.0.1:0` (i.e. a random local port) and protocol is [TCP](Transport::Tcp)
Expand All @@ -205,6 +215,7 @@ impl Default for NetworkConfig {
tcp_nodelay: true,
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
boot_timeout: BOOT_TIMEOUT,
}
}
}
Expand Down Expand Up @@ -653,15 +664,12 @@ impl NetworkDispatcher {
}
}
ConnectionState::Initializing => {
//debug!(self.ctx.log(), "Connection is initializing; queuing frame");
self.queue_manager.enqueue_data(data, addr);
None
}
ConnectionState::Closed => {
// Enqueue the Frame and request a connection to the destination.
self.queue_manager.enqueue_data(data, addr);
if let Some(bridge) = &self.net_bridge {
// Request a new connection
bridge.connect(Tcp, addr)?;
}
Some(ConnectionState::Initializing)
Expand Down Expand Up @@ -829,9 +837,10 @@ impl NetworkDispatcher {
if let Some(state) = self.connections.get_mut(&addr) {
match state {
ConnectionState::Connected => {
debug!(
trace!(
self.ctx.log(),
"Closing channel to connected system {}", addr
"Closing channel to connected system {}",
addr
);
if let Some(bridge) = &self.net_bridge {
while self.queue_manager.has_data(&addr) {
Expand Down Expand Up @@ -953,10 +962,9 @@ impl ComponentLifecycle for NetworkDispatcher {

impl Provide<NetworkStatusPort> for NetworkDispatcher {
fn handle(&mut self, event: <NetworkStatusPort as Port>::Request) -> Handled {
trace!(
debug!(
self.ctx.log(),
"Received NetworkStatusPort Request {:?}",
event
"Received NetworkStatusPort Request {:?}", event
);
match event {
NetworkStatusRequest::DisconnectSystem(system_path) => {
Expand Down
82 changes: 46 additions & 36 deletions core/src/net/buffers/decode_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,29 @@ impl DecodeBuffer {
// If the readable portion of the buffer would have less than `encode_buf_min_free_space`
// Or if we would return less than 8 readable bytes we don't allow further writing into
// the current buffer, caller must swap.
// TODO: Define what is a sensible amount of minimum bytes to be read at any given moment.
if self.writeable_len() < 8 {
return None;
if self.is_writeable() {
unsafe { Some(self.buffer.get_slice(self.write_offset, self.buffer.len())) }
} else {
None
}
unsafe { Some(self.buffer.get_slice(self.write_offset, self.buffer.len())) }
}

/// True if there is sufficient amount of writeable bytes
pub(crate) fn is_writeable(&mut self) -> bool {
self.writeable_len() > 8
}

/// Returns true if there is data to be decoded, else false
pub(crate) fn has_frame(&mut self) -> io::Result<bool> {
if self.decode_frame_head().is_err() {
return Err(io::Error::new(io::ErrorKind::InvalidData, "framing error"));
}
if let Some(head) = &self.next_frame_head {
if self.readable_len() >= head.content_length() {
return Ok(true);
}
}
Ok(false)
}

/// Swaps the underlying buffer in place with other
Expand All @@ -86,7 +104,10 @@ impl DecodeBuffer {
if let Some(mut overflow_chunk) = overflow {
// TODO: change the config parameter to a separate value?
let overflow_len = overflow_chunk.remaining();
if self.writeable_len() - overflow_len > self.buffer_config.encode_buf_min_free_space {
if overflow_len < self.writeable_len()
&& self.writeable_len() - overflow_len
> self.buffer_config.encode_buf_min_free_space
{
// Just copy the overflow_chunk bytes, no need to chain.
// the overflow must not exceed the new buffers capacity
unsafe {
Expand Down Expand Up @@ -145,47 +166,41 @@ impl DecodeBuffer {

/// Tries to decode one frame from the readable part of the buffer
pub fn get_frame(&mut self) -> Result<Frame, FramingError> {
self.decode_frame_head()?;
if let Some(head) = &self.next_frame_head {
if self.readable_len() >= head.content_length() {
let head = self.next_frame_head.take().unwrap();
let chunk_lease = self.read_chunk_lease(head.content_length());
match head.frame_type() {
return match head.frame_type() {
// Frames with empty bodies should be handled in frame-head decoding below.
FrameType::Data => {
Data::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame)
Data::decode_from(self.read_chunk_lease(head.content_length()))
.map_err(|_| FramingError::InvalidFrame)
}
FrameType::StreamRequest => StreamRequest::decode_from(chunk_lease)
.map_err(|_| FramingError::InvalidFrame),
FrameType::Hello => {
Hello::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame)
Hello::decode_from(self.read_chunk_lease(head.content_length()))
.map_err(|_| FramingError::InvalidFrame)
}
FrameType::Start => {
Start::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame)
}
FrameType::Ack => {
Ack::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame)
Start::decode_from(self.read_chunk_lease(head.content_length()))
.map_err(|_| FramingError::InvalidFrame)
}
// Frames without content match here for expediency, Decoder doesn't allow 0 length.
FrameType::Bye => Ok(Frame::Bye()),
FrameType::Ack => Ok(Frame::Ack()),
_ => Err(FramingError::UnsupportedFrameType),
}
} else {
Err(FramingError::NoData)
};
}
} else if self.readable_len() >= FRAME_HEAD_LEN as usize {
}
Err(FramingError::NoData)
}

fn decode_frame_head(&mut self) -> Result<(), FramingError> {
if self.next_frame_head.is_none() && self.readable_len() >= FRAME_HEAD_LEN as usize {
let mut chunk_lease = self.read_chunk_lease(FRAME_HEAD_LEN as usize);
let head = FrameHead::decode_from(&mut chunk_lease)?;
if head.content_length() == 0 {
match head.frame_type() {
// Frames without content match here for expediency, Decoder doesn't allow 0 length.
FrameType::Bye => Ok(Frame::Bye()),
_ => Err(FramingError::NoData),
}
} else {
self.next_frame_head = Some(head);
self.get_frame()
}
} else {
Err(FramingError::NoData)
self.next_frame_head = Some(head);
}
Ok(())
}

/// Extracts the readable portion (if any) from the active buffer as a ChunkLease
Expand All @@ -198,11 +213,6 @@ impl DecodeBuffer {
}
None
}

/// Destroys the DecodeBuffer and returns the BufferChunk
pub(crate) fn destroy(self) -> BufferChunk {
self.buffer
}
}

#[cfg(test)]
Expand Down
Loading