Skip to content

Commit 1b82c58

Browse files
committed
first pass at Tokio 0.2
1 parent a537c03 commit 1b82c58

11 files changed

+247
-382
lines changed

Diff for: Cargo.lock

+60-48
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: cli/Cargo.toml

+2-3
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,9 @@ source-map-mappings = "0.5.0"
5252
sys-info = "0.5.8"
5353
tempfile = "3.1.0"
5454
termcolor = "1.0.5"
55-
tokio = "0.1.22"
55+
tokio = { version = "0.2.1", features = ["full"] }
5656
tokio-executor = "0.1.8"
57-
tokio-process = "0.2.4"
58-
tokio-rustls = "0.10.2"
57+
tokio-rustls = "0.12.0"
5958
url = "1.7.2"
6059
utime = "0.2.1"
6160
webpki = "0.21.0"

Diff for: cli/global_timer.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use futures::channel::oneshot;
1313
use futures::future::FutureExt;
1414
use std::future::Future;
1515
use std::time::Instant;
16-
use tokio::timer::Delay;
16+
use tokio;
1717

1818
#[derive(Default)]
1919
pub struct GlobalTimer {
@@ -43,8 +43,8 @@ impl GlobalTimer {
4343
let (tx, rx) = oneshot::channel();
4444
self.tx = Some(tx);
4545

46-
let delay = futures::compat::Compat01As03::new(Delay::new(deadline))
47-
.map_err(|err| panic!("Unexpected error in timeout {:?}", err));
46+
let tokio_deadline = tokio::time::Instant::from_std(deadline);
47+
let delay = tokio::time::delay_until(tokio_deadline);
4848
let rx = rx
4949
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));
5050

Diff for: cli/ops/files.rs

+23-51
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,7 @@ use futures::future::FutureExt;
1212
use futures::future::TryFutureExt;
1313
use std;
1414
use std::convert::From;
15-
use std::future::Future;
1615
use std::io::SeekFrom;
17-
use std::pin::Pin;
18-
use std::task::Context;
19-
use std::task::Poll;
2016
use tokio;
2117

2218
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
@@ -92,21 +88,19 @@ fn op_open(
9288
}
9389

9490
let is_sync = args.promise_id.is_none();
95-
let op = futures::compat::Compat01As03::new(tokio::prelude::Future::map_err(
96-
open_options.open(filename),
97-
ErrBox::from,
98-
))
99-
.and_then(move |fs_file| {
91+
92+
let fut = async move {
93+
let fs_file = open_options.open(filename).map_err(ErrBox::from).await?;
10094
let mut table = state_.lock_resource_table();
10195
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
102-
futures::future::ok(json!(rid))
103-
});
96+
Ok(json!(rid))
97+
};
10498

10599
if is_sync {
106-
let buf = futures::executor::block_on(op)?;
100+
let buf = futures::executor::block_on(fut)?;
107101
Ok(JsonOp::Sync(buf))
108102
} else {
109-
Ok(JsonOp::Async(op.boxed()))
103+
Ok(JsonOp::Async(fut.boxed()))
110104
}
111105
}
112106

@@ -127,37 +121,6 @@ fn op_close(
127121
Ok(JsonOp::Sync(json!({})))
128122
}
129123

130-
pub struct SeekFuture {
131-
seek_from: SeekFrom,
132-
rid: ResourceId,
133-
state: ThreadSafeState,
134-
}
135-
136-
impl Future for SeekFuture {
137-
type Output = Result<u64, ErrBox>;
138-
139-
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
140-
let inner = self.get_mut();
141-
let mut table = inner.state.lock_resource_table();
142-
let resource = table
143-
.get_mut::<StreamResource>(inner.rid)
144-
.ok_or_else(bad_resource)?;
145-
146-
let tokio_file = match resource {
147-
StreamResource::FsFile(ref mut file) => file,
148-
_ => return Poll::Ready(Err(bad_resource())),
149-
};
150-
151-
use tokio::prelude::Async::*;
152-
153-
match tokio_file.poll_seek(inner.seek_from).map_err(ErrBox::from) {
154-
Ok(Ready(v)) => Poll::Ready(Ok(v)),
155-
Err(err) => Poll::Ready(Err(err)),
156-
Ok(NotReady) => Poll::Pending,
157-
}
158-
}
159-
}
160-
161124
#[derive(Deserialize)]
162125
#[serde(rename_all = "camelCase")]
163126
struct SeekArgs {
@@ -189,17 +152,26 @@ fn op_seek(
189152
}
190153
};
191154

192-
let fut = SeekFuture {
193-
state: state.clone(),
194-
seek_from,
195-
rid,
155+
let mut table = state.lock_resource_table();
156+
let resource = table
157+
.get_mut::<StreamResource>(rid)
158+
.ok_or_else(bad_resource)?;
159+
160+
let tokio_file = match resource {
161+
StreamResource::FsFile(ref mut file) => file,
162+
_ => return Err(bad_resource()),
163+
};
164+
let mut file = futures::executor::block_on(tokio_file.try_clone())?;
165+
166+
let fut = async move {
167+
file.seek(seek_from).await?;
168+
Ok(json!({}))
196169
};
197170

198-
let op = fut.and_then(move |_| futures::future::ok(json!({})));
199171
if args.promise_id.is_none() {
200-
let buf = futures::executor::block_on(op)?;
172+
let buf = futures::executor::block_on(fut)?;
201173
Ok(JsonOp::Sync(buf))
202174
} else {
203-
Ok(JsonOp::Async(op.boxed()))
175+
Ok(JsonOp::Async(fut.boxed()))
204176
}
205177
}

Diff for: cli/ops/io.rs

+20-38
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,15 @@ use deno::ErrBox;
88
use deno::Resource;
99
use deno::*;
1010
use futures;
11-
use futures::compat::AsyncRead01CompatExt;
12-
use futures::compat::AsyncWrite01CompatExt;
1311
use futures::future::FutureExt;
14-
use futures::io::{AsyncRead, AsyncWrite};
1512
use std;
1613
use std::future::Future;
1714
use std::pin::Pin;
1815
use std::task::Context;
1916
use std::task::Poll;
2017
use tokio;
18+
use tokio::io::{AsyncRead, AsyncWrite};
2119
use tokio::net::TcpStream;
22-
use tokio_process;
2320
use tokio_rustls::client::TlsStream as ClientTlsStream;
2421
use tokio_rustls::server::TlsStream as ServerTlsStream;
2522

@@ -86,9 +83,9 @@ pub enum StreamResource {
8683
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
8784
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
8885
HttpBody(Box<HttpBody>),
89-
ChildStdin(tokio_process::ChildStdin),
90-
ChildStdout(tokio_process::ChildStdout),
91-
ChildStderr(tokio_process::ChildStderr),
86+
ChildStdin(tokio::process::ChildStdin),
87+
ChildStdout(tokio::process::ChildStdout),
88+
ChildStderr(tokio::process::ChildStderr),
9289
}
9390

9491
impl Resource for StreamResource {}
@@ -111,22 +108,15 @@ impl DenoAsyncRead for StreamResource {
111108
) -> Poll<Result<usize, ErrBox>> {
112109
let inner = self.get_mut();
113110
let mut f: Box<dyn AsyncRead + Unpin> = match inner {
114-
StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)),
115-
StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)),
116-
StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)),
117-
StreamResource::ClientTlsStream(f) => {
118-
Box::new(AsyncRead01CompatExt::compat(f))
119-
}
120-
StreamResource::ServerTlsStream(f) => {
121-
Box::new(AsyncRead01CompatExt::compat(f))
122-
}
123-
StreamResource::HttpBody(f) => Box::new(f),
124-
StreamResource::ChildStdout(f) => {
125-
Box::new(AsyncRead01CompatExt::compat(f))
126-
}
127-
StreamResource::ChildStderr(f) => {
128-
Box::new(AsyncRead01CompatExt::compat(f))
129-
}
111+
StreamResource::FsFile(f) => Box::new(f),
112+
StreamResource::Stdin(f) => Box::new(f),
113+
StreamResource::TcpStream(f) => Box::new(f),
114+
StreamResource::ClientTlsStream(f) => Box::new(f),
115+
StreamResource::ServerTlsStream(f) => Box::new(f),
116+
StreamResource::ChildStdout(f) => Box::new(f),
117+
StreamResource::ChildStderr(f) => Box::new(f),
118+
// TODO: temporarily disabled, pending release of Reqwest
119+
// StreamResource::HttpBody(f) => Box::new(f),
130120
_ => {
131121
return Poll::Ready(Err(bad_resource()));
132122
}
@@ -247,21 +237,13 @@ impl DenoAsyncWrite for StreamResource {
247237
) -> Poll<Result<usize, ErrBox>> {
248238
let inner = self.get_mut();
249239
let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
250-
StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
251-
StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
252-
StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
253-
StreamResource::TcpStream(f) => {
254-
Box::new(AsyncWrite01CompatExt::compat(f))
255-
}
256-
StreamResource::ClientTlsStream(f) => {
257-
Box::new(AsyncWrite01CompatExt::compat(f))
258-
}
259-
StreamResource::ServerTlsStream(f) => {
260-
Box::new(AsyncWrite01CompatExt::compat(f))
261-
}
262-
StreamResource::ChildStdin(f) => {
263-
Box::new(AsyncWrite01CompatExt::compat(f))
264-
}
240+
StreamResource::FsFile(f) => Box::new(f),
241+
StreamResource::Stdout(f) => Box::new(f),
242+
StreamResource::Stderr(f) => Box::new(f),
243+
StreamResource::TcpStream(f) => Box::new(f),
244+
StreamResource::ClientTlsStream(f) => Box::new(f),
245+
StreamResource::ServerTlsStream(f) => Box::new(f),
246+
StreamResource::ChildStdin(f) => Box::new(f),
265247
_ => {
266248
return Poll::Ready(Err(bad_resource()));
267249
}

Diff for: cli/ops/net.rs

+24-45
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ use deno::Resource;
99
use deno::*;
1010
use futures::future::FutureExt;
1111
use futures::future::TryFutureExt;
12-
use futures::stream::StreamExt;
13-
use futures::stream::TryStreamExt;
1412
use std;
1513
use std::convert::From;
1614
use std::future::Future;
@@ -20,7 +18,6 @@ use std::pin::Pin;
2018
use std::task::Context;
2119
use std::task::Poll;
2220
use tokio;
23-
use tokio::net::tcp::Incoming;
2421
use tokio::net::TcpListener;
2522
use tokio::net::TcpStream;
2623

@@ -74,50 +71,44 @@ impl Future for Accept {
7471
ErrBox::from(e)
7572
})?;
7673

77-
let mut listener =
78-
futures::compat::Compat01As03::new(&mut listener_resource.listener)
79-
.map_err(ErrBox::from);
74+
let listener = &mut listener_resource.listener;
8075

8176
if inner.accept_state == AcceptState::Eager {
8277
// Similar to try_ready!, but also track/untrack accept task
8378
// in TcpListener resource.
8479
// In this way, when the listener is closed, the task can be
8580
// notified to error out (instead of stuck forever).
86-
match listener.poll_next_unpin(cx) {
87-
Poll::Ready(Some(Ok(stream))) => {
81+
match listener.poll_accept(cx).map_err(ErrBox::from) {
82+
Poll::Ready(Ok((stream, addr))) => {
8883
inner.accept_state = AcceptState::Done;
89-
let addr = stream.peer_addr().unwrap();
9084
return Poll::Ready(Ok((stream, addr)));
9185
}
9286
Poll::Pending => {
9387
inner.accept_state = AcceptState::Pending;
9488
return Poll::Pending;
9589
}
96-
Poll::Ready(Some(Err(e))) => {
90+
Poll::Ready(Err(e)) => {
9791
inner.accept_state = AcceptState::Done;
9892
return Poll::Ready(Err(e));
9993
}
100-
_ => unreachable!(),
10194
}
10295
}
10396

104-
match listener.poll_next_unpin(cx) {
105-
Poll::Ready(Some(Ok(stream))) => {
97+
match listener.poll_accept(cx).map_err(ErrBox::from) {
98+
Poll::Ready(Ok((stream, addr))) => {
10699
listener_resource.untrack_task();
107100
inner.accept_state = AcceptState::Done;
108-
let addr = stream.peer_addr().unwrap();
109101
Poll::Ready(Ok((stream, addr)))
110102
}
111103
Poll::Pending => {
112104
listener_resource.track_task(cx)?;
113105
Poll::Pending
114106
}
115-
Poll::Ready(Some(Err(e))) => {
107+
Poll::Ready(Err(e)) => {
116108
listener_resource.untrack_task();
117109
inner.accept_state = AcceptState::Done;
118110
Poll::Ready(Err(e))
119111
}
120-
_ => unreachable!(),
121112
}
122113
}
123114
}
@@ -184,32 +175,20 @@ fn op_dial(
184175
let state_ = state.clone();
185176
state.check_net(&args.hostname, args.port)?;
186177

187-
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
188-
futures::compat::Compat01As03::new(TcpStream::connect(&addr))
189-
.map_err(ErrBox::from)
190-
.and_then(move |tcp_stream| {
191-
let local_addr = match tcp_stream.local_addr() {
192-
Ok(v) => v,
193-
Err(e) => return futures::future::err(ErrBox::from(e)),
194-
};
195-
let remote_addr = match tcp_stream.peer_addr() {
196-
Ok(v) => v,
197-
Err(e) => return futures::future::err(ErrBox::from(e)),
198-
};
199-
let mut table = state_.lock_resource_table();
200-
let rid = table
201-
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
202-
futures::future::ok((rid, local_addr, remote_addr))
203-
})
204-
.map_err(ErrBox::from)
205-
.and_then(move |(rid, local_addr, remote_addr)| {
206-
futures::future::ok(json!({
207-
"rid": rid,
208-
"localAddr": local_addr.to_string(),
209-
"remoteAddr": remote_addr.to_string(),
210-
}))
211-
})
212-
});
178+
let op = async move {
179+
let addr = resolve_addr(&args.hostname, args.port).await?;
180+
let tcp_stream = TcpStream::connect(&addr).await?;
181+
let local_addr = tcp_stream.local_addr()?;
182+
let remote_addr = tcp_stream.peer_addr()?;
183+
let mut table = state_.lock_resource_table();
184+
let rid =
185+
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
186+
Ok(json!({
187+
"rid": rid,
188+
"localAddr": local_addr.to_string(),
189+
"remoteAddr": remote_addr.to_string(),
190+
}))
191+
};
213192

214193
Ok(JsonOp::Async(op.boxed()))
215194
}
@@ -259,7 +238,7 @@ struct ListenArgs {
259238

260239
#[allow(dead_code)]
261240
struct TcpListenerResource {
262-
listener: Incoming,
241+
listener: TcpListener,
263242
waker: Option<futures::task::AtomicWaker>,
264243
local_addr: SocketAddr,
265244
}
@@ -324,11 +303,11 @@ fn op_listen(
324303

325304
let addr =
326305
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
327-
let listener = TcpListener::bind(&addr)?;
306+
let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
328307
let local_addr = listener.local_addr()?;
329308
let local_addr_str = local_addr.to_string();
330309
let listener_resource = TcpListenerResource {
331-
listener: listener.incoming(),
310+
listener,
332311
waker: None,
333312
local_addr,
334313
};

0 commit comments

Comments
 (0)