Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ae89dcc
remove deprecated cumulus collator code
RomarQ Sep 5, 2025
b89c290
add prdoc
RomarQ Sep 5, 2025
7aa3f8c
Merge branch 'master' into rq/remove-deprecated-collator-code
RomarQ Sep 5, 2025
25b2b7a
fix prdoc
RomarQ Sep 5, 2025
5db0f3a
remove unused code
RomarQ Sep 5, 2025
aa1bbf2
fix test
RomarQ Sep 5, 2025
20cb146
cleanup
RomarQ Sep 5, 2025
a9ec330
fix name
RomarQ Sep 5, 2025
8e27c06
remove imports
RomarQ Sep 5, 2025
1112f73
fix imports
RomarQ Sep 5, 2025
8bcc054
remove old test
RomarQ Sep 5, 2025
8ca6df9
remove unused type
RomarQ Sep 5, 2025
c51d269
Merge branch 'master' into rq/remove-deprecated-collator-code
RomarQ Sep 8, 2025
56d5550
Revert "remove old test"
RomarQ Sep 8, 2025
dba2cc6
Revert "Revert "remove old test""
RomarQ Sep 8, 2025
ee3e864
Merge branch 'master' into rq/remove-deprecated-collator-code
RomarQ Sep 8, 2025
63aa45a
Merge branch 'master' into rq/remove-deprecated-collator-code
RomarQ Sep 9, 2025
8113e47
use cumulus_test_service::Consensus::Aura in zombienet test
RomarQ Sep 9, 2025
f21ddb1
Merge branch 'master' into rq/remove-deprecated-collator-code
RomarQ Sep 9, 2025
6ba4252
cleanup
RomarQ Sep 9, 2025
5e39474
re-enable zombienet-cumulus-0002-pov_recovery test
RomarQ Sep 9, 2025
155aeba
try --disable-block-announcements
RomarQ Sep 9, 2025
6557eea
Revert "re-enable zombienet-cumulus-0002-pov_recovery test"
RomarQ Sep 10, 2025
ef9f886
update test
RomarQ Sep 11, 2025
ff285fb
Merge branch 'master' into rq/remove-deprecated-collator-code
RomarQ Sep 11, 2025
9453e0c
Merge branch 'master' into rq/remove-deprecated-collator-code
RomarQ Sep 12, 2025
d775e27
improve zombie_ci::pov_recovery::pov_recovery test
RomarQ Sep 12, 2025
4d6f9d6
Merge branch 'master' into rq/remove-deprecated-collator-code
RomarQ Sep 12, 2025
fb842d2
fix typo
RomarQ Sep 12, 2025
e3ebeb1
fix typo
RomarQ Sep 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 1 addition & 334 deletions cumulus/client/collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,128 +17,12 @@

//! Cumulus Collator implementation for Substrate.

use cumulus_primitives_core::{
relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,
};

use sc_client_api::BlockBackend;
use sp_api::ProvideRuntimeApi;
use sp_core::traits::SpawnNamed;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};

use cumulus_client_consensus_common::ParachainConsensus;
use polkadot_node_primitives::{CollationGenerationConfig, CollationResult, MaybeCompressedPoV};
use polkadot_node_primitives::CollationGenerationConfig;
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Id as ParaId};

use codec::Decode;
use futures::prelude::*;
use std::sync::Arc;

use crate::service::CollatorService;

pub mod service;

/// The logging target.
const LOG_TARGET: &str = "cumulus-collator";

/// The implementation of the Cumulus `Collator`.
///
/// Note that this implementation is soon to be deprecated and removed, and it is suggested to
/// directly use the [`CollatorService`] instead, so consensus engine implementations
/// live at the top level.
pub struct Collator<Block: BlockT, BS, RA> {
service: CollatorService<Block, BS, RA>,
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
}

impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
fn clone(&self) -> Self {
Collator {
service: self.service.clone(),
parachain_consensus: self.parachain_consensus.clone(),
}
}
}

impl<Block, BS, RA> Collator<Block, BS, RA>
where
Block: BlockT,
BS: BlockBackend<Block>,
RA: ProvideRuntimeApi<Block>,
RA::Api: CollectCollationInfo<Block>,
{
/// Create a new instance.
fn new(
collator_service: CollatorService<Block, BS, RA>,
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
) -> Self {
Self { service: collator_service, parachain_consensus }
}

async fn produce_candidate(
mut self,
relay_parent: PHash,
validation_data: PersistedValidationData,
) -> Option<CollationResult> {
tracing::trace!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
"Producing candidate",
);

let last_head = match Block::Header::decode(&mut &validation_data.parent_head.0[..]) {
Ok(x) => x,
Err(e) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Could not decode the head data."
);
return None
},
};

let last_head_hash = last_head.hash();
if !self.service.check_block_status(last_head_hash, &last_head) {
return None
}

tracing::info!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
at = ?last_head_hash,
"Starting collation.",
);

let candidate = self
.parachain_consensus
.produce_candidate(&last_head, relay_parent, &validation_data)
.await?;

let block_hash = candidate.block.header().hash();

let (collation, b) = self.service.build_collation(&last_head, block_hash, candidate)?;

b.log_size_info();

if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
tracing::info!(
target: LOG_TARGET,
"Compressed PoV size: {}kb",
pov.block_data.0.len() as f64 / 1024f64,
);
}

let result_sender = self.service.announce_with_barrier(block_hash);

tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",);

Some(CollationResult { collation, result_sender: Some(result_sender) })
}
}

/// Relay-chain-driven collators are those whose block production is driven purely
/// by new relay chain blocks and the most recently included parachain blocks
/// within them.
Expand Down Expand Up @@ -255,220 +139,3 @@ pub async fn initialize_collator_subsystems(
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
.await;
}

/// Parameters for [`start_collator`].
pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
pub para_id: ParaId,
pub runtime_api: Arc<RA>,
pub block_status: Arc<BS>,
pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
pub overseer_handle: OverseerHandle,
pub spawner: Spawner,
pub key: CollatorPair,
pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
}

/// Start the collator.
#[deprecated = "Collators should run consensus futures which handle this logic internally"]
pub async fn start_collator<Block, RA, BS, Spawner>(
params: StartCollatorParams<Block, RA, BS, Spawner>,
) where
Block: BlockT,
BS: BlockBackend<Block> + Send + Sync + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
RA::Api: CollectCollationInfo<Block>,
{
// This never needed to be asynchronous, but shouldn't be changed due to backcompat.
#[allow(deprecated)]
start_collator_sync(params);
}

/// Start the collator in a synchronous function.
#[deprecated = "Collators should run consensus futures which handle this logic internally"]
pub fn start_collator_sync<Block, RA, BS, Spawner>(
StartCollatorParams {
para_id,
block_status,
announce_block,
overseer_handle,
spawner,
key,
parachain_consensus,
runtime_api,
}: StartCollatorParams<Block, RA, BS, Spawner>,
) where
Block: BlockT,
BS: BlockBackend<Block> + Send + Sync + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
RA::Api: CollectCollationInfo<Block>,
{
let collator_service =
CollatorService::new(block_status, Arc::new(spawner.clone()), announce_block, runtime_api);

let collator = Collator::new(collator_service, parachain_consensus);

let collation_future = Box::pin(async move {
let mut request_stream = relay_chain_driven::init(key, para_id, overseer_handle).await;
while let Some(request) = request_stream.next().await {
let collation = collator
.clone()
.produce_candidate(
*request.relay_parent(),
request.persisted_validation_data().clone(),
)
.await;

request.complete(collation);
}
});

spawner.spawn("cumulus-relay-driven-collator", None, collation_future);
}

#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use codec::Encode;
use cumulus_client_consensus_common::ParachainCandidate;
use cumulus_primitives_core::ParachainBlockData;
use cumulus_test_client::{
Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder,
TestClientBuilder, TestClientBuilderExt,
};
use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
use cumulus_test_runtime::{Block, Header};
use futures::{channel::mpsc, executor::block_on, StreamExt};
use polkadot_node_primitives::CollationGenerationConfig;
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
use polkadot_primitives::HeadData;
use sp_consensus::BlockOrigin;
use sp_core::{testing::TaskExecutor, Pair};
use sp_runtime::traits::BlakeTwo256;
use sp_state_machine::Backend;

struct AlwaysSupportsParachains;

#[async_trait]
impl HeadSupportsParachains for AlwaysSupportsParachains {
async fn head_supports_parachains(&self, _head: &PHash) -> bool {
true
}
}

#[derive(Clone)]
struct DummyParachainConsensus {
client: Arc<Client>,
}

#[async_trait::async_trait]
impl ParachainConsensus<Block> for DummyParachainConsensus {
async fn produce_candidate(
&mut self,
parent: &Header,
_: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<Block>> {
let mut sproof = RelayStateSproofBuilder::default();
sproof.included_para_head = Some(HeadData(parent.encode()));
sproof.para_id = cumulus_test_runtime::PARACHAIN_ID.into();

let cumulus_test_client::BlockBuilderAndSupportData { block_builder, .. } = self
.client
.init_block_builder_at(parent.hash(), Some(validation_data.clone()), sproof);

let (block, _, proof) = block_builder.build().expect("Creates block").into_inner();

self.client
.import(BlockOrigin::Own, block.clone())
.await
.expect("Imports the block");

Some(ParachainCandidate { block, proof: proof.expect("Proof is returned") })
}
}

#[test]
fn collates_produces_a_block_and_storage_proof_does_not_contains_code() {
Comment thread
RomarQ marked this conversation as resolved.
sp_tracing::try_init_simple();

let spawner = TaskExecutor::new();
let para_id = ParaId::from(100);
let announce_block = |_, _| ();
let client = Arc::new(TestClientBuilder::new().build());
let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();

let (sub_tx, sub_rx) = mpsc::channel(64);

let (overseer, handle) =
dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None)
.expect("Creates overseer builder")
.replace_collation_generation(|_| ForwardSubsystem(sub_tx))
.build()
.expect("Builds overseer");

spawner.spawn("overseer", None, overseer.run().then(|_| async {}).boxed());

#[allow(deprecated)]
let collator_start = start_collator(StartCollatorParams {
runtime_api: client.clone(),
block_status: client.clone(),
announce_block: Arc::new(announce_block),
overseer_handle: OverseerHandle::new(handle),
spawner,
para_id,
key: CollatorPair::generate().0,
parachain_consensus: Box::new(DummyParachainConsensus { client }),
});
block_on(collator_start);

let msg = block_on(sub_rx.into_future())
.0
.expect("message should be send by `start_collator` above.");

let collator_fn = match msg {
CollationGenerationMessage::Initialize(CollationGenerationConfig {
collator: Some(c),
..
}) => c,
_ => panic!("unexpected message or no collator fn"),
};

let validation_data =
PersistedValidationData { parent_head: header.encode().into(), ..Default::default() };
let relay_parent = Default::default();

let collation = block_on(collator_fn(relay_parent, &validation_data))
.expect("Collation is build")
.collation;

let pov = collation.proof_of_validity.into_compressed();

let decompressed =
sp_maybe_compressed_blob::decompress(&pov.block_data.0, 1024 * 1024 * 10).unwrap();

let block =
ParachainBlockData::<Block>::decode(&mut &decompressed[..]).expect("Is a valid block");

assert_eq!(1, *block.blocks()[0].header().number());

// Ensure that we did not include `:code` in the proof.
let proof = block.proof().clone();

let backend = sp_state_machine::create_proof_check_backend::<BlakeTwo256>(
*header.state_root(),
proof.to_storage_proof::<BlakeTwo256>(None).unwrap().0,
)
.unwrap();

// Should return an error, as it was not included while building the proof.
assert!(backend
.storage(sp_core::storage::well_known_keys::CODE)
.unwrap_err()
.contains("Trie lookup error: Database missing expected key"));
}
}
Loading
Loading