Skip to content

Commit a5f6d1d

Browse files
committed
Revert "fix(l1): use transactional database with RocksDB (#4599)"
This reverts commit 7c5315c.
1 parent 939a931 commit a5f6d1d

File tree

3 files changed

+44
-45
lines changed

3 files changed

+44
-45
lines changed

crates/storage/store_db/rocksdb.rs

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use ethrex_common::{
99
};
1010
use ethrex_trie::{Nibbles, NodeHash, Trie};
1111
use rocksdb::{
12-
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor, MultiThreaded,
13-
OptimisticTransactionDB, Options, WriteBatchWithTransaction,
12+
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor, DBWithThreadMode,
13+
MultiThreaded, Options, WriteBatch,
1414
};
1515
use std::{collections::HashSet, path::Path, sync::Arc};
1616
use tracing::info;
@@ -103,7 +103,7 @@ const CF_INVALID_ANCESTORS: &str = "invalid_ancestors";
103103

104104
#[derive(Debug)]
105105
pub struct Store {
106-
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
106+
db: Arc<DBWithThreadMode<MultiThreaded>>,
107107
}
108108

109109
impl Store {
@@ -112,7 +112,7 @@ impl Store {
112112
db_options.create_if_missing(true);
113113
db_options.create_missing_column_families(true);
114114

115-
let cache = Cache::new_lru_cache(4 * 1024 * 1024 * 1024); // 4GB cache
115+
let cache = Cache::new_lru_cache(4 * 1024 * 1024 * 1024); // 4GB cache
116116

117117
db_options.set_max_open_files(-1);
118118
db_options.set_max_file_opening_threads(16);
@@ -166,18 +166,17 @@ impl Store {
166166
];
167167

168168
// Get existing column families to know which ones to drop later
169-
let existing_cfs =
170-
match OptimisticTransactionDB::<MultiThreaded>::list_cf(&db_options, path) {
171-
Ok(cfs) => {
172-
info!("Found existing column families: {:?}", cfs);
173-
cfs
174-
}
175-
Err(_) => {
176-
// Database doesn't exist yet
177-
info!("Database doesn't exist, will create with expected column families");
178-
vec!["default".to_string()]
179-
}
180-
};
169+
let existing_cfs = match DBWithThreadMode::<MultiThreaded>::list_cf(&db_options, path) {
170+
Ok(cfs) => {
171+
info!("Found existing column families: {:?}", cfs);
172+
cfs
173+
}
174+
Err(_) => {
175+
// Database doesn't exist yet
176+
info!("Database doesn't exist, will create with expected column families");
177+
vec!["default".to_string()]
178+
}
179+
};
181180

182181
// Create descriptors for ALL existing CFs + expected ones (RocksDB requires opening all existing CFs)
183182
let mut all_cfs_to_open = HashSet::new();
@@ -208,7 +207,7 @@ impl Store {
208207
cf_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
209208
cf_opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB
210209
cf_opts.set_max_write_buffer_number(4);
211-
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB
210+
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB
212211

213212
let mut block_opts = BlockBasedOptions::default();
214213
block_opts.set_block_cache(&cache);
@@ -220,7 +219,7 @@ impl Store {
220219
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
221220
cf_opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
222221
cf_opts.set_max_write_buffer_number(3);
223-
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB
222+
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB
224223

225224
let mut block_opts = BlockBasedOptions::default();
226225
block_opts.set_block_cache(&cache);
@@ -231,14 +230,14 @@ impl Store {
231230
}
232231
CF_STATE_TRIE_NODES | CF_STORAGE_TRIES_NODES => {
233232
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
234-
cf_opts.set_write_buffer_size(512 * 1024 * 1024); // 512MB
233+
cf_opts.set_write_buffer_size(512 * 1024 * 1024); // 512MB
235234
cf_opts.set_max_write_buffer_number(6);
236235
cf_opts.set_min_write_buffer_number_to_merge(2);
237-
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB
238-
cf_opts.set_memtable_prefix_bloom_ratio(0.2); // Bloom filter
236+
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB
237+
cf_opts.set_memtable_prefix_bloom_ratio(0.2); // Bloom filter
239238

240239
let mut block_opts = BlockBasedOptions::default();
241-
block_opts.set_block_size(16 * 1024); // 16KB
240+
block_opts.set_block_size(16 * 1024); // 16KB
242241
block_opts.set_block_cache(&cache);
243242
block_opts.set_bloom_filter(10.0, false); // 10 bits per key
244243
block_opts.set_cache_index_and_filter_blocks(true);
@@ -262,7 +261,7 @@ impl Store {
262261
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
263262
cf_opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
264263
cf_opts.set_max_write_buffer_number(3);
265-
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB
264+
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB
266265

267266
let mut block_opts = BlockBasedOptions::default();
268267
block_opts.set_block_size(16 * 1024);
@@ -274,7 +273,7 @@ impl Store {
274273
cf_descriptors.push(ColumnFamilyDescriptor::new(cf_name, cf_opts));
275274
}
276275

277-
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
276+
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
278277
&db_options,
279278
path,
280279
cf_descriptors,
@@ -370,7 +369,7 @@ impl Store {
370369
let db = self.db.clone();
371370

372371
tokio::task::spawn_blocking(move || {
373-
let mut batch = WriteBatchWithTransaction::default();
372+
let mut batch = WriteBatch::default();
374373

375374
for (cf_name, key, value) in batch_ops {
376375
let cf = db.cf_handle(&cf_name).ok_or_else(|| {
@@ -467,7 +466,7 @@ impl StoreEngine for Store {
467466
)?;
468467

469468
let _span = tracing::trace_span!("Block DB update").entered();
470-
let mut batch = WriteBatchWithTransaction::default();
469+
let mut batch = WriteBatch::default();
471470

472471
for (node_hash, node_data) in update_batch.account_updates {
473472
batch.put_cf(&cf_state, node_hash.as_ref(), node_data);
@@ -537,7 +536,7 @@ impl StoreEngine for Store {
537536
let db = self.db.clone();
538537

539538
tokio::task::spawn_blocking(move || {
540-
let mut batch = WriteBatchWithTransaction::default();
539+
let mut batch = WriteBatch::default();
541540

542541
let [cf_headers, cf_bodies, cf_block_numbers, cf_tx_locations] = open_cfs(
543542
&db,
@@ -646,7 +645,7 @@ impl StoreEngine for Store {
646645
}
647646

648647
async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
649-
let mut batch = WriteBatchWithTransaction::default();
648+
let mut batch = WriteBatch::default();
650649

651650
let Some(hash) = self.get_canonical_block_hash_sync(block_number)? else {
652651
return Ok(());
@@ -896,7 +895,7 @@ impl StoreEngine for Store {
896895
.ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;
897896

898897
let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start);
899-
let mut batch = WriteBatchWithTransaction::default();
898+
let mut batch = WriteBatch::default();
900899

901900
while let Some(Ok((key, _))) = iter.next() {
902901
batch.delete_cf(&cf, key);
@@ -1120,7 +1119,7 @@ impl StoreEngine for Store {
11201119
let db = self.db.clone();
11211120

11221121
tokio::task::spawn_blocking(move || {
1123-
let mut batch = WriteBatchWithTransaction::default();
1122+
let mut batch = WriteBatch::default();
11241123

11251124
let [cf_canonical, cf_chain_data] =
11261125
open_cfs(&db, [CF_CANONICAL_BLOCK_HASHES, CF_CHAIN_DATA])?;
@@ -1396,7 +1395,7 @@ impl StoreEngine for Store {
13961395

13971396
/// Open column families
13981397
fn open_cfs<'a, const N: usize>(
1399-
db: &'a Arc<OptimisticTransactionDB<MultiThreaded>>,
1398+
db: &'a Arc<DBWithThreadMode<MultiThreaded>>,
14001399
names: [&str; N],
14011400
) -> Result<[Arc<BoundColumnFamily<'a>>; N], StoreError> {
14021401
let mut handles = Vec::with_capacity(N);

crates/storage/trie_db/rocksdb.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use ethrex_common::H256;
22
use ethrex_rlp::encode::RLPEncode;
33
use ethrex_trie::{Node, NodeHash, TrieDB, error::TrieError};
4-
use rocksdb::{MultiThreaded, OptimisticTransactionDB};
4+
use rocksdb::{DBWithThreadMode, MultiThreaded};
55
use std::sync::Arc;
66

77
/// RocksDB implementation for the TrieDB trait, with get and put operations.
88
pub struct RocksDBTrieDB {
99
/// RocksDB database
10-
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
10+
db: Arc<DBWithThreadMode<MultiThreaded>>,
1111
/// Column family name
1212
cf_name: String,
1313
/// Storage trie address prefix
@@ -16,7 +16,7 @@ pub struct RocksDBTrieDB {
1616

1717
impl RocksDBTrieDB {
1818
pub fn new(
19-
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
19+
db: Arc<DBWithThreadMode<MultiThreaded>>,
2020
cf_name: &str,
2121
address_prefix: Option<H256>,
2222
) -> Result<Self, TrieError> {
@@ -69,7 +69,7 @@ impl TrieDB for RocksDBTrieDB {
6969

7070
fn put_batch(&self, key_values: Vec<(NodeHash, Vec<u8>)>) -> Result<(), TrieError> {
7171
let cf = self.cf_handle()?;
72-
let mut batch = rocksdb::WriteBatchWithTransaction::default();
72+
let mut batch = rocksdb::WriteBatch::default();
7373

7474
for (key, value) in key_values {
7575
let db_key = self.make_key(&key);
@@ -104,7 +104,7 @@ impl TrieDB for RocksDBTrieDB {
104104
mod tests {
105105
use super::*;
106106
use ethrex_trie::NodeHash;
107-
use rocksdb::{ColumnFamilyDescriptor, MultiThreaded, Options};
107+
use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options};
108108
use tempfile::TempDir;
109109

110110
#[test]
@@ -118,7 +118,7 @@ mod tests {
118118
db_options.create_missing_column_families(true);
119119

120120
let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
121-
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
121+
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
122122
&db_options,
123123
db_path,
124124
vec![cf_descriptor],
@@ -158,7 +158,7 @@ mod tests {
158158
db_options.create_missing_column_families(true);
159159

160160
let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
161-
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
161+
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
162162
&db_options,
163163
db_path,
164164
vec![cf_descriptor],
@@ -195,7 +195,7 @@ mod tests {
195195
db_options.create_missing_column_families(true);
196196

197197
let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
198-
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
198+
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
199199
&db_options,
200200
db_path,
201201
vec![cf_descriptor],

crates/storage/trie_db/rocksdb_locked.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
use ethrex_common::H256;
22
use ethrex_trie::{NodeHash, TrieDB, error::TrieError};
3-
use rocksdb::{MultiThreaded, OptimisticTransactionDB, SnapshotWithThreadMode};
3+
use rocksdb::{DBWithThreadMode, MultiThreaded, SnapshotWithThreadMode};
44
use std::sync::Arc;
55

66
/// RocksDB locked implementation for the TrieDB trait, read-only with consistent snapshot.
77
pub struct RocksDBLockedTrieDB {
88
/// RocksDB database
9-
db: &'static Arc<OptimisticTransactionDB<MultiThreaded>>,
9+
db: &'static Arc<DBWithThreadMode<MultiThreaded>>,
1010
/// Column family handle
1111
cf: std::sync::Arc<rocksdb::BoundColumnFamily<'static>>,
1212
/// Read-only snapshot for consistent reads
13-
snapshot: SnapshotWithThreadMode<'static, OptimisticTransactionDB<MultiThreaded>>,
13+
snapshot: SnapshotWithThreadMode<'static, DBWithThreadMode<MultiThreaded>>,
1414
/// Storage trie address prefix
1515
address_prefix: Option<H256>,
1616
}
1717

1818
impl RocksDBLockedTrieDB {
1919
pub fn new(
20-
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
20+
db: Arc<DBWithThreadMode<MultiThreaded>>,
2121
cf_name: &str,
2222
address_prefix: Option<H256>,
2323
) -> Result<Self, TrieError> {
@@ -61,8 +61,8 @@ impl Drop for RocksDBLockedTrieDB {
6161
// Restore the leaked database reference
6262
unsafe {
6363
drop(Box::from_raw(
64-
self.db as *const Arc<OptimisticTransactionDB<MultiThreaded>>
65-
as *mut Arc<OptimisticTransactionDB<MultiThreaded>>,
64+
self.db as *const Arc<DBWithThreadMode<MultiThreaded>>
65+
as *mut Arc<DBWithThreadMode<MultiThreaded>>,
6666
));
6767
}
6868
}

0 commit comments

Comments
 (0)