Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel preprocessing of blocks + transactions #1360

Merged
merged 18 commits into from
Dec 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions libraries/app/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,18 @@ namespace graphene { namespace app {
}
}

void network_broadcast_api::broadcast_transaction(const signed_transaction& trx)
void network_broadcast_api::broadcast_transaction(const precomputable_transaction& trx)
{
trx.validate();
_app.chain_database()->precompute_parallel( trx ).wait();
_app.chain_database()->push_transaction(trx);
if( _app.p2p_node() != nullptr )
_app.p2p_node()->broadcast_transaction(trx);
}

fc::variant network_broadcast_api::broadcast_transaction_synchronous(const signed_transaction& trx)
fc::variant network_broadcast_api::broadcast_transaction_synchronous(const precomputable_transaction& trx)
{
fc::promise<fc::variant>::ptr prom( new fc::promise<fc::variant>() );
broadcast_transaction_with_callback( [=]( const fc::variant& v ){
broadcast_transaction_with_callback( [prom]( const fc::variant& v ){
prom->set_value(v);
}, trx );

Expand All @@ -179,14 +179,15 @@ namespace graphene { namespace app {

void network_broadcast_api::broadcast_block( const signed_block& b )
{
_app.chain_database()->precompute_parallel( b ).wait();
_app.chain_database()->push_block(b);
if( _app.p2p_node() != nullptr )
_app.p2p_node()->broadcast( net::block_message( b ));
}

void network_broadcast_api::broadcast_transaction_with_callback(confirmation_callback cb, const signed_transaction& trx)
void network_broadcast_api::broadcast_transaction_with_callback(confirmation_callback cb, const precomputable_transaction& trx)
{
trx.validate();
_app.chain_database()->precompute_parallel( trx ).wait();
_callbacks[trx.id()] = cb;
_app.chain_database()->push_transaction(trx);
if( _app.p2p_node() != nullptr )
Expand Down
51 changes: 40 additions & 11 deletions libraries/app/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <graphene/app/application.hpp>
#include <graphene/app/plugin.hpp>

#include <graphene/chain/db_with.hpp>
#include <graphene/chain/genesis_state.hpp>
#include <graphene/chain/protocol/fee_schedule.hpp>
#include <graphene/chain/protocol/types.hpp>
Expand Down Expand Up @@ -394,12 +395,34 @@ void application_impl::startup()
_chain_db->enable_standby_votes_tracking( _options->at("enable-standby-votes-tracking").as<bool>() );
}

if( _options->count("replay-blockchain") )
if( _options->count("replay-blockchain") || _options->count("revalidate-blockchain") )
_chain_db->wipe( _data_dir / "blockchain", false );

try
{
_chain_db->open( _data_dir / "blockchain", initial_state, GRAPHENE_CURRENT_DB_VERSION );
// these flags are used in open() only, i. e. during replay
uint32_t skip;
if( _options->count("revalidate-blockchain") ) // see also handle_block()
{
if( !loaded_checkpoints.empty() )
wlog( "Warning - revalidate will not validate before last checkpoint" );
if( _options->count("force-validate") )
skip = graphene::chain::database::skip_nothing;
else
skip = graphene::chain::database::skip_transaction_signatures;
}
else // no revalidate, skip most checks
skip = graphene::chain::database::skip_witness_signature |
graphene::chain::database::skip_block_size_check |
graphene::chain::database::skip_merkle_check |
graphene::chain::database::skip_transaction_signatures |
graphene::chain::database::skip_transaction_dupe_check |
graphene::chain::database::skip_tapos_check |
graphene::chain::database::skip_witness_schedule_check;

graphene::chain::detail::with_skip_flags( *_chain_db, skip, [this,&initial_state] () {
_chain_db->open( _data_dir / "blockchain", initial_state, GRAPHENE_CURRENT_DB_VERSION );
});
}
catch( const fc::exception& e )
{
Expand Down Expand Up @@ -517,13 +540,17 @@ bool application_impl::handle_block(const graphene::net::block_message& blk_msg,
FC_ASSERT( (latency.count()/1000) > -5000, "Rejecting block with timestamp in the future" );

try {
// TODO: in the case where this block is valid but on a fork that's too old for us to switch to,
// you can help the network code out by throwing a block_older_than_undo_history exception.
// when the net code sees that, it will stop trying to push blocks from that chain, but
// leave that peer connected so that they can get sync blocks from us
bool result = _chain_db->push_block( blk_msg.block,
(_is_block_producer | _force_validate) ?
database::skip_nothing : database::skip_transaction_signatures );
const uint32_t skip = (_is_block_producer | _force_validate) ?
database::skip_nothing : database::skip_transaction_signatures;
bool result = valve.do_serial( [this,&blk_msg,skip] () {
_chain_db->precompute_parallel( blk_msg.block, skip ).wait();
}, [this,&blk_msg,skip] () {
// TODO: in the case where this block is valid but on a fork that's too old for us to switch to,
// you can help the network code out by throwing a block_older_than_undo_history exception.
// when the net code sees that, it will stop trying to push blocks from that chain, but
// leave that peer connected so that they can get sync blocks from us
return _chain_db->push_block( blk_msg.block, skip );
});

// the block was accepted, so we now know all of the transactions contained in the block
if (!sync_mode)
Expand Down Expand Up @@ -573,6 +600,7 @@ void application_impl::handle_transaction(const graphene::net::trx_message& tran
trx_count = 0;
}

_chain_db->precompute_parallel( transaction_message.trx ).wait();
_chain_db->push_transaction( transaction_message.trx );
} FC_CAPTURE_AND_RETHROW( (transaction_message) ) }

Expand Down Expand Up @@ -961,9 +989,10 @@ void application::set_program_options(boost::program_options::options_descriptio
"Path to create a Genesis State at. If a well-formed JSON file exists at the path, it will be parsed and any "
"missing fields in a Genesis State will be added, and any unknown fields will be removed. If no file or an "
"invalid file is found, it will be replaced with an example Genesis State.")
("replay-blockchain", "Rebuild object graph by replaying all blocks")
("replay-blockchain", "Rebuild object graph by replaying all blocks without validation")
("revalidate-blockchain", "Rebuild object graph by replaying all blocks with full validation")
("resync-blockchain", "Delete all blocks and re-sync with network from scratch")
("force-validate", "Force validation of all transactions")
("force-validate", "Force validation of all transactions during normal operation")
("genesis-timestamp", bpo::value<uint32_t>(),
"Replace timestamp from genesis.json with current time plus this many seconds (experts only!)")
;
Expand Down
4 changes: 4 additions & 0 deletions libraries/app/application_impl.hxx
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <fc/network/http/websocket.hpp>
#include <fc/thread/parallel.hpp>

#include <graphene/app/application.hpp>
#include <graphene/app/api_access.hpp>
#include <graphene/chain/genesis_state.hpp>
Expand Down Expand Up @@ -194,6 +196,8 @@ class application_impl : public net::node_delegate
std::map<string, std::shared_ptr<abstract_plugin>> _available_plugins;

bool _is_finished_syncing = false;
private:
fc::serial_valve valve;
};

}}} // namespace graphene namespace app namespace detail
6 changes: 3 additions & 3 deletions libraries/app/include/graphene/app/api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,19 @@ namespace graphene { namespace app {
* The transaction will be checked for validity in the local database prior to broadcasting. If it fails to
* apply locally, an error will be thrown and the transaction will not be broadcast.
*/
void broadcast_transaction(const signed_transaction& trx);
void broadcast_transaction(const precomputable_transaction& trx);

/** this version of broadcast transaction registers a callback method that will be called when the transaction is
* included into a block. The callback method includes the transaction id, block number, and transaction number in the
* block.
*/
void broadcast_transaction_with_callback( confirmation_callback cb, const signed_transaction& trx);
void broadcast_transaction_with_callback( confirmation_callback cb, const precomputable_transaction& trx);

/** this version of broadcast transaction registers a callback method that will be called when the transaction is
* included into a block. The callback method includes the transaction id, block number, and transaction number in the
* block.
*/
fc::variant broadcast_transaction_synchronous(const signed_transaction& trx);
fc::variant broadcast_transaction_synchronous(const precomputable_transaction& trx);

/**
* @brief Broadcast a signed block to the network
Expand Down
6 changes: 3 additions & 3 deletions libraries/chain/account_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ set<account_id_type> account_member_index::get_account_members(const account_obj
result.insert(auth.first);
return result;
}
set<public_key_type, account_member_index::key_compare> account_member_index::get_key_members(const account_object& a)const
set<public_key_type, pubkey_comparator> account_member_index::get_key_members(const account_object& a)const
{
set<public_key_type, key_compare> result;
set<public_key_type, pubkey_comparator> result;
for( auto auth : a.owner.key_auths )
result.insert(auth.first);
for( auto auth : a.active.key_auths )
Expand Down Expand Up @@ -215,7 +215,7 @@ void account_member_index::object_modified(const object& after)


{
set<public_key_type, key_compare> after_key_members = get_key_members(a);
set<public_key_type, pubkey_comparator> after_key_members = get_key_members(a);

vector<public_key_type> removed; removed.reserve(before_key_members.size());
std::set_difference(before_key_members.begin(), before_key_members.end(),
Expand Down
99 changes: 79 additions & 20 deletions libraries/chain/db_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <graphene/chain/exceptions.hpp>
#include <graphene/chain/evaluator.hpp>

#include <fc/thread/parallel.hpp>
#include <fc/smart_ref_impl.hpp>

namespace graphene { namespace chain {
Expand Down Expand Up @@ -227,7 +228,7 @@ bool database::_push_block(const signed_block& new_block)
* queues full as well, it will be kept in the queue to be propagated later when a new block flushes out the pending
* queues.
*/
processed_transaction database::push_transaction( const signed_transaction& trx, uint32_t skip )
processed_transaction database::push_transaction( const precomputable_transaction& trx, uint32_t skip )
{ try {
processed_transaction result;
detail::with_skip_flags( *this, skip, [&]()
Expand All @@ -237,7 +238,7 @@ processed_transaction database::push_transaction( const signed_transaction& trx,
return result;
} FC_CAPTURE_AND_RETHROW( (trx) ) }

processed_transaction database::_push_transaction( const signed_transaction& trx )
processed_transaction database::_push_transaction( const precomputable_transaction& trx )
{
// If this is the first transaction pushed after applying a block, start a new undo session.
// This allows us to quickly rewind to the clean state of the head block, in case a new block arrives.
Expand Down Expand Up @@ -465,15 +466,17 @@ signed_block database::_generate_block(
void database::pop_block()
{ try {
_pending_tx_session.reset();
auto head_id = head_block_id();
optional<signed_block> head_block = fetch_block_by_id( head_id );
GRAPHENE_ASSERT( head_block.valid(), pop_empty_chain, "there are no blocks to pop" );

_fork_db.pop_block();
auto fork_db_head = _fork_db.head();
FC_ASSERT( fork_db_head, "Trying to pop() from empty fork database!?" );
if( fork_db_head->id == head_block_id() )
_fork_db.pop_block();
else
{
fork_db_head = _fork_db.fetch_block( head_block_id() );
FC_ASSERT( fork_db_head, "Trying to pop() block that's not in fork database!?" );
}
pop_undo();

_popped_tx.insert( _popped_tx.begin(), head_block->transactions.begin(), head_block->transactions.end() );

_popped_tx.insert( _popped_tx.begin(), fork_db_head->data.transactions.begin(), fork_db_head->data.transactions.end() );
} FC_CAPTURE_AND_RETHROW() }

void database::clear_pending()
Expand Down Expand Up @@ -621,22 +624,17 @@ processed_transaction database::_apply_transaction(const signed_transaction& trx
{ try {
uint32_t skip = get_node_properties().skip_flags;

if( true || !(skip&skip_validate) ) /* issue #505 explains why this skip_flag is disabled */
trx.validate();
trx.validate();
abitmore marked this conversation as resolved.
Show resolved Hide resolved

auto& trx_idx = get_mutable_index_type<transaction_index>();
const chain_id_type& chain_id = get_chain_id();
transaction_id_type trx_id;
if( !(skip & skip_transaction_dupe_check) )
{
trx_id = trx.id();
FC_ASSERT( trx_idx.indices().get<by_trx_id>().find(trx_id) == trx_idx.indices().get<by_trx_id>().end() );
}
FC_ASSERT( trx_idx.indices().get<by_trx_id>().find(trx.id()) == trx_idx.indices().get<by_trx_id>().end() );
transaction_evaluation_state eval_state(this);
const chain_parameters& chain_parameters = get_global_properties().parameters;
eval_state._trx = &trx;

if( !(skip & (skip_transaction_signatures | skip_authority_check) ) )
if( !(skip & skip_transaction_signatures) )
{
auto get_active = [&]( account_id_type id ) { return &id(*this).active; };
auto get_owner = [&]( account_id_type id ) { return &id(*this).owner; };
Expand Down Expand Up @@ -665,8 +663,8 @@ processed_transaction database::_apply_transaction(const signed_transaction& trx
//Insert transaction into unique transactions database.
if( !(skip & skip_transaction_dupe_check) )
{
create<transaction_object>([&trx_id,&trx](transaction_object& transaction) {
transaction.trx_id = trx_id;
create<transaction_object>([&trx](transaction_object& transaction) {
transaction.trx_id = trx.id();
transaction.trx = trx;
});
}
Expand Down Expand Up @@ -750,4 +748,65 @@ bool database::before_last_checkpoint()const
return (_checkpoints.size() > 0) && (_checkpoints.rbegin()->first >= head_block_num());
}


static const uint32_t skip_expensive = database::skip_transaction_signatures | database::skip_witness_signature
| database::skip_merkle_check | database::skip_transaction_dupe_check;

template<typename Trx>
void database::_precompute_parallel( const Trx* trx, const size_t count, const uint32_t skip )const
{
for( size_t i = 0; i < count; ++i, ++trx )
{
trx->validate(); // TODO - parallelize wrt confidential operations
if( !(skip&skip_transaction_dupe_check) )
trx->id();
if( !(skip&skip_transaction_signatures) )
trx->get_signature_keys( get_chain_id() );
}
}

fc::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
{ try {
std::vector<fc::future<void>> workers;
if( !block.transactions.empty() )
{
if( (skip & skip_expensive) == skip_expensive )
_precompute_parallel( &block.transactions[0], block.transactions.size(), skip );
else
{
uint32_t chunks = fc::asio::default_io_service_scope::get_num_threads();
uint32_t chunk_size = ( block.transactions.size() + chunks - 1 ) / chunks;
workers.reserve( chunks + 1 );
for( size_t base = 0; base < block.transactions.size(); base += chunk_size )
workers.push_back( fc::do_parallel( [this,&block,base,chunk_size,skip] () {
_precompute_parallel( &block.transactions[base],
base + chunk_size < block.transactions.size() ? chunk_size : block.transactions.size() - base,
skip );
}) );
}
}

if( !(skip&skip_witness_signature) )
workers.push_back( fc::do_parallel( [&block] () { block.signee(); } ) );
if( !(skip&skip_merkle_check) )
block.calculate_merkle_root();
block.id();

if( workers.empty() )
return fc::future< void >( fc::promise< void >::ptr( new fc::promise< void >( true ) ) );

auto first = workers.begin();
auto worker = first;
while( ++worker != workers.end() )
worker->wait();
return *first;
} FC_LOG_AND_RETHROW() }

fc::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
{
return fc::do_parallel([this,&trx] () {
_precompute_parallel( &trx, 1, skip_nothing );
});
}

} }
2 changes: 1 addition & 1 deletion libraries/chain/db_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ void database::init_genesis(const genesis_state_type& genesis_state)
_undo_db.disable();
struct auth_inhibitor {
auth_inhibitor(database& db) : db(db), old_flags(db.node_properties().skip_flags)
{ db.node_properties().skip_flags |= skip_authority_check; }
{ db.node_properties().skip_flags |= skip_transaction_signatures; }
~auth_inhibitor()
{ db.node_properties().skip_flags = old_flags; }
private:
Expand Down
Loading