-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathnet_plugin.cpp
5086 lines (4504 loc) · 231 KB
/
net_plugin.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#include <eosio/net_plugin/net_plugin.hpp>
#include <eosio/net_plugin/protocol.hpp>
#include <eosio/net_plugin/net_utils.hpp>
#include <eosio/net_plugin/auto_bp_peering.hpp>
#include <eosio/chain/types.hpp>
#include <eosio/chain/controller.hpp>
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/block.hpp>
#include <eosio/chain/plugin_interface.hpp>
#include <eosio/chain/thread_utils.hpp>
#include <eosio/producer_plugin/producer_plugin.hpp>
#include <eosio/chain/fork_database.hpp>
#include <fc/bitutil.hpp>
#include <fc/network/message_buffer.hpp>
#include <fc/io/json.hpp>
#include <fc/io/raw.hpp>
#include <fc/variant_object.hpp>
#include <fc/crypto/rand.hpp>
#include <fc/exception/exception.hpp>
#include <fc/time.hpp>
#include <fc/mutex.hpp>
#include <fc/network/listener.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/multi_index/key.hpp>
#include <atomic>
#include <cmath>
#include <memory>
#include <new>
using namespace eosio::chain::plugin_interface;
using namespace std::chrono_literals;
namespace boost
{
   /// @brief Specialization of boost::lexical_cast that renders a vector of strings
   ///        as a single comma-separated string.
   ///
   /// boost::program_options uses this when printing the default value of an
   /// std::vector<std::string> option.
   ///
   /// @param v the vector of strings to render
   /// @return the elements of @p v joined with ","
   template<>
   inline std::string lexical_cast<std::string>(const std::vector<std::string>& v)
   {
      std::string joined;
      for( const auto& element : v ) {
         if( !joined.empty() )
            joined += ",";
         joined += element;
      }
      return joined;
   }
}
namespace eosio {
// Register this plugin with the appbase application framework.
static auto _net_plugin = application::register_plugin<net_plugin>();

using std::vector;

using boost::asio::ip::tcp;
using boost::asio::ip::address_v4;
using boost::asio::ip::host_name;
using boost::multi_index_container;
using namespace boost::multi_index;

using fc::time_point;
using fc::time_point_sec;
using eosio::chain::transaction_id_type;

class connection;

using connection_ptr = std::shared_ptr<connection>;
using connection_wptr = std::weak_ptr<connection>;

// Shared, immutable serialized-message buffer handed to async socket writes.
using send_buffer_type = std::shared_ptr<std::vector<char>>;

// One block production interval expressed in nanoseconds.
static constexpr int64_t block_interval_ns =
      std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();

const std::string logger_name("net_plugin_impl");
fc::logger logger;               // logger shared by the whole net plugin implementation
std::string peer_log_format;     // user-configurable prefix format used by the peer_*log macros below
/// Sanity check that the calling thread is currently running inside the given
/// strand; logs an error and requests application shutdown when it is not.
/// Used by the peer_*log macros to catch strand-discipline violations.
template <typename Strand>
void verify_strand_in_this_thread(const Strand& strand, const char* func, int line) {
   if( strand.running_in_this_thread() )
      return;   // expected case: caller is on the strand
   fc_elog( logger, "wrong strand: ${f} : line ${n}, exiting", ("f", func)("n", line) );
   app().quit();
}
// One (transaction id, connection) entry used to de-duplicate transaction
// propagation; purged once expired.
struct node_transaction_state {
   transaction_id_type id;
   time_point_sec expires; /// time after which this may be purged.
   uint32_t connection_id = 0;   // id of the connection this entry is associated with
};
struct by_expiry;   // index tag: order entries by expiration time

// Multi-index over node_transaction_state:
//  - by_id: unique on (transaction id, connection id), so all entries for a
//    given transaction are adjacent
//  - by_expiry: non-unique on expiration, enabling cheap bulk purging
typedef multi_index_container<
   node_transaction_state,
   indexed_by<
      ordered_unique<
         tag<by_id>,
         composite_key< node_transaction_state,
            member<node_transaction_state, transaction_id_type, &node_transaction_state::id>,
            member<node_transaction_state, uint32_t, &node_transaction_state::connection_id>
         >,
         composite_key_compare< std::less<transaction_id_type>, std::less<> >
      >,
      ordered_non_unique<
         tag< by_expiry >,
         member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > >
   >
>
node_transaction_index;
// Records that a given connection knows about (has sent or received) a block.
struct peer_block_state {
   block_id_type id;
   uint32_t connection_id = 0;   // connection associated with this block entry

   // Block height, decoded from the leading bytes of the block id.
   uint32_t block_num() const { return block_header::num_from_id(id); }
};
struct by_connection_id;   // index tag for the composite key below

// Unique multi-index over peer_block_state keyed by
// (block_num, block id, connection id); block_num leads the key so entries
// can be expired by height ranges.
typedef multi_index_container<
   eosio::peer_block_state,
   indexed_by<
      ordered_unique< tag<by_connection_id>,
         composite_key< peer_block_state,
            const_mem_fun<peer_block_state, uint32_t , &eosio::peer_block_state::block_num>,
            member<peer_block_state, block_id_type, &eosio::peer_block_state::id>,
            member<peer_block_state, uint32_t, &eosio::peer_block_state::connection_id>
         >,
         composite_key_compare< std::less<>, std::less<block_id_type>, std::less<> >
      >
   >
> peer_block_state_index;
// Coordinates block syncing from peers. A three-stage state machine:
// lib_catchup (requesting ranges of blocks from a selected sync source),
// head_catchup (close to head), and in_sync. Mutable sync bookkeeping is
// guarded by sync_mtx; stage and counters are atomics.
class sync_manager {
private:
   enum stages {
      lib_catchup,    // fetching block ranges from a sync source
      head_catchup,   // nearly caught up to head
      in_sync         // at head; normal block propagation
   };

   alignas(hardware_destructive_interference_sz)
   fc::mutex sync_mtx;
   uint32_t sync_known_fork_db_root_num GUARDED_BY(sync_mtx) {0}; // highest known fork_db root num from currently connected peers
   uint32_t sync_last_requested_num GUARDED_BY(sync_mtx) {0};     // end block number of the last requested range, inclusive
   uint32_t sync_next_expected_num GUARDED_BY(sync_mtx) {0};      // the next block number we need from peer
   connection_ptr sync_source GUARDED_BY(sync_mtx);               // connection we are currently syncing from

   const uint32_t sync_fetch_span {0};   // max number of blocks per requested range
   const uint32_t sync_peer_limit {0};   // max number of peers considered when choosing a sync source

   alignas(hardware_destructive_interference_sz)
   std::atomic<stages> sync_state{in_sync};
   std::atomic<int32_t> sync_timers_active{0};                          // number of outstanding sync_wait timers
   std::atomic<std::chrono::steady_clock::time_point> sync_active_time{};
   std::atomic<uint32_t> sync_ordinal{0};

   // indicate that we have received blocks to catch us up to head, delay sending out handshakes until we have
   // applied the blocks and our controller head is updated
   std::atomic<bool> send_handshakes_when_synced{false};

   // Instant finality makes it likely peers think their fork_db_root and head are
   // not in sync but in reality they are only within small difference.
   // To avoid unnecessary catchups, a margin of min_blocks_distance
   // between fork_db_root and head must be reached before catchup starts.
   const uint32_t min_blocks_distance{0};

private:
   constexpr static auto stage_str( stages s );
   bool set_state( stages newstate );
   bool is_sync_required( uint32_t fork_db_head_block_num ) const REQUIRES(sync_mtx);
   bool is_sync_request_ahead_allowed(block_num_type blk_num) const REQUIRES(sync_mtx);
   void request_next_chunk( const connection_ptr& conn = connection_ptr() ) REQUIRES(sync_mtx);
   connection_ptr find_next_sync_node(); // call with locked mutex
   void start_sync( const connection_ptr& c, uint32_t target ); // locks mutex
   bool sync_recently_active() const;
   bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); // locks mutex

public:
   // How to deal with a peer that sent an unacceptable block.
   enum class closing_mode {
      immediately, // closing connection immediately
      handshake    // sending handshake message
   };

   explicit sync_manager( uint32_t span, uint32_t sync_peer_limit, uint32_t min_blocks_distance );
   static void send_handshakes();
   static void send_block_nack_resets();
   bool syncing_from_peer() const { return sync_state == lib_catchup; }
   bool is_in_sync() const { return sync_state == in_sync; }
   void sync_reset_fork_db_root_num( const connection_ptr& conn, bool closing );
   void sync_timeout(const connection_ptr& c, const boost::system::error_code& ec);
   void sync_wait(const connection_ptr& c);
   void sync_reassign_fetch( const connection_ptr& c );
   void rejected_block( const connection_ptr& c, uint32_t blk_num, closing_mode mode );
   void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num,
                         const fc::microseconds& blk_latency );
   void recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency );
   void sync_recv_notice( const connection_ptr& c, const notice_message& msg );
   void send_handshakes_if_synced(const fc::microseconds& blk_latency);
};
// Tracks which peers already have which blocks and transactions, and
// broadcasts blocks, transactions and votes to connections. Each of the two
// indexes is guarded by its own mutex (cache-line separated) to reduce
// contention between block and transaction paths.
class dispatch_manager {
   alignas(hardware_destructive_interference_sz)
   mutable fc::mutex blk_state_mtx;
   peer_block_state_index blk_state GUARDED_BY(blk_state_mtx);    // (block, connection) known-by bookkeeping
   alignas(hardware_destructive_interference_sz)
   mutable fc::mutex local_txns_mtx;
   node_transaction_index local_txns GUARDED_BY(local_txns_mtx);  // recently seen transactions per connection

public:
   boost::asio::io_context::strand strand;   // serializes dispatcher work on the net thread pool

   explicit dispatch_manager(boost::asio::io_context& io_context)
      : strand( io_context ) {}

   void bcast_transaction(const packed_transaction_ptr& trx);
   void rejected_transaction(const packed_transaction_ptr& trx);
   void bcast_block( const signed_block_ptr& b, const block_id_type& id );
   void expire_blocks( uint32_t fork_db_root_num );   // drop block entries made obsolete by the advancing fork_db root
   void recv_notice(const connection_ptr& conn, const notice_message& msg, bool generated);

   bool add_peer_block( const block_id_type& blkid, uint32_t connection_id );   // false if already recorded
   bool peer_has_block(const block_id_type& blkid, uint32_t connection_id) const;
   bool have_block(const block_id_type& blkid) const;   // known by any connection
   void rm_block(const block_id_type& blkid);

   bool add_peer_txn( const transaction_id_type& id, const time_point_sec& trx_expires, uint32_t connection_id,
                      const time_point_sec& now = time_point_sec(time_point::now()) );
   bool have_txn( const transaction_id_type& tid ) const;
   void expire_txns();   // purge expired entries via the by_expiry index

   void bcast_vote_msg( uint32_t exclude_peer, send_buffer_type msg );
};
/**
 * default value initializers
 */
constexpr auto def_send_buffer_size_mb = 4;
constexpr auto def_send_buffer_size = 1024*1024*def_send_buffer_size_mb;
constexpr auto def_max_write_queue_size = def_send_buffer_size*10;   // per-connection queued-bytes threshold
constexpr auto def_max_trx_in_progress_size = 100*1024*1024; // 100 MB
constexpr auto def_max_consecutive_immediate_connection_close = 9; // back off if client keeps closing
constexpr auto def_max_clients = 25; // 0 for unlimited clients
constexpr auto def_max_nodes_per_host = 1;
constexpr auto def_conn_retry_wait = 30;
constexpr auto def_txn_expire_wait = std::chrono::seconds(3);
constexpr auto def_resp_expected_wait = std::chrono::seconds(5);
constexpr auto def_sync_fetch_span = 1000;        // blocks per sync request range
constexpr auto def_keepalive_interval = 10000;    // milliseconds (see keepalive_interval member below)

// Every wire message is prefixed with a uint32_t payload length.
constexpr auto message_header_size = sizeof(uint32_t);
// see protocol net_message
// Mirrors the alternatives of the net_message variant; each enumerator value
// is that type's index within the variant (kept in sync by the static_asserts
// in to_index/to_msg_type_t below).
enum class msg_type_t {
   handshake_message = fc::get_index<net_message, handshake_message>(),
   chain_size_message = fc::get_index<net_message, chain_size_message>(),
   go_away_message = fc::get_index<net_message, go_away_message>(),
   time_message = fc::get_index<net_message, time_message>(),
   notice_message = fc::get_index<net_message, notice_message>(),
   request_message = fc::get_index<net_message, request_message>(),
   sync_request_message = fc::get_index<net_message, sync_request_message>(),
   signed_block = fc::get_index<net_message, signed_block>(),
   packed_transaction = fc::get_index<net_message, packed_transaction>(),
   vote_message = fc::get_index<net_message, vote_message>(),
   block_nack_message = fc::get_index<net_message, block_nack_message>(),
   block_notice_message = fc::get_index<net_message, block_notice_message>(),
   unknown   // one past the last variant alternative; also "invalid"
};
// Convert a msg_type_t to its index within the net_message variant.
constexpr uint32_t to_index(msg_type_t net_msg) {
   // enum must cover exactly the variant's alternatives
   static_assert( std::variant_size_v<net_message> == static_cast<uint32_t>(msg_type_t::unknown));
   return static_cast<uint32_t>(net_msg);
}
// Convert a raw variant index to msg_type_t.
// Throws plugin_exception when the index is out of range.
constexpr msg_type_t to_msg_type_t(size_t v) {
   static_assert( std::variant_size_v<net_message> == static_cast<size_t>(msg_type_t::unknown));
   EOS_ASSERT(v < to_index(msg_type_t::unknown), plugin_exception, "Invalid net_message index: ${v}", ("v", v));
   return static_cast<msg_type_t>(v);
}
// Owns all connection objects, the configured ("supplied") peer list, and the
// periodic connection-monitor and connection-statistics timers. The
// connection set is guarded by a shared_mutex; each timer has its own mutex.
class connections_manager {
public:
   struct connection_detail {
      std::string host;     // peer address this connection was created for
      connection_ptr c;
   };

   // Indexed both by host (non-unique: multiple connections per host are
   // possible) and by connection pointer (unique).
   using connection_details_index = multi_index_container<
      connection_detail,
      indexed_by<
         ordered_non_unique<
            tag<struct by_host>,
            key<&connection_detail::host>
         >,
         ordered_unique<
            tag<struct by_connection>,
            key<&connection_detail::c>
         >
      >
   >;
   enum class timer_type { check, stats };   // connector check vs. statistics timer
private:
   alignas(hardware_destructive_interference_sz)
   mutable std::shared_mutex connections_mtx;
   connection_details_index connections;
   chain::flat_set<string> supplied_peers;   // peers from config / connect API

   alignas(hardware_destructive_interference_sz)
   fc::mutex connector_check_timer_mtx;
   unique_ptr<boost::asio::steady_timer> connector_check_timer GUARDED_BY(connector_check_timer_mtx);
   fc::mutex connection_stats_timer_mtx;
   unique_ptr<boost::asio::steady_timer> connection_stats_timer GUARDED_BY(connection_stats_timer_mtx);

   /// thread safe, only modified on startup
   std::chrono::milliseconds heartbeat_timeout{def_keepalive_interval*2};
   fc::microseconds max_cleanup_time;
   boost::asio::steady_timer::duration connector_period{0};
   uint32_t max_client_count{def_max_clients};

   std::function<void(net_plugin::p2p_connections_metrics)> update_p2p_connection_metrics;

private: // must call with held mutex
   connection_ptr find_connection_i(const string& host) const;

   void connection_monitor(const std::weak_ptr<connection>& from_connection);
   void connection_statistics_monitor(const std::weak_ptr<connection>& from_connection);

public:
   size_t number_connections() const;
   void add_supplied_peers(const vector<string>& peers );

   // not thread safe, only call on startup
   void init(std::chrono::milliseconds heartbeat_timeout_ms,
             fc::microseconds conn_max_cleanup_time,
             boost::asio::steady_timer::duration conn_period,
             uint32_t maximum_client_count);

   std::chrono::milliseconds get_heartbeat_timeout() const { return heartbeat_timeout; }

   uint32_t get_max_client_count() const { return max_client_count; }

   fc::microseconds get_connector_period() const;

   void register_update_p2p_connection_metrics(std::function<void(net_plugin::p2p_connections_metrics)>&& fun);

   void connect_supplied_peers(const string& p2p_address);

   void start_conn_timers();
   void start_conn_timer(boost::asio::steady_timer::duration du,
                         std::weak_ptr<connection> from_connection,
                         timer_type which);

   void add(connection_ptr c);
   string connect(const string& host, const string& p2p_address);
   string resolve_and_connect(const string& host, const string& p2p_address);
   string disconnect(const string& host);
   void close_all();

   std::optional<connection_status> status(const string& host) const;
   vector<connection_status> connection_statuses() const;

   template <typename Function>
   bool any_of_supplied_peers(Function&& f) const;

   template <typename Function>
   void for_each_connection(Function&& f) const;

   template <typename Function>
   void for_each_block_connection(Function&& f) const;

   template <typename UnaryPredicate>
   bool any_of_connections(UnaryPredicate&& p) const;

   template <typename UnaryPredicate>
   bool any_of_block_connections(UnaryPredicate&& p) const;
}; // connections_manager
// Implementation of the net plugin: owns the thread pool, the sync and
// dispatch managers, the connections manager, configuration, and the
// expire/keepalive timers.
class net_plugin_impl : public std::enable_shared_from_this<net_plugin_impl>,
                        public auto_bp_peering::bp_connection_manager<net_plugin_impl, connection> {
public:
   uint16_t thread_pool_size = 4;
   eosio::chain::named_thread_pool<struct net> thread_pool;

   std::atomic<uint32_t> current_connection_id{0};   // source of unique per-connection ids

   unique_ptr< sync_manager > sync_master;
   dispatch_manager dispatcher {thread_pool.get_executor()};
   connections_manager connections;

   /**
    * Thread safe, only updated in plugin initialize
    *  @{
    */
   vector<string> p2p_addresses;          // addresses to listen on
   vector<string> p2p_server_addresses;   // externally advertised addresses
   const string& get_first_p2p_address() const;

   vector<chain::public_key_type> allowed_peers; ///< peer keys allowed to connect
   std::map<chain::public_key_type,
            chain::private_key_type> private_keys; ///< overlapping with producer keys, also authenticating non-producing nodes

   // Bit-flag values for the allowed-connection configuration option.
   enum possible_connections : char {
      None = 0,
      Producers = 1 << 0,
      Specified = 1 << 1,
      Any = 1 << 2
   };
   possible_connections allowed_connections{None};

   boost::asio::steady_timer::duration txn_exp_period{0};
   boost::asio::steady_timer::duration resp_expected_period{0};
   std::chrono::milliseconds keepalive_interval{std::chrono::milliseconds{def_keepalive_interval}};

   uint32_t max_nodes_per_host = 1;
   bool p2p_accept_transactions = true;
   bool p2p_disable_block_nack = false;
   bool p2p_accept_votes = true;
   fc::microseconds p2p_dedup_cache_expire_time_us{};

   chain_id_type chain_id;
   fc::sha256 node_id;
   string user_agent_name;

   chain_plugin* chain_plug = nullptr;
   producer_plugin* producer_plug = nullptr;
   bool use_socket_read_watermark = false;
   /** @} */

   alignas(hardware_destructive_interference_sz)
   fc::mutex expire_timer_mtx;
   boost::asio::steady_timer expire_timer GUARDED_BY(expire_timer_mtx) {thread_pool.get_executor()};

   alignas(hardware_destructive_interference_sz)
   fc::mutex keepalive_timer_mtx;
   boost::asio::steady_timer keepalive_timer GUARDED_BY(keepalive_timer_mtx) {thread_pool.get_executor()};

   alignas(hardware_destructive_interference_sz)
   compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

   boost::asio::deadline_timer accept_error_timer{thread_pool.get_executor()};

   // Snapshot of controller chain state, updated on block/irreversible events.
   struct chain_info_t {
      block_id_type fork_db_root_id;
      uint32_t fork_db_root_num = 0;
      block_id_type head_id;
      uint32_t head_num = 0;
      block_id_type fork_db_head_id;
      uint32_t fork_db_head_num = 0;
   };

   std::function<void()> increment_failed_p2p_connections;
   std::function<void()> increment_dropped_trxs;

private:
   alignas(hardware_destructive_interference_sz)
   mutable fc::mutex chain_info_mtx; // protects chain_info_t
   chain_info_t chain_info GUARDED_BY(chain_info_mtx);

public:
   void update_chain_info();
   void update_chain_info(const block_id_type& fork_db_root_id);
   chain_info_t get_chain_info() const;
   uint32_t get_fork_db_root_num() const;
   uint32_t get_chain_head_num() const;
   uint32_t get_fork_db_head_num() const;

   void on_accepted_block_header( const signed_block_ptr& block, const block_id_type& id );
   void on_accepted_block( const signed_block_ptr& block, const block_id_type& id );
   void on_irreversible_block( const signed_block_ptr& block, const block_id_type& id );

   void broadcast_vote_message( uint32_t connection_id, vote_result_t status,
                                const vote_message_ptr& vote,
                                const finalizer_authority_ptr& active_auth,
                                const finalizer_authority_ptr& pending_auth);
   void transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>&);

   void bcast_vote_message( uint32_t exclude_peer, const chain::vote_message_ptr& msg );

   void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection);
   void start_expire_timer();
   void start_monitors();

   void expire();   // purge expired cached transactions and obsolete blocks
   /** \name Peer Timestamps
    *  Time message handling
    *  @{
    */
   /** \brief Peer heartbeat ticker.
    */
   void ticker();
   /** @} */
   /** \brief Determine if a peer is allowed to connect.
    *
    * Checks current connection mode and key authentication.
    *
    * \return False if the peer should not connect, true otherwise.
    */
   bool authenticate_peer(const handshake_message& msg) const;
   /** \brief Retrieve public key used to authenticate with peers.
    *
    * Finds a key to use for authentication.  If this node is a producer, use
    * the front of the producer key map.  If the node is not a producer but has
    * a configured private key, use it.  If the node is neither a producer nor has
    * a private key, returns an empty key.
    *
    * \note On a node with multiple private keys configured, the key with the first
    *       numerically smaller byte will always be used.
    */
   chain::public_key_type get_authentication_key() const;
   /** \brief Returns a signature of the digest using the corresponding private key of the signer.
    *
    * If there are no configured private keys, returns an empty signature.
    */
   chain::signature_type sign_compact(const chain::public_key_type& signer, const fc::sha256& digest) const;

   constexpr static uint16_t to_protocol_version(uint16_t v);

   void plugin_initialize(const variables_map& options);
   void plugin_startup();
   void plugin_shutdown();
   bool in_sync() const;
   fc::logger& get_logger() { return logger; }   // returns the file-level net logger

   void create_session(tcp::socket&& socket, const string listen_address, size_t limit);

   std::string empty{};   // returned by reference when no address is available
}; //net_plugin_impl
// peer_[x]log must be called from thread in connection strand
// Each macro checks the log level first (so FORMAT arguments are only
// evaluated when enabled), then asserts strand discipline, then logs with
// the peer's logger variant appended to the user-configured peer_log_format.

// Debug-level peer log.
#define peer_dlog( PEER, FORMAT, ... ) \
  FC_MULTILINE_MACRO_BEGIN \
   if( logger.is_enabled( fc::log_level::debug ) ) { \
      verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
      logger.log( FC_LOG_MESSAGE( debug, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
   } \
  FC_MULTILINE_MACRO_END

// Info-level peer log.
#define peer_ilog( PEER, FORMAT, ... ) \
  FC_MULTILINE_MACRO_BEGIN \
   if( logger.is_enabled( fc::log_level::info ) ) { \
      verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
      logger.log( FC_LOG_MESSAGE( info, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
   } \
  FC_MULTILINE_MACRO_END

// Warn-level peer log.
#define peer_wlog( PEER, FORMAT, ... ) \
  FC_MULTILINE_MACRO_BEGIN \
   if( logger.is_enabled( fc::log_level::warn ) ) { \
      verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
      logger.log( FC_LOG_MESSAGE( warn, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
   } \
  FC_MULTILINE_MACRO_END

// Error-level peer log.
#define peer_elog( PEER, FORMAT, ... ) \
  FC_MULTILINE_MACRO_BEGIN \
   if( logger.is_enabled( fc::log_level::error ) ) { \
      verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
      logger.log( FC_LOG_MESSAGE( error, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
   } \
  FC_MULTILINE_MACRO_END
/// Bitwise-or-assign for enum types (used with the possible_connections flags).
/// Operates on the enum's underlying integral type and stores the result back
/// into lhs; returns a reference to lhs.
template<class enum_type, class=typename std::enable_if<std::is_enum<enum_type>::value>::type>
inline enum_type& operator|=(enum_type& lhs, const enum_type& rhs)
{
   using underlying = std::underlying_type_t<enum_type>;
   lhs = static_cast<enum_type>( static_cast<underlying>(lhs) | static_cast<underlying>(rhs) );
   return lhs;
}
// File-scope pointer to the single net_plugin_impl instance.
static net_plugin_impl *my_impl;

/**
 * For a while, network version was a 16 bit value equal to the second set of 16 bits
 * of the current build's git commit id. We are now replacing that with an integer protocol
 * identifier. Based on historical analysis of all git commit identifiers, the largest gap
 * between adjacent commit id values is shown below.
 * these numbers were found with the following commands on the master branch:
 *
 * git log | grep "^commit" | awk '{print substr($2,5,4)}' | sort -u > sorted.txt
 * rm -f gap.txt; prev=0; for a in $(cat sorted.txt); do echo $prev $((0x$a - 0x$prev)) $a >> gap.txt; prev=$a; done; sort -k2 -n gap.txt | tail
 *
 * DO NOT EDIT net_version_base OR net_version_range!
 */
constexpr uint16_t net_version_base = 0x04b5;
constexpr uint16_t net_version_range = 106;

/**
 * If there is a change to network protocol or behavior, increment net version to identify
 * the need for compatibility hooks
 */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
constexpr uint16_t proto_base = 0;
constexpr uint16_t proto_explicit_sync = 1;          // version at time of eosio 1.0
constexpr uint16_t proto_block_id_notify = 2;        // reserved. feature was removed. next net_version should be 3
constexpr uint16_t proto_pruned_types = 3;           // eosio 2.1: supports new signed_block & packed_transaction types
constexpr uint16_t proto_heartbeat_interval = 4;     // eosio 2.1: supports configurable heartbeat interval
constexpr uint16_t proto_dup_goaway_resolution = 5;  // eosio 2.1: support peer address based duplicate connection resolution
constexpr uint16_t proto_dup_node_id_goaway = 6;     // eosio 2.1: support peer node_id based duplicate connection resolution
constexpr uint16_t proto_leap_initial = 7;           // leap client, needed because none of the 2.1 versions are supported
constexpr uint16_t proto_block_range = 8;            // include block range in notice_message
constexpr uint16_t proto_savanna = 9;                // savanna, adds vote_message
constexpr uint16_t proto_block_nack = 10;            // adds block_nack_message & block_notice_message
#pragma GCC diagnostic pop

// Highest protocol version this build speaks.
constexpr uint16_t net_version_max = proto_block_nack;
/**
 * Index by start_block_num
 */
// Bookkeeping for an in-progress sync range exchanged with a peer.
struct peer_sync_state {
   explicit peer_sync_state(uint32_t start = 0, uint32_t end = 0, uint32_t last_acted = 0)
      :start_block( start ), end_block( end ), last( last_acted ),
       start_time(time_point::now())
   {}
   uint32_t     start_block;   // first block of the range
   uint32_t     end_block;     // last block of the range
   uint32_t     last;          ///< last sent or received
   time_point   start_time;    ///< time request made or received
};
// thread safe
// Per-connection outgoing-message buffer with three priority queues —
// sync blocks first, then general messages, then transactions — plus an
// _out_queue holding buffers of the currently in-flight async_write.
// All state is guarded by _mtx.
class queued_buffer : boost::noncopyable {
public:
   // Drop all queued and in-flight data (e.g. when closing a connection).
   void reset() {
      fc::lock_guard g( _mtx );
      _write_queue.clear();
      _sync_write_queue.clear();
      _trx_write_queue.clear();
      _write_queue_size = 0;
      _out_queue.clear();
   }

   // Drop only not-yet-written data; the in-flight _out_queue is left intact.
   void clear_write_queue() {
      fc::lock_guard g( _mtx );
      _write_queue.clear();
      _sync_write_queue.clear();
      _trx_write_queue.clear();
      _write_queue_size = 0;
   }

   // Run the completion callbacks of the just-finished async_write, then
   // release its buffers.
   void clear_out_queue(boost::system::error_code ec, std::size_t number_of_bytes_written) {
      fc::lock_guard g( _mtx );
      out_callback( ec, number_of_bytes_written );
      _out_queue.clear();
   }

   // Total bytes currently queued across all three write queues.
   uint32_t write_queue_size() const {
      fc::lock_guard g( _mtx );
      return _write_queue_size;
   }

   // called from connection strand
   // True when data is queued and no async_write is currently in flight.
   bool ready_to_send(uint32_t connection_id) const {
      fc::unique_lock g( _mtx );
      // if out_queue is not empty then async_write is in progress
      const bool async_write_in_progress = !_out_queue.empty();
      const bool ready = !async_write_in_progress && _write_queue_size != 0;
      g.unlock();   // release before logging to keep the critical section small
      if (async_write_in_progress) {
         fc_dlog(logger, "Connection - ${id} not ready to send data, async write in progress", ("id", connection_id));
      }
      return ready;
   }

   enum class queue_t { block_sync, general };
   // @param callback must not callback into queued_buffer
   // Queues the buffer on the queue matching its message type/purpose.
   // Returns false when the total queued size now exceeds twice
   // def_max_write_queue_size (the message is still queued; the caller
   // decides how to react to the overflow).
   bool add_write_queue(msg_type_t net_msg,
                        queue_t queue,
                        const std::shared_ptr<vector<char>>& buff,
                        std::function<void(boost::system::error_code, std::size_t)> callback) {
      fc::lock_guard g( _mtx );
      if( net_msg == msg_type_t::packed_transaction ) {
         _trx_write_queue.emplace_back( buff, std::move(callback) );
      } else if (queue == queue_t::block_sync) {
         _sync_write_queue.emplace_back( buff, std::move(callback) );
      } else {
         _write_queue.emplace_back( buff, std::move(callback) );
      }
      _write_queue_size += buff->size();
      if( _write_queue_size > 2 * def_max_write_queue_size ) {
         return false;
      }
      return true;
   }

   // Move queued buffers into bufs for an async_write, draining exactly one
   // queue per call in priority order: sync blocks, general, transactions.
   void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs ) {
      fc::lock_guard g( _mtx );
      if (!_sync_write_queue.empty()) { // always send msgs from sync_write_queue first
         fill_out_buffer( bufs, _sync_write_queue );
      } else if (!_write_queue.empty()) { // always send msgs from write_queue before trx queue
         fill_out_buffer( bufs, _write_queue );
      } else {
         fill_out_buffer( bufs, _trx_write_queue );
         assert(_trx_write_queue.empty() && _write_queue.empty() && _sync_write_queue.empty() && _write_queue_size == 0);
      }
   }

private:
   struct queued_write;
   // Drain w_queue completely: append each buffer to bufs and move the
   // entries to _out_queue so their callbacks fire on write completion.
   void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs,
                         deque<queued_write>& w_queue ) REQUIRES(_mtx) {
      while ( !w_queue.empty() ) {
         auto& m = w_queue.front();
         bufs.emplace_back( m.buff->data(), m.buff->size() );
         _write_queue_size -= m.buff->size();
         _out_queue.emplace_back( m );
         w_queue.pop_front();
      }
   }

   // Notify every in-flight entry's callback of the write result.
   void out_callback( boost::system::error_code ec, std::size_t number_of_bytes_written ) REQUIRES(_mtx) {
      for( auto& m : _out_queue ) {
         m.callback( ec, number_of_bytes_written );
      }
   }

private:
   // A serialized message buffer plus the completion callback to invoke once
   // it has been written to the socket.
   struct queued_write {
      std::shared_ptr<vector<char>> buff;
      std::function<void( boost::system::error_code, std::size_t )> callback;
   };

   alignas(hardware_destructive_interference_sz)
   mutable fc::mutex _mtx;
   uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue + _sync_write_queue + _trx_write_queue
   deque<queued_write> _write_queue GUARDED_BY(_mtx); // queued messages, all messages except sync & trxs
   deque<queued_write> _sync_write_queue GUARDED_BY(_mtx); // sync_write_queue blocks will be sent first
   deque<queued_write> _trx_write_queue GUARDED_BY(_mtx); // queued trx messages, trx_write_queue will be sent last
   deque<queued_write> _out_queue GUARDED_BY(_mtx); // currently being async_write
}; // queued_buffer
/// monitors the status of blocks as to whether a block is accepted (sync'd) or
/// rejected. It groups consecutive rejected blocks in a (configurable) time
/// window (rbw) and maintains a metric of the number of consecutive rejected block
/// time windows (rbws).
class block_status_monitor {
private:
   std::atomic<bool> in_accepted_state_ {true};   ///< indicates accepted(true) or rejected(false) state
   fc::microseconds window_size_{2*1000};         ///< rbw time interval (2ms)
   fc::time_point window_start_;                  ///< The start of the recent rbw (0 implies not started)
   std::atomic<uint32_t> events_{0};              ///< The number of consecutive rbws
   const uint32_t max_consecutive_rejected_windows_{13};  ///< threshold for max_events_violated()
public:
   /// ctor
   ///
   /// @param[in] window_size The time, in microseconds, of the rejected block window
   /// @param[in] max_rejected_windows The max consecutive number of rejected block windows
   /// @note Copy ctor is not allowed
   explicit block_status_monitor(fc::microseconds window_size = fc::microseconds(2*1000),
                                 uint32_t max_rejected_windows = 13) :
      window_size_(window_size),
      // fix: max_rejected_windows was previously ignored, leaving the
      // threshold hard-coded at 13 regardless of the argument
      max_consecutive_rejected_windows_(max_rejected_windows) {}
   block_status_monitor( const block_status_monitor& ) = delete;
   block_status_monitor( block_status_monitor&& ) = delete;
   ~block_status_monitor() = default;
   /// thread safe, reset to initial state
   void reset();
   /// thread safe, called when a block is accepted
   void accepted() { reset(); }
   /// called when a block is rejected
   void rejected();
   /// returns number of consecutive rbws
   auto events() const { return events_.load(); }
   /// indicates if the max number of consecutive rbws has been reached or exceeded
   bool max_events_violated() const { return events_ >= max_consecutive_rejected_windows_; }
   /// assignment not allowed
   block_status_monitor& operator=( const block_status_monitor& ) = delete;
   block_status_monitor& operator=( block_status_monitor&& ) = delete;
}; // block_status_monitor
class connection : public std::enable_shared_from_this<connection> {
public:
enum class connection_state { connecting, connected, closing, closed };
explicit connection( const string& endpoint, const string& listen_address );
/// @brief ctor
/// @param socket created by boost::asio in fc::listener
/// @param address identifier of listen socket which accepted this new connection
explicit connection( tcp::socket&& socket, const string& listen_address, size_t block_sync_rate_limit );
~connection() = default;
connection( const connection& ) = delete;
connection( connection&& ) = delete;
connection& operator=( const connection& ) = delete;
connection& operator=( connection&& ) = delete;
bool start_session();
bool socket_is_open() const { return socket_open.load(); } // thread safe, atomic
connection_state state() const { return conn_state.load(); } // thread safe atomic
void set_state(connection_state s);
static std::string state_str(connection_state s);
const string& peer_address() const { return peer_addr; } // thread safe, const
void set_connection_type( const string& peer_addr );
void set_peer_connection_type( const string& peer_addr );
bool is_transactions_only_connection()const { return connection_type == transactions_only; } // thread safe, atomic
bool is_blocks_only_connection()const { return connection_type == blocks_only; }
bool is_transactions_connection() const { return connection_type != blocks_only; } // thread safe, atomic
bool is_blocks_connection() const { return connection_type != transactions_only; } // thread safe, atomic
uint32_t get_peer_start_block_num() const { return peer_start_block_num.load(); }
uint32_t get_peer_fork_db_head_block_num() const { return peer_fork_db_head_block_num.load(); }
uint32_t get_last_received_block_num() const { return last_received_block_num.load(); }
uint32_t get_unique_blocks_rcvd_count() const { return unique_blocks_rcvd_count.load(); }
size_t get_bytes_received() const { return bytes_received.load(); }
std::chrono::nanoseconds get_last_bytes_received() const { return last_bytes_received.load(); }
size_t get_bytes_sent() const { return bytes_sent.load(); }
std::chrono::nanoseconds get_last_bytes_sent() const { return last_bytes_sent.load(); }
size_t get_block_sync_bytes_received() const { return block_sync_bytes_received.load(); }
size_t get_block_sync_bytes_sent() const { return block_sync_total_bytes_sent.load(); }
bool get_block_sync_throttling() const { return block_sync_throttling.load(); }
boost::asio::ip::port_type get_remote_endpoint_port() const { return remote_endpoint_port.load(); }
/// Set the keepalive/heartbeat timeout interval.
/// NOTE(review): hb_timeout is a plain (non-atomic) member — presumably this is
/// called before concurrent reads begin (e.g. during configuration); confirm.
void set_heartbeat_timeout(std::chrono::milliseconds msec) {
hb_timeout = msec;
}
uint64_t get_peer_ping_time_ns() const { return peer_ping_time_ns; }
private:
static const string unknown;
std::atomic<uint64_t> peer_ping_time_ns = std::numeric_limits<uint64_t>::max();
std::optional<peer_sync_state> peer_requested; // this peer is requesting info from us
alignas(hardware_destructive_interference_sz)
std::atomic<bool> socket_open{false};
std::atomic<connection_state> conn_state{connection_state::connecting};
const string peer_addr;
enum connection_types : char {
both,
transactions_only,
blocks_only
};
size_t block_sync_rate_limit{0}; // bytes/second, default unlimited
std::atomic<connection_types> connection_type{both};
std::atomic<uint32_t> peer_start_block_num{0};
std::atomic<uint32_t> peer_fork_db_head_block_num{0};
std::atomic<uint32_t> last_received_block_num{0};
std::atomic<uint32_t> unique_blocks_rcvd_count{0};
std::atomic<size_t> bytes_received{0};
std::atomic<std::chrono::nanoseconds> last_bytes_received{0ns};
std::atomic<size_t> bytes_sent{0};
std::atomic<size_t> block_sync_bytes_received{0};
std::atomic<size_t> block_sync_total_bytes_sent{0};
std::chrono::nanoseconds block_sync_send_start{0ns}; // start of enqueue blocks
size_t block_sync_frame_bytes_sent{0}; // bytes sent in this set of enqueue blocks
std::atomic<bool> block_sync_throttling{false};
std::atomic<std::chrono::nanoseconds> last_bytes_sent{0ns};
std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};
public:
boost::asio::strand<tcp::socket::executor_type> strand;
std::shared_ptr<tcp::socket> socket; // only accessed through strand after construction
fc::message_buffer<1024*1024> pending_message_buffer;
std::size_t outstanding_read_bytes{0}; // accessed only from strand threads
queued_buffer buffer_queue;
fc::sha256 conn_node_id;
string short_conn_node_id;
string listen_address; // address sent to peer in handshake
string log_p2p_address;
string log_remote_endpoint_ip;
string log_remote_endpoint_port;
string local_endpoint_ip;
string local_endpoint_port;
// kept in sync with last_handshake_recv.fork_db_root_num, only accessed from connection strand
uint32_t peer_fork_db_root_num = 0;
std::atomic<uint32_t> sync_ordinal{0};
// when syncing from a peer, the last block expected of the current range
uint32_t sync_last_requested_block{0};
alignas(hardware_destructive_interference_sz)
std::atomic<uint32_t> trx_in_progress_size{0};
fc::time_point last_dropped_trx_msg_time;
const uint32_t connection_id;
int16_t sent_handshake_count = 0;
alignas(hardware_destructive_interference_sz)
std::atomic<bool> peer_syncing_from_us{false};
std::atomic<uint16_t> protocol_version = 0;
uint16_t net_version = net_version_max;
std::atomic<uint16_t> consecutive_immediate_connection_close = 0;
std::atomic<bool> is_bp_connection = false;
block_status_monitor block_status_monitor_;
std::atomic<time_point> last_vote_received;
alignas(hardware_destructive_interference_sz)
fc::mutex sync_response_expected_timer_mtx;
boost::asio::steady_timer sync_response_expected_timer GUARDED_BY(sync_response_expected_timer_mtx);
alignas(hardware_destructive_interference_sz)
std::atomic<go_away_reason> no_retry{no_reason};
alignas(hardware_destructive_interference_sz)
mutable fc::mutex conn_mtx; //< mtx for last_handshake_recv .. remote_endpoint_ip
handshake_message last_handshake_recv GUARDED_BY(conn_mtx);
handshake_message last_handshake_sent GUARDED_BY(conn_mtx);
block_id_type conn_fork_db_head GUARDED_BY(conn_mtx);
uint32_t conn_fork_db_head_num GUARDED_BY(conn_mtx) {0};
fc::time_point last_close GUARDED_BY(conn_mtx);
std::string p2p_address GUARDED_BY(conn_mtx);
std::string unique_conn_node_id GUARDED_BY(conn_mtx);
std::string remote_endpoint_ip GUARDED_BY(conn_mtx);
boost::asio::ip::address_v6::bytes_type remote_endpoint_ip_array GUARDED_BY(conn_mtx);
std::atomic<std::chrono::nanoseconds> connection_start_time;
// block nack support
static constexpr uint16_t consecutive_block_nacks_threshold{2}; // stop sending blocks when reached
block_num_type consecutive_blocks_nacks{0};
block_id_type last_block_nack;
block_id_type last_block_nack_request_message_id GUARDED_BY(conn_mtx);
connection_status get_status()const;
/** \name Peer Timestamps
* Time message handling
* @{
*/
// See NTP protocol. https://datatracker.ietf.org/doc/rfc5905/
std::chrono::nanoseconds org{0}; //!< origin timestamp. Time at the client when the request departed for the server.
// std::chrono::nanoseconds (not used) rec{0}; //!< receive timestamp. Time at the server when the request arrived from the client.
std::chrono::nanoseconds xmt{0}; //!< transmit timestamp, Time at the server when the response left for the client.
// std::chrono::nanoseconds (not used) dst{0}; //!< destination timestamp, Time at the client when the reply arrived from the server.
/** @} */
// timestamp for the latest message
std::chrono::steady_clock::time_point latest_msg_time{std::chrono::steady_clock::time_point::min()};
std::chrono::milliseconds hb_timeout{std::chrono::milliseconds{def_keepalive_interval}};
std::chrono::steady_clock::time_point latest_blk_time{std::chrono::steady_clock::time_point::min()};
bool connected() const;
bool closed() const; // socket is not open or is closed or closing, thread safe
bool current() const;
bool should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_fork_db_root_num, uint32_t sync_fetch_span) const;
/// @param reconnect true if we should try and reconnect immediately after close
/// @param shutdown true only if plugin is shutting down
void close( bool reconnect = false, bool shutdown = false );
private:
void _close( bool reconnect, bool shutdown ); // for easy capture
bool process_next_block_message(uint32_t message_length);
bool process_next_trx_message(uint32_t message_length);
bool process_next_vote_message(uint32_t message_length);
void update_endpoints(const tcp::endpoint& endpoint = tcp::endpoint());
public:
bool populate_handshake( handshake_message& hello ) const;
bool resolve_and_connect();
void connect( const tcp::resolver::results_type& endpoints );
void start_read_message();
/** \brief Process the next message from the pending message buffer
*
* Process the next message from the pending_message_buffer.
* message_length is the already determined length of the data
* part of the message that will handle the message.
* Returns true if successful. Returns false if an error was
* encountered unpacking or processing the message.
*/
bool process_next_message(uint32_t message_length);
void send_block_nack(const block_id_type& block_id);
void send_handshake();
/** \name Peer Timestamps
* Time message handling
*/
/** \brief Check heartbeat time and send Time_message
*/
void check_heartbeat( std::chrono::steady_clock::time_point current_time );
/** \brief Populate and queue time_message
*/
void send_time();
/** \brief Populate and queue time_message immediately using incoming time_message
*/
void send_time(const time_message& msg);
/** \brief Read system time and convert to a 64 bit integer.
*
* There are six calls to this routine in the program. One
* when a packet arrives from the network, one when a packet
* is placed on the send queue, one during start session, one