Rewrite ConnectionReuse #387
Conversation
twittner left a comment:
core/src/connection_reuse.rs (outdated)
fn reset(&self, addr: &Multiaddr) {
    let mut conns = self.connections.lock();
    self.reset_conn(conns.remove(&addr));
}
reset is never used.
core/src/connection_reuse.rs (outdated)
{
    /// Informs the Manager about a new inbound connection that has been
    /// established, so it
    fn new_inbound(&self, addr: Multiaddr, listener_id: usize, muxer: C::Output) {
Why is addr not taken by reference?
core/src/connection_reuse.rs (outdated)
/// the connection isn't established and ready yet.
fn poll_outbound(
    &self,
    addr: Multiaddr,
Why is addr not taken by reference?
core/src/connection_reuse.rs (outdated)
// Struct shared between most of the `ConnectionReuse` infrastructure.
shared: Arc<Mutex<Shared<C::Output>>>,
/// Tasks to notify when one or more new elements were added to `connections`.
notify_on_new_connec: Mutex<Vec<(usize, task::Task)>>,
Instead of a Vec O(n) why not a HashMap O(1)?
We were wondering about the performance of this too, so I ran a bunch of tests. They showed that the overhead of hashing only pays off once the vec is actually quite large (>200), which is very unlikely to happen for the notifications (barely ever more than a handful of tasks want to be notified, and we drain the list every time we do).
What makes HashMap even worse in this scenario is that, in practice, we intend to insert much more often than we actually insert, and every time we have to do the expensive hashing in the HashMap, whereas we would exit early when looping through the vector. In practice the vector has one or two entries, making it faster to loop than to hash (even with Fnv).
I was told these supposedly tiny performance differences matter, so I made the change.
I do not think these benchmarks do the same thing. For instance, bench_custom_hasher_hashmap_same does COUNT inserts, whereas bench_with_vec_same returns early, i.e. after MODULO inserts. Changing bench_with_vec_same to do (roughly) what the HashMap-based benchmark does:
#[bench]
fn bench_with_vec_same(b: &mut Bencher) {
    b.iter(|| {
        let n = test::black_box(COUNT);
        let mut v: Vec<(Key, &'static str)> = vec![];
        'outer: for idx in 0..n {
            let id = idx % MODULO;
            for i in 0..v.len() {
                if v.get(i).unwrap().0 == id {
                    v[i] = (id, "value");
                    continue 'outer;
                }
            }
            v.push((id, "value"));
        };
    });
}

yields:
running 2 tests
test bench_custom_hasher_hashmap ... bench: 8,800 ns/iter (+/- 376)
test bench_with_vec_same ... bench: 32,970 ns/iter (+/- 1,188)
@twittner oh yes, what a stupid mistake - I had an iterator in there first and never replaced that return. I have to run these tests again.
Why are you doing v[i] = (id, "value"); if you found it, though?
Just to mimic HashMap::insert, which also updates existing entries.
Sorry about that, that was a stupid bug. However, I inserted the modulo only to emulate a scenario closer to our "many attempts, few inserts" case. I only tried that because the worst-case scenario (never matches, inserts every time) yielded good results for smaller sizes, and that one was unaffected by the bug. I fixed the code, and running it now on small sets still yields significant performance improvements. On my system - take it with a grain of salt, flexible CPU frequency and all - the HashMap only shows a benefit after 30+ items, which we will probably not reach in practice:
# 500 Modulo 30
test bench_custom_hasher_hashmap_same ... bench: 5,626 ns/iter (+/- 608)
test bench_fnv_hashmap_same ... bench: 8,043 ns/iter (+/- 3,692)
test bench_fnv_hashset_same ... bench: 6,652 ns/iter (+/- 394)
test bench_fnv_hashset_uniques ... bench: 12,490 ns/iter (+/- 2,910)
test bench_with_sorted_vec_same ... bench: 11,130 ns/iter (+/- 457)
test bench_with_vec_same ... bench: 6,454 ns/iter (+/- 154)
test result: ok. 0 passed; 0 failed; 0 ignored; 6 measured; 0 filtered out
(the custom hasher shows unreliable results for me :( )
I cannot reproduce those results. bench_custom_hasher_hashmap_same is always faster than or on par with bench_with_vec_same on my machine, so I would prefer to go with Big-O and use a HashMap.
# 100 Modulo 20
running 2 tests
test bench_custom_hasher_hashmap_same ... bench: 909 ns/iter (+/- 62)
test bench_with_vec_same ... bench: 1,026 ns/iter (+/- 59)
# 200 Modulo 20
running 2 tests
test bench_custom_hasher_hashmap_same ... bench: 1,782 ns/iter (+/- 74)
test bench_with_vec_same ... bench: 1,903 ns/iter (+/- 93)
# 2000 Modulo 20
running 2 tests
test bench_custom_hasher_hashmap_same ... bench: 17,063 ns/iter (+/- 366)
test bench_with_vec_same ... bench: 17,590 ns/iter (+/- 933)
# 1000 Modulo 100
running 2 tests
test bench_custom_hasher_hashmap_same ... bench: 9,126 ns/iter (+/- 448)
test bench_with_vec_same ... bench: 38,695 ns/iter (+/- 1,522)
# 500 Modulo 30
running 2 tests
test bench_custom_hasher_hashmap_same ... bench: 4,605 ns/iter (+/- 237)
test bench_with_vec_same ... bench: 7,246 ns/iter (+/- 471)
@twittner and then use the custom hasher? Fine with me. I am not sure my results aren't influenced by the automatic CPU frequency governor that I can't disable :( . Especially for the first tests this probably skews the results, as the CPU only ramps up its frequency during the test... That the custom hasher (which does nothing) is supposedly magnitudes slower than Fnv supports that thesis - because Fnv doesn't do anything more than we do...
I'll do that.
I think SmallVec should be faster because of cache locality.
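For illustration, here is a minimal sketch of what a SmallVec-backed notify list could look like. This is not code from this PR: the smallvec crate is assumed as a dependency, the generic Task stands in for futures::task::Task, and NotifyList/register/drain_all are illustrative names.

use smallvec::SmallVec;
use std::mem;

type TaskId = usize;

/// Notify list that keeps up to four entries inline on the stack and only
/// spills to the heap beyond that, so the common one-or-two-entry case
/// never allocates and stays cache-friendly.
struct NotifyList<Task> {
    tasks: SmallVec<[(TaskId, Task); 4]>,
}

impl<Task> NotifyList<Task> {
    fn new() -> Self {
        NotifyList { tasks: SmallVec::new() }
    }

    /// Insert or update an entry, mimicking `HashMap::insert` semantics
    /// with a linear scan (cheap for a handful of entries).
    fn register(&mut self, id: TaskId, task: Task) {
        for entry in self.tasks.iter_mut() {
            if entry.0 == id {
                entry.1 = task;
                return;
            }
        }
        self.tasks.push((id, task));
    }

    /// Take all pending entries, e.g. to call `notify()` on each task.
    fn drain_all(&mut self) -> SmallVec<[(TaskId, Task); 4]> {
        mem::take(&mut self.tasks)
    }
}

fn main() {
    let mut list: NotifyList<&'static str> = NotifyList::new();
    list.register(1, "task-1");
    list.register(1, "task-1 (updated)");
    list.register(2, "task-2");
    for (id, task) in list.drain_all() {
        println!("notify {}: {}", id, task);
    }
}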
core/src/connection_reuse.rs (outdated)
/// Future that produces the muxer.
future: Box<Future<Item = (M, Multiaddr), Error = IoError>>,
/// All the tasks to notify when `future` resolves.
notify: Vec<(usize, task::Task)>,
Instead of a Vec O(n) why not a HashMap O(1)?
core/src/connection_reuse.rs (outdated)
/// substreams
fn reset_conn(&self, state: Option<PeerState<C::Output>>) {
    if let Some(PeerState::Active { closed, .. }) = state {
        closed.store(false, Ordering::Relaxed);
I do not think Relaxed is correct. When loading the value elsewhere there are no ordering guarantees, therefore this store may not become visible to subsequent loads.
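For context, a minimal standalone sketch (not code from this PR) of the Release/Acquire pairing being suggested for a shared flag; variable names are illustrative.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let data = Arc::new(AtomicUsize::new(0));
    let closed = Arc::new(AtomicBool::new(false));

    let (d, c) = (data.clone(), closed.clone());
    let writer = thread::spawn(move || {
        d.store(42, Ordering::Relaxed);
        // Release: the store to `data` above cannot be reordered after this,
        // and becomes visible to any thread that Acquire-loads `closed == true`.
        c.store(true, Ordering::Release);
    });

    // Spin until the flag is observed; the Acquire load pairs with the Release
    // store, so once we see `true`, the write to `data` is guaranteed visible.
    // With Relaxed on either side there would be no such guarantee.
    while !closed.load(Ordering::Acquire) {
        thread::yield_now();
    }
    assert_eq!(data.load(Ordering::Relaxed), 42);

    writer.join().unwrap();
}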
core/src/connection_reuse.rs (outdated)
// Nothing is ready.
Ok(Async::NotReady)
self.inner.read(buf)
Trailing whitespace.
        }
        continue;
    }
};
Personally I would have found a match easier to read.
core/src/connection_reuse.rs (outdated)
    }
    Err(err) => Err(err),
    Ok(Async::NotReady) => Ok(Async::NotReady)
}
This match could be simplified with try_ready!.
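As an illustration (futures 0.1, not code from this PR; the Double wrapper is a made-up example), try_ready! forwards both NotReady and errors, collapsing the three-arm match into one line:

use futures::{try_ready, Async, Future, Poll};

/// Wrapper future that doubles the value produced by an inner future.
struct Double<F>(F);

impl<F> Future for Double<F>
where
    F: Future<Item = u32>,
{
    type Item = u32;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<u32, F::Error> {
        // Equivalent to matching Ok(Async::Ready(..)), Ok(Async::NotReady)
        // and Err(..) by hand: NotReady and errors are returned early.
        let value = try_ready!(self.0.poll());
        Ok(Async::Ready(value * 2))
    }
}

fn main() {
    use futures::future;
    assert_eq!(Double(future::ok::<u32, ()>(21)).wait().unwrap(), 42);
}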
core/src/connection_reuse.rs (outdated)
use tokio_io::{AsyncRead, AsyncWrite};
use transport::{MuxedTransport, Transport, UpgradedNode};
use upgrade::ConnectionUpgrade;
use std::clone::Clone;
Unnecessary import.
@twittner yes, indeed, it intended to. However, it has/had a bug and a major drawback:
Removing a connection from the cache does not necessarily close or reset the connection. Depending on the …
Agreed. However, with …
(force-pushed abee6a9 to af1760a)
@twittner addressed your concerns.
twittner left a comment:
Looks good to me. Would be nice to get a review by @tomaka as well.
tomaka left a comment:
IMO this code is still far too complicated.
I feel like the only difference is that before I was the only one to understand how it works, while now you are the only one to understand how it works.
That being said, if you are confident that it works, then let's merge this, but we must simplify it in the future.
use std::hash::{BuildHasherDefault, Hasher};

#[derive(Default)]
struct UsizeHasher {
Why not use FnvHashMap, which afaik does this better, instead of defining our own hasher?
Performance. See the conversation I had with @twittner about this. I am not sure what you mean by FnvHashMap doing "this better": FnvHasher does the same thing we do here - it defines its own hasher, using the Fnv algorithm. However, as we already have pretty unique values, that algorithm is a bit more expensive (~10-15% in this case) than just converting the usize key into the required u64 type. We are doing exactly what Fnv does, but with a less expensive "algorithm" (if you can call it that).
This algorithm, however, is very much constrained to the use case at hand and should probably not be reused in many other places. That is why I can understand the argument for a more generic solution - if we are willing to pay the performance price, I am fine with doing that, too.
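For reference, a minimal sketch of the idea (not necessarily the exact code in this PR): since the keys are already well-distributed usize ids, the "hash" can simply pass the key through. UsizeHashMap and the fallback in write() are illustrative.

use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// Hasher that passes a `usize` key through unchanged instead of running it
/// through Fnv's per-byte mixing.
#[derive(Default)]
struct UsizeHasher(u64);

impl Hasher for UsizeHasher {
    fn finish(&self) -> u64 {
        self.0
    }

    fn write_usize(&mut self, i: usize) {
        self.0 = i as u64;
    }

    fn write(&mut self, bytes: &[u8]) {
        // Fallback for non-usize input; plain `usize` keys go through `write_usize`.
        for &b in bytes {
            self.0 = (self.0 << 8) | u64::from(b);
        }
    }
}

type UsizeHashMap<V> = HashMap<usize, V, BuildHasherDefault<UsizeHasher>>;

fn main() {
    let mut tasks: UsizeHashMap<&'static str> = UsizeHashMap::default();
    tasks.insert(7, "task");
    assert_eq!(tasks.get(&7), Some(&"task"));
}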
core/src/connection_reuse.rs (outdated)
/// Knows about all connections and their current state, allows one to poll for
/// incoming and outbound connections, while managing all state-transitions that
/// might occur automatically.
// #[derive(Clone)]
Let's remove this comment. This struct doesn't need to be cloned.
/// substreams. Consume the PeerState while doing so.
fn reset_conn(self) {
    if let PeerState::Active { closed, .. } = self {
        closed.store(false, Ordering::Release);
I'm not a fan of having Ordering::Release without any context, as the ordering to use strongly depends on the context.
Personally I'd just copy-paste these three lines of code at the two places where we use them.
I generally agree with you regarding copying three lines around. The reason I went for keeping this contained in a function rather than copying it is that this isn't fully resetting the connection yet. I can see us quickly expanding what should happen here (first by adding some debugging statements, later by doing some actual signalling/cleaning up of the related connections), and if you then had to do that at many code locations, you could easily miss one when making changes, and most reviewers wouldn't notice. If you insist, though, I don't have a strong opinion on this and can do a copy instead.
Regarding Ordering::Release, I am not sure what else we would want to do here or at the specific locations. @twittner correctly pointed out that Ordering::Relaxed might simply never get propagated. I don't think we need a strong ordering guarantee in this case, as it doesn't actually hurt to read the remaining bytes of an already closed connection. The only important thing here, to me, is that it will eventually fail and not dangle around for too long.
active_connections: FnvHashMap<Multiaddr, M>,
/// Notify all `Tasks` in `self.notify_on_new_connec` that a new connection was established
/// consume the tasks in that process
fn new_con_notify(&self) {
I commented about this in private, but I'm really not a fan of locking a mutex in a function that is intended to be called from another function which can potentially lock a mutex as well.
This greatly increases the difficulty for the reviewer who wants to make sure that there is no deadlock.
> I commented about this in private, but I'm really not a fan of locking a mutex in a function that is intended to be called from another function which can potentially lock a mutex as well.

While I agree with the general merit of this statement, I don't think it applies here: there are two distinct Mutexes, and each one is clearly bound to its corresponding function calls. I don't really see how it benefits us if instead we put the entire ConnectionManager behind a Mutex (what we had before) and thereby prevent anyone from using new_notify_registration (the only function actually used from the outside) until we are done with the respective poll_* calls. Even in that scenario, if task.1.notify() caused an immediate poll (which is the only side-effecting call, and we know it doesn't, as the wake-up is only queued) we'd deadlock.

> This greatly increases the difficulty of the reviewer if you want to make sure that there is no deadlock.

I totally agree with you that Mutexes make review harder, as you need to make sure there isn't any side-effecting code within the locked section that might cause a deadlock. But that isn't really caused by the number of Mutexes, rather by how they are used and what code is executed while they are locked. Which is exactly why I try to keep the code between locks as constrained, short and side-effect-free as possible.
I am fine with moving this lock+drain to the respective position in the code base (as it is only used once), but I see little to no merit in putting the entire object into a mutex just for these two clearly independent locks... If you insist, however, I can also do that.
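A simplified sketch of the locking layout being discussed (field and method names taken from the diff above, everything else illustrative; Task stands in for futures::task::Task): two independent Mutexes with short, side-effect-free critical sections, where the drained tasks are only notified after the lock is released.

use std::collections::HashMap;
use std::sync::Mutex;

/// Stand-in for `futures::task::Task`.
struct Task;

impl Task {
    fn notify(&self) {}
}

struct Manager {
    /// Connection state; locked only while reading or updating entries.
    connections: Mutex<HashMap<String, &'static str>>,
    /// Tasks to notify when a new connection appears; locked only to push or drain.
    notify_on_new_connec: Mutex<Vec<(usize, Task)>>,
}

impl Manager {
    fn new_con_notify(&self) {
        // Drain under the lock, then release the guard before running any
        // side effects, so no other lock is ever taken while this one is held.
        let tasks: Vec<(usize, Task)> = {
            let mut pending = self.notify_on_new_connec.lock().unwrap();
            pending.drain(..).collect()
        };
        for (_id, task) in tasks {
            task.notify();
        }
    }
}

fn main() {
    let manager = Manager {
        connections: Mutex::new(HashMap::new()),
        notify_on_new_connec: Mutex::new(vec![(1, Task)]),
    };
    manager.new_con_notify();
    println!("{} connections", manager.connections.lock().unwrap().len());
}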
core/src/connection_reuse.rs (outdated)
fn clear_error(&self, addr: Multiaddr) -> bool {
    let mut conns = self.connections.lock();

    if let Entry::Occupied(e) = conns.entry(addr) {
Should be get() instead of entry() so that clear_error can take addr by reference.
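A generic illustration of the point (std HashMap, hypothetical names, not code from this PR): entry() needs an owned key, while get()/remove() accept a borrow, so the function can keep taking the key by reference.

use std::collections::HashMap;

/// Remove the entry for `key` if it is flagged as errored; returns whether
/// anything was cleared. Takes the key by reference thanks to `get`.
fn clear_error(map: &mut HashMap<String, bool>, key: &str) -> bool {
    if map.get(key).copied().unwrap_or(false) {
        map.remove(key);
        true
    } else {
        false
    }
}

fn main() {
    let mut errors = HashMap::new();
    errors.insert("addr".to_string(), true);
    assert!(clear_error(&mut errors, "addr"));
    assert!(!clear_error(&mut errors, "addr"));
}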
core/src/connection_reuse.rs (outdated)
    }
}

fn poll_for_substream<M>(mut outbound: M::OutboundSubstream, addr: &Multiaddr)
What does this function do?
I don't care if a function doesn't have documentation if it is trivial, but this function clearly is not.
@tomaka addressed your remarks: simplified the code, and fixed or commented on each point. I am not fond of this (or anything) being understood by only one person (no matter how many tests we have). I totally agree this is still super complicated (code-wise) and I am happy to discuss how we can simplify it further. Do you have any ideas?
…dy; more traces for debugging; `iter().any()` is a more efficient approach than `iter().all()`
(force-pushed 1758f48 to ac9eacc)
Extract code into submodules.
Decided to try a totally different approach. Pausing.
Could you fix the conflicts?
I updated with the changes in master.
tomaka left a comment:
I don't see any code that closes the muxer when all the substreams have been closed. This has been a problem in the past, because the connection will eventually time out and we have no way to know.
I'm not really confident in merging this considering that I will likely have to debug and fix issues arising in substrate myself.
}
let muxer = Arc::new(muxer);
match poll_for_substream(muxing::outbound_from_ref_and_wrap(muxer.clone()), &addr) {
    Ok(None) => return Ok(Async::NotReady),
If we successfully dial a remote but it's not ready to open a substream yet, this means we will poll the dial again the next time and probably panic.
/// Polls the given outbound of addr for a new substream. Returns `Ok(Some(stream))` if available,
/// `Ok(None)` if the stream is not ready yet and an `Err` if the stream had failed.
fn poll_for_substream<O, S>(mut outbound: O, addr: &Multiaddr)
Why isn't that a Poll, so that we return Async::Ready and NotReady instead of respectively Some and None?
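To illustrate the suggestion (futures 0.1, hypothetical toy functions, not code from this PR), the same "not ready yet" state expressed with Option versus Poll:

use futures::{Async, Poll};
use std::io::Error as IoError;

// Option-based shape being questioned: `Some` means ready, `None` means not ready yet.
fn poll_with_option(ready: bool) -> Result<Option<u32>, IoError> {
    if ready { Ok(Some(1)) } else { Ok(None) }
}

// Suggested shape: the standard futures vocabulary, usable with `try_ready!`.
fn poll_with_poll(ready: bool) -> Poll<u32, IoError> {
    if ready { Ok(Async::Ready(1)) } else { Ok(Async::NotReady) }
}

fn main() {
    assert_eq!(poll_with_option(true).unwrap(), Some(1));
    match poll_with_poll(true) {
        Ok(Async::Ready(v)) => assert_eq!(v, 1),
        _ => unreachable!(),
    }
}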
    <UpgradedNode<T, C> as Transport>::MultiaddrFuture: 'static,
{
    /// Informs the Manager about a new inbound connection that has been
    /// established, so it
so it?
        task.notify()
    }
} else {
    old_state.map(|s| s.reset_conn());
This is very bad style in my opinion. You're mixing a "pulling" API (map) with a "pushing" action (reset_conn).
| "Using existing connection to {} to open outbound substream", | ||
| addr | ||
| ); | ||
| match poll_for_substream(muxing::outbound_from_ref_and_wrap(muxer.clone()), &addr) { |
We're creating a new outbound future, but we immediately throw it away if it's not ready? There's no way this can work for something less trivial than mplex.
    ..
} => {
    if closed.load(Ordering::Acquire) {
        (PeerState::Errored(IoError::new(IoErrorKind::BrokenPipe, "Connection closed")),
If we open a connection, then later close it, then later attempt to open again, we will forever trigger this condition, no?
If I'm not mistaken, we should attempt to dial again instead.
};
self.new_con_notify();
// Although we've just started in pending state, there are transports which are
// immediately ready - namely 'memory' - so we need to poll our future right away
You always need to poll immediately, otherwise the current task will not be woken up later when it needs to be.
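A generic sketch of that rule (futures 0.1, illustrative names, not this PR's code): a wrapper that stores a newly created inner future must poll it before returning, since it is that poll which registers the current task for wakeup, and instantly-ready transports such as 'memory' would otherwise be missed.

use futures::{Async, Future, Poll};

/// Wrapper around a freshly created inner future.
struct DialWrapper<F> {
    pending: Option<F>,
}

impl<F: Future> Future for DialWrapper<F> {
    type Item = F::Item;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<F::Item, F::Error> {
        let fut = self.pending.as_mut().expect("polled after completion");
        // Poll right away, even if the future was only just created: if it is
        // already ready we return the value now, and if not, this poll is what
        // registers the current task to be notified later.
        match fut.poll()? {
            Async::Ready(item) => {
                self.pending = None;
                Ok(Async::Ready(item))
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

fn main() {
    use futures::future;
    let wrapped = DialWrapper { pending: Some(future::ok::<u32, ()>(5)) };
    assert_eq!(wrapped.wait().unwrap(), 5);
}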
}

for to_remove in to_remove {
    conn.remove(&to_remove).map(|s| s.reset_conn());
I would really really really prefer if let Some over map
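For reference, the line from the diff above rewritten in the preferred style (a direct restatement, nothing new):

for to_remove in to_remove {
    if let Some(state) = conn.remove(&to_remove) {
        state.reset_conn();
    }
}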
Closing in favor of #446.
Based upon #355, but with a centralised ConnectionManager which encapsulates and manages all state transitions in a few central places and ensures everyone is notified as needed when they occur. Which means:
- no PeerState enum exposed whatsoever
- the Arc is shared, not the Mutex; the Manager manages all locking of connections and counters
- connections are managed with less code and more explicit transitions, making it easier to reason about the code
- Counters are increased properly (rather than relying on external parties to do that)
- no Poisoned state possible anymore (removed altogether)
- removed the connection_id system and consequently all referencing of the ConnectionManager in Streams, replaced with a shared closed state to stop streams early
- added reset_connection on ConnectionReuseSubstream to allow a substream to state that a connection is to be considered closed (primarily for the Ping protocol) so it doesn't continue producing PeerStates

Furthermore:
- … the ConnectionManager altogether, massively reducing …
- replaced the FnvHashMap<usize, task::Task> with a faster Vec<(usize, task::Task)>