Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/offchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ futures-timer = "0.4.0"
hyper = "0.12.35"
hyper-tls = "0.3.2"
log = "0.4.8"
threadpool = "1.7"
num_cpus = "1.10"
offchain-primitives = { package = "substrate-offchain-primitives", path = "./primitives" }
codec = { package = "parity-scale-codec", version = "1.0.0", features = ["derive"] }
parking_lot = "0.9.0"
Expand Down
29 changes: 16 additions & 13 deletions core/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use std::{
sync::Arc,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sync::Arc,
sync::{Arc, Mutex},

};

use parking_lot::Mutex;
use threadpool::ThreadPool;
use client::runtime_api::ApiExt;
use futures::future::Future;
use log::{debug, warn};
Expand All @@ -58,6 +60,7 @@ pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
client: Arc<Client>,
db: Storage,
_block: PhantomData<Block>,
thread_pool: Mutex<ThreadPool>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Mutex isn't needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadPool isn't Send/Sync

Copy link
Contributor

@tomaka tomaka Oct 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you said surprised me a lot, so I checked.

https://github.com/rust-threadpool/rust-threadpool/blob/21a70c7e8b19fb73a9f3f44a55ca1b7333d804f9/src/lib.rs#L1212-L1228

I don't see a test for Sync for ThreadPool, but I would be surprised if it wasn't Sync as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rust-threadpool/rust-threadpool#96

There is an issue for this.

}

impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
Expand All @@ -67,6 +70,7 @@ impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Blo
client,
db,
_block: PhantomData,
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
}
}
}
Expand Down Expand Up @@ -116,7 +120,7 @@ impl<Client, Storage, Block> OffchainWorkers<
debug!("Spawning offchain workers at {:?}", at);
let number = *number;
let client = self.client.clone();
spawn_worker(move || {
self.spawn_worker(move || {
let runtime = client.runtime_api();
let api = Box::new(api);
debug!("Running offchain workers at {:?}", at);
Expand All @@ -134,19 +138,18 @@ impl<Client, Storage, Block> OffchainWorkers<
futures::future::Either::Right(futures::future::ready(()))
}
}
}

/// Spawns a new offchain worker.
///
/// We spawn offchain workers for each block in a separate thread,
/// since they can run for a significant amount of time
/// in a blocking fashion and we don't want to block the runtime.
///
/// Note that we should avoid that if we switch to future-based runtime in the future,
/// alternatively:
/// TODO [ToDr] (#1458) we can consider using a thread pool instead.
fn spawn_worker(f: impl FnOnce() -> () + Send + 'static) {
std::thread::spawn(f);
/// Spawns a new offchain worker.
///
/// We spawn offchain workers for each block in a separate thread,
/// since they can run for a significant amount of time
/// in a blocking fashion and we don't want to block the runtime.
///
/// Note that we should avoid that if we switch to future-based runtime in the future,
/// alternatively:
fn spawn_worker(&self, f: impl FnOnce() -> () + Send + 'static) {
self.thread_pool.lock().execute(f);
}
}

#[cfg(test)]
Expand Down