From ec0d26f76b0c12e5e222e1952be737db5c4329fe Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Tue, 28 Oct 2025 11:46:51 +0100 Subject: [PATCH] Always increment iteration count --- src/function/execute.rs | 39 ++++++++++++++++++++++------- src/function/fetch.rs | 18 ++++++++++++- src/function/maybe_changed_after.rs | 23 ----------------- src/function/memo.rs | 20 ++++++--------- src/zalsa_local.rs | 27 ++++++++++++++------ 5 files changed, 73 insertions(+), 54 deletions(-) diff --git a/src/function/execute.rs b/src/function/execute.rs index d299b0966..9d6758730 100644 --- a/src/function/execute.rs +++ b/src/function/execute.rs @@ -172,11 +172,7 @@ where let mut iteration_count = IterationCount::initial(); if let Some(old_memo) = opt_old_memo { - if old_memo.verified_at.load() == zalsa.current_revision() - && old_memo.cycle_heads().contains(&database_key_index) - { - let memo_iteration_count = old_memo.revisions.iteration(); - + if old_memo.verified_at.load() == zalsa.current_revision() { // The `DependencyGraph` locking propagates panics when another thread is blocked on a panicking query. // However, the locking doesn't handle the case where a thread fetches the result of a panicking // cycle head query **after** all locks were released. That's what we do here. @@ -189,8 +185,14 @@ where tracing::warn!("Propagating panic for cycle head that panicked in an earlier execution in that revision"); Cancelled::PropagatedPanic.throw(); } - last_provisional_memo = Some(old_memo); - iteration_count = memo_iteration_count; + + // Only use the last provisional memo if it was a cycle head in the last iteration. This is to + // force at least two executions. + if old_memo.cycle_heads().contains(&database_key_index) { + last_provisional_memo = Some(old_memo); + } + + iteration_count = old_memo.revisions.iteration(); } } @@ -216,6 +218,14 @@ where // If there are no cycle heads, break out of the loop (`cycle_heads_mut` returns `None` if the cycle head list is empty) let Some(cycle_heads) = completed_query.revisions.cycle_heads_mut() else { + iteration_count = iteration_count.increment().unwrap_or_else(|| { + tracing::warn!("{database_key_index:?}: execute: too many cycle iterations"); + panic!("{database_key_index:?}: execute: too many cycle iterations") + }); + completed_query + .revisions + .update_iteration_count_mut(database_key_index, iteration_count); + claim_guard.set_release_mode(ReleaseMode::SelfOnly); break (new_value, completed_query); }; @@ -289,6 +299,15 @@ where } completed_query.revisions.set_cycle_heads(cycle_heads); + + iteration_count = iteration_count.increment().unwrap_or_else(|| { + tracing::warn!("{database_key_index:?}: execute: too many cycle iterations"); + panic!("{database_key_index:?}: execute: too many cycle iterations") + }); + completed_query + .revisions + .update_iteration_count_mut(database_key_index, iteration_count); + break (new_value, completed_query); } @@ -555,8 +574,10 @@ impl<'a, C: Configuration> PoisonProvisionalIfPanicking<'a, C> { impl Drop for PoisonProvisionalIfPanicking<'_, C> { fn drop(&mut self) { if thread::panicking() { - let revisions = - QueryRevisions::fixpoint_initial(self.ingredient.database_key_index(self.id)); + let revisions = QueryRevisions::fixpoint_initial( + self.ingredient.database_key_index(self.id), + IterationCount::initial(), + ); let memo = Memo::new(None, self.zalsa.current_revision(), revisions); self.ingredient diff --git a/src/function/fetch.rs b/src/function/fetch.rs index f1c58eda1..588b08bb1 100644 --- a/src/function/fetch.rs +++ b/src/function/fetch.rs @@ -195,6 +195,9 @@ where // existing provisional memo if it exists let memo_guard = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index); if let Some(memo) = memo_guard { + // Ideally, we'd use the last provisional memo even if it wasn't a cycle head in the last iteration + // but that would require inserting itself as a cycle head, which either requires clone + // on the value OR a concurrent `Vec` for cycle heads. if memo.verified_at.load() == zalsa.current_revision() && memo.value.is_some() && memo.revisions.cycle_heads().contains(&database_key_index) @@ -233,7 +236,20 @@ where "hit cycle at {database_key_index:#?}, \ inserting and returning fixpoint initial value" ); - let revisions = QueryRevisions::fixpoint_initial(database_key_index); + + let iteration = memo_guard + .and_then(|old_memo| { + if old_memo.verified_at.load() == zalsa.current_revision() + && old_memo.value.is_some() + { + Some(old_memo.revisions.iteration()) + } else { + None + } + }) + .unwrap_or(IterationCount::initial()); + let revisions = QueryRevisions::fixpoint_initial(database_key_index, iteration); + let initial_value = C::cycle_initial(db, id, C::id_to_input(zalsa, id)); self.insert_memo( zalsa, diff --git a/src/function/maybe_changed_after.rs b/src/function/maybe_changed_after.rs index 4198631b9..20440883e 100644 --- a/src/function/maybe_changed_after.rs +++ b/src/function/maybe_changed_after.rs @@ -509,8 +509,6 @@ where head_iteration_count, memo_iteration_count: current_iteration_count, verified_at: head_verified_at, - cycle_heads, - database_key_index: head_database_key, } => { if head_verified_at != memo_verified_at { return false; @@ -519,27 +517,6 @@ where if head_iteration_count != current_iteration_count { return false; } - - // Check if the memo is still a cycle head and hasn't changed - // to a normal cycle participant. This is to force re-execution in - // a scenario like this: - // - // * There's a nested cycle with the outermost query A - // * B participates in the cycle and is a cycle head in the first few iterations - // * B becomes a non-cycle head in a later iteration - // * There's a query `C` that has `B` as its cycle head - // - // The crucial point is that `B` switches from being a cycle head to being a regular cycle participant. - // The issue with that is that `A` doesn't update `B`'s `iteration_count `when the iteration completes - // because it only does that for cycle heads (and collecting all queries participating in a query would be sort of expensive?). - // - // When we now pull `C` in a later iteration, `validate_same_iteration` iterates over all its cycle heads (`B`), - // and check if the iteration count still matches. Which is the case because `A` didn't update `B`'s iteration count. - // - // That's why we also check if `B` is still a cycle head in the current iteration. - if !cycle_heads.contains(&head_database_key) { - return false; - } } _ => { return false; diff --git a/src/function/memo.rs b/src/function/memo.rs index fd830ced3..d8faf3e0b 100644 --- a/src/function/memo.rs +++ b/src/function/memo.rs @@ -409,11 +409,9 @@ mod persistence { pub(super) enum TryClaimHeadsResult<'me> { /// Claiming the cycle head results in a cycle. Cycle { - database_key_index: DatabaseKeyIndex, head_iteration_count: IterationCount, memo_iteration_count: IterationCount, verified_at: Revision, - cycle_heads: &'me CycleHeads, }, /// The cycle head is not finalized, but it can be claimed. @@ -460,28 +458,24 @@ impl<'me> Iterator for TryClaimCycleHeadsIter<'me> { let provisional_status = ingredient .provisional_status(self.zalsa, head_key_index) .expect("cycle head memo to exist"); - let (current_iteration_count, verified_at, cycle_heads) = match provisional_status { + let (current_iteration_count, verified_at) = match provisional_status { ProvisionalStatus::Provisional { iteration, verified_at, - cycle_heads, - } => (iteration, verified_at, cycle_heads), + cycle_heads: _, + } => (iteration, verified_at), ProvisionalStatus::Final { iteration, verified_at, - } => (iteration, verified_at, empty_cycle_heads()), - ProvisionalStatus::FallbackImmediate => ( - IterationCount::initial(), - self.zalsa.current_revision(), - empty_cycle_heads(), - ), + } => (iteration, verified_at), + ProvisionalStatus::FallbackImmediate => { + (IterationCount::initial(), self.zalsa.current_revision()) + } }; Some(TryClaimHeadsResult::Cycle { - database_key_index: head_database_key, memo_iteration_count: current_iteration_count, head_iteration_count: head.iteration_count.load(), - cycle_heads, verified_at, }) } diff --git a/src/zalsa_local.rs b/src/zalsa_local.rs index 7b0399178..f43eb78eb 100644 --- a/src/zalsa_local.rs +++ b/src/zalsa_local.rs @@ -639,7 +639,7 @@ const _: [(); std::mem::size_of::()] = [(); std::mem::size_of::<[usize; if cfg!(feature = "accumulator") { 7 } else { 3 }]>()]; impl QueryRevisions { - pub(crate) fn fixpoint_initial(query: DatabaseKeyIndex) -> Self { + pub(crate) fn fixpoint_initial(query: DatabaseKeyIndex, iteration: IterationCount) -> Self { Self { changed_at: Revision::start(), durability: Durability::MAX, @@ -651,8 +651,8 @@ impl QueryRevisions { #[cfg(feature = "accumulator")] AccumulatedMap::default(), ThinVec::default(), - CycleHeads::initial(query, IterationCount::initial()), - IterationCount::initial(), + CycleHeads::initial(query, iteration), + iteration, ), } } @@ -743,12 +743,23 @@ impl QueryRevisions { cycle_head_index: DatabaseKeyIndex, iteration_count: IterationCount, ) { - if let Some(extra) = &mut self.extra.0 { - extra.iteration.store_mut(iteration_count); + match &mut self.extra.0 { + None => { + self.extra = QueryRevisionsExtra::new( + #[cfg(feature = "accumulator")] + AccumulatedMap::default(), + ThinVec::default(), + empty_cycle_heads().clone(), + iteration_count, + ); + } + Some(extra) => { + extra.iteration.store_mut(iteration_count); - extra - .cycle_heads - .update_iteration_count_mut(cycle_head_index, iteration_count); + extra + .cycle_heads + .update_iteration_count_mut(cycle_head_index, iteration_count); + } } }