Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ impl CandidateBackingJob {

if let Ok(report) = MisbehaviorReport::try_from(f) {
let message = ProvisionerMessage::ProvisionableData(
self.parent,
ProvisionableData::MisbehaviorReport(self.parent, report),
);

Expand Down Expand Up @@ -459,6 +460,7 @@ impl CandidateBackingJob {
table_attested_to_backed(attested, &self.table_context)
{
let message = ProvisionerMessage::ProvisionableData(
self.parent,
ProvisionableData::BackedCandidate(backed),
);
self.send_to_provisioner(message).await?;
Expand Down Expand Up @@ -1356,6 +1358,7 @@ mod tests {
virtual_overseer.recv().await,
AllMessages::Provisioner(
ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::BackedCandidate(BackedCandidate {
candidate,
validity_votes,
Expand Down Expand Up @@ -1510,6 +1513,7 @@ mod tests {
virtual_overseer.recv().await,
AllMessages::Provisioner(
ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::MisbehaviorReport(
relay_parent,
MisbehaviorReport::SelfContradiction(_, s1, s2),
Expand Down Expand Up @@ -1538,6 +1542,7 @@ mod tests {
virtual_overseer.recv().await,
AllMessages::Provisioner(
ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::MisbehaviorReport(
relay_parent,
MisbehaviorReport::SelfContradiction(_, s1, s2),
Expand Down
2 changes: 1 addition & 1 deletion node/core/proposer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2018"

[dependencies]
futures = "0.3.4"
futures-timer = "3.0.2"
log = "0.4.8"
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-overseer = { path = "../../overseer" }
Expand All @@ -21,4 +22,3 @@ sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "mast
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/paritytech/substrate", branch = "master" }
wasm-timer = "0.2.4"
40 changes: 13 additions & 27 deletions node/core/proposer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,38 +136,26 @@ where
/// Get provisioner inherent data
///
/// This function has a constant timeout: `PROPOSE_TIMEOUT`.
fn get_provisioner_data(&self) -> impl Future<Output = Result<ProvisionerInherentData, Error>> {
async fn get_provisioner_data(&self) -> Result<ProvisionerInherentData, Error> {
// clone this (lightweight) data because we're going to move it into the future
let mut overseer = self.overseer.clone();
let parent_header_hash = self.parent_header_hash.clone();

let mut provisioner_inherent_data = async move {
let (sender, receiver) = futures::channel::oneshot::channel();
let (sender, receiver) = futures::channel::oneshot::channel();

overseer.wait_for_activation(parent_header_hash, sender).await?;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??;
overseer.wait_for_activation(parent_header_hash, sender).await?;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??;

let (sender, receiver) = futures::channel::oneshot::channel();
// strictly speaking, we don't _have_ to .await this send_msg before opening the
// receiver; it's possible that the response there would be ready slightly before
// this call completes. IMO it's not worth the hassle or overhead of spawning a
// distinct task for that kind of miniscule efficiency improvement.
overseer.send_msg(AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await?;
let (sender, receiver) = futures::channel::oneshot::channel();
overseer.send_msg(AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await?;

receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData)
}
.boxed()
.fuse();

let mut timeout = wasm_timer::Delay::new(PROPOSE_TIMEOUT).fuse();
let mut timeout = futures_timer::Delay::new(PROPOSE_TIMEOUT).fuse();

async move {
select! {
pid = provisioner_inherent_data => pid,
_ = timeout => Err(Error::Timeout),
}
select! {
pid = receiver.fuse() => pid.map_err(|_| Error::ClosedChannelAwaitingInherentData),
_ = timeout => Err(Error::Timeout),
}
}
}
Expand Down Expand Up @@ -201,10 +189,8 @@ where
max_duration: time::Duration,
record_proof: RecordProof,
) -> Self::Proposal {
let provisioner_data = self.get_provisioner_data();

async move {
let provisioner_data = match provisioner_data.await {
let provisioner_data = match self.get_provisioner_data().await {
Ok(pd) => pd,
Err(err) => {
log::warn!("could not get provisioner inherent data; injecting default data: {}", err);
Expand Down
2 changes: 0 additions & 2 deletions node/core/provisioner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,4 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
[dev-dependencies]
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3.0.2"
tempfile = "3.1.0"
128 changes: 67 additions & 61 deletions node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use polkadot_primitives::v1::{
BackedCandidate, BlockNumber, CoreState, Hash, OccupiedCoreAssumption,
SignedAvailabilityBitfield,
};
use std::{collections::HashMap, convert::TryFrom, pin::Pin};
use std::{collections::HashSet, convert::TryFrom, pin::Pin};
use thiserror::Error;

struct ProvisioningJob {
Expand Down Expand Up @@ -211,7 +211,7 @@ impl ProvisioningJob {
ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => {
self.provisionable_data_channels.push(sender)
}
ToJob::Provisioner(ProvisionableData(data)) => {
ToJob::Provisioner(ProvisionableData(_, data)) => {
Copy link
Copy Markdown
Contributor

@coriolinus coriolinus Nov 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add the relay parent to ProvisionableData if we just ignore it? I see the statement above that sending data to a job should always include that, but it's not obvious why you said that.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that not obvious? How do you want to know to which job the message needs to be sent?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm, you're right; missed the change in messages.rs.

let mut bad_indices = Vec::new();
for (idx, channel) in self.provisionable_data_channels.iter_mut().enumerate() {
match channel.send(data.clone()).await {
Expand Down Expand Up @@ -266,23 +266,23 @@ impl ProvisioningJob {

type CoreAvailability = BitVec<bitvec::order::Lsb0, u8>;

// The provisioner is the subsystem best suited to choosing which specific
// backed candidates and availability bitfields should be assembled into the
// block. To engage this functionality, a
// `ProvisionerMessage::RequestInherentData` is sent; the response is a set of
// non-conflicting candidates and the appropriate bitfields. Non-conflicting
// means that there are never two distinct parachain candidates included for
// the same parachain and that new parachain candidates cannot be included
// until the previous one either gets declared available or expired.
//
// The main complication here is going to be around handling
// occupied-core-assumptions. We might have candidates that are only
// includable when some bitfields are included. And we might have candidates
// that are not includable when certain bitfields are included.
//
// When we're choosing bitfields to include, the rule should be simple:
// maximize availability. So basically, include all bitfields. And then
// choose a coherent set of candidates along with that.
/// The provisioner is the subsystem best suited to choosing which specific
/// backed candidates and availability bitfields should be assembled into the
/// block. To engage this functionality, a
/// `ProvisionerMessage::RequestInherentData` is sent; the response is a set of
/// non-conflicting candidates and the appropriate bitfields. Non-conflicting
/// means that there are never two distinct parachain candidates included for
/// the same parachain and that new parachain candidates cannot be included
/// until the previous one either gets declared available or expired.
///
/// The main complication here is going to be around handling
/// occupied-core-assumptions. We might have candidates that are only
/// includable when some bitfields are included. And we might have candidates
/// that are not includable when certain bitfields are included.
///
/// When we're choosing bitfields to include, the rule should be simple:
/// maximize availability. So basically, include all bitfields. And then
/// choose a coherent set of candidates along with that.
async fn send_inherent_data(
relay_parent: Hash,
bitfields: &[SignedAvailabilityBitfield],
Expand Down Expand Up @@ -310,48 +310,49 @@ async fn send_inherent_data(
Ok(())
}

// in general, we want to pick all the bitfields. However, we have the following constraints:
//
// - not more than one per validator
// - each must correspond to an occupied core
//
// If we have too many, an arbitrary selection policy is fine. For purposes of maximizing availability,
// we pick the one with the greatest number of 1 bits.
//
// note: this does not enforce any sorting precondition on the output; the ordering there will be unrelated
// to the sorting of the input.
/// In general, we want to pick all the bitfields. However, we have the following constraints:
///
/// - not more than one per validator
/// - each must correspond to an occupied core
///
/// If we have too many, an arbitrary selection policy is fine. For purposes of maximizing availability,
/// we pick the one with the greatest number of 1 bits.
///
/// Note: This does not enforce any sorting precondition on the output; the ordering there will be unrelated
/// to the sorting of the input.
fn select_availability_bitfields(
cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
) -> Vec<SignedAvailabilityBitfield> {
let mut fields_by_core: HashMap<_, Vec<_>> = HashMap::new();
for bitfield in bitfields.iter() {
let core_idx = bitfield.validator_index() as usize;
if let CoreState::Occupied(_) = cores[core_idx] {
fields_by_core
.entry(core_idx)
// there cannot be a value list in field_by_core with len < 1
.or_default()
.push(bitfield.clone());
let mut bitfield_per_core: Vec<Option<SignedAvailabilityBitfield>> = vec![None; cores.len()];
let mut seen_validators = HashSet::new();

for mut bitfield in bitfields.iter().cloned() {
// If we have seen the validator already, ignore it.
if !seen_validators.insert(bitfield.validator_index()) {
continue;
}
}

let mut out = Vec::with_capacity(fields_by_core.len());
for (_, core_bitfields) in fields_by_core.iter_mut() {
core_bitfields.sort_by_key(|bitfield| bitfield.payload().0.count_ones());
out.push(
core_bitfields
.pop()
.expect("every core bitfield has at least 1 member; qed"),
);
for (idx, _) in cores.iter().enumerate().filter(|v| v.1.is_occupied()) {
if *bitfield.payload().0.get(idx).unwrap_or(&false) {
if let Some(ref mut occupied) = bitfield_per_core[idx] {
if occupied.payload().0.count_ones() < bitfield.payload().0.count_ones() {
// We found a better bitfield, lets swap them and search a new spot for the old
// best one
std::mem::swap(occupied, &mut bitfield);
}
} else {
bitfield_per_core[idx] = Some(bitfield);
break;
}
}
}
}

out
bitfield_per_core.into_iter().filter_map(|v| v).collect()
}

// determine which cores are free, and then to the degree possible, pick a candidate appropriate to each free core.
//
// follow the candidate selection algorithm from the guide
/// Determine which cores are free, and then to the degree possible, pick a candidate appropriate to each free core.
async fn select_candidates(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
Expand Down Expand Up @@ -416,8 +417,8 @@ async fn select_candidates(
Ok(selected_candidates)
}

// produces a block number 1 higher than that of the relay parent
// in the event of an invalid `relay_parent`, returns `Ok(0)`
/// Produces a block number 1 higher than that of the relay parent
/// in the event of an invalid `relay_parent`, returns `Ok(0)`
async fn get_block_number_under_construction(
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJob>,
Expand All @@ -437,19 +438,18 @@ async fn get_block_number_under_construction(
}
}

// the availability bitfield for a given core is the transpose
// of a set of signed availability bitfields. It goes like this:
//
// - construct a transverse slice along `core_idx`
// - bitwise-or it with the availability slice
// - count the 1 bits, compare to the total length; true on 2/3+
/// The availability bitfield for a given core is the transpose
/// of a set of signed availability bitfields. It goes like this:
///
/// - construct a transverse slice along `core_idx`
/// - bitwise-or it with the availability slice
/// - count the 1 bits, compare to the total length; true on 2/3+
fn bitfields_indicate_availability(
core_idx: usize,
bitfields: &[SignedAvailabilityBitfield],
availability: &CoreAvailability,
) -> bool {
let mut availability = availability.clone();
// we need to pre-compute this to avoid a borrow-immutable-while-borrowing-mutable error in the error message
let availability_len = availability.len();

for bitfield in bitfields {
Expand All @@ -459,12 +459,18 @@ fn bitfields_indicate_availability(
// in principle, this function might return a `Result<bool, Error>` so that we can more clearly express this error condition
// however, in practice, that would just push off an error-handling routine which would look a whole lot like this one.
// simpler to just handle the error internally here.
log::warn!(target: "provisioner", "attempted to set a transverse bit at idx {} which is greater than bitfield size {}", validator_idx, availability_len);
log::warn!(
target: "provisioner", "attempted to set a transverse bit at idx {} which is greater than bitfield size {}",
validator_idx,
availability_len,
);

return false;
}
Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx],
}
}

3 * availability.count_ones() >= 2 * availability.len()
}

Expand Down
Loading