Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 51 additions & 60 deletions p2p/src/network/yamux/p2p_network_yamux_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ use std::collections::VecDeque;

use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};

use crate::P2pLimits;

use self::p2p_network_yamux_state::{YamuxFlags, YamuxFrame, YamuxFrameInner, YamuxStreamState};

use super::{super::*, *};
use crate::{
yamux::p2p_network_yamux_state::{YamuxFrame, YamuxFrameInner},
Data, Limit, P2pLimits, P2pNetworkAuthState, P2pNetworkConnectionError,
P2pNetworkConnectionMuxState, P2pNetworkNoiseAction, P2pNetworkSchedulerAction,
P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind,
};

use super::{
p2p_network_yamux_state::{YamuxStreamState, MAX_WINDOW_SIZE},
P2pNetworkYamuxAction, P2pNetworkYamuxState, YamuxFlags, YamuxPing,
};

impl P2pNetworkYamuxState {
/// Handles the main reducer logic for Yamux protocol actions. It processes incoming and outgoing
Expand Down Expand Up @@ -130,12 +136,12 @@ impl P2pNetworkYamuxState {
}

match &frame.inner {
YamuxFrameInner::Data(data) => {
YamuxFrameInner::Data(_) => {
if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) {
// must not underflow
// TODO: check it and disconnect peer that violates flow rules
stream.window_ours =
stream.window_ours.saturating_sub(data.len() as u32);
stream.window_ours.saturating_sub(frame.len_as_u32());
}
}
YamuxFrameInner::WindowUpdate { difference } => {
Expand All @@ -150,18 +156,12 @@ impl P2pNetworkYamuxState {
// have some fresh space in the window
// try send as many frames as can
let mut window = stream.window_theirs;
while let Some(mut frame) = stream.pending.pop_front() {
let len = frame.len() as u32;
while let Some(frame) = stream.pending.pop_front() {
let len = frame.len_as_u32();
pending_outgoing.push_back(frame);
if let Some(new_window) = window.checked_sub(len) {
pending_outgoing.push_back(frame);
window = new_window;
} else {
if let Some(remaining) = frame.split_at((len - window) as usize)
{
stream.pending.push_front(remaining);
}
pending_outgoing.push_back(frame);

break;
}
}
Expand Down Expand Up @@ -231,11 +231,11 @@ impl P2pNetworkYamuxState {
}
match &frame.inner {
YamuxFrameInner::Data(data) => {
// here we are very permissive
// always when our window is smaller 64 kb, just increase it by 256 kb
// if we need fine grained back pressure, it should be implemented here
if stream.window_ours < 64 * 1024 {
let difference = 256 * 1024;
// when our window size is less than half of the max window size send window update
if stream.window_ours < stream.max_window_size / 2 {
let difference =
stream.max_window_size.saturating_mul(2).min(1024 * 1024);

dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
addr,
frame: YamuxFrame {
Expand Down Expand Up @@ -285,48 +285,39 @@ impl P2pNetworkYamuxState {
return Ok(());
};
match &mut frame.inner {
YamuxFrameInner::Data(data) => {
stream.window_theirs = stream
.window_theirs
.checked_sub(data.len() as u32)
.unwrap_or_default();

// TODO: code bellow include splitting the frame so that data which doesn't fit inside the window doesn't get sent, this breaks the bootstrap to rust

// if let Some(new_window) =
// stream.window_theirs.checked_sub(data.len() as u32)
// {
// // their window is big enough, decrease the size
// // and send the whole frame
// stream.window_theirs = new_window;
// } else if stream.window_theirs != 0 && stream.pending.is_empty() {
// // their window is not big enough, but has some space,
// // and the queue is empty,
// // do not send the whole frame,
// // split it and put remaining in the queue,
// if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
// stream.pending.push_back(remaining);
// }
// // the window will be zero after sending
// stream.window_theirs = 0;
// } else {
// // either the window cannot accept any byte,
// // or the queue is already not empty
// // in both cases the whole frame goes in the queue and nothing to send
// stream.pending.push_back(frame);
// if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
// > yamux_state.pending_outgoing_limit
// {
// let dispatcher = state_context.into_dispatcher();
// let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
// dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
// }

// return Ok(());
// }
YamuxFrameInner::Data(_) => {
if let Some(new_window) =
stream.window_theirs.checked_sub(frame.len_as_u32())
{
// their window is big enough, decrease the size
// and send the whole frame
stream.window_theirs = new_window;
} else {
// their window is not big enough
// split the frame to send as much as you can and put the rest in the queue
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
stream.pending.push_front(remaining);
}

// the window will be zero after sending
stream.window_theirs = 0;

// if size of pending that is above the limit, ignore the peer
if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
> yamux_state.pending_outgoing_limit
{
let dispatcher = state_context.into_dispatcher();
let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
return Ok(());
}
}
}
YamuxFrameInner::WindowUpdate { difference } => {
stream.window_ours = stream.window_ours.saturating_add(*difference);
if stream.window_ours > stream.max_window_size {
stream.max_window_size = stream.window_ours.min(MAX_WINDOW_SIZE);
}
}
_ => {}
}
Expand Down
17 changes: 15 additions & 2 deletions p2p/src/network/yamux/p2p_network_yamux_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};
use super::super::*;

pub const INITIAL_RECV_BUFFER_CAPACITY: usize = 0x40000; // 256kb
pub const INITIAL_WINDOW_SIZE: u32 = INITIAL_RECV_BUFFER_CAPACITY as u32;
pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024; // 16mb

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct P2pNetworkYamuxState {
Expand Down Expand Up @@ -199,6 +201,7 @@ pub struct YamuxStreamState {
pub writable: bool,
pub window_theirs: u32,
pub window_ours: u32,
pub max_window_size: u32,
pub pending: VecDeque<YamuxFrame>,
}

Expand All @@ -210,8 +213,9 @@ impl Default for YamuxStreamState {
established: false,
readable: false,
writable: false,
window_theirs: 256 * 1024,
window_ours: 256 * 1024,
window_theirs: INITIAL_WINDOW_SIZE,
window_ours: INITIAL_WINDOW_SIZE,
max_window_size: INITIAL_WINDOW_SIZE,
pending: VecDeque::default(),
}
}
Expand Down Expand Up @@ -339,6 +343,15 @@ impl YamuxFrame {
}
}

// When we parse the frame we parse length as u32 and so `data.len()` should always be representable as u32
pub fn len_as_u32(&self) -> u32 {
if let YamuxFrameInner::Data(data) = &self.inner {
u32::try_from(data.len()).unwrap_or(u32::MAX)
} else {
0
}
}

/// If this data is bigger then `pos`, keep only first `pos` bytes and return some remaining
/// otherwise return none
pub fn split_at(&mut self, pos: usize) -> Option<Self> {
Expand Down
Loading