Skip to content
Merged
12 changes: 6 additions & 6 deletions massa-execution-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ pub trait ExecutionController: Send + Sync {

/// Get the execution status of operation that have been executed both speculatively or finaly
///
/// # Return value
/// * `(speculative_statuses, final_statuses)`
/// * for each hashmap:
/// * key: the operation id
/// * value: true: operation executed successfully,
/// * false: operation failed
/// Return value
/// `(speculative_statuses, final_statuses)`
/// for each hashmap:
/// key: the operation id
/// value: true: operation executed successfully,
/// false: operation failed
fn get_op_exec_status(&self) -> (HashMap<OperationId, bool>, HashMap<OperationId, bool>);

/// Get a copy of a single datastore entry with its final and active values
Expand Down
2 changes: 2 additions & 0 deletions massa-execution-worker/src/tests/scenarios_mandatories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,13 @@ mod tests {
#[test]
#[serial]
fn test_operation_execution_status() {
let vesting = get_initials_vesting(false);
// setup the period duration and the maximum gas for asynchronous messages execution
let exec_cfg = ExecutionConfig {
t0: 100.into(),
max_async_gas: 100_000,
cursor_delay: 0.into(),
initial_vesting_path: vesting.path().to_path_buf(),
..ExecutionConfig::default()
};
// get a sample final state
Expand Down
99 changes: 59 additions & 40 deletions massa-pool-worker/src/tests/operation_pool_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::tests::tools::OpGenerator;

use super::tools::{create_some_operations, operation_pool_test, pool_test};
use massa_execution_exports::test_exports::MockExecutionControllerMessage;
use massa_models::{amount::Amount, slot::Slot};
use massa_models::{amount::Amount, operation::OperationId, slot::Slot};
use massa_pool_exports::PoolConfig;
use std::time::Duration;

Expand All @@ -31,6 +31,8 @@ fn test_add_operation() {
let op_gen = OpGenerator::default().expirery(2);
storage.store_operations(create_some_operations(10, &op_gen));
operation_pool.add_operations(storage);
// Allow some time for the pool to add the operations
std::thread::sleep(Duration::from_millis(100));
assert_eq!(operation_pool.storage.get_op_refs().len(), 10);
});
}
Expand All @@ -46,6 +48,8 @@ fn test_add_irrelevant_operation() {
storage.store_operations(create_some_operations(10, &op_gen));
operation_pool.notify_final_cs_periods(&vec![51; thread_count.into()]);
operation_pool.add_operations(storage);
// Allow some time for the pool to add the operations
std::thread::sleep(Duration::from_millis(100));
assert_eq!(operation_pool.storage.get_op_refs().len(), 0);
});
}
Expand All @@ -60,16 +64,15 @@ fn test_pool() {
// generate (id, transactions, range of validity) by threads
let mut thread_tx_lists = vec![Vec::new(); pool_config.thread_count as usize];

let mut storage = storage_base.clone_without_refs();
for i in 0..18 {
let expire_period: u64 = 40 + i;
let expire_period: u64 = 10 + i;
let op = OpGenerator::default()
.expirery(expire_period)
.fee(Amount::from_raw(40 + i))
.generate(); //get_transaction(expire_period, fee);

let mut storage = storage_base.clone_without_refs();
storage.store_operations(vec![op.clone()]);
pool.add_operations(storage);

//TODO: compare
// assert_eq!(storage.get_op_refs(), &Set::<OperationId>::default());
Expand All @@ -90,8 +93,19 @@ fn test_pool() {

thread_tx_lists[op_thread as usize].push((op, start_period..=expire_period));
}

pool.add_operations(storage);
// Allow some time for the pool to add the operations
std::thread::sleep(Duration::from_millis(200));

// sort from bigger fee to smaller and truncate
for lst in thread_tx_lists.iter_mut() {
lst.reverse();
lst.truncate(pool_config.max_operation_pool_size_per_thread);
}

std::thread::spawn(move || loop {
match execution_receiver.recv_timeout(Duration::from_millis(100)) {
match execution_receiver.recv_timeout(Duration::from_millis(2000)) {
// forward on the operations
Ok(MockExecutionControllerMessage::UnexecutedOpsAmong {
ops,
Expand All @@ -114,42 +128,39 @@ fn test_pool() {
}
});

// sort from bigger fee to smaller and truncate
for lst in thread_tx_lists.iter_mut() {
lst.reverse();
lst.truncate(pool_config.max_operation_pool_size_per_thread);
}

// checks ops are the expected ones for thread 0 and 1 and various periods
for thread in 0u8..pool_config.thread_count {
for period in 0u64..70 {
let target_slot = Slot::new(period, thread);
let max_count = 3;
let (ids, storage) = pool.get_block_operations(&target_slot);

assert!(ids
.iter()
.map(|id| (
*id,
storage
.read_operations()
.get(id)
.unwrap()
.serialized_data
.clone()
))
.eq(thread_tx_lists[target_slot.thread as usize]
assert_eq!(
ids.iter()
.map(|id| (
*id,
storage
.read_operations()
.get(id)
.unwrap()
.serialized_data
.clone()
))
.collect::<Vec<(OperationId, Vec<u8>)>>(),
thread_tx_lists[target_slot.thread as usize]
.iter()
.filter(|(_, r)| r.contains(&target_slot.period))
.take(max_count)
.map(|(op, _)| (op.id, op.serialized_data.clone()))));
.map(|(op, _)| (op.id, op.serialized_data.clone()))
.collect::<Vec<(OperationId, Vec<u8>)>>()
);
}
}

// op ending before or at period 45 won't appear in the block due to incompatible validity range
// we don't keep them as expected ops
let final_period = 45u64;
pool.notify_final_cs_periods(&vec![final_period; pool_config.thread_count as usize]);
// Wait for pool to manage the above command
std::thread::sleep(Duration::from_millis(200));
for lst in thread_tx_lists.iter_mut() {
lst.retain(|(op, _)| op.content.expire_period > final_period);
}
Expand All @@ -160,22 +171,25 @@ fn test_pool() {
let target_slot = Slot::new(period, thread);
let max_count = 4;
let (ids, storage) = pool.get_block_operations(&target_slot);
assert!(ids
.iter()
.map(|id| (
*id,
storage
.read_operations()
.get(id)
.unwrap()
.serialized_data
.clone()
))
.eq(thread_tx_lists[target_slot.thread as usize]
assert_eq!(
ids.iter()
.map(|id| (
*id,
storage
.read_operations()
.get(id)
.unwrap()
.serialized_data
.clone()
))
.collect::<Vec<(OperationId, Vec<u8>)>>(),
thread_tx_lists[target_slot.thread as usize]
.iter()
.filter(|(_, r)| r.contains(&target_slot.period))
.take(max_count)
.map(|(op, _)| (op.id, op.serialized_data.clone()))));
.map(|(op, _)| (op.id, op.serialized_data.clone()))
.collect::<Vec<(OperationId, Vec<u8>)>>()
);
}
}

Expand All @@ -191,12 +205,17 @@ fn test_pool() {
let mut storage = storage_base.clone_without_refs();
storage.store_operations(vec![op.clone()]);
pool.add_operations(storage);
// Allow some time for the pool to add the operations
std::thread::sleep(Duration::from_millis(100));
//TODO: compare
//assert_eq!(storage.get_op_refs(), &Set::<OperationId>::default());
let op_thread = op
.content_creator_address
.get_thread(pool_config.thread_count);
let (ids, _) = pool.get_block_operations(&Slot::new(expire_period - 1, op_thread));
let (ids, _) = pool.get_block_operations(&Slot::new(
expire_period - pool_config.operation_validity_periods - 1,
op_thread,
));
assert!(ids.is_empty());
}
pool_manager.stop();
Expand Down
6 changes: 5 additions & 1 deletion massa-pool-worker/src/tests/scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ fn test_simple_get_operations() {
storage.store_operations(create_some_operations(10, &op_gen));
let unexecuted_ops = storage.get_op_refs().clone();
pool_controller.add_operations(storage);
// Allow some time for the pool to add the operations
std::thread::sleep(Duration::from_millis(100));

// Start mock execution thread.
// Provides the data for `pool_controller.get_block_operations`
Expand Down Expand Up @@ -92,7 +94,7 @@ pub fn launch_basic_get_block_operation_execution_mock(
creator_address: Address,
balance_vec: Vec<(Option<Amount>, Option<Amount>)>,
) {
let receive = |er: &Receiver<ControllerMsg>| er.recv_timeout(Duration::from_millis(10));
let receive = |er: &Receiver<ControllerMsg>| er.recv_timeout(Duration::from_millis(100));
std::thread::spawn(move || {
match receive(&recvr) {
Ok(ControllerMsg::UnexecutedOpsAmong { response_tx, .. }) => {
Expand Down Expand Up @@ -166,6 +168,8 @@ fn test_get_operations_overflow() {
storage.store_operations(operations);
let unexecuted_ops = storage.get_op_refs().clone();
pool_controller.add_operations(storage);
// Allow some time for the pool to add the operations
std::thread::sleep(Duration::from_millis(100));

// start mock execution thread
launch_basic_get_block_operation_execution_mock(
Expand Down
4 changes: 2 additions & 2 deletions massa-serialization/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ impl Deserializer<bool> for BoolDeserializer {
context("Failed bool deserialization", |input: &'a [u8]| {
Ok((&buffer[1..], {
match buffer.first() {
Some(&b'1') => Ok(true),
Some(&b'0') => Ok(false),
Some(1) => Ok(true),
Some(0) => Ok(false),
_ => Err(nom::Err::Error(ParseError::from_error_kind(
input,
nom::error::ErrorKind::Fail,
Expand Down