diff --git a/src/runtime.rs b/src/runtime.rs index 25b3ce6b2..e14dc935c 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,5 +1,4 @@ use std::{ - panic::AssertUnwindSafe, sync::atomic::{AtomicBool, Ordering}, thread::ThreadId, }; @@ -7,8 +6,8 @@ use std::{ use parking_lot::Mutex; use crate::{ - durability::Durability, key::DatabaseKeyIndex, table::Table, zalsa_local::ZalsaLocal, - Cancelled, Database, Event, EventKind, Revision, + durability::Durability, key::DatabaseKeyIndex, table::Table, Cancelled, Database, Event, + EventKind, Revision, }; use self::dependency_graph::DependencyGraph; @@ -169,8 +168,7 @@ impl Runtime { /// cancelled, so this function will panic with a `Cancelled` value. pub(crate) fn block_on( &self, - db: &dyn Database, - local_state: &ZalsaLocal, + db: &(impl Database + ?Sized), database_key: DatabaseKeyIndex, other_id: ThreadId, query_mutex_guard: QueryMutexGuard, @@ -189,21 +187,8 @@ impl Runtime { }) }); - // `DependencyGraph::block_on` does not panic, so we cannot enter an inconsistent state. - let dg = AssertUnwindSafe(dg); - // `DependencyGraph::block_on` does not panic, nor does it read from query_mutex_guard, so - // we cannot enter an inconsistent state for this parameter. - let query_mutex_guard = AssertUnwindSafe(query_mutex_guard); - let result = local_state.with_query_stack(|stack| { - DependencyGraph::block_on( - { dg }.0, - thread_id, - database_key, - other_id, - stack, - { query_mutex_guard }.0, - ) - }); + let result = + DependencyGraph::block_on(dg, thread_id, database_key, other_id, query_mutex_guard); match result { WaitResult::Completed => BlockResult::Completed, diff --git a/src/runtime/dependency_graph.rs b/src/runtime/dependency_graph.rs index 1e07bb3fb..e691f3e79 100644 --- a/src/runtime/dependency_graph.rs +++ b/src/runtime/dependency_graph.rs @@ -1,6 +1,5 @@ use std::thread::ThreadId; -use crate::active_query::ActiveQuery; use crate::key::DatabaseKeyIndex; use crate::runtime::WaitResult; use parking_lot::MutexGuard; @@ -59,12 +58,9 @@ impl DependencyGraph { from_id: ThreadId, database_key: DatabaseKeyIndex, to_id: ThreadId, - from_stack: &mut [ActiveQuery], query_mutex_guard: QueryMutexGuard, ) -> WaitResult { - // SAFETY: We are blocking until the result is removed from `DependencyGraph::wait_results` - // and as such we are keeping `from_stack` alive. - let condvar = unsafe { me.add_edge(from_id, database_key, to_id, from_stack) }; + let edge = me.add_edge(from_id, database_key, to_id); // Release the mutex that prevents `database_key` // from completing, now that the edge has been added. @@ -75,30 +71,23 @@ impl DependencyGraph { debug_assert!(!me.edges.contains_key(&from_id)); return result; } - condvar.wait(&mut me); + edge.wait(&mut me); } } /// Helper for `block_on`: performs actual graph modification /// to add a dependency edge from `from_id` to `to_id`, which is /// computing `database_key`. - /// - /// # Safety - /// - /// The caller needs to keep `from_stack`/`'aq`` alive until `from_id` has been removed from the `wait_results`. - // This safety invariant is consumed by the `Edge` struct - unsafe fn add_edge<'aq>( + fn add_edge( &mut self, from_id: ThreadId, database_key: DatabaseKeyIndex, to_id: ThreadId, - from_stack: &'aq mut [ActiveQuery], - ) -> edge::EdgeGuard<'aq> { + ) -> edge::EdgeGuard { assert_ne!(from_id, to_id); debug_assert!(!self.edges.contains_key(&from_id)); debug_assert!(!self.depends_on(to_id, from_id)); - // SAFETY: The caller is responsible for ensuring that the `EdgeGuard` outlives the `Edge`. - let (edge, guard) = unsafe { edge::Edge::new(to_id, from_stack) }; + let (edge, guard) = edge::Edge::new(to_id); self.edges.insert(from_id, edge); self.query_dependents .entry(database_key) @@ -138,11 +127,11 @@ impl DependencyGraph { } mod edge { - use std::{marker::PhantomData, sync::Arc, thread::ThreadId}; + use std::{sync::Arc, thread::ThreadId}; use parking_lot::MutexGuard; - use crate::{active_query::ActiveQuery, runtime::dependency_graph::DependencyGraph}; + use crate::runtime::dependency_graph::DependencyGraph; #[derive(Debug)] pub(super) struct Edge { @@ -153,35 +142,24 @@ mod edge { condvar: Arc, } - pub struct EdgeGuard<'aq> { + pub struct EdgeGuard { condvar: Arc, - // Inform the borrow checker that the edge stack is borrowed until the guard is released. - // This is necessary to ensure that the stack is not modified by the caller of - // `DependencyGraph::add_edge` after the call returns. - _pd: PhantomData<&'aq mut [ActiveQuery]>, } - impl EdgeGuard<'_> { + impl EdgeGuard { pub fn wait(&self, mutex_guard: &mut MutexGuard<'_, DependencyGraph>) { self.condvar.wait(mutex_guard) } } impl Edge { - pub(super) unsafe fn new( - blocked_on_id: ThreadId, - stack: &mut [ActiveQuery], - ) -> (Self, EdgeGuard<'_>) { - _ = stack; + pub(super) fn new(blocked_on_id: ThreadId) -> (Self, EdgeGuard) { let condvar = Arc::new(parking_lot::Condvar::new()); let edge = Self { blocked_on_id, condvar: condvar.clone(), }; - let edge_guard = EdgeGuard { - condvar, - _pd: PhantomData, - }; + let edge_guard = EdgeGuard { condvar }; (edge, edge_guard) } diff --git a/src/table/sync.rs b/src/table/sync.rs index 97f175467..2baf3247a 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -71,13 +71,10 @@ impl SyncTable { // boolean is to decide *whether* to acquire the lock, // not to gate future atomic reads. *anyone_waiting = true; - match zalsa.runtime().block_on( - db.as_dyn_database(), - db.zalsa_local(), - database_key_index, - *other_id, - syncs, - ) { + match zalsa + .runtime() + .block_on(db, database_key_index, *other_id, syncs) + { BlockResult::Completed => ClaimResult::Retry, BlockResult::Cycle => ClaimResult::Cycle, }