28
28
#include < graphene/chain/hardfork.hpp>
29
29
#include < curl/curl.h>
30
30
31
+ #include < boost/algorithm/string.hpp>
32
+
31
33
namespace graphene { namespace elasticsearch {
32
34
33
35
namespace detail
@@ -51,6 +53,9 @@ class elasticsearch_plugin_impl
51
53
return _self.database ();
52
54
}
53
55
56
+ friend class graphene ::elasticsearch::elasticsearch_plugin;
57
+
58
+ private:
54
59
elasticsearch_plugin& _self;
55
60
primary_index< operation_history_index >* _oho_index;
56
61
@@ -78,8 +83,10 @@ class elasticsearch_plugin_impl
78
83
std::string bulk_line;
79
84
std::string index_name;
80
85
bool is_sync = false ;
81
- private:
82
- bool add_elasticsearch ( const account_id_type account_id, const optional<operation_history_object>& oho, const uint32_t block_number );
86
+ bool is_es_version_7_or_above = true ;
87
+
88
+ bool add_elasticsearch ( const account_id_type account_id, const optional<operation_history_object>& oho,
89
+ const uint32_t block_number );
83
90
const account_transaction_history_object& addNewEntry (const account_statistics_object& stats_obj,
84
91
const account_id_type& account_id,
85
92
const optional <operation_history_object>& oho);
@@ -94,6 +101,7 @@ class elasticsearch_plugin_impl
94
101
void createBulkLine (const account_transaction_history_object& ath);
95
102
void prepareBulk (const account_transaction_history_id_type& ath_id);
96
103
void populateESstruct ();
104
+ void init_program_options (const boost::program_options::variables_map& options);
97
105
};
98
106
99
107
elasticsearch_plugin_impl::~elasticsearch_plugin_impl ()
@@ -104,10 +112,20 @@ elasticsearch_plugin_impl::~elasticsearch_plugin_impl()
104
112
}
105
113
}
106
114
115
+ static std::string generateIndexName ( const fc::time_point_sec& block_date,
116
+ const std::string& _elasticsearch_index_prefix )
117
+ {
118
+ auto block_date_string = block_date.to_iso_string ();
119
+ std::vector<std::string> parts;
120
+ boost::split (parts, block_date_string, boost::is_any_of (" -" ));
121
+ std::string index_name = _elasticsearch_index_prefix + parts[0 ] + " -" + parts[1 ];
122
+ return index_name;
123
+ }
124
+
107
125
bool elasticsearch_plugin_impl::update_account_histories ( const signed_block& b )
108
126
{
109
127
checkState (b.timestamp );
110
- index_name = graphene::utilities:: generateIndexName (b.timestamp , _elasticsearch_index_prefix);
128
+ index_name = generateIndexName (b.timestamp , _elasticsearch_index_prefix);
111
129
112
130
graphene::chain::database& db = database ();
113
131
const vector<optional< operation_history_object > >& hist = db.get_applied_operations ();
@@ -288,7 +306,8 @@ void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_obje
288
306
vs.transfer_data .asset = o_v.transfer_asset_id ;
289
307
vs.transfer_data .asset_name = transfer_asset.symbol ;
290
308
vs.transfer_data .amount = o_v.transfer_amount ;
291
- vs.transfer_data .amount_units = (o_v.transfer_amount .value )/(double )asset::scaled_precision (transfer_asset.precision ).value ;
309
+ vs.transfer_data .amount_units = (o_v.transfer_amount .value )
310
+ / (double )asset::scaled_precision (transfer_asset.precision ).value ;
292
311
vs.transfer_data .from = o_v.transfer_from ;
293
312
vs.transfer_data .to = o_v.transfer_to ;
294
313
@@ -299,14 +318,18 @@ void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_obje
299
318
vs.fill_data .pays_asset_id = o_v.fill_pays_asset_id ;
300
319
vs.fill_data .pays_asset_name = fill_pays_asset.symbol ;
301
320
vs.fill_data .pays_amount = o_v.fill_pays_amount ;
302
- vs.fill_data .pays_amount_units = (o_v.fill_pays_amount .value )/(double )asset::scaled_precision (fill_pays_asset.precision ).value ;
321
+ vs.fill_data .pays_amount_units = (o_v.fill_pays_amount .value )
322
+ / (double )asset::scaled_precision (fill_pays_asset.precision ).value ;
303
323
vs.fill_data .receives_asset_id = o_v.fill_receives_asset_id ;
304
324
vs.fill_data .receives_asset_name = fill_receives_asset.symbol ;
305
325
vs.fill_data .receives_amount = o_v.fill_receives_amount ;
306
- vs.fill_data .receives_amount_units = (o_v.fill_receives_amount .value )/(double )asset::scaled_precision (fill_receives_asset.precision ).value ;
326
+ vs.fill_data .receives_amount_units = (o_v.fill_receives_amount .value )
327
+ / (double )asset::scaled_precision (fill_receives_asset.precision ).value ;
307
328
308
- auto fill_price = (o_v.fill_receives_amount .value /(double )asset::scaled_precision (fill_receives_asset.precision ).value ) /
309
- (o_v.fill_pays_amount .value /(double )asset::scaled_precision (fill_pays_asset.precision ).value );
329
+ auto fill_price = (o_v.fill_receives_amount .value
330
+ / (double )asset::scaled_precision (fill_receives_asset.precision ).value )
331
+ / (o_v.fill_pays_amount .value
332
+ / (double )asset::scaled_precision (fill_pays_asset.precision ).value );
310
333
vs.fill_data .fill_price_units = fill_price;
311
334
vs.fill_data .fill_price = o_v.fill_fill_price ;
312
335
vs.fill_data .is_maker = o_v.fill_is_maker ;
@@ -319,6 +342,7 @@ bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account
319
342
const auto &stats_obj = getStatsObject (account_id);
320
343
const auto &ath = addNewEntry (stats_obj, account_id, oho);
321
344
growStats (stats_obj, ath);
345
+
322
346
if (block_number > _elasticsearch_start_es_after_block) {
323
347
createBulkLine (ath);
324
348
prepareBulk (ath.id );
@@ -354,9 +378,10 @@ const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const
354
378
return stats_obj;
355
379
}
356
380
357
- const account_transaction_history_object& elasticsearch_plugin_impl::addNewEntry (const account_statistics_object& stats_obj,
358
- const account_id_type& account_id,
359
- const optional <operation_history_object>& oho)
381
+ const account_transaction_history_object& elasticsearch_plugin_impl::addNewEntry (
382
+ const account_statistics_object& stats_obj,
383
+ const account_id_type& account_id,
384
+ const optional <operation_history_object>& oho)
360
385
{
361
386
graphene::chain::database& db = database ();
362
387
const auto &ath = db.create <account_transaction_history_object>([&](account_transaction_history_object &obj) {
@@ -396,15 +421,17 @@ void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id
396
421
const std::string _id = fc::json::to_string (ath_id);
397
422
fc::mutable_variant_object bulk_header;
398
423
bulk_header[" _index" ] = index_name;
399
- bulk_header[" _type" ] = " data" ;
424
+ if (!is_es_version_7_or_above)
425
+ bulk_header[" _type" ] = " _doc" ;
400
426
bulk_header[" _id" ] = fc::to_string (ath_id.space_id ) + " ." + fc::to_string (ath_id.type_id ) + " ."
401
427
+ fc::to_string (ath_id.instance .value );
402
428
prepare = graphene::utilities::createBulk (bulk_header, std::move (bulk_line));
403
429
std::move (prepare.begin (), prepare.end (), std::back_inserter (bulk_lines));
404
430
prepare.clear ();
405
431
}
406
432
407
- void elasticsearch_plugin_impl::cleanObjects (const account_transaction_history_id_type& ath_id, const account_id_type& account_id)
433
+ void elasticsearch_plugin_impl::cleanObjects ( const account_transaction_history_id_type& ath_id,
434
+ const account_id_type& account_id )
408
435
{
409
436
graphene::chain::database& db = database ();
410
437
// remove everything except current object from ath
@@ -494,44 +521,49 @@ void elasticsearch_plugin::plugin_set_program_options(
494
521
cfg.add (cli);
495
522
}
496
523
497
- void elasticsearch_plugin::plugin_initialize (const boost::program_options::variables_map& options)
524
+ void detail::elasticsearch_plugin_impl::init_program_options (const boost::program_options::variables_map& options)
498
525
{
499
- my->_oho_index = database ().add_index < primary_index< operation_history_index > >();
500
- database ().add_index < primary_index< account_transaction_history_index > >();
501
-
502
526
if (options.count (" elasticsearch-node-url" ) > 0 ) {
503
- my-> _elasticsearch_node_url = options[" elasticsearch-node-url" ].as <std::string>();
527
+ _elasticsearch_node_url = options[" elasticsearch-node-url" ].as <std::string>();
504
528
}
505
529
if (options.count (" elasticsearch-bulk-replay" ) > 0 ) {
506
- my-> _elasticsearch_bulk_replay = options[" elasticsearch-bulk-replay" ].as <uint32_t >();
530
+ _elasticsearch_bulk_replay = options[" elasticsearch-bulk-replay" ].as <uint32_t >();
507
531
}
508
532
if (options.count (" elasticsearch-bulk-sync" ) > 0 ) {
509
- my-> _elasticsearch_bulk_sync = options[" elasticsearch-bulk-sync" ].as <uint32_t >();
533
+ _elasticsearch_bulk_sync = options[" elasticsearch-bulk-sync" ].as <uint32_t >();
510
534
}
511
535
if (options.count (" elasticsearch-visitor" ) > 0 ) {
512
- my-> _elasticsearch_visitor = options[" elasticsearch-visitor" ].as <bool >();
536
+ _elasticsearch_visitor = options[" elasticsearch-visitor" ].as <bool >();
513
537
}
514
538
if (options.count (" elasticsearch-basic-auth" ) > 0 ) {
515
- my-> _elasticsearch_basic_auth = options[" elasticsearch-basic-auth" ].as <std::string>();
539
+ _elasticsearch_basic_auth = options[" elasticsearch-basic-auth" ].as <std::string>();
516
540
}
517
541
if (options.count (" elasticsearch-index-prefix" ) > 0 ) {
518
- my-> _elasticsearch_index_prefix = options[" elasticsearch-index-prefix" ].as <std::string>();
542
+ _elasticsearch_index_prefix = options[" elasticsearch-index-prefix" ].as <std::string>();
519
543
}
520
544
if (options.count (" elasticsearch-operation-object" ) > 0 ) {
521
- my-> _elasticsearch_operation_object = options[" elasticsearch-operation-object" ].as <bool >();
545
+ _elasticsearch_operation_object = options[" elasticsearch-operation-object" ].as <bool >();
522
546
}
523
547
if (options.count (" elasticsearch-start-es-after-block" ) > 0 ) {
524
- my-> _elasticsearch_start_es_after_block = options[" elasticsearch-start-es-after-block" ].as <uint32_t >();
548
+ _elasticsearch_start_es_after_block = options[" elasticsearch-start-es-after-block" ].as <uint32_t >();
525
549
}
526
550
if (options.count (" elasticsearch-operation-string" ) > 0 ) {
527
- my-> _elasticsearch_operation_string = options[" elasticsearch-operation-string" ].as <bool >();
551
+ _elasticsearch_operation_string = options[" elasticsearch-operation-string" ].as <bool >();
528
552
}
529
553
if (options.count (" elasticsearch-mode" ) > 0 ) {
530
554
const auto option_number = options[" elasticsearch-mode" ].as <uint16_t >();
531
555
if (option_number > mode::all)
532
556
FC_THROW_EXCEPTION (graphene::chain::plugin_exception, " Elasticsearch mode not valid" );
533
- my-> _elasticsearch_mode = static_cast <mode>(options[" elasticsearch-mode" ].as <uint16_t >());
557
+ _elasticsearch_mode = static_cast <mode>(options[" elasticsearch-mode" ].as <uint16_t >());
534
558
}
559
+ }
560
+
561
+ void elasticsearch_plugin::plugin_initialize (const boost::program_options::variables_map& options)
562
+ {
563
+ my->_oho_index = database ().add_index < primary_index< operation_history_index > >();
564
+ database ().add_index < primary_index< account_transaction_history_index > >();
565
+
566
+ my->init_program_options ( options );
535
567
536
568
if (my->_elasticsearch_mode != mode::only_query) {
537
569
if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string )
@@ -544,18 +576,21 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
544
576
" Error populating ES database, we are going to keep trying." );
545
577
});
546
578
}
547
- }
548
579
549
- void elasticsearch_plugin::plugin_startup ()
550
- {
551
580
graphene::utilities::ES es;
552
581
es.curl = my->curl ;
553
582
es.elasticsearch_url = my->_elasticsearch_node_url ;
554
583
es.auth = my->_elasticsearch_basic_auth ;
555
584
556
585
if (!graphene::utilities::checkES (es))
557
- FC_THROW_EXCEPTION (fc::exception , " ES database is not up in url ${url}" , (" url" , my->_elasticsearch_node_url ));
558
- ilog (" elasticsearch ACCOUNT HISTORY: plugin_startup() begin" );
586
+ FC_THROW ( " ES database is not up in url ${url}" , (" url" , my->_elasticsearch_node_url ) );
587
+
588
+ graphene::utilities::checkESVersion7OrAbove (es, my->is_es_version_7_or_above );
589
+ }
590
+
591
+ void elasticsearch_plugin::plugin_startup ()
592
+ {
593
+ // Nothing to do
559
594
}
560
595
561
596
operation_history_object elasticsearch_plugin::get_operation_by_id (operation_history_id_type id)
@@ -624,7 +659,7 @@ vector<operation_history_object> elasticsearch_plugin::get_account_history(
624
659
625
660
const auto response = graphene::utilities::simpleQuery (es);
626
661
variant variant_response = fc::json::from_string (response);
627
-
662
+
628
663
const auto hits = variant_response[" hits" ][" total" ];
629
664
uint32_t size;
630
665
if ( hits.is_object () ) // ES-7 ?
@@ -672,7 +707,7 @@ graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
672
707
es.curl = curl;
673
708
es.elasticsearch_url = my->_elasticsearch_node_url ;
674
709
es.index_prefix = my->_elasticsearch_index_prefix ;
675
- es.endpoint = es.index_prefix + " */data /_search" ;
710
+ es.endpoint = es.index_prefix + " */_doc /_search" ;
676
711
es.query = query;
677
712
678
713
return es;
@@ -683,5 +718,4 @@ mode elasticsearch_plugin::get_running_mode()
683
718
return my->_elasticsearch_mode ;
684
719
}
685
720
686
-
687
721
} }
0 commit comments