Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 10 additions & 148 deletions core/src/accounts_hash_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,51 +142,18 @@ impl AccountsHashVerifier {
Some((accounts_package, 1, 0))
}
_ => {
let num_eah_packages = accounts_packages
.iter()
.filter(|account_package| {
account_package.package_kind == AccountsPackageKind::EpochAccountsHash
})
.count();
assert!(
num_eah_packages <= 1,
"Only a single EAH accounts package is allowed at a time! count: \
{num_eah_packages}"
);

// Get the two highest priority requests, `y` and `z`.
// By asking for the second-to-last element to be in its final sorted position, we
// also ensure that the last element is also sorted.
let (_, y, z) = accounts_packages.select_nth_unstable_by(
// Note, we no longer need the second-to-last element; this code can be refactored.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let (_, _y, z) = accounts_packages.select_nth_unstable_by(
accounts_packages_len - 2,
snapshot_package::cmp_accounts_packages_by_priority,
);
assert_eq!(z.len(), 1);
let z = z.first().unwrap();
let y: &_ = y; // reborrow to remove `mut`

// If the highest priority request (`z`) is EpochAccountsHash, we need to check if
// there's a FullSnapshot request with a lower slot in `y` that is about to be
// dropped. We do not want to drop a FullSnapshot request in this case because it
// will cause subsequent IncrementalSnapshot requests to fail.
//
// So, if `z` is an EpochAccountsHash request, check `y`. We know there can only
// be at most one EpochAccountsHash request, so `y` is the only other request we
// need to check. If `y` is a FullSnapshot request *with a lower slot* than `z`,
// then handle `y` first.
let accounts_package = if z.package_kind == AccountsPackageKind::EpochAccountsHash
&& y.package_kind == AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
&& y.slot < z.slot
{
// SAFETY: We know the len is > 1, so both `pop`s will return `Some`
let z = accounts_packages.pop().unwrap();
let y = accounts_packages.pop().unwrap();
accounts_packages.push(z);
y
} else {
// SAFETY: We know the len is > 1, so `pop` will return `Some`
accounts_packages.pop().unwrap()
};

// SAFETY: We know the len is > 1, so `pop` will return `Some`
let accounts_package = accounts_packages.pop().unwrap();

let handled_accounts_package_slot = accounts_package.slot;
// re-enqueue any remaining accounts packages for slots GREATER-THAN the accounts package
Expand Down Expand Up @@ -258,7 +225,6 @@ impl AccountsHashVerifier {
}

let accounts_hash_calculation_kind = match accounts_package.package_kind {
AccountsPackageKind::EpochAccountsHash => unreachable!("EAH is removed"),
AccountsPackageKind::Snapshot(snapshot_kind) => match snapshot_kind {
SnapshotKind::FullSnapshot => CalcAccountsHashKind::Full,
SnapshotKind::IncrementalSnapshot(_) => CalcAccountsHashKind::Incremental,
Expand Down Expand Up @@ -506,9 +472,6 @@ mod tests {
..AccountsPackage::default_for_tests()
}
}
fn new_eah(slot: Slot) -> AccountsPackage {
new(AccountsPackageKind::EpochAccountsHash, slot)
}
fn new_fss(slot: Slot) -> AccountsPackage {
new(
AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot),
Expand All @@ -535,13 +498,12 @@ mod tests {
let mut accounts_packages = [
new_fss(100), // skipped, since there's another full snapshot with a higher slot
new_iss(110, 100),
new_eah(200), // <-- handle 1st
new_iss(210, 100),
new_fss(300),
new_fss(300), // skipped, since there's another full snapshot with a higher slot
new_iss(310, 300),
new_fss(400), // <-- handle 2nd
new_fss(400), // <-- handle 1st
new_iss(410, 400),
new_iss(420, 400), // <-- handle 3rd
new_iss(420, 400), // <-- handle 2nd
];
// Shuffle the accounts packages to simulate receiving new accounts packages from ABS
// simultaneously as AHV is processing them.
Expand All @@ -550,25 +512,8 @@ mod tests {
.into_iter()
.for_each(|accounts_package| accounts_package_sender.send(accounts_package).unwrap());

// The EAH is handled 1st
let (
account_package,
_num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
) = AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
)
.unwrap();
assert_eq!(
account_package.package_kind,
AccountsPackageKind::EpochAccountsHash
);
assert_eq!(account_package.slot, 200);
assert_eq!(num_re_enqueued_accounts_packages, 6);

// The Full Snapshot from slot 400 is handled 2nd
// (the older full snapshot from slot 300 is skipped and dropped)
// The Full Snapshot from slot 400 is handled 1st
// (the older full snapshots are skipped and dropped)
let (
account_package,
_num_outstanding_accounts_packages,
Expand Down Expand Up @@ -610,87 +555,4 @@ mod tests {
)
.is_none());
}

/// Ensure that unhandled accounts packages are properly re-enqueued or dropped
///
/// This test differs from the one above by having an older full snapshot request that must be
/// handled before the new epoch accounts hash request.
#[test]
fn test_get_next_accounts_package2() {
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();

// Populate the channel so that re-enqueueing and dropping will be tested
let mut accounts_packages = [
new_fss(100), // <-- handle 1st
new_iss(110, 100),
new_eah(200), // <-- handle 2nd
new_iss(210, 100),
new_iss(220, 100), // <-- handle 3rd
];
// Shuffle the accounts packages to simulate receiving new accounts packages from ABS
// simultaneously as AHV is processing them.
accounts_packages.shuffle(&mut rand::thread_rng());
accounts_packages
.into_iter()
.for_each(|accounts_package| accounts_package_sender.send(accounts_package).unwrap());

// The Full Snapshot is handled 1st
let (
account_package,
_num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
) = AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
)
.unwrap();
assert_eq!(
account_package.package_kind,
AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
);
assert_eq!(account_package.slot, 100);
assert_eq!(num_re_enqueued_accounts_packages, 4);

// The EAH is handled 2nd
let (
account_package,
_num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
) = AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
)
.unwrap();
assert_eq!(
account_package.package_kind,
AccountsPackageKind::EpochAccountsHash
);
assert_eq!(account_package.slot, 200);
assert_eq!(num_re_enqueued_accounts_packages, 2);

// The Incremental Snapshot from slot 220 is handled 3rd
// (the older incremental snapshot from slot 210 is skipped and dropped)
let (
account_package,
_num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
) = AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
)
.unwrap();
assert_eq!(
account_package.package_kind,
AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(100))
);
assert_eq!(account_package.slot, 220);
assert_eq!(num_re_enqueued_accounts_packages, 0);

// And now the accounts package channel is empty!
assert!(AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver
)
.is_none());
}
}
Loading
Loading