diff --git a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs index 78cdd4d209..e3fbdb72c1 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs @@ -2,11 +2,17 @@ use std::collections::VecDeque; use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess}; -use crate::P2pLimits; - -use self::p2p_network_yamux_state::{YamuxFlags, YamuxFrame, YamuxFrameInner, YamuxStreamState}; - -use super::{super::*, *}; +use crate::{ + yamux::p2p_network_yamux_state::{YamuxFrame, YamuxFrameInner}, + Data, Limit, P2pLimits, P2pNetworkAuthState, P2pNetworkConnectionError, + P2pNetworkConnectionMuxState, P2pNetworkNoiseAction, P2pNetworkSchedulerAction, + P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind, +}; + +use super::{ + p2p_network_yamux_state::{YamuxStreamState, MAX_WINDOW_SIZE}, + P2pNetworkYamuxAction, P2pNetworkYamuxState, YamuxFlags, YamuxPing, +}; impl P2pNetworkYamuxState { /// Handles the main reducer logic for Yamux protocol actions. It processes incoming and outgoing @@ -130,12 +136,12 @@ impl P2pNetworkYamuxState { } match &frame.inner { - YamuxFrameInner::Data(data) => { + YamuxFrameInner::Data(_) => { if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) { // must not underflow // TODO: check it and disconnect peer that violates flow rules stream.window_ours = - stream.window_ours.saturating_sub(data.len() as u32); + stream.window_ours.saturating_sub(frame.len_as_u32()); } } YamuxFrameInner::WindowUpdate { difference } => { @@ -150,18 +156,12 @@ impl P2pNetworkYamuxState { // have some fresh space in the window // try send as many frames as can let mut window = stream.window_theirs; - while let Some(mut frame) = stream.pending.pop_front() { - let len = frame.len() as u32; + while let Some(frame) = stream.pending.pop_front() { + let len = frame.len_as_u32(); + pending_outgoing.push_back(frame); if let Some(new_window) = window.checked_sub(len) { - pending_outgoing.push_back(frame); window = new_window; } else { - if let Some(remaining) = frame.split_at((len - window) as usize) - { - stream.pending.push_front(remaining); - } - pending_outgoing.push_back(frame); - break; } } @@ -231,11 +231,11 @@ impl P2pNetworkYamuxState { } match &frame.inner { YamuxFrameInner::Data(data) => { - // here we are very permissive - // always when our window is smaller 64 kb, just increase it by 256 kb - // if we need fine grained back pressure, it should be implemented here - if stream.window_ours < 64 * 1024 { - let difference = 256 * 1024; + // when our window size is less than half of the max window size send window update + if stream.window_ours < stream.max_window_size / 2 { + let difference = + stream.max_window_size.saturating_mul(2).min(1024 * 1024); + dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame: YamuxFrame { @@ -285,48 +285,39 @@ impl P2pNetworkYamuxState { return Ok(()); }; match &mut frame.inner { - YamuxFrameInner::Data(data) => { - stream.window_theirs = stream - .window_theirs - .checked_sub(data.len() as u32) - .unwrap_or_default(); - - // TODO: code bellow include splitting the frame so that data which doesn't fit inside the window doesn't get sent, this breaks the bootstrap to rust - - // if let Some(new_window) = - // stream.window_theirs.checked_sub(data.len() as u32) - // { - // // their window is big enough, decrease the size - // // and send the whole frame - // stream.window_theirs = new_window; - // } else if stream.window_theirs != 0 && stream.pending.is_empty() { - // // their window is not big enough, but has some space, - // // and the queue is empty, - // // do not send the whole frame, - // // split it and put remaining in the queue, - // if let Some(remaining) = frame.split_at(stream.window_theirs as usize) { - // stream.pending.push_back(remaining); - // } - // // the window will be zero after sending - // stream.window_theirs = 0; - // } else { - // // either the window cannot accept any byte, - // // or the queue is already not empty - // // in both cases the whole frame goes in the queue and nothing to send - // stream.pending.push_back(frame); - // if stream.pending.iter().map(YamuxFrame::len).sum::() - // > yamux_state.pending_outgoing_limit - // { - // let dispatcher = state_context.into_dispatcher(); - // let error = P2pNetworkConnectionError::YamuxOverflow(stream_id); - // dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error }); - // } - - // return Ok(()); - // } + YamuxFrameInner::Data(_) => { + if let Some(new_window) = + stream.window_theirs.checked_sub(frame.len_as_u32()) + { + // their window is big enough, decrease the size + // and send the whole frame + stream.window_theirs = new_window; + } else { + // their window is not big enough + // split the frame to send as much as you can and put the rest in the queue + if let Some(remaining) = frame.split_at(stream.window_theirs as usize) { + stream.pending.push_front(remaining); + } + + // the window will be zero after sending + stream.window_theirs = 0; + + // if size of pending that is above the limit, ignore the peer + if stream.pending.iter().map(YamuxFrame::len).sum::() + > yamux_state.pending_outgoing_limit + { + let dispatcher = state_context.into_dispatcher(); + let error = P2pNetworkConnectionError::YamuxOverflow(stream_id); + dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error }); + return Ok(()); + } + } } YamuxFrameInner::WindowUpdate { difference } => { stream.window_ours = stream.window_ours.saturating_add(*difference); + if stream.window_ours > stream.max_window_size { + stream.max_window_size = stream.window_ours.min(MAX_WINDOW_SIZE); + } } _ => {} } diff --git a/p2p/src/network/yamux/p2p_network_yamux_state.rs b/p2p/src/network/yamux/p2p_network_yamux_state.rs index 90292155eb..938ab3d68b 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_state.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_state.rs @@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize}; use super::super::*; pub const INITIAL_RECV_BUFFER_CAPACITY: usize = 0x40000; // 256kb +pub const INITIAL_WINDOW_SIZE: u32 = INITIAL_RECV_BUFFER_CAPACITY as u32; +pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024; // 16mb #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct P2pNetworkYamuxState { @@ -199,6 +201,7 @@ pub struct YamuxStreamState { pub writable: bool, pub window_theirs: u32, pub window_ours: u32, + pub max_window_size: u32, pub pending: VecDeque, } @@ -210,8 +213,9 @@ impl Default for YamuxStreamState { established: false, readable: false, writable: false, - window_theirs: 256 * 1024, - window_ours: 256 * 1024, + window_theirs: INITIAL_WINDOW_SIZE, + window_ours: INITIAL_WINDOW_SIZE, + max_window_size: INITIAL_WINDOW_SIZE, pending: VecDeque::default(), } } @@ -339,6 +343,15 @@ impl YamuxFrame { } } + // When we parse the frame we parse length as u32 and so `data.len()` should always be representable as u32 + pub fn len_as_u32(&self) -> u32 { + if let YamuxFrameInner::Data(data) = &self.inner { + u32::try_from(data.len()).unwrap_or(u32::MAX) + } else { + 0 + } + } + /// If this data is bigger then `pos`, keep only first `pos` bytes and return some remaining /// otherwise return none pub fn split_at(&mut self, pos: usize) -> Option {