diff --git a/src/server/mod.rs b/src/server/mod.rs index 970f0a2e96..fefda0ee65 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -25,7 +25,7 @@ use http; use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::{Core, Handle, Timeout}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::TcpListener; pub use tokio_service::{NewService, Service}; use proto; @@ -564,14 +564,14 @@ impl AddrIncoming { } impl Stream for AddrIncoming { - type Item = TcpStream; + type Item = self::addr_stream::AddrStream; type Error = ::std::io::Error; fn poll(&mut self) -> Poll, Self::Error> { loop { match self.listener.accept() { Ok((socket, _addr)) => { - return Ok(Async::Ready(Some(socket))); + return Ok(Async::Ready(Some(self::addr_stream::new(socket)))); }, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), Err(e) => debug!("internal error: {:?}", e), @@ -580,6 +580,68 @@ impl Stream for AddrIncoming { } } +mod addr_stream { + use std::io::{self, Read, Write}; + use bytes::{Buf, BufMut}; + use futures::Poll; + use tokio::net::TcpStream; + use tokio_io::{AsyncRead, AsyncWrite}; + + pub fn new(tcp: TcpStream) -> AddrStream { + AddrStream { + inner: tcp, + } + } + + #[derive(Debug)] + pub struct AddrStream { + inner: TcpStream, + } + + impl Read for AddrStream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.read(buf) + } + } + + impl Write for AddrStream { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + + #[inline] + fn flush(&mut self ) -> io::Result<()> { + self.inner.flush() + } + } + + impl AsyncRead for AddrStream { + #[inline] + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } + + #[inline] + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.inner.read_buf(buf) + } + } + + impl AsyncWrite for AddrStream { + #[inline] + fn shutdown(&mut self) -> Poll<(), io::Error> { + AsyncWrite::shutdown(&mut self.inner) + } + + #[inline] + fn write_buf(&mut self, buf: &mut B) -> Poll { + self.inner.write_buf(buf) + } + } +} + struct NotifyService { inner: S, info: Weak>,