Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ normal = [
[dependencies]
# reth
reth-config = { path = "../../crates/config" }
reth-primitives = { workspace = true, features = ["arbitrary"] }
reth-primitives = { workspace = true, features = ["arbitrary", "clap"] }
reth-db = { workspace = true, features = ["mdbx", "test-utils"] }
# TODO: Temporary use of the test-utils feature
reth-provider = { workspace = true, features = ["test-utils"] }
Expand Down Expand Up @@ -48,7 +48,7 @@ reth-payload-builder.workspace = true
reth-basic-payload-builder = { path = "../../crates/payload/basic" }
reth-discv4 = { path = "../../crates/net/discv4" }
reth-prune = { path = "../../crates/prune" }
reth-snapshot = { path = "../../crates/snapshot" }
reth-snapshot = { path = "../../crates/snapshot", features = ["clap"] }
reth-trie = { path = "../../crates/trie" }
reth-nippy-jar = { path = "../../crates/storage/nippy-jar" }

Expand Down
13 changes: 8 additions & 5 deletions bin/reth/src/db/snapshots/bench.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::JarConfig;
use reth_db::DatabaseEnvRO;
use reth_primitives::ChainSpec;
use reth_primitives::{
snapshot::{Compression, Filters},
ChainSpec, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, ProviderFactory};
use std::{sync::Arc, time::Instant};

Expand All @@ -15,20 +17,21 @@ pub(crate) enum BenchKind {
pub(crate) fn bench<F1, F2>(
bench_kind: BenchKind,
db: (DatabaseEnvRO, Arc<ChainSpec>),
jar_config: JarConfig,
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
mut snapshot_method: F1,
database_method: F2,
) -> eyre::Result<()>
where
F1: FnMut() -> eyre::Result<()>,
F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<()>,
{
let (mode, compression, phf) = jar_config;
let (db, chain) = db;

println!();
println!("############");
println!("## [{mode:?}] [{compression:?}] [{phf:?}] [{bench_kind:?}]");
println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]");
{
let start = Instant::now();
snapshot_method()?;
Expand Down
115 changes: 46 additions & 69 deletions bin/reth/src/db/snapshots/headers.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,37 @@
use super::{
bench::{bench, BenchKind},
Command, Compression, PerfectHashingFunction, Rows, Snapshots,
Command,
};
use crate::utils::DbTool;
use rand::{seq::SliceRandom, Rng};
use reth_db::{
cursor::DbCursorRO, database::Database, open_db_read_only, snapshot::create_snapshot_T1_T2,
table::Decompress, tables, transaction::DbTx, DatabaseEnvRO,
};
use reth_db::{database::Database, open_db_read_only, table::Decompress, DatabaseEnvRO};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{BlockNumber, ChainSpec, Header};
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{HeaderProvider, ProviderError, ProviderFactory};
use reth_snapshot::segments::{get_snapshot_segment_file_name, Headers, Segment};
use std::{path::Path, sync::Arc};
use tables::*;

impl Command {
pub(crate) fn generate_headers_snapshot(
&self,
tool: &DbTool<'_, DatabaseEnvRO>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let mut jar = self.prepare_jar(2, (Snapshots::Headers, compression, phf), tool, || {
// Generates the dataset to train a zstd dictionary if necessary, with the most recent
// rows (at most 1000).
let dataset = tool.db.view(|tx| {
let mut cursor = tx.cursor_read::<reth_db::RawTable<reth_db::Headers>>()?;
let v1 = cursor
.walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))?
.take(self.block_interval.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>();
let mut cursor = tx.cursor_read::<reth_db::RawTable<reth_db::HeaderTD>>()?;
let v2 = cursor
.walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))?
.take(self.block_interval.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>();
Ok::<Rows, eyre::Error>(vec![v1, v2])
})??;
Ok(dataset)
})?;

tool.db.view(|tx| {
// Hacky type inference. TODO fix
let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]);
let _ = none_vec.take();

// Generate list of hashes for filters & PHF
let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>()?;
let mut hashes = None;
let segment = Headers::new(
compression,
if self.with_filters {
hashes = Some(
cursor
.walk(Some(RawKey::from(self.from as u64)))?
.take(self.block_interval)
.map(|row| {
row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())
}),
);
}

create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber>(
tx,
self.from as u64..=(self.from as u64 + self.block_interval as u64),
None,
// We already prepared the dictionary beforehand
none_vec,
hashes,
self.block_interval,
&mut jar,
)
})??;
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
},
);
segment.snapshot(&tool.db.tx()?, self.from..=(self.from + self.block_interval - 1))?;

Ok(())
}
Expand All @@ -83,14 +42,26 @@ impl Command {
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let mode = Snapshots::Headers;
let jar_config = (mode, compression, phf);
let mut row_indexes = (self.from..(self.from + self.block_interval)).collect::<Vec<_>>();
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};

let range = self.from..=(self.from + self.block_interval - 1);

let mut row_indexes = range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load_without_header(&self.get_file_path(jar_config))?;
let mut jar = NippyJar::load_without_header(&get_snapshot_segment_file_name(
SnapshotSegment::Headers,
filters,
compression,
&range,
))?;

let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
Expand All @@ -103,13 +74,15 @@ impl Command {
bench(
bench_kind,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
SnapshotSegment::Headers,
filters,
compression,
|| {
for num in row_indexes.iter() {
Header::decompress(
cursor
.row_by_number_with_cols::<0b01, 2>(num - self.from)?
.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?[0],
.row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)?
.ok_or(ProviderError::HeaderNotFound((*num).into()))?[0],
)?;
// TODO: replace with below when eventually SnapshotProvider re-uses cursor
// provider.header_by_number(num as
Expand All @@ -120,8 +93,8 @@ impl Command {
|provider| {
for num in row_indexes.iter() {
provider
.header_by_number(*num as u64)?
.ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?;
.header_by_number(*num)?
.ok_or(ProviderError::HeaderNotFound((*num).into()))?;
}
Ok(())
},
Expand All @@ -137,7 +110,9 @@ impl Command {
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
SnapshotSegment::Headers,
filters,
compression,
|| {
Header::decompress(
cursor
Expand Down Expand Up @@ -167,7 +142,9 @@ impl Command {
bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, log_level)?, chain.clone()),
jar_config,
SnapshotSegment::Headers,
filters,
compression,
|| {
let header = Header::decompress(
cursor
Expand All @@ -176,7 +153,7 @@ impl Command {
)?;

// Might be a false positive, so in the real world we have to validate it
assert!(header.hash_slow() == header_hash);
assert_eq!(header.hash_slow(), header_hash);
Ok(())
},
|provider| {
Expand Down
Loading