Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TcpListener::from_listener() doesn't seem to work with listeners created via std::net::TcpListener::from_raw_fd() #241

Open
liftoff opened this issue Aug 29, 2017 · 23 comments

Comments

@liftoff
Copy link

liftoff commented Aug 29, 2017

Sorry for the lengthy code but here's my example:

https://gist.github.com/liftoff/d15fbb2d472642240af4b0acf2664bae

The pertinent lines are:

fd_listener = std::net::TcpListener::from_raw_fd(fd);
let xcb_fd_listener = TcpListener::from_listener(fd_listener, &addr, &h1).unwrap();

Then later I start up the reactor like so:

h1.spawn(xcb_handler);
h1.spawn(webserver);
loop {
    lp.turn(None);
}

The webserver works fine. The xcb_handler does not. It's like the reactor isn't detecting that the underlying fd has data to be read.

If I use mio directly with poll.register(&fd_listener, XCBEVENT, mio::Ready::readable(), mio::PollOpt::edge()).unwrap(); it seems to work fine (fd_listener was made from mio:tcp:TcpListener::from_raw_fd()`).

I asked in #rust-beginners but no one was able to figure it out. As I understand it the value of addr passed to tokio_core::net::TcpListener::from_listener() should be ignored if the underlying socket is from an fd but I'm not certain of this. I'm also not certain that I'm going about this the best way. I mean, it seems logical but I can't find any examples of using Tokio to listen for events on an fd.

Any assistance with this is greatly appreciated.

@alexcrichton
Copy link
Contributor

Hm interesting! If you can slim this down that'd be great, and maybe try poking around with strace to see what's going on? There may be some stray syscall that's causing problems.

The address being passed in is only actually used on Windows, it's just required everywhere for a consistent interface.

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

I've made some progress here. Something inside of tokio is doing something that results in "Error: Invalid argument (os error 22)" which causes the incoming() method to throw an error instead of getting to the point where it can execute my for_each() stuff.

Here's the strace of that happening:

fcntl(7, F_GETFL)                       = 0x802 (flags O_RDWR|O_NONBLOCK)
fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
epoll_ctl(3, EPOLL_CTL_ADD, 7, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=4, u64=4}}) = 0
socketpair(AF_LOCAL, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0, [8, 9]) = 0
getrandom("", 0, GRND_NONBLOCK)         = 0
getrandom("\210^\5\236\375\356l\355", 8, GRND_NONBLOCK) = 8
getrandom("-\227\222\303Q\27\35\202", 8, GRND_NONBLOCK) = 8
rt_sigaction(SIGINT, {0x5612416b7a80, [], SA_RESTORER|SA_RESTART|SA_SIGINFO|SA_NOCLDSTOP, 0x7f9327493670}, {SIG_DFL, [], 0}, 8) = 0
epoll_ctl(3, EPOLL_CTL_ADD, 8, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=6, u64=6}}) = 0
epoll_wait(3, [{EPOLLOUT, {u32=4, u64=4}}, {EPOLLOUT, {u32=6, u64=6}}], 1024, 0) = 2
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=4, u64=4}}], 1024, -1) = 1
write(5, "\1", 1)                       = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128)                      = 1
read(4, 0x7ffcd605f668, 128)            = -1 EAGAIN (Resource temporarily unavailable)
accept4(7, 0x7ffcd605def0, 0x7ffcd605dfbc, SOCK_CLOEXEC) = -1 EINVAL (Invalid argument)
write(1, "Error: Invalid argument (os erro"..., 38Error: Invalid argument (os error 22)
) = 38
close(7)

I suspect this is happening because something is trying to use the address of the TcpListener which of course won't work.

@alexcrichton
Copy link
Contributor

Is the file descriptor coming out of xcb actually a TCP listener? Or is it a unix socket? (in which case tokio-uds may work better here)

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

The file descriptor coming out of xcb is a Unix Socket. I looked at the tokio-uds package on Github but there was no documentation and no examples so I wasn't sure how to even begin using it. Is there an example somewhere that would match my use case?

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

Actually, I'm not sure what the file descriptor points to. It's whatever I get back from xcb::ffi::base::xcb_get_file_descriptor(conn.get_raw_conn()); I assume it's a socket because XCB can work over a network.

@alexcrichton
Copy link
Contributor

Oh the tokio-uds crate is intended to basically be the same as the TCP types in tokio-core (hence the lack of current docs), so you should be able to use mostly the same APIs?

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

I was able to finagle the code into using tokio-uds but I get the same exact error, "Error: Invalid argument (os error 22)". Wish it was more descriptive!

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

I'm totally lost at this point as to how to get tokio to work like it does with mio. I thought it might be helpful to post the mio equivalent code (which works fine):

https://gist.github.com/liftoff/d6cead19e434871985047769ee3df7a3

The relevant portion is probably this:

loop {
    poll.poll(&mut events, None).unwrap();

    for event in events.iter() {
        match event.token() {
            SERVER => {
                // Accept and drop the socket immediately, this will close
                // the socket and notify the client of the EOF.
                println!("Incoming connection!");
                let _ = server.accept();
                println!("Closing connection");
            }
            XCBEVENT => {
                let start = Instant::now();
                if event.kind().is_readable() {
                    let _ = handle_xcb_event(
                        &conn,
                        filetype,
                        counter.next().unwrap(),
                        damage_event,
                        &seg,
                    );
                }
                let elapsed = start.elapsed();
                println!(
                    "Elapsed: {} ms",
                    (elapsed.as_secs() * 1_000) + (elapsed.subsec_nanos() / 1_000_000) as u64
                );
            }
            _ => unreachable!(),
        }
    }
}

@alexcrichton
Copy link
Contributor

It looks like in the mio code you're ignoring the return value of accept, what happens if you do the same with futures and ignore the error case of the stream?

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

I'm not sure what you mean? I'm only calling accept on the actual socket. Not on the XCBEVENT token part.

@alexcrichton
Copy link
Contributor

Right what Tokio is doing is also calling accept (the same method) but the error is coming out and going into the stream, which is then later processed for sleeping in your example. In the mio code, you're ignoring any error happening in accept

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

How do I do the equivalent in Tokio though?

@alexcrichton
Copy link
Contributor

alexcrichton commented Aug 30, 2017

Probably something like -- listener.incoming().and_then(|x| Ok(()))

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

Replacing:

let xcb_handler = xcb_fd_listener.incoming()
        .for_each(move |(_socket, _welcome)| {

...with:

let xcb_handler = xcb_fd_listener.incoming()
        .and_then(move |(_socket, _welcome)| {

Results in compilation errors:

error[E0277]: the trait bound `futures::stream::MapErr<futures::stream::AndThen<std::boxed::Box<futures::Stream<Item=(tokio_uds::UnixStream, std::os::ext::net::SocketAddr), Error=std
::io::Error> + std::marker::Send>, [closure@src/main.rs:297:23: 390:10 cloned_conn:_, damage_event:_, root:_, seg:_, filetype:_, width:_, height:_], std::result::Result<(), std::io::
Error>>, [closure@src/main.rs:390:20: 393:10]>: futures::Future` is not satisfied
--> src/main.rs:451:12
    |
451 |         h1.spawn(xcb_handler);
    |            ^^^^^ the trait `futures::Future` is not implemented for `futures::stream::MapErr<futures::stream::AndThen<std::boxed::Box<futures::Stream<Item=(tokio_uds::UnixStrea
m, std::os::ext::net::SocketAddr), Error=std::io::Error> + std::marker::Send>, [closure@src/main.rs:297:23: 390:10 cloned_conn:_, damage_event:_, root:_, seg:_, filetype:_, width:_,
height:_], std::result::Result<(), std::io::Error>>, [closure@src/main.rs:390:20: 393:10]>`

error: aborting due to previous error

The same thing happens if I replace:

.map_err(|_e| {
        println!("Error: {}", _e);
        ()
    });

...with your and_then() equivalent.

@liftoff
Copy link
Author

liftoff commented Aug 30, 2017

I have an idea... Maybe the problem is that something is calling accept() on that TcpListener (or UnixListener) when it shouldn't (that will always result in an error). Is there a way I can prevent tokio from calling accept() when I use .incoming()?

@alexcrichton
Copy link
Contributor

Hm so in any case it sounds like this maybe isn't the best usage of raw file descriptors. When creating a Rust type it's consumign ownership of the file descriptor but it looks like the C side of things still has the file descriptor in this case? It may be best to basically use PollEvented manually instead of trying to shoehorn this into the standard abstractions

@0xd34d10cc
Copy link

As far as I know, XCB is a client library for X window server. Maybe you need to create TcpStream\UdsStream instead of TcpListener\UdsListener?

@liftoff
Copy link
Author

liftoff commented Aug 31, 2017

@alexcrichton I have no idea what I'm doing (haha)... Can you point me to a minimal example of a PollEvented implementation I can use as a base? I figure I should just have it call the equivalent poll() function from xcb instead of the usual socket stuff.

@0xd34d10cc How would I use UdsListener instead of TcpStream? I only have examples that use listener.incoming().for_each(). As I understand it the incoming() method just returns a stream? So I could presumably skip the listener part altogether and just... what? I just need an example.

@0xd34d10cc
Copy link

0xd34d10cc commented Aug 31, 2017

@liftoff
TcpStream instead of TcpListener in case when XCB opens tcp connection.
UdsStream instead of UdsListener in case when XCB opens uds connection.
That 'listener part' is about 'accepting' connections. As far as I understand, In case of XCB you need not 'accept' connections, but read events from already opened (via xcb::Connection::connect) connection. TcpStream\UdsStream does just that.

@0xd34d10cc
Copy link

Actually, I think tokio cannot be used with XCB that way. XCB handles io by itself so the only thing you want is to be notified when socket is ready to read. Tokio has much more higher-level api which cannot provide such information.

@liftoff
Copy link
Author

liftoff commented Aug 31, 2017

@0xd34d10cc Any recommendations as to how I should proceed then? I was wondering if there was an easy way to just have Tokio repeatedly call poll_for_event() in the background until the program ends.

@carllerche
Copy link
Member

@0xd34d10cc Tokio should work fine, it has far more than request / response (despite what the docs might make seem).

@0xd34d10cc
Copy link

@carllerche ah, I've missed that TcpStream provides poll_read method which, if wrapped with Future, can do what @liftoff needs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants