Skip to content

Commit f7e6662

Browse files
authored
Turbopack: add verify_determinism feature to check if tasks are deterministic (#85559)
### What? Enabling the feature will double execute tasks and print to stdout when tasks are updating cells while recomputing The change will also change the behavior of cell updates during recomputation slightly. Instead of updating the cell and not notifying dependents, it will not update the cell at all ignoring the (unexpected) change. This means when a task is recomputed without being invalidated, it won't update any cell data, except for the missing cell values.
1 parent b8a14cd commit f7e6662

File tree

10 files changed

+200
-66
lines changed

10 files changed

+200
-66
lines changed

turbopack/crates/turbo-tasks-backend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ no_fast_stale = []
1919
verify_serialization = []
2020
verify_aggregation_graph = []
2121
verify_immutable = []
22+
verify_determinism = []
2223
trace_aggregation_update = []
2324
trace_find_and_schedule = []
2425
trace_task_completion = []

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use turbo_tasks::{
3131
TraitTypeId, TurboTasksBackendApi, ValueTypeId,
3232
backend::{
3333
Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
34-
TransientTaskType, TurboTasksExecutionError, TypedCellContent,
34+
TransientTaskType, TurboTasksExecutionError, TypedCellContent, VerificationMode,
3535
},
3636
event::{Event, EventListener},
3737
message_queue::TimingEvent,
@@ -423,6 +423,8 @@ struct TaskExecutionCompletePrepareResult {
423423
pub new_children: FxHashSet<TaskId>,
424424
pub removed_data: Vec<CachedDataItem>,
425425
pub is_now_immutable: bool,
426+
#[cfg(feature = "verify_determinism")]
427+
pub no_output_set: bool,
426428
pub new_output: Option<OutputValue>,
427429
pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
428430
}
@@ -1765,6 +1767,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17651767
new_children,
17661768
mut removed_data,
17671769
is_now_immutable,
1770+
#[cfg(feature = "verify_determinism")]
1771+
no_output_set,
17681772
new_output,
17691773
output_dependent_tasks,
17701774
}) = self.task_execution_completed_prepare(
@@ -1809,6 +1813,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
18091813
if self.task_execution_completed_finish(
18101814
&mut ctx,
18111815
task_id,
1816+
#[cfg(feature = "verify_determinism")]
1817+
no_output_set,
18121818
new_output,
18131819
&mut removed_data,
18141820
is_now_immutable,
@@ -1843,6 +1849,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
18431849
new_children: Default::default(),
18441850
removed_data: Default::default(),
18451851
is_now_immutable: false,
1852+
#[cfg(feature = "verify_determinism")]
1853+
no_output_set: false,
18461854
new_output: None,
18471855
output_dependent_tasks: Default::default(),
18481856
});
@@ -2021,6 +2029,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
20212029

20222030
// Check if output need to be updated
20232031
let current_output = get!(task, Output);
2032+
#[cfg(feature = "verify_determinism")]
2033+
let no_output_set = current_output.is_none();
20242034
let new_output = match result {
20252035
Ok(RawVc::TaskOutput(output_task_id)) => {
20262036
if let Some(OutputValue::Output(current_task_id)) = current_output
@@ -2092,6 +2102,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
20922102
new_children,
20932103
removed_data,
20942104
is_now_immutable,
2105+
#[cfg(feature = "verify_determinism")]
2106+
no_output_set,
20952107
new_output,
20962108
output_dependent_tasks,
20972109
})
@@ -2232,6 +2244,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
22322244
&self,
22332245
ctx: &mut impl ExecuteContext<'_>,
22342246
task_id: TaskId,
2247+
#[cfg(feature = "verify_determinism")] no_output_set: bool,
22352248
new_output: Option<OutputValue>,
22362249
removed_data: &mut Vec<CachedDataItem>,
22372250
is_now_immutable: bool,
@@ -2306,7 +2319,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
23062319
None
23072320
};
23082321

2309-
let data_update = if old_dirty_state != new_dirty_state {
2322+
let dirty_changed = old_dirty_state != new_dirty_state;
2323+
let data_update = if dirty_changed {
23102324
if let Some(new_dirty_state) = new_dirty_state {
23112325
task.insert(CachedDataItem::Dirty {
23122326
value: new_dirty_state,
@@ -2353,17 +2367,32 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
23532367
None
23542368
};
23552369

2356-
drop(task);
2357-
drop(old_content);
2370+
#[cfg(feature = "verify_determinism")]
2371+
let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient();
2372+
#[cfg(not(feature = "verify_determinism"))]
2373+
let reschedule = false;
2374+
if reschedule {
2375+
task.add_new(CachedDataItem::InProgress {
2376+
value: InProgressState::Scheduled {
2377+
done_event,
2378+
reason: TaskExecutionReason::Stale,
2379+
},
2380+
});
2381+
drop(task);
2382+
} else {
2383+
drop(task);
2384+
2385+
// Notify dependent tasks that are waiting for this task to finish
2386+
done_event.notify(usize::MAX);
2387+
}
23582388

2359-
// Notify dependent tasks that are waiting for this task to finish
2360-
done_event.notify(usize::MAX);
2389+
drop(old_content);
23612390

23622391
if let Some(data_update) = data_update {
23632392
AggregationUpdateQueue::run(data_update, ctx);
23642393
}
23652394

2366-
false
2395+
reschedule
23672396
}
23682397

23692398
fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
@@ -2652,12 +2681,14 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
26522681
task_id: TaskId,
26532682
cell: CellId,
26542683
content: CellContent,
2684+
verification_mode: VerificationMode,
26552685
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
26562686
) {
26572687
operation::UpdateCellOperation::run(
26582688
task_id,
26592689
cell,
26602690
content,
2691+
verification_mode,
26612692
self.execute_context(turbo_tasks),
26622693
);
26632694
}
@@ -3251,9 +3282,11 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
32513282
task_id: TaskId,
32523283
cell: CellId,
32533284
content: CellContent,
3285+
verification_mode: VerificationMode,
32543286
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
32553287
) {
3256-
self.0.update_task_cell(task_id, cell, content, turbo_tasks);
3288+
self.0
3289+
.update_task_cell(task_id, cell, content, verification_mode, turbo_tasks);
32573290
}
32583291

32593292
fn mark_own_task_as_finished(

turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::mem::take;
22

33
use serde::{Deserialize, Serialize};
44
use smallvec::SmallVec;
5+
#[cfg(not(feature = "verify_determinism"))]
6+
use turbo_tasks::backend::VerificationMode;
57
use turbo_tasks::{CellId, TaskId, TypedSharedReference, backend::CellContent};
68

79
#[cfg(feature = "trace_task_dirty")]
@@ -13,7 +15,7 @@ use crate::{
1315
AggregationUpdateQueue, ExecuteContext, Operation, TaskGuard,
1416
invalidate::make_task_dirty_internal,
1517
},
16-
storage::{get_many, remove},
18+
storage::{get, get_many, remove},
1719
},
1820
data::{CachedDataItem, CachedDataItemKey, CellRef},
1921
};
@@ -40,18 +42,61 @@ pub enum UpdateCellOperation {
4042
}
4143

4244
impl UpdateCellOperation {
43-
pub fn run(task_id: TaskId, cell: CellId, content: CellContent, mut ctx: impl ExecuteContext) {
45+
pub fn run(
46+
task_id: TaskId,
47+
cell: CellId,
48+
content: CellContent,
49+
#[cfg(feature = "verify_determinism")] verification_mode: VerificationMode,
50+
#[cfg(not(feature = "verify_determinism"))] _verification_mode: VerificationMode,
51+
mut ctx: impl ExecuteContext,
52+
) {
53+
let content = if let CellContent(Some(new_content)) = content {
54+
Some(new_content.into_typed(cell.type_id))
55+
} else {
56+
None
57+
};
58+
4459
let mut task = ctx.task(task_id, TaskDataCategory::All);
4560

61+
let is_stateful = task.has_key(&CachedDataItemKey::Stateful {});
4662
// We need to detect recomputation, because here the content has not actually changed (even
4763
// if it's not equal to the old content, as not all values implement Eq). We have to
4864
// assume that tasks are deterministic and pure.
49-
let should_invalidate = ctx.should_track_dependencies()
50-
&& (task.has_key(&CachedDataItemKey::Dirty {}) ||
65+
let assume_unchanged = !ctx.should_track_dependencies()
66+
|| (!task.has_key(&CachedDataItemKey::Dirty {})
5167
// This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
52-
task.has_key(&CachedDataItemKey::Stateful {}));
68+
&& !is_stateful);
69+
70+
let old_content = get!(task, CellData { cell });
71+
72+
if assume_unchanged {
73+
if old_content.is_some() {
74+
// Never update cells when recomputing if they already have a value.
75+
// It's not expected that content changes during recomputation.
76+
77+
// Check if this assumption holds.
78+
#[cfg(feature = "verify_determinism")]
79+
if !is_stateful
80+
&& matches!(verification_mode, VerificationMode::EqualityCheck)
81+
&& content.as_ref() != old_content
82+
{
83+
let task_description = ctx.get_task_description(task_id);
84+
let cell_type = turbo_tasks::registry::get_value_type(cell.type_id).global_name;
85+
eprintln!(
86+
"Task {} updated cell #{} (type: {}) while recomputing",
87+
task_description, cell.index, cell_type
88+
);
89+
}
90+
return;
91+
} else {
92+
// Initial computation, or computation after a cell has been cleared.
93+
// We can just set the content, but we don't want to notify dependent tasks,
94+
// as we assume that content hasn't changed (deterministic tasks).
95+
}
96+
} else {
97+
// When not recomputing, we need to notify dependent tasks if the content actually
98+
// changes.
5399

54-
if should_invalidate {
55100
let dependent_tasks: SmallVec<[TaskId; 4]> = get_many!(
56101
task,
57102
CellDependent { cell: dependent_cell, task }
@@ -78,12 +123,6 @@ impl UpdateCellOperation {
78123
drop(task);
79124
drop(old_content);
80125

81-
let content = if let CellContent(Some(new_content)) = content {
82-
Some(new_content.into_typed(cell.type_id))
83-
} else {
84-
None
85-
};
86-
87126
UpdateCellOperation::InvalidateWhenCellDependency {
88127
cell_ref: CellRef {
89128
task: task_id,
@@ -101,8 +140,7 @@ impl UpdateCellOperation {
101140
// Fast path: We don't need to invalidate anything.
102141
// So we can just update the cell content.
103142

104-
let old_content = if let CellContent(Some(new_content)) = content {
105-
let new_content = new_content.into_typed(cell.type_id);
143+
let old_content = if let Some(new_content) = content {
106144
task.insert(CachedDataItem::CellData {
107145
cell,
108146
value: new_content,

turbopack/crates/turbo-tasks-backend/tests/detached.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ static REGISTRATION: Registration = register!();
1818
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1919
async fn test_spawns_detached() -> anyhow::Result<()> {
2020
run_once(&REGISTRATION, || async {
21+
println!("test_spawns_detached");
2122
// HACK: The watch channel we use has an incorrect implementation of `TraceRawVcs`, just
2223
// disable GC for the test so this can't cause any problems.
2324
prevent_gc();
@@ -73,7 +74,9 @@ async fn spawns_detached(
7374
sender: TransientInstance<WatchSenderTaskInput<Option<Vc<u32>>>>,
7475
) -> Vc<()> {
7576
tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(async move {
77+
println!("spawns_detached: waiting for notify");
7678
notify.0.notified().await;
79+
println!("spawns_detached: notified, sending value");
7780
// creating cells after the normal lifetime of the task should be okay, as the parent task
7881
// is waiting on us before exiting!
7982
sender.0.send(Some(Vc::cell(42))).unwrap();

turbopack/crates/turbo-tasks-testing/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use tokio::sync::mpsc::Receiver;
1818
use turbo_tasks::{
1919
CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
2020
ReadOutputOptions, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
21-
backend::{CellContent, TaskCollectiblesMap, TypedCellContent},
21+
backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
2222
event::{Event, EventListener},
2323
message_queue::CompilationEvent,
2424
test_helpers::with_turbo_tasks_for_testing,
@@ -261,7 +261,13 @@ impl TurboTasksApi for VcStorage {
261261
.into_typed(index.type_id))
262262
}
263263

264-
fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
264+
fn update_own_task_cell(
265+
&self,
266+
task: TaskId,
267+
index: CellId,
268+
content: CellContent,
269+
_verification_mode: VerificationMode,
270+
) {
265271
let mut map = self.cells.lock().unwrap();
266272
let cell = map.entry((task, index)).or_default();
267273
*cell = content;

turbopack/crates/turbo-tasks-testing/src/run.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fmt::Debug, future::Future, sync::Arc};
1+
use std::{env, fmt::Debug, future::Future, sync::Arc};
22

33
use anyhow::Result;
44
use turbo_tasks::{TurboTasksApi, trace::TraceRawVcs};
@@ -98,22 +98,28 @@ where
9898
F: Future<Output = Result<T>> + Send + 'static,
9999
T: Debug + PartialEq + Eq + TraceRawVcs + Send + 'static,
100100
{
101+
let single_run = env::var("SINGLE_RUN").is_ok();
101102
let name = closure_to_name(&fut);
102103
let tt = registration.create_turbo_tasks(&name, true);
103104
println!("Run #1 (without cache)");
104105
let start = std::time::Instant::now();
105106
let first = fut(tt.clone()).await?;
106107
println!("Run #1 took {:?}", start.elapsed());
107-
for i in 2..10 {
108-
println!("Run #{i} (with memory cache, same TurboTasks instance)");
109-
let start = std::time::Instant::now();
110-
let second = fut(tt.clone()).await?;
111-
println!("Run #{i} took {:?}", start.elapsed());
112-
assert_eq!(first, second);
108+
if !single_run {
109+
for i in 2..10 {
110+
println!("Run #{i} (with memory cache, same TurboTasks instance)");
111+
let start = std::time::Instant::now();
112+
let second = fut(tt.clone()).await?;
113+
println!("Run #{i} took {:?}", start.elapsed());
114+
assert_eq!(first, second);
115+
}
113116
}
114117
let start = std::time::Instant::now();
115118
tt.stop_and_wait().await;
116119
println!("Stopping TurboTasks took {:?}", start.elapsed());
120+
if single_run {
121+
return Ok(());
122+
}
117123
for i in 10..20 {
118124
let tt = registration.create_turbo_tasks(&name, false);
119125
println!("Run #{i} (with filesystem cache if available, new TurboTasks instance)");

turbopack/crates/turbo-tasks/src/backend.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,11 @@ impl From<anyhow::Error> for TurboTasksExecutionError {
497497
}
498498
}
499499

500+
pub enum VerificationMode {
501+
EqualityCheck,
502+
Skip,
503+
}
504+
500505
pub trait Backend: Sync + Send {
501506
#[allow(unused_variables)]
502507
fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
@@ -621,6 +626,7 @@ pub trait Backend: Sync + Send {
621626
task: TaskId,
622627
index: CellId,
623628
content: CellContent,
629+
verification_mode: VerificationMode,
624630
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
625631
);
626632

0 commit comments

Comments
 (0)