P2P: Block propagation optimization #1099

Merged Jan 27, 2025 · 33 commits
Changes from 1 commit
b54eea7
GH-1091 Initial implementation of block_nack with block_nack_message …
heifner Jan 6, 2025
167ecb4
GH-1091 Call send_block_nack on correct strand
heifner Jan 6, 2025
5094f92
GH-1091 Handle repeated block_notice with request for blocks
heifner Jan 6, 2025
2afb174
GH-1091 Use correct req_blocks.mode
heifner Jan 7, 2025
1d18678
GH-1091 Use normal req_blocks.mode since need to start with provided …
heifner Jan 8, 2025
dd0417b
GH-1091 Add back sync_write_queue for sync blocks
heifner Jan 8, 2025
f5be27b
GH-1091 Check if peer has block before sending either block or block_…
heifner Jan 8, 2025
85bbf2b
GH-1091 Fix compile error
heifner Jan 8, 2025
84db166
GH-1091 Handle on_fork with normal request_message correctly
heifner Jan 8, 2025
843e3d3
GH-1091 Test threshold of 0
heifner Jan 9, 2025
cf44653
GH-1091 Wait for 7 empty blocks before starting perf run. The bios no…
heifner Jan 10, 2025
7ff8ebf
GH-1091 Always send block_nack when a block is received the node alre…
heifner Jan 10, 2025
65431db
GH-1091 Call blk_send_branch directly with requested blocks instead o…
heifner Jan 13, 2025
5c0e192
GH-1091 Better output on failure
heifner Jan 13, 2025
121fadc
GH-1091 Always broadcast blocks we produce
heifner Jan 13, 2025
35d3118
GH-1091 Only send block_notice if in sync.
heifner Jan 13, 2025
efd95f6
GH-1091 Optimize blk_send_branch for when branch send already in prog…
heifner Jan 13, 2025
ddcf5df
GH-1091 Optimize blk_send_branch for when branch send already in prog…
heifner Jan 14, 2025
3627f4c
GH-1091 Do not request blocks if already requested on a different con…
heifner Jan 14, 2025
6fcd36a
GH-1091 Remove is_in_sync check for block notice send. This degrades …
heifner Jan 14, 2025
e0cc48b
GH-1091 Add p2p-disable-block-nack option
heifner Jan 14, 2025
f4f32eb
Merge branch 'main' into GH-1091-block-nack
heifner Jan 14, 2025
d2ffa9f
GH-1091 Use contains instead of count
heifner Jan 17, 2025
f414b80
GH-1091 Remove unneeded code
heifner Jan 17, 2025
4e5acfa
GH-1091 Use an enum instead of a bool for to_sync_queue
heifner Jan 17, 2025
2cb2f47
GH-1091 Updated comment
heifner Jan 17, 2025
9db544e
GH-1091 Simplified if by using the calculated _write_queue_size
heifner Jan 17, 2025
9361c85
GH-1091 Fix log of received block nack to be clearer
heifner Jan 17, 2025
1a07ac2
GH-1091 Update log messages
heifner Jan 17, 2025
06dad7d
Merge branch 'main' into GH-1091-block-nack
heifner Jan 20, 2025
a1bebf2
GH-1091 Use debug level log instead of info
heifner Jan 23, 2025
2c1cd11
GH-1101 Add previous to block_notice_message which allows for immedia…
heifner Jan 25, 2025
6fbf7b2
Merge branch 'main' into GH-1091-block-nack
heifner Jan 25, 2025
GH-1101 Add previous to block_notice_message which allows for immediate determination if a block request is needed. Also include in the block request_message the peer head. The peer head allows the node to determine if on a fork and send from LIB instead. Also create block_on_fork function to remove duplicate code.
heifner committed Jan 25, 2025
commit 2c1cd11781cfc72f597419db55e013ad4160b80a
5 changes: 2 additions & 3 deletions plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
@@ -128,8 +128,6 @@ namespace eosio {
request_message() : req_trx(), req_blocks() {}
ordered_txn_ids req_trx;
ordered_blk_ids req_blocks;

bool operator==(const request_message&) const noexcept = default;
};

struct sync_request_message {
@@ -142,6 +140,7 @@ namespace eosio {
};

struct block_notice_message {
block_id_type previous;
block_id_type id;
};

@@ -176,7 +175,7 @@ FC_REFLECT( eosio::notice_message, (known_trx)(known_blocks) )
FC_REFLECT( eosio::request_message, (req_trx)(req_blocks) )
FC_REFLECT( eosio::sync_request_message, (start_block)(end_block) )
FC_REFLECT( eosio::block_nack_message, (id) )
FC_REFLECT( eosio::block_notice_message, (id) )
FC_REFLECT( eosio::block_notice_message, (previous)(id) )

/**
*
144 changes: 86 additions & 58 deletions plugins/net_plugin/net_plugin.cpp
@@ -925,8 +925,7 @@ namespace eosio {
static constexpr uint16_t consecutive_block_nacks_threshold{2}; // stop sending blocks when reached
Review comment from @heifner (Member, Author), Jan 14, 2025:

I think this could be as low as 0. 0 worked fine in all my tests and I don't think a large value is useful here.

Also, this PR only exempts a single connection from block nacks. If we wanted to keep 2 or more peers sending us blocks, the PR could be updated with that functionality. I don't personally think it is needed, and I think it best to keep this as simple as possible. If we find this feature doesn't work, it can be disabled via p2p-disable-block-nack and we can address any issues we find.

block_num_type consecutive_blocks_nacks{0};
block_id_type last_block_nack;
block_id_type last_block_notice;
request_message last_request_message GUARDED_BY(conn_mtx);
block_id_type last_block_nack_request_message_id GUARDED_BY(conn_mtx);

connection_status get_status()const;

@@ -1009,6 +1008,7 @@ namespace eosio {
/** @} */

void blk_send_branch( const block_id_type& msg_head_id );
void blk_send_branch_from_nack_request( const block_id_type& msg_head_id, const block_id_type& req_id );
void blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num );

void enqueue( const net_message& msg );
@@ -1235,6 +1235,24 @@ namespace eosio {

//---------------------------------------------------------------------------

struct on_fork_t {
bool on_fork = true;
bool unknown = true;
};
on_fork_t block_on_fork(const block_id_type& id) { // thread safe
auto id_num = block_header::num_from_id(id);
bool on_fork = false;
bool unknown_block = true;
try {
const controller& cc = my_impl->chain_plug->chain();
std::optional<block_id_type> my_id = cc.fork_block_id_for_num( id_num ); // thread-safe
unknown_block = !my_id;
on_fork = my_id != id;
} catch( ... ) {
}
return { on_fork, unknown_block };
}

connection::connection( const string& endpoint, const string& listen_address )
: peer_addr( endpoint ),
strand( boost::asio::make_strand(my_impl->thread_pool.get_executor()) ),
@@ -1440,8 +1458,7 @@ namespace eosio {
last_handshake_sent = handshake_message();
last_close = fc::time_point::now();
conn_node_id = fc::sha256();
last_request_message.req_blocks.mode = none;
last_request_message.req_blocks.ids.clear();
last_block_nack_request_message_id = block_id_type{};
}
peer_fork_db_root_num = 0;
peer_ping_time_ns = std::numeric_limits<decltype(peer_ping_time_ns)::value_type>::max();
@@ -1460,7 +1477,6 @@ namespace eosio {
last_vote_received = time_point{};
consecutive_blocks_nacks = 0;
last_block_nack = block_id_type{};
last_block_notice = block_id_type{};

uint32_t head_num = my_impl->get_chain_head_num();
if (last_received_block_num >= head_num) {
@@ -1498,18 +1514,12 @@ namespace eosio {
if( fork_db_root_num == 0 ) return; // if fork_db_root_id is null (we have not received handshake or reset)

auto msg_head_num = block_header::num_from_id(msg_head_id);
bool on_fork = msg_head_num == 0;
bool unknown_block = false;
if( !on_fork ) {
try {
const controller& cc = my_impl->chain_plug->chain();
std::optional<block_id_type> my_id = cc.fork_block_id_for_num( msg_head_num ); // thread-safe
unknown_block = !my_id;
on_fork = my_id != msg_head_id;
} catch( ... ) {
unknown_block = true;
}
if (msg_head_num == 0) {
blk_send_branch( msg_head_num, fork_db_root_num, head_num );
return;
}

auto [on_fork, unknown_block] = block_on_fork(msg_head_id);
if( unknown_block ) {
peer_ilog( this, "Peer asked for unknown block ${mn}, sending: benign_other go away", ("mn", msg_head_num) );
no_retry = benign_other;
@@ -1522,6 +1532,23 @@ namespace eosio {
}
}

// called from connection strand
void connection::blk_send_branch_from_nack_request( const block_id_type& msg_head_id, const block_id_type& req_id ) {
auto [on_fork, unknown_block] = block_on_fork(msg_head_id);
uint32_t head_num = my_impl->get_chain_head_num();
// peer head might be unknown if our LIB has moved past it, so if unknown then just send the requested block
if (on_fork) { // send from lib if we know they are on a fork
// a more complicated better approach would be to find where the fork branches and send from there, for now use lib
uint32_t fork_db_root_num = my_impl->get_fork_db_root_num();
// --fork_db_root_num since blk_send_branch adds one to the request, and we want to start at fork_db_root_num
blk_send_branch( --fork_db_root_num, 0, head_num);
} else {
auto msg_req_num = block_header::num_from_id(req_id);
// --msg_req_num since blk_send_branch adds one to the request, and we need to start at msg_req_num
blk_send_branch( --msg_req_num, 0, head_num );
}
}

// called from connection strand
void connection::blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num ) {
if( !peer_requested ) {
@@ -2405,11 +2432,9 @@ namespace eosio {
}
c->peer_syncing_from_us = false;
try {
controller& cc = my_impl->chain_plug->chain();
std::optional<block_id_type> fork_db_head_id = cc.fork_block_id_for_num( msg.fork_db_head_num ); // thread-safe
if (fork_db_head_id && fork_db_head_id != msg.fork_db_head_id) { // possible for fork_db_root to move and fork_db_head_num not be found if running with no block-log
peer_dlog(c, "Sending catch_up request_message sync 4, fhead ${fh} != msg.fhead ${mfh}",
("fh", *fork_db_head_id)("mfh", msg.fork_db_head_id));
auto [on_fork, unknown_block] = block_on_fork(msg.fork_db_head_id); // thread safe
if (on_fork) { // possible for fork_db_root to move and fork_db_head_num not be found if running with no block-log
peer_dlog(c, "Sending catch_up request_message sync 4, msg.fhead ${mfh} on fork", ("mfh", msg.fork_db_head_id));
request_message req;
req.req_blocks.mode = catch_up;
req.req_trx.mode = none;
@@ -2750,9 +2775,9 @@ namespace eosio {
if(my_impl->sync_master->syncing_from_peer() ) return;

block_buffer_factory buff_factory;
buffer_factory block_id_buff_factory;
buffer_factory block_notice_buff_factory;
const auto bnum = b->block_num();
my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory, &block_id_buff_factory]( auto& cp ) {
my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory, &block_notice_buff_factory]( auto& cp ) {
fc_dlog( logger, "socket_is_open ${s}, state ${c}, syncing ${ss}, connection - ${cid}",
("s", cp->socket_is_open())("c", connection::state_str(cp->state()))("ss", cp->peer_syncing_from_us.load())("cid", cp->connection_id) );
if( !cp->current() ) return;
@@ -2766,7 +2791,7 @@ namespace eosio {
if (cp->consecutive_blocks_nacks > connection::consecutive_block_nacks_threshold) {
// only send block_notice if we didn't produce the block, otherwise broadcast the block below
if (!my_impl->is_producer(b->producer)) {
auto send_buffer = block_id_buff_factory.get_send_buffer( block_notice_message{id} );
auto send_buffer = block_notice_buff_factory.get_send_buffer( block_notice_message{b->previous, id} );
boost::asio::post(cp->strand, [cp, send_buffer{std::move(send_buffer)}, bnum]() {
cp->latest_blk_time = std::chrono::steady_clock::now();
peer_dlog( cp, "bcast block_notice ${b}", ("b", bnum) );
@@ -3500,12 +3525,11 @@ namespace eosio {

if( peer_fork_db_root_num <= fork_db_root_num && peer_fork_db_root_num > 0 ) {
try {
controller& cc = my_impl->chain_plug->chain();
std::optional<block_id_type> peer_fork_db_root_id = cc.fork_block_id_for_num( peer_fork_db_root_num ); // thread-safe
if (!peer_fork_db_root_id) {
auto [on_fork, unknown_block] = block_on_fork(msg.fork_db_root_id); // thread safe
if (unknown_block) {
// can be not found if running with a truncated block log
peer_dlog( this, "peer froot block ${n} is unknown", ("n", peer_fork_db_root_num) );
} else if (msg.fork_db_root_id != peer_fork_db_root_id) {
} else if (on_fork) {
peer_wlog( this, "Peer chain is forked, sending: forked go away" );
no_retry = go_away_reason::forked;
enqueue( go_away_message( go_away_reason::forked ) );
@@ -3705,7 +3729,7 @@ namespace eosio {
}

void connection::handle_message( const request_message& msg ) {
if( msg.req_blocks.ids.size() > 1 ) {
if( msg.req_blocks.ids.size() > 2 ) {
peer_wlog( this, "Invalid request_message, req_blocks.ids.size ${s}, closing",
("s", msg.req_blocks.ids.size()) );
close();
@@ -3721,13 +3745,14 @@ namespace eosio {
}
case normal : {
if (protocol_version >= proto_block_nack) {
if (!msg.req_blocks.ids.empty()) {
const block_id_type& id = msg.req_blocks.ids.back();
peer_dlog( this, "received request_message:normal #${bn}:${id}", ("bn", block_header::num_from_id(id))("id",id) );
uint32_t head_num = my_impl->get_chain_head_num();
auto msg_head_num = block_header::num_from_id(id);
// --msg_head_num since blk_send_branch adds one to request and we need to start at msg_head_num
blk_send_branch( --msg_head_num, 0, head_num );
if (msg.req_blocks.ids.size() == 2 && msg.req_trx.ids.empty()) {
const block_id_type& req_id = msg.req_blocks.ids[0]; // 0 - req_id, 1 - peer_head_id
peer_dlog( this, "${d} request_message:normal #${bn}:${id}",
("d", is_blocks_connection() ? "received" : "ignoring")("bn", block_header::num_from_id(req_id))("id", req_id) );
if (!is_blocks_connection())
return;
const block_id_type& peer_head_id = msg.req_blocks.ids[1];
blk_send_branch_from_nack_request(req_id, peer_head_id);
return;
}
}
@@ -3820,37 +3845,40 @@ namespace eosio {

// called from connection strand
void connection::handle_message( const block_notice_message& msg ) {
if (block_header::num_from_id(msg.id)-1 != block_header::num_from_id(msg.previous)) {
peer_dlog(this, "Invalid block_notice_message ${id}, closing", ("id", msg.id));
close();
return;
}

auto fork_db_root_num = my_impl->get_fork_db_root_num();
if (block_header::num_from_id(msg.id) <= fork_db_root_num)
return;

latest_blk_time = std::chrono::steady_clock::now();
if (my_impl->dispatcher.have_block(msg.id)) {
my_impl->dispatcher.add_peer_block(msg.id, connection_id);
} else {
if (!last_block_notice.empty() && !my_impl->dispatcher.have_block(last_block_notice)) { // still don't have previous block
if (block_header::num_from_id(last_block_notice) == block_header::num_from_id(msg.id) - 1) {
peer_dlog(this, "Received 2 consecutive unknown block notices, checking already requested");
request_message req;
req.req_blocks.mode = normal;
req.req_blocks.ids.push_back(last_block_notice);
bool already_requested = my_impl->connections.any_of_block_connections([&req](const auto& c) {
fc::lock_guard g_conn( c->conn_mtx );
return c->last_request_message == req;
});
if (!already_requested) {
peer_ilog(this, "Received 2 consecutive unknown block notices, requesting blocks from ${bn}",
("bn", block_header::num_from_id(last_block_notice)));
send_block_nack({});
{
fc::lock_guard g_conn( conn_mtx );
last_request_message = req;
}
enqueue( req );
}
} else if (!my_impl->dispatcher.have_block(msg.previous)) { // still don't have previous block
peer_dlog(this, "Received unknown block notice, checking already requested");
request_message req;
req.req_blocks.mode = normal;
req.req_blocks.ids.push_back(msg.previous);
bool already_requested = my_impl->connections.any_of_block_connections([&req](const auto& c) {
fc::lock_guard g_conn(c->conn_mtx);
return c->last_block_nack_request_message_id == req.req_blocks.ids[0];
});
if (!already_requested) {
peer_ilog(this, "Received unknown block notice, requesting blocks from ${bn}",
("bn", block_header::num_from_id(msg.previous)));
block_id_type head_id = my_impl->get_chain_info().head_id;
req.req_blocks.ids.push_back(head_id);
send_block_nack({});
{
fc::lock_guard g_conn(conn_mtx);
last_block_nack_request_message_id = req.req_blocks.ids[0];
}
enqueue(req);
}
last_block_notice = msg.id;
}
}
