Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
3e9abcf
stupid, but it compiles
ordian Sep 5, 2020
e1fe858
redo
ordian Sep 8, 2020
d09622b
cleanup
ordian Sep 8, 2020
9e8cfe2
add ValidatorDiscovery to msgs
ordian Sep 8, 2020
0bae9e4
sketch network bridge code
ordian Sep 8, 2020
4fa0236
ConnectToAuthorities instead of validators
ordian Sep 9, 2020
9f81f90
more stuff
ordian Sep 10, 2020
d8d1302
cleanup
ordian Sep 10, 2020
6c18572
more stuff
ordian Sep 10, 2020
251427d
complete ConnectToAuthoritiesState
ordian Sep 10, 2020
31d7329
Update node/network/bridge/src/lib.rs
ordian Sep 10, 2020
f715c33
Collator protocol subsystem (#1659)
montekki Sep 10, 2020
b80e050
handle multiple in-flight connection requests
ordian Sep 10, 2020
0e0525d
handle cancelled requests
ordian Sep 10, 2020
64dcdb1
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 10, 2020
2cf1610
Update node/core/runtime-api/src/lib.rs
ordian Sep 11, 2020
1bee32c
redo it again
ordian Sep 11, 2020
c0d3a5a
more stuff
ordian Sep 12, 2020
de19f1d
redo it again
ordian Sep 14, 2020
eb1afd7
Merge branch 'ao-validator-discovery-api' of github.com:paritytech/po…
ordian Sep 14, 2020
dbbfe23
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 14, 2020
e6a0a85
update comments
ordian Sep 14, 2020
36cf3f4
workaround Future is not Send
ordian Sep 14, 2020
9f20552
fix trailing spaces
ordian Sep 14, 2020
ef4c6da
clarify comments
ordian Sep 14, 2020
6305c41
bridge: fix compilation in tests
ordian Sep 14, 2020
14fe353
update more comments
ordian Sep 14, 2020
7ea8588
small fixes
ordian Sep 14, 2020
f6a4068
port collator protocol to new validator discovery api
ordian Sep 14, 2020
ac02180
collator tests compile
ordian Sep 15, 2020
3dea047
collator tests pass
ordian Sep 15, 2020
62e46a1
do not revoke a request when the stream receiver is closed
ordian Sep 15, 2020
bbaf435
make revoking opt-in
ordian Sep 15, 2020
8cfab6f
fix is_fulfilled
ordian Sep 15, 2020
68fc8bb
handle request revokation in collator
ordian Sep 16, 2020
a23edc0
tests
ordian Sep 16, 2020
0a9c064
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 16, 2020
98d8346
wait for validator connections asyncronously
ordian Sep 17, 2020
c03f766
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 17, 2020
e077b51
fix compilation
ordian Sep 17, 2020
2132114
relabel my todos
ordian Sep 17, 2020
eb3bacb
apply Fedor's patch
ordian Sep 17, 2020
1112368
resolve reconnection TODO
ordian Sep 18, 2020
6fbca68
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 18, 2020
3d2def9
resolve revoking TODO
ordian Sep 18, 2020
dee8b27
resolve channel capacity TODO
ordian Sep 18, 2020
c59d5a7
resolve peer cloning TODO
ordian Sep 18, 2020
0580441
resolve peer disconnected TODO
ordian Sep 18, 2020
db37a2f
resolve PeerSet TODO
ordian Sep 18, 2020
a237119
wip tests
ordian Sep 18, 2020
a8e3105
more tests
ordian Sep 22, 2020
95f989e
resolve Arc TODO
ordian Sep 22, 2020
86a64fa
rename pending to non_revoked
ordian Sep 22, 2020
7585d6e
one more test
ordian Sep 22, 2020
9534c2b
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 22, 2020
f83534c
extract utility function into util crate
ordian Sep 22, 2020
cb97211
fix compilation in tests
ordian Sep 22, 2020
c1da4c6
Apply suggestions from code review
ordian Sep 23, 2020
c6aa649
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 23, 2020
ae7f529
revert pin_project removal
ordian Sep 23, 2020
fbf8901
fix while let loop
ordian Sep 23, 2020
78216fc
Revert "revert pin_project removal"
ordian Sep 23, 2020
4cb79ad
fix compilation
ordian Sep 23, 2020
0295436
Update node/subsystem/src/messages.rs
ordian Sep 23, 2020
66a6eed
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 28, 2020
7e67489
docs on pub items
ordian Sep 28, 2020
30da479
guide updates
ordian Sep 28, 2020
9c5b654
Merge branch 'master' into ao-validator-discovery-api
ordian Sep 28, 2020
0e9771c
remove a TODO
ordian Sep 28, 2020
4cc5f3e
small guide update
ordian Sep 28, 2020
23bde8d
fix a typo
ordian Sep 28, 2020
88e83ed
link to the issue
ordian Sep 28, 2020
3893324
Merge branch 'master' into ao-validator-discovery-api
ordian Oct 6, 2020
a7b5ddc
validator discovery: on_request docs
ordian Oct 6, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ fn make_runtime_api_request<Client>(
Request::CandidatePendingAvailability(para, sender) =>
query!(candidate_pending_availability(para), sender),
Request::CandidateEvents(sender) => query!(candidate_events(), sender),
Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender),
}
}

Expand Down Expand Up @@ -169,7 +170,7 @@ mod tests {
use polkadot_primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
CommittedCandidateReceipt, CandidateEvent,
CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor;
Expand Down Expand Up @@ -258,6 +259,10 @@ mod tests {
fn candidate_events(&self) -> Vec<CandidateEvent> {
self.candidate_events.clone()
}

fn validator_discovery(ids: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>> {
vec![None; ids.len()]
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions node/network/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"

[dependencies]
async-trait = "0.1"
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../../primitives" }
parity-scale-codec = "1.3.4"
sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
Expand Down
132 changes: 102 additions & 30 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::channel::oneshot;
use futures::channel::{mpsc, oneshot};

use sc_network::Event as NetworkEvent;
use sp_runtime::ConsensusEngineId;
Expand All @@ -34,16 +34,19 @@ use polkadot_subsystem::messages::{
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
CollatorProtocolMessage,
};
use polkadot_primitives::v1::{Block, Hash, ValidatorId};
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
use polkadot_node_network_protocol::{
ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1
};

use std::collections::hash_map::{HashMap, Entry as HEntry};
use std::collections::{HashMap, hash_map};
use std::iter::ExactSizeIterator;
use std::pin::Pin;
use std::sync::Arc;


mod validator_discovery;

/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
/// We use the same limit to compute the view sent to peers locally.
Expand Down Expand Up @@ -188,29 +191,41 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
}

/// The network bridge subsystem.
pub struct NetworkBridge<N>(N);
pub struct NetworkBridge<N, AD> {
network_service: N,
authority_discovery_service: AD,
}

impl<N> NetworkBridge<N> {
/// Create a new network bridge subsystem with underlying network service.
impl<N, AD> NetworkBridge<N, AD> {
/// Create a new network bridge subsystem with underlying network service and authority discovery service.
///
/// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info).
pub fn new(net_service: N) -> Self {
NetworkBridge(net_service)
pub fn new(network_service: N, authority_discovery_service: AD) -> Self {
NetworkBridge {
network_service,
authority_discovery_service,
}
}
}

impl<Net, Context> Subsystem<Context> for NetworkBridge<Net>
impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
where
Net: Network,
Net: Network + validator_discovery::Network,
AD: validator_discovery::AuthorityDiscovery,
Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
let Self { network_service, authority_discovery_service } = self;
SpawnedSubsystem {
name: "network-bridge-subsystem",
future: run_network(self.0, ctx).map(|_| ()).boxed(),
future: run_network(
network_service,
authority_discovery_service,
ctx,
).map(|_| ()).boxed(),
}
}
}
Expand All @@ -224,7 +239,11 @@ struct PeerData {
enum Action {
SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol),
SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol),
ConnectToValidators(PeerSet, Vec<ValidatorId>, oneshot::Sender<Vec<(ValidatorId, PeerId)>>),
ConnectToValidators {
validator_ids: Vec<AuthorityDiscoveryId>,
connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
},
ReportPeer(PeerId, ReputationChange),

ActiveLeaves(ActiveLeavesUpdate),
Expand Down Expand Up @@ -254,8 +273,11 @@ fn action_from_overseer_message(
=> Action::SendValidationMessage(peers, msg),
NetworkBridgeMessage::SendCollationMessage(peers, msg)
=> Action::SendCollationMessage(peers, msg),
NetworkBridgeMessage::ConnectToValidators(peer_set, validators, res)
=> Action::ConnectToValidators(peer_set, validators, res),
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
connected,
revoke,
} => Action::ConnectToValidators { validator_ids, connected, revoke },
},
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_)))
=> Action::Nop,
Expand Down Expand Up @@ -538,11 +560,16 @@ async fn dispatch_collation_events_to_all<I>(
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}

async fn run_network<N: Network>(
mut net: N,
async fn run_network<N, AD>(
mut network_service: N,
mut authority_discovery_service: AD,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()> {
let mut event_stream = net.event_stream().fuse();
) -> SubsystemResult<()>
where
N: Network + validator_discovery::Network,
AD: validator_discovery::AuthorityDiscovery,
{
let mut event_stream = network_service.event_stream().fuse();

// Most recent heads are at the back.
let mut live_heads: Vec<Hash> = Vec::with_capacity(MAX_VIEW_HEADS);
Expand All @@ -551,7 +578,10 @@ async fn run_network<N: Network>(
let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new();

let mut validator_discovery = validator_discovery::Service::<N, AD>::new();

loop {

let action = {
let subsystem_next = ctx.recv().fuse();
let mut net_event_next = event_stream.next().fuse();
Expand All @@ -568,31 +598,43 @@ async fn run_network<N: Network>(
Action::Abort => return Ok(()),

Action::SendValidationMessage(peers, msg) => send_message(
&mut net,
&mut network_service,
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
).await?,

Action::SendCollationMessage(peers, msg) => send_message(
&mut net,
&mut network_service,
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
).await?,

Action::ConnectToValidators(_peer_set, _validators, _res) => {
// TODO: https://github.com/paritytech/polkadot/issues/1461
}
Action::ConnectToValidators {
validator_ids,
connected,
revoke,
} => {
let (ns, ads) = validator_discovery.on_request(
validator_ids,
connected,
revoke,
network_service,
authority_discovery_service,
).await;
network_service = ns;
authority_discovery_service = ads;
},

Action::ReportPeer(peer, rep) => net.report_peer(peer, rep).await?,
Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?,

Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
live_heads.extend(activated);
live_heads.retain(|h| !deactivated.contains(h));

update_view(
&mut net,
&mut network_service,
&mut ctx,
&live_heads,
&mut local_view,
Expand All @@ -607,9 +649,11 @@ async fn run_network<N: Network>(
PeerSet::Collation => &mut collation_peers,
};

validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await;

match peer_map.entry(peer.clone()) {
HEntry::Occupied(_) => continue,
HEntry::Vacant(vacant) => {
hash_map::Entry::Occupied(_) => continue,
hash_map::Entry::Vacant(vacant) => {
vacant.insert(PeerData {
view: View(Vec::new()),
});
Expand Down Expand Up @@ -650,6 +694,8 @@ async fn run_network<N: Network>(
PeerSet::Collation => &mut collation_peers,
};

validator_discovery.on_peer_disconnected(&peer, &mut authority_discovery_service).await;

if peer_map.remove(&peer).is_some() {
let res = match peer_set {
PeerSet::Validation => dispatch_validation_event_to_all(
Expand Down Expand Up @@ -677,7 +723,7 @@ async fn run_network<N: Network>(
peer.clone(),
&mut validation_peers,
v_messages,
&mut net,
&mut network_service,
).await?;

if let Err(e) = dispatch_validation_events_to_all(
Expand All @@ -697,7 +743,7 @@ async fn run_network<N: Network>(
peer.clone(),
&mut collation_peers,
c_messages,
&mut net,
&mut network_service,
).await?;

if let Err(e) = dispatch_collation_events_to_all(
Expand All @@ -716,20 +762,24 @@ async fn run_network<N: Network>(
}
}


#[cfg(test)]
mod tests {
use super::*;
use futures::channel::mpsc;
use futures::executor;

use std::sync::Arc;
use std::collections::HashSet;
use async_trait::async_trait;
use parking_lot::Mutex;
use assert_matches::assert_matches;

use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage};
use polkadot_node_subsystem_test_helpers::{
SingleItemSink, SingleItemStream, TestSubsystemContextHandle,
};
use sc_network::Multiaddr;
use sp_keyring::Sr25519Keyring;

// The subsystem's view of the network - only supports a single call to `event_stream`.
Expand All @@ -738,6 +788,8 @@ mod tests {
action_tx: mpsc::UnboundedSender<NetworkAction>,
}

struct TestAuthorityDiscovery;

// The test's view of the network. This receives updates from the subsystem in the form
// of `NetworkAction`s.
struct TestNetworkHandle {
Expand All @@ -748,6 +800,7 @@ mod tests {
fn new_test_network() -> (
TestNetwork,
TestNetworkHandle,
TestAuthorityDiscovery,
) {
let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
let (action_tx, action_rx) = mpsc::unbounded();
Expand All @@ -761,6 +814,7 @@ mod tests {
action_rx,
net_tx,
},
TestAuthorityDiscovery,
)
}

Expand All @@ -786,6 +840,23 @@ mod tests {
}
}

impl validator_discovery::Network for TestNetwork {
fn set_priority_group(&self, _group_id: String, _multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
}

#[async_trait]
impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery {
async fn get_addresses_by_authority_id(&mut self, _authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
None
}

async fn get_authority_id_by_peer_id(&mut self, _peer_id: PeerId) -> Option<AuthorityDiscoveryId> {
None
}
}

impl TestNetworkHandle {
// Get the next network action.
async fn next_network_action(&mut self) -> NetworkAction {
Expand Down Expand Up @@ -842,11 +913,12 @@ mod tests {

fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::TaskExecutor::new();
let (network, network_handle) = new_test_network();
let (network, network_handle, discovery) = new_test_network();
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);

let network_bridge = run_network(
network,
discovery,
context,
)
.map_err(|_| panic!("subsystem execution failed"))
Expand Down
Loading