Skip to content

Commit

Permalink
Prioritize sequenced over unsequenced when there are duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed Nov 3, 2022
1 parent 7b78574 commit c5b6c51
Showing 1 changed file with 72 additions and 18 deletions.
90 changes: 72 additions & 18 deletions crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::HashSet, sync::Arc};
use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::VerifiedCertificate};
use tracing::{debug, error, info};

use crate::authority::AuthorityState;
use crate::authority::{AuthorityState, PendingDigest};
use crate::authority_client::AuthorityAPI;

use futures::{stream, StreamExt};
Expand Down Expand Up @@ -89,20 +89,31 @@ where
}
}

/// Reads all pending transactions as a block and executes them.
/// Returns whether all pending transactions succeeded.
async fn execute_pending<A>(active_authority: Arc<ActiveAuthority<A>>) -> SuiResult<bool>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_digests()?;
type PendingVec = Vec<(u64, PendingDigest)>;

fn sort_and_partition_pending_certs(
mut pending_transactions: PendingVec,
) -> (
PendingVec, // sequenced
PendingVec, // unsequenced
Vec<u64>, // duplicated indices, to be deleted
) {
// sort sequenced digests before unsequenced so that the deduplication below favors
// sequenced digests.
pending_transactions.sort_by(|(idx_a, (is_seq_a, _)), (idx_b, (is_seq_b, _))| {
match is_seq_b.cmp(is_seq_a) {
// when both are sequenced or unsequenced, sort by idx.
std::cmp::Ordering::Equal => idx_a.cmp(idx_b),
// otherwise sort sequenced before unsequenced
res => res,
}
});

// Before executing de-duplicate the list of pending trasnactions
let mut seen = HashSet::new();
let mut indexes_to_delete = Vec::new();

let (mut pending_sequenced, pending_transactions): (Vec<_>, Vec<_>) = pending_transactions
let (pending_sequenced, pending_transactions): (Vec<_>, Vec<_>) = pending_transactions
.into_iter()
.filter(|(idx, (_, digest))| {
if seen.contains(digest) {
Expand All @@ -117,21 +128,64 @@ where

debug!(
num_sequenced = ?pending_sequenced.len(),
num_unsequenced = ?pending_transactions.len(),
"executing sequenced and unsequenced transactions"
num_unsequenced = ?pending_transactions.len()
);

(pending_sequenced, pending_transactions, indexes_to_delete)
}

#[test]
fn test_sort_and_partition_pending_certs() {
let tx1 = TransactionDigest::random();
let tx2 = TransactionDigest::random();
let tx3 = TransactionDigest::random();
let tx4 = TransactionDigest::random();

// partitioning works correctly.
assert_eq!(
sort_and_partition_pending_certs(vec![(0, (false, tx1)), (1, (true, tx2))]),
(vec![(1, (true, tx2))], vec![(0, (false, tx1))], vec![],)
);

// if certs are duplicated, but some are sequenced, the sequenced certs take priority.
assert_eq!(
sort_and_partition_pending_certs(vec![(0, (false, tx1)), (1, (true, tx1))]),
(vec![(1, (true, tx1))], vec![], vec![0],)
);

// sorting works correctly for both sequenced and unsequenced.
assert_eq!(
sort_and_partition_pending_certs(vec![
(2, (false, tx3)),
(0, (false, tx2)),
(4, (true, tx4)),
(1, (true, tx1))
]),
(
vec![(1, (true, tx1)), (4, (true, tx4))],
vec![(0, (false, tx2)), (2, (false, tx3))],
vec![],
)
);
}

/// Reads all pending transactions as a block and executes them.
/// Returns whether all pending transactions succeeded.
async fn execute_pending<A>(active_authority: Arc<ActiveAuthority<A>>) -> SuiResult<bool>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_digests()?;

let (pending_sequenced, pending_transactions, indexes_to_delete) =
sort_and_partition_pending_certs(pending_transactions);

active_authority
.state
.database
.remove_pending_digests(indexes_to_delete)?;

// Sort by earlier shared cert sequences
pending_sequenced.sort_by(|a, b| match (a, b) {
((seq_a, (true, _)), (seq_b, (true, _))) => seq_a.cmp(seq_b),
_ => unreachable!(),
});

// Send them for execution
let epoch = active_authority.state.committee.load().epoch;
let sync_handle = active_authority.clone().node_sync_handle();
Expand Down

0 comments on commit c5b6c51

Please sign in to comment.