Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute sequenced certs serially in execution driver #5788

Merged
merged 4 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use tracing::{debug, error, instrument, warn};
use typed_store::Map;

pub use authority_store::{
AuthorityStore, GatewayStore, ResolverWrapper, SuiDataStore, UpdateType,
AuthorityStore, GatewayStore, PendingDigest, ResolverWrapper, SuiDataStore, UpdateType,
};
use narwhal_config::{
Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache,
Expand Down Expand Up @@ -1614,6 +1614,10 @@ impl AuthorityState {
.await
}

pub fn add_pending_sequenced_certificate(&self, cert: VerifiedCertificate) -> SuiResult {
self.add_pending_impl(vec![(*cert.digest(), Some(cert))], true)
}

/// Add a number of certificates to the pending transactions as well as the
/// certificates structure if they are not already executed.
/// Certificates are optional, and if not provided, they will be eventually
Expand All @@ -1622,11 +1626,24 @@ impl AuthorityState {
&self,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename this function to add_pending_certificates_from_gossip? Not sure where else this is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is called in authority_server as well - if we can't execute a cert received via handle_certificate, we enqueue it there for causal completion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, why did we have to enqueue the failed certificate in authority server? Just to avoid lagging behind?

certs: Vec<(TransactionDigest, Option<VerifiedCertificate>)>,
) -> SuiResult<()> {
self.add_pending_impl(certs, false)
}

fn add_pending_impl(
&self,
certs: Vec<(TransactionDigest, Option<VerifiedCertificate>)>,
is_sequenced: bool,
) -> SuiResult {
self.node_sync_store
.batch_store_certs(certs.iter().filter_map(|(_, cert_opt)| cert_opt.clone()))?;

self.database
.add_pending_digests(certs.iter().map(|(digest, _)| *digest).collect())
self.database.add_pending_digests(
certs
.iter()
.map(|(seq_and_digest, _)| *seq_and_digest)
.collect(),
is_sequenced,
)
}

// Continually pop in-progress txes from the WAL and try to drive them to completion.
Expand Down Expand Up @@ -2314,10 +2331,7 @@ impl AuthorityState {
);

// Schedule the certificate for execution
self.add_pending_certificates(vec![(
*certificate.digest(),
Some(certificate.clone()),
)])?;
self.add_pending_sequenced_certificate(certificate.clone())?;

if certificate.contains_shared_object() {
self.database
Expand Down
13 changes: 8 additions & 5 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub type AuthorityStore = SuiDataStore<AuthoritySignInfo>;
pub type GatewayStore = SuiDataStore<EmptySignInfo>;

pub type InternalSequenceNumber = u64;
pub type PendingDigest = (bool /* is sequenced */, TransactionDigest);

pub struct CertLockGuard(LockGuard);

Expand Down Expand Up @@ -223,7 +224,11 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
/// index. If two instanced run concurrently, the indexes are guaranteed to not overlap
/// although some certificates may be included twice in the `pending_execution`, and
/// the same certificate may be written twice (but that is OK since it is valid.)
pub fn add_pending_digests(&self, digests: Vec<TransactionDigest>) -> SuiResult<()> {
pub fn add_pending_digests(
&self,
digests: Vec<TransactionDigest>,
is_sequenced: bool,
) -> SuiResult<()> {
let first_index = self
.next_pending_seq
.fetch_add(digests.len() as u64, Ordering::Relaxed);
Expand All @@ -234,7 +239,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
digests
.iter()
.enumerate()
.map(|(num, digest)| ((num as u64) + first_index, digest)),
.map(|(num, digest)| ((num as u64) + first_index, (is_sequenced, *digest))),
)?;
batch.write()?;

Expand All @@ -245,9 +250,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}

/// Get all stored certificate digests
pub fn get_pending_digests(
&self,
) -> SuiResult<Vec<(InternalSequenceNumber, TransactionDigest)>> {
pub fn get_pending_digests(&self) -> SuiResult<Vec<(InternalSequenceNumber, PendingDigest)>> {
Ok(self.epoch_tables().pending_execution.iter().collect())
}

Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
authority_store::{InternalSequenceNumber, ObjectKey},
authority_store::{InternalSequenceNumber, ObjectKey, PendingDigest},
*,
};
use narwhal_executor::ExecutionIndices;
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct AuthorityEpochTables<S> {
/// reads this table and executes the certificates. The order is a hint as to their
/// causal dependencies. Note that there is no guarantee digests are unique. Once executed, and
/// effects are written the entry should be deleted.
pub(crate) pending_execution: DBMap<InternalSequenceNumber, TransactionDigest>,
pub(crate) pending_execution: DBMap<InternalSequenceNumber, PendingDigest>,

/// Hold the lock for shared objects. These locks are written by a single task: upon receiving a valid
/// certified transaction from consensus, the authority assigns a lock to each shared objects of the
Expand Down
133 changes: 115 additions & 18 deletions crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

use std::{collections::HashSet, sync::Arc};
use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::VerifiedCertificate};
use tracing::{debug, info};
use tracing::{debug, error, info};

use crate::authority::AuthorityState;
use crate::authority::{AuthorityState, PendingDigest};
use crate::authority_client::AuthorityAPI;

use futures::{stream, StreamExt};
Expand Down Expand Up @@ -89,21 +89,33 @@ where
}
}

/// Reads all pending transactions as a block and executes them.
/// Returns whether all pending transactions succeeded.
async fn execute_pending<A>(active_authority: Arc<ActiveAuthority<A>>) -> SuiResult<bool>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_digests()?;
type PendingVec = Vec<(u64, PendingDigest)>;

fn sort_and_partition_pending_certs(
mut pending_transactions: PendingVec,
) -> (
PendingVec, // sequenced
PendingVec, // unsequenced
Vec<u64>, // duplicated indices, to be deleted
) {
// sort sequenced digests before unsequenced so that the deduplication below favors
// sequenced digests.
pending_transactions.sort_by(|(idx_a, (is_seq_a, _)), (idx_b, (is_seq_b, _))| {
match is_seq_b.cmp(is_seq_a) {
// when both are sequenced or unsequenced, sort by idx.
std::cmp::Ordering::Equal => idx_a.cmp(idx_b),
// otherwise sort sequenced before unsequenced
res => res,
}
});

// Before executing de-duplicate the list of pending trasnactions
let mut seen = HashSet::new();
let mut indexes_to_delete = Vec::new();
let pending_transactions: Vec<_> = pending_transactions

let (pending_sequenced, pending_transactions): (Vec<_>, Vec<_>) = pending_transactions
.into_iter()
.filter(|(idx, digest)| {
.filter(|(idx, (_, digest))| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we need to favor digests that are sequenced, if there is duplicate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, we first serially execute all sequenced digests.

if seen.contains(digest) {
indexes_to_delete.push(*idx);
false
Expand All @@ -112,7 +124,63 @@ where
true
}
})
.collect();
.partition(|(_, (is_sequenced, _))| *is_sequenced);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a debug logging here to print the number on each side of the partition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

debug!(
num_sequenced = ?pending_sequenced.len(),
num_unsequenced = ?pending_transactions.len()
);

(pending_sequenced, pending_transactions, indexes_to_delete)
}

#[test]
fn test_sort_and_partition_pending_certs() {
let tx1 = TransactionDigest::random();
let tx2 = TransactionDigest::random();
let tx3 = TransactionDigest::random();
let tx4 = TransactionDigest::random();

// partitioning works correctly.
assert_eq!(
sort_and_partition_pending_certs(vec![(0, (false, tx1)), (1, (true, tx2))]),
(vec![(1, (true, tx2))], vec![(0, (false, tx1))], vec![],)
);

// if certs are duplicated, but some are sequenced, the sequenced certs take priority.
assert_eq!(
sort_and_partition_pending_certs(vec![(0, (false, tx1)), (1, (true, tx1))]),
(vec![(1, (true, tx1))], vec![], vec![0],)
);

// sorting works correctly for both sequenced and unsequenced.
assert_eq!(
sort_and_partition_pending_certs(vec![
(2, (false, tx3)),
(0, (false, tx2)),
(4, (true, tx4)),
(1, (true, tx1))
]),
(
vec![(1, (true, tx1)), (4, (true, tx4))],
vec![(0, (false, tx2)), (2, (false, tx3))],
vec![],
)
);
}

/// Reads all pending transactions as a block and executes them.
/// Returns whether all pending transactions succeeded.
async fn execute_pending<A>(active_authority: Arc<ActiveAuthority<A>>) -> SuiResult<bool>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_digests()?;

let (pending_sequenced, pending_transactions, indexes_to_delete) =
sort_and_partition_pending_certs(pending_transactions);

active_authority
.state
.database
Expand All @@ -121,22 +189,51 @@ where
// Send them for execution
let epoch = active_authority.state.committee.load().epoch;
let sync_handle = active_authority.clone().node_sync_handle();

// Execute certs that have a sequencing index associated with them serially.
for (seq, (_, digest)) in pending_sequenced.iter() {
let mut result_stream = sync_handle
.handle_execution_request(epoch, std::iter::once(*digest))
.await?;

match result_stream.next().await.unwrap() {
Ok(_) => {
debug!(?seq, ?digest, "serial certificate execution complete");
active_authority
.state
.database
.remove_pending_digests(vec![*seq])
.tap_err(|err| {
error!(?seq, ?digest, "pending digest deletion failed: {}", err)
})?;
}
Err(err) => {
info!(
?seq,
?digest,
"serial certificate execution failed: {}",
err
);
}
}
}

let executed: Vec<_> = sync_handle
// map to extract digest
.handle_execution_request(
epoch,
pending_transactions.iter().map(|(_, digest)| *digest),
pending_transactions.iter().map(|(_, (_, digest))| *digest),
)
.await?
// zip results back together with seq
.zip(stream::iter(pending_transactions.iter()))
// filter out errors
.filter_map(|(result, (seq, digest))| async move {
.filter_map(|(result, (idx, digest))| async move {
result
.tap_err(|e| info!(?seq, ?digest, "certificate execution failed: {}", e))
.tap_ok(|_| debug!(?seq, ?digest, "certificate execution complete"))
.tap_err(|e| info!(?idx, ?digest, "certificate execution failed: {}", e))
.tap_ok(|_| debug!(?idx, ?digest, "certificate execution complete"))
.ok()
.map(|_| seq)
.map(|_| idx)
})
.collect()
.await;
Expand Down