Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/ream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) {

// TODO 1: Load keystores from the config.
// TODO 2: Add RPC service for lean node.
let chain_service = LeanChainService::new(lean_chain.clone(), chain_receiver).await;
let chain_service =
LeanChainService::new(lean_chain.clone(), chain_receiver, chain_sender.clone()).await;
let network_service = LeanNetworkService::new(
Arc::new(LeanNetworkConfig {
gossipsub_config: LeanGossipsubConfig::default(),
Expand Down
14 changes: 9 additions & 5 deletions crates/common/chain/lean/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct LeanChainServiceMessage {
pub struct LeanChainService {
lean_chain: Arc<RwLock<LeanChain>>,
receiver: mpsc::UnboundedReceiver<LeanChainServiceMessage>,
sender: mpsc::UnboundedSender<LeanChainServiceMessage>,

// Objects that we will process once we have processed their parents
dependencies: HashMap<B256, Vec<QueueItem>>,
Expand All @@ -39,10 +40,12 @@ impl LeanChainService {
pub async fn new(
lean_chain: Arc<RwLock<LeanChain>>,
receiver: mpsc::UnboundedReceiver<LeanChainServiceMessage>,
sender: mpsc::UnboundedSender<LeanChainServiceMessage>,
) -> Self {
LeanChainService {
lean_chain,
receiver,
sender,
dependencies: HashMap::new(),
}
}
Expand Down Expand Up @@ -123,14 +126,14 @@ impl LeanChainService {
VoteItem::Signed(signed_vote) => {
let vote = &signed_vote.data;
info!(
"Received signed vote from validator {} for head {:?} at slot {}",
vote.validator_id, vote.head, vote.slot
"Received signed vote from validator {} for head {:?} / source_slot {:?} at slot {}",
vote.validator_id, vote.head, vote.source_slot, vote.slot
);
}
VoteItem::Unsigned(vote) => {
info!(
"Received unsigned vote from validator {} for head {:?} at slot {}",
vote.validator_id, vote.head, vote.slot
"Received unsigned vote from validator {} for head {:?} / source_slot {:?} at slot {}",
vote.validator_id, vote.head, vote.source_slot, vote.slot
);
}
}
Expand Down Expand Up @@ -168,9 +171,10 @@ impl LeanChainService {
drop(lean_chain);

// Once we have received a block, also process all of its dependencies
// by sending them to this service itself.
if let Some(queue_items) = self.dependencies.remove(&block_hash) {
for item in queue_items {
Box::pin(self.handle_item(item)).await;
self.sender.send(LeanChainServiceMessage { item })?;
}
}
}
Expand Down
Loading