Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 37 additions & 35 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ where
}
let response = self.prepare_chain_info_response(query).await?;
// Trigger any outgoing cross-chain messages that haven't been confirmed yet.
let actions = self.create_network_actions().await?;
let actions = self.create_network_actions(None).await?;
Ok((response, actions))
}

Expand Down Expand Up @@ -356,8 +356,11 @@ where
}
}

/// Loads pending cross-chain requests.
async fn create_network_actions(&self) -> Result<NetworkActions, WorkerError> {
/// Loads pending cross-chain requests, and adds `NewRound` notifications where appropriate.
async fn create_network_actions(
&self,
old_round: Option<Round>,
) -> Result<NetworkActions, WorkerError> {
let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new();
let mut targets = self.chain.nonempty_outbox_chain_ids();
if let Some(tracked_chains) = self.tracked_chains.as_ref() {
Expand All @@ -371,13 +374,30 @@ where
let heights = outbox.queue.elements().await?;
heights_by_recipient.insert(target, heights);
}
self.create_cross_chain_requests(heights_by_recipient).await
let cross_chain_requests = self
.create_cross_chain_requests(heights_by_recipient)
.await?;
let mut notifications = Vec::new();
if let Some(old_round) = old_round {
let round = self.chain.manager.current_round();
if round > old_round {
let height = self.chain.tip_state.get().next_block_height;
notifications.push(Notification {
chain_id: self.chain_id(),
reason: Reason::NewRound { height, round },
});
}
}
Ok(NetworkActions {
cross_chain_requests,
notifications,
})
}

async fn create_cross_chain_requests(
&self,
heights_by_recipient: BTreeMap<ChainId, Vec<BlockHeight>>,
) -> Result<NetworkActions, WorkerError> {
) -> Result<Vec<CrossChainRequest>, WorkerError> {
// Load all the certificates we will need, regardless of the medium.
let heights = BTreeSet::from_iter(heights_by_recipient.values().flatten().copied());
let next_block_height = self.chain.tip_state.get().next_block_height;
Expand Down Expand Up @@ -424,7 +444,7 @@ where
.zip(certificates)
.collect::<HashMap<_, _>>();
// For each medium, select the relevant messages.
let mut actions = NetworkActions::default();
let mut cross_chain_requests = Vec::new();
for (recipient, heights) in heights_by_recipient {
let mut bundles = Vec::new();
for height in heights {
Expand All @@ -438,9 +458,9 @@ where
recipient,
bundles,
};
actions.cross_chain_requests.push(request);
cross_chain_requests.push(request);
}
Ok(actions)
Ok(cross_chain_requests)
}

/// Returns true if there are no more outgoing messages in flight up to the given
Expand Down Expand Up @@ -488,32 +508,20 @@ where
}
);
certificate.check(committee)?;
let mut actions = NetworkActions::default();
if self
.chain
.tip_state
.get()
.already_validated_block(certificate.inner().height())?
{
return Ok((self.chain_info_response(), actions));
return Ok((self.chain_info_response(), NetworkActions::default()));
}
let old_round = self.chain.manager.current_round();
let timeout_chain_id = certificate.inner().chain_id();
let timeout_height = certificate.inner().height();
self.chain
.manager
.handle_timeout_certificate(certificate, self.storage.clock().current_time());
let round = self.chain.manager.current_round();
if round > old_round {
actions.notifications.push(Notification {
chain_id: timeout_chain_id,
reason: Reason::NewRound {
height: timeout_height,
round,
},
})
}
self.save().await?;
let actions = self.create_network_actions(Some(old_round)).await?;
Ok((self.chain_info_response(), actions))
}

Expand Down Expand Up @@ -580,7 +588,6 @@ where
let (epoch, committee) = self.chain.current_committee()?;
check_block_epoch(epoch, header.chain_id, header.epoch)?;
certificate.check(committee)?;
let mut actions = NetworkActions::default();
let already_committed_block = self.chain.tip_state.get().already_validated_block(height)?;
let should_skip_validated_block = || {
self.chain
Expand All @@ -590,7 +597,7 @@ where
};
if already_committed_block || should_skip_validated_block()? {
// If we just processed the same pending block, return the chain info unchanged.
return Ok((self.chain_info_response(), actions, true));
return Ok((self.chain_info_response(), NetworkActions::default(), true));
}

self.block_values
Expand Down Expand Up @@ -620,13 +627,7 @@ where
blobs,
)?;
self.save().await?;
let round = self.chain.manager.current_round();
if round > old_round {
actions.notifications.push(Notification {
chain_id: self.chain_id(),
reason: Reason::NewRound { height, round },
})
}
let actions = self.create_network_actions(Some(old_round)).await?;
Ok((self.chain_info_response(), actions, false))
}

Expand All @@ -644,7 +645,7 @@ where
let tip = self.chain.tip_state.get().clone();
if tip.next_block_height > height {
// We already processed this block.
let actions = self.create_network_actions().await?;
let actions = self.create_network_actions(None).await?;
self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
.await;
return Ok((self.chain_info_response(), actions));
Expand Down Expand Up @@ -722,7 +723,7 @@ where
self.chain.preprocess_block(certificate.value()).await?;
// Persist chain.
self.save().await?;
let actions = self.create_network_actions().await?;
let actions = self.create_network_actions(None).await?;
trace!("Preprocessed confirmed block {height} on chain {chain_id:.8}");
self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
.await;
Expand Down Expand Up @@ -803,7 +804,7 @@ where
.apply_confirmed_block(certificate.value(), local_time)
.await?;
self.track_newly_created_chains(&proposed_block, &outcome);
let mut actions = self.create_network_actions().await?;
let mut actions = self.create_network_actions(None).await?;
trace!("Processed confirmed block {height} on chain {chain_id:.8}");
let hash = certificate.hash();
let event_streams = certificate
Expand Down Expand Up @@ -1130,6 +1131,7 @@ where
chain.manager.verify_owner(&owner, proposal.content.round)?,
WorkerError::InvalidOwner
);
let old_round = self.chain.manager.current_round();
match original_proposal {
None => {
if let Some(signer) = block.authenticated_signer {
Expand Down Expand Up @@ -1240,7 +1242,7 @@ where
None => (),
}
self.save().await?;
let actions = self.create_network_actions().await?;
let actions = self.create_network_actions(Some(old_round)).await?;
Ok((self.chain_info_response(), actions))
}

Expand Down
3 changes: 2 additions & 1 deletion linera-core/src/unit_tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3623,7 +3623,8 @@ where
.into_proposal_with_round(owner1, &signer, Round::MultiLeader(1))
.await
.unwrap();
let _ = env.worker().handle_block_proposal(proposal1).await?;
let (_, actions) = env.worker().handle_block_proposal(proposal1).await?;
assert_matches!(actions.notifications[0].reason, Reason::NewRound { .. });
let query_values = ChainInfoQuery::new(chain_id).with_manager_values();
let (response, _) = env.worker().handle_chain_info_query(query_values).await?;
assert_eq!(response.info.manager.current_round, Round::MultiLeader(1));
Expand Down
Loading