Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions polkadot/node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ pub struct Overseer<SupportsParachains> {
ApprovalVotingMessage,
ApprovalDistributionMessage,
ApprovalVotingParallelMessage,
])]
], can_receive_priority_messages)]
approval_voting_parallel: ApprovalVotingParallel,
#[subsystem(GossipSupportMessage, sends: [
NetworkBridgeTxMessage,
Expand All @@ -643,7 +643,7 @@ pub struct Overseer<SupportsParachains> {
AvailabilityRecoveryMessage,
ChainSelectionMessage,
ApprovalVotingParallelMessage,
])]
], can_receive_priority_messages)]
dispute_coordinator: DisputeCoordinator,

#[subsystem(DisputeDistributionMessage, sends: [
Expand Down
278 changes: 276 additions & 2 deletions polkadot/node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use async_trait::async_trait;
use futures::{executor, pending, pin_mut, poll, select, stream, FutureExt};
use std::{collections::HashMap, sync::atomic, task::Poll};

use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange};
use polkadot_node_network_protocol::{
peer_set::ValidationVersion, ObservedRole, PeerId, UnifiedReputationChange,
};
use polkadot_node_primitives::{
BlockData, CollationGenerationConfig, CollationResult, DisputeMessage, InvalidDisputeVote, PoV,
UncheckedDisputeMessage, ValidDisputeVote,
Expand Down Expand Up @@ -853,10 +855,14 @@ fn test_network_bridge_event<M>() -> NetworkBridgeEvent<M> {
NetworkBridgeEvent::PeerDisconnected(PeerId::random())
}

fn test_statement_distribution_msg() -> StatementDistributionMessage {
fn test_statement_distribution_with_priority_msg() -> StatementDistributionMessage {
StatementDistributionMessage::NetworkBridgeUpdate(test_network_bridge_event())
}

fn test_statement_distribution_msg() -> StatementDistributionMessage {
StatementDistributionMessage::Backed(Default::default())
}

fn test_availability_recovery_msg() -> AvailabilityRecoveryMessage {
let (sender, _) = oneshot::channel();
AvailabilityRecoveryMessage::RecoverAvailableData(
Expand All @@ -872,6 +878,15 @@ fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
BitfieldDistributionMessage::NetworkBridgeUpdate(test_network_bridge_event())
}

fn test_bitfield_distribution_with_priority_msg() -> BitfieldDistributionMessage {
BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected(
PeerId::random(),
ObservedRole::Authority,
ValidationVersion::V3.into(),
None,
))
}

fn test_provisioner_msg() -> ProvisionerMessage {
let (sender, _) = oneshot::channel();
ProvisionerMessage::RequestInherentData(Default::default(), sender)
Expand Down Expand Up @@ -912,11 +927,25 @@ fn test_approval_voting_msg() -> ApprovalVotingMessage {
ApprovalVotingMessage::ApprovedAncestor(Default::default(), 0, sender)
}

fn test_approval_voting_parallel_with_priority_msg() -> ApprovalVotingParallelMessage {
let (sender, _) = oneshot::channel();
ApprovalVotingParallelMessage::ApprovedAncestor(Default::default(), 0, sender)
}

fn test_dispute_coordinator_msg() -> DisputeCoordinatorMessage {
let (sender, _) = oneshot::channel();
DisputeCoordinatorMessage::RecentDisputes(sender)
}

fn test_dispute_coordinator_msg_with_priority() -> DisputeCoordinatorMessage {
let (sender, _) = oneshot::channel();
DisputeCoordinatorMessage::DetermineUndisputedChain {
base: Default::default(),
block_descriptions: Default::default(),
tx: sender,
}
}

fn test_dispute_distribution_msg() -> DisputeDistributionMessage {
let dummy_dispute_message = UncheckedDisputeMessage {
candidate_receipt: dummy_candidate_receipt_v2(dummy_hash()),
Expand Down Expand Up @@ -1238,3 +1267,248 @@ fn context_holds_onto_message_until_enough_signals_received() {

futures::executor::block_on(test_fut);
}

// A subsystem that simulates a slow subsystem, processing messages at a rate of one per second.
// We will use this to test the prioritization of messages in the subsystems generated by orchestra.
#[derive(Clone)]
struct SlowSubsystem {
num_normal_msgs_received: Arc<atomic::AtomicUsize>,
num_prio_msgs_received: Arc<atomic::AtomicUsize>,
}

impl SlowSubsystem {
fn new(
msgs_received: Arc<atomic::AtomicUsize>,
prio_msgs_received: Arc<atomic::AtomicUsize>,
) -> Self {
Self { num_normal_msgs_received: msgs_received, num_prio_msgs_received: prio_msgs_received }
}
}

// Trait to determine if a message is a priority message or not, it is by the SlowSubsystem
// to determine if it should count the message as a priority message or not.
trait IsPrioMessage {
// Tells if the message is a priority message.
fn is_prio(&self) -> bool {
// By default, messages are not priority messages.
false
}
}

// Implement the IsPrioMessage trait for all message types.
impl IsPrioMessage for CandidateValidationMessage {}
impl IsPrioMessage for CandidateBackingMessage {}
impl IsPrioMessage for ChainApiMessage {}
impl IsPrioMessage for CollationGenerationMessage {}
impl IsPrioMessage for CollatorProtocolMessage {}
impl IsPrioMessage for StatementDistributionMessage {
fn is_prio(&self) -> bool {
matches!(self, StatementDistributionMessage::NetworkBridgeUpdate(_))
}
}
impl IsPrioMessage for ApprovalDistributionMessage {}
impl IsPrioMessage for ApprovalVotingMessage {}
impl IsPrioMessage for ApprovalVotingParallelMessage {
fn is_prio(&self) -> bool {
matches!(self, ApprovalVotingParallelMessage::ApprovedAncestor(_, _, _))
}
}
impl IsPrioMessage for AvailabilityDistributionMessage {}
impl IsPrioMessage for AvailabilityRecoveryMessage {}
impl IsPrioMessage for AvailabilityStoreMessage {}
impl IsPrioMessage for BitfieldDistributionMessage {
fn is_prio(&self) -> bool {
matches!(
self,
BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected(
_,
_,
_,
_
),)
)
}
}
impl IsPrioMessage for ChainSelectionMessage {}
impl IsPrioMessage for DisputeCoordinatorMessage {
fn is_prio(&self) -> bool {
matches!(self, DisputeCoordinatorMessage::DetermineUndisputedChain { .. })
}
}
impl IsPrioMessage for DisputeDistributionMessage {}
impl IsPrioMessage for GossipSupportMessage {}
impl IsPrioMessage for NetworkBridgeRxMessage {}
impl IsPrioMessage for NetworkBridgeTxMessage {}
impl IsPrioMessage for ProspectiveParachainsMessage {}
impl IsPrioMessage for ProvisionerMessage {}
impl IsPrioMessage for RuntimeApiMessage {}
impl IsPrioMessage for BitfieldSigningMessage {}
impl IsPrioMessage for PvfCheckerMessage {}

impl<C, M> Subsystem<C, SubsystemError> for SlowSubsystem
where
C: overseer::SubsystemContext<Message = M, Signal = OverseerSignal>,
M: Send + IsPrioMessage,
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem {
name: "counter-subsystem",
future: Box::pin(async move {
loop {
// Simulate a slow processing subsystem to give time for both priority and
// normal messages to accumulate.
Delay::new(Duration::from_secs(1)).await;
match ctx.try_recv().await {
Ok(Some(FromOrchestra::Signal(OverseerSignal::Conclude))) => break,
Ok(Some(FromOrchestra::Signal(_))) => continue,
Ok(Some(FromOrchestra::Communication { msg })) => {
if msg.is_prio() {
self.num_prio_msgs_received.fetch_add(1, atomic::Ordering::SeqCst);
} else {
self.num_normal_msgs_received
.fetch_add(1, atomic::Ordering::SeqCst);
}
continue
},
Err(_) => (),
_ => (),
}
pending!();
}

Ok(())
}),
}
}
}

#[test]
fn overseer_all_subsystems_can_receive_their_priority_messages() {
const NUM_NORMAL_MESSAGES: usize = 10;
const NUM_PRIORITY_MESSAGES: usize = 4;
overseer_check_subsystem_can_receive_their_priority_messages(
(0..NUM_NORMAL_MESSAGES)
.map(|_| AllMessages::DisputeCoordinator(test_dispute_coordinator_msg()))
.collect(),
(0..NUM_PRIORITY_MESSAGES)
.map(|_| AllMessages::DisputeCoordinator(test_dispute_coordinator_msg_with_priority()))
.collect(),
);

overseer_check_subsystem_can_receive_their_priority_messages(
(0..NUM_NORMAL_MESSAGES)
.map(|_| AllMessages::ApprovalVotingParallel(test_approval_distribution_msg().into()))
.collect(),
(0..NUM_PRIORITY_MESSAGES)
.map(|_| {
AllMessages::ApprovalVotingParallel(
test_approval_voting_parallel_with_priority_msg(),
)
})
.collect(),
);

overseer_check_subsystem_can_receive_their_priority_messages(
(0..NUM_NORMAL_MESSAGES)
.map(|_| AllMessages::StatementDistribution(test_statement_distribution_msg()))
.collect(),
(0..NUM_PRIORITY_MESSAGES)
.map(|_| {
AllMessages::StatementDistribution(test_statement_distribution_with_priority_msg())
})
.collect(),
);

overseer_check_subsystem_can_receive_their_priority_messages(
(0..NUM_NORMAL_MESSAGES)
.map(|_| AllMessages::BitfieldDistribution(test_bitfield_distribution_msg()))
.collect(),
(0..NUM_PRIORITY_MESSAGES)
.map(|_| {
AllMessages::BitfieldDistribution(test_bitfield_distribution_with_priority_msg())
})
.collect(),
);
}

// Test that when subsystem processes messages slow, the priority messages are processed before
// the normal messages. This is important to ensure that the subsytem can handle priority messages.
fn overseer_check_subsystem_can_receive_their_priority_messages(
normal_msgs: Vec<AllMessages>,
prio_msgs: Vec<AllMessages>,
) {
let num_normal_messages = normal_msgs.len();
let num_prio_messages: usize = prio_msgs.len();
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let msgs_received = Arc::new(atomic::AtomicUsize::new(0));
let prio_msgs_received = Arc::new(atomic::AtomicUsize::new(0));

let subsystem = SlowSubsystem::new(msgs_received.clone(), prio_msgs_received.clone());

let (overseer, handle) =
one_for_all_overseer_builder(spawner, MockSupportsParachains, subsystem, None)
.unwrap()
.build()
.unwrap();

let mut handle = Handle::new(handle);
let overseer_fut = overseer.run_inner().fuse();

pin_mut!(overseer_fut);

// send a signal to each subsystem
let unpin_handle = dummy_unpin_handle(dummy_hash());
handle
.block_imported(BlockInfo {
hash: Default::default(),
parent_hash: Default::default(),
number: Default::default(),
unpin_handle: unpin_handle.clone(),
})
.await;

// Send normal messages first, they are processed 1 per second by the SlowSubsystem, so they
// should accumulated in the queue.
for msg in normal_msgs {
handle.send_msg_anon(msg).await;
}

// Send priority messages.
for msg in prio_msgs {
handle.send_msg_with_priority(msg, "test", PriorityLevel::High).await;
}

loop {
match (&mut overseer_fut).timeout(Duration::from_millis(100)).await {
None => {
let normal_msgs: usize = msgs_received.load(atomic::Ordering::SeqCst);
let prio_msgs: usize = prio_msgs_received.load(atomic::Ordering::SeqCst);

assert!(
prio_msgs == num_prio_messages || normal_msgs < num_normal_messages,
"we should not receive all normal messages before the prio message"
);

assert!(
normal_msgs <= num_normal_messages && prio_msgs <= num_prio_messages,
"too many messages received"
);

if normal_msgs < num_normal_messages || prio_msgs < num_prio_messages {
Delay::new(Duration::from_millis(100)).await;
} else {
break;
}
},
Some(_) => panic!("exited too early"),
}
}

// send a stop signal to each subsystems
handle.stop().await;

let res = overseer_fut.await;
assert!(res.is_ok());
});
}
2 changes: 2 additions & 0 deletions polkadot/node/service/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ async fn test_skeleton(
) => {
tx.send(undisputed_chain.unwrap_or((target_block_number, target_block_hash))).unwrap();
});
// Check that ApprovedAncestor and DetermineUndisputedChain are sent with high priority.
assert_eq!(virtual_overseer.message_counter.with_high_priority(), 2);
}

/// Straight forward test case, where the test is not
Expand Down
Loading