diff --git a/Cargo.lock b/Cargo.lock
index 0da532e46c4..d449782b30b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6038,6 +6038,7 @@ dependencies = [
  "byteorder",
  "bytes",
  "c-kzg",
+ "clap",
  "crc",
  "criterion",
  "derive_more",
@@ -6322,8 +6323,10 @@ name = "reth-snapshot"
 version = "0.1.0-alpha.10"
 dependencies = [
  "assert_matches",
+ "clap",
  "reth-db",
  "reth-interfaces",
+ "reth-nippy-jar",
  "reth-primitives",
  "reth-provider",
  "reth-stages",
diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml
index f7be0688bff..5326fa28f51 100644
--- a/bin/reth/Cargo.toml
+++ b/bin/reth/Cargo.toml
@@ -19,7 +19,7 @@ normal = [
 [dependencies]
 # reth
 reth-config = { path = "../../crates/config" }
-reth-primitives = { workspace = true, features = ["arbitrary"] }
+reth-primitives = { workspace = true, features = ["arbitrary", "clap"] }
 reth-db = { workspace = true, features = ["mdbx", "test-utils"] } # TODO: Temporary use of the test-utils feature
 reth-provider = { workspace = true, features = ["test-utils"] }
@@ -48,7 +48,7 @@ reth-payload-builder.workspace = true
 reth-basic-payload-builder = { path = "../../crates/payload/basic" }
 reth-discv4 = { path = "../../crates/net/discv4" }
 reth-prune = { path = "../../crates/prune" }
-reth-snapshot = { path = "../../crates/snapshot" }
+reth-snapshot = { path = "../../crates/snapshot", features = ["clap"] }
 reth-trie = { path = "../../crates/trie" }
 reth-nippy-jar = { path = "../../crates/storage/nippy-jar" }
diff --git a/bin/reth/src/db/snapshots/bench.rs b/bin/reth/src/db/snapshots/bench.rs
index 8c926d22629..edcfe6fa503 100644
--- a/bin/reth/src/db/snapshots/bench.rs
+++ b/bin/reth/src/db/snapshots/bench.rs
@@ -1,6 +1,8 @@
-use super::JarConfig;
 use reth_db::DatabaseEnvRO;
-use reth_primitives::ChainSpec;
+use reth_primitives::{
+    snapshot::{Compression, Filters},
+    ChainSpec, SnapshotSegment,
+};
 use reth_provider::{DatabaseProviderRO, ProviderFactory};
 use std::{sync::Arc, time::Instant};
@@ -15,7 +17,9 @@ pub(crate) enum BenchKind {
 pub(crate) fn bench<F1, F2>(
     bench_kind: BenchKind,
     db: (DatabaseEnvRO, Arc<ChainSpec>),
-    jar_config: JarConfig,
+    segment: SnapshotSegment,
+    filters: Filters,
+    compression: Compression,
     mut snapshot_method: F1,
     database_method: F2,
 ) -> eyre::Result<()>
@@ -23,12 +27,11 @@ where
     F1: FnMut() -> eyre::Result<()>,
     F2: Fn(DatabaseProviderRO<'_, DatabaseEnvRO>) -> eyre::Result<()>,
 {
-    let (mode, compression, phf) = jar_config;
     let (db, chain) = db;
 
     println!();
     println!("############");
-    println!("## [{mode:?}] [{compression:?}] [{phf:?}] [{bench_kind:?}]");
+    println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]");
    {
        let start = Instant::now();
        snapshot_method()?;
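Reviewer note: `bench` now takes the segment, filters, and compression as three typed parameters instead of destructuring the old `(mode, compression, phf)` tuple, so call sites read unambiguously. A minimal sketch of the new call shape follows; the wrapper function is hypothetical, the closure bodies are elided, and `BenchKind::RandomOne` is chosen only because it appears later in this diff:

use super::bench::{bench, BenchKind};
use reth_db::open_db_read_only;
use reth_primitives::{
    snapshot::{Compression, Filters},
    ChainSpec, SnapshotSegment,
};
use std::{path::Path, sync::Arc};

// Hypothetical wrapper living in the same module tree as `bench`.
fn run_headers_bench(
    db_path: &Path,
    chain: Arc<ChainSpec>,
    filters: Filters,
    compression: Compression,
) -> eyre::Result<()> {
    bench(
        BenchKind::RandomOne,
        (open_db_read_only(db_path, None)?, chain),
        SnapshotSegment::Headers,
        filters,
        compression,
        || Ok(()),          // snapshot_method: would read rows from the jar
        |_provider| Ok(()), // database_method: would read the same rows from the database
    )
}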
diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs
index 387c1e53d69..4fc60f3cf6f 100644
--- a/bin/reth/src/db/snapshots/headers.rs
+++ b/bin/reth/src/db/snapshots/headers.rs
@@ -1,78 +1,37 @@
 use super::{
     bench::{bench, BenchKind},
-    Command, Compression, PerfectHashingFunction, Rows, Snapshots,
+    Command,
 };
 use crate::utils::DbTool;
 use rand::{seq::SliceRandom, Rng};
-use reth_db::{
-    cursor::DbCursorRO, database::Database, open_db_read_only, snapshot::create_snapshot_T1_T2,
-    table::Decompress, tables, transaction::DbTx, DatabaseEnvRO,
-};
+use reth_db::{database::Database, open_db_read_only, table::Decompress, DatabaseEnvRO};
 use reth_interfaces::db::LogLevel;
 use reth_nippy_jar::NippyJar;
-use reth_primitives::{BlockNumber, ChainSpec, Header};
+use reth_primitives::{
+    snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
+    ChainSpec, Header, SnapshotSegment,
+};
 use reth_provider::{HeaderProvider, ProviderError, ProviderFactory};
+use reth_snapshot::segments::{get_snapshot_segment_file_name, Headers, Segment};
 use std::{path::Path, sync::Arc};
-use tables::*;
 
 impl Command {
     pub(crate) fn generate_headers_snapshot(
         &self,
         tool: &DbTool<'_, DatabaseEnvRO>,
         compression: Compression,
+        inclusion_filter: InclusionFilter,
         phf: PerfectHashingFunction,
     ) -> eyre::Result<()> {
-        let mut jar = self.prepare_jar(2, (Snapshots::Headers, compression, phf), tool, || {
-            // Generates the dataset to train a zstd dictionary if necessary, with the most recent
-            // rows (at most 1000).
-            let dataset = tool.db.view(|tx| {
-                let mut cursor = tx.cursor_read::<RawTable<Headers>>()?;
-                let v1 = cursor
-                    .walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))?
-                    .take(self.block_interval.min(1000))
-                    .map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
-                    .collect::<Vec<_>>();
-                let mut cursor = tx.cursor_read::<RawTable<HeaderTD>>()?;
-                let v2 = cursor
-                    .walk_back(Some(RawKey::from((self.from + self.block_interval - 1) as u64)))?
-                    .take(self.block_interval.min(1000))
-                    .map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
-                    .collect::<Vec<_>>();
-                Ok::<_, eyre::Report>(vec![v1, v2])
-            })??;
-            Ok(dataset)
-        })?;
-
-        tool.db.view(|tx| {
-            // Hacky type inference. TODO fix
-            let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]);
-            let _ = none_vec.take();
-
-            // Generate list of hashes for filters & PHF
-            let mut cursor = tx.cursor_read::<RawTable<CanonicalHeaders>>()?;
-            let mut hashes = None;
+        let segment = Headers::new(
+            compression,
             if self.with_filters {
-                hashes = Some(
-                    cursor
-                        .walk(Some(RawKey::from(self.from as u64)))?
-                        .take(self.block_interval)
-                        .map(|row| {
-                            row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())
-                        }),
-                );
-            }
-
-            create_snapshot_T1_T2::<Headers, HeaderTD, BlockNumber>(
-                tx,
-                self.from as u64..=(self.from as u64 + self.block_interval as u64),
-                None,
-                // We already prepared the dictionary beforehand
-                none_vec,
-                hashes,
-                self.block_interval,
-                &mut jar,
-            )
-        })??;
+                Filters::WithFilters(inclusion_filter, phf)
+            } else {
+                Filters::WithoutFilters
+            },
+        );
+
+        segment.snapshot(&tool.db.tx()?, self.from..=(self.from + self.block_interval - 1))?;
 
         Ok(())
     }
@@ -83,14 +42,26 @@ impl Command {
         log_level: Option<LogLevel>,
         chain: Arc<ChainSpec>,
         compression: Compression,
+        inclusion_filter: InclusionFilter,
         phf: PerfectHashingFunction,
     ) -> eyre::Result<()> {
-        let mode = Snapshots::Headers;
-        let jar_config = (mode, compression, phf);
-        let mut row_indexes = (self.from..(self.from + self.block_interval)).collect::<Vec<_>>();
+        let filters = if self.with_filters {
+            Filters::WithFilters(inclusion_filter, phf)
+        } else {
+            Filters::WithoutFilters
+        };
+
+        let range = self.from..=(self.from + self.block_interval - 1);
+
+        let mut row_indexes = range.clone().collect::<Vec<_>>();
         let mut rng = rand::thread_rng();
         let mut dictionaries = None;
-        let mut jar = NippyJar::load_without_header(&self.get_file_path(jar_config))?;
+        let mut jar = NippyJar::load_without_header(&get_snapshot_segment_file_name(
+            SnapshotSegment::Headers,
+            filters,
+            compression,
+            &range,
+        ))?;
 
         let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
         let mut cursor = if !decompressors.is_empty() {
@@ -103,13 +74,15 @@ impl Command {
         bench(
             bench_kind,
             (open_db_read_only(db_path, log_level)?, chain.clone()),
-            jar_config,
+            SnapshotSegment::Headers,
+            filters,
+            compression,
             || {
                 for num in row_indexes.iter() {
                     Header::decompress(
                         cursor
-                            .row_by_number_with_cols::<0b01, 2>(num - self.from)?
-                            .ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?[0],
+                            .row_by_number_with_cols::<0b01, 2>((num - self.from) as usize)?
+                            .ok_or(ProviderError::HeaderNotFound((*num).into()))?[0],
                     )?;
                     // TODO: replace with below when eventually SnapshotProvider re-uses cursor
                     // provider.header_by_number(num as
@@ -120,8 +93,8 @@ impl Command {
             |provider| {
                 for num in row_indexes.iter() {
                     provider
-                        .header_by_number(*num as u64)?
-                        .ok_or(ProviderError::HeaderNotFound((*num as u64).into()))?;
+                        .header_by_number(*num)?
+                        .ok_or(ProviderError::HeaderNotFound((*num).into()))?;
                 }
                 Ok(())
             },
@@ -137,7 +110,9 @@ impl Command {
         bench(
             BenchKind::RandomOne,
             (open_db_read_only(db_path, log_level)?, chain.clone()),
-            jar_config,
+            SnapshotSegment::Headers,
+            filters,
+            compression,
             || {
                 Header::decompress(
                     cursor
@@ -167,7 +142,9 @@ impl Command {
         bench(
             BenchKind::RandomHash,
             (open_db_read_only(db_path, log_level)?, chain.clone()),
-            jar_config,
+            SnapshotSegment::Headers,
+            filters,
+            compression,
             || {
                 let header = Header::decompress(
                     cursor
@@ -176,7 +153,7 @@ impl Command {
                 )?;
 
                 // Might be a false positive, so in the real world we have to validate it
-                assert!(header.hash_slow() == header_hash);
+                assert_eq!(header.hash_slow(), header_hash);
                 Ok(())
             },
             |provider| {
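One behavioral detail worth spelling out: the block range moved from the half-open `self.from..(self.from + self.block_interval)` to the inclusive `self.from..=(self.from + self.block_interval - 1)`. Both forms cover exactly `block_interval` blocks; the standalone check below (not from this diff) demonstrates the equivalence:

// Standalone check: half-open vs. inclusive ranges over the same interval.
fn main() {
    let (from, block_interval): (u64, u64) = (0, 500_000);
    let half_open = from..(from + block_interval);
    let inclusive = from..=(from + block_interval - 1);
    assert_eq!(half_open.count() as u64, block_interval);
    assert_eq!(inclusive.count() as u64, block_interval);
}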
diff --git a/bin/reth/src/db/snapshots/mod.rs b/bin/reth/src/db/snapshots/mod.rs
index 47b5cc4ea69..afc2b0ce852 100644
--- a/bin/reth/src/db/snapshots/mod.rs
+++ b/bin/reth/src/db/snapshots/mod.rs
@@ -1,39 +1,54 @@
-use crate::utils::DbTool;
-use clap::{clap_derive::ValueEnum, Parser};
-use eyre::WrapErr;
+use crate::{db::genesis_value_parser, utils::DbTool};
+use clap::Parser;
 use itertools::Itertools;
-use reth_db::{database::Database, open_db_read_only, table::Table, tables, DatabaseEnvRO};
+use reth_db::open_db_read_only;
 use reth_interfaces::db::LogLevel;
 use reth_nippy_jar::{
     compression::{DecoderDictionary, Decompressor},
     NippyJar,
 };
-use reth_primitives::ChainSpec;
-use reth_provider::providers::SnapshotProvider;
-use std::{
-    path::{Path, PathBuf},
-    sync::Arc,
+use reth_primitives::{
+    snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
+    BlockNumber, ChainSpec, SnapshotSegment,
 };
+use reth_provider::providers::SnapshotProvider;
+use std::{path::Path, sync::Arc};
 
 mod bench;
 mod headers;
 
-pub(crate) type Rows = Vec<Vec<Vec<u8>>>;
-pub(crate) type JarConfig = (Snapshots, Compression, PerfectHashingFunction);
-
 #[derive(Parser, Debug)]
 /// Arguments for the `reth db snapshot` command.
 pub struct Command {
-    /// Snapshot categories to generate.
-    modes: Vec<Snapshots>,
+    /// The chain this node is running.
+    ///
+    /// Possible values are either a built-in chain or the path to a chain specification file.
+    ///
+    /// Built-in chains:
+    /// - mainnet
+    /// - goerli
+    /// - sepolia
+    /// - holesky
+    #[arg(
+        long,
+        value_name = "CHAIN_OR_PATH",
+        verbatim_doc_comment,
+        default_value = "mainnet",
+        value_parser = genesis_value_parser,
+        global = true,
+    )]
+    chain: Arc<ChainSpec>,
+
+    /// Snapshot segments to generate.
+    segments: Vec<SnapshotSegment>,
 
     /// Starting block for the snapshot.
     #[arg(long, short, default_value = "0")]
-    from: usize,
+    from: BlockNumber,
 
     /// Number of blocks in the snapshot.
     #[arg(long, short, default_value = "500000")]
-    block_interval: usize,
+    block_interval: u64,
 
     /// Flag to enable database-to-snapshot benchmarking.
     #[arg(long, default_value = "false")]
@@ -52,7 +67,7 @@ pub struct Command {
     with_filters: bool,
 
     /// Specifies the perfect hashing function to use.
-    #[arg(long, value_delimiter = ',', default_value_if("with_filters", "true", "mphf"))]
+    #[arg(long, value_delimiter = ',', default_value_if("with_filters", "true", "fmph"))]
     phf: Vec<PerfectHashingFunction>,
 }
@@ -65,7 +80,7 @@ impl Command {
         chain: Arc<ChainSpec>,
     ) -> eyre::Result<()> {
         let all_combinations = self
-            .modes
+            .segments
             .iter()
             .cartesian_product(self.compression.iter())
             .cartesian_product(self.phf.iter());
@@ -77,11 +92,14 @@ impl Command {
         if !self.only_bench {
             for ((mode, compression), phf) in all_combinations.clone() {
                 match mode {
-                    Snapshots::Headers => {
-                        self.generate_headers_snapshot(&tool, *compression, *phf)?
-                    }
-                    Snapshots::Transactions => todo!(),
-                    Snapshots::Receipts => todo!(),
+                    SnapshotSegment::Headers => self.generate_headers_snapshot(
+                        &tool,
+                        *compression,
+                        InclusionFilter::Cuckoo,
+                        *phf,
+                    )?,
+                    SnapshotSegment::Transactions => todo!(),
+                    SnapshotSegment::Receipts => todo!(),
                 }
             }
         }
@@ -90,15 +108,16 @@ impl Command {
         if self.only_bench || self.bench {
             for ((mode, compression), phf) in all_combinations {
                 match mode {
-                    Snapshots::Headers => self.bench_headers_snapshot(
+                    SnapshotSegment::Headers => self.bench_headers_snapshot(
                         db_path,
                         log_level,
                         chain.clone(),
                         *compression,
+                        InclusionFilter::Cuckoo,
                         *phf,
                     )?,
-                    Snapshots::Transactions => todo!(),
-                    Snapshots::Receipts => todo!(),
+                    SnapshotSegment::Transactions => todo!(),
+                    SnapshotSegment::Receipts => todo!(),
                 }
             }
         }
@@ -121,96 +140,6 @@ impl Command {
             }
         }
 
-        Ok((SnapshotProvider { jar: &*jar, jar_start_block: self.from as u64 }, decompressors))
-    }
-
-    /// Returns a [`NippyJar`] according to the desired configuration.
-    fn prepare_jar<F: Fn() -> eyre::Result<Rows>>(
-        &self,
-        num_columns: usize,
-        jar_config: JarConfig,
-        tool: &DbTool<'_, DatabaseEnvRO>,
-        prepare_compression: F,
-    ) -> eyre::Result<NippyJar> {
-        let (mode, compression, phf) = jar_config;
-        let snap_file = self.get_file_path(jar_config);
-        let table_name = match mode {
-            Snapshots::Headers => tables::Headers::NAME,
-            Snapshots::Transactions | Snapshots::Receipts => tables::Transactions::NAME,
-        };
-
-        let total_rows = tool.db.view(|tx| {
-            let table_db = tx.inner.open_db(Some(table_name)).wrap_err("Could not open db.")?;
-            let stats = tx
-                .inner
-                .db_stat(&table_db)
-                .wrap_err(format!("Could not find table: {}", table_name))?;
-
-            Ok::<usize, eyre::Report>((stats.entries() - self.from).min(self.block_interval))
-        })??;
-
-        assert!(
-            total_rows >= self.block_interval,
-            "Not enough rows on database {} < {}.",
-            total_rows,
-            self.block_interval
-        );
-
-        let mut nippy_jar = NippyJar::new_without_header(num_columns, snap_file.as_path());
-        nippy_jar = match compression {
-            Compression::Lz4 => nippy_jar.with_lz4(),
-            Compression::Zstd => nippy_jar.with_zstd(false, 0),
-            Compression::ZstdWithDictionary => {
-                let dataset = prepare_compression()?;
-
-                nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
-                nippy_jar.prepare_compression(dataset)?;
-                nippy_jar
-            }
-            Compression::Uncompressed => nippy_jar,
-        };
-
-        if self.with_filters {
-            nippy_jar = nippy_jar.with_cuckoo_filter(self.block_interval);
-            nippy_jar = match phf {
-                PerfectHashingFunction::Mphf => nippy_jar.with_mphf(),
-                PerfectHashingFunction::GoMphf => nippy_jar.with_gomphf(),
-            };
-        }
-
-        Ok(nippy_jar)
+        Ok((SnapshotProvider { jar: &*jar, jar_start_block: self.from }, decompressors))
     }
-
-    /// Generates a filename according to the desired configuration.
-    fn get_file_path(&self, jar_config: JarConfig) -> PathBuf {
-        let (mode, compression, phf) = jar_config;
-        format!(
-            "snapshot_{mode:?}_{}_{}_{compression:?}_{phf:?}",
-            self.from,
-            self.from + self.block_interval
-        )
-        .into()
-    }
-}
-
-#[derive(Debug, Copy, Clone, ValueEnum)]
-pub(crate) enum Snapshots {
-    Headers,
-    Transactions,
-    Receipts,
-}
-
-#[derive(Debug, Copy, Clone, ValueEnum, Default)]
-pub(crate) enum Compression {
-    Lz4,
-    Zstd,
-    ZstdWithDictionary,
-    #[default]
-    Uncompressed,
-}
-
-#[derive(Debug, Copy, Clone, ValueEnum)]
-pub(crate) enum PerfectHashingFunction {
-    Mphf,
-    GoMphf,
-}
 }
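For context on the two loops above: `all_combinations` is the cartesian product of every requested segment with every compression and PHF option, so a single invocation can generate or bench many snapshot configurations. A self-contained sketch, with plain strings standing in for the real enums:

// Illustrative only: strings stand in for SnapshotSegment, Compression, and
// PerfectHashingFunction.
use itertools::Itertools;

fn main() {
    let segments = ["headers"];
    let compression = ["lz4", "zstd"];
    let phf = ["fmph", "gofmph"];
    let combos: Vec<_> = segments
        .iter()
        .cartesian_product(compression.iter())
        .cartesian_product(phf.iter())
        .collect();
    // Yields ((segment, compression), phf) tuples, matching the loop patterns above.
    assert_eq!(combos.len(), 4); // 1 segment x 2 compressions x 2 PHFs
}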
diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml
index 69bf1bfc8c0..1b0e18b6f09 100644
--- a/crates/primitives/Cargo.toml
+++ b/crates/primitives/Cargo.toml
@@ -38,6 +38,7 @@ tokio-stream.workspace = true
 # misc
 bytes.workspace = true
 byteorder = "1"
+clap = { workspace = true, features = ["derive"], optional = true }
 serde.workspace = true
 serde_json.workspace = true
 serde_with = "3.3.0"
@@ -96,6 +97,7 @@ test-utils = ["dep:plain_hasher", "dep:hash-db", "dep:ethers-core"]
 # value-256 controls whether transaction Value fields are DB-encoded as 256 bits instead of the
 # default of 128 bits.
 value-256 = ["reth-codecs/value-256"]
+clap = ["dep:clap"]
 
 [[bench]]
 name = "recover_ecdsa_crit"
diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs
index c9c335cac16..53818144533 100644
--- a/crates/primitives/src/lib.rs
+++ b/crates/primitives/src/lib.rs
@@ -35,20 +35,19 @@ pub mod listener;
 mod log;
 mod net;
 mod peer;
+mod precaution;
 pub mod proofs;
 mod prune;
 mod receipt;
 pub mod serde_helper;
+pub mod snapshot;
 pub mod stage;
 mod storage;
-
 /// Helpers for working with transactions
 mod transaction;
 pub mod trie;
 mod withdrawal;
 
-mod precaution;
-
 pub use account::{Account, Bytecode};
 pub use block::{
     Block, BlockBody, BlockBodyRoots, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
@@ -83,6 +82,7 @@ pub use prune::{
 };
 pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef, Receipts};
 pub use serde_helper::JsonU256;
+pub use snapshot::SnapshotSegment;
 pub use storage::StorageEntry;
 pub use transaction::{
     util::secp256k1::{public_key_to_address, recover_signer, sign_message},
diff --git a/crates/primitives/src/snapshot/compression.rs b/crates/primitives/src/snapshot/compression.rs
new file mode 100644
index 00000000000..c67e3f63bc9
--- /dev/null
+++ b/crates/primitives/src/snapshot/compression.rs
@@ -0,0 +1,11 @@
+#[derive(Debug, Copy, Clone, Default)]
+#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
+#[allow(missing_docs)]
+/// Snapshot compression
+pub enum Compression {
+    Lz4,
+    Zstd,
+    ZstdWithDictionary,
+    #[default]
+    Uncompressed,
+}
diff --git a/crates/primitives/src/snapshot/filters.rs b/crates/primitives/src/snapshot/filters.rs
new file mode 100644
index 00000000000..e9716ac707d
--- /dev/null
+++ b/crates/primitives/src/snapshot/filters.rs
@@ -0,0 +1,33 @@
+#[derive(Debug, Copy, Clone)]
+/// Snapshot filters.
+pub enum Filters {
+    /// Snapshot uses filters with [InclusionFilter] and [PerfectHashingFunction].
+    WithFilters(InclusionFilter, PerfectHashingFunction),
+    /// Snapshot doesn't use any filters.
+    WithoutFilters,
+}
+
+impl Filters {
+    /// Returns `true` if snapshot uses filters.
+    pub const fn has_filters(&self) -> bool {
+        matches!(self, Self::WithFilters(_, _))
+    }
+}
+
+#[derive(Debug, Copy, Clone)]
+#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
+/// Snapshot inclusion filter. Also see [Filters].
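A short usage sketch for the new filter primitives (assumed consumer code, not part of this diff):

use reth_primitives::snapshot::{Filters, InclusionFilter, PerfectHashingFunction};

fn main() {
    let with = Filters::WithFilters(InclusionFilter::Cuckoo, PerfectHashingFunction::Fmph);
    assert!(with.has_filters());
    assert!(!Filters::WithoutFilters.has_filters());
}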
+pub enum InclusionFilter {
+    /// Cuckoo filter
+    Cuckoo,
+}
+
+#[derive(Debug, Copy, Clone)]
+#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
+/// Snapshot perfect hashing function. Also see [Filters].
+pub enum PerfectHashingFunction {
+    /// Fingerprint-Based Minimal Perfect Hash Function
+    Fmph,
+    /// Fingerprint-Based Minimal Perfect Hash Function with Group Optimization
+    GoFmph,
+}
diff --git a/crates/primitives/src/snapshot/mod.rs b/crates/primitives/src/snapshot/mod.rs
new file mode 100644
index 00000000000..6355ff0efe5
--- /dev/null
+++ b/crates/primitives/src/snapshot/mod.rs
@@ -0,0 +1,9 @@
+//! Snapshot primitives.
+
+mod compression;
+mod filters;
+mod segment;
+
+pub use compression::Compression;
+pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
+pub use segment::SnapshotSegment;
diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs
new file mode 100644
index 00000000000..8902e500537
--- /dev/null
+++ b/crates/primitives/src/snapshot/segment.rs
@@ -0,0 +1,13 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, Serialize)]
+#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
+/// Segment of the data that can be snapshotted.
+pub enum SnapshotSegment {
+    /// Snapshot segment responsible for the `CanonicalHeaders`, `Headers`, `HeaderTD` tables.
+    Headers,
+    /// Snapshot segment responsible for the `Transactions` table.
+    Transactions,
+    /// Snapshot segment responsible for the `Receipts` table.
+    Receipts,
+}
diff --git a/crates/snapshot/Cargo.toml b/crates/snapshot/Cargo.toml
index 65976236da1..5597331fd2d 100644
--- a/crates/snapshot/Cargo.toml
+++ b/crates/snapshot/Cargo.toml
@@ -16,6 +16,7 @@ reth-primitives.workspace = true
 reth-db.workspace = true
 reth-provider.workspace = true
 reth-interfaces.workspace = true
+reth-nippy-jar = { path = "../storage/nippy-jar" }
 
 # async
 tokio = { workspace = true, features = ["sync"] }
@@ -23,6 +24,7 @@ tokio = { workspace = true, features = ["sync"] }
 # misc
 thiserror.workspace = true
 tracing.workspace = true
+clap = { workspace = true, features = ["derive"], optional = true }
 
 [dev-dependencies]
 # reth
@@ -32,3 +34,6 @@ reth-stages = { path = "../stages", features = ["test-utils"] }
 
 # misc
 assert_matches.workspace = true
+
+[features]
+clap = ["dep:clap"]
\ No newline at end of file
diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs
index 60a62cae53e..18b22bdb54a 100644
--- a/crates/snapshot/src/lib.rs
+++ b/crates/snapshot/src/lib.rs
@@ -10,6 +10,7 @@
 #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
 
 mod error;
+pub mod segments;
 mod snapshotter;
 
 pub use error::SnapshotterError;
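Both `reth-primitives` and `reth-snapshot` hide the `clap::ValueEnum` derives behind an optional `clap` feature, so library consumers do not pull in clap. A hypothetical downstream parser (assumes the `clap` feature is enabled; `Args` and its flag names are illustrative only):

use clap::Parser;
use reth_primitives::{snapshot::Compression, SnapshotSegment};

#[derive(Parser, Debug)]
struct Args {
    /// Segments to snapshot (positional, like `Command::segments` above).
    segments: Vec<SnapshotSegment>,
    /// Compression algorithms to try.
    #[arg(long, value_delimiter = ',')]
    compression: Vec<Compression>,
}

fn main() {
    // Variant names map to kebab-case values, e.g. `ZstdWithDictionary` -> "zstd-with-dictionary".
    let args = Args::parse_from(["snap", "headers", "--compression", "lz4,zstd"]);
    println!("{args:?}");
}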
diff --git a/crates/snapshot/src/segments/headers.rs b/crates/snapshot/src/segments/headers.rs
new file mode 100644
index 00000000000..7388682ed05
--- /dev/null
+++ b/crates/snapshot/src/segments/headers.rs
@@ -0,0 +1,97 @@
+use crate::segments::{prepare_jar, Segment};
+use reth_db::{
+    cursor::DbCursorRO, snapshot::create_snapshot_T1_T2_T3, table::Table, tables,
+    transaction::DbTx, RawKey, RawTable,
+};
+use reth_interfaces::RethResult;
+use reth_primitives::{
+    snapshot::{Compression, Filters},
+    BlockNumber, SnapshotSegment,
+};
+use std::ops::RangeInclusive;
+
+/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
+#[derive(Debug)]
+pub struct Headers {
+    compression: Compression,
+    filters: Filters,
+}
+
+impl Headers {
+    /// Creates new instance of [Headers] snapshot segment.
+    pub fn new(compression: Compression, filters: Filters) -> Self {
+        Self { compression, filters }
+    }
+
+    // Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
+    fn dataset_for_compression<'tx, T: Table<Key = BlockNumber>>(
+        &self,
+        tx: &impl DbTx<'tx>,
+        range: &RangeInclusive<BlockNumber>,
+        range_len: usize,
+    ) -> RethResult<Vec<Vec<u8>>> {
+        let mut cursor = tx.cursor_read::<RawTable<T>>()?;
+        Ok(cursor
+            .walk_back(Some(RawKey::from(*range.end())))?
+            .take(range_len.min(1000))
+            .map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
+            .collect::<Vec<_>>())
+    }
+}
+
+impl Segment for Headers {
+    fn snapshot<'tx>(
+        &self,
+        tx: &impl DbTx<'tx>,
+        range: RangeInclusive<BlockNumber>,
+    ) -> RethResult<()> {
+        let range_len = range.clone().count();
+        let mut jar = prepare_jar::<3, tables::Headers>(
+            tx,
+            SnapshotSegment::Headers,
+            self.filters,
+            self.compression,
+            range.clone(),
+            range_len,
+            || {
+                Ok([
+                    self.dataset_for_compression::<tables::Headers>(tx, &range, range_len)?,
+                    self.dataset_for_compression::<tables::HeaderTD>(tx, &range, range_len)?,
+                    self.dataset_for_compression::<tables::CanonicalHeaders>(
+                        tx, &range, range_len,
+                    )?,
+                ])
+            },
+        )?;
+
+        // Generate list of hashes for filters & PHF
+        let mut cursor = tx.cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
+        let mut hashes = None;
+        if self.filters.has_filters() {
+            hashes = Some(
+                cursor
+                    .walk(Some(RawKey::from(*range.start())))?
+                    .take(range_len)
+                    .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())),
+            );
+        }
+
+        create_snapshot_T1_T2_T3::<
+            tables::Headers,
+            tables::HeaderTD,
+            tables::CanonicalHeaders,
+            BlockNumber,
+        >(
+            tx,
+            range,
+            None,
+            // We already prepared the dictionary beforehand
+            None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
+            hashes,
+            range_len,
+            &mut jar,
+        )?;
+
+        Ok(())
+    }
+}
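Usage sketch for the new segment type, mirroring how `generate_headers_snapshot` drives it above. `tx` is assumed to be any open read-only transaction, and the range is illustrative:

use reth_db::transaction::DbTx;
use reth_interfaces::RethResult;
use reth_primitives::snapshot::{Compression, Filters};
use reth_snapshot::segments::{Headers, Segment};

// Snapshot the first 500k headers, uncompressed and without filters.
fn snapshot_first_500k<'tx>(tx: &impl DbTx<'tx>) -> RethResult<()> {
    let segment = Headers::new(Compression::Uncompressed, Filters::WithoutFilters);
    segment.snapshot(tx, 0..=499_999)
}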
diff --git a/crates/snapshot/src/segments/mod.rs b/crates/snapshot/src/segments/mod.rs
new file mode 100644
index 00000000000..94ab254fe9c
--- /dev/null
+++ b/crates/snapshot/src/segments/mod.rs
@@ -0,0 +1,108 @@
+//! Snapshot segment implementations and utilities.
+
+mod headers;
+
+pub use headers::Headers;
+
+use reth_db::{table::Table, transaction::DbTx};
+use reth_interfaces::RethResult;
+use reth_nippy_jar::NippyJar;
+use reth_primitives::{
+    snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
+    BlockNumber, SnapshotSegment,
+};
+use std::{ops::RangeInclusive, path::PathBuf};
+
+pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
+
+/// A segment represents a snapshotting of some portion of the data.
+pub trait Segment {
+    /// Snapshot data using the provided range.
+    fn snapshot<'tx>(
+        &self,
+        tx: &impl DbTx<'tx>,
+        range: RangeInclusive<BlockNumber>,
+    ) -> RethResult<()>;
+}
+
+/// Returns a [`NippyJar`] according to the desired configuration.
+pub(crate) fn prepare_jar<'tx, const COLUMNS: usize, T: Table>(
+    tx: &impl DbTx<'tx>,
+    segment: SnapshotSegment,
+    filters: Filters,
+    compression: Compression,
+    range: RangeInclusive<BlockNumber>,
+    range_len: usize,
+    prepare_compression: impl Fn() -> RethResult<Rows<COLUMNS>>,
+) -> RethResult<NippyJar> {
+    let mut nippy_jar = NippyJar::new_without_header(
+        COLUMNS,
+        &get_snapshot_segment_file_name(segment, filters, compression, &range),
+    );
+
+    nippy_jar = match compression {
+        Compression::Lz4 => nippy_jar.with_lz4(),
+        Compression::Zstd => nippy_jar.with_zstd(false, 0),
+        Compression::ZstdWithDictionary => {
+            let dataset = prepare_compression()?;
+
+            nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
+            nippy_jar.prepare_compression(dataset.to_vec())?;
+            nippy_jar
+        }
+        Compression::Uncompressed => nippy_jar,
+    };
+
+    if let Filters::WithFilters(inclusion_filter, phf) = filters {
+        let total_rows = (tx.entries::<T>()? - *range.start() as usize).min(range_len);
+        nippy_jar = match inclusion_filter {
+            InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
+        };
+        nippy_jar = match phf {
+            PerfectHashingFunction::Fmph => nippy_jar.with_fmph(),
+            PerfectHashingFunction::GoFmph => nippy_jar.with_gofmph(),
+        };
+    }
+
+    Ok(nippy_jar)
+}
+
+/// Returns file name for the provided segment, filters, compression and range.
+pub fn get_snapshot_segment_file_name(
+    segment: SnapshotSegment,
+    filters: Filters,
+    compression: Compression,
+    range: &RangeInclusive<BlockNumber>,
+) -> PathBuf {
+    let segment_name = match segment {
+        SnapshotSegment::Headers => "headers",
+        SnapshotSegment::Transactions => "transactions",
+        SnapshotSegment::Receipts => "receipts",
+    };
+    let filters_name = match filters {
+        Filters::WithFilters(inclusion_filter, phf) => {
+            let inclusion_filter = match inclusion_filter {
+                InclusionFilter::Cuckoo => "cuckoo",
+            };
+            let phf = match phf {
+                PerfectHashingFunction::Fmph => "fmph",
+                PerfectHashingFunction::GoFmph => "gofmph",
+            };
+            format!("{inclusion_filter}-{phf}")
+        }
+        Filters::WithoutFilters => "none".to_string(),
+    };
+    let compression_name = match compression {
+        Compression::Lz4 => "lz4",
+        Compression::Zstd => "zstd",
+        Compression::ZstdWithDictionary => "zstd-dict",
+        Compression::Uncompressed => "uncompressed",
+    };
+
+    format!(
+        "snapshot_{segment_name}_{}_{}_{filters_name}_{compression_name}",
+        range.start(),
+        range.end(),
+    )
+    .into()
+}
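Since the name is assembled purely from the configuration, snapshot files are deterministic per `(segment, range, filters, compression)` tuple. A standalone check of one combination, with the expected string read off the `format!` above:

use reth_primitives::{
    snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
    SnapshotSegment,
};
use reth_snapshot::segments::get_snapshot_segment_file_name;
use std::path::PathBuf;

fn main() {
    let name = get_snapshot_segment_file_name(
        SnapshotSegment::Headers,
        Filters::WithFilters(InclusionFilter::Cuckoo, PerfectHashingFunction::Fmph),
        Compression::ZstdWithDictionary,
        &(0..=499_999),
    );
    assert_eq!(name, PathBuf::from("snapshot_headers_0_499999_cuckoo-fmph_zstd-dict"));
}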
-    pub fn with_gomphf(mut self) -> Self {
+    pub fn with_gofmph(mut self) -> Self {
         self.phf = Some(Functions::GoFmph(GoFmph::new()));
         self
     }
@@ -556,12 +556,12 @@ mod tests {
             assert_eq!(indexes, collect_indexes(&loaded_nippy));
         };
 
-        // mphf bytes size for 100 values of 32 bytes: 54
-        nippy = nippy.with_mphf();
+        // fmph bytes size for 100 values of 32 bytes: 54
+        nippy = nippy.with_fmph();
         check_phf(&mut nippy);
 
-        // mphf bytes size for 100 values of 32 bytes: 46
-        nippy = nippy.with_gomphf();
+        // gofmph bytes size for 100 values of 32 bytes: 46
+        nippy = nippy.with_gofmph();
         check_phf(&mut nippy);
     }
@@ -771,7 +771,7 @@ mod tests {
             NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                 .with_zstd(true, 5000)
                 .with_cuckoo_filter(col1.len())
-                .with_mphf();
+                .with_fmph();
 
         nippy.prepare_compression(data.clone()).unwrap();
         nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
@@ -849,7 +849,7 @@ mod tests {
         let mut nippy = NippyJar::new_without_header(num_columns, file_path.path())
             .with_zstd(true, 5000)
             .with_cuckoo_filter(col1.len())
-            .with_mphf();
+            .with_fmph();
 
         nippy.prepare_compression(data.clone()).unwrap();
         nippy.prepare_index(clone_with_result(&col1), col1.len()).unwrap();
diff --git a/crates/storage/provider/src/providers/snapshot.rs b/crates/storage/provider/src/providers/snapshot.rs
index 72aed71d0b8..ec2c36e6b93 100644
--- a/crates/storage/provider/src/providers/snapshot.rs
+++ b/crates/storage/provider/src/providers/snapshot.rs
@@ -164,7 +164,7 @@ mod test {
         }
 
         if with_filter {
-            nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_mphf();
+            nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_fmph();
         }
 
         let tx = db.tx().unwrap();
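The builder renames (`with_mphf` to `with_fmph`, `with_gomphf` to `with_gofmph`) line up with the underlying `phf::Fmph` and `phf::GoFmph` types. A configuration sketch under the new names; the `()` header type and the parameter values are assumptions echoing the tests above:

use reth_nippy_jar::NippyJar;
use std::path::Path;

// Builder chain under the new names; values mirror the tests above.
fn configure(path: &Path) -> NippyJar<()> {
    NippyJar::new_without_header(2, path)
        .with_zstd(true, 5_000)
        .with_cuckoo_filter(1_000)
        .with_fmph()
}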