@@ -143,11 +143,12 @@ impl P2pNetworkYamuxState {
143143
144144 yamux_state. buffer = yamux_state. buffer [ offset..] . to_vec ( ) ;
145145
146- let incoming_data = yamux_state. incoming . clone ( ) ;
146+ let frame_count = yamux_state. incoming . len ( ) ;
147147 let dispatcher = state_context. into_dispatcher ( ) ;
148- incoming_data. into_iter ( ) . for_each ( |frame| {
149- dispatcher. push ( P2pNetworkYamuxAction :: IncomingFrame { addr, frame } )
150- } ) ;
148+
149+ for _ in 0 ..frame_count {
150+ dispatcher. push ( P2pNetworkYamuxAction :: IncomingFrame { addr } )
151+ }
151152
152153 Ok ( ( ) )
153154 }
@@ -181,69 +182,72 @@ impl P2pNetworkYamuxState {
181182
182183 Ok ( ( ) )
183184 }
184- P2pNetworkYamuxAction :: IncomingFrame { addr, frame } => {
185+ P2pNetworkYamuxAction :: IncomingFrame { addr } => {
185186 let mut pending_outgoing = VecDeque :: default ( ) ;
186- if let Some ( frame) = yamux_state. incoming . pop_front ( ) {
187- if frame. flags . contains ( YamuxFlags :: SYN ) {
188- yamux_state
189- . streams
190- . insert ( frame. stream_id , YamuxStreamState :: incoming ( ) ) ;
187+ let Some ( frame) = yamux_state. incoming . pop_front ( ) else {
188+ bug_condition ! (
189+ "Frame not found for action `P2pNetworkYamuxAction::IncomingFrame`"
190+ ) ;
191+ return Ok ( ( ) ) ;
192+ } ;
193+
194+ if frame. flags . contains ( YamuxFlags :: SYN ) {
195+ yamux_state
196+ . streams
197+ . insert ( frame. stream_id , YamuxStreamState :: incoming ( ) ) ;
198+
199+ if frame. stream_id != 0 {
200+ connection_state. streams . insert (
201+ frame. stream_id ,
202+ P2pNetworkStreamState :: new_incoming ( meta. time ( ) ) ,
203+ ) ;
204+ }
205+ }
206+ if frame. flags . contains ( YamuxFlags :: ACK ) {
207+ yamux_state
208+ . streams
209+ . entry ( frame. stream_id )
210+ . or_default ( )
211+ . established = true ;
212+ }
191213
192- if frame. stream_id != 0 {
193- connection_state. streams . insert (
194- frame. stream_id ,
195- P2pNetworkStreamState :: new_incoming ( meta. time ( ) ) ,
196- ) ;
214+ match & frame. inner {
215+ YamuxFrameInner :: Data ( data) => {
216+ if let Some ( stream) = yamux_state. streams . get_mut ( & frame. stream_id ) {
217+ // must not underflow
218+ // TODO: check it and disconnect peer that violates flow rules
219+ stream. window_ours = stream. window_ours . wrapping_sub ( data. len ( ) as u32 ) ;
197220 }
198221 }
199- if frame . flags . contains ( YamuxFlags :: ACK ) {
200- yamux_state
222+ YamuxFrameInner :: WindowUpdate { difference } => {
223+ let stream = yamux_state
201224 . streams
202225 . entry ( frame. stream_id )
203- . or_default ( )
204- . established = true ;
205- }
206-
207- match frame. inner {
208- YamuxFrameInner :: Data ( data) => {
209- if let Some ( stream) = yamux_state. streams . get_mut ( & frame. stream_id ) {
210- // must not underflow
211- // TODO: check it and disconnect peer that violates flow rules
212- stream. window_ours =
213- stream. window_ours . wrapping_sub ( data. len ( ) as u32 ) ;
214- }
215- }
216- YamuxFrameInner :: WindowUpdate { difference } => {
217- let stream = yamux_state
218- . streams
219- . entry ( frame. stream_id )
220- . or_insert_with ( YamuxStreamState :: incoming) ;
221- stream. update_window ( false , difference) ;
222- if difference > 0 {
223- // have some fresh space in the window
224- // try send as many frames as can
225- let mut window = stream. window_theirs ;
226- while let Some ( mut frame) = stream. pending . pop_front ( ) {
227- let len = frame. len ( ) as u32 ;
228- if let Some ( new_window) = window. checked_sub ( len) {
229- pending_outgoing. push_back ( frame) ;
230- window = new_window;
231- } else {
232- if let Some ( remaining) =
233- frame. split_at ( ( len - window) as usize )
234- {
235- stream. pending . push_front ( remaining) ;
236- }
237- pending_outgoing. push_back ( frame) ;
238-
239- break ;
226+ . or_insert_with ( YamuxStreamState :: incoming) ;
227+ stream. update_window ( false , * difference) ;
228+ if * difference > 0 {
229+ // have some fresh space in the window
230+ // try send as many frames as can
231+ let mut window = stream. window_theirs ;
232+ while let Some ( mut frame) = stream. pending . pop_front ( ) {
233+ let len = frame. len ( ) as u32 ;
234+ if let Some ( new_window) = window. checked_sub ( len) {
235+ pending_outgoing. push_back ( frame) ;
236+ window = new_window;
237+ } else {
238+ if let Some ( remaining) = frame. split_at ( ( len - window) as usize )
239+ {
240+ stream. pending . push_front ( remaining) ;
240241 }
242+ pending_outgoing. push_back ( frame) ;
243+
244+ break ;
241245 }
242246 }
243247 }
244- YamuxFrameInner :: Ping { .. } => { }
245- YamuxFrameInner :: GoAway ( res) => yamux_state. set_res ( res) ,
246248 }
249+ YamuxFrameInner :: Ping { .. } => { }
250+ YamuxFrameInner :: GoAway ( res) => yamux_state. set_res ( * res) ,
247251 }
248252
249253 let ( dispatcher, state) = state_context. into_dispatcher_and_state ( ) ;
0 commit comments