From bd7b61ca8bf2ec472c74d221adfc4f8b22d2d090 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Sat, 11 Jun 2022 11:56:25 -0700 Subject: [PATCH] Add more internal enforcement of static/scope lifetimes --- rayon-core/src/broadcast/mod.rs | 2 +- rayon-core/src/job.rs | 16 +++++ rayon-core/src/scope/mod.rs | 104 ++++++++++++++++++++++---------- rayon-core/src/spawn/mod.rs | 2 +- 4 files changed, 90 insertions(+), 34 deletions(-) diff --git a/rayon-core/src/broadcast/mod.rs b/rayon-core/src/broadcast/mod.rs index bbf6f3e0d..452aa71b6 100644 --- a/rayon-core/src/broadcast/mod.rs +++ b/rayon-core/src/broadcast/mod.rs @@ -141,7 +141,7 @@ where // on each thread. This ref is decremented at the (*) above. registry.increment_terminate_count(); - ArcJob::as_job_ref(&job) + ArcJob::as_static_job_ref(&job) }); registry.inject_broadcast(job_refs); diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index b099d1735..b7a3dae18 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -144,6 +144,14 @@ where pub(super) unsafe fn into_job_ref(self: Box) -> JobRef { JobRef::new(Box::into_raw(self)) } + + /// Creates a static `JobRef` from this job. + pub(super) fn into_static_job_ref(self: Box) -> JobRef + where + BODY: 'static, + { + unsafe { self.into_job_ref() } + } } impl Job for HeapJob @@ -179,6 +187,14 @@ where pub(super) unsafe fn as_job_ref(this: &Arc) -> JobRef { JobRef::new(Arc::into_raw(Arc::clone(this))) } + + /// Creates a static `JobRef` from this job. + pub(super) fn as_static_job_ref(this: &Arc) -> JobRef + where + BODY: 'static, + { + unsafe { Self::as_job_ref(this) } + } } impl Job for ArcJob diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 7eadf1f19..25cda832e 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -6,7 +6,7 @@ //! [`join()`]: ../join/join.fn.html use crate::broadcast::BroadcastContext; -use crate::job::{ArcJob, HeapJob, JobFifo}; +use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; use crate::latch::{CountLatch, CountLockLatch, Latch}; use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; use crate::unwind; @@ -539,16 +539,18 @@ impl<'scope> Scope<'scope> { where BODY: FnOnce(&Scope<'scope>) + Send + 'scope, { - self.base.increment(); - unsafe { - let job_ref = - HeapJob::new(move || self.base.execute_job(move || body(self))).into_job_ref(); + let scope_ptr = ScopePtr(self); + let job = HeapJob::new(move || { + // SAFETY: this job will execute before the scope ends. + let scope = unsafe { scope_ptr.as_ref() }; + scope.base.execute_job(move || body(scope)) + }); + let job_ref = self.base.heap_job_ref(job); - // Since `Scope` implements `Sync`, we can't be sure that we're still in a - // thread of this pool, so we can't just push to the local worker thread. - // Also, this might be an in-place scope. - self.base.registry.inject_or_push(job_ref); - } + // Since `Scope` implements `Sync`, we can't be sure that we're still in a + // thread of this pool, so we can't just push to the local worker thread. + // Also, this might be an in-place scope. + self.base.registry.inject_or_push(job_ref); } /// Spawns a job into every thread of the fork-join scope `self`. This job will @@ -559,12 +561,15 @@ impl<'scope> Scope<'scope> { where BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { + let scope_ptr = ScopePtr(self); let job = ArcJob::new(move || { + // SAFETY: this job will execute before the scope ends. + let scope = unsafe { scope_ptr.as_ref() }; let body = &body; - self.base - .execute_job(move || BroadcastContext::with(move |ctx| body(self, ctx))) + let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); + scope.base.execute_job(func); }); - unsafe { self.base.inject_broadcast(job) } + self.base.inject_broadcast(job) } } @@ -594,20 +599,23 @@ impl<'scope> ScopeFifo<'scope> { where BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, { - self.base.increment(); - unsafe { - let job_ref = - HeapJob::new(move || self.base.execute_job(move || body(self))).into_job_ref(); - - // If we're in the pool, use our scope's private fifo for this thread to execute - // in a locally-FIFO order. Otherwise, just use the pool's global injector. - match self.base.registry.current_thread() { - Some(worker) => { - let fifo = &self.fifos[worker.index()]; - worker.push(fifo.push(job_ref)); - } - None => self.base.registry.inject(&[job_ref]), + let scope_ptr = ScopePtr(self); + let job = HeapJob::new(move || { + // SAFETY: this job will execute before the scope ends. + let scope = unsafe { scope_ptr.as_ref() }; + scope.base.execute_job(move || body(scope)) + }); + let job_ref = self.base.heap_job_ref(job); + + // If we're in the pool, use our scope's private fifo for this thread to execute + // in a locally-FIFO order. Otherwise, just use the pool's global injector. + match self.base.registry.current_thread() { + Some(worker) => { + let fifo = &self.fifos[worker.index()]; + // SAFETY: this job will execute before the scope ends. + unsafe { worker.push(fifo.push(job_ref)) }; } + None => self.base.registry.inject(&[job_ref]), } } @@ -619,12 +627,15 @@ impl<'scope> ScopeFifo<'scope> { where BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { + let scope_ptr = ScopePtr(self); let job = ArcJob::new(move || { + // SAFETY: this job will execute before the scope ends. + let scope = unsafe { scope_ptr.as_ref() }; let body = &body; - self.base - .execute_job(move || BroadcastContext::with(move |ctx| body(self, ctx))) + let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); + scope.base.execute_job(func); }); - unsafe { self.base.inject_broadcast(job) } + self.base.inject_broadcast(job) } } @@ -648,12 +659,22 @@ impl<'scope> ScopeBase<'scope> { self.job_completed_latch.increment(); } - unsafe fn inject_broadcast(&self, job: Arc>) + fn heap_job_ref(&self, job: Box>) -> JobRef where - FUNC: Fn() + Send + Sync, + FUNC: FnOnce() + Send + 'scope, + { + unsafe { + self.increment(); + job.into_job_ref() + } + } + + fn inject_broadcast(&self, job: Arc>) + where + FUNC: Fn() + Send + Sync + 'scope, { let n_threads = self.registry.num_threads(); - let job_refs = (0..n_threads).map(|_| { + let job_refs = (0..n_threads).map(|_| unsafe { self.increment(); ArcJob::as_job_ref(&job) }); @@ -817,3 +838,22 @@ impl fmt::Debug for ScopeLatch { } } } + +/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. +/// +/// Unsafe code is still required to dereference the pointer, but that's fine in +/// scope jobs that are guaranteed to execute before the scope ends. +struct ScopePtr(*const T); + +// SAFETY: !Send for raw pointers is not for safety, just as a lint +unsafe impl Send for ScopePtr {} + +// SAFETY: !Sync for raw pointers is not for safety, just as a lint +unsafe impl Sync for ScopePtr {} + +impl ScopePtr { + // Helper to avoid disjoint captures of `scope_ptr.0` + unsafe fn as_ref(&self) -> &T { + &*self.0 + } +} diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs index 827e36e61..ae1f211ef 100644 --- a/rayon-core/src/spawn/mod.rs +++ b/rayon-core/src/spawn/mod.rs @@ -98,7 +98,7 @@ where registry.terminate(); // (*) permit registry to terminate now } }) - .into_job_ref() + .into_static_job_ref() } /// Fires off a task into the Rayon threadpool in the "static" or