
Dynamic LMDB mapsize allocation [1.1.0] #2605

Merged · 16 commits · Feb 27, 2019
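For context on the change itself: instead of pre-allocating one huge fixed LMDB map, the store starts small and grows the map on demand. Below is a minimal sketch of that pattern, assuming lmdb-zero's `Environment::info`/`stat`/`set_mapsize` API; the chunk size, the 90% threshold, and the `needs_resize`/`do_resize` names are illustrative assumptions, not necessarily the exact code this PR landed.

```rust
// Sketch: grow the LMDB map on demand instead of pre-allocating it.
use lmdb_zero as lmdb;
use std::sync::Arc;

const ALLOC_CHUNK_SIZE: usize = 134_217_728; // grow in 128 MiB steps (assumed value)
const RESIZE_PERCENT: f32 = 0.9; // resize once 90% of the map is used (assumed value)

struct Store {
    env: Arc<lmdb::Environment>,
}

impl Store {
    /// Is the memory map close to full?
    fn needs_resize(&self) -> Result<bool, lmdb::Error> {
        let info = self.env.info()?;
        let stat = self.env.stat()?;
        let size_used = stat.psize as usize * info.last_pgno;
        Ok(size_used as f32 / info.mapsize as f32 > RESIZE_PERCENT)
    }

    /// Grow the map by one chunk. This must only run while no
    /// transactions are active, which is what the RwLock discussed
    /// in the review thread below guarantees.
    fn do_resize(&self) -> Result<(), lmdb::Error> {
        let info = self.env.info()?;
        unsafe { self.env.set_mapsize(info.mapsize + ALLOC_CHUNK_SIZE)? };
        Ok(())
    }
}
```

A batch would then check `needs_resize()` before opening a write transaction and trigger `do_resize()` when needed.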
13 changes: 11 additions & 2 deletions .auto-release.sh
@@ -16,18 +16,27 @@ if [[ $TRAVIS_OS_NAME == 'osx' ]]; then
md5 "grin-$tagname-$TRAVIS_JOB_ID-osx.tgz" > "grin-$tagname-$TRAVIS_JOB_ID-osx.tgz"-md5sum.txt
/bin/ls -ls *-md5sum.txt | awk '{print $6,$7,$8,$9,$10}'
cd - > /dev/null;
echo "tarball generated\n"
echo "osx tarball generated\n"

# Only generate changelog on Linux platform, to avoid duplication
exit 0
+elif [[ $TRAVIS_OS_NAME == 'windows' ]]; then
+# Custom requirements for windows
+cd target/release ; rm -f *.zip; zip "grin-$tagname-$TRAVIS_JOB_ID-win-x64.zip" grin
+/bin/ls -ls *.zip | awk '{print $6,$7,$8,$9,$10}'
+md5 "grin-$tagname-$TRAVIS_JOB_ID-win-x64.zip" > "grin-$tagname-$TRAVIS_JOB_ID-win-x64.zip"-md5sum.txt
+/bin/ls -ls *-md5sum.txt | awk '{print $6,$7,$8,$9,$10}'
+cd - > /dev/null;
+echo "windows zip file generated\n"
+exit 0
else
# Do some custom requirements on Linux
cd target/release ; rm -f *.tgz; tar zcf "grin-$tagname-$TRAVIS_JOB_ID-linux-amd64.tgz" grin
/bin/ls -ls *.tgz | awk '{print $6,$7,$8,$9,$10}'
md5sum "grin-$tagname-$TRAVIS_JOB_ID-linux-amd64.tgz" > "grin-$tagname-$TRAVIS_JOB_ID-linux-amd64.tgz"-md5sum.txt
/bin/ls -ls *-md5sum.txt | awk '{print $6,$7,$8,$9,$10}'
cd - > /dev/null;
echo "tarball generated\n"
echo "linux tarball generated\n"
fi

version="$tagname"
2 changes: 0 additions & 2 deletions Cargo.lock


1 change: 0 additions & 1 deletion chain/Cargo.toml
@@ -12,7 +12,6 @@ edition = "2018"
[dependencies]
bitflags = "1"
byteorder = "1"
-lmdb-zero = "0.4.4"
failure = "0.1"
failure_derive = "0.1"
croaring = "0.3"
53 changes: 35 additions & 18 deletions chain/src/chain.rs
@@ -24,7 +24,6 @@ use crate::core::core::{
use crate::core::global;
use crate::core::pow;
use crate::error::{Error, ErrorKind};
-use crate::lmdb;
use crate::pipe;
use crate::store;
use crate::txhashset;
@@ -142,7 +141,7 @@ impl OrphanBlockPool {
/// maintains locking for the pipeline to avoid conflicting processing.
pub struct Chain {
db_root: String,
-store: Arc<store::ChainStore>,
+store: Arc<RwLock<store::ChainStore>>,
Contributor: I'm fine with the use of a RwLock, but can't this be pushed down into our LMDB store? All regular operations would be considered reads, except for the close/open of the DB, which would take the write lock.

Member Author: Took a bit of doing and testing, but I think I've managed it in the latest push.
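The pushdown the reviewer suggests — callers keep a plain store handle, and the lock lives inside it — could be shaped roughly like this. The `Inner`, `get`, and `reopen` names are hypothetical; this shows only the locking shape, not the code that ultimately landed:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical inner handle standing in for the LMDB environment.
struct Inner;

pub struct Store {
    inner: Arc<RwLock<Option<Inner>>>,
}

impl Store {
    /// Regular operations take the shared (read) side of the lock.
    pub fn get(&self, _key: &[u8]) -> Option<Vec<u8>> {
        let guard = self.inner.read().expect("lock poisoned");
        let _env = guard.as_ref()?; // env must be open
        // ... perform the LMDB read through `_env` ...
        None
    }

    /// Close/reopen (e.g. to grow the map) takes the exclusive side,
    /// briefly blocking readers but leaving the caller's API unchanged.
    pub fn reopen(&self) -> Result<(), &'static str> {
        let mut guard = self.inner.write().expect("lock poisoned");
        *guard = None;        // drop the old environment
        *guard = Some(Inner); // open a fresh one with a larger mapsize
        Ok(())
    }
}
```

Note that the diff shown on this page still wraps the store in a RwLock at the Chain level; per the reply above, later commits in the PR moved the lock inward.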

adapter: Arc<dyn ChainAdapter + Send + Sync>,
orphans: Arc<OrphanBlockPool>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>,
@@ -160,7 +159,6 @@ impl Chain {
/// based on the genesis block if necessary.
pub fn init(
db_root: String,
-db_env: Arc<lmdb::Environment>,
adapter: Arc<dyn ChainAdapter + Send + Sync>,
genesis: Block,
pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
@@ -177,13 +175,13 @@
return Err(ErrorKind::Stopped.into());
}

-let store = Arc::new(store::ChainStore::new(db_env)?);
+let store = Arc::new(RwLock::new(store::ChainStore::new(&db_root)?));

// open the txhashset, creating a new one if necessary
let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;

-setup_head(&genesis, &store, &mut txhashset)?;
-Chain::log_heads(&store)?;
+setup_head(&genesis, &mut store.write(), &mut txhashset)?;
+Chain::log_heads(&store.read())?;

Chain {
db_root,
@@ -214,7 +212,7 @@ }
}

/// Shared store instance.
-pub fn store(&self) -> Arc<store::ChainStore> {
+pub fn store(&self) -> Arc<RwLock<store::ChainStore>> {
self.store.clone()
}

@@ -251,7 +249,8 @@
/// the "sync" header MMR from a known consistent state and to ensure we track
/// the header chain correctly at the fork point.
pub fn reset_sync_head(&self) -> Result<Tip, Error> {
-let batch = self.store.batch()?;
+let mut s = self.store.write();
+let batch = s.batch()?;
batch.reset_sync_head()?;
let head = batch.get_sync_head()?;
batch.commit()?;
@@ -302,7 +301,8 @@ }
}

let mut txhashset = self.txhashset.write();
-let batch = self.store.batch()?;
+let mut s = self.store.write();
+let batch = s.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;

let prev_head = ctx.batch.head()?;
@@ -380,7 +380,8 @@
// We take a write lock on the txhashset and create a new batch
// but this is strictly readonly so we do not commit the batch.
let mut txhashset = self.txhashset.write();
-let batch = self.store.batch()?;
+let mut s = self.store.write();
+let batch = s.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;
pipe::process_block_header(bh, &mut ctx)?;
Ok(())
@@ -400,7 +401,8 @@ }
}

let mut txhashset = self.txhashset.write();
-let batch = self.store.batch()?;
+let mut s = self.store.write();
+let batch = s.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;

pipe::sync_block_headers(headers, &mut ctx)?;
@@ -543,7 +545,7 @@

/// Validate the current chain state.
pub fn validate(&self, fast_validation: bool) -> Result<(), Error> {
-let header = self.store.head_header()?;
+let header = self.store.read().head_header()?;

// Lets just treat an "empty" node that just got started up as valid.
if header.height == 0 {
@@ -736,7 +738,8 @@
/// TODO - think about how to optimize this.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut txhashset = self.txhashset.write();
-let mut batch = self.store.batch()?;
+let mut s = self.store.write();
+let mut batch = s.batch()?;
txhashset::sync_extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
Ok(())
@@ -755,7 +758,8 @@
head: &Tip,
txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
-let mut batch = self.store.batch()?;
+let mut s = self.store.write();
+let mut batch = s.batch()?;
txhashset::header_extending(txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
Ok(())
@@ -880,7 +884,8 @@
// all good, prepare a new batch and update all the required records
debug!("txhashset_write: rewinding a 2nd time (writeable)");

-let mut batch = self.store.batch()?;
+let mut s = self.store.write();
+let mut batch = s.batch()?;

txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.rewind(&header)?;
@@ -984,10 +989,12 @@
let tail = self.get_header_by_height(head.height - horizon)?;
let mut current = self.get_header_by_height(head.height - horizon - 1)?;

-let batch = self.store.batch()?;
+let mut s = self.store.write();
+let batch = s.batch()?;

loop {
// Go to the store directly so we can handle NotFoundErr robustly.
-match self.store.get_block(&current.hash()) {
+match self.store.read().get_block(&current.hash()) {
Ok(b) => {
batch.delete_block(&b.hash())?;
count += 1;
@@ -1089,55 +1096,63 @@
/// Tip (head) of the block chain.
pub fn head(&self) -> Result<Tip, Error> {
self.store
+.read()
.head()
.map_err(|e| ErrorKind::StoreErr(e, "chain head".to_owned()).into())
}

/// Tail of the block chain in this node after compact (cross-block cut-through)
pub fn tail(&self) -> Result<Tip, Error> {
self.store
+.read()
.tail()
.map_err(|e| ErrorKind::StoreErr(e, "chain tail".to_owned()).into())
}

/// Tip (head) of the header chain.
pub fn header_head(&self) -> Result<Tip, Error> {
self.store
+.read()
.header_head()
.map_err(|e| ErrorKind::StoreErr(e, "chain header head".to_owned()).into())
}

/// Block header for the chain head
pub fn head_header(&self) -> Result<BlockHeader, Error> {
self.store
+.read()
.head_header()
.map_err(|e| ErrorKind::StoreErr(e, "chain head header".to_owned()).into())
}

/// Gets a block by hash
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
self.store
+.read()
.get_block(h)
.map_err(|e| ErrorKind::StoreErr(e, "chain get block".to_owned()).into())
}

/// Gets a block header by hash
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
self.store
+.read()
.get_block_header(h)
.map_err(|e| ErrorKind::StoreErr(e, "chain get header".to_owned()).into())
}

/// Get previous block header.
pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
self.store
+.read()
.get_previous_header(header)
.map_err(|e| ErrorKind::StoreErr(e, "chain get previous header".to_owned()).into())
}

/// Get block_sums by header hash.
pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
self.store
+.read()
.get_block_sums(h)
.map_err(|e| ErrorKind::StoreErr(e, "chain get block_sums".to_owned()).into())
}
@@ -1202,6 +1217,7 @@ impl Chain {
/// This may be significantly different to current header chain.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
self.store
+.read()
.get_sync_head()
.map_err(|e| ErrorKind::StoreErr(e, "chain get sync head".to_owned()).into())
}
@@ -1218,14 +1234,15 @@
/// Check whether we have a block without reading it
pub fn block_exists(&self, h: Hash) -> Result<bool, Error> {
self.store
+.read()
.block_exists(&h)
.map_err(|e| ErrorKind::StoreErr(e, "chain block exists".to_owned()).into())
}
}

fn setup_head(
genesis: &Block,
-store: &store::ChainStore,
+store: &mut store::ChainStore,
txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
let mut batch = store.batch()?;
2 changes: 0 additions & 2 deletions chain/src/lib.rs
@@ -23,8 +23,6 @@
#[macro_use]
extern crate bitflags;

-use lmdb_zero as lmdb;
-
#[macro_use]
extern crate serde_derive;
#[macro_use]
16 changes: 8 additions & 8 deletions chain/src/store.rs
@@ -18,9 +18,9 @@ use crate::core::consensus::HeaderInfo;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{Block, BlockHeader, BlockSums};
use crate::core::pow::Difficulty;
-use crate::lmdb;
use crate::types::Tip;
use crate::util::secp::pedersen::Commitment;
+use crate::util::RwLock;
use croaring::Bitmap;
use grin_store as store;
use grin_store::{option_to_not_found, to_key, Error};
@@ -45,8 +45,8 @@ pub struct ChainStore {

impl ChainStore {
/// Create new chain store
-pub fn new(db_env: Arc<lmdb::Environment>) -> Result<ChainStore, Error> {
-let db = store::Store::open(db_env, STORE_SUBPATH);
+pub fn new(db_root: &str) -> Result<ChainStore, Error> {
+let db = store::Store::new(db_root, Some(STORE_SUBPATH.clone()), None)?;
Ok(ChainStore { db })
}
}
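With this constructor change, call sites no longer build an `lmdb::Environment` themselves; they hand the store a directory path. A hedged before/after sketch — the `new_env` helper in the comment and the exact crate paths/re-exports are assumptions:

```rust
use std::sync::Arc;
use grin_util::RwLock; // grin's re-exported parking_lot RwLock (assumed path)

fn open_chain_store(
    db_root: &str,
) -> Result<Arc<RwLock<grin_chain::store::ChainStore>>, grin_store::Error> {
    // Before: let db_env = Arc::new(store::new_env(db_root.to_owned()));
    //         let chain_store = ChainStore::new(db_env)?;
    // After: the store opens and manages its own LMDB environment internally.
    let chain_store = grin_chain::store::ChainStore::new(db_root)?;
    Ok(Arc::new(RwLock::new(chain_store)))
}
```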
@@ -114,7 +114,7 @@ }
}

/// Builds a new batch to be used with this store.
-pub fn batch(&self) -> Result<Batch<'_>, Error> {
+pub fn batch(&mut self) -> Result<Batch<'_>, Error> {
Ok(Batch {
db: self.db.batch()?,
})
@@ -355,7 +355,7 @@
/// calculation.
pub struct DifficultyIter<'a> {
start: Hash,
-store: Option<Arc<ChainStore>>,
+store: Option<Arc<RwLock<ChainStore>>>,
batch: Option<Batch<'a>>,

// maintain state for both the "next" header in this iteration
@@ -369,7 +369,7 @@
impl<'a> DifficultyIter<'a> {
/// Build a new iterator using the provided chain store and starting from
/// the provided block hash.
-pub fn from<'b>(start: Hash, store: Arc<ChainStore>) -> DifficultyIter<'b> {
+pub fn from<'b>(start: Hash, store: Arc<RwLock<ChainStore>>) -> DifficultyIter<'b> {
DifficultyIter {
start,
store: Some(store),
@@ -403,7 +403,7 @@ impl<'a> Iterator for DifficultyIter<'a> {
batch.get_block_header(&self.start).ok()
} else {
if let Some(ref store) = self.store {
-store.get_block_header(&self.start).ok()
+store.read().get_block_header(&self.start).ok()
} else {
None
}
@@ -419,7 +419,7 @@
self.prev_header = batch.get_previous_header(&header).ok();
} else {
if let Some(ref store) = self.store {
-self.prev_header = store.get_previous_header(&header).ok();
+self.prev_header = store.read().get_previous_header(&header).ok();
} else {
self.prev_header = None;
}
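One detail worth noting in DifficultyIter: the read guard is taken per call (`store.read()` inside `next()`) and dropped immediately, rather than held for the whole iteration, so a writer — for example a batch that needs to close and grow the map — is never starved. A toy illustration with std's RwLock (grin itself uses the parking_lot variant):

```rust
use std::sync::{Arc, RwLock};

// Guard taken per call: writers can interleave between reads.
fn per_call_read(store: &Arc<RwLock<Vec<u64>>>) -> Option<u64> {
    store.read().ok()?.first().copied()
}

// Guard held across a long loop: any writer blocks until we finish.
fn held_guard(store: &Arc<RwLock<Vec<u64>>>) -> u64 {
    let guard = store.read().unwrap();
    guard.iter().sum()
}
```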