Auto merge of #84806 - Mark-Simulacrum:try-start-entry, r=cjgillot
Streamline try_start code

This shifts some branches around and avoids excessive interleaving of the
parallel and non-parallel versions of the function.
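The gist of the restructuring, distilled from the plumbing.rs diff below: `try_start` now matches on the query-state map entry once, and the parallel and non-parallel handling of an already-running query sit side by side inside the `Occupied` arm instead of in separate `#[cfg]`-gated blocks after the match. The following sketch is only an approximation with invented names (`JobState`, `TryStart`, `try_start_demo`) and simplified types, not the actual rustc implementation.

```rust
// A minimal, self-contained sketch of the new control flow, using stand-in
// types rather than rustc's `QueryResult`, `TryGetJob`, and `JobOwner`.
use std::collections::hash_map::{Entry, HashMap};

#[derive(Debug)]
enum JobState {
    Started(u32), // job id of the query that is currently executing
    Poisoned,     // the query panicked while executing
}

#[derive(Debug)]
enum TryStart {
    NotYetStarted(u32),  // caller should execute the query under this job id
    Cycle(&'static str), // single-threaded: an active entry means a cycle
}

fn try_start_demo(
    active: &mut HashMap<String, JobState>,
    key: String,
    next_id: &mut u32,
) -> TryStart {
    // Match on the entry once; all outcomes are handled in the same place.
    match active.entry(key) {
        Entry::Vacant(entry) => {
            // No job for this key yet: allocate an id and record the job,
            // mirroring the `TryGetJob::NotYetStarted(JobOwner { .. })` path.
            *next_id += 1;
            entry.insert(JobState::Started(*next_id));
            TryStart::NotYetStarted(*next_id)
        }
        Entry::Occupied(entry) => match entry.get() {
            // In the real code this arm is `#[cfg(not(parallel_compiler))]`
            // and calls the new `mk_cycle` helper: with a single thread,
            // re-entering an active query can only mean a cycle.
            JobState::Started(_) => TryStart::Cycle("query cycle detected"),
            // The `#[cfg(parallel_compiler)]` arm (omitted here) instead waits
            // on the running job's latch and then reads the cached result.
            JobState::Poisoned => panic!("query panicked earlier"),
        },
    }
}

fn main() {
    let mut active = HashMap::new();
    let mut next_id = 0;
    println!("{:?}", try_start_demo(&mut active, "layout_of".into(), &mut next_id));
    println!("{:?}", try_start_demo(&mut active, "layout_of".into(), &mut next_id));
}
```

In the actual function, the parallel arm additionally records a `query_blocked` profiling timer, takes the job's latch via `job.latch()`, waits on it, and then looks the finished value up in the query cache.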
bors committed May 6, 2021
2 parents 676ee14 + 981135a commit 777bb2f
Showing 4 changed files with 111 additions and 101 deletions.
compiler/rustc_query_system/src/query/caches.rs (4 changes: 2 additions & 2 deletions)
@@ -14,7 +14,7 @@ pub trait CacheSelector<K, V> {
type Cache;
}

pub trait QueryStorage: Default {
pub trait QueryStorage {
type Value: Debug;
type Stored: Clone;

@@ -23,7 +23,7 @@ pub trait QueryStorage: Default {
fn store_nocache(&self, value: Self::Value) -> Self::Stored;
}

pub trait QueryCache: QueryStorage {
pub trait QueryCache: QueryStorage + Sized {
type Key: Hash + Eq + Clone + Debug;
type Sharded: Default;

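A note on the trait-bound changes above: they look tied to the new `mk_cycle` helper added in plumbing.rs, which takes the cache as `&dyn crate::query::QueryStorage<Value = V, Stored = R>`. That connection is an inference from the diff, not something the commit message states, so treat the sketch below as an illustration only. A `Default` supertrait keeps a trait from being used as a trait object (since `Default::default()` returns `Self`), which would explain dropping `Default` from `QueryStorage`, keeping an explicit `Sized` bound on `QueryCache` (previously implied via `Default: Sized`), and moving the `Default` requirement onto `impl<C: QueryCache + Default> Default for QueryCacheStore<C>` in plumbing.rs.

```rust
// Not rustc code: a minimal illustration of why a `Default` supertrait gets in
// the way once the storage is passed around as `&dyn QueryStorage<...>`.
// `Default::default()` returns `Self`, so a trait requiring `Default` cannot
// be made into a trait object.

trait Storage {
    fn entries(&self) -> usize;
}

#[allow(dead_code)]
trait StorageWithDefault: Default {
    fn entries(&self) -> usize;
}

// This works: `Storage` is object safe.
fn inspect(storage: &dyn Storage) -> usize {
    storage.entries()
}

// This would not compile:
// fn inspect_bad(storage: &dyn StorageWithDefault) -> usize { storage.entries() }
// error[E0038]: the trait `StorageWithDefault` cannot be made into an object

#[derive(Default)]
struct VecStorage(Vec<u8>);

impl Storage for VecStorage {
    fn entries(&self) -> usize {
        self.0.len()
    }
}

fn main() {
    // Callers that still need `Default` can require it separately, which is
    // what the `impl<C: QueryCache + Default> Default for QueryCacheStore<C>`
    // change does.
    let storage = VecStorage::default();
    println!("{}", inspect(&storage));
}
```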
compiler/rustc_query_system/src/query/config.rs (4 changes: 0 additions & 4 deletions)
@@ -52,10 +52,6 @@ impl<CTX: QueryContext, K, V> QueryVtable<CTX, K, V> {
(self.hash_result)(hcx, value)
}

pub(crate) fn handle_cycle_error(&self, tcx: CTX, diag: DiagnosticBuilder<'_>) -> V {
(self.handle_cycle_error)(tcx, diag)
}

pub(crate) fn cache_on_disk(&self, tcx: CTX, key: &K, value: Option<&V>) -> bool {
(self.cache_on_disk)(tcx, key, value)
}
compiler/rustc_query_system/src/query/job.rs (21 changes: 3 additions & 18 deletions)
@@ -9,7 +9,6 @@ use rustc_span::Span;

use std::convert::TryFrom;
use std::hash::Hash;
use std::marker::PhantomData;
use std::num::NonZeroU32;

#[cfg(parallel_compiler)]
@@ -100,8 +99,6 @@ pub struct QueryJob<D> {
/// The latch that is used to wait on this job.
#[cfg(parallel_compiler)]
latch: Option<QueryLatch<D>>,

dummy: PhantomData<QueryLatch<D>>,
}

impl<D> QueryJob<D>
@@ -116,23 +113,17 @@ where
parent,
#[cfg(parallel_compiler)]
latch: None,
dummy: PhantomData,
}
}

#[cfg(parallel_compiler)]
pub(super) fn latch(&mut self, _id: QueryJobId<D>) -> QueryLatch<D> {
pub(super) fn latch(&mut self) -> QueryLatch<D> {
if self.latch.is_none() {
self.latch = Some(QueryLatch::new());
}
self.latch.as_ref().unwrap().clone()
}

#[cfg(not(parallel_compiler))]
pub(super) fn latch(&mut self, id: QueryJobId<D>) -> QueryLatch<D> {
QueryLatch { id }
}

/// Signals to waiters that the query is complete.
///
/// This does nothing for single threaded rustc,
@@ -148,13 +139,7 @@ where
}

#[cfg(not(parallel_compiler))]
#[derive(Clone)]
pub(super) struct QueryLatch<D> {
id: QueryJobId<D>,
}

#[cfg(not(parallel_compiler))]
impl<D> QueryLatch<D>
impl<D> QueryJobId<D>
where
D: Copy + Clone + Eq + Hash,
{
@@ -172,7 +157,7 @@ where
let info = query_map.get(&job).unwrap();
cycle.push(info.info.clone());

if job == self.id {
if job == *self {
cycle.reverse();

// This is the end of the cycle
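In the non-parallel build, the old `QueryLatch<D>` was only a wrapper around a `QueryJobId<D>`, so this file moves `find_cycle_in_stack` onto `QueryJobId` itself and deletes the wrapper along with the `dummy: PhantomData<QueryLatch<D>>` field that kept it referenced. A simplified, hedged sketch of that change in shape (non-generic types, and the `describe` method is a hypothetical stand-in, not the real API):

```rust
// Illustrative only: the real types are generic over the dep-kind `D`, and the
// real method is `QueryJobId::find_cycle_in_stack`.

#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
struct QueryJobId(u32);

// Before: a single-threaded "latch" that existed solely to carry the job id.
#[derive(Clone)]
struct OldQueryLatch {
    id: QueryJobId,
}

impl OldQueryLatch {
    fn describe(&self) -> String {
        format!("cycle search rooted at job {:?}", self.id)
    }
}

// After: the same operation hangs directly off the id, so the wrapper type
// (and the PhantomData field that referenced it) can be removed.
impl QueryJobId {
    fn describe(&self) -> String {
        format!("cycle search rooted at job {:?}", self)
    }
}

fn main() {
    let id = QueryJobId(7);
    assert_eq!(OldQueryLatch { id }.describe(), id.describe());
    println!("{}", id.describe());
}
```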
compiler/rustc_query_system/src/query/plumbing.rs (183 changes: 106 additions & 77 deletions)
@@ -11,13 +11,13 @@ use crate::query::job::{
};
use crate::query::{QueryContext, QueryMap, QueryStackFrame};

#[cfg(not(parallel_compiler))]
use rustc_data_structures::cold_path;
use rustc_data_structures::fingerprint::Fingerprint;
use rustc_data_structures::fx::{FxHashMap, FxHasher};
use rustc_data_structures::sharded::{get_shard_index_by_hash, Sharded};
use rustc_data_structures::sync::{Lock, LockGuard};
use rustc_data_structures::thin_vec::ThinVec;
#[cfg(not(parallel_compiler))]
use rustc_errors::DiagnosticBuilder;
use rustc_errors::{Diagnostic, FatalError};
use rustc_span::Span;
use std::collections::hash_map::Entry;
@@ -36,7 +36,7 @@ pub struct QueryCacheStore<C: QueryCache> {
pub cache_hits: AtomicUsize,
}

impl<C: QueryCache> Default for QueryCacheStore<C> {
impl<C: QueryCache + Default> Default for QueryCacheStore<C> {
fn default() -> Self {
Self {
cache: C::default(),
@@ -158,6 +158,31 @@
id: QueryJobId<D>,
}

#[cold]
#[inline(never)]
#[cfg(not(parallel_compiler))]
fn mk_cycle<CTX, V, R>(
tcx: CTX,
root: QueryJobId<CTX::DepKind>,
span: Span,
handle_cycle_error: fn(CTX, DiagnosticBuilder<'_>) -> V,
cache: &dyn crate::query::QueryStorage<Value = V, Stored = R>,
) -> R
where
CTX: QueryContext,
V: std::fmt::Debug,
R: Clone,
{
let error: CycleError = root.find_cycle_in_stack(
tcx.try_collect_active_jobs().unwrap(),
&tcx.current_query_job(),
span,
);
let error = report_cycle(tcx.dep_context().sess(), error);
let value = handle_cycle_error(tcx, error);
cache.store_nocache(value)
}

impl<'tcx, D, C> JobOwner<'tcx, D, C>
where
D: Copy + Clone + Eq + Hash,
@@ -177,7 +202,7 @@
state: &'b QueryState<CTX::DepKind, C::Key>,
cache: &'b QueryCacheStore<C>,
span: Span,
key: &C::Key,
key: C::Key,
lookup: QueryLookup,
query: &QueryVtable<CTX, C::Key, C::Value>,
) -> TryGetJob<'b, CTX::DepKind, C>
@@ -188,94 +213,86 @@
let mut state_lock = state.shards.get_shard_by_index(shard).lock();
let lock = &mut *state_lock;

let (latch, mut _query_blocked_prof_timer) = match lock.active.entry((*key).clone()) {
Entry::Occupied(mut entry) => {
match entry.get_mut() {
QueryResult::Started(job) => {
// For parallel queries, we'll block and wait until the query running
// in another thread has completed. Record how long we wait in the
// self-profiler.
let _query_blocked_prof_timer = if cfg!(parallel_compiler) {
Some(tcx.dep_context().profiler().query_blocked())
} else {
None
};

// Create the id of the job we're waiting for
let id = QueryJobId::new(job.id, shard, query.dep_kind);

(job.latch(id), _query_blocked_prof_timer)
}
QueryResult::Poisoned => FatalError.raise(),
}
}
match lock.active.entry(key) {
Entry::Vacant(entry) => {
// No job entry for this query. Return a new one to be started later.

// Generate an id unique within this shard.
let id = lock.jobs.checked_add(1).unwrap();
lock.jobs = id;
let id = QueryShardJobId(NonZeroU32::new(id).unwrap());

let global_id = QueryJobId::new(id, shard, query.dep_kind);

let job = tcx.current_query_job();
let job = QueryJob::new(id, span, job);

let key = entry.key().clone();
entry.insert(QueryResult::Started(job));

let owner = JobOwner { state, cache, id: global_id, key: (*key).clone() };
let global_id = QueryJobId::new(id, shard, query.dep_kind);
let owner = JobOwner { state, cache, id: global_id, key };
return TryGetJob::NotYetStarted(owner);
}
};
mem::drop(state_lock);

// If we are single-threaded we know that we have cycle error,
// so we just return the error.
#[cfg(not(parallel_compiler))]
return TryGetJob::Cycle(cold_path(|| {
let error: CycleError = latch.find_cycle_in_stack(
tcx.try_collect_active_jobs().unwrap(),
&tcx.current_query_job(),
span,
);
let error = report_cycle(tcx.dep_context().sess(), error);
let value = query.handle_cycle_error(tcx, error);
cache.cache.store_nocache(value)
}));

// With parallel queries we might just have to wait on some other
// thread.
#[cfg(parallel_compiler)]
{
let result = latch.wait_on(tcx.current_query_job(), span);

if let Err(cycle) = result {
let cycle = report_cycle(tcx.dep_context().sess(), cycle);
let value = query.handle_cycle_error(tcx, cycle);
let value = cache.cache.store_nocache(value);
return TryGetJob::Cycle(value);
}
Entry::Occupied(mut entry) => {
match entry.get_mut() {
#[cfg(not(parallel_compiler))]
QueryResult::Started(job) => {
let id = QueryJobId::new(job.id, shard, query.dep_kind);

let cached = cache
.cache
.lookup(cache, &key, |value, index| {
if unlikely!(tcx.dep_context().profiler().enabled()) {
tcx.dep_context().profiler().query_cache_hit(index.into());
drop(state_lock);

// If we are single-threaded we know that we have cycle error,
// so we just return the error.
return TryGetJob::Cycle(mk_cycle(
tcx,
id,
span,
query.handle_cycle_error,
&cache.cache,
));
}
#[cfg(debug_assertions)]
{
cache.cache_hits.fetch_add(1, Ordering::Relaxed);
#[cfg(parallel_compiler)]
QueryResult::Started(job) => {
// For parallel queries, we'll block and wait until the query running
// in another thread has completed. Record how long we wait in the
// self-profiler.
let query_blocked_prof_timer = tcx.dep_context().profiler().query_blocked();

// Get the latch out
let latch = job.latch();
let key = entry.key().clone();

drop(state_lock);

// With parallel queries we might just have to wait on some other
// thread.
let result = latch.wait_on(tcx.current_query_job(), span);

if let Err(cycle) = result {
let cycle = report_cycle(tcx.dep_context().sess(), cycle);
let value = (query.handle_cycle_error)(tcx, cycle);
let value = cache.cache.store_nocache(value);
return TryGetJob::Cycle(value);
}

let cached = cache
.cache
.lookup(cache, &key, |value, index| {
if unlikely!(tcx.dep_context().profiler().enabled()) {
tcx.dep_context().profiler().query_cache_hit(index.into());
}
#[cfg(debug_assertions)]
{
cache.cache_hits.fetch_add(1, Ordering::Relaxed);
}
(value.clone(), index)
})
.unwrap_or_else(|_| panic!("value must be in cache after waiting"));

query_blocked_prof_timer.finish_with_query_invocation_id(cached.1.into());

return TryGetJob::JobCompleted(cached);
}
(value.clone(), index)
})
.unwrap_or_else(|_| panic!("value must be in cache after waiting"));

if let Some(prof_timer) = _query_blocked_prof_timer.take() {
prof_timer.finish_with_query_invocation_id(cached.1.into());
QueryResult::Poisoned => FatalError.raise(),
}
}

return TryGetJob::JobCompleted(cached);
}
}

@@ -418,7 +435,13 @@
CTX: QueryContext,
{
let job = match JobOwner::<'_, CTX::DepKind, C>::try_start(
tcx, state, cache, span, &key, lookup, query,
tcx,
state,
cache,
span,
key.clone(),
lookup,
query,
) {
TryGetJob::NotYetStarted(job) => job,
TryGetJob::Cycle(result) => return result,
@@ -741,7 +764,13 @@ fn force_query_impl<CTX, C>(
};

let job = match JobOwner::<'_, CTX::DepKind, C>::try_start(
tcx, state, cache, span, &key, lookup, query,
tcx,
state,
cache,
span,
key.clone(),
lookup,
query,
) {
TryGetJob::NotYetStarted(job) => job,
TryGetJob::Cycle(_) => return,
