diff --git a/Cargo.lock b/Cargo.lock index d03214c2e8d..7180dbfca71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10446,26 +10446,42 @@ dependencies = [ name = "subspace-test-service" version = "0.1.0" dependencies = [ + "async-trait", "frame-system", "futures 0.3.27", + "futures-timer", "pallet-balances", "pallet-transaction-payment", "rand 0.8.5", + "sc-block-builder", "sc-cli", "sc-client-api", + "sc-consensus", + "sc-consensus-fraud-proof", "sc-consensus-slots", "sc-executor", "sc-network", "sc-network-common", "sc-service", "sc-tracing", + "sc-utils", + "sp-api", + "sp-application-crypto", "sp-arithmetic", "sp-blockchain", + "sp-consensus", + "sp-consensus-slots", + "sp-consensus-subspace", + "sp-inherents", "sp-keyring", "sp-runtime", + "sp-timestamp", + "subspace-core-primitives", + "subspace-fraud-proof", "subspace-networking", "subspace-runtime-primitives", "subspace-service", + "subspace-solving", "subspace-test-client", "subspace-test-runtime", "subspace-transaction-pool", @@ -10473,6 +10489,7 @@ dependencies = [ "substrate-test-utils", "tempfile", "tokio", + "tracing", ] [[package]] diff --git a/domains/client/domain-executor/src/tests.rs b/domains/client/domain-executor/src/tests.rs index e011bd4d19f..e3422a74e41 100644 --- a/domains/client/domain-executor/src/tests.rs +++ b/domains/client/domain-executor/src/tests.rs @@ -21,6 +21,7 @@ use sp_runtime::generic::{BlockId, Digest, DigestItem}; use sp_runtime::traits::{BlakeTwo256, Hash as HashT, Header as HeaderT}; use std::collections::HashSet; use subspace_core_primitives::BlockNumber; +use subspace_test_service::mock::MockPrimaryNode; use subspace_wasm_tools::read_core_domain_runtime_blob; use tempfile::TempDir; @@ -35,14 +36,11 @@ async fn test_executor_full_node_catching_up() { let tokio_handle = tokio::runtime::Handle::current(); // Start Ferdie - let (ferdie, ferdie_network_starter) = run_primary_chain_validator_node( + let mut ferdie = MockPrimaryNode::run_mock_primary_node( tokio_handle.clone(), Ferdie, - vec![], BasePath::new(directory.path().join("ferdie")), - ) - .await; - ferdie_network_starter.start_network(); + ); // Run Alice (a system domain authority node) let alice = domain_test_service::SystemDomainNodeBuilder::new( @@ -50,8 +48,7 @@ async fn test_executor_full_node_catching_up() { Alice, BasePath::new(directory.path().join("alice")), ) - .connect_to_primary_chain_node(&ferdie) - .build(Role::Authority, false, false) + .build_with_mock_primary_node(Role::Authority, &mut ferdie) .await; // Run Bob (a system domain full node) @@ -60,12 +57,17 @@ async fn test_executor_full_node_catching_up() { Bob, BasePath::new(directory.path().join("bob")), ) - .connect_to_primary_chain_node(&ferdie) - .build(Role::Full, false, false) + .build_with_mock_primary_node(Role::Full, &mut ferdie) .await; // Bob is able to sync blocks. - futures::future::join(alice.wait_for_blocks(3), bob.wait_for_blocks(3)).await; + futures::join!( + alice.wait_for_blocks(3), + bob.wait_for_blocks(3), + ferdie.produce_n_blocks(3), + ) + .2 + .unwrap(); let alice_block_hash = alice .client diff --git a/domains/test/service/src/lib.rs b/domains/test/service/src/lib.rs index 8d1db2dfea8..a1089b49ddc 100644 --- a/domains/test/service/src/lib.rs +++ b/domains/test/service/src/lib.rs @@ -51,6 +51,7 @@ use std::sync::Arc; use subspace_networking::libp2p::identity; use subspace_runtime_primitives::opaque::Block as PBlock; use subspace_service::{DsnConfig, SubspaceNetworking}; +use subspace_test_service::mock::MockPrimaryNode; use substrate_test_client::{ BlockchainEventsExt, RpcHandlersExt, RpcTransactionError, RpcTransactionOutput, }; @@ -104,6 +105,7 @@ pub type Client = /// /// A primary chain full node and system domain node will be started, similar to the behaviour in /// the production. +/// TODO: remove once all the existing tests integrated with `MockPrimaryNode` #[sc_tracing::logging::prefix_logs_with(system_domain_config.network.node_name.as_str())] async fn run_executor( system_domain_config: ServiceConfiguration, @@ -262,6 +264,93 @@ async fn run_executor( )) } +/// Start an executor with the given system domain `Configuration` and the mock primary node. +#[sc_tracing::logging::prefix_logs_with(system_domain_config.network.node_name.as_str())] +async fn run_executor_with_mock_primary_node( + system_domain_config: ServiceConfiguration, + mock_primary_node: &mut MockPrimaryNode, +) -> sc_service::error::Result<( + TaskManager, + Arc, + Arc, + Arc, + Arc>, + RpcHandlers, + Executor, +)> { + let (gossip_msg_sink, gossip_msg_stream) = + sc_utils::mpsc::tracing_unbounded("cross_domain_gossip_messages", 100); + let system_domain_config = DomainConfiguration { + service_config: system_domain_config, + maybe_relayer_id: None, + }; + let executor_streams = ExecutorStreams { + primary_block_import_throttling_buffer_size: 10, + subspace_imported_block_notification_stream: mock_primary_node + .imported_block_notification_stream(), + client_imported_block_notification_stream: mock_primary_node + .client + .every_import_notification_stream(), + new_slot_notification_stream: mock_primary_node.new_slot_notification_stream(), + _phantom: Default::default(), + }; + let system_domain_node = domain_service::new_full_system::< + _, + _, + _, + _, + _, + _, + domain_test_runtime::RuntimeApi, + RuntimeExecutor, + >( + system_domain_config, + mock_primary_node.client.clone(), + MockPrimaryNode::sync_oracle(), + &mock_primary_node.select_chain, + executor_streams, + gossip_msg_sink, + ) + .await?; + + let domain_service::NewFullSystem { + task_manager, + client, + backend, + code_executor, + network, + network_starter, + rpc_handlers, + executor, + tx_pool_sink, + } = system_domain_node; + + let mut domain_tx_pool_sinks = BTreeMap::new(); + domain_tx_pool_sinks.insert(DomainId::SYSTEM, tx_pool_sink); + let cross_domain_message_gossip_worker = + GossipWorker::::new(network.clone(), domain_tx_pool_sinks); + + task_manager + .spawn_essential_handle() + .spawn_essential_blocking( + "cross-domain-gossip-message-worker", + None, + Box::pin(cross_domain_message_gossip_worker.run(gossip_msg_stream)), + ); + + network_starter.start_network(); + + Ok(( + task_manager, + client, + backend, + code_executor, + network, + rpc_handlers, + executor, + )) +} + /// A Cumulus test node instance used for testing. pub struct SystemDomainNode { /// TaskManager's instance. @@ -371,6 +460,7 @@ impl SystemDomainNodeBuilder { } /// Build the [`SystemDomainNode`]. + /// TODO: remove once all the existing tests integrated with `MockPrimaryNode` pub async fn build( self, role: Role, @@ -420,6 +510,43 @@ impl SystemDomainNodeBuilder { executor, } } + + /// Build the [`SystemDomainNode`] with `MockPrimaryNode` as the embedded primary node. + pub async fn build_with_mock_primary_node( + self, + role: Role, + mock_primary_node: &mut MockPrimaryNode, + ) -> SystemDomainNode { + let system_domain_config = node_config( + self.tokio_handle.clone(), + self.key, + self.system_domain_nodes, + self.system_domain_nodes_exclusive, + role, + BasePath::new(self.base_path.path().join("system")), + ) + .expect("could not generate system domain node Configuration"); + + let multiaddr = system_domain_config.network.listen_addresses[0].clone(); + let (task_manager, client, backend, code_executor, network, rpc_handlers, executor) = + run_executor_with_mock_primary_node(system_domain_config, mock_primary_node) + .await + .expect("could not start system domain node"); + + let peer_id = network.local_peer_id(); + let addr = MultiaddrWithPeerId { multiaddr, peer_id }; + + SystemDomainNode { + task_manager, + client, + backend, + code_executor, + network, + addr, + rpc_handlers, + executor, + } + } } /// Create a system domain node `Configuration`. @@ -610,6 +737,7 @@ pub fn construct_extrinsic( /// /// This is essentially a wrapper around /// [`run_validator_node`](subspace_test_service::run_validator_node). +/// TODO: remove once all the existing tests integrated with `MockPrimaryNode` pub async fn run_primary_chain_validator_node( tokio_handle: tokio::runtime::Handle, key: Sr25519Keyring, diff --git a/test/subspace-test-runtime/src/lib.rs b/test/subspace-test-runtime/src/lib.rs index c02f0fb5afd..3b952f17a7f 100644 --- a/test/subspace-test-runtime/src/lib.rs +++ b/test/subspace-test-runtime/src/lib.rs @@ -139,7 +139,7 @@ pub const MILLISECS_PER_BLOCK: u64 = 2000; // NOTE: Currently it is not possible to change the slot duration after the chain has started. // Attempting to do so will brick block production. -const SLOT_DURATION: u64 = 2000; +pub const SLOT_DURATION: u64 = 2000; /// 1 in 6 slots (on average, not counting collisions) will have a block. /// Must match ratio between block and slot duration in constants above. diff --git a/test/subspace-test-service/Cargo.toml b/test/subspace-test-service/Cargo.toml index 61b562296a8..9e9b9b26e7f 100644 --- a/test/subspace-test-service/Cargo.toml +++ b/test/subspace-test-service/Cargo.toml @@ -15,30 +15,47 @@ include = [ targets = ["x86_64-unknown-linux-gnu"] [dependencies] +async-trait = "0.1.58" frame-system = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } futures = "0.3.26" +futures-timer = "3.0.1" rand = "0.8.5" pallet-balances = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } pallet-transaction-payment = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sc-block-builder = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sc-client-api = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sc-consensus = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sc-consensus-slots = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sc-executor = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sc-consensus-fraud-proof = { version = "0.1.0", path = "../../crates/sc-consensus-fraud-proof" } sc-network = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sc-network-common = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sc-service = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232", default-features = false } sc-tracing = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sc-utils = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sp-api = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sp-application-crypto = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sp-arithmetic = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sp-blockchain = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sp-consensus = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sp-consensus-subspace = { version = "0.1.0", path = "../../crates/sp-consensus-subspace" } +sp-consensus-slots = { version = "0.10.0-dev", default-features = false, git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sp-keyring = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sp-timestamp = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +sp-inherents = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } sp-runtime = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } +subspace-core-primitives = { version = "0.1.0", default-features = false, path = "../../crates/subspace-core-primitives" } +subspace-fraud-proof = { path = "../../crates/subspace-fraud-proof" } subspace-networking = { path = "../../crates/subspace-networking" } subspace-runtime-primitives = { path = "../../crates/subspace-runtime-primitives" } subspace-service = { path = "../../crates/subspace-service" } +subspace-solving = { version = "0.1.0", default-features = false, path = "../../crates/subspace-solving" } subspace-test-client = { path = "../subspace-test-client" } subspace-test-runtime = { version = "0.1.0", features = ["do-not-enforce-cost-of-storage"], path = "../subspace-test-runtime" } subspace-transaction-pool = { path = "../../crates/subspace-transaction-pool" } substrate-test-client = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" } tokio = "1.25.0" +tracing = "0.1.37" [dev-dependencies] sc-cli = { git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232", default-features = false } diff --git a/test/subspace-test-service/src/lib.rs b/test/subspace-test-service/src/lib.rs index d73b99edeae..e193a2720e8 100644 --- a/test/subspace-test-service/src/lib.rs +++ b/test/subspace-test-service/src/lib.rs @@ -56,6 +56,11 @@ use substrate_test_client::{ BlockchainEventsExt, RpcHandlersExt, RpcTransactionError, RpcTransactionOutput, }; +/// TODO: Replace `PrimaryTestNode` with `mock::MockPrimaryNode` once all the existing tests +/// integrated with the new testing framework. +#[allow(dead_code)] +pub mod mock; + /// Create a Subspace `Configuration`. /// /// By default an in-memory socket will be used, therefore you need to provide boot diff --git a/test/subspace-test-service/src/mock.rs b/test/subspace-test-service/src/mock.rs new file mode 100644 index 00000000000..f59b81951ae --- /dev/null +++ b/test/subspace-test-service/src/mock.rs @@ -0,0 +1,399 @@ +use crate::node_config; +use futures::channel::mpsc; +use futures::{select, FutureExt, StreamExt}; +use sc_block_builder::BlockBuilderProvider; +use sc_client_api::backend; +use sc_consensus::block_import::{ + BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult, +}; +use sc_consensus::{BlockImport, BoxBlockImport, StateAction}; +use sc_executor::NativeElseWasmExecutor; +use sc_service::{BasePath, InPoolTransaction, TaskManager, TransactionPool}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use sp_api::{ApiExt, HashT, HeaderT, ProvideRuntimeApi, TransactionFor}; +use sp_application_crypto::UncheckedFrom; +use sp_blockchain::HeaderBackend; +use sp_consensus::{BlockOrigin, CacheKeyId, Error as ConsensusError, NoNetwork, SyncOracle}; +use sp_consensus_slots::Slot; +use sp_consensus_subspace::digests::{CompatibleDigestItem, PreDigest}; +use sp_consensus_subspace::FarmerPublicKey; +use sp_inherents::{InherentData, InherentDataProvider}; +use sp_keyring::Sr25519Keyring; +use sp_runtime::generic::Digest; +use sp_runtime::traits::{BlakeTwo256, Block as BlockT, NumberFor}; +use sp_runtime::DigestItem; +use sp_timestamp::Timestamp; +use std::collections::HashMap; +use std::error::Error; +use std::sync::Arc; +use std::time; +use subspace_core_primitives::{Blake2b256Hash, Solution}; +use subspace_runtime_primitives::opaque::Block; +use subspace_runtime_primitives::{AccountId, Hash}; +use subspace_service::FullSelectChain; +use subspace_solving::create_chunk_signature; +use subspace_test_client::{Backend, Client, FraudProofVerifier, TestExecutorDispatch}; +use subspace_test_runtime::{RuntimeApi, SLOT_DURATION}; +use subspace_transaction_pool::bundle_validator::BundleValidator; +use subspace_transaction_pool::FullPool; + +type StorageChanges = sp_api::StorageChanges, Block>; + +/// A mock Subspace primary node instance used for testing. +pub struct MockPrimaryNode { + /// `TaskManager`'s instance. + pub task_manager: TaskManager, + /// Client's instance. + pub client: Arc, + /// Backend. + pub backend: Arc, + /// Code executor. + pub executor: NativeElseWasmExecutor, + /// Transaction pool. + pub transaction_pool: + Arc>>, + /// The SelectChain Strategy + pub select_chain: FullSelectChain, + /// The next slot number + next_slot: u64, + /// The slot notification subscribers + new_slot_notification_subscribers: Vec>, + /// Block import pipeline + block_import: + MockBlockImport>, Client, Block>, + /// Mock subspace solution used to mock the subspace `PreDigest` + mock_solution: Solution, +} + +impl MockPrimaryNode { + /// Run a mock primary node + pub fn run_mock_primary_node( + tokio_handle: tokio::runtime::Handle, + key: Sr25519Keyring, + base_path: BasePath, + ) -> MockPrimaryNode { + let config = node_config(tokio_handle, key, vec![], false, false, false, base_path); + + let executor = NativeElseWasmExecutor::::new( + config.wasm_method, + config.default_heap_pages, + config.max_runtime_instances, + config.runtime_cache_size, + ); + + let (client, backend, _, task_manager) = + sc_service::new_full_parts::(&config, None, executor.clone()) + .expect("Fail to new full parts"); + + let client = Arc::new(client); + + let select_chain = sc_consensus::LongestChain::new(backend.clone()); + + let bundle_validator = BundleValidator::new(client.clone()); + + let proof_verifier = subspace_fraud_proof::ProofVerifier::new( + client.clone(), + executor.clone(), + task_manager.spawn_handle(), + subspace_fraud_proof::PrePostStateRootVerifier::new(client.clone()), + ); + let transaction_pool = subspace_transaction_pool::new_full( + &config, + &task_manager, + client.clone(), + proof_verifier.clone(), + bundle_validator, + ); + + let fraud_proof_block_import = + sc_consensus_fraud_proof::block_import(client.clone(), client.clone(), proof_verifier); + + let block_import = MockBlockImport::< + BoxBlockImport>, + _, + _, + >::new(Box::new(fraud_proof_block_import), client.clone()); + + let mock_solution = { + let mut gs = Solution::genesis_solution( + FarmerPublicKey::unchecked_from(key.public().0), + key.to_account_id(), + ); + gs.chunk_signature = create_chunk_signature(&key.pair().into(), &gs.chunk.to_bytes()); + gs + }; + + MockPrimaryNode { + task_manager, + client, + backend, + executor, + transaction_pool, + select_chain, + next_slot: 1, + new_slot_notification_subscribers: Vec::new(), + block_import, + mock_solution, + } + } + + /// Sync oracle for `MockPrimaryNode` + pub fn sync_oracle() -> Arc { + Arc::new(NoNetwork) + } + + /// Return the next slot number + pub fn next_slot(&self) -> u64 { + self.next_slot + } + + /// Produce slot + pub fn produce_slot(&mut self) -> Slot { + let slot = Slot::from(self.next_slot); + self.next_slot += 1; + + let value = (slot, Hash::random().into()); + self.new_slot_notification_subscribers + .retain(|subscriber| subscriber.unbounded_send(value).is_ok()); + + slot + } + + /// Subscribe the new slot notification + pub fn new_slot_notification_stream( + &mut self, + ) -> TracingUnboundedReceiver<(Slot, Blake2b256Hash)> { + let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100); + self.new_slot_notification_subscribers.push(tx); + rx + } + + /// Subscribe the block import notification + pub fn imported_block_notification_stream( + &mut self, + ) -> TracingUnboundedReceiver<(NumberFor, mpsc::Sender<()>)> { + let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100); + self.block_import + .imported_block_notification_subscribers + .push(tx); + rx + } +} + +impl MockPrimaryNode { + async fn collect_txn_from_pool( + &self, + parent_number: NumberFor, + ) -> Vec<::Extrinsic> { + let mut t1 = self.transaction_pool.ready_at(parent_number).fuse(); + let mut t2 = futures_timer::Delay::new(time::Duration::from_micros(100)).fuse(); + let pending_iterator = select! { + res = t1 => res, + _ = t2 => { + tracing::warn!( + "Timeout fired waiting for transaction pool at #{}, proceeding with production.", + parent_number, + ); + self.transaction_pool.ready() + } + }; + let pushing_duration = time::Duration::from_micros(500); + let start = time::Instant::now(); + let mut extrinsics = Vec::new(); + for pending_tx in pending_iterator { + if start.elapsed() >= pushing_duration { + break; + } + let pending_tx_data = pending_tx.data().clone(); + extrinsics.push(pending_tx_data); + } + extrinsics + } + + async fn mock_inherent_data(slot: Slot) -> Result> { + let timestamp = sp_timestamp::InherentDataProvider::new(Timestamp::new( + >::into(slot) * SLOT_DURATION, + )); + let subspace_inherents = + sp_consensus_subspace::inherents::InherentDataProvider::new(slot, vec![]); + + let inherent_data = (subspace_inherents, timestamp) + .create_inherent_data() + .await?; + + Ok(inherent_data) + } + + fn mock_subspace_digest(&self, slot: Slot) -> Digest { + let pre_digest: PreDigest = PreDigest { + slot, + solution: self.mock_solution.clone(), + }; + let mut digest = Digest::default(); + digest.push(DigestItem::subspace_pre_digest(&pre_digest)); + digest + } + + /// Build block + async fn build_block( + &self, + slot: Slot, + parent_hash: ::Hash, + extrinsics: Vec<::Extrinsic>, + ) -> Result<(Block, StorageChanges), Box> { + let digest = self.mock_subspace_digest(slot); + let inherent_data = Self::mock_inherent_data(slot).await?; + + let mut block_builder = self.client.new_block_at(parent_hash, digest, false)?; + + let inherent_txns = block_builder.create_inherents(inherent_data)?; + + for tx in inherent_txns.into_iter().chain(extrinsics) { + sc_block_builder::BlockBuilder::push(&mut block_builder, tx)?; + } + + let (block, storage_changes, _) = block_builder.build()?.into_inner(); + Ok((block, storage_changes)) + } + + /// Import block + async fn import_block( + &mut self, + block: Block, + storage_changes: Option, + ) -> Result<(), Box> { + let (header, body) = block.deconstruct(); + let block_import_params = { + let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); + import_block.body = Some(body); + import_block.state_action = match storage_changes { + Some(changes) => { + StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(changes)) + } + None => StateAction::Execute, + }; + import_block + }; + + let import_result = self + .block_import + .import_block(block_import_params, Default::default()) + .await?; + + match import_result { + ImportResult::Imported(_) | ImportResult::AlreadyInChain => Ok(()), + bad_res => Err(format!("Fail to import block due to {bad_res:?}").into()), + } + } + + /// Produce block based on the current best block and the extrinsics in pool + pub async fn produce_block(&mut self) -> Result<(), Box> { + let block_timer = time::Instant::now(); + + let slot = self.produce_slot(); + + let parent_hash = self.client.info().best_hash; + let parent_number = self.client.info().best_number; + + let extrinsics = self.collect_txn_from_pool(parent_number).await; + + let (block, storage_changes) = self.build_block(slot, parent_hash, extrinsics).await?; + + tracing::info!( + "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]", + block.header().number(), + block_timer.elapsed().as_millis(), + block.header().hash(), + block.header().parent_hash(), + block.extrinsics().len(), + block.extrinsics() + .iter() + .map(|xt| BlakeTwo256::hash_of(xt).to_string()) + .collect::>() + .join(", ") + ); + + self.import_block(block, Some(storage_changes)).await?; + + Ok(()) + } + + /// Produce `n` number of blocks. + pub async fn produce_n_blocks(&mut self, n: u64) -> Result<(), Box> { + for _ in 0..n { + self.produce_block().await?; + } + Ok(()) + } +} + +// `MockBlockImport` is mostly port from `sc-consensus-subspace::SubspaceBlockImport` with all +// the consensus related logic removed. +struct MockBlockImport { + inner: Inner, + client: Arc, + imported_block_notification_subscribers: + Vec, mpsc::Sender<()>)>>, +} + +impl MockBlockImport { + fn new(inner: Inner, client: Arc) -> Self { + MockBlockImport { + inner, + client, + imported_block_notification_subscribers: Vec::new(), + } + } +} + +#[async_trait::async_trait] +impl BlockImport for MockBlockImport +where + Block: BlockT, + Inner: BlockImport, Error = ConsensusError> + + Send + + Sync, + Inner::Error: Into, + Client: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, + Client::Api: ApiExt, +{ + type Error = ConsensusError; + type Transaction = TransactionFor; + + async fn import_block( + &mut self, + mut block: BlockImportParams, + new_cache: HashMap>, + ) -> Result { + let block_number = *block.header.number(); + let current_best_number = self.client.info().best_number; + block.fork_choice = Some(ForkChoiceStrategy::Custom( + block_number > current_best_number, + )); + + let import_result = self.inner.import_block(block, new_cache).await?; + let (block_import_acknowledgement_sender, mut block_import_acknowledgement_receiver) = + mpsc::channel(0); + + // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver + // will block forever as there is still a sender not closed. + { + let value = (block_number, block_import_acknowledgement_sender); + self.imported_block_notification_subscribers + .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok()); + } + + while (block_import_acknowledgement_receiver.next().await).is_some() { + // Wait for all the acknowledgements to progress. + } + + Ok(import_result) + } + + async fn check_block( + &mut self, + block: BlockCheckParams, + ) -> Result { + self.inner.check_block(block).await.map_err(Into::into) + } +}