From c3b71034129b17fefc7ff48dd42c88031fe739f6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 23 Jul 2018 14:50:06 +0200 Subject: [PATCH 1/9] Fixes to the mplex implementation --- mplex/Cargo.toml | 1 + mplex/src/lib.rs | 65 +++++++++++++++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/mplex/Cargo.toml b/mplex/Cargo.toml index 97911a016d5..20bcfdf0549 100644 --- a/mplex/Cargo.toml +++ b/mplex/Cargo.toml @@ -9,6 +9,7 @@ bytes = "0.4.5" fnv = "1.0" futures = "0.1" libp2p-core = { path = "../core" } +log = "0.4" parking_lot = "0.4.8" tokio-codec = "0.1" tokio-io = "0.1" diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index c8acaf49af0..405aa8736f8 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -23,6 +23,8 @@ extern crate fnv; #[macro_use] extern crate futures; extern crate libp2p_core as core; +#[macro_use] +extern crate log; extern crate parking_lot; extern crate tokio_codec; extern crate tokio_io; @@ -112,7 +114,7 @@ struct MultiplexInner { // Buffer of elements pulled from the stream but not processed yet. buffer: Vec, // List of Ids of opened substreams. Used to filter out messages that don't belong to any - // substream. + // substream. Note that this is handled exclusively by `next_match`. opened_substreams: FnvHashSet, // Id of the next outgoing substream. Should always increase by two. next_outbound_stream_id: u32, @@ -120,10 +122,10 @@ struct MultiplexInner { to_notify: Vec, } -// Processes elements in `inner` until one matching `filter` is found. -// -// If `NotReady` is returned, the current task is scheduled for later, just like with any `Poll`. -// `Ready(Some())` is almost always returned. `Ready(None)` is returned if the stream is EOF. +/// Processes elements in `inner` until one matching `filter` is found. +/// +/// If `NotReady` is returned, the current task is scheduled for later, just like with any `Poll`. +/// `Ready(Some())` is almost always returned. `Ready(None)` is returned if the stream is EOF. fn next_match(inner: &mut MultiplexInner, mut filter: F) -> Poll, IoError> where C: AsyncRead + AsyncWrite, F: FnMut(&codec::Elem) -> Option, @@ -146,11 +148,25 @@ where C: AsyncRead + AsyncWrite, }; if let Some(elem) = elem { + trace!("Received message: {:?}", elem); + + // Handle substreams opening/closing. + match elem { + codec::Elem::Open { substream_id } => { // TODO: check even/uneven? + inner.opened_substreams.insert(substream_id); + }, + codec::Elem::Close { substream_id, .. } | codec::Elem::Reset { substream_id, .. } => { + inner.opened_substreams.remove(&substream_id); + }, + _ => () + } + if let Some(out) = filter(&elem) { return Ok(Async::Ready(Some(out))); } else { if inner.buffer.len() >= MAX_BUFFER_LEN { - return Err(IoError::new(IoErrorKind::InvalidData, "reached maximum buffer length")); + debug!("Reached mplex maximum buffer length"); + return Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); } if inner.opened_substreams.contains(&elem.substream_id()) || elem.is_open_msg() { @@ -158,6 +174,8 @@ where C: AsyncRead + AsyncWrite, for task in inner.to_notify.drain(..) { task.notify(); } + } else { + debug!("Ignored message because the substream wasn't open"); } } } else { @@ -166,13 +184,6 @@ where C: AsyncRead + AsyncWrite, } } -// Closes a substream in `inner`. -fn clean_out_substream(inner: &mut MultiplexInner, num: u32) { - let was_in = inner.opened_substreams.remove(&num); - debug_assert!(was_in, "Dropped substream which wasn't open ; programmer error"); - inner.buffer.retain(|elem| elem.substream_id() != num); -} - // Small convenience function that tries to write `elem` to the stream. fn poll_send(inner: &mut MultiplexInner, elem: codec::Elem) -> Poll<(), IoError> where C: AsyncRead + AsyncWrite @@ -214,7 +225,10 @@ where C: AsyncRead + AsyncWrite + 'static // TODO: 'static :-/ // We use an RAII guard, so that we close the substream in case of an error. struct OpenedSubstreamGuard(Arc>>, u32); impl Drop for OpenedSubstreamGuard { - fn drop(&mut self) { clean_out_substream(&mut self.0.lock(), self.1); } + fn drop(&mut self) { + debug!("Failed to open outbound substream {}", self.1); + self.0.lock().buffer.retain(|elem| elem.substream_id() != self.1); + } } inner.opened_substreams.insert(substream_id); let guard = OpenedSubstreamGuard(self.inner.clone(), substream_id); @@ -233,6 +247,7 @@ where C: AsyncRead + AsyncWrite + 'static // TODO: 'static :-/ future::poll_fn(move || inner.lock().inner.poll_complete()) } }).map({ + debug!("Successfully opened outbound substream {}", substream_id); let inner = self.inner.clone(); move |()| { mem::forget(guard); @@ -265,19 +280,20 @@ where C: AsyncRead + AsyncWrite let mut inner = self.inner.lock(); if inner.opened_substreams.len() >= MAX_SUBSTREAMS { + debug!("Refused substream ; reached maximum number of substreams {}", MAX_SUBSTREAMS); return Err(IoError::new(IoErrorKind::ConnectionRefused, "exceeded maximum number of open substreams")); } let num = try_ready!(next_match(&mut inner, |elem| { match elem { - codec::Elem::Open { substream_id } => Some(*substream_id), // TODO: check even/uneven? + codec::Elem::Open { substream_id } => Some(*substream_id), _ => None, } })); if let Some(num) = num { - inner.opened_substreams.insert(num); + debug!("Successfully opened inbound substream {}", num); Ok(Async::Ready(Some(Substream { inner: self.inner.clone(), current_data: Bytes::new(), @@ -306,13 +322,14 @@ where C: AsyncRead + AsyncWrite { fn read(&mut self, buf: &mut [u8]) -> Result { loop { - // First transfer from `current_data`. + // First, transfer from `current_data`. if self.current_data.len() != 0 { let len = cmp::min(self.current_data.len(), buf.len()); buf[..len].copy_from_slice(&self.current_data.split_to(len)); return Ok(len); } + // Try to find a packet of data in the buffer. let mut inner = self.inner.lock(); let next_data_poll = next_match(&mut inner, |elem| { match elem { @@ -328,7 +345,15 @@ where C: AsyncRead + AsyncWrite match next_data_poll { Ok(Async::Ready(Some(data))) => self.current_data = data.freeze(), Ok(Async::Ready(None)) => return Ok(0), - Ok(Async::NotReady) => return Err(IoErrorKind::WouldBlock.into()), + Ok(Async::NotReady) => { + // There was no data packet in the buffer about this substream ; maybe it's + // because it has been closed. + if inner.opened_substreams.contains(&self.num) { + return Err(IoErrorKind::WouldBlock.into()); + } else { + return Ok(0); + } + }, Err(err) => return Err(err), } } @@ -386,7 +411,7 @@ impl Drop for Substream where C: AsyncRead + AsyncWrite { fn drop(&mut self) { - let _ = self.shutdown(); - clean_out_substream(&mut self.inner.lock(), self.num); + let _ = self.shutdown(); // TODO: this doesn't necessarily send the close message + self.inner.lock().buffer.retain(|elem| elem.substream_id() != self.num); } } From 7cc95d4b7ecaceb69acb0a65c8b364c46c26e30e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 28 Jul 2018 10:52:10 +0200 Subject: [PATCH 2/9] Fix mem leak and wrong logging message --- mplex/src/lib.rs | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index 405aa8736f8..34105a18f70 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -34,7 +34,6 @@ mod codec; use std::{cmp, iter}; use std::io::{Read, Write, Error as IoError, ErrorKind as IoErrorKind}; -use std::mem; use std::sync::Arc; use bytes::Bytes; use core::{ConnectionUpgrade, Endpoint, StreamMuxer}; @@ -223,15 +222,17 @@ where C: AsyncRead + AsyncWrite + 'static // TODO: 'static :-/ }; // We use an RAII guard, so that we close the substream in case of an error. - struct OpenedSubstreamGuard(Arc>>, u32); + struct OpenedSubstreamGuard(Option>>>, u32); impl Drop for OpenedSubstreamGuard { fn drop(&mut self) { - debug!("Failed to open outbound substream {}", self.1); - self.0.lock().buffer.retain(|elem| elem.substream_id() != self.1); + if let Some(inner) = self.0.take() { + debug!("Failed to open outbound substream {}", self.1); + inner.lock().buffer.retain(|elem| elem.substream_id() != self.1); + } } } inner.opened_substreams.insert(substream_id); - let guard = OpenedSubstreamGuard(self.inner.clone(), substream_id); + let mut guard = OpenedSubstreamGuard(Some(self.inner.clone()), substream_id); // We send `Open { substream_id }`, then flush, then only produce the substream. let future = { @@ -246,18 +247,14 @@ where C: AsyncRead + AsyncWrite + 'static // TODO: 'static :-/ move |()| { future::poll_fn(move || inner.lock().inner.poll_complete()) } - }).map({ + }).map(move |()| { debug!("Successfully opened outbound substream {}", substream_id); - let inner = self.inner.clone(); - move |()| { - mem::forget(guard); - Some(Substream { - inner: inner.clone(), - num: substream_id, - current_data: Bytes::new(), - endpoint: Endpoint::Dialer, - }) - } + Some(Substream { + inner: guard.0.take().unwrap(), + num: substream_id, + current_data: Bytes::new(), + endpoint: Endpoint::Dialer, + }) }) }; From 236e4ea33fa01e2a7e1d0c1c18a98c9625b72738 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 28 Jul 2018 14:36:57 +0200 Subject: [PATCH 3/9] Fix potential leak --- mplex/src/lib.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index 34105a18f70..0b052e3ddea 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -34,11 +34,11 @@ mod codec; use std::{cmp, iter}; use std::io::{Read, Write, Error as IoError, ErrorKind as IoErrorKind}; -use std::sync::Arc; +use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; use bytes::Bytes; use core::{ConnectionUpgrade, Endpoint, StreamMuxer}; use parking_lot::Mutex; -use fnv::FnvHashSet; +use fnv::{FnvHashMap, FnvHashSet}; use futures::prelude::*; use futures::{future, stream::Fuse, task}; use tokio_codec::Framed; @@ -79,7 +79,7 @@ where buffer: Vec::with_capacity(32), opened_substreams: Default::default(), next_outbound_stream_id: if endpoint == Endpoint::Dialer { 0 } else { 1 }, - to_notify: Vec::new(), + to_notify: Default::default(), })) }; @@ -118,7 +118,7 @@ struct MultiplexInner { // Id of the next outgoing substream. Should always increase by two. next_outbound_stream_id: u32, // List of tasks to notify when a new element is inserted in `buffer`. - to_notify: Vec, + to_notify: FnvHashMap, } /// Processes elements in `inner` until one matching `filter` is found. @@ -138,7 +138,11 @@ where C: AsyncRead + AsyncWrite, let elem = match inner.inner.poll() { Ok(Async::Ready(item)) => item, Ok(Async::NotReady) => { - inner.to_notify.push(task::current()); + static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); + task_local!{ + static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) + } + inner.to_notify.insert(TASK_ID.with(|&t| t), task::current()); return Ok(Async::NotReady); }, Err(err) => { @@ -170,8 +174,8 @@ where C: AsyncRead + AsyncWrite, if inner.opened_substreams.contains(&elem.substream_id()) || elem.is_open_msg() { inner.buffer.push(elem); - for task in inner.to_notify.drain(..) { - task.notify(); + for task in inner.to_notify.drain() { + task.1.notify(); } } else { debug!("Ignored message because the substream wasn't open"); From e61a5b91caa67fe8a59f3e33e18ad97d151bac51 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 28 Jul 2018 18:53:16 +0200 Subject: [PATCH 4/9] Immediately return an error if an error happened --- mplex/src/lib.rs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index 0b052e3ddea..4b9313305f4 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -47,7 +47,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; // Maximum number of simultaneously-open substreams. const MAX_SUBSTREAMS: usize = 1024; // Maximum number of elements in the internal buffer. -const MAX_BUFFER_LEN: usize = 256; +const MAX_BUFFER_LEN: usize = 1024; /// Configuration for the multiplexer. #[derive(Debug, Clone, Default)] @@ -75,6 +75,7 @@ where fn upgrade(self, i: C, _: (), endpoint: Endpoint, remote_addr: Maf) -> Self::Future { let out = Multiplex { inner: Arc::new(Mutex::new(MultiplexInner { + error: Ok(()), inner: Framed::new(i, codec::Codec::new()).fuse(), buffer: Vec::with_capacity(32), opened_substreams: Default::default(), @@ -108,6 +109,8 @@ impl Clone for Multiplex { // Struct shared throughout the implementation. struct MultiplexInner { + // Errored that happend earlier. Should poison any attempt to use this `MultiplexError`. + error: Result<(), IoError>, // Underlying stream. inner: Fuse>, // Buffer of elements pulled from the stream but not processed yet. @@ -117,7 +120,7 @@ struct MultiplexInner { opened_substreams: FnvHashSet, // Id of the next outgoing substream. Should always increase by two. next_outbound_stream_id: u32, - // List of tasks to notify when a new element is inserted in `buffer`. + // List of tasks to notify when a new element is inserted in `buffer` or an error happens. to_notify: FnvHashMap, } @@ -129,6 +132,12 @@ fn next_match(inner: &mut MultiplexInner, mut filter: F) -> Poll Option, { + // If an error happened earlier, immediately return it. + match inner.error { + Ok(()) => (), + Err(ref err) => return Err(IoError::new(err.kind(), err.to_string())), + }; + if let Some((offset, out)) = inner.buffer.iter().enumerate().filter_map(|(n, v)| filter(v).map(|v| (n, v))).next() { inner.buffer.remove(offset); return Ok(Async::Ready(Some(out))); @@ -146,7 +155,12 @@ where C: AsyncRead + AsyncWrite, return Ok(Async::NotReady); }, Err(err) => { - return Err(err); + let err2 = IoError::new(err.kind(), err.to_string()); + inner.error = Err(err); + for task in inner.to_notify.drain() { + task.1.notify(); + } + return Err(err2); }, }; @@ -169,6 +183,10 @@ where C: AsyncRead + AsyncWrite, } else { if inner.buffer.len() >= MAX_BUFFER_LEN { debug!("Reached mplex maximum buffer length"); + inner.error = Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); + for task in inner.to_notify.drain() { + task.1.notify(); + } return Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); } From d4c39513bfdb1fbfe29ec29eb35ab47e50ce1624 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 31 Jul 2018 13:38:19 +0200 Subject: [PATCH 5/9] Correctly handle Close and Reset --- mplex/src/codec.rs | 11 +++++++++++ mplex/src/lib.rs | 6 +++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/mplex/src/codec.rs b/mplex/src/codec.rs index c9adf3e4dbc..c2caffab953 100644 --- a/mplex/src/codec.rs +++ b/mplex/src/codec.rs @@ -50,6 +50,17 @@ impl Elem { } } + /// Returns true if this message is `Close` or `Reset`. + #[inline] + pub fn is_close_or_reset_msg(&self) -> bool { + match self { + Elem::Close { .. } | Elem::Reset { .. } => true, + _ => false, + } + } + + /// Returns true if this message is `Open`. + #[inline] pub fn is_open_msg(&self) -> bool { if let Elem::Open { .. } = self { true diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index 4b9313305f4..4f297e0ea0b 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -195,8 +195,8 @@ where C: AsyncRead + AsyncWrite, for task in inner.to_notify.drain() { task.1.notify(); } - } else { - debug!("Ignored message because the substream wasn't open"); + } else if !elem.is_close_or_reset_msg() { + debug!("Ignored message {:?} because the substream wasn't open", elem); } } } else { @@ -416,7 +416,7 @@ impl AsyncWrite for Substream where C: AsyncRead + AsyncWrite { fn shutdown(&mut self) -> Poll<(), IoError> { - let elem = codec::Elem::Close { + let elem = codec::Elem::Reset { substream_id: self.num, endpoint: self.endpoint, }; From 68529ed6d9ebf01f00c78fd4a0efde2ff6868083 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 31 Jul 2018 13:42:58 +0200 Subject: [PATCH 6/9] Check the even-ness of the substream id --- mplex/src/lib.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index 4f297e0ea0b..48192939bc8 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -169,8 +169,18 @@ where C: AsyncRead + AsyncWrite, // Handle substreams opening/closing. match elem { - codec::Elem::Open { substream_id } => { // TODO: check even/uneven? - inner.opened_substreams.insert(substream_id); + codec::Elem::Open { substream_id } => { + if (substream_id % 2) == (inner.next_outbound_stream_id % 2) { + inner.error = Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); + for task in inner.to_notify.drain() { + task.1.notify(); + } + return Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); + } + + if !inner.opened_substreams.insert(substream_id) { + debug!("Received open message for substream {} which was already open", substream_id) + } }, codec::Elem::Close { substream_id, .. } | codec::Elem::Reset { substream_id, .. } => { inner.opened_substreams.remove(&substream_id); From aae820e3f98399d85747342afcfd7957cf3d1ed0 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 2 Aug 2018 13:39:11 +0200 Subject: [PATCH 7/9] Fix concerns --- mplex/src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index 48192939bc8..a899cfc9965 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -133,10 +133,9 @@ where C: AsyncRead + AsyncWrite, F: FnMut(&codec::Elem) -> Option, { // If an error happened earlier, immediately return it. - match inner.error { - Ok(()) => (), - Err(ref err) => return Err(IoError::new(err.kind(), err.to_string())), - }; + if let Err(ref err) = inner.error { + return Err(IoError::new(err.kind(), err.to_string())); + } if let Some((offset, out)) = inner.buffer.iter().enumerate().filter_map(|(n, v)| filter(v).map(|v| (n, v))).next() { inner.buffer.remove(offset); From 06ebefda27a6f69060fd997ee44953087a236829 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 3 Aug 2018 15:40:05 +0200 Subject: [PATCH 8/9] Fix tasks notification system --- mplex/src/lib.rs | 121 ++++++++++++++++++++++++++--------------------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index a899cfc9965..bb4781b8aaf 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -32,7 +32,7 @@ extern crate varint; mod codec; -use std::{cmp, iter}; +use std::{cmp, iter, mem}; use std::io::{Read, Write, Error as IoError, ErrorKind as IoErrorKind}; use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; use bytes::Bytes; @@ -40,7 +40,7 @@ use core::{ConnectionUpgrade, Endpoint, StreamMuxer}; use parking_lot::Mutex; use fnv::{FnvHashMap, FnvHashSet}; use futures::prelude::*; -use futures::{future, stream::Fuse, task}; +use futures::{future, stream::Fuse, executor, task}; use tokio_codec::Framed; use tokio_io::{AsyncRead, AsyncWrite}; @@ -76,11 +76,13 @@ where let out = Multiplex { inner: Arc::new(Mutex::new(MultiplexInner { error: Ok(()), - inner: Framed::new(i, codec::Codec::new()).fuse(), + inner: executor::spawn(Framed::new(i, codec::Codec::new()).fuse()), buffer: Vec::with_capacity(32), opened_substreams: Default::default(), next_outbound_stream_id: if endpoint == Endpoint::Dialer { 0 } else { 1 }, - to_notify: Default::default(), + notifier: Arc::new(Notifier { + to_notify: Mutex::new(Default::default()), + }), })) }; @@ -112,7 +114,7 @@ struct MultiplexInner { // Errored that happend earlier. Should poison any attempt to use this `MultiplexError`. error: Result<(), IoError>, // Underlying stream. - inner: Fuse>, + inner: executor::Spawn>>, // Buffer of elements pulled from the stream but not processed yet. buffer: Vec, // List of Ids of opened substreams. Used to filter out messages that don't belong to any @@ -121,7 +123,21 @@ struct MultiplexInner { // Id of the next outgoing substream. Should always increase by two. next_outbound_stream_id: u32, // List of tasks to notify when a new element is inserted in `buffer` or an error happens. - to_notify: FnvHashMap, + notifier: Arc, +} + +struct Notifier { + // List of tasks to notify when a new element is inserted in `buffer` or an error happens. + to_notify: Mutex>, +} + +impl executor::Notify for Notifier { + fn notify(&self, _: usize) { + let tasks = mem::replace(&mut *self.to_notify.lock(), Default::default()); + for (_, task) in tasks { + task.notify(); + } + } } /// Processes elements in `inner` until one matching `filter` is found. @@ -143,73 +159,63 @@ where C: AsyncRead + AsyncWrite, } loop { - let elem = match inner.inner.poll() { + let elem = match inner.inner.poll_stream_notify(&inner.notifier, 0) { Ok(Async::Ready(item)) => item, Ok(Async::NotReady) => { static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); task_local!{ static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) } - inner.to_notify.insert(TASK_ID.with(|&t| t), task::current()); + inner.notifier.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); return Ok(Async::NotReady); }, Err(err) => { let err2 = IoError::new(err.kind(), err.to_string()); inner.error = Err(err); - for task in inner.to_notify.drain() { - task.1.notify(); - } return Err(err2); }, }; - if let Some(elem) = elem { - trace!("Received message: {:?}", elem); - - // Handle substreams opening/closing. - match elem { - codec::Elem::Open { substream_id } => { - if (substream_id % 2) == (inner.next_outbound_stream_id % 2) { - inner.error = Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); - for task in inner.to_notify.drain() { - task.1.notify(); - } - return Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); - } + let elem = match elem { + Some(e) => e, + // EOF + None => return Ok(Async::Ready(None)), + }; - if !inner.opened_substreams.insert(substream_id) { - debug!("Received open message for substream {} which was already open", substream_id) - } - }, - codec::Elem::Close { substream_id, .. } | codec::Elem::Reset { substream_id, .. } => { - inner.opened_substreams.remove(&substream_id); - }, - _ => () - } + trace!("Received message: {:?}", elem); - if let Some(out) = filter(&elem) { - return Ok(Async::Ready(Some(out))); - } else { - if inner.buffer.len() >= MAX_BUFFER_LEN { - debug!("Reached mplex maximum buffer length"); - inner.error = Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); - for task in inner.to_notify.drain() { - task.1.notify(); - } - return Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); + // Handle substreams opening/closing. + match elem { + codec::Elem::Open { substream_id } => { + if (substream_id % 2) == (inner.next_outbound_stream_id % 2) { + inner.error = Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); + return Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); } - if inner.opened_substreams.contains(&elem.substream_id()) || elem.is_open_msg() { - inner.buffer.push(elem); - for task in inner.to_notify.drain() { - task.1.notify(); - } - } else if !elem.is_close_or_reset_msg() { - debug!("Ignored message {:?} because the substream wasn't open", elem); + if !inner.opened_substreams.insert(substream_id) { + debug!("Received open message for substream {} which was already open", substream_id) } - } + }, + codec::Elem::Close { substream_id, .. } | codec::Elem::Reset { substream_id, .. } => { + inner.opened_substreams.remove(&substream_id); + }, + _ => () + } + + if let Some(out) = filter(&elem) { + return Ok(Async::Ready(Some(out))); } else { - return Ok(Async::Ready(None)); + if inner.buffer.len() >= MAX_BUFFER_LEN { + debug!("Reached mplex maximum buffer length"); + inner.error = Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); + return Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); + } + + if inner.opened_substreams.contains(&elem.substream_id()) || elem.is_open_msg() { + inner.buffer.push(elem); + } else if !elem.is_close_or_reset_msg() { + debug!("Ignored message {:?} because the substream wasn't open", elem); + } } } } @@ -218,7 +224,7 @@ where C: AsyncRead + AsyncWrite, fn poll_send(inner: &mut MultiplexInner, elem: codec::Elem) -> Poll<(), IoError> where C: AsyncRead + AsyncWrite { - match inner.inner.start_send(elem) { + match inner.inner.start_send_notify(elem, &inner.notifier, 0) { Ok(AsyncSink::Ready) => { Ok(Async::Ready(())) }, @@ -276,7 +282,11 @@ where C: AsyncRead + AsyncWrite + 'static // TODO: 'static :-/ }).and_then({ let inner = self.inner.clone(); move |()| { - future::poll_fn(move || inner.lock().inner.poll_complete()) + future::poll_fn(move || { + let mut inner = inner.lock(); + let inner = &mut *inner; + inner.inner.poll_flush_notify(&inner.notifier, 0) + }) } }).map(move |()| { debug!("Successfully opened outbound substream {}", substream_id); @@ -413,7 +423,8 @@ where C: AsyncRead + AsyncWrite fn flush(&mut self) -> Result<(), IoError> { let mut inner = self.inner.lock(); - match inner.inner.poll_complete() { + let inner = &mut *inner; + match inner.inner.poll_flush_notify(&inner.notifier, 0) { Ok(Async::Ready(())) => Ok(()), Ok(Async::NotReady) => Err(IoErrorKind::WouldBlock.into()), Err(err) => Err(err), From e82aa7fde18e88cbea0ae709beff4dd39cb3b426 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 13 Aug 2018 11:07:49 +0200 Subject: [PATCH 9/9] Revert "Fix tasks notification system" This reverts commit 06ebefda27a6f69060fd997ee44953087a236829. --- mplex/src/lib.rs | 121 +++++++++++++++++++++-------------------------- 1 file changed, 55 insertions(+), 66 deletions(-) diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index bb4781b8aaf..a899cfc9965 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -32,7 +32,7 @@ extern crate varint; mod codec; -use std::{cmp, iter, mem}; +use std::{cmp, iter}; use std::io::{Read, Write, Error as IoError, ErrorKind as IoErrorKind}; use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; use bytes::Bytes; @@ -40,7 +40,7 @@ use core::{ConnectionUpgrade, Endpoint, StreamMuxer}; use parking_lot::Mutex; use fnv::{FnvHashMap, FnvHashSet}; use futures::prelude::*; -use futures::{future, stream::Fuse, executor, task}; +use futures::{future, stream::Fuse, task}; use tokio_codec::Framed; use tokio_io::{AsyncRead, AsyncWrite}; @@ -76,13 +76,11 @@ where let out = Multiplex { inner: Arc::new(Mutex::new(MultiplexInner { error: Ok(()), - inner: executor::spawn(Framed::new(i, codec::Codec::new()).fuse()), + inner: Framed::new(i, codec::Codec::new()).fuse(), buffer: Vec::with_capacity(32), opened_substreams: Default::default(), next_outbound_stream_id: if endpoint == Endpoint::Dialer { 0 } else { 1 }, - notifier: Arc::new(Notifier { - to_notify: Mutex::new(Default::default()), - }), + to_notify: Default::default(), })) }; @@ -114,7 +112,7 @@ struct MultiplexInner { // Errored that happend earlier. Should poison any attempt to use this `MultiplexError`. error: Result<(), IoError>, // Underlying stream. - inner: executor::Spawn>>, + inner: Fuse>, // Buffer of elements pulled from the stream but not processed yet. buffer: Vec, // List of Ids of opened substreams. Used to filter out messages that don't belong to any @@ -123,21 +121,7 @@ struct MultiplexInner { // Id of the next outgoing substream. Should always increase by two. next_outbound_stream_id: u32, // List of tasks to notify when a new element is inserted in `buffer` or an error happens. - notifier: Arc, -} - -struct Notifier { - // List of tasks to notify when a new element is inserted in `buffer` or an error happens. - to_notify: Mutex>, -} - -impl executor::Notify for Notifier { - fn notify(&self, _: usize) { - let tasks = mem::replace(&mut *self.to_notify.lock(), Default::default()); - for (_, task) in tasks { - task.notify(); - } - } + to_notify: FnvHashMap, } /// Processes elements in `inner` until one matching `filter` is found. @@ -159,63 +143,73 @@ where C: AsyncRead + AsyncWrite, } loop { - let elem = match inner.inner.poll_stream_notify(&inner.notifier, 0) { + let elem = match inner.inner.poll() { Ok(Async::Ready(item)) => item, Ok(Async::NotReady) => { static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); task_local!{ static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) } - inner.notifier.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); + inner.to_notify.insert(TASK_ID.with(|&t| t), task::current()); return Ok(Async::NotReady); }, Err(err) => { let err2 = IoError::new(err.kind(), err.to_string()); inner.error = Err(err); + for task in inner.to_notify.drain() { + task.1.notify(); + } return Err(err2); }, }; - let elem = match elem { - Some(e) => e, - // EOF - None => return Ok(Async::Ready(None)), - }; + if let Some(elem) = elem { + trace!("Received message: {:?}", elem); - trace!("Received message: {:?}", elem); + // Handle substreams opening/closing. + match elem { + codec::Elem::Open { substream_id } => { + if (substream_id % 2) == (inner.next_outbound_stream_id % 2) { + inner.error = Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); + for task in inner.to_notify.drain() { + task.1.notify(); + } + return Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); + } - // Handle substreams opening/closing. - match elem { - codec::Elem::Open { substream_id } => { - if (substream_id % 2) == (inner.next_outbound_stream_id % 2) { - inner.error = Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); - return Err(IoError::new(IoErrorKind::Other, "invalid substream id opened")); - } + if !inner.opened_substreams.insert(substream_id) { + debug!("Received open message for substream {} which was already open", substream_id) + } + }, + codec::Elem::Close { substream_id, .. } | codec::Elem::Reset { substream_id, .. } => { + inner.opened_substreams.remove(&substream_id); + }, + _ => () + } - if !inner.opened_substreams.insert(substream_id) { - debug!("Received open message for substream {} which was already open", substream_id) + if let Some(out) = filter(&elem) { + return Ok(Async::Ready(Some(out))); + } else { + if inner.buffer.len() >= MAX_BUFFER_LEN { + debug!("Reached mplex maximum buffer length"); + inner.error = Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); + for task in inner.to_notify.drain() { + task.1.notify(); + } + return Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); } - }, - codec::Elem::Close { substream_id, .. } | codec::Elem::Reset { substream_id, .. } => { - inner.opened_substreams.remove(&substream_id); - }, - _ => () - } - if let Some(out) = filter(&elem) { - return Ok(Async::Ready(Some(out))); - } else { - if inner.buffer.len() >= MAX_BUFFER_LEN { - debug!("Reached mplex maximum buffer length"); - inner.error = Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); - return Err(IoError::new(IoErrorKind::Other, "reached maximum buffer length")); - } - - if inner.opened_substreams.contains(&elem.substream_id()) || elem.is_open_msg() { - inner.buffer.push(elem); - } else if !elem.is_close_or_reset_msg() { - debug!("Ignored message {:?} because the substream wasn't open", elem); + if inner.opened_substreams.contains(&elem.substream_id()) || elem.is_open_msg() { + inner.buffer.push(elem); + for task in inner.to_notify.drain() { + task.1.notify(); + } + } else if !elem.is_close_or_reset_msg() { + debug!("Ignored message {:?} because the substream wasn't open", elem); + } } + } else { + return Ok(Async::Ready(None)); } } } @@ -224,7 +218,7 @@ where C: AsyncRead + AsyncWrite, fn poll_send(inner: &mut MultiplexInner, elem: codec::Elem) -> Poll<(), IoError> where C: AsyncRead + AsyncWrite { - match inner.inner.start_send_notify(elem, &inner.notifier, 0) { + match inner.inner.start_send(elem) { Ok(AsyncSink::Ready) => { Ok(Async::Ready(())) }, @@ -282,11 +276,7 @@ where C: AsyncRead + AsyncWrite + 'static // TODO: 'static :-/ }).and_then({ let inner = self.inner.clone(); move |()| { - future::poll_fn(move || { - let mut inner = inner.lock(); - let inner = &mut *inner; - inner.inner.poll_flush_notify(&inner.notifier, 0) - }) + future::poll_fn(move || inner.lock().inner.poll_complete()) } }).map(move |()| { debug!("Successfully opened outbound substream {}", substream_id); @@ -423,8 +413,7 @@ where C: AsyncRead + AsyncWrite fn flush(&mut self) -> Result<(), IoError> { let mut inner = self.inner.lock(); - let inner = &mut *inner; - match inner.inner.poll_flush_notify(&inner.notifier, 0) { + match inner.inner.poll_complete() { Ok(Async::Ready(())) => Ok(()), Ok(Async::NotReady) => Err(IoErrorKind::WouldBlock.into()), Err(err) => Err(err),