Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ features = [
"multipart",
]

[patch.crates-io]
hyper-util = { git = "https://github.com/grafbase/hyper-util", rev = "c7acf8968d96a4408e952a097d93602d2e8ed01a" }

[features]
default = ["default-tls"]

Expand Down
13 changes: 12 additions & 1 deletion src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Body {
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn wrap_stream<S>(stream: S) -> Body
where
S: futures_core::stream::TryStream + Send + 'static,
S: futures_core::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
Expand Down Expand Up @@ -264,6 +264,17 @@ impl HttpBody for Body {
),
}
}

fn size_hint(&self) -> http_body::SizeHint {
match self.inner {
Inner::Reusable(ref bytes) => {
let mut hint = http_body::SizeHint::default();
hint.set_exact(bytes.len() as u64);
hint
}
Inner::Streaming(ref body) => body.size_hint(),
}
}
}

// ===== impl TotalTimeoutBody =====
Expand Down
2 changes: 2 additions & 0 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ impl ClientBuilder {
builder.http2_keep_alive_while_idle(true);
}

#[cfg(not(target_arch = "wasm32"))]
builder.timer(hyper_util::rt::TokioTimer::new());
builder.pool_idle_timeout(config.pool_idle_timeout);
builder.pool_max_idle_per_host(config.pool_max_idle_per_host);
connector.set_keepalive(config.tcp_keepalive);
Expand Down
17 changes: 15 additions & 2 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ pub(super) struct Accepts {
pub(super) deflate: bool,
}

impl Accepts {
pub fn none() -> Self {
Self {
#[cfg(feature = "gzip")]
gzip: false,
#[cfg(feature = "brotli")]
brotli: false,
#[cfg(feature = "deflate")]
deflate: false,
}
}
}

/// A response decompressor over a non-blocking stream of chunks.
///
/// The inner decoder may be constructed asynchronously.
Expand Down Expand Up @@ -126,7 +139,7 @@ impl Decoder {
///
/// This decoder will buffer and decompress chunks that are brotlied.
#[cfg(feature = "brotli")]
fn brotli(body: Body) -> Decoder {
fn brotli(body: ResponseBody) -> Decoder {
use futures_util::StreamExt;

Decoder {
Expand All @@ -141,7 +154,7 @@ impl Decoder {
///
/// This decoder will buffer and decompress chunks that are deflated.
#[cfg(feature = "deflate")]
fn deflate(body: Body) -> Decoder {
fn deflate(body: ResponseBody) -> Decoder {
use futures_util::StreamExt;

Decoder {
Expand Down
13 changes: 8 additions & 5 deletions src/async_impl/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::pin::Pin;

use bytes::Bytes;
use encoding_rs::{Encoding, UTF_8};
use http_body_util::BodyExt;
use hyper::{HeaderMap, StatusCode, Version};
use hyper_util::client::legacy::connect::HttpInfo;
use mime::Mime;
Expand All @@ -16,6 +17,7 @@ use url::Url;

use super::body::Body;
use super::decoder::{Accepts, Decoder};
use crate::async_impl::body::ResponseBody;
#[cfg(feature = "cookies")]
use crate::cookie;

Expand Down Expand Up @@ -418,16 +420,19 @@ impl From<Response> for Body {
}
}

/*
// I'm not sure this conversion is that useful... People should be encouraged
// to use `http::Resposne`, not `reqwest::Response`.
impl<T: Into<Body>> From<http::Response<T>> for Response {
fn from(r: http::Response<T>) -> Response {
use crate::response::ResponseUrl;

let (mut parts, body) = r.into_parts();
let body = body.into();
let decoder = Decoder::detect(&mut parts.headers, body, Accepts::none());
let body: crate::async_impl::body::Body = body.into();
let decoder = Decoder::detect(
&mut parts.headers,
ResponseBody::new(body.map_err(Into::into)),
Accepts::none(),
);
let url = parts
.extensions
.remove::<ResponseUrl>()
Expand All @@ -441,7 +446,6 @@ impl<T: Into<Body>> From<http::Response<T>> for Response {
}
}


#[cfg(test)]
mod tests {
use super::Response;
Expand All @@ -463,4 +467,3 @@ mod tests {
assert_eq!(*response.url(), url);
}
}
*/
4 changes: 1 addition & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,13 @@ impl Error {
matches!(self.inner.kind, Kind::Request)
}

/*
#[cfg(not(target_arch = "wasm32"))]
/// Returns true if the error is related to connect
pub fn is_connect(&self) -> bool {
let mut source = self.source();

while let Some(err) = source {
if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
if let Some(hyper_err) = err.downcast_ref::<hyper_util::client::legacy::Error>() {
if hyper_err.is_connect() {
return true;
}
Expand All @@ -139,7 +138,6 @@ impl Error {

false
}
*/

/// Returns true if the error is related to the request or response body
pub fn is_body(&self) -> bool {
Expand Down
26 changes: 15 additions & 11 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ async fn response_json() {

#[tokio::test]
async fn body_pipe_response() {
use http_body_util::BodyExt;
let _ = env_logger::try_init();

let server = server::http(move |mut req| async move {
Expand All @@ -141,10 +142,13 @@ async fn body_pipe_response() {
assert_eq!(req.uri(), "/pipe");
assert_eq!(req.headers()["transfer-encoding"], "chunked");

let mut full: Vec<u8> = Vec::new();
while let Some(item) = req.body_mut().next().await {
full.extend(&*item.unwrap());
}
let full: Vec<u8> = req
.into_body()
.collect()
.await
.expect("must succeed")
.to_bytes()
.to_vec();

assert_eq!(full, b"pipe me");

Expand Down Expand Up @@ -325,7 +329,6 @@ fn use_preconfigured_rustls_default() {

let root_cert_store = rustls::RootCertStore::empty();
let tls = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_cert_store)
.with_no_client_auth();

Expand Down Expand Up @@ -454,9 +457,11 @@ async fn highly_concurrent_requests_to_http2_server_with_low_max_concurrent_stre
let server = server::http_with_config(
move |req| async move {
assert_eq!(req.version(), http::Version::HTTP_2);
http::Response::default()
http::Response::<String>::default()
},
|builder| {
builder.http2().max_concurrent_streams(1);
},
|builder| builder.http2_only(true).http2_max_concurrent_streams(1),
);

let url = format!("http://{}", server.addr());
Expand All @@ -482,11 +487,10 @@ async fn highly_concurrent_requests_to_slow_http2_server_with_low_max_concurrent
let server = delay_server::Server::new(
move |req| async move {
assert_eq!(req.version(), http::Version::HTTP_2);
http::Response::default()
http::Response::<String>::default()
},
|mut http| {
http.http2_only(true).http2_max_concurrent_streams(1);
http
|http| {
http.http2().max_concurrent_streams(1);
},
std::time::Duration::from_secs(2),
)
Expand Down
3 changes: 2 additions & 1 deletion tests/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg(not(target_arch = "wasm32"))]
mod support;
use http_body_util::Empty;
use support::server;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand All @@ -25,7 +26,7 @@ async fn http_upgrade() {
.status(http::StatusCode::SWITCHING_PROTOCOLS)
.header(http::header::CONNECTION, "upgrade")
.header(http::header::UPGRADE, "foobar")
.body(hyper::Body::empty())
.body(Empty::<Vec<u8>>::new())
.unwrap()
}
});
Expand Down