From abfdc59cb908a5f1a892250cc5e6bd40ed539880 Mon Sep 17 00:00:00 2001 From: Mark Logan <103447440+mystenmark@users.noreply.github.com> Date: Thu, 3 Nov 2022 15:56:19 +0000 Subject: [PATCH] Execute sequenced certs serially in execution driver (#5788) * Execute sequenced certs serially in execution driver * Just use a bool to indicate whether sequenced * PR comments * Prioritize sequenced over unsequenced when there are duplicates --- crates/sui-core/src/authority.rs | 28 +++- .../sui-core/src/authority/authority_store.rs | 13 +- .../src/authority/authority_store_tables.rs | 4 +- .../authority_active/execution_driver/mod.rs | 133 +++++++++++++++--- 4 files changed, 146 insertions(+), 32 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 8d312eee5b83f..4bcfa36673533 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -38,7 +38,7 @@ use tracing::{debug, error, instrument, warn}; use typed_store::Map; pub use authority_store::{ - AuthorityStore, GatewayStore, ResolverWrapper, SuiDataStore, UpdateType, + AuthorityStore, GatewayStore, PendingDigest, ResolverWrapper, SuiDataStore, UpdateType, }; use narwhal_config::{ Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache, @@ -1614,6 +1614,10 @@ impl AuthorityState { .await } + pub fn add_pending_sequenced_certificate(&self, cert: VerifiedCertificate) -> SuiResult { + self.add_pending_impl(vec![(*cert.digest(), Some(cert))], true) + } + /// Add a number of certificates to the pending transactions as well as the /// certificates structure if they are not already executed. /// Certificates are optional, and if not provided, they will be eventually @@ -1622,11 +1626,24 @@ impl AuthorityState { &self, certs: Vec<(TransactionDigest, Option)>, ) -> SuiResult<()> { + self.add_pending_impl(certs, false) + } + + fn add_pending_impl( + &self, + certs: Vec<(TransactionDigest, Option)>, + is_sequenced: bool, + ) -> SuiResult { self.node_sync_store .batch_store_certs(certs.iter().filter_map(|(_, cert_opt)| cert_opt.clone()))?; - self.database - .add_pending_digests(certs.iter().map(|(digest, _)| *digest).collect()) + self.database.add_pending_digests( + certs + .iter() + .map(|(seq_and_digest, _)| *seq_and_digest) + .collect(), + is_sequenced, + ) } // Continually pop in-progress txes from the WAL and try to drive them to completion. @@ -2303,10 +2320,7 @@ impl AuthorityState { ); // Schedule the certificate for execution - self.add_pending_certificates(vec![( - *certificate.digest(), - Some(certificate.clone()), - )])?; + self.add_pending_sequenced_certificate(certificate.clone())?; self.database .lock_shared_objects(&certificate, consensus_index) diff --git a/crates/sui-core/src/authority/authority_store.rs b/crates/sui-core/src/authority/authority_store.rs index e27726e5dfdac..91678a4358d00 100644 --- a/crates/sui-core/src/authority/authority_store.rs +++ b/crates/sui-core/src/authority/authority_store.rs @@ -36,6 +36,7 @@ pub type AuthorityStore = SuiDataStore; pub type GatewayStore = SuiDataStore; pub type InternalSequenceNumber = u64; +pub type PendingDigest = (bool /* is sequenced */, TransactionDigest); pub struct CertLockGuard(LockGuard); @@ -223,7 +224,11 @@ impl Deserialize<'de>> SuiDataStore { /// index. If two instanced run concurrently, the indexes are guaranteed to not overlap /// although some certificates may be included twice in the `pending_execution`, and /// the same certificate may be written twice (but that is OK since it is valid.) - pub fn add_pending_digests(&self, digests: Vec) -> SuiResult<()> { + pub fn add_pending_digests( + &self, + digests: Vec, + is_sequenced: bool, + ) -> SuiResult<()> { let first_index = self .next_pending_seq .fetch_add(digests.len() as u64, Ordering::Relaxed); @@ -234,7 +239,7 @@ impl Deserialize<'de>> SuiDataStore { digests .iter() .enumerate() - .map(|(num, digest)| ((num as u64) + first_index, digest)), + .map(|(num, digest)| ((num as u64) + first_index, (is_sequenced, *digest))), )?; batch.write()?; @@ -245,9 +250,7 @@ impl Deserialize<'de>> SuiDataStore { } /// Get all stored certificate digests - pub fn get_pending_digests( - &self, - ) -> SuiResult> { + pub fn get_pending_digests(&self) -> SuiResult> { Ok(self.epoch_tables().pending_execution.iter().collect()) } diff --git a/crates/sui-core/src/authority/authority_store_tables.rs b/crates/sui-core/src/authority/authority_store_tables.rs index e86436036d494..afa3c95355149 100644 --- a/crates/sui-core/src/authority/authority_store_tables.rs +++ b/crates/sui-core/src/authority/authority_store_tables.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ - authority_store::{InternalSequenceNumber, ObjectKey}, + authority_store::{InternalSequenceNumber, ObjectKey, PendingDigest}, *, }; use narwhal_executor::ExecutionIndices; @@ -32,7 +32,7 @@ pub struct AuthorityEpochTables { /// reads this table and executes the certificates. The order is a hint as to their /// causal dependencies. Note that there is no guarantee digests are unique. Once executed, and /// effects are written the entry should be deleted. - pub(crate) pending_execution: DBMap, + pub(crate) pending_execution: DBMap, /// Hold the lock for shared objects. These locks are written by a single task: upon receiving a valid /// certified transaction from consensus, the authority assigns a lock to each shared objects of the diff --git a/crates/sui-core/src/authority_active/execution_driver/mod.rs b/crates/sui-core/src/authority_active/execution_driver/mod.rs index 6db4458689af3..0b297ad5130f2 100644 --- a/crates/sui-core/src/authority_active/execution_driver/mod.rs +++ b/crates/sui-core/src/authority_active/execution_driver/mod.rs @@ -3,9 +3,9 @@ use std::{collections::HashSet, sync::Arc}; use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::VerifiedCertificate}; -use tracing::{debug, info}; +use tracing::{debug, error, info}; -use crate::authority::AuthorityState; +use crate::authority::{AuthorityState, PendingDigest}; use crate::authority_client::AuthorityAPI; use futures::{stream, StreamExt}; @@ -89,21 +89,33 @@ where } } -/// Reads all pending transactions as a block and executes them. -/// Returns whether all pending transactions succeeded. -async fn execute_pending(active_authority: Arc>) -> SuiResult -where - A: AuthorityAPI + Send + Sync + 'static + Clone, -{ - // Get the pending transactions - let pending_transactions = active_authority.state.database.get_pending_digests()?; +type PendingVec = Vec<(u64, PendingDigest)>; + +fn sort_and_partition_pending_certs( + mut pending_transactions: PendingVec, +) -> ( + PendingVec, // sequenced + PendingVec, // unsequenced + Vec, // duplicated indices, to be deleted +) { + // sort sequenced digests before unsequenced so that the deduplication below favors + // sequenced digests. + pending_transactions.sort_by(|(idx_a, (is_seq_a, _)), (idx_b, (is_seq_b, _))| { + match is_seq_b.cmp(is_seq_a) { + // when both are sequenced or unsequenced, sort by idx. + std::cmp::Ordering::Equal => idx_a.cmp(idx_b), + // otherwise sort sequenced before unsequenced + res => res, + } + }); // Before executing de-duplicate the list of pending trasnactions let mut seen = HashSet::new(); let mut indexes_to_delete = Vec::new(); - let pending_transactions: Vec<_> = pending_transactions + + let (pending_sequenced, pending_transactions): (Vec<_>, Vec<_>) = pending_transactions .into_iter() - .filter(|(idx, digest)| { + .filter(|(idx, (_, digest))| { if seen.contains(digest) { indexes_to_delete.push(*idx); false @@ -112,7 +124,63 @@ where true } }) - .collect(); + .partition(|(_, (is_sequenced, _))| *is_sequenced); + + debug!( + num_sequenced = ?pending_sequenced.len(), + num_unsequenced = ?pending_transactions.len() + ); + + (pending_sequenced, pending_transactions, indexes_to_delete) +} + +#[test] +fn test_sort_and_partition_pending_certs() { + let tx1 = TransactionDigest::random(); + let tx2 = TransactionDigest::random(); + let tx3 = TransactionDigest::random(); + let tx4 = TransactionDigest::random(); + + // partitioning works correctly. + assert_eq!( + sort_and_partition_pending_certs(vec![(0, (false, tx1)), (1, (true, tx2))]), + (vec![(1, (true, tx2))], vec![(0, (false, tx1))], vec![],) + ); + + // if certs are duplicated, but some are sequenced, the sequenced certs take priority. + assert_eq!( + sort_and_partition_pending_certs(vec![(0, (false, tx1)), (1, (true, tx1))]), + (vec![(1, (true, tx1))], vec![], vec![0],) + ); + + // sorting works correctly for both sequenced and unsequenced. + assert_eq!( + sort_and_partition_pending_certs(vec![ + (2, (false, tx3)), + (0, (false, tx2)), + (4, (true, tx4)), + (1, (true, tx1)) + ]), + ( + vec![(1, (true, tx1)), (4, (true, tx4))], + vec![(0, (false, tx2)), (2, (false, tx3))], + vec![], + ) + ); +} + +/// Reads all pending transactions as a block and executes them. +/// Returns whether all pending transactions succeeded. +async fn execute_pending(active_authority: Arc>) -> SuiResult +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + // Get the pending transactions + let pending_transactions = active_authority.state.database.get_pending_digests()?; + + let (pending_sequenced, pending_transactions, indexes_to_delete) = + sort_and_partition_pending_certs(pending_transactions); + active_authority .state .database @@ -121,22 +189,51 @@ where // Send them for execution let epoch = active_authority.state.committee.load().epoch; let sync_handle = active_authority.clone().node_sync_handle(); + + // Execute certs that have a sequencing index associated with them serially. + for (seq, (_, digest)) in pending_sequenced.iter() { + let mut result_stream = sync_handle + .handle_execution_request(epoch, std::iter::once(*digest)) + .await?; + + match result_stream.next().await.unwrap() { + Ok(_) => { + debug!(?seq, ?digest, "serial certificate execution complete"); + active_authority + .state + .database + .remove_pending_digests(vec![*seq]) + .tap_err(|err| { + error!(?seq, ?digest, "pending digest deletion failed: {}", err) + })?; + } + Err(err) => { + info!( + ?seq, + ?digest, + "serial certificate execution failed: {}", + err + ); + } + } + } + let executed: Vec<_> = sync_handle // map to extract digest .handle_execution_request( epoch, - pending_transactions.iter().map(|(_, digest)| *digest), + pending_transactions.iter().map(|(_, (_, digest))| *digest), ) .await? // zip results back together with seq .zip(stream::iter(pending_transactions.iter())) // filter out errors - .filter_map(|(result, (seq, digest))| async move { + .filter_map(|(result, (idx, digest))| async move { result - .tap_err(|e| info!(?seq, ?digest, "certificate execution failed: {}", e)) - .tap_ok(|_| debug!(?seq, ?digest, "certificate execution complete")) + .tap_err(|e| info!(?idx, ?digest, "certificate execution failed: {}", e)) + .tap_ok(|_| debug!(?idx, ?digest, "certificate execution complete")) .ok() - .map(|_| seq) + .map(|_| idx) }) .collect() .await;