Skip to content

upgrade: Tokio 0.2 #3418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Dec 30, 2019
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5244a70
first pass at Tokio 0.2
bartlomieju Nov 28, 2019
f02028f
bump Tokio to 0.2.2
bartlomieju Nov 30, 2019
0fb7953
Merge branch 'master' into upgrade-tokio_02
bartlomieju Dec 16, 2019
2093f06
upgrade reqwest 1/2
bartlomieju Dec 16, 2019
b10fd68
temp disable reqwest body
bartlomieju Dec 16, 2019
93a690d
update third_party
bartlomieju Dec 16, 2019
2029a58
fix reqwest errors
bartlomieju Dec 16, 2019
bb94b69
get fetch to work
bartlomieju Dec 23, 2019
54cc4cc
Merge branch 'master' into upgrade-tokio_02
bartlomieju Dec 23, 2019
c9b8644
disable failing tests
bartlomieju Dec 23, 2019
0563f54
format & lint
bartlomieju Dec 23, 2019
ae8b1f0
add kill_on_drop
bartlomieju Dec 23, 2019
ae8af4e
rustls
bartlomieju Dec 23, 2019
3073ad2
try fix
bartlomieju Dec 23, 2019
c7a0119
reset CI
bartlomieju Dec 26, 2019
07cac64
bump tokio
bartlomieju Dec 26, 2019
ecf2169
debug
bartlomieju Dec 26, 2019
6c3473e
add debug log
bartlomieju Dec 26, 2019
1a086ee
run all tests
bartlomieju Dec 26, 2019
ec18ca4
try
bartlomieju Dec 26, 2019
b6c136f
try flush
bartlomieju Dec 26, 2019
52a5746
clippy
bartlomieju Dec 26, 2019
8053e35
try file
bartlomieju Dec 26, 2019
a452f25
try strace
bartlomieju Dec 26, 2019
477830a
review
bartlomieju Dec 27, 2019
eaef464
remove strace
bartlomieju Dec 27, 2019
1c5664b
try sync
bartlomieju Dec 27, 2019
d268319
try to add flush method
bartlomieju Dec 27, 2019
2b07050
implicit flush
bartlomieju Dec 27, 2019
ceedceb
fix stdin/stderr
bartlomieju Dec 27, 2019
7f73d33
try again
bartlomieju Dec 27, 2019
46744c3
try fix prettier
bartlomieju Dec 27, 2019
5077a9d
disable test
bartlomieju Dec 27, 2019
97409dc
Merge branch 'master' into upgrade-tokio_02
bartlomieju Dec 27, 2019
6de58de
fix prettier
bartlomieju Dec 27, 2019
9a08f33
reset CI
bartlomieju Dec 28, 2019
d9c9073
fix
bartlomieju Dec 28, 2019
1f48fa3
add more descriptive todo
bartlomieju Dec 30, 2019
ae92b5a
link issue
bartlomieju Dec 30, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
993 changes: 505 additions & 488 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ deno_typescript = { path = "../deno_typescript", version = "0.27.0" }
ansi_term = "0.12.1"
atty = "0.2.13"
base64 = "0.11.0"
bytes = "0.5"
byteorder = "1.3.2"
clap = "2.33.0"
dirs = "2.0.2"
dlopen = "0.1.8"
futures = { version = "0.3", features = [ "compat", "io-compat" ] }
http = "0.1.19"
http = "0.2"
hyper = "0.12.35"
hyper-rustls = "0.17.1"
indexmap = "1.3.0"
Expand All @@ -43,7 +44,7 @@ log = "0.4.8"
rand = "0.7.2"
regex = "1.3.1"
remove_dir_all = "0.5.2"
reqwest = { version = "0.9.22", default-features = false, features = ["rustls-tls"] }
reqwest = { git = "https://github.com/seanmonstar/reqwest.git", rev = "0ab5df3", features = ["rustls-tls", "stream"] }
ring = "0.16.9"
rustyline = "5.0.4"
serde = { version = "1.0.102", features = ["derive"] }
Expand All @@ -53,11 +54,10 @@ source-map-mappings = "0.5.0"
sys-info = "0.5.8"
tempfile = "3.1.0"
termcolor = "1.0.5"
tokio = "0.1.22"
tokio = { version = "0.2.6", features = ["full"] }
tokio-executor = "0.1.8"
tokio-process = "0.2.4"
tokio-rustls = "0.10.2"
url = "1.7.2"
tokio-rustls = "0.12.0"
url = "2.1"
utime = "0.2.1"
webpki = "0.21.0"
webpki-roots = "0.17.0"
Expand Down
5 changes: 3 additions & 2 deletions cli/deno_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ impl GetErrorKind for url::ParseError {
}
RelativeUrlWithoutBase => ErrorKind::RelativeUrlWithoutBase,
SetHostOnCannotBeABaseUrl => ErrorKind::SetHostOnCannotBeABaseUrl,
_ => ErrorKind::Other,
}
}
}
Expand All @@ -231,7 +232,7 @@ impl GetErrorKind for reqwest::Error {
fn kind(&self) -> ErrorKind {
use self::GetErrorKind as Get;

match self.get_ref() {
match self.source() {
Some(err_ref) => None
.or_else(|| err_ref.downcast_ref::<hyper::Error>().map(Get::kind))
.or_else(|| err_ref.downcast_ref::<url::ParseError>().map(Get::kind))
Expand All @@ -242,7 +243,7 @@ impl GetErrorKind for reqwest::Error {
.map(Get::kind)
})
.unwrap_or_else(|| ErrorKind::HttpOther),
_ => ErrorKind::HttpOther,
None => ErrorKind::HttpOther,
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions cli/global_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures::channel::oneshot;
use futures::future::FutureExt;
use std::future::Future;
use std::time::Instant;
use tokio::timer::Delay;
use tokio;

#[derive(Default)]
pub struct GlobalTimer {
Expand Down Expand Up @@ -43,8 +43,7 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);

let delay = futures::compat::Compat01As03::new(Delay::new(deadline))
.map_err(|err| panic!("Unexpected error in timeout {:?}", err));
let delay = tokio::time::delay_until(deadline.into());
let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));

Expand Down
27 changes: 15 additions & 12 deletions cli/http_body.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.

use futures::io::AsyncRead;
use futures::stream::StreamExt;
use reqwest::r#async::Chunk;
use reqwest::r#async::Decoder;
use bytes::Bytes;
use futures::Stream;
use futures::StreamExt;
use reqwest;
use std::cmp::min;
use std::io;
use std::io::Read;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;

// TODO(bartlomieju): most of this stuff can be moved to `cli/ops/fetch.rs`
type ReqwestStream = Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>;

/// Wraps `reqwest::Decoder` so that it can be exposed as an `AsyncRead` and integrated
/// Wraps `ReqwestStream` so that it can be exposed as an `AsyncRead` and integrated
/// into resources more easily.
pub struct HttpBody {
decoder: futures::compat::Compat01As03<Decoder>,
chunk: Option<Chunk>,
stream: ReqwestStream,
chunk: Option<Bytes>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a redundant Read impl below this which can be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do that in next pass, this PR is already complex

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pos: usize,
}

impl HttpBody {
pub fn from(body: Decoder) -> Self {
pub fn from(body: ReqwestStream) -> Self {
Self {
decoder: futures::compat::Compat01As03::new(body),
stream: body,
chunk: None,
pos: 0,
}
Expand Down Expand Up @@ -65,10 +68,10 @@ impl AsyncRead for HttpBody {
assert_eq!(inner.pos, 0);
}

let p = inner.decoder.poll_next_unpin(cx);
let p = inner.stream.poll_next_unpin(cx);
match p {
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(
// TODO Need to map hyper::Error into std::io::Error.
// TODO(bartlomieju): rewrite it to use ErrBox
io::Error::new(io::ErrorKind::Other, e),
)),
Poll::Ready(Some(Ok(chunk))) => {
Expand Down
115 changes: 40 additions & 75 deletions cli/http_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@ use crate::deno_error;
use crate::deno_error::DenoError;
use crate::version;
use deno::ErrBox;
use futures::future;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use reqwest;
use reqwest::header::HeaderMap;
use reqwest::header::CONTENT_TYPE;
use reqwest::header::LOCATION;
use reqwest::header::USER_AGENT;
use reqwest::r#async::Client;
use reqwest::RedirectPolicy;
use reqwest::redirect::Policy;
use reqwest::Client;
use std::future::Future;
use std::pin::Pin;
use url::Url;

/// Create new instance of async reqwest::Client. This client supports
Expand All @@ -26,9 +23,9 @@ pub fn get_client() -> Client {
format!("Deno/{}", version::DENO).parse().unwrap(),
);
Client::builder()
.redirect(RedirectPolicy::none())
.redirect(Policy::none())
.default_headers(headers)
.use_sys_proxy()
.use_rustls_tls()
.build()
.unwrap()
}
Expand Down Expand Up @@ -75,77 +72,45 @@ pub enum FetchOnceResult {
pub fn fetch_string_once(
url: &Url,
) -> impl Future<Output = Result<FetchOnceResult, ErrBox>> {
type FetchAttempt = (Option<String>, Option<String>, Option<FetchOnceResult>);

let url = url.clone();
let client = get_client();

futures::compat::Compat01As03::new(client.get(url.clone()).send())
.map_err(ErrBox::from)
.and_then(
move |mut response| -> Pin<
Box<dyn Future<Output = Result<FetchAttempt, ErrBox>> + Send>,
> {
if response.status().is_redirection() {
let location_string = response
.headers()
.get(LOCATION)
.expect("url redirection should provide 'location' header")
.to_str()
.unwrap();

debug!("Redirecting to {:?}...", &location_string);
let new_url = resolve_url_from_location(&url, location_string);
// Boxed trait object turns out to be the savior for 2+ types yielding same results.
return futures::future::try_join3(
future::ok(None),
future::ok(None),
future::ok(Some(FetchOnceResult::Redirect(new_url))),
)
.boxed();
}

if response.status().is_client_error()
|| response.status().is_server_error()
{
return future::err(
DenoError::new(
deno_error::ErrorKind::Other,
format!("Import '{}' failed: {}", &url, response.status()),
)
.into(),
)
.boxed();
}

let content_type = response
.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.to_str().unwrap().to_owned());

let body = futures::compat::Compat01As03::new(response.text())
.map_ok(Some)
.map_err(ErrBox::from);

futures::future::try_join3(
body,
future::ok(content_type),
future::ok(None),
)
.boxed()
},
)
.and_then(move |(maybe_code, maybe_content_type, maybe_redirect)| {
if let Some(redirect) = maybe_redirect {
future::ok(redirect)
} else {
// maybe_code should always contain code here!
future::ok(FetchOnceResult::Code(
maybe_code.unwrap(),
maybe_content_type,
))
}
})
let fut = async move {
let response = client.get(url.clone()).send().await?;

if response.status().is_redirection() {
let location_string = response
.headers()
.get(LOCATION)
.expect("url redirection should provide 'location' header")
.to_str()
.unwrap();

debug!("Redirecting to {:?}...", &location_string);
let new_url = resolve_url_from_location(&url, location_string);
return Ok(FetchOnceResult::Redirect(new_url));
}

if response.status().is_client_error()
|| response.status().is_server_error()
{
let err = DenoError::new(
deno_error::ErrorKind::Other,
format!("Import '{}' failed: {}", &url, response.status()),
);
return Err(err.into());
}

let content_type = response
.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.to_str().unwrap().to_owned());

let body = response.text().await?;
return Ok(FetchOnceResult::Code(body, content_type));
};

fut.boxed()
}

#[cfg(test)]
Expand Down
5 changes: 4 additions & 1 deletion cli/js/tls_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ testPerm({ read: true, net: true }, async function dialAndListenTLS(): Promise<
assert(conn.remoteAddr != null);
assert(conn.localAddr != null);
await conn.write(response);
conn.close();
// TODO(bartlomieju): this might be a bug
setTimeout(() => {
conn.close();
}, 0);
}
);

Expand Down
46 changes: 23 additions & 23 deletions cli/ops/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::StreamExt;
use http::header::HeaderName;
use http::header::HeaderValue;
use http::Method;
Expand Down Expand Up @@ -56,32 +56,32 @@ pub fn op_fetch(
}
debug!("Before fetch {}", url);
let state_ = state.clone();
let future = futures::compat::Compat01As03::new(request.send())
.map_err(ErrBox::from)
.and_then(move |res| {
debug!("Fetch response {}", url);
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}

let body = HttpBody::from(res.into_body());
let mut table = state_.lock_resource_table();
let rid = table.add(
"httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))),
);
let future = async move {
let res = request.send().await?;
debug!("Fetch response {}", url);
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}

let json_res = json!({
"bodyRid": rid,
"status": status.as_u16(),
"statusText": status.canonical_reason().unwrap_or(""),
"headers": res_headers
});
let body = HttpBody::from(res.bytes_stream().boxed());
let mut table = state_.lock_resource_table();
let rid = table.add(
"httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))),
);

futures::future::ok(json_res)
let json_res = json!({
"bodyRid": rid,
"status": status.as_u16(),
"statusText": status.canonical_reason().unwrap_or(""),
"headers": res_headers
});

Ok(json_res)
};

Ok(JsonOp::Async(future.boxed()))
}
Loading