Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions crates/storage/db/src/implementation/mdbx/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use super::utils::*;
use crate::{
metrics::{DatabaseEnvMetrics, Operation},
metrics::{Operation, TableOperationMetrics},
DatabaseError,
};
use reth_db_api::{
Expand All @@ -15,7 +15,7 @@ use reth_db_api::{
};
use reth_libmdbx::{Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
use reth_storage_errors::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds};

/// Read only Cursor.
pub type CursorRO<T> = Cursor<RO, T>;
Expand All @@ -29,16 +29,16 @@ pub struct Cursor<K: TransactionKind, T: Table> {
pub(crate) inner: reth_libmdbx::Cursor<K>,
/// Cache buffer that receives compressed values.
buf: Vec<u8>,
/// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Per-table operation metrics. If `None`, metrics are not recorded.
metrics: Option<TableOperationMetrics>,
/// Phantom data to enforce encoding/decoding.
_dbi: PhantomData<T>,
}

impl<K: TransactionKind, T: Table> Cursor<K, T> {
pub(crate) const fn new_with_metrics(
inner: reth_libmdbx::Cursor<K>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
metrics: Option<TableOperationMetrics>,
) -> Self {
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
}
Expand All @@ -54,7 +54,7 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
f: impl FnOnce(&mut Self) -> R,
) -> R {
if let Some(metrics) = self.metrics.clone() {
metrics.record_operation(T::NAME, operation, value_size, || f(self))
metrics[operation.index()].record(value_size, || f(self))
} else {
f(self)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/db/src/implementation/mdbx/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl<K: TransactionKind> Tx<K> {

Ok(Cursor::new_with_metrics(
inner,
self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
self.metrics_handler.as_ref().map(|h| h.env_metrics.table_operation_metrics(T::NAME)),
))
}

Expand Down
82 changes: 61 additions & 21 deletions crates/storage/db/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use metrics::Histogram;
use quanta::Instant;
use reth_metrics::{metrics::Counter, Metrics};
use rustc_hash::FxHashMap;
use std::time::Duration;
use std::{array, sync::Arc, time::Duration};
use strum::{EnumCount, EnumIter, IntoEnumIterator};

const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
Expand All @@ -15,8 +15,8 @@ const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
/// Otherwise, metric recording will no-op.
#[derive(Debug)]
pub(crate) struct DatabaseEnvMetrics {
/// Caches `OperationMetrics` handles for each table and operation tuple.
operations: FxHashMap<(&'static str, Operation), OperationMetrics>,
/// Caches per-table operation metric handles for all database operation metrics.
operations: FxHashMap<&'static str, TableOperationMetrics>,
/// Caches `TransactionMetrics` handles for counters grouped by only transaction mode.
/// Updated both at tx open and close.
transactions: FxHashMap<TransactionMode, TransactionMetrics>,
Expand All @@ -26,6 +26,9 @@ pub(crate) struct DatabaseEnvMetrics {
FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
}

/// Per-table operation metric handles cached for hot cursor paths.
pub(crate) type TableOperationMetrics = Arc<[OperationMetrics; Operation::COUNT]>;

impl DatabaseEnvMetrics {
pub(crate) fn new() -> Self {
// Pre-populate metric handle maps with all possible combinations of labels
Expand All @@ -37,24 +40,23 @@ impl DatabaseEnvMetrics {
}
}

/// Generate a map of all possible operation handles for each table and operation tuple.
/// Used for tracking all operation metrics.
fn generate_operation_handles() -> FxHashMap<(&'static str, Operation), OperationMetrics> {
let mut operations = FxHashMap::with_capacity_and_hasher(
Tables::COUNT * Operation::COUNT,
Default::default(),
);
/// Generate a map of pre-bound operation handles for each table.
fn generate_operation_handles() -> FxHashMap<&'static str, TableOperationMetrics> {
let mut operations = FxHashMap::with_capacity_and_hasher(Tables::COUNT, Default::default());

for table in Tables::ALL {
for operation in Operation::iter() {
operations.insert(
(table.name(), operation),
OperationMetrics::new_with_labels(&[
(Labels::Table.as_str(), table.name()),
(Labels::Operation.as_str(), operation.as_str()),
]),
);
}
let table_name = table.name();
let metrics = array::from_fn(|index| {
let operation = Operation::from_index(index);
OperationMetrics::new_with_labels(&[
(Labels::Table.as_str(), table_name),
(Labels::Operation.as_str(), operation.as_str()),
])
});

operations.insert(table_name, Arc::new(metrics));
}

operations
}

Expand Down Expand Up @@ -105,13 +107,18 @@ impl DatabaseEnvMetrics {
value_size: Option<usize>,
f: impl FnOnce() -> R,
) -> R {
if let Some(metrics) = self.operations.get(&(table, operation)) {
metrics.record(value_size, f)
if let Some(metrics) = self.operations.get(table) {
metrics[operation.index()].record(value_size, f)
} else {
f()
}
}

/// Returns pre-bound operation metric handles for a single table.
pub(crate) fn table_operation_metrics(&self, table: &'static str) -> TableOperationMetrics {
self.operations.get(table).expect("table operation metric handles not found").clone()
}

/// Record metrics for opening a database transaction.
pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
self.transactions
Expand Down Expand Up @@ -219,6 +226,39 @@ pub(crate) enum Operation {
}

impl Operation {
/// Returns the index of the operation in the cached per-table operation array.
pub(crate) const fn index(&self) -> usize {
match self {
Self::Get => 0,
Self::PutUpsert => 1,
Self::PutAppend => 2,
Self::Delete => 3,
Self::CursorUpsert => 4,
Self::CursorInsert => 5,
Self::CursorAppend => 6,
Self::CursorAppendDup => 7,
Self::CursorDeleteCurrent => 8,
Self::CursorDeleteCurrentDuplicates => 9,
}
}

/// Returns the operation for the given index in the cached per-table operation array.
const fn from_index(index: usize) -> Self {
match index {
0 => Self::Get,
1 => Self::PutUpsert,
2 => Self::PutAppend,
3 => Self::Delete,
4 => Self::CursorUpsert,
5 => Self::CursorInsert,
6 => Self::CursorAppend,
7 => Self::CursorAppendDup,
8 => Self::CursorDeleteCurrent,
9 => Self::CursorDeleteCurrentDuplicates,
_ => panic!("invalid operation index"),
}
}

/// Returns the operation as a string.
pub(crate) const fn as_str(&self) -> &'static str {
match self {
Expand Down
Loading