Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions crates/storage/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ derive_more.workspace = true
rustc-hash = { workspace = true, optional = true, features = ["std"] }
sysinfo = { workspace = true, features = ["system"] }
parking_lot = { workspace = true, optional = true }
dashmap.workspace = true

# arbitrary utils
strum = { workspace = true, features = ["derive"], optional = true }
Expand Down Expand Up @@ -92,7 +91,6 @@ arbitrary = [
"alloy-consensus/arbitrary",
"reth-primitives-traits/arbitrary",
"reth-prune-types/arbitrary",
"dashmap/arbitrary",
]
op = [
"reth-db-api/op",
Expand Down
29 changes: 22 additions & 7 deletions crates/storage/db/src/implementation/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use reth_libmdbx::{
use reth_storage_errors::db::LogLevel;
use reth_tracing::tracing::error;
use std::{
collections::HashMap,
ops::{Deref, Range},
path::Path,
sync::Arc,
Expand Down Expand Up @@ -190,6 +191,12 @@ impl DatabaseArguments {
pub struct DatabaseEnv {
/// Libmdbx-sys environment.
inner: Environment,
/// Opened DBIs for reuse.
/// Important: Do not manually close these DBIs, like via `mdbx_dbi_close`.
/// More generally, do not dynamically create, re-open, or drop tables at
/// runtime. It's better to perform table creation and migration only once
/// at startup.
dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried &'static and it was ~0.3% faster than Arc, but reverted for consistency with the rest of the code base. We can review and make several things (like also the DB environment, etc.) &'static together later.

/// Cache for metric handles. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Write lock for when dealing with a read-write environment.
Expand All @@ -201,16 +208,18 @@ impl Database for DatabaseEnv {
type TXMut = tx::Tx<RW>;

fn tx(&self) -> Result<Self::TX, DatabaseError> {
Tx::new_with_metrics(
Tx::new(
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.dbis.clone(),
self.metrics.clone(),
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}

fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
Tx::new_with_metrics(
Tx::new(
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.dbis.clone(),
self.metrics.clone(),
)
.map_err(|e| DatabaseError::InitTx(e.into()))
Expand Down Expand Up @@ -445,6 +454,7 @@ impl DatabaseEnv {

let env = Self {
inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
dbis: Arc::default(),
metrics: None,
_lock_file,
};
Expand All @@ -459,23 +469,27 @@ impl DatabaseEnv {
}

/// Creates all the tables defined in [`Tables`], if necessary.
pub fn create_tables(&self) -> Result<(), DatabaseError> {
pub fn create_tables(&mut self) -> Result<(), DatabaseError> {
self.create_tables_for::<Tables>()
}

/// Creates all the tables defined in the given [`TableSet`], if necessary.
pub fn create_tables_for<TS: TableSet>(&self) -> Result<(), DatabaseError> {
pub fn create_tables_for<TS: TableSet>(&mut self) -> Result<(), DatabaseError> {
let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?;
let mut dbis = HashMap::with_capacity(Tables::ALL.len());

for table in TS::tables() {
let flags =
if table.is_dupsort() { DatabaseFlags::DUP_SORT } else { DatabaseFlags::default() };

tx.create_db(Some(table.name()), flags)
let db = tx
.create_db(Some(table.name()), flags)
.map_err(|e| DatabaseError::CreateTable(e.into()))?;
dbis.insert(table.name(), db.dbi());
}

tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;
self.dbis = Arc::new(dbis);

Ok(())
}
Expand Down Expand Up @@ -543,8 +557,9 @@ mod tests {

/// Create database for testing with specified path
fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
let env = DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
.expect(ERROR_DB_CREATION);
let mut env =
DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
.expect(ERROR_DB_CREATION);
env.create_tables().expect(ERROR_TABLE_CREATION);
env
}
Expand Down
39 changes: 12 additions & 27 deletions crates/storage/db/src/implementation/mdbx/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
DatabaseError,
};
use dashmap::DashMap;
use reth_db_api::{
table::{Compress, DupSort, Encode, Table, TableImporter},
transaction::{DbTx, DbTxMut},
Expand All @@ -15,6 +14,7 @@ use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
use reth_tracing::tracing::{debug, trace, warn};
use std::{
backtrace::Backtrace,
collections::HashMap,
marker::PhantomData,
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -33,8 +33,7 @@ pub struct Tx<K: TransactionKind> {
pub inner: Transaction<K>,

/// Cached MDBX DBIs for reuse.
/// TODO: Reuse DBIs even among transactions, ideally with no synchronization overhead.
dbis: DashMap<&'static str, MDBX_dbi>,
dbis: Arc<HashMap<&'static str, MDBX_dbi>>,

/// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't
/// closed by [`Tx::commit`] or [`Tx::abort`], but we still need to report it in the metrics.
Expand All @@ -44,17 +43,12 @@ pub struct Tx<K: TransactionKind> {
}

impl<K: TransactionKind> Tx<K> {
/// Creates new `Tx` object with a `RO` or `RW` transaction.
#[inline]
pub fn new(inner: Transaction<K>) -> Self {
Self::new_inner(inner, None)
}

/// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
#[inline]
#[track_caller]
pub(crate) fn new_with_metrics(
pub(crate) fn new(
inner: Transaction<K>,
dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
) -> reth_libmdbx::Result<Self> {
let metrics_handler = env_metrics
Expand All @@ -65,12 +59,7 @@ impl<K: TransactionKind> Tx<K> {
Ok(handler)
})
.transpose()?;
Ok(Self::new_inner(inner, metrics_handler))
}

#[inline]
fn new_inner(inner: Transaction<K>, metrics_handler: Option<MetricsHandler<K>>) -> Self {
Self { inner, metrics_handler, dbis: DashMap::new() }
Ok(Self { inner, dbis, metrics_handler })
}

/// Gets this transaction ID.
Expand All @@ -80,17 +69,13 @@ impl<K: TransactionKind> Tx<K> {

/// Gets a table database handle if it exists, otherwise creates it.
pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
match self.dbis.entry(T::NAME) {
dashmap::Entry::Occupied(occ) => Ok(*occ.get()),
dashmap::Entry::Vacant(vac) => {
let dbi = self
.inner
.open_db(Some(T::NAME))
.map(|db| db.dbi())
.map_err(|e| DatabaseError::Open(e.into()))?;
vac.insert(dbi);
Ok(dbi)
}
if let Some(dbi) = self.dbis.get(T::NAME) {
Ok(*dbi)
} else {
self.inner
.open_db(Some(T::NAME))
.map(|db| db.dbi())
.map_err(|e| DatabaseError::Open(e.into()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/storage/db/src/mdbx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn init_db_for<P: AsRef<Path>, TS: TableSet>(
args: DatabaseArguments,
) -> eyre::Result<DatabaseEnv> {
let client_version = args.client_version().clone();
let db = create_db(path, args)?;
let mut db = create_db(path, args)?;
db.create_tables_for::<TS>()?;
db.record_client_version(client_version)?;
Ok(db)
Expand Down
Loading