@@ -248,6 +248,7 @@ namespace eosio {
248
248
constexpr auto def_max_clients = 25 ; // 0 for unlimited clients
249
249
constexpr auto def_max_nodes_per_host = 1 ;
250
250
constexpr auto def_conn_retry_wait = 30 ;
251
+ constexpr auto def_conn_retry_wait_peer_limit_multiplier = 10 ;
251
252
constexpr auto def_txn_expire_wait = std::chrono::seconds(3 );
252
253
constexpr auto def_resp_expected_wait = std::chrono::seconds(5 );
253
254
constexpr auto def_sync_fetch_span = 1000 ;
@@ -310,6 +311,7 @@ namespace eosio {
310
311
mutable std::shared_mutex connections_mtx;
311
312
connection_details_index connections;
312
313
chain::flat_set<string> supplied_peers;
314
+ std::atomic<uint64_t > max_peer_ping_time_ns{std::numeric_limits<uint64_t >::max ()};;
313
315
314
316
alignas (hardware_destructive_interference_sz)
315
317
fc::mutex connector_check_timer_mtx;
@@ -322,6 +324,7 @@ namespace eosio {
322
324
fc::microseconds max_cleanup_time;
323
325
boost::asio::steady_timer::duration connector_period{0 };
324
326
uint32_t max_client_count{def_max_clients};
327
+ uint16_t peer_limit{6 };
325
328
std::function<void (net_plugin::p2p_connections_metrics)> update_p2p_connection_metrics;
326
329
327
330
private: // must call with held mutex
@@ -332,13 +335,16 @@ namespace eosio {
332
335
333
336
public:
334
337
size_t number_connections () const ;
338
+ size_t number_connected_peers () const ;
335
339
void add_supplied_peers (const vector<string>& peers );
340
+ void maybe_disconnect (connection_ptr c) const ;
336
341
337
342
// not thread safe, only call on startup
338
343
void init (std::chrono::milliseconds heartbeat_timeout_ms,
339
344
fc::microseconds conn_max_cleanup_time,
340
345
boost::asio::steady_timer::duration conn_period,
341
- uint32_t maximum_client_count);
346
+ uint32_t maximum_client_count,
347
+ uint16_t peer_limit);
342
348
343
349
std::chrono::milliseconds get_heartbeat_timeout () const { return heartbeat_timeout; }
344
350
@@ -907,6 +913,8 @@ namespace eosio {
907
913
block_status_monitor block_status_monitor_;
908
914
std::atomic<time_point> last_vote_received;
909
915
std::atomic<uint32_t > unique_votes_rcvd_count{0 };
916
+ std::atomic<time_point> last_unique_block_received;
917
+ std::atomic<bool > closed_by_peer_limit = false ;
910
918
911
919
alignas (hardware_destructive_interference_sz)
912
920
fc::mutex sync_response_expected_timer_mtx;
@@ -1399,6 +1407,7 @@ namespace eosio {
1399
1407
peer_dlog ( this , " connected" );
1400
1408
socket_open = true ;
1401
1409
connection_start_time = get_time ();
1410
+ closed_by_peer_limit = false ;
1402
1411
start_read_message ();
1403
1412
return true ;
1404
1413
}
@@ -3665,6 +3674,8 @@ namespace eosio {
3665
3674
// make sure we also get the latency we need
3666
3675
if (peer_ping_time_ns == std::numeric_limits<uint64_t >::max ()) {
3667
3676
send_time ();
3677
+ } else {
3678
+ my_impl->connections .maybe_disconnect (shared_from_this ());
3668
3679
}
3669
3680
}
3670
3681
@@ -4019,6 +4030,7 @@ namespace eosio {
4019
4030
4020
4031
if (fork_db_add_result == fork_db_add_t ::appended_to_head || fork_db_add_result == fork_db_add_t ::fork_switch) {
4021
4032
++c->unique_blocks_rcvd_count ;
4033
+ c->last_unique_block_received = fc::time_point::now ();
4022
4034
fc_dlog (logger, " post process_incoming_block to app thread, block ${n}" , (" n" , ptr->block_num ()));
4023
4035
my_impl->producer_plug ->process_blocks ();
4024
4036
@@ -4352,7 +4364,7 @@ namespace eosio {
4352
4364
" p2p.trx.eos.io:9876:trx\n "
4353
4365
" p2p.blk.eos.io:9876:blk\n " )
4354
4366
( " p2p-peer-limit" , bpo::value<uint16_t >()->default_value (6 ),
4355
- " Limit the number of p2p-peer-address to remain connected to. Selects the best peers of p2p-peer-address-list." )
4367
+ " Soft limit on the number of p2p-peer-address to remain connected to. Selects the best peers of p2p-peer-address-list." )
4356
4368
( " p2p-max-nodes-per-host" , bpo::value<int >()->default_value (def_max_nodes_per_host), " Maximum number of client nodes from any single IP address" )
4357
4369
( " p2p-accept-transactions" , bpo::value<bool >()->default_value (true ), " Allow transactions received over p2p network to be evaluated and relayed if valid." )
4358
4370
( " p2p-disable-block-nack" , bpo::value<bool >()->default_value (false ),
@@ -4371,7 +4383,7 @@ namespace eosio {
4371
4383
" Tuple of [PublicKey, WIF private key] (may specify multiple times)" )
4372
4384
( " max-clients" , bpo::value<uint32_t >()->default_value (def_max_clients), " Maximum number of clients from which connections are accepted, use 0 for no limit" )
4373
4385
( " connection-cleanup-period" , bpo::value<int >()->default_value (def_conn_retry_wait), " number of seconds to wait before cleaning up dead connections" )
4374
- ( " max-cleanup-time-msec" , bpo::value<uint32_t >()->default_value (10 ), " max connection cleanup time per cleanup call in milliseconds" )
4386
+ ( " max-cleanup-time-msec" , bpo::value<uint32_t >()->default_value (50 ), " max connection cleanup time per cleanup call in milliseconds" )
4375
4387
( " p2p-dedup-cache-expire-time-sec" , bpo::value<uint32_t >()->default_value (10 ), " Maximum time to track transaction for duplicate optimization" )
4376
4388
( " net-threads" , bpo::value<uint16_t >()->default_value (my->thread_pool_size ),
4377
4389
" Number of worker threads in net_plugin thread pool" )
@@ -4418,6 +4430,16 @@ namespace eosio {
4418
4430
p2p_accept_transactions = options.at ( " p2p-accept-transactions" ).as <bool >();
4419
4431
p2p_disable_block_nack = options.at ( " p2p-disable-block-nack" ).as <bool >();
4420
4432
4433
+ if (p2p_accept_transactions || chain_plug->accept_votes ()) {
4434
+ if (p2p_accept_transactions && chain_plug->accept_votes ()) {
4435
+ EOS_ASSERT ( p2p_peer_limit >= 6 , chain::plugin_config_exception,
4436
+ " p2p-peer-limit with vote processing and trx processing should be >= 6, two connections each reserved for votes and trxs." );
4437
+ } else {
4438
+ EOS_ASSERT ( p2p_peer_limit >= 4 , chain::plugin_config_exception,
4439
+ " p2p-peer-limit with vote processing or trx processing should be >= 4, two connections reserved for vote or trx processing" );
4440
+ }
4441
+ }
4442
+
4421
4443
use_socket_read_watermark = options.at ( " use-socket-read-watermark" ).as <bool >();
4422
4444
keepalive_interval = std::chrono::milliseconds ( options.at ( " p2p-keepalive-interval-ms" ).as <int >() );
4423
4445
EOS_ASSERT ( keepalive_interval.count () > 0 , chain::plugin_config_exception,
@@ -4436,7 +4458,8 @@ namespace eosio {
4436
4458
connections.init ( std::chrono::milliseconds ( options.at (" p2p-keepalive-interval-ms" ).as <int >() * 2 ),
4437
4459
fc::milliseconds ( options.at (" max-cleanup-time-msec" ).as <uint32_t >() ),
4438
4460
std::chrono::seconds ( options.at (" connection-cleanup-period" ).as <int >() ),
4439
- options.at (" max-clients" ).as <uint32_t >() );
4461
+ options.at (" max-clients" ).as <uint32_t >(),
4462
+ p2p_peer_limit );
4440
4463
4441
4464
if ( options.count ( " p2p-listen-endpoint" )) {
4442
4465
auto p2ps = options.at (" p2p-listen-endpoint" ).as <vector<string>>();
@@ -4727,6 +4750,16 @@ namespace eosio {
4727
4750
return connections.size ();
4728
4751
}
4729
4752
4753
+ size_t connections_manager::number_connected_peers () const {
4754
+ size_t num = 0 ;
4755
+ for_each_block_connection ([&num](const auto & cc) {
4756
+ if (cc->socket_is_open () && !cc->incoming ()) {
4757
+ ++num;
4758
+ }
4759
+ });
4760
+ return num;
4761
+ }
4762
+
4730
4763
connection_ptr connections_manager::find_connection (uint32_t connection_id) const {
4731
4764
std::shared_lock g (connections_mtx);
4732
4765
const auto & index = connections.get <by_connection_id>();
@@ -4740,15 +4773,48 @@ namespace eosio {
4740
4773
supplied_peers.insert ( peers.begin (), peers.end () );
4741
4774
}
4742
4775
4776
+ void connections_manager::maybe_disconnect (connection_ptr c) const {
4777
+ assert (c);
4778
+ // if accepting trx then don't disconnect from any peers
4779
+ if (my_impl->p2p_accept_transactions ) return ;
4780
+ // if syncing then other connections are basically idle
4781
+ if (my_impl->sync_master ->syncing_from_peer ()) return ;
4782
+ // verify current, not syncing from us
4783
+ if (!c->current ()) return ;
4784
+ // remain connected for a heartbeat to gather info
4785
+ if (c->connection_start_time .load () > connection::get_time () - get_heartbeat_timeout ()) return ;
4786
+ // we have received votes first from this connection
4787
+ if (c->unique_votes_rcvd_count > 0 ) return ;
4788
+ // we received block first from this connection recently
4789
+ if (c->last_unique_block_received .load () > fc::time_point::now () - fc::minutes (10 )) return ;
4790
+ // quick check if more connections than limit
4791
+ if (number_connections () <= peer_limit) return ;
4792
+ // do we have more than the limit currently
4793
+ if (number_connected_peers () <= peer_limit) return ;
4794
+
4795
+ auto peer_ping_time = c->get_peer_ping_time_ns ();
4796
+ if (peer_ping_time < max_peer_ping_time_ns) {
4797
+ return ;
4798
+ }
4799
+
4800
+ peer_ilog (c, " Disconnecting from peer that is not one of the best of p2p-peer-address, ping ${p}ms, max ${m}ms" ,
4801
+ (" p" , peer_ping_time/1000000 )(" m" , max_peer_ping_time_ns.load ()/1000000 ));
4802
+
4803
+ c->closed_by_peer_limit = true ;
4804
+ c->close (false );
4805
+ }
4806
+
4743
4807
// not thread safe, only call on startup
4744
4808
void connections_manager::init ( std::chrono::milliseconds heartbeat_timeout_ms,
4745
4809
fc::microseconds conn_max_cleanup_time,
4746
4810
boost::asio::steady_timer::duration conn_period,
4747
- uint32_t maximum_client_count ) {
4811
+ uint32_t maximum_client_count,
4812
+ uint16_t p2p_peer_limit) {
4748
4813
heartbeat_timeout = heartbeat_timeout_ms;
4749
4814
max_cleanup_time = conn_max_cleanup_time;
4750
4815
connector_period = conn_period;
4751
4816
max_client_count = maximum_client_count;
4817
+ peer_limit = p2p_peer_limit;
4752
4818
}
4753
4819
4754
4820
fc::microseconds connections_manager::get_connector_period () const {
@@ -4958,11 +5024,16 @@ namespace eosio {
4958
5024
// called from any thread
4959
5025
void connections_manager::connection_monitor (const std::weak_ptr<connection>& from_connection) {
4960
5026
size_t num_rm = 0 , num_clients = 0 , num_peers = 0 , num_bp_peers = 0 ;
4961
- auto cleanup = [&num_rm, this ](vector<connection_ptr>&& reconnecting, vector<connection_ptr>&& removing) {
4962
- for ( auto & c : reconnecting ) {
4963
- if (!c->resolve_and_connect ()) {
4964
- ++num_rm;
4965
- removing.push_back (c);
5027
+ std::multiset<uint64_t > ping_times;
5028
+ auto cleanup = [&num_rm, num_peers, this ](vector<connection_ptr>&& reconnecting, vector<connection_ptr>&& removing) {
5029
+ // if num_peers less than or equal the peer_limit then attempt to connect to all configured peers
5030
+ // the best of the configured peers will be chosen after they connect.
5031
+ if (num_peers <= peer_limit) {
5032
+ for ( auto & c : reconnecting ) {
5033
+ if (!c->resolve_and_connect ()) {
5034
+ ++num_rm;
5035
+ removing.push_back (c);
5036
+ }
4966
5037
}
4967
5038
}
4968
5039
std::scoped_lock g ( connections_mtx );
@@ -4971,6 +5042,14 @@ namespace eosio {
4971
5042
index .erase (c->connection_id );
4972
5043
}
4973
5044
};
5045
+ auto record_max_peer_ping_time = [&]() {
5046
+ if (ping_times.size () > peer_limit) {
5047
+ auto itr = ping_times.begin ();
5048
+ std::advance (itr, peer_limit-1 );
5049
+ max_peer_ping_time_ns = *itr;
5050
+ }
5051
+ };
5052
+ auto peer_limit_reconnect_time = connection::get_time () - (connector_period * def_conn_retry_wait_peer_limit_multiplier);
4974
5053
auto max_time = fc::time_point::now ().safe_add (max_cleanup_time);
4975
5054
std::vector<connection_ptr> reconnecting, removing;
4976
5055
auto from = from_connection.lock ();
@@ -5001,16 +5080,23 @@ namespace eosio {
5001
5080
if (!c->socket_is_open () && c->state () != connection::connection_state::connecting) {
5002
5081
if (!c->incoming ()) {
5003
5082
--num_peers;
5004
- reconnecting.push_back (c);
5083
+ if (!c->closed_by_peer_limit || (c->connection_start_time .load () < peer_limit_reconnect_time)) {
5084
+ reconnecting.push_back (c);
5085
+ }
5005
5086
} else {
5006
5087
--num_clients;
5007
5088
++num_rm;
5008
5089
removing.push_back (c);
5009
5090
}
5010
5091
}
5092
+ auto ping_time = c->get_peer_ping_time_ns ();
5093
+ if (ping_time < std::numeric_limits<uint64_t >::max ()) {
5094
+ ping_times.insert (ping_time);
5095
+ }
5011
5096
++it;
5012
5097
}
5013
5098
g.unlock ();
5099
+ record_max_peer_ping_time ();
5014
5100
cleanup (std::move (reconnecting), std::move (removing));
5015
5101
5016
5102
if ( num_clients > 0 || num_peers > 0 ) {
0 commit comments