diff --git a/Cargo.toml b/Cargo.toml index 6c12c3205..40a5055f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "parity-path", "plain_hasher", "rlp", + "runtime", "transaction-pool", "trace-time", "triehash", diff --git a/runtime/CHANGELOG.MD b/runtime/CHANGELOG.MD new file mode 100644 index 000000000..59d2c7ffe --- /dev/null +++ b/runtime/CHANGELOG.MD @@ -0,0 +1,11 @@ +# Changelog + +The format is based on [Keep a Changelog]. + +[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/ + +## [Unreleased] + +## [0.1.1] - 2019-11-25 +### Changed +- Moved to parity common repo, prepared for publishing (https://github.com/paritytech/parity-common/pull/271) diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml new file mode 100644 index 000000000..ca53759a0 --- /dev/null +++ b/runtime/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "parity-runtime" +version = "0.1.1" +authors = ["Parity Technologies "] +edition = "2018" + +description = "Tokio runtime wrapper" +license = "GPL-3.0" +readme = "README.md" +homepage = "https://www.parity.io/" +keywords = ["parity", "runtime", "tokio"] +include = ["Cargo.toml", "src/**/*.rs", "README.md", "CHANGELOG.md"] + +[dependencies] +futures = "0.1" +tokio = "0.1.22" + +[features] +test-helpers = [] \ No newline at end of file diff --git a/runtime/README.MD b/runtime/README.MD new file mode 100644 index 000000000..7cda2a31f --- /dev/null +++ b/runtime/README.MD @@ -0,0 +1,6 @@ +# parity-runtime + +Wrapper over tokio runtime. Provides: +- Customizable runtime with ability to spawn it in different thread models +- Corresponding runtime executor for tasks +- Runtime handle diff --git a/runtime/examples/simple.rs b/runtime/examples/simple.rs new file mode 100644 index 000000000..6448bcbf6 --- /dev/null +++ b/runtime/examples/simple.rs @@ -0,0 +1,41 @@ +// Copyright 2015-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Ethereum. + +// Parity Ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Ethereum. If not, see . + +//! Simple example, illustating usage of runtime wrapper. + +use futures::{Future, Stream}; +use parity_runtime::Runtime; +use std::thread::park_timeout; +use std::time::Duration; +use tokio::fs::read_dir; + +/// Read current directory in a future, which is executed in the created runtime +fn main() { + let fut = read_dir(".") + .flatten_stream() + .for_each(|dir| { + println!("{:?}", dir.path()); + Ok(()) + }) + .map_err(|err| { + eprintln!("Error: {:?}", err); + () + }); + let runtime = Runtime::with_default_thread_count(); + runtime.executor().spawn(fut); + let timeout = Duration::from_secs(3); + park_timeout(timeout); +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs new file mode 100644 index 000000000..acb1e4b6e --- /dev/null +++ b/runtime/src/lib.rs @@ -0,0 +1,198 @@ +// Copyright 2015-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Ethereum. + +// Parity Ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Ethereum. If not, see . + +//! Tokio Runtime wrapper. + +use futures::{future, Future, IntoFuture}; +use std::sync::mpsc; +use std::{fmt, thread}; +pub use tokio::runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime, TaskExecutor}; +pub use tokio::timer::Delay; + +/// Runtime for futures. +/// +/// Runs in a separate thread. +pub struct Runtime { + executor: Executor, + handle: RuntimeHandle, +} + +impl Runtime { + fn new(runtime_bldr: &mut TokioRuntimeBuilder) -> Self { + let mut runtime = runtime_bldr.build().expect( + "Building a Tokio runtime will only fail when mio components \ + cannot be initialized (catastrophic)", + ); + let (stop, stopped) = futures::oneshot(); + let (tx, rx) = mpsc::channel(); + let handle = thread::spawn(move || { + tx.send(runtime.executor()).expect("Rx is blocking upper thread."); + runtime + .block_on(futures::empty().select(stopped).map(|_| ()).map_err(|_| ())) + .expect("Tokio runtime should not have unhandled errors."); + }); + let executor = rx.recv().expect("tx is transfered to a newly spawned thread."); + + Runtime { + executor: Executor { inner: Mode::Tokio(executor) }, + handle: RuntimeHandle { close: Some(stop), handle: Some(handle) }, + } + } + + /// Spawns a new tokio runtime with a default thread count on a background + /// thread and returns a `Runtime` which can be used to spawn tasks via + /// its executor. + pub fn with_default_thread_count() -> Self { + let mut runtime_bldr = TokioRuntimeBuilder::new(); + Self::new(&mut runtime_bldr) + } + + /// Spawns a new tokio runtime with a the specified thread count on a + /// background thread and returns a `Runtime` which can be used to spawn + /// tasks via its executor. + #[cfg(any(test, feature = "test-helpers"))] + pub fn with_thread_count(thread_count: usize) -> Self { + let mut runtime_bldr = TokioRuntimeBuilder::new(); + runtime_bldr.core_threads(thread_count); + + Self::new(&mut runtime_bldr) + } + + /// Returns this runtime raw executor. + #[cfg(any(test, feature = "test-helpers"))] + pub fn raw_executor(&self) -> TaskExecutor { + if let Mode::Tokio(ref executor) = self.executor.inner { + executor.clone() + } else { + panic!("Runtime is not initialized in Tokio mode.") + } + } + + /// Returns runtime executor. + pub fn executor(&self) -> Executor { + self.executor.clone() + } +} + +#[derive(Clone)] +enum Mode { + Tokio(TaskExecutor), + // Mode used in tests + #[allow(dead_code)] + Sync, + // Mode used in tests + #[allow(dead_code)] + ThreadPerFuture, +} + +impl fmt::Debug for Mode { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use self::Mode::*; + + match *self { + Tokio(_) => write!(fmt, "tokio"), + Sync => write!(fmt, "synchronous"), + ThreadPerFuture => write!(fmt, "thread per future"), + } + } +} + +#[derive(Debug, Clone)] +pub struct Executor { + inner: Mode, +} + +impl Executor { + /// Synchronous executor, used for tests. + #[cfg(any(test, feature = "test-helpers"))] + pub fn new_sync() -> Self { + Executor { inner: Mode::Sync } + } + + /// Spawns a new thread for each future (use only for tests). + #[cfg(any(test, feature = "test-helpers"))] + pub fn new_thread_per_future() -> Self { + Executor { inner: Mode::ThreadPerFuture } + } + + /// Spawn a future on this runtime + pub fn spawn(&self, r: R) + where + R: IntoFuture + Send + 'static, + R::Future: Send + 'static, + { + match self.inner { + Mode::Tokio(ref executor) => executor.spawn(r.into_future()), + Mode::Sync => { + let _ = r.into_future().wait(); + } + Mode::ThreadPerFuture => { + thread::spawn(move || { + let _ = r.into_future().wait(); + }); + } + } + } +} + +impl + Send + 'static> future::Executor for Executor { + fn execute(&self, future: F) -> Result<(), future::ExecuteError> { + match self.inner { + Mode::Tokio(ref executor) => executor.execute(future), + Mode::Sync => { + let _ = future.wait(); + Ok(()) + } + Mode::ThreadPerFuture => { + thread::spawn(move || { + let _ = future.wait(); + }); + Ok(()) + } + } + } +} + +/// A handle to a runtime. Dropping the handle will cause runtime to shutdown. +pub struct RuntimeHandle { + close: Option>, + handle: Option>, +} + +impl From for RuntimeHandle { + fn from(el: Runtime) -> Self { + el.handle + } +} + +impl Drop for RuntimeHandle { + fn drop(&mut self) { + self.close.take().map(|v| v.send(())); + } +} + +impl RuntimeHandle { + /// Blocks current thread and waits until the runtime is finished. + pub fn wait(mut self) -> thread::Result<()> { + self.handle.take().expect("Handle is taken only in `wait`, `wait` is consuming; qed").join() + } + + /// Finishes this runtime. + pub fn close(mut self) { + let _ = + self.close.take().expect("Close is taken only in `close` and `drop`. `close` is consuming; qed").send(()); + } +}