Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions bin/node/src/flags/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use anyhow::Result;
use clap::Parser;
use discv5::{Enr, enr::k256};
use kona_genesis::RollupConfig;
use kona_p2p::{Config, LocalNode};
use kona_p2p::{Config, GaterConfig, LocalNode};
use kona_peers::{PeerMonitoring, PeerScoreLevel};
use kona_sources::RuntimeLoader;
use libp2p::identity::Keypair;
Expand Down Expand Up @@ -126,9 +126,7 @@ pub struct P2PArgs {
pub gossip_flood_publish: bool,
/// Sets the peer scoring strategy for the P2P stack.
/// Can be one of: none or light.
///
/// TODO(@theochap, `<https://github.com/op-rs/kona/issues/1855>`): By default, the P2P stack is configured to not score peers.
#[arg(long = "p2p.scoring", default_value = "off", env = "KONA_NODE_P2P_SCORING")]
#[arg(long = "p2p.scoring", default_value = "light", env = "KONA_NODE_P2P_SCORING")]
pub scoring: PeerScoreLevel,

/// Allows to ban peers based on their score.
Expand Down Expand Up @@ -169,6 +167,12 @@ pub struct P2PArgs {
#[arg(long = "p2p.redial", env = "KONA_NODE_P2P_REDIAL", default_value = "500")]
pub peer_redial: Option<u64>,

/// The duration in minutes of the peer dial period.
/// When the last time a peer was dialed is longer than the dial period, the number of peer
/// dials is reset to 0, allowing the peer to be dialed again.
#[arg(long = "p2p.redial.period", env = "KONA_NODE_P2P_REDIAL_PERIOD", default_value = "60")]
pub redial_period: u64,

/// An optional list of bootnode ENRs to start the node with.
#[arg(long = "p2p.bootnodes", value_delimiter = ',', env = "KONA_NODE_P2P_BOOTNODES")]
pub bootnodes: Vec<Enr>,
Expand Down Expand Up @@ -384,7 +388,10 @@ impl P2PArgs {
monitor_peers,
bootstore: self.bootstore,
topic_scoring: self.topic_scoring,
redial: self.peer_redial,
gater_config: GaterConfig {
peer_redialing: self.peer_redial,
dial_period: Duration::from_secs(60 * self.redial_period),
},
bootnodes: self.bootnodes,
rollup_config: config.clone(),
})
Expand Down
24 changes: 11 additions & 13 deletions crates/node/p2p/src/gossip/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use libp2p::{
use std::time::Duration;
use tokio::sync::watch::Receiver;

use crate::{Behaviour, BlockHandler, GossipDriver, GossipDriverBuilderError};
use crate::{
Behaviour, BlockHandler, GossipDriver, GossipDriverBuilderError, gossip::gater::GaterConfig,
};

/// A builder for the [`GossipDriver`].
#[derive(Debug, Default)]
Expand All @@ -32,10 +34,8 @@ pub struct GossipDriverBuilder {
/// If set, the gossip layer will monitor peer scores and ban peers that are below a given
/// threshold.
peer_monitoring: Option<PeerMonitoring>,
/// The number of times to redial a peer.
/// If unset, peers will not be redialed.
/// If set to `0`, peers will be redialed indefinitely.
peer_redial: Option<u64>,
/// The configuration for the connection gater.
gater_config: Option<GaterConfig>,
/// The [`RollupConfig`] for the network.
rollup_config: Option<RollupConfig>,
/// Topic scoring. Disabled by default.
Expand All @@ -54,17 +54,15 @@ impl GossipDriverBuilder {
config: None,
block_time: None,
peer_monitoring: None,
peer_redial: None,
gater_config: None,
rollup_config: None,
topic_scoring: false,
}
}

/// Sets the number of times to redial a peer.
/// If unset, peers will not be redialed.
/// If set to `0`, peers will be redialed indefinitely.
pub const fn with_peer_redial(mut self, peer_redial: Option<u64>) -> Self {
self.peer_redial = peer_redial;
/// Sets the configuration for the connection gater.
pub const fn with_gater_config(mut self, config: GaterConfig) -> Self {
self.gater_config = Some(config);
self
}

Expand Down Expand Up @@ -215,9 +213,9 @@ impl GossipDriverBuilder {
.with_swarm_config(|c| c.with_idle_connection_timeout(timeout))
.build();

let redialing = self.peer_redial;
let gater_config = self.gater_config.take().unwrap_or_default();
let gate = crate::ConnectionGater::new(gater_config);

let gate = crate::ConnectionGater::new(redialing);
Ok(GossipDriver::new(swarm, addr, handler, sync_handler, sync_protocol, gate))
}
}
19 changes: 8 additions & 11 deletions crates/node/p2p/src/gossip/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
#[debug(skip)]
pub sync_handler: libp2p_stream::Control,
/// The inbound streams for the sync request/response protocol.
/// Set to `None` if the sync request/response protocol is not enabled.
///
/// This is an option to allow to take the underlying value when the gossip driver gets
/// activated.
///
/// TODO(@theochap, `<https://github.com/op-rs/kona/issues/2141>`): remove the sync-req-resp protocol once the `op-node` phases it out.
#[debug(skip)]
pub sync_protocol: Option<IncomingStreams>,
/// A mapping from [`PeerId`] to [`Multiaddr`].
pub peerstore: HashMap<PeerId, Multiaddr>,
/// A mapping from [`PeerId`] to [`libp2p::identify::Info`].
/// TODO(@theochap, `<https://github.com/op-rs/kona/issues/2015>`): we should probably find a way to merge `peer_infos` and `peerstore` into a
/// single map.
pub peer_infos: HashMap<PeerId, libp2p::identify::Info>,
pub peerstore: HashMap<PeerId, libp2p::identify::Info>,
/// If set, the gossip layer will monitor peer scores and ban peers that are below a given
/// threshold.
pub peer_monitoring: Option<PeerMonitoring>,
Expand Down Expand Up @@ -82,11 +82,9 @@
addr,
handler,
peerstore: Default::default(),
peer_infos: Default::default(),
peer_monitoring: None,
peer_connection_start: Default::default(),
sync_handler,
// TODO(@theochap): make this field truly optional (through CLI args).
sync_protocol: Some(sync_protocol),
connection_gate: gate,
ping: Arc::new(Mutex::new(Default::default())),
Expand Down Expand Up @@ -264,7 +262,7 @@
match event {
libp2p::identify::Event::Received { connection_id, peer_id, info } => {
debug!(target: "gossip", ?connection_id, ?peer_id, ?info, "Received identify info from peer");
self.peer_infos.insert(peer_id, info);
self.peerstore.insert(peer_id, info);

Check warning on line 265 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L265

Added line #L265 was not covered by tests
}
libp2p::identify::Event::Sent { connection_id, peer_id } => {
debug!(target: "gossip", ?connection_id, ?peer_id, "Sent identify info to peer");
Expand Down Expand Up @@ -327,7 +325,7 @@
SwarmEvent::Behaviour(behavior_event) => {
return self.handle_gossip_event(behavior_event)
}
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {

Check warning on line 328 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L328

Added line #L328 was not covered by tests
let peer_count = self.swarm.connected_peers().count();
info!(target: "gossip", "Connection established: {:?} | Peer Count: {}", peer_id, peer_count);
kona_macros::inc!(
Expand All @@ -338,7 +336,6 @@
);
kona_macros::set!(gauge, crate::Metrics::GOSSIP_PEER_COUNT, peer_count as f64);

self.peerstore.insert(peer_id, endpoint.get_remote_address().clone());
self.peer_connection_start.insert(peer_id, Instant::now());
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
Expand Down
49 changes: 32 additions & 17 deletions crates/node/p2p/src/gossip/gater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,36 @@ impl Default for DialInfo {
}
}

/// Configuration for the connection gater.
#[derive(Debug, Clone)]
pub struct GaterConfig {
/// The number of times to dial a peer.
pub peer_redialing: Option<u64>,
/// The duration of a dial period.
///
/// A peer cannot be dialed more than [`GossipDriverBuilder.peer_redialing`] times during a
/// dial period. The dial period is reset once the last time the peer was dialed is longer
/// than the dial period. This is to prevent peers from being dialed too often.
///
/// By default, the dial period is set to 1 hour.
pub dial_period: Duration,
}

impl Default for GaterConfig {
fn default() -> Self {
Self { peer_redialing: None, dial_period: Duration::from_secs(60 * 60) }
}
}

/// Connection Gater
///
/// A connection gate that regulates peer connections for the libp2p gossip swarm.
///
/// An implementation of the [`ConnectionGate`] trait.
#[derive(Default, Debug, Clone)]
pub struct ConnectionGater {
/// The number of times to dial a peer.
pub peer_redialing: Option<u64>,
/// The configuration for the connection gater.
config: GaterConfig,
/// A set of [`PeerId`]s that are currently being dialed.
pub current_dials: HashSet<PeerId>,
/// A mapping from [`Multiaddr`] to the dial info for the peer.
Expand All @@ -54,19 +75,10 @@ pub struct ConnectionGater {
}

impl ConnectionGater {
/// The duration of a dial period.
///
/// A peer cannot be dialed more than [`GossipDriverBuilder.peer_redialing`] times during a
/// dial period. The dial period is reset once the last time the peer was dialed is longer
/// than the dial period. This is to prevent peers from being dialed too often.
///
/// TODO(@theochap): this should be configurable through CLI.
const DIAL_PERIOD: Duration = Duration::from_secs(60 * 60);

/// Creates a new instance of the `ConnectionGater`.
pub fn new(peer_redialing: Option<u64>) -> Self {
pub fn new(config: GaterConfig) -> Self {
Self {
peer_redialing,
config,
current_dials: HashSet::new(),
dialed_peers: HashMap::new(),
connectedness: HashMap::new(),
Expand All @@ -84,7 +96,7 @@ impl ConnectionGater {
return false;
};
// If the peer has been dialed and the threshold is not set, the threshold is reached.
let Some(redialing) = self.peer_redialing else {
let Some(redialing) = self.config.peer_redialing else {
return true;
};
// If the threshold is set to `0`, redial indefinitely.
Expand All @@ -101,7 +113,7 @@ impl ConnectionGater {
let Some(dial_info) = self.dialed_peers.get(addr) else {
return false;
};
dial_info.last_dial.elapsed() > Self::DIAL_PERIOD
dial_info.last_dial.elapsed() > self.config.dial_period
}

/// Gets the [`PeerId`] from a given [`Multiaddr`].
Expand Down Expand Up @@ -215,7 +227,7 @@ impl ConnectionGate for ConnectionGater {
.or_insert(DialInfo { num_dials: 0, last_dial: Instant::now() });

// If the last dial was longer than the dial period, reset the number of dials.
if dial_info.last_dial.elapsed() > Self::DIAL_PERIOD {
if dial_info.last_dial.elapsed() > self.config.dial_period {
dial_info.num_dials = 0;
}

Expand Down Expand Up @@ -301,7 +313,10 @@ impl ConnectionGate for ConnectionGater {
fn test_check_ip_in_blocked_subnets_ipv4() {
use std::str::FromStr;

let mut gater = ConnectionGater::new(None);
let mut gater = ConnectionGater::new(GaterConfig {
peer_redialing: None,
dial_period: Duration::from_secs(60 * 60),
});
gater.blocked_subnets.insert("192.168.1.0/24".parse::<IpNet>().unwrap());
gater.blocked_subnets.insert("10.0.0.0/8".parse::<IpNet>().unwrap());
gater.blocked_subnets.insert("172.16.0.0/16".parse::<IpNet>().unwrap());
Expand Down
1 change: 1 addition & 0 deletions crates/node/p2p/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod gater;
pub use gater::{
ConnectionGater, // implementation
DialInfo,
GaterConfig,
};

mod builder;
Expand Down
2 changes: 1 addition & 1 deletion crates/node/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod gossip;
pub use gossip::{
Behaviour, BehaviourError, BlockHandler, BlockInvalidError, ConnectionGate, ConnectionGater,
DEFAULT_MESH_D, DEFAULT_MESH_DHI, DEFAULT_MESH_DLAZY, DEFAULT_MESH_DLO, DialInfo, Event,
GLOBAL_VALIDATE_THROTTLE, GOSSIP_HEARTBEAT, GossipDriver, GossipDriverBuilder,
GLOBAL_VALIDATE_THROTTLE, GOSSIP_HEARTBEAT, GaterConfig, GossipDriver, GossipDriverBuilder,
GossipDriverBuilderError, Handler, HandlerEncodeError, MAX_GOSSIP_SIZE, MAX_OUTBOUND_QUEUE,
MAX_VALIDATE_QUEUE, MIN_GOSSIP_SIZE, PEER_SCORE_INSPECT_FREQUENCY, PublishError,
SEEN_MESSAGES_TTL, default_config, default_config_builder,
Expand Down
10 changes: 5 additions & 5 deletions crates/node/p2p/src/net/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::broadcast::Sender as BroadcastSender;

use crate::{
Broadcast, Config, Discv5Builder, GossipDriverBuilder, Network, NetworkBuilderError,
P2pRpcRequest, discv5::LocalNode,
P2pRpcRequest, discv5::LocalNode, gossip::GaterConfig,
};

/// Constructs a [`Network`] for the OP Stack Consensus Layer.
Expand Down Expand Up @@ -48,7 +48,7 @@ impl From<Config> for NetworkBuilder {
.with_block_time(config.block_time)
.with_keypair(config.keypair)
.with_topic_scoring(config.topic_scoring)
.with_peer_redial(config.redial)
.with_gater_config(config.gater_config)
}
}

Expand All @@ -65,9 +65,9 @@ impl NetworkBuilder {
}
}

/// Sets the number of times to redial a peer.
pub fn with_peer_redial(self, redial: Option<u64>) -> Self {
Self { gossip: self.gossip.with_peer_redial(redial), ..self }
/// Sets the configuration for the connection gater.
pub fn with_gater_config(self, config: GaterConfig) -> Self {
Self { gossip: self.gossip.with_gater_config(config), ..self }
}

/// Sets the bootstore path for the [`crate::Discv5Driver`].
Expand Down
6 changes: 3 additions & 3 deletions crates/node/p2p/src/net/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Configuration for the `Network`.

use crate::discv5::LocalNode;
use crate::{discv5::LocalNode, gossip::GaterConfig};
use alloy_primitives::Address;
use discv5::Enr;
use kona_genesis::RollupConfig;
Expand Down Expand Up @@ -39,8 +39,8 @@ pub struct Config {
pub block_time: u64,
/// An optional path to the bootstore.
pub bootstore: Option<PathBuf>,
/// The optional number of times to redial a peer.
pub redial: Option<u64>,
/// The configuration for the connection gater.
pub gater_config: GaterConfig,
/// An optional list of bootnode ENRs to start the node with.
pub bootnodes: Vec<Enr>,
/// The [`RollupConfig`].
Expand Down
6 changes: 3 additions & 3 deletions crates/node/p2p/src/net/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,16 @@
);
}

if let Some(addr) = self.gossip.peerstore.remove(&peer_to_remove){
if let Some(info) = self.gossip.peerstore.remove(&peer_to_remove){

Check warning on line 211 in crates/node/p2p/src/net/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/net/driver.rs#L211

Added line #L211 was not covered by tests
use crate::ConnectionGate;
self.gossip.connection_gate.remove_dial(&peer_to_remove);
let score = self.gossip.swarm.behaviour().gossipsub.peer_score(&peer_to_remove).unwrap_or_default();
kona_macros::inc!(gauge, crate::Metrics::BANNED_PEERS, "peer_id" => peer_to_remove.to_string(), "score" => score.to_string());
return Some(addr);
return Some(info.listen_addrs);

Check warning on line 216 in crates/node/p2p/src/net/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/net/driver.rs#L216

Added line #L216 was not covered by tests
}

None
}).collect::<HashSet<_>>();
}).collect::<Vec<_>>().into_iter().flatten().collect::<HashSet<_>>();

Check warning on line 220 in crates/node/p2p/src/net/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/net/driver.rs#L220

Added line #L220 was not covered by tests

// We send a request to the discovery handler to ban the set of addresses.
if let Err(send_err) = handler.sender.send(HandlerRequest::BanAddrs { addrs_to_ban: addrs_to_ban.into(), ban_duration: ban_peers.ban_duration }).await{
Expand Down
2 changes: 1 addition & 1 deletion crates/node/p2p/src/rpc/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@

// Build a map of peer ids to their supported protocols and addresses.
let mut peer_metadata: HashMap<PeerId, PeerMetadata> = gossip
.peer_infos
.peerstore

Check warning on line 277 in crates/node/p2p/src/rpc/request.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/rpc/request.rs#L277

Added line #L277 was not covered by tests
.iter()
.map(|(id, info)| {
let protocols = if info.protocols.is_empty() {
Expand Down
10 changes: 7 additions & 3 deletions crates/node/p2p/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use alloy_primitives::Address;
use kona_genesis::RollupConfig;
use kona_p2p::{Behaviour, BlockHandler, ConnectionGater, GossipDriver};
use kona_p2p::{Behaviour, BlockHandler, ConnectionGater, GaterConfig, GossipDriver};
use libp2p::{Multiaddr, StreamProtocol, SwarmBuilder, identity::Keypair, multiaddr::Protocol};
use std::net::Ipv4Addr;
use std::{net::Ipv4Addr, time::Duration};

/// Helper function to create a new gossip driver instance.
pub(crate) fn gossip_driver(port: u16) -> GossipDriver<ConnectionGater> {
Expand Down Expand Up @@ -51,6 +51,10 @@ pub(crate) fn gossip_driver(port: u16) -> GossipDriver<ConnectionGater> {
.with_swarm_config(|c| c.with_idle_connection_timeout(timeout))
.build();

let gate = ConnectionGater::new(Some(2)); // Allow redialing peers twice.
let gate = ConnectionGater::new(GaterConfig {
peer_redialing: Some(2),
dial_period: Duration::from_secs(60 * 60),
});

GossipDriver::new(swarm, addr, handler, sync_handler, sync_protocol, gate)
}
Loading