Sharing socket between threads / non-blocking read #113

Closed
wbrickner opened this issue Mar 23, 2020 · 20 comments

@wbrickner

Hello, I'm building a service which demands minimal latency.

I need to be able to send a message as fast as possible (latency wise), but still engage in potential ping/pong interactions.

My problem is that read_message seems to block until a message arrives, which won't do.

I was thinking I could access the underlying stream, split it, and then have two threads: one that blocks while waiting for new messages and handles them as they arrive, and another that writes whenever it needs to, according to its own independent logic.

Is this possible? I've heard about using mio to make some of the components async, and I saw a set_nonblocking method mentioned in another issue about the blocking nature of read_message. I'm overall a bit confused and can't find an example of how to achieve an async read (or something equivalent) using mio.

Thanks so much!

@agalakhov
Member

agalakhov commented Mar 24, 2020

Hi,

First, here's what set_nonblocking() does: it prevents read_message() from blocking. Instead, read_message() returns Err immediately if no message is available. If your software runs CPU-intensive computations all the time, without pauses and at 100% CPU load, then calling the non-blocking read_message somewhere inside the computation would do the trick. But this approach always costs 100% CPU load and thus should not be used directly. Instead, you generally combine the non-blocking read_message with your own blocking wait. This differs from the blocking read_message in that you have full control over the waiting and are able to interrupt it, e.g. for sending.
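For reference, a minimal sketch of this pattern, assuming the tungstenite 0.x API used in this thread (tungstenite::client, read_message) and a hypothetical local endpoint at 127.0.0.1:8088. The handshake is done while the stream is still blocking, since a non-blocking stream can make it fail with HandshakeError::Interrupted:

use std::io::ErrorKind;
use std::net::TcpStream;
use tungstenite::{client, Error};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Handshake on a blocking stream first...
    let stream = TcpStream::connect("127.0.0.1:8088")?;
    let (mut ws, _response) = client("ws://127.0.0.1:8088", stream)
        .map_err(|e| format!("handshake failed: {}", e))?;
    // ...then switch the underlying socket to non-blocking mode.
    ws.get_ref().set_nonblocking(true)?;

    loop {
        match ws.read_message() {
            Ok(msg) => println!("received: {}", msg),
            // No message available right now. Your own blocking wait (or
            // sending logic) goes here; spinning without one burns a core.
            Err(Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => {}
            Err(e) => return Err(e.into()),
        }
    }
}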

That's what mio does. Using mio is nothing other than the standard "poll/select" approach known from UNIX textbooks: mio is a wrapper around poll/select. It waits on one or more sockets and tells you when any of them has data to read. By using it you are able to send and receive within the same thread. That's how we work with Tungstenite in our company.

Probably the best way nowadays is tokio with tokio-tungstenite. It wraps mio and tungstenite together using the Future trait. Then you can use async/await just like in JavaScript, or start background tasks (not threads!) that do some work in parallel. If unsure, try tokio. But if for some reason you need fine-grained control over low-level sockets and threads, then use mio directly. This is generally harder to do, and I don't recommend it for general use.

@wbrickner
Author

Thank you very much for your help!

I'm still a little unsure; I've never played with native networking like this. Maybe you could give me a design recommendation?

I have two WebSocket connections to the same remote host: one I use for reading in thread A and one for writing in thread B. This feels like poor form, but it's what I'm doing now.

I set up a couple of channels to communicate between threads.

I spawn a thread with an infinite loop that tries to read new messages from the socket; when one arrives, it parses it into a struct and sends it over channel A to an "agent" thread, which does something else with the information. Ping handling here is easy, as I'm already reading in an infinite loop.

In thread B I check channel B for messages from the agent thread, which I serialize and send over the TX socket.

Of course this doesn't work, because read_message blocks, and I rarely get messages on the TX socket, so I can't use it to send a pong response when I get a ping. I tried setting non-blocking behavior as suggested in the other read_message thread, but after a few minutes I got weird errors that I didn't understand.

This might be the wrong approach; I don't know. I thought it would minimize the time from the packets arriving at my machine to the information being handled in the agent thread.

Two things:

  • If I get a ping, I have 10 minutes to send a pong response, so I can miss the message for an eternity before it's an issue, but I do need to get it eventually to send a pong.
  • 100% CPU on a couple cores (the socket threads) is fine with me as long as the latency is as low as possible and messages aren't stuck sitting around.

Hopefully you can give some advice; you seem to know what you're talking about here 👍

@agalakhov
Member

Hi,
what you're doing now is probably the least efficient way ever. Using multiple network connections neither simplifies things nor improves latency, since communication between threads is no easier than the WebSocket itself; it has exactly the same problem.
In your case I'd rather use no threads at all. Use tokio and tokio-tungstenite. Create the WebSocket and call split() on it. You'll get separate sender and receiver halves, both asynchronous, and then you'll be able to spawn two separate tasks. A good example of doing so is available here:
https://github.com/snapview/tokio-tungstenite/blob/master/examples/interval-server.rs
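For reference, a minimal sketch of the split() approach, assuming the tokio-tungstenite and futures-util crates and a hypothetical local endpoint; connect_async, split(), and the resulting Stream/Sink halves are the pieces the linked example builds on:

use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (ws, _response) = connect_async("ws://127.0.0.1:8088").await?;
    // One socket, two independent halves.
    let (mut tx, mut rx) = ws.split();

    // Task A: sends whenever its own logic decides to.
    let writer = tokio::spawn(async move {
        tx.send(Message::Text("hello".into())).await
    });

    // Task B (here: the main task) waits for incoming messages.
    while let Some(msg) = rx.next().await {
        println!("received: {}", msg?);
    }

    writer.await??;
    Ok(())
}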

@wbrickner
Author

Haha, I felt it was naive but I'm glad to hear it's the least efficient way ever 😄.
I'll look into your recommendations, thank you!

@daniel-abramov
Member

Judging from the nice and detailed comments from @agalakhov, I assume this issue is solved and the question answered, so I'll close the topic ;)

@d4h0

d4h0 commented Jun 28, 2020

It seems the websocket crate supports splitting Websockets into a reader and writer:

https://docs.rs/websocket/0.24.0/websocket/client/sync/struct.Client.html#method.split

Is there a reason why tungstenite can't do this?

Moving from a blocking API to an async API is a big step (both for the maintainers and for the users of the resulting API).

Even if only the core that handles the WebSocket is async, that would still increase the complexity of the code a lot (and add many additional dependencies).

So it would be good if tungstenite could support splitting somehow.

I'm going to use the websocket crate for now, but the maintainers of that crate recommend tungstenite, so I'd prefer to use tungstenite.

@agalakhov
Member

Tungstenite itself is neither blocking nor non-blocking. There is non-blocking support and splitting support in tokio-tungstenite.

The reason tungstenite does not allow sending from a different thread is that there is implicit ping and close handshake support. Splitting can be done explicitly using mpsc.

@d4h0

d4h0 commented Jun 29, 2020

Thanks, @agalakhov. So this is fundamentally not possible with tungstenite in a blocking way.

> Splitting can be done explicitly using mpsc

Do you mean that mpsc can be used for splitting in a blocking context, or in an async one? To be honest, I don't understand what you mean by this.

Thanks for your help :)

@agalakhov
Member

Most likely you're reading the websocket in a loop in some thread. Switch the socket to non-blocking mode; then reading results in a WouldBlock error if the socket has no messages. Now you can also send messages to the socket in the same loop, using mpsc to get messages from other threads. The only thing left is to add some delay to avoid infinitely polling a socket that has no data. I use mio for that, but there are many other possibilities.

As you can see, we need two external components for that: mpsc and mio. There are many other possible choices, and we don't want to force any of them; that's the reason why this is not built into Tungstenite. One popular choice is to use tokio for the external components, which is what tokio-tungstenite and async-tungstenite do. Another choice is mpsc+mio as described above; this is not popular enough to justify a dedicated library. Other Tungstenite users have their own combinations, e.g. warp.
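For reference, a minimal sketch of this mpsc splitting, assuming the tungstenite 0.x API used in this thread; a fixed sleep stands in for mio's Poll, and the endpoint is hypothetical:

use std::io::ErrorKind;
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use tungstenite::{client, Error, Message};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream = TcpStream::connect("127.0.0.1:8088")?;
    let (mut ws, _response) = client("ws://127.0.0.1:8088", stream)
        .map_err(|e| format!("handshake failed: {}", e))?;
    ws.get_ref().set_nonblocking(true)?;

    // Other threads "send to the socket" by sending into this channel.
    let (tx, rx) = mpsc::channel::<Message>();
    thread::spawn(move || {
        tx.send(Message::Text("from another thread".into())).ok();
    });

    loop {
        // Drain outgoing messages queued by other threads.
        while let Ok(msg) = rx.try_recv() {
            ws.write_message(msg)?;
        }
        // Poll the socket; tungstenite answers pings internally during
        // read/write calls, so the ping and close handshakes keep working.
        match ws.read_message() {
            Ok(msg) => println!("received: {}", msg),
            Err(Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock => {
                // Nothing to do: sleep briefly instead of spinning at
                // 100% CPU. mio's Poll would replace this fixed delay.
                thread::sleep(Duration::from_millis(10));
            }
            Err(e) => return Err(e.into()),
        }
    }
}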

@d4h0

d4h0 commented Jun 29, 2020

Okay, thanks again!

Looks like I have to use tokio-tungstenite or async-tungstenite if I want to use tungstenite.

It would be great if there were an example of using mio with tungstenite. I think this would be an ideal setup for many use cases, but I wasn't able to figure out how to do it easily (it seems complex).

I think I will create an async "core" that handles the WebSocket and communicates with the rest of the program via messages. That way only the core needs to be async. I'd like to avoid this, but it seems to be the only option if I want to use tungstenite.

@mkeedlinger

mkeedlinger commented Jul 21, 2020

Hi @agalakhov, sorry to resurrect this thread, but I'm trying to use tungstenite with mio as well.

Right now the issue I have is with the handshake when using the client function. For some reason I keep getting HandshakeError::Interrupted(...).

Any chance you have a simple example using mio as a client? I can post a simplified version of what I'm trying to do, if that helps.

Much thanks!

@mkeedlinger

mkeedlinger commented Jul 21, 2020

Ok, I think I got it. I'm sure this isn't the sexiest way to do it, but it works when tested against wscat, so I'll leave it here for anyone else who goes on this search.

Please don't judge me

First, here's the wscat command I used to test this:

{ while true; do date; sleep 1; done } | wscat -l 8088

And here is the main.rs:

use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token, Waker};

use std::net::SocketAddr;
use std::net::TcpStream as StdStream;
use std::sync::Arc;
use std::thread::sleep;
use std::thread::spawn;
use std::time::Duration;

use tungstenite::client::IntoClientRequest;
use tungstenite::handshake::client::ClientHandshake;

use tungstenite::protocol::Message;

fn main() {
    // Create a poll instance.
    let mut poll = Poll::new().unwrap();
    const STREAM_TOKEN: Token = Token(10);
    const WAKE_TOKEN: Token = Token(20);
    // Create storage for events.
    let mut events = Events::with_capacity(128);
    let socket: SocketAddr = "127.0.0.1:8088".parse().unwrap();
    let std_stream = StdStream::connect(socket).unwrap();
    let stream = TcpStream::from_std(std_stream);

    let hs = ClientHandshake::start(
        stream,
        String::from("ws://example.com:8080")
            .into_client_request()
            .unwrap(),
        None,
    )
    .unwrap();

    let mut ws = hs.handshake().unwrap().0;
    ws.write_message(Message::from("HI!")).unwrap();
    ws.write_pending().unwrap();
    println!("{}", ws.read_message().unwrap());

    // Register the socket with `Poll`
    poll.registry()
        .register(ws.get_mut(), STREAM_TOKEN, Interest::READABLE)
        .unwrap();

    let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN).unwrap());

    poll.poll(&mut events, Some(Duration::from_millis(100)))
        .unwrap();

    spawn(move || {
        sleep(Duration::from_secs(1));
        loop {
            poll.poll(&mut events, Some(Duration::from_millis(100)))
                .unwrap();
            for event in events.iter() {
                // We can use the token we previously provided to `register` to
                // determine for which socket the event is.
                match event.token() {
                    STREAM_TOKEN => {
                        println!("Received: {}", ws.read_message().unwrap());
                    }
                    WAKE_TOKEN => {
                        ws.write_message(Message::from("Yay!")).unwrap();
                    }
                    // We don't expect any events with tokens other than those we provided.
                    _ => unreachable!(),
                }
            }
        }
    });

    loop {
        sleep(Duration::from_secs(3));
        waker.wake().unwrap()
    }
}

From wscat I get:

client connected
< HI!
> Tue 21 Jul 2020 05:01:21 AM MDT
> Tue 21 Jul 2020 05:01:22 AM MDT
> Tue 21 Jul 2020 05:01:23 AM MDT
> Tue 21 Jul 2020 05:01:24 AM MDT
< Yay!
> Tue 21 Jul 2020 05:01:25 AM MDT
> Tue 21 Jul 2020 05:01:26 AM MDT
> Tue 21 Jul 2020 05:01:27 AM MDT
< Yay!
disconnected

And when running cargo run I get:

Received: Tue 21 Jul 2020 05:01:22 AM MDT
Received: Tue 21 Jul 2020 05:01:23 AM MDT
Received: Tue 21 Jul 2020 05:01:24 AM MDT
Received: Tue 21 Jul 2020 05:01:25 AM MDT
Received: Tue 21 Jul 2020 05:01:26 AM MDT
Received: Tue 21 Jul 2020 05:01:27 AM MDT

And as far as I can tell this doesn't take 100% CPU from looping too much, so success!

@daniel-abramov
Member

@mkeedlinger out of interest: why do you prefer not to use tokio, so that you wouldn't need to implement this on top of mio? We get questions here in tungstenite from people who struggle to make tungstenite work with mio. It is possible, yet quite complicated, as it requires a good understanding of mio and of strategies for using a non-blocking scheduler in the "right way"; in fact we did something like this 2.5 years ago in our own solution, but tokio was not quite stable back then. mio is quite a low-level thing, and I think using tungstenite with mio is really a very rare case nowadays.

@mkeedlinger

@application-developer-DA Good question. First, the project I'm using mio + tungstenite for is a passion project, and learning how to use mio has been interesting.

Also, my project has 2 streams to listen to: an HTTP server, and a websocket client that is both getting and sending messages. In the examples where I've seen tokio used, there's one loop where connections are accepted or messages received, but it's not immediately clear how to have 2 loops. If this has an obvious answer, do let me know! In the meantime I plan on spawning threads and message passing.

Last, my project may have parts that are CPU blocking, but tokio has an answer for that, so I guess it wasn't as much of a problem as I thought 😅

I guess the hope is that by using mio and wrestling with the compiler I'll learn a lot about solving concurrency problems. My background is in JavaScript, so this should be a great learning experience.

@Muqito

Muqito commented Jul 22, 2020

> Also, my project has 2 streams to listen to: an HTTP server, and a websocket client that is both getting and sending messages. [...] it's not immediately clear how to have 2 loops.

Hello

You can achieve this by using multi-producer, single-consumer (mpsc) channels.

So basically, when you create a WS connection, you create these channels and pass them to the shared server object, which owns them.

Together with the WsConnection start point, you spawn a new task that listens for messages on the receiver and writes them to the TCP stream of the WsConnection.

The WsConnection should hold the sender part from the server (which you can clone multiple times).

If you want to read from multiple places, you can instead use a multi-producer, multi-consumer channel.

I put together a naive ASCII-art diagram for you:
[diagram: ws-workflow]

I have created an example crate to show how you can achieve this with fully working code. It's not fully compatible with the RFC spec yet and can probably be improved a lot, but it focuses on explaining the simpler concepts of handling multiple users in a server that manages connections. A minimal sketch of the same wiring follows below.

Hope that makes sense in some way :o Please let me know if you have any further questions
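For reference, a minimal sketch of this channel wiring on the server side, assuming tokio and tokio-tungstenite; the port and the echo logic are placeholders:

use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio_tungstenite::{accept_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8088").await?;
    while let Ok((socket, _addr)) = listener.accept().await {
        let ws = accept_async(socket).await?;
        let (mut sink, mut stream) = ws.split();

        // The sender half is what a shared server object would own; clone
        // it for every task that wants to write to this connection.
        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

        // Writer task: the only place that touches the connection's sink.
        tokio::spawn(async move {
            while let Some(msg) = rx.recv().await {
                if sink.send(msg).await.is_err() {
                    break;
                }
            }
        });

        // Reader task: here it just echoes messages back via the channel.
        tokio::spawn(async move {
            while let Some(Ok(msg)) = stream.next().await {
                if tx.send(msg).is_err() {
                    break;
                }
            }
        });
    }
    Ok(())
}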

@soulmachine

In the end my solution was to call std::net::TcpStream::set_read_timeout(Some(Duration::from_secs(5))), so that read_message() does not block forever; instead it returns a WouldBlock error after 5 seconds.
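For reference, a minimal sketch of the read-timeout approach, assuming the tungstenite 0.x API used in this thread and a hypothetical endpoint; note that set_read_timeout takes an Option, and that the resulting error kind is WouldBlock on Unix but TimedOut on Windows:

use std::net::TcpStream;
use std::time::Duration;
use tungstenite::client;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream = TcpStream::connect("127.0.0.1:8088")?;
    // None would disable the timeout; a zero Duration is rejected.
    stream.set_read_timeout(Some(Duration::from_secs(5)))?;
    let (mut ws, _response) = client("ws://127.0.0.1:8088", stream)
        .map_err(|e| format!("handshake failed: {}", e))?;

    // read_message now gives up after 5 seconds instead of blocking forever.
    match ws.read_message() {
        Ok(msg) => println!("received: {}", msg),
        Err(e) => eprintln!("no message within 5 seconds: {}", e),
    }
    Ok(())
}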

@agalakhov
Member

You can also set the TcpStream to non-blocking mode. In this case it returns WouldBlock immediately. You can then use a third-party library (e.g. mio) to check whether a read would return WouldBlock before you call it, and thus manage your connections and tasks. Or you can just call read periodically, but this is not so nice, since your CPU will always be active.

@haxpor

haxpor commented Mar 2, 2022

@mkeedlinger Thanks for your example. It didn't work on my end, although it gave me the idea and got me closer to what I want. I'm trying to understand what the stream passed as the first argument to the ClientHandshake::start function means. Do you connect to 127.0.0.1:8088, or just bind the local machine to it, when connecting to the remote websocket URL in order to utilize mio? I always get Connection refused with error code 111.

So, in short:

  1. What is the stream argument for?
  2. The stream + websocket URL combination confuses me. Does this mean we bind a local socket to 127.0.0.1:8088 when connecting to the target websocket URL?

Edit:

I think I got question 2 answered. I missed that wscat was executed with -l, so it operates in listening mode and the Rust code connects to it. What if I just want to write client code that connects to a target websocket URL, but also create a TcpStream that works with mio, so I can utilize Poll in an event loop? Any suggestion on how to accomplish that?

@mkeedlinger

@haxpor Well, it's been a while (and I've moved on to async Rust with Tokio), but I'll do my best.

WebSockets communicate over TCP. Tungstenite could create the TCP connection (i.e. the stream) for you, but it chooses not to, in favor of you creating one and passing it in. Since you create the stream, you can set whatever flags you want on it (like seen here). And since the stream argument only has the trait bounds Read and Write, you could feasibly use any implementation of a TCP connection you want (or any other transport for that matter, not that that would be very useful).
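For reference, a minimal sketch of that last point, assuming the tungstenite 0.x API used in this thread: LoggingStream is a made-up wrapper, and anything implementing Read + Write can be passed to the client in the same way:

use std::io::{self, Read, Write};
use std::net::TcpStream;
use tungstenite::client;

// A made-up wrapper type: any Read + Write implementation will do.
struct LoggingStream(TcpStream);

impl Read for LoggingStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.0.read(buf)?;
        eprintln!("read {} bytes", n);
        Ok(n)
    }
}

impl Write for LoggingStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.0.write(buf)?;
        eprintln!("wrote {} bytes", n);
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream = LoggingStream(TcpStream::connect("127.0.0.1:8088")?);
    let (mut ws, _response) = client("ws://127.0.0.1:8088", stream)
        .map_err(|e| format!("handshake failed: {}", e))?;
    println!("{}", ws.read_message()?);
    Ok(())
}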

@haxpor

haxpor commented Mar 5, 2022

Thank you @mkeedlinger. And to the others, my apologies for resurrecting this thread.

Based on your sample code, I did more research and came to the same conclusion you described. Thanks again.

For the record, I attempted to do it in a non-async way: making tungstenite with its native-tls feature work with mio's Poll while connecting to a secure websocket, in order to have read/write functionality in a main message loop (mostly to solve the problem of being unable to share the socket between multiple threads). The hard part is making the underlying stream compatible with native-tls, which has its own TlsStream. I cannot conveniently use tungstenite's connect, because the underlying type wrapped by the returned WebSocket doesn't implement mio's event::Source. The upstream source code of mio::net::TcpStream shows how this could be done, but IoSource is all over the place and would have to wrap the stream from the start, so I found it too difficult for what I'm trying to achieve for now.

So in the end I cut out mio and went with the simpler approach: use connect and configure the underlying stream (std::net::TcpStream, as you already mentioned) via set_read_timeout and set_nonblocking. That way reads from the websocket can either be non-blocking or time out. I combined this with two std::sync::mpsc::sync_channels for communication between two threads, e.g. a heartbeat thread sending a signal to the main thread, which does the send & receive work within its loop. You can see a summary of this approach here.
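For reference, a minimal sketch of this final approach, assuming a tungstenite version from around the time of this comment (connect() returning a WebSocket over MaybeTlsStream, plus the read_message/write_message API); the endpoint and heartbeat interval are made up:

use std::io::ErrorKind;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use tungstenite::{connect, stream::MaybeTlsStream, Error, Message};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut ws, _response) = connect("ws://127.0.0.1:8088")?;
    // Reach through the WebSocket to configure the raw TcpStream.
    if let MaybeTlsStream::Plain(s) = ws.get_ref() {
        s.set_read_timeout(Some(Duration::from_millis(500)))?;
    }

    // Heartbeat thread: periodically signals the main loop over a channel.
    let (tx, rx) = mpsc::sync_channel::<()>(1);
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(30));
        if tx.send(()).is_err() {
            break; // main loop is gone
        }
    });

    // Main loop: owns the socket and does all sending and receiving.
    loop {
        if rx.try_recv().is_ok() {
            ws.write_message(Message::Ping(Vec::new()))?;
        }
        match ws.read_message() {
            Ok(msg) => println!("received: {}", msg),
            // Read timed out: just go around the loop again.
            Err(Error::Io(ref e))
                if e.kind() == ErrorKind::WouldBlock
                    || e.kind() == ErrorKind::TimedOut => {}
            Err(e) => return Err(e.into()),
        }
    }
}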
