Skip to content

Commit

Permalink
Update to latest versions of hyper and http-body (#1882)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Scofield <[email protected]>
Co-authored-by: Jonas Platte <[email protected]>
  • Loading branch information
3 people authored Nov 23, 2023
1 parent 2f47209 commit 43b14a5
Show file tree
Hide file tree
Showing 93 changed files with 1,530 additions and 1,515 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: CI

env:
CARGO_TERM_COLOR: always
MSRV: '1.65'
MSRV: '1.66'

on:
push:
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ default-members = ["axum", "axum-*"]
# Example has been deleted, but README.md remains
exclude = ["examples/async-graphql"]
resolver = "2"

[patch.crates-io]
# for http 1.0. PR to update is merged but not published
headers = { git = "https://github.com/hyperium/headers", rev = "4400aa90c47a7" }
6 changes: 6 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[files]
extend-exclude = ["Cargo.toml"]

[default.extend-identifiers]
DefaultOnFailedUpdgrade = "DefaultOnFailedUpdgrade"
OnFailedUpdgrade = "OnFailedUpdgrade"
11 changes: 6 additions & 5 deletions axum-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ __private_docs = ["dep:tower-http"]
async-trait = "0.1.67"
bytes = "1.0"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
http = "0.2.7"
http-body = "0.4.5"
http = "1.0.0"
http-body = "1.0.0"
http-body-util = "0.1.0"
mime = "0.3.16"
pin-project-lite = "0.2.7"
sync_wrapper = "0.1.1"
tower-layer = "0.3"
tower-service = "0.3"

# optional dependencies
tower-http = { version = "0.4", optional = true, features = ["limit"] }
tower-http = { version = "0.5.0", optional = true, features = ["limit"] }
tracing = { version = "0.1.37", default-features = false, optional = true }

[build-dependencies]
Expand All @@ -40,9 +41,9 @@ rustversion = "1.0.9"
axum = { path = "../axum", version = "0.6.0" }
axum-extra = { path = "../axum-extra", features = ["typed-header"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14.24"
hyper = "1.0.0"
tokio = { version = "1.25.0", features = ["macros"] }
tower-http = { version = "0.4", features = ["limit"] }
tower-http = { version = "0.5.0", features = ["limit"] }

[package.metadata.cargo-public-api-crates]
allowed = [
Expand Down
146 changes: 63 additions & 83 deletions axum-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

use crate::{BoxError, Error};
use bytes::Bytes;
use bytes::{Buf, BufMut};
use futures_util::stream::Stream;
use futures_util::TryStream;
use http::HeaderMap;
use http_body::Body as _;
use http_body::{Body as _, Frame};
use http_body_util::BodyExt;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use sync_wrapper::SyncWrapper;

type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, Error>;
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;

fn boxed<B>(body: B) -> BoxBody
where
Expand All @@ -35,58 +34,6 @@ where
}
}

// copied from hyper under the following license:
// Copyright (c) 2014-2021 Sean McArthur

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
pub(crate) async fn to_bytes<T>(body: T) -> Result<Bytes, T::Error>
where
T: http_body::Body,
{
futures_util::pin_mut!(body);

// If there's only 1 chunk, we can just return Buf::to_bytes()
let mut first = if let Some(buf) = body.data().await {
buf?
} else {
return Ok(Bytes::new());
};

let second = if let Some(buf) = body.data().await {
buf?
} else {
return Ok(first.copy_to_bytes(first.remaining()));
};

// With more than 1 buf, we gotta flatten into a Vec first.
let cap = first.remaining() + second.remaining() + body.size_hint().lower() as usize;
let mut vec = Vec::with_capacity(cap);
vec.put(first);
vec.put(second);

while let Some(buf) = body.data().await {
vec.put(buf?);
}

Ok(vec.into())
}

/// The body type used in axum requests and responses.
#[derive(Debug)]
pub struct Body(BoxBody);
Expand All @@ -103,7 +50,7 @@ impl Body {

/// Create an empty body.
pub fn empty() -> Self {
Self::new(http_body::Empty::new())
Self::new(http_body_util::Empty::new())
}

/// Create a new `Body` from a [`Stream`].
Expand All @@ -119,6 +66,16 @@ impl Body {
stream: SyncWrapper::new(stream),
})
}

/// Convert the body into a [`Stream`] of data frames.
///
/// Non-data frames (such as trailers) will be discarded. Use [`http_body_util::BodyStream`] if
/// you need a [`Stream`] of all frame types.
///
/// [`http_body_util::BodyStream`]: https://docs.rs/http-body-util/latest/http_body_util/struct.BodyStream.html
pub fn into_data_stream(self) -> BodyDataStream {
BodyDataStream { inner: self }
}
}

impl Default for Body {
Expand All @@ -131,7 +88,7 @@ macro_rules! body_from_impl {
($ty:ty) => {
impl From<$ty> for Body {
fn from(buf: $ty) -> Self {
Self::new(http_body::Full::from(buf))
Self::new(http_body_util::Full::from(buf))
}
}
};
Expand All @@ -152,19 +109,11 @@ impl http_body::Body for Body {
type Error = Error;

#[inline]
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Option<Result<Self::Data, Self::Error>>> {
Pin::new(&mut self.0).poll_data(cx)
}

#[inline]
fn poll_trailers(
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Result<Option<HeaderMap>, Self::Error>> {
Pin::new(&mut self.0).poll_trailers(cx)
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.0).poll_frame(cx)
}

#[inline]
Expand All @@ -178,12 +127,51 @@ impl http_body::Body for Body {
}
}

impl Stream for Body {
/// A stream of data frames.
///
/// Created with [`Body::into_data_stream`].
#[derive(Debug)]
pub struct BodyDataStream {
inner: Body,
}

impl Stream for BodyDataStream {
type Item = Result<Bytes, Error>;

#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_data(cx)
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match futures_util::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
Some(frame) => match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(_frame) => {}
},
None => return Poll::Ready(None),
}
}
}
}

impl http_body::Body for BodyDataStream {
type Data = Bytes;
type Error = Error;

#[inline]
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.inner).poll_frame(cx)
}

#[inline]
fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
}

Expand All @@ -203,25 +191,17 @@ where
type Data = Bytes;
type Error = Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let stream = self.project().stream.get_pin_mut();
match futures_util::ready!(stream.try_poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
None => Poll::Ready(None),
}
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

#[test]
Expand Down
23 changes: 10 additions & 13 deletions axum-core/src/ext_traits/request.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::body::Body;
use crate::extract::{DefaultBodyLimitKind, FromRequest, FromRequestParts, Request};
use futures_util::future::BoxFuture;
use http_body::Limited;

mod sealed {
pub trait Sealed {}
Expand Down Expand Up @@ -258,13 +257,13 @@ pub trait RequestExt: sealed::Sealed + Sized {

/// Apply the [default body limit](crate::extract::DefaultBodyLimit).
///
/// If it is disabled, return the request as-is in `Err`.
fn with_limited_body(self) -> Result<Request<Limited<Body>>, Request>;
/// If it is disabled, the request is returned as-is.
fn with_limited_body(self) -> Request;

/// Consumes the request, returning the body wrapped in [`Limited`] if a
/// Consumes the request, returning the body wrapped in [`http_body_util::Limited`] if a
/// [default limit](crate::extract::DefaultBodyLimit) is in place, or not wrapped if the
/// default limit is disabled.
fn into_limited_body(self) -> Result<Limited<Body>, Body>;
fn into_limited_body(self) -> Body;
}

impl RequestExt for Request {
Expand Down Expand Up @@ -320,24 +319,22 @@ impl RequestExt for Request {
})
}

fn with_limited_body(self) -> Result<Request<Limited<Body>>, Request> {
fn with_limited_body(self) -> Request {
// update docs in `axum-core/src/extract/default_body_limit.rs` and
// `axum/src/docs/extract.md` if this changes
const DEFAULT_LIMIT: usize = 2_097_152; // 2 mb

match self.extensions().get::<DefaultBodyLimitKind>().copied() {
Some(DefaultBodyLimitKind::Disable) => Err(self),
Some(DefaultBodyLimitKind::Disable) => self,
Some(DefaultBodyLimitKind::Limit(limit)) => {
Ok(self.map(|b| http_body::Limited::new(b, limit)))
self.map(|b| Body::new(http_body_util::Limited::new(b, limit)))
}
None => Ok(self.map(|b| http_body::Limited::new(b, DEFAULT_LIMIT))),
None => self.map(|b| Body::new(http_body_util::Limited::new(b, DEFAULT_LIMIT))),
}
}

fn into_limited_body(self) -> Result<Limited<Body>, Body> {
self.with_limited_body()
.map(Request::into_body)
.map_err(Request::into_body)
fn into_limited_body(self) -> Body {
self.with_limited_body().into_body()
}
}

Expand Down
Loading

0 comments on commit 43b14a5

Please sign in to comment.