diff --git a/.github/zombienet-tests/zombienet_polkadot_tests.yml b/.github/zombienet-tests/zombienet_polkadot_tests.yml index 3e3fd238b068f..af7d5f6e90393 100644 --- a/.github/zombienet-tests/zombienet_polkadot_tests.yml +++ b/.github/zombienet-tests/zombienet_polkadot_tests.yml @@ -252,3 +252,8 @@ test-filter: "functional::shared_core_idle_parachain::shared_core_idle_parachain_test" runner-type: "default" use-zombienet-sdk: true + +- job-name: "zombienet-polkadot-collators-basic-reputation-persistence" + test-filter: "functional::collators_reputation_persistence" + runner-type: "default" + use-zombienet-sdk: true diff --git a/Cargo.lock b/Cargo.lock index 4c125748530f7..17271accd8341 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15161,6 +15161,7 @@ dependencies = [ "futures", "futures-timer", "itertools 0.11.0", + "kvdb-memorydb", "parity-scale-codec", "polkadot-node-network-protocol", "polkadot-node-primitives", @@ -17256,6 +17257,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives", "rand 0.8.5", + "regex", "sc-executor", "sc-runtime-utilities", "serde", diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index af20bafe921d1..e0915c1a09465 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -435,6 +435,7 @@ fn build_polkadot_full_node( invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, experimental_collator_protocol: false, + collator_reputation_persist_interval: None, }; let (relay_chain_full_node, paranode_req_receiver) = match config.network.network_backend { diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs index 5aef3e6c810c8..16e5f1671f5a7 100644 --- a/polkadot/cli/src/cli.rs +++ b/polkadot/cli/src/cli.rs @@ -170,6 +170,13 @@ pub struct RunCmd { /// Enable experimental collator protocol. TESTING ONLY! 
Don't use on production #[arg(long, hide = true, default_value = "false")] pub experimental_collator_protocol: bool, + + /// Collator reputation persistence interval in seconds. + /// If not specified, defaults to 600 seconds (10 minutes). + /// This should be used only with experimental_collator_protocol + /// and only on validators. + #[arg(long, requires = "experimental_collator_protocol", requires = "validator")] + pub collator_reputation_persist_interval: Option<u64>, } #[allow(missing_docs)] diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index 01c0f92023c77..d222a88477be7 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -294,6 +294,10 @@ where invulnerable_ah_collators, collator_protocol_hold_off, experimental_collator_protocol, + collator_reputation_persist_interval: cli + .run + .collator_reputation_persist_interval + .map(std::time::Duration::from_secs), }, ) .map(|full| full.task_manager)?; diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index cebc576871936..9b62dace1d351 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -23,6 +23,7 @@ sp-core = { workspace = true, default-features = true } sp-keystore = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } +codec.workspace = true fatality = { workspace = true } polkadot-node-network-protocol = { workspace = true, default-features = true } polkadot-node-primitives = { workspace = true, default-features = true } @@ -30,10 +31,12 @@ polkadot-node-subsystem = { workspace = true, default-features = true } polkadot-node-subsystem-util = { workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } thiserror = { workspace = true } +tokio.workspace = true tokio-util = { workspace = true } [dev-dependencies] assert_matches =
{ workspace = true } +kvdb-memorydb = { workspace = true } rstest = { workspace = true } sc-network-types = { workspace = true, default-features = true } sp-tracing = { workspace = true } diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs index 65c188fc615c8..8c37b6be53654 100644 --- a/polkadot/node/network/collator-protocol/src/lib.rs +++ b/polkadot/node/network/collator-protocol/src/lib.rs @@ -23,6 +23,7 @@ use std::{ collections::HashSet, + sync::Arc, time::{Duration, Instant}, }; @@ -31,16 +32,16 @@ use futures::{ FutureExt, TryFutureExt, }; -use polkadot_node_subsystem_util::reputation::ReputationAggregator; +use polkadot_node_subsystem_util::{database::Database, reputation::ReputationAggregator}; use sp_keystore::KeystorePtr; use polkadot_node_network_protocol::{ request_response::{v2 as protocol_v2, IncomingRequestReceiver}, PeerId, UnifiedReputationChange as Rep, }; -use polkadot_primitives::CollatorPair; - use polkadot_node_subsystem::{errors::SubsystemError, overseer, DummySubsystem, SpawnedSubsystem}; +use polkadot_primitives::CollatorPair; +pub use validator_side_experimental::ReputationConfig; mod collator_side; mod validator_side; @@ -91,6 +92,10 @@ pub enum ProtocolSide { keystore: KeystorePtr, /// Prometheus metrics for validators. metrics: validator_side_experimental::Metrics, + /// Database used for reputation house keeping. + db: Arc, + /// Reputation configuration (column number). + reputation_config: validator_side_experimental::ReputationConfig, }, /// Collators operate on a parachain. 
Collator { @@ -148,8 +153,8 @@ impl CollatorProtocolSubsystem { .map_err(|e| SubsystemError::with_origin("collator-protocol", e)) .boxed() }, - ProtocolSide::ValidatorExperimental { keystore, metrics } => { - validator_side_experimental::run(ctx, keystore, metrics) + ProtocolSide::ValidatorExperimental { keystore, metrics, db, reputation_config } => { + validator_side_experimental::run(ctx, keystore, metrics, db, reputation_config) .map_err(|e| SubsystemError::with_origin("collator-protocol", e)) .boxed() }, diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs index 8dcfaba2b5c1e..5f465484ed141 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use codec::{Decode, Encode}; use polkadot_node_network_protocol::{ peer_set::CollationVersion, request_response::{outgoing::RequestError, v2 as request_v2}, @@ -84,7 +85,7 @@ pub const MAX_FETCH_DELAY: Duration = Duration::from_millis(300); pub const MIN_FETCH_TIMER_DELAY: Duration = Duration::from_millis(150); /// Reputation score type. 
-#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy, Default)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy, Default, Encode, Decode)] pub struct Score(u16); impl Score { diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/error.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/error.rs index f1a6015e2f14c..0a7831a1faa3f 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/error.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/error.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use crate::LOG_TARGET; +use crate::{validator_side_experimental::peer_manager::PersistenceError, LOG_TARGET}; use fatality::Nested; use polkadot_node_subsystem::{ChainApiError, SubsystemError}; use polkadot_node_subsystem_util::{backing_implicit_view, runtime}; @@ -44,6 +44,12 @@ pub enum Error { #[fatal] #[error("Receiving message from overseer failed: {0}")] SubsystemReceive(#[source] SubsystemError), + #[fatal] + #[error("Failed to initialize reputation database: {0}")] + ReputationDbInit(PersistenceError), + #[fatal] + #[error("Failed to spawn background task: {0}")] + SpawnTask(String), #[error("Unable to retrieve block number for {0:?} from implicit view")] BlockNumberNotFoundInImplicitView(Hash), #[fatal(forward)] diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs index 1eec6a09eae00..e140690a3be20 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs @@ -22,38 +22,67 @@ mod state; #[cfg(test)] mod tests; -use crate::{validator_side_experimental::common::MIN_FETCH_TIMER_DELAY, LOG_TARGET}; +use crate::{ + 
validator_side_experimental::{common::MIN_FETCH_TIMER_DELAY, peer_manager::PersistentDb}, + LOG_TARGET, +}; use collation_manager::CollationManager; use common::{ProspectiveCandidate, MAX_STORED_SCORES_PER_PARA}; use error::{log_error, FatalError, FatalResult, Result}; use futures::{future::Fuse, select, FutureExt, StreamExt}; use futures_timer::Delay; use polkadot_node_network_protocol::{ - self as net_protocol, v1 as protocol_v1, v2 as protocol_v2, CollationProtocols, PeerId, + self as net_protocol, peer_set::PeerSet, v1 as protocol_v1, v2 as protocol_v2, + CollationProtocols, PeerId, }; use polkadot_node_subsystem::{ - messages::{CollatorProtocolMessage, NetworkBridgeEvent}, + messages::{CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage}, overseer, ActivatedLeaf, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal, }; +use polkadot_node_subsystem_util::database::Database; use sp_keystore::KeystorePtr; -use std::{future, future::Future, pin::Pin, time::Duration}; +use std::{future, future::Future, pin::Pin, sync::Arc, time::Duration}; -use peer_manager::{Db, PeerManager}; +#[cfg(test)] +use peer_manager::Db; +use peer_manager::PeerManager; use state::State; pub use crate::validator_side_metrics::Metrics; +/// Default interval for persisting the reputation database to disk (in seconds). +const DEFAULT_PERSIST_INTERVAL_SECS: u64 = 600; + +/// Configuration for the reputation db. +#[derive(Debug, Clone, Copy)] +pub struct ReputationConfig { + /// The data column in the store to use for reputation data. + pub col_reputation_data: u32, + /// How often to persist the reputation database to disk. + /// If None, defaults to DEFAULT_PERSIST_INTERVAL_SECS seconds. + pub persist_interval: Option, +} + /// The main run loop. 
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] pub(crate) async fn run( mut ctx: Context, keystore: KeystorePtr, metrics: Metrics, + db: Arc, + reputation_config: ReputationConfig, ) -> FatalResult<()> { - gum::info!(LOG_TARGET, "Running experimental collator protocol"); - if let Some(state) = initialize(&mut ctx, keystore, metrics).await? { - run_inner(ctx, state).await?; + let persist_interval = reputation_config + .persist_interval + .unwrap_or(Duration::from_secs(DEFAULT_PERSIST_INTERVAL_SECS)); + gum::info!( + LOG_TARGET, + persist_interval_secs = persist_interval.as_secs(), + "Running experimental collator protocol" + ); + if let Some(state) = initialize(&mut ctx, keystore, metrics, db, reputation_config).await? { + run_inner(ctx, state, persist_interval).await?; } Ok(()) @@ -64,7 +93,9 @@ async fn initialize( ctx: &mut Context, keystore: KeystorePtr, metrics: Metrics, -) -> FatalResult>> { + db: Arc, + reputation_config: ReputationConfig, +) -> FatalResult>> { loop { let first_leaf = match wait_for_first_leaf(ctx).await? 
{ Some(activated_leaf) => { @@ -84,7 +115,30 @@ async fn initialize( let scheduled_paras = collation_manager.assignments(); - let backend = Db::new(MAX_STORED_SCORES_PER_PARA).await; + // Create PersistentDb with disk persistence + let (backend, task) = match PersistentDb::new( + db.clone(), + reputation_config, + MAX_STORED_SCORES_PER_PARA, + ) + .await + { + Ok(result) => result, + Err(e) => { + gum::error!( + target: LOG_TARGET, + error = ?e, + "Failed to initialize persistent reputation DB" + ); + return Err(FatalError::ReputationDbInit(e)); + }, + }; + + // Background task for async writes + ctx.spawn_blocking("collator-reputation-persistence-task", task) + .map_err(|e| FatalError::SpawnTask(e.to_string()))?; + + gum::trace!(target: LOG_TARGET, "Spawned background reputation persistence task"); match PeerManager::startup(backend, ctx.sender(), scheduled_paras.into_iter().collect()) .await @@ -113,13 +167,45 @@ async fn wait_for_first_leaf(ctx: &mut Context) -> FatalResult {}, FromOrchestra::Communication { msg } => { - // TODO: we should actually disconnect peers connected on collation protocol while - // we're still bootstrapping. OR buffer these messages until we've bootstrapped. - gum::warn!( - target: LOG_TARGET, - ?msg, - "Received msg before first active leaves update. This is not expected - message will be dropped." - ) + // Disconnect peers that connect before the subsystem is initialized. + // They will reconnect later when we're ready. 
+ match msg { + CollatorProtocolMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(peer_id, ..), + ) => { + gum::info!( + target: LOG_TARGET, + ?peer_id, + "Disconnecting peer that connected before subsystem initialization", + ); + ctx.send_message(NetworkBridgeTxMessage::DisconnectPeers( + vec![peer_id], + PeerSet::Collation, + )) + .await; + }, + CollatorProtocolMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(peer_id, ..), + ) => { + gum::info!( + target: LOG_TARGET, + ?peer_id, + "Disconnecting peer that sent message before subsystem initialization", + ); + ctx.send_message(NetworkBridgeTxMessage::DisconnectPeers( + vec![peer_id], + PeerSet::Collation, + )) + .await; + }, + msg => { + gum::trace!( + target: LOG_TARGET, + ?msg, + "Received msg before first active leaves update, dropping.", + ); + }, + } }, } } @@ -134,9 +220,21 @@ fn create_timer(maybe_delay: Option) -> Fuse Fuse + Send>>> { + let delay: Pin + Send>> = Box::pin(Delay::new(interval)); + delay.fuse() +} + #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] -async fn run_inner(mut ctx: Context, mut state: State) -> FatalResult<()> { +async fn run_inner( + mut ctx: Context, + mut state: State, + persist_interval: Duration, +) -> FatalResult<()> { let mut timer = create_timer(None); + let mut persistence_timer = create_persistence_timer(persist_interval); + loop { select! 
{ // Calling `fuse()` here is useless, because the termination state of the resulting @@ -154,7 +252,11 @@ async fn run_inner(mut ctx: Context, mut state: State) -> FatalResu msg, ).await; } - Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => break, + Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => { + // Persist to disk before shutdown + state.persist_reputations().await; + break + }, Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number))) => { state.handle_finalized_block(ctx.sender(), hash, number).await?; }, @@ -168,6 +270,12 @@ async fn run_inner(mut ctx: Context, mut state: State) -> FatalResu // We don't need to do anything specific here. // If the timer expires, we only need to trigger the advertisement fetching logic. }, + _ = &mut persistence_timer => { + // Periodic persistence - write reputation DB to disk + state.background_persist_reputations(); + // Reset the timer for the next interval + persistence_timer = create_persistence_timer(persist_interval); + }, } // Now try triggering advertisement fetching, if we have room in any of the active leaves @@ -187,7 +295,7 @@ async fn run_inner(mut ctx: Context, mut state: State) -> FatalResu /// The main message receiver switch. async fn process_msg( sender: &mut Sender, - state: &mut State, + state: &mut State, msg: CollatorProtocolMessage, ) { use CollatorProtocolMessage::*; @@ -241,7 +349,7 @@ async fn process_msg( /// Bridge event switch. 
async fn handle_network_msg( sender: &mut Sender, - state: &mut State, + state: &mut State, bridge_message: NetworkBridgeEvent, ) -> Result<()> { use NetworkBridgeEvent::*; @@ -289,7 +397,7 @@ async fn handle_network_msg( async fn process_incoming_peer_message( sender: &mut Sender, - state: &mut State, + state: &mut State, origin: PeerId, msg: CollationProtocols< protocol_v1::CollatorProtocolMessage, diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/db.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/db.rs index 3478a282e5dae..340e22ab46479 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/db.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/db.rs @@ -43,12 +43,12 @@ impl Db { } } -type Timestamp = u128; +pub(crate) type Timestamp = u128; -#[derive(Clone, Debug)] -struct ScoreEntry { - score: Score, - last_bumped: Timestamp, +#[derive(Clone, Copy, Debug, codec::Encode, codec::Decode)] +pub(crate) struct ScoreEntry { + pub(crate) score: Score, + pub(crate) last_bumped: Timestamp, } #[async_trait] @@ -222,8 +222,39 @@ impl Db { } } + /// Get the last finalized block number (for persistence). + pub(crate) fn get_last_finalized(&self) -> Option { + self.last_finalized + } + + /// Set the last finalized block number (for loading from disk). + pub(crate) fn set_last_finalized(&mut self, last_finalized: Option) { + self.last_finalized = last_finalized; + } + + /// Get reputations for a specific para (for persistence). + pub(crate) fn get_para_reputations(&self, para_id: &ParaId) -> HashMap { + self.db.get(para_id).cloned().unwrap_or_default() + } + + /// Set reputations for a specific para (for loading from disk). 
+ pub(crate) fn set_para_reputations( + &mut self, + para_id: ParaId, + reputations: HashMap<PeerId, ScoreEntry>, + ) { + self.db.insert(para_id, reputations); + } + + /// Get all reputations (for persistence). + pub(crate) fn all_reputations( + &self, + ) -> impl Iterator<Item = (&ParaId, &HashMap<PeerId, ScoreEntry>)> { + self.db.iter() + } + #[cfg(test)] - fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { self.db.len() } } diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs index 6da6eb526762b..e1b353f0b3d3d 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs @@ -16,6 +16,8 @@ mod backend; mod connected; mod db; +mod persistence; +mod persistent_db; use futures::channel::oneshot; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; @@ -32,7 +34,10 @@ use crate::{ }; pub use backend::Backend; use connected::ConnectedPeers; +#[cfg(test)] pub use db::Db; +pub use persistence::PersistenceError; +pub use persistent_db::PersistentDb; use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId}; use polkadot_node_subsystem::{ messages::{ChainApiMessage, NetworkBridgeTxMessage}, @@ -117,6 +122,17 @@ impl PeerManager { instance.db.process_bumps(latest_finalized_block_number, bumps, None).await; + if latest_finalized_block_number != processed_finalized_block_number { + gum::trace!( + target: LOG_TARGET, + blocks_processed = std::cmp::min( + latest_finalized_block_number.saturating_sub(processed_finalized_block_number), + MAX_STARTUP_ANCESTRY_LOOKBACK + ), + "Startup lookback completed" + ); + } + Ok(instance) } @@ -403,6 +419,23 @@ impl PeerManager { } } +impl PeerManager<PersistentDb> { + /// Persist the reputation database to disk asynchronously (fire-and-forget).
+ pub fn persist_to_disk_async(&mut self) { + self.db.persist_async(None); + } + + /// Persist and wait for the background writer to finish the write. + pub async fn persist_and_wait(&mut self) { + if self.db.persist_and_wait().await.is_err() { + gum::error!( + target: LOG_TARGET, + "Failed to persist reputation DB: background writer closed" + ); + } + } +} + async fn get_ancestors( sender: &mut Sender, k: usize, diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistence.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistence.rs new file mode 100644 index 0000000000000..f55481eecb25f --- /dev/null +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistence.rs @@ -0,0 +1,146 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Serialization types for disk persistence of collator reputation data. + +use codec::{Decode, Encode}; +use polkadot_node_network_protocol::PeerId; +use polkadot_primitives::{BlockNumber, Id as ParaId}; +use std::collections::HashMap; + +use super::db::ScoreEntry; + +/// Key prefix for per-para reputation data. +pub const REPUTATION_PARA_PREFIX: &[u8; 12] = b"Rep_per_para"; +/// Key for metadata. 
+pub const REPUTATION_META_KEY: &[u8; 8] = b"Rep_meta"; +/// Key for the list of stored para IDs. +pub const REPUTATION_PARA_LIST_KEY: &[u8; 12] = b"Rep_paralist"; + +/// Serializable PeerId wrapper. +/// PeerId is a Multihash which can be converted to/from bytes. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct SerializablePeerId(pub(crate) PeerId); + +impl Encode for SerializablePeerId { + fn encode(&self) -> Vec<u8> { + self.0.to_bytes().encode() + } + + fn encode_to<T: codec::Output + ?Sized>(&self, dest: &mut T) { + self.0.to_bytes().encode_to(dest) + } +} + +impl Decode for SerializablePeerId { + fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> { + let bytes = Vec::<u8>::decode(input)?; + PeerId::from_bytes(&bytes) + .map(SerializablePeerId) + .map_err(|_| codec::Error::from("Invalid PeerId bytes")) + } +} + +/// Stored reputations for a single para. +/// This is the VALUE stored in the DB, keyed by ParaId. +#[derive(Debug, Clone, Encode, Decode, Default)] +pub(crate) struct StoredParaReputations { + /// Vec of (peer_id, score_entry) pairs. + pub(crate) entries: Vec<(SerializablePeerId, ScoreEntry)>, +} + +impl From<HashMap<PeerId, ScoreEntry>> for StoredParaReputations { + fn from(map: HashMap<PeerId, ScoreEntry>) -> Self { + let entries = map + .into_iter() + .map(|(peer_id, entry)| (SerializablePeerId(peer_id), entry)) + .collect(); + StoredParaReputations { entries } + } +} + +impl From<StoredParaReputations> for HashMap<PeerId, ScoreEntry> { + fn from(stored: StoredParaReputations) -> Self { + stored.entries.into_iter().map(|(peer_id, entry)| (peer_id.0, entry)).collect() + } +} + +/// Metadata stored separately from per-para data. +#[derive(Debug, Clone, Encode, Decode)] +pub(crate) struct StoredMetadata { + /// The last finalized block number that was processed. + pub(crate) last_finalized: Option<BlockNumber>, +} + +/// Generate key for a para's reputation data.
+/// Key format: "Rep_per_para" (12 bytes) + ParaId (4 bytes, big-endian) +pub fn para_reputation_key(para_id: ParaId) -> [u8; 16] { + let mut key = [0u8; 12 + 4]; + key[..12].copy_from_slice(REPUTATION_PARA_PREFIX); + key[12..].copy_from_slice(&u32::from(para_id).to_be_bytes()); + key +} + +/// Returns the metadata key. +pub fn metadata_key() -> &'static [u8] { + REPUTATION_META_KEY +} + +/// Returns the para list key. +pub fn para_list_key() -> &'static [u8] { + REPUTATION_PARA_LIST_KEY +} + +/// Stored list of para IDs that have reputation data on disk. +#[derive(Debug, Clone, Encode, Decode, Default)] +pub(crate) struct StoredParaList { + pub(crate) paras: Vec, +} + +/// Errors during persistence operations. +#[derive(Debug, thiserror::Error)] +pub enum PersistenceError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + #[error("Codec error: {0}")] + Codec(#[from] codec::Error), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::validator_side_experimental::peer_manager::Score; + + #[test] + fn stored_para_reputations_roundtrip() { + let mut map = HashMap::new(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + + map.insert(peer1, ScoreEntry { score: Score::new(100).unwrap(), last_bumped: 1234567890 }); + map.insert(peer2, ScoreEntry { score: Score::new(50).unwrap(), last_bumped: 9876543210 }); + + let stored: StoredParaReputations = map.into(); + let encoded = stored.encode(); + let decoded = StoredParaReputations::decode(&mut &encoded[..]).expect("decode should work"); + + let restored_map: HashMap = decoded.into(); + + assert_eq!(restored_map.len(), 2); + assert_eq!(restored_map.get(&peer1).unwrap().score, Score::new(100).unwrap()); + assert_eq!(restored_map.get(&peer2).unwrap().score, Score::new(50).unwrap()); + } +} diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistent_db.rs 
b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistent_db.rs new file mode 100644 index 0000000000000..5e233a7473d36 --- /dev/null +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistent_db.rs @@ -0,0 +1,1108 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Disk-backed reputation database for collator protocol. + +use crate::{ + validator_side_experimental::{ + common::Score, + peer_manager::{ + backend::Backend, + db::{Db, ScoreEntry}, + persistence::{ + metadata_key, para_list_key, para_reputation_key, PersistenceError, StoredMetadata, + StoredParaList, StoredParaReputations, + }, + ReputationUpdate, + }, + ReputationConfig, + }, + LOG_TARGET, +}; +use async_trait::async_trait; +use codec::{Decode, Encode}; +use futures::{channel::oneshot, Future}; +use polkadot_node_network_protocol::PeerId; +use polkadot_node_subsystem_util::database::{DBTransaction, Database}; +use polkadot_primitives::{BlockNumber, Id as ParaId}; +use std::{ + collections::{BTreeMap, BTreeSet, HashMap}, + pin::Pin, + sync::Arc, +}; +use tokio::sync::mpsc; + +/// Describes the context of a persistence operation, used for logging +/// by the background writer after a disk write completes. 
+#[derive(Debug)] +pub enum LogInfo { + /// Periodic timer-triggered persistence. Fields are computed internally + /// by `send_persist_request` before sending to the background writer. + Periodic { + total_entries: usize, + para_count: usize, + dirty_para_count: usize, + last_finalized: Option, + }, + /// Immediate persistence after pruning unregistered paras. + Pruned { pruned_count: usize, remaining_count: usize, registered_count: usize }, + /// Immediate persistence after a reputation slash (security-critical). + Slash { para_id: ParaId, peer_id: PeerId, value: Score }, +} + +impl LogInfo { + fn log(&self) { + match self { + LogInfo::Periodic { total_entries, para_count, dirty_para_count, last_finalized } => { + gum::debug!( + target: LOG_TARGET, + total_peer_entries = total_entries, + para_count, + dirty_para_count, + ?last_finalized, + "Periodic persistence completed: reputation DB written to disk" + ); + }, + LogInfo::Pruned { pruned_count, remaining_count, registered_count } => { + gum::debug!( + target: LOG_TARGET, + pruned_para_count = pruned_count, + remaining_para_count = remaining_count, + registered_para_count = registered_count, + "Prune paras persisted to disk immediately" + ); + }, + LogInfo::Slash { para_id, peer_id, value } => { + gum::debug!( + target: LOG_TARGET, + ?para_id, + ?peer_id, + slash_value = ?value, + "Slash persisted to disk immediately" + ); + }, + } + } +} + +/// Request sent to the background writer task +struct PersistenceRequest { + updates: Vec<(ParaId, Option)>, + metadata: StoredMetadata, + para_list: StoredParaList, + log_info: LogInfo, + completion_tx: Option>, +} + +pub type WriterFuture = Pin + Send + 'static>>; + +/// Persistent database implementation for collator reputation. +/// +/// This wraps the in-memory `Db` and adds disk persistence capability. +/// +/// **Persistence Policy:** +/// - All operations (bumps, decays, queries) happen in-memory only +/// - Disk writes happen: +/// 1. 
On slash operations (immediate, for security) +/// 2. When `persist()` is called explicitly by the main loop (periodic timer) +/// 3. On paras pruning (immediate) +/// +/// - Only modified paras are persisted during periodic persistence +/// - Paras are marked dirty when `process_bumps` modifies their reputation (bumps/decays) +/// +/// The main loop is responsible for calling `persist()` periodically (currently, every 10 minutes). +pub struct PersistentDb { + /// In-memory database (does all the actual logic). + inner: Db, + /// Disk database handle. + disk_db: Arc, + /// Column configuration. + config: ReputationConfig, + /// Paras whose reputation has changed since last persistence. + dirty_paras: BTreeSet, + /// Channel to send updates to the background writer. + background_tx: mpsc::Sender, +} + +impl PersistentDb { + /// Create a new persistent DB, loading existing state from disk. + pub async fn new( + disk_db: Arc, + config: ReputationConfig, + stored_limit_per_para: u16, + ) -> Result<(Self, WriterFuture), PersistenceError> { + // Create empty in-memory DB + let inner = Db::new(stored_limit_per_para).await; + + let (tx, rx) = mpsc::channel(1); + // Load data from disk into the in-memory DB + let mut instance = Self { + inner, + disk_db: disk_db.clone(), + config, + dirty_paras: BTreeSet::new(), + background_tx: tx, + }; + let (para_count, total_entries) = instance.load_from_disk().await?; + + let last_finalized = instance.inner.processed_finalized_block_number().await; + + gum::info!( + target: LOG_TARGET, + ?last_finalized, + para_count, + total_peer_entries = total_entries, + "Reputation DB initialized" + ); + + let task = Box::pin(Self::run_background_writer(disk_db, config, rx)); + + Ok((instance, task)) + } + + async fn run_background_writer( + disk_db: Arc, + config: ReputationConfig, + mut rx: mpsc::Receiver, + ) { + while let Some(req) = rx.recv().await { + let PersistenceRequest { updates, metadata, para_list, log_info, completion_tx } = req; + + 
let mut db_transaction = DBTransaction::new(); + + // Write metadata + db_transaction.put_vec(config.col_reputation_data, metadata_key(), metadata.encode()); + + // Write para list + db_transaction.put_vec(config.col_reputation_data, para_list_key(), para_list.encode()); + + // Write updates + for (para_id, maybe_data) in updates { + let key = para_reputation_key(para_id); + match maybe_data { + Some(stored_para_rep) => db_transaction.put( + config.col_reputation_data, + &key, + &stored_para_rep.encode(), + ), + None => db_transaction.delete(config.col_reputation_data, &key), + } + } + + // Commit transaction to disk + match disk_db.write(db_transaction) { + Ok(_) => { + log_info.log(); + }, + Err(e) => { + gum::error!( + target: LOG_TARGET, + error = ?e, + "Background persistence write failed" + ); + }, + } + + // Signal completion if requested (used by graceful shutdown) + if let Some(tx) = completion_tx { + let _ = tx.send(()); + } + } + gum::debug!(target: LOG_TARGET, "Background reputation writer shutting down"); + } + + /// Load all data from disk into the in-memory DB. + /// Returns (para_count, total_entries) for logging purposes. + async fn load_from_disk(&mut self) -> Result<(usize, usize), PersistenceError> { + gum::trace!( + target: LOG_TARGET, + "Starting to load reputation data from disk" + ); + + // Load metadata + if let Some(meta) = self.load_metadata()? { + self.inner.set_last_finalized(meta.last_finalized); + gum::debug!( + target: LOG_TARGET, + last_finalized = ?meta.last_finalized, + "Loaded reputation DB metadata from disk" + ); + } else { + gum::debug!( + target: LOG_TARGET, + "No existing reputation metadata found on disk (fresh start)" + ); + } + + // Load para list + let para_list = self.load_para_list()?; + + let mut total_entries = 0; + let mut para_count = 0; + for para_id in para_list { + let key = para_reputation_key(para_id); + if let Some(value) = self.disk_db.get(self.config.col_reputation_data, &key)? 
{ + let stored: StoredParaReputations = + Decode::decode(&mut &value[..]).map_err(PersistenceError::Codec)?; + let entries: HashMap = stored.into(); + let entry_count = entries.len(); + total_entries += entry_count; + para_count += 1; + gum::trace!( + target: LOG_TARGET, + ?para_id, + peer_count = entry_count, + "Loaded reputation entries for para from disk" + ); + self.inner.set_para_reputations(para_id, entries); + } + } + + Ok((para_count, total_entries)) + } + + /// Load metadata from disk. + fn load_metadata(&self) -> Result, PersistenceError> { + match self.disk_db.get(self.config.col_reputation_data, metadata_key())? { + None => Ok(None), + Some(raw) => { + StoredMetadata::decode(&mut &raw[..]).map(Some).map_err(PersistenceError::Codec) + }, + } + } + + /// Load the list of stored para IDs from disk. + fn load_para_list(&self) -> Result, PersistenceError> { + match self.disk_db.get(self.config.col_reputation_data, para_list_key())? { + None => Ok(Vec::new()), + Some(raw) => { + let list = + StoredParaList::decode(&mut &raw[..]).map_err(PersistenceError::Codec)?; + Ok(list.paras) + }, + } + } + + /// Internal: snapshot dirty data and send to the background writer. 
+ fn send_persist_request( + &mut self, + log_info: Option, + completion_tx: Option>, + ) { + let mut updates = Vec::new(); + let is_periodic = log_info.is_none(); + let mut stats_total_entries = 0; + + let paras_to_snapshot: Vec = self.dirty_paras.iter().cloned().collect(); + + for para_id in paras_to_snapshot { + let peer_scores = self.inner.get_para_reputations(¶_id); + if peer_scores.is_empty() { + updates.push((para_id, None)); + } else { + let stored: StoredParaReputations = peer_scores.into(); + if is_periodic { + stats_total_entries += stored.entries.len(); + } + + updates.push((para_id, Some(stored))); + } + } + + // Get the finalized block from the DB + let last_finalized = self.inner.get_last_finalized(); + let final_log_info = match log_info { + None => LogInfo::Periodic { + total_entries: stats_total_entries, + para_count: self.inner.all_reputations().count(), + dirty_para_count: self.dirty_paras.len(), + last_finalized, + }, + Some(other) => other, + }; + + let request = PersistenceRequest { + updates, + metadata: StoredMetadata { last_finalized: self.inner.get_last_finalized() }, + para_list: StoredParaList { + paras: self.inner.all_reputations().map(|(para_id, _)| *para_id).collect(), + }, + log_info: final_log_info, + completion_tx, + }; + + match self.background_tx.try_send(request) { + Ok(_) => { + // On success, we assume the data is handed off. + self.dirty_paras.clear(); + }, + Err(mpsc::error::TrySendError::Full(_)) => { + gum::warn!( + target: LOG_TARGET, + "Reputation persistence channel full. Modifications kept in memory for next retry." + ); + // We do NOT clear dirty_paras. + }, + Err(mpsc::error::TrySendError::Closed(_)) => { + gum::error!( + target: LOG_TARGET, + "Reputation persistence channel closed unexpectedly." + ); + }, + } + } + + /// Queue a snapshot of the dirty data to the background writer (fire-and-forget). 
+ pub fn persist_async(&mut self, log_info: Option) { + self.send_persist_request(log_info, None); + } + + /// Queue a snapshot and return a receiver that completes when the write finishes. + /// Used for graceful shutdown to ensure data is flushed before exit. + pub fn persist_and_wait(&mut self) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel(); + self.send_persist_request(None, Some(tx)); + rx + } +} + +#[async_trait] +impl Backend for PersistentDb { + async fn processed_finalized_block_number(&self) -> Option { + self.inner.processed_finalized_block_number().await + } + + async fn query(&self, peer_id: &PeerId, para_id: &ParaId) -> Option { + self.inner.query(peer_id, para_id).await + } + + async fn slash(&mut self, peer_id: &PeerId, para_id: &ParaId, value: Score) { + // Delegate to inner DB + self.inner.slash(peer_id, para_id, value).await; + + self.dirty_paras.insert(*para_id); + + // Immediately persist to disk after slash (security-critical) + self.persist_async(Some(LogInfo::Slash { para_id: *para_id, peer_id: *peer_id, value })); + } + + async fn prune_paras(&mut self, registered_paras: BTreeSet) { + // Collects all paras that have reputations and are still registered + let paras_to_prune: Vec = self + .inner + .all_reputations() + .filter(|(para_id, _)| !registered_paras.contains(para_id)) + .map(|(para_id, _)| *para_id) + .collect(); + + let pruned_count = paras_to_prune.len(); + + for para_id in ¶s_to_prune { + self.dirty_paras.insert(*para_id); + } + // Prune from in-memory state + self.inner.prune_paras(registered_paras.clone()).await; + let paras_after = self.inner.all_reputations().count(); + + self.persist_async(Some(LogInfo::Pruned { + pruned_count, + remaining_count: paras_after, + registered_count: registered_paras.len(), + })); + } + + async fn process_bumps( + &mut self, + leaf_number: BlockNumber, + bumps: BTreeMap>, + decay_value: Option, + ) -> Vec { + // Mark all paras in bumps as dirty. 
+ for para_id in bumps.keys() { + self.dirty_paras.insert(*para_id); + } + + // Delegate to inner DB - NO PERSISTENCE HERE + // Persistence happens via the periodic timer calling persist() + self.inner.process_bumps(leaf_number, bumps, decay_value).await + } + + async fn max_scores_for_paras(&self, paras: BTreeSet) -> HashMap { + self.inner.max_scores_for_paras(paras).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter; + use std::time::Duration; + use tokio::time::sleep; + + const DATA_COL: u32 = 0; + const NUM_COLUMNS: u32 = 1; + + fn make_db() -> Arc { + let db = kvdb_memorydb::create(NUM_COLUMNS); + let db = DbAdapter::new(db, &[]); + Arc::new(db) + } + + fn make_config() -> ReputationConfig { + ReputationConfig { col_reputation_data: DATA_COL, persist_interval: None } + } + + /// Returns the DB handle and the JoinHandle for the background task. + async fn create_and_spawn_db( + disk_db: Arc, + config: ReputationConfig, + ) -> (PersistentDb, tokio::task::JoinHandle<()>) { + let (db, task) = + PersistentDb::new(disk_db, config, 100).await.expect("failed to create db"); + let handle = tokio::spawn(task); + (db, handle) + } + + #[tokio::test] + async fn load_from_empty_disk_fresh_start() { + // Test that PersistentDb can be created from an empty database (fresh start) + let disk_db = make_db(); + let config = make_config(); + + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + // Fresh start should have no finalized block + assert_eq!(db.processed_finalized_block_number().await, None); + + assert_eq!(db.inner.len(), 0); + } + + #[tokio::test] + async fn load_from_disk_with_existing_data() { + // Test that PersistentDb correctly loads existing data from disk + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let para_id_100 = ParaId::from(100); + let para_id_200 = 
ParaId::from(200); + + // First, create a DB, add some data, and persist it + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + // Process some bumps to add reputation data + let bumps = [ + (para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect()), + (para_id_200, [(peer2, Score::new(75).unwrap())].into_iter().collect()), + ] + .into_iter() + .collect(); + + db.process_bumps(10, bumps, None).await; + + // Persist to disk + let _ = db.persist_and_wait().await; + handle.abort(); + } + + // Now create a new DB instance and verify data was loaded + { + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + // Verify data was loaded correctly + assert_eq!(db.processed_finalized_block_number().await, Some(10)); + assert_eq!(db.query(&peer1, ¶_id_100).await, Some(Score::new(50).unwrap())); + assert_eq!(db.query(&peer2, ¶_id_200).await, Some(Score::new(75).unwrap())); + // Non-existent queries should return None + assert_eq!(db.query(&peer1, ¶_id_200).await, None); + assert_eq!(db.query(&peer2, ¶_id_100).await, None); + } + } + + #[tokio::test] + async fn slash_persists_immediately() { + // Test that slash operations persist to disk immediately + let disk_db = make_db(); + let config = make_config(); + + let peer = PeerId::random(); + let para_id = ParaId::from(100); + + // Create DB and add some reputation + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + let bumps = [(para_id, [(peer, Score::new(100).unwrap())].into_iter().collect())] + .into_iter() + .collect(); + db.process_bumps(10, bumps, None).await; + + // Persist initial state + let _ = db.persist_and_wait().await; + handle.abort(); + } + + { + // 2. Slash (Async) + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + db.slash(&peer, ¶_id, Score::new(30).unwrap()).await; + + sleep(Duration::from_millis(50)).await; + handle.abort(); + } + + // 3. 
Verify + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("reload"); + assert_eq!(db.query(&peer, ¶_id).await, Some(Score::new(70).unwrap())); + } + + #[tokio::test] + async fn slash_that_removes_entry_persists_immediately() { + // Test that a slash that reduces score to zero (removing entry) persists immediately + let disk_db = make_db(); + let config = make_config(); + + let peer = PeerId::random(); + let para_id = ParaId::from(100); + + // Create DB and add some reputation + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + let bumps = [(para_id, [(peer, Score::new(50).unwrap())].into_iter().collect())] + .into_iter() + .collect(); + db.process_bumps(10, bumps, None).await; + let _ = db.persist_and_wait().await; + handle.abort(); + } + + // Slash more than the current score - should remove entry + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + db.slash(&peer, ¶_id, Score::new(100).unwrap()).await; + + sleep(Duration::from_millis(50)).await; + handle.abort(); + } + + // Create new DB instance and verify entry was removed + { + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + // Entry should be gone + assert_eq!(db.query(&peer, ¶_id).await, None); + } + } + + #[tokio::test] + async fn prune_paras_persists_immediately() { + // Test that prune_paras persists immediately + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let para_id_100 = ParaId::from(100); + let para_id_200 = ParaId::from(200); + let para_id_300 = ParaId::from(300); + + // Create DB and add reputation for multiple paras + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + let bumps = [ + (para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect()), + (para_id_200, [(peer2, Score::new(75).unwrap())].into_iter().collect()), + (para_id_300, [(peer1, 
Score::new(25).unwrap())].into_iter().collect()), + ] + .into_iter() + .collect(); + db.process_bumps(10, bumps, None).await; + let _ = db.persist_and_wait().await; + handle.abort(); + } + + // Prune - only keep para 200 registered + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + let registered_paras = [para_id_200].into_iter().collect(); + + db.prune_paras(registered_paras).await; + sleep(Duration::from_millis(50)).await; + handle.abort(); + } + + // Create new DB instance and verify pruning was persisted + { + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + // Only para 200 should remain + assert_eq!(db.query(&peer1, ¶_id_100).await, None); + assert_eq!(db.query(&peer2, ¶_id_200).await, Some(Score::new(75).unwrap())); + assert_eq!(db.query(&peer1, ¶_id_300).await, None); + } + } + + #[tokio::test] + async fn periodic_persist_writes_all_data() { + // Test that persist() correctly writes all in-memory data + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let para_id_100 = ParaId::from(100); + let para_id_200 = ParaId::from(200); + + // Create DB, add data, and persist via background writer + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + // Add reputation via bumps (these don't trigger immediate persistence) + let bumps = [ + (para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect()), + (para_id_200, [(peer2, Score::new(75).unwrap())].into_iter().collect()), + ] + .into_iter() + .collect(); + db.process_bumps(15, bumps, None).await; + + // Now call periodic persist + let _ = db.persist_and_wait().await; + handle.abort(); + } + + // Reload and verify + { + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + assert_eq!(db.processed_finalized_block_number().await, Some(15)); + assert_eq!(db.query(&peer1, ¶_id_100).await, 
Some(Score::new(50).unwrap())); + assert_eq!(db.query(&peer2, ¶_id_200).await, Some(Score::new(75).unwrap())); + } + } + + #[tokio::test] + async fn data_survives_simulated_restart() { + // Test full restart scenario: create, populate, persist, drop, reload + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let peer3 = PeerId::random(); + let para_id_100 = ParaId::from(100); + let para_id_200 = ParaId::from(200); + + // Session 1: Create and populate + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + let bumps = [ + ( + para_id_100, + [(peer1, Score::new(100).unwrap()), (peer2, Score::new(50).unwrap())] + .into_iter() + .collect(), + ), + (para_id_200, [(peer3, Score::new(200).unwrap())].into_iter().collect()), + ] + .into_iter() + .collect(); + db.process_bumps(20, bumps, None).await; + + // Slash peer2 (also persists all dirty paras via background writer) + db.slash(&peer2, ¶_id_100, Score::new(25).unwrap()).await; + + // Wait for background writer to finish + sleep(Duration::from_millis(50)).await; + handle.abort(); + } + + // Session 2: "Restart" - create new instance + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + // Verify all data survived + assert_eq!(db.processed_finalized_block_number().await, Some(20)); + assert_eq!(db.query(&peer1, ¶_id_100).await, Some(Score::new(100).unwrap())); + assert_eq!(db.query(&peer2, ¶_id_100).await, Some(Score::new(25).unwrap())); + assert_eq!(db.query(&peer3, ¶_id_200).await, Some(Score::new(200).unwrap())); + + // Continue with more operations + let bumps = [(para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect())] + .into_iter() + .collect(); + db.process_bumps(25, bumps, None).await; + let _ = db.persist_and_wait().await; + handle.abort(); + } + + // Session 3: Verify continued state + { + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should 
create db"); + + assert_eq!(db.processed_finalized_block_number().await, Some(25)); + // peer1 should now have 100 + 50 = 150 + assert_eq!(db.query(&peer1, ¶_id_100).await, Some(Score::new(150).unwrap())); + } + } + + #[tokio::test] + async fn roundtrip_serialization_correctness() { + // Test that data roundtrips correctly through serialization + let disk_db = make_db(); + let config = make_config(); + + // Create peers with specific scores to verify exact values + let peers: Vec<_> = (0..10).map(|_| PeerId::random()).collect(); + let para_id = ParaId::from(42); + + let original_scores: HashMap = peers + .iter() + .enumerate() + .map(|(i, peer)| (*peer, Score::new((i as u16 + 1) * 100).unwrap())) + .collect(); + + // Store data + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + let bumps = + [(para_id, original_scores.iter().map(|(peer, score)| (*peer, *score)).collect())] + .into_iter() + .collect(); + db.process_bumps(100, bumps, None).await; + let _ = db.persist_and_wait().await; + handle.abort(); + } + + // Reload and verify exact values + { + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + for (peer, expected_score) in &original_scores { + let actual_score = db.query(peer, ¶_id).await; + assert_eq!( + actual_score, + Some(*expected_score), + "Score mismatch for peer after roundtrip" + ); + } + } + } + + #[tokio::test] + async fn bumps_without_persist_not_saved() { + // Test that bumps without explicit persist are NOT saved to disk + // (they only persist via periodic timer or slash) + let disk_db = make_db(); + let config = make_config(); + + let peer = PeerId::random(); + let para_id = ParaId::from(100); + + // Create DB and add bumps, but DON'T persist + { + let (mut db, _) = + PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db"); + + let bumps = [(para_id, [(peer, Score::new(100).unwrap())].into_iter().collect())] + .into_iter() + .collect(); + 
db.process_bumps(10, bumps, None).await; + + // Verify in-memory state + assert_eq!(db.query(&peer, ¶_id).await, Some(Score::new(100).unwrap())); + + // Don't call persist - just drop + } + + // Create new instance - data should NOT be there + { + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + // Data was never persisted + assert_eq!(db.query(&peer, ¶_id).await, None); + assert_eq!(db.processed_finalized_block_number().await, None); + } + } + + #[tokio::test] + async fn multiple_paras_multiple_peers() { + // Test handling of multiple paras with multiple peers each + let disk_db = make_db(); + let config = make_config(); + + let peers: Vec<_> = (0..5).map(|_| PeerId::random()).collect(); + let paras: Vec<_> = (100..105).map(ParaId::from).collect(); + + // Create complex state + { + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + let bumps: BTreeMap> = paras + .iter() + .enumerate() + .map(|(para_idx, para_id)| { + let peer_scores: HashMap = peers + .iter() + .enumerate() + .map(|(peer_idx, peer)| { + let score = ((para_idx + 1) * 10 + peer_idx) as u16; + (*peer, Score::new(score).unwrap()) + }) + .collect(); + (*para_id, peer_scores) + }) + .collect(); + + db.process_bumps(50, bumps, None).await; + let _ = db.persist_and_wait().await; + handle.abort(); + } + + // Verify all data + { + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + for (para_idx, para_id) in paras.iter().enumerate() { + for (peer_idx, peer) in peers.iter().enumerate() { + let expected_score = ((para_idx + 1) * 10 + peer_idx) as u16; + assert_eq!( + db.query(peer, para_id).await, + Some(Score::new(expected_score).unwrap()), + "Mismatch for para {} peer {}", + para_idx, + peer_idx + ); + } + } + } + } + + #[tokio::test] + async fn dirty_tracking_only_persists_modified_paras() { + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let 
para_id_100 = ParaId::from(100); + let para_id_200 = ParaId::from(200); + + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + assert!(db.dirty_paras.is_empty(), "Fresh DB should have no dirty paras"); + + let bumps_para_100 = + [(para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect())] + .into_iter() + .collect(); + db.process_bumps(10, bumps_para_100, None).await; + + assert!(db.dirty_paras.contains(¶_id_100), "Para 100 should be dirty after bump"); + assert!(!db.dirty_paras.contains(¶_id_200), "Para 200 should NOT be dirty"); + assert_eq!(db.dirty_paras.len(), 1, "Only one para should be dirty"); + + let _ = db.persist_and_wait().await; + + assert!(db.dirty_paras.is_empty(), "Dirty paras should be cleared after persist"); + + handle.abort(); + drop(db); + + let (reloaded_db, _) = + PersistentDb::new(disk_db, config, 100).await.expect("should reload db"); + + assert_eq!( + reloaded_db.query(&peer1, ¶_id_100).await, + Some(Score::new(50).unwrap()), + "Para 100 data should be persisted correctly" + ); + + assert_eq!(reloaded_db.inner.len(), 1); + + assert_eq!( + reloaded_db.processed_finalized_block_number().await, + Some(10), + "Last finalized block should be 10" + ); + } + + #[tokio::test] + async fn dirty_tracking_cleared_after_prune() { + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let para_id_100 = ParaId::from(100); + let para_id_200 = ParaId::from(200); + + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + let bumps: BTreeMap> = [ + (para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect()), + (para_id_200, [(peer1, Score::new(75).unwrap())].into_iter().collect()), + ] + .into_iter() + .collect(); + db.process_bumps(10, bumps, None).await; + + assert_eq!(db.dirty_paras.len(), 2); + assert!(db.dirty_paras.contains(¶_id_100)); + assert!(db.dirty_paras.contains(¶_id_200)); + + let registered = [para_id_100].into_iter().collect(); + 
db.prune_paras(registered).await; + + assert!( + db.dirty_paras.is_empty(), + "Dirty paras should be cleared after prune_paras persists" + ); + + assert_eq!(db.query(&peer1, ¶_id_100).await, Some(Score::new(50).unwrap())); + assert_eq!(db.query(&peer1, ¶_id_200).await, None); + + sleep(Duration::from_millis(50)).await; + handle.abort(); + drop(db); + let (reloaded, _) = + PersistentDb::new(disk_db, config, 100).await.expect("should reload db"); + + assert_eq!(reloaded.query(&peer1, ¶_id_100).await, Some(Score::new(50).unwrap())); + assert_eq!(reloaded.query(&peer1, ¶_id_200).await, None); + } + + #[tokio::test] + async fn dirty_tracking_cleared_after_slash() { + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let para_id_100 = ParaId::from(100); + let para_id_200 = ParaId::from(200); + + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + let bumps: BTreeMap> = [ + (para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect()), + (para_id_200, [(peer2, Score::new(75).unwrap())].into_iter().collect()), + ] + .into_iter() + .collect(); + db.process_bumps(10, bumps, None).await; + + assert!(db.dirty_paras.contains(¶_id_100)); + assert!(db.dirty_paras.contains(¶_id_200)); + assert_eq!(db.dirty_paras.len(), 2); + + db.slash(&peer1, ¶_id_100, Score::new(30).unwrap()).await; + + assert!(!db.dirty_paras.contains(¶_id_100)); + assert!(!db.dirty_paras.contains(¶_id_200)); + assert_eq!(db.dirty_paras.len(), 0); + + assert_eq!(db.query(&peer1, ¶_id_100).await, Some(Score::new(20).unwrap())); + assert_eq!(db.query(&peer2, ¶_id_200).await, Some(Score::new(75).unwrap())); + + sleep(Duration::from_millis(50)).await; + handle.abort(); + drop(db); + let (reloaded, _) = + PersistentDb::new(disk_db, config, 100).await.expect("should reload db"); + + assert_eq!(reloaded.query(&peer1, ¶_id_100).await, Some(Score::new(20).unwrap())); + assert_eq!(reloaded.query(&peer2, 
¶_id_200).await, Some(Score::new(75).unwrap())); + } + + #[tokio::test] + async fn crash_before_persist_loses_bumps_but_not_slashes() { + let disk_db = make_db(); + let config = make_config(); + let peer1 = PeerId::random(); + let para_id_100 = ParaId::from(100); + + let (mut db, handle) = create_and_spawn_db(disk_db.clone(), config).await; + + // 1. Initial bump + Async Persist + let bumps1 = [(para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect())] + .into_iter() + .collect(); + db.process_bumps(10, bumps1, None).await; + db.persist_async(None); + sleep(Duration::from_millis(50)).await; + + // 2. Slash (Persists immediately) + db.slash(&peer1, ¶_id_100, Score::new(20).unwrap()).await; + sleep(Duration::from_millis(50)).await; + + // 3. New Bump (Memory only) + let bumps2 = [(para_id_100, [(peer1, Score::new(15).unwrap())].into_iter().collect())] + .into_iter() + .collect(); + db.process_bumps(20, bumps2, None).await; + + // "Crash" (Abort handle, drop DB) + handle.abort(); + drop(db); + + // 4. Verify Disk State + let (db, _) = PersistentDb::new(disk_db, config, 100).await.expect("reload"); + // Should have: 50 (initial) - 20 (slash) = 30. The +15 bump was lost. 
+ assert_eq!(db.query(&peer1, ¶_id_100).await, Some(Score::new(30).unwrap())); + } + + #[tokio::test] + async fn corrupted_metadata_returns_error() { + // Test that corrupted metadata in the database returns a codec error + let disk_db = make_db(); + let config = make_config(); + + // Write some corrupted metadata directly to disk + let mut tx = DBTransaction::new(); + tx.put_vec(config.col_reputation_data, metadata_key(), vec![0xff, 0xff, 0xff]); + disk_db.write(tx).expect("should write corrupted data"); + + // Attempt to create PersistentDb - should fail with codec error + let err = PersistentDb::new(disk_db, config, 100).await.err().unwrap(); + assert!(matches!(err, PersistenceError::Codec(_))); + } + + #[tokio::test] + async fn corrupted_para_reputation_returns_error() { + // Test that corrupted para reputation data returns a codec error + let disk_db = make_db(); + let config = make_config(); + let para_id = ParaId::from(100); + + // Write a valid para list that references the para, but corrupted para data + let mut tx = DBTransaction::new(); + let para_list = StoredParaList { paras: vec![para_id] }; + tx.put_vec(config.col_reputation_data, para_list_key(), para_list.encode()); + let key = para_reputation_key(para_id); + tx.put_vec(config.col_reputation_data, &key, vec![0xde, 0xad, 0xbe, 0xef]); + disk_db.write(tx).expect("should write corrupted data"); + + // Attempt to create PersistentDb - should fail with codec error + let err = PersistentDb::new(disk_db, config, 100).await.err().unwrap(); + assert!(matches!(err, PersistenceError::Codec(_))); + } +} diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs index ab7c959cc4188..852e0efa1cf02 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs @@ -23,7 +23,7 @@ use 
crate::{ ProspectiveCandidate, TryAcceptOutcome, INVALID_COLLATION_SLASH, }, error::{Error, FatalResult}, - peer_manager::Backend, + peer_manager::{Backend, PersistentDb}, Metrics, PeerManager, }, LOG_TARGET, @@ -618,3 +618,18 @@ impl State { self.collation_manager.advertisements() } } + +// Specific implementation for PersistentDb to support disk persistence. +impl State { + /// Persist the reputation database to disk asynchronously (fire-and-forget). + /// Called on periodic timer. + pub fn background_persist_reputations(&mut self) { + self.peer_manager.persist_to_disk_async(); + } + + /// Persist the reputation database to disk and wait for completion. + /// Called on graceful shutdown. + pub async fn persist_reputations(&mut self) { + self.peer_manager.persist_and_wait().await; + } +} diff --git a/polkadot/node/service/src/builder/mod.rs b/polkadot/node/service/src/builder/mod.rs index b55114257e727..d9801a9a58445 100644 --- a/polkadot/node/service/src/builder/mod.rs +++ b/polkadot/node/service/src/builder/mod.rs @@ -34,6 +34,7 @@ use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE; use gum::info; use mmr_gadget::MmrGadget; use polkadot_availability_recovery::FETCH_CHUNKS_THRESHOLD; +use polkadot_collator_protocol::ReputationConfig; use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig; use polkadot_node_core_av_store::Config as AvailabilityConfig; use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig; @@ -100,6 +101,8 @@ pub struct NewFullParams { pub collator_protocol_hold_off: Option, /// Use experimental collator protocol pub experimental_collator_protocol: bool, + /// Collator reputation persistence interval. If None, defaults to 600 seconds. + pub collator_reputation_persist_interval: Option, } /// Completely built polkadot node service. 
@@ -212,6 +215,7 @@ where invulnerable_ah_collators, collator_protocol_hold_off, experimental_collator_protocol, + collator_reputation_persist_interval, }, overseer_connector, partial_components: @@ -424,6 +428,10 @@ where stagnant_check_interval: Default::default(), stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly, }; + let reputation_config = ReputationConfig { + col_reputation_data: parachains_db::REAL_COLUMNS.col_collator_reputation_data, + persist_interval: collator_reputation_persist_interval, + }; // Kusama + testnets get a higher threshold, we are conservative on Polkadot for now. let fetch_chunks_threshold = @@ -456,6 +464,7 @@ where invulnerable_ah_collators, collator_protocol_hold_off, experimental_collator_protocol, + reputation_config, }) }; diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index 44526e268ec53..b645e6c22bca5 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see . use super::{Error, IsParachainNode, Registry}; +use polkadot_collator_protocol::ReputationConfig; use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient}; use polkadot_overseer::{DummySubsystem, InitializedOverseerBuilder, SubsystemError}; use sp_core::traits::SpawnNamed; @@ -149,6 +150,8 @@ pub struct ExtendedOverseerGenArgs { pub collator_protocol_hold_off: Option, /// Use experimental collator protocol pub experimental_collator_protocol: bool, + /// Reputation DB config used by experimental collator protocol, + pub reputation_config: ReputationConfig, } /// Obtain a prepared validator `Overseer`, that is initialized with all default values. 
@@ -186,6 +189,7 @@ pub fn validator_overseer_builder( invulnerable_ah_collators, collator_protocol_hold_off, experimental_collator_protocol, + reputation_config, }: ExtendedOverseerGenArgs, ) -> Result< InitializedOverseerBuilder< @@ -308,6 +312,8 @@ where ProtocolSide::ValidatorExperimental { keystore: keystore.clone(), metrics: Metrics::register(registry)?, + db: parachains_db.clone(), + reputation_config, } } else { ProtocolSide::Validator { diff --git a/polkadot/node/service/src/parachains_db/mod.rs b/polkadot/node/service/src/parachains_db/mod.rs index 887db80a30348..3d5df79026550 100644 --- a/polkadot/node/service/src/parachains_db/mod.rs +++ b/polkadot/node/service/src/parachains_db/mod.rs @@ -47,13 +47,27 @@ pub(crate) mod columns { } pub mod v4 { + pub use super::v5::{NUM_COLUMNS, ORDERED_COL}; + } + + pub mod v5 { pub const NUM_COLUMNS: u32 = 5; + + pub const ORDERED_COL: &[u32] = &[ + super::v6::COL_AVAILABILITY_META, + super::v6::COL_CHAIN_SELECTION_DATA, + super::v6::COL_DISPUTE_COORDINATOR_DATA, + ]; + } + + pub mod v6 { + pub const NUM_COLUMNS: u32 = 6; pub const COL_AVAILABILITY_DATA: u32 = 0; pub const COL_AVAILABILITY_META: u32 = 1; pub const COL_APPROVAL_DATA: u32 = 2; pub const COL_CHAIN_SELECTION_DATA: u32 = 3; pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4; - + pub const COL_COLLATOR_REPUTATION_DATA: u32 = 5; pub const ORDERED_COL: &[u32] = &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA]; } @@ -73,16 +87,19 @@ pub struct ColumnsConfig { pub col_chain_selection_data: u32, /// The column used by dispute coordinator for data. pub col_dispute_coordinator_data: u32, + /// The column used to keep data about collators reputation. + pub col_collator_reputation_data: u32, } /// The real columns used by the parachains DB. 
#[cfg(any(test, feature = "full-node"))] pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig { - col_availability_data: columns::v4::COL_AVAILABILITY_DATA, - col_availability_meta: columns::v4::COL_AVAILABILITY_META, - col_approval_data: columns::v4::COL_APPROVAL_DATA, - col_chain_selection_data: columns::v4::COL_CHAIN_SELECTION_DATA, - col_dispute_coordinator_data: columns::v4::COL_DISPUTE_COORDINATOR_DATA, + col_availability_data: columns::v6::COL_AVAILABILITY_DATA, + col_availability_meta: columns::v6::COL_AVAILABILITY_META, + col_approval_data: columns::v6::COL_APPROVAL_DATA, + col_chain_selection_data: columns::v6::COL_CHAIN_SELECTION_DATA, + col_dispute_coordinator_data: columns::v6::COL_DISPUTE_COORDINATOR_DATA, + col_collator_reputation_data: columns::v6::COL_COLLATOR_REPUTATION_DATA, }; #[derive(PartialEq, Copy, Clone)] @@ -123,17 +140,17 @@ pub fn open_creating_rocksdb( let path = root.join("parachains").join("db"); - let mut db_config = DatabaseConfig::with_columns(columns::v4::NUM_COLUMNS); + let mut db_config = DatabaseConfig::with_columns(columns::v6::NUM_COLUMNS); let _ = db_config .memory_budget - .insert(columns::v4::COL_AVAILABILITY_DATA, cache_sizes.availability_data); + .insert(columns::v6::COL_AVAILABILITY_DATA, cache_sizes.availability_data); let _ = db_config .memory_budget - .insert(columns::v4::COL_AVAILABILITY_META, cache_sizes.availability_meta); + .insert(columns::v6::COL_AVAILABILITY_META, cache_sizes.availability_meta); let _ = db_config .memory_budget - .insert(columns::v4::COL_APPROVAL_DATA, cache_sizes.approval_data); + .insert(columns::v6::COL_APPROVAL_DATA, cache_sizes.approval_data); let path_str = path .to_str() @@ -144,7 +161,7 @@ pub fn open_creating_rocksdb( let db = Database::open(&db_config, &path_str)?; let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new( db, - columns::v4::ORDERED_COL, + columns::v6::ORDERED_COL, ); Ok(Arc::new(db)) @@ -164,12 +181,12 @@ pub fn open_creating_paritydb( 
std::fs::create_dir_all(&path_str)?; upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB, upgrade::CURRENT_VERSION)?; - let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_3_config(&path)) + let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_6_config(&path)) .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new( db, - columns::v4::ORDERED_COL, + columns::v6::ORDERED_COL, ); Ok(Arc::new(db)) } diff --git a/polkadot/node/service/src/parachains_db/upgrade.rs b/polkadot/node/service/src/parachains_db/upgrade.rs index 3eb701dc88565..2eb414be15920 100644 --- a/polkadot/node/service/src/parachains_db/upgrade.rs +++ b/polkadot/node/service/src/parachains_db/upgrade.rs @@ -40,7 +40,8 @@ const VERSION_FILE_NAME: &'static str = "parachain_db_version"; /// Version 4 changes approval db format for `OurAssignment`. /// Version 5 changes approval db format to hold some additional /// information about delayed approvals. -pub(crate) const CURRENT_VERSION: Version = 5; +/// Version 6 adds a new column for collator reputation data. +pub(crate) const CURRENT_VERSION: Version = 6; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -111,6 +112,7 @@ pub(crate) fn try_upgrade_db_to_next_version( // 3 -> 4 migration Some(3) => migrate_from_version_3_or_4_to_5(db_path, db_kind, v1_to_latest)?, Some(4) => migrate_from_version_3_or_4_to_5(db_path, db_kind, v2_to_latest)?, + Some(5) => migrate_from_version_5_to_6(db_path, db_kind)?, // Already at current version, do nothing. Some(CURRENT_VERSION) => CURRENT_VERSION, // This is an arbitrary future version, we don't handle it. @@ -229,7 +231,7 @@ where }; gum::info!(target: LOG_TARGET, "Migration complete! 
"); - Ok(CURRENT_VERSION) + Ok(5) } fn migrate_from_version_2_to_3(path: &Path, db_kind: DatabaseKind) -> Result { @@ -244,6 +246,18 @@ fn migrate_from_version_2_to_3(path: &Path, db_kind: DatabaseKind) -> Result Result { + gum::info!(target: LOG_TARGET, "Migrating parachains db from version 5 to version 6 ..."); + match db_kind { + DatabaseKind::ParityDB => parity_db_migrate_from_version_5_to_6(path), + DatabaseKind::RocksDB => rocksdb_migrate_from_version_5_to_6(path), + } + .and_then(|result| { + gum::info!(target: LOG_TARGET, "Migration complete! "); + Ok(result) + }) +} + /// Migration from version 0 to version 1: /// * the number of columns has changed from 3 to 5; fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result { @@ -348,11 +362,24 @@ fn paritydb_fix_columns( Ok(()) } +fn rocksdb_migrate_from_version_5_to_6(path: &Path) -> Result { + use kvdb_rocksdb::{Database, DatabaseConfig}; + + let db_path = path + .to_str() + .ok_or_else(|| super::other_io_error("Invalid database path".into()))?; + let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); + let mut db = Database::open(&db_cfg, db_path)?; + + db.add_column()?; + Ok(6) +} + /// Database configuration for version 1. 
pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8); - for i in columns::v4::ORDERED_COL { + for i in columns::v5::ORDERED_COL { options.columns[*i as usize].btree_index = true; } @@ -363,7 +390,7 @@ pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options { pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v2::NUM_COLUMNS as u8); - for i in columns::v4::ORDERED_COL { + for i in columns::v5::ORDERED_COL { options.columns[*i as usize].btree_index = true; } @@ -381,12 +408,21 @@ pub(crate) fn paritydb_version_3_config(path: &Path) -> parity_db::Options { options } +pub(crate) fn paritydb_version_6_config(path: &Path) -> parity_db::Options { + let mut options = + parity_db::Options::with_columns(&path, super::columns::v6::NUM_COLUMNS as u8); + for idx in columns::v6::ORDERED_COL { + options.columns[*idx as usize].btree_index = true; + } + options +} + /// Database configuration for version 0. This is useful just for testing. 
#[cfg(test)] pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v0::NUM_COLUMNS as u8); - options.columns[super::columns::v4::COL_AVAILABILITY_META as usize].btree_index = true; + options.columns[super::columns::v6::COL_AVAILABILITY_META as usize].btree_index = true; options } @@ -401,7 +437,7 @@ fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result { paritydb_fix_columns( path, paritydb_version_1_config(path), - vec![super::columns::v4::COL_DISPUTE_COORDINATOR_DATA], + vec![super::columns::v6::COL_DISPUTE_COORDINATOR_DATA], )?; Ok(1) @@ -427,6 +463,15 @@ fn paritydb_migrate_from_version_2_to_3(path: &Path) -> Result { Ok(3) } +/// Migration from version 5 to version 6: +/// - add a new column for reputation +fn parity_db_migrate_from_version_5_to_6(path: &Path) -> Result { + let mut options = paritydb_version_3_config(path); + parity_db::Db::add_column(&mut options, parity_db::ColumnOptions::default()) + .map_err(|e| other_io_error(format!("Error adding a new column {:?}", e)))?; + Ok(6) +} + /// Remove the lock file. If file is locked, it will wait up to 1s. #[cfg(test)] pub fn remove_file_lock(path: &std::path::Path) { @@ -455,7 +500,7 @@ pub fn remove_file_lock(path: &std::path::Path) { #[cfg(test)] mod tests { use super::{ - columns::{v2::COL_SESSION_WINDOW_DATA, v4::*}, + columns::{v2::COL_SESSION_WINDOW_DATA, v6::*}, *, }; use kvdb_rocksdb::{Database, DatabaseConfig}; @@ -559,7 +604,7 @@ mod tests { // We need to properly set db version for upgrade to work. 
fs::write(version_file_path(db_dir.path()), "1").expect("Failed to write DB version"); { - let db = DbAdapter::new(db, columns::v4::ORDERED_COL); + let db = DbAdapter::new(db, columns::v5::ORDERED_COL); db.write(DBTransaction { ops: vec![DBOp::Insert { col: COL_DISPUTE_COORDINATOR_DATA, @@ -577,7 +622,7 @@ mod tests { assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS); - let db = DbAdapter::new(db, columns::v4::ORDERED_COL); + let db = DbAdapter::new(db, columns::v5::ORDERED_COL); assert_eq!( db.get(COL_DISPUTE_COORDINATOR_DATA, b"1234").unwrap(), @@ -624,9 +669,9 @@ mod tests { try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB, 5).unwrap(); - let db_cfg = DatabaseConfig::with_columns(super::columns::v4::NUM_COLUMNS); + let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); let db = Database::open(&db_cfg, db_path).unwrap(); - let db = DbAdapter::new(db, columns::v4::ORDERED_COL); + let db = DbAdapter::new(db, columns::v5::ORDERED_COL); v1_to_latest_sanity_check(std::sync::Arc::new(db), approval_cfg, expected_candidates) .unwrap(); @@ -655,9 +700,9 @@ mod tests { try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB, 5).unwrap(); - let db_cfg = DatabaseConfig::with_columns(super::columns::v4::NUM_COLUMNS); + let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); let db = Database::open(&db_cfg, db_path).unwrap(); - let db = DbAdapter::new(db, columns::v4::ORDERED_COL); + let db = DbAdapter::new(db, columns::v5::ORDERED_COL); v1_to_latest_sanity_check(std::sync::Arc::new(db), approval_cfg, expected_candidates) .unwrap(); @@ -673,10 +718,10 @@ mod tests { fs::write(version_file_path(db_dir.path()), "0").expect("Failed to write DB version"); try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB, 5).unwrap(); - let db_cfg = DatabaseConfig::with_columns(super::columns::v4::NUM_COLUMNS); + let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); let db = Database::open(&db_cfg, db_path).unwrap(); - 
assert_eq!(db.num_columns(), columns::v4::NUM_COLUMNS); + assert_eq!(db.num_columns(), columns::v5::NUM_COLUMNS); } #[test] @@ -697,7 +742,7 @@ mod tests { try_upgrade_db(&path, DatabaseKind::ParityDB, 5).unwrap(); let db = Db::open(&paritydb_version_3_config(&path)).unwrap(); - assert_eq!(db.num_columns(), columns::v4::NUM_COLUMNS as u8); + assert_eq!(db.num_columns(), columns::v5::NUM_COLUMNS as u8); } #[test] @@ -755,4 +800,38 @@ mod tests { assert_eq!(db.num_columns(), super::columns::v3::NUM_COLUMNS); } + + #[test] + fn test_paritydb_migrate_5_to_6() { + use parity_db::Db; + + let db_dir = tempfile::tempdir().unwrap(); + fs::write(version_file_path(db_dir.path()), "5").expect("Failed to write DB version."); + { + let db = Db::open_or_create(&paritydb_version_3_config(&db_dir.path())).unwrap(); + assert_eq!(db.num_columns(), columns::v5::NUM_COLUMNS as u8); + } + try_upgrade_db(db_dir.path(), DatabaseKind::ParityDB, 6).unwrap(); + let db = Db::open(&paritydb_version_6_config(&db_dir.path())).unwrap(); + assert_eq!(db.num_columns(), columns::v6::NUM_COLUMNS as u8); + } + + #[test] + fn test_rocksdb_migrate_5_to_6() { + let db_dir = tempfile::tempdir().unwrap(); + let db_path = db_dir.path().to_str().unwrap(); + let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); + + { + let db = Database::open(&db_cfg, db_path).unwrap(); + assert_eq!(db.num_columns(), super::columns::v5::NUM_COLUMNS); + } + fs::write(version_file_path(db_dir.path()), "5").expect("Failed to write DB version."); + try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB, 6).unwrap(); + + let db_cfg = DatabaseConfig::with_columns(super::columns::v6::NUM_COLUMNS); + let db = Database::open(&db_cfg, db_path).unwrap(); + + assert_eq!(db.num_columns(), super::columns::v6::NUM_COLUMNS); + } } diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs index 1a27683e8c58b..8141e08973989 100644 --- a/polkadot/node/test/service/src/lib.rs +++ 
b/polkadot/node/test/service/src/lib.rs @@ -103,6 +103,7 @@ pub fn new_full( invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, experimental_collator_protocol: false, + collator_reputation_persist_interval: None, }; match config.network.network_backend { diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index 6bcbd723485b9..0562db150d2f4 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -102,6 +102,7 @@ fn main() -> Result<()> { invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, experimental_collator_protocol: false, + collator_reputation_persist_interval: None, }, ) .map_err(|e| e.to_string())?; diff --git a/polkadot/parachain/test-parachains/undying/collator/src/main.rs b/polkadot/parachain/test-parachains/undying/collator/src/main.rs index 81187e3b32e24..372201525590b 100644 --- a/polkadot/parachain/test-parachains/undying/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/undying/collator/src/main.rs @@ -114,6 +114,7 @@ fn main() -> Result<()> { invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, experimental_collator_protocol: false, + collator_reputation_persist_interval: None, }, ) .map_err(|e| e.to_string())?; diff --git a/polkadot/parachain/test-parachains/undying/src/lib.rs b/polkadot/parachain/test-parachains/undying/src/lib.rs index bfac3629f9bf6..764c6eca63923 100644 --- a/polkadot/parachain/test-parachains/undying/src/lib.rs +++ b/polkadot/parachain/test-parachains/undying/src/lib.rs @@ -166,8 +166,12 @@ pub fn execute( ); if block_data.experimental_send_approved_peer { + // Create a valid PeerId in multihash format: [hash_code, digest_size, ...digest_bytes] + // Using multihash code 0x0 (identity hash) with 32 bytes of data + let mut peer_id_bytes = alloc::vec![0x0, 32]; // hash 
code 0x0, size 32 + peer_id_bytes.extend_from_slice(&[1u8; 32]); // 32 bytes of data upward_messages - .force_push(UMPSignal::ApprovedPeer(alloc::vec![1, 2, 3].try_into().unwrap()).encode()); + .force_push(UMPSignal::ApprovedPeer(peer_id_bytes.try_into().unwrap()).encode()); } // We need to clone the block data as the fn will mutate it's state. diff --git a/polkadot/zombienet-sdk-tests/Cargo.toml b/polkadot/zombienet-sdk-tests/Cargo.toml index d97326b0aaf01..9b78898b1fb46 100644 --- a/polkadot/zombienet-sdk-tests/Cargo.toml +++ b/polkadot/zombienet-sdk-tests/Cargo.toml @@ -19,6 +19,7 @@ log = { workspace = true } pallet-revive = { workspace = true, features = ["std"] } polkadot-primitives = { workspace = true, default-features = true } rand = { workspace = true } +regex = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sp-core = { workspace = true } diff --git a/polkadot/zombienet-sdk-tests/tests/functional/collators_reputation_persistence.rs b/polkadot/zombienet-sdk-tests/tests/functional/collators_reputation_persistence.rs new file mode 100644 index 0000000000000..a8a0417110771 --- /dev/null +++ b/polkadot/zombienet-sdk-tests/tests/functional/collators_reputation_persistence.rs @@ -0,0 +1,533 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +//! Comprehensive Collator Reputation Persistence Test +//! +//! This test verifies multiple aspects of the collator reputation persistence system: +//! 1. Basic persistence on graceful shutdown +//! 2. Startup lookback with different gap sizes +//! 3. Pruning when parachains are deregistered +//! +//! ## Test Phases +//! +//! ### Phase 1: Large Gap Lookback (>= 20 blocks) +//! - Spawn network with 2 parachains +//! - Wait for initial persistence +//! - Pause validator-0 to create a 30+ block gap +//! - Restart and verify lookback processes MAX_STARTUP_ANCESTRY_LOOKBACK blocks +//! +//! ### Phase 2: Small Gap Lookback (< 20 blocks) +//! 
- Pause validator-0 again +//! - Create a ~10 block gap +//! - Restart and verify lookback processes the entire gap +//! +//! ### Phase 3: Pruning on Parachain Deregistration +//! - Deregister parachain 2001 using sudo +//! - Wait for session change (triggers pruning) +//! - Verify pruning logs show para 2001 removed +//! - Restart validator-0 +//! - Verify only para 2000's reputation was loaded +//! +//! ## Success Criteria +//! - All persistence and restart operations succeed +//! - Lookback correctly handles both large and small gaps +//! - Pruning removes deregistered parachain data +//! - System continues normal operation after each phase + +use anyhow::anyhow; +use regex::Regex; +use tokio::time::Duration; + +use cumulus_zombienet_sdk_helpers::{assert_para_throughput, wait_for_first_session_change}; +use polkadot_primitives::Id as ParaId; +use serde_json::json; +use zombienet_orchestrator::network::node::LogLineCountOptions; +use zombienet_sdk::{ + subxt::{ext::scale_value::value, OnlineClient, PolkadotConfig}, + subxt_signer::sr25519::dev, + NetworkConfigBuilder, +}; + +const PARA_ID_1: u32 = 2000; +const PARA_ID_2: u32 = 2001; + +#[tokio::test(flavor = "multi_thread")] +async fn comprehensive_reputation_persistence_test() -> Result<(), anyhow::Error> { + let _ = env_logger::try_init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + // === Network Setup === + let images = zombienet_sdk::environment::get_images_from_env(); + + let config = NetworkConfigBuilder::new() + .with_relaychain(|r| { + let r = r + .with_chain("rococo-local") + .with_default_command("polkadot") + .with_default_image(images.polkadot.as_str()) + .with_default_args(vec![ + ("-lparachain=debug,parachain::collator-protocol=trace").into(), + ("--experimental-collator-protocol").into(), + ("--collator-reputation-persist-interval").into(), + ("30").into(), + ]) + .with_genesis_overrides(json!({ + "configuration": { + "config": { + 
"scheduler_params": { + "group_rotation_frequency": 4, + "num_cores": 2 + } + } + } + })) + .with_node(|node| node.with_name("validator-0")); + + (1..4).fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}")))) + }) + .with_parachain(|p| { + p.with_id(PARA_ID_1) + .with_default_command("undying-collator") + .cumulus_based(false) + .with_default_image( + std::env::var("COL_IMAGE") + .unwrap_or("docker.io/paritypr/colander:latest".to_string()) + .as_str(), + ) + .with_default_args(vec![ + ("-lparachain=debug").into(), + ("--experimental-send-approved-peer").into(), + ]) + .with_collator(|n| n.with_name("collator-1")) + }) + .with_parachain(|p| { + p.with_id(PARA_ID_2) + .with_default_command("undying-collator") + .cumulus_based(false) + .with_default_image( + std::env::var("COL_IMAGE") + .unwrap_or("docker.io/paritypr/colander:latest".to_string()) + .as_str(), + ) + .with_default_args(vec![ + ("-lparachain=debug").into(), + ("--experimental-send-approved-peer").into(), + ]) + .with_collator(|n| n.with_name("collator-2")) + }) + .build() + .map_err(|e| { + let errs = e.into_iter().map(|e| e.to_string()).collect::>().join(" "); + anyhow!("config errs: {errs}") + })?; + + let spawn_fn = zombienet_sdk::environment::get_spawn_fn(); + let network = spawn_fn(config).await?; + + let validator_0 = network.get_node("validator-0")?; + let validator0_client: OnlineClient = validator_0.wait_client().await?; + + // Verify fresh start (no existing data) + verify_db_initialized(validator_0, 1, 0).await?; + + log::info!("Network spawned, waiting for both parachains to produce blocks"); + assert_para_throughput( + &validator0_client, + 10, + [(ParaId::from(PARA_ID_1), 8..11), (ParaId::from(PARA_ID_2), 8..11)], + ) + .await?; + + // Wait for initial persistence + log::info!("Parachains producing blocks, waiting for initial periodic persistence"); + wait_for_persistence(validator_0, 1).await?; + + log::info!("Pausing validator-0 to create a block gap"); + 
validator_0.pause().await?; + + let block_at_persistence = extract_last_finalized_from_logs(validator_0).await?; + log::info!("Initial persistence completed at finalized block {}", block_at_persistence); + + log::info!("=== Phase 1: Testing Startup Lookback with Large Gap (>= 20 blocks) ==="); + + let validator_1 = network.get_node("validator-1")?; + let validator_1_client: OnlineClient = validator_1.wait_client().await?; + let mut finalized_blocks_1 = validator_1_client.blocks().subscribe_finalized().await?; + + let target_gap = 30u32; + let block_at_restart = + wait_for_block_gap(&mut finalized_blocks_1, block_at_persistence, target_gap, "large gap") + .await?; + + log::info!("Restarting validator-0 (first restart - large gap)"); + validator_0.restart(None).await?; + let _: OnlineClient = validator_0.wait_client().await?; + + // Verify loaded with both paras' reputation + verify_db_initialized(validator_0, 2, 2).await?; + + let blocks_processed = verify_lookback_completed(validator_0, 1).await?; + assert_eq!( + blocks_processed, 20, + "Expected blocks_processed ({blocks_processed}) == MAX_STARTUP_ANCESTRY_LOOKBACK (20)", + ); + log::info!( + "Phase 1 passed: Lookback processed {blocks_processed} blocks (capped at MAX), actual gap was {}", + block_at_restart.saturating_sub(block_at_persistence) + ); + + let relay_client: OnlineClient = validator_0.wait_client().await?; + wait_for_peer_reconnection(&relay_client).await?; + + // Verify validator resumes normal operation + assert_para_throughput( + &relay_client, + 5, + [(ParaId::from(PARA_ID_1), 3..7), (ParaId::from(PARA_ID_2), 3..7)], + ) + .await?; + + log::info!("=== Phase 2: Testing Startup Lookback with Small Gap (< 20 blocks) ==="); + + // Wait for another persistence to get a precise starting point + wait_for_persistence(validator_0, 2).await?; + + validator_0.pause().await?; + log::info!("Pausing validator-0 again to create a smaller gap"); + + let block_before_second_pause = 
extract_last_finalized_from_logs(validator_0).await?; + log::info!("Second persistence completed at finalized block {}", block_before_second_pause); + + // Fresh subscription to avoid stale buffered blocks from Phase 1 + let mut finalized_blocks_2 = validator_1_client.blocks().subscribe_finalized().await?; + + let small_gap_target = 10u32; + let block_at_second_restart = wait_for_block_gap( + &mut finalized_blocks_2, + block_before_second_pause, + small_gap_target, + "small gap", + ) + .await?; + + log::info!("Restarting validator-0 (second restart - small gap)"); + validator_0.restart(None).await?; + let validator0_client: OnlineClient = validator_0.wait_client().await?; + wait_for_peer_reconnection(&validator0_client).await?; + + // Verify loaded with both paras' reputation + verify_db_initialized(validator_0, 3, 2).await?; + let processed_second = verify_lookback_completed(validator_0, 2).await?; + let expected_gap = block_at_second_restart.saturating_sub(block_before_second_pause); + + assert!(expected_gap < 20, "Expected second gap to be < 20, but got {expected_gap}"); + + // The key invariant: lookback should NOT be capped at MAX (20), + // and should process approximately the gap. Cross-node timing + // differences mean the exact value can drift, so use wide bounds. 
+ assert!( + processed_second < 20, + "Small gap should process fewer than MAX_STARTUP_ANCESTRY_LOOKBACK blocks, got {processed_second}", + ); + assert!( + processed_second >= expected_gap.saturating_sub(6), + "Expected lookback to process approximately {expected_gap} blocks, got {processed_second}", + ); + + log::info!( + "Phase 2 passed: Lookback processed {} blocks (entire gap of {})", + processed_second, + expected_gap + ); + + log::info!("=== Phase 3: Testing Pruning on Parachain Deregistration ==="); + + // Wait for another persistence to ensure both paras are on disk + wait_for_persistence(validator_0, 4).await?; + + // Verify both paras have reputation before pruning + let para_count_before = extract_para_count_from_persistence_logs(validator_0).await?; + log::info!("Before pruning: para_count={}", para_count_before); + assert_eq!( + para_count_before, 2, + "Expected 2 paras with reputation before pruning, but found {para_count_before}", + ); + + // Deregister parachain 2001 + log::info!( + "Deregistering parachain {} using ParasSudoWrapper::sudo_schedule_para_cleanup", + PARA_ID_2 + ); + let alice = dev::alice(); + let cleanup_calls = vec![ + value! { + ParasSudoWrapper(sudo_schedule_para_cleanup { id: PARA_ID_2 }) + }, + value! { + Paras(force_queue_action { para: PARA_ID_2 }) + }, + ]; + let sudo_batch_call = zombienet_sdk::subxt::tx::dynamic( + "Sudo", + "sudo", + vec![value! 
{ + Utility(batch_all { calls: cleanup_calls }) + }], + ); + + let tx_progress = validator0_client + .tx() + .sign_and_submit_then_watch_default(&sudo_batch_call, &alice) + .await?; + let _finalized = tx_progress.wait_for_finalized_success().await?; + log::info!("Para cleanup scheduled successfully"); + + // Stop the collator for para 2001 + log::info!("Stopping collator-2 for the deregistered parachain {}", PARA_ID_2); + let collator_2 = network.get_node("collator-2")?; + collator_2.pause().await?; + + // Wait for session change to trigger pruning + log::info!("Waiting for session change to trigger pruning"); + let mut best_blocks = validator0_client.blocks().subscribe_best().await?; + wait_for_first_session_change(&mut best_blocks).await?; + log::info!("Session change detected, para {} should now be offboarded", PARA_ID_2); + + // Verify pruning happened + verify_pruning(validator_0).await?; + log::info!("Pruning verified: pruned 1 para, 1 remaining"); + + // Restart validator-0 to verify only para 2000's reputation loads + log::info!("Restarting validator-0 to verify pruned state persisted"); + validator_0.restart(None).await?; + let validator0_client_after: OnlineClient = validator_0.wait_client().await?; + wait_for_peer_reconnection(&validator0_client_after).await?; + + // Verify loaded with only para 2000's reputation (para 2001 pruned) + verify_db_initialized(validator_0, 4, 1).await?; + + // Double-check via log parsing (redundant but shows consistency) + let para_count_after = extract_para_count_from_init_logs(validator_0).await?; + log::info!("After restart: para_count={}", para_count_after); + assert!( + para_count_after <= 1, + "Expected at most 1 para after pruning, but found {para_count_after}", + ); + + // Verify para 2000 continues normal operation + log::info!("Verifying para {} continues normal operation", PARA_ID_1); + assert_para_throughput(&validator0_client_after, 5, [(ParaId::from(PARA_ID_1), 3..7)]).await?; + + log::info!("Phase 3 passed: 
Pruning successfully removed deregistered parachain"); + Ok(()) +} + +// === Helper Functions === + +/// Wait for a few finalized blocks to allow peers to reconnect after a node restart. +async fn wait_for_peer_reconnection( + client: &OnlineClient, +) -> Result<(), anyhow::Error> { + log::info!("Waiting for peers to reconnect after restart"); + let mut blocks = client.blocks().subscribe_finalized().await?; + for _ in 0..3 { + let _ = blocks.next().await; + } + Ok(()) +} + +async fn verify_db_initialized( + validator: &zombienet_sdk::NetworkNode, + expected_count: u32, + expected_para_count: u32, +) -> Result<(), anyhow::Error> { + let result = validator + .wait_log_line_count_with_timeout( + "Reputation DB initialized", + false, + LogLineCountOptions::new(move |n| n >= expected_count, Duration::from_secs(60), false), + ) + .await?; + assert!( + result.success(), + "Expected validator to log 'Reputation DB initialized' (count >= {expected_count})", + ); + + // Parse and verify para_count + let logs = validator.logs().await?; + let init_re = Regex::new(r"Reputation DB initialized.*para_count=(\d+)")?; + + let mut para_count: Option = None; + for line in logs.lines().rev() { + if let Some(caps) = init_re.captures(line) { + para_count = caps.get(1).and_then(|m| m.as_str().parse().ok()); + if para_count.is_some() { + break; + } + } + } + + let actual = para_count.ok_or(anyhow!("Could not parse para_count from init log"))?; + assert_eq!( + actual, expected_para_count, + "Expected para_count={expected_para_count}, but got {actual}", + ); + log::info!("DB initialization verified: para_count={}", actual); + + Ok(()) +} + +async fn wait_for_persistence( + validator: &zombienet_sdk::NetworkNode, + expected_count: u32, +) -> Result<(), anyhow::Error> { + let result = validator + .wait_log_line_count_with_timeout( + "Periodic persistence completed:", + false, + LogLineCountOptions::new(move |n| n >= expected_count, Duration::from_secs(60), false), + ) + .await?; + assert!( + 
result.success(), + "Periodic persistence should have completed (count >= {expected_count})", + ); + Ok(()) +} + +async fn verify_lookback_completed( + validator: &zombienet_sdk::NetworkNode, + expected_count: u32, +) -> Result { + let result = validator + .wait_log_line_count_with_timeout( + "Startup lookback completed", + false, + LogLineCountOptions::new(move |n| n >= expected_count, Duration::from_secs(30), false), + ) + .await?; + assert!( + result.success(), + "Expected 'Startup lookback completed' log (count >= {expected_count})", + ); + + let logs = validator.logs().await?; + let lookback_re = Regex::new(r"Startup lookback completed.*blocks_processed=(\d+)")?; + + // Find the last occurrence (most recent) + let mut blocks_processed: Option = None; + for line in logs.lines().rev() { + if let Some(caps) = lookback_re.captures(line) { + blocks_processed = caps.get(1).and_then(|m| m.as_str().parse().ok()); + if blocks_processed.is_some() { + break; + } + } + } + + blocks_processed.ok_or(anyhow!("Could not parse blocks_processed from lookback log")) +} + +async fn extract_last_finalized_from_logs( + validator: &zombienet_sdk::NetworkNode, +) -> Result { + let logs = validator.logs().await?; + let persistence_re = + Regex::new(r"Periodic persistence completed:.*last_finalized=Some\((\d+)\)")?; + + // Find the last occurrence + let mut last_finalized: Option = None; + for line in logs.lines().rev() { + if let Some(caps) = persistence_re.captures(line) { + last_finalized = caps.get(1).and_then(|m| m.as_str().parse().ok()); + if last_finalized.is_some() { + break; + } + } + } + + last_finalized.ok_or(anyhow!("Could not parse last_finalized from persistence log")) +} + +async fn extract_para_count_from_persistence_logs( + validator: &zombienet_sdk::NetworkNode, +) -> Result { + let logs = validator.logs().await?; + let para_count_re = Regex::new(r"Periodic persistence completed:.* para_count=(\d+)")?; + + let mut para_count: Option = None; + for line in 
logs.lines().rev() { + if let Some(caps) = para_count_re.captures(line) { + para_count = caps.get(1).and_then(|m| m.as_str().parse().ok()); + if para_count.is_some() { + break; + } + } + } + + para_count.ok_or(anyhow!("Could not parse para_count from persistence log")) +} + +async fn extract_para_count_from_init_logs( + validator: &zombienet_sdk::NetworkNode, +) -> Result<u32, anyhow::Error> { + let logs = validator.logs().await?; + let init_re = Regex::new(r"Reputation DB initialized.*para_count=(\d+)")?; + + let mut para_count: Option<u32> = None; + for line in logs.lines().rev() { + if let Some(caps) = init_re.captures(line) { + para_count = caps.get(1).and_then(|m| m.as_str().parse().ok()); + if para_count.is_some() { + break; + } + } + } + + para_count.ok_or(anyhow!("Could not parse para_count from init log")) +} + +async fn wait_for_block_gap( + finalized_blocks: &mut zombienet_sdk::subxt::backend::StreamOfResults< + zombienet_sdk::subxt::blocks::Block<PolkadotConfig, OnlineClient<PolkadotConfig>>, + >, + start_block: u32, + target_gap: u32, + gap_name: &str, +) -> Result<u32, anyhow::Error> { + log::info!("Waiting for {} blocks while validator-0 is paused", target_gap); + let mut current_block = start_block; + while current_block < start_block + target_gap { + if let Some(Ok(block)) = finalized_blocks.next().await { + current_block = block.number(); + log::info!( + "Finalized block {} (gap: {})", + current_block, + current_block.saturating_sub(start_block) + ); + } + } + log::info!( + "{} created: finalized block now at {}, gap of {} blocks", + gap_name, + current_block, + current_block.saturating_sub(start_block) + ); + Ok(current_block) +} + +async fn verify_pruning(validator: &zombienet_sdk::NetworkNode) -> Result<(), anyhow::Error> { + let result = validator + .wait_log_line_count_with_timeout( + "Prune paras persisted to disk immediately pruned_para_count=1 remaining_para_count=1 registered_para_count=1", + false, + LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(90), false), + ) + .await?; + assert!( + result.success(), + "Expected 
validator to log pruning with pruned=1, remaining=1, registered=1" + ); + Ok(()) +} diff --git a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs index e28cfb4039303..50dba5d39a436 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs @@ -4,6 +4,7 @@ mod approval_voting_coalescing; mod approved_peer_mixed_validators; mod async_backing_6_seconds_rate; +mod collators_reputation_persistence; mod dispute_old_finalized; mod duplicate_collations; mod shared_core_idle_parachain; diff --git a/prdoc/pr_10917.prdoc b/prdoc/pr_10917.prdoc new file mode 100644 index 0000000000000..b52125d51f42f --- /dev/null +++ b/prdoc/pr_10917.prdoc @@ -0,0 +1,37 @@ +title: Implement persistent reputation database for collator protocol (#7751) +doc: +- audience: Node Dev + description: |- + Implements persistent storage for the experimental collator protocol's reputation database. + + Changes: + + - Adds `PersistentDb` wrapper that persists the in-memory reputation DB to disk + - Periodic persistence every 10 minutes + - Adds `--collator-reputation-persist-interval` CLI to specify the persistence interval in seconds. 
+ - Immediate persistence on slashes and parachain deregistration + - Loads existing state on startup with lookback for missed blocks + + Implementation: + + `PersistentDb` wraps the existing `Db` and adds persistence on top: + + - All reputation logic (scoring, decay, LRU) stays in `Db` + - Persistence layer handles disk I/O and serialization + - Per-para data stored in parachains_db + + Tests: + + - `basic_persistence.rs`: Validates persistence across restarts and startup lookback + - `pruning.rs`: Validates automatic cleanup on parachain deregistration +crates: +- name: polkadot-collator-protocol + bump: major +- name: polkadot-service + bump: major +- name: polkadot + bump: major +- name: polkadot-cli + bump: major +- name: cumulus-relay-chain-inprocess-interface + bump: major