Skip to content

Commit

Permalink
feat(ext/http): Upgrade to hyper1.0-rc4 (#19987)
Browse files Browse the repository at this point in the history
Includes a lightly-modified version of hyper-util's `TokioIo` utility. 

Hyper changes:

v1.0.0-rc.4 (2023-07-10)
Bug Fixes

    http1:
http1 server graceful shutdown fix (#3261)
([f4b51300](hyperium/hyper@f4b5130))
send error on Incoming body when connection errors (#3256)
([52f19259](hyperium/hyper@52f1925),
closes hyperium/hyper#3253)
properly end chunked bodies when it was known to be empty (#3254)
([fec64cf0](hyperium/hyper@fec64cf),
closes hyperium/hyper#3252)

Features

client: Make clients able to use non-Send executor (#3184)
([d977f209](hyperium/hyper@d977f20),
closes hyperium/hyper#3017)
    rt:
replace IO traits with hyper::rt ones (#3230)
([f9f65b7a](hyperium/hyper@f9f65b7),
closes hyperium/hyper#3110)
add downcast on Sleep trait (#3125)
([d92d3917](hyperium/hyper@d92d391),
closes hyperium/hyper#3027)
service: change Service::call to take &self (#3223)
([d894439e](hyperium/hyper@d894439),
closes hyperium/hyper#3040)

Breaking Changes

Any IO transport type provided must not implement hyper::rt::{Read,
Write} instead of tokio::io traits. You can grab a helper type from
hyper-util to wrap Tokio types, or implement the traits yourself, if
it's a custom type.
([f9f65b7a](hyperium/hyper@f9f65b7))
client::conn::http2 types now use another generic for an Executor. Code
that names Connection needs to include the additional generic parameter.
([d977f209](hyperium/hyper@d977f20))
The Service::call function no longer takes a mutable reference to self.
The FnMut trait bound on the service::util::service_fn function and the
trait bound on the impl for the ServiceFn struct were changed from FnMut
to Fn.
  • Loading branch information
mmastrac authored Jul 31, 2023
1 parent 43877f1 commit 990ecc9
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 10 deletions.
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ext/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fly-accept-encoding = "0.2.0"
http.workspace = true
httparse.workspace = true
hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
hyper1 = { package = "hyper", features = ["full"], version = "=1.0.0-rc.3" }
hyper1 = { package = "hyper", features = ["full"], version = "=1.0.0-rc.4" }
memmem.workspace = true
mime = "0.3.16"
once_cell.workspace = true
Expand Down
8 changes: 5 additions & 3 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::compressible::is_content_compressible;
use crate::extract_network_stream;
use crate::hyper_util_tokioio::TokioIo;
use crate::network_buffered_stream::NetworkStreamPrefixCheck;
use crate::request_body::HttpRequestBody;
use crate::request_properties::HttpConnectionProperties;
Expand Down Expand Up @@ -139,7 +140,7 @@ pub fn op_http_upgrade_raw(
let mut http = slab_get(slab_id);
*http.response() = response;
http.complete();
let mut upgraded = upgrade.await?;
let mut upgraded = TokioIo::new(upgrade.await?);
upgraded.write_all(&bytes).await?;
break upgraded;
}
Expand Down Expand Up @@ -709,7 +710,7 @@ fn serve_http11_unconditional(
let conn = http1::Builder::new()
.keep_alive(true)
.writev(*USE_WRITEV)
.serve_connection(io, svc);
.serve_connection(TokioIo::new(io), svc);

conn.with_upgrades().map_err(AnyError::from)
}
Expand All @@ -718,7 +719,8 @@ fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
) -> impl Future<Output = Result<(), AnyError>> + 'static {
let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc);
let conn =
http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
conn.map_err(AnyError::from)
}

Expand Down
206 changes: 206 additions & 0 deletions ext/http/hyper_util_tokioio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

// Copyright 2023 Sean McArthur <[email protected]>
// MIT licensed copy of unreleased hyper-util code from
// https://raw.githubusercontent.com/hyperium/hyper-util/master/src/rt/tokio_io.rs

#![allow(dead_code)]
//! Tokio IO integration for hyper
use hyper1 as hyper;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use pin_project::pin_project;

/// A wrapping implementing hyper IO traits for a type that
/// implements Tokio's IO traits.
#[pin_project]
#[derive(Debug)]
pub struct TokioIo<T> {
#[pin]
inner: T,
}

impl<T> TokioIo<T> {
/// Wrap a type implementing Tokio's IO traits.
pub fn new(inner: T) -> Self {
Self { inner }
}

/// Borrow the inner type.
pub fn inner(&self) -> &T {
&self.inner
}

/// Consume this wrapper and get the inner type.
pub fn into_inner(self) -> T {
self.inner
}
}

impl<T> hyper::rt::Read for TokioIo<T>
where
T: tokio::io::AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> Poll<Result<(), std::io::Error>> {
// SAFETY: Imported code from hyper-util
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf)
{
Poll::Ready(Ok(())) => tbuf.filled().len(),
other => return other,
}
};

// SAFETY: Imported code from hyper-util
unsafe {
buf.advance(n);
}
Poll::Ready(Ok(()))
}
}

impl<T> hyper::rt::Write for TokioIo<T>
where
T: tokio::io::AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
}

fn is_write_vectored(&self) -> bool {
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
}
}

impl<T> tokio::io::AsyncRead for TokioIo<T>
where
T: hyper::rt::Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
tbuf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
//let init = tbuf.initialized().len();
let filled = tbuf.filled().len();
// SAFETY: Imported code from hyper-util
let sub_filled = unsafe {
let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());

match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled())
{
Poll::Ready(Ok(())) => buf.filled().len(),
other => return other,
}
};

let n_filled = filled + sub_filled;
// At least sub_filled bytes had to have been initialized.
let n_init = sub_filled;
// SAFETY: Imported code from hyper-util
unsafe {
tbuf.assume_init(n_init);
tbuf.set_filled(n_filled);
}

Poll::Ready(Ok(()))
}
}

impl<T> tokio::io::AsyncWrite for TokioIo<T>
where
T: hyper::rt::Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
hyper::rt::Write::poll_write(self.project().inner, cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
hyper::rt::Write::poll_flush(self.project().inner, cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
hyper::rt::Write::poll_shutdown(self.project().inner, cx)
}

fn is_write_vectored(&self) -> bool {
hyper::rt::Write::is_write_vectored(&self.inner)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
}
}

/// A wrapping implementing Tokio IO traits for a type that
/// implements Hyper's IO traits.
#[pin_project]
#[derive(Debug)]
pub struct TokioIoForHyper<T> {
#[pin]
inner: T,
}

impl<T> TokioIoForHyper<T> {
/// Wrap a type implementing Tokio's IO traits.
pub fn new(inner: T) -> Self {
Self { inner }
}

/// Borrow the inner type.
pub fn inner(&self) -> &T {
&self.inner
}

/// Consume this wrapper and get the inner type.
pub fn into_inner(self) -> T {
self.inner
}
}
7 changes: 5 additions & 2 deletions ext/http/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use hyper::Body;
use hyper::HeaderMap;
use hyper::Request;
use hyper::Response;
use hyper_util_tokioio::TokioIo;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
Expand All @@ -76,6 +77,7 @@ use crate::reader_stream::ShutdownHandle;

pub mod compressible;
mod http_next;
mod hyper_util_tokioio;
mod network_buffered_stream;
mod reader_stream;
mod request_body;
Expand Down Expand Up @@ -1061,8 +1063,9 @@ impl CanDowncastUpgrade for hyper1::upgrade::Upgraded {
fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
self,
) -> Result<(T, Bytes), Self> {
let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
Ok((io, read_buf))
let hyper1::upgrade::Parts { io, read_buf, .. } =
self.downcast::<TokioIo<T>>()?;
Ok((io.into_inner(), read_buf))
}
}

Expand Down

0 comments on commit 990ecc9

Please sign in to comment.