Skip to content

Commit

Permalink
Notify batch notifier for actual sequence number because it could be …
Browse files Browse the repository at this point in the history
…blocking it
  • Loading branch information
sadhansood committed Nov 4, 2022
1 parent c2223cf commit 28887bc
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 20 deletions.
53 changes: 48 additions & 5 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ impl AuthorityState {
// If commit_certificate returns an error, tx_guard will be dropped and the certificate
// will be persisted in the log for later recovery.
let notifier_ticket = self.batch_notifier.ticket(bypass_validator_halt)?;
let seq = notifier_ticket.seq();
let ticket_seq = notifier_ticket.seq();
let res = self
.commit_certificate(
inner_temporary_store,
Expand All @@ -839,7 +839,6 @@ impl AuthorityState {
notifier_ticket,
)
.await;

let seq = match res {
Err(err) => {
if matches!(err, SuiError::ValidatorHaltedAtEpochEnd) {
Expand All @@ -849,12 +848,56 @@ impl AuthorityState {
);
tx_guard.release();
} else {
error!(?digest, "commit_certificate failed: {}", err);
error!(?digest, seq=?ticket_seq, "commit_certificate failed: {}", err);
}
debug!(
seq=?ticket_seq,
"Ticket not notified due to commit failure",
);
// Check if we were able to sequence the tx at all
match self.db().get_tx_sequence(*certificate.digest()).await {
Err(db_err) => {
// TODO: Add retries on failing to read from db because
// this still stalls the batch maker
error!(
?digest,
seq=?ticket_seq,
"validator failed to read if db has locked the tx sequence: {}", db_err
);
}
Ok(None) => {
debug!(?digest, seq=?ticket_seq, "Closing the notifier ticket because we couldn't lock the tx sequence");
self.batch_notifier.notify(ticket_seq);
}
Ok(Some(tx_seq)) => {
if tx_seq < ticket_seq {
debug!(
?digest,
?tx_seq,
?ticket_seq,
"Notifying during retry failure, current low watermark {:?}",
self.batch_notifier.low_watermark()
);
// Notify if we failed during a retry after sequencing
self.batch_notifier.notify(ticket_seq);
};
}
}
debug!("Failed to notify ticket with sequence number: {}", seq);
return Err(err);
}
Ok(seq) => seq,
Ok(seq) => {
if seq < ticket_seq {
debug!(
?digest,
?seq,
?ticket_seq,
"Notifying during retry, current low watermark {:?}",
self.batch_notifier.low_watermark()
);
self.batch_notifier.notify(seq);
};
seq
}
};

// commit_certificate finished, the tx is fully committed to the store.
Expand Down
33 changes: 18 additions & 15 deletions crates/sui-core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ impl TransactionNotifier {
self.low_watermark.load(Ordering::SeqCst)
}

pub fn notify(&self, seq: u64) {
let mut inner = self.inner.lock();
inner.live_tickets.remove(&seq);

// The new low watermark is either the lowest outstanding ticket
// or the high watermark.
let new_low_watermark = *inner
.live_tickets
.iter()
.next()
.unwrap_or(&inner.high_watermark);

self.low_watermark
.store(new_low_watermark, Ordering::SeqCst);
self.notify.notify_one();
}

pub fn pause(&self) {
self.is_paused.store(true, Ordering::SeqCst);
}
Expand Down Expand Up @@ -261,21 +278,7 @@ impl TransactionNotifierTicket {
self.seq
}
pub fn notify(self) {
let mut inner = self.transaction_notifier.inner.lock();
inner.live_tickets.remove(&self.seq);

// The new low watermark is either the lowest outstanding ticket
// or the high watermark.
let new_low_watermark = *inner
.live_tickets
.iter()
.next()
.unwrap_or(&inner.high_watermark);

self.transaction_notifier
.low_watermark
.store(new_low_watermark, Ordering::SeqCst);
self.transaction_notifier.notify.notify_one();
self.transaction_notifier.notify(self.seq);
}
}

Expand Down
7 changes: 7 additions & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,13 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}
}

pub async fn get_tx_sequence(
&self,
tx: TransactionDigest,
) -> SuiResult<Option<TxSequenceNumber>> {
self.lock_service.get_tx_sequence(tx).await
}

/// Get the transaction envelope that currently locks the given object,
/// or returns Err(TransactionLockDoesNotExist) if the lock does not exist.
pub async fn get_object_locking_transaction(
Expand Down
31 changes: 31 additions & 0 deletions crates/sui-storage/src/lock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ enum LockServiceQueries {
objects: Vec<ObjectRef>,
resp: oneshot::Sender<SuiResult>,
},
GetTxSequence {
tx: TransactionDigest,
resp: oneshot::Sender<Result<Option<TxSequenceNumber>, SuiError>>,
},
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -421,6 +425,11 @@ impl LockServiceImpl {
warn!("Could not respond to sender, sender dropped!");
}
}
LockServiceQueries::GetTxSequence { tx, resp } => {
if let Err(_e) = resp.send(self.get_tx_sequence(tx)) {
warn!("Could not respond to sender, sender dropped!");
}
}
}
}
info!("LockService queries loop stopped, the sender on other end hung up/dropped");
Expand Down Expand Up @@ -586,6 +595,28 @@ impl LockService {
.await
}

pub async fn get_tx_sequence(
&self,
tx: TransactionDigest,
) -> SuiResult<Option<TxSequenceNumber>> {
block_on_future_in_sim(async move {
let (os_sender, os_receiver) =
oneshot::channel::<Result<Option<TxSequenceNumber>, SuiError>>();
self.inner
.query_sender()
.send(LockServiceQueries::GetTxSequence {
tx,
resp: os_sender,
})
.await
.expect("Could not send message to inner LockService");
os_receiver
.await
.expect("Response from lockservice was cancelled, should not happen!")
})
.await
}

pub async fn create_locks_for_genesis_objects(&self, objects: Vec<ObjectRef>) -> SuiResult {
block_on_future_in_sim(async move {
let (os_sender, os_receiver) = oneshot::channel::<SuiResult>();
Expand Down

0 comments on commit 28887bc

Please sign in to comment.