Skip to content

Commit

Permalink
[stable2412] Backport #6729 (#6830)
Browse files Browse the repository at this point in the history
Backport #6729 into `stable2412` from alexggh.

See the
[documentation](https://github.com/paritytech/polkadot-sdk/blob/master/docs/BACKPORT.md)
on how to use this bot.

<!--
  # To be used by other automation, do not modify:
  original-pr-number: #${pull_number}
-->

Co-authored-by: Alexandru Gheorghe <[email protected]>
  • Loading branch information
1 parent d74cd0b commit 2f20a64
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 3 deletions.
12 changes: 9 additions & 3 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,8 +1582,9 @@ async fn handle_actions<
session_info_provider,
)
.await?;

approval_voting_sender.send_messages(messages.into_iter()).await;
for message in messages.into_iter() {
approval_voting_sender.send_unbounded_message(message);
}
let next_actions: Vec<Action> =
next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();

Expand Down Expand Up @@ -1668,6 +1669,7 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi

let mut approval_meta = Vec::with_capacity(all_blocks.len());
let mut messages = Vec::new();
let mut approvals = Vec::new();
let mut actions = Vec::new();

messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.
Expand Down Expand Up @@ -1839,7 +1841,7 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
if signatures_queued
.insert(approval_sig.signed_candidates_indices.clone())
{
messages.push(ApprovalDistributionMessage::DistributeApproval(
approvals.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVoteV2 {
block_hash,
candidate_indices: approval_sig.signed_candidates_indices,
Expand All @@ -1864,6 +1866,10 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
}

messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
// Approvals are appended at the end, to make sure all assignments are sent
// before the approvals, otherwise if they arrive ahead in approval-distribution
// they will be ignored.
messages.extend(approvals.into_iter());
Ok((messages, actions))
}

Expand Down
314 changes: 314 additions & 0 deletions polkadot/node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4459,6 +4459,114 @@ async fn setup_overseer_with_two_blocks_each_with_one_assignment_triggered(
assert!(our_assignment.triggered());
}

// Builds a chain with a fork where both relay blocks include the same candidate.
async fn build_chain_with_block_with_two_candidates(
block_hash1: Hash,
slot: Slot,
sync_oracle_handle: TestSyncOracleHandle,
candidate_receipt: Vec<CandidateReceipt>,
) -> (ChainBuilder, SessionInfo) {
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Eve,
];
let session_info = SessionInfo {
validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(vec![
vec![ValidatorIndex(0), ValidatorIndex(1)],
vec![ValidatorIndex(2)],
vec![ValidatorIndex(3), ValidatorIndex(4)],
]),
..session_info(&validators)
};

let candidates = Some(
candidate_receipt
.iter()
.enumerate()
.map(|(i, receipt)| (receipt.clone(), CoreIndex(i as u32), GroupIndex(i as u32)))
.collect(),
);
let mut chain_builder = ChainBuilder::new();

chain_builder
.major_syncing(sync_oracle_handle.is_major_syncing.clone())
.add_block(
block_hash1,
ChainBuilder::GENESIS_HASH,
1,
BlockConfig {
slot,
candidates: candidates.clone(),
session_info: Some(session_info.clone()),
end_syncing: true,
},
);
(chain_builder, session_info)
}

async fn setup_overseer_with_blocks_with_two_assignments_triggered(
virtual_overseer: &mut VirtualOverseer,
store: TestStore,
clock: &Arc<MockClock>,
sync_oracle_handle: TestSyncOracleHandle,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();
let candidate_hash = candidate_receipt.hash();

let mut candidate_commitments2 = CandidateCommitments::default();
candidate_commitments2.processed_downward_messages = 3;
let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash);
candidate_receipt2.commitments_hash = candidate_commitments2.hash();
let candidate_hash2 = candidate_receipt2.hash();

let slot = Slot::from(1);
let (chain_builder, _session_info) = build_chain_with_block_with_two_candidates(
block_hash,
slot,
sync_oracle_handle,
vec![candidate_receipt, candidate_receipt2],
)
.await;
chain_builder.build(virtual_overseer).await;

assert!(!clock.inner.lock().current_wakeup_is(1));
clock.inner.lock().wakeup_all(1);

assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot)));
clock.inner.lock().wakeup_all(slot_to_tick(slot));

futures_timer::Delay::new(Duration::from_millis(200)).await;

clock.inner.lock().wakeup_all(slot_to_tick(slot + 2));

assert_eq!(clock.inner.lock().wakeups.len(), 0);

futures_timer::Delay::new(Duration::from_millis(200)).await;

let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
let our_assignment =
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(our_assignment.triggered());

let candidate_entry = store.load_candidate_entry(&candidate_hash2).unwrap().unwrap();
let our_assignment =
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(our_assignment.triggered());
}

// Tests that for candidates that we did not approve yet, for which we triggered the assignment and
// the approval work we restart the work to approve it.
#[test]
Expand Down Expand Up @@ -4920,6 +5028,212 @@ fn subsystem_sends_pending_approvals_on_approval_restart() {
});
}

// Test that after restart approvals are sent after all assignments have been distributed.
#[test]
fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() {
let assignment_criteria = Box::new(MockAssignmentCriteria(
|| {
let mut assignments = HashMap::new();

let _ = assignments.insert(
CoreIndex(0),
approval_db::v2::OurAssignment {
cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact {
core_bitfield: vec![CoreIndex(0), CoreIndex(2)].try_into().unwrap(),
}),
tranche: 0,
validator_index: ValidatorIndex(0),
triggered: false,
}
.into(),
);

let _ = assignments.insert(
CoreIndex(1),
approval_db::v2::OurAssignment {
cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
core_index: CoreIndex(1),
}),
tranche: 0,
validator_index: ValidatorIndex(0),
triggered: false,
}
.into(),
);
assignments
},
|_| Ok(0),
));
let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
let store = config.backend();
let store_clone = config.backend();

test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;

setup_overseer_with_blocks_with_two_assignments_triggered(
&mut virtual_overseer,
store,
&clock,
sync_oracle_handle,
)
.await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

recover_available_data(&mut virtual_overseer).await;
fetch_validation_code(&mut virtual_overseer).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_
)) => {
}
);

recover_available_data(&mut virtual_overseer).await;
fetch_validation_code(&mut virtual_overseer).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
exec_kind,
response_sender,
..
}) if exec_kind == PvfExecKind::Approval => {
response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
exec_kind,
response_sender,
..
}) if exec_kind == PvfExecKind::Approval => {
response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
}
);

// Configure a big coalesce number, so that the signature is cached instead of being sent to
// approval-distribution.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 2,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 2,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
);

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

virtual_overseer
});

let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build();
// On restart we should first distribute all assignments covering a coalesced approval.
test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();

let mut candidate_commitments2 = CandidateCommitments::default();
candidate_commitments2.processed_downward_messages = 3;
let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash);
candidate_receipt2.commitments_hash = candidate_commitments2.hash();

let slot = Slot::from(1);

clock.inner.lock().set_tick(slot_to_tick(slot + 2));
let (chain_builder, _session_info) = build_chain_with_block_with_two_candidates(
block_hash,
slot,
sync_oracle_handle,
vec![candidate_receipt.into(), candidate_receipt2.into()],
)
.await;
chain_builder.build(&mut virtual_overseer).await;

futures_timer::Delay::new(Duration::from_millis(2000)).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks(
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(approval)) => {
assert_eq!(approval.candidate_indices.count_ones(), 2);
}
);

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

virtual_overseer
});
}

// Test we correctly update the timer when we mark the beginning of gathering assignments.
#[test]
fn test_gathering_assignments_statements() {
Expand Down
15 changes: 15 additions & 0 deletions prdoc/pr_6729.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Fix order of resending messages after restart

doc:
- audience: Node Dev
description: |
At restart when dealing with a coalesced approval we might end up in a situation where we sent to
approval-distribution the approval before all assignments covering it, in that case, the approval
is ignored and never distribute, which will lead to no-shows.

crates:
- name: polkadot-node-core-approval-voting
bump: minor

0 comments on commit 2f20a64

Please sign in to comment.