Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify batch notifier for actual sequence number because it could be blocking it #5868

Merged
merged 1 commit into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ impl AuthorityState {
// If commit_certificate returns an error, tx_guard will be dropped and the certificate
// will be persisted in the log for later recovery.
let notifier_ticket = self.batch_notifier.ticket(bypass_validator_halt)?;
let seq = notifier_ticket.seq();
let ticket_seq = notifier_ticket.seq();
let res = self
.commit_certificate(
inner_temporary_store,
Expand All @@ -839,7 +839,6 @@ impl AuthorityState {
notifier_ticket,
)
.await;

let seq = match res {
Err(err) => {
if matches!(err, SuiError::ValidatorHaltedAtEpochEnd) {
Expand All @@ -849,12 +848,56 @@ impl AuthorityState {
);
tx_guard.release();
} else {
error!(?digest, "commit_certificate failed: {}", err);
error!(?digest, seq=?ticket_seq, "commit_certificate failed: {}", err);
}
debug!(
seq=?ticket_seq,
"Ticket not notified due to commit failure",
);
// Check if we were able to sequence the tx at all
match self.db().get_tx_sequence(*certificate.digest()).await {
Err(db_err) => {
// TODO: Add retries on failing to read from db because
// this still stalls the batch maker
error!(
?digest,
seq=?ticket_seq,
"validator failed to read if db has locked the tx sequence: {}", db_err
);
}
Ok(None) => {
debug!(?digest, seq=?ticket_seq, "Closing the notifier ticket because we couldn't lock the tx sequence");
self.batch_notifier.notify(ticket_seq);
}
Ok(Some(tx_seq)) => {
if tx_seq < ticket_seq {
debug!(
?digest,
?tx_seq,
?ticket_seq,
"Notifying during retry failure, current low watermark {:?}",
self.batch_notifier.low_watermark()
);
// Notify if we failed during a retry after sequencing
self.batch_notifier.notify(ticket_seq);
};
}
}
debug!("Failed to notify ticket with sequence number: {}", seq);
return Err(err);
}
Ok(seq) => seq,
Ok(seq) => {
if seq < ticket_seq {
debug!(
?digest,
?seq,
?ticket_seq,
"Notifying during retry, current low watermark {:?}",
self.batch_notifier.low_watermark()
);
self.batch_notifier.notify(seq);
};
seq
}
};

// commit_certificate finished, the tx is fully committed to the store.
Expand Down
33 changes: 18 additions & 15 deletions crates/sui-core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ impl TransactionNotifier {
self.low_watermark.load(Ordering::SeqCst)
}

pub fn notify(&self, seq: u64) {
let mut inner = self.inner.lock();
inner.live_tickets.remove(&seq);

// The new low watermark is either the lowest outstanding ticket
// or the high watermark.
let new_low_watermark = *inner
.live_tickets
.iter()
.next()
.unwrap_or(&inner.high_watermark);

self.low_watermark
.store(new_low_watermark, Ordering::SeqCst);
self.notify.notify_one();
}

pub fn pause(&self) {
self.is_paused.store(true, Ordering::SeqCst);
}
Expand Down Expand Up @@ -261,21 +278,7 @@ impl TransactionNotifierTicket {
self.seq
}
pub fn notify(self) {
let mut inner = self.transaction_notifier.inner.lock();
inner.live_tickets.remove(&self.seq);

// The new low watermark is either the lowest outstanding ticket
// or the high watermark.
let new_low_watermark = *inner
.live_tickets
.iter()
.next()
.unwrap_or(&inner.high_watermark);

self.transaction_notifier
.low_watermark
.store(new_low_watermark, Ordering::SeqCst);
self.transaction_notifier.notify.notify_one();
self.transaction_notifier.notify(self.seq);
}
}

Expand Down
7 changes: 7 additions & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,13 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}
}

pub async fn get_tx_sequence(
&self,
tx: TransactionDigest,
) -> SuiResult<Option<TxSequenceNumber>> {
self.lock_service.get_tx_sequence(tx).await
}

/// Get the transaction envelope that currently locks the given object,
/// or returns Err(TransactionLockDoesNotExist) if the lock does not exist.
pub async fn get_object_locking_transaction(
Expand Down
31 changes: 31 additions & 0 deletions crates/sui-storage/src/lock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ enum LockServiceQueries {
objects: Vec<ObjectRef>,
resp: oneshot::Sender<SuiResult>,
},
GetTxSequence {
tx: TransactionDigest,
resp: oneshot::Sender<Result<Option<TxSequenceNumber>, SuiError>>,
},
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -421,6 +425,11 @@ impl LockServiceImpl {
warn!("Could not respond to sender, sender dropped!");
}
}
LockServiceQueries::GetTxSequence { tx, resp } => {
if let Err(_e) = resp.send(self.get_tx_sequence(tx)) {
warn!("Could not respond to sender, sender dropped!");
}
}
}
}
info!("LockService queries loop stopped, the sender on other end hung up/dropped");
Expand Down Expand Up @@ -586,6 +595,28 @@ impl LockService {
.await
}

pub async fn get_tx_sequence(
&self,
tx: TransactionDigest,
) -> SuiResult<Option<TxSequenceNumber>> {
block_on_future_in_sim(async move {
let (os_sender, os_receiver) =
oneshot::channel::<Result<Option<TxSequenceNumber>, SuiError>>();
self.inner
.query_sender()
.send(LockServiceQueries::GetTxSequence {
tx,
resp: os_sender,
})
.await
.expect("Could not send message to inner LockService");
os_receiver
.await
.expect("Response from lockservice was cancelled, should not happen!")
})
.await
}

pub async fn create_locks_for_genesis_objects(&self, objects: Vec<ObjectRef>) -> SuiResult {
block_on_future_in_sim(async move {
let (os_sender, os_receiver) = oneshot::channel::<SuiResult>();
Expand Down