Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
5b429fa
move tx_lookup hash calculation to db crate
joshieDo Oct 6, 2023
8090431
update doc on tx hash calc
joshieDo Oct 6, 2023
f5762b4
Merge remote-tracking branch 'origin/main' into joshie/snapshot-bin-2
joshieDo Oct 12, 2023
8eaab06
use databaseprovider instead
joshieDo Oct 12, 2023
95e0574
add find_transaction_range
joshieDo Oct 12, 2023
e03a3c0
cargo deps
joshieDo Oct 12, 2023
16e4457
add TransactionsProviderExt
joshieDo Oct 12, 2023
a6edbc4
add transactions snapshot segment
joshieDo Oct 12, 2023
765ea2e
snapshot benches compare results
joshieDo Oct 12, 2023
1b41008
add TransactionNotFound error
joshieDo Oct 12, 2023
b6789e4
add additional empty provider impls to SnapshotProvider
joshieDo Oct 12, 2023
a900860
add SegmentHeader to snapshot jars
joshieDo Oct 13, 2023
9ae5939
add wip transaction_by_hash
joshieDo Oct 13, 2023
96f77fc
fix doc test
joshieDo Oct 13, 2023
16c33a7
fmt
joshieDo Oct 13, 2023
d14a483
add with_hash to walk bench as well
joshieDo Oct 13, 2023
055a818
fix recover_hashes range
joshieDo Oct 13, 2023
5bef37f
Merge remote-tracking branch 'origin/main' into joshie/snapshot-bin-2
joshieDo Oct 16, 2023
a66b603
fix doc test
joshieDo Oct 16, 2023
f327d91
fix test_snap
joshieDo Oct 16, 2023
5582db3
share dataset_for_compression implementation
joshieDo Oct 16, 2023
13136ed
add snapshot segment
joshieDo Oct 16, 2023
d4c8b33
add receipts command
joshieDo Oct 16, 2023
36fa32f
Merge remote-tracking branch 'origin/main' into joshie/snapshot-bin-2
joshieDo Oct 19, 2023
648d55e
move dictionaries inside NippyJar
joshieDo Oct 21, 2023
bfde955
introduce snapshotjarprovider and dashmap
joshieDo Oct 23, 2023
0beb01a
docs
joshieDo Oct 23, 2023
b4d1d1e
Update bin/reth/src/db/snapshots/receipts.rs
joshieDo Oct 24, 2023
79c72db
Update bin/reth/src/db/snapshots/transactions.rs
joshieDo Oct 24, 2023
8880d6f
Update bin/reth/src/db/snapshots/transactions.rs
joshieDo Oct 24, 2023
6224354
Update bin/reth/src/db/snapshots/receipts.rs
joshieDo Oct 24, 2023
8e56d1d
Update crates/interfaces/src/provider.rs
joshieDo Oct 24, 2023
4fb1871
add transaction_hashes_by_range to provider
joshieDo Oct 24, 2023
9ab6333
add TxHashOrNumber type
joshieDo Oct 24, 2023
c562e0c
Update bin/reth/src/db/snapshots/receipts.rs
joshieDo Oct 24, 2023
27d6ab2
Update bin/reth/src/db/snapshots/transactions.rs
joshieDo Oct 24, 2023
21edb80
Merge remote-tracking branch 'origin/main' into joshie/snapshot-bin-2
joshieDo Oct 24, 2023
3a385ec
Merge branch 'joshie/snapshot-bin-2' into joshie/sprov
joshieDo Oct 24, 2023
2f8d303
clippy
joshieDo Oct 24, 2023
3631dec
Merge branch 'joshie/snapshot-bin-2' into joshie/sprov
joshieDo Oct 24, 2023
f2768f3
small fixes
joshieDo Oct 24, 2023
47e0036
add serde dictionaries
joshieDo Oct 24, 2023
e5e71be
yes clippy
joshieDo Oct 24, 2023
d7c01b2
Merge branch 'joshie/snapshot-bin-2' into joshie/sprov
joshieDo Oct 24, 2023
e920318
fix doc
joshieDo Oct 25, 2023
5fd1124
Merge branch 'joshie/snapshot-bin-2' into joshie/sprov
joshieDo Oct 25, 2023
30c4748
Merge remote-tracking branch 'origin/main' into joshie/snapshot-bin-2
joshieDo Oct 26, 2023
efeb602
fmt
joshieDo Oct 26, 2023
7dab55d
Merge branch 'joshie/snapshot-bin-2' into joshie/sprov
joshieDo Oct 26, 2023
888e123
Merge remote-tracking branch 'origin/main' into joshie/sprov
joshieDo Oct 26, 2023
8f9cbdb
use BLOCKS_PER_SNAPSHOT instead
joshieDo Oct 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 10 additions & 17 deletions bin/reth/src/db/snapshots/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use super::{
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory};
use reth_snapshot::segments::{get_snapshot_segment_file_name, Headers, Segment};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
};
use reth_snapshot::segments::{Headers, Segment};
use std::{path::Path, sync::Arc};

impl Command {
Expand Down Expand Up @@ -54,20 +55,12 @@ impl Command {

let mut row_indexes = range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Headers,
filters,
compression,
&range,
))?;

let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
let path =
SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range);
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Headers, self.from, Some(path))?;
let mut cursor = jar_provider.cursor()?;

for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
Expand Down
26 changes: 2 additions & 24 deletions bin/reth/src/db/snapshots/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ use clap::Parser;
use itertools::Itertools;
use reth_db::{open_db_read_only, DatabaseEnvRO};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{
compression::{DecoderDictionary, Decompressor},
NippyJar,
};
use reth_primitives::{
snapshot::{Compression, InclusionFilter, PerfectHashingFunction, SegmentHeader},
snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
BlockNumber, ChainSpec, SnapshotSegment,
};
use reth_provider::{providers::SnapshotProvider, ProviderFactory};
use reth_provider::ProviderFactory;
use std::{path::Path, sync::Arc};

mod bench;
Expand Down Expand Up @@ -134,22 +130,4 @@ impl Command {

Ok(())
}

/// Returns a [`SnapshotProvider`] of the provided [`NippyJar`], alongside a list of
/// [`DecoderDictionary`] and [`Decompressor`] if necessary.
///
/// The decompressor list is non-empty only when the jar was compressed with zstd
/// using dictionaries; for every other compression mode it is empty and the caller
/// can use a plain cursor instead.
fn prepare_jar_provider<'a>(
    &self,
    jar: &'a mut NippyJar<SegmentHeader>,
    // Owned by the caller so that the `Decompressor`s borrowed from it can
    // outlive this function body.
    dictionaries: &'a mut Option<Vec<DecoderDictionary<'_>>>,
) -> eyre::Result<(SnapshotProvider<'a>, Vec<Decompressor<'a>>)> {
    let mut decompressors: Vec<Decompressor<'_>> = vec![];
    // Only the zstd-with-dictionary configuration needs per-column decoder
    // dictionaries generated from the jar before values can be read.
    if let Some(reth_nippy_jar::compression::Compressors::Zstd(zstd)) = jar.compressor_mut() {
        if zstd.use_dict {
            *dictionaries = zstd.generate_decompress_dictionaries();
            // `expect("qed")`: the line above just populated `dictionaries`.
            decompressors = zstd.generate_decompressors(dictionaries.as_ref().expect("qed"))?;
        }
    }

    Ok((SnapshotProvider { jar: &*jar }, decompressors))
}
}
34 changes: 13 additions & 21 deletions bin/reth/src/db/snapshots/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ use super::{
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, Receipt, SnapshotSegment,
};
use reth_provider::{
DatabaseProviderRO, ProviderError, ProviderFactory, ReceiptProvider, TransactionsProvider,
TransactionsProviderExt,
};
use reth_snapshot::{
segments,
segments::{get_snapshot_segment_file_name, Segment},
providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};

impl Command {
Expand Down Expand Up @@ -59,26 +55,22 @@ impl Command {
let block_range = self.from..=(self.from + self.block_interval - 1);

let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Receipts,
filters,
compression,
&block_range,
))?;

let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range)?;
.transaction_range_by_block_range(block_range.clone())?;

let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
let path = SnapshotSegment::Receipts.filename_with_configuration(
filters,
compression,
&block_range,
);
Comment on lines 59 to +69
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we can avoid cloning by initializing path first, and then passing block_range by value into transaction_range_by_block_range

let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Receipts, self.from, Some(path))?;
let mut cursor = jar_provider.cursor()?;

for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
Expand Down
34 changes: 13 additions & 21 deletions bin/reth/src/db/snapshots/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ use super::{
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, SnapshotSegment, TransactionSignedNoHash,
};
use reth_provider::{
DatabaseProviderRO, ProviderError, ProviderFactory, TransactionsProvider,
TransactionsProviderExt,
};
use reth_snapshot::{
segments,
segments::{get_snapshot_segment_file_name, Segment},
providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};

impl Command {
Expand Down Expand Up @@ -59,26 +55,22 @@ impl Command {
let block_range = self.from..=(self.from + self.block_interval - 1);

let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Transactions,
filters,
compression,
&block_range,
))?;

let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range)?;
.transaction_range_by_block_range(block_range.clone())?;

let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
let path = SnapshotSegment::Transactions.filename_with_configuration(
filters,
compression,
&block_range,
);
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Transactions, self.from, Some(path))?;
let mut cursor = jar_provider.cursor()?;

for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
Expand Down
3 changes: 3 additions & 0 deletions crates/primitives/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ mod segment;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::{SegmentHeader, SnapshotSegment};

/// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;
70 changes: 67 additions & 3 deletions crates/primitives/src/snapshot/segment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{BlockNumber, TxNumber};
use crate::{snapshot::PerfectHashingFunction, BlockNumber, TxNumber};
use serde::{Deserialize, Serialize};
use std::ops::RangeInclusive;
use std::{ops::RangeInclusive, path::PathBuf};

use super::{Compression, Filters, InclusionFilter};

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, Serialize)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
Expand All @@ -14,8 +16,70 @@ pub enum SnapshotSegment {
Receipts,
}

impl SnapshotSegment {
    /// Returns the default configuration of the segment.
    ///
    /// Every segment currently shares the same defaults: a cuckoo inclusion
    /// filter with fmph hashing, and lz4 compression.
    const fn config(&self) -> (Filters, Compression) {
        let default_config = (
            Filters::WithFilters(InclusionFilter::Cuckoo, super::PerfectHashingFunction::Fmph),
            Compression::Lz4,
        );

        // Matched exhaustively (no `_` arm) so adding a segment variant forces
        // a decision about its default configuration here.
        match self {
            SnapshotSegment::Headers |
            SnapshotSegment::Transactions |
            SnapshotSegment::Receipts => default_config,
        }
    }

    /// Returns the default file name for the provided segment and range.
    pub fn filename(&self, range: &RangeInclusive<BlockNumber>) -> PathBuf {
        let (filters, compression) = self.config();
        self.filename_with_configuration(filters, compression, range)
    }

    /// Returns file name for the provided segment, filters, compression and range.
    ///
    /// The tags embedded in the name form the on-disk naming scheme, so they
    /// must remain stable across releases.
    pub fn filename_with_configuration(
        &self,
        filters: Filters,
        compression: Compression,
        range: &RangeInclusive<BlockNumber>,
    ) -> PathBuf {
        let segment_name = match self {
            SnapshotSegment::Headers => "headers",
            SnapshotSegment::Transactions => "transactions",
            SnapshotSegment::Receipts => "receipts",
        };

        let filters_name = if let Filters::WithFilters(inclusion_filter, phf) = filters {
            let inclusion = match inclusion_filter {
                InclusionFilter::Cuckoo => "cuckoo",
            };
            let phf_name = match phf {
                PerfectHashingFunction::Fmph => "fmph",
                PerfectHashingFunction::GoFmph => "gofmph",
            };
            format!("{inclusion}-{phf_name}")
        } else {
            String::from("none")
        };

        let compression_name = match compression {
            Compression::Lz4 => "lz4",
            Compression::Zstd => "zstd",
            Compression::ZstdWithDictionary => "zstd-dict",
            Compression::Uncompressed => "uncompressed",
        };

        PathBuf::from(format!(
            "snapshot_{segment_name}_{}_{}_{filters_name}_{compression_name}",
            range.start(),
            range.end(),
        ))
    }
}

/// A segment header that contains information common to all segments. Used for storage.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
pub struct SegmentHeader {
block_range: RangeInclusive<BlockNumber>,
tx_range: RangeInclusive<TxNumber>,
Expand Down
44 changes: 2 additions & 42 deletions crates/snapshot/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use reth_primitives::{
BlockNumber, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
use std::{ops::RangeInclusive, path::PathBuf};
use std::ops::RangeInclusive;

pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];

Expand Down Expand Up @@ -61,7 +61,7 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let mut nippy_jar = NippyJar::new(
COLUMNS,
&get_snapshot_segment_file_name(segment, filters, compression, &block_range),
&segment.filename_with_configuration(filters, compression, &block_range),
SegmentHeader::new(block_range, tx_range),
);

Expand Down Expand Up @@ -90,43 +90,3 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(

Ok(nippy_jar)
}

/// Returns file name for the provided segment, filters, compression and range.
///
/// The produced tags are part of the on-disk naming scheme and must stay stable.
pub fn get_snapshot_segment_file_name(
    segment: SnapshotSegment,
    filters: Filters,
    compression: Compression,
    range: &RangeInclusive<BlockNumber>,
) -> PathBuf {
    let segment_name = match segment {
        SnapshotSegment::Headers => "headers",
        SnapshotSegment::Transactions => "transactions",
        SnapshotSegment::Receipts => "receipts",
    };

    // Flattened into a single exhaustive match over every (filter, phf)
    // combination; produces the same "<filter>-<phf>" / "none" tags.
    let filters_name = match filters {
        Filters::WithFilters(InclusionFilter::Cuckoo, PerfectHashingFunction::Fmph) => {
            "cuckoo-fmph".to_string()
        }
        Filters::WithFilters(InclusionFilter::Cuckoo, PerfectHashingFunction::GoFmph) => {
            "cuckoo-gofmph".to_string()
        }
        Filters::WithoutFilters => "none".to_string(),
    };

    let compression_name = match compression {
        Compression::Lz4 => "lz4",
        Compression::Zstd => "zstd",
        Compression::ZstdWithDictionary => "zstd-dict",
        Compression::Uncompressed => "uncompressed",
    };

    let start = range.start();
    let end = range.end();
    format!("snapshot_{segment_name}_{start}_{end}_{filters_name}_{compression_name}").into()
}
1 change: 1 addition & 0 deletions crates/storage/nippy-jar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tracing-appender = "0.2"
anyhow = "1.0"
thiserror.workspace = true
hex = "*"
derive_more = "0.99"

[dev-dependencies]
rand = { version = "0.8", features = ["small_rng"] }
Expand Down
Loading