Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ memory-db = { version = "0.33.0", default-features = false }
merkleized-metadata = { version = "0.5.0" }
merlin = { version = "3.0", default-features = false }
messages-relay = { path = "bridges/relays/messages" }
metered = { version = "0.6.1", default-features = false, package = "prioritized-metered-channel" }
metered = { git = "https://github.com/paritytech/orchestra", branch = "alexggh/router_priority_messages", default-features = false, package = "prioritized-metered-channel" }
milagro-bls = { version = "1.5.4", default-features = false, package = "snowbridge-milagro-bls" }
minimal-template-node = { path = "templates/minimal/node" }
minimal-template-runtime = { path = "templates/minimal/runtime" }
Expand Down Expand Up @@ -926,7 +926,7 @@ num-rational = { version = "0.4.1" }
num-traits = { version = "0.2.17", default-features = false }
num_cpus = { version = "1.13.1" }
once_cell = { version = "1.21.3" }
orchestra = { version = "0.4.0", default-features = false }
orchestra = { git = "https://github.com/paritytech/orchestra", branch = "alexggh/router_priority_messages", default-features = false }
pallet-alliance = { path = "substrate/frame/alliance", default-features = false }
pallet-asset-conversion = { path = "substrate/frame/asset-conversion", default-features = false }
pallet-asset-conversion-ops = { path = "substrate/frame/asset-conversion/ops", default-features = false }
Expand Down
34 changes: 27 additions & 7 deletions cumulus/client/collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use cumulus_client_consensus_common::ParachainConsensus;
use polkadot_node_primitives::{CollationGenerationConfig, CollationResult, MaybeCompressedPoV};
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_overseer::{Handle as OverseerHandle, PriorityLevel};
use polkadot_primitives::{CollatorPair, Id as ParaId};

use codec::Decode;
Expand Down Expand Up @@ -152,7 +152,7 @@ pub mod relay_chain_driven {
};
use polkadot_node_primitives::{CollationGenerationConfig, CollationResult};
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_overseer::{Handle as OverseerHandle, PriorityLevel};
use polkadot_primitives::{CollatorPair, Id as ParaId};

use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
Expand Down Expand Up @@ -218,11 +218,19 @@ pub mod relay_chain_driven {
};

overseer_handle
.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
.send_msg(
CollationGenerationMessage::Initialize(config),
"StartCollator",
PriorityLevel::Normal,
)
.await;

overseer_handle
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
.send_msg(
CollatorProtocolMessage::CollateOn(para_id),
"StartCollator",
PriorityLevel::Normal,
)
.await;

stream_rx
Expand All @@ -243,16 +251,28 @@ pub async fn initialize_collator_subsystems(

if reinitialize {
overseer_handle
.send_msg(CollationGenerationMessage::Reinitialize(config), "StartCollator")
.send_msg(
CollationGenerationMessage::Reinitialize(config),
"StartCollator",
PriorityLevel::Normal,
)
.await;
} else {
overseer_handle
.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
.send_msg(
CollationGenerationMessage::Initialize(config),
"StartCollator",
PriorityLevel::Normal,
)
.await;
}

overseer_handle
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
.send_msg(
CollatorProtocolMessage::CollateOn(para_id),
"StartCollator",
PriorityLevel::Normal,
)
.await;
}

Expand Down
2 changes: 2 additions & 0 deletions cumulus/client/consensus/aura/src/collators/lookahead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_overseer::PriorityLevel;

use polkadot_node_primitives::SubmitCollationParams;
use polkadot_node_subsystem::messages::CollationGenerationMessage;
Expand Down Expand Up @@ -444,6 +445,7 @@ where
},
),
"SubmitCollation",
PriorityLevel::Normal,
)
.await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_node_primitives::{MaybeCompressedPoV, SubmitCollationParams};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_overseer::{Handle as OverseerHandle, PriorityLevel};
use polkadot_primitives::{CollatorPair, Id as ParaId};

use cumulus_primitives_core::relay_chain::BlockId;
Expand Down Expand Up @@ -183,6 +183,7 @@ async fn handle_collation_message<Block: BlockT, RClient: RelayChainInterface +
result_sender: None,
}),
"SubmitCollation",
PriorityLevel::Normal,
)
.await;
}
4 changes: 2 additions & 2 deletions cumulus/client/pov-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};

use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_overseer::{Handle as OverseerHandle, PriorityLevel};
use polkadot_primitives::{
vstaging::{
CandidateReceiptV2 as CandidateReceipt,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl RecoveryHandle for OverseerHandle {
message: AvailabilityRecoveryMessage,
origin: &'static str,
) {
self.send_msg(message, origin).await;
self.send_msg(message, origin, PriorityLevel::Normal).await;
}
}

Expand Down
3 changes: 2 additions & 1 deletion cumulus/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use cumulus_client_consensus_aura::{
ImportQueueParams,
};
use cumulus_client_consensus_proposer::Proposer;
use polkadot_overseer::PriorityLevel;
use prometheus::Registry;
use runtime::AccountId;
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
Expand Down Expand Up @@ -175,7 +176,7 @@ impl RecoveryHandle for FailingRecoveryHandle {
.send(Err(RecoveryError::Unavailable))
.expect("Return channel should work here.");
} else {
self.overseer_handle.send_msg(message, origin).await;
self.overseer_handle.send_msg(message, origin, PriorityLevel::Normal).await;
}
self.counter += 1;
}
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/approval-voting-parallel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ async fn run_main_loop<Context>(
// The message the approval voting subsystem would've handled.
ApprovalVotingParallelMessage::ApprovedAncestor(_, _,_) |
ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(_, _) => {
to_approval_voting_worker.send_message(
to_approval_voting_worker.send_message_with_priority::<overseer::HighPriority>(
msg.try_into().expect(
"Message is one of ApprovedAncestor, GetApprovalSignaturesForCandidate
and that can be safely converted to ApprovalVotingMessage; qed"
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/parachains-inherent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl<C: sp_blockchain::HeaderBackend<Block>> ParachainsInherentDataProvider<C> {
.send_msg(
ProvisionerMessage::RequestInherentData(parent, sender),
std::any::type_name::<Self>(),
polkadot_overseer::PriorityLevel::Normal,
)
.await;

Expand Down
27 changes: 21 additions & 6 deletions polkadot/node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,20 @@ impl Handle {
}

/// Send some message to one of the `Subsystem`s.
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
pub async fn send_msg(
&mut self,
msg: impl Into<AllMessages>,
origin: &'static str,
priority: PriorityLevel,
) {
self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
.await
}

/// Send a message not providing an origin.
#[inline(always)]
pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
self.send_msg(msg, "").await
pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>, priority: PriorityLevel) {
self.send_msg(msg, "", priority).await
}

/// Inform the `Overseer` that some block was finalized.
Expand Down Expand Up @@ -296,6 +302,8 @@ pub enum Event {
msg: AllMessages,
/// The originating subsystem name.
origin: &'static str,
/// The priority of the message.
priority: PriorityLevel,
},
/// A request from the outer world.
ExternalRequest(ExternalRequest),
Expand Down Expand Up @@ -764,8 +772,15 @@ where
select! {
msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem { msg, origin } => {
self.route_message(msg.into(), origin).await?;
Event::MsgToSubsystem { msg, origin, priority } => {
match priority {
PriorityLevel::Normal => {
self.route_message::<NormalPriority>(msg.into(), origin).await?;
},
PriorityLevel::High => {
self.route_message::<HighPriority>(msg.into(), origin).await?;
},
}
self.metrics.on_message_relayed();
}
Event::Stop => {
Expand Down
30 changes: 25 additions & 5 deletions polkadot/node/service/src/relay_chain_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use polkadot_node_subsystem::messages::{
ChainSelectionMessage, DisputeCoordinatorMessage, HighestApprovedAncestorBlock,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_overseer::{AllMessages, Handle};
use polkadot_overseer::{AllMessages, Handle, PriorityLevel};
use polkadot_primitives::{Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader};
use sp_consensus::{Error as ConsensusError, SelectChain};
use std::sync::Arc;
Expand Down Expand Up @@ -322,13 +322,23 @@ enum Error {
/// Required for testing purposes.
#[async_trait::async_trait]
pub trait OverseerHandleT: Clone + Send + Sync {
async fn send_msg<M: Send + Into<AllMessages>>(&mut self, msg: M, origin: &'static str);
async fn send_msg<M: Send + Into<AllMessages>>(
&mut self,
msg: M,
origin: &'static str,
priority: PriorityLevel,
);
}

#[async_trait::async_trait]
impl OverseerHandleT for Handle {
async fn send_msg<M: Send + Into<AllMessages>>(&mut self, msg: M, origin: &'static str) {
Handle::send_msg(self, msg, origin).await
async fn send_msg<M: Send + Into<AllMessages>>(
&mut self,
msg: M,
origin: &'static str,
priority: PriorityLevel,
) {
Handle::send_msg(self, msg, origin, priority).await
}
}

Expand All @@ -344,7 +354,11 @@ where

self.overseer
.clone()
.send_msg(ChainSelectionMessage::Leaves(tx), std::any::type_name::<Self>())
.send_msg(
ChainSelectionMessage::Leaves(tx),
std::any::type_name::<Self>(),
PriorityLevel::Normal,
)
.await;

let leaves = rx
Expand Down Expand Up @@ -395,6 +409,7 @@ where
.send_msg(
ChainSelectionMessage::BestLeafContaining(target_hash, tx),
std::any::type_name::<Self>(),
PriorityLevel::Normal,
)
.await;

Expand Down Expand Up @@ -468,13 +483,15 @@ where
tx,
),
std::any::type_name::<Self>(),
PriorityLevel::High,
)
.await;
} else {
overseer
.send_msg(
ApprovalVotingMessage::ApprovedAncestor(subchain_head, target_number, tx),
std::any::type_name::<Self>(),
PriorityLevel::High,
)
.await;
}
Expand Down Expand Up @@ -506,13 +523,15 @@ where
.send_msg(
ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag),
std::any::type_name::<Self>(),
PriorityLevel::Normal,
)
.await;
} else {
overseer_handle
.send_msg(
ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag),
std::any::type_name::<Self>(),
PriorityLevel::Normal,
)
.await;
}
Expand Down Expand Up @@ -549,6 +568,7 @@ where
tx,
},
std::any::type_name::<Self>(),
PriorityLevel::High,
)
.await;

Expand Down
14 changes: 11 additions & 3 deletions polkadot/node/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use chain_spec::*;
use futures::{future::Future, stream::StreamExt};
use polkadot_node_primitives::{CollationGenerationConfig, CollatorFn};
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::Handle;
use polkadot_overseer::{Handle, PriorityLevel};
use polkadot_primitives::{Balance, CollatorPair, HeadData, Id as ParaId, ValidationCode};
use polkadot_runtime_common::BlockHashCount;
use polkadot_runtime_parachains::paras::{ParaGenesisArgs, ParaKind};
Expand Down Expand Up @@ -428,11 +428,19 @@ impl PolkadotTestNode {
CollationGenerationConfig { key: collator_key, collator: Some(collator), para_id };

self.overseer_handle
.send_msg(CollationGenerationMessage::Initialize(config), "Collator")
.send_msg(
CollationGenerationMessage::Initialize(config),
"Collator",
PriorityLevel::Normal,
)
.await;

self.overseer_handle
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
.send_msg(
CollatorProtocolMessage::CollateOn(para_id),
"Collator",
PriorityLevel::Normal,
)
.await;
}
}
Expand Down
Loading
Loading