Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 5 additions & 20 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::{
panic::AssertUnwindSafe,
sync::atomic::{AtomicBool, Ordering},
thread::ThreadId,
};

use parking_lot::Mutex;

use crate::{
durability::Durability, key::DatabaseKeyIndex, table::Table, zalsa_local::ZalsaLocal,
Cancelled, Database, Event, EventKind, Revision,
durability::Durability, key::DatabaseKeyIndex, table::Table, Cancelled, Database, Event,
EventKind, Revision,
};

use self::dependency_graph::DependencyGraph;
Expand Down Expand Up @@ -169,8 +168,7 @@ impl Runtime {
/// cancelled, so this function will panic with a `Cancelled` value.
pub(crate) fn block_on<QueryMutexGuard>(
&self,
db: &dyn Database,
local_state: &ZalsaLocal,
db: &(impl Database + ?Sized),
database_key: DatabaseKeyIndex,
other_id: ThreadId,
query_mutex_guard: QueryMutexGuard,
Expand All @@ -189,21 +187,8 @@ impl Runtime {
})
});

// `DependencyGraph::block_on` does not panic, so we cannot enter an inconsistent state.
let dg = AssertUnwindSafe(dg);
// `DependencyGraph::block_on` does not panic, nor does it read from query_mutex_guard, so
// we cannot enter an inconsistent state for this parameter.
let query_mutex_guard = AssertUnwindSafe(query_mutex_guard);
let result = local_state.with_query_stack(|stack| {
DependencyGraph::block_on(
{ dg }.0,
thread_id,
database_key,
other_id,
stack,
{ query_mutex_guard }.0,
)
});
let result =
DependencyGraph::block_on(dg, thread_id, database_key, other_id, query_mutex_guard);

match result {
WaitResult::Completed => BlockResult::Completed,
Expand Down
44 changes: 11 additions & 33 deletions src/runtime/dependency_graph.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::thread::ThreadId;

use crate::active_query::ActiveQuery;
use crate::key::DatabaseKeyIndex;
use crate::runtime::WaitResult;
use parking_lot::MutexGuard;
Expand Down Expand Up @@ -59,12 +58,9 @@ impl DependencyGraph {
from_id: ThreadId,
database_key: DatabaseKeyIndex,
to_id: ThreadId,
from_stack: &mut [ActiveQuery],
query_mutex_guard: QueryMutexGuard,
) -> WaitResult {
// SAFETY: We are blocking until the result is removed from `DependencyGraph::wait_results`
// and as such we are keeping `from_stack` alive.
let condvar = unsafe { me.add_edge(from_id, database_key, to_id, from_stack) };
let edge = me.add_edge(from_id, database_key, to_id);

// Release the mutex that prevents `database_key`
// from completing, now that the edge has been added.
Expand All @@ -75,30 +71,23 @@ impl DependencyGraph {
debug_assert!(!me.edges.contains_key(&from_id));
return result;
}
condvar.wait(&mut me);
edge.wait(&mut me);
}
}

/// Helper for `block_on`: performs actual graph modification
/// to add a dependency edge from `from_id` to `to_id`, which is
/// computing `database_key`.
///
/// # Safety
///
/// The caller needs to keep `from_stack`/`'aq`` alive until `from_id` has been removed from the `wait_results`.
// This safety invariant is consumed by the `Edge` struct
unsafe fn add_edge<'aq>(
fn add_edge(
&mut self,
from_id: ThreadId,
database_key: DatabaseKeyIndex,
to_id: ThreadId,
from_stack: &'aq mut [ActiveQuery],
) -> edge::EdgeGuard<'aq> {
) -> edge::EdgeGuard {
assert_ne!(from_id, to_id);
debug_assert!(!self.edges.contains_key(&from_id));
debug_assert!(!self.depends_on(to_id, from_id));
// SAFETY: The caller is responsible for ensuring that the `EdgeGuard` outlives the `Edge`.
let (edge, guard) = unsafe { edge::Edge::new(to_id, from_stack) };
let (edge, guard) = edge::Edge::new(to_id);
self.edges.insert(from_id, edge);
self.query_dependents
.entry(database_key)
Expand Down Expand Up @@ -138,11 +127,11 @@ impl DependencyGraph {
}

mod edge {
use std::{marker::PhantomData, sync::Arc, thread::ThreadId};
use std::{sync::Arc, thread::ThreadId};

use parking_lot::MutexGuard;

use crate::{active_query::ActiveQuery, runtime::dependency_graph::DependencyGraph};
use crate::runtime::dependency_graph::DependencyGraph;

#[derive(Debug)]
pub(super) struct Edge {
Expand All @@ -153,35 +142,24 @@ mod edge {
condvar: Arc<parking_lot::Condvar>,
}

pub struct EdgeGuard<'aq> {
pub struct EdgeGuard {
condvar: Arc<parking_lot::Condvar>,
// Inform the borrow checker that the edge stack is borrowed until the guard is released.
// This is necessary to ensure that the stack is not modified by the caller of
// `DependencyGraph::add_edge` after the call returns.
_pd: PhantomData<&'aq mut [ActiveQuery]>,
}

impl EdgeGuard<'_> {
impl EdgeGuard {
pub fn wait(&self, mutex_guard: &mut MutexGuard<'_, DependencyGraph>) {
self.condvar.wait(mutex_guard)
}
}

impl Edge {
pub(super) unsafe fn new(
blocked_on_id: ThreadId,
stack: &mut [ActiveQuery],
) -> (Self, EdgeGuard<'_>) {
_ = stack;
pub(super) fn new(blocked_on_id: ThreadId) -> (Self, EdgeGuard) {
let condvar = Arc::new(parking_lot::Condvar::new());
let edge = Self {
blocked_on_id,
condvar: condvar.clone(),
};
let edge_guard = EdgeGuard {
condvar,
_pd: PhantomData,
};
let edge_guard = EdgeGuard { condvar };
(edge, edge_guard)
}

Expand Down
11 changes: 4 additions & 7 deletions src/table/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,10 @@ impl SyncTable {
// boolean is to decide *whether* to acquire the lock,
// not to gate future atomic reads.
*anyone_waiting = true;
match zalsa.runtime().block_on(
db.as_dyn_database(),
db.zalsa_local(),
database_key_index,
*other_id,
syncs,
) {
match zalsa
.runtime()
.block_on(db, database_key_index, *other_id, syncs)
{
BlockResult::Completed => ClaimResult::Retry,
BlockResult::Cycle => ClaimResult::Cycle,
}
Expand Down