Skip to content

Commit

Permalink
migrate blocks in batches (rework db iterator impl) (#3450)
Browse files Browse the repository at this point in the history
* rework db iterator
More robust block migration based on a low-level db iterator.

* cleanup

* cleanup

* fixup
  • Loading branch information
antiochp authored Oct 7, 2020
1 parent eab26b3 commit e7bbda8
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 128 deletions.
41 changes: 30 additions & 11 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ use crate::types::{
};
use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::{util::RwLock, ChainStore};
use grin_core::ser;
use grin_store::Error::NotFoundErr;
use std::collections::HashMap;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{collections::HashMap, io::Cursor};

/// Orphan pool size is limited by MAX_ORPHAN_SIZE
pub const MAX_ORPHAN_SIZE: usize = 200;
Expand Down Expand Up @@ -1187,10 +1188,9 @@ impl Chain {
let tail = batch.get_block_header(&tail_hash)?;

// Remove old blocks (including short lived fork blocks) which height < tail.height
// here b is a block
for (_, b) in batch.blocks_iter()? {
if b.header.height < tail.height {
let _ = batch.delete_block(&b.hash());
for block in batch.blocks_iter()? {
if block.header.height < tail.height {
let _ = batch.delete_block(&block.hash());
count += 1;
}
}
Expand Down Expand Up @@ -1407,13 +1407,32 @@ impl Chain {
/// Migrate our local db from v2 to v3.
/// "commit only" inputs.
fn migrate_db_v2_v3(store: &ChainStore) -> Result<(), Error> {
let store_v2 = store.with_version(ProtocolVersion(2));
let batch = store_v2.batch()?;
for (_, block) in batch.blocks_iter()? {
batch.migrate_block(&block, ProtocolVersion(3))?;
let mut keys_to_migrate = vec![];
for (k, v) in store.batch()?.blocks_raw_iter()? {
// We want to migrate all blocks that cannot be read via v3 protocol version.
let block_v2: Result<Block, _> =
ser::deserialize(&mut Cursor::new(&v), ProtocolVersion(2));
let block_v3: Result<Block, _> =
ser::deserialize(&mut Cursor::new(&v), ProtocolVersion(3));
if let (Ok(_), Err(_)) = (block_v2, block_v3) {
keys_to_migrate.push(k);
}
}
batch.commit()?;
Ok(())
debug!(
"migrate_db_v2_v3: {} blocks to migrate",
keys_to_migrate.len()
);
let mut count = 0;
keys_to_migrate.chunks(100).try_for_each(|keys| {
let batch = store.batch()?;
for key in keys {
batch.migrate_block(&key, ProtocolVersion(2), ProtocolVersion(3))?;
count += 1;
}
batch.commit()?;
debug!("migrate_db_v2_v3: successfully migrated {} blocks", count);
Ok(())
})
}

/// Gets the block header in which a given output appears in the txhashset.
Expand Down
8 changes: 8 additions & 0 deletions chain/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,14 @@ impl From<io::Error> for Error {
}
}

impl From<ser::Error> for Error {
fn from(error: ser::Error) -> Error {
Error {
inner: Context::new(ErrorKind::SerErr(error)),
}
}
}

impl From<secp::Error> for Error {
fn from(e: secp::Error) -> Error {
Error {
Expand Down
4 changes: 2 additions & 2 deletions chain/src/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,12 @@ impl<T: PosEntry> PruneableListIndex for MultiIndex<T> {
let mut list_count = 0;
let mut entry_count = 0;
let prefix = to_key(self.list_prefix, "");
for (key, _) in batch.db.iter::<ListWrapper<T>>(&prefix)? {
for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? {
let _ = batch.delete(&key);
list_count += 1;
}
let prefix = to_key(self.entry_prefix, "");
for (key, _) in batch.db.iter::<ListEntry<T>>(&prefix)? {
for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? {
let _ = batch.delete(&key);
entry_count += 1;
}
Expand Down
64 changes: 42 additions & 22 deletions chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use crate::linked_list::MultiIndex;
use crate::types::{CommitPos, Tip};
use crate::util::secp::pedersen::Commitment;
use croaring::Bitmap;
use grin_core::ser;
use grin_store as store;
use grin_store::{option_to_not_found, to_key, Error, SerIterator};
use grin_store::{option_to_not_found, to_key, Error};
use std::convert::TryInto;
use std::sync::Arc;

Expand Down Expand Up @@ -57,17 +58,6 @@ impl ChainStore {
Ok(ChainStore { db })
}

/// Create a new instance of the chain store based on this instance
/// but with the provided protocol version. This is used when migrating
/// data in the db to a different protocol version, reading using one version and
/// writing back to the db with a different version.
pub fn with_version(&self, version: ProtocolVersion) -> ChainStore {
let db_with_version = self.db.with_version(version);
ChainStore {
db: db_with_version,
}
}

/// The current chain head.
pub fn head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&[HEAD_PREFIX]), || "HEAD".to_owned())
Expand Down Expand Up @@ -224,11 +214,20 @@ impl<'a> Batch<'a> {
Ok(())
}

/// Migrate a block stored in the db by serializing it using the provided protocol version.
/// Block may have been read using a previous protocol version but we do not actually care.
pub fn migrate_block(&self, b: &Block, version: ProtocolVersion) -> Result<(), Error> {
self.db
.put_ser_with_version(&to_key(BLOCK_PREFIX, b.hash())[..], b, version)?;
/// Migrate a block stored in the db reading from one protocol version and writing
/// with new protocol version.
pub fn migrate_block(
&self,
key: &[u8],
from_version: ProtocolVersion,
to_version: ProtocolVersion,
) -> Result<(), Error> {
let block: Option<Block> = self.db.get_with(key, move |_, mut v| {
ser::deserialize(&mut v, from_version).map_err(From::from)
})?;
if let Some(block) = block {
self.db.put_ser_with_version(key, &block, to_version)?;
}
Ok(())
}

Expand Down Expand Up @@ -283,9 +282,14 @@ impl<'a> Batch<'a> {
}

/// Iterator over the output_pos index.
pub fn output_pos_iter(&self) -> Result<SerIterator<(u64, u64)>, Error> {
pub fn output_pos_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, CommitPos)>, Error> {
let key = to_key(OUTPUT_POS_PREFIX, "");
self.db.iter(&key)
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |k, mut v| {
ser::deserialize(&mut v, protocol_version)
.map(|pos| (k.to_vec(), pos))
.map_err(From::from)
})
}

/// Get output_pos from index.
Expand Down Expand Up @@ -371,10 +375,26 @@ impl<'a> Batch<'a> {
})
}

/// An iterator to all block in db
pub fn blocks_iter(&self) -> Result<SerIterator<Block>, Error> {
/// Iterator over all full blocks in the db.
/// Uses default db serialization strategy via db protocol version.
pub fn blocks_iter(&self) -> Result<impl Iterator<Item = Block>, Error> {
let key = to_key(BLOCK_PREFIX, "");
self.db.iter(&key)
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |_, mut v| {
ser::deserialize(&mut v, protocol_version).map_err(From::from)
})
}

/// Iterator yielding the raw (key, value) byte pairs for every full block in the db.
/// No deserialization is performed here; used during block migration where we need
/// full control over how the stored bytes are interpreted.
pub fn blocks_raw_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)>, Error> {
	let prefix = to_key(BLOCK_PREFIX, "");
	self.db
		.iter(&prefix, |key, value| Ok((key.to_vec(), value.to_vec())))
}

/// Protocol version of our underlying db.
/// Delegates to the db handle; callers use this to serialize/deserialize
/// entries consistently with how the db itself reads and writes them.
pub fn protocol_version(&self) -> ProtocolVersion {
	self.db.protocol_version()
}
}

Expand Down
7 changes: 4 additions & 3 deletions chain/src/txhashset/txhashset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,12 +513,13 @@ impl TxHashSet {
// Iterate over the current output_pos index, removing any entries that
// do not point to to the expected output.
let mut removed_count = 0;
for (key, (pos, _)) in batch.output_pos_iter()? {
if let Some(out) = output_pmmr.get_data(pos) {
for (key, pos) in batch.output_pos_iter()? {
if let Some(out) = output_pmmr.get_data(pos.pos) {
if let Ok(pos_via_mmr) = batch.get_output_pos(&out.commitment()) {
// If the pos matches and the index key matches the commitment
// then keep the entry, other we want to clean it up.
if pos == pos_via_mmr && batch.is_match_output_pos_key(&key, &out.commitment())
if pos.pos == pos_via_mmr
&& batch.is_match_output_pos_key(&key, &out.commitment())
{
continue;
}
Expand Down
17 changes: 9 additions & 8 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,15 +369,16 @@ impl Peers {
}
}

/// All peer information we have in storage
/// Iterator over every peer recorded in our local db.
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
	self.store.peers_iter().map_err(|e| e.into())
}

/// Convenience for reading all peers.
pub fn all_peers(&self) -> Vec<PeerData> {
match self.store.all_peers() {
Ok(peers) => peers,
Err(e) => {
error!("all_peers failed: {:?}", e);
vec![]
}
}
self.peers_iter()
.map(|peers| peers.collect())
.unwrap_or(vec![])
}

/// Find peers in store (not necessarily connected) and return their data
Expand Down
20 changes: 13 additions & 7 deletions p2p/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,27 @@ impl PeerStore {
cap: Capabilities,
count: usize,
) -> Result<Vec<PeerData>, Error> {
let key = to_key(PEER_PREFIX, "");
let peers = self
.db
.iter::<PeerData>(&key)?
.map(|(_, v)| v)
.peers_iter()?
.filter(|p| p.flags == state && p.capabilities.contains(cap))
.choose_multiple(&mut thread_rng(), count);
Ok(peers)
}

/// Iterator over all peers stored in the db.
/// Entries are deserialized lazily using the db's own protocol version.
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
	let prefix = to_key(PEER_PREFIX, "");
	let version = self.db.protocol_version();
	self.db.iter(&prefix, move |_, mut value| {
		ser::deserialize(&mut value, version).map_err(From::from)
	})
}

/// List all known peers
/// Used for /v1/peers/all api endpoint
pub fn all_peers(&self) -> Result<Vec<PeerData>, Error> {
let key = to_key(PEER_PREFIX, "");
Ok(self.db.iter::<PeerData>(&key)?.map(|(_, v)| v).collect())
let peers: Vec<PeerData> = self.peers_iter()?.collect();
Ok(peers)
}

/// Convenience method to load a peer data, update its status and save it
Expand Down Expand Up @@ -194,7 +200,7 @@ impl PeerStore {
{
let mut to_remove = vec![];

for x in self.all_peers()? {
for x in self.peers_iter()? {
if predicate(&x) {
to_remove.push(x)
}
Expand Down
17 changes: 8 additions & 9 deletions servers/src/grin/seed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
use chrono::prelude::{DateTime, Utc};
use chrono::{Duration, MIN_DATE};
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::prelude::*;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::{mpsc, Arc};
Expand Down Expand Up @@ -145,14 +144,14 @@ fn monitor_peers(
tx: mpsc::Sender<PeerAddr>,
preferred_peers: &[PeerAddr],
) {
// regularly check if we need to acquire more peers and if so, gets
// regularly check if we need to acquire more peers and if so, gets
// them from db
let total_count = peers.all_peers().len();
let mut total_count = 0;
let mut healthy_count = 0;
let mut banned_count = 0;
let mut defuncts = vec![];

for x in peers.all_peers() {
for x in peers.all_peers().into_iter() {
match x.flags {
p2p::State::Banned => {
let interval = Utc::now().timestamp() - x.last_banned;
Expand All @@ -172,6 +171,7 @@ fn monitor_peers(
p2p::State::Healthy => healthy_count += 1,
p2p::State::Defunct => defuncts.push(x),
}
total_count += 1;
}

debug!(
Expand Down Expand Up @@ -223,11 +223,10 @@ fn monitor_peers(
}
}

// take a random defunct peer and mark it healthy: over a long period any
// take a random defunct peer and mark it healthy: over a long enough period any
// peer will see another as defunct eventually, gives us a chance to retry
if !defuncts.is_empty() {
defuncts.shuffle(&mut thread_rng());
let _ = peers.update_state(defuncts[0].addr, p2p::State::Healthy);
if let Some(peer) = defuncts.into_iter().choose(&mut thread_rng()) {
let _ = peers.update_state(peer.addr, p2p::State::Healthy);
}

// find some peers from our db
Expand Down
Loading

0 comments on commit e7bbda8

Please sign in to comment.