diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 171d0aef072e..ab86fc771cea 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -254,7 +254,7 @@ where } let response = self.prepare_chain_info_response(query).await?; // Trigger any outgoing cross-chain messages that haven't been confirmed yet. - let actions = self.create_network_actions().await?; + let actions = self.create_network_actions(None).await?; Ok((response, actions)) } @@ -356,8 +356,11 @@ where } } - /// Loads pending cross-chain requests. - async fn create_network_actions(&self) -> Result { + /// Loads pending cross-chain requests, and adds `NewRound` notifications where appropriate. + async fn create_network_actions( + &self, + old_round: Option, + ) -> Result { let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new(); let mut targets = self.chain.nonempty_outbox_chain_ids(); if let Some(tracked_chains) = self.tracked_chains.as_ref() { @@ -371,13 +374,30 @@ where let heights = outbox.queue.elements().await?; heights_by_recipient.insert(target, heights); } - self.create_cross_chain_requests(heights_by_recipient).await + let cross_chain_requests = self + .create_cross_chain_requests(heights_by_recipient) + .await?; + let mut notifications = Vec::new(); + if let Some(old_round) = old_round { + let round = self.chain.manager.current_round(); + if round > old_round { + let height = self.chain.tip_state.get().next_block_height; + notifications.push(Notification { + chain_id: self.chain_id(), + reason: Reason::NewRound { height, round }, + }); + } + } + Ok(NetworkActions { + cross_chain_requests, + notifications, + }) } async fn create_cross_chain_requests( &self, heights_by_recipient: BTreeMap>, - ) -> Result { + ) -> Result, WorkerError> { // Load all the certificates we will need, regardless of the medium. let heights = BTreeSet::from_iter(heights_by_recipient.values().flatten().copied()); let next_block_height = self.chain.tip_state.get().next_block_height; @@ -424,7 +444,7 @@ where .zip(certificates) .collect::>(); // For each medium, select the relevant messages. - let mut actions = NetworkActions::default(); + let mut cross_chain_requests = Vec::new(); for (recipient, heights) in heights_by_recipient { let mut bundles = Vec::new(); for height in heights { @@ -438,9 +458,9 @@ where recipient, bundles, }; - actions.cross_chain_requests.push(request); + cross_chain_requests.push(request); } - Ok(actions) + Ok(cross_chain_requests) } /// Returns true if there are no more outgoing messages in flight up to the given @@ -488,32 +508,20 @@ where } ); certificate.check(committee)?; - let mut actions = NetworkActions::default(); if self .chain .tip_state .get() .already_validated_block(certificate.inner().height())? { - return Ok((self.chain_info_response(), actions)); + return Ok((self.chain_info_response(), NetworkActions::default())); } let old_round = self.chain.manager.current_round(); - let timeout_chain_id = certificate.inner().chain_id(); - let timeout_height = certificate.inner().height(); self.chain .manager .handle_timeout_certificate(certificate, self.storage.clock().current_time()); - let round = self.chain.manager.current_round(); - if round > old_round { - actions.notifications.push(Notification { - chain_id: timeout_chain_id, - reason: Reason::NewRound { - height: timeout_height, - round, - }, - }) - } self.save().await?; + let actions = self.create_network_actions(Some(old_round)).await?; Ok((self.chain_info_response(), actions)) } @@ -580,7 +588,6 @@ where let (epoch, committee) = self.chain.current_committee()?; check_block_epoch(epoch, header.chain_id, header.epoch)?; certificate.check(committee)?; - let mut actions = NetworkActions::default(); let already_committed_block = self.chain.tip_state.get().already_validated_block(height)?; let should_skip_validated_block = || { self.chain @@ -590,7 +597,7 @@ where }; if already_committed_block || should_skip_validated_block()? { // If we just processed the same pending block, return the chain info unchanged. - return Ok((self.chain_info_response(), actions, true)); + return Ok((self.chain_info_response(), NetworkActions::default(), true)); } self.block_values @@ -620,13 +627,7 @@ where blobs, )?; self.save().await?; - let round = self.chain.manager.current_round(); - if round > old_round { - actions.notifications.push(Notification { - chain_id: self.chain_id(), - reason: Reason::NewRound { height, round }, - }) - } + let actions = self.create_network_actions(Some(old_round)).await?; Ok((self.chain_info_response(), actions, false)) } @@ -644,7 +645,7 @@ where let tip = self.chain.tip_state.get().clone(); if tip.next_block_height > height { // We already processed this block. - let actions = self.create_network_actions().await?; + let actions = self.create_network_actions(None).await?; self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered) .await; return Ok((self.chain_info_response(), actions)); @@ -722,7 +723,7 @@ where self.chain.preprocess_block(certificate.value()).await?; // Persist chain. self.save().await?; - let actions = self.create_network_actions().await?; + let actions = self.create_network_actions(None).await?; trace!("Preprocessed confirmed block {height} on chain {chain_id:.8}"); self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered) .await; @@ -803,7 +804,7 @@ where .apply_confirmed_block(certificate.value(), local_time) .await?; self.track_newly_created_chains(&proposed_block, &outcome); - let mut actions = self.create_network_actions().await?; + let mut actions = self.create_network_actions(None).await?; trace!("Processed confirmed block {height} on chain {chain_id:.8}"); let hash = certificate.hash(); let event_streams = certificate @@ -1130,6 +1131,7 @@ where chain.manager.verify_owner(&owner, proposal.content.round)?, WorkerError::InvalidOwner ); + let old_round = self.chain.manager.current_round(); match original_proposal { None => { if let Some(signer) = block.authenticated_signer { @@ -1240,7 +1242,7 @@ where None => (), } self.save().await?; - let actions = self.create_network_actions().await?; + let actions = self.create_network_actions(Some(old_round)).await?; Ok((self.chain_info_response(), actions)) } diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index 51f68d883168..910f0c5fee6a 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -3623,7 +3623,8 @@ where .into_proposal_with_round(owner1, &signer, Round::MultiLeader(1)) .await .unwrap(); - let _ = env.worker().handle_block_proposal(proposal1).await?; + let (_, actions) = env.worker().handle_block_proposal(proposal1).await?; + assert_matches!(actions.notifications[0].reason, Reason::NewRound { .. }); let query_values = ChainInfoQuery::new(chain_id).with_manager_values(); let (response, _) = env.worker().handle_chain_info_query(query_values).await?; assert_eq!(response.info.manager.current_round, Round::MultiLeader(1));