diff --git a/Cargo.lock b/Cargo.lock index c5e5e392ef..10ea9c53a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3100,6 +3100,7 @@ name = "massa_execution_worker" version = "2.5.0" dependencies = [ "anyhow", + "assert_matches", "blake3", "bs58", "cfg-if", diff --git a/massa-execution-worker/Cargo.toml b/massa-execution-worker/Cargo.toml index 92f8403686..3bb8684359 100644 --- a/massa-execution-worker/Cargo.toml +++ b/massa-execution-worker/Cargo.toml @@ -109,3 +109,5 @@ massa_test_framework = { workspace = true, "features" = ["test-exports"] } tokio = { workspace = true, features = ["sync"] } hex-literal = { workspace = true } mockall = { workspace = true } +assert_matches = { workspace = true } + diff --git a/massa-execution-worker/src/active_history.rs b/massa-execution-worker/src/active_history.rs index c632a2405b..194c99b08e 100644 --- a/massa-execution-worker/src/active_history.rs +++ b/massa-execution-worker/src/active_history.rs @@ -83,7 +83,9 @@ impl ActiveHistory { return HistorySearchResult::Present(SetUpdateOrDelete::Set(msg)); } Some(SetUpdateOrDelete::Update(msg_update)) => { - current_updates.apply(msg_update.clone()); + let mut combined_message_update = msg_update.clone(); + combined_message_update.apply(current_updates); + current_updates = combined_message_update; } Some(SetUpdateOrDelete::Delete) => return HistorySearchResult::Absent, _ => (), @@ -348,3 +350,208 @@ impl ActiveHistory { .collect() } } + +#[cfg(test)] +mod test { + // std + use std::cmp::Reverse; + use std::collections::BTreeMap; + use std::fmt::Formatter; + use std::str::FromStr; + // third-party + use assert_matches::assert_matches; + use num::rational::Ratio; + // internal + use super::*; + use massa_async_pool::AsyncPoolChanges; + use massa_final_state::StateChanges; + + impl std::fmt::Debug for HistorySearchResult { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + HistorySearchResult::Present(obj) => write!(f, "{:?}", obj), + HistorySearchResult::Absent => write!(f, "Absent"), + HistorySearchResult::NoInfo => write!(f, "NoInfo"), + } + } + } + + #[test] + fn test_fetch_message() { + // Unit testing ActiveHistory.fetch_msg + + // Setup some objects + + let addr_sender = + Address::from_str("AU12fZLkHnLED3okr8Lduyty7dz9ZKkd24xMCc2JJWPcdmfn2eUEx").unwrap(); + let addr_dest = + Address::from_str("AS12fZLkHnLED3okr8Lduyty7dz9ZKkd24xMCc2JJWPcdmfn2eUEx").unwrap(); + + // Setup ActiveHistory + + let rev: Reverse> = Default::default(); + let message_id: AsyncMessageId = (rev, Slot::new(1, 0), 1u64); + let emission_slot_2 = Slot::new(1, 0); + let emission_index_2 = 2; + let message_id_2: AsyncMessageId = (rev, emission_slot_2, emission_index_2); + let msg_2 = AsyncMessage::new( + emission_slot_2, + emission_index_2, + addr_sender, + addr_dest, + "send_fee_to".to_string(), // SC function name + 0, + Default::default(), + Default::default(), + Slot::new(3, 0), + Slot::new(4, 0), + vec![], + None, + None, + ); + + let emission_slot_3 = Slot::new(2, 0); + let emission_index_3 = 3; + let message_id_3: AsyncMessageId = (rev, emission_slot_3, emission_index_3); + let msg_3_function_new_1 = "send_max_fee_to".to_string(); + let msg_update_3_1 = AsyncMessageUpdate { + function: SetOrKeep::Set(msg_3_function_new_1.clone()), + ..Default::default() + }; + + let msg_3_function_new_2 = "send_max_fee_to_v2".to_string(); + let msg_3_coins_new = Amount::from_raw(1000); + let msg_update_3_2 = AsyncMessageUpdate { + coins: SetOrKeep::Set(msg_3_coins_new), + function: SetOrKeep::Set(msg_3_function_new_2.clone()), + ..Default::default() + }; + + let async_pool_changes_1 = AsyncPoolChanges(BTreeMap::from([ + (message_id, SetUpdateOrDelete::Delete), + (message_id_2, SetUpdateOrDelete::Set(msg_2.clone())), + (message_id_3, SetUpdateOrDelete::Update(msg_update_3_1)), + ])); + let async_pool_changes_2 = AsyncPoolChanges(BTreeMap::from([( + message_id_3, + SetUpdateOrDelete::Update(msg_update_3_2.clone()), + )])); + + let state_changes_1 = StateChanges { + ledger_changes: Default::default(), + async_pool_changes: async_pool_changes_1, + pos_changes: Default::default(), + executed_ops_changes: Default::default(), + executed_denunciations_changes: Default::default(), + execution_trail_hash_change: Default::default(), + }; + let state_changes_2 = StateChanges { + ledger_changes: Default::default(), + async_pool_changes: async_pool_changes_2, + pos_changes: Default::default(), + executed_ops_changes: Default::default(), + executed_denunciations_changes: Default::default(), + execution_trail_hash_change: Default::default(), + }; + + let exec_output_1 = ExecutionOutput { + slot: Slot::new(1, 0), + block_info: None, + state_changes: state_changes_1.clone(), + events: Default::default(), + #[cfg(feature = "execution-trace")] + slot_trace: None, + #[cfg(feature = "dump-block")] + storage: None, + deferred_credits_execution: vec![], + cancel_async_message_execution: vec![], + auto_sell_execution: vec![], + }; + let exec_output_2 = ExecutionOutput { + slot: Slot::new(1, 1), + block_info: None, + state_changes: state_changes_2.clone(), + events: Default::default(), + #[cfg(feature = "execution-trace")] + slot_trace: None, + #[cfg(feature = "dump-block")] + storage: None, + deferred_credits_execution: vec![], + cancel_async_message_execution: vec![], + auto_sell_execution: vec![], + }; + + let active_history = ActiveHistory(VecDeque::from([exec_output_1, exec_output_2])); + + // Test fetch_message with message_id (expect HistorySearchResult::Absent) + { + let current_updates = AsyncMessageUpdate::default(); + let fetched = active_history.fetch_message(&message_id, current_updates); + assert_matches!(fetched, HistorySearchResult::Absent); + } + + // Test fetch_message with message_id_2 (expect HistorySearchResult::Set) + { + let current_updates = AsyncMessageUpdate::default(); + let fetched = active_history.fetch_message(&message_id_2, current_updates); + + if let HistorySearchResult::Present(SetUpdateOrDelete::Set(msg)) = fetched { + assert_eq!(msg, msg_2); + } else { + panic!( + "Expected a HistorySearchRestul::Set(...) and not: {:?}", + fetched + ) + } + } + + { + // Test fetch_message with message_id_2 (expect HistorySearchResult::Set) + current_updates + // (which modifies validity_end) + + let validity_end_new = Slot::new(5, 0); + let current_updates = AsyncMessageUpdate { + emission_slot: Default::default(), + emission_index: Default::default(), + sender: Default::default(), + destination: Default::default(), + function: Default::default(), + max_gas: Default::default(), + fee: Default::default(), + coins: Default::default(), + validity_start: Default::default(), + validity_end: SetOrKeep::Set(validity_end_new), + function_params: Default::default(), + trigger: Default::default(), + can_be_executed: Default::default(), + }; + let fetched = active_history.fetch_message(&message_id_2, current_updates); + + if let HistorySearchResult::Present(SetUpdateOrDelete::Set(msg)) = fetched { + assert_ne!(msg, msg_2); + assert_eq!(msg.validity_end, Slot::new(5, 0)); + } else { + panic!( + "Expected a HistorySearchRestul::Set(...) and not: {:?}", + fetched + ) + } + } + + // Test fetch_message with message_id_3 (expect HistorySearchResult::Present) + { + let current_updates = AsyncMessageUpdate::default(); + let fetched = active_history.fetch_message(&message_id_3, current_updates); + + if let HistorySearchResult::Present(SetUpdateOrDelete::Update(msg_up)) = fetched { + // Note that there should be to async message update & we ensure that the latest + // updates are applied (and overwrite the earliest updates) + assert_eq!(msg_up.coins, SetOrKeep::Set(msg_3_coins_new)); + // function should == "send_max_fee_to_v2" (latest value) and not "send_max_fee_to" + assert_eq!(msg_up.function, SetOrKeep::Set(msg_3_function_new_2)); + } else { + panic!("Expected a HistorySearchRestul::Set(SetUpdateOrDelete::Update(...)) and not: {:?}", fetched) + } + } + } +} diff --git a/massa-execution-worker/src/speculative_async_pool.rs b/massa-execution-worker/src/speculative_async_pool.rs index 082ed2fb36..e84a68b19e 100644 --- a/massa-execution-worker/src/speculative_async_pool.rs +++ b/massa-execution-worker/src/speculative_async_pool.rs @@ -287,7 +287,7 @@ impl SpeculativeAsyncPool { } Present(SetUpdateOrDelete::Update(msg_update)) => { current_changes.entry(message_id).and_modify(|e| { - e.apply(msg_update.clone()); + *e = msg_update.clone(); }); return true; }