Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
cycle::CycleRecoveryStrategy,
ingredient::{fmt_index, Ingredient, Jar, MaybeChangedAfter},
plumbing::JarAux,
table::Table,
zalsa::IngredientIndex,
zalsa_local::QueryOrigin,
Database, DatabaseKeyIndex, Id, Revision,
Expand Down Expand Up @@ -137,7 +138,7 @@ impl<A: Accumulator> Ingredient for IngredientImpl<A> {
false
}

fn reset_for_new_revision(&mut self) {
fn reset_for_new_revision(&mut self, _: &mut Table) {
panic!("unexpected reset on accumulator")
}

Expand Down
22 changes: 13 additions & 9 deletions src/function.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{any::Any, fmt, sync::Arc};
use std::{any::Any, fmt, mem::ManuallyDrop, sync::Arc};

use crate::{
accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues},
Expand All @@ -7,6 +7,7 @@ use crate::{
key::DatabaseKeyIndex,
plumbing::JarAux,
salsa_struct::SalsaStructInDb,
table::Table,
zalsa::{IngredientIndex, MemoIngredientIndex, Zalsa},
zalsa_local::QueryOrigin,
Cycle, Database, Id, Revision,
Expand Down Expand Up @@ -167,15 +168,16 @@ where
memo: memo::Memo<C::Output<'db>>,
) -> &'db memo::Memo<C::Output<'db>> {
let memo = Arc::new(memo);
let db_memo = unsafe {
// Unsafety conditions: memo must be in the map (it's not yet, but it will be by the time this
// value is returned) and anything removed from map is added to deleted entries (ensured elsewhere).
self.extend_memo_lifetime(&memo)
};
if let Some(old_value) = self.insert_memo_into_table_for(zalsa, id, memo) {
// Unsafety conditions: memo must be in the map (it's not yet, but it will be by the time this
// value is returned) and anything removed from map is added to deleted entries (ensured elsewhere).
let db_memo = unsafe { self.extend_memo_lifetime(&memo) };
// Safety: We delay the drop of `old_value` until a new revision starts which ensures no
// references will exist for the memo contents.
if let Some(old_value) = unsafe { self.insert_memo_into_table_for(zalsa, id, memo) } {
// In case there is a reference to the old memo out there, we have to store it
// in the deleted entries. This will get cleared when a new revision starts.
self.deleted_entries.push(old_value);
self.deleted_entries
.push(ManuallyDrop::into_inner(old_value));
}
db_memo
}
Expand Down Expand Up @@ -231,7 +233,9 @@ where
true
}

fn reset_for_new_revision(&mut self) {
fn reset_for_new_revision(&mut self, table: &mut Table) {
self.lru
.for_each_evicted(|evict| self.evict_value_from_memo_for(table.memos_mut(evict)));
std::mem::take(&mut self.deleted_entries);
}

Expand Down
6 changes: 2 additions & 4 deletions src/function/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ where
C: Configuration,
{
pub fn fetch<'db>(&'db self, db: &'db C::DbView, id: Id) -> &'db C::Output<'db> {
let (zalsa, zalsa_local) = db.zalsas();
let zalsa_local = db.zalsa_local();
zalsa_local.unwind_if_revision_cancelled(db.as_dyn_database());

let memo = self.refresh_memo(db, id);
Expand All @@ -19,9 +19,7 @@ where
changed_at,
} = memo.revisions.stamped_value(memo.value.as_ref().unwrap());

if let Some(evicted) = self.lru.record_use(id) {
self.evict_value_from_memo_for(zalsa, evicted);
}
self.lru.record_use(id);

zalsa_local.report_tracked_read(
self.database_key_index(id).into(),
Expand Down
26 changes: 15 additions & 11 deletions src/function/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,35 @@ pub(super) struct Lru {
}

impl Lru {
pub(super) fn record_use(&self, index: Id) -> Option<Id> {
pub(super) fn record_use(&self, index: Id) {
// Relaxed should be fine, we don't need to synchronize on this.
let capacity = self.capacity.load(Ordering::Relaxed);

if capacity == 0 {
// LRU is disabled
return None;
return;
}

let mut set = self.set.lock();
set.insert(index);
if set.len() > capacity {
return set.pop_front();
}

None
}

pub(super) fn set_capacity(&self, capacity: usize) {
// Relaxed should be fine, we don't need to synchronize on this.
self.capacity.store(capacity, Ordering::Relaxed);
}

if capacity == 0 {
let mut set = self.set.lock();
*set = FxLinkedHashSet::default();
pub(super) fn for_each_evicted(&self, mut cb: impl FnMut(Id)) {
let mut set = self.set.lock();
// Relaxed should be fine, we don't need to synchronize on this.
let cap = self.capacity.load(Ordering::Relaxed);
if set.len() <= cap || cap == 0 {
return;
}
while let Some(id) = set.pop_front() {
cb(id);
if set.len() <= cap {
break;
}
}
}
}
103 changes: 57 additions & 46 deletions src/function/memo.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::mem::ManuallyDrop;
use std::sync::Arc;

use crate::accumulator::accumulated_map::InputAccumulatedValues;
use crate::revision::AtomicRevision;
use crate::table::memo::MemoTable;
use crate::zalsa_local::QueryOrigin;
use crate::{
key::DatabaseKeyIndex, zalsa::Zalsa, zalsa_local::QueryRevisions, Event, EventKind, Id,
Expand All @@ -31,18 +33,26 @@ impl<C: Configuration> IngredientImpl<C> {
unsafe { std::mem::transmute(memo) }
}

/// Inserts the memo for the given key; (atomically) overwrites any previously existing memo.
pub(super) fn insert_memo_into_table_for<'db>(
/// Inserts the memo for the given key; (atomically) overwrites and returns any previously existing memo
///
/// # Safety
///
/// The caller needs to make sure to not drop the returned value until no more references into
/// the database exist as there may be outstanding borrows into the `Arc` contents.
pub(super) unsafe fn insert_memo_into_table_for<'db>(
&'db self,
zalsa: &'db Zalsa,
id: Id,
memo: ArcMemo<'db, C>,
) -> Option<ArcMemo<'db, C>> {
) -> Option<ManuallyDrop<ArcMemo<'db, C>>> {
let static_memo = unsafe { self.to_static(memo) };
let old_static_memo = zalsa
.memo_table_for(id)
.insert(self.memo_ingredient_index, static_memo)?;
unsafe { Some(self.to_self(old_static_memo)) }
let old_static_memo = unsafe {
zalsa
.memo_table_for(id)
.insert(self.memo_ingredient_index, static_memo)
}?;
let old_static_memo = ManuallyDrop::into_inner(old_static_memo);
Some(ManuallyDrop::new(unsafe { self.to_self(old_static_memo) }))
}

/// Loads the current memo for `key_index`. This does not hold any sort of
Expand All @@ -60,51 +70,52 @@ impl<C: Configuration> IngredientImpl<C> {
/// Evicts the existing memo for the given key, replacing it
/// with an equivalent memo that has no value. If the memo is untracked, BaseInput,
/// or has values assigned as output of another query, this has no effect.
pub(super) fn evict_value_from_memo_for<'db>(&'db self, zalsa: &'db Zalsa, id: Id) {
let old = zalsa.memo_table_for(id).map_memo::<Memo<C::Output<'_>>>(
self.memo_ingredient_index,
|memo| {
match memo.revisions.origin {
QueryOrigin::Assigned(_)
| QueryOrigin::DerivedUntracked(_)
| QueryOrigin::BaseInput => {
// Careful: Cannot evict memos whose values were
// assigned as output of another query
// or those with untracked inputs
// as their values cannot be reconstructed.
memo
}
QueryOrigin::Derived(_) => {
// QueryRevisions: !Clone to discourage cloning, we need it here though
let &QueryRevisions {
pub(super) fn evict_value_from_memo_for(&self, table: &mut MemoTable) {
let map = |memo: ArcMemo<'static, C>| -> ArcMemo<'static, C> {
match &memo.revisions.origin {
QueryOrigin::Assigned(_)
| QueryOrigin::DerivedUntracked(_)
| QueryOrigin::BaseInput => {
// Careful: Cannot evict memos whose values were
// assigned as output of another query
// or those with untracked inputs
// as their values cannot be reconstructed.
memo
}
QueryOrigin::Derived(_) => {
// Note that we cannot use `Arc::get_mut` here as the use of `ArcSwap` makes it
// impossible to get unique access to the interior Arc
// QueryRevisions: !Clone to discourage cloning, we need it here though
let &QueryRevisions {
changed_at,
durability,
ref origin,
ref tracked_struct_ids,
ref accumulated,
ref accumulated_inputs,
} = &memo.revisions;
// Re-assemble the memo but with the value set to `None`
Arc::new(Memo::new(
None,
memo.verified_at.load(),
QueryRevisions {
changed_at,
durability,
ref origin,
ref tracked_struct_ids,
ref accumulated,
ref accumulated_inputs,
} = &memo.revisions;
// Re-assemble the memo but with the value set to `None`
Arc::new(Memo::new(
None,
memo.verified_at.load(),
QueryRevisions {
changed_at,
durability,
origin: origin.clone(),
tracked_struct_ids: tracked_struct_ids.clone(),
accumulated: accumulated.clone(),
accumulated_inputs: accumulated_inputs.clone(),
},
))
}
origin: origin.clone(),
tracked_struct_ids: tracked_struct_ids.clone(),
accumulated: accumulated.clone(),
accumulated_inputs: accumulated_inputs.clone(),
},
))
}
},
);
}
};
// SAFETY: We queue the old value for deletion, delaying its drop until the next revision bump.
let old = unsafe { table.map_memo(self.memo_ingredient_index, map) };
if let Some(old) = old {
// In case there is a reference to the old memo out there, we have to store it
// in the deleted entries. This will get cleared when a new revision starts.
self.deleted_entries.push(old);
self.deleted_entries.push(ManuallyDrop::into_inner(old));
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/ingredient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use crate::{
accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues},
cycle::CycleRecoveryStrategy,
table::Table,
zalsa::{IngredientIndex, MemoIngredientIndex},
zalsa_local::QueryOrigin,
Database, DatabaseKeyIndex, Id,
Expand Down Expand Up @@ -122,7 +123,7 @@ pub trait Ingredient: Any + std::fmt::Debug + Send + Sync {
///
/// **Important:** to actually receive resets, the ingredient must set
/// [`IngredientRequiresReset::RESET_ON_NEW_REVISION`] to true.
fn reset_for_new_revision(&mut self);
fn reset_for_new_revision(&mut self, table: &mut Table);

fn fmt_index(&self, index: Option<crate::Id>, fmt: &mut fmt::Formatter<'_>) -> fmt::Result;
}
Expand Down
6 changes: 5 additions & 1 deletion src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl<C: Configuration> Ingredient for IngredientImpl<C> {
false
}

fn reset_for_new_revision(&mut self) {
fn reset_for_new_revision(&mut self, _: &mut Table) {
panic!("unexpected call to `reset_for_new_revision`")
}

Expand Down Expand Up @@ -328,6 +328,10 @@ where
&self.memos
}

fn memos_mut(&mut self) -> &mut crate::table::memo::MemoTable {
&mut self.memos
}

unsafe fn syncs(&self, _current_revision: Revision) -> &SyncTable {
&self.syncs
}
Expand Down
3 changes: 2 additions & 1 deletion src/input/input_field.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::cycle::CycleRecoveryStrategy;
use crate::ingredient::{fmt_index, Ingredient, MaybeChangedAfter};
use crate::input::Configuration;
use crate::table::Table;
use crate::zalsa::IngredientIndex;
use crate::zalsa_local::QueryOrigin;
use crate::{Database, DatabaseKeyIndex, Id, Revision};
Expand Down Expand Up @@ -85,7 +86,7 @@ where
false
}

fn reset_for_new_revision(&mut self) {
fn reset_for_new_revision(&mut self, _: &mut Table) {
panic!("unexpected call: input fields don't register for resets");
}

Expand Down
8 changes: 6 additions & 2 deletions src/interned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::key::InputDependencyIndex;
use crate::plumbing::{Jar, JarAux};
use crate::table::memo::MemoTable;
use crate::table::sync::SyncTable;
use crate::table::Slot;
use crate::table::{Slot, Table};
use crate::zalsa::IngredientIndex;
use crate::zalsa_local::QueryOrigin;
use crate::{Database, DatabaseKeyIndex, Id};
Expand Down Expand Up @@ -327,7 +327,7 @@ where
false
}

fn reset_for_new_revision(&mut self) {
fn reset_for_new_revision(&mut self, _: &mut Table) {
// Interned ingredients do not, normally, get deleted except when they are "reset" en masse.
// There ARE methods (e.g., `clear_deleted_entries` and `remove`) for deleting individual
// items, but those are only used for tracked struct ingredients.
Expand Down Expand Up @@ -362,6 +362,10 @@ where
&self.memos
}

fn memos_mut(&mut self) -> &mut MemoTable {
&mut self.memos
}

unsafe fn syncs(&self, _current_revision: Revision) -> &crate::table::sync::SyncTable {
&self.syncs
}
Expand Down
4 changes: 4 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ impl Runtime {
&self.table
}

pub(crate) fn table_mut(&mut self) -> &mut Table {
&mut self.table
}

/// Increments the "current revision" counter and clears
/// the cancellation flag.
///
Expand Down
Loading