Skip to content

Commit 9203906

Browse files
committed
feat(lib): change custom executors to be Fn
Instead of custom executors needing to be `tokio::Executor` or `tokio::TypedExecutor`, they are now just `Fn`s. BREAKING CHANGE: Configuring an `executor` for a client or server should now pass a closure or function to spawn the future. Closes #1944
1 parent 71d088d commit 9203906

File tree

9 files changed

+33
-117
lines changed

9 files changed

+33
-117
lines changed

src/client/conn.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,7 @@ impl Builder {
458458
/// Provide an executor to execute background HTTP2 tasks.
459459
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
460460
where
461-
for<'a> &'a E: tokio_executor::Executor,
462-
E: Send + Sync + 'static,
461+
E: Fn(crate::common::exec::BoxFuture) + Send + Sync + 'static,
463462
{
464463
self.exec = Exec::Executor(Arc::new(exec));
465464
self

src/client/mod.rs

+5-19
Original file line numberDiff line numberDiff line change
@@ -359,10 +359,7 @@ where C: Connect + Clone + Send + Sync + 'static,
359359
drop(delayed_tx);
360360
});
361361

362-
if let Err(err) = executor.execute(on_idle) {
363-
// This task isn't critical, so just log and ignore.
364-
warn!("error spawning task to insert idle connection: {}", err);
365-
}
362+
executor.execute(on_idle);
366363
} else {
367364
// There's no body to delay, but the connection isn't
368365
// ready yet. Only re-insert when it's ready
@@ -371,10 +368,7 @@ where C: Connect + Clone + Send + Sync + 'static,
371368
})
372369
.map(|_| ());
373370

374-
if let Err(err) = executor.execute(on_idle) {
375-
// This task isn't critical, so just log and ignore.
376-
warn!("error spawning task to insert idle connection: {}", err);
377-
}
371+
executor.execute(on_idle);
378372
}
379373
res
380374
})))
@@ -513,20 +507,13 @@ where C: Connect + Clone + Send + Sync + 'static,
513507
.handshake(io)
514508
.and_then(move |(tx, conn)| {
515509
trace!("handshake complete, spawning background dispatcher task");
516-
let bg = executor.execute(conn.map_err(|e| {
510+
executor.execute(conn.map_err(|e| {
517511
debug!("client connection error: {}", e)
518512
}).map(|_| ()));
519513

520-
// This task is critical, so an execute error
521-
// should be returned.
522-
if let Err(err) = bg {
523-
warn!("error spawning critical client task: {}", err);
524-
return Either::Left(future::err(err));
525-
}
526-
527514
// Wait for 'conn' to ready up before we
528515
// declare this tx as usable
529-
Either::Right(tx.when_ready())
516+
tx.when_ready()
530517
})
531518
.map_ok(move |tx| {
532519
pool.pooled(connecting, PoolClient {
@@ -1013,8 +1000,7 @@ impl Builder {
10131000
/// Provide an executor to execute background `Connection` tasks.
10141001
pub fn executor<E>(&mut self, exec: E) -> &mut Self
10151002
where
1016-
for<'a> &'a E: tokio_executor::Executor,
1017-
E: Send + Sync + 'static,
1003+
E: Fn(crate::common::exec::BoxFuture) + Send + Sync + 'static,
10181004
{
10191005
self.conn_builder.executor(exec);
10201006
self

src/client/pool.rs

+2-7
Original file line numberDiff line numberDiff line change
@@ -416,16 +416,11 @@ impl<T: Poolable> PoolInner<T> {
416416

417417
let start = Instant::now() + dur;
418418

419-
let interval = IdleTask {
419+
self.exec.execute(IdleTask {
420420
interval: Interval::new(start, dur),
421421
pool: WeakOpt::downgrade(pool_ref),
422422
pool_drop_notifier: rx,
423-
};
424-
425-
if let Err(err) = self.exec.execute(interval) {
426-
// This task isn't critical, so simply log and ignore.
427-
warn!("error spawning connection pool idle interval: {}", err);
428-
}
423+
});
429424
}
430425
}
431426

src/client/service.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ where
6363
if let Err(e) = conn.await {
6464
debug!("connection error: {:?}", e);
6565
}
66-
})?;
66+
});
6767
Ok(sr)
6868
},
6969
Err(e) => Err(e)

src/common/exec.rs

+19-74
Original file line numberDiff line numberDiff line change
@@ -3,83 +3,41 @@ use std::future::Future;
33
use std::pin::Pin;
44
use std::sync::Arc;
55

6-
use tokio_executor::{SpawnError, TypedExecutor};
7-
86
use crate::body::{Payload, Body};
97
use crate::proto::h2::server::H2Stream;
108
use crate::server::conn::spawn_all::{NewSvcTask, Watcher};
119
use crate::service::HttpService;
1210

1311
pub trait H2Exec<F, B: Payload>: Clone {
14-
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) -> crate::Result<()>;
12+
fn execute_h2stream(&self, fut: H2Stream<F, B>);
1513
}
1614

1715
pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone {
18-
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()>;
19-
}
20-
21-
type BoxFuture = Pin<Box<dyn Future<Output=()> + Send>>;
22-
23-
pub trait SharedExecutor {
24-
fn shared_spawn(&self, future: BoxFuture) -> Result<(), SpawnError>;
16+
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>);
2517
}
2618

27-
impl<E> SharedExecutor for E
28-
where
29-
for<'a> &'a E: tokio_executor::Executor,
30-
{
31-
fn shared_spawn(mut self: &Self, future: BoxFuture) -> Result<(), SpawnError> {
32-
tokio_executor::Executor::spawn(&mut self, future)
33-
}
34-
}
19+
pub type BoxFuture = Pin<Box<dyn Future<Output=()> + Send>>;
3520

3621
// Either the user provides an executor for background tasks, or we use
3722
// `tokio::spawn`.
3823
#[derive(Clone)]
3924
pub enum Exec {
4025
Default,
41-
Executor(Arc<dyn SharedExecutor + Send + Sync>),
26+
Executor(Arc<dyn Fn(BoxFuture) + Send + Sync>),
4227
}
4328

4429
// ===== impl Exec =====
4530

4631
impl Exec {
47-
pub(crate) fn execute<F>(&self, fut: F) -> crate::Result<()>
32+
pub(crate) fn execute<F>(&self, fut: F)
4833
where
4934
F: Future<Output=()> + Send + 'static,
5035
{
5136
match *self {
5237
Exec::Default => {
5338
#[cfg(feature = "tcp")]
5439
{
55-
use std::error::Error as StdError;
56-
57-
struct TokioSpawnError;
58-
59-
impl fmt::Debug for TokioSpawnError {
60-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61-
fmt::Debug::fmt("tokio::spawn failed (is a tokio runtime running this future?)", f)
62-
}
63-
}
64-
65-
impl fmt::Display for TokioSpawnError {
66-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67-
fmt::Display::fmt("tokio::spawn failed (is a tokio runtime running this future?)", f)
68-
}
69-
}
70-
71-
impl StdError for TokioSpawnError {
72-
fn description(&self) -> &str {
73-
"tokio::spawn failed"
74-
}
75-
}
76-
77-
::tokio_executor::DefaultExecutor::current()
78-
.spawn(Box::pin(fut))
79-
.map_err(|err| {
80-
warn!("executor error: {:?}", err);
81-
crate::Error::new_execute(TokioSpawnError)
82-
})
40+
tokio::spawn(fut);
8341
}
8442
#[cfg(not(feature = "tcp"))]
8543
{
@@ -88,20 +46,18 @@ impl Exec {
8846
}
8947
},
9048
Exec::Executor(ref e) => {
91-
e.shared_spawn(Box::pin(fut))
92-
.map_err(|err| {
93-
warn!("executor error: {:?}", err);
94-
crate::Error::new_execute("custom executor failed")
95-
})
49+
e(Box::pin(fut));
9650
},
9751
}
9852
}
9953
}
10054

10155
impl fmt::Debug for Exec {
10256
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103-
f.debug_struct("Exec")
104-
.finish()
57+
match self {
58+
Exec::Default => f.write_str("Exec::Default"),
59+
Exec::Executor(..) => f.write_str("Exec::Custom"),
60+
}
10561
}
10662
}
10763

@@ -111,7 +67,7 @@ where
11167
H2Stream<F, B>: Future<Output = ()> + Send + 'static,
11268
B: Payload,
11369
{
114-
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) -> crate::Result<()> {
70+
fn execute_h2stream(&self, fut: H2Stream<F, B>) {
11571
self.execute(fut)
11672
}
11773
}
@@ -122,7 +78,7 @@ where
12278
S: HttpService<Body>,
12379
W: Watcher<I, S, E>,
12480
{
125-
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()> {
81+
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) {
12682
self.execute(fut)
12783
}
12884
}
@@ -131,34 +87,23 @@ where
13187

13288
impl<E, F, B> H2Exec<F, B> for E
13389
where
134-
E: TypedExecutor<H2Stream<F, B>> + Clone,
90+
E: Fn(H2Stream<F, B>) + Clone,
13591
H2Stream<F, B>: Future<Output=()>,
13692
B: Payload,
13793
{
138-
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) -> crate::Result<()> {
139-
self.spawn(fut)
140-
.map_err(|err| {
141-
warn!("executor error: {:?}", err);
142-
crate::Error::new_execute("custom executor failed")
143-
})
94+
fn execute_h2stream(&self, fut: H2Stream<F, B>) {
95+
self(fut);
14496
}
14597
}
14698

14799
impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
148100
where
149-
E: TypedExecutor<NewSvcTask<I, N, S, E, W>> + Clone,
101+
E: Fn(NewSvcTask<I, N, S, E, W>) + Clone,
150102
NewSvcTask<I, N, S, E, W>: Future<Output=()>,
151103
S: HttpService<Body>,
152104
W: Watcher<I, S, E>,
153105
{
154-
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()> {
155-
self.spawn(fut)
156-
.map_err(|err| {
157-
warn!("executor error: {:?}", err);
158-
crate::Error::new_execute("custom executor failed")
159-
})
106+
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) {
107+
self(fut);
160108
}
161109
}
162-
163-
// ===== StdError impls =====
164-

src/error.rs

-9
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,8 @@ pub(crate) enum User {
8686

8787
/// User tried polling for an upgrade that doesn't exist.
8888
NoUpgrade,
89-
9089
/// User polled for an upgrade, but low-level API is not using upgrades.
9190
ManualUpgrade,
92-
93-
/// Error trying to call `Executor::execute`.
94-
Execute,
9591
}
9692

9793
impl Error {
@@ -277,10 +273,6 @@ impl Error {
277273
Error::new(Kind::Shutdown).with(cause)
278274
}
279275

280-
pub(crate) fn new_execute<E: Into<Cause>>(cause: E) -> Error {
281-
Error::new_user(User::Execute).with(cause)
282-
}
283-
284276
pub(crate) fn new_h2(cause: ::h2::Error) -> Error {
285277
if cause.is_io() {
286278
Error::new_io(cause.into_io().expect("h2::Error::is_io"))
@@ -346,7 +338,6 @@ impl StdError for Error {
346338
Kind::User(User::AbsoluteUriRequired) => "client requires absolute-form URIs",
347339
Kind::User(User::NoUpgrade) => "no upgrade available",
348340
Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use",
349-
Kind::User(User::Execute) => "executor failed to spawn task",
350341
}
351342
}
352343

src/proto/h2/client.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ where
7171
}
7272
};
7373

74-
exec.execute(conn_task)?;
74+
exec.execute(conn_task);
7575

7676
Ok(ClientTask {
7777
conn_drop_ref,
@@ -155,7 +155,7 @@ where
155155
drop(conn_drop_ref);
156156
x
157157
});
158-
self.executor.execute(pipe)?;
158+
self.executor.execute(pipe);
159159
}
160160
}
161161
}
@@ -175,7 +175,7 @@ where
175175
}
176176
}
177177
});
178-
self.executor.execute(cb.send_when(fut))?;
178+
self.executor.execute(cb.send_when(fut));
179179
continue;
180180
},
181181

src/proto/h2/server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ where
175175
crate::Body::h2(stream, content_length)
176176
});
177177
let fut = H2Stream::new(service.call(req), respond);
178-
exec.execute_h2stream(fut)?;
178+
exec.execute_h2stream(fut);
179179
},
180180
Some(Err(e)) => {
181181
return Poll::Ready(Err(crate::Error::new_h2(e)));

src/server/conn.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,7 @@ where
843843
loop {
844844
if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) {
845845
let fut = NewSvcTask::new(connecting, watcher.clone());
846-
me.serve.as_mut().project().protocol.exec.execute_new_svc(fut)?;
846+
me.serve.as_mut().project().protocol.exec.execute_new_svc(fut);
847847
} else {
848848
return Poll::Ready(Ok(()));
849849
}

0 commit comments

Comments
 (0)