diff --git a/p2p/src/network/yamux/p2p_network_yamux_actions.rs b/p2p/src/network/yamux/p2p_network_yamux_actions.rs index 1e86296d73..f34166d421 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_actions.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_actions.rs @@ -20,7 +20,6 @@ pub enum P2pNetworkYamuxAction { #[action_event(level = trace)] IncomingFrame { addr: ConnectionAddr, - frame: YamuxFrame, }, #[action_event(level = trace)] OutgoingFrame { diff --git a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs index 4b89341c75..1465ae34cb 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs @@ -89,7 +89,7 @@ impl P2pNetworkYamuxState { } } 1 => { - let difference = i32::from_be_bytes(b); + let difference = u32::from_be_bytes(b); let frame = YamuxFrame { flags, stream_id, @@ -100,7 +100,7 @@ impl P2pNetworkYamuxState { continue; } 2 => { - let opaque = i32::from_be_bytes(b); + let opaque = u32::from_be_bytes(b); let frame = YamuxFrame { flags, stream_id, @@ -143,11 +143,12 @@ impl P2pNetworkYamuxState { yamux_state.buffer = yamux_state.buffer[offset..].to_vec(); - let incoming_data = yamux_state.incoming.clone(); + let frame_count = yamux_state.incoming.len(); let dispatcher = state_context.into_dispatcher(); - incoming_data.into_iter().for_each(|frame| { - dispatcher.push(P2pNetworkYamuxAction::IncomingFrame { addr, frame }) - }); + + for _ in 0..frame_count { + dispatcher.push(P2pNetworkYamuxAction::IncomingFrame { addr }) + } Ok(()) } @@ -181,69 +182,74 @@ impl P2pNetworkYamuxState { Ok(()) } - P2pNetworkYamuxAction::IncomingFrame { addr, frame } => { + P2pNetworkYamuxAction::IncomingFrame { addr } => { let mut pending_outgoing = VecDeque::default(); - if let Some(frame) = yamux_state.incoming.pop_front() { - if frame.flags.contains(YamuxFlags::SYN) { - yamux_state - .streams - .insert(frame.stream_id, YamuxStreamState::incoming()); + let Some(frame) = yamux_state.incoming.pop_front() else { + bug_condition!( + "Frame not found for action `P2pNetworkYamuxAction::IncomingFrame`" + ); + return Ok(()); + }; + + if frame.flags.contains(YamuxFlags::SYN) { + yamux_state + .streams + .insert(frame.stream_id, YamuxStreamState::incoming()); - if frame.stream_id != 0 { - connection_state.streams.insert( - frame.stream_id, - P2pNetworkStreamState::new_incoming(meta.time()), - ); + if frame.stream_id != 0 { + connection_state.streams.insert( + frame.stream_id, + P2pNetworkStreamState::new_incoming(meta.time()), + ); + } + } + if frame.flags.contains(YamuxFlags::ACK) { + yamux_state + .streams + .entry(frame.stream_id) + .or_default() + .established = true; + } + + match &frame.inner { + YamuxFrameInner::Data(data) => { + if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) { + // must not underflow + // TODO: check it and disconnect peer that violates flow rules + stream.window_ours = stream.window_ours.wrapping_sub(data.len() as u32); } } - if frame.flags.contains(YamuxFlags::ACK) { - yamux_state + YamuxFrameInner::WindowUpdate { difference } => { + let stream = yamux_state .streams .entry(frame.stream_id) - .or_default() - .established = true; - } - - match frame.inner { - YamuxFrameInner::Data(data) => { - if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) { - // must not underflow - // TODO: check it and disconnect peer that violates flow rules - stream.window_ours = - stream.window_ours.wrapping_sub(data.len() as u32); - } - } - YamuxFrameInner::WindowUpdate { difference } => { - let stream = yamux_state - .streams - .entry(frame.stream_id) - .or_insert_with(YamuxStreamState::incoming); - stream.update_window(false, difference); - if difference > 0 { - // have some fresh space in the window - // try send as many frames as can - let mut window = stream.window_theirs; - while let Some(mut frame) = stream.pending.pop_front() { - let len = frame.len() as u32; - if let Some(new_window) = window.checked_sub(len) { - pending_outgoing.push_back(frame); - window = new_window; - } else { - if let Some(remaining) = - frame.split_at((len - window) as usize) - { - stream.pending.push_front(remaining); - } - pending_outgoing.push_back(frame); - - break; + .or_insert_with(YamuxStreamState::incoming); + + stream.window_theirs = stream.window_theirs.saturating_add(*difference); + + if *difference > 0 { + // have some fresh space in the window + // try send as many frames as can + let mut window = stream.window_theirs; + while let Some(mut frame) = stream.pending.pop_front() { + let len = frame.len() as u32; + if let Some(new_window) = window.checked_sub(len) { + pending_outgoing.push_back(frame); + window = new_window; + } else { + if let Some(remaining) = frame.split_at((len - window) as usize) + { + stream.pending.push_front(remaining); } + pending_outgoing.push_back(frame); + + break; } } } - YamuxFrameInner::Ping { .. } => {} - YamuxFrameInner::GoAway(res) => yamux_state.set_res(res), } + YamuxFrameInner::Ping { .. } => {} + YamuxFrameInner::GoAway(res) => yamux_state.set_res(*res), } let (dispatcher, state) = state_context.into_dispatcher_and_state(); @@ -344,16 +350,9 @@ impl P2pNetworkYamuxState { }); } } - YamuxFrameInner::WindowUpdate { difference } => { - if *difference < 0 { - let error = - P2pNetworkConnectionError::YamuxBadWindowUpdate(frame.stream_id); - dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error }); - } else { - while let Some(frame) = pending_outgoing.pop_front() { - dispatcher - .push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame }); - } + YamuxFrameInner::WindowUpdate { .. } => { + while let Some(frame) = pending_outgoing.pop_front() { + dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame }); } } _ => {} @@ -408,7 +407,7 @@ impl P2pNetworkYamuxState { // } } YamuxFrameInner::WindowUpdate { difference } => { - stream.update_window(true, *difference); + stream.window_ours = stream.window_ours.saturating_add(*difference); } _ => {} } @@ -475,24 +474,3 @@ impl P2pNetworkYamuxState { } } } - -impl YamuxStreamState { - pub fn update_window(&mut self, ours: bool, difference: i32) { - let window = if ours { - &mut self.window_ours - } else { - &mut self.window_theirs - }; - if difference < 0 { - let decreasing = (-difference) as u32; - if *window < decreasing { - *window = 0; - } else { - *window = (*window).wrapping_sub(decreasing); - } - } else { - let increasing = difference as u32; - *window = (*window).wrapping_add(increasing); - } - } -} diff --git a/p2p/src/network/yamux/p2p_network_yamux_state.rs b/p2p/src/network/yamux/p2p_network_yamux_state.rs index 766465a786..23c004ca9e 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_state.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_state.rs @@ -95,7 +95,7 @@ bitflags::bitflags! { #[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub struct YamuxPing { pub stream_id: StreamId, - pub opaque: i32, + pub opaque: u32, pub response: bool, } @@ -227,12 +227,12 @@ impl YamuxFrame { #[derive(Serialize, Deserialize, Debug, Clone)] pub enum YamuxFrameInner { Data(Data), - WindowUpdate { difference: i32 }, - Ping { opaque: i32 }, + WindowUpdate { difference: u32 }, + Ping { opaque: u32 }, GoAway(Result<(), YamuxSessionError>), } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum YamuxSessionError { Protocol, Internal,