Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inner accessors to Send, SplitSink, and SplitSend combinators #682

Closed
wants to merge 1 commit into from
Closed

Add inner accessors to Send, SplitSink, and SplitSend combinators #682

wants to merge 1 commit into from

Conversation

YetAnotherMinion
Copy link
Contributor

These inner accessors are required to be able to implement shutdown on send failure on a split TcpStream over at the websockets crate (https://github.com/cyderize/rust-websocket/issues/155).

This is the code I would like to write, where I create a custom future that handles shutdown if the socket send fails. I cannot do this as of today because I cannot access inner sink of the SplitSink.

    pub struct ShutdownOnSendError {
        pub in_shutdown: bool,
        pub inner: Send<SplitSink<Framed<TcpStream, MessageCodec<OwnedMessage>>>>,
    }

    impl Future for ShutdownOnSendError {
        type Item=SplitSink<Framed<TcpStream, MessageCodec<OwnedMessage>>>;
        type Error=WebSocketError;

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

           if !self.in_shutdown {
                match self.inner.poll() {
                    Ok(Async::Ready(stream)) => return Ok(Async::Ready(stream)),
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    // NOTE I just log the original error for demonstration purposes, but you probably want to handle the different send errors.
                    Err(e) => {
                        println!("send error: {:?}", e);
                        self.in_shutdown = true;
                    }
                }
            }

            if self.in_shutdown {
                println!("SHUTDOWN");
                match self.inner.get_mut().inner().poll_lock() {
                    Async::Ready(mut framed) => {
                        let tcp_stream: &mut TcpStream = framed.get_mut();
                        //println!("calling shutdown on socket");
                        match tcp_stream.shutdown(::std::net::Shutdown::Both) {
                            Ok(()) => (),
                            Err(error) => return Err(WebSocketError::IoError(error))
                        }
                    },
                    Async::NotReady => return Ok(Async::NotReady),
                }
                println!("Shutdown successfully");
                return Ok(Async::Ready(self.inner.into_inner()));
            }
            Ok(Async::NotReady)
        }
    }

src/sink/send.rs Outdated
///
/// Note that this will discard the item being sent if this Future has not completed, so care
/// should be taken to avoid losing resources when this is called.
pub fn into_inner(&mut self) -> S {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little wary about this because like the previous PR this probably wants to return an option, but here it's particularly tricky due to ownership. Could this be left out for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just included it for completeness, I don't actually need it because in the shutdown code I write.

@@ -16,6 +16,21 @@ impl<S> SplitStream<S> {
pub fn reunite(self, other: SplitSink<S>) -> Result<S, ReuniteError<S>> {
other.reunite(self)
}

/// Get a shared reference to the inner locked stream.
pub fn get_ref(&self) -> &BiLock<S> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm I'm not sure if this is what we want to expose here (the implementation detail of a BiLock), perhaps we could just have a get_ref for S itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see it being usable if you were fine with having get_ref(&self) -> Async<BiLockGuard<S>>, or even Option<Async<BiLockGuard<S>>> if you want to keep the optional. The code that needs a ref to the locked stream is most likely going to be a future itself, so it will be fine with polling get_ref to actually obtain the ref. However now it looks like the distinction between mut and shared reference are gone. I could maybe create a wrapper struct for each case that only implements Deref and DerefMut respectively, but that would look even stranger.

In my example use case above where I get extract the stream, it does not really matter where poll_lock gets called, if my code calls it or it is called inside the SplitSink impl.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm but my point here is that we don't actually want to expose the implementation details of BiLock, which I think may force us to not be able to implement these functions for now?

Copy link
Contributor Author

@YetAnotherMinion YetAnotherMinion Dec 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be willing to give me a more detailed reasoning behind not exposing BitLock?

I can understand that you want to use different synchronization primitives in the future, and that exposing BiLock would force a major version bump whenever the synchronization system is changed. I understand the business cost of forcing every library user to change their code.

However, I generally assume changes in a library are improvements. Taking advantage of future improvements is the reason why I submit this patch in the first place. My code that generates business value already works by just including only the patched modules into a dirty-hacks-futures crate that is private to my company only. However I gamble that by giving the fixes to upstream, then if someone comes along later and improves the design, then I can pick up those improvements when upgrading libraries, which will lead to even more profits. The cost of moving from BiLock to some other -- more efficient -- synchronization primitive (or even lock free!) would be paid for by the improvement.

Given that BiLock is part of futures, and appears to be written specifically for SplitSink, SplitStream, it really looks like they are already interlinked.

/// A BiLock is typically used for "split" operations where data which serves
/// two purposes wants to be split into two to be worked with separately. For
/// example a TCP stream could be both a reader and a writer or a framing layer
/// could be both a stream and a sink for messages. A BiLock enables splitting
/// these two and then using each independently in a futures-powered fashion.

The only way to create a SplitSink, SplitStream is from the Stream trait split function in the first place. I am not sure that the existing public API will allow a lock free change to the implementation of SplitSink, SplitStream. The really generic signature of the public split method does not give you a lot of wriggle room to introduce lock free behavior. You would have to change Sink and Stream definitions, which is major version change itself.

fn split(self) -> (SplitSink<Self>, SplitStream<Self>)
        where Self: super::sink::Sink + Sized

If you are planning to use a different, perhaps optimized synchronization primitive for the split, then perhaps a type alias could be used to satisfy your desire to change what kind of locking is used under the covers. The actual lock implementation can then be varied to your hearts content.

// src/stream/split.rs
pub struct ReadWriteLock<S>(BiLock<S>);
/// ....
pub fn get_ref(&self) -> Option<&ReadWriteLock<S>> {

In summary, I do not think you can change the behavior of the splitting a Sink+Stream without wide reaching changes to already public interfaces. If that assumption is true, then exposing BiLock to the public API will not add any additional constraints to versioning. By exposing BiLock, you gain business value by saving all other users from having to implement their own internal versions of dirty-hack-futures. Therefore I recommend exposing BiLock in the signature.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm with @alexcrichton on this. BiLock is an implementation detail here. If you need access to it, I would recommend vendoring split into your own codebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, thanks for the reply.

@YetAnotherMinion
Copy link
Contributor Author

Looks like rustup failed to download the nightly binary, which caused the build failure.
https://travis-ci.org/alexcrichton/futures-rs/jobs/318934035

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants