Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions crates/libs/threading/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,16 @@ The `for_each` function uses a `Pool` object internally, which you can also use
let set = std::sync::RwLock::<std::collections::HashMap<u32, usize>>::default();
let pool = windows_threading::Pool::new();
pool.set_thread_limits(2, 10);
pool.scope(|pool| {
for _ in 0..10 {
pool.submit(|| {
windows_threading::sleep(10);
let mut writer = set.write().unwrap();
*writer.entry(windows_threading::thread_id()).or_default() += 1;
})
}
});

for _ in 0..10 {
pool.submit(|| {
windows_threading::sleep(10);
let mut writer = set.write().unwrap();
*writer.entry(windows_threading::thread_id()).or_default() += 1;
})
}

pool.join();
println!("{:#?}", set.read().unwrap());
```

Expand Down
10 changes: 5 additions & 5 deletions crates/libs/threading/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ where
F: Fn(T) + Sync,
T: Send,
{
let pool = Pool::new();

for item in i {
pool.submit(|| f(item));
}
Pool::with_scope(|pool| {
for item in i {
pool.submit(|| f(item));
}
});
}

/// The thread identifier of the calling thread.
Expand Down
71 changes: 67 additions & 4 deletions crates/libs/threading/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use core::{marker::PhantomData, ops::Deref};

/// A `Pool` object represents a private thread pool with its own thread limits.
///
Expand All @@ -25,6 +26,15 @@ impl Pool {
Self(Box::new(e))
}

/// Convenience function for creating a new pool and calling [`scope`][Self::scope].
pub fn with_scope<'env, F>(f: F)
where
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>),
{
let pool = Pool::new();
pool.scope(f);
}

/// Sets the thread limits for the `Pool` object.
pub fn set_thread_limits(&self, min: u32, max: u32) {
unsafe {
Expand All @@ -33,16 +43,42 @@ impl Pool {
}
}

/// Submits the closure to run on the `Pool`.
/// Submit the closure to the thread pool.
///
/// The closure cannot outlive the `Pool` on which it runs.
pub fn submit<'a, F: FnOnce() + Send + 'a>(&'a self, f: F) {
// This is safe because the lifetime of the closure is bounded by the `Pool`.
/// * The closure must have `'static` lifetime as the thread may outlive the lifetime in which `submit` is called.
/// * The closure must be `Send` as it will be sent to another thread for execution.
pub fn submit<F: FnOnce() + Send + 'static>(&self, f: F) {
// This is safe because the closure has a `'static` lifetime.
unsafe {
try_submit(&*self.0, f);
}
}

/// Create a scope for submitting closures.
///
/// Within this scope local variables can be sent to the pool thread for execution.
/// This is possible because `scope` will wait for all submitted closures to finish before returning,
/// Note however that it will also wait for closures that were submitted from other threads.
pub fn scope<'env, F>(&self, f: F)
where
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>),
{
struct DropGuard<'a>(&'a Pool);
impl Drop for DropGuard<'_> {
fn drop(&mut self) {
self.0.join();
}
}
// Ensure that we always join the pool before returning.
let _guard = DropGuard(self);
let scope = Scope {
pool: self,
env: PhantomData,
scope: PhantomData,
};
f(&scope);
}

/// Waits for all submissions to finish.
///
/// Dropping the `Pool` will also wait for all submissions to finish.
Expand Down Expand Up @@ -74,3 +110,30 @@ impl Drop for Pool {
}
}
}

/// A scope to submit closures in.
///
/// See [`scope`][Pool::scope] for details.
pub struct Scope<'scope, 'env: 'scope> {
pool: &'scope Pool,
scope: PhantomData<&'scope mut &'scope ()>,
env: PhantomData<&'env mut &'env ()>,
}

impl<'scope, 'env> Scope<'scope, 'env> {
/// Submits the closure to run on the `Pool`.
///
/// The closure cannot outlive the `Scope` it's run in.
pub fn submit<F: FnOnce() + Send + 'scope>(&'scope self, f: F) {
unsafe {
try_submit(&*self.pool.0, f);
}
}
}

impl Deref for Scope<'_, '_> {
type Target = Pool;
fn deref(&self) -> &Self::Target {
self.pool
}
}
22 changes: 16 additions & 6 deletions crates/samples/services/thread/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,54 @@
use windows_services::*;
use windows_threading::*;

use std::sync::{Arc, RwLock};

fn main() {
let pool = Pool::new();
let pool: Pool = Pool::new();
pool.set_thread_limits(1, 1);

Service::new()
let service_original = Arc::new(RwLock::new(Service::new()));
let service = Arc::clone(&service_original);
service_original
Comment on lines +10 to +12
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems overly complicated. The original Service::run terminated the process upon completion so that the run method was -> ! and the lifetime was much simpler. This was changed in #3662 to make it possible to test Service but perhaps that was a mistake.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or the Service::run needs a similar scope pattern?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it might be better (and simpler) to separate Service into two: one that constructs the service and another that accesses it afterward. The latter could then be freely moved, cloned, etc because it's just accessing global state (via appropriate locks).

Using the scoped pattern may be another option. I'm still not sure if that'd be more or less ergonomic though. It does need to be able to call the closure multiple times which complicates things. It might help if run consumed self rather than taking &mut self. I'd want to play about with this a bit more to see.

.write()
.unwrap()
.can_pause()
.can_stop()
.can_fallback(|_| {
println!("Press Enter to stop service.");
use std::io::Read;
_ = std::io::stdin().read(&mut [0]);
})
.run(|service, command| {
.run(move |_, command| {
log(&format!("Command: {command:?}\n"));

match command {
Command::Start | Command::Resume => pool.submit(|| service_thread(service)),
Command::Start | Command::Resume => {
let service = Arc::clone(&service);
pool.submit(move || service_thread(service))
}
Command::Pause | Command::Stop => pool.join(),
_ => {}
}
})
.unwrap();
}

fn service_thread(service: &Service) {
fn service_thread(service: Arc<RwLock<Service<'_>>>) {
for i in 0..10 {
log(&format!("Thread:{}... iteration:{i}\n", thread_id()));

// Replace with whatever work the service needs to do.
sleep(1000);

// Services can use the `state` function to query the current service state.
let service = service.read().unwrap();
if matches!(service.state(), State::StopPending | State::PausePending) {
return;
}
}

// Services can use the `set_state` function to update the service state.
let service = service.read().unwrap();
service.set_state(State::Stopped);
}

Expand Down
71 changes: 35 additions & 36 deletions crates/tests/libs/threading/tests/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@ fn join() {
let pool = windows_threading::Pool::new();
let counter = std::sync::RwLock::<usize>::new(0);

for _ in 0..10 {
pool.submit(|| {
let mut writer = counter.write().unwrap();
*writer += 1;
});
}

pool.join();
assert_eq!(*counter.read().unwrap(), 10);

for _ in 0..10 {
pool.submit(|| {
let mut writer = counter.write().unwrap();
*writer += 1;
});
}

pool.scope(|pool| {
for _ in 0..10 {
pool.submit(|| {
let mut writer = counter.write().unwrap();
*writer += 1;
});
}

pool.join();
assert_eq!(*counter.read().unwrap(), 10);

for _ in 0..10 {
pool.submit(|| {
let mut writer = counter.write().unwrap();
*writer += 1;
});
}
});
drop(pool);
assert_eq!(*counter.read().unwrap(), 20);
}
Expand All @@ -42,16 +43,15 @@ fn multi() {

let pool = windows_threading::Pool::new();
pool.set_thread_limits(2, 10);

for _ in 0..10 {
pool.submit(|| {
windows_threading::sleep(10);
let mut writer = set.write().unwrap();
writer.insert(windows_threading::thread_id());
})
}

pool.join();
pool.scope(|pool| {
for _ in 0..10 {
pool.submit(|| {
windows_threading::sleep(10);
let mut writer = set.write().unwrap();
writer.insert(windows_threading::thread_id());
})
}
});
assert!(set.read().unwrap().len() > 1);
}

Expand All @@ -61,15 +61,14 @@ fn single() {

let pool = windows_threading::Pool::new();
pool.set_thread_limits(1, 1);

for _ in 0..10 {
pool.submit(|| {
let mut writer = set.write().unwrap();
writer.insert(windows_threading::thread_id());
})
}

pool.join();
pool.scope(|pool| {
for _ in 0..10 {
pool.submit(|| {
let mut writer = set.write().unwrap();
writer.insert(windows_threading::thread_id());
})
}
});
assert_eq!(set.read().unwrap().len(), 1);
}

Expand Down
Loading