Skip to content

Commit

Permalink
Merge commit from fork
Browse files Browse the repository at this point in the history
* Fix a race condition in the Wasm type registry

Under certain concurrent event orderings, the type registry was susceptible to
double-unregistration bugs due to a race condition.

Consider the following example where we have threads `A` and `B`, an existing
rec group entry `E`, and another rec group entry `E'` that is a duplicate of
`E`:

| Thread A                          | Thread B                       |
|-----------------------------------|--------------------------------|
| `acquire(type registry lock)`     |                                |
|                                   | `decref(E) --> 0`              |
|                                   | `block_on(type registry lock)` |
| `register(E') == incref(E) --> 1` |                                |
| `release(type registry lock)`     |                                |
| `decref(E) --> 0`                 |                                |
| `acquire(type registry lock)`     |                                |
| `unregister(E)`                   |                                |
| `release(type registry lock)`     |                                |
|                                   | `acquire(type registry lock)`  |
|                                   | `unregister(E)`         !!!!!! |

As you can see, in the above event ordering, we would unregister `E` twice,
leading to panics and potential type registry corruption.

This commit adds an `unregistered` flag to each entry which is set when we
commit to unregistering the entry and while we hold the type registry lock. When
we are considering unregistering an entry at the beginning of
`TypeRegistryInner::unregister_entry`, because we observed a zero-registrations
count for that entry and then waited on the type registry's lock, we now check
if that flag is already set (by some concurrent unregistration that happened
between observing the zero-registrations count and acquiring the lock) before we
actually perform the unregistration.

Furthermore, in the process of writing a smoke test for concurrent module (and
therefore type) loading and unloading, I discovered an index out-of-bounds panic
during Wasm-to-CLIF module translation. This commit also includes the one-line
fix for that bug and a `.wast` regression test for it as well.

* Update test to trigger case requiring `unregistered` flag

* Add another test further stressing concurrent interactions

Try to get a module's function type to get typed the wrong way and/or
get wasm to call with the wrong signature.

---------

Co-authored-by: Alex Crichton <[email protected]>
  • Loading branch information
fitzgen and alexcrichton authored Oct 9, 2024
1 parent bc6b0fe commit 0ebe54d
Show file tree
Hide file tree
Showing 2 changed files with 417 additions and 27 deletions.
208 changes: 181 additions & 27 deletions crates/wasmtime/src/runtime/type_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use core::{
hash::{Hash, Hasher},
ops::Range,
sync::atomic::{
AtomicUsize,
Ordering::{AcqRel, Acquire},
AtomicBool, AtomicUsize,
Ordering::{AcqRel, Acquire, Release},
},
};
use wasmtime_environ::{
iter_entity_range, packed_option::PackedOption, EngineOrModuleTypeIndex, GcLayout,
ModuleInternedTypeIndex, ModuleTypes, PrimaryMap, SecondaryMap, TypeTrace, VMSharedTypeIndex,
WasmRecGroup, WasmSubType,
iter_entity_range,
packed_option::{PackedOption, ReservedValue},
EngineOrModuleTypeIndex, GcLayout, ModuleInternedTypeIndex, ModuleTypes, PrimaryMap,
SecondaryMap, TypeTrace, VMSharedTypeIndex, WasmRecGroup, WasmSubType,
};
use wasmtime_slab::{Id as SlabId, Slab};

Expand Down Expand Up @@ -180,12 +181,15 @@ impl Drop for TypeCollection {

#[inline]
fn shared_type_index_to_slab_id(index: VMSharedTypeIndex) -> SlabId {
assert!(!index.is_reserved_value());
SlabId::from_raw(index.bits())
}

#[inline]
fn slab_id_to_shared_type_index(id: SlabId) -> VMSharedTypeIndex {
VMSharedTypeIndex::new(id.into_raw())
let index = VMSharedTypeIndex::new(id.into_raw());
assert!(!index.is_reserved_value());
index
}

/// A Wasm type that has been registered in the engine's `TypeRegistry`.
Expand Down Expand Up @@ -417,8 +421,25 @@ impl Debug for RecGroupEntry {
struct RecGroupEntryInner {
/// The Wasm rec group, canonicalized for hash consing.
hash_consing_key: WasmRecGroup,

/// The shared type indices for each type in this rec group.
shared_type_indices: Box<[VMSharedTypeIndex]>,

/// The number of times that this entry has been registered in the
/// `TypeRegistryInner`.
///
/// This is an atomic counter so that cloning a `RegisteredType`, and
/// temporarily keeping a type registered, doesn't require locking the full
/// registry.
registrations: AtomicUsize,

/// Whether this entry has already been unregistered from the
/// `TypeRegistryInner`.
///
/// This flag exists to detect and avoid double-unregistration bugs that
/// could otherwise occur in rare cases. See the comments in
/// `TypeRegistryInner::unregister_type` for details.
unregistered: AtomicBool,
}

impl PartialEq for RecGroupEntry {
Expand Down Expand Up @@ -611,6 +632,7 @@ impl TypeRegistryInner {

// If we've already registered this rec group before, reuse it.
if let Some(entry) = self.hash_consing_map.get(&hash_consing_key) {
assert_eq!(entry.0.unregistered.load(Acquire), false);
entry.incref(
"hash consed to already-registered type in `TypeRegistryInner::register_rec_group`",
);
Expand All @@ -622,8 +644,9 @@ impl TypeRegistryInner {
// while this rec group is still alive.
hash_consing_key
.trace_engine_indices::<_, ()>(&mut |index| {
let entry = &self.type_to_rec_group[index].as_ref().unwrap();
entry.incref(
let other_entry = &self.type_to_rec_group[index].as_ref().unwrap();
assert_eq!(other_entry.0.unregistered.load(Acquire), false);
other_entry.incref(
"new cross-group type reference to existing type in `register_rec_group`",
);
Ok(())
Expand All @@ -645,17 +668,32 @@ impl TypeRegistryInner {
map[idx]
} else {
let rec_group_offset = idx.as_u32() - module_rec_group_start.as_u32();
VMSharedTypeIndex::from_u32(engine_rec_group_start + rec_group_offset)
let index =
VMSharedTypeIndex::from_u32(engine_rec_group_start + rec_group_offset);
assert!(!index.is_reserved_value());
index
}
});
self.insert_one_type_from_rec_group(gc_runtime, module_index, ty)
})
.collect();

debug_assert_eq!(
shared_type_indices.len(),
shared_type_indices
.iter()
.copied()
.inspect(|ty| assert!(!ty.is_reserved_value()))
.collect::<crate::hash_set::HashSet<_>>()
.len(),
"should not have any duplicate type indices",
);

let entry = RecGroupEntry(Arc::new(RecGroupEntryInner {
hash_consing_key,
shared_type_indices,
registrations: AtomicUsize::new(1),
unregistered: AtomicBool::new(false),
}));
log::trace!("create new entry {entry:?} (registrations -> 1)");

Expand Down Expand Up @@ -845,29 +883,133 @@ impl TypeRegistryInner {
/// zero remaining registrations.
fn unregister_entry(&mut self, entry: RecGroupEntry) {
debug_assert!(self.drop_stack.is_empty());

// There are two races to guard against before we can unregister the
// entry, even though it was on the drop stack:
//
// 1. Although an entry has to reach zero registrations before it is
// enqueued in the drop stack, we need to double check whether the
// entry is *still* at zero registrations. This is because someone
// else can resurrect the entry in between when the
// zero-registrations count was first observed and when we actually
// acquire the lock to unregister it. In this example, we have
// threads A and B, an existing rec group entry E, and a rec group
// entry E' that is a duplicate of E:
//
// Thread A | Thread B
// --------------------------------+-----------------------------
// acquire(type registry lock) |
// |
// | decref(E) --> 0
// |
// | block_on(type registry lock)
// |
// register(E') == incref(E) --> 1 |
// |
// release(type registry lock) |
// |
// | acquire(type registry lock)
// |
// | unregister(E) !!!!!!
//
// If we aren't careful, we can unregister a type while it is still
// in use!
//
// The fix in this case is that we skip unregistering the entry if
// its reference count is non-zero, since that means it was
// concurrently resurrected and is now in use again.
//
// 2. In a slightly more convoluted version of (1), where an entry is
// resurrected but then dropped *again*, someone might attempt to
// unregister an entry a second time:
//
// Thread A | Thread B
// --------------------------------|-----------------------------
// acquire(type registry lock) |
// |
// | decref(E) --> 0
// |
// | block_on(type registry lock)
// |
// register(E') == incref(E) --> 1 |
// |
// release(type registry lock) |
// |
// decref(E) --> 0 |
// |
// acquire(type registry lock) |
// |
// unregister(E) |
// |
// release(type registry lock) |
// |
// | acquire(type registry lock)
// |
// | unregister(E) !!!!!!
//
// If we aren't careful, we can unregister a type twice, which leads
// to panics and registry corruption!
//
// To detect this scenario and avoid the double-unregistration bug,
// we maintain an `unregistered` flag on entries. We set this flag
// once an entry is unregistered and therefore, even if it is
// enqueued in the drop stack multiple times, we only actually
// unregister the entry the first time.
//
// A final note: we don't need to worry about any concurrent
// modifications during the middle of this function's execution, only
// between (a) when we first observed a zero-registrations count and
// decided to unregister the type, and (b) when we acquired the type
// registry's lock so that we could perform that unregistration. This is
// because this method has exclusive access to `&mut self` -- that is,
// we have a write lock on the whole type registry -- and therefore no
// one else can create new references to this zero-registration entry
// and bring it back to life (which would require finding it in
// `self.hash_consing_map`, which no one else has access to, because we
// now have an exclusive lock on `self`).

// Handle scenario (1) from above.
let registrations = entry.0.registrations.load(Acquire);
if registrations != 0 {
log::trace!(
"{entry:?} was concurrently resurrected and no longer has \
zero registrations (registrations -> {registrations})",
);
assert_eq!(entry.0.unregistered.load(Acquire), false);
return;
}

// Handle scenario (2) from above.
if entry.0.unregistered.load(Acquire) {
log::trace!(
"{entry:?} was concurrently resurrected, dropped again, \
and already unregistered"
);
return;
}

// Okay, we are really going to unregister this entry. Enqueue it on the
// drop stack.
self.drop_stack.push(entry);

// Keep unregistering entries until the drop stack is empty. This is
// logically a recursive process where if we unregister a type that was
// the only thing keeping another type alive, we then recursively
// unregister that other type as well. However, we use this explicit
// drop stack to avoid recursion and the potential stack overflows that
// recursion implies.
while let Some(entry) = self.drop_stack.pop() {
log::trace!("Start unregistering {entry:?}");

// We need to double check whether the entry is still at zero
// registrations: Between the time that we observed a zero and
// acquired the lock to call this function, another thread could
// have registered the type and found the 0-registrations entry in
// `self.map` and incremented its count.
//
// We don't need to worry about any concurrent increments during
// this function's invocation after we check for zero because we
// have exclusive access to `&mut self` and therefore no one can
// create a new reference to this entry and bring it back to life.
let registrations = entry.0.registrations.load(Acquire);
if registrations != 0 {
log::trace!(
"{entry:?} was concurrently resurrected and no longer has \
zero registrations (registrations -> {registrations})",
);
continue;
}
// All entries on the drop stack should *really* be ready for
// unregistration, since no one can resurrect entries once we've
// locked the registry.
assert_eq!(entry.0.registrations.load(Acquire), 0);
assert_eq!(entry.0.unregistered.load(Acquire), false);

// We are taking responsibility for unregistering this entry, so
// prevent anyone else from attempting to do it again.
entry.0.unregistered.store(true, Release);

// Decrement any other types that this type was shallowly
// (i.e. non-transitively) referencing and keeping alive. If this
Expand Down Expand Up @@ -901,6 +1043,18 @@ impl TypeRegistryInner {
// map. Additionally, stop holding a strong reference from each
// function type in the rec group to that function type's trampoline
// type.
debug_assert_eq!(
entry.0.shared_type_indices.len(),
entry
.0
.shared_type_indices
.iter()
.copied()
.inspect(|ty| assert!(!ty.is_reserved_value()))
.collect::<crate::hash_set::HashSet<_>>()
.len(),
"should not have any duplicate type indices",
);
for ty in entry.0.shared_type_indices.iter().copied() {
log::trace!("removing {ty:?} from registry");

Expand Down
Loading

0 comments on commit 0ebe54d

Please sign in to comment.