Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions core/src/cost_update_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ mod tests {
{
let accumulated_us: u64 = 1000;
let accumulated_units: u64 = 100;
let total_errored_units = 0;
let count: u32 = 10;
expected_cost = accumulated_units / count as u64;

Expand All @@ -261,6 +262,7 @@ mod tests {
accumulated_units,
count,
errored_txs_compute_consumed: vec![],
total_errored_units,
},
);
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
Expand Down Expand Up @@ -297,6 +299,7 @@ mod tests {
accumulated_units,
count,
errored_txs_compute_consumed: vec![],
total_errored_units: 0,
},
);
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
Expand Down Expand Up @@ -335,6 +338,7 @@ mod tests {
accumulated_units: 0,
count: 0,
errored_txs_compute_consumed: vec![],
total_errored_units: 0,
},
);
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
Expand All @@ -352,13 +356,16 @@ mod tests {
// new erroring compute costs
let cost_per_error = 1000;
{
let errored_txs_compute_consumed = vec![cost_per_error; 3];
let total_errored_units = errored_txs_compute_consumed.iter().sum();
execute_timings.details.per_program_timings.insert(
program_key_1,
ProgramTiming {
accumulated_us: 1000,
accumulated_units: 0,
count: 0,
errored_txs_compute_consumed: vec![cost_per_error; 3],
errored_txs_compute_consumed,
total_errored_units,
},
);
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
Expand All @@ -385,13 +392,16 @@ mod tests {
// The cost should not decrease for these new lesser errors
let smaller_cost_per_error = cost_per_error - 10;
{
let errored_txs_compute_consumed = vec![smaller_cost_per_error; 3];
let total_errored_units = errored_txs_compute_consumed.iter().sum();
execute_timings.details.per_program_timings.insert(
program_key_1,
ProgramTiming {
accumulated_us: 1000,
accumulated_units: 0,
count: 0,
errored_txs_compute_consumed: vec![smaller_cost_per_error; 3],
errored_txs_compute_consumed,
total_errored_units,
},
);
CostUpdateService::update_cost_model(&cost_model, &mut execute_timings);
Expand Down
25 changes: 18 additions & 7 deletions core/src/progress_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,19 @@ impl ReplaySlotStats {
.iter()
.collect();
per_pubkey_timings.sort_by(|a, b| b.1.accumulated_us.cmp(&a.1.accumulated_us));
let (total_us, total_units, total_count) =
per_pubkey_timings
.iter()
.fold((0, 0, 0), |(sum_us, sum_units, sum_count), a| {
let (total_us, total_units, total_count, total_errored_units, total_errored_count) =
per_pubkey_timings.iter().fold(
(0, 0, 0, 0, 0),
|(sum_us, sum_units, sum_count, sum_errored_units, sum_errored_count), a| {
(
sum_us + a.1.accumulated_us,
sum_units + a.1.accumulated_units,
sum_count + a.1.count,
sum_errored_units + a.1.total_errored_units,
sum_errored_count + a.1.errored_txs_compute_consumed.len(),
)
});
},
);

for (pubkey, time) in per_pubkey_timings.iter().take(5) {
datapoint_info!(
Expand All @@ -147,7 +150,13 @@ impl ReplaySlotStats {
("pubkey", pubkey.to_string(), String),
("execute_us", time.accumulated_us, i64),
("accumulated_units", time.accumulated_units, i64),
("count", time.count, i64)
("errored_units", time.total_errored_units, i64),
("count", time.count, i64),
(
"errored_count",
time.errored_txs_compute_consumed.len(),
i64
),
);
}
datapoint_info!(
Expand All @@ -156,7 +165,9 @@ impl ReplaySlotStats {
("pubkey", "all", String),
("execute_us", total_us, i64),
("accumulated_units", total_units, i64),
("count", total_count, i64)
("count", total_count, i64),
("errored_units", total_errored_units, i64),
("count", total_errored_count, i64)
);
}
}
Expand Down
171 changes: 144 additions & 27 deletions runtime/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ impl Executors {
}
}

#[derive(Default, Debug)]
#[derive(Default, Debug, PartialEq)]
pub struct ProgramTiming {
pub accumulated_us: u64,
pub accumulated_units: u64,
pub count: u32,
pub errored_txs_compute_consumed: Vec<u64>,
// Sum of all units in `errored_txs_compute_consumed`
pub total_errored_units: u64,
}

impl ProgramTiming {
Expand All @@ -78,9 +80,23 @@ impl ProgramTiming {
self.count = self.count.saturating_add(1);
}
}

pub fn accumulate_program_timings(&mut self, other: &ProgramTiming) {
self.accumulated_us = self.accumulated_us.saturating_add(other.accumulated_us);
self.accumulated_units = self
.accumulated_units
.saturating_add(other.accumulated_units);
self.count = self.count.saturating_add(other.count);
// Clones the entire vector, maybe not great...
self.errored_txs_compute_consumed
.extend(other.errored_txs_compute_consumed.clone());
self.total_errored_units = self
.total_errored_units
.saturating_add(other.total_errored_units);
}
}

#[derive(Default, Debug)]
#[derive(Default, Debug, PartialEq)]
pub struct ExecuteDetailsTimings {
pub serialize_us: u64,
pub create_vm_us: u64,
Expand All @@ -92,24 +108,51 @@ pub struct ExecuteDetailsTimings {
pub data_size_changed: usize,
pub per_program_timings: HashMap<Pubkey, ProgramTiming>,
}

impl ExecuteDetailsTimings {
pub fn accumulate(&mut self, other: &ExecuteDetailsTimings) {
self.serialize_us += other.serialize_us;
self.create_vm_us += other.create_vm_us;
self.execute_us += other.execute_us;
self.deserialize_us += other.deserialize_us;
self.changed_account_count += other.changed_account_count;
self.total_account_count += other.total_account_count;
self.total_data_size += other.total_data_size;
self.data_size_changed += other.data_size_changed;
self.serialize_us = self.serialize_us.saturating_add(other.serialize_us);
self.create_vm_us = self.create_vm_us.saturating_add(other.create_vm_us);
self.execute_us = self.execute_us.saturating_add(other.execute_us);
self.deserialize_us = self.deserialize_us.saturating_add(other.deserialize_us);
self.changed_account_count = self
.changed_account_count
.saturating_add(other.changed_account_count);
self.total_account_count = self
.total_account_count
.saturating_add(other.total_account_count);
self.total_data_size = self.total_data_size.saturating_add(other.total_data_size);
self.data_size_changed = self
.data_size_changed
.saturating_add(other.data_size_changed);
for (id, other) in &other.per_program_timings {
let program_timing = self.per_program_timings.entry(*id).or_default();
program_timing.accumulated_us += other.accumulated_us;
program_timing.accumulated_units += other.accumulated_units;
program_timing.count += other.count;
program_timing.accumulate_program_timings(other);
}
}

pub fn accumulate_program(
&mut self,
program_id: &Pubkey,
us: u64,
compute_units_consumed: u64,
is_error: bool,
) {
let program_timing = self.per_program_timings.entry(*program_id).or_default();
program_timing.accumulated_us = program_timing.accumulated_us.saturating_add(us);
if is_error {
program_timing
.errored_txs_compute_consumed
.push(compute_units_consumed);
program_timing.total_errored_units = program_timing
.total_errored_units
.saturating_add(compute_units_consumed);
} else {
program_timing.accumulated_units = program_timing
.accumulated_units
.saturating_add(compute_units_consumed);
program_timing.count = program_timing.count.saturating_add(1);
};
}
}

// The relevant state of an account before an Instruction executes, used
Expand Down Expand Up @@ -1295,20 +1338,13 @@ impl MessageProcessor {

time.stop();
let post_remaining_units = invoke_context.get_compute_meter().borrow().get_remaining();

let program_timing = timings.per_program_timings.entry(*program_id).or_default();
program_timing.accumulated_us += time.as_us();
let compute_units_consumed = pre_remaining_units.saturating_sub(post_remaining_units);
if execute_or_verify_result.is_err() {
program_timing
.errored_txs_compute_consumed
.push(compute_units_consumed);
} else {
program_timing.accumulated_units = program_timing
.accumulated_units
.saturating_add(compute_units_consumed);
program_timing.count = program_timing.count.saturating_add(1);
}
timings.accumulate_program(
program_id,
time.as_us(),
compute_units_consumed,
execute_or_verify_result.is_err(),
);

timings.accumulate(&invoke_context.timings);

Expand Down Expand Up @@ -2710,4 +2746,85 @@ mod tests {
);
}
}

fn construct_execute_timings_with_program(
program_id: &Pubkey,
us: u64,
compute_units_consumed: u64,
) -> ExecuteDetailsTimings {
let mut execute_details_timings = ExecuteDetailsTimings::default();

// Accumulate an erroring transaction
let is_error = true;
execute_details_timings.accumulate_program(
program_id,
us,
compute_units_consumed,
is_error,
);

// Accumulate a non-erroring transaction
let is_error = false;
execute_details_timings.accumulate_program(
program_id,
us,
compute_units_consumed,
is_error,
);

let program_timings = execute_details_timings
.per_program_timings
.get(program_id)
.unwrap();

// Both error and success transactions count towards `accumulated_us`
assert_eq!(program_timings.accumulated_us, us.saturating_mul(2));
assert_eq!(program_timings.accumulated_units, compute_units_consumed);
assert_eq!(program_timings.count, 1,);
assert_eq!(
program_timings.errored_txs_compute_consumed,
vec![compute_units_consumed]
);
assert_eq!(program_timings.total_errored_units, compute_units_consumed,);

execute_details_timings
}

#[test]
fn test_execute_details_timing_acumulate_program() {
// Acumulate an erroring transaction
let program_id = Pubkey::new_unique();
let us = 100;
let compute_units_consumed = 1;
construct_execute_timings_with_program(&program_id, us, compute_units_consumed);
}

#[test]
fn test_execute_details_timing_acumulate() {
// Acumulate an erroring transaction
let program_id = Pubkey::new_unique();
let us = 100;
let compute_units_consumed = 1;
let mut execute_details_timings = ExecuteDetailsTimings::default();

// Construct another separate instance of ExecuteDetailsTimings with non default fields
let mut other_execute_details_timings =
construct_execute_timings_with_program(&program_id, us, compute_units_consumed);
let account_count = 1;
let data_size_changed = 1;
other_execute_details_timings.serialize_us = us;
other_execute_details_timings.create_vm_us = us;
other_execute_details_timings.execute_us = us;
other_execute_details_timings.deserialize_us = us;
other_execute_details_timings.changed_account_count = account_count;
other_execute_details_timings.total_account_count = account_count;
other_execute_details_timings.total_data_size = data_size_changed;
other_execute_details_timings.data_size_changed = data_size_changed;

// Accumulate the other instance into the current instance
execute_details_timings.accumulate(&other_execute_details_timings);

// Check that the two instances are equal
assert_eq!(execute_details_timings, other_execute_details_timings);
}
}