-
Notifications
You must be signed in to change notification settings - Fork 648
/
node.cpp
5049 lines (4584 loc) · 249 KB
/
node.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2015 Cryptonomex, Inc., and contributors.
*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <sstream>
#include <iomanip>
#include <deque>
#include <unordered_set>
#include <list>
#include <forward_list>
#include <iostream>
#include <algorithm>
#include <tuple>
#include <boost/tuple/tuple.hpp>
#include <boost/circular_buffer.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index/tag.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/logic/tribool.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/numeric.hpp>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/stats.hpp>
#include <boost/accumulators/statistics/rolling_mean.hpp>
#include <boost/accumulators/statistics/min.hpp>
#include <boost/accumulators/statistics/max.hpp>
#include <boost/accumulators/statistics/sum.hpp>
#include <boost/accumulators/statistics/count.hpp>
#include <boost/preprocessor/seq/for_each.hpp>
#include <boost/preprocessor/cat.hpp>
#include <boost/preprocessor/stringize.hpp>
#include <fc/thread/thread.hpp>
#include <fc/thread/future.hpp>
#include <fc/thread/non_preemptable_scope_check.hpp>
#include <fc/thread/mutex.hpp>
#include <fc/thread/scoped_lock.hpp>
#include <fc/log/logger.hpp>
#include <fc/io/json.hpp>
#include <fc/io/enum_type.hpp>
#include <fc/crypto/rand.hpp>
#include <fc/network/rate_limiting.hpp>
#include <fc/network/ip.hpp>
#include <fc/smart_ref_impl.hpp>
#include <graphene/net/node.hpp>
#include <graphene/net/peer_database.hpp>
#include <graphene/net/peer_connection.hpp>
#include <graphene/net/stcp_socket.hpp>
#include <graphene/net/config.hpp>
#include <graphene/net/exceptions.hpp>
#include <graphene/chain/config.hpp>
#include <graphene/chain/protocol/fee_schedule.hpp>
#include <fc/git_revision.hpp>
//#define ENABLE_DEBUG_ULOGS
#ifdef DEFAULT_LOGGER
# undef DEFAULT_LOGGER
#endif
#define DEFAULT_LOGGER "p2p"
#define P2P_IN_DEDICATED_THREAD 1
#define INVOCATION_COUNTER(name) \
static unsigned total_ ## name ## _counter = 0; \
static unsigned active_ ## name ## _counter = 0; \
struct name ## _invocation_logger { \
unsigned *total; \
unsigned *active; \
name ## _invocation_logger(unsigned *total, unsigned *active) : \
total(total), active(active) \
{ \
++*total; \
++*active; \
dlog("NEWDEBUG: Entering " #name ", now ${total} total calls, ${active} active calls", ("total", *total)("active", *active)); \
} \
~name ## _invocation_logger() \
{ \
--*active; \
dlog("NEWDEBUG: Leaving " #name ", now ${total} total calls, ${active} active calls", ("total", *total)("active", *active)); \
} \
} invocation_logger(&total_ ## name ## _counter, &active_ ## name ## _counter)
//log these messages even at warn level when operating on the test network
#ifdef GRAPHENE_TEST_NETWORK
#define testnetlog wlog
#else
#define testnetlog(...) do {} while (0)
#endif
namespace graphene { namespace net {
namespace detail
{
namespace bmi = boost::multi_index;
/**
 * Cache of recently-seen p2p messages, expired by block count rather than
 * wall-clock time.  We keep messages around after receiving them so we can
 * serve them to other peers that request them, and so we can report
 * propagation statistics.  Each call to block_accepted() advances an internal
 * "block clock"; entries older than cache_duration_in_blocks ticks are purged.
 */
class blockchain_tied_message_cache
{
private:
  // how many block_accepted() ticks an entry stays cached before being purged
  static const uint32_t cache_duration_in_blocks = GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS;

  // tags for the three indices of the multi_index container below
  struct message_hash_index{};
  struct message_contents_hash_index{};
  struct block_clock_index{};

  // one cached message plus the bookkeeping needed to look it up and expire it
  struct message_info
  {
    message_hash_type message_hash;          // hash of the serialized message (lookup key for fetch requests)
    message message_body;                    // the full message, returned to peers who request it
    uint32_t block_clock_when_received;      // value of block_clock at insertion time; drives expiry
    // for network performance stats
    message_propagation_data propagation_data;
    fc::uint160_t message_contents_hash; // hash of whatever the message contains (if it's a transaction, this is the transaction id, if it's a block, it's the block_id)
    message_info( const message_hash_type& message_hash,
                  const message& message_body,
                  uint32_t block_clock_when_received,
                  const message_propagation_data& propagation_data,
                  fc::uint160_t message_contents_hash ) :
      message_hash( message_hash ),
      message_body( message_body ),
      block_clock_when_received( block_clock_when_received ),
      propagation_data( propagation_data ),
      message_contents_hash( message_contents_hash )
    {}
  };

  // indexed by message hash (unique), contents hash, and receipt time (for expiry)
  typedef boost::multi_index_container
    < message_info,
      bmi::indexed_by< bmi::ordered_unique< bmi::tag<message_hash_index>,
                                            bmi::member<message_info, message_hash_type, &message_info::message_hash> >,
                       bmi::ordered_non_unique< bmi::tag<message_contents_hash_index>,
                                                bmi::member<message_info, fc::uint160_t, &message_info::message_contents_hash> >,
                       bmi::ordered_non_unique< bmi::tag<block_clock_index>,
                                                bmi::member<message_info, uint32_t, &message_info::block_clock_when_received> > >
    > message_cache_container;

  message_cache_container _message_cache;

  uint32_t block_clock;   // monotonically increasing count of accepted blocks

public:
  blockchain_tied_message_cache() :
    block_clock( 0 )
  {}
  /// Advance the block clock and purge entries that have aged out.
  void block_accepted();
  /// Insert a message, stamped with the current block clock.
  void cache_message( const message& message_to_cache, const message_hash_type& hash_of_message_to_cache,
                      const message_propagation_data& propagation_data, const fc::uint160_t& message_content_hash );
  /// Look up a message by its message hash; throws fc::key_not_found_exception if absent.
  message get_message( const message_hash_type& hash_of_message_to_lookup );
  /// Look up propagation stats by contents hash; throws fc::key_not_found_exception if absent.
  message_propagation_data get_message_propagation_data( const fc::uint160_t& hash_of_message_contents_to_lookup ) const;
  size_t size() const { return _message_cache.size(); }
};
void blockchain_tied_message_cache::block_accepted()
{
++block_clock;
if( block_clock > cache_duration_in_blocks )
_message_cache.get<block_clock_index>().erase(_message_cache.get<block_clock_index>().begin(),
_message_cache.get<block_clock_index>().lower_bound(block_clock - cache_duration_in_blocks ) );
}
// Store a message (and its propagation stats), stamped with the current block
// clock so block_accepted() can expire it cache_duration_in_blocks blocks later.
void blockchain_tied_message_cache::cache_message( const message& message_to_cache,
                                                   const message_hash_type& hash_of_message_to_cache,
                                                   const message_propagation_data& propagation_data,
                                                   const fc::uint160_t& message_content_hash )
{
  message_info new_entry( hash_of_message_to_cache,
                          message_to_cache,
                          block_clock,
                          propagation_data,
                          message_content_hash );
  _message_cache.insert( new_entry );
}
// Retrieve a cached message by its message hash.
// Throws fc::key_not_found_exception when the hash is not in the cache.
message blockchain_tied_message_cache::get_message( const message_hash_type& hash_of_message_to_lookup )
{
  const auto& hash_index = _message_cache.get<message_hash_index>();
  auto entry_iter = hash_index.find( hash_of_message_to_lookup );
  if( entry_iter == hash_index.end() )
    FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
  return entry_iter->message_body;
}
// Retrieve propagation statistics for a message identified by its contents
// hash (transaction id or block id).  A zero hash never matches; throws
// fc::key_not_found_exception when no entry is found.
message_propagation_data blockchain_tied_message_cache::get_message_propagation_data( const fc::uint160_t& hash_of_message_contents_to_lookup ) const
{
  if( hash_of_message_contents_to_lookup != fc::uint160_t() )
  {
    const auto& contents_index = _message_cache.get<message_contents_hash_index>();
    auto entry_iter = contents_index.find( hash_of_message_contents_to_lookup );
    if( entry_iter != contents_index.end() )
      return entry_iter->propagation_data;
  }
  FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
// This specifies configuration info for the local node. It's stored as JSON
// in the configuration directory (application data directory)
struct node_configuration
{
  node_configuration() = default;

  fc::ip::endpoint listen_endpoint;
  bool accept_incoming_connections = true;
  bool wait_if_endpoint_is_busy = true;
  /**
   * Originally, our p2p code just had a 'node-id' that was a random number identifying this node
   * on the network.  This is now a private key/public key pair, where the public key is used
   * in place of the old random node-id.  The private part is unused, but might be used in
   * the future to support some notion of trusted peers.
   */
  fc::ecc::private_key private_key;
};
} } } // end namespace graphene::net::detail
FC_REFLECT(graphene::net::detail::node_configuration, (listen_endpoint)
(accept_incoming_connections)
(wait_if_endpoint_is_busy)
(private_key));
#include "node_impl.hxx"
namespace graphene { namespace net { namespace detail {
// Deleter used for the smart pointer that owns node_impl.  The node_impl must
// be destroyed on its own p2p thread (its members assume that thread), so we
// schedule the delete there and block until it completes.
void node_impl_deleter::operator()(node_impl* impl_to_delete)
{
#ifdef P2P_IN_DEDICATED_THREAD
  std::weak_ptr<fc::thread> weak_thread;
  if (impl_to_delete)
  {
    // grab a strong reference so the thread outlives the async delete task
    std::shared_ptr<fc::thread> impl_thread(impl_to_delete->_thread);
    weak_thread = impl_thread;
    impl_thread->async([impl_to_delete](){ delete impl_to_delete; }, "delete node_impl").wait();
    dlog("deleting the p2p thread");
  }
  // impl_thread has gone out of scope and the node_impl (which held its own
  // shared_ptr to the thread) is destroyed; if the weak_ptr still hasn't
  // expired, some other owner is keeping the thread alive — i.e. a leak
  if (weak_thread.expired())
    dlog("done deleting the p2p thread");
  else
    dlog("failed to delete the p2p thread, we must be leaking a smart pointer somewhere");
#else // P2P_IN_DEDICATED_THREAD
  delete impl_to_delete;
#endif // P2P_IN_DEDICATED_THREAD
}
#ifdef P2P_IN_DEDICATED_THREAD
# define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
#else
# define VERIFY_CORRECT_THREAD() do {} while (0)
#endif
#define MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME 200
#define MAXIMUM_NUMBER_OF_BLOCKS_TO_PREFETCH (10 * MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME)
// Construct the node.  When built with P2P_IN_DEDICATED_THREAD, all p2p work
// runs on a dedicated fc::thread named "p2p", created before any other member
// so later initialization can rely on it.
node_impl::node_impl(const std::string& user_agent) :
#ifdef P2P_IN_DEDICATED_THREAD
  _thread(std::make_shared<fc::thread>("p2p")),
#endif // P2P_IN_DEDICATED_THREAD
  _delegate(nullptr),
  _is_firewalled(firewalled_state::unknown),   // we don't yet know if peers can reach us
  // flags used to retrigger the various background loops
  _potential_peer_database_updated(false),
  _sync_items_to_fetch_updated(false),
  _suspend_fetching_sync_blocks(false),
  _items_to_fetch_updated(false),
  _items_to_fetch_sequence_counter(0),
  _recent_block_interval_in_seconds(GRAPHENE_MAX_BLOCK_INTERVAL),
  _user_agent_string(user_agent),
  // connection-count policy, taken from compile-time defaults
  _desired_number_of_connections(GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS),
  _maximum_number_of_connections(GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS),
  _peer_connection_retry_timeout(GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME),
  _peer_inactivity_timeout(GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT),
  // circular buffer sized by the max connection count
  _most_recent_blocks_accepted(_maximum_number_of_connections),
  _total_number_of_unfetched_items(0),
  _rate_limiter(0, 0),   // presumably 0/0 means no upload/download throttling — confirm against fc::rate_limiting_group
  _last_reported_number_of_connections(0),
  _peer_advertising_disabled(false),
  // rolling-average windows for bandwidth stats: 60 per-second samples,
  // 60 per-minute samples, 72 per-hour samples
  _average_network_read_speed_seconds(60),
  _average_network_write_speed_seconds(60),
  _average_network_read_speed_minutes(60),
  _average_network_write_speed_minutes(60),
  _average_network_read_speed_hours(72),
  _average_network_write_speed_hours(72),
  _average_network_usage_second_counter(0),
  _average_network_usage_minute_counter(0),
  _node_is_shutting_down(false),
  // limits that keep sync-block handling from exhausting memory
  _maximum_number_of_blocks_to_handle_at_one_time(MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME),
  _maximum_number_of_sync_blocks_to_prefetch(MAXIMUM_NUMBER_OF_BLOCKS_TO_PREFETCH),
  _maximum_blocks_per_peer_during_syncing(GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING)
{
  _rate_limiter.set_actual_rate_time_constant(fc::seconds(2));
  // random per-session node id (see node_configuration for the key-based successor)
  fc::rand_pseudo_bytes(&_node_id.data[0], (int)_node_id.size());
}
// Tear down the node: refresh last_seen_time in the potential-peer database
// for every peer that is still connected (so they look attractive on the next
// startup), then close all network state.
node_impl::~node_impl()
{
  VERIFY_CORRECT_THREAD();
  ilog( "cleaning up node" );
  _node_is_shutting_down = true;

  for (const peer_connection_ptr& active_peer : _active_connections)
  {
    fc::optional<fc::ip::endpoint> endpoint_for_db = active_peer->get_endpoint_for_connecting();
    if (!endpoint_for_db)
      continue;
    fc::optional<potential_peer_record> peer_record = _potential_peer_db.lookup_entry_for_endpoint(*endpoint_for_db);
    if (!peer_record)
      continue;
    peer_record->last_seen_time = fc::time_point::now();
    _potential_peer_db.update_entry(*peer_record);
  }

  try
  {
    ilog( "close" );
    close();
  }
  catch ( const fc::exception& e )
  {
    // destructors must not throw; log and continue shutting down
    wlog( "unexpected exception on close ${e}", ("e", e) );
  }
  ilog( "done" );
}
// Persist _node_configuration as JSON into the node's configuration directory.
// A no-op if the directory doesn't exist.  Cancellation propagates; any other
// fc error is logged and swallowed (configuration saving is best-effort).
void node_impl::save_node_configuration()
{
  VERIFY_CORRECT_THREAD();
  if( fc::exists(_node_configuration_directory ) )
  {
    fc::path configuration_file_name( _node_configuration_directory / NODE_CONFIGURATION_FILENAME );
    try
    {
      fc::json::save_to_file( _node_configuration, configuration_file_name );
    }
    catch (const fc::canceled_exception&)
    {
      throw;
    }
    catch ( const fc::exception& except )
    {
      // Bug fix: this placeholder used to read "$(unknown)", which neither
      // matches fc's ${...} substitution syntax nor the "filename" argument
      // supplied below, so the offending file name was never logged.
      elog( "error writing node configuration to file ${filename}: ${error}",
            ( "filename", configuration_file_name )("error", except.to_detail_string() ) );
    }
  }
}
// Background task (runs on the p2p thread until canceled) that keeps us
// connected to the desired number of peers.  Each pass: (1) honor any
// explicitly requested "add once" peers, (2) walk the potential-peer database
// initiating connections until we have enough or run out of candidates,
// (3) sleep and repeat.
void node_impl::p2p_network_connect_loop()
{
  VERIFY_CORRECT_THREAD();
  while (!_p2p_network_connect_loop_done.canceled())
  {
    try
    {
      dlog("Starting an iteration of p2p_network_connect_loop().");
      display_current_connections();
      // add-once peers bypass our checks on the maximum/desired number of connections (but they will still be counted against the totals once they're connected)
      if (!_add_once_node_list.empty())
      {
        // swap the shared list into a local so entries added while we work aren't lost or double-processed
        std::list<potential_peer_record> add_once_node_list;
        add_once_node_list.swap(_add_once_node_list);
        dlog("Processing \"add once\" node list containing ${count} peers:", ("count", add_once_node_list.size()));
        for (const potential_peer_record& add_once_peer : add_once_node_list)
        {
          dlog(" ${peer}", ("peer", add_once_peer.endpoint));
        }
        for (const potential_peer_record& add_once_peer : add_once_node_list)
        {
          // see if we have an existing connection to that peer. If we do, disconnect them and
          // then try to connect the next time through the loop
          peer_connection_ptr existing_connection_ptr = get_connection_to_endpoint( add_once_peer.endpoint );
          if(!existing_connection_ptr)
            connect_to_endpoint(add_once_peer.endpoint);
        }
        dlog("Done processing \"add once\" node list");
      }
      // keep scanning the potential-peer database until we have enough connections
      while (is_wanting_new_connections())
      {
        bool initiated_connection_this_pass = false;
        _potential_peer_database_updated = false;
        for (peer_database::iterator iter = _potential_peer_db.begin();
             iter != _potential_peer_db.end() && is_wanting_new_connections();
             ++iter)
        {
          // retry delay grows linearly with the number of failed attempts for this peer
          fc::microseconds delay_until_retry = fc::seconds((iter->number_of_failed_connection_attempts + 1) * _peer_connection_retry_timeout);
          // connect if we're not already trying this endpoint, and either its last
          // attempt didn't fail/reject or its retry delay has elapsed
          if (!is_connection_to_endpoint_in_progress(iter->endpoint) &&
              ((iter->last_connection_disposition != last_connection_failed &&
                iter->last_connection_disposition != last_connection_rejected &&
                iter->last_connection_disposition != last_connection_handshaking_failed) ||
               (fc::time_point::now() - iter->last_connection_attempt_time) > delay_until_retry))
          {
            connect_to_endpoint(iter->endpoint);
            initiated_connection_this_pass = true;
          }
        }
        // give up for this iteration once a full pass starts nothing new and
        // the database hasn't changed underneath us
        if (!initiated_connection_this_pass && !_potential_peer_database_updated)
          break;
      }
      display_current_connections();
      // if we broke out of the while loop, that means either we have connected to enough nodes, or
      // we don't have any good candidates to connect to right now.
#if 0
      try
      {
        _retrigger_connect_loop_promise = fc::promise<void>::ptr( new fc::promise<void>("graphene::net::retrigger_connect_loop") );
        if( is_wanting_new_connections() || !_add_once_node_list.empty() )
        {
          if( is_wanting_new_connections() )
            dlog( "Still want to connect to more nodes, but I don't have any good candidates. Trying again in 15 seconds" );
          else
            dlog( "I still have some \"add once\" nodes to connect to. Trying again in 15 seconds" );
          _retrigger_connect_loop_promise->wait_until( fc::time_point::now() + fc::seconds(GRAPHENE_PEER_DATABASE_RETRY_DELAY ) );
        }
        else
        {
          dlog( "I don't need any more connections, waiting forever until something changes" );
          _retrigger_connect_loop_promise->wait();
        }
      }
      catch ( fc::timeout_exception& ) //intentionally not logged
      {
      } // catch
#else
      // promise-based retriggering is disabled (see trigger_p2p_network_connect_loop); poll every 10s instead
      fc::usleep(fc::seconds(10));
#endif
    }
    catch (const fc::canceled_exception&)
    {
      // task cancellation must propagate so the loop shuts down cleanly
      throw;
    }
    // any other exception is logged and swallowed so one bad pass doesn't kill the loop
    FC_CAPTURE_AND_LOG( (0) )
  }// while(!canceled)
}
// Ask p2p_network_connect_loop() to run another pass soon.  With the
// promise-based wakeup disabled (see the #if 0 in the loop), this only sets
// the "database updated" flag; the loop notices it on its next 10s poll.
void node_impl::trigger_p2p_network_connect_loop()
{
  VERIFY_CORRECT_THREAD();
  dlog( "Triggering connect loop now" );
  _potential_peer_database_updated = true;
  //if( _retrigger_connect_loop_promise )
  // _retrigger_connect_loop_promise->set_value();
}
bool node_impl::have_already_received_sync_item( const item_hash_t& item_hash )
{
VERIFY_CORRECT_THREAD();
return std::find_if(_received_sync_items.begin(), _received_sync_items.end(),
[&item_hash]( const graphene::net::block_message& message ) { return message.block_id == item_hash; } ) != _received_sync_items.end() ||
std::find_if(_new_received_sync_items.begin(), _new_received_sync_items.end(),
[&item_hash]( const graphene::net::block_message& message ) { return message.block_id == item_hash; } ) != _new_received_sync_items.end(); ;
}
void node_impl::request_sync_item_from_peer( const peer_connection_ptr& peer, const item_hash_t& item_to_request )
{
VERIFY_CORRECT_THREAD();
dlog( "requesting item ${item_hash} from peer ${endpoint}", ("item_hash", item_to_request )("endpoint", peer->get_remote_endpoint() ) );
item_id item_id_to_request( graphene::net::block_message_type, item_to_request );
_active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
peer->last_sync_item_received_time = fc::time_point::now();
peer->sync_items_requested_from_peer.insert(item_to_request);
peer->send_message( fetch_items_message(item_id_to_request.item_type, std::vector<item_hash_t>{item_id_to_request.item_hash} ) );
}
void node_impl::request_sync_items_from_peer( const peer_connection_ptr& peer, const std::vector<item_hash_t>& items_to_request )
{
VERIFY_CORRECT_THREAD();
dlog( "requesting ${item_count} item(s) ${items_to_request} from peer ${endpoint}",
("item_count", items_to_request.size())("items_to_request", items_to_request)("endpoint", peer->get_remote_endpoint()) );
for (const item_hash_t& item_to_request : items_to_request)
{
_active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
peer->last_sync_item_received_time = fc::time_point::now();
peer->sync_items_requested_from_peer.insert(item_to_request);
}
peer->send_message(fetch_items_message(graphene::net::block_message_type, items_to_request));
}
// Background task (runs on the p2p thread until canceled) that schedules sync
// block fetches.  Each pass builds a per-peer request plan inside a
// non-preemptable section (so the data structures can't change mid-scan),
// then sends the requests, then sleeps on a promise until retriggered.
void node_impl::fetch_sync_items_loop()
{
  VERIFY_CORRECT_THREAD();
  while( !_fetch_sync_items_loop_done.canceled() )
  {
    _sync_items_to_fetch_updated = false;
    dlog( "beginning another iteration of the sync items loop" );
    if (!_suspend_fetching_sync_blocks)
    {
      std::map<peer_connection_ptr, std::vector<item_hash_t> > sync_item_requests_to_send;
      {
        // no fiber yields allowed while we examine peer state and build the plan
        ASSERT_TASK_NOT_PREEMPTED();
        std::set<item_hash_t> sync_items_to_request;
        // for each idle peer that we're syncing with
        for( const peer_connection_ptr& peer : _active_connections )
        {
          if( peer->we_need_sync_items_from_peer &&
              sync_item_requests_to_send.find(peer) == sync_item_requests_to_send.end() && // if we've already scheduled a request for this peer, don't consider scheduling another
              peer->idle() )
          {
            if (!peer->inhibit_fetching_sync_blocks)
            {
              // loop through the items it has that we don't yet have on our blockchain
              for( unsigned i = 0; i < peer->ids_of_items_to_get.size(); ++i )
              {
                item_hash_t item_to_potentially_request = peer->ids_of_items_to_get[i];
                // if we don't already have this item in our temporary storage and we haven't requested from another syncing peer
                if( !have_already_received_sync_item(item_to_potentially_request) && // already got it, but for some reson it's still in our list of items to fetch
                    sync_items_to_request.find(item_to_potentially_request) == sync_items_to_request.end() && // we have already decided to request it from another peer during this iteration
                    _active_sync_requests.find(item_to_potentially_request) == _active_sync_requests.end() ) // we've requested it in a previous iteration and we're still waiting for it to arrive
                {
                  // then schedule a request from this peer
                  sync_item_requests_to_send[peer].push_back(item_to_potentially_request);
                  sync_items_to_request.insert( item_to_potentially_request );
                  // cap the batch size per peer so one peer isn't overloaded
                  if (sync_item_requests_to_send[peer].size() >= _maximum_blocks_per_peer_during_syncing)
                    break;
                }
              }
            }
          }
        }
      } // end non-preemptable section
      // make all the requests we scheduled in the loop above
      // (sending may yield, which is why it happens outside the section above)
      for( auto sync_item_request : sync_item_requests_to_send )
        request_sync_items_from_peer( sync_item_request.first, sync_item_request.second );
      sync_item_requests_to_send.clear();
    }
    else
      dlog("fetch_sync_items_loop is suspended pending backlog processing");
    // sleep until trigger_fetch_sync_items_loop() fires the promise, unless a
    // retrigger already arrived while we were working
    if( !_sync_items_to_fetch_updated )
    {
      dlog( "no sync items to fetch right now, going to sleep" );
      _retrigger_fetch_sync_items_loop_promise = fc::promise<void>::ptr( new fc::promise<void>("graphene::net::retrigger_fetch_sync_items_loop") );
      _retrigger_fetch_sync_items_loop_promise->wait();
      _retrigger_fetch_sync_items_loop_promise.reset();
    }
  } // while( !canceled )
}
void node_impl::trigger_fetch_sync_items_loop()
{
VERIFY_CORRECT_THREAD();
dlog( "Triggering fetch sync items loop now" );
_sync_items_to_fetch_updated = true;
if( _retrigger_fetch_sync_items_loop_promise )
_retrigger_fetch_sync_items_loop_promise->set_value();
}
bool node_impl::is_item_in_any_peers_inventory(const item_id& item) const
{
for( const peer_connection_ptr& peer : _active_connections )
{
if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() )
return true;
}
return false;
}
// Background task (runs on the p2p thread until canceled) that fetches
// advertised items (transactions/blocks) during normal operation.  Each pass
// assigns every pending item in _items_to_fetch to the eligible peer with the
// fewest requests already planned (load balancing via a multi_index sorted by
// planned-request count), drops items too old to still be in peers' caches,
// sends the batched fetch messages, then sleeps until retriggered.
void node_impl::fetch_items_loop()
{
  VERIFY_CORRECT_THREAD();
  while (!_fetch_item_loop_done.canceled())
  {
    _items_to_fetch_updated = false;
    dlog("beginning an iteration of fetch items (${count} items to fetch)",
         ("count", _items_to_fetch.size()));
    // items older than this have likely expired from every peer's message cache
    fc::time_point oldest_timestamp_to_fetch = fc::time_point::now() - fc::seconds(_recent_block_interval_in_seconds * GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS);
    // earliest time any throttled peer becomes fetchable again (used for the sleep below)
    fc::time_point next_peer_unblocked_time = fc::time_point::maximum();
    // we need to construct a list of items to request from each peer first,
    // then send the messages (in two steps, to avoid yielding while iterating)
    // we want to evenly distribute our requests among our peers.
    struct requested_item_count_index {};
    // one peer plus the items we plan to request from it this pass
    struct peer_and_items_to_fetch
    {
      peer_connection_ptr peer;
      std::vector<item_id> item_ids;
      peer_and_items_to_fetch(const peer_connection_ptr& peer) : peer(peer) {}
      bool operator<(const peer_and_items_to_fetch& rhs) const { return peer < rhs.peer; }
      size_t number_of_items() const { return item_ids.size(); }
    };
    // indexed by peer (unique) and by planned-request count (for least-loaded-first selection)
    typedef boost::multi_index_container<peer_and_items_to_fetch,
                                         boost::multi_index::indexed_by<boost::multi_index::ordered_unique<boost::multi_index::member<peer_and_items_to_fetch, peer_connection_ptr, &peer_and_items_to_fetch::peer> >,
                                                                        boost::multi_index::ordered_non_unique<boost::multi_index::tag<requested_item_count_index>,
                                                                                                               boost::multi_index::const_mem_fun<peer_and_items_to_fetch, size_t, &peer_and_items_to_fetch::number_of_items> > > > fetch_messages_to_send_set;
    fetch_messages_to_send_set items_by_peer;
    // initialize the fetch_messages_to_send with an empty set of items for all idle peers
    for (const peer_connection_ptr& peer : _active_connections)
      if (peer->idle())
        items_by_peer.insert(peer_and_items_to_fetch(peer));
    // now loop over all items we want to fetch
    for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();)
    {
      if (item_iter->timestamp < oldest_timestamp_to_fetch)
      {
        // this item has probably already fallen out of our peers' caches, we'll just ignore it.
        // this can happen during flooding, and the _items_to_fetch could otherwise get clogged
        // with a bunch of items that we'll never be able to request from any peer
        wlog("Unable to fetch item ${item} before its likely expiration time, removing it from our list of items to fetch", ("item", item_iter->item));
        item_iter = _items_to_fetch.erase(item_iter);
      }
      else
      {
        // find a peer that has it, we'll use the one who has the least requests going to it to load balance
        bool item_fetched = false;
        // iterating the count index visits the least-loaded peers first
        for (auto peer_iter = items_by_peer.get<requested_item_count_index>().begin(); peer_iter != items_by_peer.get<requested_item_count_index>().end(); ++peer_iter)
        {
          const peer_connection_ptr& peer = peer_iter->peer;
          // if they have the item and we haven't already decided to ask them for too many other items
          if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION &&
              peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end())
          {
            // transaction-throttled peers can't serve trx fetches now; remember when the first one frees up
            if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited())
              next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
            else
            {
              //dlog("requesting item ${hash} from peer ${endpoint}",
              // ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint()));
              item_id item_id_to_fetch = item_iter->item;
              peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now()));
              item_iter = _items_to_fetch.erase(item_iter);
              item_fetched = true;
              // modify() is required because item count is an index key; push_back changes the peer's position in the count index
              items_by_peer.get<requested_item_count_index>().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) {
                peer_and_items.item_ids.push_back(item_id_to_fetch);
              });
              break;
            }
          }
        }
        // no peer could take this item this pass; leave it queued and move on
        if (!item_fetched)
          ++item_iter;
      }
    }
    // we've figured out which peer will be providing each item, now send the messages.
    for (const peer_and_items_to_fetch& peer_and_items : items_by_peer)
    {
      // the item lists are heterogenous and
      // the fetch_items_message can only deal with one item type at a time.
      std::map<uint32_t, std::vector<item_hash_t> > items_to_fetch_by_type;
      for (const item_id& item : peer_and_items.item_ids)
        items_to_fetch_by_type[item.item_type].push_back(item.item_hash);
      for (auto& items_by_type : items_to_fetch_by_type)
      {
        dlog("requesting ${count} items of type ${type} from peer ${endpoint}: ${hashes}",
             ("count", items_by_type.second.size())("type", (uint32_t)items_by_type.first)
             ("endpoint", peer_and_items.peer->get_remote_endpoint())
             ("hashes", items_by_type.second));
        peer_and_items.peer->send_message(fetch_items_message(items_by_type.first,
                                                              items_by_type.second));
      }
    }
    items_by_peer.clear();
    // sleep until retriggered, or until the earliest throttled peer unblocks
    if (!_items_to_fetch_updated)
    {
      _retrigger_fetch_item_loop_promise = fc::promise<void>::ptr(new fc::promise<void>("graphene::net::retrigger_fetch_item_loop"));
      fc::microseconds time_until_retrigger = fc::microseconds::maximum();
      if (next_peer_unblocked_time != fc::time_point::maximum())
        time_until_retrigger = next_peer_unblocked_time - fc::time_point::now();
      try
      {
        if (time_until_retrigger > fc::microseconds(0))
          _retrigger_fetch_item_loop_promise->wait(time_until_retrigger);
      }
      catch (const fc::timeout_exception&)
      {
        // the timeout is the expected wakeup path when waiting on a throttled peer
        dlog("Resuming fetch_items_loop due to timeout -- one of our peers should no longer be throttled");
      }
      _retrigger_fetch_item_loop_promise.reset();
    }
  } // while (!canceled)
}
void node_impl::trigger_fetch_items_loop()
{
VERIFY_CORRECT_THREAD();
_items_to_fetch_updated = true;
if( _retrigger_fetch_item_loop_promise )
_retrigger_fetch_item_loop_promise->set_value();
}
/// Long-running fiber loop that advertises newly-received inventory (blocks,
/// transactions) to all in-sync peers.
///
/// Each iteration drains _new_inventory, builds one item_ids_inventory_message
/// per (peer, item type) pair without yielding, then sends the prepared
/// messages in a batch (send_message may yield).  When there is nothing left
/// to advertise, the loop parks on _retrigger_advertise_inventory_loop_promise
/// until trigger_advertise_inventory_loop() signals it.
void node_impl::advertise_inventory_loop()
{
  VERIFY_CORRECT_THREAD();
  while (!_advertise_inventory_loop_done.canceled())
  {
    dlog("beginning an iteration of advertise inventory");
    // swap inventory into local variable, clearing the node's copy
    std::unordered_set<item_id> inventory_to_advertise;
    inventory_to_advertise.swap(_new_inventory);
    // process all inventory to advertise and construct the inventory messages we'll send
    // first, then send them all in a batch (to avoid any fiber interruption points while
    // we're computing the messages)
    std::list<std::pair<peer_connection_ptr, item_ids_inventory_message> > inventory_messages_to_send;
    for (const peer_connection_ptr& peer : _active_connections)
    {
      // only advertise to peers who are in sync with us
      idump((peer->peer_needs_sync_items_from_us));
      if( !peer->peer_needs_sync_items_from_us )
      {
        std::map<uint32_t, std::vector<item_hash_t> > items_to_advertise_by_type;
        // don't send the peer anything we've already advertised to it
        // or anything it has advertised to us
        // group the items we need to send by type, because we'll need to send one inventory message per type
        unsigned total_items_to_send_to_this_peer = 0;
        idump((inventory_to_advertise));
        for (const item_id& item_to_advertise : inventory_to_advertise)
        {
          auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise);
          auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise);
          if (adv_to_peer == peer->inventory_advertised_to_peer.end() &&
              adv_to_us == peer->inventory_peer_advertised_to_us.end())
          {
            items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash);
            // remember (with a timestamp) that we advertised this item so we never re-advertise it
            peer->inventory_advertised_to_peer.insert(peer_connection::timestamped_item_id(item_to_advertise, fc::time_point::now()));
            ++total_items_to_send_to_this_peer;
            if (item_to_advertise.item_type == trx_message_type)
              testnetlog("advertising transaction ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
            dlog("advertising item ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
          }
          else
          {
            // already known on one side or the other -- log which, for debugging
            if (adv_to_peer != peer->inventory_advertised_to_peer.end() )
              idump( (*adv_to_peer) );
            if (adv_to_us != peer->inventory_peer_advertised_to_us.end() )
              idump( (*adv_to_us) );
          }
        }
        dlog("advertising ${count} new item(s) of ${types} type(s) to peer ${endpoint}",
             ("count", total_items_to_send_to_this_peer)
             ("types", items_to_advertise_by_type.size())
             ("endpoint", peer->get_remote_endpoint()));
        // take each per-type group by const reference; copying the pair here
        // would duplicate the whole hash vector for every message we build
        for (const auto& items_group : items_to_advertise_by_type)
          inventory_messages_to_send.push_back(std::make_pair(peer, item_ids_inventory_message(items_group.first, items_group.second)));
      }
      // expire stale entries from the peer's advertised-inventory caches
      peer->clear_old_inventory();
    }
    // now that all messages are built, do the actual sends (send_message may yield)
    for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter)
      iter->first->send_message(iter->second);
    inventory_messages_to_send.clear();
    if (_new_inventory.empty())
    {
      // nothing new arrived while we were working; park until trigger_advertise_inventory_loop() fires
      _retrigger_advertise_inventory_loop_promise = fc::promise<void>::ptr(new fc::promise<void>("graphene::net::retrigger_advertise_inventory_loop"));
      _retrigger_advertise_inventory_loop_promise->wait();
      _retrigger_advertise_inventory_loop_promise.reset();
    }
  } // while(!canceled)
}
/// Wake advertise_inventory_loop() so it re-scans _new_inventory.
/// Must be invoked on the node's own thread (enforced by VERIFY_CORRECT_THREAD).
void node_impl::trigger_advertise_inventory_loop()
{
  VERIFY_CORRECT_THREAD();
  // If the loop is mid-iteration (promise not set), there is nothing to signal;
  // it will notice the new inventory on its own before parking.
  if (!_retrigger_advertise_inventory_loop_promise)
    return;
  _retrigger_advertise_inventory_loop_promise->set_value();
}
void node_impl::terminate_inactive_connections_loop()
{
VERIFY_CORRECT_THREAD();
std::list<peer_connection_ptr> peers_to_disconnect_gently;
std::list<peer_connection_ptr> peers_to_disconnect_forcibly;
std::list<peer_connection_ptr> peers_to_send_keep_alive;
std::list<peer_connection_ptr> peers_to_terminate;
_recent_block_interval_in_seconds = _delegate->get_current_block_interval_in_seconds();
// Disconnect peers that haven't sent us any data recently
// These numbers are just guesses and we need to think through how this works better.
// If we and our peers get disconnected from the rest of the network, we will not
// receive any blocks or transactions from the rest of the world, and that will
// probably make us disconnect from our peers even though we have working connections to
// them (but they won't have sent us anything since they aren't getting blocks either).
// This might not be so bad because it could make us initiate more connections and
// reconnect with the rest of the network, or it might just futher isolate us.
{
// As usual, the first step is to walk through all our peers and figure out which
// peers need action (disconneting, sending keepalives, etc), then we walk through
// those lists yielding at our leisure later.
ASSERT_TASK_NOT_PREEMPTED();
uint32_t handshaking_timeout = _peer_inactivity_timeout;
fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout);
for( const peer_connection_ptr handshaking_peer : _handshaking_connections )
if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold &&
handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold &&
handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold )
{
wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds",
( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) );
wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
("status", handshaking_peer->negotiation_status)
("sent", handshaking_peer->get_total_bytes_sent())
("received", handshaking_peer->get_total_bytes_received()));
handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn, "Terminating handshaking connection due to inactivity of ${timeout} seconds. Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
("peer", handshaking_peer->get_remote_endpoint())
("timeout", handshaking_timeout)
("status", handshaking_peer->negotiation_status)
("sent", handshaking_peer->get_total_bytes_sent())
("received", handshaking_peer->get_total_bytes_received())));
peers_to_disconnect_forcibly.push_back( handshaking_peer );
}
// timeout for any active peers is two block intervals
uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds;
uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2;
// set the ignored request time out to 1 second. When we request a block
// or transaction from a peer, this timeout determines how long we wait for them
// to reply before we give up and ask another peer for the item.
// Ideally this should be significantly shorter than the block interval, because
// we'd like to realize the block isn't coming and fetch it from a different
// peer before the next block comes in. At the current target of 3 second blocks,
// 1 second seems reasonable. When we get closer to our eventual target of 1 second
// blocks, this will need to be re-evaluated (i.e., can we set the timeout to 500ms
// and still handle normal network & processing delays without excessive disconnects)
fc::microseconds active_ignored_request_timeout = fc::seconds(1);
fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout);
fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout);
fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout;
for( const peer_connection_ptr& active_peer : _active_connections )
{
if( active_peer->connection_initiation_time < active_disconnect_threshold &&
active_peer->get_last_message_received_time() < active_disconnect_threshold )
{
wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds",
( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) );
peers_to_disconnect_gently.push_back( active_peer );
}
else
{
bool disconnect_due_to_request_timeout = false;
if (!active_peer->sync_items_requested_from_peer.empty() &&
active_peer->last_sync_item_received_time < active_ignored_request_threshold)
{
wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests",
("peer", active_peer->get_remote_endpoint())("count", active_peer->sync_items_requested_from_peer.size()));
disconnect_due_to_request_timeout = true;
}
if (!disconnect_due_to_request_timeout &&
active_peer->item_ids_requested_from_peer &&
active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold)
{
wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}",
("peer", active_peer->get_remote_endpoint())
("synopsis", active_peer->item_ids_requested_from_peer->get<0>()));
disconnect_due_to_request_timeout = true;
}
if (!disconnect_due_to_request_timeout)
for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer)
if (item_and_time.second < active_ignored_request_threshold)
{
wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}",
("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash));
disconnect_due_to_request_timeout = true;
break;
}
if (disconnect_due_to_request_timeout)
{
// we should probably disconnect nicely and give them a reason, but right now the logic
// for rescheduling the requests only executes when the connection is fully closed,
// and we want to get those requests rescheduled as soon as possible
peers_to_disconnect_forcibly.push_back(active_peer);
}
else if (active_peer->connection_initiation_time < active_send_keepalive_threshold &&
active_peer->get_last_message_received_time() < active_send_keepalive_threshold)
{
wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds",
( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) );
peers_to_send_keep_alive.push_back(active_peer);
}
else if (active_peer->we_need_sync_items_from_peer &&
!active_peer->is_currently_handling_message() &&
!active_peer->item_ids_requested_from_peer &&
active_peer->ids_of_items_to_get.empty())
{
// This is a state we should never get into in the first place, but if we do, we should disconnect the peer
// to re-establish the connection.
fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
("peer", active_peer->get_remote_endpoint()));
wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
("peer", active_peer->get_remote_endpoint()));
peers_to_disconnect_forcibly.push_back(active_peer);
}
}
}
fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT);
for( const peer_connection_ptr& closing_peer : _closing_connections )
if( closing_peer->connection_closed_time < closing_disconnect_threshold )
{
// we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT
// seconds ago, but they haven't done it yet. Terminate the connection now
wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner",
( "peer", closing_peer->get_remote_endpoint() ) );
peers_to_disconnect_forcibly.push_back( closing_peer );
}
uint32_t failed_terminate_timeout_seconds = 120;
fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds);
for (const peer_connection_ptr& peer : _terminating_connections )
if (peer->get_connection_terminated_time() != fc::time_point::min() &&
peer->get_connection_terminated_time() < failed_terminate_threshold)
{
wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint()));
peers_to_terminate.push_back(peer);
}
// That's the end of the sorting step; now all peers that require further processing are now in one of the
// lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate
// if we've decided to delete any peers, do it now; in its current implementation this doesn't yield,
// and once we start yielding, we may find that we've moved that peer to another list (closed or active)
// and that triggers assertions, maybe even errors
for (const peer_connection_ptr& peer : peers_to_terminate )
{
assert(_terminating_connections.find(peer) != _terminating_connections.end());
_terminating_connections.erase(peer);
schedule_peer_for_deletion(peer);
}
peers_to_terminate.clear();
// if we're going to abruptly disconnect anyone, do it here
// (it doesn't yield). I don't think there would be any harm if this were
// moved to the yielding section
for( const peer_connection_ptr& peer : peers_to_disconnect_forcibly )
{
move_peer_to_terminating_list(peer);
peer->close_connection();
}
peers_to_disconnect_forcibly.clear();
} // end ASSERT_TASK_NOT_PREEMPTED()
// Now process the peers that we need to do yielding functions with (disconnect sends a message with the
// disconnect reason, so it may yield)
for( const peer_connection_ptr& peer : peers_to_disconnect_gently )
{
fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity",
( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
( "inactivity_timeout", _active_connections.find(peer ) != _active_connections.end() ? _peer_inactivity_timeout * 10 : _peer_inactivity_timeout ) ) );
disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error );
}
peers_to_disconnect_gently.clear();
for( const peer_connection_ptr& peer : peers_to_send_keep_alive )
peer->send_message(current_time_request_message(),
offsetof(current_time_request_message, request_sent_time));
peers_to_send_keep_alive.clear();
if (!_node_is_shutting_down && !_terminate_inactive_connections_loop_done.canceled())
_terminate_inactive_connections_loop_done = fc::schedule( [this](){ terminate_inactive_connections_loop(); },
fc::time_point::now() + fc::seconds(GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT / 2),