Skip to content

Commit

Permalink
Do not always require an authority
Browse files Browse the repository at this point in the history
This fixes connections where a local UNIX domain socket path is
provided, where the authority contains the full path to the *.sock file.

Signed-off-by: Sascha Grunert <[email protected]>
  • Loading branch information
saschagrunert committed Oct 30, 2020
1 parent 3bb01aa commit 3a7d79b
Showing 1 changed file with 90 additions and 33 deletions.
123 changes: 90 additions & 33 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@
//! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html

use crate::codec::{Codec, RecvError, UserError};
use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
use crate::frame::{
self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId,
};
use crate::proto::{self, Config, Prioritized};
use crate::{FlowControl, PingPong, RecvStream, SendStream};

Expand Down Expand Up @@ -396,15 +398,18 @@ where
/// Accept the next incoming request on this connection.
pub async fn accept(
&mut self,
) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>
{
futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
}

#[doc(hidden)]
pub fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
) -> Poll<
Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>,
> {
// Always try to advance the internal state. Getting Pending also is
// needed to allow this function to return Pending.
if let Poll::Ready(_) = self.poll_closed(cx)? {
Expand All @@ -416,7 +421,8 @@ where
if let Some(inner) = self.connection.next_incoming() {
tracing::trace!("received incoming");
let (head, _) = inner.take_request().into_parts();
let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
let body =
RecvStream::new(FlowControl::new(inner.clone_to_opaque()));

let request = Request::from_parts(head, body);
let respond = SendResponse { inner };
Expand Down Expand Up @@ -462,7 +468,10 @@ where
///
/// Returns an error if a previous call is still pending acknowledgement
/// from the remote endpoint.
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
pub fn set_initial_window_size(
&mut self,
size: u32,
) -> Result<(), crate::Error> {
assert!(size <= proto::MAX_WINDOW_SIZE);
self.connection.set_initial_window_size(size)?;
Ok(())
Expand All @@ -481,13 +490,19 @@ where
/// [`poll_accept`]: struct.Connection.html#method.poll_accept
/// [`RecvStream`]: ../struct.RecvStream.html
/// [`SendStream`]: ../struct.SendStream.html
pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
pub fn poll_closed(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), crate::Error>> {
self.connection.poll(cx).map_err(Into::into)
}

#[doc(hidden)]
#[deprecated(note = "renamed to poll_closed")]
pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
pub fn poll_close(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), crate::Error>> {
self.poll_closed(cx)
}

Expand Down Expand Up @@ -539,7 +554,10 @@ where
{
type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.poll_accept(cx)
}
}
Expand Down Expand Up @@ -586,7 +604,9 @@ impl Builder {
/// ```
pub fn new() -> Builder {
Builder {
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_duration: Duration::from_secs(
proto::DEFAULT_RESET_STREAM_SECS,
),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
settings: Settings::default(),
initial_target_connection_window_size: None,
Expand Down Expand Up @@ -1023,7 +1043,10 @@ impl<B: Buf> SendResponse<B> {
///
/// Calling this method after having called `send_response` will return
/// a user error.
pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
pub fn poll_reset(
&mut self,
cx: &mut Context,
) -> Poll<Result<Reason, crate::Error>> {
self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
}

Expand Down Expand Up @@ -1095,7 +1118,10 @@ impl<B: Buf> SendPushedResponse<B> {
///
/// Calling this method after having called `send_response` will return
/// a user error.
pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
pub fn poll_reset(
&mut self,
cx: &mut Context,
) -> Poll<Result<Reason, crate::Error>> {
self.inner.poll_reset(cx)
}

Expand Down Expand Up @@ -1124,9 +1150,13 @@ where
{
type Output = Result<Codec<T, B>, crate::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
// Flush the codec
ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
ready!(self.codec.as_mut().unwrap().flush(cx))
.map_err(crate::Error::from_io)?;

// Return the codec
Poll::Ready(Ok(self.codec.take().unwrap()))
Expand All @@ -1153,18 +1183,23 @@ where
{
type Output = Result<Codec<T, B>, crate::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let mut buf = [0; 24];
let mut rem = PREFACE.len() - self.pos;

while rem > 0 {
let n = ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf[..rem]))
.map_err(crate::Error::from_io)?;
if n == 0 {
return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"connection closed before reading preface",
))));
return Poll::Ready(Err(crate::Error::from_io(
io::Error::new(
io::ErrorKind::UnexpectedEof,
"connection closed before reading preface",
),
)));
}

if PREFACE[self.pos..self.pos + n] != buf[..n] {
Expand All @@ -1190,7 +1225,10 @@ where
{
type Output = Result<Connection<T, B>, crate::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let span = self.span.clone(); // XXX(eliza): T_T
let _e = span.enter();
tracing::trace!(state = ?self.state);
Expand Down Expand Up @@ -1245,7 +1283,8 @@ where

tracing::trace!("connection established!");
let mut c = Connection { connection };
if let Some(sz) = self.builder.initial_target_connection_window_size {
if let Some(sz) = self.builder.initial_target_connection_window_size
{
c.set_target_window_size(sz);
}
Ok(c)
Expand Down Expand Up @@ -1383,7 +1422,9 @@ impl proto::Peer for Peer {

// Specifying :status for a request is a protocol error
if pseudo.status.is_some() {
tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
tracing::trace!(
"malformed headers: :status field on request; PROTOCOL_ERROR"
);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}

Expand All @@ -1393,14 +1434,21 @@ impl proto::Peer for Peer {
// A request translated from HTTP/1 must not include the :authority
// header
if let Some(authority) = pseudo.authority {
let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
parts.authority = Some(maybe_authority.or_else(|why| {
malformed!(
"malformed headers: malformed authority ({:?}): {}",
authority,
why,
)
})?);
// When connecting to a UNIX Domain Socket (UDS), then we might get a path for the
// authority field. If it's a local path and exists, then we do not error in that case
// and assume an UDS.
if !authority.as_str().ends_with(".sock") {
let maybe_authority = uri::Authority::from_maybe_shared(
authority.clone().into_inner(),
);
parts.authority = Some(maybe_authority.or_else(|why| {
malformed!(
"malformed headers: malformed authority ({:?}): {}",
authority,
why,
)
})?);
}
}

// A :scheme is required, except CONNECT.
Expand Down Expand Up @@ -1437,9 +1485,14 @@ impl proto::Peer for Peer {
malformed!("malformed headers: missing path");
}

let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
let maybe_path =
uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
parts.path_and_query = Some(maybe_path.or_else(|why| {
malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
malformed!(
"malformed headers: malformed path ({:?}): {}",
path,
why,
)
})?);
}

Expand Down Expand Up @@ -1474,7 +1527,9 @@ where
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"),
Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"),
Handshaking::ReadingPreface(_) => {
write!(f, "Handshaking::ReadingPreface(_)")
}
Handshaking::Empty => write!(f, "Handshaking::Empty"),
}
}
Expand All @@ -1498,7 +1553,9 @@ where
{
#[inline]
fn from(read: ReadPreface<T, Prioritized<B>>) -> Self {
Handshaking::ReadingPreface(read.instrument(tracing::trace_span!("read_preface")))
Handshaking::ReadingPreface(
read.instrument(tracing::trace_span!("read_preface")),
)
}
}

Expand Down

0 comments on commit 3a7d79b

Please sign in to comment.