Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use finality_aleph::{
};
use futures::channel::mpsc;
use log::warn;
use sc_client_api::ExecutorProvider;
use sc_client_api::{ExecutorProvider};
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
use sc_network::NetworkService;
use sc_service::{
Expand Down Expand Up @@ -285,7 +285,7 @@ pub fn new_authority(

let (_rpc_handlers, network, network_starter) = setup(
config,
backend,
backend.clone(),
&keystore_container,
import_queue,
transaction_pool.clone(),
Expand Down Expand Up @@ -348,6 +348,7 @@ pub fn new_authority(
let aleph_config = AlephConfig {
network,
client,
backend,
select_chain,
session_period,
millisecs_per_block,
Expand Down Expand Up @@ -396,7 +397,7 @@ pub fn new_full(

let (_rpc_handlers, network, network_starter) = setup(
config,
backend,
backend.clone(),
&keystore_container,
import_queue,
transaction_pool,
Expand All @@ -423,6 +424,7 @@ pub fn new_full(
let aleph_config = AlephConfig {
network,
client,
backend,
select_chain,
session_period,
millisecs_per_block,
Expand Down
19 changes: 9 additions & 10 deletions finality-aleph/src/justification/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use futures::{channel::mpsc, Stream, StreamExt};
use futures_timer::Delay;
use log::{debug, error};
use sc_client_api::HeaderBackend;
use sc_client_api::blockchain::Backend as BlockchainBackend;
use sp_api::BlockT;
use sp_runtime::traits::Header;
use tokio::time::timeout;
Expand All @@ -20,50 +20,49 @@ use crate::{
network, Metrics, STATUS_REPORT_INTERVAL,
};

pub struct JustificationHandler<B, V, RB, C, S, SI, F>
pub struct JustificationHandler<B, V, RB, S, SI, F, BB>
where
B: BlockT,
V: Verifier<B>,
RB: network::RequestBlocks<B> + 'static,
C: HeaderBackend<B> + Send + Sync + 'static,
S: JustificationRequestScheduler,
SI: SessionInfoProvider<B, V>,
F: BlockFinalizer<B>,
BB: BlockchainBackend<B> + Send + Sync + 'static,
{
session_info_provider: SI,
block_requester: BlockRequester<B, RB, C, S, F, V>,
block_requester: BlockRequester<B, RB, S, F, V, BB>,
verifier_timeout: Duration,
notification_timeout: Duration,
}

impl<B, V, RB, C, S, SI, F> JustificationHandler<B, V, RB, C, S, SI, F>
impl<B, V, RB, S, SI, F, BB> JustificationHandler<B, V, RB, S, SI, F, BB>
where
B: BlockT,
V: Verifier<B>,
RB: network::RequestBlocks<B> + 'static,
C: HeaderBackend<B> + Send + Sync + 'static,
S: JustificationRequestScheduler,
SI: SessionInfoProvider<B, V>,
F: BlockFinalizer<B>,
BB: BlockchainBackend<B> + Send + Sync + 'static,
{
pub fn new(
session_info_provider: SI,
block_requester: RB,
client: Arc<C>,
backend: Arc<BB>,
finalizer: F,
justification_request_scheduler: S,
metrics: Option<Metrics<<B::Header as Header>::Hash>>,
justification_handler_config: JustificationHandlerConfig<B>,
justification_handler_config: JustificationHandlerConfig,
) -> Self {
Self {
session_info_provider,
block_requester: BlockRequester::new(
block_requester,
client,
backend,
finalizer,
justification_request_scheduler,
metrics,
justification_handler_config.min_allowed_delay,
),
verifier_timeout: justification_handler_config.verifier_timeout,
notification_timeout: justification_handler_config.notification_timeout,
Expand Down
19 changes: 6 additions & 13 deletions finality-aleph/src/justification/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,36 +55,29 @@ pub struct JustificationNotification<Block: BlockT> {
}

#[derive(Clone)]
pub struct JustificationHandlerConfig<B: BlockT> {
pub struct JustificationHandlerConfig {
/// How long should we wait when the session verifier is not yet available.
verifier_timeout: Duration,
/// How long should we wait for any notification.
notification_timeout: Duration,
///Distance (in amount of blocks) between the best and the block we want to request justification
min_allowed_delay: NumberFor<B>,
}

impl<B: BlockT> Default for JustificationHandlerConfig<B> {
impl Default for JustificationHandlerConfig {
fn default() -> Self {
Self {
verifier_timeout: Duration::from_millis(500),
notification_timeout: Duration::from_millis(1000),
min_allowed_delay: 3u32.into(),
// request justifications slightly more frequently than they're created
notification_timeout: Duration::from_millis(800),
Comment thread
timorl marked this conversation as resolved.
}
}
}

#[cfg(test)]
impl<B: BlockT> JustificationHandlerConfig<B> {
pub fn new(
verifier_timeout: Duration,
notification_timeout: Duration,
min_allowed_delay: NumberFor<B>,
) -> Self {
impl JustificationHandlerConfig {
pub fn new(verifier_timeout: Duration, notification_timeout: Duration) -> Self {
Self {
verifier_timeout,
notification_timeout,
min_allowed_delay,
}
}
}
94 changes: 61 additions & 33 deletions finality-aleph/src/justification/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::{fmt, marker::PhantomData, sync::Arc, time::Instant};

use aleph_primitives::ALEPH_ENGINE_ID;
use log::{debug, error, info, warn};
use sc_client_api::HeaderBackend;
use sc_client_api::{blockchain::Backend as BlockchainBackend, HeaderBackend};
use sp_api::{BlockId, BlockT, NumberFor};
use sp_runtime::traits::Header;
use sp_runtime::traits::{Header, One};

use crate::{
finalization::BlockFinalizer,
Expand Down Expand Up @@ -72,49 +72,46 @@ impl<B: BlockT> fmt::Display for JustificationRequestStatus<B> {
}
}

pub struct BlockRequester<B, RB, C, S, F, V>
pub struct BlockRequester<B, RB, S, F, V, BB>
Comment thread
maciejnems marked this conversation as resolved.
where
B: BlockT,
RB: network::RequestBlocks<B> + 'static,
C: HeaderBackend<B> + Send + Sync + 'static,
S: JustificationRequestScheduler,
F: BlockFinalizer<B>,
V: Verifier<B>,
BB: BlockchainBackend<B>,
{
block_requester: RB,
client: Arc<C>,
backend: Arc<BB>,
finalizer: F,
justification_request_scheduler: S,
metrics: Option<Metrics<<B::Header as Header>::Hash>>,
min_allowed_delay: NumberFor<B>,
request_status: JustificationRequestStatus<B>,
_phantom: PhantomData<V>,
}

impl<B, RB, C, S, F, V> BlockRequester<B, RB, C, S, F, V>
impl<B, RB, S, F, V, BB> BlockRequester<B, RB, S, F, V, BB>
where
B: BlockT,
RB: network::RequestBlocks<B> + 'static,
C: HeaderBackend<B> + Send + Sync + 'static,
S: JustificationRequestScheduler,
F: BlockFinalizer<B>,
V: Verifier<B>,
BB: BlockchainBackend<B>,
{
pub fn new(
block_requester: RB,
client: Arc<C>,
backend: Arc<BB>,
finalizer: F,
justification_request_scheduler: S,
metrics: Option<Metrics<<B::Header as Header>::Hash>>,
min_allowed_delay: NumberFor<B>,
) -> Self {
BlockRequester {
block_requester,
client,
backend,
finalizer,
justification_request_scheduler,
metrics,
min_allowed_delay,
request_status: JustificationRequestStatus::new(),
_phantom: PhantomData,
}
Expand Down Expand Up @@ -172,26 +169,9 @@ where
pub fn request_justification(&mut self, num: NumberFor<B>) {
match self.justification_request_scheduler.schedule_action() {
SchedulerActions::Request => {
let num = if num > self.client.info().best_number
&& self.client.info().best_number > self.min_allowed_delay
{
self.client.info().best_number - self.min_allowed_delay
} else {
num
};

debug!(target: "aleph-justification", "Trying to request block {:?}", num);
self.request_status.save_block_number(num);

if let Ok(Some(header)) = self.client.header(BlockId::Number(num)) {
self.request_status.insert_hash(header.hash());
debug!(target: "aleph-justification", "We have block {:?} with hash {:?}. Requesting justification.", num, header.hash());
self.justification_request_scheduler.on_request_sent();
self.block_requester
.request_justification(&header.hash(), *header.number());
} else {
debug!(target: "aleph-justification", "Cancelling request, because we don't have block {:?}.", num);
}
self.request_targets(num)
.into_iter()
.for_each(|hash| self.request(hash));
}
SchedulerActions::ClearQueue => {
debug!(target: "aleph-justification", "Clearing justification request queue");
Expand All @@ -202,6 +182,54 @@ where
}

pub fn finalized_number(&self) -> NumberFor<B> {
self.client.info().finalized_number
self.backend.info().finalized_number
}

fn request(&mut self, hash: <B as BlockT>::Hash) {
if let Ok(Some(header)) = self.backend.header(BlockId::Hash(hash)) {
let number = *header.number();
debug!(target: "aleph-justification", "Trying to request block {:?}", number);
Comment thread
maciejnems marked this conversation as resolved.
Outdated
self.request_status.save_block_number(number);
self.request_status.insert_hash(hash);
debug!(target: "aleph-justification", "We have block {:?} with hash {:?}. Requesting justification.", number, header.hash());
self.justification_request_scheduler.on_request_sent();
self.block_requester.request_justification(&hash, number);
} else {
debug!(target: "aleph-justification", "Cancelling request, because we don't have block {:?}.", hash);
}
}

// We request justifications for all the children of last finalized block and a justification
// for a block of number num on longest branch.
// Assuming that we request at the same pace that finalization is progressing, the former ensures
// that we are up to date with finalization. On the other hand, the former enables fast catch up
Comment thread
maciejnems marked this conversation as resolved.
Outdated
// if we are behind.
// We don't remove the child that it's on the same branch as best since a fork may happen
Comment thread
maciejnems marked this conversation as resolved.
Outdated
// somewhere in between them.
fn request_targets(&self, mut top_wanted: NumberFor<B>) -> Vec<<B as BlockT>::Hash> {
let blockchain_info = self.backend.info();
let finalized_hash = blockchain_info.finalized_hash;

let mut targets = self.backend.children(finalized_hash).unwrap_or_default();
let best_number = blockchain_info.best_number;
Comment thread
timorl marked this conversation as resolved.
Outdated
if best_number <= top_wanted {
// most probably block best_number is not yet finalized
top_wanted = best_number - NumberFor::<B>::one();
Comment thread
mike1729 marked this conversation as resolved.
Outdated
}
match self.backend.hash(top_wanted) {
Ok(Some(hash)) => {
if !targets.contains(&hash) {
targets.push(hash);
}
}
Ok(None) => {
debug!(target: "aleph-justification", "Cancelling request, because we don't have block {:?}.", top_wanted);
}
Err(err) => {
debug!(target: "aleph-justification", "Cancelling request, because fetching block {:?} failed {:?}.", top_wanted, err);
}
}

targets
}
}
5 changes: 3 additions & 2 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::{
channel::{mpsc, oneshot},
Future,
};
use sc_client_api::{backend::Backend, BlockchainEvents, Finalizer, LockImportRun, TransactionFor};
use sc_client_api::{Backend, BlockchainEvents, Finalizer, LockImportRun, TransactionFor};
use sc_consensus::BlockImport;
use sc_network::{ExHashT, NetworkService};
use sc_service::SpawnTaskHandle;
Expand Down Expand Up @@ -242,9 +242,10 @@ impl<H, N> From<(H, N)> for HashNum<H, N> {

pub type BlockHashNum<B> = HashNum<<B as Block>::Hash, NumberFor<B>>;

pub struct AlephConfig<B: Block, H: ExHashT, C, SC> {
pub struct AlephConfig<B: Block, H: ExHashT, C, SC, BE> {
pub network: Arc<NetworkService<B, H>>,
pub client: Arc<C>,
pub backend: Arc<BE>,
pub select_chain: SC,
pub spawn_handle: SpawnTaskHandle,
pub keystore: Arc<dyn CryptoStore>,
Expand Down
12 changes: 8 additions & 4 deletions finality-aleph/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use aleph_primitives::{AuthorityId, SessionAuthorityData};
use codec::Encode;
use log::warn;
pub use nonvalidator_node::run_nonvalidator_node;
use sc_client_api::blockchain::Backend as BlockchainBackend;
use sc_client_api::Backend;
use sc_network::{ExHashT, NetworkService};
use sp_runtime::{
Expand Down Expand Up @@ -83,9 +84,10 @@ impl<B: Block> Verifier<B> for JustificationVerifier {
}
}

struct JustificationParams<B: Block, H: ExHashT, C> {
struct JustificationParams<B: Block, H: ExHashT, C, BB> {
pub network: Arc<NetworkService<B, H>>,
pub client: Arc<C>,
pub backend: Arc<BB>,
pub justification_rx: mpsc::UnboundedReceiver<JustificationNotification<B>>,
pub metrics: Option<Metrics<<B::Header as Header>::Hash>>,
pub session_period: SessionPeriod,
Expand Down Expand Up @@ -126,8 +128,8 @@ impl<B: Block> SessionInfoProvider<B, JustificationVerifier> for SessionInfoProv
}
}

fn setup_justification_handler<B, H, C, BE>(
just_params: JustificationParams<B, H, C>,
fn setup_justification_handler<B, H, C, BB, BE>(
just_params: JustificationParams<B, H, C, BB>,
) -> (
UnboundedSender<JustificationNotification<B>>,
impl Future<Output = ()>,
Expand All @@ -137,11 +139,13 @@ where
H: ExHashT,
C: crate::ClientForAleph<B, BE> + Send + Sync + 'static,
C::Api: aleph_primitives::AlephSessionApi<B>,
BB: BlockchainBackend<B> + 'static,
BE: Backend<B> + 'static,
{
let JustificationParams {
network,
client,
backend,
justification_rx,
metrics,
session_period,
Expand All @@ -152,7 +156,7 @@ where
let handler = JustificationHandler::new(
SessionInfoProviderImpl::new(session_map, session_period),
network,
client.clone(),
backend,
AlephFinalizer::new(client),
JustificationRequestSchedulerImpl::new(&session_period, &millisecs_per_block, MAX_ATTEMPTS),
metrics,
Expand Down
Loading