Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions bin/node/src/flags/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ pub struct P2PArgs {
/// Peer Redialing threshold is the maximum amount of times to attempt to redial a peer that
/// disconnects. By default, peers are *not* redialed. If set to 0, the peer will be
/// redialed indefinitely.
///
/// TODO(@theochap, `<https://github.com/op-rs/kona/issues/1854>`): we are temporarily setting this to 0 to redial all peers indefinitely.
/// We will change this default to `None` once we have a more robust p2p stack.
#[arg(long = "p2p.redial", env = "KONA_NODE_P2P_REDIAL", default_value = "0")]
#[arg(long = "p2p.redial", env = "KONA_NODE_P2P_REDIAL", default_value = "5")]
pub peer_redial: Option<u64>,

/// An optional list of bootnode ENRs to start the node with.
Expand Down
60 changes: 45 additions & 15 deletions crates/node/p2p/src/gossip/driver.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! Consensus-layer gossipsub driver for Optimism.

use alloy_primitives::map::HashSet;
use derive_more::Debug;
use discv5::Enr;
use futures::stream::StreamExt;
use libp2p::{
Multiaddr, PeerId, Swarm, TransportError,
gossipsub::{IdentTopic, MessageId},
multiaddr::Protocol,
swarm::SwarmEvent,
};
use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope;
Expand Down Expand Up @@ -33,6 +35,8 @@
///
/// A peer cannot be redialed more than [`GossipDriverBuilder.peer_redialing`] times.
pub dialed_peers: HashMap<Multiaddr, u64>,
/// A set of [`PeerId`]s that are currently being dialed.
pub current_dials: HashSet<PeerId>,
/// A mapping from [`PeerId`] to [`Multiaddr`].
pub peerstore: HashMap<PeerId, Multiaddr>,
/// A mapping from [`PeerId`] to [`libp2p::identify::Info`].
Expand Down Expand Up @@ -63,6 +67,7 @@
swarm,
addr,
handler,
current_dials: Default::default(),

Check warning on line 70 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L70

Added line #L70 was not covered by tests
dialed_peers: Default::default(),
peerstore: Default::default(),
peer_infos: Default::default(),
Expand Down Expand Up @@ -177,11 +182,31 @@

/// Dials the given [`Multiaddr`].
pub fn dial_multiaddr(&mut self, addr: Multiaddr) {
let Some(peer_id) = addr
.iter()
.find_map(|p| if let Protocol::P2p(peer_id) = p { Some(peer_id) } else { None })

Check warning on line 187 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L185-L187

Added lines #L185 - L187 were not covered by tests
else {
warn!(target: "gossip", peer=?addr, "Failed to extract peer id from multiaddr");
return;

Check warning on line 190 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L189-L190

Added lines #L189 - L190 were not covered by tests
};

if self.swarm.connected_peers().any(|p| p == &peer_id) {
debug!(target: "gossip", peer=?addr, "Already connected to peer, not dialing");
return;
}

if self.current_dials.contains(&peer_id) {
debug!(target: "gossip", peer=?addr, "Already dialing peer, not dialing");
return;
}

Check warning on line 202 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L193-L202

Added lines #L193 - L202 were not covered by tests
if self.dial_threshold_reached(&addr) {
event!(tracing::Level::TRACE, peer=%addr, "Dial threshold reached, not dialing");
debug!(target: "gossip", peer=?addr, "Dial threshold reached, not dialing");

Check warning on line 204 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L204

Added line #L204 was not covered by tests
return;
}

self.current_dials.insert(peer_id);

Check warning on line 209 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L208-L209

Added lines #L208 - L209 were not covered by tests
match self.swarm.dial(addr.clone()) {
Ok(_) => {
trace!(target: "gossip", peer=?addr, "Dialed peer");
Expand All @@ -194,14 +219,6 @@
}
}

/// Redials the given [`PeerId`] using the peerstore.
pub fn redial(&mut self, peer_id: PeerId) {
if let Some(addr) = self.peerstore.get(&peer_id) {
trace!(target: "gossip", "Redialing peer with id: {:?}", peer_id);
self.dial_multiaddr(addr.clone());
}
}

fn handle_gossip_event(&mut self, event: Event) -> Option<OpNetworkPayloadEnvelope> {
match event {
Event::Gossipsub(e) => return self.handle_gossipsub_event(e),
Expand Down Expand Up @@ -283,14 +300,15 @@
}
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
let peer_count = self.swarm.connected_peers().count();
debug!(target: "gossip", "Connection established: {:?} | Peer Count: {}", peer_id, peer_count);
info!(target: "gossip", "Connection established: {:?} | Peer Count: {}", peer_id, peer_count);

Check warning on line 303 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L303

Added line #L303 was not covered by tests
kona_macros::inc!(
gauge,
crate::Metrics::GOSSIPSUB_CONNECTION,
"type" => "connected",
"peer" => peer_id.to_string(),
);
kona_macros::set!(gauge, crate::Metrics::GOSSIP_PEER_COUNT, peer_count as f64);

Check warning on line 311 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L311

Added line #L311 was not covered by tests
self.peerstore.insert(peer_id, endpoint.get_remote_address().clone());
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
Expand All @@ -301,12 +319,15 @@
"type" => "outgoing_error",
"peer" => peer_id.map(|p| p.to_string()).unwrap_or_default()
);
if let Some(id) = peer_id {
self.redial(id);

// If the connection was initiated by us, remove the peer from the current dials
// set.
if let Some(peer_id) = peer_id {
self.current_dials.remove(&peer_id);

Check warning on line 326 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L325-L326

Added lines #L325 - L326 were not covered by tests
}
}
SwarmEvent::IncomingConnectionError { error, connection_id, .. } => {
trace!(target: "gossip", "Incoming connection error: {:?}", error);
debug!(target: "gossip", "Incoming connection error: {:?}", error);

Check warning on line 330 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L330

Added line #L330 was not covered by tests
kona_macros::inc!(
gauge,
crate::Metrics::GOSSIPSUB_CONNECTION,
Expand All @@ -316,19 +337,28 @@
}
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
let peer_count = self.swarm.connected_peers().count();
debug!(target: "gossip", "Connection closed, redialing peer: {:?} | {:?} | Peer Count: {}", peer_id, cause, peer_count);
warn!(target: "gossip", ?peer_id, ?cause, peer_count, "Connection closed");

Check warning on line 340 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L340

Added line #L340 was not covered by tests
kona_macros::inc!(
gauge,
crate::Metrics::GOSSIPSUB_CONNECTION,
"type" => "closed",
"peer" => peer_id.to_string()
);
kona_macros::set!(gauge, crate::Metrics::GOSSIP_PEER_COUNT, peer_count as f64);
self.redial(peer_id);

// If the connection was initiated by us, remove the peer from the current dials
// set so that we can dial it again.
self.current_dials.remove(&peer_id);

Check warning on line 351 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L348-L351

Added lines #L348 - L351 were not covered by tests
}
SwarmEvent::NewListenAddr { listener_id, address } => {
debug!(target: "gossip", reporter_id = ?listener_id, new_address = ?address, "New listen address");
}
SwarmEvent::Dialing { peer_id, connection_id } => {
debug!(target: "gossip", ?peer_id, ?connection_id, "Dialing peer");

Check warning on line 357 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L356-L357

Added lines #L356 - L357 were not covered by tests
}
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
debug!(target: "gossip", ?peer_id, ?address, "New external address of peer");

Check warning on line 360 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L359-L360

Added lines #L359 - L360 were not covered by tests
}
_ => {
debug!(target: "gossip", ?event, "Ignoring non-behaviour in event handler");
}
Expand Down
27 changes: 25 additions & 2 deletions crates/node/p2p/src/rpc/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,30 @@
})
.collect();

// We consider that kona-nodes are gossiping blocks if their peers are subscribed to any of
// the blocks topics.
// This is the same heuristic as the one used in the op-node (`<https://github.com/ethereum-optimism/optimism/blob/6a8b2349c29c2a14f948fcb8aefb90526130acec/op-node/p2p/rpc_server.go#L179-L183>`).
let peer_gossip_info = gossip
.swarm
.behaviour()
.gossipsub
.all_peers()
.filter_map(|(peer_id, topics)| {
let supported_topics = HashSet::from([
gossip.handler.blocks_v1_topic.hash(),
gossip.handler.blocks_v2_topic.hash(),
gossip.handler.blocks_v3_topic.hash(),
gossip.handler.blocks_v4_topic.hash(),
]);

if topics.iter().any(|topic| supported_topics.contains(topic)) {
Some(*peer_id)

Check warning on line 167 in crates/node/p2p/src/rpc/request.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/rpc/request.rs#L150-L167

Added lines #L150 - L167 were not covered by tests
} else {
None

Check warning on line 169 in crates/node/p2p/src/rpc/request.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/rpc/request.rs#L169

Added line #L169 was not covered by tests
}
})
.collect::<HashSet<_>>();

Check warning on line 173 in crates/node/p2p/src/rpc/request.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/rpc/request.rs#L171-L173

Added lines #L171 - L173 were not covered by tests
let disc_table_infos = disc.table_infos();

tokio::spawn(async move {
Expand Down Expand Up @@ -211,13 +235,12 @@
// Note: we use the chain id from the ENR if it exists, otherwise we
// use 0 to be consistent with op-node's behavior (`<https://github.com/ethereum-optimism/optimism/blob/6a8b2349c29c2a14f948fcb8aefb90526130acec/op-service/apis/p2p.go#L55>`).
chain_id: opstack_enr.map(|enr| enr.chain_id).unwrap_or(0),
gossip_blocks: peer_gossip_info.contains(peer_id),

Check warning on line 238 in crates/node/p2p/src/rpc/request.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/rpc/request.rs#L238

Added line #L238 was not covered by tests
// TODO(@theochap, `<https://github.com/op-rs/kona/issues/1562>`): support these fields
protected: false,
// TODO(@theochap, `<https://github.com/op-rs/kona/issues/1562>`): support these fields
latency: 0,
// TODO(@theochap, `<https://github.com/op-rs/kona/issues/1562>`): support these fields
gossip_blocks: false,
// TODO(@theochap, `<https://github.com/op-rs/kona/issues/1562>`): support these fields
peer_scores: PeerScores::default(),
},
)
Expand Down
Loading