Skip to content

Commit 1471c05

Browse files
authored
Merge pull request #1360 from bitshares/performance_opt
Parallel preprocessing of blocks + transactions
2 parents 1fc6dc1 + 2c01109 commit 1471c05

38 files changed

+567
-371
lines changed

libraries/app/api.cpp

+7-6
Original file line numberDiff line numberDiff line change
@@ -159,18 +159,18 @@ namespace graphene { namespace app {
159159
}
160160
}
161161

162-
void network_broadcast_api::broadcast_transaction(const signed_transaction& trx)
162+
void network_broadcast_api::broadcast_transaction(const precomputable_transaction& trx)
163163
{
164-
trx.validate();
164+
_app.chain_database()->precompute_parallel( trx ).wait();
165165
_app.chain_database()->push_transaction(trx);
166166
if( _app.p2p_node() != nullptr )
167167
_app.p2p_node()->broadcast_transaction(trx);
168168
}
169169

170-
fc::variant network_broadcast_api::broadcast_transaction_synchronous(const signed_transaction& trx)
170+
fc::variant network_broadcast_api::broadcast_transaction_synchronous(const precomputable_transaction& trx)
171171
{
172172
fc::promise<fc::variant>::ptr prom( new fc::promise<fc::variant>() );
173-
broadcast_transaction_with_callback( [=]( const fc::variant& v ){
173+
broadcast_transaction_with_callback( [prom]( const fc::variant& v ){
174174
prom->set_value(v);
175175
}, trx );
176176

@@ -179,14 +179,15 @@ namespace graphene { namespace app {
179179

180180
void network_broadcast_api::broadcast_block( const signed_block& b )
181181
{
182+
_app.chain_database()->precompute_parallel( b ).wait();
182183
_app.chain_database()->push_block(b);
183184
if( _app.p2p_node() != nullptr )
184185
_app.p2p_node()->broadcast( net::block_message( b ));
185186
}
186187

187-
void network_broadcast_api::broadcast_transaction_with_callback(confirmation_callback cb, const signed_transaction& trx)
188+
void network_broadcast_api::broadcast_transaction_with_callback(confirmation_callback cb, const precomputable_transaction& trx)
188189
{
189-
trx.validate();
190+
_app.chain_database()->precompute_parallel( trx ).wait();
190191
_callbacks[trx.id()] = cb;
191192
_app.chain_database()->push_transaction(trx);
192193
if( _app.p2p_node() != nullptr )

libraries/app/application.cpp

+40-11
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <graphene/app/application.hpp>
2727
#include <graphene/app/plugin.hpp>
2828

29+
#include <graphene/chain/db_with.hpp>
2930
#include <graphene/chain/genesis_state.hpp>
3031
#include <graphene/chain/protocol/fee_schedule.hpp>
3132
#include <graphene/chain/protocol/types.hpp>
@@ -394,12 +395,34 @@ void application_impl::startup()
394395
_chain_db->enable_standby_votes_tracking( _options->at("enable-standby-votes-tracking").as<bool>() );
395396
}
396397

397-
if( _options->count("replay-blockchain") )
398+
if( _options->count("replay-blockchain") || _options->count("revalidate-blockchain") )
398399
_chain_db->wipe( _data_dir / "blockchain", false );
399400

400401
try
401402
{
402-
_chain_db->open( _data_dir / "blockchain", initial_state, GRAPHENE_CURRENT_DB_VERSION );
403+
// these flags are used in open() only, i. e. during replay
404+
uint32_t skip;
405+
if( _options->count("revalidate-blockchain") ) // see also handle_block()
406+
{
407+
if( !loaded_checkpoints.empty() )
408+
wlog( "Warning - revalidate will not validate before last checkpoint" );
409+
if( _options->count("force-validate") )
410+
skip = graphene::chain::database::skip_nothing;
411+
else
412+
skip = graphene::chain::database::skip_transaction_signatures;
413+
}
414+
else // no revalidate, skip most checks
415+
skip = graphene::chain::database::skip_witness_signature |
416+
graphene::chain::database::skip_block_size_check |
417+
graphene::chain::database::skip_merkle_check |
418+
graphene::chain::database::skip_transaction_signatures |
419+
graphene::chain::database::skip_transaction_dupe_check |
420+
graphene::chain::database::skip_tapos_check |
421+
graphene::chain::database::skip_witness_schedule_check;
422+
423+
graphene::chain::detail::with_skip_flags( *_chain_db, skip, [this,&initial_state] () {
424+
_chain_db->open( _data_dir / "blockchain", initial_state, GRAPHENE_CURRENT_DB_VERSION );
425+
});
403426
}
404427
catch( const fc::exception& e )
405428
{
@@ -517,13 +540,17 @@ bool application_impl::handle_block(const graphene::net::block_message& blk_msg,
517540
FC_ASSERT( (latency.count()/1000) > -5000, "Rejecting block with timestamp in the future" );
518541

519542
try {
520-
// TODO: in the case where this block is valid but on a fork that's too old for us to switch to,
521-
// you can help the network code out by throwing a block_older_than_undo_history exception.
522-
// when the net code sees that, it will stop trying to push blocks from that chain, but
523-
// leave that peer connected so that they can get sync blocks from us
524-
bool result = _chain_db->push_block( blk_msg.block,
525-
(_is_block_producer | _force_validate) ?
526-
database::skip_nothing : database::skip_transaction_signatures );
543+
const uint32_t skip = (_is_block_producer | _force_validate) ?
544+
database::skip_nothing : database::skip_transaction_signatures;
545+
bool result = valve.do_serial( [this,&blk_msg,skip] () {
546+
_chain_db->precompute_parallel( blk_msg.block, skip ).wait();
547+
}, [this,&blk_msg,skip] () {
548+
// TODO: in the case where this block is valid but on a fork that's too old for us to switch to,
549+
// you can help the network code out by throwing a block_older_than_undo_history exception.
550+
// when the net code sees that, it will stop trying to push blocks from that chain, but
551+
// leave that peer connected so that they can get sync blocks from us
552+
return _chain_db->push_block( blk_msg.block, skip );
553+
});
527554

528555
// the block was accepted, so we now know all of the transactions contained in the block
529556
if (!sync_mode)
@@ -573,6 +600,7 @@ void application_impl::handle_transaction(const graphene::net::trx_message& tran
573600
trx_count = 0;
574601
}
575602

603+
_chain_db->precompute_parallel( transaction_message.trx ).wait();
576604
_chain_db->push_transaction( transaction_message.trx );
577605
} FC_CAPTURE_AND_RETHROW( (transaction_message) ) }
578606

@@ -961,9 +989,10 @@ void application::set_program_options(boost::program_options::options_descriptio
961989
"Path to create a Genesis State at. If a well-formed JSON file exists at the path, it will be parsed and any "
962990
"missing fields in a Genesis State will be added, and any unknown fields will be removed. If no file or an "
963991
"invalid file is found, it will be replaced with an example Genesis State.")
964-
("replay-blockchain", "Rebuild object graph by replaying all blocks")
992+
("replay-blockchain", "Rebuild object graph by replaying all blocks without validation")
993+
("revalidate-blockchain", "Rebuild object graph by replaying all blocks with full validation")
965994
("resync-blockchain", "Delete all blocks and re-sync with network from scratch")
966-
("force-validate", "Force validation of all transactions")
995+
("force-validate", "Force validation of all transactions during normal operation")
967996
("genesis-timestamp", bpo::value<uint32_t>(),
968997
"Replace timestamp from genesis.json with current time plus this many seconds (experts only!)")
969998
;

libraries/app/application_impl.hxx

+4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#pragma once
22

33
#include <fc/network/http/websocket.hpp>
4+
#include <fc/thread/parallel.hpp>
5+
46
#include <graphene/app/application.hpp>
57
#include <graphene/app/api_access.hpp>
68
#include <graphene/chain/genesis_state.hpp>
@@ -194,6 +196,8 @@ class application_impl : public net::node_delegate
194196
std::map<string, std::shared_ptr<abstract_plugin>> _available_plugins;
195197

196198
bool _is_finished_syncing = false;
199+
private:
200+
fc::serial_valve valve;
197201
};
198202

199203
}}} // namespace graphene namespace app namespace detail

libraries/app/include/graphene/app/api.hpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -266,19 +266,19 @@ namespace graphene { namespace app {
266266
* The transaction will be checked for validity in the local database prior to broadcasting. If it fails to
267267
* apply locally, an error will be thrown and the transaction will not be broadcast.
268268
*/
269-
void broadcast_transaction(const signed_transaction& trx);
269+
void broadcast_transaction(const precomputable_transaction& trx);
270270

271271
/** this version of broadcast transaction registers a callback method that will be called when the transaction is
272272
* included into a block. The callback method includes the transaction id, block number, and transaction number in the
273273
* block.
274274
*/
275-
void broadcast_transaction_with_callback( confirmation_callback cb, const signed_transaction& trx);
275+
void broadcast_transaction_with_callback( confirmation_callback cb, const precomputable_transaction& trx);
276276

277277
/** this version of broadcast transaction registers a callback method that will be called when the transaction is
278278
* included into a block. The callback method includes the transaction id, block number, and transaction number in the
279279
* block.
280280
*/
281-
fc::variant broadcast_transaction_synchronous(const signed_transaction& trx);
281+
fc::variant broadcast_transaction_synchronous(const precomputable_transaction& trx);
282282

283283
/**
284284
* @brief Broadcast a signed block to the network

libraries/chain/account_object.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ set<account_id_type> account_member_index::get_account_members(const account_obj
121121
result.insert(auth.first);
122122
return result;
123123
}
124-
set<public_key_type, account_member_index::key_compare> account_member_index::get_key_members(const account_object& a)const
124+
set<public_key_type, pubkey_comparator> account_member_index::get_key_members(const account_object& a)const
125125
{
126-
set<public_key_type, key_compare> result;
126+
set<public_key_type, pubkey_comparator> result;
127127
for( auto auth : a.owner.key_auths )
128128
result.insert(auth.first);
129129
for( auto auth : a.active.key_auths )
@@ -215,7 +215,7 @@ void account_member_index::object_modified(const object& after)
215215

216216

217217
{
218-
set<public_key_type, key_compare> after_key_members = get_key_members(a);
218+
set<public_key_type, pubkey_comparator> after_key_members = get_key_members(a);
219219

220220
vector<public_key_type> removed; removed.reserve(before_key_members.size());
221221
std::set_difference(before_key_members.begin(), before_key_members.end(),

libraries/chain/db_block.cpp

+79-20
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <graphene/chain/exceptions.hpp>
3838
#include <graphene/chain/evaluator.hpp>
3939

40+
#include <fc/thread/parallel.hpp>
4041
#include <fc/smart_ref_impl.hpp>
4142

4243
namespace graphene { namespace chain {
@@ -227,7 +228,7 @@ bool database::_push_block(const signed_block& new_block)
227228
* queues full as well, it will be kept in the queue to be propagated later when a new block flushes out the pending
228229
* queues.
229230
*/
230-
processed_transaction database::push_transaction( const signed_transaction& trx, uint32_t skip )
231+
processed_transaction database::push_transaction( const precomputable_transaction& trx, uint32_t skip )
231232
{ try {
232233
processed_transaction result;
233234
detail::with_skip_flags( *this, skip, [&]()
@@ -237,7 +238,7 @@ processed_transaction database::push_transaction( const signed_transaction& trx,
237238
return result;
238239
} FC_CAPTURE_AND_RETHROW( (trx) ) }
239240

240-
processed_transaction database::_push_transaction( const signed_transaction& trx )
241+
processed_transaction database::_push_transaction( const precomputable_transaction& trx )
241242
{
242243
// If this is the first transaction pushed after applying a block, start a new undo session.
243244
// This allows us to quickly rewind to the clean state of the head block, in case a new block arrives.
@@ -465,15 +466,17 @@ signed_block database::_generate_block(
465466
void database::pop_block()
466467
{ try {
467468
_pending_tx_session.reset();
468-
auto head_id = head_block_id();
469-
optional<signed_block> head_block = fetch_block_by_id( head_id );
470-
GRAPHENE_ASSERT( head_block.valid(), pop_empty_chain, "there are no blocks to pop" );
471-
472-
_fork_db.pop_block();
469+
auto fork_db_head = _fork_db.head();
470+
FC_ASSERT( fork_db_head, "Trying to pop() from empty fork database!?" );
471+
if( fork_db_head->id == head_block_id() )
472+
_fork_db.pop_block();
473+
else
474+
{
475+
fork_db_head = _fork_db.fetch_block( head_block_id() );
476+
FC_ASSERT( fork_db_head, "Trying to pop() block that's not in fork database!?" );
477+
}
473478
pop_undo();
474-
475-
_popped_tx.insert( _popped_tx.begin(), head_block->transactions.begin(), head_block->transactions.end() );
476-
479+
_popped_tx.insert( _popped_tx.begin(), fork_db_head->data.transactions.begin(), fork_db_head->data.transactions.end() );
477480
} FC_CAPTURE_AND_RETHROW() }
478481

479482
void database::clear_pending()
@@ -621,22 +624,17 @@ processed_transaction database::_apply_transaction(const signed_transaction& trx
621624
{ try {
622625
uint32_t skip = get_node_properties().skip_flags;
623626

624-
if( true || !(skip&skip_validate) ) /* issue #505 explains why this skip_flag is disabled */
625-
trx.validate();
627+
trx.validate();
626628

627629
auto& trx_idx = get_mutable_index_type<transaction_index>();
628630
const chain_id_type& chain_id = get_chain_id();
629-
transaction_id_type trx_id;
630631
if( !(skip & skip_transaction_dupe_check) )
631-
{
632-
trx_id = trx.id();
633-
FC_ASSERT( trx_idx.indices().get<by_trx_id>().find(trx_id) == trx_idx.indices().get<by_trx_id>().end() );
634-
}
632+
FC_ASSERT( trx_idx.indices().get<by_trx_id>().find(trx.id()) == trx_idx.indices().get<by_trx_id>().end() );
635633
transaction_evaluation_state eval_state(this);
636634
const chain_parameters& chain_parameters = get_global_properties().parameters;
637635
eval_state._trx = &trx;
638636

639-
if( !(skip & (skip_transaction_signatures | skip_authority_check) ) )
637+
if( !(skip & skip_transaction_signatures) )
640638
{
641639
auto get_active = [&]( account_id_type id ) { return &id(*this).active; };
642640
auto get_owner = [&]( account_id_type id ) { return &id(*this).owner; };
@@ -665,8 +663,8 @@ processed_transaction database::_apply_transaction(const signed_transaction& trx
665663
//Insert transaction into unique transactions database.
666664
if( !(skip & skip_transaction_dupe_check) )
667665
{
668-
create<transaction_object>([&trx_id,&trx](transaction_object& transaction) {
669-
transaction.trx_id = trx_id;
666+
create<transaction_object>([&trx](transaction_object& transaction) {
667+
transaction.trx_id = trx.id();
670668
transaction.trx = trx;
671669
});
672670
}
@@ -750,4 +748,65 @@ bool database::before_last_checkpoint()const
750748
return (_checkpoints.size() > 0) && (_checkpoints.rbegin()->first >= head_block_num());
751749
}
752750

751+
752+
static const uint32_t skip_expensive = database::skip_transaction_signatures | database::skip_witness_signature
753+
| database::skip_merkle_check | database::skip_transaction_dupe_check;
754+
755+
template<typename Trx>
756+
void database::_precompute_parallel( const Trx* trx, const size_t count, const uint32_t skip )const
757+
{
758+
for( size_t i = 0; i < count; ++i, ++trx )
759+
{
760+
trx->validate(); // TODO - parallelize wrt confidential operations
761+
if( !(skip&skip_transaction_dupe_check) )
762+
trx->id();
763+
if( !(skip&skip_transaction_signatures) )
764+
trx->get_signature_keys( get_chain_id() );
765+
}
766+
}
767+
768+
fc::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
769+
{ try {
770+
std::vector<fc::future<void>> workers;
771+
if( !block.transactions.empty() )
772+
{
773+
if( (skip & skip_expensive) == skip_expensive )
774+
_precompute_parallel( &block.transactions[0], block.transactions.size(), skip );
775+
else
776+
{
777+
uint32_t chunks = fc::asio::default_io_service_scope::get_num_threads();
778+
uint32_t chunk_size = ( block.transactions.size() + chunks - 1 ) / chunks;
779+
workers.reserve( chunks + 1 );
780+
for( size_t base = 0; base < block.transactions.size(); base += chunk_size )
781+
workers.push_back( fc::do_parallel( [this,&block,base,chunk_size,skip] () {
782+
_precompute_parallel( &block.transactions[base],
783+
base + chunk_size < block.transactions.size() ? chunk_size : block.transactions.size() - base,
784+
skip );
785+
}) );
786+
}
787+
}
788+
789+
if( !(skip&skip_witness_signature) )
790+
workers.push_back( fc::do_parallel( [&block] () { block.signee(); } ) );
791+
if( !(skip&skip_merkle_check) )
792+
block.calculate_merkle_root();
793+
block.id();
794+
795+
if( workers.empty() )
796+
return fc::future< void >( fc::promise< void >::ptr( new fc::promise< void >( true ) ) );
797+
798+
auto first = workers.begin();
799+
auto worker = first;
800+
while( ++worker != workers.end() )
801+
worker->wait();
802+
return *first;
803+
} FC_LOG_AND_RETHROW() }
804+
805+
fc::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
806+
{
807+
return fc::do_parallel([this,&trx] () {
808+
_precompute_parallel( &trx, 1, skip_nothing );
809+
});
810+
}
811+
753812
} }

libraries/chain/db_init.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ void database::init_genesis(const genesis_state_type& genesis_state)
235235
_undo_db.disable();
236236
struct auth_inhibitor {
237237
auth_inhibitor(database& db) : db(db), old_flags(db.node_properties().skip_flags)
238-
{ db.node_properties().skip_flags |= skip_authority_check; }
238+
{ db.node_properties().skip_flags |= skip_transaction_signatures; }
239239
~auth_inhibitor()
240240
{ db.node_properties().skip_flags = old_flags; }
241241
private:

0 commit comments

Comments
 (0)