Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: MystenLabs/sui
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: devnet-0.14.0
Choose a base ref
...
head repository: MystenLabs/sui
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: devnet-0.14.1
Choose a head ref
  • 8 commits
  • 21 files changed
  • 4 contributors

Commits on Nov 4, 2022

  1. Copy the full SHA
    1587d5d View commit details
  2. Execute sequenced certs serially in execution driver (#5788)

    * Execute sequenced certs serially in execution driver
    
    * Just use a bool to indicate whether sequenced
    
    * PR comments
    
    * Prioritize sequenced over unsequenced when there are duplicates
    mystenmark authored and ebmifa committed Nov 4, 2022
    Copy the full SHA
    abfdc59 View commit details
  3. cargo release build

    ebmifa committed Nov 4, 2022
    Copy the full SHA
    402e090 View commit details
  4. Sui v0.15.0 version bump (#5765)

    ebmifa authored and lanvidr committed Nov 4, 2022
    Copy the full SHA
    b556120 View commit details
  5. Copy the full SHA
    4460d9e View commit details
  6. add test

    lanvidr committed Nov 4, 2022
    Copy the full SHA
    63a7fdd View commit details
  7. review updates

    lanvidr committed Nov 4, 2022
    Copy the full SHA
    4aba351 View commit details
  8. Copy the full SHA
    0bc7d19 View commit details
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ members = [

[workspace.package]
# This version string will be inherited by sui-core, sui-faucet, sui-node, sui-tools, sui-sdk, and sui crates
version = "0.14.0"
version = "0.14.1"

[profile.release]
# debug = 1 means line charts only, which is minimum needed for good stack traces
81 changes: 69 additions & 12 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ use tracing::{debug, error, instrument, warn};
use typed_store::Map;

pub use authority_store::{
AuthorityStore, GatewayStore, ResolverWrapper, SuiDataStore, UpdateType,
AuthorityStore, GatewayStore, PendingDigest, ResolverWrapper, SuiDataStore, UpdateType,
};
use narwhal_config::{
Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache,
@@ -825,7 +825,7 @@ impl AuthorityState {
// If commit_certificate returns an error, tx_guard will be dropped and the certificate
// will be persisted in the log for later recovery.
let notifier_ticket = self.batch_notifier.ticket(bypass_validator_halt)?;
let seq = notifier_ticket.seq();
let ticket_seq = notifier_ticket.seq();
let res = self
.commit_certificate(
inner_temporary_store,
@@ -834,7 +834,6 @@ impl AuthorityState {
notifier_ticket,
)
.await;

let seq = match res {
Err(err) => {
if matches!(err, SuiError::ValidatorHaltedAtEpochEnd) {
@@ -844,12 +843,56 @@ impl AuthorityState {
);
tx_guard.release();
} else {
error!(?digest, "commit_certificate failed: {}", err);
error!(?digest, seq=?ticket_seq, "commit_certificate failed: {}", err);
}
debug!(
seq=?ticket_seq,
"Ticket not notified due to commit failure",
);
// Check if we were able to sequence the tx at all
match self.db().get_tx_sequence(*certificate.digest()).await {
Err(db_err) => {
// TODO: Add retries on failing to read from db because
// this still stalls the batch maker
error!(
?digest,
seq=?ticket_seq,
"validator failed to read if db has locked the tx sequence: {}", db_err
);
}
Ok(None) => {
debug!(?digest, seq=?ticket_seq, "Closing the notifier ticket because we couldn't lock the tx sequence");
self.batch_notifier.notify(ticket_seq);
}
Ok(Some(tx_seq)) => {
if tx_seq < ticket_seq {
debug!(
?digest,
?tx_seq,
?ticket_seq,
"Notifying during retry failure, current low watermark {:?}",
self.batch_notifier.low_watermark()
);
// Notify if we failed during a retry after sequencing
self.batch_notifier.notify(ticket_seq);
};
}
}
debug!("Failed to notify ticket with sequence number: {}", seq);
return Err(err);
}
Ok(seq) => seq,
Ok(seq) => {
if seq < ticket_seq {
debug!(
?digest,
?seq,
?ticket_seq,
"Notifying during retry, current low watermark {:?}",
self.batch_notifier.low_watermark()
);
self.batch_notifier.notify(seq);
};
seq
}
};

// commit_certificate finished, the tx is fully committed to the store.
@@ -1614,6 +1657,10 @@ impl AuthorityState {
.await
}

pub fn add_pending_sequenced_certificate(&self, cert: VerifiedCertificate) -> SuiResult {
self.add_pending_impl(vec![(*cert.digest(), Some(cert))], true)
}

/// Add a number of certificates to the pending transactions as well as the
/// certificates structure if they are not already executed.
/// Certificates are optional, and if not provided, they will be eventually
@@ -1622,11 +1669,24 @@ impl AuthorityState {
&self,
certs: Vec<(TransactionDigest, Option<VerifiedCertificate>)>,
) -> SuiResult<()> {
self.add_pending_impl(certs, false)
}

fn add_pending_impl(
&self,
certs: Vec<(TransactionDigest, Option<VerifiedCertificate>)>,
is_sequenced: bool,
) -> SuiResult {
self.node_sync_store
.batch_store_certs(certs.iter().filter_map(|(_, cert_opt)| cert_opt.clone()))?;

self.database
.add_pending_digests(certs.iter().map(|(digest, _)| *digest).collect())
self.database.add_pending_digests(
certs
.iter()
.map(|(seq_and_digest, _)| *seq_and_digest)
.collect(),
is_sequenced,
)
}

// Continually pop in-progress txes from the WAL and try to drive them to completion.
@@ -2303,10 +2363,7 @@ impl AuthorityState {
);

// Schedule the certificate for execution
self.add_pending_certificates(vec![(
*certificate.digest(),
Some(certificate.clone()),
)])?;
self.add_pending_sequenced_certificate(certificate.clone())?;

self.database
.lock_shared_objects(&certificate, consensus_index)
33 changes: 18 additions & 15 deletions crates/sui-core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
@@ -85,6 +85,23 @@ impl TransactionNotifier {
self.low_watermark.load(Ordering::SeqCst)
}

pub fn notify(&self, seq: u64) {
let mut inner = self.inner.lock();
inner.live_tickets.remove(&seq);

// The new low watermark is either the lowest outstanding ticket
// or the high watermark.
let new_low_watermark = *inner
.live_tickets
.iter()
.next()
.unwrap_or(&inner.high_watermark);

self.low_watermark
.store(new_low_watermark, Ordering::SeqCst);
self.notify.notify_one();
}

pub fn pause(&self) {
self.is_paused.store(true, Ordering::SeqCst);
}
@@ -261,21 +278,7 @@ impl TransactionNotifierTicket {
self.seq
}
pub fn notify(self) {
let mut inner = self.transaction_notifier.inner.lock();
inner.live_tickets.remove(&self.seq);

// The new low watermark is either the lowest outstanding ticket
// or the high watermark.
let new_low_watermark = *inner
.live_tickets
.iter()
.next()
.unwrap_or(&inner.high_watermark);

self.transaction_notifier
.low_watermark
.store(new_low_watermark, Ordering::SeqCst);
self.transaction_notifier.notify.notify_one();
self.transaction_notifier.notify(self.seq);
}
}

20 changes: 15 additions & 5 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ pub type AuthorityStore = SuiDataStore<AuthoritySignInfo>;
pub type GatewayStore = SuiDataStore<EmptySignInfo>;

pub type InternalSequenceNumber = u64;
pub type PendingDigest = (bool /* is sequenced */, TransactionDigest);

pub struct CertLockGuard(LockGuard);

@@ -223,7 +224,11 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
/// index. If two instanced run concurrently, the indexes are guaranteed to not overlap
/// although some certificates may be included twice in the `pending_execution`, and
/// the same certificate may be written twice (but that is OK since it is valid.)
pub fn add_pending_digests(&self, digests: Vec<TransactionDigest>) -> SuiResult<()> {
pub fn add_pending_digests(
&self,
digests: Vec<TransactionDigest>,
is_sequenced: bool,
) -> SuiResult<()> {
let first_index = self
.next_pending_seq
.fetch_add(digests.len() as u64, Ordering::Relaxed);
@@ -234,7 +239,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
digests
.iter()
.enumerate()
.map(|(num, digest)| ((num as u64) + first_index, digest)),
.map(|(num, digest)| ((num as u64) + first_index, (is_sequenced, *digest))),
)?;
batch.write()?;

@@ -245,9 +250,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}

/// Get all stored certificate digests
pub fn get_pending_digests(
&self,
) -> SuiResult<Vec<(InternalSequenceNumber, TransactionDigest)>> {
pub fn get_pending_digests(&self) -> SuiResult<Vec<(InternalSequenceNumber, PendingDigest)>> {
Ok(self.epoch_tables().pending_execution.iter().collect())
}

@@ -390,6 +393,13 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}
}

pub async fn get_tx_sequence(
&self,
tx: TransactionDigest,
) -> SuiResult<Option<TxSequenceNumber>> {
self.lock_service.get_tx_sequence(tx).await
}

/// Get the transaction envelope that currently locks the given object,
/// or returns Err(TransactionLockDoesNotExist) if the lock does not exist.
pub async fn get_object_locking_transaction(
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
authority_store::{InternalSequenceNumber, ObjectKey},
authority_store::{InternalSequenceNumber, ObjectKey, PendingDigest},
*,
};
use narwhal_executor::ExecutionIndices;
@@ -32,7 +32,7 @@ pub struct AuthorityEpochTables<S> {
/// reads this table and executes the certificates. The order is a hint as to their
/// causal dependencies. Note that there is no guarantee digests are unique. Once executed, and
/// effects are written the entry should be deleted.
pub(crate) pending_execution: DBMap<InternalSequenceNumber, TransactionDigest>,
pub(crate) pending_execution: DBMap<InternalSequenceNumber, PendingDigest>,

/// Hold the lock for shared objects. These locks are written by a single task: upon receiving a valid
/// certified transaction from consensus, the authority assigns a lock to each shared objects of the
Loading