Skip to content

Commit

Permalink
upgrade: Tokio 0.2 (#3418)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju authored Dec 30, 2019
1 parent df1665a commit 46d76a7
Show file tree
Hide file tree
Showing 18 changed files with 849 additions and 942 deletions.
993 changes: 505 additions & 488 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ deno_typescript = { path = "../deno_typescript", version = "0.27.0" }
ansi_term = "0.12.1"
atty = "0.2.13"
base64 = "0.11.0"
bytes = "0.5"
byteorder = "1.3.2"
clap = "2.33.0"
dirs = "2.0.2"
dlopen = "0.1.8"
futures = { version = "0.3", features = [ "compat", "io-compat" ] }
http = "0.1.19"
http = "0.2"
hyper = "0.12.35"
hyper-rustls = "0.17.1"
indexmap = "1.3.0"
Expand All @@ -43,7 +44,7 @@ log = "0.4.8"
rand = "0.7.2"
regex = "1.3.1"
remove_dir_all = "0.5.2"
reqwest = { version = "0.9.22", default-features = false, features = ["rustls-tls"] }
reqwest = { git = "https://github.com/seanmonstar/reqwest.git", rev = "0ab5df3", features = ["rustls-tls", "stream"] }
ring = "0.16.9"
rustyline = "5.0.4"
serde = { version = "1.0.102", features = ["derive"] }
Expand All @@ -53,11 +54,10 @@ source-map-mappings = "0.5.0"
sys-info = "0.5.8"
tempfile = "3.1.0"
termcolor = "1.0.5"
tokio = "0.1.22"
tokio = { version = "0.2.6", features = ["full"] }
tokio-executor = "0.1.8"
tokio-process = "0.2.4"
tokio-rustls = "0.10.2"
url = "1.7.2"
tokio-rustls = "0.12.0"
url = "2.1"
utime = "0.2.1"
webpki = "0.21.0"
webpki-roots = "0.17.0"
Expand Down
5 changes: 3 additions & 2 deletions cli/deno_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ impl GetErrorKind for url::ParseError {
}
RelativeUrlWithoutBase => ErrorKind::RelativeUrlWithoutBase,
SetHostOnCannotBeABaseUrl => ErrorKind::SetHostOnCannotBeABaseUrl,
_ => ErrorKind::Other,
}
}
}
Expand All @@ -231,7 +232,7 @@ impl GetErrorKind for reqwest::Error {
fn kind(&self) -> ErrorKind {
use self::GetErrorKind as Get;

match self.get_ref() {
match self.source() {
Some(err_ref) => None
.or_else(|| err_ref.downcast_ref::<hyper::Error>().map(Get::kind))
.or_else(|| err_ref.downcast_ref::<url::ParseError>().map(Get::kind))
Expand All @@ -242,7 +243,7 @@ impl GetErrorKind for reqwest::Error {
.map(Get::kind)
})
.unwrap_or_else(|| ErrorKind::HttpOther),
_ => ErrorKind::HttpOther,
None => ErrorKind::HttpOther,
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions cli/global_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures::channel::oneshot;
use futures::future::FutureExt;
use std::future::Future;
use std::time::Instant;
use tokio::timer::Delay;
use tokio;

#[derive(Default)]
pub struct GlobalTimer {
Expand Down Expand Up @@ -43,8 +43,7 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);

let delay = futures::compat::Compat01As03::new(Delay::new(deadline))
.map_err(|err| panic!("Unexpected error in timeout {:?}", err));
let delay = tokio::time::delay_until(deadline.into());
let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));

Expand Down
27 changes: 15 additions & 12 deletions cli/http_body.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.

use futures::io::AsyncRead;
use futures::stream::StreamExt;
use reqwest::r#async::Chunk;
use reqwest::r#async::Decoder;
use bytes::Bytes;
use futures::Stream;
use futures::StreamExt;
use reqwest;
use std::cmp::min;
use std::io;
use std::io::Read;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;

// TODO(bartlomieju): most of this stuff can be moved to `cli/ops/fetch.rs`
type ReqwestStream = Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>;

/// Wraps `reqwest::Decoder` so that it can be exposed as an `AsyncRead` and integrated
/// Wraps `ReqwestStream` so that it can be exposed as an `AsyncRead` and integrated
/// into resources more easily.
pub struct HttpBody {
decoder: futures::compat::Compat01As03<Decoder>,
chunk: Option<Chunk>,
stream: ReqwestStream,
chunk: Option<Bytes>,
pos: usize,
}

impl HttpBody {
pub fn from(body: Decoder) -> Self {
pub fn from(body: ReqwestStream) -> Self {
Self {
decoder: futures::compat::Compat01As03::new(body),
stream: body,
chunk: None,
pos: 0,
}
Expand Down Expand Up @@ -65,10 +68,10 @@ impl AsyncRead for HttpBody {
assert_eq!(inner.pos, 0);
}

let p = inner.decoder.poll_next_unpin(cx);
let p = inner.stream.poll_next_unpin(cx);
match p {
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(
// TODO Need to map hyper::Error into std::io::Error.
// TODO(bartlomieju): rewrite it to use ErrBox
io::Error::new(io::ErrorKind::Other, e),
)),
Poll::Ready(Some(Ok(chunk))) => {
Expand Down
115 changes: 40 additions & 75 deletions cli/http_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@ use crate::deno_error;
use crate::deno_error::DenoError;
use crate::version;
use deno::ErrBox;
use futures::future;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use reqwest;
use reqwest::header::HeaderMap;
use reqwest::header::CONTENT_TYPE;
use reqwest::header::LOCATION;
use reqwest::header::USER_AGENT;
use reqwest::r#async::Client;
use reqwest::RedirectPolicy;
use reqwest::redirect::Policy;
use reqwest::Client;
use std::future::Future;
use std::pin::Pin;
use url::Url;

/// Create new instance of async reqwest::Client. This client supports
Expand All @@ -26,9 +23,9 @@ pub fn get_client() -> Client {
format!("Deno/{}", version::DENO).parse().unwrap(),
);
Client::builder()
.redirect(RedirectPolicy::none())
.redirect(Policy::none())
.default_headers(headers)
.use_sys_proxy()
.use_rustls_tls()
.build()
.unwrap()
}
Expand Down Expand Up @@ -75,77 +72,45 @@ pub enum FetchOnceResult {
pub fn fetch_string_once(
url: &Url,
) -> impl Future<Output = Result<FetchOnceResult, ErrBox>> {
type FetchAttempt = (Option<String>, Option<String>, Option<FetchOnceResult>);

let url = url.clone();
let client = get_client();

futures::compat::Compat01As03::new(client.get(url.clone()).send())
.map_err(ErrBox::from)
.and_then(
move |mut response| -> Pin<
Box<dyn Future<Output = Result<FetchAttempt, ErrBox>> + Send>,
> {
if response.status().is_redirection() {
let location_string = response
.headers()
.get(LOCATION)
.expect("url redirection should provide 'location' header")
.to_str()
.unwrap();

debug!("Redirecting to {:?}...", &location_string);
let new_url = resolve_url_from_location(&url, location_string);
// Boxed trait object turns out to be the savior for 2+ types yielding same results.
return futures::future::try_join3(
future::ok(None),
future::ok(None),
future::ok(Some(FetchOnceResult::Redirect(new_url))),
)
.boxed();
}

if response.status().is_client_error()
|| response.status().is_server_error()
{
return future::err(
DenoError::new(
deno_error::ErrorKind::Other,
format!("Import '{}' failed: {}", &url, response.status()),
)
.into(),
)
.boxed();
}

let content_type = response
.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.to_str().unwrap().to_owned());

let body = futures::compat::Compat01As03::new(response.text())
.map_ok(Some)
.map_err(ErrBox::from);

futures::future::try_join3(
body,
future::ok(content_type),
future::ok(None),
)
.boxed()
},
)
.and_then(move |(maybe_code, maybe_content_type, maybe_redirect)| {
if let Some(redirect) = maybe_redirect {
future::ok(redirect)
} else {
// maybe_code should always contain code here!
future::ok(FetchOnceResult::Code(
maybe_code.unwrap(),
maybe_content_type,
))
}
})
let fut = async move {
let response = client.get(url.clone()).send().await?;

if response.status().is_redirection() {
let location_string = response
.headers()
.get(LOCATION)
.expect("url redirection should provide 'location' header")
.to_str()
.unwrap();

debug!("Redirecting to {:?}...", &location_string);
let new_url = resolve_url_from_location(&url, location_string);
return Ok(FetchOnceResult::Redirect(new_url));
}

if response.status().is_client_error()
|| response.status().is_server_error()
{
let err = DenoError::new(
deno_error::ErrorKind::Other,
format!("Import '{}' failed: {}", &url, response.status()),
);
return Err(err.into());
}

let content_type = response
.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.to_str().unwrap().to_owned());

let body = response.text().await?;
return Ok(FetchOnceResult::Code(body, content_type));
};

fut.boxed()
}

#[cfg(test)]
Expand Down
5 changes: 4 additions & 1 deletion cli/js/tls_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ testPerm({ read: true, net: true }, async function dialAndListenTLS(): Promise<
assert(conn.remoteAddr != null);
assert(conn.localAddr != null);
await conn.write(response);
conn.close();
// TODO(bartlomieju): this might be a bug
setTimeout(() => {
conn.close();
}, 0);
}
);

Expand Down
46 changes: 23 additions & 23 deletions cli/ops/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::StreamExt;
use http::header::HeaderName;
use http::header::HeaderValue;
use http::Method;
Expand Down Expand Up @@ -56,32 +56,32 @@ pub fn op_fetch(
}
debug!("Before fetch {}", url);
let state_ = state.clone();
let future = futures::compat::Compat01As03::new(request.send())
.map_err(ErrBox::from)
.and_then(move |res| {
debug!("Fetch response {}", url);
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}

let body = HttpBody::from(res.into_body());
let mut table = state_.lock_resource_table();
let rid = table.add(
"httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))),
);
let future = async move {
let res = request.send().await?;
debug!("Fetch response {}", url);
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}

let json_res = json!({
"bodyRid": rid,
"status": status.as_u16(),
"statusText": status.canonical_reason().unwrap_or(""),
"headers": res_headers
});
let body = HttpBody::from(res.bytes_stream().boxed());
let mut table = state_.lock_resource_table();
let rid = table.add(
"httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))),
);

futures::future::ok(json_res)
let json_res = json!({
"bodyRid": rid,
"status": status.as_u16(),
"statusText": status.canonical_reason().unwrap_or(""),
"headers": res_headers
});

Ok(json_res)
};

Ok(JsonOp::Async(future.boxed()))
}
Loading

0 comments on commit 46d76a7

Please sign in to comment.