Skip to content

Commit

Permalink
cherry pick 12669 to 1.4 (#12725)
Browse files Browse the repository at this point in the history
## Description 

cherry pick #12669 to 1.4

## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
longbowlu authored Jun 28, 2023
1 parent 2d51899 commit 7ffd0ba
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 141 deletions.
221 changes: 118 additions & 103 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use fastcrypto::hash::MultisetHash;
use futures::stream::FuturesUnordered;
use itertools::Itertools;
use move_binary_format::CompiledModule;
use move_bytecode_utils::module_cache::GetModule;
use move_core_types::language_storage::ModuleId;
use move_core_types::value::MoveStructLayout;
use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
Expand Down Expand Up @@ -229,6 +230,7 @@ pub struct AuthorityMetrics {
post_processing_total_events_emitted: IntCounter,
post_processing_total_tx_indexed: IntCounter,
post_processing_total_tx_had_event_processed: IntCounter,
post_processing_total_failures: IntCounter,

pending_notify_read: IntGauge,

Expand Down Expand Up @@ -505,6 +507,12 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
post_processing_total_failures: register_int_counter_with_registry!(
"post_processing_total_failures",
"Total number of failure in post processing",
registry,
)
.unwrap(),
pending_notify_read: register_int_gauge_with_registry!(
"pending_notify_read",
"Pending notify read requests",
Expand Down Expand Up @@ -1076,17 +1084,7 @@ impl AuthorityState {
.map(|(_, ((id, seq, _), obj, _))| InputKey(*id, (!obj.is_package()).then_some(*seq)))
.collect();

let events = inner_temporary_store.events.clone();

let loaded_child_objects = if self.is_fullnode(epoch_store) {
// We only care about this for full nodes
inner_temporary_store.loaded_child_objects.clone()
} else {
BTreeMap::new()
};

let tx_coins = self
.commit_certificate(inner_temporary_store, certificate, effects, epoch_store)
self.commit_certificate(inner_temporary_store, certificate, effects, epoch_store)
.await?;

// commit_certificate finished, the tx is fully committed to the store.
Expand All @@ -1102,19 +1100,6 @@ impl AuthorityState {
self.transaction_manager
.notify_commit(&digest, output_keys, epoch_store);

// index certificate
let _ = self
.post_process_one_tx(
certificate,
effects,
&events,
epoch_store,
tx_coins,
loaded_child_objects,
)
.await
.tap_err(|e| error!("tx post processing failed: {e}"));

// Update metrics.
self.metrics.total_effects.inc();
self.metrics.total_certs.inc();
Expand Down Expand Up @@ -1500,12 +1485,13 @@ impl AuthorityState {
effects: &TransactionEffects,
events: &TransactionEvents,
timestamp_ms: u64,
epoch_store: &Arc<AuthorityPerEpochStore>,
tx_coins: Option<TxCoins>,
loaded_child_objects: BTreeMap<ObjectID, SequenceNumber>,
written: &WrittenObjects,
module_resolver: &impl GetModule,
loaded_child_objects: &BTreeMap<ObjectID, SequenceNumber>,
) -> SuiResult<u64> {
let changes = self
.process_object_index(effects, epoch_store)
.process_object_index(effects, written, module_resolver)
.tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;

indexes
Expand Down Expand Up @@ -1542,20 +1528,25 @@ impl AuthorityState {
fn process_object_index(
&self,
effects: &TransactionEffects,
epoch_store: &Arc<AuthorityPerEpochStore>,
written: &WrittenObjects,
module_resolver: &impl GetModule,
) -> SuiResult<ObjectIndexChanges> {
let modified_at_version = effects
.modified_at_versions()
.iter()
.cloned()
.collect::<HashMap<_, _>>();

let tx_digest = effects.transaction_digest();
let mut deleted_owners = vec![];
let mut deleted_dynamic_fields = vec![];
for (id, _, _) in effects.deleted().iter().chain(effects.wrapped()) {
let old_version = modified_at_version.get(id).unwrap();

match self.get_owner_at_version(id, *old_version)? {
// When we process the index, the latest object hasn't been written yet so
// the old object must be present.
match self.get_owner_at_version(id, *old_version).unwrap_or_else(
|e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
) {
Owner::AddressOwner(addr) => deleted_owners.push((addr, *id)),
Owner::ObjectOwner(object_id) => {
deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
Expand All @@ -1571,14 +1562,14 @@ impl AuthorityState {
let id = &oref.0;
// For mutated objects, retrieve old owner and delete old index if there is a owner change.
if let WriteKind::Mutate = kind {
let Some(old_version) = modified_at_version.get(id) else{
error!("Error processing object owner index for tx [{:?}], cannot find modified at version for mutated object [{id}].", effects.transaction_digest());
continue;
};
let Some(old_version) = modified_at_version.get(id) else {
panic!("tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].", tx_digest);
};
// When we process the index, the latest object hasn't been written yet so
// the old object must be present.
let Some(old_object) = self.database.get_object_by_key(id, *old_version)? else {
error!("Error processing object owner index for tx [{:?}], cannot find object [{id}] at version [{old_version}].", effects.transaction_digest());
continue;
};
panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}", tx_digest, id, old_version);
};
if &old_object.owner != owner {
match old_object.owner {
Owner::AddressOwner(addr) => {
Expand All @@ -1595,11 +1586,13 @@ impl AuthorityState {
match owner {
Owner::AddressOwner(addr) => {
// TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
let Some(o) = self.database.get_object_by_key(id, oref.1)? else{
continue;
};
let new_object = written.get(id).unwrap_or_else(
|| panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
);
assert_eq!(new_object.0.1, oref.1, "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}", tx_digest, id, new_object.0.1, oref.1);

let type_ = o
let type_ = new_object
.1
.type_()
.map(|type_| ObjectType::Struct(type_.clone()))
.unwrap_or(ObjectType::Package);
Expand All @@ -1617,10 +1610,13 @@ impl AuthorityState {
));
}
Owner::ObjectOwner(owner) => {
let Some(o) = self.database.get_object_by_key(&oref.0, oref.1)? else{
continue;
};
let Some(df_info) = self.try_create_dynamic_field_info(&o, epoch_store)? else{
let new_object = written.get(id).unwrap_or_else(
|| panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
);
assert_eq!(new_object.0.1, oref.1, "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}", tx_digest, id, new_object.0.1, oref.1);

let Some(df_info) = self.try_create_dynamic_field_info(&new_object.1, written, module_resolver)
.expect("try_create_dynamic_field_info should not fail.") else {
// Skip indexing for non dynamic field objects.
continue;
};
Expand All @@ -1641,7 +1637,8 @@ impl AuthorityState {
fn try_create_dynamic_field_info(
&self,
o: &Object,
epoch_store: &Arc<AuthorityPerEpochStore>,
written: &WrittenObjects,
resolver: &impl GetModule,
) -> SuiResult<Option<DynamicFieldInfo>> {
// Skip if not a move object
let Some(move_object) = o.data.try_as_move().cloned() else {
Expand All @@ -1651,10 +1648,8 @@ impl AuthorityState {
if !move_object.type_().is_dynamic_field() {
return Ok(None);
}
let move_struct = move_object.to_move_struct_with_resolver(
ObjectFormatOptions::default(),
epoch_store.module_cache().as_ref(),
)?;
let move_struct =
move_object.to_move_struct_with_resolver(ObjectFormatOptions::default(), resolver)?;

let (name_value, type_, object_id) =
DynamicFieldInfo::parse_move_object(&move_struct).tap_err(|e| warn!("{e}"))?;
Expand All @@ -1675,16 +1670,28 @@ impl AuthorityState {
Ok(Some(match type_ {
DynamicFieldType::DynamicObject => {
// Find the actual object from storage using the object id obtained from the wrapper.
let Some(object) = self.database.find_object_lt_or_eq_version(object_id, o.version()) else{
return Err(UserInputError::ObjectNotFound {
object_id,
version: Some(o.version()),
}.into())
};
let version = object.version();
let digest = object.digest();
let object_type = object.data.type_().unwrap();

// Try to find the object in the written objects first.
let (version, digest, object_type) =
if let Some((_, object, _)) = written.get(&object_id) {
let version = object.version();
let digest = object.digest();
let object_type = object.data.type_().unwrap().clone();
(version, digest, object_type)
} else {
// If not found, try to find it in the database.
let object = self
.database
.get_object_by_key(&object_id, o.version())?
.ok_or_else(|| UserInputError::ObjectNotFound {
object_id,
version: Some(o.version()),
})?;
let version = object.version();
let digest = object.digest();
let object_type = object.data.type_().unwrap().clone();
(version, digest, object_type)
};
DynamicFieldInfo {
name,
bcs_name,
Expand Down Expand Up @@ -1712,65 +1719,72 @@ impl AuthorityState {
&self,
certificate: &VerifiedExecutableTransaction,
effects: &TransactionEffects,
events: &TransactionEvents,
inner_temporary_store: &InnerTemporaryStore,
epoch_store: &Arc<AuthorityPerEpochStore>,
tx_coins: Option<TxCoins>,
loaded_child_objects: BTreeMap<ObjectID, SequenceNumber>,
) -> SuiResult {
if self.indexes.is_none() {
return Ok(());
}

let tx_digest = certificate.digest();
let timestamp_ms = Self::unixtime_now_ms();
let events = &inner_temporary_store.events;
let written = &inner_temporary_store.written;
let module_resolver =
TemporaryModuleResolver::new(inner_temporary_store, epoch_store.module_cache().clone());

let tx_coins =
self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);

// Index tx
if let Some(indexes) = &self.indexes {
let res = self
let _ = self
.index_tx(
indexes.as_ref(),
tx_digest,
certificate,
effects,
events,
timestamp_ms,
epoch_store,
tx_coins,
loaded_child_objects,
written,
&module_resolver,
&inner_temporary_store.loaded_child_objects,
)
.await
.tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
.tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"));
.tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
.expect("Indexing tx should not fail");

let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
// Emit events
if res.is_ok() {
self.subscription_handler
.process_tx(
certificate.data().transaction_data(),
&effects,
&SuiTransactionBlockEvents::try_from(
events.clone(),
*tx_digest,
Some(timestamp_ms),
epoch_store.module_cache(),
)?,
self.subscription_handler
.process_tx(
certificate.data().transaction_data(),
&effects,
&SuiTransactionBlockEvents::try_from(
events.clone(),
*tx_digest,
Some(timestamp_ms),
&module_resolver,
)?,
)
.await
.tap_ok(|_| {
self.metrics
.post_processing_total_tx_had_event_processed
.inc()
})
.tap_err(|e| {
warn!(
?tx_digest,
"Post processing - Couldn't process events for tx: {}", e
)
.await
.tap_ok(|_| {
self.metrics
.post_processing_total_tx_had_event_processed
.inc()
})
.tap_err(|e| {
warn!(
?tx_digest,
"Post processing - Couldn't process events for tx: {}", e
)
})?;
})?;

self.metrics
.post_processing_total_events_emitted
.inc_by(events.data.len() as u64);
}
self.metrics
.post_processing_total_events_emitted
.inc_by(events.data.len() as u64);
};
Ok(())
}
Expand Down Expand Up @@ -2049,7 +2063,7 @@ impl AuthorityState {
)),
Owner::ObjectOwner(object_id) => {
let id = o.id();
let Some(info) = self.try_create_dynamic_field_info(o, epoch_store)? else{
let Some(info) = self.try_create_dynamic_field_info(o, &BTreeMap::new(), epoch_store.module_cache())? else{
continue;
};
new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
Expand Down Expand Up @@ -3060,10 +3074,6 @@ impl AuthorityState {
Ok(checkpoints)
}

pub async fn get_timestamp_ms(&self, digest: &TransactionDigest) -> SuiResult<Option<u64>> {
self.get_indexes()?.get_timestamp_ms(digest)
}

pub fn query_events(
&self,
query: EventFilter,
Expand Down Expand Up @@ -3410,7 +3420,7 @@ impl AuthorityState {
certificate: &VerifiedExecutableTransaction,
effects: &TransactionEffects,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<Option<TxCoins>> {
) -> SuiResult {
let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

let tx_digest = certificate.digest();
Expand All @@ -3427,9 +3437,14 @@ impl AuthorityState {
None
};

// Returns coin objects for indexing for fullnode if indexing is enabled.
let tx_coins =
self.fullnode_only_get_tx_coins_for_indexing(&inner_temporary_store, epoch_store);
// index certificate
let _ = self
.post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
.await
.tap_err(|e| {
self.metrics.post_processing_total_failures.inc();
error!(?tx_digest, "tx post processing failed: {e}");
});

// The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
// we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
Expand Down Expand Up @@ -3458,7 +3473,7 @@ impl AuthorityState {
.pending_notify_read
.set(self.database.executed_effects_notify_read.num_pending() as i64);

Ok(tx_coins)
Ok(())
}

/// Get the TransactionEnvelope that currently locks the given object, if any.
Expand Down
Loading

0 comments on commit 7ffd0ba

Please sign in to comment.