Skip to content

Commit

Permalink
DB v1->v2 migration (#3044)
Browse files Browse the repository at this point in the history
* wip

* exhaustive match

* wip

* cleanup docs and move migration fns to chain itself
  • Loading branch information
antiochp authored Sep 19, 2019
1 parent 02cee80 commit 1c072f5
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 18 deletions.
30 changes: 27 additions & 3 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl Chain {
)?;
Chain::log_heads(&store)?;

Ok(Chain {
let chain = Chain {
db_root,
store,
adapter,
Expand All @@ -201,7 +201,18 @@ impl Chain {
verifier_cache,
archive_mode,
genesis: genesis.header.clone(),
})
};

// DB migrations to be run prior to the chain being used.
{
// Migrate full blocks to protocol version v2.
chain.migrate_db_v1_v2()?;

// Rebuild height_for_pos index.
chain.rebuild_height_for_pos()?;
}

Ok(chain)
}

/// Return our shared header MMR handle.
Expand Down Expand Up @@ -1245,9 +1256,22 @@ impl Chain {
self.header_pmmr.read().get_header_hash_by_height(height)
}

/// Migrate our local db from v1 to v2.
/// This covers blocks which themselves contain transactions.
/// Transaction kernels changed in v2 due to "variable size kernels".
fn migrate_db_v1_v2(&self) -> Result<(), Error> {
let store_v1 = self.store.with_version(ProtocolVersion(1));
let batch = store_v1.batch()?;
for (_, block) in batch.blocks_iter()? {
batch.migrate_block(&block, ProtocolVersion(2))?;
}
batch.commit()?;
Ok(())
}

/// Migrate the index 'commitment -> output_pos' to index 'commitment -> (output_pos, block_height)'
/// Note: should only be called when Node start-up, for database migration from the old version.
pub fn rebuild_height_for_pos(&self) -> Result<(), Error> {
fn rebuild_height_for_pos(&self) -> Result<(), Error> {
let header_pmmr = self.header_pmmr.read();
let txhashset = self.txhashset.read();
let mut outputs_pos = txhashset.get_all_output_pos()?;
Expand Down
23 changes: 23 additions & 0 deletions chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::core::consensus::HeaderInfo;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{Block, BlockHeader, BlockSums};
use crate::core::pow::Difficulty;
use crate::core::ser::ProtocolVersion;
use crate::types::Tip;
use crate::util::secp::pedersen::Commitment;
use croaring::Bitmap;
Expand Down Expand Up @@ -49,6 +50,17 @@ impl ChainStore {
let db = store::Store::new(db_root, None, Some(STORE_SUBPATH.clone()), None)?;
Ok(ChainStore { db })
}

/// Create a new instance of the chain store based on this instance
/// but with the provided protocol version. This is used when migrating
/// data in the db to a different protocol version, reading using one version and
/// writing back to the db with a different version.
pub fn with_version(&self, version: ProtocolVersion) -> ChainStore {
let db_with_version = self.db.with_version(version);
ChainStore {
db: db_with_version,
}
}
}

impl ChainStore {
Expand Down Expand Up @@ -258,6 +270,17 @@ impl<'a> Batch<'a> {
Ok(())
}

/// Migrate a block stored in the db by serializing it using the provided protocol version.
/// Block may have been read using a previous protocol version but we do not actually care.
pub fn migrate_block(&self, b: &Block, version: ProtocolVersion) -> Result<(), Error> {
self.db.put_ser_with_version(
&to_key(BLOCK_PREFIX, &mut b.hash().to_vec())[..],
b,
version,
)?;
Ok(())
}

/// Delete a full block. Does not delete any record associated with a block
/// header.
pub fn delete_block(&self, bh: &Hash) -> Result<(), Error> {
Expand Down
3 changes: 0 additions & 3 deletions servers/src/grin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,6 @@ impl Server {
archive_mode,
)?);

// launching the database migration if needed
shared_chain.rebuild_height_for_pos()?;

pool_adapter.set_chain(shared_chain.clone());

let net_adapter = Arc::new(NetToChainAdapter::new(
Expand Down
45 changes: 33 additions & 12 deletions store/src/lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ where
}
}

const DEFAULT_DB_VERSION: ProtocolVersion = ProtocolVersion(2);

/// LMDB-backed store facilitating data access and serialization. All writes
/// are done through a Batch abstraction providing atomicity.
pub struct Store {
env: Arc<lmdb::Environment>,
db: RwLock<Option<Arc<lmdb::Database<'static>>>>,
db: Arc<RwLock<Option<Arc<lmdb::Database<'static>>>>>,
name: String,
version: ProtocolVersion,
}
Expand Down Expand Up @@ -113,9 +115,9 @@ impl Store {
);
let res = Store {
env: Arc::new(env),
db: RwLock::new(None),
db: Arc::new(RwLock::new(None)),
name: db_name,
version: ProtocolVersion(1),
version: DEFAULT_DB_VERSION,
};

{
Expand All @@ -129,6 +131,17 @@ impl Store {
Ok(res)
}

/// Construct a new store using a specific protocol version.
/// Permits access to the db with legacy protocol versions for db migrations.
pub fn with_version(&self, version: ProtocolVersion) -> Store {
Store {
env: self.env.clone(),
db: self.db.clone(),
name: self.name.clone(),
version: version,
}
}

/// Opens the database environment
pub fn open(&self) -> Result<(), Error> {
let mut w = self.db.write();
Expand Down Expand Up @@ -275,11 +288,8 @@ impl Store {
if self.needs_resize()? {
self.do_resize()?;
}
let txn = lmdb::WriteTransaction::new(self.env.clone())?;
Ok(Batch {
store: self,
tx: txn,
})
let tx = lmdb::WriteTransaction::new(self.env.clone())?;
Ok(Batch { store: self, tx })
}
}

Expand All @@ -299,10 +309,21 @@ impl<'a> Batch<'a> {
Ok(())
}

/// Writes a single key and its `Writeable` value to the db. Encapsulates
/// serialization.
/// Writes a single key and its `Writeable` value to the db.
/// Encapsulates serialization using the (default) version configured on the store instance.
pub fn put_ser<W: ser::Writeable>(&self, key: &[u8], value: &W) -> Result<(), Error> {
let ser_value = ser::ser_vec(value, self.store.version);
self.put_ser_with_version(key, value, self.store.version)
}

/// Writes a single key and its `Writeable` value to the db.
/// Encapsulates serialization using the specified protocol version.
pub fn put_ser_with_version<W: ser::Writeable>(
&self,
key: &[u8],
value: &W,
version: ProtocolVersion,
) -> Result<(), Error> {
let ser_value = ser::ser_vec(value, version);
match ser_value {
Ok(data) => self.put(key, &data),
Err(err) => Err(Error::SerErr(format!("{}", err))),
Expand Down Expand Up @@ -356,7 +377,7 @@ impl<'a> Batch<'a> {
}
}

/// An iterator thad produces Readable instances back. Wraps the lower level
/// An iterator that produces Readable instances back. Wraps the lower level
/// DBIterator and deserializes the returned values.
pub struct SerIterator<T>
where
Expand Down

0 comments on commit 1c072f5

Please sign in to comment.